#!/usr/bin/env python3
"""Collect network-facing Nix package/library dependency metadata for fern/eisen.

The script intentionally starts from explicit service-facing roots instead of the
full NixOS closure. The full closure includes desktop/session packages and base
system plumbing that are not meaningfully "reachable through network".
"""

from __future__ import annotations

import argparse
import csv
import http.client
import json
import os
import re
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import deque
from pathlib import Path
from typing import Any

REPO = Path(__file__).resolve().parents[1]
OUT = REPO / "analysis"
HTTP_TIMEOUT = 8

# Higher numbers are processed first. These are the Internet/LAN/Tailscale-facing
# services and containers configured by servers/fern and servers/eisen.
# Each entry is (priority, host, kind, root name, Nix expression for the package).
ROOTS = [
    (100, "fern", "service", "caddy", "config.services.caddy.package"),
    (98, "fern", "service", "openssh", "config.programs.ssh.package"),
    (97, "fern", "service", "llama-swap", "config.services.llama-swap.package"),
    (96, "fern", "service", "llama-cpp-server", "pkgs.llama-cpp"),
    (94, "fern", "service", "nix-serve", "config.services.nix-serve.package"),
    (92, "fern", "service", "steam-network-runtime", "config.programs.steam.package"),
    (90, "fern", "service", "kdeconnect", "pkgs.kdePackages.kdeconnect-kde"),
    (88, "fern", "service", "openrgb", "config.services.hardware.openrgb.package"),
    (86, "fern", "service", "docker", "config.virtualisation.docker.package"),
    (100, "eisen", "service", "caddy", "config.services.caddy.package"),
    (99, "eisen", "service", "tailscale", "config.services.tailscale.package"),
    (98, "eisen", "service", "openssh", "config.programs.ssh.package"),
    (97, "eisen", "service", "jellyfin", "config.services.jellyfin.package"),
    (96, "eisen", "service", "sonarr", "config.services.sonarr.package"),
    (95, "eisen", "service", "radarr", "config.services.radarr.package"),
    (94, "eisen", "service", "prowlarr", "config.services.prowlarr.package"),
    (93, "eisen", "service", "karakeep", "config.services.karakeep.package"),
    (92, "eisen", "service", "uptime-kuma", "config.services.uptime-kuma.package"),
    (91, "eisen", "service", "grafana", "config.services.grafana.package"),
    (90, "eisen", "service", "prometheus", "config.services.prometheus.package"),
    (89, "eisen", "service", "prometheus-node-exporter", "pkgs.prometheus-node-exporter"),
    (88, "eisen", "service", "exportarr-sonarr", "pkgs.exportarr"),
    (87, "eisen", "service", "exportarr-radarr", "pkgs.exportarr"),
    (86, "eisen", "service", "exportarr-prowlarr", "pkgs.exportarr"),
    (85, "eisen", "service", "glance", "config.services.glance.package"),
    (84, "eisen", "service", "dnsmasq", "config.services.dnsmasq.package"),
    (83, "eisen", "service", "docker", "config.virtualisation.docker.package"),
    (82, "eisen", "service", "llama-swap-exporter", "pkgs.callPackage ./servers/eisen/llama-swap-exporter/default.nix { }"),
]

# Each entry is (priority, host, kind, root name, OCI image reference).
CONTAINER_ROOTS = [
    (80, "eisen", "container", "gluetun", "qmcgaw/gluetun"),
    (79, "eisen", "container", "qbittorrent", "lscr.io/linuxserver/qbittorrent"),
    (78, "eisen", "container", "jackett", "lscr.io/linuxserver/jackett"),
    (77, "eisen", "container", "prometheus-qb", "ghcr.io/esanchezm/prometheus-qbittorrent-exporter"),
    (76, "eisen", "container", "tolgee", "tolgee/tolgee"),
]

# The group names are required: github_repo() reads "owner"/"repo" and
# clean_library_name() reads "name".
GITHUB_RE = re.compile(r"github\.com[:/](?P<owner>[^/]+)/(?P<repo>[^/#?]+?)(?:\.git|/|#|\?|$)")
STORE_HASH_PREFIX_RE = re.compile(r"^[0-9a-z]{32}-(?P<name>.+)$")

# Package base name -> (upstream source location, primary language).
COMMON_UPSTREAMS = {
    "acl": ("https://git.savannah.nongnu.org/cgit/acl.git", "C"),
    "attr": ("https://git.savannah.nongnu.org/cgit/attr.git", "C"),
    "avahi": ("https://github.com/avahi/avahi", "C"),
    "bluez": ("https://git.kernel.org/pub/scm/bluetooth/bluez.git", "C"),
    "bzip2": ("https://sourceware.org/git/bzip2.git", "C"),
    "curl": ("https://github.com/curl/curl", "C"),
    "dbus": ("https://gitlab.freedesktop.org/dbus/dbus", "C"),
    "double-conversion": ("https://github.com/google/double-conversion", "C++"),
    "ffmpeg": ("https://git.ffmpeg.org/ffmpeg.git", "C"),
    "fuse": ("https://github.com/libfuse/libfuse", "C"),
    "glib": ("https://gitlab.gnome.org/GNOME/glib", "C"),
    "glibc": ("https://sourceware.org/git/glibc.git", "C"),
    "graphviz": ("https://gitlab.com/graphviz/graphviz", "C"),
    "gtk+3": ("https://gitlab.gnome.org/GNOME/gtk", "C"),
    "libarchive": ("https://github.com/libarchive/libarchive", "C"),
    "libbpf": ("https://github.com/libbpf/libbpf", "C"),
    "libbsd": ("https://gitlab.freedesktop.org/libbsd/libbsd", "C"),
    "libcbor": ("https://github.com/PJK/libcbor", "C"),
    "libedit": ("https://www.thrysoee.dk/editline/", "C"),
    "libfido2": ("https://github.com/Yubico/libfido2", "C"),
    "libmnl": ("https://git.netfilter.org/libmnl", "C"),
    "libnftnl": ("https://git.netfilter.org/libnftnl", "C"),
    "libpcap": ("https://github.com/the-tcpdump-group/libpcap", "C"),
    "libuv": ("https://github.com/libuv/libuv", "C"),
    "libxml2": ("https://gitlab.gnome.org/GNOME/libxml2", "C"),
    "libxslt": ("https://gitlab.gnome.org/GNOME/libxslt", "C"),
    "ncurses": ("https://invisible-island.net/ncurses/", "C"),
    "oniguruma": ("https://github.com/kkos/oniguruma", "C"),
    "openssl": ("https://github.com/openssl/openssl", "C"),
    "pcre2": ("https://github.com/PCRE2Project/pcre2", "C"),
    "pcsclite": ("https://pcsclite.apdu.fr/", "C"),
    "rhash": ("https://github.com/rhash/RHash", "C"),
    "sqlite": ("https://sqlite.org/src", "C"),
    "systemd": ("https://github.com/systemd/systemd", "C"),
    "xz": ("https://git.tukaani.org/xz.git", "C"),
    "zlib": ("https://github.com/madler/zlib", "C"),
}

