Skip to content

Commit e0e2624

Browse files
committed
More documentation comments
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
1 parent 6b360ac commit e0e2624

10 files changed

Lines changed: 55 additions & 30 deletions

File tree

packages/pynumaflow/pynumaflow/accumulator/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,12 @@ async def _watch_for_shutdown():
220220
try:
221221
await server.wait_for_termination()
222222
except asyncio.CancelledError:
223-
# SIGTERM received — aiorun cancels all tasks. We must stop
224-
# the gRPC server explicitly so its __del__ doesn't try to
225-
# schedule a coroutine on the already-closed event loop.
223+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
224+
# path (where _watch_for_shutdown calls server.stop()), this path
225+
# must stop the gRPC server explicitly. Without this, the server
226+
# object is never stopped and when it is garbage-collected, its
227+
# __del__ tries to schedule a cleanup coroutine on an event loop
228+
# that is already closed, causing errors/warnings.
226229
_LOGGER.info("Received cancellation, stopping server gracefully...")
227230
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
228231

packages/pynumaflow/pynumaflow/batchmapper/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,12 @@ async def _watch_for_shutdown():
155155
try:
156156
await server.wait_for_termination()
157157
except asyncio.CancelledError:
158-
# SIGTERM received — aiorun cancels all tasks. We must stop
159-
# the gRPC server explicitly so its __del__ doesn't try to
160-
# schedule a coroutine on the already-closed event loop.
158+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
159+
# path (where _watch_for_shutdown calls server.stop()), this path
160+
# must stop the gRPC server explicitly. Without this, the server
161+
# object is never stopped and when it is garbage-collected, its
162+
# __del__ tries to schedule a cleanup coroutine on an event loop
163+
# that is already closed, causing errors/warnings.
161164
_LOGGER.info("Received cancellation, stopping server gracefully...")
162165
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
163166

packages/pynumaflow/pynumaflow/mapper/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,12 @@ async def _watch_for_shutdown():
153153
try:
154154
await server.wait_for_termination()
155155
except asyncio.CancelledError:
156-
# SIGTERM received — aiorun cancels all tasks. We must stop
157-
# the gRPC server explicitly so its __del__ doesn't try to
158-
# schedule a coroutine on the already-closed event loop.
156+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
157+
# path (where _watch_for_shutdown calls server.stop()), this path
158+
# must stop the gRPC server explicitly. Without this, the server
159+
# object is never stopped and when it is garbage-collected, its
160+
# __del__ tries to schedule a cleanup coroutine on an event loop
161+
# that is already closed, causing errors/warnings.
159162
_LOGGER.info("Received cancellation, stopping server gracefully...")
160163
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
161164

packages/pynumaflow/pynumaflow/mapstreamer/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,12 @@ async def _watch_for_shutdown():
174174
try:
175175
await server.wait_for_termination()
176176
except asyncio.CancelledError:
177-
# SIGTERM received — aiorun cancels all tasks. We must stop
178-
# the gRPC server explicitly so its __del__ doesn't try to
179-
# schedule a coroutine on the already-closed event loop.
177+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
178+
# path (where _watch_for_shutdown calls server.stop()), this path
179+
# must stop the gRPC server explicitly. Without this, the server
180+
# object is never stopped and when it is garbage-collected, its
181+
# __del__ tries to schedule a cleanup coroutine on an event loop
182+
# that is already closed, causing errors/warnings.
180183
_LOGGER.info("Received cancellation, stopping server gracefully...")
181184
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
182185

packages/pynumaflow/pynumaflow/reducer/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,12 @@ async def _watch_for_shutdown():
206206
try:
207207
await server.wait_for_termination()
208208
except asyncio.CancelledError:
209-
# SIGTERM received — aiorun cancels all tasks. We must stop
210-
# the gRPC server explicitly so its __del__ doesn't try to
211-
# schedule a coroutine on the already-closed event loop.
209+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
210+
# path (where _watch_for_shutdown calls server.stop()), this path
211+
# must stop the gRPC server explicitly. Without this, the server
212+
# object is never stopped and when it is garbage-collected, its
213+
# __del__ tries to schedule a cleanup coroutine on an event loop
214+
# that is already closed, causing errors/warnings.
212215
_LOGGER.info("Received cancellation, stopping server gracefully...")
213216
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
214217

