From 50f934ed8a252edc38940d8362353ebefafc04ab Mon Sep 17 00:00:00 2001 From: Joye <68166902+joyeli25@users.noreply.github.com> Date: Sat, 21 Mar 2026 00:05:55 -0700 Subject: [PATCH] Create producer.py --- .../workshop/live/notebooks/producer.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 07-streaming/workshop/live/notebooks/producer.py diff --git a/07-streaming/workshop/live/notebooks/producer.py b/07-streaming/workshop/live/notebooks/producer.py new file mode 100644 index 000000000..0a060d2b4 --- /dev/null +++ b/07-streaming/workshop/live/notebooks/producer.py @@ -0,0 +1,42 @@ +import dataclasses +import json +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import pandas as pd +from kafka import KafkaProducer +from models import Ride, ride_from_row + +# Download NYC yellow taxi trip data (first 1000 rows) +url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet" +columns = ['PULocationID', 'DOLocationID', 'trip_distance', 'total_amount', 'tpep_pickup_datetime'] +df = pd.read_parquet(url, columns=columns).head(1000) + +def ride_serializer(ride): + ride_dict = dataclasses.asdict(ride) + json_str = json.dumps(ride_dict) + return json_str.encode('utf-8') + +server = 'localhost:9092' + +producer = KafkaProducer( + bootstrap_servers=[server], + value_serializer=ride_serializer +) +t0 = time.time() + +topic_name = 'rides' + +for _, row in df.iterrows(): + ride = ride_from_row(row) + producer.send(topic_name, value=ride) + print(f"Sent: {ride}") + time.sleep(0.01) + +producer.flush() + +t1 = time.time() +print(f'took {(t1 - t0):.2f} seconds')