-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathingest_data.py
More file actions
93 lines (76 loc) · 2.66 KB
/
ingest_data.py
File metadata and controls
93 lines (76 loc) · 2.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import json
import psycopg2
import re
import os
import db_connect as dbc
def _parse_records(raw_content):
    """Parse Ruby-hash-style text into Python objects.

    Every ``:key=>`` token is rewritten to ``"key":`` and the result is fed
    to ``json.loads``.

    Raises:
        json.JSONDecodeError: if the rewritten text is not valid JSON.
    """
    # Accept digits after the first character so keys such as ``:isbn13=>``
    # are converted too.
    cleaned_content = re.sub(
        r':([a-zA-Z_][a-zA-Z0-9_]*)\s*=>', r'"\1":', raw_content
    )
    return json.loads(cleaned_content)


def _load_records(cursor, records):
    """Insert each record into ``raw_books`` and return the outcome counts.

    Returns:
        A ``(loaded, skipped, failed)`` tuple: rows actually inserted, rows
        ignored because their id already exists, and records that were
        malformed or rejected by the database.
    """
    loaded = skipped = failed = 0
    for record in records:
        # A record without an id would otherwise be stored under the literal
        # primary key 'None', so treat it as malformed instead.
        if not isinstance(record, dict) or record.get('id') is None:
            failed += 1
            print(f"Skipping malformed record (no usable 'id'): {record!r}")
            continue

        # Per-record savepoint: a failing row is undone on its own instead of
        # rolling back every earlier insert of this batch.
        # NOTE(review): assumes the connection is not in autocommit mode
        # (psycopg2's default) -- confirm against db_connect.
        cursor.execute('SAVEPOINT ingest_record')
        try:
            # Insert data, ignore if the ID already exists
            cursor.execute('''
                INSERT INTO raw_books (id, title, author, genre, publisher, year, price)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO NOTHING
            ''', (
                str(record.get('id')),
                record.get('title'),
                record.get('author'),
                record.get('genre'),
                record.get('publisher'),
                record.get('year'),
                record.get('price'),
            ))
        except psycopg2.Error as e:
            cursor.execute('ROLLBACK TO SAVEPOINT ingest_record')
            cursor.execute('RELEASE SAVEPOINT ingest_record')
            failed += 1
            print(f"Database error on record {record.get('id')}: {e}")
        else:
            # rowcount is 0 when ON CONFLICT DO NOTHING skipped the row; read
            # it before the next execute() overwrites it.
            if cursor.rowcount == 1:
                loaded += 1
            else:
                skipped += 1
            cursor.execute('RELEASE SAVEPOINT ingest_record')
    return loaded, skipped, failed


def run_ingestion(filepath):
    """Read a Ruby-hash-style data file and load its records into Postgres.

    The file is read as UTF-8, ``:key=>`` tokens are rewritten to JSON keys,
    and the parsed records are inserted into the ``raw_books`` table (created
    if missing). Rows whose id already exists are left untouched.

    Progress and errors are reported with ``print``; the function returns
    ``None`` in every case, including the early exits for a missing file,
    unparseable content, a non-list top level, or an unavailable connection.

    Args:
        filepath: path of the input file.
    """
    print("Starting Ingestion Module...")

    if not os.path.exists(filepath):
        print(f"Error: Could not find {filepath}.")
        return

    # ==========================================
    # EXTRACT & CLEAN JSON
    # ==========================================
    print(f"Reading and cleaning {filepath}...")
    with open(filepath, 'r', encoding='utf-8') as f:
        raw_content = f.read()

    try:
        records = _parse_records(raw_content)
    except json.JSONDecodeError as e:
        print(f"Failed to parse data. Error: {e}")
        return

    # Validate the shape before opening a connection: anything other than a
    # list cannot be iterated as a sequence of book records.
    if not isinstance(records, list):
        print(
            "Failed to parse data. Error: expected a list of records, "
            f"got {type(records).__name__}."
        )
        return
    print(f"Successfully extracted {len(records)} books.")

    # ==========================================
    # LOAD INTO POSTGRES
    # ==========================================
    conn = dbc.get_db_connection()
    if conn is None:
        return

    # try/finally so the cursor and connection are released even when an
    # unexpected exception escapes the load.
    try:
        cursor = conn.cursor()
        try:
            # Create the raw data table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS raw_books (
                    id TEXT PRIMARY KEY,
                    title TEXT,
                    author TEXT,
                    genre TEXT,
                    publisher TEXT,
                    year INTEGER,
                    price TEXT
                )
            ''')
            conn.commit()

            print("Loading records into the 'raw_books' table...")
            success_count, skipped_count, error_count = _load_records(
                cursor, records
            )
            conn.commit()
        finally:
            cursor.close()
    finally:
        conn.close()

    print("\n✅ Ingestion Module Complete!")
    print(f"Successfully loaded {success_count} rows into raw_books.")
    print(
        f"Skipped {skipped_count} rows whose id already existed; "
        f"{error_count} records failed."
    )
# Script entry point: run the ingestion on the hard-coded input path.
if __name__ == "__main__":
    default_input = 'Task1/task1_d.json'
    run_ingestion(default_input)