packages/pynumaflow/pynumaflow/reducer/servicer/async_servicer.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,7 @@ async def ReduceFn(
159159

160160
except BaseException as e:
161161
_LOGGER.critical("Reduce Error", exc_info=True)
162-
err_msg = f"{ERR_UDF_EXCEPTION_STRING}: {repr(e)}"
163-
_LOGGER.critical(err_msg, exc_info=True)
164-
update_context_err(context, e, err_msg)
162+
update_context_err(context, e, f"{ERR_UDF_EXCEPTION_STRING}: {repr(e)}")
165163
self._error = e
166164
if self._shutdown_event is not None:
167165
self._shutdown_event.set()

packages/pynumaflow/pynumaflow/reducestreamer/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,9 +223,12 @@ async def _watch_for_shutdown():
223223
try:
224224
await server.wait_for_termination()
225225
except asyncio.CancelledError:
226-
# SIGTERM received — aiorun cancels all tasks. We must stop
227-
# the gRPC server explicitly so its __del__ doesn't try to
228-
# schedule a coroutine on the already-closed event loop.
226+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
227+
# path (where _watch_for_shutdown calls server.stop()), this path
228+
# must stop the gRPC server explicitly. Without this, the server
229+
# object is never stopped and when it is garbage-collected, its
230+
# __del__ tries to schedule a cleanup coroutine on an event loop
231+
# that is already closed, causing errors/warnings.
229232
_LOGGER.info("Received cancellation, stopping server gracefully...")
230233
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
231234

packages/pynumaflow/pynumaflow/sinker/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,12 @@ async def _watch_for_shutdown():
177177
try:
178178
await server.wait_for_termination()
179179
except asyncio.CancelledError:
180-
# SIGTERM received — aiorun cancels all tasks. We must stop
181-
# the gRPC server explicitly so its __del__ doesn't try to
182-
# schedule a coroutine on the already-closed event loop.
180+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
181+
# path (where _watch_for_shutdown calls server.stop()), this path
182+
# must stop the gRPC server explicitly. Without this, the server
183+
# object is never stopped and when it is garbage-collected, its
184+
# __del__ tries to schedule a cleanup coroutine on an event loop
185+
# that is already closed, causing errors/warnings.
183186
_LOGGER.info("Received cancellation, stopping server gracefully...")
184187
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
185188

packages/pynumaflow/pynumaflow/sourcer/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,12 @@ async def _watch_for_shutdown():
214214
try:
215215
await server.wait_for_termination()
216216
except asyncio.CancelledError:
217-
# SIGTERM received — aiorun cancels all tasks. We must stop
218-
# the gRPC server explicitly so its __del__ doesn't try to
219-
# schedule a coroutine on the already-closed event loop.
217+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
218+
# path (where _watch_for_shutdown calls server.stop()), this path
219+
# must stop the gRPC server explicitly. Without this, the server
220+
# object is never stopped and when it is garbage-collected, its
221+
# __del__ tries to schedule a cleanup coroutine on an event loop
222+
# that is already closed, causing errors/warnings.
220223
_LOGGER.info("Received cancellation, stopping server gracefully...")
221224
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
222225

packages/pynumaflow/pynumaflow/sourcetransformer/async_server.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,12 @@ async def _watch_for_shutdown():
176176
try:
177177
await server.wait_for_termination()
178178
except asyncio.CancelledError:
179-
# SIGTERM received — aiorun cancels all tasks. We must stop
180-
# the gRPC server explicitly so its __del__ doesn't try to
181-
# schedule a coroutine on the already-closed event loop.
179+
# SIGTERM received — aiorun cancels all tasks. Unlike the UDF-error
180+
# path (where _watch_for_shutdown calls server.stop()), this path
181+
# must stop the gRPC server explicitly. Without this, the server
182+
# object is never stopped and when it is garbage-collected, its
183+
# __del__ tries to schedule a cleanup coroutine on an event loop
184+
# that is already closed, causing errors/warnings.
182185
_LOGGER.info("Received cancellation, stopping server gracefully...")
183186
await server.stop(NUMAFLOW_GRPC_SHUTDOWN_GRACE_PERIOD_SECONDS)
184187

0 commit comments

Comments
 (0)