Commit 3a3b5a4

Refactor Project Aether: Modular architecture, simplified logic, and documentation cleanup
1 parent c804157 commit 3a3b5a4

33 files changed

Lines changed: 613 additions & 1098 deletions

.env.example

Lines changed: 13 additions & 5 deletions
@@ -1,10 +1,18 @@
+# Core API Keys
 OPENAI_API_KEY=your_openai_api_key_here
+
+# Qdrant Vector Store
 QDRANT_URL=http://localhost:6333
 QDRANT_API_KEY=your_qdrant_api_key_here
-POSTGRES_USER=postgres
-POSTGRES_PASSWORD=postgres
-POSTGRES_DB=project_aether
-POSTGRES_HOST=localhost
-POSTGRES_PORT=5432
+
+# Redis Semantic Cache
+REDIS_HOST=localhost
+REDIS_PORT=6379
+SEMANTIC_CACHE_THRESHOLD=0.85
+
+# Observability (Optional - Arize Phoenix)
 PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
+
+# Application Settings
 LOG_LEVEL=INFO
+DATA_DIR=./data
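
The variables in `.env.example` map naturally onto a typed settings object. The README lists Pydantic Settings for configuration, but the commit does not show that module, so the following is only a standard-library stand-in. The class name `AetherSettings`, the function `load_settings`, and the fallback defaults (copied from `.env.example`) are assumptions made for illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class AetherSettings:
    """Hypothetical stand-in for the project's settings object."""

    openai_api_key: str
    qdrant_url: str
    qdrant_api_key: Optional[str]
    redis_host: str
    redis_port: int
    semantic_cache_threshold: float
    phoenix_collector_endpoint: str
    log_level: str
    data_dir: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> AetherSettings:
    """Build settings from an environment mapping (defaults mirror .env.example)."""
    env = os.environ if env is None else env
    return AetherSettings(
        openai_api_key=env.get("OPENAI_API_KEY", ""),
        qdrant_url=env.get("QDRANT_URL", "http://localhost:6333"),
        qdrant_api_key=env.get("QDRANT_API_KEY"),
        redis_host=env.get("REDIS_HOST", "localhost"),
        # Environment values are strings, so numeric fields are converted here.
        redis_port=int(env.get("REDIS_PORT", "6379")),
        semantic_cache_threshold=float(env.get("SEMANTIC_CACHE_THRESHOLD", "0.85")),
        phoenix_collector_endpoint=env.get(
            "PHOENIX_COLLECTOR_ENDPOINT", "http://localhost:6006"
        ),
        log_level=env.get("LOG_LEVEL", "INFO"),
        data_dir=env.get("DATA_DIR", "./data"),
    )
```

Passing an explicit mapping, for example `load_settings({"REDIS_PORT": "6380"})`, keeps the loader easy to test without touching the real process environment.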

.github/workflows/ci.yml

Lines changed: 28 additions & 33 deletions
@@ -1,40 +1,35 @@
-name: Continuous Integration
+name: Project Aether CI

-on: [push, pull_request]
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]

 jobs:
   test:
     runs-on: ubuntu-latest

-    services:
-      redis:
-        image: redis/redis-stack-server:latest
-        ports:
-          - 6379:6379
-      qdrant:
-        image: qdrant/qdrant:latest
-        ports:
-          - 6333:6333
-
     steps:
-      - uses: actions/checkout@v4
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: '3.11'
-          cache: 'pip'
-
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install -r requirements.txt
-          pip install pytest pytest-asyncio pytest-mock
-          pip install -e .
-
-      - name: Run Tests
-        env:
-          OPENAI_API_KEY: sk-fake-key-for-testing
-          REDIS_HOST: localhost
-          QDRANT_HOST: localhost
-        run: pytest tests/
+      - uses: actions/checkout@v3
+
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.11"
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r requirements.txt
+
+      - name: Run tests with pytest
+        env:
+          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
+          REDIS_HOST: localhost
+          REDIS_PORT: 6379
+          QDRANT_URL: http://localhost:6333
+          LOG_LEVEL: INFO
+          DATA_DIR: ./tests/data
+        run: |
+          pytest tests/

README.md

Lines changed: 46 additions & 41 deletions
@@ -1,75 +1,80 @@
-# Project Aether: Event-Driven RAG Engine
+# Project Aether: RAG Pipeline with Event-Driven Workflows

-![Python](https://img.shields.io/badge/python-3.11+-blue.svg)
-![LlamaIndex](https://img.shields.io/badge/framework-LlamaIndex-orange.svg)
-![License](https://img.shields.io/badge/license-Apache--2.0-green.svg)
+## Overview
+Project Aether is a Retrieval-Augmented Generation (RAG) system built with Python and LlamaIndex. It implements a document ingestion and retrieval pipeline using an event-driven architecture (Workflows) to handle complex tasks like query transformation, metadata enrichment, and semantic caching.

-**Author:** Gabriel (Gabaoun) Penha
+The project is designed as a modular reference for building RAG applications that require more than simple linear processing, incorporating retries, asynchronous operations, and a clear separation of concerns.

-> *A highly resilient, event-driven Retrieval-Augmented Generation (RAG) engine optimizing semantic search latency by 80% while ensuring robust PII masking and enterprise-grade reliability.*
+## Features
+- **Event-Driven Ingestion:** Processes documents through a series of discrete steps (Loading -> PII Masking -> Semantic Splitting -> Enrichment -> Indexing).
+- **Advanced Retrieval:** Implements HyDE (Hypothetical Document Embeddings), query refinement loops, and relevance judgment (Chain-of-Thought) before generating answers.
+- **Semantic Caching:** Uses Redis to store and retrieve previously generated answers for identical or highly similar queries to reduce LLM latency and cost.
+- **PII Masking:** Basic regex-based masking of emails and phone numbers during the ingestion phase.
+- **Resiliency:** Uses `tenacity` for exponential backoff retries on LLM and database operations.
+- **Memory Efficiency:** Uses Python generators during document splitting to handle larger datasets without high memory consumption.

-Project Aether is a world-class reference implementation of a complex RAG system. By shifting from standard linear pipelines to LlamaIndex Workflows, it introduces cycles, streaming, and robust failure recovery natively into the ingestion and retrieval processes.
+## Tech Stack
+- **Language:** Python 3.11+
+- **Orchestration:** LlamaIndex (Workflows)
+- **Vector Database:** Qdrant
+- **Cache:** Redis
+- **LLM:** OpenAI (GPT-4o, GPT-4o-mini)
+- **Embeddings:** HuggingFace (BGE models)
+- **Configuration:** Pydantic Settings

-## 🌟 Key Features
+## Key Technical Points
+- **Modular Refactoring:** Logic is split into `core` (business logic), `services` (external integrations), `pipeline` (workflow orchestration), and `models` (data structures).
+- **Asynchronous Execution:** Heavy use of `asyncio` for non-blocking I/O, particularly in PII masking and LLM calls.
+- **Custom Splitter:** Implements a `SemanticDoubleMergingSplitter` which performs an initial semantic split and then merges small chunks that fall below a minimum size threshold.

-* **Event-Driven Workflows:** Employs LlamaIndex `Workflow` and `Event` classes to orchestrate query decomposition, HyDE, and Chain-of-Thought (CoT) relevance judgments with self-correction loops.
-* **Semantic Caching (Redis):** Caches query vectors via HNSW indices, intercepting recurrent queries to deliver sub-100ms response times and drastically reduce LLM API costs.
-* **Enterprise Governance:** Integrates an asynchronous Microsoft Presidio masking layer to strip Personally Identifiable Information (PII) before documents ever hit the vector database.
-* **Resilient Infrastructure:** Bulletproofed with `tenacity` for exponential backoff on all critical third-party I/O (LLMs, Qdrant).
-* **Memory-Optimized Ingestion:** Implements a custom `SemanticDoubleMergingSplitter` leveraging Python Generators to process massive document sets without memory bloat.
+## Design Decisions
+- **LlamaIndex Workflows over Pipelines:** Chosen to allow for non-linear logic, such as the query refinement loop in the retrieval workflow which can re-run if initial results are deemed irrelevant.
+- **BGE-Reranker:** Integrated to improve precision by re-evaluating the top retrieved nodes using a cross-encoder model.
+- **Strict Typing:** All major functions and classes use Python type hints for better maintainability and error detection.

-## 📈 Benchmarks
+## Limitations
+- **Regex-based PII:** The current PII masker uses basic regular expressions and is not a substitute for a production-grade NER (Named Entity Recognition) system.
+- **Simplified Semantic Cache:** The current implementation uses exact string matching in Redis for the cache keys rather than true vector-based similarity search.
+- **Single Collection:** Currently hardcoded to use a single Qdrant collection for all documents.

-| Metric | Basic RAG | Project Aether | Impact |
-|--------|-----------|----------------|--------|
-| **Faithfulness (Hallucination Rate)** | 62% | **88%** | ⬇️ HyDE & CoT Evaluation |
-| **Answer Relevance** | 70% | **92%** | ⬆️ BGE-Reranker & Reordering |
-| **Context Precision** | 55% | **85%** | ⬆️ Semantic Chunking Generators |
-| **Avg. Latency (P95)** | 5.2s | **0.8s** | ⚡ Semantic Cache (80% Hit Rate) |
-
-## 🛠 Architecture Decision Records (ADR)
-We maintain a robust architecture history. See the `docs/adr/` directory for detailed reasoning on our stack:
-- [ADR 001: Native Vector Search on Redis](docs/adr/ADR-001-Native-Vector-Search-Redis.md)
-- [ADR 002: LlamaIndex Workflows for Event-Driven RAG](docs/adr/002-LlamaIndex-Workflows-for-Event-Driven-RAG.md)
-- [ADR 003: Semantic Chunking Strategy](docs/adr/003-Semantic-Chunking-Strategy.md)
-
-## 🚀 Getting Started
+## Getting Started

 ### Prerequisites
-- Docker & Docker Compose
-- Python 3.11+ (Uses `async/await` heavily)
+- Docker and Docker Compose
+- Python 3.11+
 - OpenAI API Key

 ### Installation
-1. Clone the repository and navigate to the directory:
+1. Clone the repository:
 ```bash
-git clone https://github.com/gabaoun/Project-Aether.git
-cd Project-Aether
+git clone https://github.com/your-username/Project-Aether.git
+cd Project-Aether
 ```
 2. Install dependencies:
 ```bash
 pip install -r requirements.txt
 ```
-3. Environment Setup:
+3. Setup environment variables:
 ```bash
 cp .env.example .env
-# Add your OPENAI_API_KEY to .env
+# Edit .env with your OpenAI API Key and other settings
 ```
-4. Start the infrastructure (Qdrant, Postgres, Redis):
+4. Start infrastructure:
 ```bash
 docker-compose up -d
 ```

 ### Usage
-Execute the main application to start ingestion (if `./data` is populated) and the interactive retrieval loop:
+Ensure you have documents in the `./data` directory (as specified in your `.env`), then run:
 ```bash
 python main.py
 ```

-### Testing
-Run the comprehensive test suite:
+## Testing
+Run the test suite using pytest:
 ```bash
 pytest tests/
 ```
-## ⚖️ License
-Distributed under the Apache 2.0 License. See `LICENSE` for more information.
+
+## Purpose
+This project was developed to demonstrate a technically sound approach to building RAG systems. It focuses on clean architecture, error handling, and implementing advanced RAG patterns in a way that is maintainable and extensible.
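
The new README describes PII masking as "basic regex-based masking of emails and phone numbers". The masker itself is not part of the files shown in this commit, so the following is only a minimal sketch of that idea. The two patterns, the placeholder tokens, and the function name `mask_pii` are assumptions, not the project's code.

```python
import re

# Hypothetical patterns: the commit does not show the project's actual regexes.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# A digit, then at least seven digits/separators, then a closing digit.
PHONE_RE = re.compile(r"(?<!\w)\+?\d[\d\s().-]{7,}\d(?!\w)")


def mask_pii(text: str) -> str:
    """Replace email addresses and phone-like digit runs with placeholder tokens."""
    text = EMAIL_RE.sub("[EMAIL]", text)
    return PHONE_RE.sub("[PHONE]", text)


if __name__ == "__main__":
    print(mask_pii("Mail jane.doe@example.com or call +1 (555) 123-4567."))
```

As the README's Limitations section already notes, patterns like these miss names, addresses, and unusual number formats, which is why they are no substitute for an NER-based masker.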

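The README also says the custom splitter merges chunks that fall below a minimum size and relies on generators to limit memory use. The splitter's source is not shown in this commit, so the sketch below illustrates only the merge step, under the assumption that chunks are plain strings and size is measured in characters. The name `merge_small_chunks` and the `min_chars` parameter are hypothetical.

```python
from typing import Iterable, Iterator


def merge_small_chunks(chunks: Iterable[str], min_chars: int = 200) -> Iterator[str]:
    """Lazily yield chunks, folding undersized chunks into the following one."""
    buffer = ""
    for chunk in chunks:
        # Accumulate text until the buffer reaches the minimum size.
        buffer = f"{buffer} {chunk}".strip() if buffer else chunk
        if len(buffer) >= min_chars:
            yield buffer
            buffer = ""
    if buffer:
        # The trailing remainder is emitted even if it is still below the threshold.
        yield buffer
```

Because the function is a generator, only the current buffer is held in memory, so it can be chained after a lazy first-pass splitter without materializing every chunk.
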
docker-compose.yml

Lines changed: 2 additions & 16 deletions
@@ -8,21 +8,7 @@ services:
       - "6333:6333"
       - "6334:6334"
     volumes:
-      - ./qdrant_data:/qdrant/storage
-    networks:
-      - aether_network
-
-  postgres:
-    image: ankane/pgvector:latest
-    container_name: postgres
-    environment:
-      POSTGRES_USER: postgres
-      POSTGRES_PASSWORD: postgres
-      POSTGRES_DB: project_aether
-    ports:
-      - "5432:5432"
-    volumes:
-      - postgres_data:/var/lib/postgresql/data
+      - qdrant_data:/qdrant/storage
     networks:
       - aether_network

@@ -42,5 +28,5 @@ networks:
     driver: bridge

 volumes:
-  postgres_data:
+  qdrant_data:
   redis_data:

docs/ADR-001-Native-Vector-Search-Redis.md

Lines changed: 0 additions & 25 deletions
This file was deleted.
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+# ADR 001: Simplified Semantic Caching with Redis
+
+## Status
+Accepted
+
+## Context
+RAG systems can be slow and expensive if the same queries are sent to the LLM repeatedly. We need a way to cache responses for identical or highly similar queries.
+
+## Decision
+We decided to implement a simplified caching layer using Redis as a key-value store.
+
+## Rationale
+- **Speed:** Redis provides sub-millisecond lookups for cached answers.
+- **Cost Reduction:** Intercepting recurrent queries prevents redundant LLM API calls.
+- **Implementation Clarity:** For the current project scope, a key-based lookup (using query strings) provides immediate value without the overhead of managing a separate vector index within Redis.
+
+## Consequences
+- **Positive:** Immediate performance gain for repeated queries and reduced costs.
+- **Negative:** Current implementation requires exact query matching (or highly similar pre-processed strings) and does not yet leverage true vector-based similarity search in Redis.
+
+## Future Work
+In a production-grade system, this could be migrated to utilize Redis Stack's native vector search capabilities (HNSW) to allow for a true semantic cache.
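
The ADR settles on key-based lookup over "pre-processed" query strings. One way to picture that decision is the sketch below, which normalizes the query and hashes it into a key. It is not taken from the project: the key prefix, the normalization rules, and the class name `ExactMatchCache` are assumptions, and a plain `dict` stands in for Redis so the example runs without a server.

```python
import hashlib
from typing import MutableMapping, Optional


def cache_key(query: str, prefix: str = "aether:cache:") -> str:
    """Normalize case and whitespace, then hash the query into a fixed-length key."""
    normalized = " ".join(query.lower().split())
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return f"{prefix}{digest}"


class ExactMatchCache:
    """Exact-match answer cache; the mapping is a stand-in for a Redis client."""

    def __init__(self, store: Optional[MutableMapping[str, str]] = None) -> None:
        self._store: MutableMapping[str, str] = {} if store is None else store

    def get(self, query: str) -> Optional[str]:
        # Returns None on a miss, which signals the caller to invoke the LLM.
        return self._store.get(cache_key(query))

    def set(self, query: str, answer: str) -> None:
        self._store[cache_key(query)] = answer
```

With this scheme, "What is  RAG?" and "what is rag?" share a cache entry, while a paraphrase such as "Explain RAG" misses. That gap is exactly the limitation the ADR records under Consequences.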

main.py

Lines changed: 33 additions & 39 deletions
@@ -1,25 +1,18 @@
-"""
-Project Aether - Entry Point
-Author: Gabriel (Gabaoun) Penha
-"""
 import os
 import asyncio
-import logging
 from qdrant_client import QdrantClient
-from src.processing.ingestion_workflow import IngestionWorkflow
-from src.retrieval.retrieval_workflow import RetrievalWorkflow, StreamingStatusEvent
-from src.utils.config import settings
+from src.pipeline.ingestion import IngestionWorkflow
+from src.pipeline.retrieval import RetrievalWorkflow, StreamingStatusEvent
+from src.config.settings import settings
+from src.utils.logger import logger
 from llama_index.core import set_global_handler

-# Setup logging
-logging.basicConfig(level=getattr(logging, settings.log_level))
-
-# Setup observability
+# Optional Observability
 try:
     set_global_handler("arize_phoenix", endpoint=settings.phoenix_collector_endpoint)
-    print(f"Observability enabled via Arize Phoenix at {settings.phoenix_collector_endpoint}")
+    logger.info(f"Observability enabled at {settings.phoenix_collector_endpoint}")
 except ImportError:
-    print("Arize Phoenix not installed. Skipping observability setup.")
+    logger.warning("Arize Phoenix not installed. Skipping observability.")

 async def main():
     qdrant_client = QdrantClient(url=settings.qdrant_url, api_key=settings.qdrant_api_key)
@@ -29,38 +22,39 @@ async def main():

     data_dir = settings.data_dir
     if os.path.exists(data_dir) and os.listdir(data_dir):
-        print(f"Starting ingestion for documents in {data_dir}...")
+        logger.info(f"Starting ingestion from {data_dir}...")
         index = await ingestion_wf.run(input_dir=data_dir)
     else:
-        print("Data directory is empty or does not exist. Skipping ingestion.")
+        logger.error(f"Data directory '{data_dir}' is empty or missing.")
         return

     retrieval_wf = RetrievalWorkflow(index=index)

     while True:
-        query = input("\nEnter your query (or 'exit' to quit): ")
-        if query.lower() == 'exit':
-            break
-
-        print(f"\nProcessing query: {query}")
-
-        # Stream workflow events
-        handler = retrieval_wf.run(query=query)
-
-        async for event in handler.stream_events():
-            if isinstance(event, StreamingStatusEvent):
-                print(f"[Streaming] ⏳ {event.status}")
+        try:
+            query = input("\nEnter query (or 'exit'): ")
+            if query.lower() == 'exit':
+                break

-        result = await handler
-
-        print("\n--- Answer ---")
-        if result.get("from_cache"):
-            print("[✅ RETURNED FROM REDIS CACHE]")
-        print(result["answer"])
-        print("\n--- Sources ---")
-        for node in result.get("source_nodes", []):
-            file_name = node.metadata.get('file_name', 'Unknown file')
-            print(f"- {file_name}")
+            handler = retrieval_wf.run(query=query)
+
+            async for event in handler.stream_events():
+                if isinstance(event, StreamingStatusEvent):
+                    print(f"⏳ {event.status}")
+
+            result = await handler
+
+            print("\n--- Answer ---")
+            if result.get("from_cache"):
+                print("[CACHED]")
+            print(result["answer"])
+            print("\n--- Sources ---")
+            for node in result.get("source_nodes", []):
+                print(f"- {node.metadata.get('file_name', 'Unknown')}")
+        except KeyboardInterrupt:
+            break
+        except Exception as e:
+            logger.error(f"Error: {e}")

 if __name__ == "__main__":
-    asyncio.run(main())
+    asyncio.run(main())
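
One detail of the interactive loop in `main()` is that the built-in `input()` is a blocking call, so the event loop cannot run other tasks while the program waits for the user. If that ever matters (for example, with background tasks running alongside the prompt), the read can be moved to a worker thread. The helper below is a hypothetical sketch, not part of the commit: `read_query` and its `reader` parameter are names introduced here for illustration.

```python
import asyncio
from typing import Callable


async def read_query(prompt: str, reader: Callable[[str], str] = input) -> str:
    """Run a blocking line reader in a worker thread so the event loop stays free."""
    # asyncio.to_thread is available from Python 3.9; the project targets 3.11+.
    line = await asyncio.to_thread(reader, prompt)
    return line.strip()
```

Inside the loop, `query = await read_query("\nEnter query (or 'exit'): ")` would replace the direct `input()` call. The injectable `reader` exists only so the helper can be exercised in tests without a terminal.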
