progress on device connection

This commit is contained in:
Daniel Bulant 2026-05-11 20:21:01 +02:00
parent ea1db833f3
commit 0475ee343a
No known key found for this signature in database
5 changed files with 204 additions and 98 deletions

View file

@ -58,6 +58,19 @@ export const partyMember = pgTable(
],
);
export const deviceConnection = pgTable(
"device_connection",
{
id: text().primaryKey().notNull(),
userId: text()
.references(() => user.id)
.notNull(),
connectedAt: timestamp().defaultNow().notNull(),
lastSeen: timestamp().defaultNow().notNull(),
},
(deviceConnection) => [index().on(deviceConnection.userId)],
);
export const platform = pgEnum("enum_platform", ["spotify", "apple"]);
export const artist = pgTable("artist", {

View file

@ -10,7 +10,7 @@ import { partyApp } from "./routes/party";
import { partySocketApp, pubsub } from "./routes/party-socket";
import { quizRoutes } from "./routes/quiz.ts";
import { statsApp } from "./routes/stats.ts";
import { deviceSocketApp } from "./routes/device-socket.ts";
import { deviceClaimApp, deviceSocketApp } from "./routes/device-socket.ts";
const app = new Elysia()
.use(betterAuthElysia)
@ -21,8 +21,9 @@ const app = new Elysia()
.use(partyApp)
.use(partyAnalysisApp)
.use(partySocketApp)
.use(quizRoutes)
.use(deviceSocketApp)
.use(deviceClaimApp)
.use(quizRoutes)
.use(deviceSocketApp)
.get("/", () => ({ ok: true })),
)
.listen(4000);

View file

@ -1,47 +1,134 @@
import { eq } from "drizzle-orm";
import Elysia from "elysia";
import { betterAuthElysia } from "../auth";
import { db } from "../db";
import { getMemberRecord, getPartyStatus } from "../party-data";
import {
broadcastQuizState,
partyTopic,
socketPartyId,
userTopic,
} from "./party-socket";
import { deviceConnection } from "../db/schema";
import { getMemberRecord } from "../party-data";
import type { PartySocketEvent } from "../party-types";
import { pubsub, topic } from "./party-socket";
type DeviceSocketMessage =
| { type: "device_message"; deviceId: string; payload: unknown }
| { type: "hello" }
| { type: "device_event"; deviceId: string; event: PartySocketEvent };
let devProxySocket: WebSocket | null = null;
function isDeviceMessage(
value: unknown,
): value is Extract<DeviceSocketMessage, { type: "device_message" }> {
return (
typeof value === "object" &&
value !== null &&
(value as { type?: unknown }).type === "device_message" &&
"deviceId" in value
);
}
export async function claimDeviceForUser(deviceId: string, userId: string) {
await db
.insert(deviceConnection)
.values({
id: deviceId,
userId,
connectedAt: new Date(),
lastSeen: new Date(),
})
.onConflictDoUpdate({
target: deviceConnection.id,
set: {
userId,
lastSeen: new Date(),
},
});
}
export async function publishDeviceEventForUser(
userId: string,
event: PartySocketEvent,
) {
if (!devProxySocket || devProxySocket.readyState !== WebSocket.OPEN) return;
const devices = await db
.select()
.from(deviceConnection)
.where(eq(deviceConnection.userId, userId));
for (const device of devices) {
devProxySocket.send(
JSON.stringify({
type: "device_event",
deviceId: device.id,
event,
} satisfies DeviceSocketMessage),
);
}
}
async function forwardDevicePayload(deviceId: string, payload: unknown) {
const device = await db
.select()
.from(deviceConnection)
.where(eq(deviceConnection.id, deviceId))
.then((rows) => rows[0]);
if (!device) return;
const membership = await getMemberRecord(db, device.userId);
if (!membership) return;
const payloadString = JSON.stringify(payload);
if (payloadString.length > 8_000) return;
pubsub.publish(
topic.party(membership.partyId),
JSON.stringify({
type: "member_payload",
fromUserId: device.userId,
payload,
} satisfies PartySocketEvent),
);
}
export const deviceSocketApp = new Elysia().group("/dev-socket", (app) =>
app
.use(betterAuthElysia)
.get("/test", () => ({ ok: 1 }))
.ws("/ws", {
async open(ws) {
const id = "zzxWcTUntIWTHkX8atEOv7Neiu7XEz9t";
ws.subscribe(userTopic(id));
const membership = await getMemberRecord(db, id);
if (!membership) {
ws.send(
JSON.stringify({
type: "snapshot",
party: null,
members: [],
}),
);
open(ws) {
devProxySocket = ws as unknown as WebSocket;
ws.send(
JSON.stringify({ type: "hello" } satisfies DeviceSocketMessage),
);
},
message: async (_ws, message) => {
if (typeof message !== "string") return;
let parsed: DeviceSocketMessage;
try {
parsed = JSON.parse(message) as DeviceSocketMessage;
} catch {
return;
}
socketPartyId.set(ws, membership.partyId);
ws.subscribe(partyTopic(membership.partyId));
const snapshot = await getPartyStatus(membership.partyId);
if (snapshot) {
ws.send(
JSON.stringify({
type: "snapshot",
party: snapshot.party,
members: snapshot.members,
}),
);
await broadcastQuizState(ws, membership.partyId);
}
if (!isDeviceMessage(parsed)) return;
await forwardDevicePayload(parsed.deviceId, parsed.payload);
},
close() {
if (devProxySocket === null) return;
devProxySocket = null;
},
}),
);
export const deviceClaimApp = new Elysia()
.use(betterAuthElysia)
.group("/devices", (app) =>
app.post(
"/:deviceId/connect",
async ({ user, params }) => {
await claimDeviceForUser(params.deviceId, user.id);
return { ok: true, deviceId: params.deviceId, userId: user.id };
},
{ auth: true },
),
);

View file

@ -10,7 +10,8 @@ import {
getPartyStatus,
leaveParty,
} from "../party-data";
import type { PartySnapshot } from "../party-types";
import type { PartySnapshot, PartySocketEvent } from "../party-types";
import { publishDeviceEventForUser } from "./device-socket";
import { pubsub, topic } from "./party-socket";
function broadcastSnapshot(partyId: string, snapshot: PartySnapshot | null) {
@ -27,6 +28,7 @@ function broadcastSnapshot(partyId: string, snapshot: PartySnapshot | null) {
function broadcastToUser(userId: string, event: Record<string, unknown>) {
pubsub.publish(topic.user(userId), JSON.stringify(event));
void publishDeviceEventForUser(userId, event as PartySocketEvent);
}
function isValidStatus(

View file

@ -1,75 +1,78 @@
import type { Socket } from "bun";
import type { PartySocketEvent, PartyState } from "../api/src/party-types";
const sockets = new Set<Socket>();
let lastData: string;
type ApiEnvelope =
| { type: "hello" }
| { type: "device_event"; deviceId: string; event: unknown };
const socket = Bun.listen({
const sockets = new Map<string, Socket>();
const socketIds = new WeakMap<Socket, string>();
const apiSocket = new WebSocket("ws://localhost:4000/api/dev-socket/ws");
function socketDeviceId(socket: Socket) {
return socketIds.get(socket);
}
function registerSocket(socket: Socket, deviceId: string) {
const existing = sockets.get(deviceId);
if (existing && existing !== socket) existing.end();
sockets.set(deviceId, socket);
socketIds.set(socket, deviceId);
}
const listener = Bun.listen({
port: 7070,
hostname: "0.0.0.0",
socket: {
open(socket) {
socket.setKeepAlive(true);
},
data(socket, buf) {
console.log("Data from", socket.remoteAddress, socket.remotePort)
const str = new TextDecoder().decode(buf);
console.log(str);
const raw = new TextDecoder().decode(buf).trim();
if (!raw) return;
const currentDeviceId = socketDeviceId(socket);
if (!currentDeviceId) {
registerSocket(socket, raw);
return;
}
apiSocket?.send(
JSON.stringify({
type: "device_message",
deviceId: currentDeviceId,
payload: raw,
}),
);
},
open(socket) {
console.log("Connection from", socket.remoteAddress, socket.remotePort)
sockets.add(socket);
socket.setKeepAlive(true);
if (lastData) socket.write(lastData);
},
close(socket) {
console.log("Connection closed", socket.remoteAddress, socket.remotePort)
sockets.delete(socket);
close(socket) {
const deviceId = socketDeviceId(socket);
if (deviceId && sockets.get(deviceId) === socket) {
sockets.delete(deviceId);
}
},
},
});
const ws: WebSocket | null = new WebSocket(
"ws://localhost:4000/api/dev-socket/ws",
);
ws.onerror = (e) => {
console.error(e);
};
ws.onopen = () => {
console.log("WebSocket open");
};
ws.onmessage = (e) => {
const data = JSON.parse(e.data) as PartySocketEvent;
console.log(data);
switch (data.type) {
case "party_status": {
const { party } = data;
if (!party) return;
const partyData = party.data;
if (!partyData) return;
const { currentQuestion } = partyData;
console.log(currentQuestion);
const text = currentQuestion?.text;
if (!text) return;
const obj = {
type: currentQuestion.type,
points: currentQuestion.points,
} as Record<string, string | number>;
if (currentQuestion.type === "numeric") {
obj.rangeMin = currentQuestion.range.min;
obj.rangeMax = currentQuestion.range.max;
}
const objText = Object.entries(obj)
.map(([k, v]) => `${k}=${v}`)
.join(" ");
const writeData = `$$\n${objText}\n${text}\n`;
lastData = writeData;
for (const socket of sockets) {
socket.write(writeData);
}
break;
}
apiSocket.onmessage = (e) => {
let message: ApiEnvelope;
try {
message = JSON.parse(e.data) as ApiEnvelope;
} catch {
return;
}
if (message.type !== "device_event") return;
const socket = sockets.get(message.deviceId);
if (!socket) return;
socket.write(`${JSON.stringify(message.event)}\n`);
};
console.log(`Started on :${socket.port}`);
apiSocket.onerror = (error) => {
console.error(error);
};
apiSocket.onopen = () => {
console.log("Connected to API device socket");
};
console.log(`Started on :${listener.port}`);