Skip to content

Commit 7d722f5

Browse files
committed
Bugfix: Harden cancel state for async capture runners
- Add OperationState.CANCELLED to capture operation states - Set cancelled state when runner catches CancelledError - Track runner task in operation metadata for callbacks - Add task done callback to mark cancelled on task cancel - Prevent race where cancelled operations report null state - Keep final state logging in runner finally block - 2026-02-28 23:24:40
1 parent 1c49305 commit 7d722f5

2 files changed

Lines changed: 81 additions & 59 deletions

File tree

src/pypnm/api/routes/advance/common/capture_service.py

Lines changed: 78 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ async def start(self) -> tuple[GroupId, OperationId]:
105105
"duration": self.duration,
106106
"interval": self.interval,
107107
"time_remaining": self.time_remaining,
108-
"samples": []
108+
"samples": [],
109+
"task": None,
109110
}
110111

111112
self.setOperationFinalInvocation(operation_id, False)
@@ -115,78 +116,96 @@ async def start(self) -> tuple[GroupId, OperationId]:
115116
f"({self.duration}s @ {self.interval}s interval)")
116117

117118
async def _runner() -> None:
119+
try:
120+
end_time = start_time + self.duration
118121

119-
end_time = start_time + self.duration
120-
121-
while (time.time() < end_time) and self._ops[operation_id]["state"] == OperationState.RUNNING:
122-
123-
now = time.time()
124-
remaining = max(0, int(end_time - now))
125-
self._ops[operation_id]["time_remaining"] = remaining
126-
iteration_ts = Generate.time_stamp()
127-
128-
# Add a waitup front so that it can goto the next function
129-
await asyncio.sleep(self.interval)
130-
131-
try:
132-
msg_rsp = await self._capture_message_response()
133-
samples = self._process_captures(msg_rsp)
134-
for sample in samples:
135-
self._ops[operation_id]["samples"].append(sample)
136-
if sample.transaction_id:
137-
self._cap_group.add_transaction(sample.transaction_id)
138-
self.logger.debug(f"[{operation_id}] Captured sample txn={sample.transaction_id}")
139-
140-
except Exception as exc:
141-
error_msg = str(exc)
142-
self.logger.error(f"[{operation_id}] Capture error: {error_msg}", exc_info=True)
143-
self._ops[operation_id]["samples"].append(CaptureSample(timestamp = cast(TimeStamp, iteration_ts),
144-
transaction_id = "",
145-
filename = "",
146-
error = error_msg))
147-
148-
# Complete if still running
149-
if self._ops[operation_id]["state"] == OperationState.RUNNING:
150-
151-
self._ops[operation_id]["state"] = OperationState.COMPLETED
152-
iteration_ts = time.time()
153-
154-
try:
122+
while (time.time() < end_time) and self._ops[operation_id]["state"] == OperationState.RUNNING:
155123

156-
self.logger.debug(f'Runner ended, Final Invocation , One Last Cycle before ending'
157-
f'state={self._ops[operation_id]["state"]}'
158-
f'time-remaining={self._ops[operation_id]["time_remaining"]}')
124+
now = time.time()
125+
remaining = max(0, int(end_time - now))
126+
self._ops[operation_id]["time_remaining"] = remaining
127+
iteration_ts = Generate.time_stamp()
159128

160-
self.setOperationFinalInvocation(operation_id, True)
161-
msg_rsp:MessageResponse = await self._capture_message_response()
129+
# Add a waitup front so that it can goto the next function
130+
await asyncio.sleep(self.interval)
162131

