feat(remote): add HostServer with Bonjour, HTTP DB download, and NDJSON command channel

feat/music-streaming
Laurent 1 month ago
parent e5b5c249b4
commit 463ecb518b
  1. 2
      Music/Music.entitlements
  2. 363
      Music/Remote/HostServer.swift
  3. 9
      Music/Services/DatabaseService.swift
  4. 164
      MusicTests/HostServerIntegrationTests.swift

@ -12,6 +12,8 @@
<true/>
<key>com.apple.security.network.client</key>
<true/>
<key>com.apple.security.network.server</key>
<true/>
<key>com.apple.security.device.audio-input</key>
<true/>
</dict>

@ -0,0 +1,363 @@
import Foundation
import Network
import os
@MainActor
@Observable
final class HostServer {
var isHosting = false
var connectedRemoteName: String?
private(set) var actualPort: UInt16?
private let dbPath: String
private var listener: NWListener?
private var commandTransport: NDJSONTransport?
private var commandConnection: NWConnection?
private var player: PlayerViewModel?
private var db: DatabaseService?
private var stateTimer: Timer?
private let logger = RemoteLogger.host
init(dbPath: String) {
self.dbPath = dbPath
}
/// Configure the server with a player and database for command dispatch.
/// Pass `nil` for either if not needed (e.g. DB-only serving without a player).
func configure(player: PlayerViewModel?, db: DatabaseService?) {
self.player = player
self.db = db
}
/// Start the Bonjour listener on a random TCP port.
func start() throws {
let params = NWParameters.tcp
let listener = try NWListener(using: params)
listener.service = NWListener.Service(type: "_musicremote._tcp")
listener.stateUpdateHandler = { [weak self] state in
Task { @MainActor [weak self] in
guard let self else { return }
switch state {
case .ready:
if let port = listener.port?.rawValue {
self.actualPort = port
self.logger.info("Listener ready on port \(port)")
}
self.isHosting = true
case .failed(let error):
self.logger.error("Listener failed: \(error.localizedDescription)")
self.isHosting = false
case .cancelled:
self.isHosting = false
default:
break
}
}
}
listener.newConnectionHandler = { [weak self] connection in
Task { @MainActor [weak self] in
self?.handleNewConnection(connection)
}
}
listener.start(queue: .main)
self.listener = listener
// Start periodic playback-state timer
stateTimer = Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true) { [weak self] _ in
Task { @MainActor [weak self] in
self?.sendPlaybackState()
}
}
}
/// Stop the server, close all connections, and remove the Bonjour advertisement.
func stop() {
stateTimer?.invalidate()
stateTimer = nil
commandTransport?.close()
commandTransport = nil
commandConnection?.cancel()
commandConnection = nil
connectedRemoteName = nil
listener?.cancel()
listener = nil
actualPort = nil
isHosting = false
}
// MARK: - Connection Handling
/// Receive the first chunk of data from a new connection to determine the HTTP route.
private func handleNewConnection(_ connection: NWConnection) {
connection.stateUpdateHandler = { [weak self] state in
Task { @MainActor [weak self] in
if case .failed(let error) = state {
self?.logger.error("Connection failed: \(error.localizedDescription)")
}
}
}
connection.start(queue: .main)
// Read the initial HTTP request line to determine the route
connection.receive(minimumIncompleteLength: 1, maximumLength: 65_536) { [weak self] data, _, _, error in
Task { @MainActor [weak self] in
guard let self else { return }
if let error {
self.logger.error("Failed to read request: \(error.localizedDescription)")
connection.cancel()
return
}
guard let data, let request = String(data: data, encoding: .utf8) else {
connection.cancel()
return
}
self.routeRequest(request, on: connection)
}
}
}
/// Parse the HTTP request line and dispatch to the appropriate handler.
private func routeRequest(_ request: String, on connection: NWConnection) {
let firstLine = request.split(separator: "\r\n").first.map(String.init) ?? request
logger.info("Request: \(firstLine)")
if firstLine.hasPrefix("GET /db") {
handleDBRequest(on: connection)
} else if firstLine.hasPrefix("GET /cmd") {
handleCommandRequest(on: connection)
} else {
sendHTTP(status: "404 Not Found", body: Data("Not Found".utf8), on: connection, close: true)
}
}
// MARK: - GET /db
/// Serve the SQLite database as an HTTP response.
/// Uses SQLite's backup API to produce a self-contained copy that includes
/// all WAL data, avoiding races with concurrent writers.
private func handleDBRequest(on connection: NWConnection) {
do {
let data: Data
if let db {
// Create a temporary copy via the backup API so the served file
// is self-contained (no WAL/SHM dependency) and consistent.
let tempURL = FileManager.default.temporaryDirectory
.appendingPathComponent(UUID().uuidString + ".sqlite")
defer { try? FileManager.default.removeItem(at: tempURL) }
try db.backup(to: tempURL.path)
data = try Data(contentsOf: tempURL)
} else {
// Fallback: serve the raw file when no DatabaseService is configured
data = try Data(contentsOf: URL(fileURLWithPath: dbPath))
}
logger.info("Serving database (\(data.count) bytes)")
sendHTTP(
status: "200 OK",
body: data,
contentType: "application/octet-stream",
on: connection,
close: true
)
} catch {
logger.error("Failed to read database: \(error.localizedDescription)")
sendHTTP(
status: "500 Internal Server Error",
body: Data("Failed to read database".utf8),
on: connection,
close: true
)
}
}
// MARK: - GET /cmd
/// Upgrade the connection to an NDJSON command channel.
private func handleCommandRequest(on connection: NWConnection) {
// Only one command channel at a time
if commandTransport != nil {
logger.warning("Rejecting second command channel")
sendHTTP(
status: "409 Conflict",
body: Data("Command channel already connected".utf8),
on: connection,
close: true
)
return
}
// Send the HTTP 200 response header, then upgrade to NDJSON streaming
let header = "HTTP/1.1 200 OK\r\nContent-Type: application/x-ndjson\r\nTransfer-Encoding: chunked\r\nConnection: keep-alive\r\n\r\n"
connection.send(content: Data(header.utf8), completion: .contentProcessed { [weak self] error in
Task { @MainActor [weak self] in
guard let self else { return }
if let error {
self.logger.error("Failed to send cmd header: \(error.localizedDescription)")
connection.cancel()
return
}
self.setupCommandTransport(on: connection)
}
})
}
/// Wire up the NDJSON transport for reading commands and sending events.
private func setupCommandTransport(on connection: NWConnection) {
let transport = NDJSONTransport(connection: connection, logger: logger)
self.commandTransport = transport
self.commandConnection = connection
transport.onLine = { [weak self] line in
Task { @MainActor [weak self] in
self?.handleIncomingLine(line)
}
}
transport.onClose = { [weak self] in
Task { @MainActor [weak self] in
guard let self else { return }
self.logger.info("Command channel closed")
self.commandTransport = nil
self.commandConnection = nil
self.connectedRemoteName = nil
}
}
transport.startReceiving()
logger.info("Command channel established")
// Send initial playback state
sendPlaybackState()
}
// MARK: - Command Dispatch
/// Process an incoming NDJSON line try handshake first, then remote command.
private func handleIncomingLine(_ line: String) {
guard let data = line.data(using: .utf8) else { return }
let decoder = JSONDecoder()
// Try parsing as a handshake message first
if let handshake = try? decoder.decode(HandshakeMessage.self, from: data) {
logger.info("Handshake received: v\(handshake.protocolVersion), app \(handshake.appVersion)")
connectedRemoteName = handshake.appVersion
sendPlaybackState()
return
}
// Parse as a remote command
do {
let command = try decoder.decode(RemoteCommand.self, from: data)
logger.info("Received command: \(String(describing: command))")
dispatchCommand(command)
} catch {
logger.error("Failed to decode command: \(error.localizedDescription)")
commandTransport?.send(HostEvent.error(message: "Invalid command"))
}
}
/// Execute the remote command against the player.
private func dispatchCommand(_ command: RemoteCommand) {
guard let player else {
logger.warning("No player configured, ignoring command")
return
}
switch command {
case .play(let trackId, let queueIds):
handlePlayCommand(trackId: trackId, queueIds: queueIds, player: player)
case .pause:
player.pause()
case .resume:
player.resume()
case .next:
player.next()
case .previous:
player.previous()
case .seek(let position):
player.seek(to: position)
case .setVolume(let level):
player.setVolume(level)
case .toggleShuffle:
player.toggleShuffle()
case .refreshDB:
commandTransport?.send(HostEvent.dbReady)
return // Don't send playback state for refreshDB, just the dbReady event
}
// After each command, send current playback state
sendPlaybackState()
}
/// Handle the play command by fetching tracks and setting up the queue.
private func handlePlayCommand(trackId: Int64, queueIds: [Int64], player: PlayerViewModel) {
guard let db else {
logger.warning("No database configured, cannot handle play command")
commandTransport?.send(HostEvent.error(message: "No database available"))
return
}
do {
let tracks = try db.fetchTracksByIds(queueIds)
guard let track = tracks.first(where: { $0.id == trackId }) else {
logger.warning("Track \(trackId) not found in database")
commandTransport?.send(HostEvent.error(message: "Track not found"))
return
}
player.setQueue(tracks)
player.play(track)
} catch {
logger.error("Failed to fetch tracks: \(error.localizedDescription)")
commandTransport?.send(HostEvent.error(message: "Database error"))
}
}
// MARK: - State Updates
/// Build and send the current playback state to the connected remote.
private func sendPlaybackState() {
guard let transport = commandTransport else { return }
let payload = PlaybackStatePayload(
trackId: player?.currentTrack?.id,
isPlaying: player?.isPlaying ?? false,
currentTime: player?.currentTime ?? 0,
duration: player?.duration ?? 0,
volume: player?.volume ?? 0.65,
isShuffled: player?.isShuffled ?? false
)
transport.send(HostEvent.playbackState(payload))
}
// MARK: - HTTP Helper
/// Send an HTTP response with the given status, body, and content type.
private func sendHTTP(
status: String,
body: Data?,
contentType: String = "text/plain",
on connection: NWConnection,
close: Bool
) {
let bodyData = body ?? Data()
let header = "HTTP/1.1 \(status)\r\nContent-Type: \(contentType)\r\nContent-Length: \(bodyData.count)\r\nConnection: \(close ? "close" : "keep-alive")\r\n\r\n"
var responseData = Data(header.utf8)
responseData.append(bodyData)
connection.send(content: responseData, completion: .contentProcessed { _ in
if close {
connection.cancel()
}
})
}
}

