Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 316 additions & 24 deletions dlt/common/libs/pyiceberg.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Dict, Any, List, Optional, Union
from typing import Dict, Any, List, Optional, Tuple, Union
from pathlib import Path
import warnings
import time
Expand Down Expand Up @@ -72,59 +72,351 @@ def ensure_iceberg_compatible_arrow_data(data: pa.Table) -> pa.Table:
return data.cast(schema)


_UPLOAD_CHUNK_SIZE = 8 * 1024 * 1024


def _upload_parquet_to_remote(
    arrow_table: pa.Table,
    data_location: str,
    table_io: Any,
    prefix: str = "batch",
) -> str:
    """Write an Arrow table to a local temp Parquet file and upload it to remote storage.

    Args:
        arrow_table: Table serialized as a single snappy-compressed Parquet file.
        data_location: Remote directory/prefix under which the file is created.
        table_io: Object exposing ``new_output(path)``; the returned output must
            provide ``create()`` usable as a context manager that yields a
            writable binary stream. Callers in this module pass ``table.io``.
        prefix: File name prefix; a random UUID is appended to keep names unique.

    Returns:
        The full remote path of the uploaded Parquet file.

    The local temp file is removed in all cases, including when serialization
    or the upload raises, so failed loads do not leave files behind in the temp
    directory.
    """
    import uuid
    import tempfile

    import pyarrow.parquet as pq

    # The handle is only used to reserve a unique path; close it before
    # pyarrow writes to that path by name.
    with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as tmp:
        temp_path = tmp.name

    try:
        pq.write_table(
            arrow_table, temp_path, compression="snappy", store_decimal_as_integer=True
        )

        remote_path = f"{data_location}/{prefix}-{uuid.uuid4()}.parquet"
        output = table_io.new_output(remote_path)
        with open(temp_path, "rb") as fh, output.create() as remote_file:
            # Copy in fixed-size chunks so the whole file is never held in memory.
            for chunk in iter(lambda: fh.read(_UPLOAD_CHUNK_SIZE), b""):
                remote_file.write(chunk)
    finally:
        # Always clean up the local copy, also on write/upload failure.
        os.remove(temp_path)

    return remote_path


# Writes `data` to `table`. A pa.RecordBatchReader is handed to
# _write_iceberg_table_streamed; any other input is passed through
# ensure_iceberg_compatible_arrow_data and written with table.append ("append")
# or table.overwrite ("replace").
# NOTE(review): this span comes from a rendered diff, so removed and added lines
# appear back to back and the original indentation is not preserved.
def write_iceberg_table(
table: IcebergTable,
# NOTE(review): the next two lines both declare `data`; presumably the first is
# the pre-change line and the second its replacement - confirm against the file.
data: pa.Table,
data: Union[pa.Table, pa.RecordBatchReader],
write_disposition: TWriteDisposition,
gc_collect_interval: int = 10,
) -> None:
# Start of the interval reported in the debug message below.
start_ts = time.monotonic()

if isinstance(data, pa.RecordBatchReader):
_write_iceberg_table_streamed(
table, data, write_disposition, gc_collect_interval=gc_collect_interval
)
else:
# No branch handles dispositions other than "append" and "replace" here.
if write_disposition == "append":
table.append(ensure_iceberg_compatible_arrow_data(data))
elif write_disposition == "replace":
table.overwrite(ensure_iceberg_compatible_arrow_data(data))
# NOTE(review): this message reads data.num_rows; presumably the call belongs to
# the non-streamed branch only - confirm, since the reader branch forwards `data`
# to the streamed writer, which emits its own debug message.
logger.debug(
f"pyiceberg: {write_disposition} arrow with {data.num_rows} rows to table"
f" {table.name()} at location {table.location()} took"
f" {(time.monotonic() - start_ts)} seconds."
)


# Dispatches a streamed write: compares table.spec() with
# UNPARTITIONED_PARTITION_SPEC and calls _write_streamed_partitioned or
# _write_streamed_add_files, then logs the totals those helpers return.
# NOTE(review): this span comes from a rendered diff, so removed and added lines
# appear back to back and the original indentation is not preserved.
def _write_iceberg_table_streamed(
table: IcebergTable,
reader: pa.RecordBatchReader,
write_disposition: TWriteDisposition,
gc_collect_interval: int = 10,
) -> None:
"""Streams Arrow batches as individual parquet files via Iceberg's IO,
then does ONE atomic commit.

Memory stays constant: only one batch + one parquet file in memory at a
time. Only lightweight file-path metadata is accumulated.

For unpartitioned tables, uses ``table.add_files`` for the commit so that
Iceberg field-ids and ``schema.name-mapping.default`` are handled correctly
by pyiceberg.

For partitioned tables, uses ``txn.append`` per batch so that pyiceberg
handles partition-aware file writing (``add_files`` requires each file to
contain data for exactly one partition value).
"""
import gc

