Skip to content

Commit f701255

Browse files
authored
Merge branch 'develop' into feature/implement-hamilton-framework-with-sdlc-11-28-51
2 parents c5375e5 + 93dd70f commit f701255

6 files changed

Lines changed: 362 additions & 0 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
"""Hamilton-inspired LLM pipeline example for the IACT project."""
2+
3+
from . import dataflow
4+
from .driver import HamiltonDriver, MissingDependencyError
5+
from .llm_client import MockLLMClient
6+
7+
# Public surface of the package: the dataflow module, the driver with its
# error type, and the deterministic mock client.
__all__ = ["dataflow", "HamiltonDriver", "MissingDependencyError", "MockLLMClient"]
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""Declarative dataflow modeling the Data → Prompt → LLM → $ pipeline.
2+
3+
The module captures the pace differences between aplicaciones ML tradicionales y
4+
aplicaciones LLM, destacando que ambas requieren habilidades fuertes de
5+
ingeniería de software. Cada función sigue el paradigma Hamilton: el nombre es
6+
el output y los argumentos son las dependencias explícitas.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from typing import Any, Dict, List
12+
13+
from .llm_client import MockLLMClient
14+
15+
# Ordered lifecycle stages for each development style. The two sequences are
# identical except for the fourth stage ("Model Development" versus
# "Prompt / Model Development").
PACE_OF_DEVELOPMENT: Dict[str, List[str]] = {
    "traditional_ml": [
        "Idea & Data/Resources",
        "Design",
        "Development/Prototype",
        "Model Development",
        "Getting to Production",
        "Operations",
        "Maintenance & Business Value",
    ],
    "llm_apps": [
        "Idea & Data/Resources",
        "Design",
        "Development/Prototype",
        "Prompt / Model Development",
        "Getting to Production",
        "Operations",
        "Maintenance & Business Value",
    ],
}

# Human-readable label of the pipeline modeled by this module.
DATAFLOW_LABEL = "Data → Prompt → LLM → $"


def pace_of_development() -> Dict[str, List[str]]:
    """Return the canonical stage ordering for traditional ML and LLM apps.

    Returns:
        A new dictionary with the same keys and stage lists as
        ``PACE_OF_DEVELOPMENT``. Both the dictionary and every list are fresh
        copies, so downstream nodes that mutate the value cannot corrupt the
        module-level constant shared by later executions.
    """
    return {style: list(stages) for style, stages in PACE_OF_DEVELOPMENT.items()}
43+
44+
45+
def prompt_template(
    idea: str,
    domain_data: Dict[str, str],
    pace_of_development: Dict[str, List[str]],
) -> str:
    """Build the base prompt that contrasts both paces and demands SWE practices.

    Args:
        idea: Goal to deliver, interpolated into the final sentence.
        domain_data: Must contain the keys ``business_process``, ``ui`` and
            ``data``.
        pace_of_development: Must contain the keys ``traditional_ml`` and
            ``llm_apps``, each mapping to an ordered list of stage names.

    Returns:
        Eight newline-terminated lines joined into a single string.

    Raises:
        KeyError: If any of the keys listed above is missing.
    """
    arrow = " → "
    ml_stages = arrow.join(pace_of_development["traditional_ml"])
    llm_stages = arrow.join(pace_of_development["llm_apps"])

    process = domain_data["business_process"]
    interface = domain_data["ui"]
    assets = domain_data["data"]

    sentences = [
        "You are designing a Hamilton micro-orchestration experiment.",
        f"Traditional ML pace: {ml_stages}.",
        f"LLM app pace: {llm_stages}.",
        "Explain how strong SWE practices (testing, modularity, reuse, portability)",
        "keep the system resilient while iterating quickly.",
        f"Business domain: {process} with UI {interface}.",
        f"Primary data assets: {assets}.",
        f"Goal: deliver {idea} using Hamilton declarative functions.",
    ]
    # Every sentence, including the last one, ends with a newline.
    return "".join(f"{sentence}\n" for sentence in sentences)
