Skip to content

Commit 9ecf95c

Browse files
feat(workflows): add WorkflowRecordStreamTriggerRule to workflow triggers (#2690)
Co-authored-by: Håkon V. Treider <haakonvt@gmail.com>
1 parent 3ba31de commit 9ecf95c

3 files changed

Lines changed: 139 additions & 1 deletion

File tree

cognite/client/_api/workflows/triggers.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,33 @@ async def upsert(
103103
... )
104104
... )
105105
106+
Create or update a record stream trigger for a workflow:
107+
108+
>>> from cognite.client.data_classes.workflows import (
109+
... WorkflowRecordStreamTriggerRule,
110+
... WorkflowRecordStreamSourceSelector,
111+
... )
112+
>>> from cognite.client.data_classes.data_modeling.records import RecordContainerId
113+
>>> client.workflows.triggers.upsert(
114+
... WorkflowTriggerUpsert(
115+
... external_id="my_trigger",
116+
... trigger_rule=WorkflowRecordStreamTriggerRule(
117+
... stream_external_id="my-stream",
118+
... batch_size=100,
119+
... batch_timeout=60,
120+
... initialize_cursor="6h-ago",
121+
... sources=[
122+
... WorkflowRecordStreamSourceSelector(
123+
... source=RecordContainerId("my-space", "my-container"),
124+
... properties=["name", "status"],
125+
... )
126+
... ],
127+
... ),
128+
... workflow_external_id="my_workflow",
129+
... workflow_version="1",
130+
... )
131+
... )
132+
106133
"""
107134
nonce = await create_session_and_return_nonce(
108135
self._cognite_client, api_name="Workflow API", client_credentials=client_credentials

cognite/client/_sync_api/workflows/triggers.py

Lines changed: 28 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cognite/client/data_classes/workflows.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
WriteableCogniteResourceList,
2121
)
2222
from cognite.client.data_classes.data_modeling.query import Query, ResultSetExpression, Select
23+
from cognite.client.data_classes.data_modeling.records import RecordContainerId
24+
from cognite.client.data_classes.filters import Filter
2325
from cognite.client.data_classes.simulators.runs import (
2426
SimulationInputOverride,
2527
)
@@ -1561,6 +1563,88 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
15611563
return item
15621564

15631565

1566+
class WorkflowRecordStreamSourceSelector(CogniteResource):
1567+
"""Selects which container properties to include in a record stream trigger's workflow input.
1568+
1569+
Args:
1570+
source (RecordContainerId): The container to select properties from.
1571+
properties (list[str]): Property identifiers to return; use ``["*"]`` to return all.
1572+
"""
1573+
1574+
def __init__(self, source: RecordContainerId, properties: list[str]) -> None:
1575+
self.source = source
1576+
self.properties = properties
1577+
1578+
@classmethod
1579+
def _load(cls, resource: dict[str, Any]) -> WorkflowRecordStreamSourceSelector:
1580+
return cls(source=RecordContainerId.load(resource["source"]), properties=resource["properties"])
1581+
1582+
def dump(self, camel_case: bool = True) -> dict[str, Any]:
1583+
return {"source": self.source.dump(camel_case=camel_case), "properties": self.properties}
1584+
1585+
1586+
class WorkflowRecordStreamTriggerRule(WorkflowTriggerRule):
1587+
"""
1588+
This class represents a record stream trigger rule.
1589+
1590+
Args:
1591+
stream_external_id (str): The external ID of the stream to subscribe to for record changes.
1592+
batch_size (int): The maximum number of records to pass to a workflow execution.
1593+
batch_timeout (int): The maximum time in seconds to wait for the batch to be filled.
1594+
filter (Filter | None): Optional filter to limit which records trigger the workflow.
1595+
sources (list[WorkflowRecordStreamSourceSelector] | None): Optional containers and properties to include in
1596+
the workflow input.
1597+
initialize_cursor (str | None): Where record stream syncing starts when no cursor exists yet,
1598+
as a relative duration like ``"6h-ago"``. If omitted, syncing starts from the current time (``"0d-ago"``).
1599+
"""
1600+
1601+
_trigger_type = "recordStream"
1602+
1603+
def __init__(
1604+
self,
1605+
stream_external_id: str,
1606+
batch_size: int,
1607+
batch_timeout: int,
1608+
filter: Filter | None = None,
1609+
sources: list[WorkflowRecordStreamSourceSelector] | None = None,
1610+
initialize_cursor: str | None = None,
1611+
) -> None:
1612+
self.stream_external_id = stream_external_id
1613+
self.batch_size = batch_size
1614+
self.batch_timeout = batch_timeout
1615+
self.filter = filter
1616+
self.sources = sources
1617+
self.initialize_cursor = initialize_cursor
1618+
1619+
@classmethod
1620+
def _load_trigger(cls, data: dict[str, Any]) -> WorkflowRecordStreamTriggerRule:
1621+
return cls(
1622+
stream_external_id=data["streamExternalId"],
1623+
batch_size=data["batchSize"],
1624+
batch_timeout=data["batchTimeout"],
1625+
filter=Filter._load_if(data.get("filter")),
1626+
sources=[WorkflowRecordStreamSourceSelector._load(source) for source in (data.get("sources") or [])],
1627+
initialize_cursor=data.get("initializeCursor"),
1628+
)
1629+
1630+
def dump(self, camel_case: bool = True) -> dict[str, Any]:
1631+
item: dict[str, Any] = {
1632+
"trigger_type": self.trigger_type,
1633+
"stream_external_id": self.stream_external_id,
1634+
"batch_size": self.batch_size,
1635+
"batch_timeout": self.batch_timeout,
1636+
}
1637+
if self.filter is not None:
1638+
item["filter"] = self.filter.dump()
1639+
if self.sources:
1640+
item["sources"] = [source.dump(camel_case=camel_case) for source in self.sources]
1641+
if self.initialize_cursor is not None:
1642+
item["initialize_cursor"] = self.initialize_cursor
1643+
if camel_case:
1644+
return convert_all_keys_to_camel_case(item)
1645+
return item
1646+
1647+
15641648
_TRIGGER_RULE_BY_TYPE: dict[str, type[WorkflowTriggerRule]] = {
15651649
subclass._trigger_type: subclass # type: ignore
15661650
for subclass in WorkflowTriggerRule.__subclasses__()

0 commit comments

Comments
 (0)