We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 287b841 commit e8474f0Copy full SHA for e8474f0
1 file changed
packages/pynumaflow/pynumaflow/sinker/servicer/sync_servicer.py
@@ -35,6 +35,7 @@ def SinkFn(
35
Applies a sink function to datum elements.
36
"""
37
38
+ req_queue = None
39
try:
40
# The first message to be received should be a valid handshake
41
req = next(request_iterator)
@@ -89,7 +90,8 @@ def SinkFn(
89
90
# (e.g. gRPC stream broke while the handler was waiting for the next message).
91
# This lets it exit gracefully and release any user-held resources
92
# before the process shuts down.
- req_queue.close()
93
+ if req_queue is not None:
94
+ req_queue.close()
95
self.error = err
96
self.shutdown_event.set()
97
return
0 commit comments