import { DatabaseSync, type StatementSync } from "node:sqlite";
import { mkdirSync } from "node:fs";
import { dirname } from "node:path";

/** Row shape of the `mailboxes` table. Timestamps are stored as text. */
export interface MailboxRow {
  name: string;
  created_at: string;
  last_seen_at: string;
}

/** Row shape of the `messages` table. `delivered_at` is null until the message is consumed. */
export interface MessageRow {
  id: number;
  to_mailbox: string;
  from_mailbox: string;
  body: string;
  created_at: string;
  delivered_at: string | null;
}

/** Summary of undelivered messages for one mailbox, as returned by `MailboxStore.peek`. */
export interface InboxStatus {
  pending: number;
  oldestAt: Date | null;
}

/** Public view of a mailbox, as returned by `MailboxStore.listMailboxes`. */
export interface MailboxInfo {
  name: string;
  lastSeenAt: Date;
  pendingForYou: number;
}

/** Schema, applied idempotently (`IF NOT EXISTS`) every time a store is opened. */
const DDL_STATEMENTS: string[] = [
  `CREATE TABLE IF NOT EXISTS mailboxes (
    name TEXT NOT NULL PRIMARY KEY,
    created_at TEXT NOT NULL,
    last_seen_at TEXT NOT NULL
  )`,
  `CREATE TABLE IF NOT EXISTS messages (
    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
    to_mailbox TEXT NOT NULL REFERENCES mailboxes(name) ON DELETE RESTRICT,
    from_mailbox TEXT NOT NULL REFERENCES mailboxes(name) ON DELETE RESTRICT,
    body TEXT NOT NULL,
    created_at TEXT NOT NULL,
    delivered_at TEXT NULL
  )`,
  `CREATE INDEX IF NOT EXISTS ix_messages_to_delivered ON messages (to_mailbox, delivered_at)`,
];

/**
 * Largest delay `setTimeout` honours. Node coerces anything larger (and anything
 * below 1, or NaN) to 1 ms, which would turn a very long wait into an immediate timeout.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/** Current time as an ISO-8601 UTC string; the format used for every timestamp this module writes. */
function nowIso(): string {
  return new Date().toISOString();
}

/** Machine-readable reason attached to a `RenameError`. */
export type RenameFailure = "invalid" | "source-missing" | "target-exists";

/** Outcome of `MailboxStore.waitForMessage`. */
export type WaitResult =
  | { kind: "message"; message: MessageRow }
  | { kind: "timeout" }
  | { kind: "renamed"; to: string }
  | { kind: "aborted" };

/**
 * A parked `waitForMessage` call. Calling `resolve` settles the caller's promise
 * and releases everything the wait holds (timer, abort listener, registry entry).
 * Calls after the first are ignored.
 */
interface Waiter {
  resolve: (result: WaitResult) => void;
}

/** Thrown by `MailboxStore.rename`; `reason` says which precondition failed. */
export class RenameError extends Error {
  constructor(message: string, public readonly reason: RenameFailure) {
    super(message);
    this.name = "RenameError";
  }
}

/**
 * Parses a stored timestamp.
 *
 * Strings containing "T" are handed to `Date` unchanged. Anything else has its
 * first space replaced by "T" and gets a trailing "Z" unless it already ends
 * with one.
 *
 * @returns the parsed date, or null for empty/missing input or an unparseable string.
 */
function parseDate(s: string | null | undefined): Date | null {
  if (!s) return null;
  const normalized = s.includes("T") ? s : s.replace(" ", "T") + (s.endsWith("Z") ? "" : "Z");
  const d = new Date(normalized);
  return isNaN(d.getTime()) ? null : d;
}

/**
 * Runs `fn` between BEGIN and COMMIT. If `fn` or the COMMIT throws, a ROLLBACK
 * is attempted and the original error is rethrown.
 */
function runInTransaction<T>(db: DatabaseSync, fn: () => T): T {
  db.exec("BEGIN");
  try {
    const result = fn();
    db.exec("COMMIT");
    return result;
  } catch (err) {
    try {
      db.exec("ROLLBACK");
    } catch {
      // Deliberately ignored: the original error is the one worth reporting.
    }
    throw err;
  }
}

