You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
Music/scripts/backfill_bitrate.py

272 lines
9.4 KiB

#!/usr/bin/env python3
"""One-time backfill of real bitrate onto tracks stored with bitrate 0 or NULL.
ScannerService writes `bitrate = Int(estimatedDataRate / 1000)` at scan time.
AVFoundation's estimatedDataRate returns 0 for some files (long/VBR MP3s), so a
literal 0 gets stored; other tracks were imported before bitrate existed and are
NULL. This script recomputes bitrate for those rows using ffprobe, falling back
to fileSize*8/duration (the same average the app's importer now uses) when
ffprobe is unavailable or can't determine a value.
Dry-run by default. Pass --apply to write (a timestamped backup is made first).
Usage:
python3 backfill_bitrate.py [--db <path>] [--apply]
python3 backfill_bitrate.py --self-test
Stdlib only; uses ffprobe if present on PATH (optional).
"""
import argparse
import os
import shutil
import sqlite3
import subprocess
import sys
import unicodedata
from datetime import datetime
from urllib.parse import unquote
# Location of the sandboxed app's SQLite database (bundle id com.staxriver.mu).
# The leading "~" is expanded when the module loads, so the path follows the
# home directory of whichever user runs the script.
DEFAULT_DB = os.path.expanduser(
    "~/Library/Containers/com.staxriver.mu/Data/Library/Application Support/Music/db.sqlite"
)
def norm_path(u):
    """Turn a stored file URL (or plain path) into a comparable POSIX path.

    The app saves `fileURL` as Foundation's url.absoluteString, i.e. a
    percent-encoded file URL. The `file://` scheme (and an optional
    `localhost` host) is removed, percent-escapes are decoded, the text is
    NFC-normalized, and a single trailing slash is dropped so the result can
    be stat'd on disk.
    """
    scheme = "file://"
    host = "localhost"
    path = u
    if path.startswith(scheme):
        path = path[len(scheme):]
        # Strip the host only when it is directly followed by "/"; that "/"
        # stays behind as the start of the absolute path.
        if path.startswith(host + "/"):
            path = path[len(host):]
    path = unicodedata.normalize("NFC", unquote(path))
    # A lone "/" (filesystem root) is left intact.
    if len(path) > 1 and path[-1] == "/":
        path = path[:-1]
    return path
def parse_ffprobe_bitrate(stdout):
    """Convert ffprobe's printed bit_rate (bits per second) to whole kbps.

    Anything that is not a plain integer (blank output, the literal 'N/A',
    or other text) yields None, which tells the caller to fall back to the
    size/duration formula.
    """
    text = stdout.strip()
    if text in ("", "N/A"):
        return None
    try:
        bits_per_second = int(text)
    except ValueError:
        return None
    return round(bits_per_second / 1000)
def kbps_from_ffprobe(path):
    """Ask ffprobe for the format-level bit_rate of `path`, as integer kbps.

    Returns None whenever no value can be obtained: ffprobe is not installed
    or cannot be launched, the call times out (30 s) or otherwise fails, or
    its output is empty, 'N/A', or not an integer.
    """
    cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=bit_rate",
        "-of", "default=nw=1:nk=1",
        path,
    ]
    try:
        out = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            # Undecodable bytes in ffprobe's output must not raise; a mangled
            # value simply fails integer parsing and becomes None.
            errors="replace",
            timeout=30,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError covers FileNotFoundError (ffprobe not installed) as well as
        # launch failures such as PermissionError for a non-executable
        # ffprobe on PATH; SubprocessError covers the timeout.
        return None
    return parse_ffprobe_bitrate(out.stdout)
def kbps_from_formula(file_size, duration):
    """Estimate average kbps as bytes * 8 / seconds / 1000, rounded.

    Yields None when either input is missing or not strictly positive, since
    no meaningful average exists in that case.
    """
    for value in (file_size, duration):
        if value is None or value <= 0:
            return None
    return round(file_size * 8 / duration / 1000)
