Skip to content

Commit 63d5aad

Browse files
devatsecure and claude committed
feat: Wire Temporal orchestrator into hybrid_analyzer pipeline
Added _try_temporal_execution method with config toggle (enable_temporal). Gracefully falls back to direct execution when temporalio not installed or Temporal server unreachable. 13 tests covering all paths. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8f5f8e5 commit 63d5aad

File tree

3 files changed

+727
-44
lines changed

3 files changed

+727
-44
lines changed

scripts/hybrid_analyzer.py

Lines changed: 163 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,18 @@
168168
_MCP_IMPORT_OK = False
169169
_MCP_LIB_OK = False
170170

171+
# Temporal orchestrator (optional execution backend)
172+
try:
173+
from temporal_orchestrator import (
174+
AuditWorkflowRunner,
175+
PipelineActivities,
176+
)
177+
178+
_TEMPORAL_IMPORT_OK = True
179+
except ImportError:
180+
_TEMPORAL_IMPORT_OK = False
181+
_TEMPORAL_LIB_OK = False
182+
171183

172184
class HybridSecurityAnalyzer:
173185
"""
@@ -580,19 +592,27 @@ def __init__(
580592

581593
# Validation: At least one scanner or AI enrichment must be enabled
582594
active_features = [
583-
name for name in (
584-
"semgrep", "trivy", "checkov", "api_security", "dast",
585-
"supply_chain", "fuzzing", "threat_intel", "remediation",
586-
"runtime_security", "regression_testing", "ai_enrichment",
587-
"nuclei_templates", "zap_baseline",
595+
name
596+
for name in (
597+
"semgrep",
598+
"trivy",
599+
"checkov",
600+
"api_security",
601+
"dast",
602+
"supply_chain",
603+
"fuzzing",
604+
"threat_intel",
605+
"remediation",
606+
"runtime_security",
607+
"regression_testing",
608+
"ai_enrichment",
609+
"nuclei_templates",
610+
"zap_baseline",
588611
)
589612
if getattr(self, f"enable_{name}", False)
590613
]
591614
if not active_features:
592-
raise ValueError(
593-
"At least one tool must be enabled! "
594-
"Use --help to see available scanner flags."
595-
)
615+
raise ValueError("At least one tool must be enabled! Use --help to see available scanner flags.")
596616

597617
def analyze(
598618
self, target_path: str, output_dir: Optional[str] = None, severity_filter: Optional[list[str]] = None
@@ -629,9 +649,24 @@ def analyze(
629649
logger.info("Tools: %s", self._get_enabled_tools())
630650
logger.info("")
631651

652+
# -- Temporal execution backend (optional) --
653+
if self.config.get("enable_temporal", False):
654+
temporal_result = self._try_temporal_execution(
655+
target_path=target_path,
656+
output_dir=output_dir,
657+
severity_filter=severity_filter,
658+
)
659+
if temporal_result is not None:
660+
return temporal_result
661+
# Fall-through: Temporal was requested but unavailable/failed.
662+
# The warning was already logged inside _try_temporal_execution.
663+
632664
# -- PHASE 0: MCP Server Status --
633665
if self._mcp_started:
634-
logger.info("Phase 0: MCP server is running (background thread: %s)", self._mcp_thread.name if self._mcp_thread else "unknown")
666+
logger.info(
667+
"Phase 0: MCP server is running (background thread: %s)",
668+
self._mcp_thread.name if self._mcp_thread else "unknown",
669+
)
635670
elif self.config.get("enable_mcp_server", False):
636671
logger.info("Phase 0: MCP server enabled but not running (startup may have failed)")
637672

@@ -678,9 +713,27 @@ def analyze(
678713
# scan_codebase expects list of {"path": ..., "content": ...} dicts
679714
_heuristic_files = []
680715
_target = Path(target_path)
681-
_heuristic_exts = {".py", ".js", ".ts", ".tsx", ".jsx", ".java", ".go", ".rb", ".yml", ".yaml", ".json", ".tf"}
716+
_heuristic_exts = {
717+
".py",
718+
".js",
719+
".ts",
720+
".tsx",
721+
".jsx",
722+
".java",
723+
".go",
724+
".rb",
725+
".yml",
726+
".yaml",
727+
".json",
728+
".tf",
729+
}
682730
for fp in _target.rglob("*"):
683-
if fp.is_file() and fp.suffix in _heuristic_exts and ".git" not in fp.parts and "node_modules" not in fp.parts:
731+
if (
732+
fp.is_file()
733+
and fp.suffix in _heuristic_exts
734+
and ".git" not in fp.parts
735+
and "node_modules" not in fp.parts
736+
):
684737
with contextlib.suppress(Exception):
685738
_heuristic_files.append({"path": str(fp), "content": fp.read_text(errors="ignore")})
686739
if len(_heuristic_files) >= 500:
@@ -813,6 +866,95 @@ def analyze(
813866

814867
return result
815868

869+
# ------------------------------------------------------------------
870+
# Temporal execution backend
871+
# ------------------------------------------------------------------
872+
873+
def _try_temporal_execution(
    self,
    target_path: str,
    output_dir: Optional[str],
    severity_filter: Optional[list[str]],
) -> Optional[HybridScanResult]:
    """Attempt to run the pipeline via the Temporal orchestrator.

    Args:
        target_path: Path handed to the orchestrator as ``repo_path`` and
            to ``analyze()`` as ``target_path``.
        output_dir: Forwarded unchanged to ``analyze()``.
        severity_filter: Forwarded unchanged to ``analyze()``.

    Returns:
        The ``HybridScanResult`` produced by ``analyze()`` (with the
        orchestrator summary attached as ``temporal_summary``) when the
        orchestrator run succeeds, or ``None`` when the orchestrator is
        unavailable or fails, so the caller should fall back to direct
        execution.

    Raises:
        Whatever ``analyze()`` raises. Errors from the direct analysis
        path are *not* converted into a fallback, because the fallback
        is that same code path and would simply run (and fail) again.

    Graceful degradation hierarchy:
        1. ``temporal_orchestrator`` module not importable -> warn, return None
        2. Building/running the workflow runner or reading its summary
           raises any exception -> warn, return None
    """
    if not _TEMPORAL_IMPORT_OK:
        logger.warning(
            "Temporal enabled in config but temporal_orchestrator module "
            "could not be imported. Falling back to direct execution."
        )
        return None

    retry_mode = self.config.get("temporal_retry_mode", "production")

    # Guard ONLY the orchestrator interaction. The broad catch is
    # deliberate here: any orchestrator problem must degrade to direct
    # execution rather than abort the scan.
    try:
        runner = AuditWorkflowRunner(
            activities=PipelineActivities(config=self.config),
            retry_mode=retry_mode,
        )
        logger.info("Running pipeline via Temporal orchestrator (mode=%s)", retry_mode)
        runner.run(repo_path=target_path, config=self.config)

        # Log summary from Temporal execution
        summary = runner.get_summary()
        logger.info(
            "Temporal workflow completed: %d/%d phases succeeded",
            summary.get("completed_phases", 0),
            summary.get("total_phases", 0),
        )
        for pname, pdetail in summary.get("phases", {}).items():
            status = pdetail.get("status", "unknown")
            duration = pdetail.get("duration_seconds", 0.0)
            if status == "failed":
                logger.warning(
                    "  Phase %s: %s (%.1fs) — %s",
                    pname,
                    status,
                    duration,
                    pdetail.get("error", ""),
                )
            else:
                logger.info("  Phase %s: %s (%.1fs)", pname, status, duration)
    except Exception as exc:
        logger.warning(
            "Temporal execution failed: %s. Falling back to direct execution.",
            exc,
        )
        return None

    # After Temporal execution, run the normal analyze() path for the
    # full result assembly (SARIF/JSON/Markdown reporting goes through
    # the standard code path).
    #
    # Re-invoke analyze() with Temporal disabled to avoid recursion.
    # This call sits outside the guarded region above on purpose: a
    # failure here is a direct-execution failure and must surface as
    # such instead of being reported as a Temporal failure and retried.
    # NOTE(review): this temporarily mutates shared ``self.config``;
    # concurrent analyze() calls on one instance would observe the
    # toggle flipped — confirm instances are not shared across threads.
    original_toggle = self.config.get("enable_temporal", False)
    self.config["enable_temporal"] = False
    try:
        result = self.analyze(
            target_path=target_path,
            output_dir=output_dir,
            severity_filter=severity_filter,
        )
    finally:
        self.config["enable_temporal"] = original_toggle

    # Attach Temporal workflow metadata to the result. Metadata is
    # best-effort: if the result object exposes no ``__dict__`` the
    # completed scan result is still returned rather than discarded.
    try:
        result.__dict__["temporal_summary"] = summary
    except AttributeError:
        logger.debug("Scan result has no __dict__; temporal_summary not attached")

    return result
957+
816958
# ------------------------------------------------------------------
817959
# Vulnerability enrichment pipeline (v2.0)
818960
# ------------------------------------------------------------------
@@ -835,7 +977,9 @@ def _enrich_findings(self, findings: list[HybridFinding], target_path: str) -> l
835977
from enrichment_pipeline import run_enrichment_pipeline
836978

837979
finding_dicts, _enrichment_meta = run_enrichment_pipeline(
838-
finding_dicts, self.config, target_path,
980+
finding_dicts,
981+
self.config,
982+
target_path,
839983
)
840984

841985
# -- License risk scoring (hybrid_analyzer-specific, SBOM-based) --
@@ -847,10 +991,12 @@ def _enrich_findings(self, findings: list[HybridFinding], target_path: str) -> l
847991
pkg = fd.get("cve_id") and fd.get("title", "")
848992
if pkg and " in " in pkg:
849993
pkg_name = pkg.split(" in ")[-1].strip()
850-
components.append({
851-
"name": pkg_name,
852-
"version": fd.get("installed_version", "unknown"),
853-
})
994+
components.append(
995+
{
996+
"name": pkg_name,
997+
"version": fd.get("installed_version", "unknown"),
998+
}
999+
)
8541000
if components:
8551001
risks = license_scorer.score_components(components)
8561002
if risks:

scripts/temporal_orchestrator.py

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
99
Requires: temporalio>=1.7.0 (optional dependency)
1010
"""
11+
1112
from __future__ import annotations
1213

