first iteration of quiz workflow

This commit is contained in:
Daniel Bulant 2026-04-30 21:54:13 +02:00
parent 58878752a8
commit 575374547c
No known key found for this signature in database
6 changed files with 411 additions and 3 deletions

View file

@ -1,5 +1,6 @@
import { DBOS } from "@dbos-inc/dbos-sdk";
import "./workflows/sync";
import "./workflows/quiz";
DBOS.setConfig({
name: "itpdp",

View file

@ -24,6 +24,22 @@ export type PartySocketOutgoing =
| { type: "ping" }
| { type: "member_payload"; payload: unknown };
export type QuizState = {
status: "idle" | "running" | "results";
workflowId: string | null;
questionIndex: number;
currentQuestion: {
text: string;
options: string[];
correct: number;
} | null;
answers: Record<
string,
{ playerId: string; selected: number; correct: boolean }
>;
scores: Record<string, number>;
};
export type PartySocketEvent =
| { type: "snapshot"; party: Party | null; members: PartyMemberWithUser[] }
| { type: "party_status"; party: Party; members: PartyMemberWithUser[] }

View file

@ -4,6 +4,7 @@ import { betterAuthElysia } from "../auth";
import { db } from "../db";
import { getMemberRecord, getPartyStatus } from "../party-data";
import type { QuizState } from "../party-types";
function userTopic(userId: string) {
return `user:${userId}`;
@ -25,6 +26,28 @@ export const pubsub = {
},
};
async function broadcastQuizState(ws: any, partyId: string) {
const partyRecord = await db.query.party.findFirst({
where: { id: partyId },
});
if (!partyRecord) return;
const quizData = ((partyRecord.data ?? {}) as Record<string, unknown>).quiz as
| QuizState
| undefined;
if (!quizData) return;
if (!quizData) return;
ws.publish(
partyTopic(partyId),
JSON.stringify({
type: "quiz_state",
quiz: quizData,
}),
);
}
export const topic = {
user: userTopic,
party: partyTopic,
@ -68,6 +91,8 @@ export const partySocketApp = new Elysia()
members: snapshot.members,
}),
);
await broadcastQuizState(ws, membership.partyId);
}
},
message: async (ws, message) => {

166
api/src/routes/quiz.ts Normal file
View file

@ -0,0 +1,166 @@
import { DBOS } from "@dbos-inc/dbos-sdk";
import { eq } from "drizzle-orm";
import Elysia, { t } from "elysia";
import { betterAuthElysia } from "../auth";
import { db } from "../db";
import { party } from "../db/schema";
import { getMemberRecord } from "../party-data";
import type { QuizState } from "../party-types";
import { QuizWorkflow, quizQueue } from "../workflows/quiz";
import { pubsub } from "./party-socket";
const quizWf = new QuizWorkflow();
export const quizRoutes = new Elysia()
.use(betterAuthElysia)
.group("/party/:partyId/quiz", (app) =>
app
.post(
"/start",
async ({ user, set, params }) => {
const membership = await getMemberRecord(db, user.id);
if (!membership || membership.partyId !== params.partyId) {
set.status = 403;
return { error: "Not a member of this party" };
}
const existingQuiz = await db
.select({ status: party.status })
.from(party)
.where(eq(party.id, params.partyId))
.limit(1)
.then((rows) => rows[0]);
if (existingQuiz?.status === "started") {
set.status = 409;
return { error: "Quiz already running" };
}
const handle = await DBOS.startWorkflow(quizWf.startQuiz, {
queueName: quizQueue.name,
})(params.partyId);
await db
.update(party)
.set({
status: "started",
lastUpdated: new Date(),
})
.where(eq(party.id, params.partyId));
pubsub.publish(
`party:${params.partyId}`,
JSON.stringify({
type: "party_status",
party: { status: "started" },
members: [],
}),
);
return {
message: "Quiz started",
workflowId: handle.workflowID,
};
},
{ auth: true },
)
.post(
"/response",
async ({ user, body, params, set }) => {
const existingQuiz = await db
.select({ data: party.data })
.from(party)
.where(eq(party.id, params.partyId))
.limit(1)
.then((rows) => rows[0]);
if (!existingQuiz) {
set.status = 404;
return { error: "Party not found" };
}
const quizData = (
(existingQuiz.data ?? {}) as Record<string, unknown>
).quiz as QuizState | undefined;
if (!quizData || quizData.status !== "running") {
set.status = 400;
return { error: "Quiz not running" };
}
if (!quizData.workflowId) {
set.status = 500;
return { error: "Workflow ID not found" };
}
await DBOS.send(
quizData.workflowId,
{ playerId: user.id, selected: body.selected },
"quiz_responses",
);
const updatedParty = await db
.select({ data: party.data })
.from(party)
.where(eq(party.id, params.partyId))
.limit(1)
.then((rows) => rows[0]);
const updatedQuizData = (
(updatedParty?.data ?? {}) as Record<string, unknown>
).quiz as QuizState | undefined;
if (updatedQuizData) {
pubsub.publish(
`party:${params.partyId}`,
JSON.stringify({
type: "quiz_state",
quiz: updatedQuizData,
}),
);
}
return { message: "Response recorded" };
},
{
auth: true,
body: t.Object({
selected: t.Integer(),
}),
},
)
.get(
"/status",
async ({ params, set }) => {
const existingQuiz = await db
.select({ data: party.data })
.from(party)
.where(eq(party.id, params.partyId))
.limit(1)
.then((rows) => rows[0]);
if (!existingQuiz) {
set.status = 404;
return { error: "Party not found" };
}
const quizData = (
(existingQuiz.data ?? {}) as Record<string, unknown>
).quiz as QuizState | undefined;
return {
quiz:
quizData ??
({
status: "idle",
workflowId: null,
questionIndex: 0,
currentQuestion: null,
answers: {},
scores: {},
} satisfies QuizState),
};
},
{ auth: true },
),
);

View file

@ -1,3 +1,4 @@
/** biome-ignore-all lint/style/noNonNullAssertion: <explanation> */
import { DBOS } from "@dbos-inc/dbos-sdk";
import { describe, expect, it } from "vitest";
import {
@ -284,8 +285,7 @@ describe("PartyAnalysisWorkflow", () => {
describe("analyzeParty - similarity calculation", () => {
it("calculates Jaccard-like similarity using min/max scoring", async () => {
const { partyId, userIdA, userIdB } =
await seedPartyWithTwoSimilarUsers();
const { partyId } = await seedPartyWithTwoSimilarUsers();
const result = await partyAnalysisWorkflow.analyzeParty(partyId);
@ -305,7 +305,6 @@ describe("PartyAnalysisWorkflow", () => {
await partyAnalysisWorkflow.analyzeParty(partyId);
const { db } = await import("../../db");
const { party } = await import("../../db/schema");
const savedParty = await db.query.party.findFirst({
where: { id: partyId },

201
api/src/workflows/quiz.ts Normal file
View file

@ -0,0 +1,201 @@
import { ConfiguredInstance, DBOS, WorkflowQueue } from "@dbos-inc/dbos-sdk";
import { eq } from "drizzle-orm";
import { db } from "../db";
import { party, partyMember } from "../db/schema";
import type { QuizState } from "../party-types";
const TOTAL_QUESTIONS = 5;
const QUESTION_TIMEOUT_SECONDS = 30;
export const quizQueue = new WorkflowQueue("quiz_queue", { concurrency: 1 });
type Response = {
playerId: string;
selected: number;
};
export class QuizWorkflow extends ConfiguredInstance {
constructor() {
super("QuizWorkflow");
}
@DBOS.step()
private async updatePartyData(
partyId: string,
quizState: QuizState,
): Promise<void> {
await db.transaction(async (tx) => {
const currentParty = await tx
.select({ data: party.data })
.from(party)
.where(eq(party.id, partyId))
.limit(1)
.then((rows) => rows[0]);
if (!currentParty) return;
const currentData = (currentParty.data ?? {}) as Record<string, unknown>;
await tx
.update(party)
.set({
data: { ...currentData, quiz: quizState },
lastUpdated: new Date(),
})
.where(eq(party.id, partyId));
});
}
@DBOS.step()
async generateQuestion(index: number): Promise<{
text: string;
options: string[];
correct: number;
}> {
// Placeholder - returns same question for now, question generation comes later
const questions: {
text: string;
options: string[];
correct: number;
}[] = [
{
text: "What is the most common genre in your party's shared taste?",
options: ["Hip-Hop", "Rock", "Electronic", "Jazz"],
correct: 0,
},
{
text: "Which artist do most party members follow?",
options: ["Artist A", "Artist B", "Artist C", "Artist D"],
correct: 1,
},
{
text: "What percentage of the party shares at least 1 album?",
options: ["0-25%", "25-50%", "50-75%", "75-100%"],
correct: 2,
},
{
text: "Who has the most diverse taste in the party?",
options: ["Player A", "Player B", "Player C", "Player D"],
correct: 0,
},
{
text: "Which track appears most in everyone's top 50?",
options: ["Track A", "Track B", "Track C", "Track D"],
correct: 3,
},
];
const question = questions[index % questions.length];
if (!question) {
throw new Error("Question not found");
}
return question;
}
@DBOS.step()
async saveAnswer(
_partyId: string,
_questionIndex: number,
_answer: Response & { correct: boolean },
): Promise<void> {
// Answers stored transiently in party.data, not persisted separately
}
@DBOS.workflow()
async startQuiz(partyId: string): Promise<void> {
const quizState: QuizState = {
status: "running",
workflowId: DBOS.workflowID ?? null,
questionIndex: 0,
currentQuestion: null,
answers: {},
scores: {},
};
// Initialize quiz state
await this.updatePartyData(partyId, quizState);
// Get party members to initialize scores
const members = await this.getPartyMembers(partyId);
for (const member of members) {
quizState.scores[member.userId] = 0;
}
for (let i = 0; i < TOTAL_QUESTIONS; i++) {
quizState.questionIndex = i;
const question = await this.generateQuestion(i);
quizState.currentQuestion = question;
quizState.answers = {};
await this.updatePartyData(partyId, quizState);
await DBOS.setEvent(`quiz_q${i}_question`, question);
await DBOS.setEvent(`quiz_q${i}_status`, "question");
await DBOS.setEvent(`quiz_q${i}_index`, i);
// Wait for all responses with timeout
const memberIds = new Set(members.map((m) => m.userId));
const receivedPlayers = new Set<string>();
while (receivedPlayers.size < memberIds.size) {
const response = await DBOS.recv<Response>(
"quiz_responses",
QUESTION_TIMEOUT_SECONDS,
);
if (response === null) {
// Timeout - fill in missing players with no answer
for (const memberId of memberIds) {
if (!receivedPlayers.has(memberId)) {
receivedPlayers.add(memberId);
quizState.answers[memberId] = {
playerId: memberId,
selected: -1,
correct: false,
};
}
}
break;
}
receivedPlayers.add(response.playerId);
const isCorrect = response.selected === question.correct;
quizState.answers[response.playerId] = {
...response,
correct: isCorrect,
};
if (isCorrect) {
quizState.scores[response.playerId] =
(quizState.scores[response.playerId] ?? 0) + 10;
}
await this.updatePartyData(partyId, quizState);
await DBOS.setEvent(`quiz_q${i}_answers`, quizState.answers);
await DBOS.setEvent(`quiz_q${i}_scores`, quizState.scores);
await DBOS.setEvent(`quiz_q${i}_status`, "results");
}
await this.saveAnswer(partyId, i, {
playerId: "system",
selected: question.correct,
correct: true,
});
}
// Quiz complete
quizState.status = "results";
await this.updatePartyData(partyId, quizState);
await DBOS.setEvent("quiz_final_status", "results");
await DBOS.setEvent("quiz_final_scores", quizState.scores);
}
@DBOS.step()
private async getPartyMembers(
partyId: string,
): Promise<{ id: string; userId: string }[]> {
return db
.select({ id: partyMember.id, userId: partyMember.userId })
.from(partyMember)
.where(eq(partyMember.partyId, partyId));
}
}