Skip to content

Commit 36c727f

Browse files
fix: resolve P0/P1 bugs identified by code reviewers
- Fix P0 bug: agents_data.items() crash on list-format YAML agents - Add list-to-dict conversion in converter.py for common PraisonAI format - Fix P1 bug: subprocess pipe deadlock in up.py - Change stdout/stderr from PIPE to DEVNULL to prevent buffer overflow - Fix P1 bug: resource leak in observability setup - Make _setup_observability() idempotent to prevent multiple LangfuseSink creation - Fix P1 bug: double shutdown in signal handler - Simplify signal handler to raise KeyboardInterrupt, single shutdown in finally - Fix P2: wrong health endpoint (/health -> /api/v1/health) - Fix P2: duplicate client code - remove LangflowAPIClient, use LangflowClient - Fix CodeRabbit nitpicks: f-string, variable shadowing All fixes maintain backward compatibility. Core functionality tested. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
1 parent e5583c1 commit 36c727f

5 files changed

Lines changed: 50 additions & 94 deletions

File tree

src/praisonai/praisonai/cli/commands/flow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ def flow_import(
210210
health = client.health_check()
211211
if health["status"] != "healthy":
212212
console.print(f"[red]Error: Langflow server not accessible at {langflow_url}[/red]")
213-
console.print(f"[yellow]Make sure Langflow is running: praisonai flow[/yellow]")
213+
console.print("[yellow]Make sure Langflow is running: praisonai flow[/yellow]")
214214
raise typer.Abort()
215215

216216
# Upload flow

src/praisonai/praisonai/cli/commands/up.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ def add_service(self, cmd: List[str], name: str, env: Optional[dict] = None) ->
3333
proc = subprocess.Popen(
3434
cmd,
3535
env=env or os.environ.copy(),
36-
stdout=subprocess.PIPE,
37-
stderr=subprocess.PIPE,
36+
stdout=subprocess.DEVNULL,
37+
stderr=subprocess.DEVNULL,
3838
)
3939
self.services.append(proc)
4040
self.console.print(f"[green]✅ {name} started (PID: {proc.pid})[/green]")
@@ -57,7 +57,7 @@ def wait_for_service(self, url: str, service_name: str, timeout: int = 60) -> bo
5757
start_time = time.time()
5858
while time.time() - start_time < timeout:
5959
try:
60-
response = requests.get(f"{url}/health", timeout=5)
60+
response = requests.get(f"{url}/api/v1/health", timeout=5)
6161
if response.status_code == 200:
6262
self.console.print(f"[green]✅ {service_name} is ready![/green]")
6363
return True
@@ -263,8 +263,7 @@ def up_start(
263263

264264
# Setup signal handler for graceful shutdown
265265
def signal_handler(signum, frame):
266-
manager.shutdown_all()
267-
raise typer.Abort()
266+
raise KeyboardInterrupt() # Let the try/except/finally handle cleanup
268267

269268
signal.signal(signal.SIGINT, signal_handler)
270269
signal.signal(signal.SIGTERM, signal_handler)

src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agent.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,14 +424,25 @@ def build_response(self) -> Message:
424424

425425
def _setup_observability(self) -> None:
426426
"""Auto-configure observability from environment variables."""
427+
# Skip if already set up
428+
if getattr(self, "_observability_setup_done", False):
429+
return
430+
427431
import os
428432
observe = os.environ.get("PRAISONAI_OBSERVE", "")
429433
if observe == "langfuse":
430434
try:
431435
from praisonai.observability import LangfuseSink
432436
from praisonaiagents.trace.context_events import (
433-
ContextTraceEmitter, set_context_emitter
437+
ContextTraceEmitter, set_context_emitter, get_context_emitter
434438
)
439+
440+
# Reuse existing emitter if already configured
441+
existing = get_context_emitter()
442+
if existing and existing.enabled:
443+
self._observability_setup_done = True
444+
return
445+
435446
# Add flow metadata for trace correlation
436447
metadata = {
437448
"praisonai_source": "langflow",
@@ -442,6 +453,7 @@ def _setup_observability(self) -> None:
442453
sink = LangfuseSink(metadata=metadata)
443454
emitter = ContextTraceEmitter(sink=sink, enabled=True)
444455
set_context_emitter(emitter)
456+
self._observability_setup_done = True
445457
except ImportError:
446458
pass # Langfuse not installed, gracefully degrade
447459

src/praisonai/praisonai/flow/components/PraisonAI/praisonai_agents.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,14 +328,25 @@ def build_response(self) -> Message:
328328

329329
def _setup_observability(self) -> None:
330330
"""Auto-configure observability from environment variables."""
331+
# Skip if already set up
332+
if getattr(self, "_observability_setup_done", False):
333+
return
334+
331335
import os
332336
observe = os.environ.get("PRAISONAI_OBSERVE", "")
333337
if observe == "langfuse":
334338
try:
335339
from praisonai.observability import LangfuseSink
336340
from praisonaiagents.trace.context_events import (
337-
ContextTraceEmitter, set_context_emitter
341+
ContextTraceEmitter, set_context_emitter, get_context_emitter
338342
)
343+
344+
# Reuse existing emitter if already configured
345+
existing = get_context_emitter()
346+
if existing and existing.enabled:
347+
self._observability_setup_done = True
348+
return
349+
339350
# Add flow metadata for trace correlation
340351
metadata = {
341352
"praisonai_source": "langflow",
@@ -347,6 +358,7 @@ def _setup_observability(self) -> None:
347358
sink = LangfuseSink(metadata=metadata)
348359
emitter = ContextTraceEmitter(sink=sink, enabled=True)
349360
set_context_emitter(emitter)
361+
self._observability_setup_done = True
350362
except ImportError:
351363
pass # Langfuse not installed, gracefully degrade
352364

src/praisonai/praisonai/flow/converter.py

Lines changed: 19 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,17 @@ def convert(self, yaml_path: str) -> Dict[str, Any]:
5050
workflow = parser.parse_file(yaml_path)
5151

5252
# Extract metadata from parsed YAML for direct access
53-
yaml_path = Path(yaml_path)
54-
with open(yaml_path, 'r') as f:
53+
yaml_file = Path(yaml_path)
54+
with open(yaml_file, 'r') as f:
5555
import yaml
5656
raw_yaml = yaml.safe_load(f)
5757

5858
# Convert workflow to Langflow nodes
5959
nodes, edges = self._convert_workflow_to_nodes(workflow, raw_yaml)
6060

6161
# Generate flow metadata
62-
flow_name = workflow.name or yaml_path.stem
63-
flow_description = getattr(workflow, 'description', f'Converted from {yaml_path.name}')
62+
flow_name = workflow.name or yaml_file.stem
63+
flow_description = getattr(workflow, 'description', f'Converted from {yaml_file.name}')
6464

6565
# Build Langflow JSON structure
6666
langflow_json = {
@@ -96,6 +96,14 @@ def _convert_workflow_to_nodes(self, workflow, raw_yaml: Dict) -> Tuple[List[Dic
9696
# Convert roles to agents format
9797
agents_data = self._convert_roles_to_agents(raw_yaml['roles'])
9898

99+
# Convert agents_data to dict format if it's a list (common PraisonAI format)
100+
if isinstance(agents_data, list):
101+
agents_dict = {}
102+
for i, agent in enumerate(agents_data):
103+
agent_name = agent.get('name', f'agent_{i}')
104+
agents_dict[agent_name] = agent
105+
agents_data = agents_dict
106+
99107
# Create agent nodes
100108
agent_nodes = []
101109
for agent_id, agent_config in agents_data.items():
@@ -469,85 +477,6 @@ def _is_non_default_value(self, field: str, value: Any) -> bool:
469477
return bool(value) # Include non-empty values for other fields
470478

471479

472-
class LangflowAPIClient:
473-
"""Client for Langflow REST API operations."""
474-
475-
def __init__(self, base_url: str = "http://localhost:7860"):
476-
"""Initialize client with Langflow server URL."""
477-
self.base_url = base_url.rstrip("/")
478-
479-
if not requests:
480-
raise ImportError("requests is required for Langflow API operations. Install with: pip install requests")
481-
482-
def upload_flow(self, flow_json: Dict[str, Any]) -> Dict[str, Any]:
483-
"""
484-
Upload a flow to Langflow.
485-
486-
Args:
487-
flow_json: Langflow JSON flow definition
488-
489-
Returns:
490-
API response with flow ID
491-
"""
492-
url = f"{self.base_url}/api/v1/flows/upload/"
493-
494-
try:
495-
response = requests.post(url, json=flow_json, timeout=30)
496-
response.raise_for_status()
497-
return response.json()
498-
except requests.RequestException as e:
499-
raise ConnectionError(f"Failed to upload flow to Langflow: {e}")
500-
501-
def download_flow(self, flow_id: str) -> Dict[str, Any]:
502-
"""
503-
Download a flow from Langflow.
504-
505-
Args:
506-
flow_id: UUID of the flow to download
507-
508-
Returns:
509-
Langflow JSON flow definition
510-
"""
511-
url = f"{self.base_url}/api/v1/flows/{flow_id}"
512-
513-
try:
514-
response = requests.get(url, timeout=30)
515-
response.raise_for_status()
516-
return response.json()
517-
except requests.RequestException as e:
518-
raise ConnectionError(f"Failed to download flow from Langflow: {e}")
519-
520-
def list_flows(self) -> List[Dict[str, Any]]:
521-
"""
522-
List all flows in Langflow.
523-
524-
Returns:
525-
List of flow metadata
526-
"""
527-
url = f"{self.base_url}/api/v1/flows/"
528-
529-
try:
530-
response = requests.get(url, timeout=30)
531-
response.raise_for_status()
532-
return response.json().get("flows", [])
533-
except requests.RequestException as e:
534-
raise ConnectionError(f"Failed to list flows from Langflow: {e}")
535-
536-
def health_check(self) -> bool:
537-
"""
538-
Check if Langflow server is running.
539-
540-
Returns:
541-
True if server is healthy
542-
"""
543-
url = f"{self.base_url}/api/v1/health"
544-
545-
try:
546-
response = requests.get(url, timeout=10)
547-
return response.status_code == 200
548-
except requests.RequestException:
549-
return False
550-
551480

552481
def yaml_to_langflow_json(yaml_path: str) -> Dict[str, Any]:
553482
"""
@@ -588,11 +517,13 @@ def export_yaml_to_langflow(yaml_path: str, langflow_url: str = "http://localhos
588517
Returns:
589518
Flow ID of uploaded flow
590519
"""
520+
from praisonai.flow.client import LangflowClient
521+
591522
# Convert YAML to Langflow JSON
592523
flow_json = yaml_to_langflow_json(yaml_path)
593524

594525
# Upload to Langflow
595-
client = LangflowAPIClient(langflow_url)
526+
client = LangflowClient(langflow_url)
596527
response = client.upload_flow(flow_json)
597528

598529
return response.get("id", response.get("flow_id", ""))
@@ -611,9 +542,11 @@ def import_langflow_to_yaml(flow_id: str, output_path: Optional[str] = None,
611542
Returns:
612543
YAML string (or path if saved to file)
613544
"""
545+
from praisonai.flow.client import LangflowClient
546+
614547
# Download from Langflow
615-
client = LangflowAPIClient(langflow_url)
616-
flow_json = client.download_flow(flow_id)
548+
client = LangflowClient(langflow_url)
549+
flow_json = client.get_flow(flow_id)
617550

618551
# Convert to YAML
619552
yaml_content = langflow_json_to_yaml(flow_json)

0 commit comments

Comments
 (0)