Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 241 additions & 65 deletions plugins/currencyrate-plugin/src/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,59 +69,30 @@ impl Source {
let currency = currency.to_uppercase();
let url = self.url(&currency_lc, &currency);

let resp: Value = client
let response = client
.get(&url)
.send()
.await
.map_err(|e| anyhow!("Failed to request url {url} caused by: {:?}", e.source()))?
.json()
.await
.map_err(|e| {
anyhow!(
"Failed to decode response body from {url}, caused by: {:?}",
e.source()
)
})?;
.map_err(|e| anyhow!("Request failed {url}: {:?}", e.source()))?;

let reply_members = self.reply_members(&currency_lc, &currency);
let status = response.status();

let mut current = &mut resp.clone();
for member in reply_members {
if let Ok(pos) = member.parse::<usize>() {
current = current.get_mut(pos).ok_or(anyhow!(
"Positional member `{}` not found in {} response: {}",
member,
self.name(),
resp
))?;
} else {
current = current.get_mut(&member).ok_or(anyhow!(
"Member `{}` not found in {} response: {}",
member,
self.name(),
resp
))?;
}
}
let price = match current {
Value::Number(number) => number
.as_f64()
.ok_or(anyhow!("Json number price could not be converted to float"))?,
Value::String(string) => string
.parse::<f64>()
.map_err(|e| anyhow!("Price string could not be converted to float: {e}"))?,
_ => return Err(anyhow!("Price is invalid json type")),
};

if price == 0.0 {
log::warn!("{} returned 0.0 as price for {}", self.name, currency);
if !status.is_success() {
return Err(anyhow!(
"{} returned 0.0 as price for {}",
self.name,
currency
"HTTP error {status} from {url}: body={}",
response.text().await?
));
}

let resp: Value = response
.json()
.await
Comment thread
nGoline marked this conversation as resolved.
.map_err(|e| anyhow!("Failed to parse JSON from {url}: {:?}", e.source()))?;

let reply_members = self.reply_members(&currency_lc, &currency);

let price = extract_price_from_response(&resp, &reply_members, self.name(), &currency)?;

log::info!(
"Fetched price in {}ms from {}: {:.2} {currency}/BTC",
now.elapsed().as_millis(),
Expand All @@ -133,10 +104,46 @@ impl Source {
}
}

fn extract_price_from_response(
resp: &Value,
reply_members: &[String],
name: &str,
currency: &str,
) -> Result<f64, anyhow::Error> {
let mut current = &mut resp.clone();
for member in reply_members {
if let Ok(pos) = member.parse::<usize>() {
current = current.get_mut(pos).ok_or(anyhow!(
"Positional member `{member}` not found in {name} response: {resp}"
))?;
} else {
current = current.get_mut(member).ok_or(anyhow!(
"Member `{member}` not found in {name} response: {resp}"
))?;
}
}
let price = match current {
Value::Number(number) => number
.as_f64()
.ok_or(anyhow!("Json number price could not be converted to float"))?,
Value::String(string) => string
.parse::<f64>()
.map_err(|e| anyhow!("Price string could not be converted to float: {e}"))?,
_ => return Err(anyhow!("Price is invalid json type")),
};

if price == 0.0 {
log::warn!("{name} returned 0.0 as price for {currency}");
return Err(anyhow!("{name} returned 0.0 as price for {currency}"));
}
Ok(price)
}

struct SourceHealth {
Comment thread
nGoline marked this conversation as resolved.
source: Source,
failures: u32,
backoff_until: Instant,
last_error: Option<String>,
}

impl SourceHealth {
Expand All @@ -145,18 +152,21 @@ impl SourceHealth {
source,
failures: 0,
backoff_until: Instant::now(),
last_error: None,
}
}

fn mark_success(&mut self) {
self.failures = 0;
self.backoff_until = Instant::now();
self.last_error = None;
}

fn mark_failure(&mut self) {
fn mark_failure(&mut self, error: String) {
self.failures += 1;
let delay = INITIAL_BACKOFF * 2u32.pow(self.failures.min(10));
self.backoff_until = Instant::now() + delay.min(MAX_BACKOFF);
self.last_error = Some(error);
}
}

Expand Down Expand Up @@ -396,8 +406,10 @@ impl BtcPriceOracle {
}

Err(e) => {
log::warn!("failed to get `{currency}` rate from {source_name}: {e}");
source_health.mark_failure();
let err_msg =
format!("failed to get `{currency}` rate from {source_name}: {e}");
log::warn!("{err_msg}");
source_health.mark_failure(err_msg);
}
}
}
Expand Down Expand Up @@ -495,8 +507,10 @@ impl BtcPriceOracle {
}
}
Err(e) => {
log::warn!("failed to get `{currency}` rate from {name}: {e}");
source_health.mark_failure();
let err_msg =
format!("failed to get `{currency}` rate from {name}: {e}");
log::warn!("{err_msg}");
source_health.mark_failure(err_msg);
}
}
}
Expand Down Expand Up @@ -533,34 +547,35 @@ impl BtcPriceOracle {

let inner = self.inner.lock().await;

// Give a helpful error if the source name is unknown entirely
if !inner.sources.contains_key(source_name) {
let available = inner
.sources
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ");
let Some(source_health) = inner.sources.get(source_name) else {
// Give a helpful error if the source name is unknown entirely
let available = inner.sources.keys().cloned().collect::<Vec<_>>().join(", ");
return Err(anyhow!(
"Unknown source `{source_name}`. Available sources: {available}"
));
}
};

let currency_cache = inner
.currencies
.get(currency)
.ok_or_else(|| anyhow!("No rates available for `{currency}`"))?;

let price_cache = currency_cache
.prices
.get(source_name)
.ok_or_else(|| anyhow!(
let source_err = source_health
.last_error
.clone()
.unwrap_or_else(|| format!("{source_name} returned no result or error"));

let price_cache = currency_cache.prices.get(source_name).ok_or_else(|| {
anyhow!(
"Source `{source_name}` has no data for `{currency}`. \
The source may not support this currency or is currently backing off."
))?;
The source may not support this currency: {source_err}"
)
})?;

if price_cache.timestamp + SERVE_TTL <= Instant::now() {
return Err(anyhow!("Cached rate from `{source_name}` is expired"));
return Err(anyhow!(
"Cached rate from `{source_name}` is expired. Last source error: {source_err}"
));
}

Ok(price_cache.price)
Expand All @@ -577,3 +592,164 @@ fn get_median(source_results: Vec<SourceResult>) -> f64 {
f64::midpoint(prices[mid - 1], prices[mid])
}
}