/**
 * SQLite-backed message store with in-process long-poll support.
 *
 * Messages are rows in `messages`; a message counts as pending while its
 * `delivered_at` is NULL. Waiters registered through `waitForMessage` are kept
 * in memory only and are woken by `send`, `rename` and `rejectAllWaiters`.
 */
export class MailboxStore {
  private readonly db: DatabaseSync;
  /** Parked `waitForMessage` calls, keyed by mailbox name. Empty buckets are removed. */
  private readonly waiters = new Map<string, Set<Waiter>>();
  private readonly stmts: {
    findMailbox: StatementSync;
    insertMailbox: StatementSync;
    touchMailbox: StatementSync;
    deleteMailbox: StatementSync;
    listMailboxes: StatementSync;
    listMailboxesFiltered: StatementSync;
    listMailboxesFilteredAnon: StatementSync;
    insertMessage: StatementSync;
    countPending: StatementSync;
    oldestPending: StatementSync;
    selectPending: StatementSync;
    markDelivered: StatementSync;
    pendingByRecipient: StatementSync;
    findStaleCandidates: StatementSync;
    deleteMessagesForNames: StatementSync;
    deleteMailboxesByNames: StatementSync;
    selectOnePending: StatementSync;
    moveMessagesTo: StatementSync;
    moveMessagesFrom: StatementSync;
  };

  /**
   * Opens (creating if necessary) the database at `dbPath`, creates the parent
   * directory, enables WAL journaling and foreign keys, applies the schema and
   * prepares every statement used by the store.
   */
  constructor(public readonly dbPath: string) {
    mkdirSync(dirname(dbPath), { recursive: true });
    this.db = new DatabaseSync(dbPath);
    this.db.exec("PRAGMA journal_mode = WAL");
    this.db.exec("PRAGMA foreign_keys = ON");
    for (const sql of DDL_STATEMENTS) this.db.exec(sql);

    this.stmts = {
      findMailbox: this.db.prepare("SELECT * FROM mailboxes WHERE name = ?"),
      insertMailbox: this.db.prepare(
        "INSERT INTO mailboxes (name, created_at, last_seen_at) VALUES (?, ?, ?)",
      ),
      touchMailbox: this.db.prepare("UPDATE mailboxes SET last_seen_at = ? WHERE name = ?"),
      deleteMailbox: this.db.prepare("DELETE FROM mailboxes WHERE name = ?"),
      listMailboxes: this.db.prepare("SELECT * FROM mailboxes ORDER BY name"),
      listMailboxesFiltered: this.db.prepare(
        `SELECT * FROM mailboxes
         WHERE last_seen_at >= ?
            OR name = ?
            OR name IN (
              SELECT DISTINCT from_mailbox FROM messages
              WHERE to_mailbox = ? AND delivered_at IS NULL
            )
         ORDER BY name`,
      ),
      listMailboxesFilteredAnon: this.db.prepare(
        "SELECT * FROM mailboxes WHERE last_seen_at >= ? ORDER BY name",
      ),
      insertMessage: this.db.prepare(
        "INSERT INTO messages (to_mailbox, from_mailbox, body, created_at, delivered_at) VALUES (?, ?, ?, ?, NULL)",
      ),
      countPending: this.db.prepare(
        "SELECT COUNT(*) AS n FROM messages WHERE to_mailbox = ? AND delivered_at IS NULL",
      ),
      oldestPending: this.db.prepare(
        "SELECT created_at FROM messages WHERE to_mailbox = ? AND delivered_at IS NULL ORDER BY id LIMIT 1",
      ),
      selectPending: this.db.prepare(
        "SELECT * FROM messages WHERE to_mailbox = ? AND delivered_at IS NULL ORDER BY id",
      ),
      // IDs are passed as a JSON array so one prepared statement handles any batch size.
      markDelivered: this.db.prepare(
        "UPDATE messages SET delivered_at = ? WHERE id IN (SELECT value FROM json_each(?))",
      ),
      pendingByRecipient: this.db.prepare(
        "SELECT to_mailbox, COUNT(*) AS n FROM messages WHERE delivered_at IS NULL GROUP BY to_mailbox",
      ),
      findStaleCandidates: this.db.prepare(
        `SELECT name FROM mailboxes
         WHERE last_seen_at < ?
           AND name NOT IN (SELECT to_mailbox FROM messages WHERE delivered_at IS NULL)
           AND name NOT IN (SELECT from_mailbox FROM messages WHERE delivered_at IS NULL)`,
      ),
      deleteMessagesForNames: this.db.prepare(
        `DELETE FROM messages
         WHERE to_mailbox IN (SELECT value FROM json_each(?))
            OR from_mailbox IN (SELECT value FROM json_each(?))`,
      ),
      deleteMailboxesByNames: this.db.prepare(
        "DELETE FROM mailboxes WHERE name IN (SELECT value FROM json_each(?))",
      ),
      selectOnePending: this.db.prepare(
        "SELECT * FROM messages WHERE to_mailbox = ? AND delivered_at IS NULL ORDER BY id LIMIT 1",
      ),
      moveMessagesTo: this.db.prepare("UPDATE messages SET to_mailbox = ? WHERE to_mailbox = ?"),
      moveMessagesFrom: this.db.prepare(
        "UPDATE messages SET from_mailbox = ? WHERE from_mailbox = ?",
      ),
    };
  }

