-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathutils.py
More file actions
160 lines (127 loc) · 5.48 KB
/
Copy pathutils.py
File metadata and controls
160 lines (127 loc) · 5.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
from dynamicannotationdb.schema import DynamicSchemaClient
from geoalchemy2.shape import to_shape
from flask import current_app, abort, g
from middle_auth_client.decorators import users_share_common_group
from celery.utils.log import get_task_logger
celery_logger = get_task_logger(__name__)
def get_app_base_path():
return os.path.dirname(os.path.realpath(__file__))
def get_instance_folder_path():
return os.path.join(get_app_base_path(), "instance")
def make_root_id_column_name(column_name: str):
pos = column_name.split("_")[0]
pos_type = column_name.split("_")[1]
return f"{pos}_{pos_type}_root_id"
def build_materialized_table_id(aligned_volume: str, table_name: str) -> str:
return f"mat__{aligned_volume}__{table_name}"
def get_query_columns_by_suffix(AnnotationModel, SegmentationModel, suffix):
seg_columns = [column.name for column in SegmentationModel.__table__.columns]
anno_columns = [column.name for column in AnnotationModel.__table__.columns]
matched_columns = set()
for column in seg_columns:
prefix = column.split("_")[0]
for anno_col in anno_columns:
if anno_col.startswith(prefix):
matched_columns.add(anno_col)
matched_columns.remove("id")
supervoxel_columns = [
f"{col.rsplit('_', 1)[0]}_{suffix}"
for col in matched_columns
if col != "annotation_id"
]
# # create model columns for querying
anno_model_cols = [getattr(AnnotationModel, name) for name in matched_columns]
anno_model_cols.append(AnnotationModel.id)
seg_model_cols = [getattr(SegmentationModel, name) for name in supervoxel_columns]
# add id columns to lookup
seg_model_cols.extend([SegmentationModel.id])
return anno_model_cols, seg_model_cols, supervoxel_columns
def get_geom_from_wkb(wkb):
wkb_element = to_shape(wkb)
if wkb_element.has_z:
return [
int(wkb_element.xy[0][0]),
int(wkb_element.xy[1][0]),
int(wkb_element.z),
]
def create_segmentation_model(mat_metadata, reset_cache: bool = False):
annotation_table_name = mat_metadata.get("annotation_table_name")
schema_type = mat_metadata.get("schema")
table_metadata = {"reference_table": mat_metadata.get("reference_table")}
pcg_table_name = mat_metadata.get("pcg_table_name")
schema_client = DynamicSchemaClient()
SegmentationModel = schema_client.create_segmentation_model(
table_name=annotation_table_name,
schema_type=schema_type,
segmentation_source=pcg_table_name,
table_metadata=table_metadata,
reset_cache=reset_cache,
)
celery_logger.debug(
f"SEGMENTATION----------------------- {SegmentationModel.__table__.columns}"
)
return SegmentationModel
def create_annotation_model(
mat_metadata, with_crud_columns: bool = True, reset_cache: bool = False
):
annotation_table_name = mat_metadata.get("annotation_table_name")
schema_type = mat_metadata.get("schema")
table_metadata = {"reference_table": mat_metadata.get("reference_table")}
schema_client = DynamicSchemaClient()
AnnotationModel = schema_client.create_annotation_model(
table_name=annotation_table_name,
schema_type=schema_type,
table_metadata=table_metadata,
with_crud_columns=with_crud_columns,
reset_cache=reset_cache,
)
celery_logger.debug(
f"ANNOTATION----------------------- {AnnotationModel.__table__.columns}"
)
return AnnotationModel
def get_config_param(config_param: str):
try:
return current_app.config[config_param]
except Exception:
return os.environ[config_param]
def check_write_permission(db, table_name):
metadata = db.database.get_table_metadata(table_name)
if metadata["user_id"] != str(g.auth_user["id"]):
if metadata["write_permission"] == "GROUP":
if not users_share_common_group(metadata["user_id"]):
abort(
401,
"Unauthorized: You cannot write because you do not share a common group with the creator of this table.",
)
else:
abort(401, "Unauthorized: The table can only be written to by owner")
return metadata
def check_read_permission(db, table_name):
"""Check if user has read permission for table
Args:
db (DynamicAnnotationInterface): db to check
table_name (str): table to check
Returns:
dict: metadata of table
"""
metadata = db.database.get_table_metadata(table_name)
if metadata is None:
abort(404, f"Table {table_name} not found in {db.aligned_volume} database")
if metadata["read_permission"] == "GROUP":
if not users_share_common_group(metadata["user_id"]):
abort(
401,
"Unauthorized: You cannot read this table because you do not share a common group with the creator of this table.",
)
elif metadata["read_permission"] == "PRIVATE":
if metadata["user_id"] != str(g.auth_user["id"]):
abort(401, "Unauthorized: The table can only be read by its owner")
return metadata
def check_ownership(db, table_name):
metadata = db.database.get_table_metadata(table_name)
if metadata is None:
abort(404, f"Table {table_name} not found in {db.aligned_volume} database")
if metadata["user_id"] != str(g.auth_user["id"]):
abort(401, "You cannot do this because you are not the owner of this table")
return metadata