-
-
Notifications
You must be signed in to change notification settings - Fork 8.1k
Expand file tree
/
Copy pathingest_data.py
More file actions
77 lines (66 loc) · 2.32 KB
/
ingest_data.py
File metadata and controls
77 lines (66 loc) · 2.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python
# coding: utf-8
import click
import pandas as pd
from sqlalchemy import create_engine
from tqdm.auto import tqdm
dtype = {
"VendorID": "Int64",
"passenger_count": "Int64",
"trip_distance": "float64",
"RatecodeID": "Int64",
"store_and_fwd_flag": "string",
"PULocationID": "Int64",
"DOLocationID": "Int64",
"payment_type": "Int64",
"fare_amount": "float64",
"extra": "float64",
"mta_tax": "float64",
"tip_amount": "float64",
"tolls_amount": "float64",
"improvement_surcharge": "float64",
"total_amount": "float64",
"congestion_surcharge": "float64"
}
parse_dates = [
"tpep_pickup_datetime",
"tpep_dropoff_datetime"
]
@click.command()
@click.option('--pg-user', default='root', help='PostgreSQL user')
@click.option('--pg-pass', default='root', help='PostgreSQL password')
@click.option('--pg-host', default='localhost', help='PostgreSQL host')
@click.option('--pg-port', default=5432, type=int, help='PostgreSQL port')
@click.option('--pg-db', default='ny_taxi', help='PostgreSQL database name')
@click.option('--year', default=2021, type=int, help='Year of the data')
@click.option('--month', default=1, type=int, help='Month of the data')
@click.option('--target-table', default='yellow_taxi_data', help='Target table name')
@click.option('--chunksize', default=100000, type=int, help='Chunk size for reading CSV')
def run(pg_user, pg_pass, pg_host, pg_port, pg_db, year, month, target_table, chunksize):
"""Ingest NYC taxi data into PostgreSQL database."""
prefix = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow'
url = f'{prefix}/yellow_tripdata_{year}-{month:02d}.csv.gz'
engine = create_engine(f'postgresql+psycopg://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}')
df_iter = pd.read_csv(
url,
dtype=dtype,
parse_dates=parse_dates,
iterator=True,
chunksize=chunksize,
)
first = True
for df_chunk in tqdm(df_iter):
if first:
df_chunk.head(0).to_sql(
name=target_table,
con=engine,
if_exists='replace'
)
first = False
df_chunk.to_sql(
name=target_table,
con=engine,
if_exists='append'
)
if __name__ == '__main__':
run()