Skip to content

Commit 6fd2658

Browse files
committed
fix: changes realted to bitreq usage.
1 parent 7ed3e6b commit 6fd2658

File tree

2 files changed

+89
-32
lines changed

2 files changed

+89
-32
lines changed

lightning-block-sync/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ all-features = true
1616
rustdoc-args = ["--cfg", "docsrs"]
1717

1818
[features]
19-
rest-client = [ "serde_json", "bitreq" ]
20-
rpc-client = [ "serde_json", "bitreq" ]
21-
tokio = [ "dep:tokio", "bitreq/async" ]
19+
rest-client = [ "serde_json", "dep:bitreq" ]
20+
rpc-client = [ "serde_json", "dep:bitreq" ]
21+
tokio = [ "dep:tokio", "bitreq?/async" ]
2222

2323
[dependencies]
2424
bitcoin = "0.32.2"

lightning-block-sync/src/http.rs

Lines changed: 86 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
44
use serde_json;
55

6+
#[cfg(feature = "tokio")]
7+
use bitreq::RequestExt;
8+
69
use std::convert::TryFrom;
710
use std::fmt;
811
use std::net::{SocketAddr, ToSocketAddrs};
@@ -78,9 +81,14 @@ impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
7881
}
7982
}
8083

84+
/// Maximum number of cached connections in the connection pool.
85+
const MAX_CONNECTIONS: usize = 10;
86+
8187
/// Client for making HTTP requests.
8288
pub(crate) struct HttpClient {
8389
address: SocketAddr,
90+
#[cfg(feature = "tokio")]
91+
client: bitreq::Client,
8492
}
8593

8694
impl HttpClient {
@@ -96,13 +104,17 @@ impl HttpClient {
96104
Some(address) => address,
97105
};
98106

99-
// Verify reachability by attempting a connection (matches original behavior).
107+
// Verify reachability by attempting a connection.
100108
let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
101109
stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
102110
stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
103111
drop(stream);
104112

105-
Ok(Self { address })
113+
Ok(Self {
114+
address,
115+
#[cfg(feature = "tokio")]
116+
client: bitreq::Client::new(MAX_CONNECTIONS),
117+
})
106118
}
107119

108120
/// Sends a `GET` request for a resource identified by `uri` at the `host`.
@@ -186,9 +198,9 @@ impl HttpClient {
186198
}
187199

188200
/// Sends an HTTP request message and reads the response, returning its body.
189-
async fn send_request(&mut self, request: bitreq::Request) -> std::io::Result<Vec<u8>> {
201+
async fn send_request(&self, request: bitreq::Request) -> std::io::Result<Vec<u8>> {
190202
#[cfg(feature = "tokio")]
191-
let response = request.send_async().await.map_err(bitreq_to_io_error)?;
203+
let response = request.send_async_with_client(&self.client).await.map_err(bitreq_to_io_error)?;
192204
#[cfg(not(feature = "tokio"))]
193205
let response = request.send().map_err(bitreq_to_io_error)?;
194206

@@ -216,7 +228,8 @@ fn bitreq_to_io_error(err: bitreq::Error) -> std::io::Error {
216228
| bitreq::Error::MalformedChunkLength
217229
| bitreq::Error::MalformedChunkEnd
218230
| bitreq::Error::MalformedContentLength
219-
| bitreq::Error::InvalidUtf8InResponse => ErrorKind::InvalidData,
231+
| bitreq::Error::InvalidUtf8InResponse
232+
| bitreq::Error::InvalidUtf8InBody(_) => ErrorKind::InvalidData,
220233
bitreq::Error::AddressNotFound | bitreq::Error::HttpsFeatureNotEnabled => {
221234
ErrorKind::InvalidInput
222235
},
@@ -321,18 +334,26 @@ mod endpoint_tests {
321334
#[cfg(test)]
322335
pub(crate) mod client_tests {
323336
use super::*;
324-
use std::io::BufRead;
325-
use std::io::Write;
337+
use std::io::{BufRead, Read, Write};
326338

327339
/// Server for handling HTTP client requests with a stock response.
328340
pub struct HttpServer {
329341
address: std::net::SocketAddr,
330-
#[allow(dead_code)]
331-
handler: std::thread::JoinHandle<()>,
332-
#[allow(dead_code)]
342+
handler: Option<std::thread::JoinHandle<()>>,
333343
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
334344
}
335345

346+
impl Drop for HttpServer {
347+
fn drop(&mut self) {
348+
self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
349+
// Make a connection to unblock the listener's accept() call
350+
let _ = std::net::TcpStream::connect(self.address);
351+
if let Some(handler) = self.handler.take() {
352+
let _ = handler.join();
353+
}
354+
}
355+
}
356+
336357
/// Body of HTTP response messages.
337358
pub enum MessageBody<T: ToString> {
338359
Empty,
@@ -409,39 +430,75 @@ pub(crate) mod client_tests {
409430
let shutdown_signaled = std::sync::Arc::clone(&shutdown);
410431
let handler = std::thread::spawn(move || {
411432
for stream in listener.incoming() {
412-
let mut stream = stream.unwrap();
433+
if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
434+
return;
435+
}
436+
437+
let stream = stream.unwrap();
413438
stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
439+
stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
414440

415-
let lines_read = std::io::BufReader::new(&stream)
416-
.lines()
417-
.take_while(|line| !line.as_ref().unwrap().is_empty())
418-
.count();
419-
if lines_read == 0 {
420-
continue;
421-
}
441+
let mut reader = std::io::BufReader::new(stream);
422442

423-
for chunk in response.as_bytes().chunks(16) {
443+
// Handle multiple requests on the same connection (keep-alive)
444+
loop {
424445
if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
425446
return;
426-
} else {
427-
if let Err(_) = stream.write(chunk) {
447+
}
448+
449+
// Read request headers
450+
let mut lines_read = 0;
451+
let mut content_length: usize = 0;
452+
loop {
453+
let mut line = String::new();
454+
match reader.read_line(&mut line) {
455+
Ok(0) => break, // eof
456+
Ok(_) => {
457+
if line == "\r\n" || line == "\n" {
458+
break; // end of headers
459+
}
460+
// Parse content_length for POST body handling
461+
if let Some(value) = line.strip_prefix("Content-Length:") {
462+
content_length = value.trim().parse().unwrap_or(0);
463+
}
464+
lines_read += 1;
465+
},
466+
Err(_) => break, // Read error or timeout
467+
}
468+
}
469+
470+
if lines_read == 0 {
471+
break; // No request received, connection closed
472+
}
473+
474+
// Consume request body if present (needed for POST keep-alive)
475+
if content_length > 0 {
476+
let mut body = vec![0u8; content_length];
477+
if reader.read_exact(&mut body).is_err() {
428478
break;
429479
}
430-
if let Err(_) = stream.flush() {
480+
}
481+
482+
// Send response
483+
let stream = reader.get_mut();
484+
let mut write_error = false;
485+
for chunk in response.as_bytes().chunks(16) {
486+
if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
487+
return;
488+
}
489+
if stream.write(chunk).is_err() || stream.flush().is_err() {
490+
write_error = true;
431491
break;
432492
}
433493
}
494+
if write_error {
495+
break;
496+
}
434497
}
435498
}
436499
});
437500

438-
Self { address, handler, shutdown }
439-
}
440-
441-
#[allow(dead_code)]
442-
fn shutdown(self) {
443-
self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
444-
self.handler.join().unwrap();
501+
Self { address, handler: Some(handler), shutdown }
445502
}
446503

447504
pub fn endpoint(&self) -> HttpEndpoint {

0 commit comments

Comments
 (0)