Skip to content

Commit 5427a95

Browse files
[python] Support REST function API in pypaimon (#7559)
1 parent 654f4e8 commit 5427a95

13 files changed

Lines changed: 1600 additions & 6 deletions

File tree

paimon-python/pypaimon/api/api_request.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222

2323
from pypaimon.common.identifier import Identifier
2424
from pypaimon.common.json_util import json_field
25+
from pypaimon.function.function_change import FunctionChange
26+
from pypaimon.function.function_definition import FunctionDefinition
27+
from pypaimon.schema.data_types import DataField
2528
from pypaimon.schema.schema import Schema
2629
from pypaimon.schema.schema_change import SchemaChange
2730
from pypaimon.snapshot.snapshot import Snapshot
@@ -94,3 +97,62 @@ class RollbackTableRequest(RESTRequest):
9497

9598
instant: Instant = json_field(FIELD_INSTANT)
9699
from_snapshot: Optional[int] = json_field(FIELD_FROM_SNAPSHOT)
100+
101+
102+
@dataclass
103+
class CreateFunctionRequest(RESTRequest):
104+
FIELD_NAME = "name"
105+
FIELD_INPUT_PARAMS = "inputParams"
106+
FIELD_RETURN_PARAMS = "returnParams"
107+
FIELD_DETERMINISTIC = "deterministic"
108+
FIELD_DEFINITIONS = "definitions"
109+
FIELD_COMMENT = "comment"
110+
FIELD_OPTIONS = "options"
111+
112+
name: str = json_field(FIELD_NAME)
113+
input_params: Optional[List[DataField]] = json_field(FIELD_INPUT_PARAMS, default=None)
114+
return_params: Optional[List[DataField]] = json_field(FIELD_RETURN_PARAMS, default=None)
115+
deterministic: bool = json_field(FIELD_DETERMINISTIC, default=False)
116+
definitions: Optional[Dict[str, FunctionDefinition]] = json_field(FIELD_DEFINITIONS, default=None)
117+
comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
118+
options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default=None)
119+
120+
def to_dict(self) -> Dict:
121+
result = {
122+
self.FIELD_NAME: self.name,
123+
self.FIELD_DETERMINISTIC: self.deterministic,
124+
}
125+
if self.input_params is not None:
126+
result[self.FIELD_INPUT_PARAMS] = [
127+
p.to_dict() if hasattr(p, 'to_dict') else p for p in self.input_params
128+
]
129+
else:
130+
result[self.FIELD_INPUT_PARAMS] = None
131+
if self.return_params is not None:
132+
result[self.FIELD_RETURN_PARAMS] = [
133+
p.to_dict() if hasattr(p, 'to_dict') else p for p in self.return_params
134+
]
135+
else:
136+
result[self.FIELD_RETURN_PARAMS] = None
137+
if self.definitions is not None:
138+
result[self.FIELD_DEFINITIONS] = {
139+
k: v.to_dict() if hasattr(v, 'to_dict') else v
140+
for k, v in self.definitions.items()
141+
}
142+
else:
143+
result[self.FIELD_DEFINITIONS] = None
144+
result[self.FIELD_COMMENT] = self.comment
145+
result[self.FIELD_OPTIONS] = self.options
146+
return result
147+
148+
149+
@dataclass
150+
class AlterFunctionRequest(RESTRequest):
151+
FIELD_CHANGES = "changes"
152+
153+
changes: List[FunctionChange] = json_field(FIELD_CHANGES)
154+
155+
def to_dict(self) -> Dict:
156+
return {
157+
self.FIELD_CHANGES: [c.to_dict() for c in self.changes]
158+
}

paimon-python/pypaimon/api/api_response.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
from dataclasses import dataclass
2121
from typing import Dict, Generic, List, Optional
2222

23+
from pypaimon.common.identifier import Identifier
2324
from pypaimon.common.json_util import T, json_field
2425
from pypaimon.common.options import Options
26+
from pypaimon.schema.data_types import DataField
2527
from pypaimon.schema.schema import Schema
2628
from pypaimon.snapshot.snapshot_commit import PartitionStatistics
2729
from pypaimon.snapshot.table_snapshot import TableSnapshot
@@ -327,3 +329,227 @@ def __init__(self, snapshot: Optional[TableSnapshot] = None):
327329

