Skip to content

Commit fd4738b

Browse files
fix: use COPY INTO for Dremio seeds, skip Spark seed caching
- Replace the fragile CSV-promotion REST API with COPY INTO for Dremio (creates Iceberg tables directly from the S3 source files)
- Remove the _promote_csv and _refresh_source methods (no longer needed)
- Skip seed caching for Spark (docker stop/start kills the Thrift Server)

Co-Authored-By: Itamar Hartstein <haritamar@gmail.com>
1 parent 094841b commit fd4738b

2 files changed

Lines changed: 35 additions & 111 deletions

File tree

.github/workflows/test-warehouse.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ jobs:
317317
run: dbt seed -f --target "${{ inputs.warehouse-type }}"
318318

319319
- name: Save seed cache from Docker volumes
320-
if: steps.seed-cache.outputs.cache-hit != 'true' && inputs.warehouse-type != 'duckdb' && inputs.warehouse-type != 'trino' && (inputs.warehouse-type == 'postgres' || inputs.warehouse-type == 'clickhouse' || inputs.warehouse-type == 'dremio' || inputs.warehouse-type == 'spark')
320+
if: steps.seed-cache.outputs.cache-hit != 'true' && inputs.warehouse-type != 'duckdb' && inputs.warehouse-type != 'trino' && inputs.warehouse-type != 'spark' && (inputs.warehouse-type == 'postgres' || inputs.warehouse-type == 'clickhouse' || inputs.warehouse-type == 'dremio')
321321
working-directory: ${{ env.E2E_DBT_PROJECT_DIR }}
322322
run: |
323323
CACHE_DIR="/tmp/seed-cache-${{ inputs.warehouse-type }}"

tests/e2e_dbt_project/external_seeders/dremio.py

Lines changed: 34 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -208,88 +208,18 @@ def _create_s3_source(self, token: str) -> None:
208208
else:
209209
print(" SeedFiles S3 source created/updated in Dremio")
210210

211-
# ------------------------------------------------------------------
212-
# Metadata refresh
213-
# ------------------------------------------------------------------
214-
215-
def _refresh_source(self, token: str) -> None:
216-
import requests
217-
218-
headers = self._headers(token)
219-
resp = requests.get(
220-
f"http://{self.dremio_host}:{self.dremio_port}/apiv2/source/SeedFiles",
221-
headers=headers,
222-
)
223-
if resp.status_code == 200:
224-
requests.post(
225-
f"http://{self.dremio_host}:{self.dremio_port}/apiv2/source/SeedFiles/refresh",
226-
headers=headers,
227-
)
228-
print(" SeedFiles metadata refresh triggered")
229-
time.sleep(5)
230-
231-
# ------------------------------------------------------------------
232-
# CSV promotion
233-
# ------------------------------------------------------------------
234-
235-
def _promote_csv(self, token: str, path_parts: list[str]) -> bool:
236-
import requests
237-
238-
headers = self._headers(token)
239-
full_path = ["SeedFiles"] + path_parts
240-
encoded = ".".join(f'"{p}"' for p in full_path)
241-
path_param = "/".join(full_path)
242-
243-
resp = requests.get(
244-
f"http://{self.dremio_host}:{self.dremio_port}/api/v3/catalog/by-path/{path_param}",
245-
headers=headers,
246-
)
247-
if resp.status_code != 200:
248-
print(f" Cannot find file at {path_param}: {resp.status_code}")
249-
return False
250-
251-
entity = resp.json()
252-
if entity.get("entityType") == "dataset":
253-
print(f" Already promoted: {encoded}")
254-
return True
255-
256-
entity_id = entity.get("id")
257-
if not entity_id:
258-
return False
259-
260-
promote_payload = {
261-
"id": entity_id,
262-
"path": full_path,
263-
"type": "PHYSICAL_DATASET",
264-
"entityType": "dataset",
265-
"format": {
266-
"type": "Text",
267-
"fieldDelimiter": ",",
268-
"lineDelimiter": "\n",
269-
"quote": '"',
270-
"comment": "#",
271-
"extractHeader": True,
272-
"trimHeader": True,
273-
"autoGenerateColumnNames": False,
274-
},
275-
}
276-
resp2 = requests.put(
277-
f"http://{self.dremio_host}:{self.dremio_port}/api/v3/catalog/{entity_id}",
278-
headers=headers,
279-
json=promote_payload,
280-
)
281-
if resp2.status_code in (200, 201):
282-
print(f" Promoted: {encoded}")
283-
return True
284-
print(f" Promote failed ({resp2.status_code}): {resp2.text[:200]}")
285-
return False
286-
287211
# ------------------------------------------------------------------
288212
# Public API
289213
# ------------------------------------------------------------------
290214

