Skip to content

Commit 440d347

Browse files
RoiGlinikarikalon1
authored andcommitted
MAIN-2389 Stream actions (#1665)
* support new stream actions workflow * stream holmes result
1 parent 2ab31b6 commit 440d347

7 files changed

Lines changed: 109 additions & 31 deletions

File tree

src/robusta/core/model/base_params.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class AIInvestigateParams(HolmesParams):
111111
ask: Optional[str]
112112
context: Optional[Dict[str, Any]]
113113
sections: Optional[Dict[str, str]] = None
114+
stream: bool = False
114115

115116

116117
class HolmesToolsResult(BaseModel):

src/robusta/core/model/events.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from collections import defaultdict
55
from dataclasses import dataclass, field
66
from enum import Enum
7-
from typing import Any, Dict, List, Optional
7+
from typing import Any, Dict, List, Optional, Callable
88

99
from pydantic import BaseModel
1010

@@ -59,6 +59,7 @@ class ExecutionBaseEvent:
5959
_scheduler: Optional[PlaybooksScheduler] = None
6060
_context: Optional[ExecutionContext] = None
6161
_event_emitter: Optional[EventEmitter] = None
62+
_ws: Optional[Callable[[str], None]] = None
6263

6364
def set_context(self, context: ExecutionContext):
6465
self._context = context

src/robusta/core/playbooks/internal/ai_integration.py

Lines changed: 35 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -62,35 +62,43 @@ def ask_holmes(event: ExecutionBaseEvent, params: AIInvestigateParams):
6262
include_tool_call_results=True,
6363
sections=params.sections
6464
)
65-
result = requests.post(f"{holmes_url}/api/investigate", data=holmes_req.json())
66-
result.raise_for_status()
6765

68-
holmes_result = HolmesResult(**json.loads(result.text))
69-
title_suffix = (
70-
f" on {params.resource.name}"
71-
if params.resource and params.resource.name and params.resource.name.lower() != "unresolved"
72-
else ""
73-
)
74-
75-
kind = params.resource.kind if params.resource else None
76-
finding = Finding(
77-
title=f"AI Analysis of {investigation__title}{title_suffix}",
78-
aggregation_key="HolmesInvestigationResult",
79-
subject=FindingSubject(
80-
name=params.resource.name if params.resource else "",
81-
namespace=params.resource.namespace if params.resource else "",
82-
subject_type=FindingSubjectType.from_kind(kind) if kind else FindingSubjectType.TYPE_NONE,
83-
node=params.resource.node if params.resource else "",
84-
container=params.resource.container if params.resource else "",
85-
),
86-
finding_type=FindingType.AI_ANALYSIS,
87-
failure=False,
88-
)
89-
finding.add_enrichment(
90-
[HolmesResultsBlock(holmes_result=holmes_result)], enrichment_type=EnrichmentType.ai_analysis
91-
)
66+
if params.stream:
67+
with requests.post(f"{holmes_url}/api/stream/investigate", data=holmes_req.json(), stream=True) as resp:
68+
for line in resp.iter_content(chunk_size=None, decode_unicode=True): # Avoid streaming chunks from holmes. send them as they arrive.
69+
event.ws(data=line)
70+
return
9271

93-
event.add_finding(finding)
72+
else:
73+
result = requests.post(f"{holmes_url}/api/investigate", data=holmes_req.json())
74+
result.raise_for_status()
75+
76+
holmes_result = HolmesResult(**json.loads(result.text))
77+
title_suffix = (
78+
f" on {params.resource.name}"
79+
if params.resource and params.resource.name and params.resource.name.lower() != "unresolved"
80+
else ""
81+
)
82+
83+
kind = params.resource.kind if params.resource else None
84+
finding = Finding(
85+
title=f"AI Analysis of {investigation__title}{title_suffix}",
86+
aggregation_key="HolmesInvestigationResult",
87+
subject=FindingSubject(
88+
name=params.resource.name if params.resource else "",
89+
namespace=params.resource.namespace if params.resource else "",
90+
subject_type=FindingSubjectType.from_kind(kind) if kind else FindingSubjectType.TYPE_NONE,
91+
node=params.resource.node if params.resource else "",
92+
container=params.resource.container if params.resource else "",
93+
),
94+
finding_type=FindingType.AI_ANALYSIS,
95+
failure=False,
96+
)
97+
finding.add_enrichment(
98+
[HolmesResultsBlock(holmes_result=holmes_result)], enrichment_type=EnrichmentType.ai_analysis
99+
)
100+
101+
event.add_finding(finding)
94102

