import { createFileRoute } from "@tanstack/react-router"; import { serverEvents, type ServerEvent } from "@/lib/events/emitter"; import { logger } from "@/lib/logger"; import { superTokensRequestMiddleware } from "@/utils/supertokens"; let activeConnections = 0; export const Route = createFileRoute("/api/events/$")({ server: { middleware: [superTokensRequestMiddleware], handlers: { GET: ({ request }) => { activeConnections++; const connectionId = `conn_${Date.now()}_${Math.random().toString(36).substring(2, 11)}`; logger.info(`ServerEvents | New connection ${connectionId}. Active: ${activeConnections}`); const stream = new ReadableStream({ start(controller) { const connectMessage = `data: ${JSON.stringify({ type: "connected" })}\n\n`; controller.enqueue(new TextEncoder().encode(connectMessage)); const handleEvent = (event: ServerEvent) => { logger.info("ServerEvents | Event received", event); const message = `data: ${JSON.stringify(event)}\n\n`; try { if (!controller.desiredSize || controller.desiredSize <= 0) { logger.warn("ServerEvents | Stream closed, skipping event"); return; } controller.enqueue(new TextEncoder().encode(message)); } catch (error) { logger.error("ServerEvents | Error sending SSE message", error); } }; serverEvents.on("test", handleEvent); serverEvents.on("match", handleEvent); serverEvents.on("reaction", handleEvent); const pingInterval = setInterval(() => { try { if (!controller.desiredSize || controller.desiredSize <= 0) { clearInterval(pingInterval); return; } const pingMessage = `data: ${JSON.stringify({ type: "ping", timestamp: Date.now() })}\n\n`; controller.enqueue(new TextEncoder().encode(pingMessage)); } catch (e) { logger.error("ServerEvents | Ping interval error", e); clearInterval(pingInterval); } }, 15000); setTimeout(() => { try { const heartbeatMessage = `data: ${JSON.stringify({ type: "heartbeat", timestamp: Date.now() })}\n\n`; controller.enqueue(new TextEncoder().encode(heartbeatMessage)); } catch (e) { logger.error("ServerEvents | Heartbeat error", e); } }, 1000); const cleanup = () => { activeConnections--; serverEvents.off("test", handleEvent); serverEvents.off("match", handleEvent); serverEvents.off("reaction", handleEvent); clearInterval(pingInterval); logger.info(`ServerEvents | Connection ${connectionId} cleanup completed. Active: ${activeConnections}`); }; request.signal?.addEventListener("abort", cleanup); return cleanup; }, }); return new Response(stream, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-store, must-revalidate", "Connection": "keep-alive", "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Headers": "Cache-Control", "X-Accel-Buffering": "no", "X-Proxy-Buffering": "no", "Proxy-Buffering": "off", "Transfer-Encoding": "chunked", }, }); }, }, }, });