From b05e6f2bd7f908cf8102a2393d35b80bdf0f6e31 Mon Sep 17 00:00:00 2001 From: Mika Kuns Date: Wed, 20 May 2026 16:19:11 +0200 Subject: [PATCH] feat(server): add GET /v1/watch long-poll endpoint with abort + rename handling Co-Authored-By: Claude Opus 4.7 (1M context) --- node/src/server.ts | 47 +++++++++ node/tests/server-watch.test.ts | 174 ++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) create mode 100644 node/tests/server-watch.test.ts diff --git a/node/src/server.ts b/node/src/server.ts index a58605d..2d59aba 100644 --- a/node/src/server.ts +++ b/node/src/server.ts @@ -121,6 +121,53 @@ export async function buildServer(cfg: DaemonConfig, store: MailboxStore): Promi } }); + const WATCH_DEFAULT_TIMEOUT_S = 25; + const WATCH_MAX_TIMEOUT_S = 300; + + app.get<{ Querystring: { name?: string; timeout?: string } }>( + "/v1/watch", + async (req, reply) => { + const name = (req.query.name ?? "").trim(); + if (!name) { + reply.code(400); + return { error: "name is required" }; + } + if (name !== req.mailboxName) { + reply.code(403); + return { error: "X-Mailbox header must match name." }; + } + + const rawTimeout = req.query.timeout; + const timeoutS = rawTimeout != null ? parseInt(rawTimeout, 10) : WATCH_DEFAULT_TIMEOUT_S; + if (!Number.isFinite(timeoutS) || timeoutS <= 0 || timeoutS > WATCH_MAX_TIMEOUT_S) { + reply.code(400); + return { error: `timeout must be 1..${WATCH_MAX_TIMEOUT_S} seconds` }; + } + + const ac = new AbortController(); + req.raw.on("close", () => ac.abort()); + + const result = await store.waitForMessage(name, timeoutS * 1000, ac.signal); + + if (result.kind === "message") { + const msg = rowToMessage(result.message); + reply.code(200); + return { ...msg, sentAt: msg.sentAt.toISOString() }; + } + if (result.kind === "renamed") { + reply.code(409); + return { reason: "renamed", to: result.to }; + } + if (result.kind === "timeout") { + reply.code(204); + return reply.send(); + } + // aborted — client gone, no response needed (Fastify will swallow). + reply.code(499); + return reply.send(); + }, + ); + await registerMcp(app, store, cfg.hideAfterMinutes); return app; diff --git a/node/tests/server-watch.test.ts b/node/tests/server-watch.test.ts new file mode 100644 index 0000000..47374b6 --- /dev/null +++ b/node/tests/server-watch.test.ts @@ -0,0 +1,174 @@ +import { describe, it, expect, afterEach, beforeEach } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { MailboxStore } from "../src/db.js"; +import { buildServer } from "../src/server.js"; +import type { FastifyInstance } from "fastify"; + +let dir: string; +let dbPath: string; +let store: MailboxStore; +let app: FastifyInstance; +let baseUrl: string; + +beforeEach(async () => { + dir = mkdtempSync(join(tmpdir(), "claude-mailbox-watch-")); + dbPath = join(dir, "test.db"); + store = new MailboxStore(dbPath); + app = await buildServer( + { + port: 0, + bind: "127.0.0.1", + dbPath, + hideAfterMinutes: 0, + deleteAfterMinutes: 0, + sweepIntervalMinutes: 0, + }, + store, + ); + await app.listen({ host: "127.0.0.1", port: 0 }); + const addr = app.server.address(); + if (!addr || typeof addr === "string") throw new Error("no address"); + baseUrl = `http://127.0.0.1:${addr.port}`; +}); + +afterEach(async () => { + store.rejectAllWaiters(); + await app.close(); + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +describe("GET /v1/watch", () => { + it("returns 200 with one pending message when one already exists", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + store.send("alice", "bob", "hi bob"); + + const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, { + headers: { "X-Mailbox": "bob" }, + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { from: string; body: string; sentAt: string }; + expect(body.from).toBe("alice"); + expect(body.body).toBe("hi bob"); + expect(typeof body.sentAt).toBe("string"); + }); + + it("blocks until a message arrives, then returns 200", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + + const pending = fetch(`${baseUrl}/v1/watch?name=bob&timeout=5`, { + headers: { "X-Mailbox": "bob" }, + }); + + setTimeout(() => { + store.send("alice", "bob", "delayed"); + }, 50); + + const res = await pending; + expect(res.status).toBe(200); + const body = (await res.json()) as { body: string }; + expect(body.body).toBe("delayed"); + }); + + it("returns 204 on timeout with no body", async () => { + store.upsertMailbox("bob"); + const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, { + headers: { "X-Mailbox": "bob" }, + }); + expect(res.status).toBe(204); + expect(await res.text()).toBe(""); + }); + + it("returns 409 with { reason: 'renamed', to } when mailbox is renamed mid-wait", async () => { + store.upsertMailbox("oldname"); + const pending = fetch(`${baseUrl}/v1/watch?name=oldname&timeout=5`, { + headers: { "X-Mailbox": "oldname" }, + }); + setTimeout(() => store.rename("oldname", "newname"), 50); + const res = await pending; + expect(res.status).toBe(409); + const body = (await res.json()) as { reason: string; to: string }; + expect(body).toEqual({ reason: "renamed", to: "newname" }); + }); + + it("rejects mismatched X-Mailbox with 403", async () => { + store.upsertMailbox("bob"); + const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, { + headers: { "X-Mailbox": "alice" }, + }); + expect(res.status).toBe(403); + }); + + it("rejects missing X-Mailbox with 400", async () => { + const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`); + expect(res.status).toBe(400); + }); + + it("rejects missing name with 400", async () => { + const res = await fetch(`${baseUrl}/v1/watch?timeout=1`, { + headers: { "X-Mailbox": "bob" }, + }); + expect(res.status).toBe(400); + }); + + it("caps timeout at 300 seconds server-side (rejects with 400 if too high)", async () => { + store.upsertMailbox("bob"); + const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=999`, { + headers: { "X-Mailbox": "bob" }, + }); + expect(res.status).toBe(400); + }); + + it("client disconnect cleans up the waiter (no leak)", async () => { + store.upsertMailbox("bob"); + const ac = new AbortController(); + const pending = fetch(`${baseUrl}/v1/watch?name=bob&timeout=5`, { + headers: { "X-Mailbox": "bob" }, + signal: ac.signal, + }).catch((err) => err); + + // Give the request a chance to register the waiter. + await new Promise((r) => setTimeout(r, 50)); + ac.abort(); + await pending; + + // Wait for the server-side TCP close event to propagate and remove the waiter. + await new Promise((r) => setTimeout(r, 100)); + + // Send after abort. No one should receive it (no waiter exists). + // It should still be queued for a future caller. + store.upsertMailbox("alice"); + store.send("alice", "bob", "post-abort"); + + // A fresh check should immediately return the queued message. + const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, { + headers: { "X-Mailbox": "bob" }, + }); + expect(res.status).toBe(200); + const body = (await res.json()) as { body: string }; + expect(body.body).toBe("post-abort"); + }); + + it("two clients, one message: exactly one client receives it", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + + const r1 = fetch(`${baseUrl}/v1/watch?name=bob&timeout=2`, { + headers: { "X-Mailbox": "bob" }, + }); + await new Promise((r) => setTimeout(r, 20)); + const r2 = fetch(`${baseUrl}/v1/watch?name=bob&timeout=2`, { + headers: { "X-Mailbox": "bob" }, + }); + + setTimeout(() => store.send("alice", "bob", "single"), 50); + + const [res1, res2] = await Promise.all([r1, r2]); + const statuses = [res1.status, res2.status].sort(); + expect(statuses).toEqual([200, 204]); + }); +});