start_ts = time.monotonic()
# NOTE(review): the next four lines read `data`, which is not a parameter of this
# function; presumably they are removed lines of the previous write_iceberg_table
# body that the diff view shows inline - confirm against the file.
if write_disposition == "append":
table.append(ensure_iceberg_compatible_arrow_data(data))
elif write_disposition == "replace":
table.overwrite(ensure_iceberg_compatible_arrow_data(data))
is_partitioned = table.spec() != UNPARTITIONED_PARTITION_SPEC

if is_partitioned:
total_rows, batch_count = _write_streamed_partitioned(
table, reader, write_disposition, gc_collect_interval
)
# The partitioned helper returns no file count, so the log uses a fixed description.
data_files_desc = "via pyiceberg writer"
else:
total_rows, batch_count, n_files = _write_streamed_add_files(
table, reader, write_disposition, gc_collect_interval
)
data_files_desc = f"{n_files} data files"

# One more collection after the write; skipped when gc_collect_interval is falsy (e.g. 0).
if gc_collect_interval:
gc.collect()
logger.debug(
# NOTE(review): the first two string lines reference data.num_rows; presumably
# they are the pre-change message and the three lines after them are the
# replacement - confirm against the file.
f"pyiceberg: {write_disposition} arrow with {data.num_rows} rows to table {table.name()} at"
f" location {table.location()} took {(time.monotonic() - start_ts)} seconds."
f"pyiceberg: streamed {write_disposition} with {total_rows} rows in {batch_count}"
f" batches ({data_files_desc}) to table {table.name()} at location"
f" {table.location()} took {(time.monotonic() - start_ts)} seconds."
)


def _write_streamed_partitioned(
    table: IcebergTable,
    reader: pa.RecordBatchReader,
    write_disposition: TWriteDisposition,
    gc_collect_interval: int,
) -> Tuple[int, int]:
    """Stream record batches into an Iceberg table through ``txn.append``.

    All work happens inside a single ``table.transaction()``. When the
    disposition is "replace" and the table has a current snapshot, every
    existing row is deleted first in that same transaction. Each batch from
    ``reader`` is then converted to an Arrow table and appended.

    ``gc.collect()`` runs every ``gc_collect_interval`` batches (never when the
    interval is 0), and a progress message is logged every 10 batches.

    Returns:
        ``(total_rows, batch_count)`` for the batches consumed from ``reader``.
    """
    import gc

    from pyiceberg.expressions import AlwaysTrue

    replacing = write_disposition == "replace"
    total_rows = 0
    batch_count = 0  # stays 0 when the reader yields nothing

    with table.transaction() as txn:
        if replacing and table.current_snapshot():
            txn.delete(delete_filter=AlwaysTrue())

        for batch_count, record_batch in enumerate(reader, start=1):
            chunk = ensure_iceberg_compatible_arrow_data(
                pa.Table.from_batches([record_batch])
            )
            txn.append(chunk)
            total_rows += chunk.num_rows
            # Drop the reference before a possible collection below.
            del chunk

            if gc_collect_interval and batch_count % gc_collect_interval == 0:
                gc.collect()
            if batch_count % 10 == 0:
                logger.debug(
                    f"pyiceberg: streamed {batch_count} batches, {total_rows} rows so far"
                )

    return total_rows, batch_count


def _write_streamed_add_files(
    table: IcebergTable,
    reader: pa.RecordBatchReader,
    write_disposition: TWriteDisposition,
    gc_collect_interval: int,
) -> Tuple[int, int, int]:
    """Upload each streamed batch as its own parquet file, then register all files.

    Every batch from ``reader`` is converted to an Arrow table and uploaded
    under ``<table location>/data`` via ``_upload_parquet_to_remote``. After the
    reader is exhausted the collected paths are registered with ``add_files``:

    * "replace": in one transaction, existing rows are deleted (only if the
      table has a current snapshot) and the new files are added.
    * any other disposition: ``table.add_files`` is called directly.

    Returns:
        ``(total_rows, batch_count, number_of_uploaded_files)``.
    """
    import gc

    target_dir = f"{table.location()}/data"
    uploaded: List[str] = []
    total_rows = 0
    batch_count = 0  # stays 0 when the reader yields nothing

    for batch_count, record_batch in enumerate(reader, start=1):
        chunk = ensure_iceberg_compatible_arrow_data(pa.Table.from_batches([record_batch]))
        path = _upload_parquet_to_remote(chunk, target_dir, table.io, prefix="batch")
        uploaded.append(path)
        total_rows += chunk.num_rows
        # Drop the reference before a possible collection below.
        del chunk

        if gc_collect_interval and batch_count % gc_collect_interval == 0:
            gc.collect()
        if batch_count % 10 == 0:
            logger.debug(
                f"pyiceberg: streamed {batch_count} batches, {total_rows} rows so far"
            )

    if write_disposition != "replace":
        table.add_files(uploaded, check_duplicate_files=False)
        return total_rows, batch_count, len(uploaded)

    # Only needed for the replace path, so imported lazily here.
    from pyiceberg.expressions import AlwaysTrue

    with table.transaction() as txn:
        if table.current_snapshot():
            txn.delete(delete_filter=AlwaysTrue())
        txn.add_files(uploaded, check_duplicate_files=False)

    return total_rows, batch_count, len(uploaded)


