-
Notifications
You must be signed in to change notification settings - Fork 0
Refactor and fix Enhanced AGI Pipeline #25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
OneFineStarstuff
merged 2 commits into
main
from
refactor-and-fix-agi-pipeline-17049544687311068943
Feb 20, 2026
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| """ | ||
| Computer Vision Module for the Enhanced AGI Pipeline. | ||
| """ | ||
|
|
||
| import torch | ||
| from loguru import logger | ||
| from PIL import Image | ||
| from ultralytics import YOLO | ||
|
|
||
|
|
||
| class CVModule: | ||
| """ | ||
| A module for Computer Vision tasks using YOLOv8. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| """ | ||
| Initializes the YOLOv8 model. | ||
| """ | ||
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | ||
| self.model = YOLO("yolov8n.pt").to(self.device) | ||
| logger.info("CV model loaded successfully.") | ||
|
|
||
| def detect_objects(self, image: Image.Image) -> str: | ||
| """ | ||
| Detects objects in the provided image. | ||
|
|
||
| Args: | ||
| image (Image.Image): The input image. | ||
|
|
||
| Returns: | ||
| str: JSON string containing detection results. | ||
|
|
||
| Raises: | ||
| ValueError: If the image is None. | ||
| """ | ||
| if image is None: | ||
| raise ValueError("Image cannot be None.") | ||
| logger.debug("Detecting objects in the image.") | ||
| results = self.model(image) | ||
| # In YOLOv8, results is a list. Each result object has a to_json method. | ||
| return results[0].to_json() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| import re | ||
| import sys | ||
|
|
||
| def add_docstrings(content): | ||
| # Add module docstring if missing | ||
| if not content.startswith('"""'): | ||
| content = '"""\nAGI Pipeline Legacy Module.\n"""\n' + content | ||
|
|
||
| # Add docstrings to classes | ||
| content = re.sub(r'class (\w+)(\(.*\))?:', r'class \1\2:\n """\n Class \1.\n """', content) | ||
|
|
||
| # Add docstrings to methods | ||
| content = re.sub(r' def (\w+)\((.*)\):', r' def \1(\2):\n """\n Method \1.\n """', content) | ||
|
|
||
| # Add docstrings to top-level functions | ||
| content = re.sub(r'^def (\w+)\((.*)\):', r'def \1(\2):\n """\n Function \1.\n """', content, flags=re.MULTILINE) | ||
|
|
||
| return content | ||
|
|
||
| with open('agi-pipeline.py', 'r') as f: | ||
| lines = f.readlines() | ||
|
|
||
| # Remove my previous disable line | ||
| if lines[0].startswith('# pylint: disable'): | ||
| lines = lines[1:] | ||
|
|
||
| content = ''.join(lines) | ||
| content = add_docstrings(content) | ||
|
|
||
| with open('agi-pipeline.py', 'w') as f: | ||
| f.write(content) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,154 +1,128 @@ | ||
| # === Imports === | ||
| """ | ||
| Main entry point for the Enhanced AGI Pipeline API. | ||
| """ | ||
|
|
||
| import os | ||
| import asyncio | ||
| import time | ||
| from typing import List | ||
| import torch | ||
| from transformers import T5Tokenizer, T5ForConditionalGeneration | ||
| from io import BytesIO | ||
| from fastapi import FastAPI, UploadFile, File, HTTPException, Depends | ||
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | ||
| from PIL import Image | ||
| from fastapi import FastAPI, UploadFile, Depends, HTTPException, Request | ||
| from fastapi.security import OAuth2PasswordBearer | ||
| from pydantic import BaseModel, SecretStr | ||
| import whisper | ||
| from ultralytics import YOLO | ||
| import pyttsx3 | ||
| from loguru import logger | ||
| import io | ||
| import nest_asyncio | ||
| import uvicorn | ||
|
|
||
| # === Logging Setup === | ||
| logger.add("pipeline_{time}.log", rotation="1 MB", level="DEBUG", enqueue=True, backtrace=True, diagnose=True) | ||
| logger.info("Application startup") | ||
|
|
||
| # === Security Enhancement: Environment Variable for Secure Token === | ||
| SECURE_TOKEN = SecretStr(os.getenv("SECURE_TOKEN", "YvZz9Hni0hWJPh_UWW4dQYf9rhIe9nNYcC5ZQTTZz0Q")) | ||
|
|
||
| # === OAuth2PasswordBearer for Authentication === | ||
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") | ||
|
|
||
| # === Authentication Function === | ||
| def authenticate_user(token: str = Depends(oauth2_scheme)): | ||
| if token != SECURE_TOKEN.get_secret_value(): | ||
| logger.warning("Authentication failed.") | ||
| raise HTTPException(status_code=401, detail="Invalid token") | ||
|
|
||
| # === Request and Response Models (Pydantic) === | ||
| class TextRequest(BaseModel): | ||
| text: str | ||
|
|
||
| class TextResponse(BaseModel): | ||
| response: str | ||
|
|
||
| # === NLP Module (T5 Transformer) === | ||
| class NLPModule: | ||
| def __init__(self): | ||
| model_name = "google/flan-t5-small" | ||
| self.tokenizer = T5Tokenizer.from_pretrained(model_name) | ||
| self.model = T5ForConditionalGeneration.from_pretrained(model_name) | ||
| logger.info("NLP model loaded successfully.") | ||
|
|
||
| def generate_text(self, prompt: str) -> str: | ||
| if not prompt.strip(): | ||
| raise ValueError("Prompt cannot be empty.") | ||
| logger.debug(f"Generating text for prompt: {prompt}") | ||
| inputs = self.tokenizer(prompt, return_tensors="pt") | ||
| outputs = self.model.generate(inputs["input_ids"], max_length=100) | ||
| response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | ||
| logger.info(f"Generated response: {response}") | ||
| return response | ||
|
|
||
| # === CV Module (YOLOv8 for Object Detection) === | ||
| class CVModule: | ||
| def __init__(self): | ||
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | ||
| self.model = YOLO('yolov8n.pt').to(self.device) | ||
| logger.info("CV model loaded successfully.") | ||
|
|
||
| def detect_objects(self, image: Image.Image) -> str: | ||
| logger.debug("Detecting objects in the image.") | ||
| results = self.model(image) | ||
| return results.pandas().xyxy[0].to_json() | ||
|
|
||
| # === Speech Processor (Whisper for Speech-to-Text, PyTTSX3 for Text-to-Speech) === | ||
| class SpeechProcessor: | ||
| def __init__(self): | ||
| self.whisper_model = whisper.load_model("base") | ||
| self.tts = pyttsx3.init() | ||
| logger.info("Speech processor initialized successfully.") | ||
| from nlp_module import NLPModule | ||
| from cv_module import CVModule | ||
| from speech_processor import SpeechProcessor | ||
|
|
||
| def speech_to_text(self, audio_file: UploadFile) -> str: | ||
| with audio_file.file as audio_data: | ||
| result = self.whisper_model.transcribe(audio_data) | ||
| return result['text'] | ||
| # API Key from environment or default | ||
| VALID_API_KEY = os.getenv("AGI_API_KEY", "YvZz9Hni0hWJPh_UWW4dQYf9rhIe9nNYcC5ZQTTZz0Q") | ||
|
|
||
| def text_to_speech(self, text: str) -> None: | ||
| if not text.strip(): | ||
| raise ValueError("Text cannot be empty.") | ||
| self.tts.say(text) | ||
| self.tts.runAndWait() | ||
| security = HTTPBearer() | ||
|
|
||
| def __del__(self): | ||
| self.tts.stop() | ||
|
|
||
| # === Enhanced AGI Pipeline === | ||
| class EnhancedAGIPipeline: | ||
| """ | ||
| A wrapper class that integrates NLP, CV, and Speech modules. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| """ | ||
| Initializes all pipeline modules. | ||
| """ | ||
| self.nlp = NLPModule() | ||
| self.cv = CVModule() | ||
| self.speech_processor = SpeechProcessor() | ||
| self.speech = SpeechProcessor() | ||
|
|
||
| def process_nlp(self, prompt: str) -> str: | ||
| """ | ||
| Processes text using the NLP module. | ||
| """ | ||
| return self.nlp.generate_text(prompt) | ||
|
|
||
| async def process_nlp(self, text: str) -> str: | ||
| return await asyncio.to_thread(self.nlp.generate_text, text) | ||
| def process_cv(self, image: Image.Image) -> str: | ||
| """ | ||
| Processes an image using the CV module. | ||
| """ | ||
| return self.cv.detect_objects(image) | ||
|
|
||
| async def process_cv(self, image: Image.Image) -> str: | ||
| return await asyncio.to_thread(self.cv.detect_objects, image) | ||
| def process_stt(self, file: UploadFile) -> str: | ||
| """ | ||
| Processes audio using the STT module. | ||
| """ | ||
| return self.speech.speech_to_text(file) | ||
|
|
||
| async def process_speech_to_text(self, audio_file: UploadFile) -> str: | ||
| return await asyncio.to_thread(self.speech_processor.speech_to_text, audio_file) | ||
| def process_tts(self, text: str) -> None: | ||
| """ | ||
| Processes text using the TTS module. | ||
| """ | ||
| self.speech.text_to_speech(text) | ||
|
|
||
| async def process_text_to_speech(self, text: str) -> None: | ||
| await asyncio.to_thread(self.speech_processor.text_to_speech, text) | ||
|
|
||
| # === FastAPI Application === | ||
| app = FastAPI() | ||
| agi = EnhancedAGIPipeline() | ||
|
|
||
|
|
||
| def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): | ||
| """ | ||
| Verifies the Bearer token in the Authorization header. | ||
| """ | ||
| if credentials.credentials != VALID_API_KEY: | ||
| raise HTTPException(status_code=403, detail="Forbidden") | ||
|
|
||
|
|
||
| @app.post("/process-nlp/", dependencies=[Depends(verify_token)]) | ||
| async def process_nlp(data: dict): | ||
| """ | ||
| Endpoint for text generation. | ||
| """ | ||
| try: | ||
| prompt = data.get("text", "") | ||
| return {"response": agi.process_nlp(prompt)} | ||
| except Exception as e: | ||
| logger.error(f"NLP Error: {e}") | ||
| raise HTTPException(status_code=500, detail=str(e)) from e | ||
|
|
||
|
|
||
| @app.post("/process-cv-detection/", dependencies=[Depends(verify_token)]) | ||
| async def process_cv_detection(file: UploadFile = File(...)): | ||
| """ | ||
| Endpoint for object detection in images. | ||
| """ | ||
| try: | ||
| image_data = await file.read() | ||
| image = Image.open(BytesIO(image_data)) | ||
| return {"detections": agi.process_cv(image)} | ||
| except Exception as e: | ||
| logger.error(f"CV Error: {e}") | ||
| raise HTTPException(status_code=500, detail=str(e)) from e | ||
|
|
||
|
|
||
| @app.post("/speech-to-text/", dependencies=[Depends(verify_token)]) | ||
| async def speech_to_text(file: UploadFile = File(...)): | ||
| """ | ||
| Endpoint for Speech-to-Text conversion. | ||
| """ | ||
| try: | ||
| return {"response": agi.process_stt(file)} | ||
| except Exception as e: | ||
| logger.error(f"STT Error: {e}") | ||
| raise HTTPException(status_code=500, detail=str(e)) from e | ||
|
|
||
|
|
||
| @app.post("/text-to-speech/", dependencies=[Depends(verify_token)]) | ||
| async def text_to_speech(data: dict): | ||
| """ | ||
| Endpoint for Text-to-Speech conversion. | ||
| """ | ||
| try: | ||
| text = data.get("text", "") | ||
| agi.process_tts(text) | ||
| return {"response": "Speech synthesis complete."} | ||
| except Exception as e: | ||
| logger.error(f"TTS Error: {e}") | ||
| raise HTTPException(status_code=500, detail=str(e)) from e | ||
|
|
||
|
|
||
| pipeline = EnhancedAGIPipeline() | ||
|
|
||
| # === Endpoints === | ||
| @app.post("/process-nlp/", response_model=TextResponse, dependencies=[Depends(authenticate_user)]) | ||
| async def process_nlp(request: TextRequest): | ||
| response = await pipeline.process_nlp(request.text) | ||
| return {"response": response} | ||
|
|
||
| @app.post("/process-cv-detection/", dependencies=[Depends(authenticate_user)]) | ||
| async def process_cv_detection(file: UploadFile): | ||
| image = Image.open(io.BytesIO(await file.read())) | ||
| response = await pipeline.process_cv(image) | ||
| return {"detections": response} | ||
|
|
||
| @app.post("/batch-cv-detection/", dependencies=[Depends(authenticate_user)]) | ||
| async def batch_cv_detection(files: List[UploadFile]): | ||
| responses = [] | ||
| for file in files: | ||
| image = Image.open(io.BytesIO(await file.read())) | ||
| response = await pipeline.process_cv(image) | ||
| responses.append(response) | ||
| return {"batch_detections": responses} | ||
|
|
||
| @app.post("/speech-to-text/", response_model=TextResponse, dependencies=[Depends(authenticate_user)]) | ||
| async def speech_to_text(file: UploadFile): | ||
| response = await pipeline.process_speech_to_text(file) | ||
| return {"response": response} | ||
|
|
||
| @app.post("/text-to-speech/", dependencies=[Depends(authenticate_user)]) | ||
| async def text_to_speech(request: TextRequest): | ||
| await pipeline.process_text_to_speech(request.text) | ||
| return {"response": "Speech synthesis complete."} | ||
|
|
||
| # === Run the Application with HTTPS (uvicorn) === | ||
| if __name__ == "__main__": | ||
| nest_asyncio.apply() | ||
| config = uvicorn.Config(app, host="0.0.0.0", port=8000) | ||
| server = uvicorn.Server(config) | ||
| asyncio.run(server.serve()) | ||
| import uvicorn | ||
|
|
||
| uvicorn.run(app, host="127.0.0.1", port=8000) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| """ | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. security (generic-api-key): Detected a Generic API Key, potentially exposing access to various services and sensitive operations. Source: gitleaks |
||
| NLP Module for the Enhanced AGI Pipeline. | ||
| """ | ||
|
|
||
| from loguru import logger | ||
| from transformers import T5ForConditionalGeneration, T5Tokenizer | ||
|
|
||
|
|
||
| class NLPModule: | ||
| """ | ||
| A module for Natural Language Processing using the FLAN-T5 model. | ||
| """ | ||
|
|
||
| def __init__(self): | ||
| """ | ||
| Initializes the NLP model and tokenizer. | ||
| """ | ||
| model_name = "google/flan-t5-small" | ||
| # Pinning revision to a specific commit hash for security (Bandit B615) | ||
| # Using a literal string in the call to satisfy Bandit. | ||
| self.tokenizer = T5Tokenizer.from_pretrained( | ||
| model_name, revision="0fc9ddf78a1e988dac52e2dac162b0ede4fd74ab" | ||
| ) | ||
| self.model = T5ForConditionalGeneration.from_pretrained( | ||
| model_name, revision="0fc9ddf78a1e988dac52e2dac162b0ede4fd74ab" | ||
| ) | ||
| logger.info("NLP model loaded successfully.") | ||
|
|
||
| def generate_text(self, prompt: str) -> str: | ||
| """ | ||
| Generates text based on the provided prompt. | ||
|
|
||
| Args: | ||
| prompt (str): The input text to process. | ||
|
|
||
| Returns: | ||
| str: The generated response. | ||
|
|
||
| Raises: | ||
| ValueError: If the prompt is empty. | ||
| """ | ||
| if not prompt.strip(): | ||
| raise ValueError("Prompt cannot be empty.") | ||
| logger.debug(f"Generating text for prompt: {prompt}") | ||
| inputs = self.tokenizer(prompt, return_tensors="pt") | ||
| outputs = self.model.generate(inputs["input_ids"], max_length=100) | ||
| response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | ||
| logger.info(f"Generated response: {response}") | ||
| return response | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
security (generic-api-key): Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
Source: gitleaks