# Library names starting with any of these are treated as NuGet packages.
NUGET_NAME_PREFIXES = (
    "AngleSharp",
    "AspNetCore",
    "Azure.",
    "BouncyCastle",
    "Castle.",
    "Dapper",
    "DryIoc",
    "Fluent",
    "HarfBuzzSharp",
    "ICU4N",
    "Jellyfin.",
    "MailKit",
    "MetaBrainz.",
    "Microsoft.",
    "Mono.",
    "NETStandard.",
    "Newtonsoft.",
    "NLog",
    "NodaTime",
    "NuGet.",
    "NUnit",
    "RestSharp",
    "Serilog",
    "Servarr.",
    "SkiaSharp",
    "SQLitePCLRaw",
    "StyleCop.",
    "System.",
    "runtime.",
)

# Exact NuGet package id -> source repository; takes precedence over registry data.
NUGET_REPO_OVERRIDES = {
    "AngleSharp": "https://github.com/AngleSharp/AngleSharp",
    "AngleSharp.Xml": "https://github.com/AngleSharp/AngleSharp.Xml",
    "BitFaster.Caching": "https://github.com/bitfaster/BitFaster.Caching",
    "BlurHashSharp": "https://github.com/MarkusPalcer/BlurHashSharp",
    "BlurHashSharp.SkiaSharp": "https://github.com/MarkusPalcer/BlurHashSharp",
    "BouncyCastle.Cryptography": "https://github.com/bcgit/bc-csharp",
    "Castle.Core": "https://github.com/castleproject/Core",
    "Dapper": "https://github.com/DapperLib/Dapper",
    "DryIoc.dll": "https://github.com/dadhi/DryIoc",
    "DryIoc.Microsoft.DependencyInjection": "https://github.com/dadhi/DryIoc",
    "FluentAssertions": "https://github.com/fluentassertions/fluentassertions",
    "FluentMigrator": "https://github.com/fluentmigrator/fluentmigrator",
    "FluentMigrator.Abstractions": "https://github.com/fluentmigrator/fluentmigrator",
    "FluentMigrator.Extensions.Postgres": "https://github.com/fluentmigrator/fluentmigrator",
    "FluentMigrator.Runner.Core": "https://github.com/fluentmigrator/fluentmigrator",
    "FluentMigrator.Runner.Postgres": "https://github.com/fluentmigrator/fluentmigrator",
    "FluentMigrator.Runner.SQLite": "https://github.com/fluentmigrator/fluentmigrator",
    "FluentValidation": "https://github.com/FluentValidation/FluentValidation",
    "HarfBuzzSharp": "https://github.com/mono/SkiaSharp",
    "HarfBuzzSharp.NativeAssets.Linux": "https://github.com/mono/SkiaSharp",
    "HarfBuzzSharp.NativeAssets.Win32": "https://github.com/mono/SkiaSharp",
    "HarfBuzzSharp.NativeAssets.macOS": "https://github.com/mono/SkiaSharp",
    "ICU4N": "https://github.com/NightOwl888/ICU4N",
    "ICU4N.Transliterator": "https://github.com/NightOwl888/ICU4N",
    "MailKit": "https://github.com/jstedfast/MailKit",
    "MetaBrainz.Common": "https://github.com/Zastai/MetaBrainz.Common",
    "MetaBrainz.Common.Json": "https://github.com/Zastai/MetaBrainz.Common.Json",
    "MetaBrainz.MusicBrainz": "https://github.com/Zastai/MetaBrainz.MusicBrainz",
    "Microsoft.Data.SqlClient": "https://github.com/dotnet/SqlClient",
    "Microsoft.Data.SqlClient.SNI.runtime": "https://github.com/dotnet/SqlClient",
    "Microsoft.Data.Sqlite": "https://github.com/dotnet/efcore",
    "Microsoft.Data.Sqlite.Core": "https://github.com/dotnet/efcore",
    "Newtonsoft.Json": "https://github.com/JamesNK/Newtonsoft.Json",
    "NLog": "https://github.com/NLog/NLog",
    "NodaTime": "https://github.com/nodatime/nodatime",
    "NUnit": "https://github.com/nunit/nunit",
    "NUnit3TestAdapter": "https://github.com/nunit/nunit3-vs-adapter",
    "RestSharp": "https://github.com/restsharp/RestSharp",
    "RestSharp.Serializers.SystemTextJson": "https://github.com/restsharp/RestSharp",
    "Sentry": "https://github.com/getsentry/sentry-dotnet",
    "Serilog": "https://github.com/serilog/serilog",
    "SkiaSharp": "https://github.com/mono/SkiaSharp",
    "SkiaSharp.HarfBuzz": "https://github.com/mono/SkiaSharp",
    "SkiaSharp.NativeAssets.Linux": "https://github.com/mono/SkiaSharp",
    "SkiaSharp.NativeAssets.Win32": "https://github.com/mono/SkiaSharp",
    "SkiaSharp.NativeAssets.macOS": "https://github.com/mono/SkiaSharp",
    "SQLitePCLRaw.bundle_e_sqlite3": "https://github.com/ericsink/SQLitePCL.raw",
    "SQLitePCLRaw.core": "https://github.com/ericsink/SQLitePCL.raw",
    "SQLitePCLRaw.lib.e_sqlite3": "https://github.com/ericsink/SQLitePCL.raw",
    "SQLitePCLRaw.provider.e_sqlite3": "https://github.com/ericsink/SQLitePCL.raw",
    "StyleCop.Analyzers": "https://github.com/DotNetAnalyzers/StyleCopAnalyzers",
}

