feat: Signal-Poller (on-chain Cursor-Scan + Truth-RSS, Notional-Filter, 72h-Dedupe)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
24
src/server/signals/poller.test.ts
Normal file
24
src/server/signals/poller.test.ts
Normal file
@@ -0,0 +1,24 @@
|
||||
import { describe, expect, test } from 'bun:test';
|
||||
import { dedupeTruthEvents, passesNotional } from './poller';
|
||||
|
||||
describe('passesNotional', () => {
|
||||
test('amount × Preis gegen MIN_NOTIONAL_USD (50k)', () => {
|
||||
expect(passesNotional(10, 6000)).toBe(true); // 60k
|
||||
expect(passesNotional(10, 4000)).toBe(false); // 40k
|
||||
expect(passesNotional(10, null)).toBe(false); // kein Preis → kein Event
|
||||
});
|
||||
});
|
||||
|
||||
describe('dedupeTruthEvents', () => {
|
||||
const H = 3600_000;
|
||||
test('max. ein Event je Coin pro 72h, über DB-Bestand + Batch hinweg', () => {
|
||||
const existing = new Map([['BTC', 1000 * H]]);
|
||||
const batch = [
|
||||
{ symbol: 'BTC', ts: 1000 * H + 71 * H, url: 'u1' }, // < 72h nach Bestand → raus
|
||||
{ symbol: 'BTC', ts: 1000 * H + 73 * H, url: 'u2' }, // ≥ 72h → bleibt
|
||||
{ symbol: 'BTC', ts: 1000 * H + 74 * H, url: 'u3' }, // < 72h nach u2 → raus
|
||||
{ symbol: 'ETH', ts: 1000 * H, url: 'u4' }, // anderer Coin → bleibt
|
||||
];
|
||||
expect(dedupeTruthEvents(batch, existing).map((e) => e.url)).toEqual(['u2', 'u4']);
|
||||
});
|
||||
});
|
||||
114
src/server/signals/poller.ts
Normal file
114
src/server/signals/poller.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { and, desc, eq } from 'drizzle-orm';
|
||||
import { db } from '../db/client';
|
||||
import { trumpEvents, trumpSignalState } from '../db/schema';
|
||||
import { getCandles } from '../market/candle-store';
|
||||
import { fetchTransfers, getBlockNumber, getBlockTs } from './onchain';
|
||||
import { matchCoins, parseTruthFeed } from './truth';
|
||||
import { COIN_KEYWORDS, MIN_NOTIONAL_USD, TRUTH_DEDUPE_MS, TRUTH_FEED_URL } from './watchlist';
|
||||
|
||||
const M15 = 15 * 60 * 1000;
|
||||
/** Obergrenze Blöcke je Zyklus (~4 getLogs-Calls); Ethereum macht ~25 Blöcke/5min — reichlich Aufholpuffer. */
|
||||
const MAX_BLOCKS_PER_CYCLE = 20_000;
|
||||
|
||||
export function passesNotional(amount: number, price: number | null): boolean {
|
||||
return price !== null && amount * price >= MIN_NOTIONAL_USD;
|
||||
}
|
||||
|
||||
/** existing: Coin → eventTs des jüngsten Truth-Events in der DB. Batch muss ts-aufsteigend sein. */
|
||||
export function dedupeTruthEvents(
|
||||
batch: { symbol: string; ts: number; url: string }[],
|
||||
existing: Map<string, number>,
|
||||
): { symbol: string; ts: number; url: string }[] {
|
||||
// Erst ts-aufsteigend sortieren, um lastTs korrekt zu befüllen; dann Akzeptanz-Set bauen
|
||||
const sorted = [...batch].sort((a, b) => a.ts - b.ts);
|
||||
const lastTs = new Map(existing);
|
||||
const accepted = new Set<string>();
|
||||
for (const e of sorted) {
|
||||
const prev = lastTs.get(e.symbol);
|
||||
if (prev !== undefined && e.ts - prev < TRUTH_DEDUPE_MS) continue;
|
||||
lastTs.set(e.symbol, e.ts);
|
||||
accepted.add(e.url);
|
||||
}
|
||||
// Ursprüngliche Batch-Reihenfolge beibehalten
|
||||
return batch.filter((e) => accepted.has(e.url));
|
||||
}
|
||||
|
||||
/** Letzter 15m-Close ≤ ts als USD-Proxy (USDT≈USD). null wenn keine Candle vorhanden. */
|
||||
async function priceAt(instrument: string, ts: number): Promise<number | null> {
|
||||
const candles = await getCandles(instrument as any, ts - 24 * 3600_000, ts + M15);
|
||||
return candles.length > 0 ? candles[candles.length - 1].close : null;
|
||||
}
|
||||
|
||||
export async function pollOnchain(): Promise<number> {
|
||||
const head = await getBlockNumber();
|
||||
const [state] = await db.select().from(trumpSignalState).where(eq(trumpSignalState.id, 1));
|
||||
if (!state) {
|
||||
// Erster Lauf: ab jetzt scannen (Historie macht trump-backfill)
|
||||
await db.insert(trumpSignalState).values({ id: 1, lastBlock: head });
|
||||
return 0;
|
||||
}
|
||||
const from = state.lastBlock + 1;
|
||||
const to = Math.min(head, state.lastBlock + MAX_BLOCKS_PER_CYCLE);
|
||||
if (from > to) return 0;
|
||||
|
||||
const transfers = await fetchTransfers(from, to);
|
||||
let inserted = 0;
|
||||
const blockTs = new Map<number, number>();
|
||||
for (const t of transfers) {
|
||||
if (!blockTs.has(t.blockNumber)) blockTs.set(t.blockNumber, await getBlockTs(t.blockNumber));
|
||||
const ts = blockTs.get(t.blockNumber)!;
|
||||
const price = t.instrument ? await priceAt(t.instrument, ts) : null;
|
||||
if (!passesNotional(t.amount, price)) continue;
|
||||
await db
|
||||
.insert(trumpEvents)
|
||||
.values({
|
||||
source: 'onchain', token: t.symbol, instrument: t.instrument,
|
||||
eventTs: new Date(ts), ref: t.txHash, notionalUsd: t.amount * price!,
|
||||
})
|
||||
.onConflictDoNothing();
|
||||
inserted++;
|
||||
}
|
||||
await db.update(trumpSignalState).set({ lastBlock: to, updatedAt: new Date() }).where(eq(trumpSignalState.id, 1));
|
||||
return inserted;
|
||||
}
|
||||
|
||||
export async function pollTruth(): Promise<number> {
|
||||
const res = await fetch(TRUTH_FEED_URL, { signal: AbortSignal.timeout(15_000) });
|
||||
if (!res.ok) throw new Error(`trumpstruth HTTP ${res.status}`);
|
||||
const posts = parseTruthFeed(await res.text());
|
||||
|
||||
const candidates: { symbol: string; ts: number; url: string }[] = [];
|
||||
for (const p of posts) for (const symbol of matchCoins(p.text)) candidates.push({ symbol, ts: p.ts, url: p.url });
|
||||
if (candidates.length === 0) return 0;
|
||||
|
||||
const symbols = [...new Set(candidates.map((c) => c.symbol))];
|
||||
const existing = new Map<string, number>();
|
||||
for (const s of symbols) {
|
||||
const [row] = await db
|
||||
.select({ eventTs: trumpEvents.eventTs })
|
||||
.from(trumpEvents)
|
||||
.where(and(eq(trumpEvents.source, 'truth'), eq(trumpEvents.token, s)))
|
||||
.orderBy(desc(trumpEvents.eventTs))
|
||||
.limit(1);
|
||||
if (row) existing.set(s, row.eventTs.getTime());
|
||||
}
|
||||
|
||||
let inserted = 0;
|
||||
for (const e of dedupeTruthEvents(candidates, existing)) {
|
||||
const kw = COIN_KEYWORDS.find((c) => c.symbol === e.symbol)!;
|
||||
await db
|
||||
.insert(trumpEvents)
|
||||
.values({ source: 'truth', token: e.symbol, instrument: kw.instrument, eventTs: new Date(e.ts), ref: e.url })
|
||||
.onConflictDoNothing();
|
||||
inserted++;
|
||||
}
|
||||
return inserted;
|
||||
}
|
||||
|
||||
/** Beide Quellen, Fehler isoliert (eine tote Quelle stoppt die andere nicht). */
|
||||
export async function pollSignals(): Promise<void> {
|
||||
const results = await Promise.allSettled([pollOnchain(), pollTruth()]);
|
||||
for (const r of results) {
|
||||
if (r.status === 'rejected') console.error('Signal-Poller-Fehler:', r.reason);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user