From 1d6652b7c8ecb0e476ba1534dc89d141c064a7dd Mon Sep 17 00:00:00 2001 From: janpeter visser Date: Fri, 1 May 2026 14:31:39 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20presence=20layer=20=E2=80=94=20register?= =?UTF-8?q?Worker,=20startHeartbeat,=20registerShutdownHandlers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/presence/heartbeat.ts | 23 +++++++++++++++ src/presence/shutdown.ts | 20 +++++++++++++ src/presence/worker.ts | 61 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 src/presence/heartbeat.ts create mode 100644 src/presence/shutdown.ts create mode 100644 src/presence/worker.ts diff --git a/src/presence/heartbeat.ts b/src/presence/heartbeat.ts new file mode 100644 index 0000000..f4cb230 --- /dev/null +++ b/src/presence/heartbeat.ts @@ -0,0 +1,23 @@ +import { prisma } from '../prisma.js' + +export function startHeartbeat(opts: { + tokenId: string + intervalMs?: number +}): { stop: () => void } { + const timer = setInterval(async () => { + try { + const result = await prisma.claudeWorker.updateMany({ + where: { token_id: opts.tokenId }, + data: { last_seen_at: new Date() }, + }) + if (result.count === 0) { + console.error('[scrum4me-mcp] Heartbeat: worker record not found — token may be revoked. Stopping.') + clearInterval(timer) + } + } catch { + // non-fatal + } + }, opts.intervalMs ?? 5_000) + + return { stop: () => clearInterval(timer) } +} diff --git a/src/presence/shutdown.ts b/src/presence/shutdown.ts new file mode 100644 index 0000000..53eb567 --- /dev/null +++ b/src/presence/shutdown.ts @@ -0,0 +1,20 @@ +import { unregisterWorker } from './worker.js' + +export function registerShutdownHandlers(opts: { + userId: string + tokenId: string + stopHeartbeat: () => void +}): void { + let exiting = false + + const shutdown = async () => { + if (exiting) return + exiting = true + opts.stopHeartbeat() + await unregisterWorker({ userId: opts.userId, tokenId: opts.tokenId }) + process.exit(0) + } + + process.on('SIGTERM', () => void shutdown()) + process.on('SIGINT', () => void shutdown()) +} diff --git a/src/presence/worker.ts b/src/presence/worker.ts new file mode 100644 index 0000000..9fb39d7 --- /dev/null +++ b/src/presence/worker.ts @@ -0,0 +1,61 @@ +import { Client } from 'pg' +import { prisma } from '../prisma.js' + +export async function registerWorker(opts: { + userId: string + tokenId: string + productId?: string | null +}): Promise { + await prisma.claudeWorker.upsert({ + where: { token_id: opts.tokenId }, + create: { + user_id: opts.userId, + token_id: opts.tokenId, + product_id: opts.productId ?? null, + }, + update: { + last_seen_at: new Date(), + product_id: opts.productId ?? null, + }, + }) + + try { + const pg = new Client({ connectionString: process.env.DATABASE_URL }) + await pg.connect() + await pg.query('SELECT pg_notify($1, $2)', [ + 'scrum4me_changes', + JSON.stringify({ + type: 'worker_connected', + user_id: opts.userId, + token_id: opts.tokenId, + product_id: opts.productId ?? null, + }), + ]) + await pg.end() + } catch { + // non-fatal + } +} + +export async function unregisterWorker(opts: { + userId: string + tokenId: string +}): Promise { + await prisma.claudeWorker.deleteMany({ where: { token_id: opts.tokenId } }).catch(() => {}) + + try { + const pg = new Client({ connectionString: process.env.DATABASE_URL }) + await pg.connect() + await pg.query('SELECT pg_notify($1, $2)', [ + 'scrum4me_changes', + JSON.stringify({ + type: 'worker_disconnected', + user_id: opts.userId, + token_id: opts.tokenId, + }), + ]) + await pg.end() + } catch { + // non-fatal + } +}