# Prefix fallbacks used when NUGET_REPO_OVERRIDES has no exact entry. Checked in
# order, so the specific prefixes must stay ahead of the broad catch-all.
NUGET_PREFIX_REPOS = (
    ("Microsoft.AspNetCore.", "https://github.com/dotnet/aspnetcore"),
    ("Microsoft.EntityFrameworkCore", "https://github.com/dotnet/efcore"),
    ("Microsoft.Build", "https://github.com/dotnet/msbuild"),
    ("Microsoft.Identity.Client", "https://github.com/AzureAD/microsoft-authentication-library-for-dotnet"),
    (("Microsoft.", "System.", "runtime."), "https://github.com/dotnet/runtime"),
)

# Name prefixes that static_upstream() maps to invent.kde.org/frameworks/<base>.
KDE_FRAMEWORK_PREFIXES = (
    "karchive",
    "kauth",
    "kbookmarks",
    "kcmutils",
    "kcodecs",
    "kcompletion",
    "kconfig",
    "kconfigwidgets",
    "kcoreaddons",
    "kcrash",
    "kdbusaddons",
    "kdeclarative",
    "kded",
    "kdnssd",
    "kdoctools",
    "kfilemetadata",
    "kguiaddons",
    "ki18n",
    "kiconthemes",
    "kidletime",
    "kio",
    "kirigami",
    "kitemmodels",
    "kitemviews",
    "kjobwidgets",
    "knotifications",
    "kpackage",
    "kparts",
    "kpeople",
    "kpty",
    "kservice",
    "kstatusnotifieritem",
    "ksvg",
    "ktextwidgets",
    "kwallet",
    "kwidgetsaddons",
    "kwindowsystem",
    "kxmlgui",
    "solid",
    "sonnet",
    "syntax-highlighting",
)

# Exact (lower-cased) library names excluded from the review CSV.
NOISY_EXACT_NAMES = frozenset(
    {
        "bash",
        "coreutils",
        "coreutils-full",
        "stdenv-linux",
        "install-shell-files",
        "version-check-hook",
        "writable-tmpdir-as-home-hook",
        "auto-patchelf-hook",
        "pkg-config-wrapper",
        "gcc-wrapper",
        "gnumake",
        "cmake",
        "ninja",
        "patchelf",
        "remove-references-to",
        "strip-nondeterminism",
    }
)

# Substrings of a (lower-cased) library name that exclude it from the review CSV.
NOISY_NAME_FRAGMENTS = (
    "-source",
    "source-",
    "-go-modules",
    "builder.sh",
    "setup-hook",
    "-hook",
    ".patch",
    ".diff",
    "testdata",
    "fixture",
)

# Lower value = looked up earlier when --ecosystem-limit truncates the work list.
ECOSYSTEM_PRIORITIES = {
    "cpan": 0,
    "npm": 1,
    "pypi": 2,
    "crates": 3,
    "nuget": 4,
}

# Keys for which ecosystem metadata replaces an existing row value; every other
# key is only filled in when the row has no value yet.
_ECOSYSTEM_OVERWRITE_KEYS = frozenset({"ecosystem", "release_date"})


def run(cmd: list[str], *, timeout: int = 120) -> str:
    """Run *cmd* from the repository root and return its stdout.

    Raises:
        RuntimeError: if the command exits non-zero or exceeds *timeout* seconds.
            Timeouts are reported as RuntimeError so callers that skip a failing
            root also skip one that hangs.
    """
    try:
        proc = subprocess.run(
            cmd,
            cwd=REPO,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"command timed out after {timeout}s: {' '.join(cmd)}") from exc
    if proc.returncode != 0:
        raise RuntimeError(f"command failed: {' '.join(cmd)}\n{proc.stderr}")
    return proc.stdout


def write_json_atomic(path: Path, data: dict[str, Any]) -> None:
    """Write *data* as sorted, indented JSON to *path* via a sibling ``.tmp`` file."""
    tmp = path.with_suffix(path.suffix + ".tmp")
    tmp.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n")
    # Renaming over the target means readers never see a half-written file.
    tmp.replace(path)


def _load_json_cache(path: Path) -> dict[str, Any]:
    """Load a JSON object cache from *path*; a missing or unreadable file yields ``{}``."""
    try:
        data = json.loads(path.read_text())
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # A corrupt cache only costs re-fetching; it should not abort the run.
        print(f"ignoring unreadable cache {path}: {exc}", file=sys.stderr)
        return {}
    return data if isinstance(data, dict) else {}


def nix_string(s: str) -> str:
    """Quote *s* as a Nix string literal (JSON string escaping is used)."""
    return json.dumps(s)


