@@ -0,0 +1,21 @@
# Use a slim Python base image
FROM python:3.10-slim

# Set working directory inside container
WORKDIR /workspace

# Copy all project files
COPY . /workspace

# Install dependencies
RUN pip install --upgrade pip && pip install -r requirements.txt

# Expose Jupyter port
EXPOSE 8888

# Run Jupyter when container starts
CMD ["bash", "./run_jupyter.sh"]
@@ -1,19 +1,228 @@
# Real-Time Bitcoin Price Analysis Using Amazon EMR

This project demonstrates a real-time data processing pipeline that collects Bitcoin price data from a public API, stores it in Amazon S3, and processes it using Apache Spark on Amazon EMR for time-series analysis.

---

## Technologies Used

- **CoinGecko API** – Fetching live Bitcoin price in USD
- **Python** – Core scripting language
- **Boto3** – AWS SDK to interact with Amazon S3
- **Amazon S3** – For storing raw and processed data
- **Apache Spark (Structured Streaming)** – For 1-minute windowed aggregation
- **Amazon EMR** – Cluster to run Spark jobs at scale
- **Docker** – Containerized environment for portability and reproducibility

---

## Project Structure

| File/Folder | Description |
|-------------|-------------|
| `bitcoin_producer.py` | Fetches real-time Bitcoin prices and writes records to S3 (`data_v2/`) |
| `bitcoin_streaming_consumer_emr_debug.py` | Spark job to compute 1-min windowed average from S3 and write to `output/` |
| `bitcoin_kafka/bitcoin_emr_utils.py` | Helper functions for API fetching, timestamping, and S3 upload |
| `bitcoin_emr.API.ipynb` | Demonstrates utility API functions (with simulated S3 upload fallback) |
| `bitcoin_emr.example.ipynb` | Simulates full pipeline with producer input and EMR output |
| `bitcoin_emr.API.md` | Markdown documenting the API and helper layer |
| `bitcoin_emr.example.md` | Markdown documenting the full pipeline example |
| `requirements.txt` | Python package requirements |
| `Dockerfile` + `*.sh` | Docker setup and run scripts |

---

## Output Format

### Input Record (stored in S3)

```json
{
  "timestamp": "2025-05-17T09:58:00",
  "price_usd": 102723.12
}
```

### Processed Output (via Spark on EMR)

```json
{
  "window": {
    "start": "2025-05-17T09:58:00",
    "end": "2025-05-17T09:59:00"
  },
  "avg_price": 102750.13
}
```
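
The mapping from input records to the windowed output can be illustrated without Spark. The following is a minimal pure-Python sketch of 1-minute tumbling-window averaging, not the project's Spark job; the function name `one_minute_averages` and the sample prices are hypothetical, and it assumes timestamps in the format shown above.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def one_minute_averages(records):
    """Group records into 1-minute tumbling windows and average price_usd."""
    buckets = defaultdict(list)
    for rec in records:
        ts = datetime.fromisoformat(rec["timestamp"])
        # Truncate to the start of the minute to find the window start.
        start = ts.replace(second=0, microsecond=0)
        buckets[start].append(rec["price_usd"])

    results = []
    for start in sorted(buckets):
        prices = buckets[start]
        results.append({
            "window": {
                "start": start.isoformat(),
                "end": (start + timedelta(minutes=1)).isoformat(),
            },
            "avg_price": round(sum(prices) / len(prices), 2),
        })
    return results


# Hypothetical sample input: two records in one window, one in the next.
sample = [
    {"timestamp": "2025-05-17T09:58:05", "price_usd": 100.0},
    {"timestamp": "2025-05-17T09:58:40", "price_usd": 102.0},
    {"timestamp": "2025-05-17T09:59:10", "price_usd": 110.0},
]
windows = one_minute_averages(sample)
```

Each element of `windows` has the same shape as the processed output record above, with one entry per minute that contains at least one input record.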

---

## AWS Credentials Note

This project uses `boto3` to upload Bitcoin price records to Amazon S3.

If valid AWS credentials are present, records will be uploaded to:

```text
s3://bitcoin-price-streaming-data/data_v2/
```

⚠️ If credentials are not present, the upload will be skipped gracefully, and the JSON record will be printed instead.

This ensures the notebooks run end-to-end even without AWS setup.
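
The fallback behavior described here could look roughly like the sketch below. It is an illustration under assumptions rather than the project's actual helper: the name `upload_or_print` and the injectable `put_object` parameter are hypothetical, and the broad `except` is a deliberate simplification that treats any failure (missing package, missing credentials, network error) as a reason to print the record instead.

```python
import json


def upload_or_print(record, bucket, key, put_object=None):
    """Try to upload a record to S3; print it instead if the upload fails.

    put_object is a hypothetical injection point that defaults to the boto3
    S3 client's put_object method. Returns True on upload, False on fallback.
    """
    body = json.dumps(record)
    try:
        if put_object is None:
            # Imported lazily so the sketch also runs where boto3 is absent.
            import boto3
            put_object = boto3.client("s3").put_object
        put_object(Bucket=bucket, Key=key, Body=body.encode("utf-8"))
        return True
    except Exception as exc:  # broad on purpose, see the note above
        print(f"Upload skipped ({type(exc).__name__}); record: {body}")
        return False
```

Passing a stub as `put_object` makes it possible to exercise both paths in a notebook without touching AWS.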

---

## Docker Setup Instructions

You can run this project entirely in Docker without installing any local dependencies.

### To Build the Image

```bash
bash docker_build.sh
```

### To Run the Container

```bash
bash docker_bash.sh
```

### Open Jupyter

Once the container is running, open your browser and go to:

```text
http://localhost:8888
```

---

### Notebooks to Run

- `bitcoin_emr.API.ipynb` – Test API functions, simulate S3 upload
- `bitcoin_emr.example.ipynb` – Simulate full pipeline input + output
- Corresponding Markdown documentation:
  - `bitcoin_emr.API.md`
  - `bitcoin_emr.example.md`

Both notebooks run without requiring cloud setup.

---

## Running the Spark Job on Amazon EMR (Optional)

To run the Spark job (`bitcoin_streaming_consumer_emr_debug.py`) on an actual Amazon EMR cluster and process the real-time Bitcoin data stored in S3:

### 1. Upload Input Data

Ensure the producer script or notebook has pushed data to:

```text
s3://bitcoin-price-streaming-data/data_v2/
```

This folder should contain timestamped `.json` records with the following structure:

```json
{
  "timestamp": "YYYY-MM-DDTHH:MM:SS",
  "price_usd": FLOAT
}
```

---

### 2. Launch and Configure EMR Cluster

Navigate to the [EMR Console](https://console.aws.amazon.com/elasticmapreduce/) and create a cluster with the following configurations:

#### Software Configuration

- **Release version**: EMR 6.x (e.g., 6.13.0)
- **Applications**: Spark (uncheck others if not needed)

#### Hardware Configuration

- **Instance type**: `m5.xlarge` (for both Master and Core)
- **Core nodes**: At least 1
- **Auto-termination**: Enable if needed to save costs

#### General Configuration

- **Cluster name**: `bitcoin-emr-cluster`
- **Logging**: Enable and set an S3 log path (e.g., `s3://your-bucket/emr-logs/`)
- **EC2 key pair**: Select a key pair for SSH access (optional but recommended)

#### Networking

- **VPC**: Use the default or a custom one with public subnet
- **Permissions**:
  - Use a service role with `AmazonS3FullAccess` and `AmazonEMRFullAccessPolicy_v2`
  - Ensure the EC2 instance profile also has access to S3

---

### 3. Submit the Spark Job

You can submit the job in one of two ways:

#### (a) Add a Step from the Console

- Upload `bitcoin_streaming_consumer_emr_debug.py` to S3 (e.g., `s3://your-bucket/scripts/`)
- In the cluster’s "Steps" tab, add a new step:
  - **Type**: Spark
  - **Name**: `Run Bitcoin Streaming Job`
  - **Script location**:
    ```text
    s3://your-bucket/scripts/bitcoin_streaming_consumer_emr_debug.py
    ```
  - **Arguments**: Leave blank

#### (b) SSH and Run Manually

1. SSH into the master node:
   ```bash
   ssh -i your-key.pem hadoop@<master-node-public-dns>
   ```

2. Run the script using:
   ```bash
   spark-submit --deploy-mode cluster --master yarn s3://your-bucket/scripts/bitcoin_streaming_consumer_emr_debug.py
   ```

---

### 4. Output Location

After execution, check the results in your S3 bucket:

```text
s3://bitcoin-price-streaming-data/output/
```

Each file contains windowed average price data over 1-minute intervals in JSON format.

---

### 📝 Tip

To reduce costs:
- Use **auto-termination** after job completion
- Always **terminate idle clusters**
- Monitor logs in `emr-logs/` for errors or debug output

---

## Summary

- Docker runs the entire project with no local setup beyond Docker itself
- AWS and EMR usage is optional but supported
- Notebooks simulate output if cloud access is unavailable
- Fully reproducible for grading or real deployment

---

**Author:** Rithika Baskaran
**Course:** DATA605 — Spring 2025
@@ -0,0 +1,34 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StringType, DoubleType

# 1. Start Spark session
spark = SparkSession.builder \
    .appName("BitcoinDebugConsole") \
    .getOrCreate()

spark.sparkContext.setLogLevel("INFO")

# 2. Define schema
schema = StructType() \
    .add("timestamp", StringType()) \
    .add("price_usd", DoubleType())

# 3. Read from S3 (simulated stream)
df_stream = spark.readStream \
    .schema(schema) \
    .option("maxFilesPerTrigger", 1) \
    .json("s3://bitcoin-price-streaming-data/data/")

# 4. Parse timestamp column
df_stream = df_stream.withColumn("timestamp", to_timestamp("timestamp"))

# 5. DEBUG: Print raw parsed data to stdout (instead of saving to S3)
query = df_stream.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", False) \
    .trigger(once=True) \
    .start()

query.awaitTermination()
@@ -0,0 +1,26 @@
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder.appName("BitcoinReadTest").getOrCreate()
spark.sparkContext.setLogLevel("INFO")

print("✅ Spark session started")

schema = StructType() \
    .add("timestamp", StringType()) \
    .add("price_usd", DoubleType())

print("📥 Reading from S3...")

df = spark.read.schema(schema).json("s3://bitcoin-price-streaming-data/data_v2/")

print("🕒 Parsing timestamp...")

df = df.withColumn("timestamp", to_timestamp("timestamp"))

print("📊 Showing data...")

df.show(truncate=False)

print("✅ Done.")
@@ -0,0 +1,63 @@
# Bitcoin EMR Utility API Tutorial

## Table of Contents
- [Introduction](#introduction)
- [Native API Used](#native-api-used)
- [Our Utility Layer](#our-utility-layer)
- [Key Functions](#key-functions)
  - [fetch_bitcoin_price()](#fetch_bitcoin_price)
  - [get_current_timestamp()](#get_current_timestamp)
  - [save_price_to_s3()](#save_price_to_s3)
- [Example Usage](#example-usage)
- [Conclusion](#conclusion)

---

## Introduction

This tutorial explains the utility layer built for fetching real-time Bitcoin prices from a public API and storing the results in Amazon S3. These utility functions are used by producer scripts and Jupyter notebooks to simulate real-time data pipelines.

---

## Native API Used

- **CoinGecko API**: Used to fetch the current price of Bitcoin in USD.
- **Boto3 (AWS SDK for Python)**: Used to programmatically write JSON files to an S3 bucket.

---

## Our Utility Layer

All helper logic is implemented in `bitcoin_emr_utils.py`, which serves as the foundation for producer scripts and Spark-based processing.

---

## Key Functions

### fetch_bitcoin_price()

- Calls the CoinGecko API.
- Returns the current Bitcoin price in USD.
- Raises an exception if the response is not valid.
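
As an illustration of the behavior listed above, here is a minimal standard-library sketch. It is not the code in `bitcoin_emr_utils.py`: the `parse_price` helper and the `timeout` parameter are assumptions, and the URL assumes CoinGecko's public `simple/price` endpoint, whose response has the shape `{"bitcoin": {"usd": <number>}}`.

```python
import json
from urllib.request import urlopen

# Assumed endpoint: CoinGecko's public simple-price API.
COINGECKO_URL = (
    "https://api.coingecko.com/api/v3/simple/price"
    "?ids=bitcoin&vs_currencies=usd"
)


def parse_price(payload):
    """Extract the USD price from a decoded response, or raise ValueError."""
    try:
        price = payload["bitcoin"]["usd"]
    except (KeyError, TypeError) as exc:
        raise ValueError(f"Unexpected CoinGecko response: {payload!r}") from exc
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(price, bool) or not isinstance(price, (int, float)):
        raise ValueError(f"Price is not numeric: {price!r}")
    return float(price)


def fetch_bitcoin_price(timeout=10):
    """Call the API and return the current Bitcoin price in USD."""
    with urlopen(COINGECKO_URL, timeout=timeout) as resp:
        return parse_price(json.load(resp))
```

Keeping the parsing separate from the network call means the validation logic can be checked with a hand-written payload, without depending on the live API.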

### get_current_timestamp()

- Returns the current timestamp in ISO 8601 format with timezone.
- Used to timestamp each Bitcoin price record.
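
One way to produce such a timestamp with the standard library is sketched below. This is an assumption about the approach, not the project's implementation; in particular, the choice of UTC and second-level precision is illustrative.

```python
from datetime import datetime, timezone


def get_current_timestamp():
    """Return the current UTC time as an ISO 8601 string with an offset."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


ts = get_current_timestamp()
```

A value produced this way ends in `+00:00`, so the timezone is explicit in every record.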

### save_price_to_s3()

- Signature: `save_price_to_s3(bucket, folder, filename_prefix="price", price_usd=None)`.
- Constructs a record using the current timestamp and the given price.
- Uploads the record as a JSON object to the given S3 bucket and folder.
- Fetches the price automatically if `price_usd` is not provided.
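
The record-building step can be sketched separately from the upload. The helper below is hypothetical: `build_record_and_key`, its `fetch_price` and `now` parameters, and the `<prefix>_<timestamp>.json` key naming are all assumptions made for illustration, since the actual naming scheme is not shown here.

```python
import json
from datetime import datetime, timezone


def build_record_and_key(folder, filename_prefix="price", price_usd=None,
                         fetch_price=None, now=None):
    """Return (s3_key, json_body) for one price record.

    fetch_price is a hypothetical callable used when price_usd is None;
    now can be passed in to make the result deterministic.
    """
    if price_usd is None:
        if fetch_price is None:
            raise ValueError("Provide price_usd or a fetch_price callable")
        price_usd = fetch_price()
    now = now or datetime.now(timezone.utc)
    record = {
        "timestamp": now.isoformat(timespec="seconds"),
        "price_usd": float(price_usd),
    }
    # Assumed key layout: <folder>/<prefix>_<compact timestamp>.json
    stamp = now.strftime("%Y%m%dT%H%M%S")
    key = f"{folder.strip('/')}/{filename_prefix}_{stamp}.json"
    return key, json.dumps(record)
```

Embedding the timestamp in the key gives each record its own object in the folder, which suits a Spark job that picks up new files as they arrive.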

---

## Example Usage

```python
from bitcoin_emr_utils import fetch_bitcoin_price, save_price_to_s3

price = fetch_bitcoin_price()
save_price_to_s3(bucket='bitcoin-price-streaming-data', folder='data_v2', price_usd=price)
```