You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
393 lines
15 KiB
393 lines
15 KiB
import Foundation
|
|
import Network
|
|
import os
|
|
|
|
@MainActor
|
|
@Observable
|
|
final class RemoteClient: RemoteCommandSender {
|
|
|
|
// MARK: - Public State

/// Current position in the connection state machine; starts out disconnected.
var connectionState: ConnectionState = .disconnected
/// Bonjour services reported by the active browser, paired with their endpoints.
var discoveredHosts: [(name: String, endpoint: NWEndpoint)] = []
/// Called for every `playbackState` event decoded from the host.
var onPlaybackState: ((PlaybackStatePayload) -> Void)?
/// Called for every `dbReady` event decoded from the host.
var onDBReady: (() -> Void)?

// MARK: - Private State

/// Bonjour browser in use while discovery is running.
private var browser: NWBrowser?
/// NDJSON transport wrapping the command-channel connection.
private var commandTransport: NDJSONTransport?
/// Endpoint chosen in `connect(to:)`; reused to open the command channel.
private var hostEndpoint: NWEndpoint?
/// Repeating keep-alive timer started once the command channel is up.
private var pingTimer: Timer?
/// Keep-alive timer ticks counted since the last playback-state event.
private var missedPings: Int = 0

private let logger = RemoteLogger.client
|
|
|
|
// MARK: - Remote DB Path
|
|
|
|
/// Absolute filesystem path of the locally stored copy of the host database:
/// `<user Application Support>/Music/remote_db.sqlite`.
static var remoteDBPath: String {
    // `URL.applicationSupportDirectory` is non-optional, so the previous
    // `.first!` force-unwrap on the FileManager lookup is no longer needed.
    URL.applicationSupportDirectory
        .appendingPathComponent("Music", isDirectory: true)
        .appendingPathComponent("remote_db.sqlite")
        .path
}
|
|
|
|
// MARK: - Discovery
|
|
|
|
/// Start scanning for `_musicremote._tcp` services on the local network.
///
/// A browser left over from an earlier call is cancelled first, so calling this
/// repeatedly never leaves two browsers running at the same time. Browser callbacks
/// are delivered on the main queue and hop onto the main actor before touching state.
func startDiscovery() {
    // Overwriting the stored browser without cancelling it would leave it running.
    self.browser?.cancel()

    let descriptor = NWBrowser.Descriptor.bonjour(type: "_musicremote._tcp", domain: nil)
    let newBrowser = NWBrowser(for: descriptor, using: .tcp)

    newBrowser.stateUpdateHandler = { [weak self] state in
        Task { @MainActor [weak self] in
            guard let self else { return }
            switch state {
            case .ready:
                self.logger.info("Browser ready")
            case .failed(let error):
                self.logger.error("Browser failed: \(error.localizedDescription)")
            case .cancelled:
                self.logger.info("Browser cancelled")
            default:
                break
            }
        }
    }

    newBrowser.browseResultsChangedHandler = { [weak self] results, _ in
        Task { @MainActor [weak self] in
            guard let self else { return }
            // Only Bonjour service endpoints carry a name; anything else is skipped.
            self.discoveredHosts = results.compactMap { result in
                guard case .service(let name, _, _, _) = result.endpoint else { return nil }
                return (name: name, endpoint: result.endpoint)
            }
            self.logger.info("Discovered \(self.discoveredHosts.count) host(s)")
        }
    }

    newBrowser.start(queue: .main)
    self.browser = newBrowser
    transition(to: .discovering)
}
|
|
|
|
/// Cancel the Bonjour browser (if any), drop the reference to it, and empty
/// the list of discovered hosts.
func stopDiscovery() {
    if let activeBrowser = browser {
        activeBrowser.cancel()
    }
    browser = nil
    discoveredHosts = []
}
|
|
|
|
// MARK: - Connection
|
|
|
|
/// Begin connecting to `host`.
///
/// Stops discovery, remembers the endpoint for the later command channel, walks the
/// state machine through `.foundHost` and `.downloadingDB`, and kicks off the
/// database download.
/// - Parameter host: A name/endpoint pair, as listed in `discoveredHosts`.
func connect(to host: (name: String, endpoint: NWEndpoint)) {
    let (hostName, endpoint) = host

    stopDiscovery()
    hostEndpoint = endpoint

    transition(to: .foundHost(hostName))
    transition(to: .downloadingDB)

    downloadDatabase(on: endpoint, hostName: hostName)
}
|
|
|
|
/// Reset the client to the disconnected state.
///
/// Stops the keep-alive timer, closes the command transport, forgets the host
/// endpoint, stops discovery, deletes the local remote DB file, and finally
/// transitions to `.disconnected`.
func disconnect() {
    // Keep-alive bookkeeping
    if let timer = pingTimer {
        timer.invalidate()
    }
    pingTimer = nil
    missedPings = 0

    // Command channel
    if let transport = commandTransport {
        transport.close()
    }
    commandTransport = nil
    hostEndpoint = nil

    // Discovery, on-disk state, then the state machine
    stopDiscovery()
    deleteRemoteDB()
    transition(to: .disconnected)
}
|
|
|
|
// MARK: - RemoteCommandSender
|
|
|
|
/// Forward `command` to the host over the command transport.
///
/// When no command channel exists, a warning is logged and the command is dropped.
func sendCommand(_ command: RemoteCommand) {
    if let transport = commandTransport {
        transport.send(command)
    } else {
        logger.warning("Cannot send command — no command channel")
    }
}
|
|
|
|
// MARK: - DB Download
|
|
|
|
/// Open a TCP connection to the host, send `GET /db`, and save the response body to disk.
///
/// The request is sent once the connection reports `.ready`. If the connection fails
/// it is cancelled and the client transitions to `.disconnected`.
/// - Parameters:
///   - endpoint: Host endpoint to connect to.
///   - hostName: Host name, used for the HTTP `Host` header and passed along the chain.
private func downloadDatabase(on endpoint: NWEndpoint, hostName: String) {
    let connection = NWConnection(to: endpoint, using: .tcp)

    connection.stateUpdateHandler = { [weak self] state in
        Task { @MainActor [weak self] in
            guard let self else { return }
            switch state {
            case .ready:
                self.logger.info("DB connection ready")
                self.sendDBRequest(on: connection, hostName: hostName)
            case .waiting(let error):
                // Not terminal: the connection may still come up, so only record why it is stalled.
                self.logger.warning("DB connection waiting: \(error.localizedDescription)")
            case .failed(let error):
                self.logger.error("DB connection failed: \(error.localizedDescription)")
                // This handler captures `connection`; cancelling a failed connection
                // is what lets it be torn down instead of lingering.
                connection.cancel()
                self.transition(to: .disconnected)
            default:
                break
            }
        }
    }

    connection.start(queue: .main)
}
|
|
|
|
/// Send the HTTP GET request for the database.
///
/// On success, starts accumulating the response. On failure the connection is
/// cancelled (it has no further use) and the client transitions to `.disconnected`.
/// - Parameters:
///   - connection: Ready connection to the host.
///   - hostName: Value for the HTTP `Host` header.
private func sendDBRequest(on connection: NWConnection, hostName: String) {
    let request = "GET /db HTTP/1.1\r\nHost: \(hostName)\r\nConnection: close\r\n\r\n"
    connection.send(content: Data(request.utf8), completion: .contentProcessed { [weak self] error in
        Task { @MainActor [weak self] in
            guard let self else { return }
            if let error {
                self.logger.error("Failed to send DB request: \(error.localizedDescription)")
                // Release the socket instead of leaving a half-used connection open.
                connection.cancel()
                self.transition(to: .disconnected)
                return
            }
            self.receiveDBResponse(on: connection, accumulated: Data(), hostName: hostName)
        }
    })
}
|
|
|
|
/// Accumulate the full HTTP response for the DB download, then strip headers and save.
///
/// Reads up to 1 MiB per call and recurses until the stream is complete. A receive
/// error aborts the download: the partial data is discarded rather than being
/// saved as if it were a complete database.
/// - Parameters:
///   - connection: Connection the request was sent on.
///   - accumulated: Bytes received so far.
///   - hostName: Host name passed along to the next step.
private func receiveDBResponse(on connection: NWConnection, accumulated: Data, hostName: String) {
    connection.receive(minimumIncompleteLength: 1, maximumLength: 1_048_576) { [weak self] data, _, isComplete, error in
        Task { @MainActor [weak self] in
            guard let self else { return }

            var buffer = accumulated
            if let data {
                buffer.append(data)
            }

            if let error {
                // A mid-transfer failure means `buffer` may be truncated; never persist it.
                self.logger.error("DB download failed after \(buffer.count) bytes: \(error.localizedDescription)")
                connection.cancel()
                self.transition(to: .disconnected)
                return
            }

            if isComplete {
                // Done receiving — extract body after HTTP headers
                connection.cancel()
                self.handleDBData(buffer, hostName: hostName)
            } else {
                // Keep reading
                self.receiveDBResponse(on: connection, accumulated: buffer, hostName: hostName)
            }
        }
    }
}
|
|
|
|
/// Strip the HTTP headers from the response and write the SQLite body to disk.
///
/// The response is rejected (transition to `.disconnected`) when the header separator
/// is missing, when the status line carries a parsable non-2xx status code, when the
/// body is empty, or when the file cannot be written. On success the command channel
/// is opened next.
/// - Parameters:
///   - data: Complete raw HTTP response.
///   - hostName: Host name passed along to the command-channel setup.
private func handleDBData(_ data: Data, hostName: String) {
    // Find the header/body separator: \r\n\r\n
    let separator = Data([0x0D, 0x0A, 0x0D, 0x0A])
    guard let separatorRange = data.range(of: separator) else {
        logger.error("DB response missing HTTP header separator")
        transition(to: .disconnected)
        return
    }

    // Reject error responses so an error page is never stored as the database.
    // A status line that cannot be parsed is tolerated, as before.
    let headerText = String(decoding: data[..<separatorRange.lowerBound], as: UTF8.self)
    let statusLine = headerText.split(whereSeparator: \.isNewline).first ?? ""
    let statusFields = statusLine.split(separator: " ", maxSplits: 2)
    if statusFields.count >= 2, let status = Int(statusFields[1]), !(200..<300).contains(status) {
        logger.error("DB request rejected with HTTP status \(status)")
        transition(to: .disconnected)
        return
    }

    let body = data[separatorRange.upperBound...]
    guard !body.isEmpty else {
        logger.error("DB response body is empty")
        transition(to: .disconnected)
        return
    }

    let dbURL = URL(fileURLWithPath: Self.remoteDBPath)
    do {
        // Ensure the directory exists; a failure here is reported instead of being swallowed.
        try FileManager.default.createDirectory(
            at: dbURL.deletingLastPathComponent(),
            withIntermediateDirectories: true
        )
        // Atomic write: a failed write never leaves a half-written database behind.
        try body.write(to: dbURL, options: .atomic)
        logger.info("Database saved (\(body.count) bytes) to \(Self.remoteDBPath)")
        connectCommandChannel(hostName: hostName)
    } catch {
        logger.error("Failed to write DB: \(error.localizedDescription)")
        transition(to: .disconnected)
    }
}
|
|
|
|
// MARK: - Command Channel
|
|
|
|
/// Open a second TCP connection to the same endpoint and upgrade it to the command channel.
///
/// Requires the endpoint stored by `connect(to:)`. If the connection fails it is
/// cancelled and the client transitions to `.disconnected`.
/// - Parameter hostName: Host name, used for the HTTP `Host` header and the final state.
private func connectCommandChannel(hostName: String) {
    guard let endpoint = hostEndpoint else {
        logger.error("No endpoint stored for command channel")
        transition(to: .disconnected)
        return
    }

    transition(to: .connectingCommandChannel)

    let connection = NWConnection(to: endpoint, using: .tcp)

    connection.stateUpdateHandler = { [weak self] state in
        Task { @MainActor [weak self] in
            guard let self else { return }
            switch state {
            case .ready:
                self.logger.info("Command channel TCP ready")
                self.sendCmdRequest(on: connection, hostName: hostName)
            case .waiting(let error):
                // Not terminal: the connection may still come up, so only record why it is stalled.
                self.logger.warning("Command channel connection waiting: \(error.localizedDescription)")
            case .failed(let error):
                self.logger.error("Command channel connection failed: \(error.localizedDescription)")
                // This handler captures `connection`; cancel so a failed connection is released.
                connection.cancel()
                self.transition(to: .disconnected)
            default:
                break
            }
        }
    }

    connection.start(queue: .main)
}
|
|
|
|
/// Send the HTTP GET request to upgrade to the command channel.
///
/// On success, starts reading the response header. On failure the connection is
/// cancelled and the client transitions to `.disconnected`.
/// - Parameters:
///   - connection: Ready connection to the host.
///   - hostName: Value for the HTTP `Host` header.
private func sendCmdRequest(on connection: NWConnection, hostName: String) {
    let request = "GET /cmd HTTP/1.1\r\nHost: \(hostName)\r\nConnection: keep-alive\r\n\r\n"
    connection.send(content: Data(request.utf8), completion: .contentProcessed { [weak self] error in
        Task { @MainActor [weak self] in
            guard let self else { return }
            if let error {
                self.logger.error("Failed to send cmd request: \(error.localizedDescription)")
                // Release the socket instead of leaving a half-used connection open.
                connection.cancel()
                self.transition(to: .disconnected)
                return
            }
            self.receiveCmdHeader(on: connection, accumulated: Data(), hostName: hostName)
        }
    })
}
|
|
|
|
/// Read the HTTP 200 response header before upgrading to NDJSON streaming.
///
/// Recurses until the `\r\n\r\n` header terminator has been seen. If the read fails,
/// or the host ends the stream before the terminator arrives, the connection is
/// cancelled and the client transitions to `.disconnected`.
/// - Parameters:
///   - connection: Connection the `/cmd` request was sent on.
///   - accumulated: Header bytes received so far.
///   - hostName: Host name passed along to `commandChannelReady`.
private func receiveCmdHeader(on connection: NWConnection, accumulated: Data, hostName: String) {
    connection.receive(minimumIncompleteLength: 1, maximumLength: 65_536) { [weak self] data, _, isComplete, error in
        Task { @MainActor [weak self] in
            guard let self else { return }

            if let error {
                self.logger.error("Failed to read cmd header: \(error.localizedDescription)")
                connection.cancel()
                self.transition(to: .disconnected)
                return
            }

            var buffer = accumulated
            if let data {
                buffer.append(data)
            }

            // Check if we've received the full HTTP header
            let separator = Data([0x0D, 0x0A, 0x0D, 0x0A])
            if buffer.range(of: separator) != nil {
                self.commandChannelReady(connection: connection, hostName: hostName)
            } else if isComplete {
                // The stream ended without a full header. Reading again would complete
                // immediately every time and loop forever, so give up here.
                self.logger.error("Command channel closed before HTTP header was complete")
                connection.cancel()
                self.transition(to: .disconnected)
            } else {
                // Keep reading until we get the full header
                self.receiveCmdHeader(on: connection, accumulated: buffer, hostName: hostName)
            }
        }
    }
}
|
|
|
|
/// The command channel HTTP handshake is complete — set up NDJSON transport.
///
/// Installs the transport and its callbacks, sends the handshake message, starts the
/// 5-second keep-alive timer (the channel is declared lost after 3 ticks without a
/// playback-state event), and transitions to `.connected`.
///
/// Any timer or transport left over from an earlier channel is shut down first, and
/// the close handler ignores notifications from a transport that is no longer current,
/// so a stale channel cannot tear down a newer one.
/// - Parameters:
///   - connection: Connection on which the HTTP response header was received.
///   - hostName: Host name recorded in the `.connected` state.
private func commandChannelReady(connection: NWConnection, hostName: String) {
    // Shut down leftovers from a previous channel before replacing them.
    pingTimer?.invalidate()
    pingTimer = nil
    commandTransport?.close()

    let transport = NDJSONTransport(connection: connection, logger: logger)
    self.commandTransport = transport

    transport.onLine = { [weak self] line in
        self?.handleEventLine(line)
    }

    // `transport` is captured weakly so it does not retain itself through its own callback.
    transport.onClose = { [weak self, weak transport] in
        Task { @MainActor [weak self, weak transport] in
            guard let self, let transport else { return }
            // Ignore close notifications from a transport that has already been replaced or dropped.
            guard self.commandTransport === transport else { return }
            self.logger.info("Command channel closed by host")
            self.pingTimer?.invalidate()
            self.pingTimer = nil
            self.commandTransport = nil
            if self.connectionState.isConnected {
                self.transition(to: .connectionLost("Host closed connection"))
            }
        }
    }

    transport.startReceiving()

    // Send handshake
    let appVersion = Bundle.main.infoDictionary?["CFBundleShortVersionString"] as? String ?? "unknown"
    let handshake = HandshakeMessage(protocolVersion: RemoteProtocolVersion, appVersion: appVersion)
    transport.send(handshake)

    // Start keep-alive ping timer
    missedPings = 0
    pingTimer = Timer.scheduledTimer(withTimeInterval: 5.0, repeats: true) { [weak self] _ in
        Task { @MainActor [weak self] in
            guard let self else { return }
            self.missedPings += 1
            if self.missedPings >= 3 {
                self.logger.warning("Keep-alive timeout — \(self.missedPings) consecutive pings unanswered")
                self.pingTimer?.invalidate()
                self.pingTimer = nil
                self.commandTransport?.close()
                self.commandTransport = nil
                self.transition(to: .connectionLost("Host not responding"))
            }
        }
    }

    transition(to: .connected(hostName))
    logger.info("Command channel established with \(hostName)")
}
|
|
|
|
// MARK: - Event Handling
|
|
|
|
/// Decode one NDJSON line as a `HostEvent` and route it.
///
/// - `playbackState` resets the missed-ping counter and invokes `onPlaybackState`.
/// - `dbReady` invokes `onDBReady`.
/// - `error` is logged.
///
/// Lines that fail to decode are logged and otherwise ignored.
private func handleEventLine(_ line: String) {
    guard let payloadData = line.data(using: .utf8) else { return }

    let event: HostEvent
    do {
        event = try JSONDecoder().decode(HostEvent.self, from: payloadData)
    } catch {
        logger.error("Failed to decode host event: \(error.localizedDescription)")
        return
    }

    switch event {
    case .playbackState(let payload):
        missedPings = 0
        onPlaybackState?(payload)
    case .dbReady:
        onDBReady?()
    case .error(let message):
        logger.error("Host error: \(message)")
    }
}
|
|
|
|
// MARK: - State Machine
|
|
|
|
/// Move the state machine to `newState`.
///
/// A transition to the current state is a silent no-op. A transition rejected by
/// `canTransition(to:)` is logged as a warning and ignored; an accepted one is
/// logged and stored in `connectionState`.
private func transition(to newState: ConnectionState) {
    let current = connectionState
    if current == newState { return }

    if current.canTransition(to: newState) {
        logger.info("State: \(String(describing: current)) → \(String(describing: newState))")
        connectionState = newState
    } else {
        logger.warning("Invalid transition: \(String(describing: current)) → \(String(describing: newState))")
    }
}
|
|
|
|
// MARK: - Helpers
|
|
|
|
/// Remove the file at `remoteDBPath` when it exists, logging the outcome.
/// A missing file is not an error and produces no log output.
private func deleteRemoteDB() {
    let dbPath = Self.remoteDBPath
    let fileManager = FileManager.default

    guard fileManager.fileExists(atPath: dbPath) else { return }

    do {
        try fileManager.removeItem(atPath: dbPath)
        logger.info("Deleted remote DB at \(dbPath)")
    } catch {
        logger.error("Failed to delete remote DB: \(error.localizedDescription)")
    }
}
|
|
}
|
|
|