#[test]
fn test_sources() {
use crate::add_default_sources;
use serde_json::json;
let mut sources = Vec::new();
add_default_sources(&mut sources, false);
let mut responses = HashMap::new();

let coingecko_price = 72732f64;
let coingecko_response = json!({"bitcoin": {"usd": coingecko_price}});
responses.insert("coingecko", (coingecko_response, coingecko_price));

let kraken_price = 72745.00000;
let kraken_response = json!({
"error": [],
"result": {
"XXBTZUSD": {
"a": ["72745.00000", "1", "1.000"],
"b": ["72744.90000", "3", "3.000"],
"c": [kraken_price, "0.00033610"],
"v": ["299.69911717", "640.87872182"],
"p": ["73231.75442", "73435.07382"],
"t": [17317, 39634],
"l": ["72613.70000", "72613.70000"],
"h": ["73960.00000", "74070.00000"],
"o": "73569.90000",
}
},
});
responses.insert("kraken", (kraken_response, kraken_price));

let blockchain_info_price = 72749.07;
let blockchain_info_response = json!({
"ARS": {
"15m": 1.0250388182e8,
"last": 1.0250388182e8,
"buy": 1.0250388182e8,
"sell": 1.0250388182e8,
"symbol": "ARS",
},
"AUD": {
"15m": 101319.81,
"last": 101319.81,
"buy": 101319.81,
"sell": 101319.81,
"symbol": "AUD",
},
"BRL": {
"15m": 366724.43,
"last": 366724.43,
"buy": 366724.43,
"sell": 366724.43,
"symbol": "BRL",
},
"CAD": {
"15m": 100512.08,
"last": 100512.08,
"buy": 100512.08,
"sell": 100512.08,
"symbol": "CAD",
},
"USD": {
"15m": 72749.07,
"last": blockchain_info_price,
"buy": 72749.07,
"sell": 72749.07,
"symbol": "USD",
},
});
responses.insert(
"blockchain.info",
(blockchain_info_response, blockchain_info_price),
);

let bitstamp_price = 72748.45;
let bitstamp_response = json!({
"timestamp": "1780301548",
"open": "73568.00",
"high": "74094.65",
"low": "72611.68",
"last": bitstamp_price,
"volume": "619.76380694",
"vwap": "73542.01",
"bid": "72748.45",
"ask": "72748.46",
"side": "1",
"open_24": "73769.45",
"percent_change_24": "-1.38",
"market_type": "SPOT",
});
responses.insert("bitstamp", (bitstamp_response, bitstamp_price));

let coindesk_price = 72751.6828660636;
let coindes_response = json!({
"Data": {
"BTC-USD": {
"TYPE": "266",
"MARKET": "cadli",
"INSTRUMENT": "BTC-USD",
"CCSEQ": 1323841393,
"VALUE": coindesk_price,
"VALUE_FLAG": "UP",
"VALUE_LAST_UPDATE_TS": 1780301570,
"VALUE_LAST_UPDATE_TS_NS": 94000000,
"LAST_UPDATE_QUANTITY": 0.105,
"LAST_UPDATE_QUOTE_QUANTITY": 7642.36250656015,
"LAST_UPDATE_VOLUME_TOP_TIER": 0,
"LAST_UPDATE_QUOTE_VOLUME_TOP_TIER": 0,
"LAST_UPDATE_VOLUME_DIRECT": 0,
"LAST_UPDATE_CCSEQ": 1323894738,
"CURRENT_HOUR_VOLUME": 1574.29740546284,
"CURRENT_HOUR_QUOTE_VOLUME": 114448749.088967,
"CURRENT_HOUR_VOLUME_TOP_TIER": 815.533688797,
"CURRENT_HOUR_QUOTE_VOLUME_TOP_TIER": 59271744.4993065,
"CURRENT_YEAR_CHANGE_PERCENTAGE": -16.8891195575271,
"MOVING_24_HOUR_VOLUME": 127405.003580434,
"MOVING_24_HOUR_QUOTE_VOLUME": 9365693546.36435,
"MOVING_24_HOUR_VOLUME_TOP_TIER": 60402.3558377577,
"MOVING_24_HOUR_QUOTE_VOLUME_TOP_TIER_DIRECT": 867716024.677109,
"MOVING_24_HOUR_OPEN": 73850.0780201078,
"MOVING_7_DAY_HIGH": 77996.1623459993,
"MOVING_7_DAY_LOW": 72425.5243349043,
"MOVING_7_DAY_TOTAL_INDEX_UPDATES": 12822192,
"MOVING_7_DAY_CHANGE": -4535.8043571581,
"MOVING_365_DAY_CHANGE_PERCENTAGE": -31.154821757090602,
"LIFETIME_FIRST_UPDATE_TS": 1279408140,
"LIFETIME_QUOTE_VOLUME_TOP_TIER_DIRECT": 4851586542024.82,
"LIFETIME_OPEN": 0.04951,
"LIFETIME_LOW": 0.01,
"LIFETIME_LOW_TS": 1286572500,
"LIFETIME_TOTAL_INDEX_UPDATES": 1329464334,
"LIFETIME_CHANGE": 72751.6333560636,
"LIFETIME_CHANGE_PERCENTAGE": 146943311.16151,
}
},
"Err": {},
});
responses.insert("coindesk", (coindes_response, coindesk_price));

let coinbase_price = 72760.125;
let coinbase_response =
json!({"data": {"amount": coinbase_price, "base": "BTC", "currency": "USD"}});
responses.insert("coinbase", (coinbase_response, coinbase_price));

let binance_price = 72715.35000000;
let binance_response = json!({"symbol": "BTCUSD", "price": binance_price});
responses.insert("binance", (binance_response, binance_price));

for source in &sources {
let (response, price) = responses.get(source.name()).unwrap();
let extracted_price = extract_price_from_response(
response,
&source.reply_members("usd", "USD"),
&source.name,
"USD",
)
.unwrap();
assert_eq!(extracted_price, *price, "Failed for {}", source.name());
}
}
Loading
Loading