feat(remote): add NDJSONTransport for line-buffered JSON framing over TCP

feat/music-streaming
Laurent 1 month ago
parent b0359f127b
commit c754858f21
  1. 32
      Music/Remote/NDJSONLineBuffer.swift
  2. 88
      Music/Remote/NDJSONTransport.swift
  3. 32
      MusicTests/NDJSONTransportTests.swift

@ -0,0 +1,32 @@
import Foundation
/// A pure, testable line-buffered parser.
/// Receives arbitrary Data chunks, splits on newlines, emits complete lines via callback.
final class NDJSONLineBuffer: @unchecked Sendable {
private var buffer = ""
private let onLine: (String) -> Void
init(onLine: @escaping (String) -> Void) {
self.onLine = onLine
}
/// Append data to the internal buffer and emit every complete line (delimited by `\n`).
/// Partial lines are retained until the next `feed()` call completes them.
func feed(_ data: Data) {
guard let chunk = String(data: data, encoding: .utf8) else { return }
buffer.append(chunk)
while let newlineIndex = buffer.firstIndex(of: "\n") {
let line = String(buffer[buffer.startIndex..<newlineIndex])
buffer = String(buffer[buffer.index(after: newlineIndex)...])
if !line.isEmpty {
onLine(line)
}
}
}
/// Clear the internal buffer, discarding any incomplete line.
func reset() {
buffer = ""
}
}

@ -0,0 +1,88 @@
import Foundation
import Network
import os
/// Wraps an `NWConnection` for sending and receiving newline-delimited JSON messages.
@MainActor
final class NDJSONTransport {
private let connection: NWConnection
private let lineBuffer: NDJSONLineBuffer
private let logger: os.Logger
private let encoder: JSONEncoder = {
let e = JSONEncoder()
e.outputFormatting = [.sortedKeys]
return e
}()
/// Called for each complete line received from the connection.
var onLine: ((String) -> Void)?
/// Called when the connection is closed or encounters an error.
var onClose: (() -> Void)?
init(connection: NWConnection, logger: os.Logger) {
self.connection = connection
self.logger = logger
// Capture self weakly in the line buffer callback to avoid retain cycles.
// The buffer is created before self is fully initialized, so we set the
// actual forwarding closure after init completes via a two-phase approach.
var forwardLine: ((String) -> Void)?
self.lineBuffer = NDJSONLineBuffer { line in
forwardLine?(line)
}
forwardLine = { [weak self] line in
self?.onLine?(line)
}
}
/// JSON-encode the message, append a newline, and send it over the connection.
func send<T: Encodable>(_ message: T) {
do {
var data = try encoder.encode(message)
data.append(contentsOf: [UInt8(ascii: "\n")])
connection.send(content: data, completion: .contentProcessed { [weak self] error in
if let error {
self?.logger.error("Send failed: \(error.localizedDescription)")
}
})
} catch {
logger.error("Encode failed: \(error.localizedDescription)")
}
}
/// Begin the receive loop. Data is fed into the line buffer, which emits
/// complete lines via the `onLine` callback.
func startReceiving() {
receiveNext()
}
/// Cancel the underlying connection.
func close() {
connection.cancel()
}
// MARK: - Private
private func receiveNext() {
connection.receive(minimumIncompleteLength: 1, maximumLength: 65_536) { [weak self] content, _, isComplete, error in
Task { @MainActor [weak self] in
guard let self else { return }
if let data = content, !data.isEmpty {
self.lineBuffer.feed(data)
}
if isComplete {
self.logger.info("Connection closed by peer")
self.onClose?()
} else if let error {
self.logger.error("Receive error: \(error.localizedDescription)")
self.onClose?()
} else {
self.receiveNext()
}
}
}
}
}

@ -0,0 +1,32 @@
import Testing
import Foundation
@testable import Music
struct NDJSONTransportTests {
// Verifies that feed() correctly splits newline-delimited input into individual lines.
@Test func splitsLinesCorrectly() {
var lines: [String] = []
let buffer = NDJSONLineBuffer { lines.append($0) }
buffer.feed(Data("{\"type\":\"pause\"}\n{\"type\":\"resume\"}\n".utf8))
#expect(lines == ["{\"type\":\"pause\"}", "{\"type\":\"resume\"}"])
}
// Verifies that a line split across two TCP reads is reassembled.
@Test func handlesPartialLines() {
var lines: [String] = []
let buffer = NDJSONLineBuffer { lines.append($0) }
buffer.feed(Data("{\"type\":\"pa".utf8))
#expect(lines.isEmpty)
buffer.feed(Data("use\"}\n".utf8))
#expect(lines == ["{\"type\":\"pause\"}"])
}
// Verifies empty lines are ignored.
@Test func ignoresEmptyLines() {
var lines: [String] = []
let buffer = NDJSONLineBuffer { lines.append($0) }
buffer.feed(Data("\n\n{\"type\":\"next\"}\n\n".utf8))
#expect(lines == ["{\"type\":\"next\"}"])
}
}
Loading…
Cancel
Save