"""Publish a sample of NYC yellow-taxi trips to a Kafka topic.

The script reads a handful of columns from a remote parquet file, keeps the
first 1000 rows, converts each row with ``ride_from_row`` and sends the
result to the ``rides`` topic as UTF-8 encoded JSON. It prints every ride it
sends and, at the end, the elapsed wall-clock time of the send loop.
"""
import dataclasses
import json
import sys
import time
from pathlib import Path

# Put the directory one level above this script's folder on the import path
# so the local `models` module imported below can be resolved
# (presumably models.py lives there -- confirm against the repo layout).
sys.path.insert(0, str(Path(__file__).parent.parent))

import pandas as pd
from kafka import KafkaProducer
from models import Ride, ride_from_row

# Download NYC yellow taxi trip data (first 1000 rows)
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet"
columns = ['PULocationID', 'DOLocationID', 'trip_distance', 'total_amount', 'tpep_pickup_datetime']
df = pd.read_parquet(url, columns=columns).head(1000)


def ride_serializer(ride) -> bytes:
    """Serialize a ride to UTF-8 encoded JSON bytes.

    Args:
        ride: A dataclass instance (presumably a ``Ride`` produced by
            ``ride_from_row``) whose fields are all JSON-serializable.

    Returns:
        The JSON document for ``dataclasses.asdict(ride)``, encoded as UTF-8.

    Raises:
        TypeError: If ``ride`` is not a dataclass instance, or if one of its
            field values cannot be encoded by ``json.dumps``.
    """
    ride_dict = dataclasses.asdict(ride)
    json_str = json.dumps(ride_dict)
    return json_str.encode('utf-8')


server = 'localhost:9092'

# The producer applies `ride_serializer` to every value handed to `send`.
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=ride_serializer
)
t0 = time.time()

topic_name = 'rides'

try:
    for _, row in df.iterrows():
        ride = ride_from_row(row)
        producer.send(topic_name, value=ride)
        print(f"Sent: {ride}")
        # Short pause between messages so they are not all sent at once.
        time.sleep(0.01)

    # Wait for buffered messages to be delivered before measuring the time.
    producer.flush()

    t1 = time.time()
    print(f'took {(t1 - t0):.2f} seconds')
finally:
    # Always release the producer's resources, including when a row fails to
    # convert or send part-way through the loop.
    producer.close()