-
Notifications
You must be signed in to change notification settings - Fork 466
create_table with a PyArrow Schema
#305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
8cd0e52
3d0445b
6ea0892
7ba0fd1
39a97a2
60ac8f8
b917afd
e4e9f9b
c40c553
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -26,6 +26,7 @@ | |||||
| from __future__ import annotations | ||||||
|
|
||||||
| import concurrent.futures | ||||||
| import itertools | ||||||
| import logging | ||||||
| import os | ||||||
| import re | ||||||
|
|
@@ -34,7 +35,6 @@ | |||||
| from dataclasses import dataclass | ||||||
| from enum import Enum | ||||||
| from functools import lru_cache, singledispatch | ||||||
| from itertools import chain | ||||||
| from typing import ( | ||||||
| TYPE_CHECKING, | ||||||
| Any, | ||||||
|
|
@@ -631,7 +631,7 @@ def _combine_positional_deletes(positional_deletes: List[pa.ChunkedArray], rows: | |||||
| if len(positional_deletes) == 1: | ||||||
| all_chunks = positional_deletes[0] | ||||||
| else: | ||||||
| all_chunks = pa.chunked_array(chain(*[arr.chunks for arr in positional_deletes])) | ||||||
| all_chunks = pa.chunked_array(itertools.chain(*[arr.chunks for arr in positional_deletes])) | ||||||
| return np.setdiff1d(np.arange(rows), all_chunks, assume_unique=False) | ||||||
|
|
||||||
|
|
||||||
|
|
@@ -761,6 +761,32 @@ def primitive(self, primitive: pa.DataType) -> T: | |||||
| """Visit a primitive type.""" | ||||||
|
|
||||||
|
|
||||||
| class PreOrderPyArrowSchemaVisitor(Generic[T], ABC): | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch - I missed this one. I'll remove this Visitor as its no longer in use |
||||||
| @abstractmethod | ||||||
| def schema(self, schema: pa.Schema, struct_result: Callable[[], T]) -> T: | ||||||
| """Visit a schema.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| def struct(self, struct: pa.StructType, field_results: List[Callable[[], T]]) -> T: | ||||||
| """Visit a struct.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| def field(self, field: pa.Field, field_result: Callable[[], T]) -> T: | ||||||
| """Visit a field.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| def list(self, list_type: pa.ListType, element_result: Callable[[], T]) -> T: | ||||||
| """Visit a list.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| def map(self, map_type: pa.MapType, key_result: Callable[[], T], value_result: Callable[[], T]) -> T: | ||||||
| """Visit a map.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| def primitive(self, primitive: pa.DataType) -> T: | ||||||
| """Visit a primitive type.""" | ||||||
|
|
||||||
|
|
||||||
| def _get_field_id(field: pa.Field) -> Optional[int]: | ||||||
| return ( | ||||||
| int(field_id_str.decode()) | ||||||
|
|
@@ -906,6 +932,21 @@ def after_map_value(self, element: pa.Field) -> None: | |||||
| self._field_names.pop() | ||||||
|
|
||||||
|
|
||||||
| class _ConvertToIcebergWithNoIds(_ConvertToIceberg): | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style suggestion, feel free to ignore:
Suggested change
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for the suggestion :) |
||||||
| """ | ||||||
| Converts PyArrowSchema to Iceberg Schema with all -1 ids. | ||||||
|
|
||||||
| The schema generated through this visitor should always be | ||||||
| used in conjunction with `new_table_metadata` function to | ||||||
| assign new field ids in order. This is currently used only | ||||||
| when creating an Iceberg Schema from a PyArrow schema when | ||||||
| creating a new Iceberg table. | ||||||
| """ | ||||||
|
|
||||||
| def _field_id(self, field: pa.Field) -> int: | ||||||
| return -1 | ||||||
|
|
||||||
|
|
||||||
| def _task_to_table( | ||||||
| fs: FileSystem, | ||||||
| task: FileScanTask, | ||||||
|
|
@@ -993,7 +1034,7 @@ def _task_to_table( | |||||
|
|
||||||
| def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]: | ||||||
| deletes_per_file: Dict[str, List[ChunkedArray]] = {} | ||||||
| unique_deletes = set(chain.from_iterable([task.delete_files for task in tasks])) | ||||||
| unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks])) | ||||||
| if len(unique_deletes) > 0: | ||||||
| executor = ExecutorFactory.get_or_create() | ||||||
| deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map( | ||||||
|
|
@@ -1399,7 +1440,7 @@ def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsColl | |||||
| def struct( | ||||||
| self, struct: StructType, field_results: List[Callable[[], List[StatisticsCollector]]] | ||||||
| ) -> List[StatisticsCollector]: | ||||||
| return list(chain(*[result() for result in field_results])) | ||||||
| return list(itertools.chain(*[result() for result in field_results])) | ||||||
|
|
||||||
| def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: | ||||||
| self._field_id = field.field_id | ||||||
|
|
@@ -1491,7 +1532,7 @@ def schema(self, schema: Schema, struct_result: Callable[[], List[ID2ParquetPath | |||||
| return struct_result() | ||||||
|
|
||||||
| def struct(self, struct: StructType, field_results: List[Callable[[], List[ID2ParquetPath]]]) -> List[ID2ParquetPath]: | ||||||
| return list(chain(*[result() for result in field_results])) | ||||||
| return list(itertools.chain(*[result() for result in field_results])) | ||||||
|
|
||||||
| def field(self, field: NestedField, field_result: Callable[[], List[ID2ParquetPath]]) -> List[ID2ParquetPath]: | ||||||
| self._field_id = field.field_id | ||||||
|
|
||||||
Uh oh!
There was an error while loading. Please reload this page.