def resolve_bitrate(path, duration):
    """Best available kbps for an on-disk file: ffprobe first, formula fallback.

    `path` is the file to inspect and `duration` is the DB's stored length in
    seconds; the file size is read from disk. Returns a positive integer
    kbps, or None if neither method can produce a positive value.
    """
    probed = kbps_from_ffprobe(path)
    if probed is not None and probed > 0:
        return probed
    try:
        size = os.path.getsize(path)
    except OSError:
        # Unreadable or vanished file: the formula gets no size and gives None.
        size = None
    estimated = kbps_from_formula(size, duration)
    # The formula rounds, so a tiny file with a long duration comes out as 0.
    # That is not a usable bitrate; report it as undetermined so the result
    # is always either positive or None.
    if estimated is None or estimated <= 0:
        return None
    return estimated
def ffprobe_available():
    """Report whether an `ffprobe` executable can be found on PATH."""
    located = shutil.which("ffprobe")
    return located is not None
def self_test():
    """Quick sanity pass over the pure helpers; needs neither a DB nor ffprobe."""
    # parse_ffprobe_bitrate: one good value, then inputs that must give None.
    assert parse_ffprobe_bitrate("256005\n") == 256
    for unusable in ("N/A", "", "garbage"):
        assert parse_ffprobe_bitrate(unusable) is None

    # kbps_from_formula: 230_358_479 bytes over 7198.54 s -> 256 kbps (the
    # same figure as the ffprobe sample above), then the rejected inputs.
    assert kbps_from_formula(230_358_479, 7198.5371428571425) == 256
    for size, seconds in ((None, 100), (1000, 0), (1000, None)):
        assert kbps_from_formula(size, seconds) is None

    # norm_path: a URL with decomposed accents (combining %CC%81) and one with
    # precomposed accents plus a localhost host must land on the same path.
    decomposed = norm_path("file:///Users/x/Mu%CC%81sica/Cafe%CC%81.mp3")
    precomposed = norm_path("file://localhost/Users/x/M%C3%BAsica/Caf%C3%A9.mp3")
    assert decomposed == precomposed == "/Users/x/Música/Café.mp3", (decomposed, precomposed)
    assert norm_path("file:///a/b%20c%23d.mp3") == "/a/b c#d.mp3"

    # resolve_bitrate: a missing file gives None whether or not ffprobe is
    # installed (ffprobe fails on the path -> None; getsize raises OSError ->
    # the formula receives size=None -> None).
    assert resolve_bitrate("/nonexistent/file.mp3", 100) is None

    print("self-test OK")
def fetch_rows(db_path):
    """Load every track whose stored bitrate is 0 or NULL.

    Each returned row is a tuple (id, fileURL, duration, bitrate). The
    connection is closed before returning, even if the query fails.
    """
    query = (
        "SELECT id, fileURL, duration, bitrate FROM tracks "
        "WHERE bitrate = 0 OR bitrate IS NULL"
    )
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.execute(query)
        return cursor.fetchall()
    finally:
        connection.close()
def build_updates(rows):
    """Work out a replacement bitrate for every candidate row.

    `rows` is an iterable of (id, fileURL, duration, bitrate) tuples.

    Returns three lists (updates, missing, undeterminable):
    - updates: dicts {id, file_url, old, new} where `new` is a positive kbps
    - missing: (id, path) pairs whose file does not exist on disk
    - undeterminable: (id, path) pairs for existing files with no usable bitrate
    """
    updates = []
    missing = []
    undeterminable = []
    for row_id, file_url, duration, old in rows:
        path = norm_path(file_url)
        if os.path.exists(path):
            new = resolve_bitrate(path, duration)
            if new is not None and new > 0:
                updates.append(dict(id=row_id, file_url=file_url, old=old, new=new))
            else:
                undeterminable.append((row_id, path))
        else:
            # File is gone from disk: record it and leave the row untouched.
            missing.append((row_id, path))
    return updates, missing, undeterminable
def backup_db(db_path):
    """Snapshot the DB file and its -wal/-shm companions into backups/<timestamp>/.

    The backup directory is created beside the database. Files that do not
    exist are skipped. Returns the path of the backup directory.
    """
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    parent = os.path.dirname(db_path)
    backup_dir = os.path.join(parent, "backups", timestamp)
    os.makedirs(backup_dir, exist_ok=True)
    candidates = [db_path + suffix for suffix in ("", "-wal", "-shm")]
    for source in candidates:
        if not os.path.exists(source):
            continue
        destination = os.path.join(backup_dir, os.path.basename(source))
        shutil.copy2(source, destination)
    return backup_dir
