Skip to content

Commit 55397c1

Browse files
authored
python(feat): lazy flow creation (#296)
1 parent 48b8363 commit 55397c1

5 files changed

Lines changed: 465 additions & 16 deletions

File tree

python/lib/sift_py/error.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import warnings
22

3+
import google.protobuf.message
4+
35

46
class SiftError(Exception):
57
"""
@@ -28,3 +30,20 @@ def _component_deprecation_warning():
2830
"See docs for more details: https://docs.siftstack.com/docs/glossary#component",
2931
SiftAPIDeprecationWarning,
3032
)
33+
34+
35+
# The default max message size for the Sift gRPC server.
36+
GRPC_MAX_MESSAGE_SIZE = 4_194_304
37+
38+
39+
class ProtobufMaxSizeExceededError(Exception):
40+
"""
41+
The library limits the size of certain protobufs to prevent gRPC messages from being too big.
42+
"""
43+
44+
45+
def raise_if_too_large(pb: google.protobuf.message.Message):
46+
size = len(pb.SerializeToString())
47+
name = getattr(pb, "name", pb.__class__.__name__)
48+
if size > GRPC_MAX_MESSAGE_SIZE:
49+
raise ProtobufMaxSizeExceededError(f"{name} too large: {size}")

python/lib/sift_py/ingestion/_internal/ingest.py

Lines changed: 113 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import logging
44
from collections.abc import Callable
55
from datetime import datetime
6-
from typing import Any, Dict, List, Optional, Union, cast
6+
from typing import Any, Dict, List, Optional, Set, Union, cast
77

8+
import grpc
89
from google.protobuf.timestamp_pb2 import Timestamp
10+
from grpc import StatusCode
911
from sift.ingest.v1.ingest_pb2 import (
1012
IngestWithConfigDataChannelValue,
1113
IngestWithConfigDataStreamRequest,
@@ -14,6 +16,7 @@
1416
from sift.ingestion_configs.v2.ingestion_configs_pb2 import ChannelConfig as ChannelConfigPb
1517
from sift.ingestion_configs.v2.ingestion_configs_pb2 import IngestionConfig
1618

19+
from sift_py.error import ProtobufMaxSizeExceededError
1720
from sift_py.grpc.transport import SiftChannel
1821
from sift_py.ingestion._internal.error import IngestionValidationError
1922
from sift_py.ingestion._internal.ingestion_config import (
@@ -43,11 +46,13 @@ class _IngestionServiceImpl:
4346
ingestion_config: IngestionConfig
4447
asset_name: str
4548
flow_configs_by_name: Dict[str, FlowConfig]
49+
flow_configs_created: Set[str]
4650
rules: List[RuleConfig]
4751
run_id: Optional[str]
4852
organization_id: Optional[str]
4953
end_stream_on_error: bool
5054
config: TelemetryConfig
55+
use_lazy_flow_creation: bool
5156

5257
ingest_service_stub: IngestServiceStub
5358
rule_service: RuleService
@@ -58,21 +63,49 @@ def __init__(
5863
config: TelemetryConfig,
5964
run_id: Optional[str] = None,
6065
end_stream_on_error: bool = False,
66+
force_lazy_flow_creation: bool = False,
6167
):
62-
ingestion_config = self.__class__._get_or_create_ingestion_config(channel, config)
68+
try:
69+
ingestion_config = self.__class__._get_or_create_ingestion_config(
70+
channel, config, lazy_flows=force_lazy_flow_creation
71+
)
72+
except ProtobufMaxSizeExceededError:
73+
ingestion_config = self.__class__._get_or_create_ingestion_config(
74+
channel, config, lazy_flows=True
75+
)
76+
self.use_lazy_flow_creation = True
77+
else:
78+
self.use_lazy_flow_creation = force_lazy_flow_creation
79+
6380
self.ingestion_config = ingestion_config
6481

65-
if config._ingestion_client_key_is_generated:
66-
# If this is a generated key, use the local telemetry config since it is static.
82+
# `flow_configs_by_name` will include all flows in the config, and anything already created
83+
# `flow_configs_created` only includes those which are already registered
84+
if config._ingestion_client_key_is_generated and not self.use_lazy_flow_creation:
85+
# If this is a generated key, use the local telemetry config since it is static
86+
# All flows have also already been created since we aren't lazy
6787
self.flow_configs_by_name = {flow.name: flow for flow in config.flows}
88+
self.flow_configs_created = {flow.name for flow in config.flows}
6889
else:
69-
# If the user specified a client key, use the configuration from Sift since it
70-
# may have been updated.
71-
flows = [
72-
FlowConfig.from_pb(f)
73-
for f in get_ingestion_config_flows(channel, ingestion_config.ingestion_config_id)
74-
]
75-
self.flow_configs_by_name = {flow.name: flow for flow in flows}
90+
# If using lazy flow creation, assume the list of flows is large enough that
91+
# `get_ingestion_config_flows` will be very slow. Instead lazily check for
92+
# flow existance during streaming
93+
# Otherwise, since the user specified a client key, use the configuration from Sift since it
94+
# may have been updated, and we've already registered any new flows
95+
96+
if self.use_lazy_flow_creation:
97+
self.flow_configs_by_name = {flow.name: flow for flow in config.flows}
98+
self.flow_configs_created: Set[str] = set()
99+
else:
100+
flows = [
101+
FlowConfig.from_pb(f)
102+
for f in get_ingestion_config_flows(
103+
channel,
104+
ingestion_config.ingestion_config_id,
105+
)
106+
]
107+
self.flow_configs_by_name = {flow.name: flow for flow in flows}
108+
self.flow_configs_created = {flow.name for flow in flows}
76109

77110
self.rule_service = RuleService(channel)
78111
if config.rules:
@@ -94,6 +127,10 @@ def ingest(self, *requests: IngestWithConfigDataStreamRequest):
94127
"""
95128
Perform data ingestion.
96129
"""
130+
# Perform lazy flow registration if needed
131+
if self.use_lazy_flow_creation:
132+
self._lazy_flow_creation(*requests)
133+
97134
self.ingest_service_stub.IngestWithConfigDataStream(iter(requests))
98135

99136
def ingest_flows(self, *flows: FlowOrderedChannelValues):
@@ -110,6 +147,10 @@ def ingest_flows(self, *flows: FlowOrderedChannelValues):
110147
req = self.create_ingestion_request(flow_name, timestamp, channel_values)
111148
requests.append(req)
112149

150+
# Perform lazy flow registration if needed
151+
if self.use_lazy_flow_creation:
152+
self._lazy_flow_creation(*requests)
153+
113154
self.ingest_service_stub.IngestWithConfigDataStream(iter(requests))
114155

115156
def try_ingest_flows(self, *flows: Flow):
@@ -126,6 +167,10 @@ def try_ingest_flows(self, *flows: Flow):
126167
req = self.try_create_ingestion_request(flow_name, timestamp, channel_values)
127168
requests.append(req)
128169

170+
# Perform lazy flow registration if needed
171+
if self.use_lazy_flow_creation:
172+
self._lazy_flow_creation(*requests)
173+
129174
self.ingest_service_stub.IngestWithConfigDataStream(iter(requests))
130175

131176
def attach_run(
@@ -374,6 +419,7 @@ def try_create_flow(self, *flow_config: FlowConfig):
374419

375420
for fc in flow_config:
376421
self.flow_configs_by_name[fc.name] = fc
422+
self.flow_configs_created.add(fc.name)
377423

378424
def try_create_flows(self, *flow_configs: FlowConfig):
379425
"""
@@ -399,13 +445,60 @@ def create_flow(self, *flow_config: FlowConfig):
399445
)
400446
for fc in flow_config:
401447
self.flow_configs_by_name[fc.name] = fc
448+
self.flow_configs_created.add(fc.name)
402449

