-
Notifications
You must be signed in to change notification settings - Fork 479
Support CreateTableTransaction in Glue and Rest #498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
998c6f1
8eace7c
64e6346
ffb8ff6
3a579cd
049e0e2
df0c5ed
755aebf
c98b3b4
09b60ca
04ef8df
211de32
d57ac1c
978a0aa
a413c2e
ad840d5
47ce986
1f5cc28
9ac2f7f
d2617fb
44df2d7
1a4d262
f7c04cf
cecf1c0
2152542
c449fb0
8fc1562
b99c619
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,9 +45,11 @@ | |
| from pyiceberg.table import ( | ||
| CommitTableRequest, | ||
| CommitTableResponse, | ||
| CreateTableTransaction, | ||
| StagedTable, | ||
| Table, | ||
| ) | ||
| from pyiceberg.table.metadata import TableMetadata | ||
| from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata | ||
| from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||
| from pyiceberg.typedef import ( | ||
| EMPTY_DICT, | ||
|
|
@@ -288,6 +290,78 @@ def __init__(self, name: str, **properties: str): | |
| def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: | ||
| """Load a FileIO from the catalog properties merged with ``properties`` (the latter win on key conflicts).""" | ||
| return load_file_io({**self.properties, **properties}, location) | ||
|
|
||
| def _create_staged_table( | ||
| self, | ||
| identifier: Union[str, Identifier], | ||
| schema: Union[Schema, "pa.Schema"], | ||
| location: Optional[str] = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> StagedTable: | ||
| """Build the metadata for a new table and return it as a StagedTable without committing the changes. | ||
|
|
||
||
| Args: | ||
| identifier (str | Identifier): Table identifier. | ||
| schema (Schema | pa.Schema): Table's schema; passed through ``_convert_schema_if_needed`` first. | ||
| location (str | None): Location for the table. Optional Argument. | ||
| partition_spec (PartitionSpec): PartitionSpec for the table. | ||
| sort_order (SortOrder): SortOrder for the table. | ||
| properties (Properties): Table properties that can be a string based dictionary. | ||
|
|
||
||
| Returns: | ||
| StagedTable: the staged table instance, not yet committed to the catalog. | ||
|
|
||
||
| Raises: | ||
| TableAlreadyExistsError: If a table with the name already exists. | ||
| """ | ||
| schema: Schema = self._convert_schema_if_needed(schema) # type: ignore | ||
|
|
||
||
| database_name, table_name = self.identifier_to_database_and_table(identifier) | ||
|
|
||
||
| # Resolve the table location first; the metadata file location is derived from it. | ||
| location = self._resolve_table_location(location, database_name, table_name) | ||
| metadata_location = self._get_metadata_location(location=location) | ||
| metadata = new_table_metadata( | ||
| location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties | ||
| ) | ||
| # NOTE(review): only the catalog-level properties are passed here; the table `properties` are not | ||
| # merged in the way `self._load_file_io` does - confirm this is intended. | ||
| io = load_file_io(properties=self.properties, location=metadata_location) | ||
| return StagedTable( | ||
| identifier=(self.name, database_name, table_name), | ||
| metadata=metadata, | ||
| metadata_location=metadata_location, | ||
| io=io, | ||
| catalog=self, | ||
| ) | ||
|
|
||
| def create_table_transaction( | ||
|
Fokko marked this conversation as resolved.
|
||
| self, | ||
| identifier: Union[str, Identifier], | ||
| schema: Union[Schema, "pa.Schema"], | ||
| location: Optional[str] = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> CreateTableTransaction: | ||
| """Stage a new table via ``_create_staged_table`` and wrap it in a CreateTableTransaction. | ||
|
|
||
||
| Args: | ||
| identifier (str | Identifier): Table identifier. | ||
| schema (Schema | pa.Schema): Table's schema. | ||
| location (str | None): Location for the table. Optional Argument. | ||
| partition_spec (PartitionSpec): PartitionSpec for the table. | ||
| sort_order (SortOrder): SortOrder for the table. | ||
| properties (Properties): Table properties that can be a string based dictionary. | ||
|
|
||
||
| Returns: | ||
| CreateTableTransaction: transaction constructed around the staged table. | ||
|
|
||
||
| Raises: | ||
| TableAlreadyExistsError: If a table with the name already exists. | ||
| """ | ||
| return CreateTableTransaction( | ||
| self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties) | ||
| ) | ||
|
|
||
| @abstractmethod | ||
| def create_table( | ||
| self, | ||
|
|
@@ -717,6 +791,10 @@ def _get_updated_props_and_update_summary( | |
|
|
||
| return properties_update_summary, updated_properties | ||
|
|
||
| @staticmethod | ||
| def empty_table_metadata() -> TableMetadata: | ||
| """Return a placeholder V1 table metadata with an empty location, an empty schema and last_column_id of -1. | ||
|
|
||
||
| NOTE(review): per the PR discussion this presumably serves as the empty base for a create-table | ||
| transaction, and V1 is used because metadata cannot be downgraded from V2 to V1 - confirm against the caller. | ||
| """ | ||
| return TableMetadataV1(location="", last_column_id=-1, schema=Schema()) | ||
|
Contributor
I recommend creating a V2 table by default. This is also the case for Java. Partition evolution is very awkward for V1 tables (the old partitions have to be kept as null-transforms).
Contributor
Author
The default is still a V2 table. When creating a CreateTableTransaction, we first call […] (inline code reference lost in extraction). If we use V2 metadata here, we won't be able to create any V1 table, since we cannot downgrade from V2 to V1. I think this is the same issue in […] (reference lost in extraction). I added some comments to explain the purpose and also made it private.
Contributor
Thanks for the detailed explanation, that makes sense to me 👍 |
||
|
|
||
| def __repr__(self) -> str: | ||
| """Return the catalog's name followed by its class in parentheses.""" | ||
| return f"{self.name} ({self.__class__})" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,6 +61,7 @@ | |
| from pyiceberg.table import ( | ||
| CommitTableRequest, | ||
| CommitTableResponse, | ||
| StagedTable, | ||
| Table, | ||
| TableIdentifier, | ||
| ) | ||
|
|
@@ -135,7 +136,7 @@ def _retry_hook(retry_state: RetryCallState) -> None: | |
|
|
||
|
|
||
| class TableResponse(IcebergBaseModel): | ||
| metadata_location: str = Field(alias="metadata-location") | ||
| metadata_location: Optional[str] = Field(alias="metadata-location") | ||
| metadata: TableMetadata | ||
| config: Properties = Field(default_factory=dict) | ||
|
|
||
|
|
@@ -460,7 +461,18 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin | |
| def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> Table: | ||
| return Table( | ||
| identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple, | ||
| metadata_location=table_response.metadata_location, | ||
| metadata_location=table_response.metadata_location, # type: ignore | ||
| metadata=table_response.metadata, | ||
| io=self._load_file_io( | ||
| {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location | ||
| ), | ||
| catalog=self, | ||
| ) | ||
|
|
||
| def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_response: TableResponse) -> StagedTable: | ||
| return StagedTable( | ||
| identifier=(self.name,) + identifier_tuple if self.name else identifier_tuple, | ||
| metadata_location=table_response.metadata_location, # type: ignore | ||
| metadata=table_response.metadata, | ||
| io=self._load_file_io( | ||
| {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location | ||
|
|
@@ -490,16 +502,16 @@ def _config_headers(self, session: Session) -> None: | |
| def _extract_headers_from_properties(self) -> Dict[str, str]: | ||
| """Return the catalog properties whose key starts with HEADER_PREFIX, keyed by the rest of the key after the prefix.""" | ||
| return {key[len(HEADER_PREFIX) :]: value for key, value in self.properties.items() if key.startswith(HEADER_PREFIX)} | ||
|
|
||
| @retry(**_RETRY_ARGS) | ||
| def create_table( | ||
| def _create_table( | ||
| self, | ||
| identifier: Union[str, Identifier], | ||
| schema: Union[Schema, "pa.Schema"], | ||
| location: Optional[str] = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> Table: | ||
| stage_create: bool = False, | ||
| ) -> TableResponse: | ||
| iceberg_schema = self._convert_schema_if_needed(schema) | ||
| fresh_schema = assign_fresh_schema_ids(iceberg_schema) | ||
| fresh_partition_spec = assign_fresh_partition_spec_ids(partition_spec, iceberg_schema, fresh_schema) | ||
|
|
@@ -512,6 +524,7 @@ def create_table( | |
| table_schema=fresh_schema, | ||
| partition_spec=fresh_partition_spec, | ||
| write_order=fresh_sort_order, | ||
| stage_create=stage_create, | ||
| properties=properties, | ||
| ) | ||
| serialized_json = request.model_dump_json().encode(UTF8) | ||
|
|
@@ -524,7 +537,32 @@ def create_table( | |
| except HTTPError as exc: | ||
| self._handle_non_200_response(exc, {409: TableAlreadyExistsError}) | ||
|
|
||
| table_response = TableResponse(**response.json()) | ||
| return TableResponse(**response.json()) | ||
|
|
||
| @retry(**_RETRY_ARGS) | ||
| def _create_staged_table( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The inheritance feels off here. I would just expect to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks! This is a great suggestion! After going through the current code, I find that not only Since it is a big refactoring, please let me know if you want this to happen in a follow-up PR. |
||
| self, | ||
| identifier: Union[str, Identifier], | ||
| schema: Union[Schema, "pa.Schema"], | ||
| location: Optional[str] = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> StagedTable: | ||
| """Create a StagedTable by issuing the create-table request with ``stage_create=True``. | ||
|
|
||
||
| The response from ``_create_table`` is converted with ``_response_to_staged_table``. | ||
| """ | ||
| table_response = self._create_table(identifier, schema, location, partition_spec, sort_order, properties, True) | ||
| return self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response) | ||
|
|
||
| @retry(**_RETRY_ARGS) | ||
| def create_table( | ||
| self, | ||
| identifier: Union[str, Identifier], | ||
| schema: Union[Schema, "pa.Schema"], | ||
| location: Optional[str] = None, | ||
| partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, | ||
| sort_order: SortOrder = UNSORTED_SORT_ORDER, | ||
| properties: Properties = EMPTY_DICT, | ||
| ) -> Table: | ||
| """Create a table through ``_create_table`` (with its default ``stage_create=False``) and return it as a Table. | ||
|
|
||
||
| NOTE(review): ``_create_table`` is itself decorated with ``@retry``, so retries nest here - confirm intended. | ||
| """ | ||
| table_response = self._create_table(identifier, schema, location, partition_spec, sort_order, properties) | ||
| return self._response_to_table(self.identifier_to_tuple(identifier), table_response) | ||
|
|
||
| @retry(**_RETRY_ARGS) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.