Skip to content

Commit 9ecee7b

Browse files
committed
Add S3 timeout config to bottomless replicator and fix integration tests
- Add LIBSQL_BOTTOMLESS_S3_READ_TIMEOUT_SECS (default 5s)
- Add LIBSQL_BOTTOMLESS_S3_CONNECT_TIMEOUT_SECS (default 5s)
- Add LIBSQL_BOTTOMLESS_S3_OPERATION_ATTEMPT_TIMEOUT_SECS (default 10s)
- Configure TimeoutConfig on aws_sdk_s3::Config in bottomless::replicator::Options::client_config()
- Update meta_store.rs Options construction to include new timeout fields
- Remove #[ignore] from network_partition test
- Fix test fixtures: endpoint timing, image caching, mut minio
1 parent 1169364 commit 9ecee7b

9 files changed

Lines changed: 131 additions & 34 deletions

File tree

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ libsql-sqlite3/**.o.tmp
1212

1313
/bindings/c/generated
1414
/bindings/c/**.xcframework
15-
/bindings/**/.DS_Store
15+
/bindings/**/.DS_Store

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bottomless/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ anyhow = "1.0.66"
1313
async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] }
1414
aws-config = { version = "1" }
1515
aws-sdk-s3 = { version = "1" }
16+
aws-smithy-types = { version = "1" }
1617
bytes = "1"
1718
libsql-sys = { path = "../libsql-sys" }
1819
libsql_replication = { path = "../libsql-replication" }

bottomless/src/replicator.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption;
88
use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
99
use aws_config::BehaviorVersion;
1010
use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider};
11+
use aws_smithy_types::timeout::TimeoutConfig;
1112
use aws_sdk_s3::error::SdkError;
1213
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
1314
use aws_sdk_s3::operation::get_object::GetObjectError;
@@ -121,6 +122,12 @@ pub struct Options {
121122
pub s3_max_parallelism: usize,
122123
/// Max number of retries for S3 operations
123124
pub s3_max_retries: u32,
125+
/// Timeout for reading the first byte of an S3 response (seconds)
126+
pub s3_read_timeout_secs: u64,
127+
/// Timeout for establishing a TCP connection to S3 (seconds)
128+
pub s3_connect_timeout_secs: u64,
129+
/// Timeout for a single S3 operation attempt; each retry is a new attempt with its own timeout (seconds)
130+
pub s3_operation_attempt_timeout_secs: u64,
124131
/// Skip snapshot upload per checkpoint.
125132
pub skip_snapshot: bool,
126133
/// Skip uploading snapshots on shutdown
@@ -145,6 +152,11 @@ impl Options {
145152
"LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY was not set"
146153
))?;
147154
let session_token: Option<String> = self.session_token.clone();
155+
let timeout_config = TimeoutConfig::builder()
156+
.read_timeout(Duration::from_secs(self.s3_read_timeout_secs))
157+
.connect_timeout(Duration::from_secs(self.s3_connect_timeout_secs))
158+
.operation_attempt_timeout(Duration::from_secs(self.s3_operation_attempt_timeout_secs))
159+
.build();
148160
let conf = loader
149161
.behavior_version(BehaviorVersion::latest())
150162
.region(Region::new(region))
@@ -159,6 +171,7 @@ impl Options {
159171
aws_sdk_s3::config::retry::RetryConfig::standard()
160172
.with_max_attempts(self.s3_max_retries),
161173
)
174+
.timeout_config(timeout_config)
162175
.build();
163176

