You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
272 lines
9.4 KiB
272 lines
9.4 KiB
#!/usr/bin/env python3
|
|
"""One-time backfill of real bitrate onto tracks stored with bitrate 0 or NULL.
|
|
|
|
ScannerService writes `bitrate = Int(estimatedDataRate / 1000)` at scan time.
|
|
AVFoundation's estimatedDataRate returns 0 for some files (long/VBR MP3s), so a
|
|
literal 0 gets stored; other tracks were imported before bitrate existed and are
|
|
NULL. This script recomputes bitrate for those rows using ffprobe, falling back
|
|
to fileSize*8/duration (the same average the app's importer now uses) when
|
|
ffprobe is unavailable or can't determine a value.
|
|
|
|
Dry-run by default. Pass --apply to write (a timestamped backup is made first).
|
|
|
|
Usage:
|
|
python3 backfill_bitrate.py [--db <path>] [--apply]
|
|
python3 backfill_bitrate.py --self-test
|
|
|
|
Stdlib only; uses ffprobe if present on PATH (optional).
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import shutil
|
|
import sqlite3
|
|
import subprocess
|
|
import sys
|
|
import unicodedata
|
|
from datetime import datetime
|
|
from urllib.parse import unquote
|
|
|
|
# Where the sandboxed app (bundle id com.staxriver.mu) keeps its database.
# "~" is expanded when the module is imported, so the path lands in the home
# directory of whichever user runs the script.
DEFAULT_DB = os.path.expanduser(
    "/".join(
        (
            "~",
            "Library",
            "Containers",
            "com.staxriver.mu",
            "Data",
            "Library",
            "Application Support",
            "Music",
            "db.sqlite",
        )
    )
)
|
|
|
|
|
|
def norm_path(u):
    """Turn a stored ``fileURL`` value (or a bare path) into a plain POSIX path.

    Steps, in order:
      1. strip a leading ``file://`` and, directly after it, a ``localhost``
         host part (the "/" that follows the host is kept);
      2. percent-decode the remainder;
      3. normalize the text to Unicode NFC;
      4. drop one trailing "/" unless the whole result is just "/".

    Input without a ``file://`` prefix skips step 1 only.
    """
    scheme = "file://"
    host = "localhost"

    path = u
    if path.startswith(scheme):
        path = path[len(scheme):]
        if path.startswith(host + "/"):
            # Cut only the host name so the path keeps its leading slash.
            path = path[len(host):]

    path = unicodedata.normalize("NFC", unquote(path))

    has_trailing_slash = len(path) > 1 and path.endswith("/")
    return path[:-1] if has_trailing_slash else path
|
|
|
|
|
|
def parse_ffprobe_bitrate(stdout):
    """Convert ffprobe's printed bit_rate (bits per second) to whole kbps.

    Surrounding whitespace is ignored. Empty text, the literal 'N/A', and
    anything ``int()`` rejects all produce None, which tells the caller to
    use the size/duration formula instead.
    """
    text = stdout.strip()
    if text in ("", "N/A"):
        return None
    try:
        bits_per_second = int(text)
    except ValueError:
        return None
    return round(bits_per_second / 1000)
|
|
|
|
|
|
def kbps_from_ffprobe(path):
    """Return integer kbps from ffprobe's format bit_rate, or None if unavailable.

    Runs ``ffprobe`` on ``path`` with a 30 second timeout and hands its stdout
    to ``parse_ffprobe_bitrate``.

    Returns None when:
      - ffprobe cannot be launched at all (not installed, present on PATH but
        not executable, or any other OS-level launch failure);
      - the subprocess layer reports an error, including the timeout;
      - the output is N/A, empty, or not an integer.
    """
    command = [
        "ffprobe", "-v", "error", "-show_entries", "format=bit_rate",
        "-of", "default=nw=1:nk=1", path,
    ]
    try:
        out = subprocess.run(
            command,
            capture_output=True,
            text=True,
            # Undecodable bytes in ffprobe's output must not raise
            # UnicodeDecodeError; replaced text simply fails to parse below.
            errors="replace",
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError covers FileNotFoundError and also PermissionError etc., so
        # any launch failure degrades to the formula fallback instead of
        # aborting the whole backfill.
        return None
    return parse_ffprobe_bitrate(out.stdout)
|
|
|
|
|
|
def kbps_from_formula(file_size, duration):
    """Estimate average kbps as ``file_size * 8 / duration / 1000``.

    ``file_size`` is in bytes and ``duration`` in seconds. The result is
    rounded to an integer. None is returned when either input is None or
    not strictly positive, since no meaningful average exists then.
    """
    if file_size is None:
        return None
    if file_size <= 0:
        return None
    if duration is None:
        return None
    if duration <= 0:
        return None
    total_bits = file_size * 8
    return round(total_bits / duration / 1000)
|
|
|
|
|
|
def resolve_bitrate(path, duration):
    """Pick the best kbps value obtainable for the file at ``path``.

    A positive ffprobe reading wins. Otherwise the file's size is read from
    disk and combined with ``duration`` (the seconds value stored in the DB)
    through ``kbps_from_formula``. If the size cannot be read, the formula is
    given None for it and therefore returns None.
    """
    probed = kbps_from_ffprobe(path)
    if probed is None or probed <= 0:
        try:
            size_bytes = os.path.getsize(path)
        except OSError:
            size_bytes = None
        return kbps_from_formula(size_bytes, duration)
    return probed
|
|
|
|
|
|
def ffprobe_available():
    """Report whether an ``ffprobe`` executable can be located via PATH."""
    located = shutil.which("ffprobe")
    return located is not None
|
|
|
|
|
|
def self_test():
    """Quick sanity pass over the helper functions.

    Needs neither a database nor an ffprobe install. Prints "self-test OK"
    once every assertion has held; a failing check raises AssertionError.
    """
    # parse_ffprobe_bitrate: a clean integer converts, anything else is None.
    assert parse_ffprobe_bitrate("256005\n") == 256
    for unusable in ("N/A", "", "garbage"):
        assert parse_ffprobe_bitrate(unusable) is None

    # kbps_from_formula: 230_358_479 bytes over ~7198.54 s is 256 kbps, the
    # same figure as the ffprobe sample above; unusable inputs give None.
    assert kbps_from_formula(230_358_479, 7198.5371428571425) == 256
    for size, seconds in ((None, 100), (1000, 0), (1000, None)):
        assert kbps_from_formula(size, seconds) is None

    # norm_path: decomposed and precomposed accents, percent-encoding and a
    # "localhost" host part must all collapse to the same path.
    from_decomposed = norm_path("file:///Users/x/Mu%CC%81sica/Cafe%CC%81.mp3")
    from_precomposed = norm_path("file://localhost/Users/x/M%C3%BAsica/Caf%C3%A9.mp3")
    assert from_decomposed == from_precomposed == "/Users/x/Música/Café.mp3", (
        from_decomposed,
        from_precomposed,
    )
    assert norm_path("file:///a/b%20c%23d.mp3") == "/a/b c#d.mp3"

    # resolve_bitrate: a file that does not exist gives None with or without
    # ffprobe installed (ffprobe fails on the path, getsize raises OSError,
    # and the formula then receives size=None).
    assert resolve_bitrate("/nonexistent/file.mp3", 100) is None

    print("self-test OK")
|
|
|
|
|
|
def fetch_rows(db_path):
    """Load the rows that need a bitrate.

    Returns a list of (id, fileURL, duration, bitrate) tuples from ``tracks``
    where bitrate is 0 or NULL. The connection is closed before returning,
    including when the query raises.
    """
    query = (
        "SELECT id, fileURL, duration, bitrate FROM tracks "
        "WHERE bitrate = 0 OR bitrate IS NULL"
    )
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.execute(query)
        candidates = cursor.fetchall()
    finally:
        connection.close()
    return candidates
|
|
|
|
|
|
def build_updates(rows):
    """Resolve a new bitrate for each candidate row.

    ``rows`` is an iterable of (id, fileURL, duration, bitrate) tuples.

    Returns (updates, missing, undeterminable):
      - updates: list of {id, file_url, old, new} where new is a positive kbps
      - missing: (id, path) for rows with no regular file on disk (left
        untouched); rows whose fileURL is NULL/empty are reported here with
        the placeholder path "(no fileURL)"
      - undeterminable: (id, path) for on-disk files whose bitrate couldn't
        be found
    """
    updates, missing, undeterminable = [], [], []
    for row_id, file_url, duration, old in rows:
        if not file_url:
            # A NULL/empty fileURL cannot be normalized; report the row
            # instead of letting one bad row abort the whole backfill.
            missing.append((row_id, "(no fileURL)"))
            continue
        path = norm_path(file_url)
        # isfile rather than exists: a directory has a size too, and would
        # otherwise be fed to the size/duration formula.
        if not os.path.isfile(path):
            missing.append((row_id, path))
            continue
        new = resolve_bitrate(path, duration)
        if new is None or new <= 0:
            undeterminable.append((row_id, path))
            continue
        updates.append({"id": row_id, "file_url": file_url, "old": old, "new": new})
    return updates, missing, undeterminable
|
|
|
|
|
|
def backup_db(db_path):
    """Copy the database and its ``-wal``/``-shm`` companions to a backup folder.

    The folder is ``backups/<YYYYmmdd-HHMMSS>/`` beside the database and is
    created if needed. Companion files that do not exist are skipped.
    Returns the backup folder's path.
    """
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    parent = os.path.dirname(db_path)
    backup_dir = os.path.join(parent, "backups", timestamp)
    os.makedirs(backup_dir, exist_ok=True)

    candidates = [db_path + suffix for suffix in ("", "-wal", "-shm")]
    for source in filter(os.path.exists, candidates):
        destination = os.path.join(backup_dir, os.path.basename(source))
        shutil.copy2(source, destination)
    return backup_dir
|
|
|
|
|
|
def apply_updates(db_path, updates):
    """Persist the new bitrates in one transaction, then checkpoint the WAL.

    ``updates`` is a sequence of mappings providing at least ``id`` and
    ``new``; each sets ``tracks.bitrate`` for the matching row. After the
    commit a ``wal_checkpoint(TRUNCATE)`` is issued. The connection is always
    closed, so a failure part-way leaves nothing committed.
    """
    statement = "UPDATE tracks SET bitrate=:new WHERE id=:id"
    connection = sqlite3.connect(db_path)
    try:
        # The connection's context manager commits when the body succeeds
        # and rolls back when it raises.
        with connection:
            connection.execute("BEGIN")
            connection.executemany(statement, updates)
        connection.execute("PRAGMA wal_checkpoint(TRUNCATE)")
    finally:
        connection.close()
|
|
|
|
|
|
def run(db_path, apply):
    """Report what the backfill would change and, if ``apply`` is true, do it.

    Prints the summary counts, a preview of up to 15 resolvable updates and
    up to 5 samples from each skipped group. Without ``apply`` it stops after
    the report. With ``apply`` and at least one update, it backs up the
    database and then writes the updates.
    """
    candidates = fetch_rows(db_path)
    updates, missing, undeterminable = build_updates(candidates)

    summary = (
        ("Candidate rows (bitrate 0 or NULL):", candidates),
        ("Resolvable (will set):", updates),
        ("Skipped — file missing on disk:", missing),
        ("Skipped — could not determine:", undeterminable),
    )
    for label, items in summary:
        print(f"{label} {len(items)}")
    if not ffprobe_available():
        print("NOTE: ffprobe not on PATH — used the filesize/duration formula for all rows.")
    print()

    # Preview of the changes: file name plus old -> new bitrate.
    preview_limit = 15
    for entry in updates[:preview_limit]:
        name = os.path.basename(norm_path(entry["file_url"]))
        shown_old = entry["old"]
        if shown_old is None:
            shown_old = "NULL"
        print(f" • {name}")
        print(f" bitrate {shown_old} -> {entry['new']} kbps")
    overflow = len(updates) - preview_limit
    if overflow > 0:
        print(f" ... and {overflow} more")
    print()

    # A few examples from each group of rows that will be left untouched.
    skipped_samples = (
        ("Sample of skipped (file missing on disk, left untouched):", missing[:5]),
        ("Sample of skipped (could not determine bitrate, left untouched):", undeterminable[:5]),
    )
    for heading, sample in skipped_samples:
        if not sample:
            continue
        print(heading)
        for row_id, path in sample:
            print(f" - [{row_id}] {os.path.basename(path)}")
        print()

    if not apply:
        print("DRY RUN — nothing written. Re-run with --apply to commit these changes.")
        return

    if not updates:
        print("Nothing to apply.")
        return

    backup_dir = backup_db(db_path)
    print(f"Backup written to: {backup_dir}")
    apply_updates(db_path, updates)
    print(f"Applied {len(updates)} bitrate updates to {db_path}")
|
|
|
|
|
|
def main(argv=None):
    """Command-line entry point; returns the process exit status (0).

    ``argv`` is passed to argparse; None means argparse reads ``sys.argv``.
    ``--self-test`` runs the smoke test and returns. Otherwise the database
    path must exist (argparse's ``error`` exits the process if it does not)
    and the backfill runs as a dry run unless ``--apply`` was given.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--db", default=DEFAULT_DB, help=f"App DB path (default: {DEFAULT_DB})")
    parser.add_argument("--apply", action="store_true", help="Write changes (default: dry run).")
    parser.add_argument("--self-test", action="store_true", help="Run the built-in smoke test.")
    options = parser.parse_args(argv)

    if options.self_test:
        self_test()
    elif os.path.exists(options.db):
        run(options.db, options.apply)
    else:
        # parser.error prints the message and exits; it does not return.
        parser.error(f"DB not found: {options.db}")
    return 0
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|