diff --git a/docs/src/content/docs/cli.mdx b/docs/src/content/docs/cli.mdx index 812f361a..272d62a2 100644 --- a/docs/src/content/docs/cli.mdx +++ b/docs/src/content/docs/cli.mdx @@ -185,6 +185,7 @@ cocoindex update [OPTIONS] APP_TARGET | `--reset` | Drop existing setup before updating (equivalent to running 'cocoindex drop' first). | | `--full-reprocess` | Reprocess everything and invalidate existing caches. | | `-L, --live` | Run in live mode (live components continue processing after initial update). | +| `--preview` | Compute target actions without applying them. Prints planned actions. | | `--help` | Show this message and exit. | --- diff --git a/python/cocoindex/_internal/app.py b/python/cocoindex/_internal/app.py index 1c44967a..d0726f76 100644 --- a/python/cocoindex/_internal/app.py +++ b/python/cocoindex/_internal/app.py @@ -57,10 +57,12 @@ def __init__( self, init_coro: Any, # Coroutine that returns core.UpdateHandle main_fn: Any = None, + preview: bool = False, ) -> None: self._init_coro = init_coro self._core_handle: core.UpdateHandle | None = None self._main_fn = main_fn # used for return type inspection + self._preview = preview async def _ensure_started(self) -> core.UpdateHandle: if self._core_handle is None: @@ -133,6 +135,9 @@ async def watch(self) -> AsyncIterator[UpdateSnapshot[R]]: async def result(self) -> R: """Await the update result. Raises on error.""" handle = await self._ensure_started() + if self._preview: + await handle.result() + return handle.take_preview_actions() # type: ignore[return-value] pyvalue: Any = await handle.result() return pyvalue.get(fn_ret_deserializer(self._main_fn)) # type: ignore[no-any-return] @@ -301,6 +306,7 @@ def update( *, full_reprocess: bool = False, live: bool = False, + preview: bool = False, ) -> UpdateHandle[R]: """ Start an update and return a handle for tracking progress and awaiting the result. @@ -312,6 +318,8 @@ def update( full_reprocess: If True, reprocess everything and invalidate existing caches. live: If True, run in live mode (live components continue processing after mark_ready). + preview: If True, compute target actions without applying them. + The handle's result will be a list of raw action objects. Returns: An UpdateHandle that provides access to stats(), watch(), and result(). @@ -327,10 +335,11 @@ async def _init() -> core.UpdateHandle: processor, full_reprocess=full_reprocess, live=live, + preview=preview, host_ctx=env._context_provider, ) - return UpdateHandle(_init(), main_fn=self._main_fn) + return UpdateHandle(_init(), main_fn=self._main_fn, preview=preview) def update_blocking( self, @@ -338,7 +347,8 @@ def update_blocking( report_to_stdout: bool = False, full_reprocess: bool = False, live: bool = False, - ) -> R: + preview: bool = False, + ) -> R | list[Any]: """ Update the app synchronously (run the app once to process all pending changes). @@ -347,9 +357,11 @@ def update_blocking( full_reprocess: If True, reprocess everything and invalidate existing caches. live: If True, run in live mode (live components continue processing after mark_ready). + preview: If True, compute target actions without applying them. + Returns a list of raw action objects instead of the main function result. Returns: - The result of the main function. + The result of the main function, or a list of actions in preview mode. """ env, core_app = self._get_core_env_app_sync() root_path = core.StablePath() @@ -362,7 +374,10 @@ def update_blocking( host_ctx=env._context_provider, report_to_stdout=report_to_stdout, live=live, + preview=preview, ) + if preview: + return pyvalue # type: ignore[no-any-return] return pyvalue.get(fn_ret_deserializer(self._main_fn)) # type: ignore[no-any-return] async def drop(self) -> None: diff --git a/python/cocoindex/_internal/core.pyi b/python/cocoindex/_internal/core.pyi index 2851adce..11f2e5e2 100644 --- a/python/cocoindex/_internal/core.pyi +++ b/python/cocoindex/_internal/core.pyi @@ -188,6 +188,7 @@ class UpdateHandle: def stats_snapshot(self) -> tuple[int, bool, dict[str, dict[str, int]]]: ... def changed(self) -> Coroutine[Any, Any, int]: ... def result(self) -> Coroutine[Any, Any, StoredValue]: ... + def take_preview_actions(self) -> list[Any]: ... # --- DropHandle --- class DropHandle: @@ -209,12 +210,14 @@ class App: host_ctx: Any = None, report_to_stdout: bool = False, live: bool = False, - ) -> StoredValue: ... + preview: bool = False, + ) -> StoredValue | list[Any]: ... def update_async( self, root_processor: ComponentProcessor[T_co], full_reprocess: bool = False, live: bool = False, + preview: bool = False, host_ctx: Any = None, ) -> UpdateHandle: ... def drop(self, host_ctx: Any = None, report_to_stdout: bool = False) -> None: ... diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index be82ffb0..7931d915 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -686,6 +686,13 @@ async def _stop_all_environments() -> None: default=False, help="Run in live mode (live components continue processing after initial update).", ) +@click.option( + "--preview", + is_flag=True, + show_default=True, + default=False, + help="Compute target actions without applying them. Prints planned actions.", +) def update( app_target: str, force: bool, @@ -693,12 +700,18 @@ def update( reset: bool, full_reprocess: bool, live: bool, + preview: bool, ) -> None: """ Run an app in catch-up mode. With --live, run in live mode. `APP_TARGET`: `path/to/app.py`, `module`, `path/to/app.py:app_name`, or `module:app_name`. """ + if preview and reset: + raise click.UsageError("--preview and --reset cannot be used together.") + if preview and live: + raise click.UsageError("--preview and --live cannot be used together.") + app = _load_app(app_target) async def _do(cancelled: Any) -> None: @@ -711,6 +724,20 @@ async def _do(cancelled: Any) -> None: f"Running app '{app._name}' from environment '{env.name}' (db path: {env.settings.db_path})" ) + if preview: + handle = app.update( + full_reprocess=full_reprocess, + preview=True, + ) + actions: list[Any] = await handle.result() + click.echo("Preview: planned target actions") + if actions: + for action in actions: + click.echo(f" {action!r}") + else: + click.echo(" No target actions planned.") + return + # --reset: drop existing state first (equivalent to `cocoindex drop ...`) if reset: if not force: diff --git a/python/tests/cli/flat_target_app.py b/python/tests/cli/flat_target_app.py new file mode 100644 index 00000000..039a9e41 --- /dev/null +++ b/python/tests/cli/flat_target_app.py @@ -0,0 +1,60 @@ +"""Test app with flat/leaf target states only (no child providers).""" + +from __future__ import annotations + +import pathlib +from typing import Any, Collection + +import cocoindex as coco + +_HERE = pathlib.Path(__file__).resolve().parent +DB_PATH = _HERE / "cocoindex.db" + +env = coco.Environment(coco.Settings.from_env(db_path=DB_PATH)) + + +class _FlatStore: + def __init__(self) -> None: + self.data: dict[str, Any] = {} + + def _sink( + self, + context_provider: coco.ContextProvider, + actions: Collection[tuple[str, Any | coco.NonExistenceType]], + /, + ) -> None: + for key, value in actions: + if coco.is_non_existence(value): + self.data.pop(key, None) + else: + self.data[key] = value + + def reconcile( + self, + key: coco.StableKey, + desired_state: Any | coco.NonExistenceType, + prev_possible_records: Collection[Any], + prev_may_be_missing: bool, + ) -> ( + coco.TargetReconcileOutput[tuple[str, Any | coco.NonExistenceType], Any] | None + ): + assert isinstance(key, str) + return coco.TargetReconcileOutput( + action=(key, desired_state), + sink=coco.TargetActionSink.from_fn(self._sink), + tracking_record=desired_state, + ) + + +_flat_store = _FlatStore() +_provider = coco.register_root_target_states_provider( + "test_cli/flat_preview", _flat_store +) + + +@coco.fn +def build() -> None: + coco.declare_target_state(_provider.target_state("x", 42)) + + +app = coco.App(coco.AppConfig(name="FlatPreviewApp", environment=env), build) diff --git a/python/tests/cli/test_cli.py b/python/tests/cli/test_cli.py index 6e17619f..cb630bd3 100644 --- a/python/tests/cli/test_cli.py +++ b/python/tests/cli/test_cli.py @@ -690,6 +690,31 @@ def test_drop_quiet_suppresses_informational_output(self) -> None: # ============================================================================= +class TestPreview: + """Tests for the --preview flag on update.""" + + def test_preview_prints_actions(self) -> None: + """update --preview should print planned actions without writing.""" + result = run_cli("update", "./flat_target_app.py", "--preview") + assert "Preview: planned target actions" in result.stdout + + def test_preview_reset_rejected(self) -> None: + """--preview --reset should be rejected.""" + result = run_cli( + "update", "./single_app.py", "--preview", "--reset", check=False + ) + assert result.returncode != 0 + assert "cannot be used together" in result.stderr.lower() + + def test_preview_live_rejected(self) -> None: + """--preview --live should be rejected.""" + result = run_cli( + "update", "./single_app.py", "--preview", "--live", check=False + ) + assert result.returncode != 0 + assert "cannot be used together" in result.stderr.lower() + + class TestShowTree: """Tests for the show command with --tree flag.""" diff --git a/python/tests/core/test_component_target_states.py b/python/tests/core/test_component_target_states.py index dce5d0aa..500e8809 100644 --- a/python/tests/core/test_component_target_states.py +++ b/python/tests/core/test_component_target_states.py @@ -1110,3 +1110,24 @@ def test_mount_target_delete() -> None: } assert DictsTarget.store.metrics.collect() == {"sink": 2, "delete": 1} assert DictsTarget.store.collect_child_metrics() == {"sink": 1, "upsert": 1} + + +################################################################################## +# Test: preview rejects child target providers +################################################################################## + + +def test_preview_rejects_child_target_providers() -> None: + DictsTarget.store.clear() + _source_data.clear() + + app = coco.App( + coco.AppConfig( + name="test_preview_rejects_child_target_providers", environment=coco_env + ), + _declare_dicts_data_together, + ) + + _source_data["D1"] = {"a": 1} + with pytest.raises(Exception, match="child target providers"): + app.update_blocking(preview=True) diff --git a/python/tests/core/test_flat_target_states.py b/python/tests/core/test_flat_target_states.py index 125cde67..2f84903b 100644 --- a/python/tests/core/test_flat_target_states.py +++ b/python/tests/core/test_flat_target_states.py @@ -178,6 +178,44 @@ def test_async_global_dict_target_state_insert() -> None: assert AsyncGlobalDictTarget.store.metrics.collect() == {"sink": 1, "upsert": 1} +def test_global_dict_preview_returns_actions_without_writing() -> None: + GlobalDictTarget.store.clear() + _source_data.clear() + + app = coco.App( + coco.AppConfig( + name="test_global_dict_preview_returns_actions", environment=coco_env + ), + declare_global_dict_entries, + ) + + _source_data["a"] = 1 + _source_data["b"] = 2 + actions = app.update_blocking(preview=True) + + assert isinstance(actions, list) + assert len(actions) > 0 + assert GlobalDictTarget.store.data == {} + + +@pytest.mark.asyncio +async def test_global_dict_preview_async() -> None: + GlobalDictTarget.store.clear() + _source_data.clear() + + app = coco.App( + coco.AppConfig(name="test_global_dict_preview_async", environment=coco_env), + declare_global_dict_entries, + ) + + _source_data["a"] = 1 + actions = await app.update(preview=True) + + assert isinstance(actions, list) + assert len(actions) > 0 + assert GlobalDictTarget.store.data == {} + + def test_global_dict_target_state_proceed_with_exception() -> None: GlobalDictTarget.store.clear() _source_data.clear() diff --git a/rust/core/src/engine/app.rs b/rust/core/src/engine/app.rs index 305a77fd..e39675ba 100644 --- a/rust/core/src/engine/app.rs +++ b/rust/core/src/engine/app.rs @@ -3,7 +3,7 @@ use crate::engine::stats::{ProcessingStats, VersionedProcessingStats}; use crate::prelude::*; use crate::engine::component::Component; -use crate::engine::context::AppContext; +use crate::engine::context::{AppContext, PreviewActionCollector}; use crate::engine::environment::{AppRegistration, Environment}; use crate::engine::runtime::get_runtime; @@ -17,6 +17,8 @@ pub struct AppUpdateOptions { pub full_reprocess: bool, /// If true, enable live component mode for this update. pub live: bool, + /// If true, compute target actions without applying them or committing state. + pub preview: bool, } /// Handle returned by `App::update` or `App::drop_app` that provides access to @@ -98,18 +100,28 @@ impl App { root_processor: Prof::ComponentProc, options: AppUpdateOptions, host_ctx: Arc, - ) -> Result> { + ) -> Result<( + AppOpHandle, + Option>, + )> { crate::telemetry::track("app_update"); // Refresh the app token if a prior operation (e.g. drop_app) cancelled // it, so this update starts with a non-cancelled token. self.app_ctx().reset_cancellation_token_if_cancelled(); let processing_stats = ProcessingStats::new(); let version_rx = processing_stats.subscribe(); + let preview_collector: Option> = if options.preview { + Some(Arc::new(std::sync::Mutex::new(Vec::new()))) + } else { + None + }; let context = self.root_component.new_processor_context_for_build( None, processing_stats.clone(), options.full_reprocess, options.live, + options.preview, + preview_collector.clone(), host_ctx, )?; @@ -143,12 +155,15 @@ impl App { .instrument(span), ); - Ok(AppOpHandle { - task, - stats: processing_stats, - version_rx, - live, - }) + Ok(( + AppOpHandle { + task, + stats: processing_stats, + version_rx, + live, + }, + preview_collector, + )) } /// Drop the app, reverting all target states and clearing the database. diff --git a/rust/core/src/engine/component.rs b/rust/core/src/engine/component.rs index 75e0442e..bbe599f0 100644 --- a/rust/core/src/engine/component.rs +++ b/rust/core/src/engine/component.rs @@ -8,7 +8,7 @@ use std::sync::atomic::AtomicU64; use crate::engine::context::FnCallContext; use crate::engine::context::{ AppContext, ComponentDeleteContext, ComponentProcessingAction, ComponentProcessingMode, - ComponentProcessorContext, MemoStatesPayload, + ComponentProcessorContext, MemoStatesPayload, PreviewActionCollector, }; use crate::engine::execution::{ cleanup_tombstone, post_submit_for_build, submit, update_component_memo_states, @@ -356,6 +356,8 @@ impl Component { parent_ctx.processing_stats().clone(), parent_ctx.full_reprocess(), parent_ctx.live(), // use_mount inherits live from parent + parent_ctx.preview(), + parent_ctx.preview_collector().cloned(), parent_ctx.host_ctx().clone(), )?; self.run(processor, child_ctx).await @@ -382,6 +384,8 @@ impl Component { parent_ctx.processing_stats().clone(), parent_ctx.full_reprocess(), parent_ctx.live(), // mount inherits live from parent + parent_ctx.preview(), + parent_ctx.preview_collector().cloned(), parent_ctx.host_ctx().clone(), )?; self.run_in_background(processor, child_ctx, on_error, pre_execute_check) @@ -885,6 +889,8 @@ impl Component { processing_stats: ProcessingStats, full_reprocess: bool, live: bool, + preview: bool, + preview_collector: Option>, host_ctx: Arc, ) -> Result> { let providers = if let Some(parent_ctx) = parent_ctx { @@ -916,7 +922,13 @@ impl Component { parent_ctx.cloned(), processing_stats, host_ctx, - ComponentProcessingAction::new_build(providers, full_reprocess, live), + ComponentProcessingAction::new_build( + providers, + full_reprocess, + live, + preview, + preview_collector, + ), )) } diff --git a/rust/core/src/engine/context.rs b/rust/core/src/engine/context.rs index 017c2c3a..72cc05f4 100644 --- a/rust/core/src/engine/context.rs +++ b/rust/core/src/engine/context.rs @@ -178,10 +178,16 @@ pub(crate) struct ComponentBuildingState { pub fn_call_memos: HashMap>>>, } +/// Shared collector for preview actions across all components in a single update. +pub(crate) type PreviewActionCollector = + Arc::TargetAction>>>; + pub(crate) struct ComponentBuildContext { pub state: Mutex>>, pub full_reprocess: bool, pub live: bool, + pub preview: bool, + pub preview_collector: Option>, } pub(crate) struct ComponentDeleteContext { @@ -204,6 +210,8 @@ impl ComponentProcessingAction { providers: rpds::HashTrieMapSync>, full_reprocess: bool, live: bool, + preview: bool, + preview_collector: Option>, ) -> Self { Self::Build(ComponentBuildContext { state: Mutex::new(Some(ComponentBuildingState { @@ -216,6 +224,8 @@ impl ComponentProcessingAction { })), full_reprocess, live, + preview, + preview_collector, }) } } @@ -384,6 +394,20 @@ impl ComponentProcessorContext { } } + pub fn preview(&self) -> bool { + match &self.inner.processing_action { + ComponentProcessingAction::Build(build_ctx) => build_ctx.preview, + ComponentProcessingAction::Delete { .. } => false, + } + } + + pub(crate) fn preview_collector(&self) -> Option<&PreviewActionCollector> { + match &self.inner.processing_action { + ComponentProcessingAction::Build(build_ctx) => build_ctx.preview_collector.as_ref(), + ComponentProcessingAction::Delete { .. } => None, + } + } + pub fn live(&self) -> bool { match &self.inner.processing_action { ComponentProcessingAction::Build(build_ctx) => build_ctx.live, diff --git a/rust/core/src/engine/execution.rs b/rust/core/src/engine/execution.rs index 782e4bb2..9fb2c9f0 100644 --- a/rust/core/src/engine/execution.rs +++ b/rust/core/src/engine/execution.rs @@ -1328,8 +1328,69 @@ pub(crate) async fn submit( let mut pending_fulfillments: Vec<(TargetStateProvider, Prof::TargetHdl)> = Vec::new(); let target_states_providers_owned = target_states_providers.clone(); + let preview = comp_ctx.preview(); + + if preview { + // Preview mode: run pre_commit in a standalone write transaction that is + // intentionally dropped (aborted) so no tracking state is committed. + let db_env = comp_ctx.app_ctx().env().db_env().clone(); + let pre_commit_out = tokio::task::spawn_blocking(move || { + let mut wtxn = db_env.write_txn()?; + let result = pre_commit( + &mut wtxn, + &db, + comp_mode, + &stable_path, + full_reprocess, + processor_name_owned.as_deref(), + &encoded_target_state_info_key, + &memo_del_key, + &contained_target_state_paths, + &target_states_providers_owned, + declared_target_states, + ); + // Intentionally drop wtxn without commit (aborts the transaction). + drop(wtxn); + result + }) + .await + .map_err(|e| internal_error!("preview pre_commit task panicked: {e}"))??; + + let Some(pre_commit_out) = pre_commit_out else { + return Ok(SubmitOutput { + built_target_states_providers: None, + touched_previous_states: false, + }); + }; + if let Some(ref name) = pre_commit_out.processor_name_for_del { + collect_processor_name_name_for_del(name); + } + + // Reject unsupported child-provider / nested target cases. + for input in pre_commit_out.actions_by_sinks.values() { + if input.child_providers.is_some() { + client_bail!( + "preview currently supports flat/leaf target actions only; \ + target actions requiring child target providers are not supported yet" + ); + } + } + + // Push all actions to the shared collector. + if let Some(collector) = comp_ctx.preview_collector() { + let mut guard = collector.lock().unwrap(); + for (_sink, input) in pre_commit_out.actions_by_sinks { + guard.extend(input.actions); + } + } + + return Ok(SubmitOutput { + built_target_states_providers, + touched_previous_states: pre_commit_out.previously_exists, + }); + } - // Reconcile and pre-commit target states + // Normal (non-preview) mode: reconcile and pre-commit target states via txn_batcher. let pre_commit_out = comp_ctx .app_ctx() .env() diff --git a/rust/core/src/engine/live_component.rs b/rust/core/src/engine/live_component.rs index 8c7d8a7c..4f3af857 100644 --- a/rust/core/src/engine/live_component.rs +++ b/rust/core/src/engine/live_component.rs @@ -107,6 +107,7 @@ impl MountLivePending { parent_ctx.host_ctx().clone(), parent_ctx.full_reprocess(), live, + parent_ctx.preview(), providers, ); @@ -253,6 +254,7 @@ pub struct LiveComponentController { host_ctx: Arc, full_reprocess: bool, live: bool, + preview: bool, /// Providers inherited from the parent component context at creation time. /// Immutable — process() may not call use_mount(), so no new providers are created. @@ -267,6 +269,7 @@ impl LiveComponentController { host_ctx: Arc, full_reprocess: bool, live: bool, + preview: bool, providers: rpds::HashTrieMapSync>, ) -> Self { Self { @@ -276,6 +279,7 @@ impl LiveComponentController { host_ctx, full_reprocess, live, + preview, providers, } } @@ -310,6 +314,8 @@ impl LiveComponentController { self.providers.clone(), self.full_reprocess, self.live, + self.preview, + None, ), ); @@ -353,6 +359,8 @@ impl LiveComponentController { self.providers.clone(), self.full_reprocess, self.live, + self.preview, + None, ), ); diff --git a/rust/py/src/app.rs b/rust/py/src/app.rs index 67a1b217..19c22d63 100644 --- a/rust/py/src/app.rs +++ b/rust/py/src/app.rs @@ -31,6 +31,8 @@ fn snapshot_to_py<'py>( Ok(dict) } +type PyPreviewCollector = Arc>>>; + #[pyclass(name = "UpdateHandle")] pub struct PyUpdateHandle { handle: Mutex>>, @@ -38,6 +40,7 @@ pub struct PyUpdateHandle { /// Persistent receiver shared across `changed()` calls via Arc. /// Using tokio::Mutex so it can be held across .await points. version_rx: Arc>>, + preview_collector: Option, } impl PyUpdateHandle { @@ -48,6 +51,7 @@ impl PyUpdateHandle { handle: Mutex::new(Some(handle)), stats, version_rx, + preview_collector: None, } } } @@ -94,6 +98,19 @@ impl PyUpdateHandle { Ok(ret) }) } + + /// Returns collected preview actions as a Python list. Call after result(). + pub fn take_preview_actions<'py>( + &mut self, + py: Python<'py>, + ) -> PyResult> { + let actions = self + .preview_collector + .as_ref() + .map(|c| std::mem::take(&mut *c.lock().unwrap())) + .unwrap_or_default(); + pyo3::types::PyList::new(py, actions.iter().map(|a| a.bind(py))).map_err(|e| e.into()) + } } #[pyclass(name = "DropHandle")] @@ -174,27 +191,31 @@ impl PyApp { Ok(Self(Arc::new(app))) } - #[pyo3(signature = (root_processor, full_reprocess, host_ctx, live=false))] + #[pyo3(signature = (root_processor, full_reprocess, host_ctx, live=false, preview=false))] pub fn update_async( &self, root_processor: PyComponentProcessor, full_reprocess: bool, host_ctx: Py, live: bool, + preview: bool, ) -> PyResult { let app = self.0.clone(); let options = AppUpdateOptions { full_reprocess, live, + preview, }; let host_ctx = Arc::new(host_ctx); - let handle = app + let (handle, preview_collector) = app .update(root_processor, options, host_ctx) .into_py_result()?; - Ok(PyUpdateHandle::new(handle)) + let mut uh = PyUpdateHandle::new(handle); + uh.preview_collector = preview_collector; + Ok(uh) } - #[pyo3(signature = (root_processor, full_reprocess, host_ctx, report_to_stdout=false, live=false))] + #[pyo3(signature = (root_processor, full_reprocess, host_ctx, report_to_stdout=false, live=false, preview=false))] pub fn update( &self, py: Python<'_>, @@ -203,24 +224,39 @@ impl PyApp { host_ctx: Py, report_to_stdout: bool, live: bool, - ) -> PyResult { + preview: bool, + ) -> PyResult> { let app = self.0.clone(); let options = AppUpdateOptions { full_reprocess, live, + preview, }; let host_ctx = Arc::new(host_ctx); py.detach(|| { get_runtime().block_on(async move { - let handle = app + let (handle, preview_collector) = app .update(root_processor, options, host_ctx) .into_py_result()?; - if report_to_stdout { - rust_show_progress(handle, ProgressDisplayOptions::default()) - .await - .into_py_result() + if preview { + handle.result().await.into_py_result()?; + let actions = preview_collector + .map(|c| std::mem::take(&mut *c.lock().unwrap())) + .unwrap_or_default(); + Python::attach(|py| { + let list = + pyo3::types::PyList::new(py, actions.iter().map(|a| a.bind(py)))?; + Ok(list.unbind().into_any()) + }) + } else if report_to_stdout { + let ret: PyStoredValue = + rust_show_progress(handle, ProgressDisplayOptions::default()) + .await + .into_py_result()?; + Python::attach(|py| Ok(Py::new(py, ret)?.into_any())) } else { - handle.result().await.into_py_result() + let ret: PyStoredValue = handle.result().await.into_py_result()?; + Python::attach(|py| Ok(Py::new(py, ret)?.into_any())) } }) })