-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess_data.py
More file actions
163 lines (133 loc) · 5.44 KB
/
process_data.py
File metadata and controls
163 lines (133 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
AWS S3 Data Processing Script (Lab)
Reads genomics CSV and sensor JSON from S3, computes statistics,
filters records, converts formats, and saves results to a folder in the same bucket.
Region: Singapore (ap-southeast-1)
"""
import io
import json
from collections import defaultdict
from statistics import mean, median

import boto3
import pandas as pd
# --- CONFIGURATION (edit for your bucket) ---
# Target S3 bucket; must already exist in REGION.
BUCKET_NAME = "research-data-wawi-2025" # Replace with your bucket name
REGION = "ap-southeast-1" # Singapore
# S3 object keys (paths) for input files. Change if you uploaded under a prefix (e.g. raw/).
GENOMICS_KEY = "genomics_sample.csv"
SENSOR_KEY = "sensor_readings.json"
# Output folder inside the bucket (no leading slash); all results land under this prefix.
OUTPUT_PREFIX = "processed/"
def get_s3_client():
    """Return a boto3 S3 client bound to the configured region (Singapore)."""
    client = boto3.client("s3", region_name=REGION)
    return client
def read_csv_from_s3(s3_client, bucket, key):
    """Download s3://bucket/key and parse its bytes as CSV into a DataFrame."""
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    raw = obj["Body"].read()
    return pd.read_csv(io.BytesIO(raw))
def read_json_from_s3(s3_client, bucket, key):
    """Download s3://bucket/key and decode its bytes as JSON (array of objects)."""
    payload = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    return json.loads(payload)
def write_df_to_s3(s3_client, bucket, key, df, format="csv"):
    """Serialize *df* (CSV when format == "csv", JSON otherwise) and upload it.

    The object is written to s3://bucket/key and the destination is echoed
    to stdout.
    """
    out = io.BytesIO()
    if format == "csv":
        df.to_csv(out, index=False)
    else:
        out.write(df.to_json(orient="records", indent=2).encode("utf-8"))
    s3_client.put_object(Bucket=bucket, Key=key, Body=out.getvalue())
    print(f" Written: s3://{bucket}/{key}")
def write_json_to_s3(s3_client, bucket, key, data):
    """Upload *data* (a JSON-serializable dict/list) to s3://bucket/key as pretty-printed JSON."""
    payload = json.dumps(data, indent=2).encode("utf-8")
    s3_client.put_object(Bucket=bucket, Key=key, Body=payload)
    print(f" Written: s3://{bucket}/{key}")
def process_genomics(df):
    """Summarize the "expression" column and keep rows with expression >= 10.

    Returns a (stats, filtered) pair: *stats* is a dict with count/mean/
    median/min/max (floats rounded to 4 places), *filtered* is a copy of
    the qualifying rows.
    """
    col = df["expression"]
    summary = {"count": int(col.count())}
    # Same key order as the rounded aggregates below: mean, median, min, max.
    for stat_name in ("mean", "median", "min", "max"):
        summary[stat_name] = round(float(getattr(col, stat_name)()), 4)
    # Keep only expression >= 10; .copy() detaches the slice from df.
    kept = df[col >= 10].copy()
    return summary, kept
def process_sensor(readings):
    """Compute overall and per-sensor statistics and filter by value.

    Args:
        readings: list of dicts, each with at least "sensor_id" and
            "value" keys.

    Returns:
        (stats, filtered) where stats = {"overall": {...}, "per_sensor":
        [...]} (floats rounded to 4 places, per-sensor entries in first-
        occurrence order) and filtered keeps readings with value >= 22.

    Raises:
        statistics.StatisticsError: if *readings* is empty.
    """
    # Collect the flat value list and the per-sensor grouping in one pass;
    # defaultdict replaces the manual "if sid not in ..." key initialization.
    values = []
    by_sensor = defaultdict(list)
    for r in readings:
        values.append(r["value"])
        by_sensor[r["sensor_id"]].append(r["value"])
    overall = {
        "count": len(values),
        "mean": round(mean(values), 4),
        "median": round(median(values), 4),
        "min": round(min(values), 4),
        "max": round(max(values), 4),
    }
    per_sensor = [
        {"sensor_id": sid, "mean": round(mean(vals), 4), "count": len(vals)}
        for sid, vals in by_sensor.items()
    ]
    stats = {"overall": overall, "per_sensor": per_sensor}
    # Filter: value >= 22 (e.g. temperature readings above threshold)
    filtered = [r for r in readings if r["value"] >= 22]
    return stats, filtered
def main():
    """Run the full lab pipeline: read both inputs, process, save results."""
    print("S3 Data Processing (Lab)")
    print(f"Bucket: {BUCKET_NAME} Region: {REGION}")
    print()
    client = get_s3_client()

    # Step 1: load the genomics CSV from the bucket.
    print("1. Reading genomics CSV from S3...")
    df_gen = read_csv_from_s3(client, BUCKET_NAME, GENOMICS_KEY)
    print(f" Rows: {len(df_gen)}")

    # Step 2: genomics statistics plus the expression >= 10 filter.
    print("2. Processing genomics (statistics, filter expression >= 10)...")
    gen_stats, gen_filtered = process_genomics(df_gen)
    print(f" Statistics: {gen_stats}")
    print(f" Filtered rows: {len(gen_filtered)}")

    # Step 3: load the sensor readings JSON.
    print("3. Reading sensor JSON from S3...")
    readings = read_json_from_s3(client, BUCKET_NAME, SENSOR_KEY)
    print(f" Records: {len(readings)}")

    # Step 4: sensor statistics plus the value >= 22 filter.
    print("4. Processing sensor data (statistics, filter value >= 22)...")
    sens_stats, sens_filtered = process_sensor(readings)
    print(f" Overall stats: {sens_stats['overall']}")

    # Step 5: persist every result under the OUTPUT_PREFIX folder.
    print("5. Saving results to S3...")
    write_json_to_s3(client, BUCKET_NAME, OUTPUT_PREFIX + "genomics_stats.json", gen_stats)
    write_df_to_s3(client, BUCKET_NAME, OUTPUT_PREFIX + "genomics_filtered.csv",
                   gen_filtered, format="csv")
    write_df_to_s3(client, BUCKET_NAME, OUTPUT_PREFIX + "genomics_filtered.json",
                   gen_filtered, format="json")
    write_json_to_s3(client, BUCKET_NAME, OUTPUT_PREFIX + "sensor_stats.json", sens_stats)
    write_json_to_s3(client, BUCKET_NAME, OUTPUT_PREFIX + "sensor_filtered.json", sens_filtered)
    # Optional: sensor filtered as CSV
    write_df_to_s3(client, BUCKET_NAME, OUTPUT_PREFIX + "sensor_filtered.csv",
                   pd.DataFrame(sens_filtered), format="csv")

    print()
    print("Done. Check your bucket folder:", OUTPUT_PREFIX)


if __name__ == "__main__":
    main()