Commit 642b9bb

feat(Data Modeling): change data modeling /sync default mode to two_phase (#2595)
1 parent b5d166a commit 642b9bb

4 files changed

Lines changed: 141 additions & 84 deletions

MIGRATION_GUIDE.md

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ Changes are grouped as follows:
 - The default value for the `operator` parameter in the `InstancesAPI.search` method has been changed to `AND` (which previously defaulted to 'OR' behavior). This change provides more precise search results by default, requiring all search terms to be present. If you need the previous behavior of matching any search term, explicitly pass `operator='OR'`.
 - All typed instance apply classes, e.g. `CogniteAssetApply` from `cognite.client.data_classes.data_modeling.cdm.v1` (or `extractor_extensions.v1`) now work with patch updates (using `replace=False`). Previously, all unset fields would be dumped as `None` and thus cleared/nulled in the backend database. Now, any unset fields are not dumped and will not clear an existing value (unless used with `replace=True`).
 - For users of the Data Modeling API method `sync`, the data classes have been split from those used in `query`. They can be recognized by simply appending `Sync` to the end, e.g. `Query` and `QuerySync`. Previously, these were used for both, but as these API endpoints continue to evolve, it makes sense to fine-tune the data classes to each - starting now. Examples: `QuerySync`, `SelectSync`, `NodeResultSetExpressionSync` and `EdgeResultSetExpressionSync`.
+- The default value of `sync_mode` on `NodeResultSetExpressionSync` and `EdgeResultSetExpressionSync` has changed from `None` to `"two_phase"`. Previously, omitting `sync_mode` would send no mode to the API (which defaults to `onePhase` server-side). Now it explicitly requests two-phase sync (backfill followed by live updates), which is the recommended mode for most use cases. To restore the old behaviour, pass `sync_mode="one_phase"` explicitly.
 - When using `to_pandas` on a list of Data Modeling instances, the properties will be expanded by default (to separate columns). To get the old behaviour, pass `expand_properties=False`.
 - When using `to_pandas` on a list of Data Modeling instances, parameters `expand_metadata` and `metadata_prefix` are no longer silently ignored and will raise as unrecognized.
 - When using `get` on a list of Data Modeling instances, the parameter `id` has been removed. Use `instance_id` (or `external_id` when there is no ambiguity on space).
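
The `sync_mode` entry added to MIGRATION_GUIDE.md above can be illustrated with a minimal sketch. It does not use the SDK: `effective_sync_mode`, `SERVER_DEFAULT` and `_UNSET` are hypothetical names that only model which mode ends up running when `sync_mode` is omitted, before and after this commit. It assumes, as the entry states, that the API falls back to one-phase when no mode is sent.

```python
# Hypothetical stand-ins, not SDK code: they only model the default change.
SERVER_DEFAULT = "one_phase"  # per the migration note, the API falls back to onePhase

_UNSET = object()  # marks "caller did not pass sync_mode at all"


def effective_sync_mode(sync_mode=_UNSET, *, sdk_default):
    """Return the mode the server ends up running for one result set expression."""
    requested = sdk_default if sync_mode is _UNSET else sync_mode
    # None models "send no mode to the API", so the server-side default applies.
    return SERVER_DEFAULT if requested is None else requested


# Before this commit the SDK default was None; after it, "two_phase".
old_behaviour = effective_sync_mode(sdk_default=None)
new_behaviour = effective_sync_mode(sdk_default="two_phase")
# Passing the mode explicitly restores the old behaviour under the new default.
restored = effective_sync_mode("one_phase", sdk_default="two_phase")
```

Code that relied on the implicit one-phase behaviour is the only code affected; anything that already passed `sync_mode` explicitly behaves as before.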

cognite/client/_api/data_modeling/instances.py

Lines changed: 65 additions & 35 deletions
@@ -874,7 +874,11 @@ async def subscribe(
                 >>> view_id = ViewId("someSpace", "someView", "v1")
                 >>> filter = Equals(view_id.as_property_ref("myAsset"), "Il-Tempo-Gigante")
                 >>> query = QuerySync(
-                ...     with_={"work_orders": NodeResultSetExpressionSync(filter=filter)},
+                ...     with_={
+                ...         "work_orders": NodeResultSetExpressionSync(
+                ...             filter=filter, sync_mode="two_phase"
+                ...         )
+                ...     },
                 ...     select={"work_orders": SelectSync([SourceSelector(view_id, ["*"])])},
                 ... )
                 >>> subscription_context = client.data_modeling.instances.subscribe(
@@ -1577,26 +1581,26 @@ async def query(
                 >>> from cognite.client.data_classes.data_modeling.ids import ViewId
                 >>> client = CogniteClient()
                 >>> # async_client = AsyncCogniteClient() # another option
-                >>> work_order_id = ViewId("mySpace", "WorkOrderView", "v1")
-                >>> pump_id = ViewId("mySpace", "PumpView", "v1")
+                >>> work_order_view = ViewId("mySpace", "myWorkOrder", "v1")
+                >>> pump_view = ViewId("mySpace", "myPump", "v1")
                 >>> query = Query(
                 ...     with_={
                 ...         "work_orders": NodeResultSetExpression(
-                ...             filter=Range(work_order_id.as_property_ref("createdYear"), lt=2023)
+                ...             filter=Range(work_order_view.as_property_ref("createdYear"), lt=2023)
                 ...         ),
                 ...         "work_orders_to_pumps": EdgeResultSetExpression(
                 ...             from_="work_orders",
                 ...             filter=Equals(
                 ...                 ["edge", "type"],
-                ...                 {"space": work_order_id.space, "externalId": "WorkOrder.asset"},
+                ...                 {"space": work_order_view.space, "externalId": "WorkOrder.pump"},
                 ...             ),
                 ...         ),
                 ...         "pumps": NodeResultSetExpression(from_="work_orders_to_pumps"),
                 ...     },
                 ...     select={
                 ...         "pumps": Select(
-                ...             [SourceSelector(pump_id, ["name"])],
-                ...             sort=[InstanceSort(pump_id.as_property_ref("name"))],
+                ...             [SourceSelector(pump_view, properties=["name"])],
+                ...             sort=[InstanceSort(pump_view.as_property_ref("name"))],
                 ...         ),
                 ...     },
                 ... )
@@ -1646,60 +1650,86 @@ async def sync(
 
         Subscribe to changes for nodes and edges in a project, matching a supplied filter.
 
+        Note:
+            Each result set expression accepts a ``sync_mode`` controlling how the initial data is loaded. The default,
+            ``"two_phase"``, runs a backfill phase followed by live updates and is recommended for queries with custom
+            filters; the backfill phase's filters and sort must be backed by a cursorable index. Use ``"one_phase"`` when
+            fetching all instances (or all in specific spaces) for better throughput, or ``"no_backfill"`` to skip the
+            initial load and only receive live updates.
+
+            When using ``"two_phase"``, the backfill phase is over when the number of instances returned is smaller than
+            the limit (including 0).
+
         Args:
-            query (QuerySync): Query.
-            include_typing (bool): Should we return property type information as part of the result?
+            query (QuerySync): The query for instances.
+            include_typing (bool): Return property type information as part of the result.
             debug (DebugParameters | None): Debug settings for profiling and troubleshooting.
 
         Returns:
-            QueryResult: The resulting nodes and/or edges from the query.
+            QueryResult: The resulting instances from the query.
 
         Examples:
 
-            Query all pumps connected to work orders created before 2023, sorted by name:
+            Sync all assets in a given space using one-phase mode (recommended for full fetches):
 
-                >>> from cognite.client import CogniteClient
-                >>> from cognite.client.data_classes.data_modeling.instances import InstanceSort
+                >>> from cognite.client import CogniteClient, AsyncCogniteClient
                 >>> from cognite.client.data_classes.data_modeling.query import (
-                ...     Query,
-                ...     Select,
-                ...     NodeResultSetExpression,
-                ...     EdgeResultSetExpression,
+                ...     QuerySync,
+                ...     SelectSync,
+                ...     NodeResultSetExpressionSync,
                 ...     SourceSelector,
                 ... )
-                >>> from cognite.client.data_classes.filters import Range, Equals
+                >>> from cognite.client.data_classes.filters import SpaceFilter
                 >>> from cognite.client.data_classes.data_modeling.ids import ViewId
                 >>> client = CogniteClient()
                 >>> # async_client = AsyncCogniteClient() # another option
-                >>> work_order_id = ViewId("mySpace", "WorkOrderView", "v1")
-                >>> pump_id = ViewId("mySpace", "PumpView", "v1")
-                >>> query = Query(
+                >>> asset_view = ViewId("mySpace", "myAssets", "v1")
+                >>> query = QuerySync(
                 ...     with_={
-                ...         "work_orders": NodeResultSetExpression(
-                ...             filter=Range(work_order_id.as_property_ref("createdYear"), lt=2023)
+                ...         "pumps": NodeResultSetExpressionSync(
+                ...             filter=SpaceFilter("mySpace"),
+                ...             sync_mode="one_phase",
                 ...         ),
-                ...         "work_orders_to_pumps": EdgeResultSetExpression(
+                ...     },
+                ...     select={"pumps": SelectSync([SourceSelector(asset_view, ["*"])])},
+                ... )
+                >>> res = client.data_modeling.instances.sync(query)
+                >>> # Later: keep up with changes by following the cursor:
+                >>> query.cursors = res.cursors
+                >>> res_new = client.data_modeling.instances.sync(query)
+
+            Sync all pumps connected to open work orders:
+
+                >>> from cognite.client.data_classes.data_modeling.query import (
+                ...     EdgeResultSetExpressionSync,
+                ... )
+                >>> from cognite.client.data_classes.filters import Equals
+                >>> work_order_view = ViewId("mySpace", "myWorkOrder", "v1")
+                >>> pump_view = ViewId("mySpace", "myPump", "v1")
+                >>> query = QuerySync(
+                ...     with_={
+                ...         "work_orders": NodeResultSetExpressionSync(
+                ...             filter=Equals(work_order_view.as_property_ref("status"), "open"),
+                ...             sync_mode="two_phase",
+                ...         ),
+                ...         "work_orders_to_pumps": EdgeResultSetExpressionSync(
                 ...             from_="work_orders",
                 ...             filter=Equals(
                 ...                 ["edge", "type"],
-                ...                 {"space": work_order_id.space, "externalId": "WorkOrder.asset"},
+                ...                 {"space": work_order_view.space, "externalId": "WorkOrder.pump"},
                 ...             ),
+                ...             sync_mode="two_phase",
+                ...         ),
+                ...         "pumps": NodeResultSetExpressionSync(
+                ...             from_="work_orders_to_pumps",
+                ...             sync_mode="two_phase",
                 ...         ),
-                ...         "pumps": NodeResultSetExpression(from_="work_orders_to_pumps"),
                 ...     },
                 ...     select={
-                ...         "pumps": Select(
-                ...             [SourceSelector(pump_id, ["name"])],
-                ...             sort=[InstanceSort(pump_id.as_property_ref("name"))],
-                ...         ),
+                ...         "pumps": SelectSync([SourceSelector(pump_view, ["*"])]),
                 ...     },
                 ... )
                 >>> res = client.data_modeling.instances.sync(query)
-                >>> # Added a new work order with pumps created before 2023
-                >>> query.cursors = res.cursors
-                >>> res_new = client.data_modeling.instances.sync(query)
-
-            In the last example, the res_new will only contain the pumps that have been added with the new work order.
 
             To debug and/or profile your query, you can use the debug parameter:
 
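
The note added to the `sync` docstring says the two-phase backfill is over once a call returns fewer instances than the limit, zero included. A minimal sketch of that stopping rule follows. It makes no SDK calls: `backfill_is_over`, `drain_backfill` and `fake_fetch` are hypothetical names, and the fake page source merely stands in for repeated `sync` calls that follow a cursor.

```python
def backfill_is_over(num_returned, limit):
    """Docstring rule: backfill ends when a page holds fewer instances than the limit."""
    return num_returned < limit


def drain_backfill(fetch_page, limit):
    """Call fetch_page(cursor) until the rule above is met.

    fetch_page is a hypothetical callable returning (items, next_cursor).
    """
    items, cursor = [], None
    while True:
        page, cursor = fetch_page(cursor)
        items.extend(page)
        if backfill_is_over(len(page), limit):
            # From here on, following the cursor would yield live updates only.
            return items, cursor


# Fake paged source: five instances served two at a time.
DATA = ["a", "b", "c", "d", "e"]
LIMIT = 2


def fake_fetch(cursor):
    start = cursor or 0  # a None cursor means "start from the beginning"
    page = DATA[start:start + LIMIT]
    return page, start + len(page)


backfilled, last_cursor = drain_backfill(fake_fetch, LIMIT)
```

A full page is never treated as the end, so when the data size is an exact multiple of the limit one extra, empty call is needed to detect completion. That is why the note spells out "including 0".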