def apply_updates(db_path, updates):
    """Persist the new bitrates in one transaction, then checkpoint the WAL.

    `updates` is a sequence of mappings providing at least `id` and `new`;
    any extra keys are ignored by the statement. If a write fails, nothing is
    committed and the connection is still closed.
    """
    statement = "UPDATE tracks SET bitrate=:new WHERE id=:id"
    connection = sqlite3.connect(db_path)
    try:
        # The connection context manager commits on success and rolls back
        # if an exception escapes the block.
        with connection:
            connection.execute("BEGIN")
            connection.executemany(statement, updates)
        connection.execute("PRAGMA wal_checkpoint(TRUNCATE)")
    finally:
        connection.close()
def run(db_path, apply):
    """Report what the backfill would change and, if `apply` is true, write it.

    Prints a summary of candidate rows, a preview of up to 15 planned updates,
    and up to 5 samples from each skipped category. When `apply` is false
    nothing is written. Otherwise the DB is backed up first and the updates
    are then applied.
    """
    preview_limit = 15
    sample_limit = 5

    rows = fetch_rows(db_path)
    updates, missing, undeterminable = build_updates(rows)

    summary = (
        f"Candidate rows (bitrate 0 or NULL): {len(rows)}",
        f"Resolvable (will set): {len(updates)}",
        f"Skipped — file missing on disk: {len(missing)}",
        f"Skipped — could not determine: {len(undeterminable)}",
    )
    for line in summary:
        print(line)
    if not ffprobe_available():
        print("NOTE: ffprobe not on PATH — used the filesize/duration formula for all rows.")
    print()

    # Preview of the planned changes.
    for update in updates[:preview_limit]:
        name = os.path.basename(norm_path(update["file_url"]))
        previous = update["old"]
        if previous is None:
            previous = "NULL"
        print(name)
        print(f" bitrate {previous} -> {update['new']} kbps")
    hidden = len(updates) - preview_limit
    if hidden > 0:
        print(f" ... and {hidden} more")
    print()

    # Samples of rows that are left untouched, one section per reason.
    skipped_sections = (
        ("Sample of skipped (file missing on disk, left untouched):",
         missing[:sample_limit]),
        ("Sample of skipped (could not determine bitrate, left untouched):",
         undeterminable[:sample_limit]),
    )
    for heading, sample in skipped_sections:
        if not sample:
            continue
        print(heading)
        for row_id, path in sample:
            print(f" - [{row_id}] {os.path.basename(path)}")
        print()

    if not apply:
        print("DRY RUN — nothing written. Re-run with --apply to commit these changes.")
        return
    if not updates:
        print("Nothing to apply.")
        return

    # Back up before touching the database.
    backup_dir = backup_db(db_path)
    print(f"Backup written to: {backup_dir}")
    apply_updates(db_path, updates)
    print(f"Applied {len(updates)} bitrate updates to {db_path}")
def main(argv=None):
    """Command-line entry point; returns the process exit status (0).

    `argv` is the list of arguments to parse; None lets argparse read the
    process arguments. With --self-test only the smoke test runs. Otherwise
    the DB must exist (argparse's error exit is used if it does not) and the
    backfill runs as a dry run unless --apply is given.
    """
    p = argparse.ArgumentParser(description=__doc__)
    # argparse %-formats help strings, so the default path is inserted with
    # %(default)s. Embedding it via an f-string makes --help fail whenever
    # the path happens to contain a "%" character.
    p.add_argument("--db", default=DEFAULT_DB, help="App DB path (default: %(default)s)")
    p.add_argument("--apply", action="store_true", help="Write changes (default: dry run).")
    p.add_argument("--self-test", action="store_true", help="Run the built-in smoke test.")
    args = p.parse_args(argv)
    if args.self_test:
        self_test()
        return 0
    if not os.path.exists(args.db):
        p.error(f"DB not found: {args.db}")
    run(args.db, args.apply)
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())