diff --git a/api/src/dbos.ts b/api/src/dbos.ts index 6afce72..b2c8772 100644 --- a/api/src/dbos.ts +++ b/api/src/dbos.ts @@ -1,5 +1,6 @@ import { DBOS } from "@dbos-inc/dbos-sdk"; import "./workflows/sync"; +import "./workflows/quiz"; DBOS.setConfig({ name: "itpdp", diff --git a/api/src/party-types.ts b/api/src/party-types.ts index 3d3a1dd..f1a888d 100644 --- a/api/src/party-types.ts +++ b/api/src/party-types.ts @@ -24,6 +24,22 @@ export type PartySocketOutgoing = | { type: "ping" } | { type: "member_payload"; payload: unknown }; +export type QuizState = { + status: "idle" | "running" | "results"; + workflowId: string | null; + questionIndex: number; + currentQuestion: { + text: string; + options: string[]; + correct: number; + } | null; + answers: Record< + string, + { playerId: string; selected: number; correct: boolean } + >; + scores: Record; +}; + export type PartySocketEvent = | { type: "snapshot"; party: Party | null; members: PartyMemberWithUser[] } | { type: "party_status"; party: Party; members: PartyMemberWithUser[] } diff --git a/api/src/routes/party-socket.ts b/api/src/routes/party-socket.ts index 13d75ff..8b4f1e7 100644 --- a/api/src/routes/party-socket.ts +++ b/api/src/routes/party-socket.ts @@ -4,6 +4,7 @@ import { betterAuthElysia } from "../auth"; import { db } from "../db"; import { getMemberRecord, getPartyStatus } from "../party-data"; +import type { QuizState } from "../party-types"; function userTopic(userId: string) { return `user:${userId}`; @@ -25,6 +26,28 @@ export const pubsub = { }, }; +async function broadcastQuizState(ws: any, partyId: string) { + const partyRecord = await db.query.party.findFirst({ + where: { id: partyId }, + }); + + if (!partyRecord) return; + + const quizData = ((partyRecord.data ?? {}) as Record).quiz as + | QuizState + | undefined; + if (!quizData) return; + if (!quizData) return; + + ws.publish( + partyTopic(partyId), + JSON.stringify({ + type: "quiz_state", + quiz: quizData, + }), + ); +} + export const topic = { user: userTopic, party: partyTopic, @@ -68,6 +91,8 @@ export const partySocketApp = new Elysia() members: snapshot.members, }), ); + + await broadcastQuizState(ws, membership.partyId); } }, message: async (ws, message) => { diff --git a/api/src/routes/quiz.ts b/api/src/routes/quiz.ts new file mode 100644 index 0000000..cff221a --- /dev/null +++ b/api/src/routes/quiz.ts @@ -0,0 +1,166 @@ +import { DBOS } from "@dbos-inc/dbos-sdk"; +import { eq } from "drizzle-orm"; +import Elysia, { t } from "elysia"; +import { betterAuthElysia } from "../auth"; +import { db } from "../db"; +import { party } from "../db/schema"; +import { getMemberRecord } from "../party-data"; +import type { QuizState } from "../party-types"; +import { QuizWorkflow, quizQueue } from "../workflows/quiz"; +import { pubsub } from "./party-socket"; + +const quizWf = new QuizWorkflow(); + +export const quizRoutes = new Elysia() + .use(betterAuthElysia) + .group("/party/:partyId/quiz", (app) => + app + .post( + "/start", + async ({ user, set, params }) => { + const membership = await getMemberRecord(db, user.id); + if (!membership || membership.partyId !== params.partyId) { + set.status = 403; + return { error: "Not a member of this party" }; + } + + const existingQuiz = await db + .select({ status: party.status }) + .from(party) + .where(eq(party.id, params.partyId)) + .limit(1) + .then((rows) => rows[0]); + + if (existingQuiz?.status === "started") { + set.status = 409; + return { error: "Quiz already running" }; + } + + const handle = await DBOS.startWorkflow(quizWf.startQuiz, { + queueName: quizQueue.name, + })(params.partyId); + + await db + .update(party) + .set({ + status: "started", + lastUpdated: new Date(), + }) + .where(eq(party.id, params.partyId)); + + pubsub.publish( + `party:${params.partyId}`, + JSON.stringify({ + type: "party_status", + party: { status: "started" }, + members: [], + }), + ); + + return { + message: "Quiz started", + workflowId: handle.workflowID, + }; + }, + { auth: true }, + ) + .post( + "/response", + async ({ user, body, params, set }) => { + const existingQuiz = await db + .select({ data: party.data }) + .from(party) + .where(eq(party.id, params.partyId)) + .limit(1) + .then((rows) => rows[0]); + + if (!existingQuiz) { + set.status = 404; + return { error: "Party not found" }; + } + + const quizData = ( + (existingQuiz.data ?? {}) as Record + ).quiz as QuizState | undefined; + + if (!quizData || quizData.status !== "running") { + set.status = 400; + return { error: "Quiz not running" }; + } + + if (!quizData.workflowId) { + set.status = 500; + return { error: "Workflow ID not found" }; + } + + await DBOS.send( + quizData.workflowId, + { playerId: user.id, selected: body.selected }, + "quiz_responses", + ); + + const updatedParty = await db + .select({ data: party.data }) + .from(party) + .where(eq(party.id, params.partyId)) + .limit(1) + .then((rows) => rows[0]); + + const updatedQuizData = ( + (updatedParty?.data ?? {}) as Record + ).quiz as QuizState | undefined; + + if (updatedQuizData) { + pubsub.publish( + `party:${params.partyId}`, + JSON.stringify({ + type: "quiz_state", + quiz: updatedQuizData, + }), + ); + } + + return { message: "Response recorded" }; + }, + { + auth: true, + body: t.Object({ + selected: t.Integer(), + }), + }, + ) + .get( + "/status", + async ({ params, set }) => { + const existingQuiz = await db + .select({ data: party.data }) + .from(party) + .where(eq(party.id, params.partyId)) + .limit(1) + .then((rows) => rows[0]); + + if (!existingQuiz) { + set.status = 404; + return { error: "Party not found" }; + } + + const quizData = ( + (existingQuiz.data ?? {}) as Record + ).quiz as QuizState | undefined; + + return { + quiz: + quizData ?? + ({ + status: "idle", + workflowId: null, + questionIndex: 0, + currentQuestion: null, + answers: {}, + scores: {}, + } satisfies QuizState), + }; + }, + { auth: true }, + ), + ); diff --git a/api/src/workflows/__tests__/party-analysis.test.ts b/api/src/workflows/__tests__/party-analysis.test.ts index 129ea20..1fe6142 100644 --- a/api/src/workflows/__tests__/party-analysis.test.ts +++ b/api/src/workflows/__tests__/party-analysis.test.ts @@ -1,3 +1,4 @@ +/** biome-ignore-all lint/style/noNonNullAssertion: */ import { DBOS } from "@dbos-inc/dbos-sdk"; import { describe, expect, it } from "vitest"; import { @@ -284,8 +285,7 @@ describe("PartyAnalysisWorkflow", () => { describe("analyzeParty - similarity calculation", () => { it("calculates Jaccard-like similarity using min/max scoring", async () => { - const { partyId, userIdA, userIdB } = - await seedPartyWithTwoSimilarUsers(); + const { partyId } = await seedPartyWithTwoSimilarUsers(); const result = await partyAnalysisWorkflow.analyzeParty(partyId); @@ -305,7 +305,6 @@ describe("PartyAnalysisWorkflow", () => { await partyAnalysisWorkflow.analyzeParty(partyId); const { db } = await import("../../db"); - const { party } = await import("../../db/schema"); const savedParty = await db.query.party.findFirst({ where: { id: partyId }, diff --git a/api/src/workflows/quiz.ts b/api/src/workflows/quiz.ts new file mode 100644 index 0000000..ec2604e --- /dev/null +++ b/api/src/workflows/quiz.ts @@ -0,0 +1,201 @@ +import { ConfiguredInstance, DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk"; +import { eq } from "drizzle-orm"; +import { db } from "../db"; +import { party, partyMember } from "../db/schema"; +import type { QuizState } from "../party-types"; + +const TOTAL_QUESTIONS = 5; +const QUESTION_TIMEOUT_SECONDS = 30; + +export const quizQueue = new WorkflowQueue("quiz_queue", { concurrency: 1 }); + +type Response = { + playerId: string; + selected: number; +}; + +export class QuizWorkflow extends ConfiguredInstance { + constructor() { + super("QuizWorkflow"); + } + + @DBOS.step() + private async updatePartyData( + partyId: string, + quizState: QuizState, + ): Promise { + await db.transaction(async (tx) => { + const currentParty = await tx + .select({ data: party.data }) + .from(party) + .where(eq(party.id, partyId)) + .limit(1) + .then((rows) => rows[0]); + + if (!currentParty) return; + + const currentData = (currentParty.data ?? {}) as Record; + await tx + .update(party) + .set({ + data: { ...currentData, quiz: quizState }, + lastUpdated: new Date(), + }) + .where(eq(party.id, partyId)); + }); + } + + @DBOS.step() + async generateQuestion(index: number): Promise<{ + text: string; + options: string[]; + correct: number; + }> { + // Placeholder - returns same question for now, question generation comes later + const questions: { + text: string; + options: string[]; + correct: number; + }[] = [ + { + text: "What is the most common genre in your party's shared taste?", + options: ["Hip-Hop", "Rock", "Electronic", "Jazz"], + correct: 0, + }, + { + text: "Which artist do most party members follow?", + options: ["Artist A", "Artist B", "Artist C", "Artist D"], + correct: 1, + }, + { + text: "What percentage of the party shares at least 1 album?", + options: ["0-25%", "25-50%", "50-75%", "75-100%"], + correct: 2, + }, + { + text: "Who has the most diverse taste in the party?", + options: ["Player A", "Player B", "Player C", "Player D"], + correct: 0, + }, + { + text: "Which track appears most in everyone's top 50?", + options: ["Track A", "Track B", "Track C", "Track D"], + correct: 3, + }, + ]; + + const question = questions[index % questions.length]; + if (!question) { + throw new Error("Question not found"); + } + return question; + } + + @DBOS.step() + async saveAnswer( + _partyId: string, + _questionIndex: number, + _answer: Response & { correct: boolean }, + ): Promise { + // Answers stored transiently in party.data, not persisted separately + } + + @DBOS.workflow() + async startQuiz(partyId: string): Promise { + const quizState: QuizState = { + status: "running", + workflowId: DBOS.workflowID ?? null, + questionIndex: 0, + currentQuestion: null, + answers: {}, + scores: {}, + }; + + // Initialize quiz state + await this.updatePartyData(partyId, quizState); + + // Get party members to initialize scores + const members = await this.getPartyMembers(partyId); + for (const member of members) { + quizState.scores[member.userId] = 0; + } + + for (let i = 0; i < TOTAL_QUESTIONS; i++) { + quizState.questionIndex = i; + + const question = await this.generateQuestion(i); + quizState.currentQuestion = question; + quizState.answers = {}; + + await this.updatePartyData(partyId, quizState); + await DBOS.setEvent(`quiz_q${i}_question`, question); + await DBOS.setEvent(`quiz_q${i}_status`, "question"); + await DBOS.setEvent(`quiz_q${i}_index`, i); + + // Wait for all responses with timeout + const memberIds = new Set(members.map((m) => m.userId)); + const receivedPlayers = new Set(); + + while (receivedPlayers.size < memberIds.size) { + const response = await DBOS.recv( + "quiz_responses", + QUESTION_TIMEOUT_SECONDS, + ); + + if (response === null) { + // Timeout - fill in missing players with no answer + for (const memberId of memberIds) { + if (!receivedPlayers.has(memberId)) { + receivedPlayers.add(memberId); + quizState.answers[memberId] = { + playerId: memberId, + selected: -1, + correct: false, + }; + } + } + break; + } + + receivedPlayers.add(response.playerId); + const isCorrect = response.selected === question.correct; + quizState.answers[response.playerId] = { + ...response, + correct: isCorrect, + }; + + if (isCorrect) { + quizState.scores[response.playerId] = + (quizState.scores[response.playerId] ?? 0) + 10; + } + + await this.updatePartyData(partyId, quizState); + await DBOS.setEvent(`quiz_q${i}_answers`, quizState.answers); + await DBOS.setEvent(`quiz_q${i}_scores`, quizState.scores); + await DBOS.setEvent(`quiz_q${i}_status`, "results"); + } + + await this.saveAnswer(partyId, i, { + playerId: "system", + selected: question.correct, + correct: true, + }); + } + + // Quiz complete + quizState.status = "results"; + await this.updatePartyData(partyId, quizState); + await DBOS.setEvent("quiz_final_status", "results"); + await DBOS.setEvent("quiz_final_scores", quizState.scores); + } + + @DBOS.step() + private async getPartyMembers( + partyId: string, + ): Promise<{ id: string; userId: string }[]> { + return db + .select({ id: partyMember.id, userId: partyMember.userId }) + .from(partyMember) + .where(eq(partyMember.partyId, partyId)); + } +}