Skip to content

Commit c11c38b

Browse files
losfairclaude
andcommitted
fix: replace thread-local commit-group result with Arc<OnceLock> slot
The previous commit used thread-local storage to pass the commit group result to participating connections. This is incorrect because connections may migrate between threads when no commit group is active. Replace the thread-local with an Arc<OnceLock<CommitGroupResultData>> shared slot owned by the CommitGroup. Each connection receives its own Arc clone from append_intent() and stores it as a field. commit() writes the result into the shared slot. On the next lock() call, the connection reads its slot, applies the changelog, advances last_known_write_version, and drops the Arc. Use .as_ref() rather than .take() when probing the slot so that it survives unpopulated reads — if lock() fires between finalize and mv_commit_group_commit, the slot must remain for later consumption. Also adds unit tests for CommitGroupResultSlot and bumps version to 0.3.7. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent bb8b420 commit c11c38b

9 files changed

Lines changed: 110 additions & 64 deletions

File tree

Cargo.lock

Lines changed: 6 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mvclient/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mvclient"
3-
version = "0.3.6"
3+
version = "0.3.7"
44
edition = "2021"
55
license = "Apache-2.0"
66
authors = ["Heyang Zhou <heyang.zhou@icloud.com>"]

mvfs/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mvfs"
3-
version = "0.3.6"
3+
version = "0.3.7"
44
edition = "2021"
55
license = "Apache-2.0"
66
authors = ["Heyang Zhou <heyang.zhou@icloud.com>"]
@@ -15,7 +15,7 @@ anyhow = "1"
1515
thiserror = "2"
1616
futures = "0.3"
1717
tokio = { version = "1", features = ["full"] }
18-
mvclient = { path = "../mvclient", version = "0.3.6" }
18+
mvclient = { path = "../mvclient", version = "0.3.7" }
1919
tracing = "0.1"
2020
serde = { version = "1", features = ["derive"] }
2121
serde_json = "1"

mvfs/src/commit_group.rs

Lines changed: 75 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
collections::HashMap,
44
future::Future,
55
pin::Pin,
6-
sync::Arc,
6+
sync::{Arc, OnceLock},
77
};
88

99
use anyhow::{bail, Context, Result};
@@ -15,6 +15,7 @@ pub struct CommitGroup {
1515
intents: Vec<NamespaceCommitIntent>,
1616
client: Option<Arc<MultiVersionClient>>,
1717
dp: Option<Url>,
18+
result_slot: Arc<CommitGroupResultSlot>,
1819
}
1920

2021
impl Default for CommitGroup {
@@ -23,6 +24,9 @@ impl Default for CommitGroup {
2324
intents: Vec::new(),
2425
client: None,
2526
dp: None,
27+
result_slot: Arc::new(CommitGroupResultSlot {
28+
inner: OnceLock::new(),
29+
}),
2630
}
2731
}
2832
}
@@ -38,22 +42,34 @@ pub enum CommitOutput {
3842
Conflict,
3943
}
4044

41-
/// Stores the result of the last successful commit group so that participating
42-
/// connections can update their page caches and `last_known_write_version` when
43-
/// they next acquire a lock.
44-
struct CommitGroupResult {
45+
/// Shared slot that `commit()` writes into and each participating
46+
/// `Connection` reads from on its next `lock()` call. Thread-safe
47+
/// because connections may migrate threads after the commit group ends.
48+
pub struct CommitGroupResultSlot {
49+
inner: OnceLock<CommitGroupResultData>,
50+
}
51+
52+
struct CommitGroupResultData {
4553
version: String,
46-
/// Namespace keys that staged intents in this commit group.
47-
ns_keys: Vec<String>,
4854
/// Per-namespace changelog of pages modified by *concurrent* transactions
4955
/// between each namespace's assumed version and the committed version.
5056
/// A missing entry means the interval was too large to compute.
5157
changelog: HashMap<String, Vec<u32>>,
5258
}
5359

60+
impl CommitGroupResultSlot {
61+
/// Returns the committed version and the changelog entry for `ns_key`.
62+
/// `changelog` is `None` when the server could not compute the interval;
63+
/// the caller should do a full cache invalidation in that case.
64+
pub fn get_for_ns(&self, ns_key: &str) -> Option<(String, Option<Vec<u32>>)> {
65+
self.inner.get().map(|data| {
66+
(data.version.clone(), data.changelog.get(ns_key).cloned())
67+
})
68+
}
69+
}
70+
5471
thread_local! {
5572
static CURRENT_COMMIT_GROUP: RefCell<Option<CommitGroup>> = RefCell::new(None);
56-
static LAST_COMMIT_GROUP_RESULT: RefCell<Option<CommitGroupResult>> = RefCell::new(None);
5773
}
5874

5975
pub fn begin() -> Result<()> {
@@ -64,11 +80,7 @@ pub fn begin() -> Result<()> {
6480
}
6581
*cg = Some(CommitGroup::default());
6682
Ok(())
67-
})?;
68-
LAST_COMMIT_GROUP_RESULT.with(|r| {
69-
*r.borrow_mut() = None;
70-
});
71-
Ok(())
83+
})
7284
}
7385

