You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Once we drop support for Python 3.10, we should adopt asyncio.TaskGroup (available in Python 3.11+) to replace manual task management patterns (asyncio.create_task + manual cancellation + asyncio.gather). This improves error propagation, automatic cleanup, and overall code clarity.
Current Implementation
1. RequestQueueClientAsync.batch_add_requests — High priority
TaskGroup could simplify the __aenter__/__aexit__ lifecycle management by tying task lifetime to the group's scope. However, start() currently returns the raw Task object — we need to check if any callers depend on this before refactoring.
Better error handling: TaskGroup automatically propagates exceptions from any task. If a worker fails unexpectedly before queue.join() completes, all remaining workers are cancelled and the exception propagates cleanly via ExceptionGroup.
Cleaner code: Context manager pattern ensures proper cleanup — no risk of fire-and-forget task leaks.
More Pythonic: Uses modern Python structured concurrency patterns.
Automatic waiting: No need for explicit asyncio.gather() calls or manual cancellation loops.
Notes
This is blocked until Python 3.10 support is dropped (see target_python_version in pyproject.toml).
Summary
Once we drop support for Python 3.10, we should adopt
asyncio.TaskGroup(available in Python 3.11+) to replace manual task management patterns (asyncio.create_task+ manual cancellation +asyncio.gather). This improves error propagation, automatic cleanup, and overall code clarity.Current Implementation
1.
RequestQueueClientAsync.batch_add_requests— High prioritysrc/apify_client/clients/resource_clients/request_queue.py(lines 712–790)The batch processing spawns multiple worker tasks to process batched API calls in parallel using manual task management:
2.
StreamedLogAsync— Medium prioritysrc/apify_client/clients/resource_clients/log.py(lines 332–377)Manages a single background streaming task with
asyncio.create_task()and manual cancel/await instop():3.
StatusMessageWatcherAsync— Medium prioritysrc/apify_client/clients/resource_clients/log.py(lines 428–480)Nearly identical pattern to
StreamedLogAsync— a single background polling task with manual start/cancel/await:4. Documentation example — Low priority
docs/03_examples/code/02_tasks_async.py(lines 44–45)Uses
asyncio.gather()to run multiple task clients concurrently:Proposed Improvement
Batch processing (request_queue.py)
Use
asyncio.TaskGroupfor cleaner code and automatic exception handling:Log streaming and status watcher (log.py)
TaskGroupcould simplify the__aenter__/__aexit__lifecycle management by tying task lifetime to the group's scope. However,start()currently returns the rawTaskobject — we need to check if any callers depend on this before refactoring.Documentation example
Update to showcase the modern
TaskGrouppattern:Benefits
TaskGroupautomatically propagates exceptions from any task. If a worker fails unexpectedly beforequeue.join()completes, all remaining workers are cancelled and the exception propagates cleanly viaExceptionGroup.asyncio.gather()calls or manual cancellation loops.Notes
target_python_versioninpyproject.toml).TaskGroupshines most with fan-out concurrency (Write design document #1).ExceptionGroupraised byTaskGroupon failure differs fromgather(return_exceptions=True)behavior — tests should be reviewed accordingly.Prerequisites
TaskGrouprequires Python 3.11+).