403450
def create_flows(self, *flow_configs: FlowConfig):
404451
"""
405452
See `create_flow`.
406453
"""
407454
return self.create_flow(*flow_configs)
408455

456+
def _lazy_flow_creation(self, *requests: IngestWithConfigDataStreamRequest):
457+
"""
458+
Used for lazy flow creation, which registers flows with sift as they are seen, instead of during
459+
the service initialization
460+
"""
461+
missing_flow_config_names: Set[str] = set()
462+
for request in requests:
463+
# Skip creation if already registered or an unknown flow name
464+
if (
465+
request.flow not in self.flow_configs_created
466+
and request.flow in self.flow_configs_by_name
467+
):
468+
missing_flow_config_names.add(request.flow)
469+
470+
flow_configs_to_create = [
471+
self.flow_configs_by_name[flow_config_name]
472+
for flow_config_name in missing_flow_config_names
473+
]
474+
475+
if flow_configs_to_create:
476+
try:
477+
create_flow_configs(
478+
self.transport_channel,
479+
self.ingestion_config.ingestion_config_id,
480+
flow_configs_to_create,
481+
)
482+
except (ProtobufMaxSizeExceededError, grpc.RpcError) as e:
483+
# Re-raise gRPC errors unless it just an ALREADY_EXISTS error
484+
if isinstance(e, grpc.RpcError):
485+
if e.code() != StatusCode.ALREADY_EXISTS:
486+
raise
487+
488+
# Try creating them individually instead
489+
for flow_config in flow_configs_to_create:
490+
try:
491+
create_flow_configs(
492+
self.transport_channel,
493+
self.ingestion_config.ingestion_config_id,
494+
[flow_config],
495+
)
496+
except grpc.RpcError as e:
497+
if e.code() != StatusCode.ALREADY_EXISTS:
498+
raise
499+
for flow_config in flow_configs_to_create:
500+
self.flow_configs_created.add(flow_config.name)
501+
409502
@staticmethod
410503
def _update_flow_configs(
411504
channel: SiftChannel, ingestion_config_id: str, telemetry_config: TelemetryConfig
@@ -464,27 +557,33 @@ def _update_flow_configs(
464557

465558
@classmethod
466559
def _get_or_create_ingestion_config(
467-
cls, channel: SiftChannel, config: TelemetryConfig
560+
cls, channel: SiftChannel, config: TelemetryConfig, lazy_flows: bool = False
468561
) -> IngestionConfig:
469562
"""
470563
Retrieves an existing ingestion config or creates a new one. If an existing ingestion config is fetched,
471564
then flows may be updated to reflect any changes that may have occured in the telemetry config.
565+
May raise `ProtobufMaxSizeExceeded` if a large number of flows needing updates or creation are passed
472566
"""
473567

474568
ingestion_config = get_ingestion_config_by_client_key(channel, config.ingestion_client_key)
475569

476570
# Exiting ingestion config.. update flows if necessary
477571
if ingestion_config is not None:
478-
if config._ingestion_client_key_is_generated:
572+
if config._ingestion_client_key_is_generated or lazy_flows:
479573
return ingestion_config
480574
else:
481575
cls._update_flow_configs(channel, ingestion_config.ingestion_config_id, config)
482576
return ingestion_config
483577

578+
if lazy_flows:
579+
config_flows = []
580+
else:
581+
config_flows = config.flows
582+
484583
ingestion_config = create_ingestion_config(
485584
channel,
486585
config.asset_name,
487-
config.flows,
586+
config_flows,
488587
config.ingestion_client_key,
489588
config.organization_id,
490589
)

python/lib/sift_py/ingestion/_internal/ingestion_config.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
IngestionConfigServiceStub,
2020
)
2121

22+
from sift_py.error import raise_if_too_large
2223
from sift_py.grpc.transport import SiftChannel
2324
from sift_py.ingestion.flow import FlowConfig
2425

@@ -54,6 +55,7 @@ def create_ingestion_config(
5455
) -> IngestionConfig:
5556
"""
5657
Creates a new ingestion config
58+
Will raise `ProtobufMaxSizeExceeded` if message size is too large to send
5759
"""
5860

5961
svc = IngestionConfigServiceStub(channel)
@@ -63,6 +65,9 @@ def create_ingestion_config(
6365
organization_id=organization_id or "",
6466
flows=[flow.as_pb(FlowConfigPb) for flow in flows],
6567
)
68+
69+
raise_if_too_large(req)
70+
6671
res = cast(CreateIngestionConfigResponse, svc.CreateIngestionConfig(req))
6772
return res.ingestion_config
6873

@@ -128,10 +133,14 @@ def create_flow_configs(
128133
):
129134
"""
130135
Adds flow configs to an existing ingestion config.
136+
Will raise `ProtobufMaxSizeExceeded` if message size is too large to send
131137
"""
132138
svc = IngestionConfigServiceStub(channel)
133139
req = CreateIngestionConfigFlowsRequest(
134140
ingestion_config_id=ingestion_config_id,
135141
flows=[f.as_pb(FlowConfigPb) for f in flow_configs],
136142
)
143+
144+
raise_if_too_large(req)
145+
137146
_ = cast(CreateIngestionConfigFlowsResponse, svc.CreateIngestionConfigFlows(req))

0 commit comments

Comments
 (0)