Skip to content

Commit 1c67654

Browse files
committed
Move inflight updates before yield to grpc to avoid racing
1 parent 5a0e8f4 commit 1c67654

2 files changed

Lines changed: 11 additions & 3 deletions

File tree

weaviate/collections/batch/async_.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,9 @@ async def __send(
337337
)
338338
yield batch_pb2.BatchStreamRequest(stop=batch_pb2.BatchStreamRequest.Stop())
339339
return
340-
yield req.proto
341340
self.__inflight_objs.update(req.uuids)
342341
self.__inflight_refs.update(req.beacons)
342+
yield req.proto
343343
continue
344344
except asyncio.TimeoutError:
345345
if self.__is_shutting_down.is_set():

weaviate/collections/batch/sync.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,10 @@ def __send(
289289
)
290290
yield batch_pb2.BatchStreamRequest(stop=batch_pb2.BatchStreamRequest.Stop())
291291
return
292-
yield req.proto
293292
with self.__acks_lock:
294293
self.__inflight_objs.update(req.uuids)
295294
self.__inflight_refs.update(req.beacons)
295+
yield req.proto
296296
continue
297297
except Empty:
298298
if self.__is_shutting_down.is_set():
@@ -304,7 +304,9 @@ def __send(
304304
elif self.__is_hungup.is_set():
305305
logger.info("Detected hung up stream, closing the client-side of the stream")
306306
return
307-
logger.info("Timed out getting request from queue, but not stopping, continuing...")
307+
logger.debug(
308+
"Timed out getting request from queue, but not stopping, continuing..."
309+
)
308310
logger.info("Batch send thread exiting due to exception...")
309311

310312
def __recv(self) -> None:
@@ -575,6 +577,11 @@ def _add_object(
575577
self.__objs_count += 1
576578

577579
while self.__is_blocked():
580+
logger.warning("Batch is blocked, waiting to add more objects...")
581+
if len(self.__inflight_objs) >= self.__batch_size:
582+
logger.info(
583+
f"Too many inflight_objs, waiting for acknowledgements from the server: {len(self.__inflight_objs)}, {self.__batch_size}"
584+
)
578585
self.__check_bg_threads_alive()
579586
time.sleep(0.01)
580587

@@ -617,6 +624,7 @@ def _add_reference(
617624
self.__refs_cache[batch_reference._to_beacon()] = batch_reference
618625
self.__refs_count += 1
619626
while self.__is_blocked():
627+
logger.warning("Batch is blocked, waiting to add more references...")
620628
self.__check_bg_threads_alive()
621629
time.sleep(0.01)
622630

0 commit comments

Comments
 (0)