You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
253 lines
9.0 KiB
253 lines
9.0 KiB
#!/usr/bin/env python3
|
|
"""One-time backfill of Date Added / play stats from Apple Music into the app DB.
|
|
|
|
ScannerService stamps `dateAdded = Date()` at scan time, so the app DB holds scan
|
|
dates rather than the real "date added" from Apple Music. This script reads the
|
|
ground truth from a Music.app library export (File > Library > Export Library...)
|
|
and overwrites dateAdded, playCount, rating and lastPlayedAt on tracks it can match
|
|
by file path.
|
|
|
|
Dry-run by default. Pass --apply to write (a timestamped backup is made first).
|
|
|
|
Usage:
|
|
python3 backfill_itunes_dates.py --xml <Library.xml> [--db <path>] [--apply]
|
|
python3 backfill_itunes_dates.py --self-test
|
|
|
|
Stdlib only; needs python3 (ships with Xcode Command Line Tools).
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import plistlib
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
import unicodedata
|
|
from datetime import datetime
|
|
from urllib.parse import unquote
|
|
|
|
# Where the sandboxed app (bundle id com.staxriver.mu) keeps its database by
# default. The leading "~" is expanded at import time, so the path follows
# $HOME and points into the current user's container on whichever Mac runs
# the script.
DEFAULT_DB = os.path.expanduser(
    "/".join(
        (
            "~",
            "Library",
            "Containers",
            "com.staxriver.mu",
            "Data",
            "Library",
            "Application Support",
            "Music",
            "db.sqlite",
        )
    )
)
|
|
|
|
|
|
def norm_path(u):
    """Turn a file:// URL (or a plain path) into a canonical POSIX path string.

    The app stores `fileURL` as Foundation's url.absoluteString and Music.app
    exports `Location`; both are percent-encoded file URLs, yet they may
    disagree on the host part (file:/// versus file://localhost/) and on the
    Unicode normalization form of the filename. To make the two comparable the
    input is stripped of its scheme and optional "localhost" host,
    percent-decoded, normalized to NFC, and relieved of one trailing slash.

    Args:
        u: A file URL or bare path string.

    Returns:
        The decoded, NFC-normalized path without a trailing "/" (a lone "/"
        is returned unchanged).
    """
    scheme = "file://"
    host = "localhost"

    path = u
    if path.startswith(scheme):
        path = path[len(scheme):]
        # Drop only the host name; the "/" that follows it starts the path.
        if path.startswith(host + "/"):
            path = path[len(host):]

    path = unicodedata.normalize("NFC", unquote(path))

    # Remove a single trailing slash, but never shrink the root "/" to "".
    if path.endswith("/") and len(path) > 1:
        path = path[:-1]
    return path
|
|
|
|
|
|
def fmt_dt(dt):
    """Format a datetime as GRDB's .datetime string (UTC), or None.

    By default plistlib parses <date> values into naive datetimes already
    expressed in UTC, which is exactly what GRDB stores
    (e.g. '2026-05-24 06:46:01.713'); such values are formatted as-is.
    A timezone-aware datetime is first shifted to UTC so the emitted
    wall-clock fields are always UTC regardless of the offset it carries.

    The fractional part is always written as ".000" to match the column's
    existing millisecond-precision shape; GRDB reads both with and without
    millis, so this round-trips.

    Args:
        dt: A datetime (naive values are taken to be UTC, aware values are
            converted), or None.

    Returns:
        A 'YYYY-MM-DD HH:MM:SS.000' string, or None when dt is None.
    """
    if dt is None:
        return None
    # utcoffset() is None for naive datetimes; for aware ones, subtracting it
    # yields the UTC wall-clock time, after which tzinfo is dropped.
    offset = dt.utcoffset()
    if offset is not None:
        dt = (dt - offset).replace(tzinfo=None)
    return dt.strftime("%Y-%m-%d %H:%M:%S") + ".000"
|
|
|
|
|
|
def parse_library(xml_path):
    """Load a Music.app library export and index its tracks by normalized path.

    Tracks without a Location are left out; per the export format those are
    Apple Music streaming entries rather than local files. If two tracks
    normalize to the same path, the one appearing later in the export wins.

    Args:
        xml_path: Path to the exported library plist.

    Returns:
        A dict mapping norm_path(Location) to a dict with the keys
        "date_added", "play_count", "rating" and "play_date_utc"; a value is
        None when the export omits the corresponding field.
    """
    with open(xml_path, "rb") as handle:
        library = plistlib.load(handle)

    by_path = {}
    for entry in library.get("Tracks", {}).values():
        url = entry.get("Location")
        if url:
            by_path[norm_path(url)] = {
                "date_added": entry.get("Date Added"),
                "play_count": entry.get("Play Count"),
                "rating": entry.get("Rating"),
                "play_date_utc": entry.get("Play Date UTC"),
            }
    return by_path
|
|
|
|
|
|
def build_updates(db_rows, music_map):
    """Pair DB rows with export entries and derive the values to write.

    Matching is by normalized file path. Matched rows are overwritten
    wholesale with the export's values (Music is treated as the truth).

    Args:
        db_rows: Iterable of (id, fileURL, current_dateAdded) tuples.
        music_map: Mapping of normalized path to a dict with the keys
            "date_added", "play_count", "rating" and "play_date_utc".

    Returns:
        A (updates, unmatched) pair where
        - updates is a list of dicts with the keys id, file_url, old_date,
          dateAdded, playCount, rating, lastPlayedAt;
        - unmatched is a list of (id, fileURL) tuples for DB rows whose path
          does not appear in music_map.
    """
    matched = []
    missing = []
    for track_id, url, existing_date in db_rows:
        entry = music_map.get(norm_path(url))
        if entry is None:
            missing.append((track_id, url))
            continue

        # The dateAdded column is NOT NULL, so a missing export value falls
        # back to whatever the DB already holds.
        added = entry["date_added"]
        if added:
            date_added = fmt_dt(added)
        else:
            date_added = existing_date

        # Music rates on a 0-100 scale; the app uses 0-5 stars.
        stars = min(5, int(entry["rating"] or 0) // 20)

        matched.append({
            "id": track_id,
            "file_url": url,
            "old_date": existing_date,
            "dateAdded": date_added,
            "playCount": int(entry["play_count"] or 0),
            "rating": stars,
            "lastPlayedAt": fmt_dt(entry["play_date_utc"]),
        })
    return matched, missing
|
|
|
|
|
|
def backup_db(db_path):
    """Snapshot the database files into backups/<timestamp>/ beside the DB.

    The main file and its "-wal" and "-shm" companions are copied, each only
    if it exists at the moment it is reached. The timestamp is the local time
    formatted as YYYYmmdd-HHMMSS.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        The path of the backup directory that was created (or reused).
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup_dir = os.path.join(os.path.dirname(db_path), "backups", stamp)
    os.makedirs(backup_dir, exist_ok=True)

    candidates = [db_path + suffix for suffix in ("", "-wal", "-shm")]
    # filter() is lazy, so each existence check happens right before its copy.
    for source in filter(os.path.exists, candidates):
        target = os.path.join(backup_dir, os.path.basename(source))
        shutil.copy2(source, target)
    return backup_dir
|
|
|
|
|
|
def apply_updates(db_path, updates):
    """Write all updates inside one transaction, then checkpoint the WAL.

    Args:
        db_path: Path to the SQLite database file.
        updates: Sequence of dicts providing at least the keys id, dateAdded,
            playCount, rating and lastPlayedAt (bound as named parameters).

    The connection is closed whether or not the writes succeed.
    """
    statement = (
        "UPDATE tracks SET dateAdded=:dateAdded, playCount=:playCount, "
        "rating=:rating, lastPlayedAt=:lastPlayedAt WHERE id=:id"
    )
    connection = sqlite3.connect(db_path)
    try:
        connection.execute("BEGIN")
        connection.executemany(statement, updates)
        connection.commit()
        # Runs only after a successful commit.
        connection.execute("PRAGMA wal_checkpoint(TRUNCATE)")
    finally:
        connection.close()
|
|
|
|
|
|
def fetch_db_rows(db_path):
    """Read (id, fileURL, dateAdded) for every row of the tracks table.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        A list of 3-tuples, one per track, in the order SQLite returns them.
    """
    connection = sqlite3.connect(db_path)
    try:
        cursor = connection.execute("SELECT id, fileURL, dateAdded FROM tracks")
        rows = cursor.fetchall()
    finally:
        # Close the connection even if the query raises.
        connection.close()
    return rows
|
|
|
|
|
|
def run(xml_path, db_path, apply):
    """Report what the backfill would change and, if asked, write it.

    Prints summary counts, a sample of up to 10 planned updates and up to 5
    DB tracks without an export match. Nothing is written unless `apply` is
    true and at least one track matched; in that case the DB is backed up
    before the updates are applied.

    Args:
        xml_path: Path to the Music.app library export.
        db_path: Path to the app's SQLite database.
        apply: When false, perform a dry run only.
    """
    music_map = parse_library(xml_path)
    db_rows = fetch_db_rows(db_path)
    updates, unmatched = build_updates(db_rows, music_map)

    # Export entries whose path was never claimed by a DB row.
    claimed = set()
    for item in updates:
        claimed.add(norm_path(item["file_url"]))
    unmatched_xml = [path for path in music_map if path not in claimed]

    print(f"DB tracks: {len(db_rows)}")
    print(f"Local files in XML: {len(music_map)}")
    print(f"Matched (will set): {len(updates)}")
    print(f"In DB, not in XML: {len(unmatched)}")
    print(f"In XML, not in DB: {len(unmatched_xml)}")
    print()

    preview = updates[:10]
    for u in preview:
        name = os.path.basename(norm_path(u["file_url"]))
        print(f" • {name}")
        print(f" dateAdded {u['old_date']} -> {u['dateAdded']}")
        print(f" playCount={u['playCount']} rating={u['rating']} "
              f"lastPlayedAt={u['lastPlayedAt']}")
    if len(updates) > 10:
        print(f" ... and {len(updates) - 10} more")
    print()

    orphans = unmatched[:5]
    if orphans:
        print("Sample of DB tracks with no XML match (left untouched):")
        for row_id, file_url in orphans:
            print(f" - [{row_id}] {os.path.basename(norm_path(file_url))}")
        print()

    if not apply:
        print("DRY RUN — nothing written. Re-run with --apply to commit these changes.")
    elif not updates:
        print("Nothing to apply.")
    else:
        # Back up first so the write can be undone by restoring the copy.
        backup_dir = backup_db(db_path)
        print(f"Backup written to: {backup_dir}")
        apply_updates(db_path, updates)
        print(f"Applied {len(updates)} updates to {db_path}")
|
|
|
|
|
|
def self_test():
    """Quick sanity check of path normalization and update building.

    Verifies that NFD- and NFC-encoded URLs (with and without a localhost
    host) normalize to the same path, that percent-escapes are decoded, and
    that build_updates matches one of two rows and converts the date, play
    count, rating and last-played values as expected. Prints "self-test OK"
    when every assertion holds.
    """
    expected_path = "/Users/x/Música/Café.mp3"
    from_nfd_url = norm_path("file:///Users/x/Mu%CC%81sica/Cafe%CC%81.mp3")
    from_nfc_url = norm_path("file://localhost/Users/x/M%C3%BAsica/Caf%C3%A9.mp3")
    assert from_nfd_url == from_nfc_url == expected_path, (from_nfd_url, from_nfc_url)
    assert norm_path("file:///a/b%20c%23d.mp3") == "/a/b c#d.mp3"

    export_entry = {
        "date_added": datetime(2021, 3, 14, 9, 26, 53),
        "play_count": 7,
        "rating": 80,
        "play_date_utc": datetime(2024, 1, 2, 3, 4, 5),
    }
    music = {"/a/song.mp3": export_entry}
    rows = [
        (1, "file:///a/song.mp3", "2026-05-24 06:46:01.713"),
        (2, "file:///a/missing.mp3", "2026-05-24 06:46:01.999"),
    ]

    updates, unmatched = build_updates(rows, music)
    assert len(updates) == 1 and len(unmatched) == 1

    first = updates[0]
    assert first["dateAdded"] == "2021-03-14 09:26:53.000", first["dateAdded"]
    assert first["playCount"] == 7 and first["rating"] == 4
    assert first["lastPlayedAt"] == "2024-01-02 03:04:05.000"
    print("self-test OK")
|
|
|
|
|
|
def main(argv=None):
    """Parse command-line arguments and dispatch to the self-test or the backfill.

    Args:
        argv: Argument list to parse; None makes argparse read sys.argv[1:].

    Returns:
        0 on completion. Invalid or missing arguments are reported through
        ArgumentParser.error(), which prints a message and exits.
    """
    p = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks (notably the Usage block)
        # instead of letting argparse re-wrap it into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    p.add_argument("--xml", help="Path to Music.app Library export (XML plist).")
    # %(default)s is substituted by argparse itself, so a "%" in the home
    # directory path cannot break help formatting.
    p.add_argument("--db", default=DEFAULT_DB, help="App DB path (default: %(default)s)")
    p.add_argument("--apply", action="store_true", help="Write changes (default: dry run).")
    p.add_argument("--self-test", action="store_true", help="Run the built-in smoke test.")
    args = p.parse_args(argv)

    if args.self_test:
        self_test()
        return 0

    if not args.xml:
        p.error("--xml is required (export it via Music.app: File > Library > Export Library...)")
    if not os.path.exists(args.xml):
        p.error(f"XML not found: {args.xml}")
    if not os.path.exists(args.db):
        p.error(f"DB not found: {args.db}")

    run(args.xml, args.db, args.apply)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|