@@ -3,7 +3,7 @@ use std::{
33 collections:: HashMap ,
44 future:: Future ,
55 pin:: Pin ,
6- sync:: Arc ,
6+ sync:: { Arc , OnceLock } ,
77} ;
88
99use anyhow:: { bail, Context , Result } ;
@@ -15,6 +15,7 @@ pub struct CommitGroup {
1515 intents : Vec < NamespaceCommitIntent > ,
1616 client : Option < Arc < MultiVersionClient > > ,
1717 dp : Option < Url > ,
18+ result_slot : Arc < CommitGroupResultSlot > ,
1819}
1920
2021impl Default for CommitGroup {
@@ -23,6 +24,9 @@ impl Default for CommitGroup {
2324 intents : Vec :: new ( ) ,
2425 client : None ,
2526 dp : None ,
27+ result_slot : Arc :: new ( CommitGroupResultSlot {
28+ inner : OnceLock :: new ( ) ,
29+ } ) ,
2630 }
2731 }
2832}
@@ -38,22 +42,34 @@ pub enum CommitOutput {
3842 Conflict ,
3943}
4044
41- /// Stores the result of the last successful commit group so that participating
42- /// connections can update their page caches and `last_known_write_version` when
43- /// they next acquire a lock.
44- struct CommitGroupResult {
45+ /// Shared slot that `commit()` writes into and each participating
46+ /// `Connection` reads from on its next `lock()` call. Thread-safe
47+ /// because connections may migrate threads after the commit group ends.
48+ pub struct CommitGroupResultSlot {
49+ inner : OnceLock < CommitGroupResultData > ,
50+ }
51+
52+ struct CommitGroupResultData {
4553 version : String ,
46- /// Namespace keys that staged intents in this commit group.
47- ns_keys : Vec < String > ,
4854 /// Per-namespace changelog of pages modified by *concurrent* transactions
4955 /// between each namespace's assumed version and the committed version.
5056 /// A missing entry means the interval was too large to compute.
5157 changelog : HashMap < String , Vec < u32 > > ,
5258}
5359
60+ impl CommitGroupResultSlot {
61+ /// Returns the committed version and the changelog entry for `ns_key`.
62+ /// `changelog` is `None` when the server could not compute the interval;
63+ /// the caller should do a full cache invalidation in that case.
64+ pub fn get_for_ns ( & self , ns_key : & str ) -> Option < ( String , Option < Vec < u32 > > ) > {
65+ self . inner . get ( ) . map ( |data| {
66+ ( data. version . clone ( ) , data. changelog . get ( ns_key) . cloned ( ) )
67+ } )
68+ }
69+ }
70+
5471thread_local ! {
5572 static CURRENT_COMMIT_GROUP : RefCell <Option <CommitGroup >> = RefCell :: new( None ) ;
56- static LAST_COMMIT_GROUP_RESULT : RefCell <Option <CommitGroupResult >> = RefCell :: new( None ) ;
5773}
5874
5975pub fn begin ( ) -> Result < ( ) > {
@@ -64,11 +80,7 @@ pub fn begin() -> Result<()> {
6480 }
6581 * cg = Some ( CommitGroup :: default ( ) ) ;
6682 Ok ( ( ) )
67- } ) ?;
68- LAST_COMMIT_GROUP_RESULT . with ( |r| {
69- * r. borrow_mut ( ) = None ;
70- } ) ;
71- Ok ( ( ) )
83+ } )
7284}
7385
7486pub fn is_active ( ) -> bool {
@@ -90,7 +102,7 @@ pub fn append_intent(
90102 client : & Arc < MultiVersionClient > ,
91103 dp : Option < & Url > ,
92104 intent : NamespaceCommitIntent ,
93- ) -> Result < ( ) > {
105+ ) -> Result < Arc < CommitGroupResultSlot > > {
94106 CURRENT_COMMIT_GROUP . with ( |cg| {
95107 let mut cg = cg. borrow_mut ( ) ;
96108 let cg = cg
@@ -104,8 +116,9 @@ pub fn append_intent(
104116 cg. dp = dp. cloned ( ) ;
105117 }
106118
119+ let slot = cg. result_slot . clone ( ) ;
107120 cg. intents . push ( intent) ;
108- Ok ( ( ) )
121+ Ok ( slot )
109122 } )
110123}
111124
@@ -121,7 +134,7 @@ pub fn commit(
121134 return Ok ( CommitOutput :: Empty ) ;
122135 }
123136
124- let ns_keys : Vec < String > = cg. intents . iter ( ) . map ( |i| i . init . ns_key . clone ( ) ) . collect ( ) ;
137+ let result_slot = cg. result_slot . clone ( ) ;
125138
126139 let client = cg
127140 . client
@@ -133,12 +146,9 @@ pub fn commit(
133146
134147 Ok ( match result {
135148 Some ( result) => {
136- LAST_COMMIT_GROUP_RESULT . with ( |r| {
137- * r. borrow_mut ( ) = Some ( CommitGroupResult {
138- version : result. version . clone ( ) ,
139- ns_keys,
140- changelog : result. changelog . clone ( ) ,
141- } ) ;
149+ let _ = result_slot. inner . set ( CommitGroupResultData {
150+ version : result. version . clone ( ) ,
151+ changelog : result. changelog . clone ( ) ,
142152 } ) ;
143153 CommitOutput :: Committed ( result)
144154 }
@@ -152,24 +162,6 @@ pub fn rollback() -> Result<()> {
152162 Ok ( ( ) )
153163}
154164
155- /// If a commit group recently succeeded and `ns_key` participated in it,
156- /// returns `(committed_version, changelog)`. `changelog` is `None` when
157- /// the server could not compute the interval (too many concurrent changes);
158- /// in that case the caller should do a full cache invalidation.
159- pub fn take_commit_group_result_for_ns ( ns_key : & str ) -> Option < ( String , Option < Vec < u32 > > ) > {
160- LAST_COMMIT_GROUP_RESULT . with ( |r| {
161- let r = r. borrow ( ) ;
162- r. as_ref ( ) . and_then ( |result| {
163- if result. ns_keys . iter ( ) . any ( |k| k == ns_key) {
164- let changelog = result. changelog . get ( ns_key) . cloned ( ) ;
165- Some ( ( result. version . clone ( ) , changelog) )
166- } else {
167- None
168- }
169- } )
170- } )
171- }
172-
173165#[ cfg( test) ]
174166mod tests {
175167 use super :: * ;
@@ -233,4 +225,47 @@ mod tests {
233225
234226 rollback ( ) . unwrap ( ) ;
235227 }
228+
229+ #[ test]
230+ fn result_slot_empty_before_commit ( ) {
231+ let slot = Arc :: new ( CommitGroupResultSlot {
232+ inner : OnceLock :: new ( ) ,
233+ } ) ;
234+ assert ! ( slot. get_for_ns( "test" ) . is_none( ) ) ;
235+ }
236+
237+ #[ test]
238+ fn result_slot_returns_changelog_after_population ( ) {
239+ let slot = Arc :: new ( CommitGroupResultSlot {
240+ inner : OnceLock :: new ( ) ,
241+ } ) ;
242+
243+ let _ = slot. inner . set ( CommitGroupResultData {
244+ version : "abc123" . into ( ) ,
245+ changelog : HashMap :: from ( [ ( "ns1" . into ( ) , vec ! [ 1 , 2 , 3 ] ) ] ) ,
246+ } ) ;
247+
248+ // Participating namespace gets version + changelog
249+ let ( version, changelog) = slot. get_for_ns ( "ns1" ) . unwrap ( ) ;
250+ assert_eq ! ( version, "abc123" ) ;
251+ assert_eq ! ( changelog, Some ( vec![ 1 , 2 , 3 ] ) ) ;
252+
253+ // Non-participating namespace gets version but no changelog entry
254+ let ( version, changelog) = slot. get_for_ns ( "ns2" ) . unwrap ( ) ;
255+ assert_eq ! ( version, "abc123" ) ;
256+ assert ! ( changelog. is_none( ) ) ;
257+ }
258+
259+ #[ test]
260+ fn append_intent_returns_shared_slot ( ) {
261+ begin ( ) . unwrap ( ) ;
262+
263+ let slot1 = append_intent ( & test_client ( ) , None , test_intent ( ) ) . unwrap ( ) ;
264+ let slot2 = append_intent ( & test_client ( ) , None , test_intent ( ) ) . unwrap ( ) ;
265+
266+ // Both intents share the same slot
267+ assert ! ( Arc :: ptr_eq( & slot1, & slot2) ) ;
268+
269+ rollback ( ) . unwrap ( ) ;
270+ }
236271}
0 commit comments