7486
pub fn is_active() -> bool {
@@ -90,7 +102,7 @@ pub fn append_intent(
90102
client: &Arc<MultiVersionClient>,
91103
dp: Option<&Url>,
92104
intent: NamespaceCommitIntent,
93-
) -> Result<()> {
105+
) -> Result<Arc<CommitGroupResultSlot>> {
94106
CURRENT_COMMIT_GROUP.with(|cg| {
95107
let mut cg = cg.borrow_mut();
96108
let cg = cg
@@ -104,8 +116,9 @@ pub fn append_intent(
104116
cg.dp = dp.cloned();
105117
}
106118

119+
let slot = cg.result_slot.clone();
107120
cg.intents.push(intent);
108-
Ok(())
121+
Ok(slot)
109122
})
110123
}
111124

@@ -121,7 +134,7 @@ pub fn commit(
121134
return Ok(CommitOutput::Empty);
122135
}
123136

124-
let ns_keys: Vec<String> = cg.intents.iter().map(|i| i.init.ns_key.clone()).collect();
137+
let result_slot = cg.result_slot.clone();
125138

126139
let client = cg
127140
.client
@@ -133,12 +146,9 @@ pub fn commit(
133146

134147
Ok(match result {
135148
Some(result) => {
136-
LAST_COMMIT_GROUP_RESULT.with(|r| {
137-
*r.borrow_mut() = Some(CommitGroupResult {
138-
version: result.version.clone(),
139-
ns_keys,
140-
changelog: result.changelog.clone(),
141-
});
149+
let _ = result_slot.inner.set(CommitGroupResultData {
150+
version: result.version.clone(),
151+
changelog: result.changelog.clone(),
142152
});
143153
CommitOutput::Committed(result)
144154
}
@@ -152,24 +162,6 @@ pub fn rollback() -> Result<()> {
152162
Ok(())
153163
}
154164

155-
/// If a commit group recently succeeded and `ns_key` participated in it,
156-
/// returns `(committed_version, changelog)`. `changelog` is `None` when
157-
/// the server could not compute the interval (too many concurrent changes);
158-
/// in that case the caller should do a full cache invalidation.
159-
pub fn take_commit_group_result_for_ns(ns_key: &str) -> Option<(String, Option<Vec<u32>>)> {
160-
LAST_COMMIT_GROUP_RESULT.with(|r| {
161-
let r = r.borrow();
162-
r.as_ref().and_then(|result| {
163-
if result.ns_keys.iter().any(|k| k == ns_key) {
164-
let changelog = result.changelog.get(ns_key).cloned();
165-
Some((result.version.clone(), changelog))
166-
} else {
167-
None
168-
}
169-
})
170-
})
171-
}
172-
173165
#[cfg(test)]
174166
mod tests {
175167
use super::*;
@@ -233,4 +225,47 @@ mod tests {
233225

234226
rollback().unwrap();
235227
}
228+
229+
#[test]
230+
fn result_slot_empty_before_commit() {
231+
let slot = Arc::new(CommitGroupResultSlot {
232+
inner: OnceLock::new(),
233+
});
234+
assert!(slot.get_for_ns("test").is_none());
235+
}
236+
237+
#[test]
238+
fn result_slot_returns_changelog_after_population() {
239+
let slot = Arc::new(CommitGroupResultSlot {
240+
inner: OnceLock::new(),
241+
});
242+
243+
let _ = slot.inner.set(CommitGroupResultData {
244+
version: "abc123".into(),
245+
changelog: HashMap::from([("ns1".into(), vec![1, 2, 3])]),
246+
});
247+
248+
// Participating namespace gets version + changelog
249+
let (version, changelog) = slot.get_for_ns("ns1").unwrap();
250+
assert_eq!(version, "abc123");
251+
assert_eq!(changelog, Some(vec![1, 2, 3]));
252+
253+
// Non-participating namespace gets version but no changelog entry
254+
let (version, changelog) = slot.get_for_ns("ns2").unwrap();
255+
assert_eq!(version, "abc123");
256+
assert!(changelog.is_none());
257+
}
258+
259+
#[test]
260+
fn append_intent_returns_shared_slot() {
261+
begin().unwrap();
262+
263+
let slot1 = append_intent(&test_client(), None, test_intent()).unwrap();
264+
let slot2 = append_intent(&test_client(), None, test_intent()).unwrap();
265+
266+
// Both intents share the same slot
267+
assert!(Arc::ptr_eq(&slot1, &slot2));
268+
269+
rollback().unwrap();
270+
}
236271
}

mvfs/src/vfs.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ impl MultiVersionVfs {
137137
write_buffer: HashMap::new(),
138138
virtual_version_counter: 0,
139139
last_known_write_version: None,
140+
pending_commit_group_result: None,
140141
};
141142
Ok(conn)
142143
}
@@ -163,6 +164,11 @@ pub struct Connection {
163164
virtual_version_counter: u32,
164165

165166
last_known_write_version: Option<String>,
167+
168+
/// Shared slot populated by `commit_group::commit()` after a successful
169+
/// commit group. Consumed on the next `lock()` call to apply the
170+
/// changelog and advance `last_known_write_version`.
171+
pending_commit_group_result: Option<Arc<commit_group::CommitGroupResultSlot>>,
166172
}
167173

168174
#[derive(Default)]
@@ -448,8 +454,10 @@ impl Connection {
448454
}
449455

450456
if let Some(intent) = intent {
451-
commit_group::append_intent(&self.client, self.dp.as_ref(), intent)
452-
.expect("failed to append to commit group");
457+
let result_slot =
458+
commit_group::append_intent(&self.client, self.dp.as_ref(), intent)
459+
.expect("failed to append to commit group");
460+
self.pending_commit_group_result = Some(result_slot);
453461
tracing::info!("added intent to commit group");
454462
} else {
455463
tracing::info!("transaction is empty (commit group)");
@@ -636,11 +644,14 @@ impl Connection {
636644
// participated, apply the changelog and advance
637645
// last_known_write_version so the subsequent interval request
638646
// covers only changes *after* the commit group.
639-
if let Some((committed_version, changelog)) =
640-
commit_group::take_commit_group_result_for_ns(
641-
self.client.config().ns_key.as_str(),
642-
)
647+
if let Some((committed_version, changelog)) = self
648+
.pending_commit_group_result
649+
.as_ref()
650+
.and_then(|slot| {
651+
slot.get_for_ns(self.client.config().ns_key.as_str())
652+
})
643653
{
654+
self.pending_commit_group_result = None;
644655
match changelog {
645656
Some(pages) => {
646657
if !pages.is_empty() {

mvsqlite-fuse/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mvsqlite-fuse"
3-
version = "0.3.6"
3+
version = "0.3.7"
44
edition = "2021"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -11,7 +11,7 @@ thiserror = "2"
1111
tokio = { version = "1", features = ["full"] }
1212
log = "0.4"
1313
rand = "0.9"
14-
mvclient = { path = "../mvclient", version = "0.3.6" }
14+
mvclient = { path = "../mvclient", version = "0.3.7" }
1515
tracing = "0.1"
1616
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt", "tracing-log", "json"] }
1717
libc = "0.2"
@@ -20,7 +20,7 @@ serde = { version = "1", features = ["derive"] }
2020
serde_json = "1"
2121
lazy_static = "1.4.0"
2222
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
23-
mvfs = { path = "../mvfs", version = "0.3.6" }
23+
mvfs = { path = "../mvfs", version = "0.3.7" }
2424
fuser = "0.14"
2525
structopt = "0.3.26"
2626
indexmap = "2"

mvsqlite/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mvsqlite"
3-
version = "0.3.6"
3+
version = "0.3.7"
44
edition = "2021"
55
license = "Apache-2.0"
66
authors = ["Heyang Zhou <heyang.zhou@icloud.com>"]
@@ -17,7 +17,7 @@ tokio = { version = "1", features = ["full"] }
1717
log = "0.4"
1818
rand = "0.9"
1919
stackful = "0.1.5"
20-
mvclient = { path = "../mvclient", version = "0.3.6" }
20+
mvclient = { path = "../mvclient", version = "0.3.7" }
2121
tracing = "0.1"
2222
ctor = "0.2"
2323
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt", "json"], optional = true }
@@ -27,7 +27,7 @@ serde = { version = "1", features = ["derive"] }
2727
serde_json = "1"
2828
lazy_static = "1.4.0"
2929
reqwest = { version = "0.12", default-features = false, features = ["http2"] }
30-
mvfs = { path = "../mvfs", version = "0.3.6" }
30+
mvfs = { path = "../mvfs", version = "0.3.7" }
3131

3232
[features]
3333
default = ["loadext", "syscall", "rustls-tls", "global-init"]

mvstore-stress/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
[package]
22
name = "mvstore-stress"
3-
version = "0.3.6"
3+
version = "0.3.7"
44
edition = "2021"
55

66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9-
mvclient = { path = "../mvclient", version = "0.3.6" }
9+
mvclient = { path = "../mvclient", version = "0.3.7" }
1010
anyhow = "1"
1111
thiserror = "2"
1212
tokio = { version = "1", features = ["full"] }

0 commit comments

Comments
 (0)