Files
ClaudeMailbox/node/src/server.ts
Mika Kuns bc53daf6e6 fix(server): set explicit connectionTimeout to bound long-poll sockets
Fastify's default connectionTimeout is 0 (no timeout). With /v1/watch
holding requests open for up to 300s, a socket-level inactivity cap prevents
a stuck socket from persisting forever even if app-level cleanup misses a case.
The timeout is set just above the watch maximum so that a healthy long-poll
never races the socket timeout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 16:24:26 +02:00

221 lines
6.8 KiB
TypeScript

import Fastify, { type FastifyInstance, type FastifyReply, type FastifyRequest } from "fastify";
import { readFileSync } from "node:fs";
import { join, dirname } from "node:path";
import { fileURLToPath } from "node:url";
import { MailboxStore, RenameError, rowToMessage } from "./db.js";
import type { DaemonConfig } from "./config.js";
import { registerMcp } from "./mcp.js";
/**
 * Name of the request header that carries the caller's mailbox name.
 * It is used as the lookup key into `req.headers`, where Node exposes
 * incoming header names lower-cased.
 */
export const HEADER_NAME = "x-mailbox";
// Module augmentation: adds the per-request mailbox identity to Fastify's
// request type so route handlers can read it in a typed way.
declare module "fastify" {
interface FastifyRequest {
/**
 * Trimmed value of the mailbox header, assigned by the onRequest hook in
 * buildServer. Left undefined when the hook returns early: /health and /mcp
 * paths, or a request that carried no (or a blank) header.
 */
mailboxName?: string;
}
}
/**
 * Read the `version` field of the package.json located one directory above
 * this module's directory.
 *
 * Best-effort: any failure (file missing or unreadable, invalid JSON) and any
 * missing or non-string `version` value yields the literal "unknown" instead
 * of throwing, so callers always receive a real string.
 *
 * @returns The package version, or "unknown" when it cannot be determined.
 */
function readVersion(): string {
  try {
    const here = dirname(fileURLToPath(import.meta.url));
    // JSON.parse output is untrusted: keep it `unknown` and narrow explicitly
    // rather than asserting a shape the file may not actually have.
    const manifest: unknown = JSON.parse(readFileSync(join(here, "..", "package.json"), "utf8"));
    const version = (manifest as { version?: unknown } | null)?.version;
    return typeof version === "string" ? version : "unknown";
  } catch {
    return "unknown";
  }
}
// Exact request paths (query string excluded) that may be called without the
// mailbox header: the onRequest hook in buildServer lets header-less requests
// to these paths through instead of rejecting them with 400.
const ANONYMOUS_PATHS = new Set(["/v1/list", "/v1/peek"]);
/**
 * Normalize an untrusted request value to a trimmed string.
 *
 * The route generics below are compile-time only; no runtime schema is
 * attached. A query value can therefore arrive as a non-string (e.g. an array
 * when a key is repeated) and a JSON body field can be any JSON type. Anything
 * that is not a string is treated as absent ("") instead of throwing on
 * `.trim()`.
 */
function trimmedString(value: unknown): string {
  return typeof value === "string" ? value.trim() : "";
}

/**
 * Build (but do not start) the Fastify application exposing the mailbox API.
 *
 * Identification: an onRequest hook reads the `HEADER_NAME` header, stores its
 * trimmed value on `req.mailboxName` and passes it to `store.upsertMailbox`.
 * `/health` and `/mcp*` skip the hook entirely; paths in `ANONYMOUS_PATHS` may
 * omit the header; every other path answers 400 without it.
 *
 * Routes registered here:
 * - `GET  /health`          – status, package version and configured db path.
 * - `POST /v1/send`         – queue `body` for mailbox `to`, sent from the caller.
 * - `GET  /v1/peek`         – pending count / oldest timestamp for `name`.
 * - `POST /v1/check-inbox`  – messages from `store.checkInbox(name)`; `name` must equal the caller.
 * - `GET  /v1/list`         – mailboxes as returned by `store.listMailboxes`.
 * - `POST /v1/rename`       – rename the caller's mailbox to `to`.
 * - `GET  /v1/watch`        – long-poll for the next message (1..300 s, default 25 s).
 *
 * `registerMcp` is awaited last; what it registers is defined in ./mcp.js.
 *
 * @param cfg   Daemon configuration (reads `dbPath` and `hideAfterMinutes`).
 * @param store Mailbox store backing every route.
 * @returns The configured, not-yet-listening Fastify instance.
 */
