#!/usr/bin/env python3
"""One-time backfill of real bitrate onto tracks stored with bitrate 0 or NULL.

ScannerService writes `bitrate = Int(estimatedDataRate / 1000)` at scan time.
AVFoundation's estimatedDataRate returns 0 for some files (long/VBR MP3s), so a
literal 0 gets stored; other tracks were imported before bitrate existed and are
NULL. This script recomputes bitrate for those rows using ffprobe, falling back
to fileSize*8/duration (the same average the app's importer now uses) when
ffprobe is unavailable or can't determine a value.

Dry-run by default. Pass --apply to write (a timestamped backup is made first).

Usage:
    python3 backfill_bitrate.py [--db PATH] [--apply]
    python3 backfill_bitrate.py --self-test

Stdlib only; uses ffprobe if present on PATH (optional).
"""

import argparse
import os
import shutil
import sqlite3
import subprocess
import sys
import unicodedata
from contextlib import closing
from datetime import datetime
from urllib.parse import unquote

# Default DB path for the sandboxed app (bundle id com.staxriver.mu). Computed from
# $HOME so it resolves to the right user on whichever Mac the script runs on.
DEFAULT_DB = os.path.expanduser(
    "~/Library/Containers/com.staxriver.mu/Data/Library/"
    "Application Support/Music/db.sqlite"
)

# How many resolvable rows / skipped rows to show in the report preview.
_PREVIEW_UPDATES = 15
_PREVIEW_SKIPPED = 5


def norm_path(u):
    """Reduce a file:// URL (or bare path) to a comparable, on-disk POSIX path.

    The app stores `fileURL` as Foundation's url.absoluteString (a percent-encoded
    file URL). Decode it, drop the file:// (or file://localhost) prefix, NFC-
    normalize, and strip a trailing slash so it can be stat'd on APFS.

    Args:
        u: A `file://` URL string or a plain path string.

    Returns:
        The decoded, NFC-normalized path without a trailing slash (the root
        path "/" is left alone).
    """
    s = u
    if s.startswith("file://"):
        s = s[len("file://"):]
        # Only a URL can carry a host component; a bare relative path that
        # happens to start with "localhost/" must not be touched.
        if s.startswith("localhost/"):
            s = s[len("localhost"):]  # leaves the leading "/"
    s = unquote(s)
    s = unicodedata.normalize("NFC", s)
    if len(s) > 1 and s.endswith("/"):
        s = s[:-1]
    return s


def parse_ffprobe_bitrate(stdout):
    """Parse ffprobe's bit_rate stdout (bits/sec) into integer kbps, or None.

    Returns None for empty output, 'N/A', or any non-integer text so the caller
    falls back to the formula.
    """
    s = stdout.strip()
    if not s or s == "N/A":
        return None
    try:
        return round(int(s) / 1000)
    except ValueError:
        return None


