import { describe, it, expect, afterEach, beforeEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { MailboxStore } from "../src/db.js";
import { buildServer } from "../src/server.js";
import type { FastifyInstance } from "fastify";

// Per-test fixtures: each test gets its own temp directory, database file,
// store, and HTTP server listening on an ephemeral port.
let dir: string;
let dbPath: string;
let store: MailboxStore;
let app: FastifyInstance;
let baseUrl: string;

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Issues `GET /v1/watch` against the server started for the current test.
 *
 * @param query   Raw query string without the leading `?` (e.g. `name=bob&timeout=1`).
 * @param mailbox Value for the `X-Mailbox` header; the header is omitted when undefined.
 * @param signal  Optional abort signal used to simulate a client disconnect.
 * @returns The pending fetch; it rejects if the request is aborted.
 */
function watch(query: string, mailbox?: string, signal?: AbortSignal): Promise<Response> {
  const init: { headers?: Record<string, string>; signal?: AbortSignal } = {};
  if (mailbox !== undefined) init.headers = { "X-Mailbox": mailbox };
  if (signal !== undefined) init.signal = signal;
  return fetch(`${baseUrl}/v1/watch?${query}`, init);
}

beforeEach(async () => {
  dir = mkdtempSync(join(tmpdir(), "claude-mailbox-watch-"));
  dbPath = join(dir, "test.db");
  store = new MailboxStore(dbPath);
  app = await buildServer(
    {
      port: 0,
      bind: "127.0.0.1",
      dbPath,
      // NOTE(review): all three intervals are 0 — presumably this disables
      // background hiding/deletion/sweeping; confirm against buildServer.
      hideAfterMinutes: 0,
      deleteAfterMinutes: 0,
      sweepIntervalMinutes: 0,
    },
    store,
  );
  // Port 0 lets the OS pick a free port; the real one is read back below.
  await app.listen({ host: "127.0.0.1", port: 0 });
  const addr = app.server.address();
  if (!addr || typeof addr === "string") throw new Error("no address");
  baseUrl = `http://127.0.0.1:${addr.port}`;
});

afterEach(async () => {
  // Nested try/finally keeps the original teardown order while guaranteeing
  // that a failure in an earlier step still closes the store and removes the
  // temp directory.
  try {
    // NOTE(review): presumably releases pending long-poll waiters so that
    // app.close() is not held up by open requests — confirm in MailboxStore.
    store.rejectAllWaiters();
    await app.close();
  } finally {
    try {
      store.close();
    } finally {
      rmSync(dir, { recursive: true, force: true });
    }
  }
});

describe("GET /v1/watch", () => {
  it("returns 200 with one pending message when one already exists", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");
    store.send("alice", "bob", "hi bob");

    const res = await watch("name=bob&timeout=1", "bob");

    expect(res.status).toBe(200);
    const body = (await res.json()) as { from: string; body: string; sentAt: string };
    expect(body.from).toBe("alice");
    expect(body.body).toBe("hi bob");
    expect(typeof body.sentAt).toBe("string");
  });

  it("blocks until a message arrives, then returns 200", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");

    const pending = watch("name=bob&timeout=5", "bob");
    // Deliver the message shortly after the request has been issued.
    setTimeout(() => {
      store.send("alice", "bob", "delayed");
    }, 50);

    const res = await pending;
    expect(res.status).toBe(200);
    const body = (await res.json()) as { body: string };
    expect(body.body).toBe("delayed");
  });

  it("returns 204 on timeout with no body", async () => {
    store.upsertMailbox("bob");

    const res = await watch("name=bob&timeout=1", "bob");

    expect(res.status).toBe(204);
    expect(await res.text()).toBe("");
  });

  it("returns 409 with { reason: 'renamed', to } when mailbox is renamed mid-wait", async () => {
    store.upsertMailbox("oldname");

    const pending = watch("name=oldname&timeout=5", "oldname");
    setTimeout(() => store.rename("oldname", "newname"), 50);

    const res = await pending;
    expect(res.status).toBe(409);
    const body = (await res.json()) as { reason: string; to: string };
    expect(body).toEqual({ reason: "renamed", to: "newname" });
  });

  it("rejects mismatched X-Mailbox with 403", async () => {
    store.upsertMailbox("bob");

    const res = await watch("name=bob&timeout=1", "alice");

    expect(res.status).toBe(403);
  });

  it("rejects missing X-Mailbox with 400", async () => {
    const res = await watch("name=bob&timeout=1");

    expect(res.status).toBe(400);
  });

  it("rejects missing name with 400", async () => {
    const res = await watch("timeout=1", "bob");

    expect(res.status).toBe(400);
  });

  it("caps timeout at 300 seconds server-side (rejects with 400 if too high)", async () => {
    store.upsertMailbox("bob");

    const res = await watch("name=bob&timeout=999", "bob");

    expect(res.status).toBe(400);
  });

  it("client disconnect cleans up the waiter (no leak)", async () => {
    store.upsertMailbox("bob");
    const ac = new AbortController();
    // The abort makes fetch reject; swallow that so awaiting it does not throw.
    const pending = watch("name=bob&timeout=5", "bob", ac.signal).catch((err: unknown) => err);

    // Give the request a chance to register the waiter.
    await sleep(50);
    ac.abort();
    await pending;

    // Wait for the server-side TCP close event to propagate and remove the waiter.
    await sleep(100);

    // Send after abort. No one should receive it (no waiter exists).
    // It should still be queued for a future caller.
    store.upsertMailbox("alice");
    store.send("alice", "bob", "post-abort");

    // A fresh check should immediately return the queued message.
    const res = await watch("name=bob&timeout=1", "bob");
    expect(res.status).toBe(200);
    const body = (await res.json()) as { body: string };
    expect(body.body).toBe("post-abort");
  });

  it("two clients, one message: exactly one client receives it", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");

    const r1 = watch("name=bob&timeout=2", "bob");
    // Stagger the second request so the two are issued at different times.
    await sleep(20);
    const r2 = watch("name=bob&timeout=2", "bob");
    setTimeout(() => store.send("alice", "bob", "single"), 50);

    const [res1, res2] = await Promise.all([r1, r2]);
    // Numeric comparator: the default sort compares elements as strings.
    const statuses = [res1.status, res2.status].sort((a, b) => a - b);
    expect(statuses).toEqual([200, 204]);
  });
});