From 0475ee343a1a124325d56f65d0d0d5946d23afb7 Mon Sep 17 00:00:00 2001 From: Daniel Bulant Date: Mon, 11 May 2026 20:21:01 +0200 Subject: [PATCH] progress on device connection --- api/src/db/schema.ts | 13 +++ api/src/index.ts | 7 +- api/src/routes/device-socket.ts | 155 +++++++++++++++++++++++++------- api/src/routes/party.ts | 4 +- dev-proxy/index.ts | 123 ++++++++++++------------- 5 files changed, 204 insertions(+), 98 deletions(-) diff --git a/api/src/db/schema.ts b/api/src/db/schema.ts index 143885f..419102a 100644 --- a/api/src/db/schema.ts +++ b/api/src/db/schema.ts @@ -58,6 +58,19 @@ export const partyMember = pgTable( ], ); +export const deviceConnection = pgTable( + "device_connection", + { + id: text().primaryKey().notNull(), + userId: text() + .references(() => user.id) + .notNull(), + connectedAt: timestamp().defaultNow().notNull(), + lastSeen: timestamp().defaultNow().notNull(), + }, + (deviceConnection) => [index().on(deviceConnection.userId)], +); + export const platform = pgEnum("enum_platform", ["spotify", "apple"]); export const artist = pgTable("artist", { diff --git a/api/src/index.ts b/api/src/index.ts index baa84bb..d8d6caa 100644 --- a/api/src/index.ts +++ b/api/src/index.ts @@ -10,7 +10,7 @@ import { partyApp } from "./routes/party"; import { partySocketApp, pubsub } from "./routes/party-socket"; import { quizRoutes } from "./routes/quiz.ts"; import { statsApp } from "./routes/stats.ts"; -import { deviceSocketApp } from "./routes/device-socket.ts"; +import { deviceClaimApp, deviceSocketApp } from "./routes/device-socket.ts"; const app = new Elysia() .use(betterAuthElysia) @@ -21,8 +21,9 @@ const app = new Elysia() .use(partyApp) .use(partyAnalysisApp) .use(partySocketApp) - .use(quizRoutes) - .use(deviceSocketApp) + .use(deviceClaimApp) + .use(quizRoutes) + .use(deviceSocketApp) .get("/", () => ({ ok: true })), ) .listen(4000); diff --git a/api/src/routes/device-socket.ts b/api/src/routes/device-socket.ts index d0110e5..419cc21 100644 --- a/api/src/routes/device-socket.ts +++ b/api/src/routes/device-socket.ts @@ -1,47 +1,134 @@ +import { eq } from "drizzle-orm"; import Elysia from "elysia"; +import { betterAuthElysia } from "../auth"; import { db } from "../db"; -import { getMemberRecord, getPartyStatus } from "../party-data"; -import { - broadcastQuizState, - partyTopic, - socketPartyId, - userTopic, -} from "./party-socket"; +import { deviceConnection } from "../db/schema"; +import { getMemberRecord } from "../party-data"; +import type { PartySocketEvent } from "../party-types"; +import { pubsub, topic } from "./party-socket"; + +type DeviceSocketMessage = + | { type: "device_message"; deviceId: string; payload: unknown } + | { type: "hello" } + | { type: "device_event"; deviceId: string; event: PartySocketEvent }; + +let devProxySocket: WebSocket | null = null; + +function isDeviceMessage( + value: unknown, +): value is Extract { + return ( + typeof value === "object" && + value !== null && + (value as { type?: unknown }).type === "device_message" && + "deviceId" in value + ); +} + +export async function claimDeviceForUser(deviceId: string, userId: string) { + await db + .insert(deviceConnection) + .values({ + id: deviceId, + userId, + connectedAt: new Date(), + lastSeen: new Date(), + }) + .onConflictDoUpdate({ + target: deviceConnection.id, + set: { + userId, + lastSeen: new Date(), + }, + }); +} + +export async function publishDeviceEventForUser( + userId: string, + event: PartySocketEvent, +) { + if (!devProxySocket || devProxySocket.readyState !== WebSocket.OPEN) return; + + const devices = await db + .select() + .from(deviceConnection) + .where(eq(deviceConnection.userId, userId)); + + for (const device of devices) { + devProxySocket.send( + JSON.stringify({ + type: "device_event", + deviceId: device.id, + event, + } satisfies DeviceSocketMessage), + ); + } +} + +async function forwardDevicePayload(deviceId: string, payload: unknown) { + const device = await db + .select() + .from(deviceConnection) + .where(eq(deviceConnection.id, deviceId)) + .then((rows) => rows[0]); + if (!device) return; + + const membership = await getMemberRecord(db, device.userId); + if (!membership) return; + + const payloadString = JSON.stringify(payload); + if (payloadString.length > 8_000) return; + + pubsub.publish( + topic.party(membership.partyId), + JSON.stringify({ + type: "member_payload", + fromUserId: device.userId, + payload, + } satisfies PartySocketEvent), + ); +} export const deviceSocketApp = new Elysia().group("/dev-socket", (app) => app + .use(betterAuthElysia) .get("/test", () => ({ ok: 1 })) .ws("/ws", { - async open(ws) { - const id = "zzxWcTUntIWTHkX8atEOv7Neiu7XEz9t"; - ws.subscribe(userTopic(id)); - const membership = await getMemberRecord(db, id); - if (!membership) { - ws.send( - JSON.stringify({ - type: "snapshot", - party: null, - members: [], - }), - ); + open(ws) { + devProxySocket = ws as unknown as WebSocket; + ws.send( + JSON.stringify({ type: "hello" } satisfies DeviceSocketMessage), + ); + }, + message: async (_ws, message) => { + if (typeof message !== "string") return; + + let parsed: DeviceSocketMessage; + try { + parsed = JSON.parse(message) as DeviceSocketMessage; + } catch { return; } - socketPartyId.set(ws, membership.partyId); - ws.subscribe(partyTopic(membership.partyId)); - - const snapshot = await getPartyStatus(membership.partyId); - if (snapshot) { - ws.send( - JSON.stringify({ - type: "snapshot", - party: snapshot.party, - members: snapshot.members, - }), - ); - - await broadcastQuizState(ws, membership.partyId); - } + if (!isDeviceMessage(parsed)) return; + await forwardDevicePayload(parsed.deviceId, parsed.payload); + }, + close() { + if (devProxySocket === null) return; + devProxySocket = null; }, }), ); + +export const deviceClaimApp = new Elysia() + .use(betterAuthElysia) + .group("/devices", (app) => + app.post( + "/:deviceId/connect", + async ({ user, params }) => { + await claimDeviceForUser(params.deviceId, user.id); + return { ok: true, deviceId: params.deviceId, userId: user.id }; + }, + { auth: true }, + ), + ); diff --git a/api/src/routes/party.ts b/api/src/routes/party.ts index 96477b1..1b36285 100644 --- a/api/src/routes/party.ts +++ b/api/src/routes/party.ts @@ -10,7 +10,8 @@ import { getPartyStatus, leaveParty, } from "../party-data"; -import type { PartySnapshot } from "../party-types"; +import type { PartySnapshot, PartySocketEvent } from "../party-types"; +import { publishDeviceEventForUser } from "./device-socket"; import { pubsub, topic } from "./party-socket"; function broadcastSnapshot(partyId: string, snapshot: PartySnapshot | null) { @@ -27,6 +28,7 @@ function broadcastSnapshot(partyId: string, snapshot: PartySnapshot | null) { function broadcastToUser(userId: string, event: Record) { pubsub.publish(topic.user(userId), JSON.stringify(event)); + void publishDeviceEventForUser(userId, event as PartySocketEvent); } function isValidStatus( diff --git a/dev-proxy/index.ts b/dev-proxy/index.ts index 1dc78e8..882f512 100644 --- a/dev-proxy/index.ts +++ b/dev-proxy/index.ts @@ -1,75 +1,78 @@ import type { Socket } from "bun"; -import type { PartySocketEvent, PartyState } from "../api/src/party-types"; -const sockets = new Set(); -let lastData: string; +type ApiEnvelope = + | { type: "hello" } + | { type: "device_event"; deviceId: string; event: unknown }; -const socket = Bun.listen({ +const sockets = new Map(); +const socketIds = new WeakMap(); +const apiSocket = new WebSocket("ws://localhost:4000/api/dev-socket/ws"); + +function socketDeviceId(socket: Socket) { + return socketIds.get(socket); +} + +function registerSocket(socket: Socket, deviceId: string) { + const existing = sockets.get(deviceId); + if (existing && existing !== socket) existing.end(); + sockets.set(deviceId, socket); + socketIds.set(socket, deviceId); +} + +const listener = Bun.listen({ port: 7070, hostname: "0.0.0.0", socket: { + open(socket) { + socket.setKeepAlive(true); + }, data(socket, buf) { - console.log("Data from", socket.remoteAddress, socket.remotePort) - const str = new TextDecoder().decode(buf); - console.log(str); + const raw = new TextDecoder().decode(buf).trim(); + if (!raw) return; + + const currentDeviceId = socketDeviceId(socket); + if (!currentDeviceId) { + registerSocket(socket, raw); + return; + } + + apiSocket?.send( + JSON.stringify({ + type: "device_message", + deviceId: currentDeviceId, + payload: raw, + }), + ); }, - open(socket) { - console.log("Connection from", socket.remoteAddress, socket.remotePort) - sockets.add(socket); - socket.setKeepAlive(true); - if (lastData) socket.write(lastData); - }, - close(socket) { - console.log("Connection closed", socket.remoteAddress, socket.remotePort) - sockets.delete(socket); + close(socket) { + const deviceId = socketDeviceId(socket); + if (deviceId && sockets.get(deviceId) === socket) { + sockets.delete(deviceId); + } }, }, }); -const ws: WebSocket | null = new WebSocket( - "ws://localhost:4000/api/dev-socket/ws", -); - -ws.onerror = (e) => { - console.error(e); -}; - -ws.onopen = () => { - console.log("WebSocket open"); -}; - -ws.onmessage = (e) => { - const data = JSON.parse(e.data) as PartySocketEvent; - console.log(data); - switch (data.type) { - case "party_status": { - const { party } = data; - if (!party) return; - const partyData = party.data; - if (!partyData) return; - const { currentQuestion } = partyData; - console.log(currentQuestion); - const text = currentQuestion?.text; - if (!text) return; - const obj = { - type: currentQuestion.type, - points: currentQuestion.points, - } as Record; - if (currentQuestion.type === "numeric") { - obj.rangeMin = currentQuestion.range.min; - obj.rangeMax = currentQuestion.range.max; - } - const objText = Object.entries(obj) - .map(([k, v]) => `${k}=${v}`) - .join(" "); - const writeData = `$$\n${objText}\n${text}\n`; - lastData = writeData; - for (const socket of sockets) { - socket.write(writeData); - } - break; - } +apiSocket.onmessage = (e) => { + let message: ApiEnvelope; + try { + message = JSON.parse(e.data) as ApiEnvelope; + } catch { + return; } + + if (message.type !== "device_event") return; + const socket = sockets.get(message.deviceId); + if (!socket) return; + socket.write(`${JSON.stringify(message.event)}\n`); }; -console.log(`Started on :${socket.port}`); +apiSocket.onerror = (error) => { + console.error(error); +}; + +apiSocket.onopen = () => { + console.log("Connected to API device socket"); +}; + +console.log(`Started on :${listener.port}`);