Skip to content

Commit 8c15fa3

Browse files
fix(records): keep sync PR scoped to sync endpoint
Remove list/filter endpoint surface from the records sync branch and route sync through the shared _list helper for cursor handling. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent e26d6bd commit 8c15fa3

8 files changed

Lines changed: 112 additions & 556 deletions

File tree

cognite/client/_api/data_modeling/records.py

Lines changed: 21 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,15 @@
55
from typing import TYPE_CHECKING, Any, ClassVar, Literal
66

77
from cognite.client._api_client import APIClient
8-
from cognite.client.data_classes.data_modeling.instances import InstanceSort
98
from cognite.client.data_classes.data_modeling.records import (
10-
Record,
119
RecordId,
1210
RecordIdSequence,
13-
RecordList,
1411
RecordSourceSelector,
1512
RecordTargetUnit,
1613
RecordTargetUnits,
1714
RecordWrite,
15+
SyncRecord,
1816
SyncRecordList,
19-
TimeRange,
2017
)
2118
from cognite.client.data_classes.filters import Filter
2219
from cognite.client.utils._concurrency import RecordsConcurrencyOperation
@@ -27,9 +24,6 @@
2724
from cognite.client import AsyncCogniteClient
2825
from cognite.client.config import ClientConfig
2926

30-
StreamType = Literal["immutable", "mutable"]
31-
_DEFAULT_STREAM_TYPE: StreamType = "immutable"
32-
3327

3428
class RecordsAPI(APIClient):
3529
def __init__(self, config: ClientConfig, api_version: str | None, cognite_client: AsyncCogniteClient) -> None:
@@ -44,9 +38,7 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client
4438
"delete": RecordsConcurrencyOperation.WRITE,
4539
}
4640

47-
def _get_semaphore( # type: ignore[override]
48-
self, operation: Literal["read", "write", "delete"], stream_type: StreamType
49-
) -> asyncio.BoundedSemaphore:
41+
def _get_semaphore(self, operation: Literal["read", "write", "delete"]) -> asyncio.BoundedSemaphore:
5042
from cognite.client import global_config
5143

