/**
 * Realtime chat gateway.
 *
 * Runs a Socket.IO server that authenticates each connection with a Clerk
 * token, authorises per-thread access against the database, persists chat
 * messages and relays chat events to `thread:<id>` rooms.
 */
import "dotenv/config";

import http from "node:http";

import express, { type Request, type Response } from "express";
import { Server as SocketIOServer } from "socket.io";
import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
import { verifyToken } from "@clerk/backend";
import { db, sql } from "@fitai/database";

/** Access-relevant columns of a chat thread the caller is allowed to use. */
interface ThreadAccess {
  id: string;
  type: "gym" | "dm";
  gymId: string | null;
  trainerId: string | null;
  clientId: string | null;
}

/** Expected shape of a `chat:send` payload; every field is re-checked at runtime. */
interface IncomingChatMessagePayload {
  threadId?: string;
  body?: string;
  clientMessageId?: string;
  attachments?: unknown;
}

/** Per-socket data populated by the authentication middleware. */
interface AuthenticatedSocketData {
  user: {
    id: string;
    role: "superAdmin" | "admin" | "trainer" | "client";
    gymId: string | null;
  };
}

/** A `chat_messages` row as selected (camelCase aliases) by this module. */
interface MessageRow {
  id: string;
  threadId: string;
  senderUserId: string;
  body: string;
  kind: "text" | "system";
  attachmentsJson: string | null;
  clientMessageId: string | null;
  createdAt: number;
  editedAt: number | null;
  deletedAt: number | null;
}

type ChatUser = AuthenticatedSocketData["user"];

// NOTE(review): the three event-map type arguments of the server could not be
// recovered from the reviewed text; this permissive shape keeps every emit and
// listener compiling. Replace it with real event maps when they are available.
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- permissive event map
type LooseEventsMap = Record<string, (...args: any[]) => void>;

// The socket stays loosely typed, as in the original handlers, because the typed
// surface of `socket.data` depends on the installed socket.io version.
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- see comment above
type ChatSocket = any;

const SEND_LIMIT_PER_MINUTE = 30;
const SEND_WINDOW_SECONDS = 60;
const TYPING_LIMIT_PER_10_SECONDS = 20;
const TYPING_WINDOW_SECONDS = 10;
const READ_LIMIT_PER_10_SECONDS = 30;
const READ_WINDOW_SECONDS = 10;
const MAX_MESSAGE_LENGTH = 2000;

// Thread ids are embedded in rate-limit keys before any authorisation check,
// so their size is bounded to keep those keys small.
const MAX_THREAD_ID_LENGTH = 256;

// In-memory rate windows are swept on this interval; an entry is dropped once
// its newest timestamp is older than the longest window any limiter uses.
const RATE_SWEEP_INTERVAL_MS = 60000;
const LONGEST_RATE_WINDOW_MS =
  Math.max(SEND_WINDOW_SECONDS, TYPING_WINDOW_SECONDS, READ_WINDOW_SECONDS) *
  1000;

const app = express();
const server = http.createServer(app);

const sendRateWindowByUser = new Map<string, number[]>();
const typingRateWindowByKey = new Map<string, number[]>();
const readRateWindowByKey = new Map<string, number[]>();

let redisRateClient: ReturnType<typeof createClient> | null = null;

/**
 * Parses the comma-separated origin list from `REALTIME_CORS_ORIGIN`.
 * Returns `"*"` only when the variable is not set at all.
 */
function resolveCorsOrigins(raw: string | undefined): string[] | "*" {
  if (raw === undefined) {
    return "*";
  }
  return raw
    .split(",")
    .map((origin) => origin.trim())
    .filter((origin) => origin.length > 0);
}

const corsOrigins = resolveCorsOrigins(process.env.REALTIME_CORS_ORIGIN);

const io = new SocketIOServer<
  LooseEventsMap,
  LooseEventsMap,
  LooseEventsMap,
  AuthenticatedSocketData
>(server, {
  cors: {
    origin: corsOrigins,
    // Browsers reject credentialed requests answered with a wildcard origin,
    // so credentials are only advertised for an explicit origin list.
    credentials: corsOrigins !== "*",
  },
  path: process.env.REALTIME_SOCKET_PATH ?? "/socket.io",
});

app.get("/healthz", (_req: Request, res: Response) => {
  res.json({ ok: true, service: "realtime" });
});

