feat(remote): add RemoteClient with Bonjour discovery, DB download, NDJSON command channel

feat/music-streaming
Laurent 1 month ago
parent 463ecb518b
commit 4bf1a5e4ef
  1. 393
      Music/Remote/RemoteClient.swift

@ -0,0 +1,393 @@
import Foundation
import Network
import os
@MainActor
@Observable
final class RemoteClient: RemoteCommandSender {
// MARK: - Public State
var connectionState = ConnectionState.disconnected
var discoveredHosts: [(name: String, endpoint: NWEndpoint)] = []
var onPlaybackState: ((PlaybackStatePayload) -> Void)?
var onDBReady: (() -> Void)?
// MARK: - Private State
private var browser: NWBrowser?
private var commandTransport: NDJSONTransport?
private var hostEndpoint: NWEndpoint?
private var pingTimer: Timer?
private var missedPings = 0
private let logger = RemoteLogger.client
// MARK: - Remote DB Path
static var remoteDBPath: String {
let appSupport = FileManager.default.urls(
for: .applicationSupportDirectory, in: .userDomainMask
).first!.appendingPathComponent("Music", isDirectory: true)
return appSupport.appendingPathComponent("remote_db.sqlite").path
}
// MARK: - Discovery
/// Start scanning for `_musicremote._tcp` services on the local network.
func startDiscovery() {
let descriptor = NWBrowser.Descriptor.bonjour(type: "_musicremote._tcp", domain: nil)
let browser = NWBrowser(for: descriptor, using: .tcp)
browser.stateUpdateHandler = { [weak self] state in
Task { @MainActor [weak self] in
guard let self else { return }
switch state {
case .ready:
self.logger.info("Browser ready")
case .failed(let error):
self.logger.error("Browser failed: \(error.localizedDescription)")
case .cancelled:
self.logger.info("Browser cancelled")
default:
break
}
}
}
browser.browseResultsChangedHandler = { [weak self] results, _ in
Task { @MainActor [weak self] in
guard let self else { return }
self.discoveredHosts = results.compactMap { result in
if case .service(let name, _, _, _) = result.endpoint {
return (name: name, endpoint: result.endpoint)
}
return nil
}
self.logger.info("Discovered \(self.discoveredHosts.count) host(s)")
}
}
browser.start(queue: .main)
self.browser = browser
transition(to: .discovering)
}
/// Stop scanning and clear discovered hosts.
func stopDiscovery() {
browser?.cancel()
browser = nil
discoveredHosts = []
}
// MARK: - Connection
/// Connect to a discovered host: download its DB, then establish the command channel.
func connect(to host: (name: String, endpoint: NWEndpoint)) {
stopDiscovery()
hostEndpoint = host.endpoint
transition(to: .foundHost(host.name))
transition(to: .downloadingDB)
downloadDatabase(on: host.endpoint, hostName: host.name)
}
/// Tear down everything: command channel, timers, and the local remote DB file.
func disconnect() {
pingTimer?.invalidate()
pingTimer = nil
missedPings = 0
commandTransport?.close()
commandTransport = nil
hostEndpoint = nil
stopDiscovery()
deleteRemoteDB()
transition(to: .disconnected)
}
// MARK: - RemoteCommandSender
func sendCommand(_ command: RemoteCommand) {
guard let transport = commandTransport else {
logger.warning("Cannot send command — no command channel")
return
}
transport.send(command)
}
// MARK: - DB Download
/// Open a TCP connection to the host, send `GET /db`, and save the response body to disk.
private func downloadDatabase(on endpoint: NWEndpoint, hostName: String) {
let connection = NWConnection(to: endpoint, using: .tcp)
connection.stateUpdateHandler = { [weak self] state in
Task { @MainActor [weak self] in
guard let self else { return }
switch state {
case .ready:
self.logger.info("DB connection ready")
self.sendDBRequest(on: connection, hostName: hostName)
case .failed(let error):
self.logger.error("DB connection failed: \(error.localizedDescription)")
self.transition(to: .disconnected)
default:
break
}
}
}
connection.start(queue: .main)
}
/// Send the HTTP GET request for the database.
private func sendDBRequest(on connection: NWConnection, hostName: String) {
let request = "GET /db HTTP/1.1\r\nHost: \(hostName)\r\nConnection: close\r\n\r\n"
connection.send(content: Data(request.utf8), completion: .contentProcessed { [weak self] error in
Task { @MainActor [weak self] in
guard let self else { return }
if let error {
self.logger.error("Failed to send DB request: \(error.localizedDescription)")
self.transition(to: .disconnected)
return
}
self.receiveDBResponse(on: connection, accumulated: Data(), hostName: hostName)
}
})
}
/// Accumulate the full HTTP response for the DB download, then strip headers and save.
private func receiveDBResponse(on connection: NWConnection, accumulated: Data, hostName: String) {
connection.receive(minimumIncompleteLength: 1, maximumLength: 1_048_576) { [weak self] data, _, isComplete, error in
Task { @MainActor [weak self] in
guard let self else { return }
var buffer = accumulated
if let data {
buffer.append(data)
}
if isComplete || error != nil {
// Done receiving extract body after HTTP headers
connection.cancel()
self.handleDBData(buffer, hostName: hostName)
} else {
// Keep reading
self.receiveDBResponse(on: connection, accumulated: buffer, hostName: hostName)
}
}
}
}
/// Strip the HTTP headers from the response and write the SQLite body to disk.
private func handleDBData(_ data: Data, hostName: String) {
// Find the header/body separator: \r\n\r\n
let separator: [UInt8] = [0x0D, 0x0A, 0x0D, 0x0A]
guard let separatorRange = data.range(of: Data(separator)) else {
logger.error("DB response missing HTTP header separator")
transition(to: .disconnected)
return
}
let body = data[separatorRange.upperBound...]
guard !body.isEmpty else {
logger.error("DB response body is empty")
transition(to: .disconnected)
return
}
// Ensure the directory exists
let dirURL = URL(fileURLWithPath: Self.remoteDBPath).deletingLastPathComponent()
try? FileManager.default.createDirectory(at: dirURL, withIntermediateDirectories: true)
do {
try body.write(to: URL(fileURLWithPath: Self.remoteDBPath))
logger.info("Database saved (\(body.count) bytes) to \(Self.remoteDBPath)")
connectCommandChannel(hostName: hostName)
} catch {
logger.error("Failed to write DB: \(error.localizedDescription)")
transition(to: .disconnected)
}
}
// MARK: - Command Channel
/// Open a second TCP connection to the same endpoint and upgrade it to the command channel.
private func connectCommandChannel(hostName: String) {
guard let endpoint = hostEndpoint else {
logger.error("No endpoint stored for command channel")
transition(to: .disconnected)
return
}
transition(to: .connectingCommandChannel)
let connection = NWConnection(to: endpoint, using: .tcp)
connection.stateUpdateHandler = { [weak self] state in
Task { @MainActor [weak self] in
guard let self else { return }
switch state {
case .ready:
self.logger.info("Command channel TCP ready")
self.sendCmdRequest(on: connection, hostName: hostName)
case .failed(let error):
self.logger.error("Command channel connection failed: \(error.localizedDescription)")
self.transition(to: .disconnected)
default:
break
}
}
}
connection.start(queue: .main)
}
/// Send the HTTP GET request to upgrade to the command channel.
private func sendCmdRequest(on connection: NWConnection, hostName: String) {
let request = "GET /cmd HTTP/1.1\r\nHost: \(hostName)\r\nConnection: keep-alive\r\n\r\n"
connection.send(content: Data(request.utf8), completion: .contentProcessed { [weak self] error in
Task { @MainActor [weak self] in
guard let self else { return }
if let error {
self.logger.error("Failed to send cmd request: \(error.localizedDescription)")
self.transition(to: .disconnected)
return
}
self.receiveCmdHeader(on: connection, accumulated: Data(), hostName: hostName)
}
})
}
/// Read the HTTP 200 response header before upgrading to NDJSON streaming.
private func receiveCmdHeader(on connection: NWConnection, accumulated: Data, hostName: String) {
connection.receive(minimumIncompleteLength: 1, maximumLength: 65_536) { [weak self] data, _, _, error in
Task { @MainActor [weak self] in
guard let self else { return }
if let error {
self.logger.error("Failed to read cmd header: \(error.localizedDescription)")
self.transition(to: .disconnected)
return
}
var buffer = accumulated
if let data {
buffer.append(data)
}
// Check if we've received the full HTTP header
let separator: [UInt8] = [0x0D, 0x0A, 0x0D, 0x0A]
if buffer.range(of: Data(separator)) != nil {
self.commandChannelReady(connection: connection, hostName: hostName)
} else {
// Keep reading until we get the full header
self.receiveCmdHeader(on: connection, accumulated: buffer, hostName: hostName)
}
}
}
}
/// The command channel HTTP handshake is complete set up NDJSON transport.
private func commandChannelReady(connection: NWConnection, hostName: String) {
let transport = NDJSONTransport(connection: connection, logger: logger)
self.commandTransport = transport
transport.onLine = { [weak self] line in
self?.handleEventLine(line)
}
transport.onClose = { [weak self] in
Task { @MainActor [weak self] in
guard let self else { return }
self.logger.info("Command channel closed by host")
self.pingTimer?.invalidate()
self.pingTimer = nil
self.commandTransport = nil
if self.connectionState.isConnected {
self.transition(to: .connectionLost("Host closed connection"))
}
}
}
transport.startReceiving()
// Send handshake
let appVersion = Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "unknown"
let handshake = HandshakeMessage(protocolVersion: RemoteProtocolVersion, appVersion: appVersion)
transport.send(handshake)
// Start keep-alive ping timer
missedPings = 0
pingTimer = Timer.scheduledTimer(withTimeInterval: 5.0, repeats: true) { [weak self] _ in
Task { @MainActor [weak self] in
guard let self else { return }
self.missedPings += 1
if self.missedPings >= 3 {
self.logger.warning("Keep-alive timeout — \(self.missedPings) consecutive pings unanswered")
self.pingTimer?.invalidate()
self.pingTimer = nil
self.commandTransport?.close()
self.commandTransport = nil
self.transition(to: .connectionLost("Host not responding"))
}
}
}
transition(to: .connected(hostName))
logger.info("Command channel established with \(hostName)")
}
// MARK: - Event Handling
/// Parse an incoming NDJSON line as a `HostEvent` and dispatch to the appropriate callback.
private func handleEventLine(_ line: String) {
guard let data = line.data(using: .utf8) else { return }
let decoder = JSONDecoder()
do {
let event = try decoder.decode(HostEvent.self, from: data)
switch event {
case .playbackState(let payload):
missedPings = 0
onPlaybackState?(payload)
case .dbReady:
onDBReady?()
case .error(let message):
logger.error("Host error: \(message)")
}
} catch {
logger.error("Failed to decode host event: \(error.localizedDescription)")
}
}
// MARK: - State Machine
/// Transition to a new connection state, validating via `canTransition(to:)`.
private func transition(to newState: ConnectionState) {
let oldState = connectionState
guard oldState != newState else { return }
guard oldState.canTransition(to: newState) else {
logger.warning("Invalid transition: \(String(describing: oldState))\(String(describing: newState))")
return
}
logger.info("State: \(String(describing: oldState))\(String(describing: newState))")
connectionState = newState
}
// MARK: - Helpers
/// Delete the local remote database file if it exists.
private func deleteRemoteDB() {
let path = Self.remoteDBPath
if FileManager.default.fileExists(atPath: path) {
do {
try FileManager.default.removeItem(atPath: path)
logger.info("Deleted remote DB at \(path)")
} catch {
logger.error("Failed to delete remote DB: \(error.localizedDescription)")
}
}
}
}
Loading…
Cancel
Save