From 4bf1a5e4ef726f26ac5013da22c50f095355bf20 Mon Sep 17 00:00:00 2001 From: Laurent Date: Tue, 26 May 2026 21:41:26 +0200 Subject: [PATCH] feat(remote): add RemoteClient with Bonjour discovery, DB download, NDJSON command channel --- Music/Remote/RemoteClient.swift | 393 ++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 Music/Remote/RemoteClient.swift diff --git a/Music/Remote/RemoteClient.swift b/Music/Remote/RemoteClient.swift new file mode 100644 index 0000000..7364dce --- /dev/null +++ b/Music/Remote/RemoteClient.swift @@ -0,0 +1,393 @@ +import Foundation +import Network +import os + +@MainActor +@Observable +final class RemoteClient: RemoteCommandSender { + + // MARK: - Public State + + var connectionState = ConnectionState.disconnected + var discoveredHosts: [(name: String, endpoint: NWEndpoint)] = [] + var onPlaybackState: ((PlaybackStatePayload) -> Void)? + var onDBReady: (() -> Void)? + + // MARK: - Private State + + private var browser: NWBrowser? + private var commandTransport: NDJSONTransport? + private var hostEndpoint: NWEndpoint? + private var pingTimer: Timer? + private var missedPings = 0 + + private let logger = RemoteLogger.client + + // MARK: - Remote DB Path + + static var remoteDBPath: String { + let appSupport = FileManager.default.urls( + for: .applicationSupportDirectory, in: .userDomainMask + ).first!.appendingPathComponent("Music", isDirectory: true) + return appSupport.appendingPathComponent("remote_db.sqlite").path + } + + // MARK: - Discovery + + /// Start scanning for `_musicremote._tcp` services on the local network. + func startDiscovery() { + let descriptor = NWBrowser.Descriptor.bonjour(type: "_musicremote._tcp", domain: nil) + let browser = NWBrowser(for: descriptor, using: .tcp) + + browser.stateUpdateHandler = { [weak self] state in + Task { @MainActor [weak self] in + guard let self else { return } + switch state { + case .ready: + self.logger.info("Browser ready") + case .failed(let error): + self.logger.error("Browser failed: \(error.localizedDescription)") + case .cancelled: + self.logger.info("Browser cancelled") + default: + break + } + } + } + + browser.browseResultsChangedHandler = { [weak self] results, _ in + Task { @MainActor [weak self] in + guard let self else { return } + self.discoveredHosts = results.compactMap { result in + if case .service(let name, _, _, _) = result.endpoint { + return (name: name, endpoint: result.endpoint) + } + return nil + } + self.logger.info("Discovered \(self.discoveredHosts.count) host(s)") + } + } + + browser.start(queue: .main) + self.browser = browser + transition(to: .discovering) + } + + /// Stop scanning and clear discovered hosts. + func stopDiscovery() { + browser?.cancel() + browser = nil + discoveredHosts = [] + } + + // MARK: - Connection + + /// Connect to a discovered host: download its DB, then establish the command channel. + func connect(to host: (name: String, endpoint: NWEndpoint)) { + stopDiscovery() + hostEndpoint = host.endpoint + transition(to: .foundHost(host.name)) + transition(to: .downloadingDB) + downloadDatabase(on: host.endpoint, hostName: host.name) + } + + /// Tear down everything: command channel, timers, and the local remote DB file. + func disconnect() { + pingTimer?.invalidate() + pingTimer = nil + missedPings = 0 + commandTransport?.close() + commandTransport = nil + hostEndpoint = nil + stopDiscovery() + deleteRemoteDB() + transition(to: .disconnected) + } + + // MARK: - RemoteCommandSender + + func sendCommand(_ command: RemoteCommand) { + guard let transport = commandTransport else { + logger.warning("Cannot send command — no command channel") + return + } + transport.send(command) + } + + // MARK: - DB Download + + /// Open a TCP connection to the host, send `GET /db`, and save the response body to disk. + private func downloadDatabase(on endpoint: NWEndpoint, hostName: String) { + let connection = NWConnection(to: endpoint, using: .tcp) + + connection.stateUpdateHandler = { [weak self] state in + Task { @MainActor [weak self] in + guard let self else { return } + switch state { + case .ready: + self.logger.info("DB connection ready") + self.sendDBRequest(on: connection, hostName: hostName) + case .failed(let error): + self.logger.error("DB connection failed: \(error.localizedDescription)") + self.transition(to: .disconnected) + default: + break + } + } + } + + connection.start(queue: .main) + } + + /// Send the HTTP GET request for the database. + private func sendDBRequest(on connection: NWConnection, hostName: String) { + let request = "GET /db HTTP/1.1\r\nHost: \(hostName)\r\nConnection: close\r\n\r\n" + connection.send(content: Data(request.utf8), completion: .contentProcessed { [weak self] error in + Task { @MainActor [weak self] in + guard let self else { return } + if let error { + self.logger.error("Failed to send DB request: \(error.localizedDescription)") + self.transition(to: .disconnected) + return + } + self.receiveDBResponse(on: connection, accumulated: Data(), hostName: hostName) + } + }) + } + + /// Accumulate the full HTTP response for the DB download, then strip headers and save. + private func receiveDBResponse(on connection: NWConnection, accumulated: Data, hostName: String) { + connection.receive(minimumIncompleteLength: 1, maximumLength: 1_048_576) { [weak self] data, _, isComplete, error in + Task { @MainActor [weak self] in + guard let self else { return } + + var buffer = accumulated + if let data { + buffer.append(data) + } + + if isComplete || error != nil { + // Done receiving — extract body after HTTP headers + connection.cancel() + self.handleDBData(buffer, hostName: hostName) + } else { + // Keep reading + self.receiveDBResponse(on: connection, accumulated: buffer, hostName: hostName) + } + } + } + } + + /// Strip the HTTP headers from the response and write the SQLite body to disk. + private func handleDBData(_ data: Data, hostName: String) { + // Find the header/body separator: \r\n\r\n + let separator: [UInt8] = [0x0D, 0x0A, 0x0D, 0x0A] + guard let separatorRange = data.range(of: Data(separator)) else { + logger.error("DB response missing HTTP header separator") + transition(to: .disconnected) + return + } + + let body = data[separatorRange.upperBound...] + guard !body.isEmpty else { + logger.error("DB response body is empty") + transition(to: .disconnected) + return + } + + // Ensure the directory exists + let dirURL = URL(fileURLWithPath: Self.remoteDBPath).deletingLastPathComponent() + try? FileManager.default.createDirectory(at: dirURL, withIntermediateDirectories: true) + + do { + try body.write(to: URL(fileURLWithPath: Self.remoteDBPath)) + logger.info("Database saved (\(body.count) bytes) to \(Self.remoteDBPath)") + connectCommandChannel(hostName: hostName) + } catch { + logger.error("Failed to write DB: \(error.localizedDescription)") + transition(to: .disconnected) + } + } + + // MARK: - Command Channel + + /// Open a second TCP connection to the same endpoint and upgrade it to the command channel. + private func connectCommandChannel(hostName: String) { + guard let endpoint = hostEndpoint else { + logger.error("No endpoint stored for command channel") + transition(to: .disconnected) + return + } + + transition(to: .connectingCommandChannel) + + let connection = NWConnection(to: endpoint, using: .tcp) + + connection.stateUpdateHandler = { [weak self] state in + Task { @MainActor [weak self] in + guard let self else { return } + switch state { + case .ready: + self.logger.info("Command channel TCP ready") + self.sendCmdRequest(on: connection, hostName: hostName) + case .failed(let error): + self.logger.error("Command channel connection failed: \(error.localizedDescription)") + self.transition(to: .disconnected) + default: + break + } + } + } + + connection.start(queue: .main) + } + + /// Send the HTTP GET request to upgrade to the command channel. + private func sendCmdRequest(on connection: NWConnection, hostName: String) { + let request = "GET /cmd HTTP/1.1\r\nHost: \(hostName)\r\nConnection: keep-alive\r\n\r\n" + connection.send(content: Data(request.utf8), completion: .contentProcessed { [weak self] error in + Task { @MainActor [weak self] in + guard let self else { return } + if let error { + self.logger.error("Failed to send cmd request: \(error.localizedDescription)") + self.transition(to: .disconnected) + return + } + self.receiveCmdHeader(on: connection, accumulated: Data(), hostName: hostName) + } + }) + } + + /// Read the HTTP 200 response header before upgrading to NDJSON streaming. + private func receiveCmdHeader(on connection: NWConnection, accumulated: Data, hostName: String) { + connection.receive(minimumIncompleteLength: 1, maximumLength: 65_536) { [weak self] data, _, _, error in + Task { @MainActor [weak self] in + guard let self else { return } + + if let error { + self.logger.error("Failed to read cmd header: \(error.localizedDescription)") + self.transition(to: .disconnected) + return + } + + var buffer = accumulated + if let data { + buffer.append(data) + } + + // Check if we've received the full HTTP header + let separator: [UInt8] = [0x0D, 0x0A, 0x0D, 0x0A] + if buffer.range(of: Data(separator)) != nil { + self.commandChannelReady(connection: connection, hostName: hostName) + } else { + // Keep reading until we get the full header + self.receiveCmdHeader(on: connection, accumulated: buffer, hostName: hostName) + } + } + } + } + + /// The command channel HTTP handshake is complete — set up NDJSON transport. + private func commandChannelReady(connection: NWConnection, hostName: String) { + let transport = NDJSONTransport(connection: connection, logger: logger) + self.commandTransport = transport + + transport.onLine = { [weak self] line in + self?.handleEventLine(line) + } + + transport.onClose = { [weak self] in + Task { @MainActor [weak self] in + guard let self else { return } + self.logger.info("Command channel closed by host") + self.pingTimer?.invalidate() + self.pingTimer = nil + self.commandTransport = nil + if self.connectionState.isConnected { + self.transition(to: .connectionLost("Host closed connection")) + } + } + } + + transport.startReceiving() + + // Send handshake + let appVersion = Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "unknown" + let handshake = HandshakeMessage(protocolVersion: RemoteProtocolVersion, appVersion: appVersion) + transport.send(handshake) + + // Start keep-alive ping timer + missedPings = 0 + pingTimer = Timer.scheduledTimer(withTimeInterval: 5.0, repeats: true) { [weak self] _ in + Task { @MainActor [weak self] in + guard let self else { return } + self.missedPings += 1 + if self.missedPings >= 3 { + self.logger.warning("Keep-alive timeout — \(self.missedPings) consecutive pings unanswered") + self.pingTimer?.invalidate() + self.pingTimer = nil + self.commandTransport?.close() + self.commandTransport = nil + self.transition(to: .connectionLost("Host not responding")) + } + } + } + + transition(to: .connected(hostName)) + logger.info("Command channel established with \(hostName)") + } + + // MARK: - Event Handling + + /// Parse an incoming NDJSON line as a `HostEvent` and dispatch to the appropriate callback. + private func handleEventLine(_ line: String) { + guard let data = line.data(using: .utf8) else { return } + let decoder = JSONDecoder() + + do { + let event = try decoder.decode(HostEvent.self, from: data) + switch event { + case .playbackState(let payload): + missedPings = 0 + onPlaybackState?(payload) + case .dbReady: + onDBReady?() + case .error(let message): + logger.error("Host error: \(message)") + } + } catch { + logger.error("Failed to decode host event: \(error.localizedDescription)") + } + } + + // MARK: - State Machine + + /// Transition to a new connection state, validating via `canTransition(to:)`. + private func transition(to newState: ConnectionState) { + let oldState = connectionState + guard oldState != newState else { return } + + guard oldState.canTransition(to: newState) else { + logger.warning("Invalid transition: \(String(describing: oldState)) → \(String(describing: newState))") + return + } + + logger.info("State: \(String(describing: oldState)) → \(String(describing: newState))") + connectionState = newState + } + + // MARK: - Helpers + + /// Delete the local remote database file if it exists. + private func deleteRemoteDB() { + let path = Self.remoteDBPath + if FileManager.default.fileExists(atPath: path) { + do { + try FileManager.default.removeItem(atPath: path) + logger.info("Deleted remote DB at \(path)") + } catch { + logger.error("Failed to delete remote DB: \(error.localizedDescription)") + } + } + } +}