send updates in workflow

This commit is contained in:
Daniel Bulant 2026-05-01 19:39:29 +02:00
parent 5c2d2c8cf0
commit 21f859c480
No known key found for this signature in database
2 changed files with 111 additions and 97 deletions

View file

@ -3,7 +3,7 @@ import { eq } from "drizzle-orm";
import Elysia, { t } from "elysia";
import { betterAuthElysia } from "../auth";
import { db } from "../db";
import { party } from "../db/schema";
import { party, partyMember } from "../db/schema";
import { getMemberRecord } from "../party-data";
import type { QuizState } from "../party-types";
import { QuizWorkflow, quizQueue } from "../workflows/quiz";
@ -38,6 +38,7 @@ export const quizRoutes = new Elysia()
const handle = await DBOS.startWorkflow(quizWf.startQuiz, {
queueName: quizQueue.name,
enqueueOptions: { queuePartitionKey: params.partyId },
})(params.partyId);
await db
@ -48,12 +49,20 @@ export const quizRoutes = new Elysia()
})
.where(eq(party.id, params.partyId));
const members = await db
.select({
id: partyMember.id,
userId: partyMember.userId,
})
.from(partyMember)
.where(eq(partyMember.partyId, params.partyId));
pubsub.publish(
`party:${params.partyId}`,
JSON.stringify({
type: "party_status",
party: { status: "started" },
members: [],
members,
}),
);

View file

