Skip to content

Commit 73b28fc

Browse files
authored
fix(datapoints): restore backpressure in insert_multiple to fix memory regression vs v7 (#2594)
1 parent 43efd75 commit 73b28fc

3 files changed

Lines changed: 53 additions & 12 deletions

File tree

cognite/client/_api/datapoints.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2419,17 +2419,18 @@ def _create_payload_tasks(
24192419
yield payload
24202420

24212421
async def _insert_datapoints(self, payload: list[dict[str, Any]]) -> None:
2422-
# Convert to memory intensive format as late as possible (and clean up after insert)
2423-
for dct in payload:
2424-
dct["datapoints"] = [dp.dump() for dp in dct["datapoints"]]
2425-
headers: dict[str, str] | None = None
2426-
2427-
await self.dps_client._post(
2428-
url_path=self.dps_client._RESOURCE_PATH,
2429-
json={"items": payload},
2430-
headers=headers,
2431-
semaphore=self.dps_client._get_semaphore("write"),
2432-
)
2422+
# Acquire the semaphore before converting to memory-intensive format:
2423+
async with self.dps_client._get_semaphore("write"):
2424+
for dct in payload:
2425+
dct["datapoints"] = [dp.dump() for dp in dct["datapoints"]]
2426+
await self.dps_client._post(
2427+
url_path=self.dps_client._RESOURCE_PATH,
2428+
json={"items": payload},
2429+
headers=None,
2430+
semaphore=None, # Already holding the semaphore above
2431+
)
2432+
# ...and clean up after insert
2433+
# (needed because references to these payloads are held until the whole job is done, which prevents garbage collection):
24332434
for dct in payload:
24342435
dct["datapoints"].clear()
24352436

cognite/client/_sync_api/datapoints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""
22
===============================================================================
3-
dfa197641edb7840e903b3b65c021f58
3+
830266fb5fd8d205e7c1163175ba904e
44
This file is auto-generated from the Async API modules - do not edit manually!
55
===============================================================================
66
"""

tests/tests_unit/test_api/test_datapoints.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,46 @@ def test_insert_multiple_ts_single_call__above_dps_limit_below_ts_limit(
353353
cognite_client.time_series.data.insert_multiple(dps_objects)
354354
assert 2 == len(mock_post_datapoints.get_requests())
355355

356+
def test_insert_multiple_dump_only_called_within_semaphore(
357+
self,
358+
cognite_client: CogniteClient,
359+
mock_post_datapoints: HTTPXMock,
360+
monkeypatch: MonkeyPatch,
361+
async_client: AsyncCogniteClient,
362+
) -> None:
363+
# Bug in 8.0.0 to 8.1.0: all asyncio tasks were created at once and dp.dump() (synchronous,
364+
# no await) ran in every task before any of them reached the semaphore, causing all payloads
365+
# to be materialised in memory simultaneously (memory regression vs v7).
366+
sem_held = False
367+
dump_sem_held: list[bool] = []
368+
369+
class _MockSemaphore:
370+
async def __aenter__(self) -> _MockSemaphore:
371+
nonlocal sem_held
372+
sem_held = True
373+
return self
374+
375+
async def __aexit__(self, *_: Any) -> None:
376+
nonlocal sem_held
377+
sem_held = False
378+
379+
monkeypatch.setattr(async_client.time_series.data, "_get_semaphore", lambda _op: _MockSemaphore())
380+
381+
original_dump = _InsertDatapoint.dump
382+
383+
def tracking_dump(self: _InsertDatapoint) -> dict:
384+
dump_sem_held.append(sem_held)
385+
return original_dump(self)
386+
387+
monkeypatch.setattr(_InsertDatapoint, "dump", tracking_dump)
388+
389+
monkeypatch.setattr(async_client.time_series.data, "_DPS_INSERT_LIMIT", 5)
390+
dps = [(i * 1e11, i) for i in range(50)]
391+
cognite_client.time_series.data.insert(dps, id=1)
392+
393+
assert len(dump_sem_held) == 50
394+
assert all(dump_sem_held), "dp.dump() was called outside the semaphore context"
395+
356396

357397
@pytest.fixture
358398
def mock_delete_datapoints(

0 commit comments

Comments
 (0)