export async function buildServer(cfg: DaemonConfig, store: MailboxStore): Promise<FastifyInstance> {
  const WATCH_DEFAULT_TIMEOUT_S = 25;
  const WATCH_MAX_TIMEOUT_S = 300;
  // The socket timeout is derived from the longest allowed long-poll (plus a
  // 10 s margin) so the two limits cannot drift apart: a healthy /v1/watch
  // always completes before the socket timeout can fire.
  const CONNECTION_TIMEOUT_MS = (WATCH_MAX_TIMEOUT_S + 10) * 1000;

  const app = Fastify({
    logger: true,
    connectionTimeout: CONNECTION_TIMEOUT_MS,
  });
  const version = readVersion();

  app.addHook("onRequest", async (req: FastifyRequest, reply: FastifyReply) => {
    const url = req.url.split("?")[0] ?? "/";
    if (url === "/health" || url === "/mcp" || url.startsWith("/mcp/")) return;
    const headerValue = req.headers[HEADER_NAME];
    // Take the first value if the header is multi-valued; a missing or
    // non-string value normalizes to "".
    const name = trimmedString(Array.isArray(headerValue) ? headerValue[0] : headerValue);
    if (!name) {
      if (ANONYMOUS_PATHS.has(url)) return;
      reply.code(400).send({ error: `Missing ${HEADER_NAME} header.` });
      return reply;
    }
    req.mailboxName = name;
    store.upsertMailbox(name);
  });

  app.get("/health", async () => ({
    status: "ok",
    version,
    dbPath: cfg.dbPath,
  }));

  app.post<{ Body: { to?: string; body?: string } }>("/v1/send", async (req, reply) => {
    const { to, body } = req.body ?? {};
    // Reject non-string JSON values as well as missing/empty ones so the store
    // only ever receives strings.
    if (typeof to !== "string" || typeof body !== "string" || !to || !body) {
      reply.code(400);
      return { error: "to and body are required" };
    }
    // Not an anonymous path, so the onRequest hook has already set mailboxName.
    const from = req.mailboxName!;
    const result = store.send(from, to, body);
    return { id: result.id, queuedAt: result.queuedAt.toISOString() };
  });

  app.get<{ Querystring: { name?: string } }>("/v1/peek", async (req, reply) => {
    const name = trimmedString(req.query.name);
    if (!name) {
      reply.code(400);
      return { error: "name is required" };
    }
    const status = store.peek(name);
    return {
      pending: status.pending,
      oldestAt: status.oldestAt?.toISOString() ?? null,
    };
  });

  app.post<{ Querystring: { name?: string } }>("/v1/check-inbox", async (req, reply) => {
    const name = trimmedString(req.query.name);
    // A caller may only read its own inbox; an empty name can never match
    // because the hook guarantees a non-empty mailboxName on this path.
    if (name !== req.mailboxName) {
      reply.code(403);
      return { error: "X-Mailbox header must match name." };
    }
    return store.checkInbox(name).map((m) => {
      const msg = rowToMessage(m);
      return { ...msg, sentAt: msg.sentAt.toISOString() };
    });
  });

  app.get("/v1/list", async (req) => {
    // Anonymous path: name is undefined when the caller sent no header.
    const name = req.mailboxName;
    return store
      .listMailboxes(name, { hideAfterMinutes: cfg.hideAfterMinutes })
      .map((m) => ({
        name: m.name,
        lastSeenAt: m.lastSeenAt.toISOString(),
        pendingForYou: m.pendingForYou,
      }));
  });

  app.post<{ Body: { to?: string } }>("/v1/rename", async (req, reply) => {
    const from = req.mailboxName!;
    const to = trimmedString(req.body?.to);
    if (!to) {
      reply.code(400);
      return { error: "to is required" };
    }
    try {
      const r = store.rename(from, to);
      return { from: r.from, to: r.to, messagesTransferred: r.messagesTransferred };
    } catch (err) {
      // Expected rename failures map to 409 (name taken) or 400; anything else
      // is rethrown for Fastify's error handling.
      if (err instanceof RenameError) {
        reply.code(err.reason === "target-exists" ? 409 : 400);
        return { error: err.message, reason: err.reason };
      }
      throw err;
    }
  });

  app.get<{ Querystring: { name?: string; timeout?: string } }>(
    "/v1/watch",
    async (req, reply) => {
      const name = trimmedString(req.query.name);
      if (!name) {
        reply.code(400);
        return { error: "name is required" };
      }
      if (name !== req.mailboxName) {
        reply.code(403);
        return { error: "X-Mailbox header must match name." };
      }
      const rawTimeout = req.query.timeout;
      const timeoutS = rawTimeout != null ? parseInt(rawTimeout, 10) : WATCH_DEFAULT_TIMEOUT_S;
      if (!Number.isFinite(timeoutS) || timeoutS <= 0 || timeoutS > WATCH_MAX_TIMEOUT_S) {
        reply.code(400);
        return { error: `timeout must be 1..${WATCH_MAX_TIMEOUT_S} seconds` };
      }
      // Abort the wait as soon as the underlying request closes so an abandoned
      // long-poll does not keep waiting on the store.
      const ac = new AbortController();
      const onClose = (): void => ac.abort();
      req.raw.once("close", onClose);
      try {
        const result = await store.waitForMessage(name, timeoutS * 1000, ac.signal);
        if (result.kind === "message") {
          const msg = rowToMessage(result.message);
          reply.code(200);
          return { ...msg, sentAt: msg.sentAt.toISOString() };
        }
        if (result.kind === "renamed") {
          reply.code(409);
          return { reason: "renamed", to: result.to };
        }
        if (result.kind === "timeout") {
          reply.code(204);
          return reply.send();
        }
        // Aborted: the client is gone. hijack() takes the response away from
        // Fastify so it does not try to write a reply to the dead socket.
        reply.hijack();
        return;
      } finally {
        req.raw.off("close", onClose);
      }
    },
  );

  await registerMcp(app, store, cfg.hideAfterMinutes);
  return app;
}
/**
 * Run one stale-data sweep immediately, then repeat it on a fixed interval.
 *
 * Each sweep calls `store.pruneStale(cfg.deleteAfterMinutes)` and logs at info
 * level only when something was deleted. A failing sweep is logged at error
 * level and never propagates, so one bad run cannot stop later ones.
 *
 * The sweep is disabled (returns `null`, nothing runs) unless both
 * `cfg.sweepIntervalMinutes` and `cfg.deleteAfterMinutes` are greater than
 * zero. NaN counts as disabled.
 *
 * @param store Store whose `pruneStale` performs the deletion.
 * @param cfg   Supplies `sweepIntervalMinutes` and `deleteAfterMinutes`.
 * @param log   Logger used for sweep results and failures.
 * @returns The unref'd interval timer, or `null` when the sweep is disabled.
 */
