Fix/iceberg drop tables #16
Open
hentzthename wants to merge 2 commits into sidequery:main from
Conversation
dlt core gates refresh="drop_resources" / refresh="drop_sources" on hasattr(job_client, "drop_tables") (dlt/load/utils.py). Without that method the load layer warns and silently skips the drops, which is what triggered the "Client for iceberg_rest does not implement drop table" message in the field.

Tests cover:
- method existence (the hasattr gate)
- selective drop of named tables only
- idempotent behavior for missing tables
- delete_schema=True clears _dlt_version rows for the current schema
- refresh="drop_resources" end-to-end
IcebergRestClient only exposed drop_storage() (a full namespace wipe). dlt core's refresh="drop_resources" / refresh="drop_sources" path calls job_client.drop_tables(*names, delete_schema=True); without it, drops are warned-and-skipped, leaving stale tables in the destination.

This commit drops each named table via the PyIceberg catalog (swallowing NoSuchTableError for idempotence). When delete_schema=True, it wipes _dlt_version rows for self.schema.name via table.delete(EqualTo(...)), matching the SqlJobClientBase.drop_tables contract. The inherited _delete_schema_in_storage isn't used because IcebergRestClient extends JobClientBase directly, not SqlJobClientBase, and would need a real SQL client to run the DELETE statement.
Hi Nico, another small one that fell out of running `pipeline.extract(source, refresh="drop_resources")` after `pipeline.sync_destination()` against a Nessie deployment. The load layer emitted:

> Client for iceberg_rest does not implement drop table

…and silently skipped the drops, so stale tables stuck around across refreshes.
Problem
dlt core gates the per-table drop path on `hasattr(job_client, "drop_tables")` (`dlt/load/utils.py:170`). `IcebergRestClient` only exposed `drop_storage()` (a full namespace wipe) -- no `drop_tables(*names, delete_schema=True)` -- so the load layer fell back to the warn-and-skip branch. Net effect:

- `refresh="drop_resources"` / `refresh="drop_sources"` were effectively no-ops on this destination.
- `pipeline.sql_client().drop_dataset()` had no coherent per-table partner (dataset-level drops work via the base-class `DROP SCHEMA CASCADE`).
- The only escape hatch was reaching for `pyiceberg` directly for destructive ops.
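The warn-and-skip gate can be sketched in isolation. Everything below is a simplified stand-in, not dlt's actual code in `dlt/load/utils.py`: `drop_resource_tables` and `LegacyIcebergRestClient` are illustrative names only.

```python
import warnings


class LegacyIcebergRestClient:
    """Stand-in for the pre-fix client: only a full namespace wipe exists."""

    def drop_storage(self) -> None:
        pass  # would wipe the whole namespace, not individual tables


def drop_resource_tables(job_client, table_names):
    # Simplified sketch of the hasattr gate dlt core applies before routing
    # per-table drops; without a drop_tables method it warns and skips.
    if hasattr(job_client, "drop_tables"):
        job_client.drop_tables(*table_names, delete_schema=True)
        return True
    warnings.warn(
        f"Client for {type(job_client).__name__} does not implement drop table"
    )
    return False


# The old client falls into the warn-and-skip branch:
print(drop_resource_tables(LegacyIcebergRestClient(), ["events", "users"]))  # False
```

Any client that grows a `drop_tables` method passes the gate, which is exactly what the fix below relies on.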
Solution

Implement the `JobClient.drop_tables` contract on `IcebergRestClient`:

- Drop each named table via the PyIceberg catalog, swallowing `NoSuchTableError` so the call is idempotent (dlt may pass tables that were never physically created).
- When `delete_schema=True`, remove all `_dlt_version` rows where `schema_name = self.schema.name` via `table.delete(EqualTo(...))`, matching the `SqlJobClientBase.drop_tables` contract.

One deviation worth calling out: the obvious move would be `self._delete_schema_in_storage(self.schema)`, but that method lives on `SqlJobClientBase` (not `JobClientBase`) and uses `self.sql_client.execute_sql(...)`. `IcebergRestClient` extends `JobClientBase` directly, and its `sql_client` is a DuckDB view provider rather than a real DDL-capable client -- so the `DELETE` is issued via PyIceberg's row-delete API instead, reusing the pattern already at `destination_client.py:1151-1153`.
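A minimal sketch of that contract, under stated assumptions: the catalog and version table below are in-memory stand-ins (the real code goes through the PyIceberg REST catalog, `pyiceberg.exceptions.NoSuchTableError`, and `table.delete(EqualTo(...))`), and attribute names like `dataset_name` are illustrative, not the client's actual internals.

```python
class NoSuchTableError(Exception):
    """Stand-in for pyiceberg.exceptions.NoSuchTableError."""


class FakeCatalog:
    """In-memory stand-in for the PyIceberg REST catalog."""

    def __init__(self, tables):
        self.tables = set(tables)

    def drop_table(self, identifier):
        if identifier not in self.tables:
            raise NoSuchTableError(identifier)
        self.tables.remove(identifier)


class IcebergDropTablesSketch:
    def __init__(self, catalog, dataset_name, schema_name, version_rows):
        self.catalog = catalog
        self.dataset_name = dataset_name   # assumed attribute, for illustration
        self.schema_name = schema_name
        self.version_rows = version_rows   # stand-in for the _dlt_version table

    def drop_tables(self, *table_names, delete_schema=False):
        for name in table_names:
            try:
                self.catalog.drop_table(f"{self.dataset_name}.{name}")
            except NoSuchTableError:
                # idempotent: dlt may pass tables that were never
                # physically created in the catalog
                pass
        if delete_schema:
            # mirrors version_table.delete(EqualTo("schema_name", ...)):
            # remove version rows belonging to the current schema only
            self.version_rows[:] = [
                row for row in self.version_rows
                if row["schema_name"] != self.schema_name
            ]


catalog = FakeCatalog({"ds.events", "ds.users"})
rows = [{"schema_name": "my_schema"}, {"schema_name": "other_schema"}]
client = IcebergDropTablesSketch(catalog, "ds", "my_schema", rows)
client.drop_tables("events", "never_created", delete_schema=True)
print(catalog.tables)  # {'ds.users'}
print(rows)            # [{'schema_name': 'other_schema'}]
```

Note that only the named tables and the current schema's version rows are touched: `ds.users` and `other_schema` survive, which is the behavioral difference from the old `drop_storage()` namespace wipe.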
Changes

`destination_client.py`

- `IcebergRestClient.drop_tables` (new): drops each named table via `catalog.drop_table(...)`; `NoSuchTableError` is swallowed. When `delete_schema=True`, deletes `_dlt_version` rows for `self.schema.name` via `version_table.delete(EqualTo("schema_name", ...))`.

No changes to `sql_client.py` or `schema_evolution.py`.

Tests
New `test_drop_tables.py` covering:

- the `hasattr` gate (method is actually exposed on the class)
- selective drop of named tables only
- idempotent behavior for missing tables
- `delete_schema=True` clears `_dlt_version` rows for the current schema
- `pipeline.run(..., refresh="drop_resources")` end-to-end (the originally reported symptom)
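The first three bullets could be exercised against a stub along these lines. The stub client here is hypothetical (the real tests run against the actual `IcebergRestClient`); it only mimics the drop-and-swallow behavior described above.

```python
class NoSuchTableError(Exception):
    """Stand-in for pyiceberg.exceptions.NoSuchTableError."""


class StubClient:
    """Hypothetical stand-in mimicking the new drop_tables surface."""

    def __init__(self, tables):
        self.tables = set(tables)

    def drop_tables(self, *names, delete_schema=False):
        for name in names:
            try:
                if name not in self.tables:
                    raise NoSuchTableError(name)
                self.tables.remove(name)
            except NoSuchTableError:
                pass  # swallowed, so missing/repeated drops are safe


client = StubClient({"events", "users"})
assert hasattr(client, "drop_tables")        # the hasattr gate
client.drop_tables("events", "never_made")   # missing table: no exception
client.drop_tables("events")                 # repeated drop: still no exception
assert client.tables == {"users"}            # only named tables were dropped
```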