The cloud-agnostic, event-driven declarative lakehouse engine built on Apache Iceberg.
"Don't build the engine. Delete the engine."
Status: v0.1 MVP. The declarative refresh engine and the polling-based event-driven loop both work: define a derived table in SQL with partition windowing and refresh modes, run `floe watch`, and Floe keeps it fresh as its upstreams get new Iceberg snapshots. Data quality routing, preconditions, push-based event hooks (replacing polling), Flink-based streaming compute, and multi-cloud deployment profiles are roadmap items (v0.2+).
You have a raw data table that keeps getting new rows — delivery events, orders, telemetry. You want a derived table that's a cleaned, joined, or aggregated view of it, ready for downstream consumers (analytics dashboards, ML feature pipelines, BI tools). Today, you build that by writing a custom pipeline: a Spark or Redshift script that reads the raw table, transforms it, writes the result, and runs on a schedule. Every team writes that orchestration boilerplate from scratch, for every derived table they want.
Floe lets you skip the boilerplate. You declare the derived table once in SQL:
```sql
CREATE DYNAMIC TABLE silver.orders
PARTITION BY (order_date)
PARTITION_WINDOW = '7 days'
REFRESH_MODE = INCREMENTAL
AS
SELECT o.order_id, o.order_date, o.amount, c.region
FROM bronze.raw_orders o
JOIN bronze.customers c ON o.customer_id = c.id;
```

Floe builds the dependency graph between upstream and downstream tables, runs the SELECT incrementally for only the partitions that need refreshing when upstream data lands, writes the result back into Iceberg, and injects lineage columns so you can trace every row back to the upstream snapshot that produced it. There is no per-table orchestration code to write.
```mermaid
flowchart LR
    A[("bronze.raw_orders<br/>(Iceberg)")] -->|new commit| F["Floe<br/>refresh engine"]
    B[("bronze.customers<br/>(Iceberg)")] -->|new commit| F
    D["CREATE DYNAMIC TABLE<br/>silver.orders AS SELECT ..."] -.declares.-> F
    F -->|incremental refresh<br/>+ lineage columns| O[("silver.orders<br/>(Iceberg)")]
    O --> Q[Analytics / ML / BI<br/>via Trino, DuckDB, Spark]
```
The vision is to bring Snowflake-style Dynamic Tables to Apache Iceberg without cloud lock-in. v0.1 ships the core refresh engine; data quality routing, push-based event triggering, and multi-cloud profiles are still to be built.
What's implemented today (v0.1):
- Declarative: define what you want, not how to refresh it
- Iceberg-native: pure open table format; works with any Iceberg-compatible reader (Trino, DuckDB, Spark)
- Incremental and full refresh with partition-aware windowing
- Event-driven refresh via polling: `floe watch` keeps derived tables fresh as upstream Iceberg tables receive new snapshots
- Automatic lineage injection: every row in a derived table carries `_floe_input_snapshot_id` and `_floe_job_run_id` columns
- Stateless workers: all state lives in Iceberg snapshots; no external state stores
Roadmap (v0.2 and beyond):
- Native push-based event hooks: Iceberg commit listeners + pluggable Event Bus (NATS, EventBridge, Event Hubs, Pub/Sub) replace the v0.1 polling loop
- Flink-based streaming compute: replaces DuckDB as the production compute layer for streaming workloads and powers the `TRIGGERED` refresh mode
- Built-in data quality: preconditions, quarantine routing, DQ rule enforcement on every write
- Cloud-agnostic deployment: packaged for AWS, Azure, GCP. (v0.1 runs against a local SQLite Iceberg catalog.)
- AI-native authoring: describe a pipeline in plain English; Floe generates the SQL DDL and wires it into the DAG
- Local-first iteration: build and test pipelines instantly against a small local dataset; no CDK/CloudFormation round-trip per change
The quickstart example ships a self-contained delivery-analytics catalog and a demo runner that fires two upstream appends while the dashboard is live:
```bash
# one-time setup
cd examples/quickstart
python seed_sources.py   # seeds bronze.deliveries + bronze.delivery_defects
floe apply               # materialises all three DITs for the first time

# watch the live dashboard
python demo_runner.py    # starts floe watch with the Rich UI; appends data twice in the background
```

The dashboard shows the pipeline DAG, per-table status, snapshot IDs, and a rolling event log, all updating in real time as Floe detects new Iceberg snapshots and refreshes downstream tables.
To record a GIF of the demo (Linux/macOS, requires VHS):
```bash
# from the repo root (venv must be active)
vhs examples/quickstart/demo.tape
```

- The Problem
- Inspiration: Snowflake's Architecture
- Core Concepts
- Architecture Overview
- Component Deep Dives
- Developer Interface
- Deployment Modes
- Operational Patterns
- Comparison with Existing Projects
- Roadmap
- Open Questions
You are building a delivery analytics pipeline. Two source tables, bronze.deliveries (every package delivered) and bronze.delivery_defects (damage reports, customer complaints, contractual SLA breaches), are both partitioned by delivery_date. On top of them you build a gold.delivery_quality_daily table that joins them to compute the daily defect rate per region.
Simple, until you understand how the data actually behaves:
- Delivery events show up within minutes of the package being delivered.
- Defect reports trickle in for days afterward. A delivery on April 22 might have a customer complaint filed on April 27, a contractual review filed on May 1, and a chargeback filed on May 5.
Your gold.delivery_quality_daily partition for April 22 is wrong on April 23, less wrong on April 24, and roughly correct by May 5. Each old partition needs to be revisited as late-arriving defects show up.
You write down the SLA your stakeholders actually want:
Every partition for the last 14 days must have been recomputed within the last 2 days.
So every partition in the window, from today's back to the one from two weeks ago, may be at most 2 days stale, because a late defect can still change any of them. Older than 14 days, you don't care: that data is now stable.
This is a sliding-window freshness contract, and no single cron schedule can express it. You can run a job daily that rewrites the last 14 partitions, but then every question about "is this fresh enough?" lives in your head, not in the system.
You write the daily job. Then the operational questions start:
- Upstream isn't ready yet. bronze.delivery_defects hasn't refreshed today. Do you wait? How long? At what point do you give up and run with stale upstream data?
- Or do you trigger upstream? If you can re-run the upstream pipeline yourself, do you do it on every run? You're now coupling pipelines that were supposed to be independent, and you've just rebuilt a fragile orchestration layer.
- Late arrivals outside the window. A defect lands for January 12, a partition from four months ago, well outside your 14-day window. Do you refresh it? Your job ignores anything older.
- Backfills. A bug is fixed. You need to recompute 90 partitions correctly. Your daily-cron design has no answer for this; you write a one-off script, run it manually, and hope nobody else's pipeline is reading mid-write.
- Partial dependencies. Today's gold partition only needs today's bronze data. But your job re-reads all 14 days of upstream because there's no easy way to express "only this partition depends on that partition."
You patch each one. You add sensors. You add YAML dependency configs. You add manual override flags. Six months later your "platform" is a 50K-line Python service that nobody but you can extend without breaking. This is the trap, and every data team falls into it. The fix is always more orchestration code, and that code becomes the most fragile thing in your stack.
Floe is what happens when you stop scheduling and let the data trigger the work.
A new partition lands in bronze.delivery_defects? The corresponding gold.delivery_quality_daily partition is automatically marked stale and refreshed. A late-arriving defect for January 12? The January 12 gold partition refreshes — same code path, no special handling. A backfill? You replay the upstream snapshots; everything downstream catches up automatically. You write the SQL once, declare the freshness target, and Floe figures out the rest.
There is no scheduling. There is no "is the upstream fresh enough" decision in your code. The DAG knows.
Streaming-native lakehouses are a maturing space, but no project hits all five must-haves:
| Project | Event-Driven | Iceberg-Native | Dynamic Table DAG | Cloud-Agnostic | Local Mode |
|---|---|---|---|---|---|
| Apache Paimon | ✅ | ❌ (own format) | ❌ | ✅ | Partial |
| RisingWave | ✅ | Partial | ✅ | ✅ | ✅ |
| Flink + Iceberg (DIY) | ✅ | ✅ | ❌ | ✅ | ❌ |
| Delta Live Tables | ✅ | ❌ | ✅ | ❌ (Databricks only) | ❌ |
| Snowflake Dynamic Tables | ✅ | ❌ | ✅ | ❌ (Snowflake only) | ❌ |
| Floe | ✅ | ✅ | ✅ | ✅ | ✅ |
No existing open-source project combines all five. Floe fills this gap.
Floe is a coordination layer for declarative pipelines on Apache Iceberg. You declare your transformations and freshness targets; Floe handles dependency resolution, incremental refresh, lineage, and data quality routing.
Floe is not:
- A query engine. Floe does not serve interactive SQL queries. Use Trino, DuckDB, or Spark SQL to read Floe-managed Iceberg tables.
- A data catalog UI. Use Apache Polaris, Project Nessie, or your cloud provider's catalog.
- A replacement for Kafka/Flink. Floe is a coordination and declaration layer that orchestrates Flink jobs — it does not replace the underlying compute.
Floe is a direct implementation of the patterns described in Snowflake's research papers: "Streaming Democratized" and "What's the Difference?"
Snowflake never modifies a file in place. Every write produces a new table version defined by the set of micro-partitions active at that timestamp. Apache Iceberg follows the same model:
- Every commit creates a new Snapshot ID
- Snapshots are immutable; old snapshots enable time travel
- Incremental scans read only the files added between two snapshot IDs, e.g. `read.option("start-snapshot-id", X)` in Spark
This makes Iceberg the correct storage foundation for Floe — not Delta Lake (which has a different metadata model) and not a proprietary format.
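To make the snapshot model concrete, here is a minimal sketch that reduces each snapshot to the set of data files it references and assumes commits only append (no deletes or overwrites). `Snapshot` and `files_added_between` are hypothetical names; real engines derive the same set from Iceberg manifest metadata rather than by diffing file lists.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    files: frozenset  # data files that make up this table version


def files_added_between(history: dict, start_id: int, end_id: int) -> set:
    """Files visible at end_id but not at start_id (start exclusive, end inclusive)."""
    # Walk back through parents to confirm start_id is an ancestor of end_id.
    current = history[end_id]
    while current.snapshot_id != start_id:
        if current.parent_id is None:
            raise ValueError(f"snapshot {start_id} is not an ancestor of {end_id}")
        current = history[current.parent_id]
    return set(history[end_id].files - history[start_id].files)


history = {
    1: Snapshot(1, None, frozenset({"a.parquet"})),
    2: Snapshot(2, 1, frozenset({"a.parquet", "b.parquet"})),
    3: Snapshot(3, 2, frozenset({"a.parquet", "b.parquet", "c.parquet"})),
}
print(sorted(files_added_between(history, 1, 3)))  # ['b.parquet', 'c.parquet']
```

Because old snapshots are immutable, the diff between two IDs never changes, which is what lets a refresh be replayed safely.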
Snowflake treats streaming as a mathematical problem, not a data movement problem. Instead of re-running the full query Q on every refresh, the Snowflake compiler rewrites it into its derivative ΔQ — a query that operates only on the rows that changed since the last checkpoint.
Algebraic rules apply:
- Δ(A JOIN B) → new rows in A joined against the current B, plus the previous A joined against new rows in B (for append-only inputs)
- Δ(GROUP BY) → merge new aggregates into existing aggregates
- Δ(FILTER) → apply filter to new rows only
Compute cost scales with change volume, not total data volume.
In the target design, Floe implements this via Flink's incremental processing model, generating merge plans that operate on Iceberg snapshot diffs rather than full table scans. (v0.1 runs these refreshes on DuckDB.)
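The delta rules can be checked against a full recompute on small in-memory relations. This sketch assumes append-only inputs and represents relations as lists of tuples; it illustrates the algebra only and is not Floe's query rewriter. The helper names are hypothetical.

```python
from collections import Counter


def join(a, b):
    """Inner join on the first column: (k, x) JOIN (k, y) -> (k, x, y)."""
    return [(ka, xa, yb) for ka, xa in a for kb, yb in b if ka == kb]


def delta_join(a_old, delta_a, b_old, delta_b):
    """Δ(A JOIN B) = ΔA JOIN B_new  +  A_old JOIN ΔB  (each new pair counted once)."""
    return join(delta_a, b_old + delta_b) + join(a_old, delta_b)


def merge_group_sums(existing: dict, delta_rows) -> dict:
    """Δ(GROUP BY k, SUM(v)): fold new rows into the stored per-key sums."""
    merged = dict(existing)
    for key, value in delta_rows:
        merged[key] = merged.get(key, 0) + value
    return merged


def delta_filter(delta_rows, predicate):
    """Δ(FILTER): apply the predicate to new rows only."""
    return [row for row in delta_rows if predicate(row)]


# The old output plus the join delta equals a full recompute over the new inputs.
a_old, delta_a = [(1, "a1")], [(2, "a2"), (1, "a3")]
b_old, delta_b = [(1, "b1")], [(2, "b2")]
full_before = join(a_old, b_old)
full_after = join(a_old + delta_a, b_old + delta_b)
assert Counter(full_before + delta_join(a_old, delta_a, b_old, delta_b)) == Counter(full_after)
```

Merging like `merge_group_sums` works for decomposable aggregates such as SUM and COUNT; an AVG would need its sum and count stored separately.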
Snowflake's Dynamic Tables abstract away scheduling entirely:
- User declares the desired output (SQL query) and desired freshness (`TARGET_LAG`)
- The system builds a DAG of dependencies
- The scheduler monitors upstream freshness and triggers refreshes only when needed
Floe's Dynamic Iceberg Tables (DITs) are the open-source equivalent, extended with explicit data quality preconditions and quarantine routing.
The central abstraction. A DIT is an Iceberg table defined by a query over other Iceberg tables or streaming sources. It has:
- A query (the desired output)
- A lag target (desired freshness)
- A refresh mode (how to compute the update)
- Optional preconditions (data readiness checks before processing)
- Optional DQ routing (what to do with rows that fail quality checks)
```sql
CREATE DYNAMIC TABLE silver.orders
LAG = '5 minutes'
REFRESH_MODE = INCREMENTAL
PRECONDITIONS (
  min_rows(bronze.raw_orders, count=1)
)
ON_DQ_FAIL ROUTE TO quarantine.orders_failed
AS
SELECT
  o.order_id,
  o.amount,
  c.region,
  c.segment
FROM bronze.raw_orders o
JOIN bronze.customers c ON o.customer_id = c.id;
```

| Mode | Description | When to Use |
|---|---|---|
| `INCREMENTAL` | Reads only the Iceberg snapshot diff; merges into output table | Default; works for most joins, filters, simple aggregations |
| `FULL` | Full recompute from scratch | Complex aggregations where incremental rewrite is not possible |
| `TRIGGERED` | Fires immediately on any upstream commit event | Sub-minute latency requirements |
| `SCHEDULED` | Time-based (cron expression), ignores commit events | External data sources without Iceberg commits |
Floe automatically degrades to FULL if the DAG Planner determines INCREMENTAL is not safe for the given query.
A single LAG value is fine for tables that are always rewritten as a whole, but most production pipelines have a more nuanced freshness contract:
"Every partition for the last N days must have been recomputed within X of right now."
This pattern shows up everywhere — financial close cycles, regulatory reporting windows, late-arriving event corrections, slowly-stabilizing aggregates. Floe expresses it directly:
```sql
CREATE DYNAMIC TABLE gold.delivery_quality_daily
PARTITION BY (delivery_date)
PARTITION_WINDOW = '14 days'      -- how far back to keep fresh
PARTITION_FRESHNESS = '2 days'    -- max staleness within the window
REFRESH_MODE = 'INCREMENTAL'
AS
SELECT
  d.delivery_date, d.region, d.station_id,
  COUNT(DISTINCT delivery_id) AS total_deliveries,   -- a delivery can have several defects
  COUNT(f.defect_id)          AS defect_count,
  ROUND(100.0 * COUNT(f.defect_id) / COUNT(DISTINCT delivery_id), 2) AS defect_rate_pct
FROM silver.deliveries_enriched d
LEFT JOIN bronze.delivery_defects f USING (delivery_id)
GROUP BY d.delivery_date, d.region, d.station_id;
```

Behavior:
- Window: at refresh time, only rows where `delivery_date >= today - 14 days` are computed and written. Partitions outside the window are never touched by Floe and keep whatever value they had at their last refresh.
- Freshness: a refresh fires when EITHER upstream changes OR `now() - last_refresh > 2 days`. The freshness clock guarantees the SLA is met even if upstream is quiet.
- Atomic partition replacement: in-window partitions are overwritten in a single Iceberg commit using a `partition_col >= cutoff` filter. Older partitions are not part of the transaction.
- Late arrivals are free: when a defect is filed today against a 7-day-old delivery, the next refresh of the 14-day window picks it up. No backfill script, no special-case handling.
The output table is created with an Iceberg identity partition spec on the declared columns, so partition pruning works for any downstream reader (Trino, Spark, DuckDB).
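The window and freshness rules reduce to two small decisions: whether to refresh at all, and which partitions to overwrite. This is a minimal sketch assuming daily `delivery_date` partitions and a single table-level `last_refresh` timestamp; `should_refresh` and `partitions_to_overwrite` are hypothetical names, not Floe internals.

```python
from datetime import date, datetime, timedelta


def should_refresh(upstream_changed: bool, last_refresh: datetime,
                   now: datetime, freshness: timedelta) -> bool:
    """Fire on an upstream change OR when PARTITION_FRESHNESS has lapsed."""
    return upstream_changed or (now - last_refresh) > freshness


def partitions_to_overwrite(partitions: list, today: date, window: timedelta) -> list:
    """In-window partitions (>= cutoff) are rewritten; older ones are left as they are."""
    cutoff = today - window  # mirrors the `partition_col >= cutoff` overwrite filter
    return sorted(p for p in partitions if p >= cutoff)


today = date(2026, 5, 5)
existing = [today - timedelta(days=d) for d in range(20)]  # 20 daily partitions
in_window = partitions_to_overwrite(existing, today, timedelta(days=14))
print(len(in_window), in_window[0])  # 15 2026-04-21 (the cutoff day is included)
```

With an inclusive `>=` cutoff, a 14-day window covers today plus the 14 preceding days.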
A precondition is a data readiness assertion that must pass before a refresh executes. If a precondition fails, the refresh exits cleanly (no error, no partial write) and retries on the next event.
Built-in precondition functions:
| Function | Description |
|---|---|
| `min_rows(table, count=N)` | Table must have at least N rows |
| `completeness(table, partition_col, distinct_vals=N)` | Partition column must have N distinct values (e.g., 7 days in a week) |
| `snapshot_age(table, max_age='1 hour')` | Upstream snapshot must be fresher than threshold |
| `dependency_ready(table_a, table_b)` | Both upstream tables must share the same processing epoch |
| Custom Python function | Any callable returning `bool` |
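A sketch of how a refresh might gate on preconditions, with the built-ins modeled as zero-argument callables that return `bool`. Rows are in-memory dicts here; `min_rows`, `completeness`, and `run_refresh_if_ready` are hypothetical stand-ins that borrow the table's names, not Floe's actual signatures.

```python
from typing import Callable, Iterable

Precondition = Callable[[], bool]  # a zero-argument readiness check


def min_rows(rows: list, count: int) -> Precondition:
    """Ready when the upstream has at least `count` rows."""
    return lambda: len(rows) >= count


def completeness(rows: list, partition_col: str, distinct_vals: int) -> Precondition:
    """Ready when `partition_col` has at least `distinct_vals` distinct values."""
    return lambda: len({row[partition_col] for row in rows}) >= distinct_vals


def run_refresh_if_ready(checks: Iterable[Precondition], refresh: Callable[[], None]) -> bool:
    """Run `refresh` only if every check passes; otherwise skip without writing anything."""
    if all(check() for check in checks):
        refresh()
        return True
    return False  # clean skip: no error, no partial write; the next event retries


events = [{"day": f"2026-04-{d:02d}"} for d in range(20, 26)]  # only 6 distinct days
ran = run_refresh_if_ready(
    [min_rows(events, count=1), completeness(events, "day", distinct_vals=7)],
    refresh=lambda: print("refreshing"),
)
print(ran)  # False
```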
Example: wait for a full week before computing the weekly scorecard.

```sql
CREATE DYNAMIC TABLE gold.weekly_scorecard
LAG = '1 day'
REFRESH_MODE = INCREMENTAL
PRECONDITIONS (
  completeness(silver.events, partition_col='day', distinct_vals=7, filter="week = current_week()")
)
AS
SELECT week, da_id, sum(score) as total_score
FROM silver.events
GROUP BY week, da_id;
```

Rather than crashing on bad data or silently dropping rows, Floe routes DQ-failed rows to a companion quarantine table. The production table only receives rows that pass all DQ checks.
```sql
CREATE DYNAMIC TABLE silver.payments
LAG = '10 minutes'
ON_DQ_FAIL ROUTE TO quarantine.payments_failed
DQ_RULES (
  not_null(payment_id),
  not_null(amount),
  range_check(amount, min=0.01, max=100000),
  no_duplicates(payment_id)
)
AS SELECT * FROM bronze.raw_payments;
```

The quarantine table is a regular Iceberg table with an additional `_floe_dq_failure_reason` column. An alert fires if the quarantine write rate exceeds a configured threshold.
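A minimal sketch of quarantine routing for the `DQ_RULES` above, assuming each rule returns a failure reason or `None`. The rule helpers mirror the DDL names but are hypothetical Python stand-ins. How Floe treats duplicates is not specified here, so the sketch assumes the first occurrence of a `payment_id` in a batch passes and later ones fail.

```python
from typing import Callable, Optional

Rule = Callable[[dict], Optional[str]]  # returns a failure reason, or None if the row passes


def not_null(col: str) -> Rule:
    return lambda row: f"{col} is null" if row.get(col) is None else None


def range_check(col: str, low: float, high: float) -> Rule:
    def check(row: dict) -> Optional[str]:
        value = row.get(col)
        if value is not None and not (low <= value <= high):  # nulls are left to not_null
            return f"{col}={value!r} outside [{low}, {high}]"
        return None
    return check


def no_duplicates(col: str) -> Rule:
    seen = set()  # per-batch state: first occurrence passes, repeats fail

    def check(row: dict) -> Optional[str]:
        key = row.get(col)
        if key in seen:
            return f"duplicate {col}={key!r}"
        seen.add(key)
        return None
    return check


def route(rows: list, rules: list) -> tuple:
    """Split rows into (production, quarantine); quarantined rows carry the reasons."""
    passed, quarantined = [], []
    for row in rows:
        reasons = [r for r in (rule(row) for rule in rules) if r is not None]
        if reasons:
            quarantined.append({**row, "_floe_dq_failure_reason": "; ".join(reasons)})
        else:
            passed.append(row)
    return passed, quarantined


rules = [not_null("payment_id"), not_null("amount"),
         range_check("amount", 0.01, 100000), no_duplicates("payment_id")]
rows = [
    {"payment_id": "p1", "amount": 10.0},
    {"payment_id": "p2", "amount": None},   # fails not_null(amount)
    {"payment_id": "p1", "amount": 5.0},    # fails no_duplicates(payment_id)
    {"payment_id": "p3", "amount": 0.0},    # fails range_check(amount)
]
good, bad = route(rows, rules)
print([r["payment_id"] for r in good])  # ['p1']
```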
Every DIT write automatically injects lineage metadata columns. These are non-optional — they are how Floe guarantees auditability for payment-impacting and compliance-sensitive data.
| Column | Description |
|---|---|
| `_floe_input_snapshot_id` | Snapshot ID of the upstream table read during this refresh |
| `_floe_job_run_id` | Unique ID for this specific refresh execution |
| `_floe_processed_at` | UTC timestamp of when this row was written |
| `_floe_refresh_mode` | `INCREMENTAL` or `FULL`: which mode was used |
This enables exact audit queries: "Show me the exact upstream state that produced payment record X."
```sql
SELECT _floe_input_snapshot_id
FROM silver.payments
WHERE payment_id = 'pay_abc123';
-- Returns: 8029341234

-- Now time-travel to that snapshot:
SELECT * FROM bronze.raw_payments
FOR VERSION AS OF 8029341234
WHERE payment_id = 'pay_abc123';
```

Floe parses all DIT definitions in a project and builds a directed acyclic graph of table dependencies. This DAG drives:
- Cascade scheduling: when Table A commits, Floe knows which downstream DITs (B, C, D...) need to be evaluated
- Parallel refresh: independent branches of the DAG refresh concurrently
- Cycle detection: circular dependencies are rejected at `floe apply` time
- Schema change propagation: upstream schema changes are detected before a refresh runs, not during
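Python's standard-library `graphlib` is enough to sketch three of these behaviors: cycle detection, grouping DITs into refresh waves that can run concurrently, and finding everything downstream of a committed table. The table names come from the examples in this README; `refresh_waves` and `downstream_of` are hypothetical helpers.

```python
from graphlib import CycleError, TopologicalSorter

# DIT -> the tables it reads from, using table names from this README.
DEPS = {
    "silver.deliveries_enriched": {"bronze.deliveries"},
    "gold.delivery_quality_daily": {"silver.deliveries_enriched", "bronze.delivery_defects"},
    "silver.orders": {"bronze.raw_orders", "bronze.customers"},
}


def refresh_waves(deps: dict) -> list:
    """Group DITs into waves; every DIT in a wave can refresh concurrently.

    TopologicalSorter.prepare() raises CycleError on circular dependencies,
    the same point at which `floe apply` is described as rejecting them.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        sorter.done(*ready)
        dits = [node for node in ready if node in deps]  # base tables are not refreshed
        if dits:
            waves.append(dits)
    return waves


def downstream_of(table: str, deps: dict) -> set:
    """Every DIT that transitively reads from `table`: the cascade set for a commit."""
    affected, frontier = set(), [table]
    while frontier:
        current = frontier.pop()
        for dit, upstreams in deps.items():
            if current in upstreams and dit not in affected:
                affected.add(dit)
                frontier.append(dit)
    return affected


print(refresh_waves(DEPS))
# [['silver.deliveries_enriched', 'silver.orders'], ['gold.delivery_quality_daily']]
print(sorted(downstream_of("bronze.deliveries", DEPS)))
# ['gold.delivery_quality_daily', 'silver.deliveries_enriched']
```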
v0.1 implementation note: the architecture below is the target design. v0.1 ships a single-process implementation: polling replaces the push-based Event Bus, and DuckDB replaces Flink as the compute layer. The DAG Planner, Refresh Executor, and Catalog Manager all run in the same Python process. v0.2 swaps the polling loop for catalog commit listeners and a pluggable Event Bus; Flink integration follows.
```mermaid
flowchart TD
    subgraph ControlPlane["Floe Control Plane"]
        DAG["DAG Planner\n─────────────\n• Parse DITs\n• Build graph\n• Plan queries\n• Cycle detect"]
        SCHED["Refresh Scheduler\n─────────────\n• Event listener\n• LAG evaluation\n• Job dispatch\n• Preconditions"]
        CAT["Catalog Manager\n─────────────\n• Table registry\n• Schema mgmt\n• Snapshot index\n• Lineage store"]
    end
    subgraph EventBus["Event Bus (pluggable)"]
        EB["Local: NATS\nAWS: EventBridge\nAzure: Event Hubs\nGCP: Pub/Sub\nOSS: Kafka / Redpanda"]
    end
    subgraph Compute["Compute Layer"]
        W1["Flink Worker\nincremental merge"]
        W2["Flink Worker\nfull refresh"]
        W3["Flink Worker\nCDC ingest"]
    end
    subgraph Storage["Apache Iceberg"]
        META["Metadata\n─────────────\nREST Catalog\nNessie / Polaris"]
        OBJ["Object Storage (pluggable)\n─────────────\nLocal / MinIO · AWS S3\nAzure ADLS Gen2 · GCP GCS"]
    end
    DAG --> EB
    SCHED --> EB
    CAT --> EB
    EB --> W1
    EB --> W2
    EB --> W3
    W1 --> META
    W2 --> META
    W3 --> META
    META --> OBJ
```
1. Source write → Iceberg table (CDC via Flink, direct write, or streaming ingest)
2. Iceberg commit → emits CommitEvent { table, snapshot_id, timestamp }
3. Event Bus delivers CommitEvent to Refresh Scheduler
4. Refresh Scheduler checks: which downstream DITs depend on this table?
5. For each dependent DIT:
a. Evaluate LAG: is a refresh due?
b. Run PRECONDITIONS: does the data meet readiness criteria?
c. If both pass → submit Flink job with incremental merge plan
6. Flink job reads Iceberg snapshot diff, applies transformation, writes new snapshot
7. New Iceberg commit on downstream table → emit CommitEvent → repeat from step 3
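The seven steps form a loop: each refresh commits a new snapshot, which is itself a `CommitEvent`. The sketch below runs that loop with an in-memory queue in place of the Event Bus and a stub refresh that just mints snapshot IDs. `CommitEvent` uses the fields from step 2; `gold.order_metrics`, `is_lag_due`, `preconditions_pass`, and the snapshot counter are hypothetical placeholders for a downstream table, the LAG check, PRECONDITIONS, and the compute job.

```python
import itertools
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class CommitEvent:
    table: str
    snapshot_id: int
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


DEPS = {  # DIT -> upstream tables
    "silver.orders": {"bronze.raw_orders", "bronze.customers"},
    "gold.order_metrics": {"silver.orders"},  # hypothetical downstream DIT
}
_snapshot_ids = itertools.count(1000)  # stands in for IDs minted by real commits


def is_lag_due(dit: str) -> bool:          # step 5a placeholder
    return True


def preconditions_pass(dit: str) -> bool:  # step 5b placeholder
    return True


def run_cascade(first: CommitEvent) -> list:
    """Process commit events until no downstream DIT needs a refresh."""
    queue, refreshed = deque([first]), []
    while queue:
        event = queue.popleft()                                          # step 3
        dependents = [d for d, ups in DEPS.items() if event.table in ups]  # step 4
        for dit in dependents:
            if is_lag_due(dit) and preconditions_pass(dit):              # steps 5a-5c
                refreshed.append(dit)                                    # step 6 (stubbed)
                queue.append(CommitEvent(dit, next(_snapshot_ids)))      # step 7
    return refreshed


print(run_cascade(CommitEvent("bronze.raw_orders", 1)))
# ['silver.orders', 'gold.order_metrics']
```

The loop terminates because the dependency graph is acyclic, which is why cycle detection at `floe apply` time matters.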
The DAG Planner is responsible for parsing DIT definitions and building the dependency graph.
Inputs: DIT SQL/Python definitions, existing Iceberg catalog state

Outputs: Execution plan per DIT (query rewrite, refresh mode, incremental merge strategy)

Key responsibilities:
- SQL parsing: extract table references from `AS SELECT ...` to build edges
- Incremental eligibility: determine if a query can be run incrementally or requires full recompute. Rules:
  - Simple projections and filters: `INCREMENTAL`
  - Joins where one side is append-only: `INCREMENTAL`
  - Aggregations with `GROUP BY`: `INCREMENTAL` (using partial aggregation merge)
  - Window functions with unbounded windows: `FULL`
  - Self-joins: `FULL`
- Query rewrite: generate the `ΔQ` (delta query) for incremental mode, using Iceberg's snapshot diff API
- Schema validation: detect upstream schema changes before runtime
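The eligibility rules read naturally as a small decision function. This sketch assumes the planner has already extracted a few boolean features from the parsed query (the `QueryShape` fields are hypothetical, and the parsing itself is not shown); it returns the mode the rules above would pick, degrading to `FULL` whenever any rule calls for it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueryShape:
    has_join: bool = False
    join_has_append_only_side: bool = False
    has_group_by: bool = False
    has_unbounded_window: bool = False
    has_self_join: bool = False


def choose_refresh_mode(q: QueryShape) -> str:
    # Constructs that force a full recompute are checked first.
    if q.has_unbounded_window or q.has_self_join:
        return "FULL"
    # Joins stay incremental only when at least one side is append-only.
    if q.has_join and not q.join_has_append_only_side:
        return "FULL"
    # Projections, filters, and GROUP BY (via partial-aggregate merge) remain incremental.
    return "INCREMENTAL"


print(choose_refresh_mode(QueryShape(has_group_by=True)))                  # INCREMENTAL
print(choose_refresh_mode(QueryShape(has_join=True)))                      # FULL
print(choose_refresh_mode(QueryShape(has_join=True, has_self_join=True)))  # FULL
```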
The Refresh Scheduler is the heart of the event-driven loop. It is a lightweight, stateless process — its only persistent state is the last-processed snapshot ID per DIT, stored as Iceberg table properties.
Responsibilities:
- Subscribe to the Event Bus for `CommitEvent` messages
- Maintain the per-DIT refresh timer (LAG evaluation)
- Evaluate preconditions before dispatching a Flink job
- Prevent duplicate dispatches (idempotency guard using snapshot ID)
- Handle backpressure: if a DIT's Flink job is still running, queue (do not drop) the next trigger
Idempotency: Before dispatching a job, the Scheduler checks: "Has this specific upstream snapshot already been processed for this DIT?" If yes, skip. The checkpoint is written to the output Iceberg table's properties — no external state store required.
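A sketch of the idempotency guard together with the queue-don't-drop backpressure rule, assuming the checkpoint is a table property holding the last processed upstream snapshot ID. A plain dict stands in for the output table's Iceberg properties, and both `DitScheduler` and the property key `floe.last-upstream-snapshot.<table>` are hypothetical, not documented Floe names.

```python
from collections import deque


class DitScheduler:
    """Per-DIT dispatch guard: skip processed snapshots, queue (never drop) while busy."""

    def __init__(self, dit: str, properties: dict):
        self.dit = dit
        self.properties = properties   # stand-in for the output table's Iceberg properties
        self.running = False           # is a refresh job in flight?
        self.pending = deque()         # triggers that arrived while busy
        self.dispatched = []           # record of jobs launched

    @staticmethod
    def _key(upstream: str) -> str:
        return f"floe.last-upstream-snapshot.{upstream}"  # hypothetical property key

    def on_commit(self, upstream: str, snapshot_id: int) -> None:
        if self.properties.get(self._key(upstream)) == str(snapshot_id):
            return                                        # already processed: skip
        if self.running:
            self.pending.append((upstream, snapshot_id))  # backpressure: queue it
            return
        self.running = True
        self.dispatched.append((upstream, snapshot_id))   # a real system launches the job here

    def on_job_finished(self, upstream: str, snapshot_id: int) -> None:
        # Record the checkpoint only after success, so a crash before this line
        # re-runs the same snapshot on the next trigger instead of skipping it.
        self.properties[self._key(upstream)] = str(snapshot_id)
        self.running = False
        if self.pending:
            self.on_commit(*self.pending.popleft())


s = DitScheduler("silver.orders", properties={})
s.on_commit("bronze.raw_orders", 101)        # dispatched
s.on_commit("bronze.raw_orders", 102)        # queued: the 101 job is still running
s.on_job_finished("bronze.raw_orders", 101)  # checkpoint 101, then dispatch 102
s.on_commit("bronze.raw_orders", 101)        # already checkpointed: skipped
print(s.dispatched)  # [('bronze.raw_orders', 101), ('bronze.raw_orders', 102)]
```

This guard only remembers the latest snapshot per upstream; a production guard would also need to reject snapshots older than the checkpoint.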
Wraps the Iceberg catalog to provide Floe-specific operations:
- Table registration and discovery
- Schema evolution coordination (schema changes require DAG re-validation)
- Snapshot indexing for incremental scan planning
- Lineage metadata storage (DIT → upstream snapshot mapping)
Supported catalogs:
- Iceberg REST Catalog (default)
- Project Nessie (Git-like branching for Iceberg tables)
- Apache Polaris (cloud-managed)
- AWS Glue Data Catalog
- Hive Metastore
v0.1 implementation: an in-process polling loop (`floe watch`) replaces the Event Bus. It polls every external upstream table's `current_snapshot_id` at a configurable interval and dispatches refreshes directly. The push-based design below is v0.2+.
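The v0.1 polling approach can be sketched as comparing each upstream's current snapshot ID with the last one seen. To keep the sketch self-contained, the catalog lookup is injected as a function; against a real catalog it could be backed by PyIceberg, e.g. `catalog.load_table(name).current_snapshot()`, which returns `None` for a table with no snapshots. `poll_once`, `watch`, and `on_change` are hypothetical names, not Floe's internals.

```python
import time
from typing import Callable, Optional

SnapshotLookup = Callable[[str], Optional[int]]  # table name -> current snapshot ID (None if empty)


def poll_once(tables: list, lookup: SnapshotLookup, last_seen: dict) -> list:
    """Return upstream tables whose current snapshot changed since the previous poll."""
    changed = []
    for table in tables:
        current = lookup(table)
        if current is not None and current != last_seen.get(table):
            changed.append(table)  # the first poll reports every non-empty table
        last_seen[table] = current
    return changed


def watch(tables: list, lookup: SnapshotLookup, on_change: Callable[[list], None],
          interval_s: float = 5.0, max_polls: Optional[int] = None) -> None:
    """Poll at a fixed interval and hand changed upstreams to `on_change`."""
    last_seen: dict = {}
    polls = 0
    while max_polls is None or polls < max_polls:
        changed = poll_once(tables, lookup, last_seen)
        if changed:
            on_change(changed)  # e.g. refresh every DIT downstream of these tables
        polls += 1
        if max_polls is None or polls < max_polls:
            time.sleep(interval_s)


snapshots = {"bronze.raw_orders": 101, "bronze.customers": 7}
seen: dict = {}
print(poll_once(list(snapshots), snapshots.get, seen))  # ['bronze.raw_orders', 'bronze.customers']
snapshots["bronze.raw_orders"] = 102
print(poll_once(list(snapshots), snapshots.get, seen))  # ['bronze.raw_orders']
```

Treating every non-empty upstream as changed on the first poll is a conservative choice: derived tables catch up when the watcher starts.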
The Event Bus is the signal backbone. It carries CommitEvent messages from Iceberg writes to the Refresh Scheduler, and CDC events from source systems to ingestion workers.
| Deployment | Event Bus | Notes |
|---|---|---|
| Local | Embedded NATS JetStream | Zero external dependencies |
| AWS | Amazon EventBridge | Native Iceberg S3 event integration |
| Azure | Azure Event Hubs | Kafka-compatible API |
| GCP | Google Cloud Pub/Sub | — |
| Self-managed | Apache Kafka / Redpanda | Works everywhere |
The Event Bus interface is a thin abstraction (publish(event), subscribe(topic, handler)). Swapping implementations requires only a config change — no code changes to the Scheduler or workers.
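A minimal sketch of that abstraction as a Python `Protocol`, plus an in-process implementation of the kind a local mode or test suite could use. Because `publish(event)` takes no topic argument, routing on an event's `topic` attribute is an assumption here; the `Event` class, the `"memory"` bus type, and `make_bus` are hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Protocol


@dataclass(frozen=True)
class Event:
    topic: str
    payload: dict


Handler = Callable[[Event], None]


class EventBus(Protocol):
    def publish(self, event: Event) -> None: ...
    def subscribe(self, topic: str, handler: Handler) -> None: ...


class InMemoryEventBus:
    """Synchronous, in-process bus: handlers run inside publish()."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)  # topic -> list of handlers

    def publish(self, event: Event) -> None:
        for handler in list(self._handlers[event.topic]):
            handler(event)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)


def make_bus(config: dict) -> EventBus:
    """Pick an implementation from config; only the in-memory bus exists in this sketch."""
    if config.get("type") == "memory":
        return InMemoryEventBus()
    raise NotImplementedError(f"event bus type {config.get('type')!r} not sketched here")


bus = make_bus({"type": "memory"})
received = []
bus.subscribe("commits", received.append)
bus.publish(Event("commits", {"table": "bronze.raw_orders", "snapshot_id": 42}))
print(received[0].payload["snapshot_id"])  # 42
```

Callers only depend on `publish` and `subscribe`, so a NATS or Kafka implementation selected by the same factory would not change them.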
v0.1 implementation: DuckDB is the compute engine in v0.1, used for both `INCREMENTAL` and `FULL` refresh modes against the local SQLite Iceberg catalog. Flink integration is v0.2, where it becomes the streaming-first engine for low-latency `TRIGGERED` mode and high-throughput batch workloads. The trade-offs and deployment model below describe the v0.2+ target.
Apache Flink is the primary compute engine. It was chosen over Spark/Glue for two reasons:
- Native streaming: Flink is a streaming-first engine. Spark is batch-first with streaming bolted on. For low-latency `TRIGGERED` refresh modes, Flink's architecture is the correct fit.
- Iceberg connector maturity: Flink's Iceberg connector supports full read/write, including streaming writes with exactly-once semantics.
Flink jobs in Floe are short-lived and stateless:
- Launched per refresh (not long-running streaming jobs, unless in `TRIGGERED` mode)
- State lives in Iceberg, not in Flink's state backend
- Workers are disposable: any failure simply retries from the last snapshot
For TRIGGERED mode (sub-minute latency), Floe runs a persistent Flink streaming job that continuously reads the Iceberg change log.
Flink deployment options:
- Standalone (local mode, Docker Compose)
- Kubernetes (via Flink Kubernetes Operator)
- AWS: Amazon EMR / Amazon Managed Service for Apache Flink (formerly Kinesis Data Analytics)
- Azure: HDInsight (Flink)
- GCP: Dataproc (Flink)
Floe writes exclusively to Apache Iceberg tables. No proprietary format, no vendor lock-in.
Why Iceberg (not Delta Lake, not Hudi, not Paimon):
| Feature | Iceberg | Delta Lake | Hudi | Paimon |
|---|---|---|---|---|
| Open spec (no single vendor) | ✅ | Partial (Databricks) | ✅ | ✅ |
| Snapshot-based versioning | ✅ | ✅ | Partial | ✅ |
| Incremental scan API | ✅ (`start-snapshot-id`) | Limited | ✅ | ✅ |
| REST Catalog standard | ✅ | ❌ | ❌ | ❌ |
| Multi-engine read support | ✅ | ✅ | Partial | Partial |
| Schema evolution (safe) | ✅ | Partial | Partial | ✅ |
Iceberg's `start-snapshot-id` / `end-snapshot-id` scan options let Floe read exactly the data appended between two snapshots, which is what makes incremental refresh exact for append-only upstreams. It is the direct equivalent of Snowflake's micro-partition diff.
```text
my-pipeline/
├── floe.yaml                # project config (catalog, compute, storage)
├── sources/
│   └── raw_orders.sql       # source table definitions
├── transformations/
│   ├── silver_orders.sql    # DIT definitions
│   └── gold_metrics.sql
├── dq/
│   └── rules.py             # custom DQ rule functions
└── tests/
    └── test_silver_orders.py
```
```yaml
project: my-analytics-pipeline
version: "1.0"

catalog:
  type: rest                 # rest | nessie | polaris | glue | hive
  uri: http://localhost:8181

compute:
  engine: flink
  mode: local                # local | kubernetes | emr | dataproc | hdinsight
  parallelism: 4

storage:
  type: s3compatible         # s3compatible | s3 | adls | gcs | local
  warehouse: s3://my-bucket/warehouse
  endpoint: http://localhost:9000   # MinIO for local mode

event_bus:
  type: nats                 # nats | eventbridge | eventhubs | pubsub | kafka
  uri: nats://localhost:4222

defaults:
  lag: "5 minutes"
  refresh_mode: incremental
  on_dq_fail: quarantine
  lineage: true              # inject _floe_* columns (default: true, non-optional)
```

Full DIT definition syntax:
```sql
CREATE DYNAMIC TABLE <catalog>.<database>.<table>
  [ PARTITION BY (<col>[, <col>]*) ]
  [ LAG = '<duration>' ]
  [ PARTITION_WINDOW = '<duration>' ]
  [ PARTITION_FRESHNESS = '<duration>' ]
  [ REFRESH_MODE = INCREMENTAL | FULL | TRIGGERED | SCHEDULED '<cron>' ]
  [ PRECONDITIONS (
      <precondition_function>(...) [, ...]
    ) ]
  [ DQ_RULES (
      <dq_rule_function>(...) [, ...]
    ) ]
  [ ON_DQ_FAIL ROUTE TO <quarantine_table> | FAIL | DROP ]
  [ TAGS ( key = 'value' [, ...] ) ]
AS
  <select_statement>;
```

Partition-aware options:
- `PARTITION BY (col)`: output table is created with Iceberg identity partitioning on these columns
- `PARTITION_WINDOW`: partitions older than `today - <duration>` are excluded from refresh; previous values are preserved
- `PARTITION_FRESHNESS`: maximum staleness for in-window partitions; a refresh fires if upstream changes OR if this deadline lapses
```python
from floe import Pipeline, DynamicTable, preconditions, dq

pipeline = Pipeline.from_config("floe.yaml")

silver_orders = DynamicTable(
    name="silver.orders",
    query="""
        SELECT o.order_id, o.amount, c.region
        FROM bronze.raw_orders o
        JOIN bronze.customers c ON o.customer_id = c.id
    """,
    lag="5 minutes",
    refresh_mode="incremental",
    preconditions=[
        preconditions.min_rows("bronze.raw_orders", count=1),
    ],
    dq_rules=[
        dq.not_null("order_id"),
        dq.range_check("amount", min=0.01),
    ],
    on_dq_fail="quarantine",
)

pipeline.register(silver_orders)
pipeline.apply()
```

Custom precondition:
```python
from floe.preconditions import precondition


@precondition
def weekly_completeness(spark, table: str, week: str) -> bool:
    """Pass only when all 7 days of `week` are present in the Iceberg table."""
    distinct_days = (
        spark.read.format("iceberg").load(table)
        .filter(f"week = '{week}'")
        .select("day")
        .distinct()
        .count()
    )
    return distinct_days >= 7
```

```bash
floe init my-pipeline                               # scaffold project
floe plan                                           # validate DITs, show DAG
floe apply                                          # deploy DIT definitions
floe status                                         # DAG status and refresh health
floe dag                                            # visualize dependency graph
floe refresh silver.orders                          # manual trigger
floe deploy --shadow silver.orders                  # deploy in shadow mode
floe promote silver.orders                          # promote shadow to production
floe diff                                           # show pending changes
floe reset-checkpoint silver.orders                 # trigger full backfill
floe reset-checkpoint silver.orders --from 2026-01-01
floe snapshot-history silver.orders --last 10
floe logs --follow
```

Zero external dependencies. Everything runs in Docker Compose.
```yaml
# docker-compose.yaml (generated by `floe init`)
services:
  floe-control:        # Control plane (DAG Planner + Scheduler + Catalog Manager)
  flink-jobmanager:    # Flink cluster
  flink-taskmanager:
  minio:               # S3-compatible object storage
  nats:                # Event bus
  iceberg-rest:        # Iceberg REST catalog
```

```bash
floe up      # start local stack
floe apply   # deploy DITs
floe down    # tear down
```

Helm chart for production deployment. Bring your own object storage and event bus.
```bash
helm repo add floe https://charts.floe.dev
helm install floe floe/floe -f floe-values.yaml
```

AWS:

```yaml
storage:   { type: s3, bucket: my-bucket }
catalog:   { type: glue, region: us-east-1 }
event_bus: { type: eventbridge }
compute:   { engine: flink, mode: emr }
```

Azure:

```yaml
storage:   { type: adls, account: myaccount, container: warehouse }
catalog:   { type: rest, uri: https://my-polaris.azurewebsites.net }
event_bus: { type: eventhubs, namespace: my-ns }
compute:   { engine: flink, mode: hdinsight }
```

GCP:

```yaml
storage:   { type: gcs, bucket: my-bucket }
catalog:   { type: rest, uri: https://my-catalog.run.app }
event_bus: { type: pubsub, project: my-project }
compute:   { engine: flink, mode: dataproc }
```

Shadow mode enables risk-free migration from a legacy pipeline to a Floe DIT. The new pipeline runs in parallel; production traffic is only switched after validation.
```bash
# Deploy in shadow mode: writes to silver.orders_shadow, runs daily reconciliation
floe deploy --shadow silver.orders

# After N days of zero variance:
floe promote silver.orders   # atomically swaps read pointer; legacy pipeline stops
```

Reconciliation reports: row count diff, value diff (configurable columns), latency diff.
```bash
floe reset-checkpoint silver.orders --from-beginning
floe reset-checkpoint silver.orders --from 2026-01-01
floe reset-checkpoint silver.orders --from-snapshot 8029341234
```

Backfill uses the exact same code path as incremental refresh; there is no separate backfill mode or job.
Iceberg handles schema evolution natively (add column, rename column, widen type, reorder). Floe adds a guardrail:
- `floe diff` detects upstream schema changes before they propagate
- Breaking changes (drop column, type narrowing) block downstream DITs and alert
- Non-breaking changes (add column, widen type) auto-propagate with a catalog version bump
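The guardrail can be sketched as a comparison of two column-name-to-type maps. This sketch treats the type promotions listed in the Iceberg table spec (`int` to `long`, `float` to `double`, and widening a decimal's precision at the same scale) as non-breaking, and any other type change or a dropped column as breaking. It compares columns by name, so a rename shows up as a drop plus an add, which is stricter than Iceberg's field-ID tracking. `is_widening` and `classify` are hypothetical names.

```python
import re

SAFE_PROMOTIONS = {("int", "long"), ("float", "double")}
_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")


def is_widening(old: str, new: str) -> bool:
    """True for type changes that cannot lose information."""
    if (old, new) in SAFE_PROMOTIONS:
        return True
    a, b = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if a and b:
        p1, s1 = map(int, a.groups())
        p2, s2 = map(int, b.groups())
        return s1 == s2 and p2 > p1  # more precision, same scale
    return False


def classify(old: dict, new: dict) -> tuple:
    """Return (breaking, non_breaking) change descriptions between two schemas."""
    breaking, non_breaking = [], []
    for col, typ in old.items():
        if col not in new:
            breaking.append(f"drop {col}")
        elif new[col] != typ:
            target = non_breaking if is_widening(typ, new[col]) else breaking
            target.append(f"{col}: {typ} -> {new[col]}")
    non_breaking += [f"add {col}" for col in new if col not in old]
    return breaking, non_breaking


old = {"order_id": "long", "amount": "decimal(10, 2)", "qty": "int", "note": "string"}
new = {"order_id": "long", "amount": "decimal(12, 2)", "qty": "long", "region": "string"}
print(classify(old, new))
# (['drop note'], ['amount: decimal(10, 2) -> decimal(12, 2)', 'qty: int -> long', 'add region'])
```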
Floe emits structured metrics to a pluggable observability backend:
| Metric | Description |
|---|---|
| `floe.refresh.latency_ms` | Time from `CommitEvent` to downstream snapshot commit |
| `floe.refresh.lag_seconds` | Actual lag vs. configured `LAG` target |
| `floe.dq.quarantine_rate` | Fraction of rows routed to quarantine |
| `floe.precondition.skips` | Number of times a precondition blocked a refresh |
| `floe.dag.depth` | Number of hops in the longest DAG path |
Supported backends: Prometheus, AWS CloudWatch, Azure Monitor, GCP Cloud Monitoring, OpenTelemetry.
| Dimension | Snowflake Dynamic Tables | Floe |
|---|---|---|
| Table format | Proprietary micro-partitions | Apache Iceberg (open) |
| Cloud | Snowflake only | Any / local |
| Preconditions | `TARGET_LAG` only | Full precondition DSL |
| DQ routing | Not built-in | First-class quarantine pattern |
| Lineage | Internal (not exported) | Explicit _floe_* columns in Iceberg |
| Cost model | Snowflake credits | Open source; pay for your own compute |
Paimon is the closest streaming table format with event-driven capabilities, but it uses its own storage format (LSM-tree based). Paimon tables are not standard Iceberg tables, so Trino, DuckDB, and generic Iceberg readers cannot read them without Paimon-specific connectors or compatibility options. Floe is pure Iceberg: any Iceberg-compatible reader works.
DLT provides a very similar declarative pipeline experience but is Databricks-exclusive. Moving off Databricks means rewriting all pipelines. Floe is vendor-neutral by design.
RisingWave is a streaming database that can sink to Iceberg. It handles Iceberg as an output, not as the primary storage layer. Floe treats Iceberg as the system of record — both the input and the output — which enables full time-travel and snapshot lineage across the entire pipeline.
Building the same system yourself requires writing Flink jobs per transformation, building a custom orchestration layer, implementing idempotency and checkpoint management, writing DQ routing logic per job, and building a DAG planner for multi-hop cascades. Floe replaces all of that boilerplate so engineers write only the transformation logic.
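To make the DAG-planner piece concrete, here is a minimal sketch. It assumes derived tables are described by a plain mapping from each table to the upstream tables its SELECT reads. `plan_cascade` and its `max_depth` parameter are hypothetical; a maximum cascade depth is only proposed in the multi-hop latency question, not an existing Floe option. The `gold.*` tables in the example are made up.

```python
from __future__ import annotations

from graphlib import TopologicalSorter


def plan_cascade(deps: dict[str, set[str]], changed: str,
                 max_depth: int | None = None) -> tuple[list[str], int]:
    """Return (tables to refresh upstream-first, longest hop count from `changed`).

    `deps` maps each derived table to the set of tables its SELECT reads.
    """
    # Invert edges: upstream table -> derived tables that read it.
    consumers: dict[str, set[str]] = {}
    for table, upstreams in deps.items():
        for up in upstreams:
            consumers.setdefault(up, set()).add(table)

    # Collect every table transitively downstream of the commit.
    affected: set[str] = set()
    stack = [changed]
    while stack:
        for child in consumers.get(stack.pop(), ()):
            if child not in affected:
                affected.add(child)
                stack.append(child)

    # Order the affected subgraph; static_order() raises CycleError on cycles.
    scope = affected | {changed}
    graph = {t: sorted(deps[t] & scope) for t in sorted(affected)}
    order = [t for t in TopologicalSorter(graph).static_order() if t != changed]

    # Longest path, not shortest: a table waits for its slowest upstream branch.
    hops = {changed: 0}
    for t in order:
        hops[t] = 1 + max(hops[u] for u in deps[t] if u in hops)
    depth = max(hops.values())

    if max_depth is not None and depth > max_depth:
        raise ValueError(f"cascade from {changed} is {depth} hops; limit is {max_depth}")
    return order, depth


deps = {
    "silver.orders": {"bronze.raw_orders", "bronze.customers"},
    "gold.daily_revenue": {"silver.orders"},
    "gold.region_report": {"silver.orders", "gold.daily_revenue"},
}
order, depth = plan_cascade(deps, "bronze.customers")
# order == ["silver.orders", "gold.daily_revenue", "gold.region_report"], depth == 3
```

Here `gold.region_report` is two hops from `bronze.customers` by its shortest route but three by its longest. The longest route is the one that bounds cascade latency, so that is the count the sketch checks against `max_depth`.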
- Core DIT engine: DAG Planner, Refresh Executor, Catalog Manager
- DuckDB reference compute backend (Flink moves to v0.2)
- Local deployment against SQLite Iceberg catalog
- `INCREMENTAL` and `FULL` refresh modes
- Partition-aware refresh (`PARTITION BY` + `PARTITION_WINDOW` + `PARTITION_FRESHNESS`)
- Automatic `_floe_*` lineage column injection
- CLI: `init`, `apply`, `plan`, `status`, `refresh`, `dag`, `watch`
- Polling-based event detection (`floe watch`); push-based hooks move to v0.2
- Apache Flink compute backend (streaming-first engine, replaces DuckDB for production workloads)
- Push-based event detection: Iceberg commit listeners + pluggable Event Bus (NATS / EventBridge / Event Hubs / Pub/Sub)
- Docker Compose local deployment (Flink + MinIO + NATS + Iceberg REST)
- `PRECONDITIONS` DSL (built-in functions + custom Python)
- `DQ_RULES` DSL
- Quarantine routing (`ON_DQ_FAIL ROUTE TO`)
- DQ alerting (threshold-based quarantine rate)
- Kubernetes Helm chart
- AWS config profile (EventBridge + S3 + Glue catalog)
- Azure config profile (Event Hubs + ADLS + Polaris)
- GCP config profile (Pub/Sub + GCS)
- Shadow mode (`floe deploy --shadow`, `floe promote`)
- Backfill commands (`floe reset-checkpoint`)
- Schema evolution guardrails
- Prometheus / OpenTelemetry metrics
- `TRIGGERED` refresh mode (persistent Flink streaming job)
- `floe dag` ASCII DAG visualization
- Python SDK (full parity with SQL DDL)
- VS Code extension (DIT syntax highlighting, DAG preview)
- Instant local sandbox: spin up sample-data DITs in seconds (no CDK/CloudFormation), with the same DDL that runs in cloud
- AI-native authoring: natural-language → DIT DDL via MCP server / SDK, with DAG-aware validation against the live catalog
- Multi-catalog federation (query across catalogs in one DAG)
- Branch-based development (via Nessie catalog branching)
- Flink job failure recovery and retry policies
- Operational runbook and chaos testing suite
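The polling-based event detection item listed above (`floe watch`) can be sketched as a loop that compares each upstream table's current snapshot ID with the last one seen. This is a minimal sketch under stated assumptions. `SnapshotPoller`, `watch`, and the `CommitEvent` fields are hypothetical. The snapshot lookup is injected as a callable so the code does not depend on a particular catalog client. With PyIceberg, for instance, it might wrap `catalog.load_table(name).current_snapshot()`, which is `None` for an empty table and otherwise carries a `snapshot_id`.

```python
import time
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass(frozen=True)
class CommitEvent:
    """Hypothetical event shape: which table moved, and from which snapshot."""

    table: str
    snapshot_id: int
    previous_snapshot_id: Optional[int]


class SnapshotPoller:
    def __init__(self, tables: Iterable[str],
                 current_snapshot_id: Callable[[str], Optional[int]]) -> None:
        self.tables = list(tables)
        # Injected lookup so the sketch does not depend on a specific catalog client.
        self.current_snapshot_id = current_snapshot_id
        self.last_seen: dict[str, Optional[int]] = {}

    def poll_once(self) -> list[CommitEvent]:
        """Compare each table's current snapshot to the last one seen."""
        events = []
        for table in self.tables:
            snap = self.current_snapshot_id(table)
            # The first poll only records a baseline; later changes become events.
            if table in self.last_seen and snap is not None and snap != self.last_seen[table]:
                events.append(CommitEvent(table, snap, self.last_seen[table]))
            self.last_seen[table] = snap
        return events


def watch(poller: SnapshotPoller, on_commit: Callable[[CommitEvent], None],
          interval_s: float = 5.0, max_polls: Optional[int] = None) -> None:
    """Poll forever (or max_polls times), handing each event to on_commit."""
    polls = 0
    while max_polls is None or polls < max_polls:
        for event in poller.poll_once():
            on_commit(event)
        polls += 1
        time.sleep(interval_s)
```

Polling trades up to one interval of extra latency for simplicity. The push-based commit listeners on the roadmap would replace `poll_once` with events delivered over the event bus, while the `on_commit` side could stay the same.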
| Question | Notes |
|---|---|
| Core language: Python or JVM? | Python CLI for UX; Flink jobs are JVM. Control plane could be Python (easier contribution) or Kotlin (type-safe, JVM interop). Decision needed before v0.1. |
| Catalog default for local mode | REST catalog (via iceberg-rest-image) is simplest. Nessie adds branching but heavier. Start with REST, add Nessie in v0.4. |
| Incremental aggregation correctness | Merging partial aggregates (e.g., SUM, COUNT) is safe. MEDIAN and COUNT(DISTINCT) are not incrementally composable. Need a clear error message + automatic FULL fallback. |
| Multi-hop cascade latency | Deep DAGs (A → B → C → D) accumulate per-hop refresh latency. Need a maximum-cascade-depth config and a latency budget planner. |
| Flink cold-start for short-lived jobs | Flink job startup (~30 s standalone, ~2-4 min cloud-managed) limits how low LAG can go for non-TRIGGERED modes. Document this ceiling clearly. |
| Licensing | Apache 2.0 is the obvious choice for ecosystem compatibility (Iceberg, Flink are both Apache 2.0). |
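The incremental-aggregation question can be illustrated with a few lines. A stored (sum, count) pair can absorb the (sum, count) of new rows exactly, but a median of partial medians is not the median of all rows. `COMPOSABLE`, `merge_sum_count`, and `choose_refresh_mode` are hypothetical names, and the composable set only covers the two aggregates the table above calls safe. The sketch also assumes the delta holds only appended rows; with deletes, their contributions would have to be subtracted.

```python
from statistics import median
from typing import Optional

# Illustrative: aggregates whose stored result can be updated from the result
# over newly arrived rows alone, without re-reading the partition.
COMPOSABLE = {"SUM", "COUNT"}


def merge_sum_count(stored: tuple[float, int],
                    delta: tuple[float, int]) -> tuple[float, int]:
    """Fold the (sum, count) of new rows into the stored (sum, count)."""
    return stored[0] + delta[0], stored[1] + delta[1]


def choose_refresh_mode(requested: str,
                        aggregates: set[str]) -> tuple[str, Optional[str]]:
    """Downgrade INCREMENTAL to FULL when an aggregate cannot be merged."""
    blocking = sorted(a for a in aggregates if a.upper() not in COMPOSABLE)
    if requested == "INCREMENTAL" and blocking:
        message = (f"{', '.join(blocking)} cannot be merged from partial results; "
                   "falling back to FULL refresh")
        return "FULL", message
    return requested, None


old_rows, new_rows = [1, 2, 3], [10, 20]
# SUM/COUNT: merging partials matches a full recompute: (36, 5).
print(merge_sum_count((sum(old_rows), len(old_rows)), (sum(new_rows), len(new_rows))))
# MEDIAN: the median of partial medians (8.5) differs from the true median (3).
print(median([median(old_rows), median(new_rows)]), median(old_rows + new_rows))
```

Returning the fallback reason alongside the mode gives the CLI a ready-made message to show, which covers the "clear error message" half of the open question.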
- Zhu, R. et al. (2023). Streaming Democratized: How Snowflake Implements Dynamic Tables. ACM SIGMOD 2023.
- Grover, A. et al. (2025). What's the Difference? Incremental Processing with Change Queries in Snowflake. ACM SIGMOD 2025.
- Apache Iceberg — open table format for huge analytic datasets
- Apache Flink — stateful stream processing engine
- Apache Flink × Iceberg connector — Flink read/write support for Iceberg tables
- Apache Paimon — streaming lakehouse table format
- Project Nessie — Git-like version control for data lakes
- Apache Polaris — open-source Iceberg catalog
- NATS — lightweight cloud-native messaging (local event bus)
- MinIO — S3-compatible object storage (local deployment)
- RisingWave — streaming database with Iceberg integration
- Delta Live Tables — declarative pipelines on Databricks
- Snowflake Dynamic Tables — the primary inspiration for this project
Apache 2.0
Floe is named for a sheet of floating ice — ice in motion. Small, flat, and fast.