164177
let s3_config = aws_sdk_s3::config::Builder::from(&conf)
@@ -233,6 +246,12 @@ impl Options {
233246
),
234247
};
235248
let s3_max_retries = env_var_or("LIBSQL_BOTTOMLESS_S3_MAX_RETRIES", 10).parse::<u32>()?;
249+
let s3_read_timeout_secs =
250+
env_var_or("LIBSQL_BOTTOMLESS_S3_READ_TIMEOUT_SECS", 5).parse::<u64>()?;
251+
let s3_connect_timeout_secs =
252+
env_var_or("LIBSQL_BOTTOMLESS_S3_CONNECT_TIMEOUT_SECS", 5).parse::<u64>()?;
253+
let s3_operation_attempt_timeout_secs =
254+
env_var_or("LIBSQL_BOTTOMLESS_S3_OPERATION_ATTEMPT_TIMEOUT_SECS", 10).parse::<u64>()?;
236255
let cipher = match encryption_cipher {
237256
Some(cipher) => Cipher::from_str(&cipher)?,
238257
None => Cipher::default(),
@@ -261,6 +280,9 @@ impl Options {
261280
region,
262281
bucket_name,
263282
s3_max_retries,
283+
s3_read_timeout_secs,
284+
s3_connect_timeout_secs,
285+
s3_operation_attempt_timeout_secs,
264286
skip_snapshot,
265287
skip_shutdown_upload,
266288
})

libsql-server/src/namespace/meta_store.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ pub async fn metastore_connection_maker(
129129
max_batch_interval: config.backup_interval,
130130
s3_max_parallelism: 32,
131131
s3_max_retries: 10,
132+
s3_read_timeout_secs: 5,
133+
s3_connect_timeout_secs: 5,
134+
s3_operation_attempt_timeout_secs: 10,
132135
skip_snapshot: false,
133136
skip_shutdown_upload: false,
134137
};

