Skip to content

Commit a9f9110

Browse files
committed
Update module 3 to have streaming ingest
Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent 55245ae commit a9f9110

9 files changed

Lines changed: 562 additions & 107 deletions

File tree

module_3/README.md

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<h1>Module 3: Orchestrated batch transformations using dbt + Airflow with Feast (Snowflake)</h1>
22

3-
> **Note:** This module is still WIP, and does not have a public data set to use
3+
> **Note:** This module is still WIP, and does not have a public data set to use. There is a smaller dataset visible in `data/`
44
55
This is a very similar module to module 1. The key difference is now we'll be using a data warehouse (Snowflake) in combination with dbt + Airflow to ensure that batch features are regularly generated.
66

@@ -21,7 +21,7 @@ This is a very similar module to module 1. The key difference is now we'll be us
2121
- [Workshop](#workshop)
2222
- [Step 1: Install Feast](#step-1-install-feast)
2323
- [Step 2: Inspect the `feature_store.yaml`](#step-2-inspect-the-feature_storeyaml)
24-
- [Step 3: Spin up services (Redis + Feast SQL Registry + Feast services)](#step-3-spin-up-services-redis--feast-sql-registry--feast-services)
24+
- [Step 3: Spin up services (Kafka + Redis + Feast SQL Registry + Feast services)](#step-3-spin-up-services-kafka--redis--feast-sql-registry--feast-services)
2525
- [Step 4: Set up dbt models for batch transformations](#step-4-set-up-dbt-models-for-batch-transformations)
2626
- [Step 5: Run `feast apply`](#step-5-run-feast-apply)
2727
- [Step 6: Set up orchestration](#step-6-set-up-orchestration)
@@ -30,8 +30,9 @@ This is a very similar module to module 1. The key difference is now we'll be us
3030
- [6c: Enable the DAG](#6c-enable-the-dag)
3131
- [Q: What if different feature views have different freshness requirements?](#q-what-if-different-feature-views-have-different-freshness-requirements)
3232
- [Step 6d (optional): Run a backfill](#step-6d-optional-run-a-backfill)
33-
- [Step 7: Run `get_historical_features` and `get_online_features`](#step-7-run-get_historical_features-and-get_online_features)
34-
- [Step 8: Streaming](#step-8-streaming)
33+
- [Step 7: Retrieve features + test stream ingestion](#step-7-retrieve-features--test-stream-ingestion)
34+
- [Overview](#overview)
35+
- [Time to run code!](#time-to-run-code)
3536
- [Conclusion](#conclusion)
3637
- [Limitations](#limitations)
3738
- [Why Feast?](#why-feast)
@@ -70,21 +71,25 @@ offline_store:
7071
entity_key_serialization_version: 2
7172
```
7273
73-
## Step 3: Spin up services (Redis + Feast SQL Registry + Feast services)
74+
## Step 3: Spin up services (Kafka + Redis + Feast SQL Registry + Feast services)
7475
7576
We use Docker Compose to spin up the services we need.
7677
- This deploys an instance of Redis, Postgres for a registry, a Feast feature server + push server.
78+
- This also uses `transactions.parquet` to generate streaming feature values to ingest into the online store with dummy timestamps
7779

7880
Start up the Docker daemon and then use Docker Compose to spin up the services as described above:
7981
- You may need to run `sudo docker-compose up` if you run into a Docker permission denied error
8082
```console
8183
$ docker-compose up
8284
8385
Creating network "module_3_default" with the default driver
84-
Creating redis ... done
85-
Creating registry ... done
86+
Creating registry ... done
87+
Creating zookeeper ... done
88+
Creating redis ... done
89+
Creating broker ... done
90+
Creating tx_kafka_events ... done
8691
Creating feast_feature_server ... done
87-
Attaching to redis, registry, feast_feature_server
92+
Attaching to zookeeper, redis, registry, broker, kafka_events, feast_feature_server
8893
...
8994
```
9095

@@ -106,15 +111,28 @@ This will create the initial tables we need for Feast
106111
In this example, we're using a test database in Snowflake.
107112

108113
To get started, go ahead and register the feature repository
109-
```bash
110-
# Note: first you need to export environment variables matching the above variables:
111-
# export SNOWFLAKE_DEPLOYMENT_URL="[YOUR DEPLOYMENT]
112-
# export SNOWFLAKE_USER="[YOUR USER]
113-
# export SNOWFLAKE_PASSWORD="[YOUR PASSWORD]
114-
# export SNOWFLAKE_ROLE="[YOUR ROLE]
115-
# export SNOWFLAKE_WAREHOUSE="[YOUR WAREHOUSE]
116-
# export SNOWFLAKE_DATABASE="[YOUR DATABASE]
117-
cd feature_repo; feast apply
114+
```console
115+
<!--
116+
Note: first you need to export environment variables
117+
matching the above variables:
118+
119+
export SNOWFLAKE_DEPLOYMENT_URL="[YOUR DEPLOYMENT]
120+
export SNOWFLAKE_USER="[YOUR USER]
121+
export SNOWFLAKE_PASSWORD="[YOUR PASSWORD]
122+
export SNOWFLAKE_ROLE="[YOUR ROLE]
123+
export SNOWFLAKE_WAREHOUSE="[YOUR WAREHOUSE]
124+
export SNOWFLAKE_DATABASE="[YOUR DATABASE]
125+
-->
126+
$ cd feature_repo; feast apply
127+
128+
Created entity user
129+
Created feature view aggregate_transactions_features
130+
Created feature view credit_scores_features
131+
Created feature service model_v1
132+
Created feature service model_v2
133+
134+
Deploying infrastructure for aggregate_transactions_features
135+
Deploying infrastructure for credit_scores_features
118136
```
119137
## Step 6: Set up orchestration
120138
### Step 6a: Setting up Airflow to work with dbt
@@ -217,11 +235,11 @@ airflow dags backfill \
217235
feature_dag
218236
```
219237

220-
## Step 7: Run `get_historical_features` and `get_online_features`
221-
Run [Jupyter notebook](feature_repo/module_3.ipynb)
238+
## Step 7: Retrieve features + test stream ingestion
239+
### Overview
240+
Feast exposes a `get_historical_features` method to generate training data / run batch scoring and `get_online_features` method to power model serving.
222241

223-
## Step 8: Streaming
224-
There are two broad approaches with streaming
242+
To achieve fresher features, one might consider using streaming compute.There are two broad approaches with streaming
225243
1. **[Simple, semi-fresh features]** Use data warehouse / data lake specific streaming ingest of raw data.
226244
- This means that Feast only needs to know about a "batch feature" because the assumption is those batch features are sufficiently fresh.
227245
- **BUT** there are limits to how fresh your features are. You won't be able to get to minute level freshness.
@@ -230,6 +248,9 @@ There are two broad approaches with streaming
230248

231249
Feast will help enforce a consistent schema across batch + streaming features as they land in the online store.
232250

251+
### Time to run code!
252+
Now, Run [Jupyter notebook](feature_repo/module_3.ipynb)
253+
233254
# Conclusion
234255
By the end of this module, you will have learned how to build a full feature platform, with orchestrated batch transformations (using dbt + Airflow), orchestrated materialization (with Feast + Airflow).
235256

306 KB
Binary file not shown.

module_3/data/transactions.parquet

149 KB
Binary file not shown.

module_3/docker-compose.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,38 @@
11
---
22
version: '3'
33
services:
4+
zookeeper:
5+
image: confluentinc/cp-zookeeper:7.0.1
6+
container_name: zookeeper
7+
environment:
8+
ZOOKEEPER_CLIENT_PORT: 2181
9+
ZOOKEEPER_TICK_TIME: 2000
10+
11+
broker:
12+
image: confluentinc/cp-kafka:7.0.1
13+
container_name: broker
14+
ports:
15+
- "9092:9092"
16+
- "29092:29092"
17+
depends_on:
18+
- zookeeper
19+
environment:
20+
KAFKA_BROKER_ID: 1
21+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
22+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
23+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
24+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
25+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
26+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
27+
28+
tx_kafka_events:
29+
build:
30+
context: .
31+
dockerfile: kafka_demo/Dockerfile
32+
depends_on:
33+
- broker
34+
container_name: tx_kafka_events
35+
436
redis:
537
image: redis
638
container_name: redis

module_3/feature_repo/data_sources.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,23 @@
1111
timestamp_field="TIMESTAMP",
1212
)
1313

14-
aggregate_transactions_source = SnowflakeSource(
15-
name="transactions_7d_source",
14+
aggregate_transactions_batch = SnowflakeSource(
15+
name="transactions_7d_batch",
1616
database=yaml.safe_load(open("feature_store.yaml"))["offline_store"]["database"],
1717
table="AGGREGATE_TRANSACTION_FEATURES",
1818
schema="FRAUD",
1919
timestamp_field="TIMESTAMP",
2020
tags={"dbtModel": "models/example/aggregate_transaction_features.sql"},
2121
)
2222

23+
aggregate_transactions_push = PushSource(
24+
name="transactions_7d", batch_source=aggregate_transactions_batch
25+
)
26+
2327
credit_scores = SnowflakeSource(
2428
name="credit_scores_source",
2529
database=yaml.safe_load(open("feature_store.yaml"))["offline_store"]["database"],
2630
query="SELECT USER_ID, DATE, CREDIT_SCORE, TIMESTAMP FROM CREDIT_SCORES",
2731
schema="FRAUD",
2832
timestamp_field="TIMESTAMP",
2933
)
30-
31-
# A push source is useful if you have upstream systems that transform features (e.g. stream processing jobs)
32-
driver_stats_push_source = PushSource(
33-
name="driver_stats_push_source",
34-
batch_source=transactions_source,
35-
)

module_3/feature_repo/features.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
Field(name="7D_AVG_AMT", dtype=Float32),
3535
],
3636
online=True,
37-
source=aggregate_transactions_source,
37+
source=aggregate_transactions_push,
3838
tags={"production": "True"},
3939
owner="test2@gmail.com",
4040
)

0 commit comments

Comments
 (0)