175 lines
5.6 KiB
TypeScript
175 lines
5.6 KiB
TypeScript
import { describe, it, expect, afterEach, beforeEach } from "vitest";
|
|
import { mkdtempSync, rmSync } from "node:fs";
|
|
import { tmpdir } from "node:os";
|
|
import { join } from "node:path";
|
|
import { MailboxStore } from "../src/db.js";
|
|
import { buildServer } from "../src/server.js";
|
|
import type { FastifyInstance } from "fastify";
|
|
|
|
let dir: string;
|
|
let dbPath: string;
|
|
let store: MailboxStore;
|
|
let app: FastifyInstance;
|
|
let baseUrl: string;
|
|
|
|
beforeEach(async () => {
|
|
dir = mkdtempSync(join(tmpdir(), "claude-mailbox-watch-"));
|
|
dbPath = join(dir, "test.db");
|
|
store = new MailboxStore(dbPath);
|
|
app = await buildServer(
|
|
{
|
|
port: 0,
|
|
bind: "127.0.0.1",
|
|
dbPath,
|
|
hideAfterMinutes: 0,
|
|
deleteAfterMinutes: 0,
|
|
sweepIntervalMinutes: 0,
|
|
},
|
|
store,
|
|
);
|
|
await app.listen({ host: "127.0.0.1", port: 0 });
|
|
const addr = app.server.address();
|
|
if (!addr || typeof addr === "string") throw new Error("no address");
|
|
baseUrl = `http://127.0.0.1:${addr.port}`;
|
|
});
|
|
|
|
afterEach(async () => {
|
|
store.rejectAllWaiters();
|
|
await app.close();
|
|
store.close();
|
|
rmSync(dir, { recursive: true, force: true });
|
|
});
|
|
|
|
describe("GET /v1/watch", () => {
|
|
it("returns 200 with one pending message when one already exists", async () => {
|
|
store.upsertMailbox("alice");
|
|
store.upsertMailbox("bob");
|
|
store.send("alice", "bob", "hi bob");
|
|
|
|
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
expect(res.status).toBe(200);
|
|
const body = (await res.json()) as { from: string; body: string; sentAt: string };
|
|
expect(body.from).toBe("alice");
|
|
expect(body.body).toBe("hi bob");
|
|
expect(typeof body.sentAt).toBe("string");
|
|
});
|
|
|
|
it("blocks until a message arrives, then returns 200", async () => {
|
|
store.upsertMailbox("alice");
|
|
store.upsertMailbox("bob");
|
|
|
|
const pending = fetch(`${baseUrl}/v1/watch?name=bob&timeout=5`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
|
|
setTimeout(() => {
|
|
store.send("alice", "bob", "delayed");
|
|
}, 50);
|
|
|
|
const res = await pending;
|
|
expect(res.status).toBe(200);
|
|
const body = (await res.json()) as { body: string };
|
|
expect(body.body).toBe("delayed");
|
|
});
|
|
|
|
it("returns 204 on timeout with no body", async () => {
|
|
store.upsertMailbox("bob");
|
|
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
expect(res.status).toBe(204);
|
|
expect(await res.text()).toBe("");
|
|
});
|
|
|
|
it("returns 409 with { reason: 'renamed', to } when mailbox is renamed mid-wait", async () => {
|
|
store.upsertMailbox("oldname");
|
|
const pending = fetch(`${baseUrl}/v1/watch?name=oldname&timeout=5`, {
|
|
headers: { "X-Mailbox": "oldname" },
|
|
});
|
|
setTimeout(() => store.rename("oldname", "newname"), 50);
|
|
const res = await pending;
|
|
expect(res.status).toBe(409);
|
|
const body = (await res.json()) as { reason: string; to: string };
|
|
expect(body).toEqual({ reason: "renamed", to: "newname" });
|
|
});
|
|
|
|
it("rejects mismatched X-Mailbox with 403", async () => {
|
|
store.upsertMailbox("bob");
|
|
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
|
|
headers: { "X-Mailbox": "alice" },
|
|
});
|
|
expect(res.status).toBe(403);
|
|
});
|
|
|
|
it("rejects missing X-Mailbox with 400", async () => {
|
|
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`);
|
|
expect(res.status).toBe(400);
|
|
});
|
|
|
|
it("rejects missing name with 400", async () => {
|
|
const res = await fetch(`${baseUrl}/v1/watch?timeout=1`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
expect(res.status).toBe(400);
|
|
});
|
|
|
|
it("caps timeout at 300 seconds server-side (rejects with 400 if too high)", async () => {
|
|
store.upsertMailbox("bob");
|
|
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=999`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
expect(res.status).toBe(400);
|
|
});
|
|
|
|
it("client disconnect cleans up the waiter (no leak)", async () => {
|
|
store.upsertMailbox("bob");
|
|
const ac = new AbortController();
|
|
const pending = fetch(`${baseUrl}/v1/watch?name=bob&timeout=5`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
signal: ac.signal,
|
|
}).catch((err) => err);
|
|
|
|
// Give the request a chance to register the waiter.
|
|
await new Promise((r) => setTimeout(r, 50));
|
|
ac.abort();
|
|
await pending;
|
|
|
|
// Wait for the server-side TCP close event to propagate and remove the waiter.
|
|
await new Promise((r) => setTimeout(r, 100));
|
|
|
|
// Send after abort. No one should receive it (no waiter exists).
|
|
// It should still be queued for a future caller.
|
|
store.upsertMailbox("alice");
|
|
store.send("alice", "bob", "post-abort");
|
|
|
|
// A fresh check should immediately return the queued message.
|
|
const res = await fetch(`${baseUrl}/v1/watch?name=bob&timeout=1`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
expect(res.status).toBe(200);
|
|
const body = (await res.json()) as { body: string };
|
|
expect(body.body).toBe("post-abort");
|
|
});
|
|
|
|
it("two clients, one message: exactly one client receives it", async () => {
|
|
store.upsertMailbox("alice");
|
|
store.upsertMailbox("bob");
|
|
|
|
const r1 = fetch(`${baseUrl}/v1/watch?name=bob&timeout=2`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
await new Promise((r) => setTimeout(r, 20));
|
|
const r2 = fetch(`${baseUrl}/v1/watch?name=bob&timeout=2`, {
|
|
headers: { "X-Mailbox": "bob" },
|
|
});
|
|
|
|
setTimeout(() => store.send("alice", "bob", "single"), 50);
|
|
|
|
const [res1, res2] = await Promise.all([r1, r2]);
|
|
const statuses = [res1.status, res2.status].sort();
|
|
expect(statuses).toEqual([200, 204]);
|
|
});
|
|
});
|