@@ -83,21 +83,46 @@ BigSegmentStoreWrapper::StoreMembership BigSegmentStoreWrapper::LoadMembership(
8383 return *query->result ;
8484 }
8585
86- // Query the store outside any lock, then publish the result to waiters and
87- // drop the in-flight entry.
86+ // Ensures the in-flight entry is removed and waiters are notified on every
87+ // leader exit, including throws. If the leader exits without completing,
88+ // waiters receive a sentinel error rather than blocking forever.
89+ struct QueryCleanup {
90+ std::mutex& load_mutex;
91+ std::unordered_map<std::string, std::shared_ptr<InFlightQuery>>&
92+ in_flight;
93+ std::string const & key;
94+ std::shared_ptr<InFlightQuery> query;
95+ bool completed = false ;
96+
97+ ~QueryCleanup () {
98+ {
99+ std::lock_guard lock (load_mutex);
100+ in_flight.erase (key);
101+ }
102+ if (!completed) {
103+ std::lock_guard lock (query->mutex );
104+ if (!query->result .has_value ()) {
105+ query->result = tl::make_unexpected (std::string (
106+ " Big Segment lookup leader exited without setting a "
107+ " result" ));
108+ }
109+ }
110+ query->cv .notify_all ();
111+ }
112+ };
113+ QueryCleanup cleanup{load_mutex_, in_flight_, context_key, query};
114+
115+ // Query the store outside any lock, then publish the result. The cleanup
116+ // drops the in-flight entry on return (or throw).
88117 auto result = store_->GetMembership (HashContextKey (context_key));
89118 if (result.has_value ()) {
90119 cache_.Set (context_key, *result);
91120 }
92- {
93- std::lock_guard lock (load_mutex_);
94- in_flight_.erase (context_key);
95- }
96121 {
97122 std::lock_guard lock (query->mutex );
98123 query->result = result;
124+ cleanup.completed = true ;
99125 }
100- query->cv .notify_all ();
101126 return result;
102127}
103128
0 commit comments