diff --git a/api/src/routes/quiz.ts b/api/src/routes/quiz.ts index cff221a..cdc65d8 100644 --- a/api/src/routes/quiz.ts +++ b/api/src/routes/quiz.ts @@ -3,7 +3,7 @@ import { eq } from "drizzle-orm"; import Elysia, { t } from "elysia"; import { betterAuthElysia } from "../auth"; import { db } from "../db"; -import { party } from "../db/schema"; +import { party, partyMember } from "../db/schema"; import { getMemberRecord } from "../party-data"; import type { QuizState } from "../party-types"; import { QuizWorkflow, quizQueue } from "../workflows/quiz"; @@ -38,6 +38,7 @@ export const quizRoutes = new Elysia() const handle = await DBOS.startWorkflow(quizWf.startQuiz, { queueName: quizQueue.name, + enqueueOptions: { queuePartitionKey: params.partyId }, })(params.partyId); await db @@ -48,12 +49,20 @@ export const quizRoutes = new Elysia() }) .where(eq(party.id, params.partyId)); + const members = await db + .select({ + id: partyMember.id, + userId: partyMember.userId, + }) + .from(partyMember) + .where(eq(partyMember.partyId, params.partyId)); + pubsub.publish( `party:${params.partyId}`, JSON.stringify({ type: "party_status", party: { status: "started" }, - members: [], + members, }), ); diff --git a/api/src/workflows/quiz.ts b/api/src/workflows/quiz.ts index ec2604e..537eb02 100644 --- a/api/src/workflows/quiz.ts +++ b/api/src/workflows/quiz.ts @@ -3,11 +3,15 @@ import { eq } from "drizzle-orm"; import { db } from "../db"; import { party, partyMember } from "../db/schema"; import type { QuizState } from "../party-types"; +import { pubsub } from "../routes/party-socket"; const TOTAL_QUESTIONS = 5; const QUESTION_TIMEOUT_SECONDS = 30; -export const quizQueue = new WorkflowQueue("quiz_queue", { concurrency: 1 }); +export const quizQueue = new WorkflowQueue("quiz_queue", { + concurrency: 1, + partitionQueue: true, +}); type Response = { playerId: string; @@ -19,6 +23,94 @@ export class QuizWorkflow extends ConfiguredInstance { super("QuizWorkflow"); } + @DBOS.workflow() + async startQuiz(partyId: string): Promise { + const quizState: QuizState = { + status: "running", + workflowId: DBOS.workflowID ?? null, + questionIndex: 0, + currentQuestion: null, + answers: {}, + scores: {}, + }; + + // Initialize quiz state + await this.updatePartyData(partyId, quizState); + await this.broadcastState(partyId, quizState); + + // Get party members to initialize scores + const members = await this.getPartyMembers(partyId); + for (const member of members) { + quizState.scores[member.userId] = 0; + } + + for (let i = 0; i < TOTAL_QUESTIONS; i++) { + quizState.questionIndex = i; + + const question = await this.generateQuestion(i); + quizState.currentQuestion = question; + quizState.answers = {}; + + await this.updatePartyData(partyId, quizState); + await this.broadcastState(partyId, quizState); + await DBOS.setEvent(`quiz_q${i}_question`, question); + await DBOS.setEvent(`quiz_q${i}_status`, "question"); + await DBOS.setEvent(`quiz_q${i}_index`, i); + + // Wait for all responses with timeout + const memberIds = new Set(members.map((m) => m.userId)); + const receivedPlayers = new Set(); + + while (receivedPlayers.size < memberIds.size) { + const response = await DBOS.recv( + "quiz_responses", + QUESTION_TIMEOUT_SECONDS, + ); + + if (response === null) { + // Timeout - fill in missing players with no answer + for (const memberId of memberIds) { + if (!receivedPlayers.has(memberId)) { + receivedPlayers.add(memberId); + quizState.answers[memberId] = { + playerId: memberId, + selected: -1, + correct: false, + }; + } + } + await this.broadcastState(partyId, quizState); + break; + } + + receivedPlayers.add(response.playerId); + const isCorrect = response.selected === question.correct; + quizState.answers[response.playerId] = { + ...response, + correct: isCorrect, + }; + + if (isCorrect) { + quizState.scores[response.playerId] = + (quizState.scores[response.playerId] ?? 0) + 10; + } + + await this.updatePartyData(partyId, quizState); + await this.broadcastState(partyId, quizState); + await DBOS.setEvent(`quiz_q${i}_answers`, quizState.answers); + await DBOS.setEvent(`quiz_q${i}_scores`, quizState.scores); + await DBOS.setEvent(`quiz_q${i}_status`, "results"); + } + } + + // Quiz complete + quizState.status = "results"; + await this.updatePartyData(partyId, quizState); + await this.broadcastState(partyId, quizState); + await DBOS.setEvent("quiz_final_status", "results"); + await DBOS.setEvent("quiz_final_scores", quizState.scores); + } + @DBOS.step() private async updatePartyData( partyId: string, @@ -92,101 +184,14 @@ export class QuizWorkflow extends ConfiguredInstance { } @DBOS.step() - async saveAnswer( - _partyId: string, - _questionIndex: number, - _answer: Response & { correct: boolean }, + private async broadcastState( + partyId: string, + quizState: QuizState, ): Promise { - // Answers stored transiently in party.data, not persisted separately - } - - @DBOS.workflow() - async startQuiz(partyId: string): Promise { - const quizState: QuizState = { - status: "running", - workflowId: DBOS.workflowID ?? null, - questionIndex: 0, - currentQuestion: null, - answers: {}, - scores: {}, - }; - - // Initialize quiz state - await this.updatePartyData(partyId, quizState); - - // Get party members to initialize scores - const members = await this.getPartyMembers(partyId); - for (const member of members) { - quizState.scores[member.userId] = 0; - } - - for (let i = 0; i < TOTAL_QUESTIONS; i++) { - quizState.questionIndex = i; - - const question = await this.generateQuestion(i); - quizState.currentQuestion = question; - quizState.answers = {}; - - await this.updatePartyData(partyId, quizState); - await DBOS.setEvent(`quiz_q${i}_question`, question); - await DBOS.setEvent(`quiz_q${i}_status`, "question"); - await DBOS.setEvent(`quiz_q${i}_index`, i); - - // Wait for all responses with timeout - const memberIds = new Set(members.map((m) => m.userId)); - const receivedPlayers = new Set(); - - while (receivedPlayers.size < memberIds.size) { - const response = await DBOS.recv( - "quiz_responses", - QUESTION_TIMEOUT_SECONDS, - ); - - if (response === null) { - // Timeout - fill in missing players with no answer - for (const memberId of memberIds) { - if (!receivedPlayers.has(memberId)) { - receivedPlayers.add(memberId); - quizState.answers[memberId] = { - playerId: memberId, - selected: -1, - correct: false, - }; - } - } - break; - } - - receivedPlayers.add(response.playerId); - const isCorrect = response.selected === question.correct; - quizState.answers[response.playerId] = { - ...response, - correct: isCorrect, - }; - - if (isCorrect) { - quizState.scores[response.playerId] = - (quizState.scores[response.playerId] ?? 0) + 10; - } - - await this.updatePartyData(partyId, quizState); - await DBOS.setEvent(`quiz_q${i}_answers`, quizState.answers); - await DBOS.setEvent(`quiz_q${i}_scores`, quizState.scores); - await DBOS.setEvent(`quiz_q${i}_status`, "results"); - } - - await this.saveAnswer(partyId, i, { - playerId: "system", - selected: question.correct, - correct: true, - }); - } - - // Quiz complete - quizState.status = "results"; - await this.updatePartyData(partyId, quizState); - await DBOS.setEvent("quiz_final_status", "results"); - await DBOS.setEvent("quiz_final_scores", quizState.scores); + pubsub.publish( + `party:${partyId}`, + JSON.stringify({ type: "quiz_state", quiz: quizState }), + ); } @DBOS.step()