def root_expr() -> str:
    """Build the Nix expression that evaluates every entry of ROOTS to a metadata attrset."""
    rows = []
    for priority, host, kind, name, expr in ROOTS:
        cfg = "flake.nixosConfigurations.fern" if host == "fern" else "flake.colmenaHive.nodes.eisen"
        rows.append(
            f"(let node = {cfg}; config = node.config; pkgs = node.pkgs; pkg = {expr}; "
            f"in mkRoot {priority} {nix_string(host)} {nix_string(kind)} {nix_string(name)} pkg)"
        )
    return """
let
  flake = builtins.getFlake (toString ./.);
  clean = s: builtins.unsafeDiscardStringContext (toString s);
  listOrNull = x: if builtins.isList x then map clean x else if x == null then [] else [ (clean x) ];
  mkRoot = priority: host: kind: rootName: pkg: {
    inherit priority host kind rootName;
    packageName = pkg.name or rootName;
    pname = pkg.pname or null;
    version = pkg.version or null;
    storePath = clean pkg;
    drv = if pkg ? drvPath then clean pkg.drvPath else null;
    homepage = pkg.meta.homepage or null;
    description = pkg.meta.description or null;
    sourceUrls = listOrNull (pkg.src.urls or (pkg.src.url or null));
  };
in [
""" + "\n".join(rows) + "\n]"


def eval_roots() -> list[dict[str, Any]]:
    """Evaluate ROOTS with ``nix eval`` and append the static CONTAINER_ROOTS.

    Returns the combined list sorted by descending priority, then host, then
    root name. Container roots carry no ``drv`` and an extra ``image`` key.
    """
    data = run(["nix", "eval", "--impure", "--json", "--expr", root_expr()], timeout=240)
    roots = json.loads(data)
    for priority, host, kind, name, image in CONTAINER_ROOTS:
        roots.append(
            {
                "priority": priority,
                "host": host,
                "kind": kind,
                "rootName": name,
                "packageName": image,
                "pname": name,
                "version": None,
                "storePath": None,
                "drv": None,
                "homepage": None,
                "description": "OCI image configured in virtualisation.oci-containers",
                "sourceUrls": [],
                "image": image,
            }
        )
    return sorted(roots, key=lambda r: (-int(r["priority"]), r["host"], r["rootName"]))


def derivation_show_recursive(drv: str) -> dict[str, Any]:
    """Return the recursive ``nix derivation show`` output for *drv*, keyed by drv basename."""
    data = run(["nix", "derivation", "show", "-r", drv], timeout=300)
    parsed = json.loads(data)
    # Nix 2.30+ returns {"version": 3, "derivations": {"basename.drv": ...}};
    # older Nix returned {"/nix/store/...drv": ...}. Normalize to basename keys.
    derivations = parsed.get("derivations") if isinstance(parsed, dict) else None
    if isinstance(derivations, dict):
        return derivations
    return {Path(k).name: v for k, v in parsed.items()}


def drv_meta(drv: str, all_drvs: dict[str, Any]) -> dict[str, Any]:
    """Extract name/version/homepage/description/source/language for *drv* from its ``env``.

    *drv* is looked up by basename first and by the raw key second; a derivation
    that is absent from *all_drvs* yields metadata derived from the file name only.
    """
    item = all_drvs.get(Path(drv).name, all_drvs.get(drv, {}))
    env = item.get("env", {})
    name = clean_library_name(env.get("pname") or env.get("name") or Path(drv).name.removesuffix(".drv"))
    return {
        "name": name,
        "version": env.get("version"),
        "homepage": env.get("homepage") or env.get("meta.homepage"),
        "description": env.get("meta.description") or env.get("description"),
        "source_link": source_from_env(env),
        "language": infer_language(name, env),
    }


def clean_library_name(name: str) -> str:
    """Strip a leading 32-character store hash and known archive/drv suffixes from *name*."""
    match = STORE_HASH_PREFIX_RE.match(name)
    if match:
        name = match.group("name")
    # Each suffix is tested in turn against the already-shortened name.
    for suffix in (".nupkg", ".tar.gz", ".tar.xz", ".zip", ".drv"):
        if name.endswith(suffix):
            name = name[: -len(suffix)]
    return name


def source_from_env(env: dict[str, str]) -> str | None:
    """Return the first env value that looks like a source URL, or ``None``.

    Well-known keys are checked first, then any key whose name ends in "url".
    """

    def looks_like_source(val: Any) -> bool:
        return bool(val) and ("http" in val or "github" in val)

    for key in ("src", "urls", "url", "cargoDeps", "npmDeps", "goModules"):
        val = env.get(key)
        if looks_like_source(val):
            return val
    for key, val in env.items():
        if key.lower().endswith("url") and looks_like_source(val):
            return val
    return None


def infer_language(name: str, env: dict[str, str]) -> str | None:
    """Guess the implementation language from *name* and the build-input env strings.

    This is a substring heuristic over the lower-cased name, ``nativeBuildInputs``
    and ``buildInputs``; the first matching rule wins and ``None`` means no guess.
    """
    text = " ".join([name, env.get("nativeBuildInputs", ""), env.get("buildInputs", "")]).lower()
    if "python" in text or name.startswith("python"):
        return "Python"
    if "cargo" in text or "rustc" in text:
        return "Rust"
    if "go" in text and ("gomod" in text or "goModules" in env):
        return "Go"
    if "node" in text or "npm" in text or "pnpm" in text or "yarn" in text:
        return "JavaScript/TypeScript"
    if "cmake" in text or "gcc" in text or "clang" in text:
        return "C/C++"
    if name.startswith(("qt", "k", "lib")):
        return "C/C++"
    return None


