format issues + socket

This commit is contained in:
Daniel Bulant 2026-04-21 22:01:53 +02:00
parent 7738645a69
commit f79a9893a1
No known key found for this signature in database
16 changed files with 1984 additions and 1654 deletions

6
api/AGENTS.md Normal file
View file

@ -0,0 +1,6 @@
Run biome and typescript checks after your changes:
```
bun x biome ci
bun x tsc --noEmit
```

View file

@ -1,11 +1,17 @@
import { defineConfig } from "drizzle-kit"; import { defineConfig } from "drizzle-kit";
const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
throw new Error("Missing required env var: DATABASE_URL");
}
export default defineConfig({ export default defineConfig({
out: "./drizzle", out: "./drizzle",
schema: "./src/db/schema.ts", schema: "./src/db/schema.ts",
dialect: "postgresql", dialect: "postgresql",
dbCredentials: { dbCredentials: {
url: process.env.DATABASE_URL!, url: databaseUrl,
}, },
schemaFilter: ["public"], schemaFilter: ["public"],
}); });

View file

@ -1,23 +1,24 @@
{ {
"name": "api", "name": "api",
"module": "index.ts", "module": "index.ts",
"type": "module", "type": "module",
"private": true, "private": true,
"main": "src/index.ts", "main": "src/index.ts",
"scripts": { "scripts": {
"dev": "bun run --watch src/index.ts" "dev": "bun run --watch src/index.ts",
}, "typecheck": "tsc --noEmit"
"devDependencies": { },
"@types/bun": "latest" "devDependencies": {
}, "@types/bun": "latest"
"peerDependencies": { },
"typescript": "^5" "peerDependencies": {
}, "typescript": "^5"
"dependencies": { },
"@dbos-inc/dbos-sdk": "^4.14.6", "dependencies": {
"@spotify/web-api-ts-sdk": "^1.2.0", "@dbos-inc/dbos-sdk": "^4.14.6",
"@statsfm/statsfm.js": "github.com:statsfm/statsfm.js", "@spotify/web-api-ts-sdk": "^1.2.0",
"better-auth": "^1.6.5", "@statsfm/statsfm.js": "github.com:statsfm/statsfm.js",
"elysia": "catalog:" "better-auth": "^1.6.5",
} "elysia": "catalog:"
}
} }

View file

@ -1,59 +1,65 @@
import { SpotifyApi } from "@spotify/web-api-ts-sdk";
import { betterAuth } from "better-auth"; import { betterAuth } from "better-auth";
import { drizzleAdapter } from "better-auth/adapters/drizzle"; import { drizzleAdapter } from "better-auth/adapters/drizzle";
import Elysia from "elysia";
import { db } from "./db"; import { db } from "./db";
import Elysia, { status, type Context } from "elysia";
import * as schema from "./db/auth-schema"; import * as schema from "./db/auth-schema";
import { SpotifyApi } from "@spotify/web-api-ts-sdk";
export const SPOTIFY_CLIENT_ID = process.env.SPOTIFY_CLIENT_ID!; const requireEnv = (name: string): string => {
export const SPOTIFY_CLIENT_SECRET = process.env.SPOTIFY_CLIENT_SECRET!; const value = process.env[name];
if (!value) throw new Error(`Missing required env var: ${name}`);
return value;
};
export const SPOTIFY_CLIENT_ID = requireEnv("SPOTIFY_CLIENT_ID");
export const SPOTIFY_CLIENT_SECRET = requireEnv("SPOTIFY_CLIENT_SECRET");
export const defaultSdk = SpotifyApi.withClientCredentials( export const defaultSdk = SpotifyApi.withClientCredentials(
SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_ID,
SPOTIFY_CLIENT_SECRET, SPOTIFY_CLIENT_SECRET,
); );
export const auth = betterAuth({ export const auth = betterAuth({
database: drizzleAdapter(db, { database: drizzleAdapter(db, {
provider: "pg", provider: "pg",
schema, schema,
}), }),
socialProviders: { socialProviders: {
spotify: { spotify: {
clientId: SPOTIFY_CLIENT_ID, clientId: SPOTIFY_CLIENT_ID,
clientSecret: SPOTIFY_CLIENT_SECRET, clientSecret: SPOTIFY_CLIENT_SECRET,
scope: [ scope: [
"user-read-playback-state", "user-read-playback-state",
"user-read-currently-playing", "user-read-currently-playing",
"user-modify-playback-state", "user-modify-playback-state",
"playlist-read-private", "playlist-read-private",
"playlist-read-collaborative", "playlist-read-collaborative",
"user-follow-read", "user-follow-read",
"user-top-read", "user-top-read",
"user-read-recently-played", "user-read-recently-played",
"user-library-read", "user-library-read",
// "user-personalized", // "user-personalized",
"user-read-email", "user-read-email",
], ],
}, },
}, },
}); });
export const betterAuthElysia = new Elysia({ name: "better-auth" }) export const betterAuthElysia = new Elysia({ name: "better-auth" })
.mount(auth.handler) .mount(auth.handler)
.macro({ .macro({
auth: { auth: {
async resolve({ status, request: { headers } }) { async resolve({ status, request: { headers } }) {
const session = await auth.api.getSession({ const session = await auth.api.getSession({
headers, headers,
}); });
if (!session) return status(401); if (!session) return status(401);
return { return {
user: session.user, user: session.user,
session: session.session, session: session.session,
}; };
}, },
}, },
}); });

View file

@ -1,93 +1,93 @@
import { relations } from "drizzle-orm/_relations"; import { relations } from "drizzle-orm/_relations";
import { pgTable, text, timestamp, boolean, index } from "drizzle-orm/pg-core"; import { boolean, index, pgTable, text, timestamp } from "drizzle-orm/pg-core";
export const user = pgTable("user", { export const user = pgTable("user", {
id: text("id").primaryKey(), id: text("id").primaryKey(),
name: text("name").notNull(), name: text("name").notNull(),
email: text("email").notNull().unique(), email: text("email").notNull().unique(),
emailVerified: boolean("email_verified").default(false).notNull(), emailVerified: boolean("email_verified").default(false).notNull(),
image: text("image"), image: text("image"),
createdAt: timestamp("created_at").defaultNow().notNull(), createdAt: timestamp("created_at").defaultNow().notNull(),
updatedAt: timestamp("updated_at") updatedAt: timestamp("updated_at")
.defaultNow() .defaultNow()
.$onUpdate(() => /* @__PURE__ */ new Date()) .$onUpdate(() => /* @__PURE__ */ new Date())
.notNull(), .notNull(),
}); });
export const session = pgTable( export const session = pgTable(
"session", "session",
{ {
id: text("id").primaryKey(), id: text("id").primaryKey(),
expiresAt: timestamp("expires_at").notNull(), expiresAt: timestamp("expires_at").notNull(),
token: text("token").notNull().unique(), token: text("token").notNull().unique(),
createdAt: timestamp("created_at").defaultNow().notNull(), createdAt: timestamp("created_at").defaultNow().notNull(),
updatedAt: timestamp("updated_at") updatedAt: timestamp("updated_at")
.$onUpdate(() => /* @__PURE__ */ new Date()) .$onUpdate(() => /* @__PURE__ */ new Date())
.notNull(), .notNull(),
ipAddress: text("ip_address"), ipAddress: text("ip_address"),
userAgent: text("user_agent"), userAgent: text("user_agent"),
userId: text("user_id") userId: text("user_id")
.notNull() .notNull()
.references(() => user.id, { onDelete: "cascade" }), .references(() => user.id, { onDelete: "cascade" }),
}, },
(table) => [index("session_userId_idx").on(table.userId)], (table) => [index("session_userId_idx").on(table.userId)],
); );
export const account = pgTable( export const account = pgTable(
"account", "account",
{ {
id: text("id").primaryKey(), id: text("id").primaryKey(),
accountId: text("account_id").notNull(), accountId: text("account_id").notNull(),
providerId: text("provider_id").notNull(), providerId: text("provider_id").notNull(),
userId: text("user_id") userId: text("user_id")
.notNull() .notNull()
.references(() => user.id, { onDelete: "cascade" }), .references(() => user.id, { onDelete: "cascade" }),
accessToken: text("access_token"), accessToken: text("access_token"),
refreshToken: text("refresh_token"), refreshToken: text("refresh_token"),
idToken: text("id_token"), idToken: text("id_token"),
accessTokenExpiresAt: timestamp("access_token_expires_at"), accessTokenExpiresAt: timestamp("access_token_expires_at"),
refreshTokenExpiresAt: timestamp("refresh_token_expires_at"), refreshTokenExpiresAt: timestamp("refresh_token_expires_at"),
scope: text("scope"), scope: text("scope"),
password: text("password"), password: text("password"),
createdAt: timestamp("created_at").defaultNow().notNull(), createdAt: timestamp("created_at").defaultNow().notNull(),
updatedAt: timestamp("updated_at") updatedAt: timestamp("updated_at")
.$onUpdate(() => /* @__PURE__ */ new Date()) .$onUpdate(() => /* @__PURE__ */ new Date())
.notNull(), .notNull(),
}, },
(table) => [index("account_userId_idx").on(table.userId)], (table) => [index("account_userId_idx").on(table.userId)],
); );
export const verification = pgTable( export const verification = pgTable(
"verification", "verification",
{ {
id: text("id").primaryKey(), id: text("id").primaryKey(),
identifier: text("identifier").notNull(), identifier: text("identifier").notNull(),
value: text("value").notNull(), value: text("value").notNull(),
expiresAt: timestamp("expires_at").notNull(), expiresAt: timestamp("expires_at").notNull(),
createdAt: timestamp("created_at").defaultNow().notNull(), createdAt: timestamp("created_at").defaultNow().notNull(),
updatedAt: timestamp("updated_at") updatedAt: timestamp("updated_at")
.defaultNow() .defaultNow()
.$onUpdate(() => /* @__PURE__ */ new Date()) .$onUpdate(() => /* @__PURE__ */ new Date())
.notNull(), .notNull(),
}, },
(table) => [index("verification_identifier_idx").on(table.identifier)], (table) => [index("verification_identifier_idx").on(table.identifier)],
); );
export const userRelations = relations(user, ({ many }) => ({ export const userRelations = relations(user, ({ many }) => ({
sessions: many(session), sessions: many(session),
accounts: many(account), accounts: many(account),
})); }));
export const sessionRelations = relations(session, ({ one }) => ({ export const sessionRelations = relations(session, ({ one }) => ({
user: one(user, { user: one(user, {
fields: [session.userId], fields: [session.userId],
references: [user.id], references: [user.id],
}), }),
})); }));
export const accountRelations = relations(account, ({ one }) => ({ export const accountRelations = relations(account, ({ one }) => ({
user: one(user, { user: one(user, {
fields: [account.userId], fields: [account.userId],
references: [user.id], references: [user.id],
}), }),
})); }));