@ -3,11 +3,15 @@ import { eq } from "drizzle-orm";
import { db } from "../db";
import { party, partyMember } from "../db/schema";
import type { QuizState } from "../party-types";
import { pubsub } from "../routes/party-socket";
const TOTAL_QUESTIONS = 5;
const QUESTION_TIMEOUT_SECONDS = 30;
export const quizQueue = new WorkflowQueue("quiz_queue", { concurrency: 1 });
export const quizQueue = new WorkflowQueue("quiz_queue", {
concurrency: 1,
partitionQueue: true,
});
type Response = {
playerId: string;
@ -19,6 +23,94 @@ export class QuizWorkflow extends ConfiguredInstance {
super("QuizWorkflow");
}
@DBOS.workflow()
async startQuiz(partyId: string): Promise<void> {
const quizState: QuizState = {
status: "running",
workflowId: DBOS.workflowID ?? null,
questionIndex: 0,
currentQuestion: null,
answers: {},
scores: {},
};
// Initialize quiz state
await this.updatePartyData(partyId, quizState);
await this.broadcastState(partyId, quizState);
// Get party members to initialize scores
const members = await this.getPartyMembers(partyId);
for (const member of members) {
quizState.scores[member.userId] = 0;
}
for (let i = 0; i < TOTAL_QUESTIONS; i++) {
quizState.questionIndex = i;
const question = await this.generateQuestion(i);
quizState.currentQuestion = question;
quizState.answers = {};
await this.updatePartyData(partyId, quizState);
await this.broadcastState(partyId, quizState);
await DBOS.setEvent(`quiz_q${i}_question`, question);
await DBOS.setEvent(`quiz_q${i}_status`, "question");
await DBOS.setEvent(`quiz_q${i}_index`, i);
// Wait for all responses with timeout
const memberIds = new Set(members.map((m) => m.userId));
const receivedPlayers = new Set<string>();
while (receivedPlayers.size < memberIds.size) {
const response = await DBOS.recv<Response>(
"quiz_responses",
QUESTION_TIMEOUT_SECONDS,
);
if (response === null) {
// Timeout - fill in missing players with no answer
for (const memberId of memberIds) {
if (!receivedPlayers.has(memberId)) {
receivedPlayers.add(memberId);
quizState.answers[memberId] = {
playerId: memberId,
selected: -1,
correct: false,
};
}
}
await this.broadcastState(partyId, quizState);
break;
}
receivedPlayers.add(response.playerId);
const isCorrect = response.selected === question.correct;
quizState.answers[response.playerId] = {
...response,
correct: isCorrect,
};
if (isCorrect) {
quizState.scores[response.playerId] =
(quizState.scores[response.playerId] ?? 0) + 10;
}
await this.updatePartyData(partyId, quizState);
await this.broadcastState(partyId, quizState);
await DBOS.setEvent(`quiz_q${i}_answers`, quizState.answers);
await DBOS.setEvent(`quiz_q${i}_scores`, quizState.scores);
await DBOS.setEvent(`quiz_q${i}_status`, "results");
}
}
// Quiz complete
quizState.status = "results";
await this.updatePartyData(partyId, quizState);
await this.broadcastState(partyId, quizState);
await DBOS.setEvent("quiz_final_status", "results");
await DBOS.setEvent("quiz_final_scores", quizState.scores);
}
@DBOS.step()
private async updatePartyData(
partyId: string,
@ -92,101 +184,14 @@ export class QuizWorkflow extends ConfiguredInstance {
}
@DBOS.step()
async saveAnswer(
_partyId: string,
_questionIndex: number,
_answer: Response & { correct: boolean },
private async broadcastState(
partyId: string,
quizState: QuizState,
): Promise<void> {
// Answers stored transiently in party.data, not persisted separately
}
@DBOS.workflow()
async startQuiz(partyId: string): Promise<void> {
const quizState: QuizState = {
status: "running",
workflowId: DBOS.workflowID ?? null,
questionIndex: 0,
currentQuestion: null,
answers: {},
scores: {},
};
// Initialize quiz state
await this.updatePartyData(partyId, quizState);
// Get party members to initialize scores
const members = await this.getPartyMembers(partyId);
for (const member of members) {
quizState.scores[member.userId] = 0;
}
for (let i = 0; i < TOTAL_QUESTIONS; i++) {
quizState.questionIndex = i;
const question = await this.generateQuestion(i);
quizState.currentQuestion = question;
quizState.answers = {};
await this.updatePartyData(partyId, quizState);
await DBOS.setEvent(`quiz_q${i}_question`, question);
await DBOS.setEvent(`quiz_q${i}_status`, "question");
await DBOS.setEvent(`quiz_q${i}_index`, i);
// Wait for all responses with timeout
const memberIds = new Set(members.map((m) => m.userId));
const receivedPlayers = new Set<string>();
while (receivedPlayers.size < memberIds.size) {
const response = await DBOS.recv<Response>(
"quiz_responses",
QUESTION_TIMEOUT_SECONDS,
);
if (response === null) {
// Timeout - fill in missing players with no answer
for (const memberId of memberIds) {
if (!receivedPlayers.has(memberId)) {
receivedPlayers.add(memberId);
quizState.answers[memberId] = {
playerId: memberId,
selected: -1,
correct: false,
};
}
}
break;
}
receivedPlayers.add(response.playerId);
const isCorrect = response.selected === question.correct;
quizState.answers[response.playerId] = {
...response,
correct: isCorrect,
};
if (isCorrect) {
quizState.scores[response.playerId] =
(quizState.scores[response.playerId] ?? 0) + 10;
}
await this.updatePartyData(partyId, quizState);
await DBOS.setEvent(`quiz_q${i}_answers`, quizState.answers);
await DBOS.setEvent(`quiz_q${i}_scores`, quizState.scores);
await DBOS.setEvent(`quiz_q${i}_status`, "results");
}
await this.saveAnswer(partyId, i, {
playerId: "system",
selected: question.correct,
correct: true,
});
}
// Quiz complete
quizState.status = "results";
await this.updatePartyData(partyId, quizState);
await DBOS.setEvent("quiz_final_status", "results");
await DBOS.setEvent("quiz_final_scores", quizState.scores);
pubsub.publish(
`party:${partyId}`,
JSON.stringify({ type: "quiz_state", quiz: quizState }),
);
}
@DBOS.step()