Skip to content

Commit f2a9694

Browse files
committed
Merge branch 'master' of github.com:CAVEconnectome/MaterializationEngine
2 parents 9543250 + a3ac700 commit f2a9694

15 files changed

Lines changed: 3063 additions & 52 deletions

File tree

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 5.18.0
2+
current_version = 5.20.1
33
commit = True
44
tag = True
55

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ COPY uv.lock pyproject.toml ./
3333

3434
ENV UV_PROJECT_ENVIRONMENT="/usr/local/"
3535
RUN --mount=type=cache,target=/root/.cache/uv \
36-
UV_VENV_ARGS="--system-site-packages" uv sync --frozen --no-install-project --no-default-groups
36+
UV_VENV_ARGS="--system-site-packages" uv sync --frozen --no-install-project --no-default-groups --group deploy
3737

3838
# COPY . ./
3939
# RUN --mount=type=cache,target=/root/.cache/uv \

docs/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
author = "Derrick Brittain, Forrest Collman, Sven Dorkenwald"
2424

2525
# The full version, including alpha/beta/rc tags
26-
release = "5.18.0"
26+
release = "5.20.1"
2727

2828

2929

materializationengine/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "5.18.0"
1+
__version__ = "5.20.1"

materializationengine/blueprints/client/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from materializationengine.models import MaterializedMetadata
3333
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
3434

35-
__version__ = "5.18.0"
35+
__version__ = "5.20.1"
3636

3737

