Skip to content

Commit 0b7d476

Browse files
bdpedigo and Copilot authored
Delta dump tweaks (#228)
* configure small table defaults

Co-authored-by: Copilot <copilot@github.com>

* update auth pattern, endpoints

Co-authored-by: Copilot <copilot@github.com>

---------

Co-authored-by: Copilot <copilot@github.com>
1 parent 8003360 commit 0b7d476

5 files changed

Lines changed: 72 additions & 37 deletions

File tree

materializationengine/blueprints/deltalake/api.py

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
request,
1212
url_for,
1313
)
14-
from middle_auth_client import auth_required, auth_requires_admin
14+
from middle_auth_client import auth_required, auth_requires_dataset_admin
1515

1616
from materializationengine.blueprints.reset_auth import reset_auth
1717
from materializationengine.info_client import get_datastack_info, get_datastacks
@@ -31,30 +31,22 @@ def _is_auth_disabled():
3131
)
3232

3333

34-
def _has_datastack_permission(auth_user_info, permission_level, datastack_name):
35-
if not auth_user_info or not datastack_name or not permission_level:
36-
return False
37-
permissions = auth_user_info.get("permissions", [])
38-
required_permission = f"{datastack_name.lower()}_{permission_level.lower()}"
39-
return required_permission in permissions
40-
41-
4234
# ---------------------------------------------------------------------------
4335
# Wizard page routes
4436
# ---------------------------------------------------------------------------
4537

4638

4739
@deltalake_bp.route("/")
4840
@reset_auth
49-
@auth_requires_admin
41+
@auth_required
5042
def index():
5143
"""Redirect to step 1 of the wizard."""
5244
return redirect(url_for("deltalake.wizard_step", step_number=1))
5345

5446

5547
@deltalake_bp.route("/step<int:step_number>")
5648
@reset_auth
57-
@auth_requires_admin
49+
@auth_required
5850
def wizard_step(step_number):
5951
total_steps = 3
6052

@@ -71,6 +63,25 @@ def wizard_step(step_number):
7163
)
7264
datastacks = []
7365

66+
if not _is_auth_disabled():
67+
datasets_admin = g.get("auth_user", {}).get("datasets_admin", [])
68+
datastacks = [ds for ds in datastacks if ds in datasets_admin]
69+
70+
if not datastacks and not _is_auth_disabled():
71+
return render_template(
72+
"deltalake_wizard.html",
73+
current_step=step_number,
74+
total_steps=total_steps,
75+
step_template=None,
76+
datastacks=[],
77+
current_user=g.get("auth_user", {}),
78+
access_denied=True,
79+
access_denied_message="dataset_admin permission is required. You do not have dataset_admin access for any datastacks.",
80+
target_partition_size_mb=get_config_param("DELTALAKE_TARGET_PARTITION_SIZE_MB", 256),
81+
bloom_filter_fpp=get_config_param("DELTALAKE_BLOOM_FILTER_FPP", 0.001),
82+
output_bucket=get_config_param("DELTALAKE_OUTPUT_BUCKET", ""),
83+
), 403
84+
7485
step_template_path = f"deltalake/step{step_number}.html"
7586

