/*
 * Files
 * ClaudeMailbox/node/tests/db-watch.test.ts
 * 2026-05-20 16:15:39 +02:00
 *
 * 156 lines
 * 5.4 KiB
 * TypeScript
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { MailboxStore } from "../src/db.js";
// Per-test temp directory: created in beforeEach, removed in afterEach.
let dir: string;
// Store under test: constructed in beforeEach on `<dir>/test.db`, closed in afterEach.
let store: MailboxStore;
// Give every test its own temp directory and a store built on a file inside it.
beforeEach(() => {
  const scratchPrefix = join(tmpdir(), "claude-mailbox-wait-");
  dir = mkdtempSync(scratchPrefix);

  const dbPath = join(dir, "test.db");
  store = new MailboxStore(dbPath);
});
// Tear down the per-test fixture: close the store, then delete the temp directory.
afterEach(() => {
  try {
    store.close();
  } finally {
    // Runs even when close() throws, so a failing teardown cannot leave
    // temp directories behind.
    rmSync(dir, { recursive: true, force: true });
  }
});
describe("MailboxStore.waitForMessage", () => {
it("returns an already-pending message immediately", async () => {
  // The message is sent before anyone starts waiting.
  for (const name of ["alice", "bob"]) store.upsertMailbox(name);
  store.send("alice", "bob", "hello");

  const { signal } = new AbortController();
  const outcome = await store.waitForMessage("bob", 1000, signal);

  expect(outcome.kind).toBe("message");
  // The expectation above already fails the test for any other kind;
  // this guard exists only to narrow the union for the field checks.
  if (outcome.kind !== "message") return;

  const { body, from_mailbox, delivered_at } = outcome.message;
  expect(body).toBe("hello");
  expect(from_mailbox).toBe("alice");
  expect(delivered_at).not.toBeNull();
});
it("blocks until a message arrives, then resolves", async () => {
  store.upsertMailbox("alice");
  store.upsertMailbox("bob");
  const ac = new AbortController();
  // Start waiting first; the message is only sent 50 ms later.
  const pending = store.waitForMessage("bob", 5000, ac.signal);
  const sendTimer = setTimeout(() => store.send("alice", "bob", "later"), 50);
  try {
    const result = await pending;
    expect(result.kind).toBe("message");
    if (result.kind === "message") expect(result.message.body).toBe("later");
  } finally {
    // If the wait settles before the timer fires (rejection, wrong early
    // result), cancel the delayed send so it cannot run against a store
    // that afterEach has already closed. No-op once the timer has fired.
    clearTimeout(sendTimer);
  }
});
it("resolves with timeout when nothing arrives", async () => {
  // Nothing is ever sent to "bob", so the short wait (80) has to expire.
  store.upsertMailbox("bob");
  const { signal } = new AbortController();
  const { kind } = await store.waitForMessage("bob", 80, signal);
  expect(kind).toBe("timeout");
});
it("resolves with aborted when the signal fires", async () => {
  store.upsertMailbox("bob");
  const controller = new AbortController();
  const waiting = store.waitForMessage("bob", 5000, controller.signal);
  // Abort 30 ms after the wait has started.
  setTimeout(() => {
    controller.abort();
  }, 30);
  const { kind } = await waiting;
  expect(kind).toBe("aborted");
});
it("resolves with renamed when the mailbox is renamed mid-wait", async () => {
  store.upsertMailbox("oldname");
  const ac = new AbortController();
  // Start waiting on the old name; the rename happens 30 ms later.
  const pending = store.waitForMessage("oldname", 5000, ac.signal);
  const renameTimer = setTimeout(() => store.rename("oldname", "newname"), 30);
  try {
    const result = await pending;
    expect(result.kind).toBe("renamed");
    if (result.kind === "renamed") expect(result.to).toBe("newname");
  } finally {
    // If the wait settles before the timer fires, cancel the delayed rename
    // so it cannot run against a store that afterEach has already closed.
    // No-op once the timer has fired.
    clearTimeout(renameTimer);
  }
});
it("FIFO single-delivery: two waiters, one send, only the first gets the message", async () => {
  for (const name of ["alice", "bob"]) store.upsertMailbox(name);
  const firstCtl = new AbortController();
  const secondCtl = new AbortController();

  const firstWaiter = store.waitForMessage("bob", 5000, firstCtl.signal);
  // Pause briefly so the first waiter is registered ahead of the second.
  await new Promise((resolve) => setTimeout(resolve, 10));
  const secondWaiter = store.waitForMessage("bob", 200, secondCtl.signal);

  // One message for two waiters.
  store.send("alice", "bob", "for-w1");

  const outcomes = await Promise.all([firstWaiter, secondWaiter]);
  const firstOutcome = outcomes[0];
  const secondOutcome = outcomes[1];

  expect(firstOutcome.kind).toBe("message");
  if (firstOutcome.kind === "message") {
    expect(firstOutcome.message.body).toBe("for-w1");
  }
  // The later waiter receives nothing and runs out its own, shorter wait.
  expect(secondOutcome.kind).toBe("timeout");
});
it("two pending messages are drained by two reconnecting waiters", async () => {
  store.upsertMailbox("alice");
  store.upsertMailbox("bob");
  // Both messages are queued before any wait starts.
  for (const body of ["m1", "m2"]) store.send("alice", "bob", body);

  const { signal } = new AbortController();
  // Two back-to-back waits, the second starting only after the first resolved.
  const first = await store.waitForMessage("bob", 1000, signal);
  const second = await store.waitForMessage("bob", 1000, signal);

  expect(first.kind).toBe("message");
  expect(second.kind).toBe("message");
  // Type-narrowing guard only; the expectations above already fail otherwise.
  if (first.kind !== "message" || second.kind !== "message") return;

  // Messages must come out in the order they were sent.
  const bodies = [first, second].map((outcome) => outcome.message.body);
  expect(bodies).toEqual(["m1", "m2"]);
});
it("abort racing send: message is either delivered or remains pending, never lost", async () => {
  for (const name of ["alice", "bob"]) store.upsertMailbox(name);
  const controller = new AbortController();
  const waiting = store.waitForMessage("bob", 5000, controller.signal);

  // Abort and send back-to-back with no await in between. Either outcome
  // is acceptable as long as the message is accounted for.
  controller.abort();
  store.send("alice", "bob", "racy");

  const outcome = await waiting;
  switch (outcome.kind) {
    case "aborted":
      // The waiter lost the race: the message must still be pending for the next caller.
      expect(store.peek("bob").pending).toBe(1);
      break;
    case "message":
      // The waiter won the race: it got the message and nothing is left pending.
      expect(outcome.message.body).toBe("racy");
      expect(store.peek("bob").pending).toBe(0);
      break;
    default:
      throw new Error(`unexpected kind: ${outcome.kind}`);
  }
});
it("rejectAllWaiters resolves every pending waiter as aborted and empties the bucket map", async () => {
  store.upsertMailbox("bob");
  store.upsertMailbox("carol");
  const ac1 = new AbortController();
  const ac2 = new AbortController();
  const ac3 = new AbortController();
  // Two waiters on one mailbox and one on another.
  const w1 = store.waitForMessage("bob", 5000, ac1.signal);
  const w2 = store.waitForMessage("bob", 5000, ac2.signal);
  const w3 = store.waitForMessage("carol", 5000, ac3.signal);
  // Give all three a chance to register their waiters.
  await new Promise((r) => setTimeout(r, 10));
  store.rejectAllWaiters();
  const [r1, r2, r3] = await Promise.all([w1, w2, w3]);
  expect(r1.kind).toBe("aborted");
  expect(r2.kind).toBe("aborted");
  expect(r3.kind).toBe("aborted");
  // The bucket map is internal state, so check its observable consequence
  // instead: with no waiter left registered for "bob", a fresh message has
  // nobody to be handed to and must still be counted as pending.
  // NOTE(review): assumes send() remains usable after rejectAllWaiters() — confirm.
  store.send("carol", "bob", "after-reject");
  expect(store.peek("bob").pending).toBe(1);
});
});