working sync

This commit is contained in:
Daniel Bulant 2026-04-20 18:29:36 +02:00
parent 332ddd76cf
commit a386920a90
No known key found for this signature in database
2 changed files with 82 additions and 30 deletions

View file

@ -29,7 +29,7 @@ import {
track,
trackArtist,
} from "./schema";
import { and, eq, inArray } from "drizzle-orm";
import { and, eq, inArray, sql } from "drizzle-orm";
import { defaultSdk } from "../auth";
export const PLATFORM_SPOTIFY = "spotify" as const;
@ -57,10 +57,9 @@ export async function upsertImages(images: Image[], dbClient: DbLike = db) {
}
export async function upsertGenres(genres: string[], dbClient: DbLike = db) {
await dbClient
.insert(genre)
.values(genres.map((name) => ({ name })))
.onConflictDoNothing();
const values = genres.filter(Boolean).map((name) => ({ name }));
if (values.length === 0) return;
await dbClient.insert(genre).values(values).onConflictDoNothing();
}
export async function upsertArtists(artists: Artist[], dbClient: DbLike = db) {
@ -136,7 +135,14 @@ async function lookupMissingArtists(
) {
const missingArtistIds = await getMissingArtists(artistIds, dbClient);
if (missingArtistIds.length === 0) return [];
const missingArtists = await defaultSdk.artists.get(missingArtistIds);
let missingArtists: Artist[] = [];
for (let i = 0; i < missingArtistIds.length / 50; i++) {
missingArtists.push(
...(await defaultSdk.artists.get(
missingArtistIds.slice(i * 50, (i + 1) * 50),
)),
);
}
await upsertArtists(missingArtists, dbClient);
return missingArtists;
}
@ -268,19 +274,20 @@ export async function upsertAlbums(
.innerJoin(album, eq(album.platform_id, spotifyAlbum.id)),
)
.onConflictDoNothing();
await dbClient
.insert(albumGenre)
.select(
dbClient
.select({
albumId: album.id,
genreId: genre.id,
})
.from(genre)
.where(inArray(genre.name, spotifyAlbum.genres))
.innerJoin(album, eq(album.platform_id, spotifyAlbum.id)),
)
.onConflictDoNothing();
if (spotifyAlbum.genres?.length > 0)
await dbClient
.insert(albumGenre)
.select(
dbClient
.select({
albumId: album.id,
genreId: genre.id,
})
.from(genre)
.where(inArray(genre.name, sql`${spotifyAlbum.genres}`))
.innerJoin(album, eq(album.platform_id, spotifyAlbum.id)),
)
.onConflictDoNothing();
}
}

View file

@ -70,34 +70,79 @@ export class SpotifySyncWorkflow extends ConfiguredInstance {
};
}
@DBOS.step()
private async persistSpotifyData(userId: string, data: SyncPayload) {
await this.persistTopArtists(userId, data.topArtistsByTimeline);
await this.persistTopTracks(userId, data.topTracksByTimeline);
await this.persistFollowedArtists(userId, data.followedArtists);
await this.persistSavedAlbums(userId, data.savedAlbums);
await this.persistSavedTracks(userId, data.savedTracks);
await this.persistPlaybackHistory(userId, data.recentlyPlayed);
}
@DBOS.step()
private async persistTopArtists(
userId: string,
topArtistsByTimeline: Record<Timeline, Artist[]>,
) {
await db.transaction(async (tx) => {
await tx.delete(topArtist).where(eq(topArtist.userId, userId));
await tx.delete(topTrack).where(eq(topTrack.userId, userId));
await tx.delete(savedAlbum).where(eq(savedAlbum.userId, userId));
await tx.delete(savedTrack).where(eq(savedTrack.userId, userId));
await tx.delete(followedArtist).where(eq(followedArtist.userId, userId));
for (const timeline of timelines) {
await upsertTopArtists(
userId,
timeline,
data.topArtistsByTimeline[timeline],
topArtistsByTimeline[timeline],
tx,
);
}
});
}
@DBOS.step()
private async persistTopTracks(
userId: string,
topTracksByTimeline: Record<Timeline, Track[]>,
) {
await db.transaction(async (tx) => {
await tx.delete(topTrack).where(eq(topTrack.userId, userId));
for (const timeline of timelines) {
await upsertTopTracks(
userId,
timeline,
data.topTracksByTimeline[timeline],
topTracksByTimeline[timeline],
tx,
);
}
await upsertFollowedArtists(userId, data.followedArtists, tx);
await upsertSavedAlbums(userId, data.savedAlbums, tx);
await upsertSavedTracks(userId, data.savedTracks, tx);
await upsertPlaybackHistory(userId, data.recentlyPlayed, tx);
});
}
@DBOS.step()
private async persistFollowedArtists(userId: string, artists: Artist[]) {
await db.transaction(async (tx) => {
await tx.delete(followedArtist).where(eq(followedArtist.userId, userId));
await upsertFollowedArtists(userId, artists, tx);
});
}
@DBOS.step()
private async persistSavedAlbums(userId: string, albums: SavedAlbum[]) {
await db.transaction(async (tx) => {
await tx.delete(savedAlbum).where(eq(savedAlbum.userId, userId));
await upsertSavedAlbums(userId, albums, tx);
});
}
@DBOS.step()
private async persistSavedTracks(userId: string, tracks: SavedTrack[]) {
await db.transaction(async (tx) => {
await tx.delete(savedTrack).where(eq(savedTrack.userId, userId));
await upsertSavedTracks(userId, tracks, tx);
});
}
@DBOS.step()
private async persistPlaybackHistory(userId: string, items: PlayHistory[]) {
await db.transaction(async (tx) => {
await upsertPlaybackHistory(userId, items, tx);
});
}