7687
return render_template(
@@ -90,7 +101,7 @@ def wizard_step(step_number):
90101

91102
@deltalake_bp.route("/running-exports")
92103
@reset_auth
93-
@auth_requires_admin
104+
@auth_required
94105
def running_exports_page():
95106
"""Render the running exports monitoring page."""
96107
return render_template(
@@ -122,10 +133,10 @@ def get_defaults():
122133
)
123134

124135

125-
@deltalake_bp.route("/api/discover-specs", methods=["POST"])
136+
@deltalake_bp.route("/api/<string:datastack_name>/discover-specs", methods=["POST"])
126137
@reset_auth
127-
@auth_requires_admin
128-
def discover_specs():
138+
@auth_requires_dataset_admin(table_arg="datastack_name")
139+
def discover_specs(datastack_name):
129140
"""Run spec discovery for a table without enqueuing an export.
130141
131142
Expects JSON body: { version, table_name, target_partition_size_mb }; the datastack is taken from the ``datastack_name`` URL path segment.
@@ -151,14 +162,13 @@ def discover_specs():
151162
return jsonify({"error": "Request body must be JSON"}), 400
152163

153164
data = request.json
154-
datastack = data.get("datastack")
155165
version = data.get("version")
156166
table_name = data.get("table_name")
157167
target_partition_size_mb = data.get("target_partition_size_mb", 256)
158168

159-
if not all([datastack, version, table_name]):
169+
if not all([version, table_name]):
160170
return jsonify(
161-
{"error": "datastack, version, and table_name are required"}
171+
{"error": "version and table_name are required"}
162172
), 400
163173

164174
try:
@@ -167,7 +177,7 @@ def discover_specs():
167177
return jsonify({"error": f"Invalid table name: {table_name!r}"}), 400
168178

169179
# Check Redis cache first.
170-
cache_key = f"deltalake_specs:{datastack}:v{version}:{table_name}"
180+
cache_key = f"deltalake_specs:{datastack_name}:v{version}:{table_name}"
171181
redis_client = _get_redis_client()
172182
cached = redis_client.get(cache_key)
173183
if cached:
@@ -188,16 +198,16 @@ def discover_specs():
188198
return jsonify(cached_data)
189199

190200
try:
191-
datastack_info = get_datastack_info(datastack)
201+
datastack_info = get_datastack_info(datastack_name)
192202
except Exception as e:
193203
return jsonify({"error": f"Datastack not found: {e}"}), 404
194204

195205
sql_uri_config = get_config_param("SQLALCHEMY_DATABASE_URI")
196206
connection_string = _build_frozen_db_connection_string(
197-
sql_uri_config, datastack, version
207+
sql_uri_config, datastack_name, version
198208
)
199209

200-
analysis_database = f"{datastack}__mat{version}"
210+
analysis_database = f"{datastack_name}__mat{version}"
201211
pcg_table_name = datastack_info["segmentation_source"].split("/")[-1]
202212

203213
try:
@@ -234,6 +244,13 @@ def discover_specs():
234244
resolved_specs = discover_default_output_specs(source, engine)
235245
bytes_per_row = estimate_bytes_per_row(connection_string, source)
236246

247+
small_table_threshold_mb = int(
248+
get_config_param("DELTALAKE_SMALL_TABLE_THRESHOLD_MB", 200)
249+
)
250+
estimated_total_mb = row_count * bytes_per_row / (1024 * 1024)
251+
if estimated_total_mb < small_table_threshold_mb and len(resolved_specs) > 1:
252+
resolved_specs = resolved_specs[:1]
253+
237254
# Track which specs had "auto" before resolution (for caching).
238255
was_auto = [spec.n_partitions == "auto" for spec in resolved_specs]
239256

@@ -296,10 +313,10 @@ def discover_specs():
296313
return jsonify(result)
297314

298315

299-
@deltalake_bp.route("/api/check-exists", methods=["POST"])
316+
@deltalake_bp.route("/api/<string:datastack_name>/check-exists", methods=["POST"])
300317
@reset_auth
301-
@auth_requires_admin
302-
def check_exists():
318+
@auth_requires_dataset_admin(table_arg="datastack_name")
319+
def check_exists(datastack_name):
303320
"""Check whether Delta Lake exports already exist for a table/version.
304321
305322
Expects JSON body: { version, table_name, spec_names? }; the datastack is taken from the ``datastack_name`` URL path segment.
@@ -312,19 +329,18 @@ def check_exists():
312329
return jsonify({"error": "Request body must be JSON"}), 400
313330

314331
data = request.json
315-
datastack = data.get("datastack")
316332
version = data.get("version")
317333
table_name = data.get("table_name")
318334

319-
if not all([datastack, version, table_name]):
335+
if not all([version, table_name]):
320336
return jsonify(
321-
{"error": "datastack, version, and table_name are required"}
337+
{"error": "version and table_name are required"}
322338
), 400
323339

324340
from materializationengine.workflows.deltalake_export import _validate_identifier
325341

326342
try:
327-
_validate_identifier(datastack)
343+
_validate_identifier(datastack_name)
328344
_validate_identifier(table_name)
329345
version = int(version)
330346
except (ValueError, TypeError):
@@ -334,7 +350,7 @@ def check_exists():
334350
if not output_bucket:
335351
return jsonify({"error": "DELTALAKE_OUTPUT_BUCKET not configured"}), 500
336352

337-
output_uri_base = f"{output_bucket}/{datastack}/v{version}/{table_name}"
353+
output_uri_base = f"{output_bucket}/{datastack_name}/v{version}/{table_name}"
338354

339355
# If the caller provides explicit spec names, use those.
340356
# Otherwise fall back to cached specs or just check "flat".
@@ -350,7 +366,7 @@ def check_exists():
350366
from materializationengine.workflows.deltalake_export import _get_redis_client
351367

352368
redis_client = _get_redis_client()
353-
cache_key = f"deltalake_specs:{datastack}:v{version}:{table_name}"
369+
cache_key = f"deltalake_specs:{datastack_name}:v{version}:{table_name}"
354370
cached = redis_client.get(cache_key)
355371

356372
partition_names = ["flat"]

materializationengine/workflows/deltalake_export.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,6 +1612,20 @@ def _phase(phase: str, message: str, **status_kwargs) -> None:
16121612
)
16131613
bytes_per_row = estimate_bytes_per_row(connection_string, source)
16141614