  /** Closes the underlying database connection. Parked waiters are not touched. */
  close(): void {
    this.db.close();
  }

  /**
   * Atomically takes the pending message with the lowest id for `name` and marks it delivered.
   *
   * @returns the message with `delivered_at` filled in, or null when nothing is pending.
   */
  private consumeOne(name: string): MessageRow | null {
    return runInTransaction(this.db, () => {
      const row = this.stmts.selectOnePending.get(name) as MessageRow | undefined;
      if (!row) return null;
      const deliveredAt = nowIso();
      this.stmts.markDelivered.run(deliveredAt, JSON.stringify([row.id]));
      return { ...row, delivered_at: deliveredAt };
    });
  }

  /** Creates the mailbox if it does not exist; otherwise refreshes its `last_seen_at`. */
  upsertMailbox(name: string): void {
    const now = nowIso();
    const existing = this.stmts.findMailbox.get(name) as unknown as MailboxRow | undefined;
    if (existing) {
      this.stmts.touchMailbox.run(now, name);
    } else {
      this.stmts.insertMailbox.run(name, now, now);
    }
  }

  /**
   * Queues `body` from `from` to `to`, upserting both mailboxes, then hands the
   * oldest pending message for `to` to one parked waiter if there is one.
   *
   * @returns the new message id and the time it was queued.
   */
  send(from: string, to: string, body: string): { id: number; queuedAt: Date } {
    const result = runInTransaction(this.db, () => {
      this.upsertMailbox(from);
      this.upsertMailbox(to);
      const createdAt = nowIso();
      const insert = this.stmts.insertMessage.run(to, from, body, createdAt);
      return { id: Number(insert.lastInsertRowid), queuedAt: new Date(createdAt) };
    });
    // Notify after COMMIT: consumeOne opens its own transaction.
    this.notifyOneWaiter(to);
    return result;
  }

  /** Reports how many messages are pending for `name` and when the oldest was created, without consuming any. */
  peek(name: string): InboxStatus {
    const row = this.stmts.countPending.get(name) as { n: number };
    if (row.n === 0) return { pending: 0, oldestAt: null };
    const oldest = this.stmts.oldestPending.get(name) as { created_at: string } | undefined;
    return { pending: row.n, oldestAt: parseDate(oldest?.created_at) };
  }

  /**
   * Returns every pending message for `name` in id order and marks them all delivered.
   * The returned rows are the pre-update snapshot, so their `delivered_at` is still null.
   */
  checkInbox(name: string): MessageRow[] {
    return runInTransaction(this.db, () => {
      const pending = this.stmts.selectPending.all(name) as unknown as MessageRow[];
      if (pending.length > 0) {
        const ids = pending.map((m) => m.id);
        this.stmts.markDelivered.run(nowIso(), JSON.stringify(ids));
      }
      return pending;
    });
  }

