
Commit 0f94840

zxqfd555 authored and Manul from Pathway committed
dynamic worker count scaling (#9730)
GitOrigin-RevId: 69c2c0dbbb8f551d71031a9efe25001f3b3d8a1b
1 parent fc7aa0d commit 0f94840

20 files changed: 751 additions & 54 deletions

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -8,7 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
 ### Added
 - `pw.io.kafka.read` and `pw.io.kafka.write` connectors now support OAUTHBEARER authentication.
 - `pw.io.mongodb.write` connector now supports an `output_table_type` parameter with two modes: `stream_of_changes` (default) and `snapshot`. In `snapshot` mode, the connector maintains the current state of the Pathway table in MongoDB using the `_id` field as the primary key, while `stream_of_changes` preserves the existing behavior by writing all events with `time` and `diff` flags to reflect transactional minibatches and the nature of each change.
-
+- Workers can now automatically scale up or down based on pipeline load, using a configurable monitoring window. This feature requires persistence to be enabled and can be configured via `worker_scaling_enabled` and `workload_tracking_window_ms` in `pw.persistence.Config`. Please refer to the tutorial for more details.
 
 ## [0.29.0] - 2026-01-22
```

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
---
title: 'Dynamic Worker Scaling'
description: 'This page describes how to set up dynamic worker count scaling in Pathway'
---

# Dynamic Worker Scaling

Programs built with Pathway can scale their workload by splitting heavy pipelines into parallel units of execution, enabling effective parallelization and higher throughput for computationally intensive workloads.
To run computations in parallel, you can use the `pathway spawn` command and specify the number of workers to use.

This way:

- Each worker executes in parallel.
- Workers can be implemented either as **processes** or as **threads**.
- By increasing the number of workers, you can split computations into more parallel parts and achieve better throughput.
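For example, to start a pipeline on four workers (the script name here is a placeholder):

```bash
pathway spawn -n 4 python pipeline.py
```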
When you start your program this way, it runs **continuously with exactly `N` workers** for its entire lifetime. This works well for setups with a consistent load profile, but it has limitations when the load profile is volatile: if the system load is low, resources may be underutilized, while if the load increases, the fixed number of workers may become a bottleneck. In other words, the worker count doesn't adapt automatically: you have a **static execution model**.

In many real-world scenarios, it is desirable for the number of workers to be dynamic: under low load, the number of active workers should decrease to save resources, while under high load, it should increase proportionally to handle the demand. To support this use case, Pathway provides **built-in autoscaling mechanisms** that allow worker counts to grow and shrink automatically based on workload.

## How Dynamic Scaling Works

Understanding the basics of the scaling mechanism is useful for setting it up efficiently.
When you launch a Pathway computation via `pathway spawn`, an orchestrator creates and manages the workers. It receives information from the workers, based on which the number of workers can be adjusted.

Each worker, if configured, tracks its own load profile. This tracking is based on the time spent on computation over a sliding window (2 minutes by default, configurable by the user), as well as the worker's idle time and the intervals the scheduler expected the computations to take. After a full window has elapsed, a pattern of excessive idle time leads to worker termination, with an exit code informing the orchestrator that the engine should be scaled down. Conversely, if computations consistently fall behind, the orchestrator determines that the computation should be restarted with a larger number of workers.
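The exact accounting is internal to the engine, but conceptually the per-worker decision boils down to comparing compute time against idle time within the window. The following sketch is a mental model only; the names and thresholds are hypothetical, and the real engine also takes the scheduler's expected intervals into account:

```python
from dataclasses import dataclass


@dataclass
class WindowStats:
    compute_ms: float  # time spent computing within the sliding window
    idle_ms: float     # time spent parked/idle within the sliding window


def scaling_signal(stats: WindowStats, low: float = 0.2, high: float = 0.9) -> str:
    total = stats.compute_ms + stats.idle_ms
    if total == 0:
        return "keep"  # no observations yet
    utilization = stats.compute_ms / total
    if utilization < low:
        # Mostly idle: the worker exits with a code telling the
        # orchestrator to restart the computation with fewer workers.
        return "scale_down"
    if utilization > high:
        # Consistently behind: restart with more workers.
        return "scale_up"
    return "keep"
```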
After receiving such a signal, the orchestrator adjusts the number of workers and restarts the computation. The sharding and work-splitting mechanisms are updated for the new worker count.

To ensure computations resume from the point reached when the decision to change the number of workers was made, **data persistence is required**. Scaling is also configured within the data persistence settings.

Last but not least, note that the described procedure implies a full restart of the computation graph. Persistence mitigates this, but it does not eliminate restart costs.
### Worker Count Adjustment Rules

As mentioned, the number of workers is recalculated by the orchestration process. The following rules, illustrated by the sketch after this list, govern how the adjustment works:

- **Increasing workers:** The orchestrator doubles the current number of workers.
  - Example: If you have 4 workers and scaling up is required, the system will increase the count to 8.
- **Decreasing workers:** The orchestrator halves the current number of workers.
  - Example: If you have 4 workers and scaling down is required, the system will reduce the count to 2.

In any case, you can't have fewer than one worker. Therefore, even if the pipeline is very light and only one worker is running, the orchestrator cannot reduce the number further.
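These rules fit in a few lines of code. The helper below is purely illustrative, not part of the Pathway API; the `max_workers` cap mirrors the free-license limit described in the next subsection:

```python
def next_worker_count(current: int, scale_up: bool, max_workers: int = 8) -> int:
    if scale_up:
        # Scaling up doubles the worker count, up to the license cap.
        return min(current * 2, max_workers)
    # Scaling down halves the worker count, but never below one worker.
    return max(current // 2, 1)


assert next_worker_count(4, scale_up=True) == 8
assert next_worker_count(4, scale_up=False) == 2
assert next_worker_count(1, scale_up=False) == 1  # can't drop below one worker
assert next_worker_count(8, scale_up=True) == 8   # free tier caps at 8
```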
The scaling process only increases or decreases the number of **processes**; threads are **not used for dynamic scaling** in this mechanism. So if your initial configuration uses thread workers, or both threads and processes, scaling will only change the number of processes. For example, if you launched the computation with one process containing two thread workers, upscaling will lead to two processes with two thread workers each. Downscaling from this initial configuration, on the other hand, won't be possible, since the number of processes is already one.
### License Limitations

You need a Pathway license for scaling to work. You can obtain your free Pathway Scale license [here](/get-license). The page contains instructions for getting the license and using it in the pipeline.

Please note that the free scaling license limits the maximum worker count to **8**. On the free tier, once the system is running 8 workers, it won't scale up further, even if the computation falls behind; no scaling decision can push the worker count above 8.
## Configuring and Running

With these restrictions in mind, you are ready to configure and run auto-scaling.

First, you need to create a persistence configuration to preserve the computation state and progress between worker restarts; without it, the computation would start over from the beginning after every restart. A simple version looks as follows:
```python
import pathway as pw
from pathway.internals import api

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(your_persistent_storage_path),  # e.g., /tmp/Pathway-Cache
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
)
```
Please note the `persistence_mode` parameter: in high-load scenarios, it is crucial to use `OPERATOR_PERSISTING`. This mode allows the system to dump only the state of internal computation structures, avoiding the heavy recomputations that might otherwise be needed if upscaling becomes necessary.

However, this configuration does not yet include scaling settings; in this form, scaling remains disabled. You need to enable it by setting the `worker_scaling_enabled` flag:
```python
import pathway as pw
from pathway.internals import api

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(your_persistent_storage_path),  # e.g., /tmp/Pathway-Cache
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
    worker_scaling_enabled=True,
)
```
With this setting, the program will track the workload and be capable of scaling up and down. By default, statistics are computed over a two-minute window. You can change this by specifying the number of milliseconds in the `workload_tracking_window_ms` parameter:
```python
import pathway as pw
from pathway.internals import api

persistence_config = pw.persistence.Config(
    backend=pw.persistence.Backend.filesystem(your_persistent_storage_path),  # e.g., /tmp/Pathway-Cache
    persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
    worker_scaling_enabled=True,
    workload_tracking_window_ms=300000,  # 5 minutes
)
```
Keep in mind that this value should not be too small. At startup, data sources may not kick off immediately and can take several seconds to begin providing data. During these initial seconds, the graph will be underloaded because there is no computation to perform without input. Therefore, make sure the window accommodates this startup period and is at least 20-30 seconds long.
Once you have configured the persistence settings, pass the object as the `persistence_config` parameter of the `pw.run` method:

```python
pw.run(persistence_config=persistence_config)
```
Finally, you can spawn the execution using a console command, for example: `pathway spawn -n 2 python pipeline.py`. This command starts the pipeline with two initial workers, each running as a separate process.
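Putting the pieces together, a complete `pipeline.py` might look like the sketch below. It is modeled on the load-test pipeline added in this commit; the storage path, the message rate, and the trivial length computation are illustrative choices, not requirements:

```python
import time

import pathway as pw
from pathway.internals import api


class NumberStream(pw.io.python.ConnectorSubject):
    """Emits a steady stream of JSON messages to keep the pipeline busy."""

    def run(self):
        n = 0
        while True:
            self.next_json({"number": n})
            n += 1
            time.sleep(0.001)  # roughly 1000 messages per second


table = pw.io.python.read(subject=NumberStream(), format="raw")
table = table.select(length=pw.apply(len, pw.this.data))
pw.io.null.write(table)

pw.run(
    persistence_config=pw.persistence.Config(
        backend=pw.persistence.Backend.filesystem("/tmp/pathway-cache"),
        persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
        worker_scaling_enabled=True,
        workload_tracking_window_ms=120000,  # 2 minutes (the default)
    ),
)
```

Started with `pathway spawn -n 2 python pipeline.py`, this gives the orchestrator two process workers to start from; under sustained load it can double them, and under prolonged idleness halve them.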
## Conclusion

To manage a **dynamic number of workers**, follow these steps:

1. **Configure persistence**
   It is strongly recommended to use **operator persistence** (`OPERATOR_PERSISTING`) to ensure that computation state is safely stored between worker restarts and that the restarts are as fast as possible.

2. **Enable worker scaling**
   In your persistence configuration, set the `worker_scaling_enabled` flag. By default, scaling is disabled.

3. **Adjust the workload tracking window**
   Set an appropriate `workload_tracking_window_ms` to control how the orchestrator evaluates workload patterns, or leave the default of two minutes.

4. **Start the computation**
   Launch your pipeline with `pathway spawn`. You can also specify the **initial number of workers** at startup:

```bash
pathway spawn -n <initial_worker_count> python pipeline.py
```
If you have any questions, feel free to reach out on [Discord](http://discord.com/invite/pathway) or open an issue on our [GitHub](https://github.com/pathwaycom/pathway/issues/).

external/timely-dataflow/timely/src/worker.rs

Lines changed: 25 additions & 5 deletions
```diff
@@ -206,6 +206,17 @@ pub trait AsWorker : Scheduler {
     fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely") }
 }
 
+/// Contains statistics from `step_or_park` method.
+#[derive(Debug)]
+pub struct WorkerStepStats {
+    /// Denotes if more computational steps are needed.
+    pub has_more_work: bool,
+
+    /// Contains the duration spent on computation, excluding
+    /// time spent during possible thread parking.
+    pub compute_duration: Duration,
+}
+
 /// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
 /// and has a list of dataflows that it manages.
 pub struct Worker<A: Allocate> {
@@ -299,7 +310,7 @@ impl<A: Allocate> Worker<A> {
     ///     worker.step();
     /// });
     /// ```
-    pub fn step(&mut self) -> bool {
+    pub fn step(&mut self) -> WorkerStepStats {
         self.step_or_park(Some(Duration::from_secs(0)))
     }
 
@@ -330,7 +341,7 @@ impl<A: Allocate> Worker<A> {
     ///     worker.step_or_park(Some(Duration::from_secs(1)));
     /// });
     /// ```
-    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {
+    pub fn step_or_park(&mut self, duration: Option<Duration>) -> WorkerStepStats {
 
         {   // Process channel events. Activate responders.
             let mut allocator = self.allocator.borrow_mut();
@@ -367,7 +378,7 @@ impl<A: Allocate> Worker<A> {
             (x, y) => x.or(y),
         };
 
-        if delay != Some(Duration::new(0,0)) {
+        let compute_duration = if delay != Some(Duration::new(0,0)) {
 
             // Log parking and flush log.
             if let Some(l) = self.logging().as_mut() {
@@ -381,8 +392,12 @@ impl<A: Allocate> Worker<A> {
 
             // Log return from unpark.
             self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
+
+            // Nothing happens, the thread was parked all the time
+            Duration::ZERO
         }
         else {   // Schedule active dataflows.
+            let computation_started_at = Instant::now();
 
             let active_dataflows = &mut self.active_dataflows;
             self.activations
@@ -404,12 +419,17 @@ impl<A: Allocate> Worker<A> {
                     }
                 }
             }
-        }
+
+            computation_started_at.elapsed()
+        };
 
         // Clean up, indicate if dataflows remain.
         self.logging.borrow_mut().flush();
         self.allocator.borrow_mut().release();
-        !self.dataflows.borrow().is_empty()
+        WorkerStepStats {
+            has_more_work: !self.dataflows.borrow().is_empty(),
+            compute_duration
+        }
     }
 
     /// Calls `self.step()` as long as `func` evaluates to true.
```
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@

```python
import pytest

from pathway.tests.utils import UniquePortDispenser

# The configuration differs from other tests: each test must be given a unique
# range of 8 consecutive ports (the maximum number of workers), not just a
# single port as in other tests.
PORT_DISPENSER = UniquePortDispenser(
    range_start=1000,
    worker_range_size=600,
    step_size=8,
)


@pytest.fixture
def port(testrun_uid):
    yield PORT_DISPENSER.get_unique_port(testrun_uid)
```
Lines changed: 74 additions & 0 deletions
@@ -0,0 +1,74 @@

```python
import argparse
import json
import logging
import time

import pathway as pw
from pathway.internals import api


class StreamerSubject(pw.io.python.ConnectorSubject):

    def __init__(self, rate: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.rate = rate
        self.current_number = 10000000000000

    def run(self):
        while True:
            second_started_at = time.time()
            for index in range(self.rate):
                event_time = time.time()
                next_json = {
                    "number": self.current_number,
                }
                self.current_number += 1

                self.next_json(next_json)
                if index > 0 or index == self.rate - 1:
                    expected_duration = index * 1.0 / self.rate
                    actual_duration = event_time - second_started_at
                    if actual_duration > expected_duration + 0.1:
                        logging.warning(
                            "The streaming severely falls behind the target frequency"
                        )
                    elif actual_duration < expected_duration:
                        time.sleep(expected_duration - actual_duration)


@pw.udf(deterministic=True)
def is_prime(event_json) -> bool:
    number = json.loads(event_json)["number"]
    if number < 2:
        return False

    is_prime_flag = number % 2 != 0
    i = 3
    while i * i <= number and is_prime_flag:
        if number % i == 0:
            is_prime_flag = False
            break
        i += 2

    return is_prime_flag


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--rate", type=int, required=True)
    parser.add_argument("--persistent-storage-path", type=str, required=True)
    args = parser.parse_args()

    table = pw.io.python.read(subject=StreamerSubject(rate=args.rate), format="raw")
    table = table.select(prime=is_prime(pw.this.data))
    pw.io.null.write(table)

    pw.run(
        persistence_config=pw.persistence.Config(
            backend=pw.persistence.Backend.filesystem(args.persistent_storage_path),
            persistence_mode=api.PersistenceMode.OPERATOR_PERSISTING,
            worker_scaling_enabled=True,
            workload_tracking_window_ms=60000,
        ),
        monitoring_level=pw.MonitoringLevel.NONE,
    )
```
