Skip to content

Commit a45f2a9

Browse files
TutorTask86_Spring2025_Real_Time_Bitcoin_Price_Analysis_Using_Amazon_EMR_2 (#252)
Co-authored-by: Krishna P Taduri <krishna.pratardan@gmail.com> Co-authored-by: Krishna P Taduri <40231735+tkpratardan@users.noreply.github.com>
1 parent 69059e1 commit a45f2a9

25 files changed

Lines changed: 719 additions & 210 deletions
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Use a slim Python base image
2+
FROM python:3.10-slim
3+
4+
# Set working directory inside container
5+
WORKDIR /app
6+
7+
FROM python:3.10-slim
8+
9+
WORKDIR /workspace
10+
11+
# Copy all project files
12+
COPY . /workspace
13+
14+
# Install dependencies
15+
RUN pip install --upgrade pip && pip install -r requirements.txt
16+
17+
# Expose Jupyter port
18+
EXPOSE 8888
19+
20+
# Run Jupyter when container starts
21+
CMD ["bash", "./run_jupyter.sh"]
Lines changed: 221 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,228 @@
1-
<!-- toc -->
1+
# Real-Time Bitcoin Price Analysis Using Amazon EMR
22

3+
This project demonstrates a real-time data processing pipeline that collects Bitcoin price data from a public API, stores it in Amazon S3, and processes it using Apache Spark on Amazon EMR for time-series analysis.
34

5+
---
46

5-
<!-- tocstop -->
7+
## Technologies Used
68

7-
- Author: <your name>
8-
- Date: <date>
9+
- **CoinGecko API** – Fetching live Bitcoin price in USD
10+
- **Python** – Core scripting language
11+
- **Boto3** – AWS SDK to interact with Amazon S3
12+
- **Amazon S3** – For storing raw and processed data
13+
- **Apache Spark (Structured Streaming)** – For 1-minute windowed aggregation
14+
- **Amazon EMR** – Cluster to run Spark jobs at scale
15+
- **Docker** – Containerized environment for portability and reproducibility
916

10-
<Describe all the files in the projects>
17+
---
1118

12-
This project contains the following files
19+
## Project Structure
1320

14-
- `template`.API.ipynb: a notebook describing the native API of <Package>
15-
- `template`.API.md: a description of the native API of <Package>
16-
- `template`.API.py: code for using API of <Package>
17-
- `template`.example.ipynb: a notebook implementing a project using <Package>
18-
- `template`.example.md: a markdown description of the project
19-
- `template`.example.py: code for implementing the project
21+
| File/Folder | Description |
22+
|-------------|-------------|
23+
| `bitcoin_producer.py` | Fetches real-time Bitcoin prices and writes records to S3 (`data_v2/`) |
24+
| `bitcoin_streaming_consumer_emr_debug.py` | Spark job to compute 1-min windowed average from S3 and write to `output/` |
25+
| `bitcoin_kafka/bitcoin_emr_utils.py` | Helper functions for API fetching, timestamping, and S3 upload |
26+
| `bitcoin_emr.API.ipynb` | Demonstrates utility API functions (with simulated S3 upload fallback) |
27+
| `bitcoin_emr.example.ipynb` | Simulates full pipeline with producer input and EMR output |
28+
| `bitcoin_emr.API.md` | Markdown documenting the API and helper layer |
29+
| `bitcoin_emr.example.md` | Markdown documenting the full pipeline example |
30+
| `requirements.txt` | Python package requirements |
31+
| `Dockerfile` + `*.sh` | Docker setup and run scripts |
32+
33+
---
34+
35+
## Output Format
36+
37+
### Input Record (stored in S3)
38+
39+
```json
40+
{
41+
"timestamp": "2025-05-17T09:58:00",
42+
"price_usd": 102723.12
43+
}
44+
```
45+
46+
### Processed Output (via Spark on EMR)
47+
48+
```json
49+
{
50+
"window": {
51+
"start": "2025-05-17T09:58:00",
52+
"end": "2025-05-17T09:59:00"
53+
},
54+
"avg_price": 102750.13
55+
}
56+
```
57+
58+
---
59+
60+
## AWS Credentials Note
61+
62+
This project uses `boto3` to upload Bitcoin price records to Amazon S3.
63+
64+
If valid AWS credentials are present, records will be uploaded to:
65+
66+
```text
67+
s3://bitcoin-price-streaming-data/data_v2/
68+
```
69+
70+
⚠️ If credentials are not present, the upload will be skipped gracefully, and the JSON record will be printed instead.
71+
72+
This ensures the notebooks run end-to-end even without AWS setup.
73+
74+
---
75+
76+
## Docker Setup Instructions
77+
78+
You can run this project entirely in Docker without installing any local dependencies.
79+
80+
### To Build the Image
81+
82+
```bash
83+
bash docker_build.sh
84+
```
85+
86+
### To Run the Container
87+
88+
```bash
89+
bash docker_bash.sh
90+
```
91+
92+
### Open Jupyter
93+
94+
Once the container is running, open your browser and go to:
95+
96+
```text
97+
http://localhost:8888
98+
```
99+
100+
---
101+
102+
### Notebooks to Run
103+
104+
- `bitcoin_emr.API.ipynb` – Test API functions, simulate S3 upload
105+
- `bitcoin_emr.example.ipynb` – Simulate full pipeline input + output
106+
- Corresponding Markdown Documentation:
107+
- `bitcoin_emr.API.md`
108+
- `bitcoin_emr.example.md`
109+
110+
Both notebooks run without requiring cloud setup.
111+
112+
---
113+
114+
## Running the Spark Job on Amazon EMR (Optional)
115+
116+
To run the Spark job (`bitcoin_streaming_consumer_emr_debug.py`) on an actual Amazon EMR cluster and process the real-time Bitcoin data stored in S3:
117+
118+
### 1. Upload Input Data
119+
120+
Ensure the producer script or notebook has pushed data to:
121+
122+
```text
123+
s3://bitcoin-price-streaming-data/data_v2/
124+
```
125+
126+
This folder should contain timestamped `.json` records with the following structure:
127+
128+
```json
129+
{
130+
"timestamp": "YYYY-MM-DDTHH:MM:SS",
131+
"price_usd": FLOAT
132+
}
133+
```
134+
135+
---
136+
137+
### 2. Launch and Configure EMR Cluster
138+
139+
Navigate to the [EMR Console](https://console.aws.amazon.com/elasticmapreduce/) and create a cluster with the following configurations:
140+
141+
#### Software Configuration
142+
143+
- **Release version**: EMR 6.x (e.g., 6.13.0)
144+
- **Applications**: Spark (uncheck others if not needed)
145+
146+
#### Hardware Configuration
147+
148+
- **Instance type**: `m5.xlarge` (for both Master and Core)
149+
- **Core nodes**: At least 1
150+
- **Auto-termination**: Enable if needed to save costs
151+
152+
#### General Configuration
153+
154+
- **Cluster name**: `bitcoin-emr-cluster`
155+
- **Logging**: Enable and set an S3 log path (e.g., `s3://your-bucket/emr-logs/`)
156+
- **EC2 key pair**: Select a key pair for SSH access (optional but recommended)
157+
158+
#### Networking
159+
160+
- **VPC**: Use the default or a custom one with public subnet
161+
- **Permissions**:
162+
- Use a service role with `AmazonS3FullAccess` and `AmazonEMRFullAccessPolicy_v2`
163+
- Ensure the EC2 instance profile also has access to S3
164+
165+
---
166+
167+
### 3. Submit the Spark Job
168+
169+
You can submit the job in one of two ways:
170+
171+
#### (a) Add a Step from the Console
172+
173+
- Upload `bitcoin_streaming_consumer_emr_debug.py` to S3 (e.g., `s3://your-bucket/scripts/`)
174+
- In the cluster’s "Steps" tab, add a new step:
175+
- **Type**: Spark
176+
- **Name**: `Run Bitcoin Streaming Job`
177+
- **Script location**:
178+
```bash
179+
s3://your-bucket/scripts/bitcoin_streaming_consumer_emr_debug.py
180+
```
181+
- **Arguments**: Leave blank
182+
183+
#### (b) SSH and Run Manually
184+
185+
1. SSH into the master node:
186+
```bash
187+
ssh -i your-key.pem hadoop@<master-node-public-dns>
188+
```
189+
190+
2. Run the script using:
191+
```bash
192+
spark-submit --deploy-mode cluster --master yarn s3://your-bucket/scripts/bitcoin_streaming_consumer_emr_debug.py
193+
```
194+
195+
---
196+
197+
### 4. Output Location
198+
199+
After execution, check the results in your S3 bucket:
200+
201+
```text
202+
s3://bitcoin-price-streaming-data/output/
203+
```
204+
205+
Each file contains windowed average price data over 1-minute intervals in JSON format.
206+
207+
---
208+
209+
### 📝 Tip
210+
211+
To reduce costs:
212+
- Use **auto-termination** after job completion
213+
- Always **terminate idle clusters**
214+
- Monitor logs in `emr-logs/` for errors or debug output
215+
216+
---
217+
218+
## Summary
219+
220+
- Docker runs the entire project with zero setup
221+
- AWS and EMR usage is optional but supported
222+
- Notebooks simulate output if cloud access is unavailable
223+
- Fully reproducible for grading or real deployment
224+
225+
---
226+
227+
**Author:** Rithika Baskaran
228+
**Course:** DATA605 — Spring 2025
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from pyspark.sql import SparkSession
2+
from pyspark.sql.functions import to_timestamp
3+
from pyspark.sql.types import StructType, StringType, DoubleType
4+
5+
# 1. Start Spark session
6+
spark = SparkSession.builder \
7+
.appName("BitcoinDebugConsole") \
8+
.getOrCreate()
9+
10+
spark.sparkContext.setLogLevel("INFO")
11+
12+
# 2. Define schema
13+
schema = StructType() \
14+
.add("timestamp", StringType()) \
15+
.add("price_usd", DoubleType())
16+
17+
# 3. Read from S3 (simulated stream)
18+
df_stream = spark.readStream \
19+
.schema(schema) \
20+
.option("maxFilesPerTrigger", 1) \
21+
.json("s3://bitcoin-price-streaming-data/data/")
22+
23+
# 4. Parse timestamp column
24+
df_stream = df_stream.withColumn("timestamp", to_timestamp("timestamp"))
25+
26+
# 5. DEBUG: Print raw parsed data to stdout (instead of saving to S3)
27+
query = df_stream.writeStream \
28+
.format("console") \
29+
.outputMode("append") \
30+
.option("truncate", False) \
31+
.trigger(once=True) \
32+
.start()
33+
34+
query.awaitTermination()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from pyspark.sql import SparkSession
2+
from pyspark.sql.types import StructType, StringType, DoubleType
3+
from pyspark.sql.functions import to_timestamp
4+
5+
spark = SparkSession.builder.appName("BitcoinReadTest").getOrCreate()
6+
spark.sparkContext.setLogLevel("INFO")
7+
8+
print("✅ Spark session started")
9+
10+
schema = StructType() \
11+
.add("timestamp", StringType()) \
12+
.add("price_usd", DoubleType())
13+
14+
print("📥 Reading from S3...")
15+
16+
df = spark.read.schema(schema).json("s3://bitcoin-price-streaming-data/data_v2/")
17+
18+
print("🕒 Parsing timestamp...")
19+
20+
df = df.withColumn("timestamp", to_timestamp("timestamp"))
21+
22+
print("📊 Showing data...")
23+
24+
df.show(truncate=False)
25+
26+
print("✅ Done.")
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Bitcoin EMR Utility API Tutorial
2+
3+
## Table of Contents
4+
- [Introduction](#introduction)
5+
- [Native API Used](#native-api-used)
6+
- [Our Utility Layer](#our-utility-layer)
7+
- [Key Functions](#key-functions)
8+
- [fetch_bitcoin_price()](#fetchbitcoin_price)
9+
- [get_current_timestamp()](#get_current_timestamp)
10+
- [save_price_to_s3()](#save_price_to_s3)
11+
- [Example Usage](#example-usage)
12+
- [Conclusion](#conclusion)
13+
14+
---
15+
16+
## Introduction
17+
18+
This tutorial explains the utility layer built for fetching real-time Bitcoin prices from a public API and storing the results in Amazon S3. These utility functions are used by producer scripts and Jupyter notebooks to simulate real-time data pipelines.
19+
20+
---
21+
22+
## Native API Used
23+
24+
- **CoinGecko API**: Used to fetch the current price of Bitcoin in USD.
25+
- **Boto3 (AWS SDK for Python)**: Used to programmatically write JSON files to an S3 bucket.
26+
27+
---
28+
29+
## Our Utility Layer
30+
31+
All helper logic is implemented in `bitcoin_emr_utils.py`, which serves as the foundation for producer scripts and Spark-based processing.
32+
33+
---
34+
35+
## Key Functions
36+
37+
### fetch_bitcoin_price()
38+
39+
- Calls the CoinGecko API.
40+
- Returns the current Bitcoin price in USD.
41+
- Raises an exception if the response is not valid.
42+
43+
### get_current_timestamp()
44+
45+
- Returns the current timestamp in ISO 8601 format with timezone.
46+
- Used to timestamp each Bitcoin price record.
47+
48+
### save_price_to_s3(bucket, folder, filename_prefix="price", price_usd=None)
49+
50+
- Constructs a record using the current timestamp and given price.
51+
- Uploads the JSON object to the given S3 bucket/folder.
52+
- Automatically fetches the price if `price_usd` is not provided.
53+
54+
---
55+
56+
## Example Usage
57+
58+
```python
59+
from bitcoin_emr_utils import fetch_bitcoin_price, save_price_to_s3
60+
61+
price = fetch_bitcoin_price()
62+
save_price_to_s3(bucket='bitcoin-price-streaming-data', folder='data_v2', price_usd=price)
63+

0 commit comments

Comments
 (0)