From c808a8770e1e5050bc999d2a853b78a1a7807e3d Mon Sep 17 00:00:00 2001 From: janpeter visser Date: Fri, 1 May 2026 14:50:43 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20registerWorker=20helper=20=E2=80=94=20u?= =?UTF-8?q?psert=20ClaudeWorker=20+=20pg=5Fnotify=20worker=5Fconnected?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- __tests__/worker.test.ts | 96 ++++++++++++++++++++++++++++++++++++++++ src/presence/worker.ts | 60 +++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 __tests__/worker.test.ts create mode 100644 src/presence/worker.ts diff --git a/__tests__/worker.test.ts b/__tests__/worker.test.ts new file mode 100644 index 0000000..1054b48 --- /dev/null +++ b/__tests__/worker.test.ts @@ -0,0 +1,96 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' + +vi.mock('../src/prisma.js', () => ({ + prisma: { + claudeWorker: { + upsert: vi.fn(), + deleteMany: vi.fn(), + }, + }, +})) + +import { prisma } from '../src/prisma.js' +import { registerWorker, unregisterWorker } from '../src/presence/worker.js' + +const mockPrisma = prisma as unknown as { + claudeWorker: { upsert: ReturnType; deleteMany: ReturnType } +} + +beforeEach(() => { + vi.clearAllMocks() + mockPrisma.claudeWorker.upsert.mockResolvedValue({}) + mockPrisma.claudeWorker.deleteMany.mockResolvedValue({ count: 1 }) +}) + +describe('registerWorker', () => { + it('upserts a ClaudeWorker record for the given token', async () => { + await registerWorker({ userId: 'u1', tokenId: 't1' }) + + expect(mockPrisma.claudeWorker.upsert).toHaveBeenCalledWith({ + where: { token_id: 't1' }, + create: { user_id: 'u1', token_id: 't1', product_id: null }, + update: { last_seen_at: expect.any(Date), product_id: null }, + }) + }) + + it('passes product_id to upsert when provided', async () => { + await registerWorker({ userId: 'u1', tokenId: 't1', productId: 'p1' }) + + expect(mockPrisma.claudeWorker.upsert).toHaveBeenCalledWith( + expect.objectContaining({ + create: expect.objectContaining({ product_id: 'p1' }), + update: expect.objectContaining({ product_id: 'p1' }), + }), + ) + }) + + it('sends pg_notify worker_connected with correct payload', async () => { + const mockNotify = vi.fn().mockResolvedValue(undefined) + + await registerWorker({ userId: 'u1', tokenId: 't1', productId: 'p1', notify: mockNotify }) + + expect(mockNotify).toHaveBeenCalledWith({ + type: 'worker_connected', + user_id: 'u1', + token_id: 't1', + product_id: 'p1', + }) + }) + + it('does not throw when notify fails', async () => { + const mockNotify = vi.fn().mockRejectedValue(new Error('pg down')) + + await expect( + registerWorker({ userId: 'u1', tokenId: 't1', notify: mockNotify }), + ).resolves.toBeUndefined() + expect(mockPrisma.claudeWorker.upsert).toHaveBeenCalled() + }) +}) + +describe('unregisterWorker', () => { + it('deletes the ClaudeWorker record for the token', async () => { + await unregisterWorker({ userId: 'u1', tokenId: 't1' }) + + expect(mockPrisma.claudeWorker.deleteMany).toHaveBeenCalledWith({ where: { token_id: 't1' } }) + }) + + it('sends pg_notify worker_disconnected', async () => { + const mockNotify = vi.fn().mockResolvedValue(undefined) + + await unregisterWorker({ userId: 'u1', tokenId: 't1', notify: mockNotify }) + + expect(mockNotify).toHaveBeenCalledWith({ + type: 'worker_disconnected', + user_id: 'u1', + token_id: 't1', + }) + }) + + it('does not throw when notify fails during unregister', async () => { + const mockNotify = vi.fn().mockRejectedValue(new Error('pg down')) + + await expect( + unregisterWorker({ userId: 'u1', tokenId: 't1', notify: mockNotify }), + ).resolves.toBeUndefined() + }) +}) diff --git a/src/presence/worker.ts b/src/presence/worker.ts new file mode 100644 index 0000000..762f286 --- /dev/null +++ b/src/presence/worker.ts @@ -0,0 +1,60 @@ +import { Client } from 'pg' +import { prisma } from '../prisma.js' + +export async function pgNotify(payload: Record): Promise { + const pg = new Client({ connectionString: process.env.DATABASE_URL }) + await pg.connect() + await pg.query('SELECT pg_notify($1, $2)', ['scrum4me_changes', JSON.stringify(payload)]) + await pg.end() +} + +export async function registerWorker(opts: { + userId: string + tokenId: string + productId?: string | null + notify?: (payload: Record) => Promise +}): Promise { + await prisma.claudeWorker.upsert({ + where: { token_id: opts.tokenId }, + create: { + user_id: opts.userId, + token_id: opts.tokenId, + product_id: opts.productId ?? null, + }, + update: { + last_seen_at: new Date(), + product_id: opts.productId ?? null, + }, + }) + + const notify = opts.notify ?? pgNotify + try { + await notify({ + type: 'worker_connected', + user_id: opts.userId, + token_id: opts.tokenId, + product_id: opts.productId ?? null, + }) + } catch { + // non-fatal + } +} + +export async function unregisterWorker(opts: { + userId: string + tokenId: string + notify?: (payload: Record) => Promise +}): Promise { + await prisma.claudeWorker.deleteMany({ where: { token_id: opts.tokenId } }).catch(() => {}) + + const notify = opts.notify ?? pgNotify + try { + await notify({ + type: 'worker_disconnected', + user_id: opts.userId, + token_id: opts.tokenId, + }) + } catch { + // non-fatal + } +}