/**
 * Connects the Redis pub/sub pair and installs the Socket.IO Redis adapter.
 * Does nothing when `REDIS_URL` is not set. The publisher connection is also
 * kept as the rate-limit client.
 */
async function attachRedisAdapter(): Promise<void> {
  const redisUrl = process.env.REDIS_URL;
  if (!redisUrl) {
    return;
  }

  const pubClient = createClient({ url: redisUrl });
  const subClient = pubClient.duplicate();

  // node-redis throws (and the process exits) when an "error" event has no listener.
  pubClient.on("error", (error: unknown) => {
    console.error("[realtime] redis pub client error", error);
  });
  subClient.on("error", (error: unknown) => {
    console.error("[realtime] redis sub client error", error);
  });

  await pubClient.connect();
  await subClient.connect();

  redisRateClient = pubClient;
  io.adapter(createAdapter(pubClient, subClient));
}

// Authentication middleware: resolves the handshake token to a `users` row and
// stores it on `socket.data.user`; any failure rejects with "Unauthorized".
io.use(async (socket: ChatSocket, next: (err?: Error) => void) => {
  try {
    const token =
      typeof socket.handshake.auth?.token === "string"
        ? socket.handshake.auth.token
        : null;

    const userId = await verifyAndExtractUserId(token);
    if (!userId) {
      return next(new Error("Unauthorized"));
    }

    const rows = await db.all(sql`
      SELECT id, role, gym_id as gymId
      FROM users
      WHERE id = ${userId}
      LIMIT 1
    `);
    const user = rows?.[0] as ChatUser | undefined;
    if (!user) {
      return next(new Error("Unauthorized"));
    }

    socket.data.user = user;
    return next();
  } catch (error: unknown) {
    // Logged so a database outage is not mistaken for bad credentials.
    console.error("[realtime] authentication middleware failed", error);
    return next(new Error("Unauthorized"));
  }
});

/**
 * Registers an async handler for `event` and converts any rejection into a
 * logged error plus a generic `chat:error`, so a failing query can never
 * surface as an unhandled promise rejection.
 */
function onChatEvent<TPayload>(
  socket: ChatSocket,
  event: string,
  handler: (payload: TPayload) => Promise<void>,
  failureMessage = "Internal error",
): void {
  socket.on(event, (payload: TPayload) => {
    handler(payload).catch((error: unknown) => {
      console.error(`[realtime] ${event} handler failed`, error);
      socket.emit("chat:error", {
        code: "internal_error",
        message: failureMessage,
      });
    });
  });
}

/** Returns `value` when it is a non-empty string of acceptable length, else `null`. */
function readThreadId(value: unknown): string | null {
  return typeof value === "string" &&
    value.length > 0 &&
    value.length <= MAX_THREAD_ID_LENGTH
    ? value
    : null;
}

/** `chat:subscribe`: joins the thread room after an access check. */
async function handleSubscribe(
  socket: ChatSocket,
  payload: { threadId?: string },
): Promise<void> {
  const threadId = readThreadId(payload?.threadId);
  if (!threadId) {
    socket.emit("chat:error", { code: "invalid_payload" });
    return;
  }

  const allowed = await canAccessThread(socket.data.user, threadId);
  if (!allowed) {
    socket.emit("chat:error", { code: "forbidden" });
    return;
  }

  await socket.join(`thread:${threadId}`);
  socket.emit("chat:subscribed", { threadId });
}

/** `chat:unsubscribe`: leaves the thread room; invalid payloads are ignored. */
async function handleUnsubscribe(
  socket: ChatSocket,
  payload: { threadId?: string },
): Promise<void> {
  const threadId = readThreadId(payload?.threadId);
  if (!threadId) {
    return;
  }
  await socket.leave(`thread:${threadId}`);
}

/** `chat:typing`: relays a typing indicator to the other sockets in the room. */
async function handleTyping(
  socket: ChatSocket,
  payload: { threadId?: string; isTyping?: boolean },
): Promise<void> {
  const threadId = readThreadId(payload?.threadId);
  if (!threadId) {
    return;
  }

  const typingAllowed = await checkRateLimit(
    typingRateWindowByKey,
    `typing:${socket.data.user.id}:${threadId}`,
    TYPING_LIMIT_PER_10_SECONDS,
    TYPING_WINDOW_SECONDS,
  );
  if (!typingAllowed) {
    return;
  }

  const allowed = await canAccessThread(socket.data.user, threadId);
  if (!allowed) {
    return;
  }

  socket.to(`thread:${threadId}`).emit("chat:typing", {
    threadId,
    userId: socket.data.user.id,
    isTyping: Boolean(payload?.isTyping),
  });
}

