test(db): add abort-racing-send + rejectAllWaiters coverage; document notifyOneWaiter invariant
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -274,6 +274,7 @@ export class MailboxStore {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Invariant: synchronous from consumeOne to resolve. Introducing an `await` between them risks marking a message delivered with no listener to receive it.
|
||||||
private notifyOneWaiter(name: string): void {
|
private notifyOneWaiter(name: string): void {
|
||||||
const bucket = this.waiters.get(name);
|
const bucket = this.waiters.get(name);
|
||||||
if (!bucket || bucket.size === 0) return;
|
if (!bucket || bucket.size === 0) return;
|
||||||
|
|||||||
@@ -107,4 +107,49 @@ describe("MailboxStore.waitForMessage", () => {
|
|||||||
expect([r1.message.body, r2.message.body]).toEqual(["m1", "m2"]);
|
expect([r1.message.body, r2.message.body]).toEqual(["m1", "m2"]);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("abort racing send: message is either delivered or remains pending, never lost", async () => {
|
||||||
|
store.upsertMailbox("alice");
|
||||||
|
store.upsertMailbox("bob");
|
||||||
|
|
||||||
|
const ac = new AbortController();
|
||||||
|
const pending = store.waitForMessage("bob", 5000, ac.signal);
|
||||||
|
|
||||||
|
// Fire abort and send in the same tick — either order is valid as long as the message is never lost.
|
||||||
|
ac.abort();
|
||||||
|
store.send("alice", "bob", "racy");
|
||||||
|
|
||||||
|
const r = await pending;
|
||||||
|
if (r.kind === "aborted") {
|
||||||
|
// Message must still be in DB for the next caller.
|
||||||
|
expect(store.peek("bob").pending).toBe(1);
|
||||||
|
} else if (r.kind === "message") {
|
||||||
|
expect(r.message.body).toBe("racy");
|
||||||
|
expect(store.peek("bob").pending).toBe(0);
|
||||||
|
} else {
|
||||||
|
throw new Error(`unexpected kind: ${r.kind}`);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejectAllWaiters resolves every pending waiter as aborted and empties the bucket map", async () => {
|
||||||
|
store.upsertMailbox("bob");
|
||||||
|
store.upsertMailbox("carol");
|
||||||
|
|
||||||
|
const ac1 = new AbortController();
|
||||||
|
const ac2 = new AbortController();
|
||||||
|
const ac3 = new AbortController();
|
||||||
|
const w1 = store.waitForMessage("bob", 5000, ac1.signal);
|
||||||
|
const w2 = store.waitForMessage("bob", 5000, ac2.signal);
|
||||||
|
const w3 = store.waitForMessage("carol", 5000, ac3.signal);
|
||||||
|
|
||||||
|
// Give all three a chance to register their waiters.
|
||||||
|
await new Promise((r) => setTimeout(r, 10));
|
||||||
|
|
||||||
|
store.rejectAllWaiters();
|
||||||
|
|
||||||
|
const [r1, r2, r3] = await Promise.all([w1, w2, w3]);
|
||||||
|
expect(r1.kind).toBe("aborted");
|
||||||
|
expect(r2.kind).toBe("aborted");
|
||||||
|
expect(r3.kind).toBe("aborted");
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user