Skip to content

Commit a674ec1

Browse files
committed
Add safety hatch in case the server doesn't close its side of the stream in time (300s)
1 parent 66d5c96 commit a674ec1

3 files changed

Lines changed: 29 additions & 1 deletion

File tree

weaviate/collections/batch/async_.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from weaviate.collections.batch.base import (
1616
GCP_STREAM_TIMEOUT,
17+
SHUTDOWN_TIMEOUT,
1718
ObjectsBatchRequest,
1819
ReferencesBatchRequest,
1920
_BatchDataWrapper,
@@ -182,8 +183,20 @@ async def recv_wrapper() -> None:
182183
loop=loop,
183184
)
184185

185-
async def _wait(self):
186+
async def _wait(self) -> None:
186187
assert self.__bg_tasks is not None
188+
deadline = time.time() + SHUTDOWN_TIMEOUT
189+
while time.time() < deadline:
190+
if not self.__bg_tasks.all_alive():
191+
break
192+
await asyncio.sleep(0.1)
193+
if self.__bg_tasks.all_alive():
194+
logger.warning(
195+
f"Background batch tasks did not exit within {SHUTDOWN_TIMEOUT}s. "
196+
f"Forcing shutdown. inflight_objs={len(self.__inflight_objs)}, "
197+
f"inflight_refs={len(self.__inflight_refs)}"
198+
)
199+
self.__shutdown_loop.set() # force __loop to exit
187200
await self.__bg_tasks.gather()
188201

189202
# copy the results to the public results

weaviate/collections/batch/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
GCP_STREAM_TIMEOUT = (
6565
160 # GCP connections have a max lifetime of 180s, leave 20s of buffer as safety
6666
)
67+
SHUTDOWN_TIMEOUT = 300 # time to wait for background threads to exit after shutdown is initiated, in seconds, in the event the server never hangs up
6768

6869

6970
class BatchRequest(ABC, Generic[TBatchInput, TBatchReturn]):

weaviate/collections/batch/sync.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from weaviate.collections.batch.base import (
1111
GCP_STREAM_TIMEOUT,
12+
SHUTDOWN_TIMEOUT,
1213
ObjectsBatchRequest,
1314
ReferencesBatchRequest,
1415
_BatchDataWrapper,
@@ -135,6 +136,19 @@ def _start(self) -> None:
135136
)
136137

137138
def _wait(self) -> None:
139+
deadline = time.time() + SHUTDOWN_TIMEOUT
140+
while time.time() < deadline:
141+
if not self.__bg_threads.is_alive():
142+
break
143+
time.sleep(0.1)
144+
if self.__bg_threads.is_alive():
145+
logger.warning(
146+
f"Background batch threads did not exit within {SHUTDOWN_TIMEOUT}s. "
147+
f"Forcing shutdown. inflight_objs={len(self.__inflight_objs)}, "
148+
f"inflight_refs={len(self.__inflight_refs)}"
149+
)
150+
self.__shutdown_loop.set() # force __loop to exit
151+
138152
self.__bg_threads.join()
139153

140154
# copy the results to the public results

0 commit comments

Comments
 (0)