The Market Data Service is a microservice designed to:
- Fetch real-time market data from Finnhub.
- Cache recent price lookups in Redis to reduce API calls.
- Publish price updates to a Kafka topic.
- Compute 5-point moving averages via a Kafka consumer.
- Persist raw and processed data in PostgreSQL.
- Manage on-demand and scheduled polling jobs.
- Expose RESTful APIs using FastAPI, with structured logging.

Table of Contents

- Architecture
- Folder Structure
- Setup Instructions
- Running the Service
- API Documentation
- Testing
- Troubleshooting
- Future Enhancements

Architecture

- FastAPI App:
  - /prices/latest: returns cached or freshly fetched quotes.
  - /prices/poll: schedules recurring polls.
- Redis Cache:
  - Stores price responses for 30 seconds by default to minimize external calls.
- Finnhub Provider: Fetches stock quotes on a cache miss.
- Kafka Producer: Publishes price events to the price-events topic.
- Kafka Consumer:
  - Subscribes to price-events.
  - Calculates 5-point moving averages.
  - Writes averages back to PostgreSQL.
- PostgreSQL:
  - raw_response table for every quote.
  - symbol_averages table for computed moving averages.
  - poll_jobs table for scheduled polling configurations.
- Scheduler:
  - On startup, re-schedules accepted poll jobs via APScheduler.
  - Supports dynamic job creation via API.
- Logging:
  - Structured, leveled logs via Python's logging module, with Uvicorn running at debug log level.
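
The consumer's 5-point moving average can be illustrated with a small, self-contained sketch. This is not the service's actual consumer code: the class name `MovingAverageTracker` is hypothetical, Kafka and PostgreSQL are left out entirely, and returning `None` until five prices have arrived is an assumption about how a partially filled window might be handled.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional

WINDOW = 5  # the "5-point" window named in the architecture notes


class MovingAverageTracker:
    """Keeps the most recent prices per symbol (hypothetical helper)."""

    def __init__(self, window: int = WINDOW) -> None:
        self.window = window
        # One bounded deque per symbol; old prices fall off automatically.
        self._prices: Dict[str, Deque[float]] = defaultdict(
            lambda: deque(maxlen=window)
        )

    def add(self, symbol: str, price: float) -> Optional[float]:
        """Record a price and return the average once the window is full."""
        prices = self._prices[symbol]
        prices.append(price)
        if len(prices) < self.window:
            return None  # assumption: no average until 5 points exist
        return sum(prices) / self.window


tracker = MovingAverageTracker()
for price in [10.0, 11.0, 12.0, 13.0, 14.0, 15.0]:
    print(price, tracker.add("AAPL", price))
```

In this sketch each incoming price event would map to one `add` call, and a non-`None` result is what a consumer might write to the symbol_averages table.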

Folder Structure

market-data-service/
├── app/
│   ├── api/        # FastAPI routes (prices.py, poll_job.py)
│   ├── core/       # Config, DI, cache setup (config.py, cache.py, dependencies.py)
│   ├── models/     # SQLAlchemy ORM models
│   ├── schemas/    # Pydantic request/response models
│   ├── services/   # Business logic (provider, producer, consumer, scheduler)
│   └── main.py     # FastAPI application entrypoint
├── tests/              # Pytest unit & integration tests
├── docker-compose.yml  # Postgres, Zookeeper, Kafka, Redis, Adminer
├── requirements.txt    # Python dependencies (including pre-commit, flake8, pytest)
└── .env                # Environment variables

Setup Instructions

- Clone the repo

    git clone https://github.com/pavansaipendry/Market-Data-Service.git
    cd Market-Data-Service

- Create & activate virtualenv

    python3 -m venv .venv
    source .venv/bin/activate

- Install dependencies

    pip install -r requirements.txt

- Configure environment variables: create a .env file in the project root:

    FINNHUB_API_KEY=your_api_key
    DATABASE_URL=postgresql://postgres:postgres@db:5432/marketdb
    REDIS_HOST=redis
    REDIS_PORT=6379
    KAFKA_BOOTSTRAP_SERVERS=localhost:9092
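
As a rough illustration of how these variables might be consumed, the sketch below reads them into a typed settings object. It is not the project's config.py: the `Settings` class, the `load_settings` function, and the fallback defaults are assumptions, and it expects the variables to already be present in the process environment rather than parsing the .env file itself.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Typed view of the variables listed in .env (hypothetical class)."""

    finnhub_api_key: str
    database_url: str
    redis_host: str
    redis_port: int
    kafka_bootstrap_servers: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from a mapping, failing fast on missing secrets."""
    required = ("FINNHUB_API_KEY", "DATABASE_URL")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return Settings(
        finnhub_api_key=env["FINNHUB_API_KEY"],
        database_url=env["DATABASE_URL"],
        # The defaults below are assumptions for local runs, not project values.
        redis_host=env.get("REDIS_HOST", "localhost"),
        redis_port=int(env.get("REDIS_PORT", "6379")),
        kafka_bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
    )


# Usage with an explicit mapping, so the example does not depend on the shell.
example = load_settings(
    {
        "FINNHUB_API_KEY": "your_api_key",
        "DATABASE_URL": "postgresql://postgres:postgres@db:5432/marketdb",
        "REDIS_HOST": "redis",
        "REDIS_PORT": "6379",
    }
)
print(example.redis_host, example.redis_port)
```

Failing at startup when a required variable is absent tends to be easier to diagnose than a connection error on the first request.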

Running the Service

    export PYTHONPATH="${PYTHONPATH}:$(pwd)"
    uvicorn app.main:app --reload --port 8000 --log-level debug

The API is now available at http://localhost:8000.

API Documentation

/prices/latest

Fetch the latest (cached or live) price for a stock symbol.

- Query Parameters
  - symbol (string, required): ticker (e.g., AAPL, MSFT).
- Response (200 OK)

    {
      "symbol": "AAPL",
      "price": 172.5,
      "timestamp": "2025-06-12T14:23:45Z",
      "provider": "finnhub"
    }

- Errors
  - 502 Bad Gateway: external API failure.
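
The "cached or live" behavior of this endpoint follows a cache-aside pattern, which the sketch below illustrates. It makes several assumptions: `TTLCache` is an in-memory stand-in for Redis so the example runs without a server, `get_latest_price` and `fake_fetch` are hypothetical names, and the exact key format `price:<symbol>` is a guess based on the `price:*` pattern used in the troubleshooting notes.

```python
import time
from typing import Callable, Dict, Optional, Tuple

CACHE_TTL_SECONDS = 30  # default TTL stated in the architecture notes


class TTLCache:
    """In-memory stand-in for Redis (assumption, for a runnable sketch)."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._store: Dict[str, Tuple[float, dict]] = {}

    def get(self, key: str) -> Optional[dict]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._store[key]  # expired: treat as a miss
            return None
        return value

    def set(self, key: str, value: dict) -> None:
        self._store[key] = (self.clock() + self.ttl, value)


def get_latest_price(symbol: str, cache: TTLCache,
                     fetch_quote: Callable[[str], dict]) -> dict:
    """Return the cached quote if fresh, otherwise fetch and cache it."""
    key = f"price:{symbol}"  # assumed key format
    cached = cache.get(key)
    if cached is not None:
        return cached
    quote = fetch_quote(symbol)  # a provider failure here would map to 502
    cache.set(key, quote)
    return quote


calls = []


def fake_fetch(symbol: str) -> dict:
    """Stands in for the Finnhub provider; records each external call."""
    calls.append(symbol)
    return {"symbol": symbol, "price": 172.5,
            "timestamp": "2025-06-12T14:23:45Z", "provider": "finnhub"}


cache = TTLCache(ttl=CACHE_TTL_SECONDS)
get_latest_price("AAPL", cache, fake_fetch)
get_latest_price("AAPL", cache, fake_fetch)  # served from cache
print(len(calls))  # 1
```

Passing the clock and the fetch function as parameters keeps the lookup logic testable without a live Redis instance or API key.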

/prices/poll

Create a new polling job.

- Request Body

    {
      "symbols": ["AAPL", "MSFT"],
      "interval": 60,
      "provider": "finnhub"
    }

- Response (202 Accepted)

    {
      "job_id": "uuid-string",
      "status": "accepted",
      "config": {
        "symbols": ["AAPL", "MSFT"],
        "interval": 60
      }
    }
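
The request and response shapes above can be mirrored in a short validation sketch. The project lists Pydantic models under schemas/; this example uses a plain dataclass instead so that it runs with the standard library only. `PollRequest` and `accept_poll_job` are hypothetical names, and treating `interval` as a positive number of seconds is an assumption.

```python
import uuid
from dataclasses import dataclass
from typing import List


@dataclass
class PollRequest:
    """Mirrors the request body fields shown above (hypothetical class)."""

    symbols: List[str]
    interval: int  # assumed to be seconds
    provider: str = "finnhub"


def accept_poll_job(payload: dict) -> dict:
    """Validate a payload and build a response in the documented shape."""
    request = PollRequest(**payload)  # unknown keys raise TypeError
    if not request.symbols:
        raise ValueError("symbols must contain at least one ticker")
    if request.interval <= 0:
        raise ValueError("interval must be positive")
    return {
        "job_id": str(uuid.uuid4()),
        "status": "accepted",
        "config": {"symbols": request.symbols, "interval": request.interval},
    }


response = accept_poll_job(
    {"symbols": ["AAPL", "MSFT"], "interval": 60, "provider": "finnhub"}
)
print(response["status"], response["config"])
```

In the real service the accepted job would also be stored in the poll_jobs table and registered with the scheduler; both steps are omitted here.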

Testing

Run all tests (unit + integration):

    pytest tests/ -W ignore::DeprecationWarning

- Key test modules
  - test_api.py: /prices/latest
  - test_poll_api.py: /prices/poll
  - test_consumer.py: moving-average logic
- Lint & formatting

    flake8 .
    black --check .
    pre-commit run --all-files

Troubleshooting

- Port conflicts

    lsof -i :8000
    kill -9 <PID>
- Import errors
  - Ensure each app/ dir has __init__.py.
  - Verify PYTHONPATH includes project root.
- Redis key not found
  - Confirm host/port match your .env.
  - Use redis-cli -h localhost -p 6379 -n 0 KEYS 'price:*'.
- Kafka topic missing
  - Enable auto topic creation, or create price-events manually.

Future Enhancements

- Metrics & Monitoring: Prometheus + Grafana dashboards.
- Authentication: API key or OAuth protection.
- Autoscaling: Deploy to Kubernetes or AWS ECS/Fargate.
- Retry & backoff: Robust error handling in the consumer.
- API versioning: Prepare /v1 namespace for breaking changes.
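
To make the retry and backoff item more concrete, here is one possible shape for it: a generic helper that retries a failing operation with exponentially growing delays. Nothing like this exists in the service yet as far as this README states; `retry_with_backoff`, its parameters, and `flaky_write` are all hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation, retrying on any exception with doubling delays."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: surface the last error
            sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    raise RuntimeError("unreachable")


# Usage: an operation that fails twice, then succeeds.
state = {"calls": 0}
delays = []


def flaky_write() -> str:
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("database temporarily unavailable")
    return "ok"


# Recording delays instead of sleeping keeps the example fast.
result = retry_with_backoff(flaky_write, sleep=delays.append)
print(result, delays)
```

A production version would likely catch only transient error types and add jitter to the delays; both are left out to keep the sketch short.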
