-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathutils.py
More file actions
339 lines (275 loc) · 11 KB
/
utils.py
File metadata and controls
339 lines (275 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
#!/usr/bin/env python
import datetime
import os
from typing import Any
from typing import Dict
from typing import Optional
from typing import Union
from ...exceptions import ManagementError
from ...management import files as mgmt_files
from ...management import manage_workspaces
from ...management.files import FilesManager
from ...management.files import FileSpace
from ...management.files import manage_files
from ...management.inference_api import InferenceAPIInfo
from ...management.inference_api import InferenceAPIManager
from ...management.workspace import StarterWorkspace
from ...management.workspace import Workspace
from ...management.workspace import WorkspaceGroup
from ...management.workspace import WorkspaceManager
def get_workspace_manager() -> WorkspaceManager:
    """
    Return the workspace manager produced by ``manage_workspaces()``.

    ``manage_workspaces`` is called with no arguments on every call.
    """
    workspace_manager = manage_workspaces()
    return workspace_manager
def get_files_manager() -> FilesManager:
    """
    Return the files manager produced by ``manage_files()``.

    ``manage_files`` is called with no arguments on every call.
    """
    files_manager = manage_files()
    return files_manager
def dt_isoformat(dt: Optional[datetime.datetime]) -> Optional[str]:
    """
    Format a datetime as an ISO 8601 string.

    ``None`` is passed through unchanged so that optional timestamps
    can be formatted without a separate check at the call site.
    """
    return None if dt is None else dt.isoformat()
def get_workspace_group(params: Dict[str, Any]) -> WorkspaceGroup:
    """
    Find a workspace group matching group_name or group_id.

    A group name takes precedence over a group ID. Each value is taken
    from the first of the following parameters that is set:

    * params['group_name'] / params['group_id']
    * params['in_group']['group_name'] / params['in_group']['group_id']
    * params['group']['group_name'] / params['group']['group_id']

    If neither is given, the SINGLESTOREDB_WORKSPACE_GROUP environment
    variable is used as the group ID.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters to search for the group name or ID

    Returns
    -------
    WorkspaceGroup

    Raises
    ------
    KeyError
        If no group was specified, or no group matches the name or ID
    ValueError
        If more than one group has the given name, or if no group was
        specified and SINGLESTOREDB_CLUSTER is set in the environment
    ManagementError
        If a lookup by ID fails with an error other than 404

    """
    manager = get_workspace_manager()

    # Search by name first; names are not unique, so every group is scanned
    group_name = params.get('group_name') or \
        (params.get('in_group') or {}).get('group_name') or \
        (params.get('group') or {}).get('group_name')
    if group_name:
        workspace_groups = [
            x for x in manager.workspace_groups
            if x.name == group_name
        ]
        if not workspace_groups:
            raise KeyError(
                f'no workspace group found with name: {group_name}',
            )
        if len(workspace_groups) > 1:
            ids = ', '.join(x.id for x in workspace_groups)
            raise ValueError(
                f'more than one workspace group with given name was found: {ids}',
            )
        return workspace_groups[0]

    # Search by explicit ID
    group_id = params.get('group_id') or \
        (params.get('in_group') or {}).get('group_id') or \
        (params.get('group') or {}).get('group_id')
    if group_id:
        try:
            return manager.get_workspace_group(group_id)
        except ManagementError as exc:
            if exc.errno == 404:
                raise KeyError(
                    f'no workspace group found with ID: {group_id}',
                ) from exc
            raise

    # Fall back to the workspace group ID in the environment
    env_group_id = os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')
    if env_group_id:
        try:
            return manager.get_workspace_group(env_group_id)
        except ManagementError as exc:
            if exc.errno == 404:
                # The lookup is for a workspace group, so the message
                # names a group, as the explicit-ID branch does.
                raise KeyError(
                    f'no workspace group found with ID: {env_group_id}',
                ) from exc
            raise

    if os.environ.get('SINGLESTOREDB_CLUSTER'):
        raise ValueError('clusters and shared workspaces are not currently supported')

    raise KeyError('no workspace group was specified')
def get_workspace(params: Dict[str, Any]) -> Workspace:
    """
    Look up a single workspace from the parameters or the environment.

    A workspace name takes precedence over a workspace ID. Each value
    is taken from the first of the following parameters that is set:

    * params['workspace_name'] / params['workspace_id']
    * params['workspace']['workspace_name'] /
      params['workspace']['workspace_id']

    A lookup by name searches the workspaces of the group returned by
    ``get_workspace_group(params)``. If neither a name nor an ID is
    given, the SINGLESTOREDB_WORKSPACE environment variable is used as
    the workspace ID.

    Raises KeyError when nothing was specified or nothing matched,
    ValueError when a name is ambiguous or only SINGLESTOREDB_CLUSTER
    is set, and re-raises any ManagementError other than a 404.
    """
    manager = get_workspace_manager()
    nested = params.get('workspace') or {}

    # Lookup by name within the resolved workspace group
    workspace_name = params.get('workspace_name') or nested.get('workspace_name')
    if workspace_name:
        group = get_workspace_group(params)
        matches = [ws for ws in group.workspaces if ws.name == workspace_name]
        if len(matches) == 1:
            return matches[0]
        if not matches:
            raise KeyError(
                f'no workspace found with name: {workspace_name}',
            )
        ids = ', '.join(ws.id for ws in matches)
        raise ValueError(
            f'more than one workspace with given name was found: {ids}',
        )

    # Lookup by explicit ID
    workspace_id = params.get('workspace_id') or nested.get('workspace_id')
    if workspace_id:
        try:
            return manager.get_workspace(workspace_id)
        except ManagementError as exc:
            if exc.errno != 404:
                raise
            raise KeyError(f'no workspace found with ID: {workspace_id}')

    # Lookup by the ID in the environment
    env_workspace_id = os.environ.get('SINGLESTOREDB_WORKSPACE')
    if env_workspace_id:
        try:
            return manager.get_workspace(env_workspace_id)
        except ManagementError as exc:
            if exc.errno != 404:
                raise
            raise KeyError(
                'no workspace found with ID: '
                f'{env_workspace_id}',
            )

    if os.environ.get('SINGLESTOREDB_CLUSTER'):
        raise ValueError('clusters and shared workspaces are not currently supported')

    raise KeyError('no workspace was specified')
def get_deployment(
    params: Dict[str, Any],
) -> Union[WorkspaceGroup, StarterWorkspace]:
    """
    Find a workspace group or starter workspace by name or ID.

    A deployment name takes precedence over a deployment ID. Each value
    is taken from the first of the following parameters that is set
    (shown for ``deployment_name``; ``deployment_id`` uses the same
    locations):

    * params['deployment_name']
    * params['in_deployment']['deployment_name']
    * params['group']['deployment_name']
    * params['in']['in_group']['deployment_name']
    * params['in']['in_deployment']['deployment_name']

    For both names and IDs, workspace groups are checked before
    starter workspaces.

    If neither is given, the SINGLESTOREDB_WORKSPACE_GROUP environment
    variable is used as a workspace group ID, then the
    SINGLESTOREDB_CLUSTER environment variable is used as a starter
    workspace ID.

    Parameters
    ----------
    params : Dict[str, Any]
        Parameters to search for the deployment name or ID

    Returns
    -------
    WorkspaceGroup or StarterWorkspace

    Raises
    ------
    KeyError
        If no deployment was specified, or none matches the name or ID
    ValueError
        If more than one workspace group, or more than one starter
        workspace, has the given name
    ManagementError
        If a lookup by ID fails with an error other than 404

    """
    manager = get_workspace_manager()

    #
    # Search for deployment by name
    #
    deployment_name = params.get('deployment_name') or \
        (params.get('in_deployment') or {}).get('deployment_name') or \
        (params.get('group') or {}).get('deployment_name') or \
        ((params.get('in') or {}).get('in_group') or {}).get('deployment_name') or \
        ((params.get('in') or {}).get('in_deployment') or {}).get('deployment_name')

    if deployment_name:
        # Standard workspace group
        workspace_groups = [
            x for x in manager.workspace_groups
            if x.name == deployment_name
        ]

        if len(workspace_groups) == 1:
            return workspace_groups[0]

        elif len(workspace_groups) > 1:
            ids = ', '.join(x.id for x in workspace_groups)
            raise ValueError(
                f'more than one workspace group with given name was found: {ids}',
            )

        # Starter workspace
        starter_workspaces = [
            x for x in manager.starter_workspaces
            if x.name == deployment_name
        ]

        if len(starter_workspaces) == 1:
            return starter_workspaces[0]

        elif len(starter_workspaces) > 1:
            ids = ', '.join(x.id for x in starter_workspaces)
            raise ValueError(
                f'more than one starter workspace with given name was found: {ids}',
            )

        raise KeyError(f'no deployment found with name: {deployment_name}')

    #
    # Search for deployment by ID
    #
    deployment_id = params.get('deployment_id') or \
        (params.get('in_deployment') or {}).get('deployment_id') or \
        (params.get('group') or {}).get('deployment_id') or \
        ((params.get('in') or {}).get('in_group') or {}).get('deployment_id') or \
        ((params.get('in') or {}).get('in_deployment') or {}).get('deployment_id')

    if deployment_id:
        try:
            # Standard workspace group
            return manager.get_workspace_group(deployment_id)
        except ManagementError as group_exc:
            # A 404 only means it is not a workspace group; fall through
            # and try the ID as a starter workspace.
            if group_exc.errno != 404:
                raise

        try:
            # Starter workspace
            return manager.get_starter_workspace(deployment_id)
        except ManagementError as starter_exc:
            if starter_exc.errno != 404:
                raise
            raise KeyError(
                f'no deployment found with ID: {deployment_id}',
            ) from starter_exc

    # Use workspace group from environment
    env_group_id = os.environ.get('SINGLESTOREDB_WORKSPACE_GROUP')
    if env_group_id:
        try:
            return manager.get_workspace_group(env_group_id)
        except ManagementError as exc:
            if exc.errno == 404:
                # The lookup is for a workspace group, so the message
                # names a group rather than a workspace.
                raise KeyError(
                    f'no workspace group found with ID: {env_group_id}',
                ) from exc
            raise

    # Use cluster from environment
    env_cluster_id = os.environ.get('SINGLESTOREDB_CLUSTER')
    if env_cluster_id:
        try:
            return manager.get_starter_workspace(env_cluster_id)
        except ManagementError as exc:
            if exc.errno == 404:
                raise KeyError(
                    f'no starter workspace found with ID: {env_cluster_id}',
                ) from exc
            raise

    raise KeyError('no deployment was specified')
def get_file_space(params: Dict[str, Any]) -> FileSpace:
    """
    Resolve params['file_location'] to a file space.

    The location is compared case-insensitively against
    ``mgmt_files.PERSONAL_SPACE``, ``mgmt_files.SHARED_SPACE`` and
    ``mgmt_files.MODELS_SPACE``, and the matching space of the files
    manager is returned.

    Raises KeyError when no location is given and ValueError when the
    location matches none of the known spaces.
    """
    manager = get_files_manager()

    requested = params.get('file_location')
    if not requested:
        raise KeyError('no file space was specified')

    normalized = requested.lower()

    if normalized == mgmt_files.PERSONAL_SPACE:
        return manager.personal_space
    if normalized == mgmt_files.SHARED_SPACE:
        return manager.shared_space
    if normalized == mgmt_files.MODELS_SPACE:
        return manager.models_space

    raise ValueError(f'invalid file location: {requested}')
def get_inference_api_manager() -> InferenceAPIManager:
    """
    Return the inference API manager of the current organization.

    The manager is read from ``organization.inference_apis`` on the
    workspace manager returned by ``get_workspace_manager()``.
    """
    return get_workspace_manager().organization.inference_apis
def get_inference_api(params: Dict[str, Any]) -> InferenceAPIInfo:
    """
    Look up an inference API by the model name in params['model_name'].

    The 'model_name' key is required; a missing key raises KeyError.
    """
    api_manager = get_inference_api_manager()
    requested_model = params['model_name']
    return api_manager.get(requested_model)