Skip to content

Commit cec80ff

Browse files
committed
fix: use keyed_window.keys for task lookup in accumulator task manager
payload.keys is empty on CLOSE operations since no data is attached, causing "accumulator task not found" errors. keyed_window.keys is the authoritative key identity populated for all operation types. Also updates tests to set keyedWindow.keys independently from payload.keys to match real platform behavior. Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent a66cdda commit cec80ff

2 files changed

Lines changed: 28 additions & 9 deletions

File tree

packages/pynumaflow/pynumaflow/accumulator/servicer/task_manager.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,9 @@ async def close_task(self, req):
104104
3. Wait for all the results from the task to be written to the global result queue
105105
4. Remove the task from the tracker
106106
"""
107-
d = req.payload
108-
keys = d.keys
107+
# Use keyed_window.keys for task lookup since payload.keys may be empty
108+
# (e.g., CLOSE operations don't carry data, so payload.keys is not populated).
109+
keys = req.keyed_window.keys
109110
unified_key = build_unique_key_name(keys)
110111
curr_task = self.tasks.get(unified_key, None)
111112

@@ -127,7 +128,9 @@ async def create_task(self, req):
127128
it creates a new task or appends the request to the existing task.
128129
"""
129130
d = req.payload
130-
keys = d.keys
131+
# Use keyed_window.keys for task lookup — the authoritative key identity
132+
# for the window, consistent across all operation types (OPEN, APPEND, CLOSE).
133+
keys = req.keyed_window.keys
131134
unified_key = build_unique_key_name(keys)
132135
curr_task = self.tasks.get(unified_key, None)
133136

@@ -178,7 +181,8 @@ async def send_datum_to_task(self, req):
178181
If the task does not exist, create it.
179182
"""
180183
d = req.payload
181-
keys = d.keys
184+
# Use keyed_window.keys for task lookup to match the key used in create_task/close_task.
185+
keys = req.keyed_window.keys
182186
unified_key = build_unique_key_name(keys)
183187
result = self.tasks.get(unified_key, None)
184188
if not result:

packages/pynumaflow/tests/accumulator/test_async_accumulator.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
def request_generator(count, request, resetkey: bool = False, send_close: bool = False):
3131
for i in range(count):
3232
if resetkey:
33-
# Clear previous keys and add new ones
33+
# Update keys on both payload and keyedWindow to match real platform behavior
3434
del request.payload.keys[:]
3535
request.payload.keys.extend([f"key-{i}"])
36+
del request.operation.keyedWindow.keys[:]
37+
request.operation.keyedWindow.keys.extend([f"key-{i}"])
3638

3739
# Set operation based on index - first is OPEN, rest are APPEND
3840
if i == 0:
@@ -52,9 +54,11 @@ def request_generator(count, request, resetkey: bool = False, send_close: bool =
5254
def request_generator_append_only(count, request, resetkey: bool = False):
5355
for i in range(count):
5456
if resetkey:
55-
# Clear previous keys and add new ones
57+
# Update keys on both payload and keyedWindow to match real platform behavior
5658
del request.payload.keys[:]
5759
request.payload.keys.extend([f"key-{i}"])
60+
del request.operation.keyedWindow.keys[:]
61+
request.operation.keyedWindow.keys.extend([f"key-{i}"])
5862

5963
# Set operation to APPEND for all requests
6064
request.operation.event = accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND
@@ -64,9 +68,11 @@ def request_generator_append_only(count, request, resetkey: bool = False):
6468
def request_generator_mixed(count, request, resetkey: bool = False):
6569
for i in range(count):
6670
if resetkey:
67-
# Clear previous keys and add new ones
71+
# Update keys on both payload and keyedWindow to match real platform behavior
6872
del request.payload.keys[:]
6973
request.payload.keys.extend([f"key-{i}"])
74+
del request.operation.keyedWindow.keys[:]
75+
request.operation.keyedWindow.keys.extend([f"key-{i}"])
7076

7177
if i % 2 == 0:
7278
# Set operation to APPEND for even requests
@@ -107,17 +113,26 @@ def start_request() -> accumulator_pb2.AccumulatorRequest:
107113

108114
def start_request_without_open() -> accumulator_pb2.AccumulatorRequest:
109115
event_time_timestamp, watermark_timestamp = get_time_args()
110-
116+
window = accumulator_pb2.KeyedWindow(
117+
start=mock_interval_window_start(),
118+
end=mock_interval_window_end(),
119+
slot="slot-0",
120+
keys=["test_key"],
121+
)
111122
payload = accumulator_pb2.Payload(
112123
keys=["test_key"],
113124
value=mock_message(),
114125
event_time=event_time_timestamp,
115126
watermark=watermark_timestamp,
116127
id="test_id",
117128
)
118-
129+
operation = accumulator_pb2.AccumulatorRequest.WindowOperation(
130+
event=accumulator_pb2.AccumulatorRequest.WindowOperation.Event.APPEND,
131+
keyedWindow=window,
132+
)
119133
request = accumulator_pb2.AccumulatorRequest(
120134
payload=payload,
135+
operation=operation,
121136
)
122137
return request
123138

0 commit comments

Comments
 (0)