291215
def load(self) -> None:
292-
print("\n=== Loading Dremio seeds via MinIO ===")
216+
"""Load seeds using COPY INTO (no fragile CSV promotion needed).
217+
218+
1. Upload CSVs to MinIO.
219+
2. Create an S3 source so Dremio can read those files.
220+
3. For each CSV, CREATE TABLE in Nessie + COPY INTO from S3.
221+
"""
222+
print("\n=== Loading Dremio seeds via MinIO + COPY INTO ===")
293223

294224
print("\nStep 1: Uploading CSVs to MinIO...")
295225
self._upload_csvs_to_minio()
@@ -298,11 +228,7 @@ def load(self) -> None:
298228
token = self._get_token()
299229
self._create_s3_source(token)
300230

301-
print("\nStep 3: Refreshing source metadata...")
302-
self._refresh_source(token)
303-
time.sleep(5)
304-
305-
print(f"\nStep 4: Creating Nessie namespace '{self.schema_name}'...")
231+
print(f"\nStep 3: Creating Nessie namespace '{self.schema_name}'...")
306232
try:
307233
self._sql(
308234
token,
@@ -311,42 +237,40 @@ def load(self) -> None:
311237
except Exception as e:
312238
print(f" Warning creating schema: {e}")
313239

314-
print("\nStep 5: Creating Iceberg tables from promoted CSVs...")
240+
print("\nStep 4: Creating Iceberg tables and loading CSV data...")
315241
for subdir, csv_path, table_name in self.iter_seed_csvs():
316-
fname = os.path.basename(csv_path)
242+
cols = self.csv_columns(csv_path)
243+
if not cols:
244+
print(f" Skipping {table_name} (completely empty file)")
245+
continue
317246

318-
if not self.csv_has_data(csv_path):
319-
cols = self.csv_columns(csv_path)
320-
if not cols:
321-
print(f" Skipping {table_name} (completely empty file)")
322-
continue
323-
col_defs = ", ".join(f'"{c}" VARCHAR' for c in cols)
324-
sql = (
325-
f"CREATE TABLE IF NOT EXISTS "
326-
f'NessieSource."{self.schema_name}"."{table_name}" '
327-
f"({col_defs})"
328-
)
329-
print(f" Creating empty table: {table_name}")
330-
try:
331-
self._sql(token, sql, timeout=60)
332-
except Exception as e:
333-
print(f" Error: {e}")
247+
fqn = f'NessieSource."{self.schema_name}"."{table_name}"'
248+
249+
# Create empty Iceberg table with VARCHAR columns
250+
col_defs = ", ".join(f'"{c}" VARCHAR' for c in cols)
251+
create_sql = f"CREATE TABLE IF NOT EXISTS {fqn} ({col_defs})"
252+
try:
253+
self._sql(token, create_sql, timeout=60)
254+
except Exception as e:
255+
print(f" Error creating table {table_name}: {e}")
334256
continue
335257

336-
promoted = self._promote_csv(token, ["seeds", subdir, fname])
337-
if not promoted:
338-
print(f" Skipping CTAS for {table_name} (promotion failed)")
258+
if not self.csv_has_data(csv_path):
259+
print(f" Created empty table: {table_name}")
339260
continue
340261

341-
s3_ref = f'"SeedFiles"."seeds"."{subdir}"."{fname}"'
342-
sql = (
343-
f"CREATE TABLE IF NOT EXISTS "
344-
f'NessieSource."{self.schema_name}"."{table_name}" AS '
345-
f"SELECT * FROM {s3_ref}"
262+
# Load CSV data using COPY INTO from the S3 source
263+
fname = os.path.basename(csv_path)
264+
s3_path = f"@SeedFiles/seeds/{subdir}/{fname}"
265+
copy_sql = (
266+
f"COPY INTO {fqn} "
267+
f"FROM '{s3_path}' "
268+
f"FILE_FORMAT 'csv' "
269+
f"(EXTRACT_HEADER 'true', TRIM_SPACE 'true')"
346270
)
347-
print(f" CTAS: {table_name}")
271+
print(f" COPY INTO: {table_name}")
348272
try:
349-
self._sql(token, sql, timeout=120)
273+
self._sql(token, copy_sql, timeout=120)
350274
except Exception as e:
351275
print(f" Error: {e}")
352276

0 commit comments

Comments
 (0)