libsql-server/tests/bottomless/basic_restore.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,10 @@ async fn test_basic_restore() {
3838
.expect("Failed to cleanup dbs dir");
3939

4040
// Phase 3: Start sqld - should restore from minio
41-
let endpoint2 = sqld.http_endpoint();
4241
sqld.start(data_dir.path())
4342
.await
4443
.expect("Failed to restart sqld");
44+
let endpoint2 = sqld.http_endpoint();
4545
sqld.wait_for_ready(Duration::from_secs(60))
4646
.await
4747
.expect("sqld did not become ready after restore");

libsql-server/tests/bottomless/fixtures.rs

Lines changed: 98 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,73 @@
1-
use std::path::Path;
2-
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
1+
use std::path::{Path, PathBuf};
2+
use std::sync::atomic::{AtomicU64, Ordering};
3+
use std::sync::OnceLock;
34
use std::time::Duration;
45

5-
static PORT_COUNTER: AtomicU16 = AtomicU16::new(0);
66
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
7+
static TEST_IMAGE_TAG: OnceLock<String> = OnceLock::new();
8+
9+
fn build_test_image() -> String {
10+
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
11+
let repo_root = manifest_dir
12+
.parent()
13+
.expect("CARGO_MANIFEST_DIR should have a parent");
14+
let tag = "libsql-server:test".to_string();
15+
16+
// Check if image already exists
17+
let check_output = std::process::Command::new("docker")
18+
.args(["images", "-q", &tag])
19+
.output()
20+
.expect("failed to run docker images");
21+
22+
if !check_output.stdout.is_empty() {
23+
return tag;
24+
}
25+
26+
let output = std::process::Command::new("docker")
27+
.arg("build")
28+
.arg("-t")
29+
.arg(&tag)
30+
.arg("-f")
31+
.arg(repo_root.join("Dockerfile"))
32+
.arg(repo_root)
33+
.output()
34+
.expect("failed to run docker build");
35+
36+
if !output.status.success() {
37+
panic!(
38+
"docker build failed: {}",
39+
String::from_utf8_lossy(&output.stderr)
40+
);
41+
}
42+
43+
tag
44+
}
745

8-
fn next_port() -> u16 {
9-
let counter = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
10-
20000 + (counter % 10000)
46+
fn get_test_image() -> &'static str {
47+
TEST_IMAGE_TAG.get_or_init(|| build_test_image())
48+
}
49+
50+
async fn docker_host_port(container_name: &str, container_port: u16) -> anyhow::Result<u16> {
51+
let output = tokio::process::Command::new("docker")
52+
.args(["port", container_name, &format!("{}", container_port)])
53+
.output()
54+
.await?;
55+
if !output.status.success() {
56+
anyhow::bail!(
57+
"docker port failed: {}",
58+
String::from_utf8_lossy(&output.stderr)
59+
);
60+
}
61+
let line = String::from_utf8_lossy(&output.stdout);
62+
// Format: "0.0.0.0:49153"
63+
let port = line
64+
.trim()
65+
.split(':')
66+
.last()
67+
.ok_or_else(|| anyhow::anyhow!("unexpected docker port output: {}", line))?
68+
.parse::<u16>()
69+
.map_err(|e| anyhow::anyhow!("failed to parse port from '{}': {}", line, e))?;
70+
Ok(port)
1171
}
1272

1373
fn unique_id() -> String {
@@ -31,8 +91,6 @@ pub struct MinioFixture {
3191
impl MinioFixture {
3292
pub async fn start() -> anyhow::Result<Self> {
3393
let uid = unique_id();
34-
let api_port = next_port();
35-
let console_port = next_port();
3694
let container_name = format!("minio-test-{}", uid);
3795
let network_name = format!("sqld-net-{}", uid);
3896

@@ -48,7 +106,7 @@ impl MinioFixture {
48106
);
49107
}
50108

51-
// Start minio container
109+
// Start minio container with random host ports
52110
let run_output = tokio::process::Command::new("docker")
53111
.args([
54112
"run",
@@ -58,9 +116,9 @@ impl MinioFixture {
58116
"--network",
59117
&network_name,
60118
"-p",
61-
&format!("{}:9000", api_port),
119+
":9000",
62120
"-p",
63-
&format!("{}:9001", console_port),
121+
":9001",
64122
"-e",
65123
"MINIO_ROOT_USER=minioadmin",
66124
"-e",
@@ -85,6 +143,10 @@ impl MinioFixture {
85143
);
86144
}
87145

146+
// Discover dynamically assigned host ports
147+
let api_port = docker_host_port(&container_name, 9000).await?;
148+
let console_port = docker_host_port(&container_name, 9001).await?;
149+
88150
// Wait for minio to be ready
89151
let client = reqwest::Client::new();
90152
let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
@@ -146,10 +208,6 @@ impl MinioFixture {
146208
})
147209
}
148210

149-
pub fn endpoint(&self) -> String {
150-
format!("http://127.0.0.1:{}", self.api_port)
151-
}
152-
153211
pub fn internal_endpoint(&self) -> String {
154212
format!("http://{}:9000", self.container_name)
155213
}
@@ -168,7 +226,7 @@ impl MinioFixture {
168226
Ok(())
169227
}
170228

171-
pub async fn restart(&self) -> anyhow::Result<()> {
229+
pub async fn restart(&mut self) -> anyhow::Result<()> {
172230
let output = tokio::process::Command::new("docker")
173231
.args(["start", &self.container_name])
174232
.output()
@@ -179,6 +237,9 @@ impl MinioFixture {
179237
String::from_utf8_lossy(&output.stderr)
180238
);
181239
}
240+
// Re-discover host ports after restart
241+
self.api_port = docker_host_port(&self.container_name, 9000).await?;
242+
self.console_port = docker_host_port(&self.container_name, 9001).await?;
182243
// Wait for minio to be ready
183244
let client = reqwest::Client::new();
184245
let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
@@ -208,18 +269,19 @@ impl MinioFixture {
208269
}
209270
}
210271

211-
pub struct SqldFixture<'a> {
212-
minio: &'a MinioFixture,
272+
pub struct SqldFixture {
273+
network_name: String,
274+
internal_endpoint: String,
213275
http_port: u16,
214276
pub container_name: String,
215277
}
216278

217-
impl<'a> SqldFixture<'a> {
218-
pub fn new(minio: &'a MinioFixture) -> Self {
219-
let http_port = next_port();
279+
impl SqldFixture {
280+
pub fn new(minio: &MinioFixture) -> Self {
220281
Self {
221-
minio,
222-
http_port,
282+
network_name: minio.network_name.clone(),
283+
internal_endpoint: minio.internal_endpoint(),
284+
http_port: 0,
223285
container_name: format!("sqld-test-{}", unique_id()),
224286
}
225287
}
@@ -232,7 +294,6 @@ impl<'a> SqldFixture<'a> {
232294
.await;
233295

234296
let data_dir_str = data_dir.to_str().unwrap();
235-
let network_name = &self.minio.network_name;
236297

237298
let run_output = tokio::process::Command::new("docker")
238299
.args([
@@ -241,11 +302,11 @@ impl<'a> SqldFixture<'a> {
241302
"--name",
242303
&self.container_name,
243304
"--network",
244-
network_name,
305+
&self.network_name,
245306
"-p",
246-
&format!("{}:8080", self.http_port),
307+
":8080",
247308
"-e",
248-
&format!("LIBSQL_BOTTOMLESS_ENDPOINT={}", self.minio.internal_endpoint()),
309+
&format!("LIBSQL_BOTTOMLESS_ENDPOINT={}", self.internal_endpoint),
249310
"-e",
250311
"LIBSQL_BOTTOMLESS_BUCKET=bottomless",
251312
"-e",
@@ -258,9 +319,15 @@ impl<'a> SqldFixture<'a> {
258319
"SQLD_ENABLE_BOTTOMLESS_REPLICATION=true",
259320
"-e",
260321
"SQLD_DB_PATH=/var/lib/sqld",
322+
"-e",
323+
"LIBSQL_BOTTOMLESS_S3_READ_TIMEOUT_SECS=5",
324+
"-e",
325+
"LIBSQL_BOTTOMLESS_S3_CONNECT_TIMEOUT_SECS=5",
326+
"-e",
327+
"LIBSQL_BOTTOMLESS_S3_OPERATION_ATTEMPT_TIMEOUT_SECS=10",
261328
"-v",
262329
&format!("{}:/var/lib/sqld", data_dir_str),
263-
"ghcr.io/tursodatabase/libsql-server:latest",
330+
get_test_image(),
264331
])
265332
.output()
266333
.await?;
@@ -272,6 +339,8 @@ impl<'a> SqldFixture<'a> {
272339
);
273340
}
274341

342+
self.http_port = docker_host_port(&self.container_name, 8080).await?;
343+
275344
Ok(())
276345
}
277346

@@ -314,6 +383,7 @@ impl<'a> SqldFixture<'a> {
314383
String::from_utf8_lossy(&output.stderr)
315384
);
316385
}
386+
self.http_port = docker_host_port(&self.container_name, 8080).await?;
317387
Ok(())
318388
}
319389

libsql-server/tests/bottomless/minio_interrupted.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::time::Duration;
55
async fn test_restore_completes_after_minio_killed() {
66
let _ = tracing_subscriber::fmt::try_init();
77

8-
let minio = MinioFixture::start()
8+
let mut minio = MinioFixture::start()
99
.await
1010
.expect("Failed to start minio");
1111

libsql-server/tests/bottomless/sqld_interrupted.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ async fn test_restore_completes_after_sqld_killed() {
5353
sqld.kill().await.expect("Failed to kill sqld");
5454

5555
// Phase 4: Restart sqld - must complete restore
56-
let endpoint2 = sqld.http_endpoint();
5756
sqld.restart().await.expect("Failed to restart sqld");
57+
let endpoint2 = sqld.http_endpoint();
5858
sqld.wait_for_ready(Duration::from_secs(60))
5959
.await
6060
.expect("sqld did not become ready after interrupted restore");

0 commit comments

Comments
 (0)