Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 144 additions & 25 deletions app/core/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import os
import shutil
import argparse
from typing import Optional
import configparser
from typing import Optional, List
from pathlib import Path

from dotenv import load_dotenv
from langsmith import Client
from pathlib import Path
from langchain_community.chat_models import ChatOpenAI, ChatLiteLLM

from app.core.workflow.langraph_workflow import create_workflow, process_workflow
from app.core.session import create_user_session, initialize_session_context
from app.core.utils import IntRange, setup_logger
import configparser
from langchain_community.chat_models import ChatOpenAI, ChatLiteLLM
from app.core.questions import standard_questions


Expand All @@ -32,6 +36,14 @@
}


class SessionFilePreparationError(ValueError):
"""Raised when CLI input files cannot be staged into the session directory."""

def __init__(self, source_path: Path, message: str):
super().__init__(message)
self.source_path = source_path


def get_api_key(provider: str) -> Optional[str]:
"""
Get API key for specified provider from environment variables.
Expand Down Expand Up @@ -198,52 +210,159 @@ def langsmith_setup() -> Optional[Client]:
return None


def _prepare_session_files(session_id: str, file_paths: List[str]) -> Path:
"""
Copy user-supplied local files into the session's input directory so that
the FILE_ANALYZER tool can discover them at runtime.

Args:
session_id: Active session identifier.
file_paths: List of local file paths provided via the CLI.

Returns:
Path to the session input directory.

Raises:
SessionFilePreparationError: If any supplied path cannot be staged safely.
"""
input_dir = create_user_session(session_id, input_dir=True)
staged_destinations: dict[Path, Path] = {}

for raw_path in file_paths:
src = Path(raw_path).expanduser().resolve(strict=False)
if not src.exists():
raise SessionFilePreparationError(src, f"File not found: {src}")
if not src.is_file():
raise SessionFilePreparationError(src, f"Input path is not a file: {src}")

dest = input_dir / src.name
previous_src = staged_destinations.get(dest)
if previous_src is not None:
if previous_src == src:
raise SessionFilePreparationError(
src,
f"Input file was provided more than once: {src}",
)
raise SessionFilePreparationError(
src,
(
f"Cannot stage '{src}' because it would overwrite '{previous_src}' in "
f"the session input directory. Rename one of the files or choose a different path."
),
)

if src == dest.resolve(strict=False):
raise SessionFilePreparationError(
src,
f"Input file is already staged in the session directory: {src}",
)

if dest.exists():
raise SessionFilePreparationError(
src,
f"Cannot stage '{src}' because destination '{dest}' already exists.",
)

try:
shutil.copy2(str(src), str(dest))
except (shutil.SameFileError, OSError) as exc:
raise SessionFilePreparationError(
src,
f"Failed to stage '{src}' into '{dest}': {exc}",
) from exc

staged_destinations[dest] = src
logger.info(f"Copied '{src}' -> '{dest}'")
Comment thread
qodo-code-review[bot] marked this conversation as resolved.
Comment thread
qodo-code-review[bot] marked this conversation as resolved.

return input_dir


def main():
"""Main function to run the workflow."""
# Define command line arguments

parser = argparse.ArgumentParser(description="Process a workflow with a predefined question number.")
parser.add_argument('-q', '--question', type=int, choices=IntRange(1, len(standard_questions)),
help=f"Choose a standard question number from 1 to {len(standard_questions)}.")
parser.add_argument('-c', '--custom', type=str,
help="Provide a custom question.")
parser.add_argument('-e', '--evaluation', action='store_true',
help="Enable evaluation mode")
parser.add_argument('--api-key', type=str,
help="OpenAI API key (optional, defaults to environment variable)")
parser.add_argument('--endpoint', type=str,
help="Knowledge graph endpoint URL (optional)")
"""
CLI entry-point for running the MetaboT workflow.

