diff --git a/node/src/db.ts b/node/src/db.ts index 0a368ec..97ebf20 100644 --- a/node/src/db.ts +++ b/node/src/db.ts @@ -274,6 +274,7 @@ export class MailboxStore { }); } + // Invariant: synchronous from consumeOne to resolve. Introducing an `await` between them risks marking a message delivered with no listener to receive it. private notifyOneWaiter(name: string): void { const bucket = this.waiters.get(name); if (!bucket || bucket.size === 0) return; diff --git a/node/tests/db-watch.test.ts b/node/tests/db-watch.test.ts index b81bee6..041710d 100644 --- a/node/tests/db-watch.test.ts +++ b/node/tests/db-watch.test.ts @@ -107,4 +107,49 @@ describe("MailboxStore.waitForMessage", () => { expect([r1.message.body, r2.message.body]).toEqual(["m1", "m2"]); } }); + + it("abort racing send: message is either delivered or remains pending, never lost", async () => { + store.upsertMailbox("alice"); + store.upsertMailbox("bob"); + + const ac = new AbortController(); + const pending = store.waitForMessage("bob", 5000, ac.signal); + + // Fire abort and send in the same tick — either order is valid as long as the message is never lost. + ac.abort(); + store.send("alice", "bob", "racy"); + + const r = await pending; + if (r.kind === "aborted") { + // Message must still be in DB for the next caller. + expect(store.peek("bob").pending).toBe(1); + } else if (r.kind === "message") { + expect(r.message.body).toBe("racy"); + expect(store.peek("bob").pending).toBe(0); + } else { + throw new Error(`unexpected kind: ${r.kind}`); + } + }); + + it("rejectAllWaiters resolves every pending waiter as aborted and empties the bucket map", async () => { + store.upsertMailbox("bob"); + store.upsertMailbox("carol"); + + const ac1 = new AbortController(); + const ac2 = new AbortController(); + const ac3 = new AbortController(); + const w1 = store.waitForMessage("bob", 5000, ac1.signal); + const w2 = store.waitForMessage("bob", 5000, ac2.signal); + const w3 = store.waitForMessage("carol", 5000, ac3.signal); + + // Give all three a chance to register their waiters. + await new Promise((r) => setTimeout(r, 10)); + + store.rejectAllWaiters(); + + const [r1, r2, r3] = await Promise.all([w1, w2, w3]); + expect(r1.kind).toBe("aborted"); + expect(r2.kind).toBe("aborted"); + expect(r3.kind).toBe("aborted"); + }); });