@@ -69,59 +69,30 @@ impl Source {
6969 let currency = currency. to_uppercase ( ) ;
7070 let url = self . url ( & currency_lc, & currency) ;
7171
72- let resp : Value = client
72+ let response = client
7373 . get ( & url)
7474 . send ( )
7575 . await
76- . map_err ( |e| anyhow ! ( "Failed to request url {url} caused by: {:?}" , e. source( ) ) ) ?
77- . json ( )
78- . await
79- . map_err ( |e| {
80- anyhow ! (
81- "Failed to decode response body from {url}, caused by: {:?}" ,
82- e. source( )
83- )
84- } ) ?;
76+ . map_err ( |e| anyhow ! ( "Request failed {url}: {:?}" , e. source( ) ) ) ?;
8577
86- let reply_members = self . reply_members ( & currency_lc , & currency ) ;
78+ let status = response . status ( ) ;
8779
88- let mut current = & mut resp. clone ( ) ;
89- for member in reply_members {
90- if let Ok ( pos) = member. parse :: < usize > ( ) {
91- current = current. get_mut ( pos) . ok_or ( anyhow ! (
92- "Positional member `{}` not found in {} response: {}" ,
93- member,
94- self . name( ) ,
95- resp
96- ) ) ?;
97- } else {
98- current = current. get_mut ( & member) . ok_or ( anyhow ! (
99- "Member `{}` not found in {} response: {}" ,
100- member,
101- self . name( ) ,
102- resp
103- ) ) ?;
104- }
105- }
106- let price = match current {
107- Value :: Number ( number) => number
108- . as_f64 ( )
109- . ok_or ( anyhow ! ( "Json number price could not be converted to float" ) ) ?,
110- Value :: String ( string) => string
111- . parse :: < f64 > ( )
112- . map_err ( |e| anyhow ! ( "Price string could not be converted to float: {e}" ) ) ?,
113- _ => return Err ( anyhow ! ( "Price is invalid json type" ) ) ,
114- } ;
115-
116- if price == 0.0 {
117- log:: warn!( "{} returned 0.0 as price for {}" , self . name, currency) ;
80+ if !status. is_success ( ) {
11881 return Err ( anyhow ! (
119- "{} returned 0.0 as price for {}" ,
120- self . name,
121- currency
82+ "HTTP error {status} from {url}: body={}" ,
83+ response. text( ) . await ?
12284 ) ) ;
12385 }
12486
87+ let resp: Value = response
88+ . json ( )
89+ . await
90+ . map_err ( |e| anyhow ! ( "Failed to parse JSON from {url}: {:?}" , e. source( ) ) ) ?;
91+
92+ let reply_members = self . reply_members ( & currency_lc, & currency) ;
93+
94+ let price = extract_price_from_response ( & resp, & reply_members, self . name ( ) , & currency) ?;
95+
12596 log:: info!(
12697 "Fetched price in {}ms from {}: {:.2} {currency}/BTC" ,
12798 now. elapsed( ) . as_millis( ) ,
@@ -133,10 +104,46 @@ impl Source {
133104 }
134105}
135106
107+ fn extract_price_from_response (
108+ resp : & Value ,
109+ reply_members : & [ String ] ,
110+ name : & str ,
111+ currency : & str ,
112+ ) -> Result < f64 , anyhow:: Error > {
113+ let mut current = & mut resp. clone ( ) ;
114+ for member in reply_members {
115+ if let Ok ( pos) = member. parse :: < usize > ( ) {
116+ current = current. get_mut ( pos) . ok_or ( anyhow ! (
117+ "Positional member `{member}` not found in {name} response: {resp}"
118+ ) ) ?;
119+ } else {
120+ current = current. get_mut ( member) . ok_or ( anyhow ! (
121+ "Member `{member}` not found in {name} response: {resp}"
122+ ) ) ?;
123+ }
124+ }
125+ let price = match current {
126+ Value :: Number ( number) => number
127+ . as_f64 ( )
128+ . ok_or ( anyhow ! ( "Json number price could not be converted to float" ) ) ?,
129+ Value :: String ( string) => string
130+ . parse :: < f64 > ( )
131+ . map_err ( |e| anyhow ! ( "Price string could not be converted to float: {e}" ) ) ?,
132+ _ => return Err ( anyhow ! ( "Price is invalid json type" ) ) ,
133+ } ;
134+
135+ if price == 0.0 {
136+ log:: warn!( "{name} returned 0.0 as price for {currency}" ) ;
137+ return Err ( anyhow ! ( "{name} returned 0.0 as price for {currency}" ) ) ;
138+ }
139+ Ok ( price)
140+ }
141+
136142struct SourceHealth {
137143 source : Source ,
138144 failures : u32 ,
139145 backoff_until : Instant ,
146+ last_error : Option < String > ,
140147}
141148
142149impl SourceHealth {
@@ -145,18 +152,21 @@ impl SourceHealth {
145152 source,
146153 failures : 0 ,
147154 backoff_until : Instant :: now ( ) ,
155+ last_error : None ,
148156 }
149157 }
150158
151159 fn mark_success ( & mut self ) {
152160 self . failures = 0 ;
153161 self . backoff_until = Instant :: now ( ) ;
162+ self . last_error = None ;
154163 }
155164
156- fn mark_failure ( & mut self ) {
165+ fn mark_failure ( & mut self , error : String ) {
157166 self . failures += 1 ;
158167 let delay = INITIAL_BACKOFF * 2u32 . pow ( self . failures . min ( 10 ) ) ;
159168 self . backoff_until = Instant :: now ( ) + delay. min ( MAX_BACKOFF ) ;
169+ self . last_error = Some ( error) ;
160170 }
161171}
162172
@@ -396,8 +406,10 @@ impl BtcPriceOracle {
396406 }
397407
398408 Err ( e) => {
399- log:: warn!( "failed to get `{currency}` rate from {source_name}: {e}" ) ;
400- source_health. mark_failure ( ) ;
409+ let err_msg =
410+ format ! ( "failed to get `{currency}` rate from {source_name}: {e}" ) ;
411+ log:: warn!( "{err_msg}" ) ;
412+ source_health. mark_failure ( err_msg) ;
401413 }
402414 }
403415 }
@@ -495,8 +507,10 @@ impl BtcPriceOracle {
495507 }
496508 }
497509 Err ( e) => {
498- log:: warn!( "failed to get `{currency}` rate from {name}: {e}" ) ;
499- source_health. mark_failure ( ) ;
510+ let err_msg =
511+ format ! ( "failed to get `{currency}` rate from {name}: {e}" ) ;
512+ log:: warn!( "{err_msg}" ) ;
513+ source_health. mark_failure ( err_msg) ;
500514 }
501515 }
502516 }
@@ -533,34 +547,35 @@ impl BtcPriceOracle {
533547
534548 let inner = self . inner . lock ( ) . await ;
535549
536- // Give a helpful error if the source name is unknown entirely
537- if !inner. sources . contains_key ( source_name) {
538- let available = inner
539- . sources
540- . keys ( )
541- . cloned ( )
542- . collect :: < Vec < _ > > ( )
543- . join ( ", " ) ;
550+ let Some ( source_health) = inner. sources . get ( source_name) else {
551+ // Give a helpful error if the source name is unknown entirely
552+ let available = inner. sources . keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) . join ( ", " ) ;
544553 return Err ( anyhow ! (
545554 "Unknown source `{source_name}`. Available sources: {available}"
546555 ) ) ;
547- }
556+ } ;
548557
549558 let currency_cache = inner
550559 . currencies
551560 . get ( currency)
552561 . ok_or_else ( || anyhow ! ( "No rates available for `{currency}`" ) ) ?;
553562
554- let price_cache = currency_cache
555- . prices
556- . get ( source_name)
557- . ok_or_else ( || anyhow ! (
563+ let source_err = source_health
564+ . last_error
565+ . clone ( )
566+ . unwrap_or_else ( || format ! ( "{source_name} returned no result or error" ) ) ;
567+
568+ let price_cache = currency_cache. prices . get ( source_name) . ok_or_else ( || {
569+ anyhow ! (
558570 "Source `{source_name}` has no data for `{currency}`. \
559- The source may not support this currency or is currently backing off."
560- ) ) ?;
571+ The source may not support this currency: {source_err}"
572+ )
573+ } ) ?;
561574
562575 if price_cache. timestamp + SERVE_TTL <= Instant :: now ( ) {
563- return Err ( anyhow ! ( "Cached rate from `{source_name}` is expired" ) ) ;
576+ return Err ( anyhow ! (
577+ "Cached rate from `{source_name}` is expired. Last source error: {source_err}"
578+ ) ) ;
564579 }
565580
566581 Ok ( price_cache. price )
@@ -577,3 +592,164 @@ fn get_median(source_results: Vec<SourceResult>) -> f64 {
577592 f64:: midpoint ( prices[ mid - 1 ] , prices[ mid] )
578593 }
579594}
595+
596+ #[ test]
597+ fn test_sources ( ) {
598+ use crate :: add_default_sources;
599+ use serde_json:: json;
600+ let mut sources = Vec :: new ( ) ;
601+ add_default_sources ( & mut sources, false ) ;
602+ let mut responses = HashMap :: new ( ) ;
603+
604+ let coingecko_price = 72732f64 ;
605+ let coingecko_response = json ! ( { "bitcoin" : { "usd" : coingecko_price} } ) ;
606+ responses. insert ( "coingecko" , ( coingecko_response, coingecko_price) ) ;
607+
608+ let kraken_price = 72745.00000 ;
609+ let kraken_response = json ! ( {
610+ "error" : [ ] ,
611+ "result" : {
612+ "XXBTZUSD" : {
613+ "a" : [ "72745.00000" , "1" , "1.000" ] ,
614+ "b" : [ "72744.90000" , "3" , "3.000" ] ,
615+ "c" : [ kraken_price, "0.00033610" ] ,
616+ "v" : [ "299.69911717" , "640.87872182" ] ,
617+ "p" : [ "73231.75442" , "73435.07382" ] ,
618+ "t" : [ 17317 , 39634 ] ,
619+ "l" : [ "72613.70000" , "72613.70000" ] ,
620+ "h" : [ "73960.00000" , "74070.00000" ] ,
621+ "o" : "73569.90000" ,
622+ }
623+ } ,
624+ } ) ;
625+ responses. insert ( "kraken" , ( kraken_response, kraken_price) ) ;
626+
627+ let blockchain_info_price = 72749.07 ;
628+ let blockchain_info_response = json ! ( {
629+ "ARS" : {
630+ "15m" : 1.0250388182e8 ,
631+ "last" : 1.0250388182e8 ,
632+ "buy" : 1.0250388182e8 ,
633+ "sell" : 1.0250388182e8 ,
634+ "symbol" : "ARS" ,
635+ } ,
636+ "AUD" : {
637+ "15m" : 101319.81 ,
638+ "last" : 101319.81 ,
639+ "buy" : 101319.81 ,
640+ "sell" : 101319.81 ,
641+ "symbol" : "AUD" ,
642+ } ,
643+ "BRL" : {
644+ "15m" : 366724.43 ,
645+ "last" : 366724.43 ,
646+ "buy" : 366724.43 ,
647+ "sell" : 366724.43 ,
648+ "symbol" : "BRL" ,
649+ } ,
650+ "CAD" : {
651+ "15m" : 100512.08 ,
652+ "last" : 100512.08 ,
653+ "buy" : 100512.08 ,
654+ "sell" : 100512.08 ,
655+ "symbol" : "CAD" ,
656+ } ,
657+ "USD" : {
658+ "15m" : 72749.07 ,
659+ "last" : blockchain_info_price,
660+ "buy" : 72749.07 ,
661+ "sell" : 72749.07 ,
662+ "symbol" : "USD" ,
663+ } ,
664+ } ) ;
665+ responses. insert (
666+ "blockchain.info" ,
667+ ( blockchain_info_response, blockchain_info_price) ,
668+ ) ;
669+
670+ let bitstamp_price = 72748.45 ;
671+ let bitstamp_response = json ! ( {
672+ "timestamp" : "1780301548" ,
673+ "open" : "73568.00" ,
674+ "high" : "74094.65" ,
675+ "low" : "72611.68" ,
676+ "last" : bitstamp_price,
677+ "volume" : "619.76380694" ,
678+ "vwap" : "73542.01" ,
679+ "bid" : "72748.45" ,
680+ "ask" : "72748.46" ,
681+ "side" : "1" ,
682+ "open_24" : "73769.45" ,
683+ "percent_change_24" : "-1.38" ,
684+ "market_type" : "SPOT" ,
685+ } ) ;
686+ responses. insert ( "bitstamp" , ( bitstamp_response, bitstamp_price) ) ;
687+
688+ let coindesk_price = 72751.6828660636 ;
689+ let coindes_response = json ! ( {
690+ "Data" : {
691+ "BTC-USD" : {
692+ "TYPE" : "266" ,
693+ "MARKET" : "cadli" ,
694+ "INSTRUMENT" : "BTC-USD" ,
695+ "CCSEQ" : 1323841393 ,
696+ "VALUE" : coindesk_price,
697+ "VALUE_FLAG" : "UP" ,
698+ "VALUE_LAST_UPDATE_TS" : 1780301570 ,
699+ "VALUE_LAST_UPDATE_TS_NS" : 94000000 ,
700+ "LAST_UPDATE_QUANTITY" : 0.105 ,
701+ "LAST_UPDATE_QUOTE_QUANTITY" : 7642.36250656015 ,
702+ "LAST_UPDATE_VOLUME_TOP_TIER" : 0 ,
703+ "LAST_UPDATE_QUOTE_VOLUME_TOP_TIER" : 0 ,
704+ "LAST_UPDATE_VOLUME_DIRECT" : 0 ,
705+ "LAST_UPDATE_CCSEQ" : 1323894738 ,
706+ "CURRENT_HOUR_VOLUME" : 1574.29740546284 ,
707+ "CURRENT_HOUR_QUOTE_VOLUME" : 114448749.088967 ,
708+ "CURRENT_HOUR_VOLUME_TOP_TIER" : 815.533688797 ,
709+ "CURRENT_HOUR_QUOTE_VOLUME_TOP_TIER" : 59271744.4993065 ,
710+ "CURRENT_YEAR_CHANGE_PERCENTAGE" : -16.8891195575271 ,
711+ "MOVING_24_HOUR_VOLUME" : 127405.003580434 ,
712+ "MOVING_24_HOUR_QUOTE_VOLUME" : 9365693546.36435 ,
713+ "MOVING_24_HOUR_VOLUME_TOP_TIER" : 60402.3558377577 ,
714+ "MOVING_24_HOUR_QUOTE_VOLUME_TOP_TIER_DIRECT" : 867716024.677109 ,
715+ "MOVING_24_HOUR_OPEN" : 73850.0780201078 ,
716+ "MOVING_7_DAY_HIGH" : 77996.1623459993 ,
717+ "MOVING_7_DAY_LOW" : 72425.5243349043 ,
718+ "MOVING_7_DAY_TOTAL_INDEX_UPDATES" : 12822192 ,
719+ "MOVING_7_DAY_CHANGE" : -4535.8043571581 ,
720+ "MOVING_365_DAY_CHANGE_PERCENTAGE" : -31.154821757090602 ,
721+ "LIFETIME_FIRST_UPDATE_TS" : 1279408140 ,
722+ "LIFETIME_QUOTE_VOLUME_TOP_TIER_DIRECT" : 4851586542024.82 ,
723+ "LIFETIME_OPEN" : 0.04951 ,
724+ "LIFETIME_LOW" : 0.01 ,
725+ "LIFETIME_LOW_TS" : 1286572500 ,
726+ "LIFETIME_TOTAL_INDEX_UPDATES" : 1329464334 ,
727+ "LIFETIME_CHANGE" : 72751.6333560636 ,
728+ "LIFETIME_CHANGE_PERCENTAGE" : 146943311.16151 ,
729+ }
730+ } ,
731+ "Err" : { } ,
732+ } ) ;
733+ responses. insert ( "coindesk" , ( coindes_response, coindesk_price) ) ;
734+
735+ let coinbase_price = 72760.125 ;
736+ let coinbase_response =
737+ json ! ( { "data" : { "amount" : coinbase_price, "base" : "BTC" , "currency" : "USD" } } ) ;
738+ responses. insert ( "coinbase" , ( coinbase_response, coinbase_price) ) ;
739+
740+ let binance_price = 72715.35000000 ;
741+ let binance_response = json ! ( { "symbol" : "BTCUSD" , "price" : binance_price} ) ;
742+ responses. insert ( "binance" , ( binance_response, binance_price) ) ;
743+
744+ for source in & sources {
745+ let ( response, price) = responses. get ( source. name ( ) ) . unwrap ( ) ;
746+ let extracted_price = extract_price_from_response (
747+ response,
748+ & source. reply_members ( "usd" , "USD" ) ,
749+ & source. name ,
750+ "USD" ,
751+ )
752+ . unwrap ( ) ;
753+ assert_eq ! ( extracted_price, * price, "Failed for {}" , source. name( ) ) ;
754+ }
755+ }
0 commit comments