64+
65+
66+
def llm_prompt(prompt_template: str, edge_cases: List[str]) -> str:
    """Combine the template with guardrails against edge cases and prompt injection.

    Args:
        prompt_template: Base prompt; it is emitted first, unchanged.
        edge_cases: Edge cases the model must address. They are listed
            comma-separated in a dedicated sentence.

    Returns:
        The template, followed by the edge-case sentence (only when
        ``edge_cases`` is non-empty) and the closing pipeline instructions.
    """
    sections = [prompt_template]
    if edge_cases:
        # Skipped for an empty list: otherwise the prompt would contain the
        # dangling sentence "Consider the following edge cases explicitly: .".
        formatted_edge_cases = ", ".join(edge_cases)
        sections.append(
            f"Consider the following edge cases explicitly: {formatted_edge_cases}.\n"
        )
    sections.append(
        "Detail the pipeline as Data → Prompt → LLM → $, highlighting how guardrails\n"
        "prevent prompt injection and balance evaluation with GPU cost awareness."
    )
    return "".join(sections)
77+
78+
79+
def llm_response(llm_prompt: str, llm_client: MockLLMClient) -> str:
    """Ask the supplied client to complete the prompt and return its answer.

    Args:
        llm_prompt: Fully assembled prompt text.
        llm_client: Object whose ``complete`` method is called with the prompt.

    Returns:
        Whatever ``llm_client.complete`` returns.
    """
    completion = llm_client.complete(llm_prompt)
    return completion
83+
84+
85+
def prompt_token_estimate(llm_prompt: str, edge_cases: List[str]) -> int:
    """Estimate the token count with a cushion for edge-case coverage.

    The whitespace-separated word count of the prompt is scaled by 0.75 and
    rounded, three tokens are added per edge case, and the result is never
    allowed to drop below 120.

    Args:
        llm_prompt: Prompt text whose words are counted.
        edge_cases: Edge cases; only their number is used.

    Returns:
        The estimated token count, at least 120.
    """
    minimum_tokens = 120
    word_count = len(llm_prompt.split())
    estimate = round(word_count * 0.75) + 3 * len(edge_cases)
    if estimate < minimum_tokens:
        return minimum_tokens
    return estimate
92+
93+
94+
def business_value(
    llm_response: str,
    pace_of_development: Dict[str, List[str]],
) -> Dict[str, Any]:
    """Package the action plan together with the development-pace context.

    Args:
        llm_response: Text produced by the LLM client; stored under ``llm_plan``.
        pace_of_development: Stage ordering; stored as-is under ``pace``.

    Returns:
        A dictionary with the keys ``llm_plan``, ``pace`` and ``next_step``,
        where ``next_step`` is a fixed recommendation string.
    """
    package: Dict[str, Any] = {"llm_plan": llm_response}
    package["pace"] = pace_of_development
    package["next_step"] = "Prototype with guarded prompts"
    return package
105+
106+
107+
def cost_estimate(
    prompt_token_estimate: int,
    pricing_policy: Dict[str, float],
) -> float:
    """Compute the expected cost from a per-1K-token price and a safety factor.

    Args:
        prompt_token_estimate: Estimated number of tokens.
        pricing_policy: Must contain ``price_per_1k_tokens``; may contain
            ``safety_multiplier``, which defaults to ``1.0``.

    Returns:
        The cost rounded to six decimal places.

    Raises:
        KeyError: If ``price_per_1k_tokens`` is missing from the policy.
    """
    rate = pricing_policy["price_per_1k_tokens"]
    multiplier = pricing_policy.get("safety_multiplier", 1.0)
    thousands_of_tokens = prompt_token_estimate / 1000
    raw_cost = thousands_of_tokens * rate * multiplier
    return round(raw_cost, 6)
