diff --git a/Music/Remote/NDJSONLineBuffer.swift b/Music/Remote/NDJSONLineBuffer.swift new file mode 100644 index 0000000..cbeec82 --- /dev/null +++ b/Music/Remote/NDJSONLineBuffer.swift @@ -0,0 +1,32 @@ +import Foundation + +/// A pure, testable line-buffered parser. +/// Receives arbitrary Data chunks, splits on newlines, emits complete lines via callback. +final class NDJSONLineBuffer: @unchecked Sendable { + private var buffer = "" + private let onLine: (String) -> Void + + init(onLine: @escaping (String) -> Void) { + self.onLine = onLine + } + + /// Append data to the internal buffer and emit every complete line (delimited by `\n`). + /// Partial lines are retained until the next `feed()` call completes them. + func feed(_ data: Data) { + guard let chunk = String(data: data, encoding: .utf8) else { return } + buffer.append(chunk) + + while let newlineIndex = buffer.firstIndex(of: "\n") { + let line = String(buffer[buffer.startIndex.. Void)? + + /// Called when the connection is closed or encounters an error. + var onClose: (() -> Void)? + + init(connection: NWConnection, logger: os.Logger) { + self.connection = connection + self.logger = logger + + // Capture self weakly in the line buffer callback to avoid retain cycles. + // The buffer is created before self is fully initialized, so we set the + // actual forwarding closure after init completes via a two-phase approach. + var forwardLine: ((String) -> Void)? + self.lineBuffer = NDJSONLineBuffer { line in + forwardLine?(line) + } + forwardLine = { [weak self] line in + self?.onLine?(line) + } + } + + /// JSON-encode the message, append a newline, and send it over the connection. + func send(_ message: T) { + do { + var data = try encoder.encode(message) + data.append(contentsOf: [UInt8(ascii: "\n")]) + connection.send(content: data, completion: .contentProcessed { [weak self] error in + if let error { + self?.logger.error("Send failed: \(error.localizedDescription)") + } + }) + } catch { + logger.error("Encode failed: \(error.localizedDescription)") + } + } + + /// Begin the receive loop. Data is fed into the line buffer, which emits + /// complete lines via the `onLine` callback. + func startReceiving() { + receiveNext() + } + + /// Cancel the underlying connection. + func close() { + connection.cancel() + } + + // MARK: - Private + + private func receiveNext() { + connection.receive(minimumIncompleteLength: 1, maximumLength: 65_536) { [weak self] content, _, isComplete, error in + Task { @MainActor [weak self] in + guard let self else { return } + + if let data = content, !data.isEmpty { + self.lineBuffer.feed(data) + } + + if isComplete { + self.logger.info("Connection closed by peer") + self.onClose?() + } else if let error { + self.logger.error("Receive error: \(error.localizedDescription)") + self.onClose?() + } else { + self.receiveNext() + } + } + } + } +} diff --git a/MusicTests/NDJSONTransportTests.swift b/MusicTests/NDJSONTransportTests.swift new file mode 100644 index 0000000..8f2371b --- /dev/null +++ b/MusicTests/NDJSONTransportTests.swift @@ -0,0 +1,32 @@ +import Testing +import Foundation +@testable import Music + +struct NDJSONTransportTests { + + // Verifies that feed() correctly splits newline-delimited input into individual lines. + @Test func splitsLinesCorrectly() { + var lines: [String] = [] + let buffer = NDJSONLineBuffer { lines.append($0) } + buffer.feed(Data("{\"type\":\"pause\"}\n{\"type\":\"resume\"}\n".utf8)) + #expect(lines == ["{\"type\":\"pause\"}", "{\"type\":\"resume\"}"]) + } + + // Verifies that a line split across two TCP reads is reassembled. + @Test func handlesPartialLines() { + var lines: [String] = [] + let buffer = NDJSONLineBuffer { lines.append($0) } + buffer.feed(Data("{\"type\":\"pa".utf8)) + #expect(lines.isEmpty) + buffer.feed(Data("use\"}\n".utf8)) + #expect(lines == ["{\"type\":\"pause\"}"]) + } + + // Verifies empty lines are ignored. + @Test func ignoresEmptyLines() { + var lines: [String] = [] + let buffer = NDJSONLineBuffer { lines.append($0) } + buffer.feed(Data("\n\n{\"type\":\"next\"}\n\n".utf8)) + #expect(lines == ["{\"type\":\"next\"}"]) + } +}