def static_upstream(name: str) -> dict[str, str] | None:
    """Return a hard-coded ``source_link``/``language`` for well-known packages, else ``None``."""
    # Drop a trailing "-<major>.<minor>..." version so versioned names hit the tables.
    base = re.sub(r"-\d+(?:\.\d+).*$", "", name)
    if base in COMMON_UPSTREAMS:
        source, language = COMMON_UPSTREAMS[base]
        return {"source_link": source, "language": language}
    if name.startswith("qt") or name in {"qca", "phonon", "poppler"}:
        return {"source_link": f"https://code.qt.io/cgit/qt/{base}.git", "language": "C++"}
    if name.startswith("gst-") or name == "gstreamer":
        project = "gstreamer" if name == "gstreamer" else base
        return {"source_link": f"https://gitlab.freedesktop.org/gstreamer/{project}", "language": "C"}
    if base.startswith(KDE_FRAMEWORK_PREFIXES):
        return {"source_link": f"https://invent.kde.org/frameworks/{base}", "language": "C++"}
    return None


def github_repo(*values: str | None) -> str | None:
    """Return ``"owner/repo"`` from the first value containing a github.com reference."""
    for value in values:
        if not value:
            continue
        match = GITHUB_RE.search(value)
        if match:
            return f"{match.group('owner')}/{match.group('repo')}"
    return None


def noisy_for_review(row: dict[str, Any]) -> bool:
    """Return True when *row* is build plumbing that should be left out of the review CSV."""
    name = row["library"].lower()
    drv_path = (row.get("drv_path") or "").lower()
    # Raw .nupkg fetches without a parsed version duplicate the versioned rows.
    if ".nupkg" in drv_path and not row.get("version_in_use"):
        return True
    if name in NOISY_EXACT_NAMES:
        return True
    return any(bit in name for bit in NOISY_NAME_FRAGMENTS)


def _fetch_json(url: str, accept: str) -> dict[str, Any] | None:
    """GET *url* and return the decoded JSON object, or ``None`` on any failure.

    Failure covers HTTP and network errors (including errors raised while reading
    the body), malformed URLs, undecodable or invalid JSON, and JSON whose top
    level is not an object.
    """
    try:
        req = urllib.request.Request(url, headers={"Accept": accept, "User-Agent": "dotfiles-analysis"})
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) as res:
            data = json.loads(res.read().decode())
    except (OSError, ValueError, http.client.HTTPException):
        # HTTPError, URLError and TimeoutError are OSError subclasses;
        # JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        return None
    return data if isinstance(data, dict) else None


def github_json(path: str) -> dict[str, Any] | None:
    """Fetch ``https://api.github.com/<path>``; ``None`` when the request fails."""
    return _fetch_json(f"https://api.github.com/{path}", "application/vnd.github+json")


def http_json(url: str) -> dict[str, Any] | None:
    """Fetch *url* expecting a JSON object; ``None`` when the request fails."""
    return _fetch_json(url, "application/json")


def normalize_repo_url(value: str | None) -> str | None:
    """Normalize common git URL spellings to a plain URL without a ``.git`` suffix.

    Strips a ``git+`` prefix, rewrites ``git://github.com/`` and
    ``git@github.com:`` to ``https://github.com/``; empty input returns ``None``.
    """
    if not value:
        return None
    value = value.strip()
    if value.startswith("git+"):
        value = value[4:]
    if value.startswith("git://github.com/"):
        value = "https://github.com/" + value.removeprefix("git://github.com/")
    if value.startswith("git@github.com:"):
        value = "https://github.com/" + value.removeprefix("git@github.com:")
    if value.endswith(".git"):
        value = value[:-4]
    return value


def parse_ecosystem(row: dict[str, Any]) -> tuple[str | None, str | None, str | None]:
    """Classify a dependency row as ``(ecosystem, package, version)``.

    Returns ``(None, None, None)`` when no package registry applies. Rules are
    tried in order: NuGet, CPAN (nix-serve only), PyPI, npm, crates.
    """
    name = row["library"]
    version = row.get("version_in_use") or None
    drv = row.get("drv_path", "")
    is_nuget_like = name.startswith(NUGET_NAME_PREFIXES)
    if ".nupkg" in drv or is_nuget_like:
        # The derivation rows have clean name/version; the raw .nupkg rows are
        # filtered from review but can still be enriched in summary/deps.
        if not version and ".nupkg" in drv:
            base = clean_library_name(Path(drv).name.removesuffix(".drv"))
            m = re.match(r"(.+)\.(\d+(?:\.\d+)+(?:[-.][0-9A-Za-z]+)*)$", base)
            if m:
                name, version = m.group(1), m.group(2)
        return "nuget", name, version
    if row["root_name"] in {"nix-serve"} and "perl5." in drv:
        return "cpan", name, version
    if name.startswith("python") or "python" in drv:
        py_name = re.sub(r"^python\d+(?:\.\d+)?-", "", name)
        return "pypi", py_name, version
    if "node_modules" in row.get("dependency_path", "") or row["root_name"] in {"karakeep", "uptime-kuma"}:
        return "npm", name, version
    if "cargo" in name.lower() or "rust" in drv.lower():
        return "crates", name, version
    return None, None, None


def apply_ecosystem_overrides(ecosystem: str, package: str, result: dict[str, Any]) -> dict[str, Any]:
    """Replace registry-reported source data with curated values where known.

    Only NuGet has overrides: an exact NUGET_REPO_OVERRIDES entry wins, then the
    first matching NUGET_PREFIX_REPOS prefix. *result* is mutated and returned.
    """
    if ecosystem != "nuget":
        return result
    source = NUGET_REPO_OVERRIDES.get(package)
    if not source:
        source = next((repo for prefix, repo in NUGET_PREFIX_REPOS if package.startswith(prefix)), None)
    if source:
        result["source_link"] = source
        result["github_repo"] = github_repo(source)
        result.setdefault("language", "C#")
    return result