/** Looks up a message previously sent by `senderUserId` with the same client id. */
async function findMessageByClientId(
  senderUserId: string,
  clientMessageId: string,
): Promise<MessageRow | undefined> {
  const rows = await db.all(sql`
    SELECT
      id,
      thread_id as threadId,
      sender_user_id as senderUserId,
      body,
      kind,
      attachments_json as attachmentsJson,
      client_message_id as clientMessageId,
      created_at as createdAt,
      edited_at as editedAt,
      deleted_at as deletedAt
    FROM chat_messages
    WHERE sender_user_id = ${senderUserId}
      AND client_message_id = ${clientMessageId}
    LIMIT 1
  `);
  return rows?.[0] as MessageRow | undefined;
}

/** Loads a single message row by primary key. */
async function findMessageById(
  messageId: string,
): Promise<MessageRow | undefined> {
  const rows = await db.all(sql`
    SELECT
      id,
      thread_id as threadId,
      sender_user_id as senderUserId,
      body,
      kind,
      attachments_json as attachmentsJson,
      client_message_id as clientMessageId,
      created_at as createdAt,
      edited_at as editedAt,
      deleted_at as deletedAt
    FROM chat_messages
    WHERE id = ${messageId}
    LIMIT 1
  `);
  return rows?.[0] as MessageRow | undefined;
}

/**
 * `chat:send`: validates, rate-limits and authorises the message, persists it,
 * broadcasts it to the thread room and acknowledges the sender. A repeated
 * `clientMessageId` from the same sender is acknowledged with the stored
 * message (`duplicate: true`) instead of being inserted again.
 */
async function handleSend(
  socket: ChatSocket,
  payload: IncomingChatMessagePayload,
): Promise<void> {
  const user: ChatUser = socket.data.user;

  const threadId = readThreadId(payload?.threadId);
  const body = typeof payload?.body === "string" ? payload.body.trim() : "";
  if (!threadId || !body) {
    socket.emit("chat:error", {
      code: "invalid_payload",
      message: "threadId and body are required",
    });
    return;
  }

  const sendAllowed = await checkRateLimit(
    sendRateWindowByUser,
    `send:${user.id}`,
    SEND_LIMIT_PER_MINUTE,
    SEND_WINDOW_SECONDS,
  );
  if (!sendAllowed) {
    socket.emit("chat:error", {
      code: "rate_limited",
      message: "Too many messages. Please slow down.",
    });
    return;
  }

  if (body.length > MAX_MESSAGE_LENGTH) {
    socket.emit("chat:error", {
      code: "message_too_long",
      message: "Message is too long",
    });
    return;
  }

  const access = await getThreadAccess(user, threadId);
  if (!access) {
    socket.emit("chat:error", { code: "forbidden", message: "Forbidden" });
    return;
  }

  await ensureThreadMembership(threadId, user.id, user.role);

  const clientMessageId =
    typeof payload.clientMessageId === "string" &&
    payload.clientMessageId.trim().length > 0
      ? payload.clientMessageId.trim()
      : null;

  if (clientMessageId) {
    const existing = await findMessageByClientId(user.id, clientMessageId);
    if (existing) {
      socket.emit("chat:message:ack", {
        threadId,
        clientMessageId,
        message: mapMessageRow(existing),
        duplicate: true,
      });
      return;
    }
  }

  const attachments = normalizeAttachments(payload.attachments);
  const now = nowEpochSeconds();
  const messageId = generateId("msg");

  await db.run(sql`
    INSERT INTO chat_messages (
      id,
      thread_id,
      sender_user_id,
      body,
      kind,
      attachments_json,
      client_message_id,
      created_at
    ) VALUES (
      ${messageId},
      ${threadId},
      ${user.id},
      ${body},
      ${"text"},
      ${attachments.length > 0 ? JSON.stringify(attachments) : null},
      ${clientMessageId},
      ${now}
    )
  `);

  await db.run(sql`
    UPDATE chat_threads
    SET last_message_at = ${now}, updated_at = ${now}
    WHERE id = ${threadId}
  `);

  const createdRow = await findMessageById(messageId);
  if (!createdRow) {
    socket.emit("chat:error", {
      code: "send_failed",
      message: "Failed to persist message",
    });
    return;
  }

  const message = mapMessageRow(createdRow);
  const room = `thread:${threadId}`;

  // The sender joins the room so it also receives the broadcasts below.
  await socket.join(room);
  io.to(room).emit("chat:message:new", { threadId, message });
  io.to(room).emit("chat:read:update", {
    threadId,
    userId: user.id,
    lastReadMessageId: message.id,
  });
  socket.emit("chat:message:ack", { threadId, clientMessageId, message });
}

