Skip to content

Commit 164e6b8

Browse files
jja725 and claude committed
perf: release GIL during I/O operations to prevent blocking
Addresses #21 - Release the Global Interpreter Lock during all blocking operations to allow Python threads to run concurrently.

**Changes:**
- Wrapped all `runtime.block_on()` calls in `py.allow_threads()`
- Applies to: create(), add(), compact(), compaction_stats(), checkout(), search(), list()

**Benefits:**
- Python interpreter no longer freezes during operations
- Background threads (heartbeats, UI) remain responsive
- Critical for S3-backed stores (50-500ms+ latency)
- Critical for long-running compaction operations

**Pattern:**
```rust
py.allow_threads(|| {
    self.runtime
        .block_on(async_operation())
        .map_err(to_py_err)
})?
```

This ensures concurrent Python execution while Rust performs expensive I/O and computation.

All tests pass (19 passed, 2 skipped).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 58e17d0 commit 164e6b8

1 file changed

Lines changed: 50 additions & 26 deletions

File tree

python/src/lib.rs

Lines changed: 50 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,7 @@ impl Context {
113113
#[pyo3(signature = (uri, *, storage_options=None, compaction_config=None))]
114114
fn create(
115115
_cls: &Bound<'_, PyType>,
116+
py: Python<'_>,
116117
uri: &str,
117118
storage_options: Option<&Bound<'_, PyDict>>,
118119
compaction_config: Option<&Bound<'_, PyDict>>,
@@ -124,9 +125,13 @@ impl Context {
124125
compaction: compaction_config_from_dict(compaction_config)?,
125126
};
126127

127-
let store = runtime
128-
.block_on(ContextStore::open_with_options(uri, options))
129-
.map_err(to_py_err)?;
128+
// Release GIL during dataset initialization (may involve S3 operations)
129+
let store = py.allow_threads(|| {
130+
runtime
131+
.block_on(ContextStore::open_with_options(uri, options))
132+
.map_err(to_py_err)
133+
})?;
134+
130135
let run_id = new_run_id();
131136
Ok(Self {
132137
inner: RustContext::new(uri),
@@ -155,6 +160,7 @@ impl Context {
155160
#[pyo3(signature = (role, content, data_type = None))]
156161
fn add(
157162
&mut self,
163+
py: Python<'_>,
158164
role: &str,
159165
content: &Bound<'_, PyAny>,
160166
data_type: Option<&str>,
@@ -191,9 +197,13 @@ impl Context {
191197
embedding: None,
192198
};
193199

194-
self.runtime
195-
.block_on(self.store.add(std::slice::from_ref(&record)))
196-
.map_err(to_py_err)?;
200+
// Release GIL during storage write
201+
py.allow_threads(|| {
202+
self.runtime
203+
.block_on(self.store.add(std::slice::from_ref(&record)))
204+
.map_err(to_py_err)
205+
})?;
206+
197207
self.inner.add(role, &inner_content, data_type);
198208
Ok(())
199209
}
@@ -212,10 +222,13 @@ impl Context {
212222
}
213223
}
214224

215-
fn checkout(&mut self, version_id: u64) -> PyResult<()> {
216-
self.runtime
217-
.block_on(self.store.checkout(version_id))
218-
.map_err(to_py_err)?;
225+
fn checkout(&mut self, py: Python<'_>, version_id: u64) -> PyResult<()> {
226+
// Release GIL during checkout operation
227+
py.allow_threads(|| {
228+
self.runtime
229+
.block_on(self.store.checkout(version_id))
230+
.map_err(to_py_err)
231+
})?;
219232
self.run_id = new_run_id();
220233
Ok(())
221234
}
@@ -227,10 +240,13 @@ impl Context {
227240
query: Vec<f32>,
228241
limit: Option<usize>,
229242
) -> PyResult<Vec<PyObject>> {
230-
let hits = self
231-
.runtime
232-
.block_on(self.store.search(&query, limit))
233-
.map_err(to_py_err)?;
243+
// Release GIL during vector search
244+
let hits = py.allow_threads(|| {
245+
self.runtime
246+
.block_on(self.store.search(&query, limit))
247+
.map_err(to_py_err)
248+
})?;
249+
234250
hits.into_iter()
235251
.map(|hit| search_hit_to_py(py, hit))
236252
.collect()
@@ -243,10 +259,13 @@ impl Context {
243259
limit: Option<usize>,
244260
offset: Option<usize>,
245261
) -> PyResult<Vec<PyObject>> {
246-
let records = self
247-
.runtime
248-
.block_on(self.store.list(limit, offset))
249-
.map_err(to_py_err)?;
262+
// Release GIL during data retrieval
263+
let records = py.allow_threads(|| {
264+
self.runtime
265+
.block_on(self.store.list(limit, offset))
266+
.map_err(to_py_err)
267+
})?;
268+
250269
records
251270
.into_iter()
252271
.map(|record| record_to_py(py, record))
@@ -260,6 +279,7 @@ impl Context {
260279
target_rows_per_fragment: Option<usize>,
261280
materialize_deletions: Option<bool>,
262281
) -> PyResult<PyObject> {
282+
// Prepare config before releasing GIL
263283
let config = if target_rows_per_fragment.is_some() || materialize_deletions.is_some() {
264284
let mut cfg = self.store.compaction_config.clone();
265285
if let Some(rows) = target_rows_per_fragment {
@@ -273,19 +293,23 @@ impl Context {
273293
None
274294
};
275295

276-
let metrics = self
277-
.runtime
278-
.block_on(self.store.compact(config))
279-
.map_err(to_py_err)?;
296+
// Release GIL during expensive compaction operation
297+
let metrics = py.allow_threads(|| {
298+
self.runtime
299+
.block_on(self.store.compact(config))
300+
.map_err(to_py_err)
301+
})?;
280302

281303
compaction_metrics_to_py(py, metrics)
282304
}
283305

284306
fn compaction_stats(&self, py: Python<'_>) -> PyResult<PyObject> {
285-
let stats = self
286-
.runtime
287-
.block_on(self.store.compaction_stats())
288-
.map_err(to_py_err)?;
307+
// Release GIL during stats query
308+
let stats = py.allow_threads(|| {
309+
self.runtime
310+
.block_on(self.store.compaction_stats())
311+
.map_err(to_py_err)
312+
})?;
289313

290314
compaction_stats_to_py(py, stats)
291315
}

0 commit comments

Comments
 (0)