From 6e552e6926c5c58e059bf8073815bdc33a882a38 Mon Sep 17 00:00:00 2001 From: cawthorne Date: Mon, 9 Mar 2026 18:15:58 +0000 Subject: [PATCH 1/2] Add Tiingo local websock test --- .../tiingo/test/local-ws-test/README.md | 78 +++++ .../tiingo/test/local-ws-test/proxy.js | 151 ++++++++++ .../test/local-ws-test/test-failover.sh | 270 ++++++++++++++++++ 3 files changed, 499 insertions(+) create mode 100644 packages/sources/tiingo/test/local-ws-test/README.md create mode 100644 packages/sources/tiingo/test/local-ws-test/proxy.js create mode 100755 packages/sources/tiingo/test/local-ws-test/test-failover.sh diff --git a/packages/sources/tiingo/test/local-ws-test/README.md b/packages/sources/tiingo/test/local-ws-test/README.md new file mode 100644 index 00000000000..0ebba458fbe --- /dev/null +++ b/packages/sources/tiingo/test/local-ws-test/README.md @@ -0,0 +1,78 @@ +# Tiingo WS Failover — Manual Integration Tests + +These scripts test end-to-end WebSocket failover behaviour by running two local +proxies between the EA and the real Tiingo upstream, then artificially triggering +abnormal closures via the proxy control API. + +## What is tested + +- All four transports subscribe and receive live data (IEX, crypto, crypto-lwba, forex) +- Abnormal WS closures (code 1006 / TCP terminate) increment the failover counter +- The 2:1 primary/secondary cycle is respected across 6 rounds of closures: + - counter=1 → cycle=1 → primary + - counter=2 → cycle=2 → **SECONDARY** (failover) + - counter=3 → cycle=0 → primary (failback) + - counter=4 → cycle=1 → primary + - counter=5 → cycle=2 → **SECONDARY** (failover again) + - counter=6 → cycle=0 → primary (failback again) +- IEX always stays on primary (its URL is hardcoded; it does not participate in failover) + +## Prerequisites + +1. Build the Tiingo EA dist: + + ```bash + yarn workspace @chainlink/tiingo-adapter build + ``` + +2. Export your Tiingo API key: + + ```bash + export TIINGO_API_KEY= + ``` + +3. `python3` must be on your PATH (used for parsing JSON responses in the test script). + +## Running + +```bash +export TIINGO_API_KEY= +bash test/local-ws-test/test-failover.sh +``` + +Optional environment overrides: +| Variable | Default | Description | +|-----------------------|---------|------------------------------------| +| `EA_PORT` | 8181 | Port for the local EA HTTP server | +| `PRIMARY_PORT` | 9001 | Port for the primary WS proxy | +| `PRIMARY_CTRL` | 9002 | Control HTTP port for primary proxy | +| `SECONDARY_PORT` | 9003 | Port for the secondary WS proxy | +| `SECONDARY_CTRL` | 9004 | Control HTTP port for secondary proxy | +| `PRIMARY_ATTEMPTS` | 2 | Attempts on primary per cycle | +| `SECONDARY_ATTEMPTS` | 1 | Attempts on secondary per cycle | + +## Proxy control API + +While the proxy is running you can query and control it directly: + +```bash +# List open connections +curl http://localhost:9002/status + +# Close all connections abruptly (simulates code 1005 / no status received) +curl -X POST "http://localhost:9002/close?code=1005" + +# Close only the IEX connection +curl -X POST "http://localhost:9002/close?code=1005&path=/iex" + +# Normal close +curl -X POST "http://localhost:9002/close?code=1000" +``` + +## Log files + +After a run, full logs are available at: + +- `/tmp/tiingo-ea.log` — EA output +- `/tmp/proxy-primary.log` — primary proxy +- `/tmp/proxy-secondary.log` — secondary proxy diff --git a/packages/sources/tiingo/test/local-ws-test/proxy.js b/packages/sources/tiingo/test/local-ws-test/proxy.js new file mode 100644 index 00000000000..6b60ed819f4 --- /dev/null +++ b/packages/sources/tiingo/test/local-ws-test/proxy.js @@ -0,0 +1,151 @@ +#!/usr/bin/env node +/** + * WebSocket proxy for Tiingo failover testing. + * + * Forwards WS connections from the EA to a real upstream, and exposes an HTTP + * control server to trigger artificial closes and inspect open connections. + * + * Usage: + * UPSTREAM_WS_URL=wss://api.tiingo.com PROXY_PORT=9001 CONTROL_PORT=9002 node proxy.js + * + * Control endpoints: + * GET http://localhost:$CONTROL_PORT/status – list open connections + * POST http://localhost:$CONTROL_PORT/close?code=1005 – close all connections + * POST http://localhost:$CONTROL_PORT/close?code=1005&path=/iex – close by path + */ + +'use strict' + +const WebSocket = require('ws') +const http = require('http') +const url = require('url') + +const PROXY_PORT = parseInt(process.env.PROXY_PORT || '9001', 10) +const CONTROL_PORT = parseInt(process.env.CONTROL_PORT || String(PROXY_PORT + 1), 10) +const UPSTREAM_WS_URL = process.env.UPSTREAM_WS_URL || 'wss://api.tiingo.com' + +const activePairs = [] // { id, clientWs, upstreamWs, path } +let connectionCounter = 0 + +// ── WebSocket proxy server ──────────────────────────────────────────────────── +const wss = new WebSocket.Server({ port: PROXY_PORT }) +console.log(`[proxy] Listening on ws://localhost:${PROXY_PORT}`) +console.log(`[proxy] Forwarding to ${UPSTREAM_WS_URL}`) + +wss.on('connection', (clientWs, req) => { + const id = ++connectionCounter + const path = req.url || '' + const upstreamUrl = `${UPSTREAM_WS_URL}${path}` + console.log(`[proxy][${id}] EA connected, opening upstream: ${upstreamUrl}`) + + const upstreamWs = new WebSocket(upstreamUrl) + const pair = { id, clientWs, upstreamWs, path } + activePairs.push(pair) + + // Buffer messages from EA that arrive before upstream is ready + const pendingMessages = [] + + upstreamWs.on('open', () => { + console.log( + `[proxy][${id}] Upstream connected — flushing ${pendingMessages.length} buffered message(s)`, + ) + for (const msg of pendingMessages) { + upstreamWs.send(msg.toString('utf8')) // send as text frame, not binary + } + pendingMessages.length = 0 + }) + + upstreamWs.on('message', (data) => { + if (clientWs.readyState === WebSocket.OPEN) clientWs.send(data) + }) + + clientWs.on('message', (data) => { + if (upstreamWs.readyState === WebSocket.OPEN) { + upstreamWs.send(data) + } else { + pendingMessages.push(data) + } + }) + + upstreamWs.on('close', (code, reason) => { + console.log(`[proxy][${id}] Upstream closed: code=${code} reason=${reason?.toString() || ''}`) + if (clientWs.readyState === WebSocket.OPEN) { + // 1005/1006 cannot be sent in a close frame — terminate the TCP connection instead + if (code === 1005 || code === 1006) { + clientWs.terminate() + } else { + clientWs.close(code, reason) + } + } + removePair(id) + }) + + upstreamWs.on('error', (err) => { + console.error(`[proxy][${id}] Upstream error: ${err.message}`) + }) + + clientWs.on('close', (code, reason) => { + console.log(`[proxy][${id}] EA closed: code=${code} reason=${reason?.toString() || ''}`) + if (upstreamWs.readyState === WebSocket.OPEN) upstreamWs.close() + removePair(id) + }) + + clientWs.on('error', (err) => { + console.error(`[proxy][${id}] EA error: ${err.message}`) + }) +}) + +function removePair(id) { + const idx = activePairs.findIndex((p) => p.id === id) + if (idx !== -1) activePairs.splice(idx, 1) +} + +// ── Control HTTP server ─────────────────────────────────────────────────────── +const controlServer = http.createServer((req, res) => { + const parsed = url.parse(req.url, true) + + if (req.method === 'GET' && parsed.pathname === '/status') { + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end( + JSON.stringify({ + openConnections: activePairs.length, + connections: activePairs.map((p) => ({ id: p.id, path: p.path })), + }), + ) + return + } + + if (req.method === 'POST' && parsed.pathname === '/close') { + const code = parseInt(parsed.query.code || '1005', 10) + const reason = parsed.query.reason || '' + const pathFilter = parsed.query.path || null + + const targets = [...activePairs].filter((p) => !pathFilter || p.path === pathFilter) + + console.log( + `[control] Closing ${targets.length} connection(s) with code=${code}` + + (pathFilter ? ` path=${pathFilter}` : ''), + ) + + for (const pair of targets) { + if (pair.clientWs.readyState === WebSocket.OPEN) { + if (code === 1005 || code === 1006) { + pair.clientWs.terminate() + } else { + pair.clientWs.close(code, reason) + } + } + } + + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end(JSON.stringify({ closed: targets.length, code, path: pathFilter })) + return + } + + res.writeHead(404) + res.end('Not found') +}) + +controlServer.listen(CONTROL_PORT, () => { + console.log(`[control] HTTP on http://localhost:${CONTROL_PORT}`) +}) diff --git a/packages/sources/tiingo/test/local-ws-test/test-failover.sh b/packages/sources/tiingo/test/local-ws-test/test-failover.sh new file mode 100755 index 00000000000..48edb43e822 --- /dev/null +++ b/packages/sources/tiingo/test/local-ws-test/test-failover.sh @@ -0,0 +1,270 @@ +#!/bin/bash +# Manual integration test: WS failover across all Tiingo transports. +# +# Starts two WS proxies and the Tiingo EA locally, then triggers abnormal +# WS closures across 6 rounds to verify the 2:1 failover cycle for crypto, +# crypto-lwba, and forex. IEX is expected to always stay on primary. +# +# Cycle math with ratio 2:1 (cycleLength=3): +# counter cycle URL +# 0 0 primary +# 1 1 primary +# 2 2 SECONDARY ← failover +# 3 0 primary ← failback +# 4 1 primary +# 5 2 SECONDARY ← failover again +# 6 0 primary ← failback again +# +# Usage: +# export TIINGO_API_KEY= +# bash test/manual/test-failover.sh +# +# Optional overrides (defaults shown): +# EA_PORT=8181 +# PRIMARY_ATTEMPTS=2 SECONDARY_ATTEMPTS=1 +# PRIMARY_PORT=9001 PRIMARY_CTRL=9002 +# SECONDARY_PORT=9003 SECONDARY_CTRL=9004 + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +EA_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" # test/manual → test → tiingo package root + +EA_PORT="${EA_PORT:-8181}" +PRIMARY_PORT="${PRIMARY_PORT:-9001}" +PRIMARY_CTRL="${PRIMARY_CTRL:-9002}" +SECONDARY_PORT="${SECONDARY_PORT:-9003}" +SECONDARY_CTRL="${SECONDARY_CTRL:-9004}" +PRIMARY_ATTEMPTS="${PRIMARY_ATTEMPTS:-2}" +SECONDARY_ATTEMPTS="${SECONDARY_ATTEMPTS:-1}" +CYCLE_LENGTH=$(( PRIMARY_ATTEMPTS + SECONDARY_ATTEMPTS )) +API_KEY="${TIINGO_API_KEY:?Please export TIINGO_API_KEY}" + +# ── helpers ─────────────────────────────────────────────────────────────────── + +RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; BOLD='\033[1m'; RESET='\033[0m' +pass() { echo -e " ${GREEN}PASS${RESET} $*"; } +fail() { echo -e " ${RED}FAIL${RESET} $*"; FAILURES=$(( FAILURES + 1 )); } +info() { echo -e " ${YELLOW}$*${RESET}"; } + +FAILURES=0 + +status_json() { + curl -s "http://localhost:$1/status" 2>/dev/null +} + +connections() { + local label=$1 ctrl=$2 + local out count paths + out=$(status_json "$ctrl") + count=$(echo "$out" | python3 -c "import sys,json; print(json.load(sys.stdin)['openConnections'])" 2>/dev/null || echo "?") + paths=$(echo "$out" | python3 -c "import sys,json; print(' '.join(c['path'] for c in json.load(sys.stdin)['connections']))" 2>/dev/null || echo "") + printf " %-12s %s conn %s\n" "[$label]" "$count" "$paths" + echo "$count" # return value on last line +} + +conn_count() { + local ctrl=$1 + status_json "$ctrl" | python3 -c "import sys,json; print(json.load(sys.stdin)['openConnections'])" 2>/dev/null || echo "0" +} + +close_proxy() { + local label=$1 ctrl=$2 + local result count + result=$(curl -s -X POST "http://localhost:$ctrl/close?code=1005" 2>/dev/null) + count=$(echo "$result" | python3 -c "import sys,json; print(json.load(sys.stdin)['closed'])" 2>/dev/null || echo "?") + echo " close $label → $count connection(s) terminated" +} + +prices() { + local iex crypto lwba forex + iex=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ + -d '{"data":{"endpoint":"iex","base":"AAPL"}}' \ + | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('result','ERR'))" 2>/dev/null) + crypto=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ + -d '{"data":{"endpoint":"crypto","base":"BTC","quote":"USD"}}' \ + | python3 -c "import sys,json; d=json.load(sys.stdin); r=d.get('result',None); print(round(float(r),2) if r else 'ERR')" 2>/dev/null) + lwba=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ + -d '{"data":{"endpoint":"crypto-lwba","base":"ETH","quote":"USD"}}' \ + | python3 -c "import sys,json; d=json.load(sys.stdin); dd=d.get('data',{}); print('mid='+str(round(dd['mid'],2))+' bid='+str(round(dd['bid'],2))+' ask='+str(round(dd['ask'],2))) if 'mid' in dd else print('ERR:'+str(d.get('error','no data')))" 2>/dev/null) + forex=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ + -d '{"data":{"endpoint":"forex","base":"EUR","quote":"USD"}}' \ + | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('result','ERR'))" 2>/dev/null) + echo " prices: iex(AAPL)=$iex crypto(BTC)=$crypto lwba(ETH)=$lwba forex(EUR)=$forex" +} + +predict() { + local counter=$1 cycle=$(( $1 % CYCLE_LENGTH )) + (( cycle < PRIMARY_ATTEMPTS )) && echo "primary" || echo "SECONDARY" +} + +cleanup() { + lsof -ti:$PRIMARY_PORT,$PRIMARY_CTRL,$SECONDARY_PORT,$SECONDARY_CTRL,$EA_PORT \ + | xargs kill -9 2>/dev/null || true +} +trap 'echo ""; echo "==> Cleaning up..."; cleanup' EXIT + +# ── startup ─────────────────────────────────────────────────────────────────── + +echo "" +echo -e "${BOLD}==> WS Failover Test (ratio ${PRIMARY_ATTEMPTS}:${SECONDARY_ATTEMPTS}, cycleLength=$CYCLE_LENGTH)${RESET}" +echo "" +echo "==> Killing any existing processes on required ports..." +cleanup +sleep 1 + +PROXY_JS="$SCRIPT_DIR/proxy.js" +MONOREPO_ROOT="$(cd "$SCRIPT_DIR/../../../.." && pwd)" + +echo "==> Starting PRIMARY proxy (port $PRIMARY_PORT → wss://api.tiingo.com)..." +UPSTREAM_WS_URL=wss://api.tiingo.com PROXY_PORT=$PRIMARY_PORT CONTROL_PORT=$PRIMARY_CTRL \ + yarn --cwd "$MONOREPO_ROOT" node "$PROXY_JS" > /tmp/proxy-primary.log 2>&1 & + +echo "==> Starting SECONDARY proxy (port $SECONDARY_PORT → wss://api.redundantstack.com)..." +UPSTREAM_WS_URL=wss://api.redundantstack.com PROXY_PORT=$SECONDARY_PORT CONTROL_PORT=$SECONDARY_CTRL \ + yarn --cwd "$MONOREPO_ROOT" node "$PROXY_JS" > /tmp/proxy-secondary.log 2>&1 & +sleep 2 + +echo "==> Starting Tiingo EA (port $EA_PORT)..." +cd "$EA_DIR" +EA_PORT=$EA_PORT \ + API_KEY=$API_KEY \ + WS_API_ENDPOINT=ws://localhost:$PRIMARY_PORT \ + SECONDARY_WS_API_ENDPOINT=ws://localhost:$SECONDARY_PORT \ + WS_URL_PRIMARY_ATTEMPTS=$PRIMARY_ATTEMPTS \ + WS_URL_SECONDARY_ATTEMPTS=$SECONDARY_ATTEMPTS \ + WS_SUBSCRIPTION_UNRESPONSIVE_TTL=180000 \ + LOG_LEVEL=debug \ + yarn server:dist > /tmp/tiingo-ea.log 2>&1 & +EA_PID=$! + +echo -n "==> Waiting for EA" +for i in $(seq 1 30); do + curl -s "http://localhost:$EA_PORT" > /dev/null 2>&1 && echo " ready." && break + echo -n "."; sleep 1 +done + +# ── subscribe ───────────────────────────────────────────────────────────────── + +echo "==> Subscribing to all four transports..." +for req in \ + '{"data":{"endpoint":"iex","base":"AAPL"}}' \ + '{"data":{"endpoint":"crypto","base":"BTC","quote":"USD"}}' \ + '{"data":{"endpoint":"crypto-lwba","base":"ETH","quote":"USD"}}' \ + '{"data":{"endpoint":"forex","base":"EUR","quote":"USD"}}'; do + curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" -d "$req" > /dev/null +done +echo " iex/AAPL crypto/BTC crypto-lwba/ETH forex/EUR" +echo -n "==> Waiting for initial data"; sleep 10; echo " done." + +# ── check/assert helpers ────────────────────────────────────────────────────── + +assert_round() { + local round=$1 expected=$2 + local pri sec + pri=$(conn_count $PRIMARY_CTRL) + sec=$(conn_count $SECONDARY_CTRL) + + if [[ "$expected" == "primary" ]]; then + # 4 on primary, 0 on secondary, IEX on primary + local iex_on_pri + iex_on_pri=$(status_json $PRIMARY_CTRL | python3 -c \ + "import sys,json; cs=json.load(sys.stdin)['connections']; print(any(c['path']=='/iex' for c in cs))" 2>/dev/null) + if [[ "$pri" == "4" && "$sec" == "0" ]]; then + pass "Round $round: all on primary (pri=$pri, sec=$sec)" + else + fail "Round $round: expected all primary, got pri=$pri sec=$sec" + fi + if [[ "$iex_on_pri" == "True" ]]; then + pass "Round $round: IEX on primary" + else + fail "Round $round: IEX not on primary!" + fi + else + # 1 on primary (IEX only), 3 on secondary + local iex_on_pri + iex_on_pri=$(status_json $PRIMARY_CTRL | python3 -c \ + "import sys,json; cs=json.load(sys.stdin)['connections']; print(any(c['path']=='/iex' for c in cs))" 2>/dev/null) + if [[ "$pri" == "1" && "$sec" == "3" ]]; then + pass "Round $round: failover to secondary (pri=$pri[IEX], sec=$sec)" + else + fail "Round $round: expected pri=1 sec=3, got pri=$pri sec=$sec" + fi + if [[ "$iex_on_pri" == "True" ]]; then + pass "Round $round: IEX stayed on primary (not affected by failover)" + else + fail "Round $round: IEX not on primary!" + fi + fi +} + +# ── baseline ────────────────────────────────────────────────────────────────── + +echo "" +echo -e "${BOLD}── BASELINE (counter=0, all→primary expected) ──────────────${RESET}" +connections "primary" $PRIMARY_CTRL > /dev/null +connections "secondary" $SECONDARY_CTRL > /dev/null +pri=$(conn_count $PRIMARY_CTRL); sec=$(conn_count $SECONDARY_CTRL) +printf " primary=%s secondary=%s\n" "$pri" "$sec" +prices +[[ "$pri" == "4" && "$sec" == "0" ]] \ + && pass "Baseline: all 4 connections on primary" \ + || fail "Baseline: expected 4 on primary, got pri=$pri sec=$sec" + +# ── rounds ──────────────────────────────────────────────────────────────────── + +COUNTER=0 + +run_round() { + local round=$1 proxy_label=$2 proxy_ctrl=$3 + COUNTER=$(( COUNTER + 1 )) + local expected; expected=$(predict $COUNTER) + + echo "" + echo -e "${BOLD}── ROUND $round close $proxy_label → counter=$COUNTER, expect: $expected ──${RESET}" + close_proxy "$proxy_label" "$proxy_ctrl" + echo -n " waiting for reconnection"; sleep 6; echo " done." + + pri=$(conn_count $PRIMARY_CTRL); sec=$(conn_count $SECONDARY_CTRL) + printf " primary=%s secondary=%s\n" "$pri" "$sec" + prices + assert_round "$round" "$expected" +} + +# Round 1: close primary → counter=1, cycle=1 → primary +run_round 1 primary $PRIMARY_CTRL +# Round 2: close primary → counter=2, cycle=2 → SECONDARY +run_round 2 primary $PRIMARY_CTRL +# Round 3: close secondary → counter=3, cycle=0 → primary (failback) +run_round 3 secondary $SECONDARY_CTRL +# Round 4: close primary → counter=4, cycle=1 → primary +run_round 4 primary $PRIMARY_CTRL +# Round 5: close primary → counter=5, cycle=2 → SECONDARY +run_round 5 primary $PRIMARY_CTRL +# Round 6: close secondary → counter=6, cycle=0 → primary (failback) +run_round 6 secondary $SECONDARY_CTRL + +# ── final summary ───────────────────────────────────────────────────────────── + +echo "" +echo -e "${BOLD}── FINAL SUMMARY ─────────────────────────────────────────────${RESET}" +echo " Abnormal close events detected:" +grep -c "abnormal" /tmp/tiingo-ea.log 2>/dev/null | xargs printf " %s total abnormal close(s)\n" +echo " Secondary URL selections:" +grep -c "using secondary" /tmp/tiingo-ea.log 2>/dev/null | xargs printf " %s wsSelectUrl→secondary call(s)\n" +echo " Final connections:" +pri=$(conn_count $PRIMARY_CTRL); sec=$(conn_count $SECONDARY_CTRL) +printf " primary=%s secondary=%s\n" "$pri" "$sec" +prices + +echo "" +if [[ $FAILURES -eq 0 ]]; then + echo -e " ${GREEN}${BOLD}All assertions passed.${RESET}" +else + echo -e " ${RED}${BOLD}$FAILURES assertion(s) failed.${RESET}" +fi + +echo "" +echo "==> Done. EA log: /tmp/tiingo-ea.log Proxy logs: /tmp/proxy-{primary,secondary}.log" +echo " Ctrl+C to stop." +wait $EA_PID From 4fcc903419d53bcef43f3be39a212a4a1e53f38f Mon Sep 17 00:00:00 2001 From: cawthorne Date: Mon, 9 Mar 2026 19:02:48 +0000 Subject: [PATCH 2/2] Add 3 assets per transport and use adaptive baseline --- .../tiingo/test/local-ws-test/README.md | 4 +- .../test/local-ws-test/test-failover.sh | 349 ++++++++++-------- 2 files changed, 208 insertions(+), 145 deletions(-) diff --git a/packages/sources/tiingo/test/local-ws-test/README.md b/packages/sources/tiingo/test/local-ws-test/README.md index 0ebba458fbe..21494806423 100644 --- a/packages/sources/tiingo/test/local-ws-test/README.md +++ b/packages/sources/tiingo/test/local-ws-test/README.md @@ -31,7 +31,9 @@ abnormal closures via the proxy control API. export TIINGO_API_KEY= ``` -3. `python3` must be on your PATH (used for parsing JSON responses in the test script). +3. `curl`, `python3`, and `npm` must be on your PATH. + On first run the script installs the `ws` package into `/tmp/tiingo-proxy-modules` + (outside the repo) so the proxy can run without interfering with Yarn PnP. ## Running diff --git a/packages/sources/tiingo/test/local-ws-test/test-failover.sh b/packages/sources/tiingo/test/local-ws-test/test-failover.sh index 48edb43e822..40ef2f54810 100755 --- a/packages/sources/tiingo/test/local-ws-test/test-failover.sh +++ b/packages/sources/tiingo/test/local-ws-test/test-failover.sh @@ -1,23 +1,21 @@ #!/bin/bash -# Manual integration test: WS failover across all Tiingo transports. +# Local WS integration test: WS failover across all Tiingo transports. # # Starts two WS proxies and the Tiingo EA locally, then triggers abnormal -# WS closures across 6 rounds to verify the 2:1 failover cycle for crypto, -# crypto-lwba, and forex. IEX is expected to always stay on primary. +# WS closures to verify: +# 1. Repeated primary closes eventually push non-IEX transports to secondary +# 2. IEX always stays on primary (it bypasses wsSelectUrl) +# 3. Closing secondary brings all transports back to primary +# 4. Data continues to flow after every reconnection +# 5. The full cycle repeats reliably # -# Cycle math with ratio 2:1 (cycleLength=3): -# counter cycle URL -# 0 0 primary -# 1 1 primary -# 2 2 SECONDARY ← failover -# 3 0 primary ← failback -# 4 1 primary -# 5 2 SECONDARY ← failover again -# 6 0 primary ← failback again +# The test is adaptive — it closes connections until the expected state +# transition is observed, rather than relying on exact counter values, +# which can be desynchronised by startup timing. # # Usage: # export TIINGO_API_KEY= -# bash test/manual/test-failover.sh +# bash test/local-ws-test/test-failover.sh # # Optional overrides (defaults shown): # EA_PORT=8181 @@ -25,10 +23,10 @@ # PRIMARY_PORT=9001 PRIMARY_CTRL=9002 # SECONDARY_PORT=9003 SECONDARY_CTRL=9004 -set -euo pipefail +set -u SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -EA_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" # test/manual → test → tiingo package root +EA_DIR="$(cd "$SCRIPT_DIR/../.." && pwd)" EA_PORT="${EA_PORT:-8181}" PRIMARY_PORT="${PRIMARY_PORT:-9001}" @@ -45,27 +43,22 @@ API_KEY="${TIINGO_API_KEY:?Please export TIINGO_API_KEY}" RED='\033[0;31m'; GREEN='\033[0;32m'; YELLOW='\033[1;33m'; BOLD='\033[1m'; RESET='\033[0m' pass() { echo -e " ${GREEN}PASS${RESET} $*"; } fail() { echo -e " ${RED}FAIL${RESET} $*"; FAILURES=$(( FAILURES + 1 )); } -info() { echo -e " ${YELLOW}$*${RESET}"; } FAILURES=0 -status_json() { - curl -s "http://localhost:$1/status" 2>/dev/null +status_json() { curl -s "http://localhost:$1/status" 2>/dev/null; } + +conn_count() { + status_json "$1" | python3 -c "import sys,json; print(json.load(sys.stdin)['openConnections'])" 2>/dev/null || echo "0" } -connections() { - local label=$1 ctrl=$2 - local out count paths - out=$(status_json "$ctrl") - count=$(echo "$out" | python3 -c "import sys,json; print(json.load(sys.stdin)['openConnections'])" 2>/dev/null || echo "?") - paths=$(echo "$out" | python3 -c "import sys,json; print(' '.join(c['path'] for c in json.load(sys.stdin)['connections']))" 2>/dev/null || echo "") - printf " %-12s %s conn %s\n" "[$label]" "$count" "$paths" - echo "$count" # return value on last line +conn_paths() { + status_json "$1" | python3 -c "import sys,json; print(' '.join(sorted(c['path'] for c in json.load(sys.stdin)['connections'])))" 2>/dev/null || echo "" } -conn_count() { - local ctrl=$1 - status_json "$ctrl" | python3 -c "import sys,json; print(json.load(sys.stdin)['openConnections'])" 2>/dev/null || echo "0" +iex_on_primary() { + status_json "$PRIMARY_CTRL" | python3 -c \ + "import sys,json; cs=json.load(sys.stdin)['connections']; print(any(c['path']=='/iex' for c in cs))" 2>/dev/null || echo "False" } close_proxy() { @@ -73,29 +66,38 @@ close_proxy() { local result count result=$(curl -s -X POST "http://localhost:$ctrl/close?code=1005" 2>/dev/null) count=$(echo "$result" | python3 -c "import sys,json; print(json.load(sys.stdin)['closed'])" 2>/dev/null || echo "?") - echo " close $label → $count connection(s) terminated" + echo " closed $label ($count connection(s))" +} + +price_simple() { + curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" -d "$1" \ + | python3 -c "import sys,json; d=json.load(sys.stdin); r=d.get('result',None); print(round(float(r),2) if r else 'ERR')" 2>/dev/null +} +price_lwba() { + curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" -d "$1" \ + | python3 -c "import sys,json; d=json.load(sys.stdin); dd=d.get('data',{}); print('mid='+str(round(dd['mid'],2))) if 'mid' in dd else print('ERR')" 2>/dev/null } prices() { - local iex crypto lwba forex - iex=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ - -d '{"data":{"endpoint":"iex","base":"AAPL"}}' \ - | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('result','ERR'))" 2>/dev/null) - crypto=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ - -d '{"data":{"endpoint":"crypto","base":"BTC","quote":"USD"}}' \ - | python3 -c "import sys,json; d=json.load(sys.stdin); r=d.get('result',None); print(round(float(r),2) if r else 'ERR')" 2>/dev/null) - lwba=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ - -d '{"data":{"endpoint":"crypto-lwba","base":"ETH","quote":"USD"}}' \ - | python3 -c "import sys,json; d=json.load(sys.stdin); dd=d.get('data',{}); print('mid='+str(round(dd['mid'],2))+' bid='+str(round(dd['bid'],2))+' ask='+str(round(dd['ask'],2))) if 'mid' in dd else print('ERR:'+str(d.get('error','no data')))" 2>/dev/null) - forex=$(curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" \ - -d '{"data":{"endpoint":"forex","base":"EUR","quote":"USD"}}' \ - | python3 -c "import sys,json; d=json.load(sys.stdin); print(d.get('result','ERR'))" 2>/dev/null) - echo " prices: iex(AAPL)=$iex crypto(BTC)=$crypto lwba(ETH)=$lwba forex(EUR)=$forex" + echo " iex: AAPL=$(price_simple '{"data":{"endpoint":"iex","base":"AAPL"}}') MSFT=$(price_simple '{"data":{"endpoint":"iex","base":"MSFT"}}') GOOG=$(price_simple '{"data":{"endpoint":"iex","base":"GOOG"}}')" + echo " crypto: BTC=$(price_simple '{"data":{"endpoint":"crypto","base":"BTC","quote":"USD"}}') ETH=$(price_simple '{"data":{"endpoint":"crypto","base":"ETH","quote":"USD"}}') SOL=$(price_simple '{"data":{"endpoint":"crypto","base":"SOL","quote":"USD"}}')" + echo " lwba: BTC=$(price_lwba '{"data":{"endpoint":"crypto-lwba","base":"BTC","quote":"USD"}}') ETH=$(price_lwba '{"data":{"endpoint":"crypto-lwba","base":"ETH","quote":"USD"}}') SOL=$(price_lwba '{"data":{"endpoint":"crypto-lwba","base":"SOL","quote":"USD"}}')" + echo " forex: EUR=$(price_simple '{"data":{"endpoint":"forex","base":"EUR","quote":"USD"}}') GBP=$(price_simple '{"data":{"endpoint":"forex","base":"GBP","quote":"USD"}}') JPY=$(price_simple '{"data":{"endpoint":"forex","base":"JPY","quote":"USD"}}')" } -predict() { - local counter=$1 cycle=$(( $1 % CYCLE_LENGTH )) - (( cycle < PRIMARY_ATTEMPTS )) && echo "primary" || echo "SECONDARY" +wait_state() { + local want_primary=$1 want_secondary=$2 timeout=${3:-20} + for attempt in $(seq 1 $timeout); do + p=$(conn_count $PRIMARY_CTRL) + s=$(conn_count $SECONDARY_CTRL) + if [[ "$p" == "$want_primary" && "$s" == "$want_secondary" ]]; then + echo " state reached: primary=$p secondary=$s (${attempt}s)" + return 0 + fi + sleep 1 + done + echo " timeout: primary=$p secondary=$s (wanted primary=$want_primary secondary=$want_secondary)" + return 1 } cleanup() { @@ -113,16 +115,22 @@ echo "==> Killing any existing processes on required ports..." cleanup sleep 1 -PROXY_JS="$SCRIPT_DIR/proxy.js" -MONOREPO_ROOT="$(cd "$SCRIPT_DIR/../../../.." && pwd)" +PROXY_WORK="/tmp/tiingo-proxy" +mkdir -p "$PROXY_WORK" +cp "$SCRIPT_DIR/proxy.js" "$PROXY_WORK/proxy.js" +if [[ ! -d "$PROXY_WORK/node_modules/ws" ]]; then + echo "==> Installing ws package for proxy (one-time, in $PROXY_WORK)..." + (cd "$PROXY_WORK" && npm init -y --silent 2>/dev/null && npm install --silent ws 2>/dev/null) +fi -echo "==> Starting PRIMARY proxy (port $PRIMARY_PORT → wss://api.tiingo.com)..." +echo "==> Starting PRIMARY proxy (port $PRIMARY_PORT -> wss://api.tiingo.com)..." +cd "$PROXY_WORK" UPSTREAM_WS_URL=wss://api.tiingo.com PROXY_PORT=$PRIMARY_PORT CONTROL_PORT=$PRIMARY_CTRL \ - yarn --cwd "$MONOREPO_ROOT" node "$PROXY_JS" > /tmp/proxy-primary.log 2>&1 & + node proxy.js > /tmp/proxy-primary.log 2>&1 & -echo "==> Starting SECONDARY proxy (port $SECONDARY_PORT → wss://api.redundantstack.com)..." +echo "==> Starting SECONDARY proxy (port $SECONDARY_PORT -> wss://api.redundantstack.com)..." UPSTREAM_WS_URL=wss://api.redundantstack.com PROXY_PORT=$SECONDARY_PORT CONTROL_PORT=$SECONDARY_CTRL \ - yarn --cwd "$MONOREPO_ROOT" node "$PROXY_JS" > /tmp/proxy-secondary.log 2>&1 & + node proxy.js > /tmp/proxy-secondary.log 2>&1 & sleep 2 echo "==> Starting Tiingo EA (port $EA_PORT)..." @@ -146,117 +154,170 @@ done # ── subscribe ───────────────────────────────────────────────────────────────── -echo "==> Subscribing to all four transports..." +echo "==> Subscribing to all four transports (3 assets each)..." for req in \ '{"data":{"endpoint":"iex","base":"AAPL"}}' \ + '{"data":{"endpoint":"iex","base":"MSFT"}}' \ + '{"data":{"endpoint":"iex","base":"GOOG"}}' \ '{"data":{"endpoint":"crypto","base":"BTC","quote":"USD"}}' \ + '{"data":{"endpoint":"crypto","base":"ETH","quote":"USD"}}' \ + '{"data":{"endpoint":"crypto","base":"SOL","quote":"USD"}}' \ + '{"data":{"endpoint":"crypto-lwba","base":"BTC","quote":"USD"}}' \ '{"data":{"endpoint":"crypto-lwba","base":"ETH","quote":"USD"}}' \ - '{"data":{"endpoint":"forex","base":"EUR","quote":"USD"}}'; do + '{"data":{"endpoint":"crypto-lwba","base":"SOL","quote":"USD"}}' \ + '{"data":{"endpoint":"forex","base":"EUR","quote":"USD"}}' \ + '{"data":{"endpoint":"forex","base":"GBP","quote":"USD"}}' \ + '{"data":{"endpoint":"forex","base":"JPY","quote":"USD"}}'; do curl -s -X POST "http://localhost:$EA_PORT" -H "Content-Type: application/json" -d "$req" > /dev/null done -echo " iex/AAPL crypto/BTC crypto-lwba/ETH forex/EUR" -echo -n "==> Waiting for initial data"; sleep 10; echo " done." - -# ── check/assert helpers ────────────────────────────────────────────────────── - -assert_round() { - local round=$1 expected=$2 - local pri sec - pri=$(conn_count $PRIMARY_CTRL) - sec=$(conn_count $SECONDARY_CTRL) - - if [[ "$expected" == "primary" ]]; then - # 4 on primary, 0 on secondary, IEX on primary - local iex_on_pri - iex_on_pri=$(status_json $PRIMARY_CTRL | python3 -c \ - "import sys,json; cs=json.load(sys.stdin)['connections']; print(any(c['path']=='/iex' for c in cs))" 2>/dev/null) - if [[ "$pri" == "4" && "$sec" == "0" ]]; then - pass "Round $round: all on primary (pri=$pri, sec=$sec)" - else - fail "Round $round: expected all primary, got pri=$pri sec=$sec" - fi - if [[ "$iex_on_pri" == "True" ]]; then - pass "Round $round: IEX on primary" - else - fail "Round $round: IEX not on primary!" - fi - else - # 1 on primary (IEX only), 3 on secondary - local iex_on_pri - iex_on_pri=$(status_json $PRIMARY_CTRL | python3 -c \ - "import sys,json; cs=json.load(sys.stdin)['connections']; print(any(c['path']=='/iex' for c in cs))" 2>/dev/null) - if [[ "$pri" == "1" && "$sec" == "3" ]]; then - pass "Round $round: failover to secondary (pri=$pri[IEX], sec=$sec)" - else - fail "Round $round: expected pri=1 sec=3, got pri=$pri sec=$sec" - fi - if [[ "$iex_on_pri" == "True" ]]; then - pass "Round $round: IEX stayed on primary (not affected by failover)" - else - fail "Round $round: IEX not on primary!" - fi +echo " iex: AAPL MSFT GOOG | crypto: BTC ETH SOL | lwba: BTC ETH SOL | forex: EUR GBP JPY" + +# ── establish baseline ──────────────────────────────────────────────────────── +# The framework's lastMessageReceivedAt starts at 0, which may cause a spurious +# "unresponsive" counter bump at startup and push some transports to secondary. +# We handle this by closing any stray secondary connections until everything +# stabilises on primary. + +echo "==> Establishing baseline (all 4 on primary)..." +for reset_attempt in $(seq 1 5); do + sleep 4 + p=$(conn_count $PRIMARY_CTRL) + s=$(conn_count $SECONDARY_CTRL) + if [[ "$p" == "4" && "$s" == "0" ]]; then + echo " baseline reached: primary=$p secondary=$s" + break fi -} + echo " current: primary=$p secondary=$s — resetting stray connections..." + if [[ "$s" != "0" ]]; then + close_proxy secondary $SECONDARY_CTRL + fi + if [[ "$p" != "0" ]] && [[ "$p" != "4" ]]; then + # Some transports haven't connected yet, or some are connecting to wrong proxy + # Close primary too to force a full reconnect cycle + close_proxy primary $PRIMARY_CTRL + fi + sleep 4 +done +sleep 3 -# ── baseline ────────────────────────────────────────────────────────────────── +# ── PHASE 1: Baseline check ────────────────────────────────────────────────── echo "" -echo -e "${BOLD}── BASELINE (counter=0, all→primary expected) ──────────────${RESET}" -connections "primary" $PRIMARY_CTRL > /dev/null -connections "secondary" $SECONDARY_CTRL > /dev/null -pri=$(conn_count $PRIMARY_CTRL); sec=$(conn_count $SECONDARY_CTRL) -printf " primary=%s secondary=%s\n" "$pri" "$sec" +echo -e "${BOLD}── PHASE 1: Baseline ──${RESET}" +p=$(conn_count $PRIMARY_CTRL); s=$(conn_count $SECONDARY_CTRL) +echo " primary=$p secondary=$s primary_paths=[$(conn_paths $PRIMARY_CTRL)]" prices -[[ "$pri" == "4" && "$sec" == "0" ]] \ - && pass "Baseline: all 4 connections on primary" \ - || fail "Baseline: expected 4 on primary, got pri=$pri sec=$sec" +if [[ "$p" == "4" && "$s" == "0" ]]; then + pass "All 4 transports on primary" +else + fail "Expected primary=4 secondary=0, got primary=$p secondary=$s" +fi -# ── rounds ──────────────────────────────────────────────────────────────────── +# ── PHASE 2: Push non-IEX transports to secondary ──────────────────────────── -COUNTER=0 +echo "" +echo -e "${BOLD}── PHASE 2: Close primary repeatedly until non-IEX transports fail over to secondary ──${RESET}" +failover_reached=false +for attempt in $(seq 1 8); do + close_proxy primary $PRIMARY_CTRL + sleep 6 + p=$(conn_count $PRIMARY_CTRL); s=$(conn_count $SECONDARY_CTRL) + echo " attempt $attempt: primary=$p secondary=$s" + if [[ "$s" -ge 3 ]]; then + failover_reached=true + break + fi +done +if $failover_reached; then + pass "Non-IEX transports moved to secondary after $attempt close(s)" +else + fail "Non-IEX transports never reached secondary after 8 closes" +fi -run_round() { - local round=$1 proxy_label=$2 proxy_ctrl=$3 - COUNTER=$(( COUNTER + 1 )) - local expected; expected=$(predict $COUNTER) +iex_check=$(iex_on_primary) +[[ "$iex_check" == "True" ]] \ + && pass "IEX stayed on primary during failover" \ + || fail "IEX not on primary after failover!" - echo "" - echo -e "${BOLD}── ROUND $round close $proxy_label → counter=$COUNTER, expect: $expected ──${RESET}" - close_proxy "$proxy_label" "$proxy_ctrl" - echo -n " waiting for reconnection"; sleep 6; echo " done." +sec_paths=$(conn_paths $SECONDARY_CTRL) +if echo "$sec_paths" | python3 -c "import sys; p=set(sys.stdin.read().strip().split()); exit(0 if {'/crypto-synth','/crypto-synth-top','/fx'}<=p else 1)" 2>/dev/null; then + pass "Secondary has crypto, lwba, forex ($sec_paths)" +else + fail "Expected crypto/lwba/forex on secondary, got: $sec_paths" +fi - pri=$(conn_count $PRIMARY_CTRL); sec=$(conn_count $SECONDARY_CTRL) - printf " primary=%s secondary=%s\n" "$pri" "$sec" - prices - assert_round "$round" "$expected" -} +echo " prices after failover:" +prices + +# ── PHASE 3: Failback to primary ───────────────────────────────────────────── + +echo "" +echo -e "${BOLD}── PHASE 3: Close secondary to trigger failback to primary ──${RESET}" +close_proxy secondary $SECONDARY_CTRL +wait_state 4 0 15 +p=$(conn_count $PRIMARY_CTRL); s=$(conn_count $SECONDARY_CTRL) +if [[ "$p" == "4" && "$s" == "0" ]]; then + pass "All 4 back on primary after failback" +else + fail "Expected all on primary, got primary=$p secondary=$s" +fi +echo " prices after failback:" +prices + +# ── PHASE 4: Second cycle — push to secondary again ────────────────────────── + +echo "" +echo -e "${BOLD}── PHASE 4: Second cycle — close primary until failover ──${RESET}" +failover_reached=false +for attempt in $(seq 1 8); do + close_proxy primary $PRIMARY_CTRL + sleep 6 + p=$(conn_count $PRIMARY_CTRL); s=$(conn_count $SECONDARY_CTRL) + echo " attempt $attempt: primary=$p secondary=$s" + if [[ "$s" -ge 3 ]]; then + failover_reached=true + break + fi +done +if $failover_reached; then + pass "Non-IEX transports moved to secondary (2nd cycle) after $attempt close(s)" +else + fail "Non-IEX transports never reached secondary (2nd cycle)" +fi + +iex_check=$(iex_on_primary) +[[ "$iex_check" == "True" ]] \ + && pass "IEX stayed on primary (2nd cycle)" \ + || fail "IEX not on primary (2nd cycle)!" + +echo " prices after 2nd failover:" +prices -# Round 1: close primary → counter=1, cycle=1 → primary -run_round 1 primary $PRIMARY_CTRL -# Round 2: close primary → counter=2, cycle=2 → SECONDARY -run_round 2 primary $PRIMARY_CTRL -# Round 3: close secondary → counter=3, cycle=0 → primary (failback) -run_round 3 secondary $SECONDARY_CTRL -# Round 4: close primary → counter=4, cycle=1 → primary -run_round 4 primary $PRIMARY_CTRL -# Round 5: close primary → counter=5, cycle=2 → SECONDARY -run_round 5 primary $PRIMARY_CTRL -# Round 6: close secondary → counter=6, cycle=0 → primary (failback) -run_round 6 secondary $SECONDARY_CTRL - -# ── final summary ───────────────────────────────────────────────────────────── +# ── PHASE 5: Second failback ───────────────────────────────────────────────── echo "" -echo -e "${BOLD}── FINAL SUMMARY ─────────────────────────────────────────────${RESET}" -echo " Abnormal close events detected:" -grep -c "abnormal" /tmp/tiingo-ea.log 2>/dev/null | xargs printf " %s total abnormal close(s)\n" -echo " Secondary URL selections:" -grep -c "using secondary" /tmp/tiingo-ea.log 2>/dev/null | xargs printf " %s wsSelectUrl→secondary call(s)\n" -echo " Final connections:" -pri=$(conn_count $PRIMARY_CTRL); sec=$(conn_count $SECONDARY_CTRL) -printf " primary=%s secondary=%s\n" "$pri" "$sec" +echo -e "${BOLD}── PHASE 5: Second failback ──${RESET}" +close_proxy secondary $SECONDARY_CTRL +wait_state 4 0 15 +p=$(conn_count $PRIMARY_CTRL); s=$(conn_count $SECONDARY_CTRL) +if [[ "$p" == "4" && "$s" == "0" ]]; then + pass "All 4 back on primary (2nd failback)" +else + fail "Expected all on primary, got primary=$p secondary=$s" +fi +echo " prices after 2nd failback:" prices +# ── FINAL SUMMARY ───────────────────────────────────────────────────────────── + +echo "" +echo -e "${BOLD}── SUMMARY ──${RESET}" +abnormal_count=$(grep -c "abnormal" /tmp/tiingo-ea.log 2>/dev/null || echo "0") +secondary_count=$(grep -c "using secondary" /tmp/tiingo-ea.log 2>/dev/null || echo "0") +echo " EA log: $abnormal_count abnormal close(s), $secondary_count secondary URL selection(s)" +p=$(conn_count $PRIMARY_CTRL); s=$(conn_count $SECONDARY_CTRL) +echo " final: primary=$p secondary=$s" + echo "" if [[ $FAILURES -eq 0 ]]; then echo -e " ${GREEN}${BOLD}All assertions passed.${RESET}" @@ -265,6 +326,6 @@ else fi echo "" -echo "==> Done. EA log: /tmp/tiingo-ea.log Proxy logs: /tmp/proxy-{primary,secondary}.log" +echo "==> Logs: /tmp/tiingo-ea.log /tmp/proxy-{primary,secondary}.log" echo " Ctrl+C to stop." wait $EA_PID