Mapping My Motorcycle Ride Through Songs

Once I finished blogging about my recent ride to Kupalli and back, I thought it would be great to map the songs I listened to onto the locations where I heard them. I was scrobbling my music to ListenBrainz, and my GPS track was logged as usual. Using the good old approach of matching timestamps, I mapped locations to songs and produced a GeoJSON file. Here is the map of them. Since day 1 and day 3 were the longest, those are the days on which I heard the most songs. I have added day 2 for completeness, but I didn’t listen to much music that day.

The songs are mainly from the playlists “Road Trip Across the World” and “Road Trip Across Karnataka”.

Mapping My Motorcycle Ride Through Songs
Mapping My Motorcycle Ride Through Songs

Day 1

Day 2

Day 3

The script is simple and straightforward.

#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.9"
# dependencies = [
#   "gpxpy",
#   "requests",
# ]
# ///

import sys
import json
import os
from datetime import datetime, timezone
from bisect import bisect_left

import gpxpy
import requests


# Base URL of the ListenBrainz API; fetch_listens() appends the endpoint path.
LB_ROOT = "https://api.listenbrainz.org"
# ListenBrainz user whose listens are fetched.
USERNAME = "thejeshgn"
# Largest allowed time difference between a listen and its nearest GPX point;
# main() also starts the listen query this many seconds before the first point.
MAX_GAP_SECONDS = 600   # 10 min window


def load_gpx_points(path):
    """Read a GPX file and return its timestamped track points, oldest first.

    Points without a timestamp are skipped. Naive timestamps are treated as
    UTC; timezone-aware ones are converted to UTC. Every returned item is a
    dict with the keys ``time``, ``lat``, ``lon`` and ``ele``.
    """
    with open(path, "r", encoding="utf-8") as handle:
        parsed = gpxpy.parse(handle)

    def as_utc(stamp):
        # Normalise every timestamp to an aware UTC datetime so that points
        # can be compared with each other and with listen times.
        if stamp.tzinfo is None:
            return stamp.replace(tzinfo=timezone.utc)
        return stamp.astimezone(timezone.utc)

    records = [
        {
            "time": as_utc(point.time),
            "lat": point.latitude,
            "lon": point.longitude,
            "ele": point.elevation,
        }
        for track in parsed.tracks
        for segment in track.segments
        for point in segment.points
        if point.time
    ]
    return sorted(records, key=lambda rec: rec["time"])


def build_thumbnail_and_track_urls(
    release_mbid: "str | None",
    cover_art_url: "str | None",
    recording_mbid: "str | None",
    recording_msid: "str | None",
):
    """
    Build the thumbnail URL and the ListenBrainz track URL for one listen.

    - Track URL: https://listenbrainz.org/track/<id>/ built from
      recording_mbid, falling back to recording_msid; None when both are
      missing. release_mbid is NOT used for this URL.
    - Thumbnail: cover_art_url when given, otherwise the Cover Art Archive
      "front-250" image for release_mbid; None when both are missing.

    Returns a ``(thumbnail_url, track_url)`` tuple.
    """
    # NOTE: the annotations are quoted on purpose. An unquoted `str | None`
    # is evaluated when the function is defined and raises TypeError on
    # Python 3.9, which the script header (requires-python >= 3.9) permits.

    # Track URL: prefer the MusicBrainz recording id over the MessyBrainz id.
    rec_id = recording_mbid or recording_msid
    track_url = f"https://listenbrainz.org/track/{rec_id}/" if rec_id else None

    # Thumbnail / cover art: an explicit URL wins over the derived one.
    thumb = cover_art_url
    if not thumb and release_mbid:
        thumb = f"https://coverartarchive.org/release/{release_mbid}/front-250"

    return thumb, track_url


def fetch_listens(min_ts):
    """Fetch USERNAME's listens from ListenBrainz starting at ``min_ts``.

    ``min_ts`` is a Unix timestamp and is sent as the ``min_ts`` query
    parameter. When the ``LISTENBRAINZ_TOKEN`` environment variable is set it
    is sent as a ``Token`` authorization header. Raises
    ``requests.HTTPError`` on a non-success response.

    Returns a list of dicts (one per listen that has a ``listened_at``
    value), sorted by their UTC ``time``.
    """
    request_headers = {}
    api_token = os.getenv("LISTENBRAINZ_TOKEN")
    if api_token:
        request_headers["Authorization"] = f"Token {api_token}"

    response = requests.get(
        f"{LB_ROOT}/1/user/{USERNAME}/listens",
        params={"min_ts": int(min_ts), "count": 10000},
        headers=request_headers,
        timeout=30,
    )
    response.raise_for_status()

    def to_record(item, listened_at):
        # Flatten one raw API listen into the dict shape used by the matcher.
        metadata = item.get("track_metadata") or {}
        mapping = metadata.get("mbid_mapping") or {}

        release_mbid = mapping.get("release_mbid")
        recording_mbid = mapping.get("recording_mbid")
        recording_msid = mapping.get("recording_msid") or item.get("recording_msid")

        thumbnail_url, track_url = build_thumbnail_and_track_urls(
            release_mbid=release_mbid,
            cover_art_url=mapping.get("cover_art_url"),
            recording_mbid=recording_mbid,
            recording_msid=recording_msid,
        )

        return {
            "time": datetime.fromtimestamp(listened_at, tz=timezone.utc),
            "track_name": metadata.get("track_name"),
            "artist_name": metadata.get("artist_name"),
            "release_name": metadata.get("release_name"),
            "release_mbid": release_mbid,
            "recording_mbid": recording_mbid,
            "recording_msid": recording_msid,
            "artist_mbids": mapping.get("artist_mbids"),
            "thumbnail_url": thumbnail_url,
            "track_url": track_url,
        }

    records = []
    for item in response.json().get("payload", {}).get("listens", []):
        listened_at = item.get("listened_at")
        # Listens without a timestamp cannot be placed on the track.
        if listened_at:
            records.append(to_record(item, listened_at))

    return sorted(records, key=lambda rec: rec["time"])


