Skip to content

Commit e81f39e

Browse files
committed
Add Iceberg export commands; change refresh interval to int
1 parent 6ec3064 commit e81f39e

File tree

2 files changed

+156
-19
lines changed

2 files changed

+156
-19
lines changed

singlestoredb/fusion/handlers/export.py

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Optional
77

88
from .. import result
9+
from ...management.export import _get_exports
910
from ...management.export import ExportService
1011
from ...management.export import ExportStatus
1112
from ..handler import SQLHandler
@@ -187,7 +188,8 @@ def _start_export(params: Dict[str, Any]) -> Optional[FusionSQLResult]:
187188
order_by=order_by or None,
188189
properties=json.loads(params['properties']) if params['properties'] else None,
189190
incremental=params.get('incremental', False),
190-
refresh_interval=refresh_interval_delta,
191+
refresh_interval=int(refresh_interval_delta.total_seconds())
192+
if refresh_interval_delta is not None else None,
191193
).start()
192194

193195
res = FusionSQLResult()
@@ -387,6 +389,25 @@ def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
387389
StartIncrementalExport.register(overwrite=True)
388390

389391

392+
def _format_status(export_id: str, status: ExportStatus) -> Optional[FusionSQLResult]:
393+
"""Return the status of an export operation."""
394+
info = status._info()
395+
396+
res = FusionSQLResult()
397+
res.add_field('ExportID', result.STRING)
398+
res.add_field('Status', result.STRING)
399+
res.add_field('Message', result.STRING)
400+
res.set_rows([
401+
(
402+
export_id,
403+
info.get('status', 'Unknown'),
404+
info.get('statusMsg', ''),
405+
),
406+
])
407+
408+
return res
409+
410+
390411
class ShowExport(SQLHandler):
391412
"""
392413
SHOW EXPORT export_id;
@@ -400,31 +421,92 @@ class ShowExport(SQLHandler):
400421

401422
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
402423
wsg = get_workspace_group({})
403-
out = ExportStatus(params['export_id'], wsg)
424+
return _format_status(
425+
params['export_id'], ExportStatus(params['export_id'], wsg),
426+
)
404427

405-
status = out._info()
428+
429+
ShowExport.register(overwrite=True)
430+
431+
432+
class ShowExports(SQLHandler):
433+
"""
434+
SHOW EXPORTS [ scope ];
435+
436+
# Location of the export
437+
scope = FOR '<scope>'
438+
439+
"""
440+
441+
_enabled = False
442+
443+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
444+
wsg = get_workspace_group({})
445+
446+
exports = _get_exports(wsg, params.get('scope', 'all'))
406447

407448
res = FusionSQLResult()
408449
res.add_field('ExportID', result.STRING)
409450
res.add_field('Status', result.STRING)
410451
res.add_field('Message', result.STRING)
411452
res.set_rows([
412453
(
413-
params['export_id'],
414-
status.get('status', 'Unknown'),
415-
status.get('statusMsg', ''),
416-
),
454+
info['egressID'],
455+
info.get('status', 'Unknown'),
456+
info.get('statusMsg', ''),
457+
)
458+
for info in [x._info() for x in exports]
417459
])
418460

419461
return res
420462

421463

422-
ShowExport.register(overwrite=True)
464+
ShowExports.register(overwrite=True)
465+
466+
467+
class SuspendExport(SQLHandler):
468+
"""
469+
SUSPEND EXPORT export_id;
470+
471+
# ID of export
472+
export_id = '<export-id>'
473+
474+
"""
475+
476+
_enabled = False
477+
478+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
479+
wsg = get_workspace_group({})
480+
service = ExportService.from_export_id(wsg, params['export_id'])
481+
return _format_status(params['export_id'], service.suspend())
482+
483+
484+
SuspendExport.register(overwrite=True)
485+
486+
487+
class ResumeExport(SQLHandler):
488+
"""
489+
RESUME EXPORT export_id;
490+
491+
# ID of export
492+
export_id = '<export-id>'
493+
494+
"""
495+
496+
_enabled = False
497+
498+
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
499+
wsg = get_workspace_group({})
500+
service = ExportService.from_export_id(wsg, params['export_id'])
501+
return _format_status(params['export_id'], service.resume())
502+
503+
504+
ResumeExport.register(overwrite=True)
423505

424506

425-
class StopExport(SQLHandler):
507+
class DropExport(SQLHandler):
426508
"""
427-
STOP EXPORT export_id;
509+
DROP EXPORT export_id;
428510
429511
# ID of export
430512
export_id = '<export-id>'
@@ -436,8 +518,8 @@ class StopExport(SQLHandler):
436518
def run(self, params: Dict[str, Any]) -> Optional[FusionSQLResult]:
437519
wsg = get_workspace_group({})
438520
service = ExportService.from_export_id(wsg, params['export_id'])
439-
service.stop()
521+
service.drop()
440522
return None
441523