95103
except Exception as e:
96104
logging.exception(

src/robusta/core/playbooks/playbooks_event_handler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from typing import Any, Dict, List, Optional
2+
from typing import Any, Callable, Dict, List, Optional
33

44
from robusta.core.model.events import ExecutionBaseEvent
55
from robusta.core.playbooks.base_trigger import TriggerEvent
@@ -39,6 +39,13 @@ def run_external_action(
3939
"""Execute an external action"""
4040
pass
4141

42+
@abstractmethod
43+
def run_external_stream_action(
44+
self, action_name: str, action_params: Optional[dict], stream: Callable[str, Optional[str]]
45+
) -> Optional[Dict[str, Any]]:
46+
"""Execute an external stream action"""
47+
pass
48+
4249
@abstractmethod
4350
def get_global_config(self) -> dict:
4451
"""Return runner global config"""

src/robusta/core/playbooks/playbooks_event_handler_impl.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def __run_playbook_actions(
197197
start_time = time.time()
198198
source: str = (
199199
"manual_action"
200-
if any(name == SYNC_RESPONSE_SINK for name in getattr(execution_event, "named_sinks", []))
200+
if any(name == SYNC_RESPONSE_SINK for name in (execution_event.named_sinks or []))
201201
else ""
202202
)
203203
self.__prepare_execution_event(execution_event)
@@ -368,3 +368,37 @@ def handle_sigint(self, sig, frame):
368368

369369
self.set_cluster_active(False)
370370
sys.exit(0)
371+
372+
def run_external_stream_action(
373+
self, action_name: str, action_params: Optional[dict], ws
374+
) -> Optional[Dict[str, Any]]:
375+
action_def = self.registry.get_actions().get_action(action_name)
376+
if not action_def:
377+
return self.__error_resp(f"External action not found {action_name}", ErrorCodes.ACTION_NOT_FOUND.value)
378+
379+
if not action_def.from_params_func:
380+
return self.__error_resp(
381+
f"Action {action_name} cannot run using external event", ErrorCodes.NOT_EXTERNAL_ACTION.value
382+
)
383+
384+
try:
385+
instantiation_params = action_def.from_params_parameter_class(**action_params)
386+
except Exception:
387+
return self.__error_resp(
388+
f"Failed to create execution instance for"
389+
f" {action_name} {action_def.from_params_parameter_class}"
390+
f" {action_params} {traceback.format_exc()}",
391+
ErrorCodes.EVENT_PARAMS_INSTANTIATION_FAILED.value,
392+
)
393+
394+
execution_event = action_def.from_params_func(instantiation_params)
395+
if not execution_event:
396+
return self.__error_resp(
397+
f"Failed to create execution event for {action_name} {action_params}",
398+
ErrorCodes.EVENT_INSTANTIATION_FAILED.value,
399+
)
400+
401+
execution_event.ws = ws
402+
playbook_action = PlaybookAction(action_name=action_name, action_params=action_params)
403+
404+
return self.__run_playbook_actions(execution_event, [playbook_action])

src/robusta/core/reporting/action_requests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class ExternalActionRequest(BaseModel):
2727
partial_auth_b: str = "" # Auth for public key auth protocol option - should be added by the relay
2828
request_id: str = "" # If specified, should return a sync response using the specified request_id
2929
no_sinks: bool = False # Indicates not to send to sinks at all. The request body has a sink list,
30+
stream: bool = False
3031
# however an empty sink list means using the server default sinks
3132

3233

src/robusta/integrations/receiver.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,12 @@ def stop(self):
136136
def __sync_response(cls, status_code: int, request_id: str, data) -> Dict:
137137
return {"action": "response", "request_id": request_id, "status_code": status_code, "data": data}
138138

139+
def __stream_response(self, request_id: str, data: str):
140+
self.ws.send(data=json.dumps({"action": "stream", "request_id": request_id, "data": data}))
141+
142+
def __close_stream_response(self, request_id: str, data: str):
143+
self.ws.send(data=json.dumps({"action": "stream", "request_id": request_id, "data": data, "close": True}))
144+
139145
def __exec_external_request(self, action_request: ExternalActionRequest, validate_timestamp: bool):
140146
logging.debug(f"Callback `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)}")
141147
sync_response = action_request.request_id != "" # if request_id is set, we need to write back the response
@@ -175,6 +181,23 @@ def __exec_external_request(self, action_request: ExternalActionRequest, validat
175181
http_code = 200 if response.get("success") else 500
176182
self.ws.send(data=json.dumps(self.__sync_response(http_code, action_request.request_id, response)))
177183

184+
def __exec_external_stream_request(self, action_request: ExternalActionRequest, validate_timestamp: bool):
185+
logging.debug(f"Callback `{action_request.body.action_name}` {to_safe_str(action_request.body.action_params)}")
186+
187+
validation_response = self.__validate_request(action_request, validate_timestamp)
188+
if validation_response.http_code != 200:
189+
req_json = action_request.json(exclude={"body"})
190+
body_json = action_request.body.json(exclude={"action_params"}) # action params already printed above
191+
logging.error(f"Failed to validate action request {req_json} {body_json}")
192+
self.__close_stream_response(action_request.request_id, validation_response.dict(exclude={"http_code"}))
193+
return
194+
195+
res = self.event_handler.run_external_stream_action(action_request.body.action_name,
196+
action_request.body.action_params,
197+
lambda data: self.__stream_response(request_id=action_request.request_id, data=data))
198+
res = "" if res.get("success") else json.dumps(res)
199+
self.__close_stream_response(action_request.request_id, res)
200+
178201
def _process_action(self, action: ExternalActionRequest, validate_timestamp: bool) -> None:
179202
self._executor.submit(self._process_action_sync, action, validate_timestamp)
180203

@@ -189,7 +212,10 @@ def _process_action_sync(self, action: ExternalActionRequest, validate_timestamp
189212
else:
190213
ctx = nullcontext()
191214
with ctx:
192-
self.__exec_external_request(action, validate_timestamp)
215+
if action.stream:
216+
self.__exec_external_stream_request(action, validate_timestamp)
217+
else:
218+
self.__exec_external_request(action, validate_timestamp)
193219
except Exception:
194220
logging.error(
195221
f"Failed to run incoming event {self._stringify_incoming_event(action.dict())}",

0 commit comments

Comments
 (0)