def match(points, listens, *, max_gap_seconds=None):
    """
    Match listens to GPX points, emitting one pair each time the song changes.

    Logic:
    - Consecutive listens with the same "song key"
      (recording_mbid, recording_msid, track_name, artist_name) are
      compressed: only the first listen of each run is considered.
    - For each such listen, the GPX point closest in time is picked
      (the earlier point wins a tie).
    - The pair is kept only if the time difference is at most
      ``max_gap_seconds``.

    Args:
        points: dicts with an aware-datetime ``"time"`` key, sorted ascending
            by time (binary search relies on that order).
        listens: dicts with an aware-datetime ``"time"`` key plus the song
            key fields, in playback order.
        max_gap_seconds: largest accepted difference, in seconds, between a
            listen and its GPX point. Defaults to the module-level
            ``MAX_GAP_SECONDS`` when None.

    Returns:
        A list of ``(point, listen)`` tuples in listen order; empty when
        either input is empty.
    """
    if not points or not listens:
        return []

    if max_gap_seconds is None:
        max_gap_seconds = MAX_GAP_SECONDS

    # Prebuilt list of point times for bisect.
    point_times = [p["time"] for p in points]

    def song_key(listen):
        return (
            listen.get("recording_mbid"),
            listen.get("recording_msid"),
            listen.get("track_name"),
            listen.get("artist_name"),
        )

    out = []
    last_key = None
    for listen in listens:
        key = song_key(listen)
        if key == last_key:
            # Same song as the previous listen: already handled.
            continue
        last_key = key

        listen_time = listen["time"]
        idx = bisect_left(point_times, listen_time)
        # Only the neighbours on either side of the insertion position can be
        # the closest point in time.
        candidates = points[max(idx - 1, 0):idx + 1]

        best_p = min(
            candidates,
            key=lambda p: abs((p["time"] - listen_time).total_seconds()),
        )
        gap = abs((best_p["time"] - listen_time).total_seconds())
        if gap <= max_gap_seconds:
            out.append((best_p, listen))

    return out


def clean_props(d: dict) -> dict:
    """Build a new dict holding only the entries of ``d`` whose value is not None.

    Other falsy values (``0``, ``""``, empty containers) are kept, and the
    input dict is left untouched.
    """
    kept = {}
    for key, value in d.items():
        if value is None:
            continue
        kept[key] = value
    return kept


def to_geojson(pairs):
    """Turn ``(gpx_point, listen)`` pairs into a GeoJSON FeatureCollection dict.

    Each pair becomes a Point feature located at the GPX point's
    ``[lon, lat]``; its properties describe the point and the listen, with
    None-valued properties dropped.
    """

    def as_feature(point, listen):
        properties = clean_props(
            {
                "gpx_time": point["time"].isoformat(),
                "gpx_elevation": point["ele"],
                "track_name": listen.get("track_name"),
                "artist_name": listen.get("artist_name"),
                "release_name": listen.get("release_name"),
                "listened_at": listen["time"].isoformat(),
                "source": "ListenBrainz",
                # MusicBrainz / MessyBrainz identifiers
                "release_mbid": listen.get("release_mbid"),
                "recording_mbid": listen.get("recording_mbid"),
                "recording_msid": listen.get("recording_msid"),
                "artist_mbids": listen.get("artist_mbids"),
                # links and media
                "thumbnail_url": listen.get("thumbnail_url"),
                "track_url": listen.get("track_url"),
            }
        )
        location = {
            "type": "Point",
            "coordinates": [point["lon"], point["lat"]],
        }
        return {"type": "Feature", "geometry": location, "properties": properties}

    features = [as_feature(point, listen) for point, listen in pairs]
    return {"type": "FeatureCollection", "features": features}


def main():
    """Command-line entry point: ``<track.gpx> <out.geojson>``.

    Loads the GPX points, fetches listens starting MAX_GAP_SECONDS before the
    first point, matches them and writes the result as GeoJSON. Exits with
    status 1 on wrong usage or when the GPX has no timestamped points, and
    with status 0 (writing nothing) when no listens are returned.
    """
    if len(sys.argv) != 3:
        print("Usage: uv run gpx_lb.py <track.gpx> <out.geojson>")
        sys.exit(1)

    _, gpx_path, out_path = sys.argv

    track_points = load_gpx_points(gpx_path)
    if not track_points:
        print("No GPX points with timestamps.")
        sys.exit(1)

    # Points are sorted by time, so the first one marks the start of the ride.
    earliest = int(track_points[0]["time"].timestamp())
    listens = fetch_listens(earliest - MAX_GAP_SECONDS)
    if not listens:
        print("No listens returned from ListenBrainz for that period.")
        sys.exit(0)

    collection = to_geojson(match(track_points, listens))

    with open(out_path, "w", encoding="utf-8") as out_file:
        json.dump(collection, out_file, ensure_ascii=False, indent=2)

    feature_count = len(collection["features"])
    print(f"Created {feature_count} features → {out_path}")

# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()


You can read this blog using the RSS feed. But if you are someone who loves getting emails, you can join my readers by signing up.

Join 2,256 other subscribers

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.