1314
import logging
@@ -254,19 +255,15 @@ def __init__(
254255
retry_mode: str = "production",
255256
):
256257
self._activities = activities or PipelineActivities()
257-
self._retry_policy = RETRY_POLICIES.get(
258-
retry_mode, RETRY_POLICIES["production"]
259-
)
258+
self._retry_policy = RETRY_POLICIES.get(retry_mode, RETRY_POLICIES["production"])
260259
self._phase_results: dict[str, PhaseResult] = {}
261260

262261
@property
263262
def phase_results(self) -> dict[str, PhaseResult]:
264263
"""Return results from all completed phases."""
265264
return dict(self._phase_results)
266265

267-
def run(
268-
self, repo_path: str, config: dict[str, Any] | None = None
269-
) -> dict[str, PhaseResult]:
266+
def run(self, repo_path: str, config: dict[str, Any] | None = None) -> dict[str, PhaseResult]:
270267
"""Execute all 6 phases sequentially.
271268
272269
Each phase receives the output of the previous phase.
@@ -303,9 +300,7 @@ def run(
303300
self._phase_results[phase_name] = result
304301

305302
if result.status == "failed":
306-
logger.error(
307-
"Phase %s failed: %s", phase_name, result.error
308-
)
303+
logger.error("Phase %s failed: %s", phase_name, result.error)
309304
# In strict mode, halt on any failure
310305
if config.get("phase_gate_strict", False):
311306
break
@@ -342,16 +337,8 @@ def get_summary(self) -> dict[str, Any]:
342337
"""
343338
return {
344339
"total_phases": len(PIPELINE_PHASES),
345-
"completed_phases": sum(
346-
1
347-
for r in self._phase_results.values()
348-
if r.status == "success"
349-
),
350-
"failed_phases": sum(
351-
1
352-
for r in self._phase_results.values()
353-
if r.status == "failed"
354-
),
340+
"completed_phases": sum(1 for r in self._phase_results.values() if r.status == "success"),
341+
"failed_phases": sum(1 for r in self._phase_results.values() if r.status == "failed"),
355342
"phases": {
356343
name: {
357344
"status": result.status,
@@ -386,10 +373,7 @@ async def create_temporal_client(server: str = "localhost:7233") -> Any:
386373
If ``temporalio`` is not installed.
387374
"""
388375
if not TEMPORAL_AVAILABLE:
389-
raise RuntimeError(
390-
"temporalio package not installed. "
391-
"Install with: pip install temporalio>=1.7.0"
392-
)
376+
raise RuntimeError("temporalio package not installed. Install with: pip install temporalio>=1.7.0")
393377
return await Client.connect(server)
394378

395379

@@ -415,10 +399,7 @@ async def start_temporal_worker(
415399
If ``temporalio`` is not installed.
416400
"""
417401
if not TEMPORAL_AVAILABLE:
418-
raise RuntimeError(
419-
"temporalio package not installed. "
420-
"Install with: pip install temporalio>=1.7.0"
421-
)
402+
raise RuntimeError("temporalio package not installed. Install with: pip install temporalio>=1.7.0")
422403
activities_instance = PipelineActivities()
423404
return Worker(
424405
client,

0 commit comments

Comments
 (0)