5244
return global_config.concurrency_settings.records._semaphore_factory(
@@ -69,7 +61,6 @@ async def delete(
6961
items: RecordId | Sequence[RecordId],
7062
*,
7163
stream_id: str,
72-
stream_type: StreamType = _DEFAULT_STREAM_TYPE,
7364
ignore_unknown_ids: Literal[True] = True,
7465
) -> None:
7566
"""`Delete records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/deleteRecords>`_.
@@ -80,7 +71,6 @@ async def delete(
8071
Args:
8172
items (RecordId | Sequence[RecordId]): Records to delete.
8273
stream_id (str): External ID of the stream to delete from.
83-
stream_type (StreamType): Type of the stream ("immutable" or "mutable"). Defaults to "immutable".
8474
ignore_unknown_ids (Literal[True]): Currently only True is supported
8575
8676
Examples:
@@ -103,15 +93,13 @@ async def delete(
10393
identifiers=RecordIdSequence.load(items),
10494
wrap_ids=True,
10595
resource_path=self._records_url(stream_id),
106-
override_semaphore=self._get_semaphore("delete", stream_type),
10796
)
10897

10998
async def ingest(
11099
self,
111100
items: RecordWrite | Sequence[RecordWrite],
112101
*,
113102
stream_id: str,
114-
stream_type: StreamType = _DEFAULT_STREAM_TYPE,
115103
) -> None:
116104
"""`Ingest records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/ingestRecords>`_.
117105
@@ -123,7 +111,6 @@ async def ingest(
123111
Args:
124112
items (RecordWrite | Sequence[RecordWrite]): One or more records to ingest.
125113
stream_id (str): External ID of the stream to ingest into.
126-
stream_type (StreamType): Type of the stream ("immutable" or "mutable"). Defaults to "immutable".
127114
128115
Examples:
129116
@@ -158,15 +145,13 @@ async def ingest(
158145
items=item_list,
159146
resource_path=self._records_url(stream_id),
160147
no_response=True,
161-
override_semaphore=self._get_semaphore("write", stream_type),
162148
)
163149

164150
async def upsert(
165151
self,
166152
items: RecordWrite | Sequence[RecordWrite],
167153
*,
168154
stream_id: str,
169-
stream_type: StreamType = _DEFAULT_STREAM_TYPE,
170155
upsert_mode: Literal["replace"] = "replace",
171156
) -> None:
172157
"""`Upsert records into a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/upsertRecords>`_.
@@ -179,7 +164,6 @@ async def upsert(
179164
Args:
180165
items (RecordWrite | Sequence[RecordWrite]): One or more records to upsert.
181166
stream_id (str): External ID of the stream to upsert into.
182-
stream_type (StreamType): Type of the stream ("immutable" or "mutable"). Defaults to "immutable".
183167
upsert_mode (Literal['replace']): How existing records are updated. Currently only ``"replace"`` is supported, which fully replaces the existing record. Defaults to ``"replace"``.
184168
185169
Examples:
@@ -215,97 +199,18 @@ async def upsert(
215199
items=item_list,
216200
resource_path=self._records_url(stream_id, "/upsert"),
217201
no_response=True,
218-
override_semaphore=self._get_semaphore("write", stream_type),
219-
)
220-
221-
async def list(
222-
self,
223-
stream_id: str,
224-
*,
225-
stream_type: StreamType = _DEFAULT_STREAM_TYPE,
226-
last_updated_time: TimeRange | None = None,
227-
filter: Filter | None = None,
228-
sources: Sequence[RecordSourceSelector] | None = None,
229-
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
230-
sort: Sequence[InstanceSort] | InstanceSort | None = None,
231-
limit: int = 10,
232-
include_typing: bool = False,
233-
) -> RecordList:
234-
"""`Filter records in a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/filterRecords>`_.
235-
236-
Returns records matching the given filters, sorted by ``lastUpdatedTime`` unless a custom
237-
``sort`` is given. This endpoint is not cursor-paged: it returns at most ``limit`` records
238-
(max 1000). To page over a large time window, issue multiple calls with partitioned
239-
``last_updated_time`` ranges.
240-
241-
Args:
242-
stream_id (str): External ID of the stream to query.
243-
stream_type (StreamType): Type of the stream ("immutable" or "mutable"). Defaults to "immutable".
244-
last_updated_time (TimeRange | None): Filter by last-updated time. **Required for
245-
immutable streams** (must include a lower bound).
246-
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
247-
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
248-
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
249-
to another unit.
250-
sort (Sequence[InstanceSort] | InstanceSort | None): Sort specification(s); up to 5.
251-
limit (int): Maximum number of records to return (1-1000). Defaults to 10.
252-
include_typing (bool): If True, include property type information on the returned
253-
list's ``typing`` attribute.
254-
255-
Returns:
256-
RecordList: The matching records.
257-
258-
Examples:
259-
260-
List records updated since a given timestamp:
261-
262-
>>> from cognite.client import CogniteClient
263-
>>> from cognite.client.data_classes.data_modeling.records import TimeRange
264-
>>> client = CogniteClient()
265-
>>> res = client.data_modeling.records.list(
266-
... stream_id="my-stream",
267-
... last_updated_time=TimeRange(gt=1705341600000),
268-
... limit=100,
269-
... )
270-
"""
271-
self._warning.warn()
272-
other_params: dict[str, Any] = {}
273-
if last_updated_time is not None:
274-
other_params["lastUpdatedTime"] = last_updated_time.dump()
275-
if sources is not None:
276-
other_params["sources"] = [source.dump() for source in sources]
277-
if sort is not None:
278-
sort_list = [sort] if isinstance(sort, InstanceSort) else list(sort)
279-
other_params["sort"] = [spec.dump() for spec in sort_list]
280-
if target_units is not None:
281-
other_params["targetUnits"] = self._dump_target_units(target_units)
282-
if include_typing:
283-
other_params["includeTyping"] = True
284-
285-
return await self._list(
286-
list_cls=RecordList,
287-
resource_cls=Record,
288-
method="POST",
289-
resource_path=self._records_url(stream_id),
290-
url_path=self._records_url(stream_id, "/filter"),
291-
limit=limit,
292-
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
293-
other_params=other_params,
294-
settings_forcing_raw_response_loading=[f"{include_typing=}"] if include_typing else None,
295-
override_semaphore=self._get_semaphore("read", stream_type),
296202
)
297203

298204
async def sync(
299205
self,
300206
stream_id: str,
301207
*,
302-
stream_type: StreamType = _DEFAULT_STREAM_TYPE,
303208
cursor: str | None = None,
304209
initialize_cursor: str | None = None,
305210
filter: Filter | None = None,
306211
sources: Sequence[RecordSourceSelector] | None = None,
307212
target_units: RecordTargetUnits | Sequence[RecordTargetUnit] | None = None,
308-
limit: int | None = None,
213+
limit: int = 10,
309214
include_typing: bool = False,
310215
) -> SyncRecordList:
311216
"""`Sync records from a stream <https://api-docs.cognite.com/20230101/tag/Records/operation/syncRecords>`_.
@@ -318,15 +223,14 @@ async def sync(
318223
319224
Args:
320225
stream_id (str): External ID of the stream to sync.
321-
stream_type (StreamType): Type of the stream ("immutable" or "mutable"). Defaults to "immutable".
322226
cursor (str | None): Resume from a cursor returned by a previous sync call.
323227
initialize_cursor (str | None): Where to start when no ``cursor`` is given, as a
324228
relative duration like ``"7d-ago"``. Ignored when ``cursor`` is set.
325229
filter (Filter | None): Filter expression (see :mod:`cognite.client.data_classes.filters`).
326230
sources (Sequence[RecordSourceSelector] | None): Which container properties to return.
327231
target_units (RecordTargetUnits | Sequence[RecordTargetUnit] | None): Properties to convert
328232
to another unit.
329-
limit (int | None): Maximum number of records to return in this page (1-1000).
233+
limit (int): Maximum number of records to return in this page (1-1000). Defaults to 10.
330234
include_typing (bool): If True, include property type information on the returned
331235
list's ``typing`` attribute.
332236
@@ -351,25 +255,26 @@ async def sync(
351255
self._warning.warn()
352256
if cursor is not None and initialize_cursor is not None:
353257
raise ValueError("Provide either 'cursor' or 'initialize_cursor', not both.")
354-
body: dict[str, Any] = {}
355-
if cursor is not None:
356-
body["cursor"] = cursor
357-
elif initialize_cursor is not None:
358-
body["initializeCursor"] = initialize_cursor
359-
if filter is not None:
360-
body["filter"] = filter.dump(camel_case_property=False)
258+
other_params: dict[str, Any] = {}
259+
if initialize_cursor is not None:
260+
other_params["initializeCursor"] = initialize_cursor
361261
if sources is not None:
362-
body["sources"] = [source.dump() for source in sources]
262+
other_params["sources"] = [source.dump() for source in sources]
363263
if target_units is not None:
364-
body["targetUnits"] = self._dump_target_units(target_units)
365-
if limit is not None:
366-
body["limit"] = limit
264+
other_params["targetUnits"] = self._dump_target_units(target_units)
367265
if include_typing:
368-
body["includeTyping"] = True
266+
other_params["includeTyping"] = True
369267

370-
response = await self._post(
268+
return await self._list(
269+
list_cls=SyncRecordList,
270+
resource_cls=SyncRecord,
271+
method="POST",
272+
resource_path=self._records_url(stream_id),
371273
url_path=self._records_url(stream_id, "/sync"),
372-
json=body,
373-
semaphore=self._get_semaphore("read", stream_type),
274+
limit=limit,
275+
filter=filter.dump(camel_case_property=False) if isinstance(filter, Filter) else filter,
276+
other_params=other_params,
277+
initial_cursor=cursor,
278+
settings_forcing_raw_response_loading=["records_sync_cursor"],
279+
override_semaphore=self._get_semaphore("read"),
374280
)
375-
return SyncRecordList._load_response(response.json())

0 commit comments

Comments
 (0)