-
Notifications
You must be signed in to change notification settings - Fork 2
add CLI file input support for FILE_ANALYZER tool when running locally #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,17 @@ | ||
| import os | ||
| import shutil | ||
| import argparse | ||
| from typing import Optional | ||
| import configparser | ||
| from typing import Optional, List | ||
| from pathlib import Path | ||
|
|
||
| from dotenv import load_dotenv | ||
| from langsmith import Client | ||
| from pathlib import Path | ||
| from langchain_community.chat_models import ChatOpenAI, ChatLiteLLM | ||
|
|
||
| from app.core.workflow.langraph_workflow import create_workflow, process_workflow | ||
| from app.core.session import create_user_session, initialize_session_context | ||
| from app.core.utils import IntRange, setup_logger | ||
| import configparser | ||
| from langchain_community.chat_models import ChatOpenAI, ChatLiteLLM | ||
| from app.core.questions import standard_questions | ||
|
|
||
|
|
||
|
|
@@ -32,6 +36,14 @@ | |
| } | ||
|
|
||
|
|
||
| class SessionFilePreparationError(ValueError): | ||
| """Raised when CLI input files cannot be staged into the session directory.""" | ||
|
|
||
| def __init__(self, source_path: Path, message: str): | ||
| super().__init__(message) | ||
| self.source_path = source_path | ||
|
|
||
|
|
||
| def get_api_key(provider: str) -> Optional[str]: | ||
| """ | ||
| Get API key for specified provider from environment variables. | ||
|
|
@@ -198,52 +210,159 @@ def langsmith_setup() -> Optional[Client]: | |
| return None | ||
|
|
||
|
|
||
| def _prepare_session_files(session_id: str, file_paths: List[str]) -> Path: | ||
| """ | ||
| Copy user-supplied local files into the session's input directory so that | ||
| the FILE_ANALYZER tool can discover them at runtime. | ||
|
|
||
| Args: | ||
| session_id: Active session identifier. | ||
| file_paths: List of local file paths provided via the CLI. | ||
|
|
||
| Returns: | ||
| Path to the session input directory. | ||
|
|
||
| Raises: | ||
| SessionFilePreparationError: If any supplied path cannot be staged safely. | ||
| """ | ||
| input_dir = create_user_session(session_id, input_dir=True) | ||
| staged_destinations: dict[Path, Path] = {} | ||
|
|
||
| for raw_path in file_paths: | ||
| src = Path(raw_path).expanduser().resolve(strict=False) | ||
| if not src.exists(): | ||
| raise SessionFilePreparationError(src, f"File not found: {src}") | ||
| if not src.is_file(): | ||
| raise SessionFilePreparationError(src, f"Input path is not a file: {src}") | ||
|
|
||
| dest = input_dir / src.name | ||
| previous_src = staged_destinations.get(dest) | ||
| if previous_src is not None: | ||
| if previous_src == src: | ||
| raise SessionFilePreparationError( | ||
| src, | ||
| f"Input file was provided more than once: {src}", | ||
| ) | ||
| raise SessionFilePreparationError( | ||
| src, | ||
| ( | ||
| f"Cannot stage '{src}' because it would overwrite '{previous_src}' in " | ||
| f"the session input directory. Rename one of the files or choose a different path." | ||
| ), | ||
| ) | ||
|
|
||
| if src == dest.resolve(strict=False): | ||
| raise SessionFilePreparationError( | ||
| src, | ||
| f"Input file is already staged in the session directory: {src}", | ||
| ) | ||
|
|
||
| if dest.exists(): | ||
| raise SessionFilePreparationError( | ||
| src, | ||
| f"Cannot stage '{src}' because destination '{dest}' already exists.", | ||
| ) | ||
|
|
||
| try: | ||
| shutil.copy2(str(src), str(dest)) | ||
| except (shutil.SameFileError, OSError) as exc: | ||
| raise SessionFilePreparationError( | ||
| src, | ||
| f"Failed to stage '{src}' into '{dest}': {exc}", | ||
| ) from exc | ||
|
|
||
| staged_destinations[dest] = src | ||
| logger.info(f"Copied '{src}' -> '{dest}'") | ||
|
qodo-code-review[bot] marked this conversation as resolved.
|
||
|
|
||
| return input_dir | ||
|
|
||
|
|
||
| def main(): | ||
| """Main function to run the workflow.""" | ||
| # Define command line arguments | ||
|
|
||
| parser = argparse.ArgumentParser(description="Process a workflow with a predefined question number.") | ||
| parser.add_argument('-q', '--question', type=int, choices=IntRange(1, len(standard_questions)), | ||
| help=f"Choose a standard question number from 1 to {len(standard_questions)}.") | ||
| parser.add_argument('-c', '--custom', type=str, | ||
| help="Provide a custom question.") | ||
| parser.add_argument('-e', '--evaluation', action='store_true', | ||
| help="Enable evaluation mode") | ||
| parser.add_argument('--api-key', type=str, | ||
| help="OpenAI API key (optional, defaults to environment variable)") | ||
| parser.add_argument('--endpoint', type=str, | ||
| help="Knowledge graph endpoint URL (optional)") | ||
| """ | ||
| CLI entry-point for running the MetaboT workflow. | ||
|
|
||
| Usage examples: | ||
| python -m app.core.main -q 1 | ||
| python -m app.core.main -c "Describe my dataset" -f data.csv | ||
| python -m app.core.main -c "Compare files" -f file1.csv file2.tsv | ||
| """ | ||
| parser = argparse.ArgumentParser( | ||
| description="Process a workflow with a predefined question number." | ||
| ) | ||
| parser.add_argument( | ||
| '-q', '--question', type=int, | ||
| choices=IntRange(1, len(standard_questions)), | ||
| help=f"Choose a standard question number from 1 to {len(standard_questions)}.", | ||
| ) | ||
| parser.add_argument( | ||
| '-c', '--custom', type=str, | ||
| help="Provide a custom question.", | ||
| ) | ||
| parser.add_argument( | ||
| '-f', '--file', type=str, nargs='+', | ||
| help="One or more local file paths to make available for the FILE_ANALYZER tool.", | ||
| ) | ||
| parser.add_argument( | ||
| '-e', '--evaluation', action='store_true', | ||
| help="Enable evaluation mode.", | ||
| ) | ||
| parser.add_argument( | ||
| '--api-key', type=str, | ||
| help="OpenAI API key (optional, defaults to environment variable).", | ||
| ) | ||
| parser.add_argument( | ||
| '--endpoint', type=str, | ||
| help="Knowledge graph endpoint URL (optional).", | ||
| ) | ||
|
|
||
| args = parser.parse_args() | ||
|
|
||
| # Resolve the question | ||
| if args.question: | ||
| question = standard_questions[args.question - 1] | ||
| elif args.custom: | ||
| question = args.custom | ||
| else: | ||
| print("You must provide either a standard question number or a custom question.") | ||
| print("You must provide either a standard question number (-q) or a custom question (-c).") | ||
| return | ||
|
|
||
| # Create a user session (mirrors the Streamlit session lifecycle) and | ||
| # reconfigure the logger so subsequent CLI logs land in the session file. | ||
| session_id = create_user_session() | ||
| initialize_session_context(session_id) | ||
| global logger | ||
| logger = setup_logger(__name__) | ||
|
|
||
| # Initialize LangSmith if available | ||
| langsmith_client = langsmith_setup() | ||
| langsmith_setup() | ||
|
|
||
| # Get endpoint URL from arguments or environment | ||
| # Resolve endpoint URL | ||
| endpoint_url = ( | ||
| args.endpoint | ||
| or os.environ.get("KG_ENDPOINT_URL") | ||
| or "https://enpkg.commons-lab.org/graphdb/repositories/ENPKG" | ||
| ) | ||
| models = llm_creation() | ||
|
|
||
| # Initialize language models | ||
| models = llm_creation(api_key=args.api_key) | ||
|
Comment on lines
+346
to
+347
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1. Wrong api key reused main() now forwards --api-key into llm_creation(), but llm_creation keeps a single api_key value and applies it to every configured provider section. In a multi-provider params.ini (OpenAI + deepseek + ovh), deepseek/ovh models will incorrectly receive the OpenAI key (or the first resolved key), causing authentication failures. Agent Prompt
|
||
|
|
||
| # Stage user-provided files into the session's input directory | ||
| if args.file: | ||
| try: | ||
| _prepare_session_files(session_id, args.file) | ||
| except SessionFilePreparationError as exc: | ||
| logger.error(str(exc)) | ||
| print(f"Error: {exc}") | ||
| return | ||
|
|
||
| try: | ||
| # Create and process workflow | ||
| workflow = create_workflow( | ||
| models=models, | ||
| session_id=session_id, | ||
| endpoint_url=endpoint_url, | ||
| evaluation=False, | ||
| api_key=args.api_key | ||
| api_key=args.api_key, | ||
| ) | ||
|
|
||
| process_workflow(workflow, question) | ||
|
|
||
| except Exception as e: | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import sys | ||
| from pathlib import Path | ||
|
|
||
| import app.core.main as main_module | ||
|
|
||
|
|
||
| class DummyLogger: | ||
| def __init__(self): | ||
| self.info_messages = [] | ||
| self.error_messages = [] | ||
|
|
||
| def info(self, message, *args): | ||
| if args: | ||
| message = message % args | ||
| self.info_messages.append(message) | ||
|
|
||
| def error(self, message, *args): | ||
| if args: | ||
| message = message % args | ||
| self.error_messages.append(message) | ||
|
|
||
|
|
||
| def test_prepare_session_files_copies_valid_file(tmp_path, monkeypatch): | ||
| input_dir = tmp_path / "input_files" | ||
| input_dir.mkdir() | ||
| source_file = tmp_path / "source.csv" | ||
| source_file.write_text("a,b\n1,2\n", encoding="utf-8") | ||
| dummy_logger = DummyLogger() | ||
| input_dir_path = input_dir | ||
|
|
||
| monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) | ||
| monkeypatch.setattr(main_module, "logger", dummy_logger) | ||
|
|
||
| staged_dir = main_module._prepare_session_files("session-id", [str(source_file)]) | ||
|
|
||
| copied_file = input_dir / source_file.name | ||
| assert staged_dir == input_dir | ||
| assert copied_file.read_text(encoding="utf-8") == source_file.read_text(encoding="utf-8") | ||
|
|
||
|
|
||
| def test_prepare_session_files_rejects_directory(tmp_path, monkeypatch): | ||
| input_dir = tmp_path / "input_files" | ||
| input_dir.mkdir() | ||
| source_dir = tmp_path / "source_dir" | ||
| source_dir.mkdir() | ||
| input_dir_path = input_dir | ||
|
|
||
| monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) | ||
|
|
||
| try: | ||
| main_module._prepare_session_files("session-id", [str(source_dir)]) | ||
| assert False, "Expected SessionFilePreparationError" | ||
| except main_module.SessionFilePreparationError as exc: | ||
| assert "not a file" in str(exc) | ||
|
|
||
|
|
||
| def test_prepare_session_files_rejects_colliding_basenames(tmp_path, monkeypatch): | ||
| input_dir = tmp_path / "input_files" | ||
| input_dir.mkdir() | ||
| first_dir = tmp_path / "first" | ||
| second_dir = tmp_path / "second" | ||
| first_dir.mkdir() | ||
| second_dir.mkdir() | ||
| first_file = first_dir / "results.csv" | ||
| second_file = second_dir / "results.csv" | ||
| first_file.write_text("a,b\n1,2\n", encoding="utf-8") | ||
| second_file.write_text("a,b\n3,4\n", encoding="utf-8") | ||
| dummy_logger = DummyLogger() | ||
| input_dir_path = input_dir | ||
|
|
||
| monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) | ||
| monkeypatch.setattr(main_module, "logger", dummy_logger) | ||
|
|
||
| try: | ||
| main_module._prepare_session_files("session-id", [str(first_file), str(second_file)]) | ||
| assert False, "Expected SessionFilePreparationError" | ||
| except main_module.SessionFilePreparationError as exc: | ||
| assert "would overwrite" in str(exc) | ||
|
|
||
|
|
||
| def test_prepare_session_files_rejects_same_file_destination(tmp_path, monkeypatch): | ||
| input_dir = tmp_path / "input_files" | ||
| input_dir.mkdir() | ||
| staged_file = input_dir / "already_staged.csv" | ||
| staged_file.write_text("a,b\n1,2\n", encoding="utf-8") | ||
| input_dir_path = input_dir | ||
|
|
||
| monkeypatch.setattr(main_module, "create_user_session", lambda session_id, input_dir=False: input_dir_path) | ||
|
|
||
| try: | ||
| main_module._prepare_session_files("session-id", [str(staged_file)]) | ||
| assert False, "Expected SessionFilePreparationError" | ||
| except main_module.SessionFilePreparationError as exc: | ||
| assert "already staged" in str(exc) | ||
|
|
||
|
|
||
| def test_main_passes_cli_api_key_and_reconfigures_logger(monkeypatch): | ||
| original_argv = sys.argv[:] | ||
| state = {"session_initialized": False, "setup_logger_states": []} | ||
| captured = {} | ||
|
|
||
| def fake_setup_logger(name): | ||
| state["setup_logger_states"].append(state["session_initialized"]) | ||
| return DummyLogger() | ||
|
|
||
| def fake_initialize_session_context(session_id): | ||
| state["session_initialized"] = True | ||
|
|
||
| def fake_llm_creation(api_key=None, params_file=None): | ||
| captured["llm_api_key"] = api_key | ||
| return {"llm_o": object()} | ||
|
|
||
| def fake_create_workflow(models, session_id=None, endpoint_url=None, evaluation=False, api_key=None): | ||
| captured["workflow_api_key"] = api_key | ||
| captured["session_id"] = session_id | ||
| return "workflow" | ||
|
|
||
| def fake_process_workflow(workflow, question): | ||
| captured["workflow"] = workflow | ||
| captured["question"] = question | ||
|
|
||
| monkeypatch.setattr(sys, "argv", ["prog", "-c", "hello", "--api-key", "cli-key"]) | ||
| monkeypatch.setattr(main_module, "logger", DummyLogger()) | ||
| monkeypatch.setattr(main_module, "setup_logger", fake_setup_logger) | ||
| monkeypatch.setattr(main_module, "initialize_session_context", fake_initialize_session_context) | ||
| monkeypatch.setattr(main_module, "create_user_session", lambda session_id=None, user_session_dir=False, input_dir=False: "session-123") | ||
| monkeypatch.setattr(main_module, "langsmith_setup", lambda: None) | ||
| monkeypatch.setattr(main_module, "llm_creation", fake_llm_creation) | ||
| monkeypatch.setattr(main_module, "create_workflow", fake_create_workflow) | ||
| monkeypatch.setattr(main_module, "process_workflow", fake_process_workflow) | ||
|
|
||
| try: | ||
| main_module.main() | ||
| finally: | ||
| monkeypatch.setattr(sys, "argv", original_argv) | ||
|
|
||
| assert captured["llm_api_key"] == "cli-key" | ||
| assert captured["workflow_api_key"] == "cli-key" | ||
| assert state["setup_logger_states"] == [True] | ||
| assert captured["session_id"] == "session-123" | ||
| assert captured["workflow"] == "workflow" | ||
| assert captured["question"] == "hello" | ||
|
|
||
|
|
||
| def test_main_prints_user_friendly_error_for_bad_staged_file(monkeypatch, capsys): | ||
| original_argv = sys.argv[:] | ||
|
|
||
| monkeypatch.setattr(sys, "argv", ["prog", "-c", "hello", "-f", "/missing/file.csv"]) | ||
| monkeypatch.setattr(main_module, "logger", DummyLogger()) | ||
| monkeypatch.setattr(main_module, "setup_logger", lambda name: DummyLogger()) | ||
| monkeypatch.setattr(main_module, "initialize_session_context", lambda session_id: None) | ||
| monkeypatch.setattr(main_module, "create_user_session", lambda session_id=None, user_session_dir=False, input_dir=False: "session-123") | ||
| monkeypatch.setattr(main_module, "langsmith_setup", lambda: None) | ||
| monkeypatch.setattr(main_module, "llm_creation", lambda api_key=None, params_file=None: {"llm_o": object()}) | ||
| monkeypatch.setattr( | ||
| main_module, | ||
| "_prepare_session_files", | ||
| lambda session_id, file_paths: (_ for _ in ()).throw( | ||
| main_module.SessionFilePreparationError(Path(file_paths[0]), f"File not found: {file_paths[0]}") | ||
| ), | ||
| ) | ||
| monkeypatch.setattr(main_module, "create_workflow", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("workflow should not run"))) | ||
|
|
||
| try: | ||
| main_module.main() | ||
| finally: | ||
| monkeypatch.setattr(sys, "argv", original_argv) | ||
|
|
||
| captured = capsys.readouterr() | ||
| assert "Error: File not found: /missing/file.csv" in captured.out |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.