Skip to content

Commit af20d30

Browse files
fix: Dremio CI - batched seed materialization, single-threaded seeding, skip seed cache for Trino
- Skip seed caching for Trino (Hive Metastore doesn't recover from stop/start) - Remove dead Trino readiness code from seed cache restart section - Add batched Dremio seed materialization to handle large seeds (splits VALUES into 500-row batches) - Use --threads 1 for Dremio seed step to avoid Nessie catalog race conditions - Fix Dremio DROP SCHEMA cleanup macro (Dremio doesn't support DROP SCHEMA) Co-Authored-By: Itamar Hartstein <haritamar@gmail.com>
1 parent 6459984 commit af20d30

3 files changed

Lines changed: 99 additions & 36 deletions

File tree

.github/workflows/test-warehouse.yml

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,15 @@ jobs:
308308
(inputs.warehouse-type == 'postgres' || inputs.warehouse-type == 'clickhouse' || inputs.warehouse-type == 'trino' || inputs.warehouse-type == 'dremio' || inputs.warehouse-type == 'spark' || inputs.warehouse-type == 'duckdb' || inputs.generate-data)
309309
run: |
310310
python generate_data.py
311-
dbt seed -f --target "${{ inputs.warehouse-type }}"
311+
# Dremio needs single-threaded seeding to avoid Nessie catalog race conditions
312+
SEED_EXTRA_ARGS=""
313+
if [ "${{ inputs.warehouse-type }}" = "dremio" ]; then
314+
SEED_EXTRA_ARGS="--threads 1"
315+
fi
316+
dbt seed -f --target "${{ inputs.warehouse-type }}" $SEED_EXTRA_ARGS
312317
313318
- name: Save seed cache from Docker volumes
314-
if: steps.seed-cache.outputs.cache-hit != 'true' && inputs.warehouse-type != 'duckdb' && (inputs.warehouse-type == 'postgres' || inputs.warehouse-type == 'clickhouse' || inputs.warehouse-type == 'trino' || inputs.warehouse-type == 'dremio' || inputs.warehouse-type == 'spark')
319+
if: steps.seed-cache.outputs.cache-hit != 'true' && inputs.warehouse-type != 'duckdb' && inputs.warehouse-type != 'trino' && (inputs.warehouse-type == 'postgres' || inputs.warehouse-type == 'clickhouse' || inputs.warehouse-type == 'dremio' || inputs.warehouse-type == 'spark')
315320
working-directory: ${{ env.E2E_DBT_PROJECT_DIR }}
316321
run: |
317322
CACHE_DIR="/tmp/seed-cache-${{ inputs.warehouse-type }}"
@@ -338,39 +343,6 @@ jobs:
338343
339344
# Wait for services to be ready after restart
340345
case "${{ inputs.warehouse-type }}" in
341-
trino)
342-
echo "Waiting for Hive Metastore to accept connections after restart..."
343-
for i in $(seq 1 60); do
344-
if nc -z 127.0.0.1 9083 2>/dev/null; then
345-
echo "Hive Metastore port is open."
346-
break
347-
fi
348-
echo "Waiting for Hive Metastore after restart... ($i/60)"; sleep 5
349-
done
350-
echo "Waiting for Trino to be fully ready after restart..."
351-
for i in $(seq 1 60); do
352-
if curl -sf http://localhost:8086/v1/info 2>/dev/null | grep -q '"starting":false'; then
353-
echo "Trino is ready."
354-
break
355-
fi
356-
echo "Waiting for Trino after restart... ($i/60)"; sleep 5
357-
done
358-
# Verify Trino can actually reach the Hive Metastore (catalog query)
359-
echo "Verifying Trino-Hive Metastore connectivity..."
360-
for i in $(seq 1 30); do
361-
RESP=$(curl -s -X POST http://localhost:8086/v1/statement \
362-
-H "X-Trino-User: admin" \
363-
-d "SHOW SCHEMAS FROM iceberg" 2>/dev/null)
364-
if echo "$RESP" | grep -q '"error"'; then
365-
echo "Trino-Hive Metastore not ready yet... ($i/30)"; sleep 5
366-
elif echo "$RESP" | grep -q '"stats"'; then
367-
echo "Trino can reach Hive Metastore."
368-
break
369-
else
370-
echo "Unexpected response, retrying... ($i/30)"; sleep 5
371-
fi
372-
done
373-
;;
374346
clickhouse)
375347
for i in $(seq 1 30); do
376348
curl -sf http://localhost:8123/ping > /dev/null && break
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
{#
2+
Override dbt-dremio's seed materialization to support large seeds via batched inserts.
3+
Dremio's REST API rejects SQL that is too large or complex, so we split the VALUES
4+
clause into batches of BATCH_SIZE rows and issue separate INSERT INTO statements.
5+
#}
6+
7+
{% macro dremio__select_csv_rows_batch(model, agate_table, start_idx, end_idx) %}
8+
{%- set column_override = model['config'].get('column_types', {}) -%}
9+
{%- set quote_seed_column = model['config'].get('quote_columns', None) -%}
10+
{%- set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) -%}
11+
select
12+
{% for col_name in agate_table.column_names -%}
13+
{%- set inferred_type = adapter.convert_type(agate_table, loop.index0) -%}
14+
{%- set type = column_override.get(col_name, inferred_type) -%}
15+
{%- set column_name = (col_name | string) -%}
16+
cast({{ adapter.quote_seed_column(column_name, quote_seed_column) }} as {{ type }})
17+
as {{ adapter.quote_seed_column(column_name, quote_seed_column) }}{%- if not loop.last -%}, {%- endif -%}
18+
{% endfor %}
19+
from
20+
(values
21+
{% for idx in range(start_idx, end_idx) %}
22+
{%- set row = agate_table.rows[idx] -%}
23+
({%- for value in row -%}
24+
{% if value is not none %}
25+
{{ "'" ~ (value | string | replace("'", "''")) ~ "'" }}
26+
{% else %}
27+
cast(null as varchar)
28+
{% endif %}
29+
{%- if not loop.last%},{%- endif %}
30+
{%- endfor -%})
31+
{%- if not loop.last%},{%- endif %}
32+
{% endfor %}) temp_table ( {{ cols_sql }} )
33+
{% endmacro %}
34+
35+
36+
{% materialization seed, adapter = 'dremio' %}
37+
38+
{%- set identifier = model['alias'] -%}
39+
{%- set format = config.get('format', validator=validation.any[basestring]) or 'iceberg' -%}
40+
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
41+
{%- set target_relation = this.incorporate(type='table') -%}
42+
{% set grant_config = config.get('grants') %}
43+
44+
{{ run_hooks(pre_hooks) }}
45+
46+
{% if old_relation is not none -%}
47+
{{ adapter.drop_relation(old_relation) }}
48+
{%- endif %}
49+
50+
{%- set agate_table = load_agate_table() -%}
51+
{%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}
52+
{%- set num_rows = (agate_table.rows | length) -%}
53+
54+
{# Batch size: keep each SQL statement small enough for Dremio's REST API #}
55+
{%- set batch_size = 500 -%}
56+
{%- set first_end = [batch_size, num_rows] | min -%}
57+
58+
{# Create table with first batch #}
59+
{%- set first_batch_sql = dremio__select_csv_rows_batch(model, agate_table, 0, first_end) -%}
60+
{% call statement('effective_main') -%}
61+
{{ create_table_as(False, target_relation, first_batch_sql) }}
62+
{%- endcall %}
63+
64+
{# Insert remaining batches #}
65+
{% for batch_start in range(batch_size, num_rows, batch_size) %}
66+
{%- set batch_end = [batch_start + batch_size, num_rows] | min -%}
67+
{%- set batch_sql = dremio__select_csv_rows_batch(model, agate_table, batch_start, batch_end) -%}
68+
{% call statement('insert_batch_' ~ batch_start) -%}
69+
INSERT INTO {{ target_relation }}
70+
{{ batch_sql }}
71+
{%- endcall %}
72+
{% endfor %}
73+
74+
{% call noop_statement('main', 'CREATE ' ~ num_rows, 'CREATE', num_rows) %}
75+
-- batched seed insert ({{ num_rows }} rows in {{ (num_rows / batch_size) | round(0, 'ceil') | int }} batches)
76+
{% endcall %}
77+
78+
{{ refresh_metadata(target_relation, format) }}
79+
80+
{{ apply_twin_strategy(target_relation) }}
81+
82+
{% do persist_docs(target_relation, model) %}
83+
84+
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
85+
86+
{{ run_hooks(post_hooks) }}
87+
88+
{{ return({'relations': [target_relation]}) }}
89+
90+
{% endmaterialization %}

tests/e2e_dbt_project/macros/system/drop_test_schemas.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
{% endmacro %}
4242
4343
{% macro dremio__edr_drop_schema(schema_name) %}
44-
{% do run_query("DROP SCHEMA IF EXISTS " ~ schema_name) %}
44+
{# Dremio does not support DROP SCHEMA; Docker container cleanup handles this in CI #}
45+
{% do log("Skipping schema drop for Dremio (not supported); Docker cleanup handles this.", info=true) %}
4546
{% endmacro %}
4647
4748
{% macro duckdb__edr_drop_schema(schema_name) %}

0 commit comments

Comments
 (0)