1615+
small_table_threshold_mb = get_config_param(
1616+
"DELTALAKE_SMALL_TABLE_THRESHOLD_MB", 200
1617+
)
1618+
estimated_total_mb = row_count * bytes_per_row / (1024 * 1024)
1619+
if estimated_total_mb < small_table_threshold_mb and len(resolved_specs) > 1:
1620+
celery_logger.info(
1621+
"Table %s estimated size %.1f MB < threshold %s MB — "
1622+
"trimming to single (id) spec",
1623+
table_name,
1624+
estimated_total_mb,
1625+
small_table_threshold_mb,
1626+
)
1627+
resolved_specs = resolved_specs[:1]
1628+
16151629
for spec in resolved_specs:
16161630
if spec.n_partitions == "auto" or spec.n_partitions is None:
16171631
effective_target = spec.target_file_size_mb or target_partition_size_mb

static/js/deltalakeStep1.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,10 @@ document.addEventListener("alpine:init", () => {
106106
async checkExists() {
107107
this.checkingExists = true;
108108
try {
109-
const resp = await fetch("/materialize/deltalake/api/check-exists", {
109+
const resp = await fetch(`/materialize/deltalake/api/${this.datastack}/check-exists`, {
110110
method: "POST",
111111
headers: { "Content-Type": "application/json" },
112112
body: JSON.stringify({
113-
datastack: this.datastack,
114113
version: parseInt(this.version),
115114
table_name: this.tableName,
116115
}),
@@ -131,11 +130,10 @@ document.addEventListener("alpine:init", () => {
131130
this.discovering = true;
132131
this.error = null;
133132
try {
134-
const resp = await fetch("/materialize/deltalake/api/discover-specs", {
133+
const resp = await fetch(`/materialize/deltalake/api/${this.datastack}/discover-specs`, {
135134
method: "POST",
136135
headers: { "Content-Type": "application/json" },
137136
body: JSON.stringify({
138-
datastack: this.datastack,
139137
version: parseInt(this.version),
140138
table_name: this.tableName,
141139
target_partition_size_mb: this.targetPartitionSizeMb,

static/js/deltalakeStep3.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,10 @@ document.addEventListener("alpine:init", () => {
1919
const state = Alpine.store("dlWizard").state;
2020
const specNames = state.specs.map((s) => s.name);
2121
try {
22-
const resp = await fetch("/materialize/deltalake/api/check-exists", {
22+
const resp = await fetch(`/materialize/deltalake/api/${state.datastack}/check-exists`, {
2323
method: "POST",
2424
headers: { "Content-Type": "application/json" },
2525
body: JSON.stringify({
26-
datastack: state.datastack,
2726
version: state.version,
2827
table_name: state.tableName,
2928
spec_names: specNames,

templates/deltalake_wizard.html

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,15 @@
128128
</div>
129129

130130
<div class="card mt-5">
131+
{% if access_denied %}
132+
<div class="card-body">
133+
<div class="alert alert-danger mb-0" role="alert">
134+
<i class="fas fa-lock me-2"></i>{{ access_denied_message }}
135+
</div>
136+
</div>
137+
{% else %}
131138
{% include step_template %}
139+
{% endif %}
132140
</div>
133141
</div>
134142
{% endblock %}

0 commit comments

Comments
 (0)