/** `chat:read`: stores the caller's read marker and broadcasts it to the room. */
async function handleRead(
  socket: ChatSocket,
  payload: { threadId?: string; lastReadMessageId?: string },
): Promise<void> {
  const user: ChatUser = socket.data.user;

  const threadId = readThreadId(payload?.threadId);
  if (!threadId) {
    return;
  }

  const readAllowed = await checkRateLimit(
    readRateWindowByKey,
    `read:${user.id}:${threadId}`,
    READ_LIMIT_PER_10_SECONDS,
    READ_WINDOW_SECONDS,
  );
  if (!readAllowed) {
    return;
  }

  const allowed = await canAccessThread(user, threadId);
  if (!allowed) {
    return;
  }

  await ensureThreadMembership(threadId, user.id, user.role);

  // Only a string may reach the query and the broadcast; anything else clears the marker.
  const lastReadMessageId =
    typeof payload?.lastReadMessageId === "string"
      ? payload.lastReadMessageId
      : null;
  const now = nowEpochSeconds();

  await db.run(sql`
    UPDATE chat_thread_members
    SET last_read_message_id = ${lastReadMessageId}, last_read_at = ${now}
    WHERE thread_id = ${threadId} AND user_id = ${user.id}
  `);

  io.to(`thread:${threadId}`).emit("chat:read:update", {
    threadId,
    userId: user.id,
    lastReadMessageId,
  });
}

io.on("connection", (socket: ChatSocket) => {
  onChatEvent(socket, "chat:subscribe", (payload: { threadId?: string }) =>
    handleSubscribe(socket, payload),
  );
  onChatEvent(socket, "chat:unsubscribe", (payload: { threadId?: string }) =>
    handleUnsubscribe(socket, payload),
  );
  onChatEvent(
    socket,
    "chat:typing",
    (payload: { threadId?: string; isTyping?: boolean }) =>
      handleTyping(socket, payload),
  );
  onChatEvent(
    socket,
    "chat:send",
    (payload: IncomingChatMessagePayload) => handleSend(socket, payload),
    // The underlying error is logged server-side instead of being sent to the client.
    "Failed to send",
  );
  onChatEvent(
    socket,
    "chat:read",
    (payload: { threadId?: string; lastReadMessageId?: string }) =>
      handleRead(socket, payload),
  );
});

/** Returns whether `user` may use the thread (see {@link getThreadAccess}). */
async function canAccessThread(
  user: AuthenticatedSocketData["user"],
  threadId: string,
): Promise<boolean> {
  const access = await getThreadAccess(user, threadId);
  return Boolean(access);
}

/**
 * Resolves the thread and applies the access rules:
 * - missing or archived threads are never accessible;
 * - `superAdmin` may access any remaining thread;
 * - `gym` threads require the user's `gymId` to equal the thread's;
 * - other threads require the user to be the trainer or the client and, when
 *   both ids are set, an active row in `trainer_client_assignments`.
 *
 * @returns the thread's access-relevant columns, or `null` when access is denied.
 */
