Skip to content

Commit d6c1243

Browse files
authored
Merge branch 'develop' into feature/implement-hamilton-framework-with-sdlc-11-10-26
2 parents ce3b53b + e164755 commit d6c1243

4 files changed

Lines changed: 247 additions & 0 deletions

File tree

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
"""Hamilton-inspired LLM pipeline example for the IACT project."""
2+
3+
from . import dataflow
4+
from .driver import HamiltonDriver, MissingDependencyError
5+
from .llm_client import MockLLMClient
6+
7+
__all__ = [
8+
"dataflow",
9+
"HamiltonDriver",
10+
"MissingDependencyError",
11+
"MockLLMClient",
12+
]
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
"""Declarative dataflow modeling the Data → Prompt → LLM → $ pipeline.
2+
3+
The module captures the pace differences between traditional ML applications and
4+
LLM applications, highlighting that both require strong software engineering
5+
skills. Each function follows the Hamilton paradigm: the function name is the
6+
output and its arguments are the explicit dependencies.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from typing import Any, Dict, List
12+
13+
from .llm_client import MockLLMClient
14+
15+
# Ordered lifecycle phases for each application style. The two tracks differ
# only in the fourth phase ("Model Development" vs "Prompt / Model Development").
PACE_OF_DEVELOPMENT: Dict[str, List[str]] = {
    "traditional_ml": [
        "Idea & Data/Resources",
        "Design",
        "Development/Prototype",
        "Model Development",
        "Getting to Production",
        "Operations",
        "Maintenance & Business Value",
    ],
    "llm_apps": [
        "Idea & Data/Resources",
        "Design",
        "Development/Prototype",
        "Prompt / Model Development",
        "Getting to Production",
        "Operations",
        "Maintenance & Business Value",
    ],
}

# Human-readable label for the pipeline modeled by this module.
DATAFLOW_LABEL = "Data → Prompt → LLM → $"


def pace_of_development() -> Dict[str, List[str]]:
    """Return the canonical phase ordering for traditional ML and LLM apps.

    Returns:
        A new dict mapping each track name to a new list of its phases.

    A fresh copy is returned on every call because the result flows into
    downstream nodes and, through them, back to callers; handing out the
    module-level constant itself would let any consumer mutate it and silently
    change the output of later executions.
    """

    return {track: list(phases) for track, phases in PACE_OF_DEVELOPMENT.items()}
43+
44+
45+
def prompt_template(
    idea: str,
    domain_data: Dict[str, str],
    pace_of_development: Dict[str, List[str]],
) -> str:
    """Render the base prompt contrasting both development paces.

    Args:
        idea: Goal to deliver; interpolated into the final line.
        domain_data: Must provide the keys ``business_process``, ``ui`` and
            ``data``.
        pace_of_development: Must provide the keys ``traditional_ml`` and
            ``llm_apps``, each an ordered list of phase names.

    Returns:
        A multi-line prompt in which every line ends with a newline.

    Raises:
        KeyError: If any of the required keys is missing.
    """

    arrow = " → "
    ml_track = arrow.join(pace_of_development["traditional_ml"])
    llm_track = arrow.join(pace_of_development["llm_apps"])

    process = domain_data["business_process"]
    ui = domain_data["ui"]
    assets = domain_data["data"]

    sections = [
        "You are designing a Hamilton micro-orchestration experiment.\n",
        f"Traditional ML pace: {ml_track}.\n",
        f"LLM app pace: {llm_track}.\n",
        "Explain how strong SWE practices (testing, modularity, reuse, portability)\n",
        "keep the system resilient while iterating quickly.\n",
        f"Business domain: {process} with UI {ui}.\n",
        f"Primary data assets: {assets}.\n",
        f"Goal: deliver {idea} using Hamilton declarative functions.\n",
    ]
    return "".join(sections)
64+
65+
66+
def llm_prompt(prompt_template: str, edge_cases: List[str]) -> str:
    """Append edge-case and guardrail instructions to the base template.

    Args:
        prompt_template: Base prompt; emitted first, unchanged.
        edge_cases: Edge cases to list, joined with ``", "``.

    Returns:
        The template followed by the edge-case sentence and the closing
        instruction about the pipeline, prompt injection and GPU cost.
    """

    edge_case_list = ", ".join(edge_cases)
    header = f"{prompt_template}"
    guardrail_request = (
        f"Consider the following edge cases explicitly: {edge_case_list}.\n"
    )
    closing = (
        "Detail the pipeline as Data → Prompt → LLM → $, highlighting how guardrails\n"
        "prevent prompt injection and balance evaluation with GPU cost awareness."
    )
    return header + guardrail_request + closing
77+
78+
79+
def llm_response(llm_prompt: str, llm_client: MockLLMClient) -> str:
    """Send the assembled prompt to the client and return its completion.

    Args:
        llm_prompt: Fully assembled prompt text.
        llm_client: Client whose ``complete`` method is called with the prompt.

    Returns:
        Whatever ``llm_client.complete`` returns for the prompt.
    """

    completion = llm_client.complete(llm_prompt)
    return completion
83+
84+
85+
def prompt_token_estimate(llm_prompt: str, edge_cases: List[str]) -> int:
    """Estimate the prompt's token count, with a floor of 120.

    The estimate is the whitespace-separated word count scaled by 0.75 and
    rounded, plus 3 tokens per edge case; results below 120 are raised to 120.

    Args:
        llm_prompt: Prompt text whose words are counted.
        edge_cases: Edge cases; each one adds a fixed allowance of 3.

    Returns:
        The estimated token count, never less than 120.
    """

    # NOTE(review): the 0.75 factor scales the word count *down*; confirm this
    # is the intended words-to-tokens heuristic before relying on it for cost.
    word_count = len(llm_prompt.split())
    estimate = round(word_count * 0.75) + 3 * len(edge_cases)
    return estimate if estimate > 120 else 120
92+
93+
94+
def business_value(
    llm_response: str,
    pace_of_development: Dict[str, List[str]],
) -> Dict[str, Any]:
    """Bundle the LLM plan with the development-pace context.

    Args:
        llm_response: Text returned by the LLM client.
        pace_of_development: Phase ordering per track; stored as-is.

    Returns:
        A dict with the keys ``llm_plan``, ``pace`` and ``next_step``, where
        ``next_step`` is a fixed recommendation string.
    """

    package: Dict[str, Any] = {}
    package["llm_plan"] = llm_response
    package["pace"] = pace_of_development
    package["next_step"] = "Prototype with guarded prompts"
    return package
105+
106+
107+
def cost_estimate(
    prompt_token_estimate: int,
    pricing_policy: Dict[str, float],
) -> float:
    """Compute the expected cost from a per-1K-token price and safety factor.

    Args:
        prompt_token_estimate: Estimated number of tokens.
        pricing_policy: Must contain ``price_per_1k_tokens``; may contain
            ``safety_multiplier``, which defaults to 1.0 when absent.

    Returns:
        ``tokens / 1000 * price * safety`` rounded to 6 decimal places.

    Raises:
        KeyError: If ``price_per_1k_tokens`` is missing from the policy.
    """

    unit_price = pricing_policy["price_per_1k_tokens"]
    multiplier = pricing_policy.get("safety_multiplier", 1.0)
    thousands_of_tokens = prompt_token_estimate / 1000
    raw_cost = thousands_of_tokens * unit_price * multiplier
    return round(raw_cost, 6)
116+
117+
118+
__all__ = [
119+
"PACE_OF_DEVELOPMENT",
120+
"DATAFLOW_LABEL",
121+
"pace_of_development",
122+
"prompt_template",
123+
"llm_prompt",
124+
"llm_response",
125+
"prompt_token_estimate",
126+
"business_value",
127+
"cost_estimate",
128+
]
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""Minimal Hamilton-like driver for executing declarative dataflows.
2+
3+
The real Hamilton framework provides a rich micro-orchestration engine. For the
4+
purposes of the repository we build a tiny subset that resolves dependencies by
5+
function name and executes only the nodes required to produce requested targets.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import inspect
11+
from types import ModuleType
12+
from typing import Any, Dict, Iterable, Mapping, Sequence
13+
14+
15+
class MissingDependencyError(RuntimeError):
    """Raised when a dependency required by a node cannot be produced.

    Covers three situations: a name that is neither an input nor a registered
    function, a node whose signature uses ``*args``/``**kwargs``, and a
    circular dependency between nodes.
    """


class HamiltonDriver:
    """Execute declarative functions registered from one or more modules.

    Functions are registered by name and resolved lazily. Inputs provided via
    ``execute`` act as seed values and take precedence over a registered
    function of the same name. Each execution starts from a fresh set of
    computed values and produces a log of executed nodes so tests can assert
    on evaluation order.
    """

    def __init__(self, modules: Iterable[ModuleType]):
        """Register every plain function found in ``modules``.

        Args:
            modules: Modules whose functions become nodes. When two modules
                define the same name, the later module wins.
        """
        self._functions: Dict[str, Any] = {}
        self.execution_log: list[str] = []
        for module in modules:
            self._register_module(module)

    def _register_module(self, module: ModuleType) -> None:
        """Add each function in the module's namespace, keyed by its name."""
        for name, candidate in vars(module).items():
            if inspect.isfunction(candidate):
                self._functions[name] = candidate

    def execute(self, targets: Sequence[str], inputs: Mapping[str, Any]) -> Dict[str, Any]:
        """Compute the requested targets, running only the nodes they need.

        Args:
            targets: Names of the nodes (or inputs) to return.
            inputs: Seed values available to every node by name.

        Returns:
            A dict mapping each target name to its value.

        Raises:
            MissingDependencyError: If a required name has no input and no
                function, a node uses ``*args``/``**kwargs``, or the nodes
                form a circular dependency. Errors raised while resolving a
                dependency are re-raised with the requesting function named
                and the original error attached as ``__cause__``.
        """
        # Seed values plus every node computed so far in this execution.
        context: Dict[str, Any] = dict(inputs)
        self.execution_log = []
        # Nodes currently being resolved, outermost first; used to detect
        # cycles before they turn into unbounded recursion.
        in_progress: list[str] = []

        def resolve(name: str) -> Any:
            if name in context:
                return context[name]

            func = self._functions.get(name)
            if func is None:
                raise MissingDependencyError(f"No data or function available for '{name}'")

            if name in in_progress:
                cycle = in_progress[in_progress.index(name):] + [name]
                raise MissingDependencyError(
                    f"Circular dependency detected: {' -> '.join(cycle)}"
                )

            in_progress.append(name)
            try:
                kwargs: Dict[str, Any] = {}
                for parameter in inspect.signature(func).parameters.values():
                    if parameter.kind in (
                        inspect.Parameter.VAR_POSITIONAL,
                        inspect.Parameter.VAR_KEYWORD,
                    ):
                        raise MissingDependencyError(
                            f"Unsupported parameter kind for '{func.__name__}': {parameter.kind}"
                        )
                    dependency_name = parameter.name

                    # A parameter with a default is optional: when nothing can
                    # supply it, leave it out so the function uses its default.
                    if (
                        parameter.default is not inspect.Parameter.empty
                        and dependency_name not in context
                        and dependency_name not in self._functions
                    ):
                        continue

                    try:
                        kwargs[dependency_name] = resolve(dependency_name)
                    except MissingDependencyError as exc:  # pragma: no cover - rephrase message
                        raise MissingDependencyError(
                            f"Function '{func.__name__}' requires missing dependency '{dependency_name}'"
                        ) from exc

                value = func(**kwargs)
            finally:
                in_progress.pop()

            context[name] = value
            self.execution_log.append(name)
            return value

        return {target: resolve(target) for target in targets}