def release_date_from_pypi(files: list[dict[str, Any]]) -> str | None:
    """Return the earliest ``upload_time_iso_8601`` among *files*, or ``None`` if none has one."""
    dates = [f.get("upload_time_iso_8601") for f in files if f.get("upload_time_iso_8601")]
    return min(dates) if dates else None


def _enrich_nuget(package: str, version: str | None) -> dict[str, Any]:
    """Look up repository, release date and latest version from the NuGet v3 API."""
    result: dict[str, Any] = {"language": "C#"}
    # The NuGet endpoints use lower-cased ids; quoting keeps odd names from breaking the URL.
    package_id = urllib.parse.quote(package.lower(), safe="")
    if version:
        version_id = urllib.parse.quote(version.lower(), safe="")
        data = http_json(f"https://api.nuget.org/v3/registration5-semver1/{package_id}/{version_id}.json")
        entry = (data or {}).get("catalogEntry", {})
        if isinstance(entry, str):
            # catalogEntry may be a URL to the entry rather than the entry itself.
            entry = http_json(entry) or {}
        if not isinstance(entry, dict):
            entry = {}
        repo = entry.get("repository") or {}
        repo_url = repo.get("url") if isinstance(repo, dict) else None
        repo_url = normalize_repo_url(repo_url or entry.get("repositoryUrl") or entry.get("projectUrl"))
        result["source_link"] = repo_url or entry.get("projectUrl")
        result["release_date"] = entry.get("published")
    index = http_json(f"https://api.nuget.org/v3-flatcontainer/{package_id}/index.json")
    versions = (index or {}).get("versions") or []
    if versions:
        result["latest_version"] = versions[-1]
    return result


def _enrich_npm(package: str, version: str | None) -> dict[str, Any]:
    """Look up repository, versions and publish times from the npm registry."""
    quoted = urllib.parse.quote(package, safe="")
    data = http_json(f"https://registry.npmjs.org/{quoted}") or {}
    info = data.get("versions", {}).get(version or "", {}) if version else {}
    repo = info.get("repository") or data.get("repository") or {}
    # "repository" is either an object with a "url" field or a bare string.
    repo_url = repo.get("url") if isinstance(repo, dict) else repo
    latest = (data.get("dist-tags") or {}).get("latest")
    times = data.get("time") or {}
    return {
        "source_link": normalize_repo_url(repo_url) or data.get("homepage"),
        "latest_version": latest,
        "release_date": times.get(version or ""),
        "latest_release_date": times.get(latest or ""),
        "language": "JavaScript/TypeScript",
    }


def _enrich_pypi(package: str, version: str | None) -> dict[str, Any]:
    """Look up project URLs, latest version and upload dates from PyPI."""
    quoted = urllib.parse.quote(package, safe="")
    data = http_json(f"https://pypi.org/pypi/{quoted}/json") or {}
    info = data.get("info", {})
    urls = info.get("project_urls") or {}
    source = (
        urls.get("Source")
        or urls.get("Source Code")
        or urls.get("Homepage")
        or info.get("home_page")
        or info.get("package_url")
    )
    latest = info.get("version")
    releases = data.get("releases") or {}
    return {
        "source_link": normalize_repo_url(source),
        "latest_version": latest,
        "release_date": release_date_from_pypi(releases.get(version or "", [])),
        "latest_release_date": release_date_from_pypi(releases.get(latest or "", [])),
        "language": "Python",
    }


def _enrich_crates(package: str, version: str | None) -> dict[str, Any]:
    """Look up repository and latest version from crates.io (*version* is not used)."""
    quoted = urllib.parse.quote(package, safe="")
    data = http_json(f"https://crates.io/api/v1/crates/{quoted}") or {}
    crate = data.get("crate", {})
    return {
        "source_link": normalize_repo_url(crate.get("repository") or crate.get("homepage")),
        "latest_version": crate.get("max_stable_version") or crate.get("newest_version"),
        "latest_release_date": crate.get("updated_at"),
        "language": "Rust",
    }


def _enrich_cpan(package: str, version: str | None) -> dict[str, Any]:
    """Look up repository and latest release from MetaCPAN."""
    dist = package.replace("::", "-")
    pod_link = f"https://metacpan.org/pod/{package}"
    data = http_json(f"https://fastapi.metacpan.org/v1/release/{urllib.parse.quote(dist, safe='')}") or {}
    resources = (data.get("metadata") or {}).get("resources") or {}
    repo = resources.get("repository") or {}
    repo_url = repo.get("url") if isinstance(repo, dict) else repo
    result: dict[str, Any] = {
        "language": "Perl",
        "source_link": normalize_repo_url(repo_url) or pod_link,
        "latest_version": data.get("version"),
        "latest_release_date": data.get("date"),
    }
    # The release endpoint describes the latest release, so its date is only the
    # in-use release date when the versions coincide.
    if str(data.get("version")) == str(version):
        result["release_date"] = data.get("date")
    return result


_ECOSYSTEM_ENRICHERS = {
    "nuget": _enrich_nuget,
    "npm": _enrich_npm,
    "pypi": _enrich_pypi,
    "crates": _enrich_crates,
    "cpan": _enrich_cpan,
}


def enrich_ecosystem(ecosystem: str, package: str, version: str | None, cache: dict[str, Any]) -> dict[str, Any]:
    """Return registry metadata for a package, consulting and updating *cache*.

    The result always has ``ecosystem`` and ``github_repo`` keys; other keys
    depend on the registry. An unknown *ecosystem* yields just those two.

    NOTE(review): a lookup that fails (network error, rate limit) is still cached,
    with ``None`` values, and will not be retried until the cache entry is removed.
    """
    key = ecosystem_cache_key(ecosystem, package, version)
    if key in cache:
        return cache[key]
    result: dict[str, Any] = {"ecosystem": ecosystem}
    enricher = _ECOSYSTEM_ENRICHERS.get(ecosystem)
    if enricher is not None:
        result.update(enricher(package, version))
    result["github_repo"] = github_repo(result.get("source_link"))
    result = apply_ecosystem_overrides(ecosystem, package, result)
    cache[key] = result
    return result


