feat(server): add GET /v1/watch long-poll endpoint with abort + rename handling

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-05-20 16:19:11 +02:00
parent b74e969229
commit b05e6f2bd7
2 changed files with 221 additions and 0 deletions

View File

@@ -0,0 +1,174 @@
import { describe, it, expect, afterEach, beforeEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { MailboxStore } from "../src/db.js";
import { buildServer } from "../src/server.js";
import type { FastifyInstance } from "fastify";
let dir: string;
let dbPath: string;
let store: MailboxStore;
let app: FastifyInstance;
let baseUrl: string;
beforeEach(async () => {
dir = mkdtempSync(join(tmpdir(), "claude-mailbox-watch-"));
dbPath = join(dir, "test.db");
store = new MailboxStore(dbPath);
app = await buildServer(
{
port: 0,
bind: "127.0.0.1",
dbPath,
hideAfterMinutes: 0,
deleteAfterMinutes: 0,
sweepIntervalMinutes: 0,
},
store,
);
await app.listen({ host: "127.0.0.1", port: 0 });
const addr = app.server.address();
if (!addr || typeof addr === "string") throw new Error("no address");
baseUrl = `http://127.0.0.1:${addr.port}`;
});
afterEach(async () => {
store.rejectAllWaiters();
await app.close();
store.close();
rmSync(dir, { recursive: true, force: true });
});
describe("GET /v1/watch", () => {
it("returns 200 with one pending message when one already exists", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
store.send("alice", "bob", "hi bob");
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
headers: { "X-Mailbox": "bob" },
});
expect(res.status).toBe(200);
const body = (await res.json()) as { from: string; body: string; sentAt: string };
expect(body.from).toBe("alice");
expect(body.body).toBe("hi bob");
expect(typeof body.sentAt).toBe("string");
});
it("blocks until a message arrives, then returns 200", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
const pending = fetch(`${baseUrl}/v1/watch?name=bob&timeout=5`, {
headers: { "X-Mailbox": "bob" },
});
setTimeout(() => {
store.send("alice", "bob", "delayed");
}, 50);
const res = await pending;
expect(res.status).toBe(200);
const body = (await res.json()) as { body: string };
expect(body.body).toBe("delayed");
});
it("returns 204 on timeout with no body", async () => {
store.upsertMailbox("bob");
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
headers: { "X-Mailbox": "bob" },
});
expect(res.status).toBe(204);
expect(await res.text()).toBe("");
});
it("returns 409 with { reason: 'renamed', to } when mailbox is renamed mid-wait", async () => {
store.upsertMailbox("oldname");
const pending = fetch(`${baseUrl}/v1/watch?name=oldname&timeout=5`, {
headers: { "X-Mailbox": "oldname" },
});
setTimeout(() => store.rename("oldname", "newname"), 50);
const res = await pending;
expect(res.status).toBe(409);
const body = (await res.json()) as { reason: string; to: string };
expect(body).toEqual({ reason: "renamed", to: "newname" });
});
it("rejects mismatched X-Mailbox with 403", async () => {
store.upsertMailbox("bob");
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
headers: { "X-Mailbox": "alice" },
});
expect(res.status).toBe(403);
});
it("rejects missing X-Mailbox with 400", async () => {
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`);
expect(res.status).toBe(400);
});
it("rejects missing name with 400", async () => {
const res = await fetch(`${baseUrl}/v1/watch?timeout=1`, {
headers: { "X-Mailbox": "bob" },
});
expect(res.status).toBe(400);
});
it("caps timeout at 300 seconds server-side (rejects with 400 if too high)", async () => {
store.upsertMailbox("bob");
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=999`, {
headers: { "X-Mailbox": "bob" },
});
expect(res.status).toBe(400);
});
it("client disconnect cleans up the waiter (no leak)", async () => {
store.upsertMailbox("bob");
const ac = new AbortController();
const pending = fetch(`${baseUrl}/v1/watch?name=bob&timeout=5`, {
headers: { "X-Mailbox": "bob" },
signal: ac.signal,
}).catch((err) => err);
// Give the request a chance to register the waiter.
await new Promise((r) => setTimeout(r, 50));
ac.abort();
await pending;
// Wait for the server-side TCP close event to propagate and remove the waiter.
await new Promise((r) => setTimeout(r, 100));
// Send after abort. No one should receive it (no waiter exists).
// It should still be queued for a future caller.
store.upsertMailbox("alice");
store.send("alice", "bob", "post-abort");
// A fresh check should immediately return the queued message.
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
headers: { "X-Mailbox": "bob" },
});
expect(res.status).toBe(200);
const body = (await res.json()) as { body: string };
expect(body.body).toBe("post-abort");
});
it("two clients, one message: exactly one client receives it", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
const r1 = fetch(`${baseUrl}/v1/watch?name=bob&timeout=2`, {
headers: { "X-Mailbox": "bob" },
});
await new Promise((r) => setTimeout(r, 20));
const r2 = fetch(`${baseUrl}/v1/watch?name=bob&timeout=2`, {
headers: { "X-Mailbox": "bob" },
});
setTimeout(() => store.send("alice", "bob", "single"), 50);
const [res1, res2] = await Promise.all([r1, r2]);
const statuses = [res1.status, res2.status].sort();
expect(statuses).toEqual([200, 204]);
});
});