Skip to content

Commit f862306

Browse files
andersfyllingclaudehaakonvt
authored
feat(records): add upsert endpoint (#2679)
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com> Co-authored-by: Håkon V. Treider <haakonvt@gmail.com>
1 parent 5309e35 commit f862306

3 files changed

Lines changed: 179 additions & 2 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ def _get_semaphore(self, operation: Literal["write", "delete"]) -> asyncio.Bound
3535
)
3636

3737
def _records_url(self, stream_id: str, suffix: str = "") -> str:
38-
return interpolate_and_url_encode("/streams/{}/records{}", stream_id, suffix)
38+
# Encode only stream_id; the suffix is a literal path segment (e.g. "/upsert"),
39+
# so it must not be percent-encoded.
40+
return interpolate_and_url_encode("/streams/{}/records", stream_id) + suffix
3941

4042
async def delete(
4143
self,
@@ -127,3 +129,57 @@ async def ingest(
127129
resource_path=self._records_url(stream_id),
128130
no_response=True,
129131
)
132+
133+
async def upsert(
134+
self,
135+
items: RecordWrite | Sequence[RecordWrite],
136+
*,
137+
stream_id: str,
138+
upsert_mode: Literal["replace"] = "replace",
139+
) -> None:
140+
"""`Upsert records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_.
141+
142+
Creates or fully updates records. Only valid for mutable streams (returns 422 on
143+
immutable). When a record with the same ``space + externalId`` already exists it is
144+
fully replaced (this endpoint does not do partial property updates); otherwise it is
145+
created.
146+
147+
Args:
148+
items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert.
149+
stream_id (str): External ID of the stream to upsert into.
150+
upsert_mode (Literal['replace']): How existing records are updated. Currently only ``"replace"`` is supported, which fully replaces the existing record. Defaults to ``"replace"``.
151+
152+
Examples:
153+
154+
Upsert a single record:
155+
156+
>>> from cognite.client import CogniteClient
157+
>>> from cognite.client.data_classes.data_modeling.records import (
158+
... RecordWrite,
159+
... RecordContainerId,
160+
... RecordSource,
161+
... )
162+
>>> client = CogniteClient()
163+
>>> client.data_modeling.records.upsert(
164+
... RecordWrite(
165+
... space="my-space",
166+
... external_id="rec-1",
167+
... sources=[
168+
... RecordSource(
169+
... source=RecordContainerId(
170+
... space="my-space", external_id="my-container"
171+
... ),
172+
... properties={"temperature": 23.0},
173+
... )
174+
... ],
175+
... ),
176+
... stream_id="my-stream",
177+
... )
178+
"""
179+
self._warning.warn()
180+
item_list: list[RecordWrite] = [items] if isinstance(items, RecordWrite) else list(items)
181+
await self._create_multiple(
182+
items=item_list,
183+
resource_path=self._records_url(stream_id, "/upsert"),
184+
no_response=True,
185+
)

cognite/client/_sync_api/data_modeling/records.py

Lines changed: 48 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/tests_unit/test_api/test_data_modeling/test_records.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ def mock_ingest(httpx_mock: HTTPXMock, ingest_url_pattern: re.Pattern) -> None:
4545
httpx_mock.add_response(method="POST", url=ingest_url_pattern, status_code=202)
4646

4747

48+
@pytest.fixture
49+
def upsert_url_pattern(records_base_url: str) -> re.Pattern:
50+
return re.compile(re.escape(records_base_url) + r"/upsert$")
51+
52+
53+
@pytest.fixture
54+
def mock_upsert(httpx_mock: HTTPXMock, upsert_url_pattern: re.Pattern) -> None:
55+
httpx_mock.add_response(method="POST", url=upsert_url_pattern, status_code=202)
56+
57+
4858
@pytest.fixture
4959
def write_item() -> RecordWrite:
5060
return RecordWrite(
@@ -152,6 +162,70 @@ def test_ingest_chunks_over_1000(
152162
assert len(jsgz_load(requests[1].content)["items"]) == 1
153163

154164

165+
class TestRecordsAPIUpsert:
166+
def test_upsert_single_posts_correct_body(
167+
self,
168+
cognite_client: CogniteClient,
169+
httpx_mock: HTTPXMock,
170+
mock_upsert: None,
171+
stream_id: str,
172+
write_item: RecordWrite,
173+
) -> None:
174+
cognite_client.data_modeling.records.upsert(write_item, stream_id=stream_id)
175+
requests = httpx_mock.get_requests()
176+
assert len(requests) == 1
177+
assert requests[0].url.path.endswith(f"/streams/{stream_id}/records/upsert")
178+
body = jsgz_load(requests[0].content)
179+
assert body == {
180+
"items": [
181+
{
182+
"space": "sp",
183+
"externalId": "rec-1",
184+
"sources": [
185+
{
186+
"source": {"type": "container", "space": "sp", "externalId": "container-x"},
187+
"properties": {"temp": 22.5},
188+
}
189+
],
190+
}
191+
]
192+
}
193+
194+
def test_upsert_accepts_sequence(
195+
self,
196+
cognite_client: CogniteClient,
197+
httpx_mock: HTTPXMock,
198+
mock_upsert: None,
199+
stream_id: str,
200+
) -> None:
201+
items = [
202+
RecordWrite(space="sp", external_id="rec-1", sources=[]),
203+
RecordWrite(space="sp", external_id="rec-2", sources=[]),
204+
]
205+
cognite_client.data_modeling.records.upsert(items, stream_id=stream_id)
206+
body = jsgz_load(httpx_mock.get_requests()[0].content)
207+
assert [item["externalId"] for item in body["items"]] == ["rec-1", "rec-2"]
208+
209+
def test_upsert_chunks(
210+
self,
211+
cognite_client: CogniteClient,
212+
async_client: AsyncCogniteClient,
213+
httpx_mock: HTTPXMock,
214+
upsert_url_pattern: re.Pattern,
215+
stream_id: str,
216+
monkeypatch: pytest.MonkeyPatch,
217+
) -> None:
218+
monkeypatch.setattr(async_client.data_modeling.records, "_CREATE_LIMIT", 10)
219+
httpx_mock.add_response(method="POST", url=upsert_url_pattern, status_code=202)
220+
httpx_mock.add_response(method="POST", url=upsert_url_pattern, status_code=202)
221+
items = [RecordWrite(space="sp", external_id=f"r-{i}", sources=[]) for i in range(11)]
222+
cognite_client.data_modeling.records.upsert(items, stream_id=stream_id)
223+
requests = httpx_mock.get_requests()
224+
assert len(requests) == 2
225+
assert len(jsgz_load(requests[0].content)["items"]) == 10
226+
assert len(jsgz_load(requests[1].content)["items"]) == 1
227+
228+
155229
class TestRecordDTOs:
156230
def test_record_write_as_id(self, write_item: RecordWrite) -> None:
157231
rid = write_item.as_id()

0 commit comments

Comments
 (0)