def ecosystem_cache_key(ecosystem: str | None, package: str | None, version: str | None) -> str:
    """Return the ``ecosystem:package:version`` cache key (empty version when missing)."""
    return f"{ecosystem}:{package}:{version or ''}"


def enrich_github(repo: str, cache: dict[str, Any], sleep: float) -> dict[str, Any]:
    """Return stars, language and latest release for ``owner/repo``, using *cache*.

    Sleeps *sleep* seconds after each of the two API requests when it is non-zero.

    NOTE(review): a failed request (e.g. rate limiting) is cached as an all-``None``
    result just like a successful one.
    """
    if repo in cache:
        return cache[repo]
    data = github_json(f"repos/{repo}") or {}
    if sleep:
        time.sleep(sleep)
    latest = github_json(f"repos/{repo}/releases/latest") or {}
    if sleep:
        time.sleep(sleep)
    result = {
        "github_repo": repo,
        "github_stars": data.get("stargazers_count"),
        "language": data.get("language"),
        "source_link": data.get("html_url"),
        "latest_version": latest.get("tag_name"),
        "latest_release_date": latest.get("published_at"),
    }
    cache[repo] = result
    return result


def walk_deps(root: dict[str, Any], all_drvs: dict[str, Any], max_depth: int) -> list[dict[str, Any]]:
    """Breadth-first walk of *root*'s input derivations, up to *max_depth* levels.

    Each derivation is reported once, at the first (shallowest) place it is seen.
    A root without a ``drv`` yields no rows.
    """
    start = root.get("drv")
    if not start:
        return []
    start_key = Path(start).name
    rows = []
    seen = {start_key}
    queue = deque([(start_key, [], 0)])
    while queue:
        drv, path, depth = queue.popleft()
        if depth >= max_depth:
            continue
        item = all_drvs.get(drv, {})
        # Older Nix puts inputs under "inputDrvs"; newer output nests them in "inputs.drvs".
        input_drvs = item.get("inputDrvs") or (item.get("inputs") or {}).get("drvs") or {}
        for dep_drv in sorted(input_drvs):
            dep_key = Path(dep_drv).name
            if dep_key in seen:
                continue
            seen.add(dep_key)
            meta = drv_meta(dep_key, all_drvs)
            static = static_upstream(meta["name"]) or {}
            source_link = meta["source_link"] or static.get("source_link")
            language = meta["language"] or static.get("language")
            dep_path = path + [meta["name"]]
            rows.append(
                {
                    "host": root["host"],
                    "root_kind": root["kind"],
                    "root_name": root["rootName"],
                    "root_package": root["packageName"],
                    "library": meta["name"],
                    "version_in_use": meta["version"],
                    "dep_depth": depth + 1,
                    "dependency_path": " -> ".join([root["rootName"]] + dep_path),
                    "drv_path": dep_key,
                    "homepage": meta["homepage"],
                    "source_link": source_link,
                    "language": language,
                    "github_repo": github_repo(meta["homepage"], source_link),
                    "github_stars": None,
                    "ecosystem": None,
                    "release_date": None,
                    "latest_version": None,
                    "latest_release_date": None,
                }
            )
            queue.append((dep_key, dep_path, depth + 1))
    return rows


def write_csv(path: Path, rows: list[dict[str, Any]], fields: list[str]) -> None:
    """Write *rows* to *path* with *fields* as the header; keys outside *fields* are dropped."""
    with path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)


def ecosystem_priority(ecosystem: str | None) -> int:
    """Return the lookup priority of *ecosystem* (lower first); unknown values rank 9."""
    return ECOSYSTEM_PRIORITIES.get(ecosystem or "", 9)


def _merge_ecosystem_meta(row: dict[str, Any], meta: dict[str, Any]) -> None:
    """Copy non-``None`` *meta* values into *row* without clobbering existing data.

    Existing truthy row values are kept, except for the keys in
    ``_ECOSYSTEM_OVERWRITE_KEYS`` which the registry always overrides.
    """
    row.update(
        {
            k: v
            for k, v in meta.items()
            if v is not None and (not row.get(k) or k in _ECOSYSTEM_OVERWRITE_KEYS)
        }
    )


def _stars_sort_key(row: dict[str, Any]) -> tuple[bool, int, str]:
    """Sort rows without star counts first, then by ascending stars, then by library name."""
    stars = row.get("github_stars")
    return (stars is not None, stars or 0, row["library"])


def _enrich_dep_rows_from_ecosystems(dep_rows: list[dict[str, Any]], limit: int) -> None:
    """Fill dependency rows with package-registry metadata, in place.

    At most *limit* distinct (ecosystem, package, version) keys are looked up
    (all of them when *limit* is negative); keys beyond the limit are still
    filled in when they are already present in the on-disk cache.
    """
    cache_path = OUT / "ecosystem-cache.json"
    cache = _load_json_cache(cache_path)

    rows_by_key: dict[tuple[str, str, str | None], list[dict[str, Any]]] = {}
    best_score: dict[tuple[str, str, str | None], tuple[int, int, str, str]] = {}
    for row in dep_rows:
        ecosystem, package, version = parse_ecosystem(row)
        if not (ecosystem and package):
            continue
        key = (ecosystem, package, version)
        rows_by_key.setdefault(key, []).append(row)
        # Review-worthy rows sort ahead of noisy ones so the limit is spent on them.
        score = (
            1 if noisy_for_review(row) else 0,
            ecosystem_priority(ecosystem),
            package.lower(),
            version or "",
        )
        if key not in best_score or score < best_score[key]:
            best_score[key] = score

    # Dict order is first-seen order and sorted() is stable, so ties keep that order.
    ordered_keys = sorted(rows_by_key, key=best_score.__getitem__)
    selected_keys = ordered_keys if limit < 0 else ordered_keys[:limit]

    for idx, key in enumerate(selected_keys, start=1):
        if idx % 25 == 1:
            print(f"enriching ecosystem metadata {idx}/{len(selected_keys)}", file=sys.stderr)
        meta = enrich_ecosystem(*key, cache)
        for row in rows_by_key[key]:
            _merge_ecosystem_meta(row, meta)
        if idx % 25 == 0:
            # Periodic flush so an interrupted run keeps what it already fetched.
            write_json_atomic(cache_path, cache)

    selected_key_set = set(selected_keys)
    for key in ordered_keys:
        if key in selected_key_set:
            continue
        meta = cache.get(ecosystem_cache_key(*key))
        if meta is None:
            continue
        for row in rows_by_key[key]:
            _merge_ecosystem_meta(row, meta)

    write_json_atomic(cache_path, cache)


