diff --git a/node/src/db.ts b/node/src/db.ts index b37c331..0a368ec 100644 --- a/node/src/db.ts +++ b/node/src/db.ts @@ -52,6 +52,16 @@ function nowIso(): string { export type RenameFailure = "invalid" | "source-missing" | "target-exists"; +export type WaitResult = + | { kind: "message"; message: MessageRow } + | { kind: "timeout" } + | { kind: "renamed"; to: string } + | { kind: "aborted" }; + +interface Waiter { + resolve: (result: WaitResult) => void; +} + export class RenameError extends Error { constructor(message: string, public readonly reason: RenameFailure) { super(message); @@ -84,6 +94,7 @@ function runInTransaction(db: DatabaseSync, fn: () => T): T { export class MailboxStore { private readonly db: DatabaseSync; + private readonly waiters = new Map>(); private readonly stmts: { findMailbox: StatementSync; @@ -101,6 +112,7 @@ export class MailboxStore { findStaleCandidates: StatementSync; deleteMessagesForNames: StatementSync; deleteMailboxesByNames: StatementSync; + selectOnePending: StatementSync; }; constructor(public readonly dbPath: string) { @@ -162,6 +174,9 @@ export class MailboxStore { deleteMailboxesByNames: this.db.prepare( "DELETE FROM mailboxes WHERE name IN (SELECT value FROM json_each(?))", ), + selectOnePending: this.db.prepare( + "SELECT * FROM messages WHERE to_mailbox = ? AND delivered_at IS NULL ORDER BY id LIMIT 1", + ), }; } @@ -169,6 +184,16 @@ export class MailboxStore { this.db.close(); } + private consumeOne(name: string): MessageRow | null { + return runInTransaction(this.db, () => { + const row = this.stmts.selectOnePending.get(name) as MessageRow | undefined; + if (!row) return null; + const deliveredAt = nowIso(); + this.stmts.markDelivered.run(deliveredAt, JSON.stringify([row.id])); + return { ...row, delivered_at: deliveredAt }; + }); + } + upsertMailbox(name: string): void { const now = nowIso(); const existing = this.stmts.findMailbox.get(name) as unknown as MailboxRow | undefined; @@ -180,13 +205,15 @@ export class MailboxStore { } send(from: string, to: string, body: string): { id: number; queuedAt: Date } { - return runInTransaction(this.db, () => { + const result = runInTransaction(this.db, () => { this.upsertMailbox(from); this.upsertMailbox(to); const createdAt = nowIso(); - const result = this.stmts.insertMessage.run(to, from, body, createdAt); - return { id: Number(result.lastInsertRowid), queuedAt: new Date(createdAt) }; + const insert = this.stmts.insertMessage.run(to, from, body, createdAt); + return { id: Number(insert.lastInsertRowid), queuedAt: new Date(createdAt) }; }); + this.notifyOneWaiter(to); + return result; } peek(name: string): InboxStatus { @@ -207,6 +234,72 @@ export class MailboxStore { }); } + waitForMessage(name: string, timeoutMs: number, signal: AbortSignal): Promise { + const existing = this.consumeOne(name); + if (existing) return Promise.resolve({ kind: "message" as const, message: existing }); + + if (signal.aborted) return Promise.resolve({ kind: "aborted" as const }); + + return new Promise((resolve) => { + const waiter: Waiter = { resolve }; + let bucket = this.waiters.get(name); + if (!bucket) { + bucket = new Set(); + this.waiters.set(name, bucket); + } + bucket.add(waiter); + + const cleanup = (): void => { + const b = this.waiters.get(name); + if (b) { + b.delete(waiter); + if (b.size === 0) this.waiters.delete(name); + } + }; + + const timer = setTimeout(() => { + cleanup(); + resolve({ kind: "timeout" }); + }, timeoutMs); + + signal.addEventListener( + "abort", + () => { + clearTimeout(timer); + cleanup(); + resolve({ kind: "aborted" }); + }, + { once: true }, + ); + }); + } + + private notifyOneWaiter(name: string): void { + const bucket = this.waiters.get(name); + if (!bucket || bucket.size === 0) return; + const first = bucket.values().next().value; + if (!first) return; + const msg = this.consumeOne(name); + if (!msg) return; + bucket.delete(first); + if (bucket.size === 0) this.waiters.delete(name); + first.resolve({ kind: "message", message: msg }); + } + + private notifyRenamed(oldName: string, newName: string): void { + const bucket = this.waiters.get(oldName); + if (!bucket) return; + for (const w of bucket) w.resolve({ kind: "renamed", to: newName }); + this.waiters.delete(oldName); + } + + rejectAllWaiters(): void { + for (const bucket of this.waiters.values()) { + for (const w of bucket) w.resolve({ kind: "aborted" }); + } + this.waiters.clear(); + } + rename(from: string, to: string): { from: string; to: string; messagesTransferred: number } { const oldName = from.trim(); const newName = to.trim(); @@ -217,7 +310,7 @@ export class MailboxStore { return { from: oldName, to: newName, messagesTransferred: 0 }; } - return runInTransaction(this.db, () => { + const result = runInTransaction(this.db, () => { const source = this.stmts.findMailbox.get(oldName) as unknown as MailboxRow | undefined; if (!source) throw new RenameError(`Mailbox '${oldName}' does not exist.`, "source-missing"); const target = this.stmts.findMailbox.get(newName) as unknown as MailboxRow | undefined; @@ -234,6 +327,8 @@ export class MailboxStore { this.db.prepare("DELETE FROM mailboxes WHERE name = ?").run(oldName); return { from: oldName, to: newName, messagesTransferred: Number(movedTo.changes ?? 0) }; }); + this.notifyRenamed(oldName, newName); + return result; } listMailboxes(forName?: string, options?: { hideAfterMinutes?: number }): MailboxInfo[] { diff --git a/node/tests/db-watch.test.ts b/node/tests/db-watch.test.ts new file mode 100644 index 0000000..b81bee6 --- /dev/null +++ b/node/tests/db-watch.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { MailboxStore } from "../src/db.js"; + +let dir: string; +let store: MailboxStore; + +beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), "claude-mailbox-wait-")); + store = new MailboxStore(join(dir, "test.db")); +}); + +afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); +}); + +describe("MailboxStore.waitForMessage", () => { + it("returns an already-pending message immediately", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + store.send("alice", "bob", "hello"); + + const ac = new AbortController(); + const result = await store.waitForMessage("bob", 1000, ac.signal); + expect(result.kind).toBe("message"); + if (result.kind === "message") { + expect(result.message.body).toBe("hello"); + expect(result.message.from_mailbox).toBe("alice"); + expect(result.message.delivered_at).not.toBeNull(); + } + }); + + it("blocks until a message arrives, then resolves", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + + const ac = new AbortController(); + const pending = store.waitForMessage("bob", 5000, ac.signal); + + setTimeout(() => store.send("alice", "bob", "later"), 50); + + const result = await pending; + expect(result.kind).toBe("message"); + if (result.kind === "message") expect(result.message.body).toBe("later"); + }); + + it("resolves with timeout when nothing arrives", async () => { + store.upsertMailbox("bob"); + const ac = new AbortController(); + const result = await store.waitForMessage("bob", 80, ac.signal); + expect(result.kind).toBe("timeout"); + }); + + it("resolves with aborted when the signal fires", async () => { + store.upsertMailbox("bob"); + const ac = new AbortController(); + const pending = store.waitForMessage("bob", 5000, ac.signal); + setTimeout(() => ac.abort(), 30); + const result = await pending; + expect(result.kind).toBe("aborted"); + }); + + it("resolves with renamed when the mailbox is renamed mid-wait", async () => { + store.upsertMailbox("oldname"); + const ac = new AbortController(); + const pending = store.waitForMessage("oldname", 5000, ac.signal); + setTimeout(() => store.rename("oldname", "newname"), 30); + const result = await pending; + expect(result.kind).toBe("renamed"); + if (result.kind === "renamed") expect(result.to).toBe("newname"); + }); + + it("FIFO single-delivery: two waiters, one send, only the first gets the message", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + + const ac1 = new AbortController(); + const ac2 = new AbortController(); + const w1 = store.waitForMessage("bob", 5000, ac1.signal); + // Stagger so w1 registers first. + await new Promise((r) => setTimeout(r, 10)); + const w2 = store.waitForMessage("bob", 200, ac2.signal); + + store.send("alice", "bob", "for-w1"); + + const [r1, r2] = await Promise.all([w1, w2]); + expect(r1.kind).toBe("message"); + if (r1.kind === "message") expect(r1.message.body).toBe("for-w1"); + expect(r2.kind).toBe("timeout"); + }); + + it("two pending messages are drained by two reconnecting waiters", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + store.send("alice", "bob", "m1"); + store.send("alice", "bob", "m2"); + + const ac = new AbortController(); + const r1 = await store.waitForMessage("bob", 1000, ac.signal); + const r2 = await store.waitForMessage("bob", 1000, ac.signal); + expect(r1.kind).toBe("message"); + expect(r2.kind).toBe("message"); + if (r1.kind === "message" && r2.kind === "message") { + expect([r1.message.body, r2.message.body]).toEqual(["m1", "m2"]); + } + }); +});