feat(db): add waitForMessage with FIFO single-delivery and rename signaling

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mika Kuns
2026-05-20 16:10:17 +02:00
parent 407f3a8f16
commit 31584fe623
2 changed files with 209 additions and 4 deletions

View File

@@ -52,6 +52,16 @@ function nowIso(): string {
export type RenameFailure = "invalid" | "source-missing" | "target-exists";
export type WaitResult =
| { kind: "message"; message: MessageRow }
| { kind: "timeout" }
| { kind: "renamed"; to: string }
| { kind: "aborted" };
interface Waiter {
resolve: (result: WaitResult) => void;
}
export class RenameError extends Error {
constructor(message: string, public readonly reason: RenameFailure) {
super(message);
@@ -84,6 +94,7 @@ function runInTransaction<T>(db: DatabaseSync, fn: () => T): T {
export class MailboxStore {
private readonly db: DatabaseSync;
private readonly waiters = new Map<string, Set<Waiter>>();
private readonly stmts: {
findMailbox: StatementSync;
@@ -101,6 +112,7 @@ export class MailboxStore {
findStaleCandidates: StatementSync;
deleteMessagesForNames: StatementSync;
deleteMailboxesByNames: StatementSync;
selectOnePending: StatementSync;
};
constructor(public readonly dbPath: string) {
@@ -162,6 +174,9 @@ export class MailboxStore {
deleteMailboxesByNames: this.db.prepare(
"DELETE FROM mailboxes WHERE name IN (SELECT value FROM json_each(?))",
),
selectOnePending: this.db.prepare(
"SELECT * FROM messages WHERE to_mailbox = ? AND delivered_at IS NULL ORDER BY id LIMIT 1",
),
};
}
@@ -169,6 +184,16 @@ export class MailboxStore {
this.db.close();
}
private consumeOne(name: string): MessageRow | null {
return runInTransaction(this.db, () => {
const row = this.stmts.selectOnePending.get(name) as MessageRow | undefined;
if (!row) return null;
const deliveredAt = nowIso();
this.stmts.markDelivered.run(deliveredAt, JSON.stringify([row.id]));
return { ...row, delivered_at: deliveredAt };
});
}
upsertMailbox(name: string): void {
const now = nowIso();
const existing = this.stmts.findMailbox.get(name) as unknown as MailboxRow | undefined;
@@ -180,13 +205,15 @@ export class MailboxStore {
}
send(from: string, to: string, body: string): { id: number; queuedAt: Date } {
return runInTransaction(this.db, () => {
const result = runInTransaction(this.db, () => {
this.upsertMailbox(from);
this.upsertMailbox(to);
const createdAt = nowIso();
const result = this.stmts.insertMessage.run(to, from, body, createdAt);
return { id: Number(result.lastInsertRowid), queuedAt: new Date(createdAt) };
const insert = this.stmts.insertMessage.run(to, from, body, createdAt);
return { id: Number(insert.lastInsertRowid), queuedAt: new Date(createdAt) };
});
this.notifyOneWaiter(to);
return result;
}
peek(name: string): InboxStatus {
@@ -207,6 +234,72 @@ export class MailboxStore {
});
}
waitForMessage(name: string, timeoutMs: number, signal: AbortSignal): Promise<WaitResult> {
const existing = this.consumeOne(name);
if (existing) return Promise.resolve({ kind: "message" as const, message: existing });
if (signal.aborted) return Promise.resolve({ kind: "aborted" as const });
return new Promise<WaitResult>((resolve) => {
const waiter: Waiter = { resolve };
let bucket = this.waiters.get(name);
if (!bucket) {
bucket = new Set();
this.waiters.set(name, bucket);
}
bucket.add(waiter);
const cleanup = (): void => {
const b = this.waiters.get(name);
if (b) {
b.delete(waiter);
if (b.size === 0) this.waiters.delete(name);
}
};
const timer = setTimeout(() => {
cleanup();
resolve({ kind: "timeout" });
}, timeoutMs);
signal.addEventListener(
"abort",
() => {
clearTimeout(timer);
cleanup();
resolve({ kind: "aborted" });
},
{ once: true },
);
});
}
private notifyOneWaiter(name: string): void {
const bucket = this.waiters.get(name);
if (!bucket || bucket.size === 0) return;
const first = bucket.values().next().value;
if (!first) return;
const msg = this.consumeOne(name);
if (!msg) return;
bucket.delete(first);
if (bucket.size === 0) this.waiters.delete(name);
first.resolve({ kind: "message", message: msg });
}
private notifyRenamed(oldName: string, newName: string): void {
const bucket = this.waiters.get(oldName);
if (!bucket) return;
for (const w of bucket) w.resolve({ kind: "renamed", to: newName });
this.waiters.delete(oldName);
}
rejectAllWaiters(): void {
for (const bucket of this.waiters.values()) {
for (const w of bucket) w.resolve({ kind: "aborted" });
}
this.waiters.clear();
}
rename(from: string, to: string): { from: string; to: string; messagesTransferred: number } {
const oldName = from.trim();
const newName = to.trim();
@@ -217,7 +310,7 @@ export class MailboxStore {
return { from: oldName, to: newName, messagesTransferred: 0 };
}
return runInTransaction(this.db, () => {
const result = runInTransaction(this.db, () => {
const source = this.stmts.findMailbox.get(oldName) as unknown as MailboxRow | undefined;
if (!source) throw new RenameError(`Mailbox '${oldName}' does not exist.`, "source-missing");
const target = this.stmts.findMailbox.get(newName) as unknown as MailboxRow | undefined;
@@ -234,6 +327,8 @@ export class MailboxStore {
this.db.prepare("DELETE FROM mailboxes WHERE name = ?").run(oldName);
return { from: oldName, to: newName, messagesTransferred: Number(movedTo.changes ?? 0) };
});
this.notifyRenamed(oldName, newName);
return result;
}
listMailboxes(forName?: string, options?: { hideAfterMinutes?: number }): MailboxInfo[] {

110
node/tests/db-watch.test.ts Normal file
View File

@@ -0,0 +1,110 @@
import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { MailboxStore } from "../src/db.js";
let dir: string;
let store: MailboxStore;
beforeEach(() => {
dir = mkdtempSync(join(tmpdir(), "claude-mailbox-wait-"));
store = new MailboxStore(join(dir, "test.db"));
});
afterEach(() => {
store.close();
rmSync(dir, { recursive: true, force: true });
});
describe("MailboxStore.waitForMessage", () => {
it("returns an already-pending message immediately", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
store.send("alice", "bob", "hello");
const ac = new AbortController();
const result = await store.waitForMessage("bob", 1000, ac.signal);
expect(result.kind).toBe("message");
if (result.kind === "message") {
expect(result.message.body).toBe("hello");
expect(result.message.from_mailbox).toBe("alice");
expect(result.message.delivered_at).not.toBeNull();
}
});
it("blocks until a message arrives, then resolves", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
const ac = new AbortController();
const pending = store.waitForMessage("bob", 5000, ac.signal);
setTimeout(() => store.send("alice", "bob", "later"), 50);
const result = await pending;
expect(result.kind).toBe("message");
if (result.kind === "message") expect(result.message.body).toBe("later");
});
it("resolves with timeout when nothing arrives", async () => {
store.upsertMailbox("bob");
const ac = new AbortController();
const result = await store.waitForMessage("bob", 80, ac.signal);
expect(result.kind).toBe("timeout");
});
it("resolves with aborted when the signal fires", async () => {
store.upsertMailbox("bob");
const ac = new AbortController();
const pending = store.waitForMessage("bob", 5000, ac.signal);
setTimeout(() => ac.abort(), 30);
const result = await pending;
expect(result.kind).toBe("aborted");
});
it("resolves with renamed when the mailbox is renamed mid-wait", async () => {
store.upsertMailbox("oldname");
const ac = new AbortController();
const pending = store.waitForMessage("oldname", 5000, ac.signal);
setTimeout(() => store.rename("oldname", "newname"), 30);
const result = await pending;
expect(result.kind).toBe("renamed");
if (result.kind === "renamed") expect(result.to).toBe("newname");
});
it("FIFO single-delivery: two waiters, one send, only the first gets the message", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
const ac1 = new AbortController();
const ac2 = new AbortController();
const w1 = store.waitForMessage("bob", 5000, ac1.signal);
// Stagger so w1 registers first.
await new Promise((r) => setTimeout(r, 10));
const w2 = store.waitForMessage("bob", 200, ac2.signal);
store.send("alice", "bob", "for-w1");
const [r1, r2] = await Promise.all([w1, w2]);
expect(r1.kind).toBe("message");
if (r1.kind === "message") expect(r1.message.body).toBe("for-w1");
expect(r2.kind).toBe("timeout");
});
it("two pending messages are drained by two reconnecting waiters", async () => {
store.upsertMailbox("alice");
store.upsertMailbox("bob");
store.send("alice", "bob", "m1");
store.send("alice", "bob", "m2");
const ac = new AbortController();
const r1 = await store.waitForMessage("bob", 1000, ac.signal);
const r2 = await store.waitForMessage("bob", 1000, ac.signal);
expect(r1.kind).toBe("message");
expect(r2.kind).toBe("message");
if (r1.kind === "message" && r2.kind === "message") {
expect([r1.message.body, r2.message.body]).toEqual(["m1", "m2"]);
}
});
});