Skip to content

Commit 7c1cd05

Browse files
committed
improve rpc json requests handling
1 parent 9905dc1 commit 7c1cd05

2 files changed

Lines changed: 80 additions & 23 deletions

File tree

cli/v2/src/main.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ pub enum Commands {
163163
#[clap(long)]
164164
hash: Option<BlockHash>,
165165
},
166-
167166
ScanFrigate {
168167
#[clap(flatten)]
169168
rpc_args: RpcArgs,
@@ -172,7 +171,6 @@ pub enum Commands {
172171
#[clap(long)]
173172
hash: Option<BlockHash>,
174173
},
175-
176174
Create {
177175
/// Network
178176
#[clap(long, short, default_value = "signet")]

oracles/src/frigate/mod.rs

Lines changed: 80 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ use bitcoin::Txid;
66
use serde::{Deserialize, Serialize};
77
use serde_json::Value;
88

9+
const SUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.subscribe";
10+
const UNSUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.unsubscribe";
11+
const GET_RPC_METHOD: &str = "blockchain.transaction.get";
12+
const VERSION_RPC_METHOD: &str = "server.version";
13+
const BLOCK_HEADER_RPC_METHOD: &str = "blockchain.block.header";
14+
const STREAM_READ_BYTES: usize = 4096;
15+
pub const DUMMY_COINBASE: &str = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000";
16+
917
#[derive(Debug)]
1018
pub enum FrigateError {
1119
JsonRpc(jsonrpc::Error),
@@ -52,6 +60,7 @@ pub struct FrigateClient {
5260
pub host_url: String,
5361
client: Box<TcpStream>,
5462
pub request_timeout: Duration,
63+
next_request_id: u32,
5564
}
5665

5766
#[derive(Debug, Serialize, Deserialize)]
@@ -99,14 +108,6 @@ pub struct RequestPayload {
99108
pub jsonrpc: String,
100109
}
101110

102-
const SUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.subscribe";
103-
const UNSUBSCRIBE_RPC_METHOD: &str = "blockchain.silentpayments.unsubscribe";
104-
const GET_RPC_METHOD: &str = "blockchain.transaction.get";
105-
const VERSION_RPC_METHOD: &str = "server.version";
106-
const BLOCK_HEADER_RPC_METHOD: &str = "blockchain.block.header";
107-
const STREAM_READ_BYTES: usize = 4096;
108-
pub const DUMMY_COINBASE: &str = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1b03951a0604f15ccf5609013803062b9b5a0100072f425443432f20000000000000000000";
109-
110111
impl FrigateClient {
111112
pub async fn connect(host_url: &str) -> Result<Self, FrigateError> {
112113
let stream = TcpStream::connect(host_url)
@@ -117,6 +118,7 @@ impl FrigateClient {
117118
host_url: host_url.to_string(),
118119
client: Box::new(stream),
119120
request_timeout: Duration::from_secs(10),
121+
next_request_id: 1,
120122
})
121123
}
122124

@@ -126,6 +128,47 @@ impl FrigateClient {
126128
self
127129
}
128130

131+
/// Generates the next sequential request ID.
132+
fn next_id(&mut self) -> u32 {
133+
let id = self.next_request_id;
134+
self.next_request_id = self.next_request_id.wrapping_add(1);
135+
id
136+
}
137+
138+
/// Extracts JSON-RPC error from response if present.
139+
fn extract_rpc_error(response: &Value) -> Option<FrigateError> {
140+
if let Some(error) = response.get("error") {
141+
if error.is_object() {
142+
let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
143+
let message = error
144+
.get("message")
145+
.and_then(|v| v.as_str())
146+
.unwrap_or("Unknown error");
147+
let error_msg = format!("JSON-RPC error (code {}): {}", code, message);
148+
return Some(FrigateError::Generic(error_msg));
149+
}
150+
}
151+
None
152+
}
153+
154+
/// Validates that the response ID matches the expected request ID.
155+
fn validate_response_id(response: &Value, expected_id: u32) -> Result<(), FrigateError> {
156+
if let Some(id_value) = response.get("id") {
157+
let response_id = id_value.as_u64().map(|v| v as u32);
158+
if response_id != Some(expected_id) {
159+
return Err(FrigateError::Generic(format!(
160+
"Response ID mismatch: expected {}, got {:?}",
161+
expected_id, response_id
162+
)));
163+
}
164+
} else {
165+
return Err(FrigateError::Generic(
166+
"Response missing id field".to_string(),
167+
));
168+
}
169+
Ok(())
170+
}
171+
129172
pub async fn read_from_stream(&mut self, size: usize) -> Result<Value, FrigateError> {
130173
let mut buffer = vec![0; size];
131174
let n = self.client.read(&mut buffer).await?;
@@ -143,7 +186,11 @@ impl FrigateClient {
143186
}
144187
}
145188

146-
async fn send_request(&mut self, req_bytes: &[u8]) -> Result<Value, FrigateError> {
189+
async fn send_request(
190+
&mut self,
191+
req_bytes: &[u8],
192+
request_id: u32,
193+
) -> Result<Value, FrigateError> {
147194
match timeout(self.request_timeout, async {
148195
self.client.write_all(req_bytes).await?;
149196
self.client.write_all(b"\n").await?;
@@ -152,7 +199,14 @@ impl FrigateClient {
152199
})
153200
.await
154201
{
155-
Ok(res) => res,
202+
Ok(res) => {
203+
let response = res?;
204+
if let Some(error) = Self::extract_rpc_error(&response) {
205+
return Err(error);
206+
}
207+
Self::validate_response_id(&response, request_id)?;
208+
Ok(response)
209+
}
156210
Err(_) => Err(FrigateError::Generic(format!(
157211
"request timed out after {:?}",
158212
self.request_timeout
@@ -161,15 +215,16 @@ impl FrigateClient {
161215
}
162216

163217
pub async fn get_block_header(&mut self, height: u32) -> Result<String, FrigateError> {
218+
let request_id = self.next_id();
164219
let params = vec![height];
165220
let req = RequestPayload {
166221
method: BLOCK_HEADER_RPC_METHOD.to_string(),
167222
params: serde_json::json!(params),
168-
id: serde_json::Value::from(5),
223+
id: serde_json::Value::from(request_id),
169224
jsonrpc: "2.0".to_string(),
170225
};
171226
let req_bytes = serde_json::to_vec(&req)?;
172-
let res = self.send_request(&req_bytes).await?;
227+
let res = self.send_request(&req_bytes, request_id).await?;
173228

174229
tracing::debug!("[Block Header Request] Result {:?}", res);
175230
Ok(String::from(res["result"].as_str().unwrap()))
@@ -179,15 +234,16 @@ impl FrigateClient {
179234
&mut self,
180235
txid: String,
181236
) -> Result<(String, String), FrigateError> {
237+
let request_id = self.next_id();
182238
let params = vec![txid, "true".to_string()];
183239
let req = RequestPayload {
184240
method: GET_RPC_METHOD.to_string(),
185-
id: serde_json::Value::from(4),
241+
id: serde_json::Value::from(request_id),
186242
params: serde_json::json!(params),
187243
jsonrpc: "2.0".to_string(),
188244
};
189245
let req_bytes = serde_json::to_vec(&req)?;
190-
let res = self.send_request(&req_bytes).await?;
246+
let res = self.send_request(&req_bytes, request_id).await?;
191247

192248
tracing::debug!("[Get tx Request] Result {:#?}", res);
193249
let blockhash = String::from(res["result"]["blockhash"].as_str().unwrap());
@@ -196,17 +252,18 @@ impl FrigateClient {
196252
}
197253

198254
pub async fn version(&mut self) -> Result<(), FrigateError> {
199-
let params = vec!["frigate-cli", "1.4"];
255+
let request_id = self.next_id();
256+
let params = vec!["bdk-sp", "1.4"];
200257

201258
let req = RequestPayload {
202259
method: VERSION_RPC_METHOD.to_string(),
203260
params: serde_json::json!(params),
204-
id: serde_json::Value::from(3),
261+
id: serde_json::Value::from(request_id),
205262
jsonrpc: "2.0".to_string(),
206263
};
207264

208265
let req_bytes = serde_json::to_vec(&req)?;
209-
self.send_request(&req_bytes).await?;
266+
self.send_request(&req_bytes, request_id).await?;
210267

211268
Ok(())
212269
}
@@ -216,6 +273,7 @@ impl FrigateClient {
216273
req: &SubscribeRequest,
217274
) -> Result<Option<(Vec<History>, f32)>, FrigateError> {
218275
self.version().await?;
276+
let request_id = self.next_id();
219277
let mut params: Vec<Value> = vec![
220278
serde_json::json!(req.scan_priv_key),
221279
serde_json::json!(req.spend_pub_key),
@@ -232,12 +290,12 @@ impl FrigateClient {
232290
let req = RequestPayload {
233291
method: SUBSCRIBE_RPC_METHOD.to_string(),
234292
params: serde_json::json!(params),
235-
id: serde_json::Value::from(2),
293+
id: serde_json::Value::from(request_id),
236294
jsonrpc: "2.0".to_string(),
237295
};
238296

239297
let req_bytes = serde_json::to_vec(&req)?;
240-
let result = self.send_request(&req_bytes).await?;
298+
let result = self.send_request(&req_bytes, request_id).await?;
241299

242300
if result["result"].is_string() {
243301
tracing::info!(
@@ -257,6 +315,7 @@ impl FrigateClient {
257315
}
258316

259317
pub async fn unsubscribe(&mut self, req: &UnsubscribeRequest) -> Result<String, FrigateError> {
318+
let request_id = self.next_id();
260319
let params: Vec<Value> = vec![
261320
serde_json::json!(req.scan_privkey),
262321
serde_json::json!(req.spend_pubkey),
@@ -265,13 +324,13 @@ impl FrigateClient {
265324
self.version().await?;
266325
let req = RequestPayload {
267326
method: UNSUBSCRIBE_RPC_METHOD.to_string(),
268-
id: serde_json::Value::from(1),
327+
id: serde_json::Value::from(request_id),
269328
params: serde_json::json!(params),
270329
jsonrpc: "2.0".to_string(),
271330
};
272331

273332
let req_bytes = serde_json::to_vec(&req)?;
274-
let result = self.send_request(&req_bytes).await?;
333+
let result = self.send_request(&req_bytes, request_id).await?;
275334

276335
Ok(result["result"].to_string())
277336
}

0 commit comments

Comments
 (0)