Skip to main content
import httpx, json

with httpx.stream("GET", "https://api.rekko.ai/v1/stream",
    params={"events": "price_shift,whale_alert"},
    headers={"Authorization": "Bearer YOUR_API_KEY"},
) as resp:
    for line in resp.iter_lines():
        if line:
            print(json.loads(line))

What this page covers

  • Server-Sent Events (SSE) for real-time market monitoring
  • Webhooks for push notifications to your server
  • Event types: price shifts, whale alerts, analysis completions
  • Complete receiver implementations in Python, cURL, and JavaScript
  • When to use SSE vs webhooks vs polling

Why real-time data?

Prediction market prices move in response to news, large trades, and liquidity shifts. A market that was fairly priced an hour ago may have a 5% edge now. Real-time monitoring lets you:
  • React to price shifts before the market corrects
  • Detect whale trades that signal informed positioning
  • Get notified when a deep analysis completes
  • Build event-driven trading bots instead of polling on an interval

Server-Sent Events (SSE)

SSE is a persistent HTTP connection where the server pushes events to the client. No WebSocket setup needed — it works over standard HTTP.

Connect to the stream

import httpx
import json

with httpx.stream(
    "GET",
    "https://api.rekko.ai/v1/stream",
    params={"events": "price_shift,whale_alert,analysis_complete"},
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    timeout=None,  # Keep connection open
) as response:
    for line in response.iter_lines():
        if not line:
            continue
        event = json.loads(line)
        if event["type"] == "price_shift":
            print(f"Price move: {event['market_id']}{event['old_yes']:.2f}{event['new_yes']:.2f}")
        elif event["type"] == "whale_alert":
            print(f"Whale: {event['market_id']} — ${event['flow']:,.0f} {event['side']}")
        elif event["type"] == "analysis_complete":
            print(f"Analysis ready: {event['analysis_id']}")

Event types

EventDescriptionKey fields
price_shiftMarket price moved significantlymarket_id, platform, old_yes, new_yes, change_pct
whale_alertLarge trade detectedmarket_id, platform, side, flow, price
analysis_completeDeep analysis finishedanalysis_id, market_id, platform

Filter events

Pass only the event types you want via the events parameter:
GET /v1/stream?events=price_shift           # Only price moves
GET /v1/stream?events=whale_alert           # Only large trades
GET /v1/stream?events=price_shift,whale_alert  # Both

Webhooks

Webhooks push events to your server via HTTP POST. Unlike SSE (which requires an open connection), webhooks fire asynchronously — your server receives a POST request for each event.

Register a webhook

import httpx

client = httpx.Client(
    base_url="https://api.rekko.ai/v1",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
)

webhook = client.post("/webhooks", json={
    "url": "https://your-server.com/webhook",
    "events": ["price_shift", "whale_alert"],
    "secret": "your_hmac_secret_here",
}).json()

print(f"Webhook ID: {webhook['webhook_id']}")

Manage webhooks

# List your webhooks
hooks = client.get("/webhooks").json()

# Delete a webhook
client.delete(f"/webhooks/{webhook_id}")

Webhook payload

Each webhook POST sends a JSON body with the event data:
{
  "type": "price_shift",
  "market_id": "KXFED-26MAR19",
  "platform": "kalshi",
  "old_yes": 0.62,
  "new_yes": 0.68,
  "change_pct": 9.7,
  "timestamp": "2026-03-21T14:30:00Z"
}

Verify webhook signatures

Each webhook includes an x-webhook-signature header with an HMAC-SHA256 signature of the body using your secret:
import hmac
import hashlib

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)

Complete webhook receiver (FastAPI)

"""Webhook receiver for Rekko prediction market events."""
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib

app = FastAPI()
WEBHOOK_SECRET = "your_hmac_secret_here"

@app.post("/webhook")
async def handle_webhook(request: Request):
    body = await request.body()
    signature = request.headers.get("x-webhook-signature", "")

    expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = await request.json()

    if event["type"] == "price_shift":
        print(f"Price move: {event['market_id']}{event['change_pct']:.1f}%")
        # Trigger re-analysis or trade logic here

    elif event["type"] == "whale_alert":
        print(f"Whale trade: {event['market_id']} — ${event['flow']:,.0f}")
        # Alert your trading system

    elif event["type"] == "analysis_complete":
        print(f"Analysis ready: {event['analysis_id']}")
        # Fetch the completed analysis

    return {"ok": True}

SSE vs webhooks vs polling

ApproachLatencyInfrastructureUse case
SSE streamingReal-timeOpen connectionDesktop bot, monitoring dashboard
WebhooksNear real-timePublic endpointServer-side trading bot, cloud functions
Polling (GET /markets)Depends on intervalNoneSimple scripts, cron jobs
SSE is best when you need immediate reaction and have a long-running process (trading bot on a server, monitoring script on your laptop). Webhooks are best for serverless architectures or when you want events to trigger specific actions without maintaining an open connection. Polling is simplest but adds latency. Use the GET /v1/markets endpoint on an interval (e.g., every 60 seconds) for low-frequency monitoring.

Complete event-driven trading bot

Combining SSE with the trading API:
"""Event-driven prediction market trading bot."""
import httpx
import json

API_KEY = "YOUR_API_KEY"
BANKROLL = 10_000
MIN_SHIFT_PCT = 5.0  # React to shifts > 5%

client = httpx.Client(
    base_url="https://api.rekko.ai/v1",
    headers={"Authorization": f"Bearer {API_KEY}"},
    timeout=120.0,
)

def handle_price_shift(event: dict):
    """Analyze and potentially trade on a price shift."""
    market_id = event["market_id"]
    platform = event["platform"]
    print(f"\nPrice shift detected: {market_id} ({event['change_pct']:.1f}%)")

    # Get a trading signal
    signal = client.post("/signals", params={"wait": "true"}, json={
        "market": f"{platform}/{market_id}",
        "risk_limit": "medium",
    }).json()

    if signal["recommendation"] == "NO_TRADE":
        print(f"  No trade — edge insufficient")
        return

    size = signal["size_pct"] * BANKROLL
    print(f"  {signal['recommendation']} — edge: {signal['edge']:.1%}, size: ${size:.0f}")

    # Paper trade
    client.post("/trades/shadow", json={
        "ticker": market_id,
        "platform": platform,
        "side": "yes" if "YES" in signal["recommendation"] else "no",
        "size_usd": size,
    })
    print(f"  Shadow trade placed")

# Main event loop
print("Listening for price shifts...")
with httpx.stream(
    "GET",
    f"{client.base_url}/stream",
    params={"events": "price_shift"},
    headers=client.headers,
    timeout=None,
) as response:
    for line in response.iter_lines():
        if not line:
            continue
        event = json.loads(line)
        if event.get("change_pct", 0) >= MIN_SHIFT_PCT:
            handle_price_shift(event)

What’s next

Build a trading bot

Full trading bot tutorial with screening and analysis.

SSE API reference

Full endpoint documentation for the SSE stream.

Webhooks API reference

Full endpoint documentation for webhook management.