  /**
   * Resolves with the next message for `name`, consuming it.
   *
   * A message that is already pending is returned immediately (even if `signal`
   * is already aborted). Otherwise the call parks until one of: a `send` to
   * `name`, the timeout elapsing, `signal` aborting, the mailbox being renamed,
   * or `rejectAllWaiters`. The promise never rejects.
   *
   * @param timeoutMs maximum wait; clamped to the range `setTimeout` supports
   *   (NaN is treated as 0).
   */
  waitForMessage(name: string, timeoutMs: number, signal: AbortSignal): Promise<WaitResult> {
    const existing = this.consumeOne(name);
    if (existing) return Promise.resolve({ kind: "message" as const, message: existing });
    if (signal.aborted) return Promise.resolve({ kind: "aborted" as const });

    const delay = Number.isNaN(timeoutMs)
      ? 0
      : Math.min(Math.max(timeoutMs, 0), MAX_TIMER_DELAY_MS);

    return new Promise<WaitResult>((resolve) => {
      let settled = false;

      // Single exit path: whoever settles the wait (message, timeout, abort,
      // rename, shutdown) also releases the timer, the abort listener and the
      // registry entry, so nothing outlives the promise.
      const settle = (result: WaitResult): void => {
        if (settled) return;
        settled = true;
        clearTimeout(timer);
        signal.removeEventListener("abort", onAbort);
        const registered = this.waiters.get(name);
        if (registered) {
          registered.delete(waiter);
          if (registered.size === 0) this.waiters.delete(name);
        }
        resolve(result);
      };

      const waiter: Waiter = { resolve: settle };
      const onAbort = (): void => settle({ kind: "aborted" });
      const timer = setTimeout(() => settle({ kind: "timeout" }), delay);
      signal.addEventListener("abort", onAbort, { once: true });

      let bucket = this.waiters.get(name);
      if (!bucket) {
        bucket = new Set<Waiter>();
        this.waiters.set(name, bucket);
      }
      bucket.add(waiter);
    });
  }

  /**
   * Hands the oldest pending message for `name` to the first parked waiter, if both exist.
   *
   * Invariant: everything from `consumeOne` to `resolve` must stay synchronous.
   * Introducing an `await` between them risks marking a message delivered with
   * no listener left to receive it.
   */
  private notifyOneWaiter(name: string): void {
    const bucket = this.waiters.get(name);
    if (!bucket || bucket.size === 0) return;
    const first = bucket.values().next().value;
    if (!first) return;
    const msg = this.consumeOne(name);
    if (!msg) return;
    // resolve() also removes the waiter from its bucket.
    first.resolve({ kind: "message", message: msg });
  }

  /** Tells every waiter parked on `oldName` that the mailbox is now `newName`. */
  private notifyRenamed(oldName: string, newName: string): void {
    const bucket = this.waiters.get(oldName);
    if (!bucket) return;
    // Snapshot: resolve() mutates the bucket while we iterate.
    for (const w of [...bucket]) w.resolve({ kind: "renamed", to: newName });
    this.waiters.delete(oldName);
  }

  /** Number of `waitForMessage` calls currently parked on `name`. */
  waiterCount(name: string): number {
    return this.waiters.get(name)?.size ?? 0;
  }

  /** Resolves every parked waiter with `{ kind: "aborted" }` and empties the registry. */
  rejectAllWaiters(): void {
    // Snapshot: resolve() mutates both the buckets and the map while we iterate.
    const parked = [...this.waiters.values()].flatMap((bucket) => [...bucket]);
    for (const w of parked) w.resolve({ kind: "aborted" });
    this.waiters.clear();
  }

  /**
   * Renames a mailbox, repointing all messages to and from it, then notifies
   * waiters parked on the old name. Both names are trimmed first. Renaming a
   * mailbox to itself just upserts it.
   *
   * @returns the trimmed names and the number of messages whose recipient was changed.
   * @throws RenameError with reason "invalid" when either name is blank,
   *   "source-missing" when `from` does not exist, or "target-exists" when `to` already does.
   */
  rename(from: string, to: string): { from: string; to: string; messagesTransferred: number } {
    const oldName = from.trim();
    const newName = to.trim();
    if (!oldName) throw new RenameError("from is required", "invalid");
    if (!newName) throw new RenameError("to is required", "invalid");
    if (oldName === newName) {
      this.upsertMailbox(oldName);
      return { from: oldName, to: newName, messagesTransferred: 0 };
    }

    const result = runInTransaction(this.db, () => {
      const source = this.stmts.findMailbox.get(oldName) as unknown as MailboxRow | undefined;
      if (!source) throw new RenameError(`Mailbox '${oldName}' does not exist.`, "source-missing");
      const target = this.stmts.findMailbox.get(newName) as unknown as MailboxRow | undefined;
      if (target) throw new RenameError(`Mailbox '${newName}' already exists.`, "target-exists");

      // Order matters under foreign_keys = ON: the new row must exist before
      // messages point at it, and the old row can only go once nothing does.
      this.stmts.insertMailbox.run(newName, source.created_at, nowIso());
      const movedTo = this.stmts.moveMessagesTo.run(newName, oldName);
      this.stmts.moveMessagesFrom.run(newName, oldName);
      this.stmts.deleteMailbox.run(oldName);
      return { from: oldName, to: newName, messagesTransferred: Number(movedTo.changes ?? 0) };
    });

    this.notifyRenamed(oldName, newName);
    return result;
  }

