import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { MailboxStore } from "../src/db.js";

// Per-test fixtures: a fresh temp directory and a MailboxStore whose database
// file ("test.db") lives inside it, so tests never share on-disk state.
let dir: string;
let store: MailboxStore;

beforeEach(() => {
  dir = mkdtempSync(join(tmpdir(), "claude-mailbox-wait-"));
  store = new MailboxStore(join(dir, "test.db"));
});

afterEach(() => {
  // Remove the temp directory even when close() throws (or when the store was
  // never constructed), otherwise a failing test leaks a directory in tmpdir().
  try {
    store.close();
  } finally {
    rmSync(dir, { recursive: true, force: true });
  }
});

/**
 * Behavioural tests for `MailboxStore.waitForMessage(mailbox, timeoutMs, signal)`.
 *
 * The result is inspected through its `kind` field; the kinds exercised here
 * are "message", "timeout", "aborted" and "renamed". The `if (x.kind === ...)`
 * guards that follow an `expect(x.kind)` narrow the result type so the
 * kind-specific fields (`message`, `to`) can be read.
 */
describe("MailboxStore.waitForMessage", () => {
  it("returns an already-pending message immediately", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");
    store.send("alice", "bob", "hello");

    const ac = new AbortController();
    const result = await store.waitForMessage("bob", 1000, ac.signal);

    expect(result.kind).toBe("message");
    if (result.kind === "message") {
      expect(result.message.body).toBe("hello");
      expect(result.message.from_mailbox).toBe("alice");
      expect(result.message.delivered_at).not.toBeNull();
    }
  });

  it("blocks until a message arrives, then resolves", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");

    const ac = new AbortController();
    const pending = store.waitForMessage("bob", 5000, ac.signal);
    // Send only after the wait has started, well inside its 5000 ms timeout.
    setTimeout(() => store.send("alice", "bob", "later"), 50);

    const result = await pending;
    expect(result.kind).toBe("message");
    if (result.kind === "message") {
      expect(result.message.body).toBe("later");
    }
  });

  it("resolves with timeout when nothing arrives", async () => {
    store.upsertMailbox("bob");

    const ac = new AbortController();
    const result = await store.waitForMessage("bob", 80, ac.signal);

    expect(result.kind).toBe("timeout");
  });

  it("resolves with aborted when the signal fires", async () => {
    store.upsertMailbox("bob");

    const ac = new AbortController();
    const pending = store.waitForMessage("bob", 5000, ac.signal);
    setTimeout(() => ac.abort(), 30);

    const result = await pending;
    expect(result.kind).toBe("aborted");
  });

  it("resolves with renamed when the mailbox is renamed mid-wait", async () => {
    store.upsertMailbox("oldname");

    const ac = new AbortController();
    const pending = store.waitForMessage("oldname", 5000, ac.signal);
    setTimeout(() => store.rename("oldname", "newname"), 30);

    const result = await pending;
    expect(result.kind).toBe("renamed");
    if (result.kind === "renamed") {
      expect(result.to).toBe("newname");
    }
  });

  it("FIFO single-delivery: two waiters, one send, only the first gets the message", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");

    const ac1 = new AbortController();
    const ac2 = new AbortController();

    const w1 = store.waitForMessage("bob", 5000, ac1.signal);
    // Stagger so w1 registers first.
    await new Promise((r) => setTimeout(r, 10));
    // Short timeout: w2 is expected to receive nothing and time out.
    const w2 = store.waitForMessage("bob", 200, ac2.signal);

    store.send("alice", "bob", "for-w1");

    const [r1, r2] = await Promise.all([w1, w2]);
    expect(r1.kind).toBe("message");
    if (r1.kind === "message") {
      expect(r1.message.body).toBe("for-w1");
    }
    expect(r2.kind).toBe("timeout");
  });

  it("two pending messages are drained by two reconnecting waiters", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");
    store.send("alice", "bob", "m1");
    store.send("alice", "bob", "m2");

    const ac = new AbortController();
    // Sequential waits on the same mailbox; bodies must come back in send order.
    const r1 = await store.waitForMessage("bob", 1000, ac.signal);
    const r2 = await store.waitForMessage("bob", 1000, ac.signal);

    expect(r1.kind).toBe("message");
    expect(r2.kind).toBe("message");
    if (r1.kind === "message" && r2.kind === "message") {
      expect([r1.message.body, r2.message.body]).toEqual(["m1", "m2"]);
    }
  });

  it("abort racing send: message is either delivered or remains pending, never lost", async () => {
    store.upsertMailbox("alice");
    store.upsertMailbox("bob");

    const ac = new AbortController();
    const pending = store.waitForMessage("bob", 5000, ac.signal);

    // Fire abort and send in the same tick — either order is valid as long as
    // the message is never lost.
    ac.abort();
    store.send("alice", "bob", "racy");

    const r = await pending;
    if (r.kind === "aborted") {
      // Message must still be in DB for the next caller.
      expect(store.peek("bob").pending).toBe(1);
    } else if (r.kind === "message") {
      expect(r.message.body).toBe("racy");
      expect(store.peek("bob").pending).toBe(0);
    } else {
      throw new Error(`unexpected kind: ${r.kind}`);
    }
  });

  // NOTE(review): the title also claims the bucket map is emptied, but nothing
  // below observes that directly; only the "aborted" results are asserted.
  it("rejectAllWaiters resolves every pending waiter as aborted and empties the bucket map", async () => {
    store.upsertMailbox("bob");
    store.upsertMailbox("carol");

    const ac1 = new AbortController();
    const ac2 = new AbortController();
    const ac3 = new AbortController();

    // Two waiters on one mailbox plus one on another.
    const w1 = store.waitForMessage("bob", 5000, ac1.signal);
    const w2 = store.waitForMessage("bob", 5000, ac2.signal);
    const w3 = store.waitForMessage("carol", 5000, ac3.signal);

    // Give all three a chance to register their waiters.
    await new Promise((r) => setTimeout(r, 10));

    store.rejectAllWaiters();

    const [r1, r2, r3] = await Promise.all([w1, w2, w3]);
    expect(r1.kind).toBe("aborted");
    expect(r2.kind).toBe("aborted");
    expect(r3.kind).toBe("aborted");
  });
});