Usage examples:
python -m app.core.main -q 1
python -m app.core.main -c "Describe my dataset" -f data.csv
python -m app.core.main -c "Compare files" -f file1.csv file2.tsv
"""
parser = argparse.ArgumentParser(
description="Process a workflow with a predefined question number."
)
parser.add_argument(
'-q', '--question', type=int,
choices=IntRange(1, len(standard_questions)),
help=f"Choose a standard question number from 1 to {len(standard_questions)}.",
)
parser.add_argument(
'-c', '--custom', type=str,
help="Provide a custom question.",
)
parser.add_argument(
'-f', '--file', type=str, nargs='+',
help="One or more local file paths to make available for the FILE_ANALYZER tool.",
)
parser.add_argument(
'-e', '--evaluation', action='store_true',
help="Enable evaluation mode.",
)
parser.add_argument(
'--api-key', type=str,
help="OpenAI API key (optional, defaults to environment variable).",
)
parser.add_argument(
'--endpoint', type=str,
help="Knowledge graph endpoint URL (optional).",
)

args = parser.parse_args()

# Resolve the question
if args.question:
question = standard_questions[args.question - 1]
elif args.custom:
question = args.custom
else:
print("You must provide either a standard question number or a custom question.")
print("You must provide either a standard question number (-q) or a custom question (-c).")
return

# Create a user session (mirrors the Streamlit session lifecycle) and
# reconfigure the logger so subsequent CLI logs land in the session file.
session_id = create_user_session()
initialize_session_context(session_id)
global logger
logger = setup_logger(__name__)

# Initialize LangSmith if available
langsmith_client = langsmith_setup()
langsmith_setup()

# Get endpoint URL from arguments or environment
# Resolve endpoint URL
endpoint_url = (
args.endpoint
or os.environ.get("KG_ENDPOINT_URL")
or "https://enpkg.commons-lab.org/graphdb/repositories/ENPKG"
)
models = llm_creation()

# Initialize language models
models = llm_creation(api_key=args.api_key)
Comment on lines +346 to +347
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Wrong api key reused 🐞 Bug ≡ Correctness

main() now forwards --api-key into llm_creation(), but llm_creation keeps a single api_key value and
applies it to every configured provider section. In a multi-provider params.ini (OpenAI + deepseek +
ovh), deepseek/ovh models will incorrectly receive the OpenAI key (or the first resolved key),
causing authentication failures.
Agent Prompt
### Issue description
`llm_creation()` reuses a single `api_key` value across all config sections. After this PR, `main()` passes `--api-key` into `llm_creation()`, so deepseek/ovh models can receive the OpenAI key and fail auth.

### Issue Context
`app/config/params.ini` contains OpenAI sections plus `deepseek_*` and `ovh_*` sections. Each provider needs its own key (from provider-specific env vars), while the CLI `--api-key` is described as an OpenAI key.

### Fix Focus Areas
- app/core/main.py[109-166]
- app/core/main.py[346-348]

### Suggested fix approach
- In `llm_creation()`, do **not** mutate/reuse the incoming `api_key` parameter across sections.
- Introduce a per-section variable, e.g. `section_api_key`:
  - If `provider == "openai"` and a CLI `api_key` is provided, use it.
  - Otherwise, resolve via `get_api_key(provider)` for that section.
- Use `section_api_key` when setting `model_params[...]`.
- (Optional) If you want CLI support for deepseek/ovh keys too, add explicit CLI flags rather than overloading `--api-key`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


# Stage user-provided files into the session's input directory
if args.file:
try:
_prepare_session_files(session_id, args.file)
except SessionFilePreparationError as exc:
logger.error(str(exc))
print(f"Error: {exc}")
return

try:
# Create and process workflow
workflow = create_workflow(
models=models,
session_id=session_id,
endpoint_url=endpoint_url,
evaluation=False,
api_key=args.api_key
api_key=args.api_key,
)

process_workflow(workflow, question)

except Exception as e:
Expand Down
172 changes: 172 additions & 0 deletions app/core/tests/test_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from __future__ import annotations

import sys
from pathlib import Path

import app.core.main as main_module


class DummyLogger:
def __init__(self):
self.info_messages = []
self.error_messages = []

def info(self, message, *args):
if args:
message = message % args
self.info_messages.append(message)

def error(self, message, *args):
if args:
message = message % args
self.error_messages.append(message)


def test_prepare_session_files_copies_valid_file(tmp_path, monkeypatch):
input_dir = tmp_path / "input_files"
input_dir.mkdir()
source_file = tmp_path / "source.csv"
source_file.write_text("a,b\n1,2\n", encoding="utf-8")
dummy_logger = DummyLogger()
input_dir_path = input_dir

monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path)
monkeypatch.setattr(main_module, "logger", dummy_logger)

staged_dir = main_module._prepare_session_files("session-id", [str(source_file)])

copied_file = input_dir / source_file.name
assert staged_dir == input_dir
assert copied_file.read_text(encoding="utf-8") == source_file.read_text(encoding="utf-8")


def test_prepare_session_files_rejects_directory(tmp_path, monkeypatch):
input_dir = tmp_path / "input_files"
input_dir.mkdir()
source_dir = tmp_path / "source_dir"
source_dir.mkdir()
input_dir_path = input_dir

monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path)

try:
main_module._prepare_session_files("session-id", [str(source_dir)])
assert False, "Expected SessionFilePreparationError"
except main_module.SessionFilePreparationError as exc:
assert "not a file" in str(exc)


def test_prepare_session_files_rejects_colliding_basenames(tmp_path, monkeypatch):
input_dir = tmp_path / "input_files"
input_dir.mkdir()
first_dir = tmp_path / "first"
second_dir = tmp_path / "second"
first_dir.mkdir()
second_dir.mkdir()
first_file = first_dir / "results.csv"
second_file = second_dir / "results.csv"
first_file.write_text("a,b\n1,2\n", encoding="utf-8")
second_file.write_text("a,b\n3,4\n", encoding="utf-8")
dummy_logger = DummyLogger()
input_dir_path = input_dir

monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path)
monkeypatch.setattr(main_module, "logger", dummy_logger)

try:
main_module._prepare_session_files("session-id", [str(first_file), str(second_file)])
assert False, "Expected SessionFilePreparationError"
except main_module.SessionFilePreparationError as exc:
assert "would overwrite" in str(exc)


def test_prepare_session_files_rejects_same_file_destination(tmp_path, monkeypatch):
input_dir = tmp_path / "input_files"
input_dir.mkdir()
staged_file = input_dir / "already_staged.csv"
staged_file.write_text("a,b\n1,2\n", encoding="utf-8")
input_dir_path = input_dir

monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path)

try:
main_module._prepare_session_files("session-id", [str(staged_file)])
assert False, "Expected SessionFilePreparationError"
except main_module.SessionFilePreparationError as exc:
assert "already staged" in str(exc)


def test_main_passes_cli_api_key_and_reconfigures_logger(monkeypatch):
original_argv = sys.argv[:]
state = {"session_initialized": False, "setup_logger_states": []}
captured = {}

def fake_setup_logger(name):
state["setup_logger_states"].append(state["session_initialized"])
return DummyLogger()

def fake_initialize_session_context(session_id):
state["session_initialized"] = True

def fake_llm_creation(api_key=None, params_file=None):
captured["llm_api_key"] = api_key
return {"llm_o": object()}

def fake_create_workflow(models, session_id=None, endpoint_url=None, evaluation=False, api_key=None):
captured["workflow_api_key"] = api_key
captured["session_id"] = session_id
return "workflow"

def fake_process_workflow(workflow, question):
captured["workflow"] = workflow
captured["question"] = question

monkeypatch.setattr(sys, "argv", ["prog", "-c", "hello", "--api-key", "cli-key"])
monkeypatch.setattr(main_module, "logger", DummyLogger())
monkeypatch.setattr(main_module, "setup_logger", fake_setup_logger)
monkeypatch.setattr(main_module, "initialize_session_context", fake_initialize_session_context)
monkeypatch.setattr(main_module, "create_user_session", lambda session_id=None, user_session_dir=False, input_dir=False: "session-123")
monkeypatch.setattr(main_module, "langsmith_setup", lambda: None)
monkeypatch.setattr(main_module, "llm_creation", fake_llm_creation)
monkeypatch.setattr(main_module, "create_workflow", fake_create_workflow)
monkeypatch.setattr(main_module, "process_workflow", fake_process_workflow)

try:
main_module.main()
finally:
monkeypatch.setattr(sys, "argv", original_argv)

assert captured["llm_api_key"] == "cli-key"
assert captured["workflow_api_key"] == "cli-key"
assert state["setup_logger_states"] == [True]
assert captured["session_id"] == "session-123"
assert captured["workflow"] == "workflow"
assert captured["question"] == "hello"


def test_main_prints_user_friendly_error_for_bad_staged_file(monkeypatch, capsys):
original_argv = sys.argv[:]

monkeypatch.setattr(sys, "argv", ["prog", "-c", "hello", "-f", "/missing/file.csv"])
monkeypatch.setattr(main_module, "logger", DummyLogger())
monkeypatch.setattr(main_module, "setup_logger", lambda name: DummyLogger())
monkeypatch.setattr(main_module, "initialize_session_context", lambda session_id: None)
monkeypatch.setattr(main_module, "create_user_session", lambda session_id=None, user_session_dir=False, input_dir=False: "session-123")
monkeypatch.setattr(main_module, "langsmith_setup", lambda: None)
monkeypatch.setattr(main_module, "llm_creation", lambda api_key=None, params_file=None: {"llm_o": object()})
monkeypatch.setattr(
main_module,
"_prepare_session_files",
lambda session_id, file_paths: (_ for _ in ()).throw(
main_module.SessionFilePreparationError(Path(file_paths[0]), f"File not found: {file_paths[0]}")
),
)
monkeypatch.setattr(main_module, "create_workflow", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("workflow should not run")))

try:
main_module.main()
finally:
monkeypatch.setattr(sys, "argv", original_argv)

captured = capsys.readouterr()
assert "Error: File not found: /missing/file.csv" in captured.out
Loading