  /**
   * Lists mailboxes ordered by name.
   *
   * With a positive `hideAfterMinutes`, only mailboxes seen within that window
   * are returned; when `forName` is also given, `forName` itself and any mailbox
   * with an undelivered message addressed to `forName` are kept regardless.
   *
   * @param forName the caller's own mailbox; enables `pendingForYou`.
   */
  listMailboxes(forName?: string, options?: { hideAfterMinutes?: number }): MailboxInfo[] {
    const hideAfterMinutes = options?.hideAfterMinutes;
    let rows: MailboxRow[];
    if (hideAfterMinutes != null && hideAfterMinutes > 0) {
      const cutoff = new Date(Date.now() - hideAfterMinutes * 60_000).toISOString();
      if (forName) {
        rows = this.stmts.listMailboxesFiltered.all(
          cutoff,
          forName,
          forName,
        ) as unknown as MailboxRow[];
      } else {
        rows = this.stmts.listMailboxesFilteredAnon.all(cutoff) as unknown as MailboxRow[];
      }
    } else {
      rows = this.stmts.listMailboxes.all() as unknown as MailboxRow[];
    }

    const pendingMap = new Map<string, number>();
    if (forName) {
      const counts = this.stmts.pendingByRecipient.all() as { to_mailbox: string; n: number }[];
      for (const c of counts) pendingMap.set(c.to_mailbox, c.n);
    }
    // NOTE(review): this is the total pending for `forName`, so every returned
    // row carries the same value. Confirm a per-sender count was not intended.
    const pendingForYou = forName ? (pendingMap.get(forName) ?? 0) : 0;

    return rows.map((r) => ({
      name: r.name,
      lastSeenAt: parseDate(r.last_seen_at) ?? new Date(0),
      pendingForYou,
    }));
  }

  /**
   * Deletes mailboxes not seen for `deleteAfterMinutes` that have no undelivered
   * messages in either direction, together with every message to or from them.
   * A non-positive `deleteAfterMinutes` disables pruning.
   *
   * @returns how many mailboxes and messages were deleted.
   */
  pruneStale(deleteAfterMinutes: number): { deletedMailboxes: number; deletedMessages: number } {
    if (deleteAfterMinutes <= 0) return { deletedMailboxes: 0, deletedMessages: 0 };
    const cutoff = new Date(Date.now() - deleteAfterMinutes * 60_000).toISOString();
    return runInTransaction(this.db, () => {
      const candidates = this.stmts.findStaleCandidates.all(cutoff) as { name: string }[];
      if (candidates.length === 0) return { deletedMailboxes: 0, deletedMessages: 0 };
      const namesJson = JSON.stringify(candidates.map((c) => c.name));
      // Messages first: the foreign keys are ON DELETE RESTRICT.
      const msgResult = this.stmts.deleteMessagesForNames.run(namesJson, namesJson);
      const mbxResult = this.stmts.deleteMailboxesByNames.run(namesJson);
      return {
        deletedMailboxes: Number(mbxResult.changes ?? 0),
        deletedMessages: Number(msgResult.changes ?? 0),
      };
    });
  }
}

/**
 * Converts a stored message row into its public shape.
 * An unparseable `created_at` becomes the Unix epoch.
 */
export function rowToMessage(r: MessageRow): {
  id: number;
  from: string;
  body: string;
  sentAt: Date;
} {
  return {
    id: r.id,
    from: r.from_mailbox,
    body: r.body,
    sentAt: parseDate(r.created_at) ?? new Date(0),
  };
}