def _enrich_from_github(
    dep_rows: list[dict[str, Any]],
    roots: list[dict[str, Any]],
    limit: int,
    sleep: float,
) -> None:
    """Fill dependency rows and roots with GitHub metadata, in place.

    At most *limit* distinct dependency repositories are queried (all of them
    when *limit* is negative); roots are always queried.
    """
    cache_path = OUT / "github-cache.json"
    cache = _load_json_cache(cache_path)

    rows_by_repo: dict[str, list[dict[str, Any]]] = {}
    for row in dep_rows:
        repo = row.get("github_repo")
        if repo:
            rows_by_repo.setdefault(repo, []).append(row)
    repos = list(rows_by_repo)
    selected_repos = repos if limit < 0 else repos[:limit]

    for idx, repo in enumerate(selected_repos, start=1):
        if idx % 25 == 1:
            print(f"enriching GitHub metadata {idx}/{len(selected_repos)}", file=sys.stderr)
        gh = enrich_github(repo, cache, sleep)
        found = {k: v for k, v in gh.items() if v is not None}
        for row in rows_by_repo[repo]:
            row.update(found)
        if idx % 25 == 0:
            write_json_atomic(cache_path, cache)

    for root in roots:
        # Each URL is passed separately: joining them would let the repo match
        # run across the separator into the next URL.
        repo = github_repo(root.get("homepage"), *(root.get("sourceUrls") or []))
        root["github_repo"] = repo
        root["github_stars"] = None
        root["ecosystem"] = "nix"
        root["release_date"] = None
        root["latest_version"] = None
        root["latest_release_date"] = None
        root["language"] = None
        if repo:
            gh = enrich_github(repo, cache, sleep)
            root.update({k: v for k, v in gh.items() if v is not None})

    write_json_atomic(cache_path, cache)


def main() -> int:
    """Evaluate the roots, walk their dependencies, enrich, and write the CSV reports."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--max-roots", type=int, default=18)
    parser.add_argument("--max-depth", type=int, default=2)
    parser.add_argument("--github-limit", type=int, default=80)
    parser.add_argument("--github-sleep", type=float, default=0.1)
    parser.add_argument("--ecosystem-limit", type=int, default=400)
    args = parser.parse_args()

    OUT.mkdir(exist_ok=True)
    roots = eval_roots()
    # Container roots have no derivation and cannot be walked.
    selected = [r for r in roots if r.get("drv")][: args.max_roots]

    dep_rows: list[dict[str, Any]] = []
    for root in selected:
        print(f"walking {root['host']}:{root['rootName']} {root['packageName']}", file=sys.stderr)
        try:
            all_drvs = derivation_show_recursive(root["drv"])
        except RuntimeError as exc:
            # Best effort: report the failure and carry on with the other roots.
            print(exc, file=sys.stderr)
            continue
        dep_rows.extend(walk_deps(root, all_drvs, args.max_depth))

    _enrich_dep_rows_from_ecosystems(dep_rows, args.ecosystem_limit)
    _enrich_from_github(dep_rows, roots, args.github_limit, args.github_sleep)

    root_fields = [
        "priority",
        "host",
        "kind",
        "rootName",
        "packageName",
        "pname",
        "version",
        "drv",
        "storePath",
        "homepage",
        "description",
        "sourceUrls",
        "image",
        "github_repo",
        "github_stars",
        "ecosystem",
        "release_date",
        "latest_version",
        "latest_release_date",
        "language",
    ]
    dep_fields = [
        "host",
        "root_kind",
        "root_name",
        "root_package",
        "library",
        "version_in_use",
        "dep_depth",
        "dependency_path",
        "drv_path",
        "homepage",
        "source_link",
        "github_repo",
        "github_stars",
        "ecosystem",
        "release_date",
        "latest_version",
        "latest_release_date",
        "language",
    ]
    write_csv(OUT / "network-package-roots.csv", roots, root_fields)
    write_csv(OUT / "network-library-deps.csv", dep_rows, dep_fields)

    # One row per library, preserving the first root/path encountered. This is
    # convenient for hand-reviewing uncommon deps before opening the full edge CSV.
    summary: dict[str, dict[str, Any]] = {}
    for row in dep_rows:
        if row["drv_path"] not in summary:
            summary[row["drv_path"]] = row.copy()
    write_csv(
        OUT / "network-library-summary.csv",
        sorted(summary.values(), key=_stars_sort_key),
        dep_fields,
    )
    review_rows = [r for r in summary.values() if not noisy_for_review(r)]
    write_csv(
        OUT / "network-library-review.csv",
        sorted(review_rows, key=_stars_sort_key),
        dep_fields,
    )
    print(f"wrote {len(roots)} roots and {len(dep_rows)} dependency rows", file=sys.stderr)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())