# Merges `data` into `table` for the "upsert" and "insert-only" strategies read
# from schema["x-merge-strategy"]; any other strategy raises ValueError.
# The table schema is first evolved with union_by_name. Join columns are the
# first column carrying the "unique" prop when the schema has a "parent" key,
# otherwise the columns carrying the "primary_key" prop.
# NOTE(review): this span comes from a rendered diff, so removed and added lines
# appear back to back and the original indentation is not preserved.
def merge_iceberg_table(
table: IcebergTable,
# NOTE(review): the next two lines both declare `data`; presumably the first is
# the pre-change line and the second its replacement - confirm against the file.
data: pa.Table,
data: Union[pa.Table, pa.RecordBatchReader],
schema: TTableSchema,
load_table_name: str,
gc_collect_interval: int = 10,
) -> None:
# NOTE(review): two docstrings follow; presumably the one-line form is the
# pre-change docstring and the multi-line form its replacement - confirm.
"""Merges in-memory Arrow data into on-disk Iceberg table."""
"""Merges Arrow data into on-disk Iceberg table.

Accepts pa.Table or streaming RecordBatchReader.
"""
strategy = schema["x-merge-strategy"] # type: ignore[typeddict-item]
if strategy in ("upsert", "insert-only"):
# evolve schema
arrow_schema = ensure_iceberg_compatible_arrow_schema(data.schema)

with table.update_schema() as update:
# NOTE(review): two union_by_name calls follow; presumably the first is the
# pre-change line and the second (using arrow_schema) its replacement - confirm.
update.union_by_name(ensure_iceberg_compatible_arrow_schema(data.schema))
update.union_by_name(arrow_schema)

if "parent" in schema:
join_cols = [get_first_column_name_with_prop(schema, "unique")]
else:
join_cols = get_columns_names_with_prop(schema, "primary_key")

# NOTE(review): the TODO and the table.upsert loop below call data.to_batches;
# presumably they are the pre-change implementation, superseded by the
# _upsert_iceberg_table call that follows the loop - confirm against the file.
# TODO: replace the batching method with transaction with pyiceberg's release after 0.9.1
for rb in data.to_batches(max_chunksize=1_000):
batch_tbl = pa.Table.from_batches([rb])
batch_tbl = ensure_iceberg_compatible_arrow_data(batch_tbl)

table.upsert(
df=batch_tbl,
join_cols=join_cols,
when_matched_update_all=strategy == "upsert",
when_not_matched_insert_all=True,
case_sensitive=True,
)
_upsert_iceberg_table(
table, data, join_cols, strategy, gc_collect_interval=gc_collect_interval
)
else:
raise ValueError(
f'Merge strategy "{strategy}" is not supported for Iceberg tables. '
f'Table: "{load_table_name}".'
)


def _process_upsert_batch(
    batch_tbl: pa.Table,
    table: IcebergTable,
    txn: Any,
    join_cols: List[str],
    strategy: str,
    has_existing_data: bool,
) -> Tuple[pa.Table, int]:
    """Split one batch into rows to insert and rows to update, applying the updates.

    When ``has_existing_data`` is false the whole batch is returned for
    insertion and nothing is read from the table. Otherwise the table is
    scanned for rows whose ``join_cols`` match the batch:

    * with ``strategy == "upsert"``, rows reported by
      ``upsert_util.get_rows_to_update`` are written through ``txn.overwrite``;
    * rows of the batch that match an existing row are excluded from the
      returned insert set.

    Returns:
        ``(rows_to_insert, n_updated)``.
    """
    if not has_existing_data:
        return batch_tbl, 0

    from pyiceberg.table import upsert_util
    from pyiceberg.io.pyarrow import expression_to_pyarrow
    from pyiceberg.expressions.visitors import bind

    batch_keys_filter = upsert_util.create_match_filter(batch_tbl, join_cols)
    existing_rows = table.scan(row_filter=batch_keys_filter, case_sensitive=True).to_arrow()

    updated = 0
    if strategy == "upsert":
        changed_rows = upsert_util.get_rows_to_update(batch_tbl, existing_rows, join_cols)
        if len(changed_rows) > 0:
            txn.overwrite(
                changed_rows,
                overwrite_filter=upsert_util.create_match_filter(changed_rows, join_cols),
            )
            updated = len(changed_rows)

    # Nothing in the table matched this batch: every row is new.
    if not len(existing_rows) > 0:
        return batch_tbl, updated

    # Keep only the batch rows whose join columns do not match an existing row.
    existing_keys_filter = upsert_util.create_match_filter(existing_rows, join_cols)
    bound_filter = bind(table.schema(), existing_keys_filter, case_sensitive=True)
    arrow_filter = expression_to_pyarrow(bound_filter)
    return batch_tbl.filter(~arrow_filter), updated