442524

443-
StopExport.register(overwrite=True)
525+
DropExport.register(overwrite=True)

singlestoredb/management/export.py

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from __future__ import annotations
44

55
import copy
6-
import datetime
76
import json
87
from typing import Any
98
from typing import Dict
@@ -29,7 +28,7 @@ class ExportService(object):
2928
order_by: Optional[List[Dict[str, Dict[str, str]]]]
3029
properties: Optional[Dict[str, Any]]
3130
incremental: bool
32-
refresh_interval: Optional[datetime.timedelta]
31+
refresh_interval: Optional[int]
3332
export_id: Optional[str]
3433

3534
def __init__(
@@ -43,7 +42,7 @@ def __init__(
4342
partition_by: Optional[List[Dict[str, str]]] = None,
4443
order_by: Optional[List[Dict[str, Dict[str, str]]]] = None,
4544
incremental: bool = False,
46-
refresh_interval: Optional[datetime.timedelta] = None,
45+
refresh_interval: Optional[int] = None,
4746
properties: Optional[Dict[str, Any]] = None,
4847
):
4948
#: Workspace group
@@ -156,7 +155,7 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':
156155
sortOrderSpec=sort_order_spec,
157156
properties=self.properties,
158157
incremental=self.incremental or None,
159-
refreshInterval=int(self.refresh_interval.total_seconds())
158+
refreshInterval=self.refresh_interval
160159
if self.refresh_interval is not None else None,
161160
).items() if v is not None
162161
},
@@ -166,8 +165,8 @@ def start(self, tags: Optional[List[str]] = None) -> 'ExportStatus':
166165

167166
return ExportStatus(self.export_id, self.workspace_group)
168167

169-
def stop(self) -> 'ExportStatus':
170-
"""Stop the export process."""
168+
def suspend(self) -> 'ExportStatus':
169+
"""Suspend the export process."""
171170
if self._manager is None:
172171
raise ManagementError(
173172
msg='No workspace manager is associated with this object.',
@@ -179,12 +178,50 @@ def stop(self) -> 'ExportStatus':
179178
)
180179

181180
self._manager._post(
182-
f'workspaceGroups/{self.workspace_group.id}/egress/stopTableEgress',
181+
f'workspaceGroups/{self.workspace_group.id}/egress/suspendTableEgress',
183182
json=dict(egressID=self.export_id),
184183
)
185184

186185
return ExportStatus(self.export_id, self.workspace_group)
187186

187+
def resume(self) -> 'ExportStatus':
188+
"""Resume the export process."""
189+
if self._manager is None:
190+
raise ManagementError(
191+
msg='No workspace manager is associated with this object.',
192+
)
193+
194+
if self.export_id is None:
195+
raise ManagementError(
196+
msg='Export ID is not set. You must start the export first.',
197+
)
198+
199+
self._manager._post(
200+
f'workspaceGroups/{self.workspace_group.id}/egress/resumeTableEgress',
201+
json=dict(egressID=self.export_id),
202+
)
203+
204+
return ExportStatus(self.export_id, self.workspace_group)
205+
206+
def drop(self) -> None:
207+
"""Drop the export process."""
208+
if self._manager is None:
209+
raise ManagementError(
210+
msg='No workspace manager is associated with this object.',
211+
)
212+
213+
if self.export_id is None:
214+
raise ManagementError(
215+
msg='Export ID is not set. You must start the export first.',
216+
)
217+
218+
self._manager._post(
219+
f'workspaceGroups/{self.workspace_group.id}/egress/dropTableEgress',
220+
json=dict(egressID=self.export_id),
221+
)
222+
223+
return None
224+
188225
def status(self) -> ExportStatus:
189226
"""Get the status of the export process."""
190227
if self._manager is None:
@@ -238,3 +275,21 @@ def __str__(self) -> str:
238275

239276
def __repr__(self) -> str:
240277
return self.status
278+
279+
280+
def _get_exports(
281+
workspace_group: WorkspaceGroup,
282+
scope: str = 'all',
283+
) -> List[ExportStatus]:
284+
"""Get all exports in the workspace group."""
285+
if workspace_group._manager is None:
286+
raise ManagementError(
287+
msg='No workspace manager is associated with this object.',
288+
)
289+
290+
out = workspace_group._manager._get(
291+
f'workspaceGroups/{workspace_group.id}/egress/tableEgressStatus',
292+
json=dict(scope=scope),
293+
)
294+
295+
return out.json()

0 commit comments

Comments
 (0)