diff --git a/src/server/signals/poller.test.ts b/src/server/signals/poller.test.ts new file mode 100644 index 0000000..881ab54 --- /dev/null +++ b/src/server/signals/poller.test.ts @@ -0,0 +1,24 @@ +import { describe, expect, test } from 'bun:test'; +import { dedupeTruthEvents, passesNotional } from './poller'; + +describe('passesNotional', () => { + test('amount × Preis gegen MIN_NOTIONAL_USD (50k)', () => { + expect(passesNotional(10, 6000)).toBe(true); // 60k + expect(passesNotional(10, 4000)).toBe(false); // 40k + expect(passesNotional(10, null)).toBe(false); // kein Preis → kein Event + }); +}); + +describe('dedupeTruthEvents', () => { + const H = 3600_000; + test('max. ein Event je Coin pro 72h, über DB-Bestand + Batch hinweg', () => { + const existing = new Map([['BTC', 1000 * H]]); + const batch = [ + { symbol: 'BTC', ts: 1000 * H + 71 * H, url: 'u1' }, // < 72h nach Bestand → raus + { symbol: 'BTC', ts: 1000 * H + 73 * H, url: 'u2' }, // ≥ 72h → bleibt + { symbol: 'BTC', ts: 1000 * H + 74 * H, url: 'u3' }, // < 72h nach u2 → raus + { symbol: 'ETH', ts: 1000 * H, url: 'u4' }, // anderer Coin → bleibt + ]; + expect(dedupeTruthEvents(batch, existing).map((e) => e.url)).toEqual(['u2', 'u4']); + }); +}); diff --git a/src/server/signals/poller.ts b/src/server/signals/poller.ts new file mode 100644 index 0000000..ae56090 --- /dev/null +++ b/src/server/signals/poller.ts @@ -0,0 +1,114 @@ +import { and, desc, eq } from 'drizzle-orm'; +import { db } from '../db/client'; +import { trumpEvents, trumpSignalState } from '../db/schema'; +import { getCandles } from '../market/candle-store'; +import { fetchTransfers, getBlockNumber, getBlockTs } from './onchain'; +import { matchCoins, parseTruthFeed } from './truth'; +import { COIN_KEYWORDS, MIN_NOTIONAL_USD, TRUTH_DEDUPE_MS, TRUTH_FEED_URL } from './watchlist'; + +const M15 = 15 * 60 * 1000; +/** Obergrenze Blöcke je Zyklus (~4 getLogs-Calls); Ethereum macht ~25 Blöcke/5min — reichlich Aufholpuffer. */ +const MAX_BLOCKS_PER_CYCLE = 20_000; + +export function passesNotional(amount: number, price: number | null): boolean { + return price !== null && amount * price >= MIN_NOTIONAL_USD; +} + +/** existing: Coin → eventTs des jüngsten Truth-Events in der DB. Batch muss ts-aufsteigend sein. */ +export function dedupeTruthEvents( + batch: { symbol: string; ts: number; url: string }[], + existing: Map, +): { symbol: string; ts: number; url: string }[] { + // Erst ts-aufsteigend sortieren, um lastTs korrekt zu befüllen; dann Akzeptanz-Set bauen + const sorted = [...batch].sort((a, b) => a.ts - b.ts); + const lastTs = new Map(existing); + const accepted = new Set(); + for (const e of sorted) { + const prev = lastTs.get(e.symbol); + if (prev !== undefined && e.ts - prev < TRUTH_DEDUPE_MS) continue; + lastTs.set(e.symbol, e.ts); + accepted.add(e.url); + } + // Ursprüngliche Batch-Reihenfolge beibehalten + return batch.filter((e) => accepted.has(e.url)); +} + +/** Letzter 15m-Close ≤ ts als USD-Proxy (USDT≈USD). null wenn keine Candle vorhanden. */ +async function priceAt(instrument: string, ts: number): Promise { + const candles = await getCandles(instrument as any, ts - 24 * 3600_000, ts + M15); + return candles.length > 0 ? candles[candles.length - 1].close : null; +} + +export async function pollOnchain(): Promise { + const head = await getBlockNumber(); + const [state] = await db.select().from(trumpSignalState).where(eq(trumpSignalState.id, 1)); + if (!state) { + // Erster Lauf: ab jetzt scannen (Historie macht trump-backfill) + await db.insert(trumpSignalState).values({ id: 1, lastBlock: head }); + return 0; + } + const from = state.lastBlock + 1; + const to = Math.min(head, state.lastBlock + MAX_BLOCKS_PER_CYCLE); + if (from > to) return 0; + + const transfers = await fetchTransfers(from, to); + let inserted = 0; + const blockTs = new Map(); + for (const t of transfers) { + if (!blockTs.has(t.blockNumber)) blockTs.set(t.blockNumber, await getBlockTs(t.blockNumber)); + const ts = blockTs.get(t.blockNumber)!; + const price = t.instrument ? await priceAt(t.instrument, ts) : null; + if (!passesNotional(t.amount, price)) continue; + await db + .insert(trumpEvents) + .values({ + source: 'onchain', token: t.symbol, instrument: t.instrument, + eventTs: new Date(ts), ref: t.txHash, notionalUsd: t.amount * price!, + }) + .onConflictDoNothing(); + inserted++; + } + await db.update(trumpSignalState).set({ lastBlock: to, updatedAt: new Date() }).where(eq(trumpSignalState.id, 1)); + return inserted; +} + +export async function pollTruth(): Promise { + const res = await fetch(TRUTH_FEED_URL, { signal: AbortSignal.timeout(15_000) }); + if (!res.ok) throw new Error(`trumpstruth HTTP ${res.status}`); + const posts = parseTruthFeed(await res.text()); + + const candidates: { symbol: string; ts: number; url: string }[] = []; + for (const p of posts) for (const symbol of matchCoins(p.text)) candidates.push({ symbol, ts: p.ts, url: p.url }); + if (candidates.length === 0) return 0; + + const symbols = [...new Set(candidates.map((c) => c.symbol))]; + const existing = new Map(); + for (const s of symbols) { + const [row] = await db + .select({ eventTs: trumpEvents.eventTs }) + .from(trumpEvents) + .where(and(eq(trumpEvents.source, 'truth'), eq(trumpEvents.token, s))) + .orderBy(desc(trumpEvents.eventTs)) + .limit(1); + if (row) existing.set(s, row.eventTs.getTime()); + } + + let inserted = 0; + for (const e of dedupeTruthEvents(candidates, existing)) { + const kw = COIN_KEYWORDS.find((c) => c.symbol === e.symbol)!; + await db + .insert(trumpEvents) + .values({ source: 'truth', token: e.symbol, instrument: kw.instrument, eventTs: new Date(e.ts), ref: e.url }) + .onConflictDoNothing(); + inserted++; + } + return inserted; +} + +/** Beide Quellen, Fehler isoliert (eine tote Quelle stoppt die andere nicht). */ +export async function pollSignals(): Promise { + const results = await Promise.allSettled([pollOnchain(), pollTruth()]); + for (const r of results) { + if (r.status === 'rejected') console.error('Signal-Poller-Fehler:', r.reason); + } +}