async function getThreadAccess(
  user: AuthenticatedSocketData["user"],
  threadId: string,
): Promise<ThreadAccess | null> {
  const rows = await db.all(sql`
    SELECT
      id,
      type,
      gym_id as gymId,
      trainer_id as trainerId,
      client_id as clientId,
      archived_at as archivedAt
    FROM chat_threads
    WHERE id = ${threadId}
    LIMIT 1
  `);
  const thread = rows?.[0] as
    | (ThreadAccess & { archivedAt: number | null })
    | undefined;

  if (!thread || thread.archivedAt) {
    return null;
  }

  const access: ThreadAccess = {
    id: thread.id,
    type: thread.type,
    gymId: thread.gymId,
    trainerId: thread.trainerId,
    clientId: thread.clientId,
  };

  if (user.role === "superAdmin") {
    return access;
  }

  if (thread.type === "gym") {
    return user.gymId && user.gymId === thread.gymId ? access : null;
  }

  const isParticipant =
    user.id === thread.trainerId || user.id === thread.clientId;
  if (!isParticipant) {
    return null;
  }

  if (thread.trainerId && thread.clientId) {
    const assignmentRows = await db.all(sql`
      SELECT id
      FROM trainer_client_assignments
      WHERE trainer_id = ${thread.trainerId}
        AND client_id = ${thread.clientId}
        AND is_active = 1
      LIMIT 1
    `);
    if (!assignmentRows?.[0]) {
      return null;
    }
  }

  return access;
}

/**
 * Verifies a Clerk token and returns the user id it identifies.
 *
 * Only claims returned by `verifyToken` are trusted. The previous manual
 * base64 decode of the token payload was removed: it re-read the `sub` claim
 * that had just been rejected, and unverified token content must never be
 * used as an identity.
 *
 * @returns the user id, or `null` when the token is missing, the secret key is
 *          not configured, verification fails, or no usable claim is present.
 */
async function verifyAndExtractUserId(
  token: string | null,
): Promise<string | null> {
  if (!token) {
    return null;
  }

  const secretKey = process.env.CLERK_SECRET_KEY;
  if (!secretKey) {
    return null;
  }

  try {
    const verified = await verifyToken(token, {
      secretKey,
      clockSkewInMs: 5000,
    });

    if (typeof verified.sub === "string" && verified.sub.trim().length > 0) {
      return verified.sub;
    }

    // NOTE(review): kept from the original — the `sid` claim is accepted only
    // when it matches an existing user id. Confirm this fallback is still needed.
    const fallback =
      typeof verified.sid === "string" && verified.sid.trim().length > 0
        ? verified.sid
        : null;
    if (fallback) {
      const rows = await db.all(sql`
        SELECT id FROM users WHERE id = ${fallback} LIMIT 1
      `);
      if (rows?.[0]) {
        return fallback;
      }
    }

    return null;
  } catch {
    // Any failure (invalid token or lookup error) is treated as "not authenticated".
    return null;
  }
}

/** Periodically drops in-memory rate windows that have fully expired. */
function startRateWindowSweeper(): void {
  const timer = setInterval(() => {
    const now = Date.now();
    const allWindows = [
      sendRateWindowByUser,
      typingRateWindowByKey,
      readRateWindowByKey,
    ];
    for (const windows of allWindows) {
      for (const [key, timestamps] of windows) {
        // Timestamps are appended in order, so the last one is the newest.
        const newest = timestamps[timestamps.length - 1];
        if (newest === undefined || now - newest >= LONGEST_RATE_WINDOW_MS) {
          windows.delete(key);
        }
      }
    }
  }, RATE_SWEEP_INTERVAL_MS);
  // The sweeper must not keep the process alive on its own.
  timer.unref();
}

/** Attaches the optional Redis adapter and starts the HTTP/Socket.IO server. */
async function main(): Promise<void> {
  if (!process.env.CLERK_SECRET_KEY) {
    console.warn(
      "[realtime] CLERK_SECRET_KEY is not set; every connection will be rejected",
    );
  }

  await attachRedisAdapter();
  startRateWindowSweeper();

  const port = Number(process.env.REALTIME_PORT ?? 3001);
  server.listen(port, () => {
    console.log(`[realtime] listening on :${port}`);
  });
}

/**
 * Makes sure a `chat_thread_members` row exists for the user (inserting one is
 * a no-op when it already exists) and clears `left_at` on it.
 */
async function ensureThreadMembership(
  threadId: string,
  userId: string,
  role: AuthenticatedSocketData["user"]["role"],
): Promise<void> {
  const now = nowEpochSeconds();

  await db.run(sql`
    INSERT OR IGNORE INTO chat_thread_members (
      id,
      thread_id,
      user_id,
      role_in_thread,
      joined_at,
      muted
    ) VALUES (
      ${generateId("member")},
      ${threadId},
      ${userId},
      ${role},
      ${now},
      ${0}
    )
  `);

  await db.run(sql`
    UPDATE chat_thread_members
    SET left_at = NULL
    WHERE thread_id = ${threadId} AND user_id = ${userId}
  `);
}