3838
authorizations = {

materializationengine/blueprints/client/api2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
7272
from materializationengine.utils import check_read_permission
7373

74-
__version__ = "5.18.0"
74+
__version__ = "5.20.1"
7575

7676

7777
authorizations = {

materializationengine/blueprints/materialize/api.py

Lines changed: 109 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,43 @@
11
import datetime
22
import logging
3+
import os
4+
import subprocess
5+
6+
import cloudfiles
37
import redis
4-
from dynamicannotationdb.models import AnalysisTable, Base
5-
from flask import abort, current_app, request, jsonify
8+
from dynamicannotationdb.models import AnalysisTable, AnalysisVersion, Base
9+
from flask import abort, current_app, jsonify, request
610
from flask_accepts import accepts
7-
from flask_restx import Namespace, Resource, inputs, reqparse, fields
11+
from flask_restx import Namespace, Resource, fields, inputs, reqparse
12+
from middle_auth_client import (
13+
auth_requires_admin,
14+
auth_requires_dataset_admin,
15+
auth_requires_permission,
16+
)
17+
from sqlalchemy import MetaData, Table
18+
from sqlalchemy.engine.url import make_url
19+
from sqlalchemy.exc import NoSuchTableError
20+
821
from materializationengine.blueprints.client.utils import get_latest_version
22+
from materializationengine.blueprints.materialize.schemas import (
23+
AnnotationIDListSchema,
24+
BadRootsSchema,
25+
VirtualVersionSchema,
26+
)
927
from materializationengine.blueprints.reset_auth import reset_auth
1028
from materializationengine.database import (
11-
dynamic_annotation_cache,
1229
db_manager,
30+
dynamic_annotation_cache,
1331
)
1432
from materializationengine.info_client import (
1533
get_aligned_volumes,
1634
get_datastack_info,
1735
get_relevant_datastack_info,
1836
)
19-
from dynamicannotationdb.models import AnalysisVersion
2037
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
21-
from materializationengine.blueprints.materialize.schemas import BadRootsSchema
22-
from middle_auth_client import auth_requires_admin, auth_requires_permission, auth_requires_dataset_admin
23-
from sqlalchemy import MetaData, Table
24-
from sqlalchemy.engine.url import make_url
25-
from sqlalchemy.exc import NoSuchTableError
2638
from materializationengine.utils import check_write_permission
27-
import os
28-
import subprocess
29-
import cloudfiles
30-
31-
32-
from materializationengine.blueprints.materialize.schemas import (
33-
VirtualVersionSchema,
34-
AnnotationIDListSchema,
35-
)
36-
3739

38-
__version__ = "5.18.0"
40+
__version__ = "5.20.1"
3941

4042

4143
bulk_upload_parser = reqparse.RequestParser()
@@ -715,7 +717,7 @@ class TableResource(Resource):
715717
@mat_bp.doc("get_all_tables", security="apikey")
716718
def get(self, aligned_volume_name, version):
717719
check_aligned_volume(aligned_volume_name)
718-
720+
719721
with db_manager.session_scope(aligned_volume_name) as session:
720722
response = (
721723
session.query(AnalysisTable)
@@ -742,11 +744,11 @@ class AnnotationResource(Resource):
742744
@mat_bp.doc("get_top_materialized_annotations", security="apikey")
743745
def get(self, aligned_volume_name: str, version: int, tablename: str):
744746
check_aligned_volume(aligned_volume_name)
745-
747+
746748
try:
747749
with db_manager.session_scope(aligned_volume_name) as session:
748750
engine = db_manager.get_engine(aligned_volume_name)
749-
751+
750752
metadata = MetaData()
751753
try:
752754
annotation_table = Table(
@@ -755,23 +757,24 @@ def get(self, aligned_volume_name: str, version: int, tablename: str):
755757
except NoSuchTableError as e:
756758
logging.error(f"No table exists {e}")
757759
return abort(404)
758-
760+
759761
response = session.query(annotation_table).limit(10).all()
760762
annotations = [r._asdict() for r in response]
761-
763+
762764
return (annotations, 200) if annotations else abort(404)
763-
765+
764766
except Exception as e:
765767
logging.error(f"Error querying annotations: {e}")
766768
return abort(500)
767769

770+
768771
@mat_bp.route("/materialize/run/create_virtual/datastack/<string:datastack_name>")
769772
class CreateVirtualPublicVersionResource(Resource):
770773
@reset_auth
771774
@auth_requires_dataset_admin(table_arg="datastack_name")
772775
@mat_bp.doc("create virtual materialization", security="apikey")
773776
@accepts("VirtualVersionSchema", schema=VirtualVersionSchema, api=mat_bp)
774-
def post(self, datastack_name:str):
777+
def post(self, datastack_name: str):
775778
"""Create a virtual version from an existing frozen version.
776779
777780
Args:
@@ -790,8 +793,7 @@ def post(self, datastack_name:str):
790793
if not tables_to_include:
791794
return abort(400, "No tables included")
792795

793-
with db_manager.session_scope(aligned_volume) as session:
794-
796+
with db_manager.session_scope(aligned_volume) as session:
795797
analysis_version = (
796798
session.query(AnalysisVersion)
797799
.filter(AnalysisVersion.version == target_version)
@@ -851,3 +853,80 @@ def post(self, datastack_name:str):
851853
analysis_version.expires_on = expiration_timestamp
852854

853855
return f"{virtual_datastack_name} created", 200
856+
857+
858+
@mat_bp.route(
859+
"/materialize/run/write_deltalake/datastack/<string:datastack_name>/version/<int(signed=True):version>/table_name/<string:table_name>/"
860+
)
861+
class WriteDeltalakeResource(Resource):
862+
@reset_auth
863+
@auth_requires_dataset_admin(table_arg="datastack_name")
864+
@mat_bp.doc("Export a table to Delta Lake", security="apikey")
865+
def post(self, datastack_name: str, version: int, table_name: str):
866+
"""Export a frozen materialization table to partitioned Delta Lake.
867+
868+
Args:
869+
datastack_name (str): name of datastack from infoservice
870+
version (int): materialization version (-1 for latest)
871+
table_name (str): annotation table name to export
872+
"""
873+
from materializationengine.workflows.deltalake_export import (
874+
write_deltalake_table,
875+
)
876+
877+
if version == -1:
878+
version = get_latest_version(datastack_name)
879+
880+
datastack_info = get_datastack_info(datastack_name)
881+
882+
# Accept optional output_specs from JSON body.
883+
output_specs = None
884+
if request.is_json and request.json:
885+
output_specs = request.json.get("output_specs", None)
886+
887+
if output_specs is not None:
888+
from materializationengine.workflows.deltalake_export import (
889+
DeltaLakeOutputSpec,
890+
)
891+
892+
if not isinstance(output_specs, list):
893+
return abort(400, "output_specs must be a list")
894+
for item in output_specs:
895+
if not isinstance(item, dict):
896+
return abort(400, "each entry in output_specs must be an object")
897+
try:
898+
DeltaLakeOutputSpec(**item)
899+
except TypeError as exc:
900+
return abort(400, f"invalid output_specs entry: {exc}")
901+
902+
write_deltalake_table.s(
903+
datastack_info, version, table_name, output_specs=output_specs
904+
).apply_async()
905+
906+
return {
907+
"message": f"Delta Lake export enqueued for {table_name} v{version}"
908+
}, 200
909+
910+
@reset_auth
911+
@auth_requires_dataset_admin(table_arg="datastack_name")
912+
@mat_bp.doc("Get Delta Lake export progress", security="apikey")
913+
def get(self, datastack_name: str, version: int, table_name: str):
914+
"""Get progress of a Delta Lake export for a table.
915+
916+
Returns JSON with ``status``, ``rows_processed``, ``total_rows``,
917+
and ``percent_complete``. Returns 404 if no export is tracked.
918+
"""
919+
from materializationengine.workflows.deltalake_export import (
920+
get_deltalake_export_progress,
921+
)
922+
923+
if version == -1:
924+
version = get_latest_version(datastack_name)
925+
926+
progress = get_deltalake_export_progress(datastack_name, version, table_name)
927+
if progress is None:
928+
return {
929+
"message": f"No export progress found for {table_name} v{version}"
930+
}, 404
931+
932+
return progress, 200

materializationengine/celery_init.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from celery import Celery
22

3-
43
celery = Celery(
54
include=[
65
"materializationengine.workflows.ingest_new_annotations",
@@ -12,6 +11,7 @@
1211
"materializationengine.workflows.periodic_database_removal",
1312
"materializationengine.workflows.periodic_materialization",
1413
"materializationengine.workflows.spatial_lookup",
14+
"materializationengine.workflows.deltalake_export",
1515
"materializationengine.shared_tasks",
1616
"materializationengine.views",
1717
"materializationengine.monitor",

materializationengine/celery_worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def create_celery(app=None):
7171
"result_expires": 86400, # results expire in broker after 1 day
7272
"redis_socket_connect_timeout": 10,
7373
"broker_transport_options": {
74-
"visibility_timeout": 8000,
74+
"visibility_timeout": 21600,
7575
"socket_timeout": 20,
7676
"socket_connect_timeout": 20,
7777
}, # timeout (s) for tasks to be sent back to broker queue

materializationengine/config.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class BaseConfig:
4848
MERGE_TABLES = True
4949
AUTH_SERVICE_NAMESPACE = "datastack"
5050

51-
REDIS_HOST="localhost"
52-
REDIS_PORT=6379
53-
REDIS_PASSWORD=""
51+
REDIS_HOST = "localhost"
52+
REDIS_PORT = 6379
53+
REDIS_PASSWORD = ""
5454
SESSION_TYPE = "redis"
5555
PERMANENT_SESSION_LIFETIME = timedelta(hours=24)
5656
SESSION_PREFIX = "annotation_upload_"
@@ -62,6 +62,31 @@ class BaseConfig:
6262
STAGING_DATABASE_NAME = "staging"
6363
MATERIALIZATION_UPLOAD_BUCKET_PATH = "test_annotation_csv_upload"
6464

65+
# Delta Lake export settings
66+
DELTALAKE_OUTPUT_BUCKET = os.environ.get("DELTALAKE_OUTPUT_BUCKET", "")
67+
DELTALAKE_FLUSH_THRESHOLD_BYTES = int(
68+
os.environ.get("DELTALAKE_FLUSH_THRESHOLD_BYTES", 2 * 1024 * 1024 * 1024)
69+
)
70+
DELTALAKE_TARGET_PARTITION_SIZE_MB = int(
71+
os.environ.get("DELTALAKE_TARGET_PARTITION_SIZE_MB", 256)
72+
)
73+
DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS = int(
74+
os.environ.get("DELTALAKE_OPTIMIZE_MAX_CONCURRENT_TASKS", 1)
75+
)
76+
DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES = (
77+
int(os.environ["DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES"])
78+
if "DELTALAKE_OPTIMIZE_TARGET_SIZE_BYTES" in os.environ
79+
else None
80+
)
81+
# this one should help with memory during optimize if it is still a problem,
82+
# but has not been tested on the mesh worker nodes. im not sure how spilling to
83+
# disk on those will work out of the box
84+
DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES = (
85+
int(os.environ["DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES"])
86+
if "DELTALAKE_OPTIMIZE_MAX_SPILL_SIZE_BYTES" in os.environ
87+
else None
88+
)
89+
6590
if os.environ.get("DAF_CREDENTIALS", None) is not None:
6691
with open(os.environ.get("DAF_CREDENTIALS"), "r") as f:
6792
AUTH_TOKEN = json.load(f)["token"]
@@ -150,7 +175,6 @@ class DevConfig(BaseConfig):
150175
CELERY_BROKER_URL = REDIS_URL
151176
CELERY_RESULT_BACKEND = REDIS_URL
152177
USE_SENTINEL = os.environ.get("USE_SENTINEL", False)
153-
154178

155179

156180
class TestConfig(BaseConfig):
@@ -193,18 +217,20 @@ def configure_app(app: Flask) -> Flask:
193217
beat_schedules_before = app.config.get("BEAT_SCHEDULES", "NOT_SET")
194218
app.config.from_pyfile("config.cfg", silent=True)
195219
beat_schedules_after = app.config.get("BEAT_SCHEDULES", "NOT_SET")
196-
220+
197221
handler = logging.StreamHandler(sys.stdout)
198222
handler.setLevel(app.config["LOGGING_LEVEL"])
199223
app.logger.removeHandler(default_handler)
200224
app.logger.addHandler(handler)
201225
app.logger.setLevel(app.config["LOGGING_LEVEL"])
202226
app.logger.propagate = False
203-
227+
204228
# Log BEAT_SCHEDULES loading status (debug level)
205229
app.logger.debug(f"BEAT_SCHEDULES before config.cfg: {beat_schedules_before}")
206230
app.logger.debug(f"BEAT_SCHEDULES after config.cfg: {beat_schedules_after}")
207-
app.logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}")
231+
app.logger.debug(
232+
f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}"
233+
)
208234
app.logger.debug(app.config)
209235
app.app_context().push()
210236
return app

0 commit comments

Comments
 (0)