From b960a6a6819ff3737d387e2842ee92a79d948a80 Mon Sep 17 00:00:00 2001 From: daywalker90 Date: Tue, 9 Jun 2026 13:37:46 +0200 Subject: [PATCH] currencyrate: propagate http errors to currencyrate rpc if a source is provided specifically coindesk was constantly hitting API rate limits causing our tests to fail so lets unit tests all endpoints with a snapshot of real responses and only allow for http error 401/429 in integration tests Also fix a flake in test_bkpr_currencyrate_persisted that would pick up a cached rate from CLN's own caching Changelog-None --- plugins/currencyrate-plugin/src/oracle.rs | 306 +++++++++++++++++----- tests/test_currencyrate.py | 161 +++--------- 2 files changed, 273 insertions(+), 194 deletions(-) diff --git a/plugins/currencyrate-plugin/src/oracle.rs b/plugins/currencyrate-plugin/src/oracle.rs index 5e53a140df8e..c2983deae405 100644 --- a/plugins/currencyrate-plugin/src/oracle.rs +++ b/plugins/currencyrate-plugin/src/oracle.rs @@ -69,59 +69,30 @@ impl Source { let currency = currency.to_uppercase(); let url = self.url(¤cy_lc, ¤cy); - let resp: Value = client + let response = client .get(&url) .send() .await - .map_err(|e| anyhow!("Failed to request url {url} caused by: {:?}", e.source()))? - .json() - .await - .map_err(|e| { - anyhow!( - "Failed to decode response body from {url}, caused by: {:?}", - e.source() - ) - })?; + .map_err(|e| anyhow!("Request failed {url}: {:?}", e.source()))?; - let reply_members = self.reply_members(¤cy_lc, ¤cy); + let status = response.status(); - let mut current = &mut resp.clone(); - for member in reply_members { - if let Ok(pos) = member.parse::() { - current = current.get_mut(pos).ok_or(anyhow!( - "Positional member `{}` not found in {} response: {}", - member, - self.name(), - resp - ))?; - } else { - current = current.get_mut(&member).ok_or(anyhow!( - "Member `{}` not found in {} response: {}", - member, - self.name(), - resp - ))?; - } - } - let price = match current { - Value::Number(number) => number - .as_f64() - .ok_or(anyhow!("Json number price could not be converted to float"))?, - Value::String(string) => string - .parse::() - .map_err(|e| anyhow!("Price string could not be converted to float: {e}"))?, - _ => return Err(anyhow!("Price is invalid json type")), - }; - - if price == 0.0 { - log::warn!("{} returned 0.0 as price for {}", self.name, currency); + if !status.is_success() { return Err(anyhow!( - "{} returned 0.0 as price for {}", - self.name, - currency + "HTTP error {status} from {url}: body={}", + response.text().await? )); } + let resp: Value = response + .json() + .await + .map_err(|e| anyhow!("Failed to parse JSON from {url}: {:?}", e.source()))?; + + let reply_members = self.reply_members(¤cy_lc, ¤cy); + + let price = extract_price_from_response(&resp, &reply_members, self.name(), ¤cy)?; + log::info!( "Fetched price in {}ms from {}: {:.2} {currency}/BTC", now.elapsed().as_millis(), @@ -133,10 +104,46 @@ impl Source { } } +fn extract_price_from_response( + resp: &Value, + reply_members: &[String], + name: &str, + currency: &str, +) -> Result { + let mut current = &mut resp.clone(); + for member in reply_members { + if let Ok(pos) = member.parse::() { + current = current.get_mut(pos).ok_or(anyhow!( + "Positional member `{member}` not found in {name} response: {resp}" + ))?; + } else { + current = current.get_mut(member).ok_or(anyhow!( + "Member `{member}` not found in {name} response: {resp}" + ))?; + } + } + let price = match current { + Value::Number(number) => number + .as_f64() + .ok_or(anyhow!("Json number price could not be converted to float"))?, + Value::String(string) => string + .parse::() + .map_err(|e| anyhow!("Price string could not be converted to float: {e}"))?, + _ => return Err(anyhow!("Price is invalid json type")), + }; + + if price == 0.0 { + log::warn!("{name} returned 0.0 as price for {currency}"); + return Err(anyhow!("{name} returned 0.0 as price for {currency}")); + } + Ok(price) +} + struct SourceHealth { source: Source, failures: u32, backoff_until: Instant, + last_error: Option, } impl SourceHealth { @@ -145,18 +152,21 @@ impl SourceHealth { source, failures: 0, backoff_until: Instant::now(), + last_error: None, } } fn mark_success(&mut self) { self.failures = 0; self.backoff_until = Instant::now(); + self.last_error = None; } - fn mark_failure(&mut self) { + fn mark_failure(&mut self, error: String) { self.failures += 1; let delay = INITIAL_BACKOFF * 2u32.pow(self.failures.min(10)); self.backoff_until = Instant::now() + delay.min(MAX_BACKOFF); + self.last_error = Some(error); } } @@ -396,8 +406,10 @@ impl BtcPriceOracle { } Err(e) => { - log::warn!("failed to get `{currency}` rate from {source_name}: {e}"); - source_health.mark_failure(); + let err_msg = + format!("failed to get `{currency}` rate from {source_name}: {e}"); + log::warn!("{err_msg}"); + source_health.mark_failure(err_msg); } } } @@ -495,8 +507,10 @@ impl BtcPriceOracle { } } Err(e) => { - log::warn!("failed to get `{currency}` rate from {name}: {e}"); - source_health.mark_failure(); + let err_msg = + format!("failed to get `{currency}` rate from {name}: {e}"); + log::warn!("{err_msg}"); + source_health.mark_failure(err_msg); } } } @@ -533,34 +547,35 @@ impl BtcPriceOracle { let inner = self.inner.lock().await; - // Give a helpful error if the source name is unknown entirely - if !inner.sources.contains_key(source_name) { - let available = inner - .sources - .keys() - .cloned() - .collect::>() - .join(", "); + let Some(source_health) = inner.sources.get(source_name) else { + // Give a helpful error if the source name is unknown entirely + let available = inner.sources.keys().cloned().collect::>().join(", "); return Err(anyhow!( "Unknown source `{source_name}`. Available sources: {available}" )); - } + }; let currency_cache = inner .currencies .get(currency) .ok_or_else(|| anyhow!("No rates available for `{currency}`"))?; - let price_cache = currency_cache - .prices - .get(source_name) - .ok_or_else(|| anyhow!( + let source_err = source_health + .last_error + .clone() + .unwrap_or_else(|| format!("{source_name} returned no result or error")); + + let price_cache = currency_cache.prices.get(source_name).ok_or_else(|| { + anyhow!( "Source `{source_name}` has no data for `{currency}`. \ - The source may not support this currency or is currently backing off." - ))?; + The source may not support this currency: {source_err}" + ) + })?; if price_cache.timestamp + SERVE_TTL <= Instant::now() { - return Err(anyhow!("Cached rate from `{source_name}` is expired")); + return Err(anyhow!( + "Cached rate from `{source_name}` is expired. Last source error: {source_err}" + )); } Ok(price_cache.price) @@ -577,3 +592,164 @@ fn get_median(source_results: Vec) -> f64 { f64::midpoint(prices[mid - 1], prices[mid]) } } + +#[test] +fn test_sources() { + use crate::add_default_sources; + use serde_json::json; + let mut sources = Vec::new(); + add_default_sources(&mut sources, false); + let mut responses = HashMap::new(); + + let coingecko_price = 72732f64; + let coingecko_response = json!({"bitcoin": {"usd": coingecko_price}}); + responses.insert("coingecko", (coingecko_response, coingecko_price)); + + let kraken_price = 72745.00000; + let kraken_response = json!({ + "error": [], + "result": { + "XXBTZUSD": { + "a": ["72745.00000", "1", "1.000"], + "b": ["72744.90000", "3", "3.000"], + "c": [kraken_price, "0.00033610"], + "v": ["299.69911717", "640.87872182"], + "p": ["73231.75442", "73435.07382"], + "t": [17317, 39634], + "l": ["72613.70000", "72613.70000"], + "h": ["73960.00000", "74070.00000"], + "o": "73569.90000", + } + }, + }); + responses.insert("kraken", (kraken_response, kraken_price)); + + let blockchain_info_price = 72749.07; + let blockchain_info_response = json!({ + "ARS": { + "15m": 1.0250388182e8, + "last": 1.0250388182e8, + "buy": 1.0250388182e8, + "sell": 1.0250388182e8, + "symbol": "ARS", + }, + "AUD": { + "15m": 101319.81, + "last": 101319.81, + "buy": 101319.81, + "sell": 101319.81, + "symbol": "AUD", + }, + "BRL": { + "15m": 366724.43, + "last": 366724.43, + "buy": 366724.43, + "sell": 366724.43, + "symbol": "BRL", + }, + "CAD": { + "15m": 100512.08, + "last": 100512.08, + "buy": 100512.08, + "sell": 100512.08, + "symbol": "CAD", + }, + "USD": { + "15m": 72749.07, + "last": blockchain_info_price, + "buy": 72749.07, + "sell": 72749.07, + "symbol": "USD", + }, + }); + responses.insert( + "blockchain.info", + (blockchain_info_response, blockchain_info_price), + ); + + let bitstamp_price = 72748.45; + let bitstamp_response = json!({ + "timestamp": "1780301548", + "open": "73568.00", + "high": "74094.65", + "low": "72611.68", + "last": bitstamp_price, + "volume": "619.76380694", + "vwap": "73542.01", + "bid": "72748.45", + "ask": "72748.46", + "side": "1", + "open_24": "73769.45", + "percent_change_24": "-1.38", + "market_type": "SPOT", + }); + responses.insert("bitstamp", (bitstamp_response, bitstamp_price)); + + let coindesk_price = 72751.6828660636; + let coindes_response = json!({ + "Data": { + "BTC-USD": { + "TYPE": "266", + "MARKET": "cadli", + "INSTRUMENT": "BTC-USD", + "CCSEQ": 1323841393, + "VALUE": coindesk_price, + "VALUE_FLAG": "UP", + "VALUE_LAST_UPDATE_TS": 1780301570, + "VALUE_LAST_UPDATE_TS_NS": 94000000, + "LAST_UPDATE_QUANTITY": 0.105, + "LAST_UPDATE_QUOTE_QUANTITY": 7642.36250656015, + "LAST_UPDATE_VOLUME_TOP_TIER": 0, + "LAST_UPDATE_QUOTE_VOLUME_TOP_TIER": 0, + "LAST_UPDATE_VOLUME_DIRECT": 0, + "LAST_UPDATE_CCSEQ": 1323894738, + "CURRENT_HOUR_VOLUME": 1574.29740546284, + "CURRENT_HOUR_QUOTE_VOLUME": 114448749.088967, + "CURRENT_HOUR_VOLUME_TOP_TIER": 815.533688797, + "CURRENT_HOUR_QUOTE_VOLUME_TOP_TIER": 59271744.4993065, + "CURRENT_YEAR_CHANGE_PERCENTAGE": -16.8891195575271, + "MOVING_24_HOUR_VOLUME": 127405.003580434, + "MOVING_24_HOUR_QUOTE_VOLUME": 9365693546.36435, + "MOVING_24_HOUR_VOLUME_TOP_TIER": 60402.3558377577, + "MOVING_24_HOUR_QUOTE_VOLUME_TOP_TIER_DIRECT": 867716024.677109, + "MOVING_24_HOUR_OPEN": 73850.0780201078, + "MOVING_7_DAY_HIGH": 77996.1623459993, + "MOVING_7_DAY_LOW": 72425.5243349043, + "MOVING_7_DAY_TOTAL_INDEX_UPDATES": 12822192, + "MOVING_7_DAY_CHANGE": -4535.8043571581, + "MOVING_365_DAY_CHANGE_PERCENTAGE": -31.154821757090602, + "LIFETIME_FIRST_UPDATE_TS": 1279408140, + "LIFETIME_QUOTE_VOLUME_TOP_TIER_DIRECT": 4851586542024.82, + "LIFETIME_OPEN": 0.04951, + "LIFETIME_LOW": 0.01, + "LIFETIME_LOW_TS": 1286572500, + "LIFETIME_TOTAL_INDEX_UPDATES": 1329464334, + "LIFETIME_CHANGE": 72751.6333560636, + "LIFETIME_CHANGE_PERCENTAGE": 146943311.16151, + } + }, + "Err": {}, + }); + responses.insert("coindesk", (coindes_response, coindesk_price)); + + let coinbase_price = 72760.125; + let coinbase_response = + json!({"data": {"amount": coinbase_price, "base": "BTC", "currency": "USD"}}); + responses.insert("coinbase", (coinbase_response, coinbase_price)); + + let binance_price = 72715.35000000; + let binance_response = json!({"symbol": "BTCUSD", "price": binance_price}); + responses.insert("binance", (binance_response, binance_price)); + + for source in &sources { + let (response, price) = responses.get(source.name()).unwrap(); + let extracted_price = extract_price_from_response( + response, + &source.reply_members("usd", "USD"), + &source.name, + "USD", + ) + .unwrap(); + assert_eq!(extracted_price, *price, "Failed for {}", source.name()); + } +} diff --git a/tests/test_currencyrate.py b/tests/test_currencyrate.py index 94a44f103ff6..fb684d2cc7c8 100644 --- a/tests/test_currencyrate.py +++ b/tests/test_currencyrate.py @@ -5,7 +5,7 @@ from utils import wait_for, only_one from pyln.client import RpcError from fixtures import * # noqa: F401,F403 -from flask import Flask, jsonify +from flask import Flask, jsonify, request from werkzeug.serving import make_server @@ -45,84 +45,24 @@ def median_rate(rateslist): return range(int(rate * 0.99), int(rate * 1.01)) -def test_apis_batch1(node_factory): - opts = { - "currencyrate-disable-source": ["bitstamp", "coinbase"], - } - l1 = node_factory.get_node(options=opts) - - rateslist = l1.rpc.call("listcurrencyrates", ["USD"])['currencyrates'] - LOGGER.info(rateslist) - rates = {entry["source"]: entry["amount"] for entry in rateslist} - - assert "bitstamp" not in rates - assert "coinbase" not in rates - - assert "coingecko" in rates - assert "kraken" in rates - assert "blockchain.info" in rates - assert "coindesk" in rates - assert "binance" in rates - - # Death to the 58k gang! - assert rates["coingecko"] > 58000 - assert rates["kraken"] > 58000 - assert rates["blockchain.info"] > 58000 - assert rates["coindesk"] > 58000 - assert rates["binance"] > 58000 - - rates = [ - rates["coingecko"], - rates["kraken"], - rates["blockchain.info"], - rates["coindesk"], - rates["binance"], - ] +def test_apis(node_factory): + l1 = node_factory.get_node() - rates.sort() - - convert = l1.rpc.call("currencyconvert", [100, "USD"]) - LOGGER.info(convert) - - assert "msat" in convert - assert convert["msat"] > 0 - assert convert["msat"] in median_conversion(100, rateslist) - - assert int(l1.rpc.currencyrate("usd")['rate']) in median_rate(rateslist) - - -def test_apis_batch2(node_factory): - opts = { - "currencyrate-disable-source": [ - "coingecko", - "kraken", - "blockchain.info", - "coindesk", - "binance", - ], - } - l1 = node_factory.get_node(options=opts) + for source in ALL_RESOURCES: + try: + rate = l1.rpc.call("currencyrate", ["USD", source])["rate"] + LOGGER.info(rate) + assert rate > 10_000 + except RpcError as e: + LOGGER.warning(f"{source} reported error: {e}") + msg = str(e) + assert "HTTP error 429" in msg or "HTTP error 401" in msg + continue rateslist = l1.rpc.call("listcurrencyrates", ["USD"])['currencyrates'] LOGGER.info(rateslist) - rates = {entry["source"]: entry["amount"] for entry in rateslist} - - assert "bitstamp" in rates - assert "coinbase" in rates - - assert "coingecko" not in rates - assert "kraken" not in rates - assert "blockchain.info" not in rates - assert "coindesk" not in rates - assert "binance" not in rates - - assert rates["bitstamp"] > 0 - assert rates["coinbase"] > 0 + rates = [{entry["source"]: entry["amount"] for entry in rateslist}] - rates = [ - rates["bitstamp"], - rates["coinbase"], - ] rates.sort() convert = l1.rpc.call("currencyconvert", [100, "USD"]) @@ -132,51 +72,7 @@ def test_apis_batch2(node_factory): assert convert["msat"] > 0 assert convert["msat"] in median_conversion(100, rateslist) - assert int(l1.rpc.currencyrate("USD")['rate']) in median_rate(rateslist) - - -def test_custom_source(node_factory): - opts = { - "currencyrate-disable-source": ALL_RESOURCES, - "currencyrate-add-source": [ - r"my-coingecko,https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies={currency_lc},bitcoin,{currency_lc}", - r"my-kraken,https://api.kraken.com/0/public/Ticker?pair=XXBTZ{currency},result,XXBTZ{currency},c,0", - ], - } - l1 = node_factory.get_node(options=opts) - - rateslist = l1.rpc.call("listcurrencyrates", ["USD"])['currencyrates'] - LOGGER.info(rateslist) - rates = {entry["source"]: entry["amount"] for entry in rateslist} - - assert "bitstamp" not in rates - assert "coinbase" not in rates - assert "coingecko" not in rates - assert "kraken" not in rates - assert "blockchain.info" not in rates - assert "coindesk" not in rates - assert "binance" not in rates - - assert "my-coingecko" in rates - assert "my-kraken" in rates - - assert rates["my-coingecko"] > 0 - assert rates["my-kraken"] > 0 - - rates = [ - rates["my-coingecko"], - rates["my-kraken"], - ] - rates.sort() - - convert = l1.rpc.call("currencyconvert", [100, "USD"]) - LOGGER.info(convert) - - assert "msat" in convert - assert convert["msat"] > 0 - assert convert["msat"] in median_conversion(100, rateslist) - - assert int(l1.rpc.currencyrate("USD")['rate']) in median_rate(rateslist) + assert int(l1.rpc.currencyrate("usd")['rate']) in median_rate(rateslist) def test_no_sources(node_factory): @@ -193,8 +89,13 @@ def test_no_sources(node_factory): LOGGER.info(rates) -def test_invalid_currency(node_factory): - opts = {} +def test_invalid_currency(node_factory, fake_rateserver): + opts = { + "currencyrate-disable-source": ALL_RESOURCES, + "currencyrate-add-source": [ + f"invalid,{fake_rateserver['url']}/invalid?currency={{currency_lc}},price" + ], + } l1 = node_factory.get_node(options=opts) with pytest.raises( @@ -204,19 +105,13 @@ def test_invalid_currency(node_factory): rates = l1.rpc.call("listcurrencyrates", ["XXX"]) LOGGER.info(rates) - l1.daemon.wait_for_logs(["failed to get `XXX` rate from bitstamp", - "failed to get `XXX` rate from coinbase", - "failed to get `XXX` rate from coingecko", - "failed to get `XXX` rate from kraken", - "failed to get `XXX` rate from blockchain.info", - "failed to get `XXX` rate from coindesk", - "failed to get `XXX` rate from binance"]) + l1.daemon.wait_for_log("failed to get `XXX` rate from invalid") class _ServerThread(threading.Thread): def __init__(self, app): super().__init__(daemon=True) - self._server = make_server("127.0.0.1", 0, app) + self._server = make_server("127.0.0.1", 0, app, threaded=True) self.port = self._server.server_port def run(self): @@ -259,6 +154,11 @@ def too_low(): def midpoint(): return jsonify({"price": state["midpoint"]}) + @app.get("/invalid") + def invalid(): + currency = request.args.get("currency") + return jsonify({"price": state[currency]}) + srv = _ServerThread(app) srv.start() try: @@ -440,6 +340,9 @@ def test_bkpr_currencyrate_persisted(node_factory, fake_rateserver): fake_rateserver["state"]["slow"] = 150_000_000 new_median = (fake_rateserver["state"]["fast"] + fake_rateserver["state"]["slow"]) / 2 + # CLN caches rates for 60s + time.sleep(61) + new_events = l1.rpc.bkpr_listaccountevents()["events"] assert new_events == events