View file

@ -1,4 +1,10 @@
import { drizzle } from "drizzle-orm/node-postgres"; import { drizzle } from "drizzle-orm/node-postgres";
import { relations } from "./schema"; import { relations } from "./schema";
export const db = drizzle(process.env.DATABASE_URL!, { relations }); const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
throw new Error("Missing required env var: DATABASE_URL");
}
export const db = drizzle(databaseUrl, { relations });

File diff suppressed because it is too large Load diff

View file

@ -1,488 +1,504 @@
import type { import type {
Album, Album,
Artist, Artist,
Image, Image,
PlayHistory, PlayHistory,
SavedAlbum, SavedAlbum,
SavedTrack, SavedTrack,
SimplifiedAlbum, SimplifiedAlbum,
SimplifiedArtist, SimplifiedArtist,
Track, Track,
} from "@spotify/web-api-ts-sdk"; } from "@spotify/web-api-ts-sdk";
import { db } from ".";
import {
album,
albumArtist,
albumGenre,
albumImage,
artist,
artistGenre,
artistImage,
followedArtist,
genre,
playbackHistory,
platformImage,
savedAlbum,
savedTrack,
topArtist,
topTrack,
track,
trackArtist,
} from "./schema";
import { and, eq, inArray, sql } from "drizzle-orm"; import { and, eq, inArray, sql } from "drizzle-orm";
import { defaultSdk } from "../auth"; import { defaultSdk } from "../auth";
import { db } from ".";
import {
album,
albumArtist,
albumGenre,
albumImage,
artist,
artistGenre,
artistImage,
followedArtist,
genre,
platformImage,
playbackHistory,
savedAlbum,
savedTrack,
topArtist,
topTrack,
track,
trackArtist,
} from "./schema";
export const PLATFORM_SPOTIFY = "spotify" as const; export const PLATFORM_SPOTIFY = "spotify" as const;
type DbClient = typeof db; type DbClient = typeof db;
type DbTransaction = Parameters<typeof db.transaction>[0] extends ( type DbTransaction = Parameters<typeof db.transaction>[0] extends (
tx: infer T, tx: infer T,
) => Promise<any> ) => Promise<unknown>
? T ? T
: never; : never;
type DbLike = DbClient | DbTransaction; type DbLike = DbClient | DbTransaction;
const requireMapEntry = <K, V>(map: Map<K, V>, key: K, label: string): V => {
const value = map.get(key);
if (!value) throw new Error(`Missing ${label} for ${String(key)}`);
return value;
};
export async function upsertImages(images: Image[], dbClient: DbLike = db) { export async function upsertImages(images: Image[], dbClient: DbLike = db) {
await dbClient await dbClient
.insert(platformImage) .insert(platformImage)
.values( .values(
images.map(({ url, height, width }) => ({ images.map(({ url, height, width }) => ({
platform: PLATFORM_SPOTIFY, platform: PLATFORM_SPOTIFY,
url, url,
height, height,
width, width,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
export async function upsertGenres(genres: string[], dbClient: DbLike = db) { export async function upsertGenres(genres: string[], dbClient: DbLike = db) {
const values = genres.filter(Boolean).map((name) => ({ name })); const values = genres.filter(Boolean).map((name) => ({ name }));
if (values.length === 0) return; if (values.length === 0) return;
await dbClient.insert(genre).values(values).onConflictDoNothing(); await dbClient.insert(genre).values(values).onConflictDoNothing();
} }
export async function upsertArtists(artists: Artist[], dbClient: DbLike = db) { export async function upsertArtists(artists: Artist[], dbClient: DbLike = db) {
await dbClient await dbClient
.insert(artist) .insert(artist)
.values( .values(
artists.map(({ id, name, images, genres, popularity, type }) => ({ artists.map(({ id, name, popularity, type }) => ({
platform: PLATFORM_SPOTIFY, platform: PLATFORM_SPOTIFY,
platform_id: id, platform_id: id,
name, name,
popularity, popularity,
type, type,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
await upsertImages( await upsertImages(
artists.flatMap((a) => a.images), artists.flatMap((a) => a.images),
dbClient, dbClient,
); );
await upsertGenres( await upsertGenres(
artists.flatMap((a) => a.genres), artists.flatMap((a) => a.genres),
dbClient, dbClient,
); );
for (const spotifyArtist of artists) { for (const spotifyArtist of artists) {
await dbClient await dbClient
.insert(artistImage) .insert(artistImage)
.select( .select(
dbClient dbClient
.select({ .select({
artistId: artist.id, artistId: artist.id,
imageId: platformImage.id, imageId: platformImage.id,
}) })
.from(platformImage) .from(platformImage)
.where( .where(
and( and(
eq(platformImage.platform, PLATFORM_SPOTIFY), eq(platformImage.platform, PLATFORM_SPOTIFY),
inArray( inArray(
platformImage.url, platformImage.url,
spotifyArtist.images.map((t) => t.url), spotifyArtist.images.map((t) => t.url),
), ),
), ),
) )
.innerJoin(artist, eq(artist.platform_id, spotifyArtist.id)), .innerJoin(artist, eq(artist.platform_id, spotifyArtist.id)),
) )
.onConflictDoNothing(); .onConflictDoNothing();
await dbClient await dbClient
.insert(artistGenre) .insert(artistGenre)
.select( .select(
dbClient dbClient
.select({ .select({
artistId: artist.id, artistId: artist.id,
genreId: genre.id, genreId: genre.id,
}) })
.from(genre) .from(genre)
.where(inArray(genre.name, spotifyArtist.genres)) .where(inArray(genre.name, spotifyArtist.genres))
.innerJoin(artist, eq(artist.platform_id, spotifyArtist.id)), .innerJoin(artist, eq(artist.platform_id, spotifyArtist.id)),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
} }
async function getMissingArtists(artistIds: string[], dbClient: DbLike = db) { async function getMissingArtists(artistIds: string[], dbClient: DbLike = db) {
const existingArtists = await dbClient const existingArtists = await dbClient
.select({ id: artist.id }) .select({ id: artist.id })
.from(artist) .from(artist)
.where(inArray(artist.platform_id, artistIds)); .where(inArray(artist.platform_id, artistIds));
return artistIds.filter((id) => !existingArtists.some((a) => a.id === id)); return artistIds.filter((id) => !existingArtists.some((a) => a.id === id));
} }
async function lookupMissingArtists( async function lookupMissingArtists(
artistIds: string[], artistIds: string[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
const missingArtistIds = await getMissingArtists(artistIds, dbClient); const missingArtistIds = await getMissingArtists(artistIds, dbClient);
if (missingArtistIds.length === 0) return []; if (missingArtistIds.length === 0) return [];
let missingArtists: Artist[] = []; const missingArtists: Artist[] = [];
for (let i = 0; i < missingArtistIds.length / 50; i++) { for (let i = 0; i < missingArtistIds.length / 50; i++) {
missingArtists.push( missingArtists.push(
...(await defaultSdk.artists.get( ...(await defaultSdk.artists.get(
missingArtistIds.slice(i * 50, (i + 1) * 50), missingArtistIds.slice(i * 50, (i + 1) * 50),
)), )),
); );
} }
await upsertArtists(missingArtists, dbClient); await upsertArtists(missingArtists, dbClient);
return missingArtists; return missingArtists;
} }
function isFullArtistArray( function isFullArtistArray(
artists: Artist[] | SimplifiedArtist[], artists: Artist[] | SimplifiedArtist[],
): artists is Artist[] { ): artists is Artist[] {
return "images" in artists[0]!; const firstArtist = artists[0];
if (!firstArtist) return false;
return "images" in firstArtist;
} }
async function upsertMissingArtists( async function upsertMissingArtists(
artists: SimplifiedArtist[] | Artist[], artists: SimplifiedArtist[] | Artist[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (artists.length === 0) return; if (artists.length === 0) return;
let missingArtists: Artist[]; let missingArtists: Artist[];
if (isFullArtistArray(artists)) { if (isFullArtistArray(artists)) {
const missingArtistIds = await getMissingArtists( const missingArtistIds = await getMissingArtists(
artists.map((t) => t.id), artists.map((t) => t.id),
dbClient, dbClient,
); );
if (missingArtistIds.length === 0) return; if (missingArtistIds.length === 0) return;
missingArtists = artists.filter((a) => missingArtistIds.includes(a.id)); missingArtists = artists.filter((a) => missingArtistIds.includes(a.id));
} else { } else {
missingArtists = await lookupMissingArtists( missingArtists = await lookupMissingArtists(
artists.map((t) => t.id), artists.map((t) => t.id),
dbClient, dbClient,
); );
} }
if (missingArtists.length === 0) return; if (missingArtists.length === 0) return;
await upsertArtists(missingArtists, dbClient); await upsertArtists(missingArtists, dbClient);
return missingArtists; return missingArtists;
} }
async function getArtistIdMap(artistIds: string[], dbClient: DbLike = db) { async function getArtistIdMap(artistIds: string[], dbClient: DbLike = db) {
if (artistIds.length === 0) return new Map<string, string>(); if (artistIds.length === 0) return new Map<string, string>();
const rows = await dbClient const rows = await dbClient
.select({ id: artist.id, platform_id: artist.platform_id }) .select({ id: artist.id, platform_id: artist.platform_id })
.from(artist) .from(artist)
.where(inArray(artist.platform_id, artistIds)); .where(inArray(artist.platform_id, artistIds));
return new Map(rows.map((row) => [row.platform_id, row.id])); return new Map(rows.map((row) => [row.platform_id, row.id]));
} }
async function getAlbumIdMap(albumIds: string[], dbClient: DbLike = db) { async function getAlbumIdMap(albumIds: string[], dbClient: DbLike = db) {
if (albumIds.length === 0) return new Map<string, string>(); if (albumIds.length === 0) return new Map<string, string>();
const rows = await dbClient const rows = await dbClient
.select({ id: album.id, platform_id: album.platform_id }) .select({ id: album.id, platform_id: album.platform_id })
.from(album) .from(album)
.where(inArray(album.platform_id, albumIds)); .where(inArray(album.platform_id, albumIds));
return new Map(rows.map((row) => [row.platform_id ?? "", row.id])); return new Map(
rows
.filter((row) => row.platform_id)
.map((row) => [row.platform_id as string, row.id]),
);
} }
async function getTrackIdMap(trackIds: string[], dbClient: DbLike = db) { async function getTrackIdMap(trackIds: string[], dbClient: DbLike = db) {
if (trackIds.length === 0) return new Map<string, string>(); if (trackIds.length === 0) return new Map<string, string>();
const rows = await dbClient const rows = await dbClient
.select({ id: track.id, platform_id: track.platform_id }) .select({ id: track.id, platform_id: track.platform_id })
.from(track) .from(track)
.where(inArray(track.platform_id, trackIds)); .where(inArray(track.platform_id, trackIds));
return new Map(rows.map((row) => [row.platform_id ?? "", row.id])); return new Map(
rows
.filter((row) => row.platform_id)
.map((row) => [row.platform_id as string, row.id]),
);
} }
export async function upsertAlbums( export async function upsertAlbums(
albums: Album[] | SimplifiedAlbum[], albums: Album[] | SimplifiedAlbum[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
await upsertMissingArtists( await upsertMissingArtists(
albums.flatMap((a) => a.artists), albums.flatMap((a) => a.artists),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(album) .insert(album)
.values( .values(
albums.map(({ id, name, type, popularity, release_date, label }) => ({ albums.map(({ id, name, type, popularity, release_date, label }) => ({
platform: PLATFORM_SPOTIFY, platform: PLATFORM_SPOTIFY,
platform_id: id, platform_id: id,
name, name,
type, type,
popularity, popularity,
release_date: new Date(release_date), release_date: new Date(release_date),
label, label,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
await upsertImages( await upsertImages(
albums.flatMap((a) => a.images), albums.flatMap((a) => a.images),
dbClient, dbClient,
); );
await upsertGenres( await upsertGenres(
albums.flatMap((a) => a.genres), albums.flatMap((a) => a.genres),
dbClient, dbClient,
); );
for (const spotifyAlbum of albums) { for (const spotifyAlbum of albums) {
await dbClient await dbClient
.insert(albumImage) .insert(albumImage)
.select( .select(
dbClient dbClient
.select({ .select({
albumId: album.id, albumId: album.id,
imageId: platformImage.id, imageId: platformImage.id,
}) })
.from(platformImage) .from(platformImage)
.where( .where(
and( and(
eq(platformImage.platform, PLATFORM_SPOTIFY), eq(platformImage.platform, PLATFORM_SPOTIFY),
inArray( inArray(
platformImage.url, platformImage.url,
spotifyAlbum.images.map((t) => t.url), spotifyAlbum.images.map((t) => t.url),
), ),
), ),
) )
.innerJoin(album, eq(album.platform_id, spotifyAlbum.id)), .innerJoin(album, eq(album.platform_id, spotifyAlbum.id)),
) )
.onConflictDoNothing(); .onConflictDoNothing();
await dbClient await dbClient
.insert(albumArtist) .insert(albumArtist)
.select( .select(
dbClient dbClient
.select({ .select({
albumId: album.id, albumId: album.id,
artistId: artist.id, artistId: artist.id,
}) })
.from(artist) .from(artist)
.where( .where(
inArray( inArray(
artist.platform_id, artist.platform_id,
spotifyAlbum.artists.map((t) => t.id), spotifyAlbum.artists.map((t) => t.id),
), ),
) )
.innerJoin(album, eq(album.platform_id, spotifyAlbum.id)), .innerJoin(album, eq(album.platform_id, spotifyAlbum.id)),
) )
.onConflictDoNothing(); .onConflictDoNothing();
if (spotifyAlbum.genres?.length > 0) if (spotifyAlbum.genres?.length > 0)
await dbClient await dbClient
.insert(albumGenre) .insert(albumGenre)
.select( .select(
dbClient dbClient
.select({ .select({
albumId: album.id, albumId: album.id,
genreId: genre.id, genreId: genre.id,
}) })
.from(genre) .from(genre)
.where(inArray(genre.name, sql`${spotifyAlbum.genres}`)) .where(inArray(genre.name, sql`${spotifyAlbum.genres}`))
.innerJoin(album, eq(album.platform_id, spotifyAlbum.id)), .innerJoin(album, eq(album.platform_id, spotifyAlbum.id)),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
} }
export async function upsertTracks(tracks: Track[], dbClient: DbLike = db) { export async function upsertTracks(tracks: Track[], dbClient: DbLike = db) {
if (tracks.length === 0) return; if (tracks.length === 0) return;
await upsertAlbums( await upsertAlbums(
tracks.map((t) => t.album), tracks.map((t) => t.album),
dbClient, dbClient,
); );
await upsertMissingArtists( await upsertMissingArtists(
tracks.flatMap((t) => t.artists), tracks.flatMap((t) => t.artists),
dbClient, dbClient,
); );
const albumIdMap = await getAlbumIdMap( const albumIdMap = await getAlbumIdMap(
tracks.map((t) => t.album.id), tracks.map((t) => t.album.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(track) .insert(track)
.values( .values(
tracks.map((spotifyTrack) => ({ tracks.map((spotifyTrack) => ({
albumId: albumIdMap.get(spotifyTrack.album.id)!, albumId: requireMapEntry(albumIdMap, spotifyTrack.album.id, "albumId"),
name: spotifyTrack.name, name: spotifyTrack.name,
platform: PLATFORM_SPOTIFY, platform: PLATFORM_SPOTIFY,
platform_id: spotifyTrack.id, platform_id: spotifyTrack.id,
popularity: spotifyTrack.popularity, popularity: spotifyTrack.popularity,
duration: spotifyTrack.duration_ms, duration: spotifyTrack.duration_ms,
explicit: spotifyTrack.explicit, explicit: spotifyTrack.explicit,
disc_number: spotifyTrack.disc_number, disc_number: spotifyTrack.disc_number,
track_number: spotifyTrack.track_number, track_number: spotifyTrack.track_number,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
for (const spotifyTrack of tracks) { for (const spotifyTrack of tracks) {
await dbClient await dbClient
.insert(trackArtist) .insert(trackArtist)
.select( .select(
dbClient dbClient
.select({ .select({
trackId: track.id, trackId: track.id,
artistId: artist.id, artistId: artist.id,
}) })
.from(artist) .from(artist)
.where( .where(
inArray( inArray(
artist.platform_id, artist.platform_id,
spotifyTrack.artists.map((t) => t.id), spotifyTrack.artists.map((t) => t.id),
), ),
) )
.innerJoin(track, eq(track.platform_id, spotifyTrack.id)), .innerJoin(track, eq(track.platform_id, spotifyTrack.id)),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
} }
export async function upsertTopArtists( export async function upsertTopArtists(
userId: string, userId: string,
timeline: "short_term" | "medium_term" | "long_term", timeline: "short_term" | "medium_term" | "long_term",
artists: Artist[], artists: Artist[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (artists.length === 0) return; if (artists.length === 0) return;
await upsertArtists(artists, dbClient); await upsertArtists(artists, dbClient);
const artistIdMap = await getArtistIdMap( const artistIdMap = await getArtistIdMap(
artists.map((t) => t.id), artists.map((t) => t.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(topArtist) .insert(topArtist)
.values( .values(
artists.map((spotifyArtist, index) => ({ artists.map((spotifyArtist, index) => ({
artistId: artistIdMap.get(spotifyArtist.id)!, artistId: requireMapEntry(artistIdMap, spotifyArtist.id, "artistId"),
position: index + 1, position: index + 1,
userId, userId,
timeline, timeline,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
export async function upsertTopTracks( export async function upsertTopTracks(
userId: string, userId: string,
timeline: "short_term" | "medium_term" | "long_term", timeline: "short_term" | "medium_term" | "long_term",
tracks: Track[], tracks: Track[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (tracks.length === 0) return; if (tracks.length === 0) return;
await upsertTracks(tracks, dbClient); await upsertTracks(tracks, dbClient);
const trackIdMap = await getTrackIdMap( const trackIdMap = await getTrackIdMap(
tracks.map((t) => t.id), tracks.map((t) => t.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(topTrack) .insert(topTrack)
.values( .values(
tracks.map((spotifyTrack, index) => ({ tracks.map((spotifyTrack, index) => ({
trackId: trackIdMap.get(spotifyTrack.id)!, trackId: requireMapEntry(trackIdMap, spotifyTrack.id, "trackId"),
position: index + 1, position: index + 1,
userId, userId,
timeline, timeline,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
export async function upsertSavedAlbums( export async function upsertSavedAlbums(
userId: string, userId: string,
saved: SavedAlbum[], saved: SavedAlbum[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (saved.length === 0) return; if (saved.length === 0) return;
const albums = saved.map((item) => item.album); const albums = saved.map((item) => item.album);
await upsertAlbums(albums, dbClient); await upsertAlbums(albums, dbClient);
const albumIdMap = await getAlbumIdMap( const albumIdMap = await getAlbumIdMap(
albums.map((t) => t.id), albums.map((t) => t.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(savedAlbum) .insert(savedAlbum)
.values( .values(
saved.map((item) => ({ saved.map((item) => ({
albumId: albumIdMap.get(item.album.id)!, albumId: requireMapEntry(albumIdMap, item.album.id, "albumId"),
userId, userId,
saved_at: new Date(item.added_at), saved_at: new Date(item.added_at),
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
export async function upsertSavedTracks( export async function upsertSavedTracks(
userId: string, userId: string,
saved: SavedTrack[], saved: SavedTrack[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (saved.length === 0) return; if (saved.length === 0) return;
const tracks = saved.map((item) => item.track); const tracks = saved.map((item) => item.track);
await upsertTracks(tracks, dbClient); await upsertTracks(tracks, dbClient);
const trackIdMap = await getTrackIdMap( const trackIdMap = await getTrackIdMap(
tracks.map((t) => t.id), tracks.map((t) => t.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(savedTrack) .insert(savedTrack)
.values( .values(
saved.map((item) => ({ saved.map((item) => ({
trackId: trackIdMap.get(item.track.id)!, trackId: requireMapEntry(trackIdMap, item.track.id, "trackId"),
userId, userId,
saved_at: new Date(item.added_at), saved_at: new Date(item.added_at),
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
export async function upsertFollowedArtists( export async function upsertFollowedArtists(
userId: string, userId: string,
artists: Artist[], artists: Artist[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (artists.length === 0) return; if (artists.length === 0) return;
await upsertArtists(artists, dbClient); await upsertArtists(artists, dbClient);
const artistIdMap = await getArtistIdMap( const artistIdMap = await getArtistIdMap(
artists.map((t) => t.id), artists.map((t) => t.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(followedArtist) .insert(followedArtist)
.values( .values(
artists.map((spotifyArtist) => ({ artists.map((spotifyArtist) => ({
artistId: artistIdMap.get(spotifyArtist.id)!, artistId: requireMapEntry(artistIdMap, spotifyArtist.id, "artistId"),
userId, userId,
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }
export async function upsertPlaybackHistory( export async function upsertPlaybackHistory(
userId: string, userId: string,
items: PlayHistory[], items: PlayHistory[],
dbClient: DbLike = db, dbClient: DbLike = db,
) { ) {
if (items.length === 0) return; if (items.length === 0) return;
const tracks = items.map((item) => item.track); const tracks = items.map((item) => item.track);
await upsertTracks(tracks, dbClient); await upsertTracks(tracks, dbClient);
const trackIdMap = await getTrackIdMap( const trackIdMap = await getTrackIdMap(
tracks.map((t) => t.id), tracks.map((t) => t.id),
dbClient, dbClient,
); );
await dbClient await dbClient
.insert(playbackHistory) .insert(playbackHistory)
.values( .values(
items.map((item) => ({ items.map((item) => ({
trackId: trackIdMap.get(item.track.id)!, trackId: requireMapEntry(trackIdMap, item.track.id, "trackId"),
userId, userId,
played_at: new Date(item.played_at), played_at: new Date(item.played_at),
})), })),
) )
.onConflictDoNothing(); .onConflictDoNothing();
} }

View file

@ -2,6 +2,6 @@ import { DBOS } from "@dbos-inc/dbos-sdk";
import "./workflows/sync"; import "./workflows/sync";
DBOS.setConfig({ DBOS.setConfig({
name: "itpdp", name: "itpdp",
systemDatabaseUrl: process.env.DATABASE_URL, systemDatabaseUrl: process.env.DATABASE_URL,
}); });

View file

@ -1,19 +1,19 @@
import { Elysia, t } from "elysia"; import { DBOS } from "@dbos-inc/dbos-sdk";
import { Elysia } from "elysia";
import { betterAuthElysia } from "./auth"; import { betterAuthElysia } from "./auth";
import { syncApp } from "./routes/sync"; import { syncApp } from "./routes/sync";
import { DBOS } from "@dbos-inc/dbos-sdk";
import "./workflows/sync"; import "./workflows/sync";
import "./dbos.ts"; import "./dbos.ts";
import { statsApp } from "./routes/stats.ts";
import { partyApp } from "./routes/party"; import { partyApp } from "./routes/party";
import { statsApp } from "./routes/stats.ts";
const app = new Elysia() const app = new Elysia()
.use(betterAuthElysia) .use(betterAuthElysia)
.group("/api", (app) => app.use(syncApp).use(statsApp).use(partyApp)) .group("/api", (app) => app.use(syncApp).use(statsApp).use(partyApp))
.listen(4000); .listen(4000);
export type App = typeof app; export type App = typeof app;
await DBOS.launch({ await DBOS.launch({
conductorKey: process.env.DBOS_CONDUCTOR_KEY, conductorKey: process.env.DBOS_CONDUCTOR_KEY,
}); });

85
api/src/party-sockets.ts Normal file
View file

@ -0,0 +1,85 @@
type PartySocketEvent = {
type: string;
[key: string]: unknown;
};
type WebSocketLike = {
send: (data: string) => void;
close?: (code?: number, reason?: string) => void;
};
const partySockets = new Map<string, Map<string, Set<WebSocketLike>>>();
function getPartyUserSockets(partyId: string, userId: string) {
const partyMap = partySockets.get(partyId);
if (!partyMap) return null;
return partyMap.get(userId) ?? null;
}
export function registerPartySocket(
partyId: string,
userId: string,
ws: WebSocketLike,
) {
let partyMap = partySockets.get(partyId);
if (!partyMap) {
partyMap = new Map();
partySockets.set(partyId, partyMap);
}
let userSockets = partyMap.get(userId);
if (!userSockets) {
userSockets = new Set();
partyMap.set(userId, userSockets);
}
userSockets.add(ws);
}
export function unregisterPartySocket(
partyId: string,
userId: string,
ws: WebSocketLike,
) {
const partyMap = partySockets.get(partyId);
if (!partyMap) return;
const userSockets = partyMap.get(userId);
if (!userSockets) return;
userSockets.delete(ws);
if (userSockets.size === 0) {
partyMap.delete(userId);
}
if (partyMap.size === 0) {
partySockets.delete(partyId);
}
}
export function broadcastPartyEvent(partyId: string, event: PartySocketEvent) {
const partyMap = partySockets.get(partyId);
if (!partyMap) return;
const payload = JSON.stringify(event);
for (const userSockets of partyMap.values()) {
for (const ws of userSockets) {
ws.send(payload);
}
}
}
export function sendPartyEventToUser(
partyId: string,
userId: string,
event: PartySocketEvent,
) {
const userSockets = getPartyUserSockets(partyId, userId);
if (!userSockets) return;
const payload = JSON.stringify(event);
for (const ws of userSockets) {
ws.send(payload);
}
}

View file

@ -1,8 +1,14 @@
import Elysia, { t } from "elysia";
import { and, eq } from "drizzle-orm"; import { and, eq } from "drizzle-orm";
import { betterAuthElysia } from "../auth"; import Elysia, { t } from "elysia";
import { auth, betterAuthElysia } from "../auth";
import { db } from "../db"; import { db } from "../db";
import { party, partyMember } from "../db/schema"; import { party, partyMember } from "../db/schema";
import {
broadcastPartyEvent,
registerPartySocket,
sendPartyEventToUser,
unregisterPartySocket,
} from "../party-sockets";
const PARTY_STATUS = ["created", "started", "ended"] as const; const PARTY_STATUS = ["created", "started", "ended"] as const;
@ -10,289 +16,485 @@ type PartyStatus = (typeof PARTY_STATUS)[number];
type DbClient = typeof db; type DbClient = typeof db;
type DbTransaction = Parameters<typeof db.transaction>[0] extends ( type DbTransaction = Parameters<typeof db.transaction>[0] extends (
tx: infer T, tx: infer T,
) => Promise<any> ) => Promise<unknown>
? T ? T
: never; : never;
type DbLike = DbClient | DbTransaction; type DbLike = DbClient | DbTransaction;
type PartySnapshot = NonNullable<Awaited<ReturnType<typeof getPartyStatus>>>;
type PartySocketMessage =
| {
type: "member_payload";
payload: unknown;
}
| {
type: "ping";
};
const MAX_MEMBER_PAYLOAD_SIZE = 8_000;
type PartyWsData = {
user?: { id: string };
partyId?: string;
};
async function getPartyForUser(userId: string) { async function getPartyForUser(userId: string) {
const memberships = await db.query.partyMember.findMany({ const memberships = await db.query.partyMember.findMany({
where: { where: {
userId, userId,
}, },
with: { with: {
party: true, party: true,
}, },
limit: 1, limit: 1,
}); });
return memberships[0]?.party ?? null; return memberships[0]?.party ?? null;
} }
async function getMemberRecord(dbClient: DbLike, userId: string) { async function getMemberRecord(dbClient: DbLike, userId: string) {
return ( return (
(await dbClient.query.partyMember.findFirst({ (await dbClient.query.partyMember.findFirst({
where: { where: {
userId, userId,
}, },
})) ?? null })) ?? null
); );
} }
async function getPartyStatus(partyId: string) { async function getPartyStatus(partyId: string) {
const party = await db.query.party.findFirst({ const party = await db.query.party.findFirst({
where: { where: {
id: partyId, id: partyId,
}, },
}); });
if (!party) return null; if (!party) return null;
const members = await db.query.partyMember.findMany({ const members = await db.query.partyMember.findMany({
where: { where: {
partyId, partyId,
}, },
with: { with: {
user: true, user: true,
}, },
orderBy: { orderBy: {
joinedAt: "asc", joinedAt: "asc",
}, },
}); });
return { return {
party, party,
members, members,
}; };
}
function broadcastSnapshot(partyId: string, snapshot: PartySnapshot | null) {
if (!snapshot) return;
broadcastPartyEvent(partyId, {
type: "party_status",
party: snapshot.party,
members: snapshot.members,
});
}
function getPayloadSize(payload: unknown) {
try {
return JSON.stringify(payload).length;
} catch {
return Infinity;
}
} }
async function cleanupPartyIfEmpty(dbClient: DbLike, partyId: string) { async function cleanupPartyIfEmpty(dbClient: DbLike, partyId: string) {
const members = await dbClient.query.partyMember.findMany({ const members = await dbClient.query.partyMember.findMany({
where: { where: {
partyId, partyId,
}, },
limit: 1, limit: 1,
}); });
if (members.length > 0) return; if (members.length > 0) return;
await dbClient.delete(party).where(eq(party.id, partyId)); await dbClient.delete(party).where(eq(party.id, partyId));
} }
async function leaveParty(dbClient: DbLike, userId: string) { async function leaveParty(dbClient: DbLike, userId: string) {
const member = await getMemberRecord(dbClient, userId); const member = await getMemberRecord(dbClient, userId);
if (!member) return null; if (!member) return null;
await dbClient.delete(partyMember).where(eq(partyMember.id, member.id)); await dbClient.delete(partyMember).where(eq(partyMember.id, member.id));
const nextHost = await dbClient.query.partyMember.findFirst({ const nextHost = await dbClient.query.partyMember.findFirst({
where: { where: {
partyId: member.partyId, partyId: member.partyId,
}, },
orderBy: { orderBy: {
joinedAt: "asc", joinedAt: "asc",
}, },
}); });
if (nextHost) { let newHostId: string | null = null;
const currentParty = await dbClient.query.party.findFirst({ if (nextHost) {
where: { const currentParty = await dbClient.query.party.findFirst({
id: member.partyId, where: {
}, id: member.partyId,
}); },
if (currentParty?.hostId === userId) { });
await dbClient if (currentParty?.hostId === userId) {
.update(party) await dbClient
.set({ .update(party)
hostId: nextHost.userId, .set({
lastUpdated: new Date(), hostId: nextHost.userId,
}) lastUpdated: new Date(),
.where(eq(party.id, member.partyId)); })
} .where(eq(party.id, member.partyId));
} newHostId = nextHost.userId;
await cleanupPartyIfEmpty(dbClient, member.partyId); }
return member.partyId; }
await cleanupPartyIfEmpty(dbClient, member.partyId);
return {
partyId: member.partyId,
newHostId,
};
} }
function isValidStatus(status: string): status is PartyStatus { function isValidStatus(status: string): status is PartyStatus {
return PARTY_STATUS.includes(status as PartyStatus); return PARTY_STATUS.includes(status as PartyStatus);
} }
export const partyApp = new Elysia() export const partyApp = new Elysia()
.use(betterAuthElysia) .use(betterAuthElysia)
.group("/party", (app) => .group("/party", (app) =>
app app
.get( .ws("/ws", {
"/status", beforeHandle: async ({ request, set }) => {
async ({ user }) => { const session = await auth.api.getSession({
const currentParty = await getPartyForUser(user.id); headers: request.headers,
if (!currentParty) return { party: null, members: [] }; });
const status = await getPartyStatus(currentParty.id); if (!session) {
return status ?? { party: null, members: [] }; set.status = 401;
}, return;
{ auth: true }, }
) return {
.post( user: session.user,
"/join", session: session.session,
async ({ user, body, set }) => { };
const targetUserId = body.targetUserId; },
const targetUser = await db.query.user.findFirst({ open: async (ws) => {
where: { const data = ws.data as unknown as PartyWsData;
id: targetUserId, const user = data.user;
}, if (!user) return;
}); const membership = await getMemberRecord(db, user.id);
if (!targetUser) { if (!membership) {
set.status = 404; ws.send(
return { error: "Target user not found." }; JSON.stringify({
} type: "error",
message: "You are not in a party.",
}),
);
ws.close?.(1008, "Not in a party");
return;
}
let partyId: string | null = null; const snapshot = await getPartyStatus(membership.partyId);
await db.transaction(async (tx) => { data.partyId = membership.partyId;
await leaveParty(tx, user.id); registerPartySocket(membership.partyId, user.id, ws);
if (snapshot) {
ws.send(
JSON.stringify({
type: "snapshot",
party: snapshot.party,
members: snapshot.members,
}),
);
}
},
message: async (ws, message: PartySocketMessage) => {
const data = ws.data as unknown as PartyWsData;
const user = data.user;
if (!user) return;
if (message.type === "ping") {
ws.send(JSON.stringify({ type: "pong" }));
return;
}
const targetMembership = await getMemberRecord(tx, targetUserId); if (message.type !== "member_payload") return;
if (targetMembership) { const membership = await getMemberRecord(db, user.id);
partyId = targetMembership.partyId; if (!membership) return;
await tx
.update(party)
.set({
hostId: targetUserId,
lastUpdated: new Date(),
})
.where(eq(party.id, partyId));
} else {
const created = await tx
.insert(party)
.values({
status: "created",
hostId: targetUserId,
})
.returning({ id: party.id });
partyId = created[0]!.id;
await tx.insert(partyMember).values({
partyId,
userId: targetUserId,
});
}
await tx if (getPayloadSize(message.payload) > MAX_MEMBER_PAYLOAD_SIZE) {
.insert(partyMember) ws.send(
.values({ partyId, userId: user.id }) JSON.stringify({
.onConflictDoNothing(); type: "error",
}); message: "Payload too large.",
}),
);
return;
}
if (!partyId) return { party: null, members: [] }; const currentParty = await db.query.party.findFirst({
const status = await getPartyStatus(partyId); where: { id: membership.partyId },
return status ?? { party: null, members: [] }; });
}, if (!currentParty) return;
{
auth: true,
body: t.Object({
targetUserId: t.String(),
}),
},
)
.post(
"/leave",
async ({ user }) => {
const partyId = await db.transaction(async (tx) => {
return await leaveParty(tx, user.id);
});
if (!partyId) return { party: null, members: [] };
const status = await getPartyStatus(partyId);
return status ?? { party: null, members: [] };
},
{ auth: true },
)
.post(
"/kick",
async ({ user, body, set }) => {
const currentMembership = await getMemberRecord(db, user.id);
if (!currentMembership) {
set.status = 400;
return { error: "You are not in a party." };
}
const currentParty = await db.query.party.findFirst({ sendPartyEventToUser(membership.partyId, currentParty.hostId, {
where: { type: "member_payload",
id: currentMembership.partyId, fromUserId: user.id,
}, payload: message.payload,
}); });
if (!currentParty || currentParty.hostId !== user.id) { },
set.status = 403; close: async (ws) => {
return { error: "Only the host can kick members." }; const data = ws.data as unknown as PartyWsData;
} const user = data.user;
const { partyId } = data;
if (!user) return;
if (!partyId) {
const membership = await getMemberRecord(db, user.id);
if (!membership) return;
unregisterPartySocket(membership.partyId, user.id, ws);
return;
}
unregisterPartySocket(partyId, user.id, ws);
},
body: t.Union([
t.Object({ type: t.Literal("ping") }),
t.Object({ type: t.Literal("member_payload"), payload: t.Any() }),
]),
})
.get(
"/status",
async ({ user }) => {
const currentParty = await getPartyForUser(user.id);
if (!currentParty) return { party: null, members: [] };
const status = await getPartyStatus(currentParty.id);
return status ?? { party: null, members: [] };
},
{ auth: true },
)
.post(
"/join",
async ({ user, body, set }) => {
const targetUserId = body.targetUserId;
const targetUser = await db.query.user.findFirst({
where: {
id: targetUserId,
},
});
if (!targetUser) {
set.status = 404;
return { error: "Target user not found." };
}
if (body.memberUserId === user.id) { const { partyId, hostChanged, leaveResult } = await db.transaction(
set.status = 400; async (tx) => {
return { error: "Host cannot kick themselves." }; const leaveResult = await leaveParty(tx, user.id);
} let partyId: string | null = null;
let hostChanged = false;
await db.transaction(async (tx) => { const targetMembership = await getMemberRecord(tx, targetUserId);
await tx if (targetMembership) {
.delete(partyMember) partyId = targetMembership.partyId;
.where( await tx
and( .update(party)
eq(partyMember.partyId, currentMembership.partyId), .set({
eq(partyMember.userId, body.memberUserId), hostId: targetUserId,
), lastUpdated: new Date(),
); })
await cleanupPartyIfEmpty(tx, currentMembership.partyId); .where(eq(party.id, partyId));
}); hostChanged = true;
const status = await getPartyStatus(currentMembership.partyId); } else {
return status ?? { party: null, members: [] }; const created = await tx
}, .insert(party)
{ .values({
auth: true, status: "created",
body: t.Object({ hostId: targetUserId,
memberUserId: t.String(), })
}), .returning({ id: party.id });
}, const createdId = created[0]?.id ?? null;
) if (!createdId) {
.post( return {
"/status", partyId: null,
async ({ user, body, set }) => { hostChanged,
const currentMembership = await getMemberRecord(db, user.id); leaveResult,
if (!currentMembership) { };
set.status = 400; }
return { error: "You are not in a party." }; partyId = createdId;
} await tx.insert(partyMember).values({
partyId,
userId: targetUserId,
});
}
const currentParty = await db.query.party.findFirst({ if (!partyId) {
where: { return {
id: currentMembership.partyId, partyId: null,
}, hostChanged,
}); leaveResult,
if (!currentParty || currentParty.hostId !== user.id) { };
set.status = 403; }
return { error: "Only the host can update party status." };
}
if (!isValidStatus(body.status)) { await tx
set.status = 400; .insert(partyMember)
return { error: "Invalid party status." }; .values({ partyId, userId: user.id })
} .onConflictDoNothing();
const currentData = return {
currentParty?.data && typeof currentParty.data === "object" partyId,
? currentParty.data hostChanged,
: {}; leaveResult,
const nextData = body.data };
? { ...currentData, ...body.data } },
: currentData; );
await db.transaction(async (tx) => { if (!partyId) return { party: null, members: [] };
await tx const status = await getPartyStatus(partyId);
.update(party) if (leaveResult?.newHostId) {
.set({ broadcastPartyEvent(leaveResult.partyId, {
status: body.status, type: "host_changed",
data: nextData, hostId: leaveResult.newHostId,
lastUpdated: new Date(), });
}) }
.where(eq(party.id, currentMembership.partyId)); if (hostChanged) {
}); broadcastPartyEvent(partyId, {
type: "host_changed",
hostId: targetUserId,
});
}
broadcastPartyEvent(partyId, {
type: "member_joined",
userId: user.id,
});
broadcastSnapshot(partyId, status);
return status ?? { party: null, members: [] };
},
{
auth: true,
body: t.Object({
targetUserId: t.String(),
}),
},
)
.post(
"/leave",
async ({ user }) => {
const result = await db.transaction(async (tx) => {
return await leaveParty(tx, user.id);
});
if (!result) return { party: null, members: [] };
const status = await getPartyStatus(result.partyId);
broadcastPartyEvent(result.partyId, {
type: "member_left",
userId: user.id,
});
if (result.newHostId) {
broadcastPartyEvent(result.partyId, {
type: "host_changed",
hostId: result.newHostId,
});
}
broadcastSnapshot(result.partyId, status);
return status ?? { party: null, members: [] };
},
{ auth: true },
)
.post(
"/kick",
async ({ user, body, set }) => {
const currentMembership = await getMemberRecord(db, user.id);
if (!currentMembership) {
set.status = 400;
return { error: "You are not in a party." };
}
const status = await getPartyStatus(currentMembership.partyId); const currentParty = await db.query.party.findFirst({
return status ?? { party: null, members: [] }; where: {
}, id: currentMembership.partyId,
{ },
auth: true, });
body: t.Object({ if (!currentParty || currentParty.hostId !== user.id) {
status: t.Enum({ created: "created", started: "started" }), set.status = 403;
data: t.Optional(t.Any()), return { error: "Only the host can kick members." };
}), }
},
), if (body.memberUserId === user.id) {
); set.status = 400;
return { error: "Host cannot kick themselves." };
}
await db.transaction(async (tx) => {
await tx
.delete(partyMember)
.where(
and(
eq(partyMember.partyId, currentMembership.partyId),
eq(partyMember.userId, body.memberUserId),
),
);
await cleanupPartyIfEmpty(tx, currentMembership.partyId);
});
const status = await getPartyStatus(currentMembership.partyId);
broadcastPartyEvent(currentMembership.partyId, {
type: "member_left",
userId: body.memberUserId,
kickedBy: user.id,
});
broadcastSnapshot(currentMembership.partyId, status);
return status ?? { party: null, members: [] };
},
{
auth: true,
body: t.Object({
memberUserId: t.String(),
}),
},
)
.post(
"/status",
async ({ user, body, set }) => {
const currentMembership = await getMemberRecord(db, user.id);
if (!currentMembership) {
set.status = 400;
return { error: "You are not in a party." };
}
const currentParty = await db.query.party.findFirst({
where: {
id: currentMembership.partyId,
},
});
if (!currentParty || currentParty.hostId !== user.id) {
set.status = 403;
return { error: "Only the host can update party status." };
}
if (!isValidStatus(body.status)) {
set.status = 400;
return { error: "Invalid party status." };
}
const currentData =
currentParty?.data && typeof currentParty.data === "object"
? currentParty.data
: {};
const nextData = body.data
? { ...currentData, ...body.data }
: currentData;
await db.transaction(async (tx) => {
await tx
.update(party)
.set({
status: body.status,
data: nextData,
lastUpdated: new Date(),
})
.where(eq(party.id, currentMembership.partyId));
});
const status = await getPartyStatus(currentMembership.partyId);
broadcastSnapshot(currentMembership.partyId, status);
return status ?? { party: null, members: [] };
},
{
auth: true,
body: t.Object({
status: t.Enum({ created: "created", started: "started" }),
data: t.Optional(t.Any()),
}),
},
),
);

View file

@ -1,79 +1,79 @@
import Elysia from "elysia";
import { sql } from "drizzle-orm"; import { sql } from "drizzle-orm";
import Elysia from "elysia";
import { betterAuthElysia } from "../auth"; import { betterAuthElysia } from "../auth";
import { db } from "../db"; import { db } from "../db";
import { import {
artistGenre, artistGenre,
genre, genre,
savedTrack, savedTrack,
topTrack, topTrack,
trackArtist, trackArtist,
} from "../db/schema"; } from "../db/schema";
export const statsApp = new Elysia().use(betterAuthElysia).get( export const statsApp = new Elysia().use(betterAuthElysia).get(
"/stats", "/stats",
async ({ user }) => { async ({ user }) => {
const topArtists = await db.query.topArtist.findMany({ const topArtists = await db.query.topArtist.findMany({
limit: 10, limit: 10,
with: { with: {
artist: { artist: {
with: { with: {
genres: true, genres: true,
images: true, images: true,
}, },
}, },
}, },
where: { where: {
userId: user.id, userId: user.id,
}, },
}); });
const topTracks = await db.query.topTrack.findMany({ const topTracks = await db.query.topTrack.findMany({
limit: 10, limit: 10,
with: { with: {
track: { track: {
with: { with: {
album: { album: {
with: { with: {
images: true, images: true,
}, },
}, },
artists: { artists: {
with: { with: {
genres: true, genres: true,
}, },
}, },
}, },
}, },
}, },
where: { where: {
userId: user.id, userId: user.id,
timeline: "medium_term", timeline: "medium_term",
}, },
orderBy: { orderBy: {
position: "desc", position: "desc",
}, },
}); });
const recentTracks = await db.query.playbackHistory.findMany({ const recentTracks = await db.query.playbackHistory.findMany({
limit: 10, limit: 10,
with: { with: {
track: { track: {
with: { with: {
album: { album: {
with: { with: {
images: true, images: true,
}, },
}, },
}, },
}, },
}, },
where: { where: {
userId: user.id, userId: user.id,
}, },
}); });
const topGenresResult = await db.execute<{ const topGenresResult = await db.execute<{
name: string; name: string;
count: number; count: number;
}>(sql` }>(sql`
select ${genre.name} as name, count(*)::int as count select ${genre.name} as name, count(*)::int as count
from ( from (
select distinct ${trackArtist.trackId} as track_id, ${artistGenre.genreId} as genre_id select distinct ${trackArtist.trackId} as track_id, ${artistGenre.genreId} as genre_id
@ -95,10 +95,10 @@ export const statsApp = new Elysia().use(betterAuthElysia).get(
order by count desc order by count desc
limit 10 limit 10
`); `);
const topGenres = topGenresResult.rows; const topGenres = topGenresResult.rows;
return { topArtists, topTracks, recentTracks, topGenres }; return { topArtists, topTracks, recentTracks, topGenres };
}, },
{ {
auth: true, auth: true,
}, },
); );

View file

@ -3,11 +3,11 @@ import { betterAuthElysia } from "../auth";
import { spotifySyncWorkflow } from "../workflows/sync"; import { spotifySyncWorkflow } from "../workflows/sync";
export const syncApp = new Elysia().use(betterAuthElysia).post( export const syncApp = new Elysia().use(betterAuthElysia).post(
"/sync", "/sync",
async ({ user }) => { async ({ user }) => {
return await spotifySyncWorkflow.syncUser(user.id); return await spotifySyncWorkflow.syncUser(user.id);
}, },
{ {
auth: true, auth: true,
}, },
); );

View file

@ -1,246 +1,247 @@
import { DBOS, ConfiguredInstance } from "@dbos-inc/dbos-sdk"; import { ConfiguredInstance, DBOS } from "@dbos-inc/dbos-sdk";
import { SpotifyApi } from "@spotify/web-api-ts-sdk";
import type { import type {
Artist, Artist,
PlayHistory, PlayHistory,
SavedAlbum, SavedAlbum,
SavedTrack, SavedTrack,
Track, Track,
} from "@spotify/web-api-ts-sdk"; } from "@spotify/web-api-ts-sdk";
import { SpotifyApi } from "@spotify/web-api-ts-sdk";
import { eq } from "drizzle-orm";
import { auth, SPOTIFY_CLIENT_ID } from "../auth"; import { auth, SPOTIFY_CLIENT_ID } from "../auth";
import { db } from "../db"; import { db } from "../db";
import { import {
followedArtist, followedArtist,
savedAlbum, savedAlbum,
savedTrack, savedTrack,
topArtist, topArtist,
topTrack, topTrack,
} from "../db/schema"; } from "../db/schema";
import { import {
upsertFollowedArtists, upsertFollowedArtists,
upsertPlaybackHistory, upsertPlaybackHistory,
upsertSavedAlbums, upsertSavedAlbums,
upsertSavedTracks, upsertSavedTracks,
upsertTopArtists, upsertTopArtists,
upsertTopTracks, upsertTopTracks,
} from "../db/spotify"; } from "../db/spotify";
import { eq } from "drizzle-orm";
const timelines = ["short_term", "medium_term", "long_term"] as const; const timelines = ["short_term", "medium_term", "long_term"] as const;
type Timeline = (typeof timelines)[number]; type Timeline = (typeof timelines)[number];
type SyncPayload = { type SyncPayload = {
topArtistsByTimeline: Record<Timeline, Artist[]>; topArtistsByTimeline: Record<Timeline, Artist[]>;
topTracksByTimeline: Record<Timeline, Track[]>; topTracksByTimeline: Record<Timeline, Track[]>;
followedArtists: Artist[]; followedArtists: Artist[];
savedAlbums: SavedAlbum[]; savedAlbums: SavedAlbum[];
savedTracks: SavedTrack[]; savedTracks: SavedTrack[];
recentlyPlayed: PlayHistory[]; recentlyPlayed: PlayHistory[];
}; };
export class SpotifySyncWorkflow extends ConfiguredInstance { export class SpotifySyncWorkflow extends ConfiguredInstance {
constructor(name: string) { @DBOS.workflow()
super(name); async syncUser(userId: string) {
} console.log("Sync start");
const data = await this.fetchSpotifyData(userId);
console.log("Sync data fetched");
await this.persistSpotifyData(userId, data);
console.log("Synced");
return { ok: true };
}
@DBOS.workflow() private async fetchSpotifyData(userId: string): Promise<SyncPayload> {
async syncUser(userId: string) { const topArtistsByTimeline = await this.fetchTopArtists(userId);
console.log("Sync start"); const topTracksByTimeline = await this.fetchTopTracks(userId);
const data = await this.fetchSpotifyData(userId); const followedArtists = await this.fetchFollowedArtists(userId);
console.log("Sync data fetched"); const savedAlbums = await this.fetchSavedAlbums(userId);
await this.persistSpotifyData(userId, data); const savedTracks = await this.fetchSavedTracks(userId);
console.log("Synced"); const recentlyPlayed = await this.fetchRecentlyPlayed(userId);
return { ok: true }; return {
} topArtistsByTimeline,
topTracksByTimeline,
followedArtists,
savedAlbums,
savedTracks,
recentlyPlayed,
};
}
private async fetchSpotifyData(userId: string): Promise<SyncPayload> { private async persistSpotifyData(userId: string, data: SyncPayload) {
const topArtistsByTimeline = await this.fetchTopArtists(userId); await this.persistTopArtists(userId, data.topArtistsByTimeline);
const topTracksByTimeline = await this.fetchTopTracks(userId); await this.persistTopTracks(userId, data.topTracksByTimeline);
const followedArtists = await this.fetchFollowedArtists(userId); await this.persistFollowedArtists(userId, data.followedArtists);
const savedAlbums = await this.fetchSavedAlbums(userId); await this.persistSavedAlbums(userId, data.savedAlbums);
const savedTracks = await this.fetchSavedTracks(userId); await this.persistSavedTracks(userId, data.savedTracks);
const recentlyPlayed = await this.fetchRecentlyPlayed(userId); await this.persistPlaybackHistory(userId, data.recentlyPlayed);
return { }
topArtistsByTimeline,
topTracksByTimeline,
followedArtists,
savedAlbums,
savedTracks,
recentlyPlayed,
};
}
private async persistSpotifyData(userId: string, data: SyncPayload) { @DBOS.step()
await this.persistTopArtists(userId, data.topArtistsByTimeline); private async persistTopArtists(
await this.persistTopTracks(userId, data.topTracksByTimeline); userId: string,
await this.persistFollowedArtists(userId, data.followedArtists); topArtistsByTimeline: Record<Timeline, Artist[]>,
await this.persistSavedAlbums(userId, data.savedAlbums); ) {
await this.persistSavedTracks(userId, data.savedTracks); await db.transaction(async (tx) => {
await this.persistPlaybackHistory(userId, data.recentlyPlayed); await tx.delete(topArtist).where(eq(topArtist.userId, userId));
} for (const timeline of timelines) {
await upsertTopArtists(
userId,
timeline,
topArtistsByTimeline[timeline],
tx,
);
}
});
}
@DBOS.step() @DBOS.step()
private async persistTopArtists( private async persistTopTracks(
userId: string, userId: string,
topArtistsByTimeline: Record<Timeline, Artist[]>, topTracksByTimeline: Record<Timeline, Track[]>,
) { ) {
await db.transaction(async (tx) => { await db.transaction(async (tx) => {
await tx.delete(topArtist).where(eq(topArtist.userId, userId)); await tx.delete(topTrack).where(eq(topTrack.userId, userId));
for (const timeline of timelines) { for (const timeline of timelines) {
await upsertTopArtists( await upsertTopTracks(
userId, userId,
timeline, timeline,
topArtistsByTimeline[timeline], topTracksByTimeline[timeline],
tx, tx,
); );
} }
}); });
} }
@DBOS.step() @DBOS.step()
private async persistTopTracks( private async persistFollowedArtists(userId: string, artists: Artist[]) {
userId: string, await db.transaction(async (tx) => {
topTracksByTimeline: Record<Timeline, Track[]>, await tx.delete(followedArtist).where(eq(followedArtist.userId, userId));
) { await upsertFollowedArtists(userId, artists, tx);
await db.transaction(async (tx) => { });
await tx.delete(topTrack).where(eq(topTrack.userId, userId)); }
for (const timeline of timelines) {
await upsertTopTracks(
userId,
timeline,
topTracksByTimeline[timeline],
tx,
);
}
});
}
@DBOS.step() @DBOS.step()
private async persistFollowedArtists(userId: string, artists: Artist[]) { private async persistSavedAlbums(userId: string, albums: SavedAlbum[]) {
await db.transaction(async (tx) => { await db.transaction(async (tx) => {
await tx.delete(followedArtist).where(eq(followedArtist.userId, userId)); await tx.delete(savedAlbum).where(eq(savedAlbum.userId, userId));
await upsertFollowedArtists(userId, artists, tx); await upsertSavedAlbums(userId, albums, tx);
}); });
} }
@DBOS.step() @DBOS.step()
private async persistSavedAlbums(userId: string, albums: SavedAlbum[]) { private async persistSavedTracks(userId: string, tracks: SavedTrack[]) {
await db.transaction(async (tx) => { await db.transaction(async (tx) => {
await tx.delete(savedAlbum).where(eq(savedAlbum.userId, userId)); await tx.delete(savedTrack).where(eq(savedTrack.userId, userId));
await upsertSavedAlbums(userId, albums, tx); await upsertSavedTracks(userId, tracks, tx);
}); });
} }
@DBOS.step() @DBOS.step()
private async persistSavedTracks(userId: string, tracks: SavedTrack[]) { private async persistPlaybackHistory(userId: string, items: PlayHistory[]) {
await db.transaction(async (tx) => { await db.transaction(async (tx) => {
await tx.delete(savedTrack).where(eq(savedTrack.userId, userId)); await upsertPlaybackHistory(userId, items, tx);
await upsertSavedTracks(userId, tracks, tx); });
}); }
}
@DBOS.step() @DBOS.step()
private async persistPlaybackHistory(userId: string, items: PlayHistory[]) { private async fetchTopArtists(
await db.transaction(async (tx) => { userId: string,
await upsertPlaybackHistory(userId, items, tx); ): Promise<Record<Timeline, Artist[]>> {
}); const sdk = await this.createSdk(userId);
} const topArtistsByTimeline = {} as Record<Timeline, Artist[]>;
for (const timeline of timelines) {
const topArtists = await sdk.currentUser.topItems(
"artists",
timeline,
50,
);
topArtistsByTimeline[timeline] = topArtists.items;
}
return topArtistsByTimeline;
}
@DBOS.step() @DBOS.step()
private async fetchTopArtists( private async fetchTopTracks(
userId: string, userId: string,
): Promise<Record<Timeline, Artist[]>> { ): Promise<Record<Timeline, Track[]>> {
const sdk = await this.createSdk(userId); const sdk = await this.createSdk(userId);
const topArtistsByTimeline = {} as Record<Timeline, Artist[]>; const topTracksByTimeline = {} as Record<Timeline, Track[]>;
for (const timeline of timelines) { for (const timeline of timelines) {
const topArtists = await sdk.currentUser.topItems( const topTracks = await sdk.currentUser.topItems("tracks", timeline, 50);
"artists", topTracksByTimeline[timeline] = topTracks.items;
timeline, }
50, return topTracksByTimeline;
); }
topArtistsByTimeline[timeline] = topArtists.items;
}
return topArtistsByTimeline;
}
@DBOS.step() @DBOS.step()
private async fetchTopTracks( private async fetchFollowedArtists(userId: string): Promise<Artist[]> {
userId: string, const sdk = await this.createSdk(userId);
): Promise<Record<Timeline, Track[]>> { const followed: Artist[] = [];
const sdk = await this.createSdk(userId); let after: string | undefined;
const topTracksByTimeline = {} as Record<Timeline, Track[]>; while (true) {
for (const timeline of timelines) { const page = await sdk.currentUser.followedArtists(after, 50);
const topTracks = await sdk.currentUser.topItems("tracks", timeline, 50); const artists = page.artists;
topTracksByTimeline[timeline] = topTracks.items; followed.push(...artists.items);
} if (!artists.next || artists.items.length === 0) break;
return topTracksByTimeline; const lastArtist = artists.items.at(-1);
} after = lastArtist?.id;
}
return followed;
}
@DBOS.step() @DBOS.step()
private async fetchFollowedArtists(userId: string): Promise<Artist[]> { private async fetchSavedAlbums(userId: string): Promise<SavedAlbum[]> {
const sdk = await this.createSdk(userId); const sdk = await this.createSdk(userId);
const followed: Artist[] = []; const saved: SavedAlbum[] = [];
let after: string | undefined; let offset = 0;
while (true) { while (true) {
const page = await sdk.currentUser.followedArtists(after, 50); const page = await sdk.currentUser.albums.savedAlbums(50, offset);
const artists = page.artists; saved.push(...page.items);
followed.push(...artists.items); offset += page.items.length;
if (!artists.next || artists.items.length === 0) break; if (!page.next || offset >= page.total) break;
after = artists.items[artists.items.length - 1]!.id; }
} return saved;
return followed; }
}
@DBOS.step() @DBOS.step()
private async fetchSavedAlbums(userId: string): Promise<SavedAlbum[]> { private async fetchSavedTracks(userId: string): Promise<SavedTrack[]> {
const sdk = await this.createSdk(userId); const sdk = await this.createSdk(userId);
const saved: SavedAlbum[] = []; const saved: SavedTrack[] = [];
let offset = 0; let offset = 0;
while (true) { while (true) {
const page = await sdk.currentUser.albums.savedAlbums(50, offset); const page = await sdk.currentUser.tracks.savedTracks(50, offset);
saved.push(...page.items); saved.push(...page.items);
offset += page.items.length; offset += page.items.length;
if (!page.next || offset >= page.total) break; if (!page.next || offset >= page.total) break;
} }
return saved; return saved;
} }
@DBOS.step() @DBOS.step()
private async fetchSavedTracks(userId: string): Promise<SavedTrack[]> { private async fetchRecentlyPlayed(userId: string): Promise<PlayHistory[]> {
const sdk = await this.createSdk(userId); const sdk = await this.createSdk(userId);
const saved: SavedTrack[] = []; const recentlyPlayed = await sdk.player.getRecentlyPlayedTracks(50);
let offset = 0; return recentlyPlayed.items;
while (true) { }
const page = await sdk.currentUser.tracks.savedTracks(50, offset);
saved.push(...page.items);
offset += page.items.length;
if (!page.next || offset >= page.total) break;
}
return saved;
}
@DBOS.step() private async createSdk(userId: string) {
private async fetchRecentlyPlayed(userId: string): Promise<PlayHistory[]> { const accessToken = await auth.api.getAccessToken({
const sdk = await this.createSdk(userId); body: {
const recentlyPlayed = await sdk.player.getRecentlyPlayedTracks(50); userId,
return recentlyPlayed.items; providerId: "spotify",
} },
});
private async createSdk(userId: string) { return SpotifyApi.withAccessToken(SPOTIFY_CLIENT_ID, {
const accessToken = await auth.api.getAccessToken({ access_token: accessToken.accessToken,
body: { expires_in: accessToken.accessTokenExpiresAt
userId, ? Date.now() - Number(accessToken.accessTokenExpiresAt)
providerId: "spotify", : 0,
}, expires: accessToken.accessTokenExpiresAt
}); ? Number(accessToken.accessTokenExpiresAt)
return SpotifyApi.withAccessToken(SPOTIFY_CLIENT_ID, { : 0,
access_token: accessToken.accessToken, refresh_token: "",
expires_in: Date.now() - Number(accessToken.accessTokenExpiresAt!), token_type: "",
expires: Number(accessToken.accessTokenExpiresAt), });
refresh_token: "", }
token_type: "",
});
}
} }
export const spotifySyncWorkflow = new SpotifySyncWorkflow("spotify-sync"); export const spotifySyncWorkflow = new SpotifySyncWorkflow("spotify-sync");

View file

@ -1,30 +1,30 @@
{ {
"compilerOptions": { "compilerOptions": {
// Environment setup & latest features // Environment setup & latest features
"lib": ["ESNext"], "lib": ["ESNext"],
"target": "ESNext", "target": "ESNext",
"module": "Preserve", "module": "Preserve",
"moduleDetection": "force", "moduleDetection": "force",
"jsx": "react-jsx", "jsx": "react-jsx",
"allowJs": true, "allowJs": true,
"experimentalDecorators": true, "experimentalDecorators": true,
// Bundler mode // Bundler mode
"moduleResolution": "bundler", "moduleResolution": "bundler",
"allowImportingTsExtensions": true, "allowImportingTsExtensions": true,
"verbatimModuleSyntax": true, "verbatimModuleSyntax": true,
"noEmit": true, "noEmit": true,
// Best practices // Best practices
"strict": true, "strict": true,
"skipLibCheck": true, "skipLibCheck": true,
"noFallthroughCasesInSwitch": true, "noFallthroughCasesInSwitch": true,
"noUncheckedIndexedAccess": true, "noUncheckedIndexedAccess": true,
"noImplicitOverride": true, "noImplicitOverride": true,
// Some stricter flags (disabled by default) // Some stricter flags (disabled by default)
"noUnusedLocals": false, "noUnusedLocals": false,
"noUnusedParameters": false, "noUnusedParameters": false,
"noPropertyAccessFromIndexSignature": false, "noPropertyAccessFromIndexSignature": false
}, }
} }