116+
117+
118+
# Names exported by a star-import: the two module constants first, followed by
# the dataflow node functions.
__all__ = [
    "PACE_OF_DEVELOPMENT", "DATAFLOW_LABEL",
    "pace_of_development", "prompt_template", "llm_prompt", "llm_response",
    "prompt_token_estimate", "business_value", "cost_estimate",
]
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""Minimal Hamilton-like driver for executing declarative dataflows.
2+
3+
The real Hamilton framework provides a rich micro-orchestration engine. For the
4+
purposes of the repository we build a tiny subset that resolves dependencies by
5+
function name and executes only the nodes required to produce requested targets.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import inspect
11+
from types import ModuleType
12+
from typing import Any, Dict, Iterable, Mapping, Sequence
13+
14+
15+
class MissingDependencyError(RuntimeError):
    """Raised when a dependency required by a node is not available."""


class DictResult:
    """Result adapter that returns the execution results as a plain ``dict``."""

    def __call__(self, results: Mapping[str, Any]) -> Dict[str, Any]:
        return dict(results)


class HamiltonDriver:
    """Execute declarative functions registered from one or more modules.

    Functions are registered by name and resolved lazily: a node runs only when
    a requested target depends on it. Values given in ``config`` and in the
    ``inputs`` of ``execute`` act as seed values and take precedence over
    functions of the same name. Each execution starts from a clean state and
    records the executed nodes in ``execution_log`` so tests can assert on
    evaluation order.

    Args:
        modules: Modules whose functions become nodes. Only public functions
            (no leading underscore) defined in the module itself are
            registered, so private helpers and functions imported from other
            modules never become nodes by accident.
        config: Optional seed values applied to every execution.
        adapters: Optional callables applied, in order, to the result
            dictionary of every execution.
    """

    def __init__(
        self,
        modules: Iterable[ModuleType],
        config: Mapping[str, Any] | None = None,
        adapters: Iterable[Any] = (),
    ):
        self._functions: Dict[str, Any] = {}
        self._config: Dict[str, Any] = dict(config or {})
        self._adapters: list[Any] = list(adapters)
        self.execution_log: list[str] = []
        for module in modules:
            self._register_module(module)

    def _register_module(self, module: ModuleType) -> None:
        """Register every public function defined in ``module`` as a node."""
        for name, candidate in vars(module).items():
            if name.startswith("_") or not inspect.isfunction(candidate):
                continue
            # Skip functions that were merely imported into the module.
            if candidate.__module__ != module.__name__:
                continue
            self._functions[name] = candidate

    def execute(self, targets: Sequence[str], inputs: Mapping[str, Any]) -> Any:
        """Compute the requested targets, running only the nodes they need.

        Args:
            targets: Names of the values to produce.
            inputs: Seed values for this execution; they override ``config``
                entries and registered functions with the same name.

        Returns:
            A dictionary mapping each target to its value. When adapters were
            configured, the dictionary is passed through them in order and the
            final adapter output is returned instead.

        Raises:
            MissingDependencyError: If a target or a required parameter has
                neither a seed value nor a registered function, if a function
                declares ``*args``/``**kwargs``, or if the functions depend on
                each other in a cycle. Failures raised while resolving a nested
                dependency propagate unchanged so the message names the node
                and the input that are actually missing.
        """
        context: Dict[str, Any] = dict(self._config)
        context.update(inputs)
        in_progress: list[str] = []  # nodes currently being resolved, outermost first
        self.execution_log = []

        def resolve(name: str) -> Any:
            # Seed values and already computed nodes both live in ``context``.
            if name in context:
                return context[name]

            func = self._functions.get(name)
            if func is None:
                raise MissingDependencyError(f"No data or function available for '{name}'")

            if name in in_progress:
                # Without this guard a cycle would recurse until RecursionError.
                cycle = " -> ".join([*in_progress[in_progress.index(name):], name])
                raise MissingDependencyError(f"Circular dependency detected: {cycle}")

            in_progress.append(name)
            try:
                kwargs: Dict[str, Any] = {}
                for parameter in inspect.signature(func).parameters.values():
                    if parameter.kind in (
                        inspect.Parameter.VAR_POSITIONAL,
                        inspect.Parameter.VAR_KEYWORD,
                    ):
                        raise MissingDependencyError(
                            f"Unsupported parameter kind for '{func.__name__}': {parameter.kind}"
                        )
                    dependency_name = parameter.name
                    if dependency_name not in context and dependency_name not in self._functions:
                        if parameter.default is not inspect.Parameter.empty:
                            # Optional input: let the function use its own default.
                            continue
                        raise MissingDependencyError(
                            f"Function '{func.__name__}' requires missing dependency "
                            f"'{dependency_name}'"
                        )
                    kwargs[dependency_name] = resolve(dependency_name)
                value = func(**kwargs)
            finally:
                in_progress.pop()

            context[name] = value
            self.execution_log.append(name)
            return value

        results: Any = {target: resolve(target) for target in targets}
        for adapter in self._adapters:
            results = adapter(results)
        return results


