- Create R2 Data Catalog-backed Pipelines resources.
- Copy `wrangler.toml.example` to `wrangler.toml` and set the stream endpoints.
- Set Wrangler secrets.
- Build and deploy the Worker.
- Send a capture/identify verification flow and query the Iceberg tables.
The examples below use stable table names for a fresh deployment: `default.hogflare_events` and `default.hogflare_persons`. If you use versioned names during migration, substitute those names consistently in the sink commands and queries.

Set these values before creating sinks:

```bash
export R2_BUCKET="<bucket-name>"
export R2_CATALOG_TOKEN="<r2-data-catalog-token>"
```

`R2_CATALOG_TOKEN` is the token used by R2 Data Catalog clients (such as DuckDB or PyIceberg) and by R2 SQL. The bucket must have R2 Data Catalog enabled before you create `r2-data-catalog` sinks.
Create the events stream, sink, and pipeline:

```bash
bunx wrangler pipelines streams create hogflare_events_stream \
  --schema-file scripts/events-pipeline-schema.json \
  --http-enabled true \
  --http-auth true

bunx wrangler pipelines sinks create hogflare_events_sink \
  --type r2-data-catalog \
  --bucket "$R2_BUCKET" \
  --namespace default \
  --table hogflare_events \
  --catalog-token "$R2_CATALOG_TOKEN" \
  --roll-interval 60

bunx wrangler pipelines create hogflare_events_pipeline \
  --sql "INSERT INTO hogflare_events_sink SELECT * FROM hogflare_events_stream;"
```

Create the persons stream, sink, and pipeline if you want queryable people in Iceberg:
```bash
bunx wrangler pipelines streams create hogflare_persons_stream \
  --schema-file scripts/persons-pipeline-schema.json \
  --http-enabled true \
  --http-auth true

bunx wrangler pipelines sinks create hogflare_persons_sink \
  --type r2-data-catalog \
  --bucket "$R2_BUCKET" \
  --namespace default \
  --table hogflare_persons \
  --catalog-token "$R2_CATALOG_TOKEN" \
  --roll-interval 60

bunx wrangler pipelines create hogflare_persons_pipeline \
  --sql "INSERT INTO hogflare_persons_sink SELECT * FROM hogflare_persons_stream;"
```

Each stream creation command prints an HTTP endpoint like `https://<stream-id>.ingest.cloudflare.com`. Use those endpoints as `CLOUDFLARE_PIPELINE_ENDPOINT` and `CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT` in `wrangler.toml`.
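To smoke-test a stream endpoint before wiring it into the Worker, you can post records to it directly. The Python sketch below only builds the request. It assumes that a stream created with `--http-enabled true --http-auth true` accepts a JSON array of records and a bearer token (the role the table below gives `CLOUDFLARE_PIPELINE_AUTH_TOKEN`); `build_stream_request` is a hypothetical helper, and any records you send must match the stream's schema file, which is not reproduced here.

```python
import json
import urllib.request


def build_stream_request(endpoint: str, token: str, records: list) -> urllib.request.Request:
    """Build (but do not send) a POST that writes `records` to a Pipelines stream.

    Assumption: the stream takes a JSON array body and a bearer token because
    it was created with --http-enabled true --http-auth true.
    """
    body = json.dumps(records).encode("utf-8")
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        },
    )


# Example (hypothetical record; real records must match
# scripts/events-pipeline-schema.json):
#   req = build_stream_request(os.environ["CLOUDFLARE_PIPELINE_ENDPOINT"],
#                              os.environ["CLOUDFLARE_PIPELINE_AUTH_TOKEN"],
#                              [{"event": "smoke-test"}])
#   with urllib.request.urlopen(req, timeout=10) as resp:
#       print(resp.status)
```

Keeping request construction separate from sending makes the headers and body easy to inspect; a 2xx status from `urlopen` indicates the stream accepted the batch.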
Copy the example and fill in the stream endpoints:

```bash
cp wrangler.toml.example wrangler.toml
```

```toml
name = "hogflare"
main = "build/index.js" # generated entrypoint from worker-build for the Rust worker
compatibility_date = "2025-01-09"

[vars]
CLOUDFLARE_PIPELINE_ENDPOINT = "https://<stream-id>.ingest.cloudflare.com"
CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT = "https://<persons-stream-id>.ingest.cloudflare.com"
CLOUDFLARE_PIPELINE_TIMEOUT_SECS = "10"
# Optional
# POSTHOG_TEAM_ID = "1"
# POSTHOG_GROUP_TYPE_0 = "company"
# POSTHOG_GROUP_TYPE_1 = "team"
# POSTHOG_GROUP_TYPE_2 = "project"
# POSTHOG_GROUP_TYPE_3 = "org"
# POSTHOG_GROUP_TYPE_4 = "workspace"
# POSTHOG_SESSION_RECORDING_ENDPOINT = "/s/"

[[durable_objects.bindings]]
name = "PERSONS"
class_name = "PersonDurableObject"

[[durable_objects.bindings]]
name = "PERSON_ID_COUNTER"
class_name = "PersonIdCounterDurableObject"

[[durable_objects.bindings]]
name = "GROUPS"
class_name = "GroupDurableObject"

[[migrations]]
tag = "v1"
new_sqlite_classes = ["PersonDurableObject"]

[[migrations]]
tag = "v2"
new_sqlite_classes = ["PersonIdCounterDurableObject", "GroupDurableObject"]
```

| Setting | Required | Notes |
|---|---|---|
| `CLOUDFLARE_PIPELINE_ENDPOINT` | Yes | Events stream HTTP endpoint from `wrangler pipelines streams create`. |
| `CLOUDFLARE_PIPELINE_AUTH_TOKEN` | Yes, for authenticated streams | Bearer token used for events stream HTTP ingest. |
| `CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT` | No | Persons stream endpoint. Set this to write person snapshots to Iceberg. |
| `CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN` | No | Falls back to `CLOUDFLARE_PIPELINE_AUTH_TOKEN` when omitted. |
| `CLOUDFLARE_PIPELINE_TIMEOUT_SECS` | No | Defaults to 10 seconds. |
| `POSTHOG_API_KEY` | No | Default project token returned by `/decide` when the request carries no token in its body or headers. |
| `POSTHOG_TEAM_ID` | No | Team ID attached to event and person rows. |
| `POSTHOG_GROUP_TYPE_0`..`POSTHOG_GROUP_TYPE_4` | No | Maps PostHog group types to `group0`..`group4`; for example, set `POSTHOG_GROUP_TYPE_0=company` to populate `group0` for company groups. |
| `POSTHOG_SESSION_RECORDING_ENDPOINT` | No | Returned in the `/decide` session recording config. |
| `HOGFLARE_REPLAY_ACCOUNT_ID` | No | Enables the `/replay` UI API when set together with `HOGFLARE_REPLAY_BUCKET` and `HOGFLARE_REPLAY_R2_SQL_TOKEN`. |
| `HOGFLARE_REPLAY_BUCKET` | No | R2 bucket name backing the R2 Data Catalog warehouse. |
| `HOGFLARE_REPLAY_R2_SQL_TOKEN` | No | R2 SQL/Data Catalog token used server-side to query replay rows. Store it as a secret. |
| `HOGFLARE_REPLAY_EVENTS_TABLE` | No | Iceberg events table queried by replay APIs. Defaults to `default.hogflare_events`. |
| `HOGFLARE_REPLAY_QUERY_LIMIT` | No | Maximum snapshot rows a replay API request can read. Defaults to 5000. |
| `POSTHOG_SIGNING_SECRET` | No | Enables HMAC request signature checks. |
| `PERSON_DEBUG_TOKEN` | No | Enables `/__debug/person/:id` for deployment verification. |
| `HOGFLARE_FEATURE_FLAGS` | No | JSON flag config used by `/decide` and `/flags`. |
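The defaults and fallbacks in the settings table can be summarized as a small resolver. This Python sketch is illustrative only: it mirrors the documented behavior (persons token fallback, 10-second timeout, replay enabled only when account ID, bucket, and token are all set, replay table and limit defaults, group-type mapping) and is not the Worker's Rust configuration code. Treating empty strings as unset is an assumption of the sketch.

```python
from collections.abc import Mapping


def resolve_config(env: Mapping) -> dict:
    """Resolve Hogflare settings from a flat env mapping (illustrative sketch)."""
    events_token = env.get("CLOUDFLARE_PIPELINE_AUTH_TOKEN")

    # /replay is only enabled when all three replay settings are present.
    replay_enabled = all(
        env.get(key)
        for key in (
            "HOGFLARE_REPLAY_ACCOUNT_ID",
            "HOGFLARE_REPLAY_BUCKET",
            "HOGFLARE_REPLAY_R2_SQL_TOKEN",
        )
    )

    # POSTHOG_GROUP_TYPE_0..4 map a group type name to a group0..group4 column.
    group_columns = {}
    for i in range(5):
        group_type = env.get(f"POSTHOG_GROUP_TYPE_{i}")
        if group_type:
            group_columns[group_type] = f"group{i}"

    return {
        # Required: raises KeyError when missing.
        "events_endpoint": env["CLOUDFLARE_PIPELINE_ENDPOINT"],
        "events_token": events_token,
        "persons_endpoint": env.get("CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT"),
        # Persons token falls back to the events token when unset.
        "persons_token": env.get("CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN") or events_token,
        "timeout_secs": int(env.get("CLOUDFLARE_PIPELINE_TIMEOUT_SECS") or "10"),
        "replay_enabled": replay_enabled,
        "replay_events_table": env.get("HOGFLARE_REPLAY_EVENTS_TABLE") or "default.hogflare_events",
        "replay_query_limit": int(env.get("HOGFLARE_REPLAY_QUERY_LIMIT") or "5000"),
        "group_columns": group_columns,
    }
```

With only the events endpoint, token, and `POSTHOG_GROUP_TYPE_0=company` set, the persons stream reuses the events token, replay stays disabled, and company groups land in `group0`.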
Use a Cloudflare API token with permission to write to Pipelines as `CLOUDFLARE_PIPELINE_AUTH_TOKEN`. The same token can usually be reused for the persons stream.

```bash
bunx wrangler secret put CLOUDFLARE_PIPELINE_AUTH_TOKEN
# Optional. If omitted, the persons pipeline uses CLOUDFLARE_PIPELINE_AUTH_TOKEN.
bunx wrangler secret put CLOUDFLARE_PERSONS_PIPELINE_AUTH_TOKEN
# Optional.
bunx wrangler secret put POSTHOG_SIGNING_SECRET
bunx wrangler secret put PERSON_DEBUG_TOKEN
bunx wrangler secret put HOGFLARE_FEATURE_FLAGS
bunx wrangler secret put HOGFLARE_REPLAY_R2_SQL_TOKEN
```

Build and deploy the Worker:

```bash
# worker-build is installed with `cargo install worker-build`.
worker-build --release
bunx wrangler deploy
```

Set the variables used by the verification flow. The timestamped API key keeps each run's rows separate, because the queries below filter on `api_key`:

```bash
export HOGFLARE_URL="https://<your-worker>.workers.dev"
export HOGFLARE_API_KEY="phc_verify_$(date -u +%Y%m%d%H%M%S)"
export HOGFLARE_ANON_ID="${HOGFLARE_API_KEY}_anon"
export HOGFLARE_USER_ID="${HOGFLARE_API_KEY}_user"
```

Send an anonymous capture:
```bash
curl -X POST "$HOGFLARE_URL/capture" \
  -H "Content-Type: application/json" \
  -d "{
    \"api_key\": \"$HOGFLARE_API_KEY\",
    \"event\": \"verify-anon-capture\",
    \"distinct_id\": \"$HOGFLARE_ANON_ID\",
    \"properties\": {
      \"\$set\": { \"initial_referrer\": \"docs\" },
      \"\$set_once\": { \"first_seen_source\": \"readme\" }
    }
  }"
```

Identify the user and link the anonymous ID:
```bash
curl -X POST "$HOGFLARE_URL/identify" \
  -H "Content-Type: application/json" \
  -d "{
    \"api_key\": \"$HOGFLARE_API_KEY\",
    \"distinct_id\": \"$HOGFLARE_USER_ID\",
    \"properties\": {
      \"\$anon_distinct_id\": \"$HOGFLARE_ANON_ID\",
      \"\$set\": { \"email\": \"verify@example.com\", \"plan\": \"pro\" },
      \"\$set_once\": { \"signup_source\": \"readme\" }
    }
  }"
```

Send a post-identify capture:
```bash
curl -X POST "$HOGFLARE_URL/capture" \
  -H "Content-Type: application/json" \
  -d "{
    \"api_key\": \"$HOGFLARE_API_KEY\",
    \"event\": \"verify-identified-capture\",
    \"distinct_id\": \"$HOGFLARE_USER_ID\",
    \"properties\": { \"button\": \"verify\" }
  }"
```

Wait for the sink roll interval (60 seconds with the sink commands above), then query the tables with R2 SQL:
```bash
export R2_WAREHOUSE="<account-id>_<bucket-name>"
export WRANGLER_R2_SQL_AUTH_TOKEN="$R2_CATALOG_TOKEN"

bunx wrangler r2 sql query "$R2_WAREHOUSE" \
  "select event, distinct_id, person_id, person_properties
   from default.hogflare_events
   where api_key = '$HOGFLARE_API_KEY'
   order by created_at asc"

bunx wrangler r2 sql query "$R2_WAREHOUSE" \
  "select operation, canonical_distinct_id, person_id, distinct_ids, merged_properties
   from default.hogflare_persons
   where api_key = '$HOGFLARE_API_KEY'
   order by updated_at asc"
```

Expected result: the three event rows share one `person_id`, and the persons table has three snapshots, from the capture, identify, and capture requests in that order. After identify, `distinct_ids` should include both the anonymous and identified IDs.
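If you script this check, the expected result translates into a few assertions over the query rows. The Python sketch below is a hypothetical helper: it assumes the rows have already been parsed into dicts keyed by the selected column names (parsing `wrangler r2 sql query` output is not shown), that `distinct_ids` is a list, and that `operation` holds the literal strings `capture` and `identify`.

```python
def check_verification(event_rows, person_rows, anon_id, user_id):
    """Check the expected verification result; return a list of problems (empty = pass).

    Assumes rows are dicts ordered as in the queries (created_at / updated_at).
    """
    problems = []

    # Three requests were sent: anonymous capture, identify, identified capture.
    if len(event_rows) != 3:
        problems.append(f"expected 3 event rows, got {len(event_rows)}")

    # All event rows should resolve to a single person.
    person_ids = {row["person_id"] for row in event_rows}
    if len(person_ids) != 1:
        problems.append(f"event rows span {len(person_ids)} person_ids")

    # One person snapshot per request, in updated_at order.
    operations = [row["operation"] for row in person_rows]
    if operations != ["capture", "identify", "capture"]:
        problems.append(f"unexpected snapshot operations: {operations}")

    # From the identify snapshot onward, both IDs should be linked.
    for row in person_rows[1:]:
        missing = {anon_id, user_id} - set(row["distinct_ids"])
        if missing:
            problems.append(f"{row['operation']} snapshot missing {sorted(missing)}")

    return problems
```

Returning a list of problems instead of raising on the first failure reports every mismatch from a single run, which helps when several rows are late or unlinked.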
The repo includes a lightweight fake pipeline used by tests. Start it with Docker Compose:

```bash
docker compose up --build -d fake-pipeline
```

Point both pipeline endpoints at it in a local env file:

```bash
# .env.local (not committed)
CLOUDFLARE_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
CLOUDFLARE_PERSONS_PIPELINE_ENDPOINT=http://127.0.0.1:8088/
CLOUDFLARE_PIPELINE_TIMEOUT_SECS=5
```

Then run Hogflare locally:

```bash
cargo run --bin hogflare
```

To clean up, delete Pipelines resources in dependency order: pipelines first, then streams and sinks.
```bash
bunx wrangler pipelines list
bunx wrangler pipelines delete <pipeline-id> --force

bunx wrangler pipelines streams list
bunx wrangler pipelines streams delete <stream-id> --force

bunx wrangler pipelines sinks list
bunx wrangler pipelines sinks delete <sink-id> --force
```

`wrangler r2 sql query` is read-only. To drop an Iceberg table from R2 Data Catalog, use the Iceberg catalog API. One local option is PyIceberg:
```bash
R2_CATALOG_TOKEN="<r2-data-catalog-token>" uv run --with pyiceberg python - <<'PY'
import os

from pyiceberg.catalog.rest import RestCatalog

# Replace the placeholders with your account ID and bucket name.
catalog = RestCatalog(
    name="hogflare",
    warehouse="<account-id>_<bucket-name>",
    uri="https://catalog.cloudflarestorage.com/<account-id>/<bucket-name>",
    token=os.environ["R2_CATALOG_TOKEN"],
)

# purge_requested=True asks the catalog to delete the table's data files too.
catalog.drop_table(("default", "<table-name>"), purge_requested=True)
PY
```
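The account ID and bucket name appear in several places: `R2_WAREHOUSE` for R2 SQL, and the `warehouse` and `uri` arguments above. A hypothetical helper that derives both values from one account ID and bucket keeps them from drifting apart. It assumes the `<account-id>_<bucket-name>` warehouse and `https://catalog.cloudflarestorage.com/<account-id>/<bucket-name>` URI formats used in this guide; `CF_ACCOUNT_ID` in the usage comment is likewise a hypothetical variable name.

```python
def r2_catalog_settings(account_id: str, bucket: str) -> dict:
    """Derive R2 Data Catalog connection settings (hypothetical helper).

    Follows the warehouse and catalog URI formats used in this guide.
    """
    if not account_id or not bucket:
        raise ValueError("account_id and bucket are both required")
    return {
        "warehouse": f"{account_id}_{bucket}",
        "uri": f"https://catalog.cloudflarestorage.com/{account_id}/{bucket}",
    }


# Example with PyIceberg:
#   catalog = RestCatalog(
#       name="hogflare",
#       token=os.environ["R2_CATALOG_TOKEN"],
#       **r2_catalog_settings(os.environ["CF_ACCOUNT_ID"], os.environ["R2_BUCKET"]),
#   )
#   print(catalog.list_tables("default"))  # confirm the table name before dropping
```

The same `warehouse` string is the value to export as `R2_WAREHOUSE` for the R2 SQL queries.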