/**
 * Keeps only the entries of `raw` that are objects with string `url` and
 * `type`, copying `name` when it is a string. Non-array input yields `[]`.
 */
function normalizeAttachments(
  raw: unknown,
): Array<{ url: string; type: string; name?: string }> {
  if (!Array.isArray(raw)) {
    return [];
  }

  return raw.flatMap((item: unknown) => {
    if (item === null || typeof item !== "object") {
      return [];
    }
    const { url, type, name } = item as {
      url?: unknown;
      type?: unknown;
      name?: unknown;
    };
    if (typeof url !== "string" || typeof type !== "string") {
      return [];
    }
    return [{ url, type, name: typeof name === "string" ? name : undefined }];
  });
}

/** Parses a stored attachments JSON string; empty or malformed input yields `[]`. */
function parseAttachments(
  value: string | null,
): Array<{ url: string; type: string; name?: string }> {
  if (!value) {
    return [];
  }
  try {
    return normalizeAttachments(JSON.parse(value));
  } catch {
    return [];
  }
}

/** Converts a database row into the message shape sent to clients (ISO timestamps). */
function mapMessageRow(row: MessageRow) {
  const toIso = (epochSeconds: number): string =>
    new Date(Number(epochSeconds) * 1000).toISOString();

  return {
    id: row.id,
    threadId: row.threadId,
    senderUserId: row.senderUserId,
    body: row.body,
    kind: row.kind,
    attachments: parseAttachments(row.attachmentsJson),
    clientMessageId: row.clientMessageId,
    createdAt: toIso(row.createdAt),
    editedAt: row.editedAt ? toIso(row.editedAt) : null,
    deletedAt: row.deletedAt ? toIso(row.deletedAt) : null,
  };
}

/** Builds an id of the form `<prefix>_<epoch ms>_<random base36 suffix>`. */
function generateId(prefix: string): string {
  return `${prefix}_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;
}

/** Current time as whole seconds since the Unix epoch. */
function nowEpochSeconds(): number {
  return Math.floor(Date.now() / 1000);
}

/**
 * Records one hit for `key` and reports whether it is within `limit` hits per
 * `windowSeconds`.
 *
 * When a Redis client is connected a shared counter is used; if any Redis
 * command fails the check silently falls back to the in-memory window stored
 * in `windows`.
 *
 * @param windows in-memory store of hit timestamps (ms), keyed by `key`
 * @param key identifies the limited subject
 * @param limit maximum hits allowed per window
 * @param windowSeconds window length in seconds
 * @returns `true` when the hit is allowed, `false` when it is rate-limited
 */
export async function checkRateLimit(
  windows: Map<string, number[]>,
  key: string,
  limit: number,
  windowSeconds: number,
): Promise<boolean> {
  if (redisRateClient) {
    try {
      const redisKey = `fitai:chat:rl:${key}`;
      const count = await redisRateClient.incr(redisKey);
      if (count === 1) {
        await redisRateClient.expire(redisKey, windowSeconds);
      } else if (
        count > limit &&
        (await redisRateClient.ttl(redisKey)) === -1
      ) {
        // The counter exists without an expiry (an earlier EXPIRE did not
        // land). Re-arm it so the subject is not rate-limited forever.
        await redisRateClient.expire(redisKey, windowSeconds);
      }
      return count <= limit;
    } catch {
      // Deliberate best effort: continue with the in-memory fallback below.
    }
  }

  const now = Date.now();
  const windowMs = windowSeconds * 1000;
  const withinWindow = (windows.get(key) ?? []).filter(
    (timestamp) => now - timestamp < windowMs,
  );

  const allowed = withinWindow.length < limit;
  if (allowed) {
    withinWindow.push(now);
  }
  windows.set(key, withinWindow);
  return allowed;
}

// A start-up failure (for example an unreachable Redis) is reported and ends
// the process instead of surfacing as an unhandled promise rejection.
main().catch((error: unknown) => {
  console.error("[realtime] failed to start", error);
  process.exit(1);
});