@ -113,6 +113,15 @@ nonisolated final class DatabaseService: Sendable {
try migrator.migrate(db)
}
// MARK: - Maintenance
/// Create a self-contained copy of the database at the given path using
/// SQLite's online backup API. The copy includes all WAL data and is safe
/// to serve or transfer without additional files.
func backup(to destinationPath: String) throws {
try dbPool.backup(to: DatabaseQueue(path: destinationPath))
}
// MARK: - Write
func insert(_ track: inout Track) throws {

@ -0,0 +1,164 @@
import Testing
import Foundation
import Network
@testable import Music
@MainActor
struct HostServerIntegrationTests {
// Starts a HostServer, connects via TCP, sends GET /db,
// verifies the response contains valid SQLite data.
@Test(.timeLimit(.minutes(1)))
func dbDownloadReturnsValidSQLite() async throws {
// 1. Create a temp directory and a SQLite database with one track
let tempDir = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
try FileManager.default.createDirectory(at: tempDir, withIntermediateDirectories: true)
let dbPath = tempDir.appendingPathComponent("db.sqlite").path
let db = try DatabaseService(path: dbPath)
var track = Track.fixture(fileURL: "/test.mp3")
try db.insert(&track)
// 2. Start the HostServer (configured with db for WAL checkpoint) and wait for the listener to be ready
let server = HostServer(dbPath: dbPath)
server.configure(player: nil, db: db)
try server.start()
try await Task.sleep(for: .milliseconds(200))
let port = server.actualPort!
// 3. Perform an HTTP GET /db request to download the database
let responseData = try await httpGet(host: "127.0.0.1", port: port, path: "/db")
// 4. Verify the response starts with the SQLite magic header
let header = String(data: responseData.prefix(16), encoding: .utf8) ?? ""
#expect(header.hasPrefix("SQLite format 3"))
// 5. Write the downloaded data to disk and verify it contains the inserted track
let downloadedPath = tempDir.appendingPathComponent("downloaded.sqlite").path
try responseData.write(to: URL(fileURLWithPath: downloadedPath))
let downloadedDb = try DatabaseService(path: downloadedPath)
#expect(try downloadedDb.trackCount() == 1)
// 6. Clean up
server.stop()
try? FileManager.default.removeItem(at: tempDir)
}
// Connects to /cmd, sends a pause command, verifies a playbackState event comes back.
@Test(.timeLimit(.minutes(1)))
func commandChannelRoundTrip() async throws {
// 1. Create a temp directory and an empty SQLite database
let tempDir = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
try FileManager.default.createDirectory(at: tempDir, withIntermediateDirectories: true)
let dbPath = tempDir.appendingPathComponent("db.sqlite").path
_ = try DatabaseService(path: dbPath)
// 2. Set up the player and server with command dispatch configured
let audio = AudioService()
let player = PlayerViewModel(audio: audio, db: nil)
let server = HostServer(dbPath: dbPath)
server.configure(player: player, db: nil)
try server.start()
try await Task.sleep(for: .milliseconds(200))
let port = server.actualPort!
// 3. Open a command channel connection via GET /cmd
let connection = try await connectCommandChannel(host: "127.0.0.1", port: port)
// 4. Send a pause command as NDJSON
let pauseCmd = try JSONEncoder().encode(RemoteCommand.pause)
var lineData = pauseCmd
lineData.append(contentsOf: "\n".utf8)
connection.send(content: lineData, completion: .contentProcessed { _ in })
// 5. Wait for and decode the playbackState response event
let responseLine = try await receiveOneLine(on: connection)
let event = try JSONDecoder().decode(HostEvent.self, from: Data(responseLine.utf8))
// 6. Verify it is a playbackState event with isPlaying == false
if case .playbackState(let payload) = event {
#expect(payload.isPlaying == false)
} else {
Issue.record("Expected playbackState, got \(event)")
}
// 7. Clean up
connection.cancel()
server.stop()
try? FileManager.default.removeItem(at: tempDir)
}
// MARK: - Helpers
/// Performs a simple HTTP GET using NWConnection and returns the response body.
private func httpGet(host: String, port: UInt16, path: String) async throws -> Data {
try await withCheckedThrowingContinuation { continuation in
let connection = NWConnection(
host: NWEndpoint.Host(host),
port: NWEndpoint.Port(rawValue: port)!,
using: .tcp
)
connection.stateUpdateHandler = { state in
if case .ready = state {
// Send the HTTP request
let request = "GET \(path) HTTP/1.1\r\nHost: \(host)\r\nConnection: close\r\n\r\n"
connection.send(content: Data(request.utf8), completion: .contentProcessed { _ in })
// Receive the full response (Connection: close ensures we get everything)
connection.receiveMessage { data, _, _, error in
if let error {
continuation.resume(throwing: error)
} else if let data, let range = data.range(of: Data("\r\n\r\n".utf8)) {
// Strip HTTP headers, return just the body
continuation.resume(returning: Data(data[range.upperBound...]))
} else {
continuation.resume(returning: data ?? Data())
}
connection.cancel()
}
} else if case .failed(let error) = state {
continuation.resume(throwing: error)
}
}
connection.start(queue: .main)
}
}
/// Opens a TCP connection to the /cmd endpoint and waits for the HTTP response header.
private func connectCommandChannel(host: String, port: UInt16) async throws -> NWConnection {
try await withCheckedThrowingContinuation { continuation in
let connection = NWConnection(
host: NWEndpoint.Host(host),
port: NWEndpoint.Port(rawValue: port)!,
using: .tcp
)
connection.stateUpdateHandler = { state in
if case .ready = state {
// Send the HTTP upgrade request for the command channel
let request = "GET /cmd HTTP/1.1\r\nHost: \(host)\r\nConnection: keep-alive\r\n\r\n"
connection.send(content: Data(request.utf8), completion: .contentProcessed { _ in })
// Consume the HTTP 200 response header
connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) { _, _, _, _ in
continuation.resume(returning: connection)
}
} else if case .failed(let error) = state {
continuation.resume(throwing: error)
}
}
connection.start(queue: .main)
}
}
/// Reads one newline-delimited line from a connection.
private func receiveOneLine(on connection: NWConnection) async throws -> String {
try await withCheckedThrowingContinuation { continuation in
connection.receive(minimumIncompleteLength: 1, maximumLength: 65536) { data, _, _, error in
if let error {
continuation.resume(throwing: error)
} else {
let text = data.flatMap { String(data: $0, encoding: .utf8) } ?? ""
// Extract the first complete line from the received data
continuation.resume(returning: text.split(separator: "\n").first.map(String.init) ?? text)
}
}
}
}
}
Loading…
Cancel
Save