163-
# This is here to before any last operation at the time of the completion of the task
164-
if msg_rsp.status == ServiceStatusCode.SKIP_MESSAGE_RESPONSE:
165-
self.logger.info('Skipping last _capture_message_response()')
166-
else:
132+
try:
133+
msg_rsp = await self._capture_message_response()
167134
samples = self._process_captures(msg_rsp)
168135
for sample in samples:
169136
self._ops[operation_id]["samples"].append(sample)
170137
if sample.transaction_id:
171138
self._cap_group.add_transaction(sample.transaction_id)
172-
self.logger.info(f"[{operation_id}] Captured sample txn={sample.transaction_id}")
173-
174-
except Exception as exc:
175-
error_msg = str(exc)
176-
self.logger.error(f"[{operation_id}] Capture error: {error_msg}", exc_info=True)
177-
self._ops[operation_id]["samples"].append(
178-
CaptureSample(timestamp = cast(TimeStamp, iteration_ts),
179-
transaction_id = "",
180-
filename = "",
181-
error =error_msg))
182-
183-
self.logger.info(f"[{operation_id}] Capture session ended with state={self._ops[operation_id]['state']}")
139+
self.logger.debug(f"[{operation_id}] Captured sample txn={sample.transaction_id}")
140+
141+
except Exception as exc:
142+
error_msg = str(exc)
143+
self.logger.error(f"[{operation_id}] Capture error: {error_msg}", exc_info=True)
144+
self._ops[operation_id]["samples"].append(CaptureSample(timestamp = cast(TimeStamp, iteration_ts),
145+
transaction_id = "",
146+
filename = "",
147+
error = error_msg))
148+
149+
# Complete if still running
150+
if self._ops[operation_id]["state"] == OperationState.RUNNING:
151+
152+
self._ops[operation_id]["state"] = OperationState.COMPLETED
153+
iteration_ts = time.time()
154+
155+
try:
156+
157+
self.logger.debug(f'Runner ended, Final Invocation , One Last Cycle before ending'
158+
f'state={self._ops[operation_id]["state"]}'
159+
f'time-remaining={self._ops[operation_id]["time_remaining"]}')
160+
161+
self.setOperationFinalInvocation(operation_id, True)
162+
msg_rsp:MessageResponse = await self._capture_message_response()
163+
164+
# This is here to before any last operation at the time of the completion of the task
165+
if msg_rsp.status == ServiceStatusCode.SKIP_MESSAGE_RESPONSE:
166+
self.logger.info('Skipping last _capture_message_response()')
167+
else:
168+
samples = self._process_captures(msg_rsp)
169+
for sample in samples:
170+
self._ops[operation_id]["samples"].append(sample)
171+
if sample.transaction_id:
172+
self._cap_group.add_transaction(sample.transaction_id)
173+
self.logger.info(f"[{operation_id}] Captured sample txn={sample.transaction_id}")
174+
175+
except Exception as exc:
176+
error_msg = str(exc)
177+
self.logger.error(f"[{operation_id}] Capture error: {error_msg}", exc_info=True)
178+
self._ops[operation_id]["samples"].append(
179+
CaptureSample(timestamp = cast(TimeStamp, iteration_ts),
180+
transaction_id = "",
181+
filename = "",
182+
error =error_msg))
183+
except asyncio.CancelledError:
184+
if operation_id in self._ops and self._ops[operation_id]["state"] == OperationState.RUNNING:
185+
self._ops[operation_id]["state"] = OperationState.CANCELLED
186+
self.logger.info(f"[{operation_id}] Capture session cancelled")
187+
raise
188+
finally:
189+
if operation_id in self._ops:
190+
self.logger.info(
191+
f"[{operation_id}] Capture session ended with state={self._ops[operation_id]['state']}",
192+
)
184193

185194
###############
186195
# Main RUNNER #
187196
###############
197+
def _on_runner_done(task: asyncio.Task[None]) -> None:
198+
op = self._ops.get(operation_id)
199+
if op is None:
200+
return
201+
if task.cancelled() and op.get("state") in (OperationState.RUNNING, None):
202+
op["state"] = OperationState.CANCELLED
203+
self.logger.info(f"[{operation_id}] Capture session marked cancelled by task callback")
204+
188205
try:
189-
asyncio.create_task(_runner())
206+
task = asyncio.create_task(_runner())
207+
self._ops[operation_id]["task"] = task
208+
task.add_done_callback(_on_runner_done)
190209
except Exception as exc:
191210
self.logger.error(f"Failed to schedule capture runner task, reason={exc}", exc_info=True)
192211
raise

src/pypnm/api/routes/advance/common/operation_state.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ class OperationState(str, Enum):
1515
The capture task is currently in progress.
1616
COMPLETED:
1717
The capture task finished normally (duration elapsed).
18+
CANCELLED:
19+
The capture task was cancelled by task cancellation.
1820
STOPPED:
1921
The capture task was halted early by user request.
2022
UNKNOWN:
@@ -23,5 +25,6 @@ class OperationState(str, Enum):
2325

2426
RUNNING = "running" # Task is active and samples are being collected
2527
COMPLETED = "completed" # Task reached its full duration and ended
28+
CANCELLED = "cancelled" # Task was cancelled (asyncio task cancellation)
2629
STOPPED = "stopped" # Task was explicitly stopped before completion
2730
UNKNOWN = "unknown" # No such operation ID or state is indeterminate

0 commit comments

Comments
 (0)