function startSweep(
  store: MailboxStore,
  cfg: DaemonConfig,
  log: FastifyInstance["log"],
): NodeJS.Timeout | null {
  // `!(x > 0)` instead of `x <= 0`: NaN fails every comparison, so the old
  // guard let it through and setInterval then ran with a 1 ms delay.
  if (!(cfg.sweepIntervalMinutes > 0) || !(cfg.deleteAfterMinutes > 0)) return null;

  // Node resets any timer delay above 2^31-1 ms to 1 ms, which would turn a
  // very long configured interval into a continuous sweep. Clamp instead.
  const MAX_TIMER_DELAY_MS = 2_147_483_647;
  const intervalMs = Math.min(cfg.sweepIntervalMinutes * 60_000, MAX_TIMER_DELAY_MS);

  const runOnce = (): void => {
    try {
      const r = store.pruneStale(cfg.deleteAfterMinutes);
      if (r.deletedMailboxes > 0 || r.deletedMessages > 0) {
        log.info(
          r,
          `Pruned ${r.deletedMailboxes} stale mailbox(es) and ${r.deletedMessages} delivered message(s)`,
        );
      }
    } catch (err) {
      log.error({ err }, "Stale-mailbox sweep failed");
    }
  };

  runOnce();
  const timer = setInterval(runOnce, intervalMs);
  // Do not let the background sweep keep the process alive on its own.
  timer.unref?.();
  return timer;
}
/**
 * Open the mailbox store, build the HTTP app, start listening, and then start
 * the periodic stale-data sweep.
 *
 * The onClose hook is registered before `listen()` while the sweep timer is
 * only created afterwards, so the hook reads the timer through a captured
 * variable that is filled in once the sweep has started.
 *
 * @param cfg Daemon configuration (db path, bind host, port, sweep settings).
 * @returns The listening app, the store, and the sweep timer (`null` when the
 *          sweep is disabled).
 */
export async function startServer(
  cfg: DaemonConfig,
): Promise<{ app: FastifyInstance; store: MailboxStore; sweepTimer: NodeJS.Timeout | null }> {
  const store = new MailboxStore(cfg.dbPath);
  const app = await buildServer(cfg, store);

  // Assigned after listen(); the hook below sees whatever value it holds when
  // the app is eventually closed.
  let sweepTimer: NodeJS.Timeout | null = null;
  app.addHook("onClose", async () => {
    if (sweepTimer !== null) {
      clearInterval(sweepTimer);
    }
  });

  await app.listen({ host: cfg.bind, port: cfg.port });
  sweepTimer = startSweep(store, cfg, app.log);

  return { app, store, sweepTimer };
}