Commit 29961b2

Partition data (#543)
1 parent a80a562 commit 29961b2

2 files changed

Lines changed: 104 additions & 1 deletion

docs/partition-data.md

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
# Partition Data

**Partitioning** divides a data stream into multiple **substreams** while preserving, within each substream, the **event order** of the original stream.

Each substream created through partitioning maintains its own **clock** and **timestamp**, which the streaming engine uses internally to calculate the **watermark**. Because each substream has an **independent clock**, its timestamp progression, and therefore its stream processing, is **isolated** from other substreams.

Partitioning is essential when different entities in a stream have **unaligned clocks or timestamps**. Without it, time-based window operations (e.g., tumbling or hopping windows) over the entire stream can produce inaccurate aggregations or drop events, because timestamps from one entity can advance the shared watermark past events from entities whose clocks lag behind.
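The effect of a shared watermark can be modelled in a few lines of Python. This is only a conceptual sketch, not how Timeplus is implemented: it assumes the watermark is simply the highest timestamp seen so far, with no allowance for lateness, and the function name `accepted_events` and the sample keys are made up for illustration.

```python
from collections import defaultdict


def accepted_events(events, per_key):
    """Return the events that are not older than the watermark they are checked against.

    events:  iterable of (key, timestamp) pairs in arrival order.
    per_key: True keeps one watermark per key, False keeps one shared watermark.
    Assumption: the watermark is the highest timestamp seen so far (zero lateness).
    """
    watermarks = defaultdict(lambda: float("-inf"))
    accepted = []
    for key, ts in events:
        wm_key = key if per_key else "__shared__"
        if ts < watermarks[wm_key]:
            continue  # late: the watermark has already passed this timestamp
        watermarks[wm_key] = ts
        accepted.append((key, ts))
    return accepted


# Two sources whose clocks differ by 1000 seconds, interleaved in arrival order.
events = [("a", 100), ("b", 1100), ("a", 101), ("b", 1101), ("a", 102)]

shared = accepted_events(events, per_key=False)
isolated = accepted_events(events, per_key=True)
print(len(shared), len(isolated))  # prints: 3 5
```

With a shared watermark, the two later events from source `a` are treated as late and dropped once source `b` has pushed the watermark ahead. With one watermark per key, all five events are kept.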

## Syntax

```sql
SELECT ...
FROM ...
PARTITION BY col, ...
GROUP BY col1, col2, ...
EMIT ...;
```

## Example

Consider a stream of sensor metrics from **10,000 vehicles**, all published to a single Kafka partition.
- Each vehicle’s onboard sensor generates timestamps from its local clock, which may not be synchronized with the clocks of other vehicles.
- You may also want to analyze driver behavior or performance metrics for each vehicle individually.

By partitioning the stream by `vehicle_id`, Timeplus can create 10,000 substreams, one per vehicle, which ensures that:
- Event order is preserved per vehicle.
- Each substream advances its watermark independently.
- Windowing and aggregation run correctly, without skipped data or skew.

The query below uses a `trucks` stream, where the license plate number (`lpn`) plays the role of `vehicle_id`.

```sql
CREATE STREAM trucks
(
  lpn string,    -- license plate number
  lat float32,   -- latitude
  lon float32,   -- longitude
  spd float32,   -- speed
  mil float32,   -- mileage
  state string,
  city string
);

SELECT
  window_start,
  lpn,
  count()
FROM tumble(trucks, 2s)
PARTITION BY lpn
GROUP BY window_start, lpn;
```

**Explanation**:
- This query partitions the `trucks` stream by license plate number (`lpn`).
- Each unique `lpn` value forms an **independent substream**.
- A **2-second tumbling window** is applied to each substream to count the events per truck.

**Sample Events**:

The following events come from two trucks, `lic1` and `lic2`, in different regions whose timestamps are not aligned.

```sql
-- Truck lic1 in San Francisco
INSERT INTO trucks(lpn, lat, lon, spd, mil, state, city, _tp_time) VALUES ('lic1', 37.7749, 37.7749, 23.5, 12462, 'CA', 'San Francisco', '2015-10-01 13:00:44');

-- Truck lic2 in Manhattan
INSERT INTO trucks(lpn, lat, lon, spd, mil, state, city, _tp_time) VALUES ('lic2', 40.7128, 74.0060, 32.3, 32567, 'NY', 'Manhattan', '2015-10-01 20:10:22');

-- Truck lic1 in San Francisco
INSERT INTO trucks(lpn, lat, lon, spd, mil, state, city, _tp_time) VALUES ('lic1', 37.7341, 37.7258, 13.5, 12472, 'CA', 'San Francisco', '2015-10-01 13:00:45');

-- Truck lic2 in Manhattan
INSERT INTO trucks(lpn, lat, lon, spd, mil, state, city, _tp_time) VALUES ('lic2', 40.7325, 74.0213, 32.4, 32570, 'NY', 'Manhattan', '2015-10-01 20:10:23');

-- Truck lic1 in San Francisco
INSERT INTO trucks(lpn, lat, lon, spd, mil, state, city, _tp_time) VALUES ('lic1', 37.7719, 37.7730, 33.5, 12473, 'CA', 'San Francisco', '2015-10-01 13:00:46');

-- Truck lic2 in Manhattan
INSERT INTO trucks(lpn, lat, lon, spd, mil, state, city, _tp_time) VALUES ('lic2', 40.7335, 74.0223, 12.3, 32581, 'NY', 'Manhattan', '2015-10-01 20:10:24');
```

**Output**:
```
┌────────────window_start─┬─lpn──┬─count()─┐
│ 2015-10-01 13:00:44.000 │ lic1 │       2 │
└─────────────────────────┴──────┴─────────┘
┌────────────window_start─┬─lpn──┬─count()─┐
│ 2015-10-01 20:10:22.000 │ lic2 │       2 │
└─────────────────────────┴──────┴─────────┘
```

Each window covers two seconds, so the first window for each truck contains that truck's first two events. The third event belongs to the next window, which has not closed yet.
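The counts above can be reproduced with a small Python model of per-substream tumbling windows. It is a sketch under stated assumptions rather than a description of the engine: windows are aligned to the Unix epoch, a window is emitted as soon as an event for the same key reaches the window's end, and late events are not handled. The names `window_start` and `tumble_count_partitioned` are hypothetical helpers.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=2)
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Align a timestamp down to the start of its 2-second tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def tumble_count_partitioned(events):
    """Count events per (key, window) and emit a window once that key's own
    watermark (its highest timestamp so far) reaches the window end.
    Simplification: late events are not detected or dropped."""
    watermark = {}             # key -> highest timestamp seen for that key
    counts = defaultdict(int)  # (key, window start) -> event count
    emitted = []
    for key, ts in events:
        counts[(key, window_start(ts))] += 1
        watermark[key] = max(watermark.get(key, ts), ts)
        closed = sorted(
            w for (k, w) in counts if k == key and w + WINDOW <= watermark[key]
        )
        for w in closed:
            emitted.append((w, key, counts.pop((key, w))))
    return emitted


def t(clock):
    """Build a timestamp on 2015-10-01 from an HH:MM:SS string."""
    return datetime.strptime("2015-10-01 " + clock, "%Y-%m-%d %H:%M:%S")


# The six sample events, in arrival order.
events = [
    ("lic1", t("13:00:44")), ("lic2", t("20:10:22")),
    ("lic1", t("13:00:45")), ("lic2", t("20:10:23")),
    ("lic1", t("13:00:46")), ("lic2", t("20:10:24")),
]

emitted = tumble_count_partitioned(events)
for start, key, count in emitted:
    print(start, key, count)
# 2015-10-01 13:00:44 lic1 2
# 2015-10-01 20:10:22 lic2 2
```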

**Key Takeaway**
Even though the two trucks (`lic1` and `lic2`) have unaligned clocks, partitioning ensures that:
- Each truck’s events are processed **independently** in its own substream.
- Tumbling windows operate **correctly and in order** within each substream.
- There is **no timestamp interference** between trucks, which avoids data skew and dropped events.

## Performance Considerations

Partitioning performance depends heavily on the number of substreams it creates.
Each substream maintains its own watermark and timestamp, so memory and scheduling overhead grow with the number of substreams.

If you have a very large number of substreams, consider:
- Applying light [data shuffling](/shuffle-data) to balance the workload.
- Using high-performance [sessionization](/global-aggregation#emit-after-session-close) techniques to make aggregation across partitions more efficient.
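To see why a fixed number of substreams limits overhead, the following Python sketch hashes 10,000 keys into 8 buckets. It is an illustration only: how Timeplus assigns keys to substreams when shuffling is not described here, and the CRC32-based `substream_index` helper and the `vehicle-N` key format are assumptions.

```python
import zlib


def substream_index(key: str, num_substreams: int) -> int:
    """Map a key to one of num_substreams buckets with a stable hash.
    CRC32 is used here only because it is deterministic across runs."""
    return zlib.crc32(key.encode("utf-8")) % num_substreams


num_substreams = 8
keys = [f"vehicle-{i}" for i in range(10_000)]

# Count how many keys land in each bucket.
sizes = [0] * num_substreams
for key in keys:
    sizes[substream_index(key, num_substreams)] += 1

print(sizes)
```

In this model, per-substream state is kept for 8 buckets instead of 10,000 keys. The trade-off is that keys in the same bucket share that state rather than having fully independent clocks.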

docs/shuffle-data.md

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ FROM ...
 SHUFFLE BY col1, ...
 GROUP BY col1, col2, ...
 EMIT ...
-SETTINGS substreams=<num-sub-streams>
+SETTINGS substreams=<num_sub_streams>;
 ```

 > Note: The columns in the `SHUFFLE BY` clause must be a subset of the `GROUP BY` columns to ensure correct aggregation results.