328330
def get_snapshot(self) -> Optional[TableSnapshot]:
329331
return self.snapshot
332+
333+
334+
@dataclass
335+
class GetFunctionResponse(AuditRESTResponse):
336+
"""Response for getting a function."""
337+
FIELD_UUID = "uuid"
338+
FIELD_NAME = "name"
339+
FIELD_INPUT_PARAMS = "inputParams"
340+
FIELD_RETURN_PARAMS = "returnParams"
341+
FIELD_DETERMINISTIC = "deterministic"
342+
FIELD_DEFINITIONS = "definitions"
343+
FIELD_COMMENT = "comment"
344+
FIELD_OPTIONS = "options"
345+
346+
uuid: Optional[str] = json_field(FIELD_UUID, default=None)
347+
name: Optional[str] = json_field(FIELD_NAME, default=None)
348+
input_params: Optional[List[DataField]] = json_field(FIELD_INPUT_PARAMS, default=None)
349+
return_params: Optional[List[DataField]] = json_field(FIELD_RETURN_PARAMS, default=None)
350+
deterministic: bool = json_field(FIELD_DETERMINISTIC, default=False)
351+
definitions: Optional[Dict[str, 'FunctionDefinition']] = json_field(FIELD_DEFINITIONS, default=None)
352+
comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
353+
options: Optional[Dict[str, str]] = json_field(FIELD_OPTIONS, default=None)
354+
355+
def __init__(
356+
self,
357+
uuid: Optional[str] = None,
358+
name: Optional[str] = None,
359+
input_params: Optional[List[DataField]] = None,
360+
return_params: Optional[List[DataField]] = None,
361+
deterministic: bool = False,
362+
definitions: Optional[Dict[str, 'FunctionDefinition']] = None,
363+
comment: Optional[str] = None,
364+
options: Optional[Dict[str, str]] = None,
365+
owner: Optional[str] = None,
366+
created_at: Optional[int] = None,
367+
created_by: Optional[str] = None,
368+
updated_at: Optional[int] = None,
369+
updated_by: Optional[str] = None,
370+
):
371+
super().__init__(owner, created_at, created_by, updated_at, updated_by)
372+
self.uuid = uuid
373+
self.name = name
374+
self.input_params = input_params
375+
self.return_params = return_params
376+
self.deterministic = deterministic
377+
self.definitions = definitions
378+
self.comment = comment
379+
self.options = options
380+
381+
def to_function(self, identifier):
382+
from pypaimon.function.function import FunctionImpl
383+
return FunctionImpl(
384+
identifier=identifier,
385+
input_params=self.input_params,
386+
return_params=self.return_params,
387+
deterministic=self.deterministic,
388+
definitions=self.definitions or {},
389+
comment=self.comment,
390+
options=self.options or {},
391+
)
392+
393+
@staticmethod
394+
def _parse_data_fields(raw: Optional[list]) -> Optional[List[DataField]]:
395+
if raw is None:
396+
return None
397+
return [DataField.from_dict(f) if isinstance(f, dict) else f for f in raw]
398+
399+
@staticmethod
400+
def _parse_definitions(raw) -> Optional[Dict[str, 'FunctionDefinition']]:
401+
from pypaimon.function.function_definition import FunctionDefinition
402+
if raw is None:
403+
return None
404+
return {
405+
k: FunctionDefinition.from_dict(v) if isinstance(v, dict) else v
406+
for k, v in raw.items()
407+
}
408+
409+
@classmethod
410+
def from_dict(cls, data: Dict) -> "GetFunctionResponse":
411+
return cls(
412+
uuid=data.get("uuid"),
413+
name=data.get("name"),
414+
input_params=cls._parse_data_fields(data.get("inputParams")),
415+
return_params=cls._parse_data_fields(data.get("returnParams")),
416+
deterministic=data.get("deterministic", False),
417+
definitions=cls._parse_definitions(data.get("definitions")),
418+
comment=data.get("comment"),
419+
options=data.get("options"),
420+
owner=data.get("owner"),
421+
created_at=data.get("createdAt"),
422+
created_by=data.get("createdBy"),
423+
updated_at=data.get("updatedAt"),
424+
updated_by=data.get("updatedBy"),
425+
)
426+
427+
def to_dict(self) -> Dict:
428+
result = {}
429+
if self.uuid is not None:
430+
result["uuid"] = self.uuid
431+
result["name"] = self.name
432+
result["inputParams"] = (
433+
[p.to_dict() if hasattr(p, 'to_dict') else p for p in self.input_params]
434+
if self.input_params is not None else None
435+
)
436+
result["returnParams"] = (
437+
[p.to_dict() if hasattr(p, 'to_dict') else p for p in self.return_params]
438+
if self.return_params is not None else None
439+
)
440+
result["deterministic"] = self.deterministic
441+
if self.definitions is not None:
442+
result["definitions"] = {
443+
k: v.to_dict() if hasattr(v, 'to_dict') else v
444+
for k, v in self.definitions.items()
445+
}
446+
else:
447+
result["definitions"] = None
448+
result["comment"] = self.comment
449+
result["options"] = self.options
450+
if self.owner is not None:
451+
result["owner"] = self.owner
452+
if self.created_at is not None:
453+
result["createdAt"] = self.created_at
454+
if self.created_by is not None:
455+
result["createdBy"] = self.created_by
456+
if self.updated_at is not None:
457+
result["updatedAt"] = self.updated_at
458+
if self.updated_by is not None:
459+
result["updatedBy"] = self.updated_by
460+
return result
461+
462+
463+
@dataclass
464+
class ListFunctionsResponse(PagedResponse[str]):
465+
"""Response for listing functions."""
466+
FIELD_FUNCTIONS = "functions"
467+
468+
functions: Optional[List[str]] = json_field(FIELD_FUNCTIONS, default=None)
469+
next_page_token: Optional[str] = json_field(
470+
PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
471+
472+
def data(self) -> Optional[List[str]]:
473+
return self.functions
474+
475+
def get_next_page_token(self) -> Optional[str]:
476+
return self.next_page_token
477+
478+
479+
@dataclass
480+
class ListFunctionDetailsResponse(PagedResponse['GetFunctionResponse']):
481+
"""Response for listing function details."""
482+
FIELD_FUNCTION_DETAILS = "functionDetails"
483+
484+
function_details: Optional[List[GetFunctionResponse]] = json_field(
485+
FIELD_FUNCTION_DETAILS, default=None)
486+
next_page_token: Optional[str] = json_field(
487+
PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
488+
489+
def data(self) -> Optional[List[GetFunctionResponse]]:
490+
return self.function_details
491+
492+
def get_next_page_token(self) -> Optional[str]:
493+
return self.next_page_token
494+
495+
@classmethod
496+
def from_dict(cls, data: Dict) -> "ListFunctionDetailsResponse":
497+
details = data.get("functionDetails")
498+
if details is not None:
499+
details = [GetFunctionResponse.from_dict(d) for d in details]
500+
return cls(
501+
function_details=details,
502+
next_page_token=data.get("nextPageToken"),
503+
)
504+
505+
def to_dict(self) -> Dict:
506+
result = {}
507+
if self.function_details is not None:
508+
result["functionDetails"] = [d.to_dict() for d in self.function_details]
509+
else:
510+
result["functionDetails"] = None
511+
result["nextPageToken"] = self.next_page_token
512+
return result
513+
514+
515+
@dataclass
516+
class ListFunctionsGloballyResponse(PagedResponse[Identifier]):
517+
"""Response for listing functions globally across databases."""
518+
FIELD_FUNCTIONS = "functions"
519+
520+
functions: Optional[List[Identifier]] = json_field(FIELD_FUNCTIONS, default=None)
521+
next_page_token: Optional[str] = json_field(
522+
PagedResponse.FIELD_NEXT_PAGE_TOKEN, default=None)
523+
524+
def data(self) -> Optional[List[Identifier]]:
525+
return self.functions
526+
527+
def get_next_page_token(self) -> Optional[str]:
528+
return self.next_page_token
529+
530+
@classmethod
531+
def from_dict(cls, data: Dict) -> "ListFunctionsGloballyResponse":
532+
functions = data.get("functions")
533+
if functions is not None:
534+
functions = [
535+
Identifier.from_string(f) if isinstance(f, str) else
536+
Identifier.create(f.get("database"), f.get("object"))
537+
if isinstance(f, dict) else f
538+
for f in functions
539+
]
540+
return cls(
541+
functions=functions,
542+
next_page_token=data.get("nextPageToken"),
543+
)
544+
545+
def to_dict(self) -> Dict:
546+
result = {}
547+
if self.functions is not None:
548+
result["functions"] = [
549+
{"database": f.get_database_name(), "object": f.get_object_name()}
550+
for f in self.functions
551+
]
552+
else:
553+
result["functions"] = None
554+
result["nextPageToken"] = self.next_page_token
555+
return result

paimon-python/pypaimon/api/resource_paths.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ class ResourcePaths:
2828
TABLES = "tables"
2929
TABLE_DETAILS = "table-details"
3030
PARTITIONS = "partitions"
31+
FUNCTIONS = "functions"
32+
FUNCTION_DETAILS = "function-details"
3133

3234
def __init__(self, prefix: str):
3335
self.base_path = "/{}/{}".format(self.V1, prefix).rstrip("/")
@@ -83,3 +85,17 @@ def table_snapshot(self, database_name: str, table_name: str) -> str:
8385
def partitions(self, database_name: str, table_name: str) -> str:
8486
return ("{}/{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
8587
self.TABLES, RESTUtil.encode_string(table_name), self.PARTITIONS))
88+
89+
def functions(self, database_name: Optional[str] = None) -> str:
90+
if database_name:
91+
return "{}/{}/{}/{}".format(self.base_path, self.DATABASES,
92+
RESTUtil.encode_string(database_name), self.FUNCTIONS)
93+
return "{}/{}".format(self.base_path, self.FUNCTIONS)
94+
95+
def function_details(self, database_name: str) -> str:
96+
return "{}/{}/{}/{}".format(self.base_path, self.DATABASES,
97+
RESTUtil.encode_string(database_name), self.FUNCTION_DETAILS)
98+
99+
def function(self, database_name: str, function_name: str) -> str:
100+
return "{}/{}/{}/{}/{}".format(self.base_path, self.DATABASES, RESTUtil.encode_string(database_name),
101+
self.FUNCTIONS, RESTUtil.encode_string(function_name))

0 commit comments

Comments
 (0)