Skip to content

Commit ec83b54

Browse files
authored
feat: Orchestrator - Optimization - Use mini-batches when processing queues (#248)
Optimization: Process same queue multiple times so that multiple fast operations can be batched together. Before: Queued (fast), Running (slow), Queued (fast), Running (slow), Queued (fast), Running (slow) After: Queued (fast), Queued (fast), Queued (fast), Running (slow) After: Queued (fast), Queued (slow), Running (slow) Process same queue multiple times: * For up to X milliseconds (e.g. 200ms) * Up to Y times (reduces blast radius of time-related bugs) * If the queue is not empty
1 parent 8c8d4ed commit ec83b54

1 file changed

Lines changed: 33 additions & 6 deletions

File tree

cloud_pipelines_backend/orchestrator_sql.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ def __init__(
5050
default_task_annotations: dict[str, Any] | None = None,
5151
sleep_seconds_between_queue_sweeps: float = 1.0,
5252
output_data_purge_duration: datetime.timedelta = None,
53+
*,
54+
# Internal/experimental:
55+
_max_queue_batch_size: int = 1,
56+
_max_queue_batch_duration: datetime.timedelta = datetime.timedelta(),
5357
):
5458
self._session_factory = session_factory
5559
self._launcher = launcher
@@ -62,6 +66,9 @@ def __init__(
6266
self._running_executions_queue_idle = False
6367
self._output_data_purge_duration = output_data_purge_duration
6468

69+
self._max_queue_batch_size = _max_queue_batch_size
70+
self._max_queue_batch_duration = _max_queue_batch_duration
71+
6572
def run_loop(self):
6673
while True:
6774
try:
@@ -77,12 +84,32 @@ def process_each_queue_once(self):
7784
self.internal_process_running_executions_queue,
7885
]
7986
for queue_handler in queue_handlers:
80-
try:
81-
with self._session_factory() as session:
82-
queue_handler(session=session)
83-
except Exception as exc:
84-
_logger.exception(f"Error while executing {queue_handler=}")
85-
bugsnag_instrumentation.notify(exception=exc)
87+
# Optimization: Process same queue multiple times so that multiple fast operations can be batched together.
88+
# Before: Queued (fast), Running (slow), Queued (fast), Running (slow), Queued (fast), Running (slow)
89+
# After: Queued (fast), Queued (fast), Queued (fast), Running (slow)
90+
# After: Queued (fast), Queued (slow), Running (slow)
91+
# Process same queue multiple times
92+
# * For up to X milliseconds (e.g. 200ms)
93+
# * Up to Y times (reduces blast radius of time-related bugs)
94+
# * If the queue is not empty
95+
# Using monotonic timer to avoid bugs when time changes
96+
start_time_seconds = time.monotonic()
97+
for _ in range(self._max_queue_batch_size):
98+
try:
99+
with self._session_factory() as session:
100+
queue_had_items = queue_handler(session=session)
101+
if not queue_had_items:
102+
break
103+
except Exception as exc:
104+
_logger.exception(f"Error while executing {queue_handler=}")
105+
bugsnag_instrumentation.notify(exception=exc)
106+
if (
107+
datetime.timedelta(
108+
milliseconds=(time.monotonic() - start_time_seconds) * 1000
109+
)
110+
> self._max_queue_batch_duration
111+
):
112+
break
86113

87114
def internal_process_queued_executions_queue(self, session: orm.Session):
88115
query = (

0 commit comments

Comments
 (0)