77+
78+
79+
__all__ = ["HamiltonDriver", "MissingDependencyError"]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""Deterministic mock client emulating an LLM completion API."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Mapping
6+
7+
8+
class MockLLMClient:
    """Serve canned responses and expose a price for cost estimation."""

    def __init__(self, price_per_1k_tokens: float, response_catalog: Mapping[str, str]):
        """Store the price and a private copy of the response catalog.

        Args:
            price_per_1k_tokens: Price exposed as a public attribute.
            response_catalog: Maps a trigger string to the response returned
                when that trigger occurs in a prompt. The key ``__default__``
                supplies the fallback response.
        """
        self.price_per_1k_tokens = price_per_1k_tokens
        self._response_catalog = dict(response_catalog)

    def complete(self, prompt: str) -> str:
        """Return the first response whose trigger is contained in the prompt.

        Matching is case-insensitive and follows the catalog's iteration
        order. When no trigger matches, the ``__default__`` entry is returned
        if present, otherwise a built-in fallback sentence.
        """

        haystack = prompt.lower()
        matches = (
            reply
            for trigger, reply in self._response_catalog.items()
            if trigger.lower() in haystack
        )

        # A private sentinel distinguishes "no match" from any stored value.
        no_match = object()
        first_match = next(matches, no_match)
        if first_match is not no_match:
            return first_match

        return self._response_catalog.get(
            "__default__",
            "Document modular functions, validate with pytest and guard against prompt injection.",
        )
26+
27+
28+
__all__ = ["MockLLMClient"]

0 commit comments

Comments
 (0)