class Builder:
    """Fluent builder that assembles a ``HamiltonDriver`` step by step."""

    def __init__(self) -> None:
        self._modules: list[ModuleType] = []
        self._config: Dict[str, Any] = {}
        self._adapters: list[Any] = []

    def with_modules(self, *modules: ModuleType) -> "Builder":
        """Add one or more modules whose functions become nodes."""
        self._modules.extend(modules)
        return self

    def with_config(self, config: Mapping[str, Any]) -> "Builder":
        """Merge seed values that apply to every execution."""
        self._config.update(config)
        return self

    def with_adapters(self, *adapters: Any) -> "Builder":
        """Add callables that post-process the result of every execution."""
        self._adapters.extend(adapters)
        return self

    def build(self) -> HamiltonDriver:
        """Create the driver.

        Raises:
            ValueError: If no modules were registered via ``with_modules``.
        """
        if not self._modules:
            raise ValueError(
                "At least one of the modules must be registered via with_modules() "
                "before calling build()"
            )
        return HamiltonDriver(self._modules, config=self._config, adapters=self._adapters)


__all__ = ["Builder", "DictResult", "HamiltonDriver", "MissingDependencyError"]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Deterministic mock client emulating an LLM completion API."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Mapping
6+
7+
8+
class MockLLMClient:
    """Return canned responses and expose a price used to estimate costs.

    Args:
        price_per_1k_tokens: Price stored on the instance for cost estimation.
        response_catalog: Maps an identifier to the response returned when the
            identifier occurs in a prompt. The reserved key ``__default__``
            holds the response used when no identifier matches.
    """

    # Reserved catalog key for the fallback response; never matched against prompts.
    _DEFAULT_KEY = "__default__"
    # Response used when nothing matches and the catalog has no ``__default__`` entry.
    _FALLBACK_RESPONSE = (
        "Document modular functions, validate with pytest and guard against prompt injection."
    )

    def __init__(self, price_per_1k_tokens: float, response_catalog: Mapping[str, str]):
        self.price_per_1k_tokens = price_per_1k_tokens
        self._response_catalog = dict(response_catalog)

    def complete(self, prompt: str) -> str:
        """Return the first response whose identifier is contained in the prompt.

        Matching is case-insensitive and follows the catalog's insertion order.
        The ``__default__`` sentinel and empty identifiers are skipped: the
        sentinel is not a real identifier, and an empty string is a substring
        of every prompt, so it would shadow all later entries.

        Args:
            prompt: Prompt text to search for identifiers.

        Returns:
            The matching response, else the ``__default__`` entry, else a
            built-in fallback sentence.
        """
        lower_prompt = prompt.lower()
        for key, response in self._response_catalog.items():
            if not key or key == self._DEFAULT_KEY:
                continue
            if key.lower() in lower_prompt:
                return response
        return self._response_catalog.get(self._DEFAULT_KEY, self._FALLBACK_RESPONSE)


__all__ = ["MockLLMClient"]

scripts/coding/tests/ai/examples/__init__.py