def _upsert_iceberg_table(
    table: IcebergTable,
    data: Union[pa.Table, pa.RecordBatchReader],
    join_cols: List[str],
    strategy: str,
    gc_collect_interval: int = 10,
) -> None:
    """Upsert Arrow data into an Iceberg table batch by batch.

    A ``pa.RecordBatchReader`` is consumed as is; a ``pa.Table`` is split into
    batches of at most 1000 rows. Each batch goes through
    ``_process_upsert_batch``, which applies updates via the shared transaction
    and returns the rows still to be inserted.

    Unpartitioned tables: insert rows are uploaded as parquet files and
    registered with ``txn.add_files`` in the same transaction as the updates.

    Partitioned tables: insert rows are kept in memory and appended in a second
    transaction after the first one is closed, because pyiceberg 0.9.x crashes
    (SIGILL) when mixing writer-based operations (append/overwrite) with
    ``add_files`` or when issuing multiple writer operations in the same
    transaction.
    """
    import gc

    start_ts = time.monotonic()
    has_existing_data = table.current_snapshot() is not None
    is_partitioned = table.spec() != UNPARTITIONED_PARTITION_SPEC
    target_dir = f"{table.location()}/data"

    uploaded_inserts: List[str] = []
    deferred_inserts: List[pa.Table] = []
    total_updated = 0
    total_inserted = 0
    batch_count = 0  # stays 0 when there are no batches

    if isinstance(data, pa.RecordBatchReader):
        batch_iter = data
    else:
        batch_iter = data.to_batches(max_chunksize=1_000)

    with table.transaction() as txn:
        for batch_count, record_batch in enumerate(batch_iter, start=1):
            batch_tbl = ensure_iceberg_compatible_arrow_data(
                pa.Table.from_batches([record_batch])
            )

            pending, n_updated = _process_upsert_batch(
                batch_tbl, table, txn, join_cols, strategy, has_existing_data
            )
            total_updated += n_updated

            pending_rows = len(pending)
            if pending_rows > 0:
                total_inserted += pending_rows
                if is_partitioned:
                    # Appended later, in a separate transaction.
                    deferred_inserts.append(pending)
                else:
                    uploaded_inserts.append(
                        _upload_parquet_to_remote(
                            pending, target_dir, table.io, prefix="upsert"
                        )
                    )

            del batch_tbl
            if gc_collect_interval and batch_count % gc_collect_interval == 0:
                gc.collect()
            if batch_count % 10 == 0:
                logger.debug(
                    f"pyiceberg: upsert streamed {batch_count} batches,"
                    f" {total_inserted} inserts, {total_updated} updates so far"
                )

        # Register all uploaded insert files together with the updates.
        if uploaded_inserts:
            txn.add_files(uploaded_inserts, check_duplicate_files=False)

    if deferred_inserts:
        with table.transaction() as insert_txn:
            for insert_tbl in deferred_inserts:
                insert_txn.append(insert_tbl)
        del deferred_inserts

    logger.debug(
        f"pyiceberg: upsert {total_updated} updated, {total_inserted} inserted"
        f" in {batch_count} batches"
        f" into table {table.name()} took {(time.monotonic() - start_ts)} seconds."
    )


def get_sql_catalog(
catalog_name: str,
uri: str,
Expand Down
2 changes: 2 additions & 0 deletions dlt/destinations/impl/filesystem/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class FilesystemDestinationClientConfiguration(FilesystemConfigurationWithLocalF
"""Default Iceberg table properties applied to all tables; per-table adapter properties take precedence."""
iceberg_namespace_properties: Optional[Dict[str, str]] = None
"""Properties passed to the Iceberg catalog when creating the namespace."""
iceberg_gc_collect_interval: int = 0
"""How often (in batches) to run gc.collect() during streamed Iceberg writes. Set to 0 to disable."""

@resolve_type("credentials")
def resolve_credentials_type(self) -> Type[CredentialsConfiguration]:
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/impl/filesystem/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.enforces_nulls_on_alter = False
caps.sqlglot_dialect = "duckdb"
caps.supports_nested_types = True
caps.recommended_file_size = 128 * 1024 * 1024

return caps

Expand Down
Loading
Loading