def kbps_from_ffprobe(path):
    """Return integer kbps from ffprobe's format bit_rate, or None if unavailable.

    None on: ffprobe not installed or not executable, ffprobe timeout/error, or
    N/A/empty/non-integer output.
    """
    try:
        out = subprocess.run(
            ["ffprobe", "-v", "error",
             "-show_entries", "format=bit_rate",
             "-of", "default=nw=1:nk=1", path],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            # Diagnostics are never read; discarding them also avoids decoding
            # arbitrary bytes that ffprobe may echo back.
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError covers a missing binary (FileNotFoundError) as well as a
        # present-but-unrunnable one (PermissionError); SubprocessError covers
        # the timeout.
        return None
    return parse_ffprobe_bitrate(out.stdout)


def kbps_from_formula(file_size, duration):
    """Average kbps from size (bytes) and duration (seconds): size*8/duration/1000.

    Returns None when inputs can't yield a meaningful value (missing size, or
    non-positive/missing duration).
    """
    if file_size is None or file_size <= 0 or duration is None or duration <= 0:
        return None
    return round(file_size * 8 / duration / 1000)


def resolve_bitrate(path, duration):
    """Best available kbps for an on-disk file: ffprobe first, formula fallback.

    `duration` is the DB's stored seconds; file size is read from disk.

    Returns:
        A positive integer kbps, or None if neither method can produce a
        positive value (including when the formula rounds down to 0).
    """
    kbps = kbps_from_ffprobe(path)
    if kbps is not None and kbps > 0:
        return kbps
    try:
        size = os.path.getsize(path)
    except OSError:
        size = None
    kbps = kbps_from_formula(size, duration)
    # A tiny file over a long duration rounds to 0; storing that would just
    # recreate the value this script exists to replace.
    if kbps is None or kbps <= 0:
        return None
    return kbps


def ffprobe_available():
    """Return True if ffprobe is on PATH."""
    return shutil.which("ffprobe") is not None


def self_test():
    """Fast smoke check of the pure helpers (no DB, no ffprobe needed)."""
    # ffprobe stdout parsing
    assert parse_ffprobe_bitrate("256005\n") == 256
    assert parse_ffprobe_bitrate("N/A") is None
    assert parse_ffprobe_bitrate("") is None
    assert parse_ffprobe_bitrate("garbage") is None

    # formula: 230_358_479 bytes over 7198.54 s -> 256 kbps (matches ffprobe sample)
    assert kbps_from_formula(230_358_479, 7198.5371428571425) == 256
    assert kbps_from_formula(None, 100) is None
    assert kbps_from_formula(1000, 0) is None
    assert kbps_from_formula(1000, None) is None

    # path normalization (NFD vs NFC accents, percent-encoding, localhost host)
    nfc = norm_path("file:///Users/x/Mu%CC%81sica/Cafe%CC%81.mp3")
    nfd = norm_path("file://localhost/Users/x/M%C3%BAsica/Caf%C3%A9.mp3")
    # Expected value is spelled with escapes so the precomposed (NFC) form
    # cannot be silently altered by an editor or VCS normalization.
    assert nfc == nfd == "/Users/x/M\u00fasica/Caf\u00e9.mp3", (nfc, nfd)
    assert norm_path("file:///a/b%20c%23d.mp3") == "/a/b c#d.mp3"

    # resolve_bitrate composition: a missing file yields None regardless of whether
    # ffprobe is installed (ffprobe errors on the path -> None; getsize raises
    # OSError -> formula gets size=None -> None).
    assert resolve_bitrate("/nonexistent/file.mp3", 100) is None

    print("self-test OK")


def fetch_rows(db_path):
    """Return candidate rows: (id, fileURL, duration, bitrate) where bitrate is 0/NULL."""
    with closing(sqlite3.connect(db_path)) as con:
        return con.execute(
            "SELECT id, fileURL, duration, bitrate FROM tracks "
            "WHERE bitrate = 0 OR bitrate IS NULL"
        ).fetchall()


def build_updates(rows):
    """Resolve a new bitrate for each candidate row.

    Args:
        rows: Iterable of (id, fileURL, duration, bitrate) tuples.

    Returns (updates, missing, undeterminable):
      - updates:        list of {id, file_url, old, new} where new is a positive kbps
      - missing:        (id, path) for rows whose file is not on disk, or which
                        have no fileURL at all (left untouched)
      - undeterminable: (id, path) for on-disk files whose bitrate couldn't be found
    """
    updates, missing, undeterminable = [], [], []
    for row_id, file_url, duration, old in rows:
        if not file_url:
            # NULL/empty fileURL: nothing to probe, and norm_path needs a string.
            missing.append((row_id, "(no fileURL)"))
            continue
        path = norm_path(file_url)
        # isfile (not exists): a directory has a size but no meaningful bitrate.
        if not os.path.isfile(path):
            missing.append((row_id, path))
            continue
        new = resolve_bitrate(path, duration)
        if new is None or new <= 0:
            undeterminable.append((row_id, path))
            continue
        updates.append({"id": row_id, "file_url": file_url, "old": old, "new": new})
    return updates, missing, undeterminable


def backup_db(db_path):
    """Copy db.sqlite (+ -wal, -shm) under backups/<timestamp>/ next to the DB.

    Returns:
        The directory the copies were written to.
    """
    # NOTE(review): a plain file copy is only a consistent snapshot if nothing
    # else is writing to the DB at the time — presumably the app should be quit
    # before running with --apply; confirm.
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup_dir = os.path.join(os.path.dirname(db_path), "backups", stamp)
    os.makedirs(backup_dir, exist_ok=True)
    for suffix in ("", "-wal", "-shm"):
        src = db_path + suffix
        if os.path.exists(src):
            shutil.copy2(src, os.path.join(backup_dir, os.path.basename(src)))
    return backup_dir


def apply_updates(db_path, updates):
    """Write bitrate updates in a single transaction, then checkpoint the WAL.

    Only rows whose bitrate is still 0/NULL are modified, so a value written by
    something else between the scan and this call is never overwritten.

    Args:
        db_path: Path to the SQLite database.
        updates: Sequence of dicts with at least the keys `id` and `new`.

    Returns:
        The number of rows actually changed.
    """
    with closing(sqlite3.connect(db_path)) as con:
        # `with con` commits on success and rolls back if any statement raises.
        with con:
            cur = con.executemany(
                "UPDATE tracks SET bitrate=:new WHERE id=:id "
                "AND (bitrate = 0 OR bitrate IS NULL)",
                updates,
            )
            applied = max(cur.rowcount, 0)
        con.execute("PRAGMA wal_checkpoint(TRUNCATE)")
    return applied


def run(db_path, apply):
    """Scan the DB, print a report, and (when `apply` is true) write the updates.

    Args:
        db_path: Path to the SQLite database.
        apply: False for a dry run; True to back up the DB and commit changes.
    """
    rows = fetch_rows(db_path)
    updates, missing, undeterminable = build_updates(rows)

    print(f"Candidate rows (bitrate 0 or NULL): {len(rows)}")
    print(f"Resolvable (will set): {len(updates)}")
    print(f"Skipped — file missing on disk: {len(missing)}")
    print(f"Skipped — could not determine: {len(undeterminable)}")
    if not ffprobe_available():
        print("NOTE: ffprobe not on PATH — used the filesize/duration formula for all rows.")
    print()

    for u in updates[:_PREVIEW_UPDATES]:
        name = os.path.basename(norm_path(u["file_url"]))
        old = "NULL" if u["old"] is None else u["old"]
        print(f" • {name}")
        print(f" bitrate {old} -> {u['new']} kbps")
    if len(updates) > _PREVIEW_UPDATES:
        print(f" ... and {len(updates) - _PREVIEW_UPDATES} more")
    print()

    if missing[:_PREVIEW_SKIPPED]:
        print("Sample of skipped (file missing on disk, left untouched):")
        for row_id, path in missing[:_PREVIEW_SKIPPED]:
            print(f" - [{row_id}] {os.path.basename(path)}")
        print()

    if undeterminable[:_PREVIEW_SKIPPED]:
        print("Sample of skipped (could not determine bitrate, left untouched):")
        for row_id, path in undeterminable[:_PREVIEW_SKIPPED]:
            print(f" - [{row_id}] {os.path.basename(path)}")
        print()

    if not apply:
        print("DRY RUN — nothing written. Re-run with --apply to commit these changes.")
        return

    if not updates:
        print("Nothing to apply.")
        return

    backup_dir = backup_db(db_path)
    print(f"Backup written to: {backup_dir}")
    applied = apply_updates(db_path, updates)
    print(f"Applied {applied} bitrate updates to {db_path}")
    if applied < len(updates):
        print(f"NOTE: {len(updates) - applied} rows were no longer 0/NULL and were left untouched.")


def main(argv=None):
    """Parse command-line arguments and dispatch; returns the process exit code."""
    p = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's paragraphs and usage block as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # %(default)s lets argparse insert the path itself, so a "%" in the user's
    # home directory cannot break help formatting.
    p.add_argument("--db", default=DEFAULT_DB,
                   help="App DB path (default: %(default)s)")
    p.add_argument("--apply", action="store_true",
                   help="Write changes (default: dry run).")
    p.add_argument("--self-test", action="store_true",
                   help="Run the built-in smoke test.")
    args = p.parse_args(argv)

    if args.self_test:
        self_test()
        return 0

    if not os.path.exists(args.db):
        p.error(f"DB not found: {args.db}")

    run(args.db, args.apply)
    return 0


if __name__ == "__main__":
    sys.exit(main())