Whitespace-only changes.
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""Tests for the Hamilton-style LLM dataflow example."""
2+
3+
import pytest
4+
5+
from infrastructure.workspace.hamilton_llm import dataflow
6+
from infrastructure.workspace.hamilton_llm import driver as mini_driver
7+
from infrastructure.workspace.hamilton_llm.driver import MissingDependencyError
8+
from infrastructure.workspace.hamilton_llm.llm_client import MockLLMClient
9+
10+
11+
def test_pace_of_development_metadata_matches_expected_sequence():
    """Check both stage sequences exposed by the module against the expected order."""
    # The two sequences share everything except the fourth stage.
    shared_prefix = ["Idea & Data/Resources", "Design", "Development/Prototype"]
    shared_suffix = ["Getting to Production", "Operations", "Maintenance & Business Value"]
    expected = {
        "traditional_ml": [*shared_prefix, "Model Development", *shared_suffix],
        "llm_apps": [*shared_prefix, "Prompt / Model Development", *shared_suffix],
    }

    for style, stages in expected.items():
        assert dataflow.PACE_OF_DEVELOPMENT[style] == stages
31+
32+
33+
def test_hamilton_builder_executes_llm_business_flow():
    """Run the dataflow end to end and check the business package, cost and node order."""
    driver = (
        mini_driver.Builder()
        .with_modules(dataflow)
        .with_config({"pricing_policy": {"price_per_1k_tokens": 0.4, "safety_multiplier": 1.15}})
        .with_adapters(mini_driver.DictResult())
        .build()
    )
    mock_client = MockLLMClient(
        price_per_1k_tokens=0.4,
        response_catalog={
            "Data → Prompt → LLM → $": "Use Hamilton declarative functions to keep prompts versioned and guarded against injection."
        },
    )

    inputs = {
        "idea": "AI copilots for compliance analysts",
        "domain_data": {
            "data": "archived compliance tickets",
            "ui": "browser extension",
            "business_process": "regulatory audit",
        },
        "edge_cases": [
            "Input state space",
            "Guard against prompt injection",
            "Domain expertise",
            "Evaluation",
            "Cost/GPUs",
        ],
        "llm_client": mock_client,
    }

    result = driver.execute(["business_value", "cost_estimate"], inputs)

    assert result["business_value"]["llm_plan"].startswith("Use Hamilton declarative functions")
    assert result["business_value"]["next_step"] == "Prototype with guarded prompts"
    # approx() wraps the expected value so the tolerance is derived from the
    # reference number rather than from the value under test.
    assert result["cost_estimate"] == pytest.approx(0.0552, rel=1e-3)

    executed = driver.execution_log
    assert executed[-1] == "cost_estimate"
    assert "llm_response" in executed
    assert executed.index("business_value") < executed.index("cost_estimate")
76+
77+
78+
def test_driver_reports_missing_inputs():
    """Requesting the cost without a pricing policy must name the missing input."""
    incomplete_inputs = {
        "idea": "Guardrails demo",
        "domain_data": {"data": "logs", "ui": "cli", "business_process": "ops"},
        "edge_cases": ["Injection"],
        "llm_client": MockLLMClient(price_per_1k_tokens=0.5, response_catalog={}),
    }
    driver = mini_driver.Builder().with_modules(dataflow).build()

    with pytest.raises(MissingDependencyError) as raised:
        driver.execute(["cost_estimate"], incomplete_inputs)

    message = str(raised.value)
    assert "pricing_policy" in message
94+
95+
96+
def test_builder_requires_modules_before_building():
    """Building without any registered module must fail with a ValueError about modules."""
    empty_builder = mini_driver.Builder()

    with pytest.raises(ValueError) as raised:
        empty_builder.build()

    message = str(raised.value).lower()
    assert "modules" in message
101+
102+
103+
def test_custom_adapter_transforms_execution_result():
    """A user-supplied adapter replaces the result with whatever it returns."""

    class SortedKeys:
        """Adapter that reduces the results to a sorted tuple of their keys."""

        def __call__(self, results):
            ordered = sorted(results)
            return tuple(ordered)

    builder = mini_driver.Builder().with_modules(dataflow).with_adapters(SortedKeys())
    driver = builder.build()

    outcome = driver.execute(["pace_of_development"], {})
    assert outcome == ("pace_of_development",)

0 commit comments

Comments
 (0)