Skip to content

Commit 2940125

Browse files
committed
refactor and fix failing tests
1 parent c531e0f commit 2940125

File tree

2 files changed

+199
-153
lines changed

2 files changed

+199
-153
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 81 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -83,23 +83,35 @@
8383
)
8484
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
8585
from pyiceberg.io import (
86+
ADLS_ACCOUNT_HOST,
8687
ADLS_ACCOUNT_KEY,
8788
ADLS_ACCOUNT_NAME,
8889
ADLS_BLOB_STORAGE_AUTHORITY,
8990
ADLS_BLOB_STORAGE_SCHEME,
91+
ADLS_CLIENT_ID,
92+
ADLS_CLIENT_SECRET,
93+
ADLS_CONNECTION_STRING,
9094
ADLS_DFS_STORAGE_AUTHORITY,
9195
ADLS_DFS_STORAGE_SCHEME,
9296
ADLS_SAS_TOKEN,
97+
ADLS_TENANT_ID,
9398
AWS_ACCESS_KEY_ID,
9499
AWS_REGION,
95100
AWS_ROLE_ARN,
96101
AWS_ROLE_SESSION_NAME,
97102
AWS_SECRET_ACCESS_KEY,
98103
AWS_SESSION_TOKEN,
104+
GCS_ACCESS,
105+
GCS_CACHE_TIMEOUT,
106+
GCS_CONSISTENCY,
99107
GCS_DEFAULT_LOCATION,
108+
GCS_PROJECT_ID,
109+
GCS_REQUESTER_PAYS,
100110
GCS_SERVICE_HOST,
111+
GCS_SESSION_KWARGS,
101112
GCS_TOKEN,
102113
GCS_TOKEN_EXPIRES_AT_MS,
114+
GCS_VERSION_AWARE,
103115
HDFS_HOST,
104116
HDFS_KERB_TICKET,
105117
HDFS_PORT,
@@ -118,6 +130,8 @@
118130
S3_ROLE_SESSION_NAME,
119131
S3_SECRET_ACCESS_KEY,
120132
S3_SESSION_TOKEN,
133+
S3_SIGNER_ENDPOINT,
134+
S3_SIGNER_URI,
121135
FileIO,
122136
InputFile,
123137
InputStream,
@@ -397,6 +411,32 @@ def parse_location(location: str) -> Tuple[str, str, str]:
397411
else:
398412
return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
399413

414+
def _process_basic_properties(
415+
self, property_mapping: Dict[str, str], special_properties: Set[str], prefix: str
416+
) -> Dict[str, Any]:
417+
"""Process basic property mappings and prefix passthrough logic."""
418+
client_kwargs: Dict[str, Any] = {}
419+
420+
for prop_name, prop_value in self.properties.items():
421+
if prop_value is None:
422+
continue
423+
424+
# Skip properties that need special handling
425+
if prop_name in special_properties:
426+
continue
427+
428+
# Map known property names to filesystem parameter names
429+
if prop_name in property_mapping:
430+
param_name = property_mapping[prop_name]
431+
client_kwargs[param_name] = prop_value
432+
433+
# Pass through any other {prefix}.* properties
434+
elif prop_name.startswith(f"{prefix}."):
435+
param_name = prop_name.split(".", 1)[1]
436+
client_kwargs[param_name] = prop_value
437+
438+
return client_kwargs
439+
400440
def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
401441
"""Initialize FileSystem for different scheme."""
402442
if scheme in {"oss"}:
@@ -426,51 +466,26 @@ def _initialize_oss_fs(self) -> FileSystem:
426466
# Mapping from PyIceberg properties to S3FileSystem parameter names
427467
property_mapping = {
428468
S3_ENDPOINT: "endpoint_override",
429-
S3_ACCESS_KEY_ID: "access_key",
430-
AWS_ACCESS_KEY_ID: "access_key",
431-
S3_SECRET_ACCESS_KEY: "secret_key",
432-
AWS_SECRET_ACCESS_KEY: "secret_key",
433-
S3_SESSION_TOKEN: "session_token",
434-
AWS_SESSION_TOKEN: "session_token",
435-
S3_REGION: "region",
436-
AWS_REGION: "region",
437469
S3_PROXY_URI: "proxy_options",
438470
S3_CONNECT_TIMEOUT: "connect_timeout",
439471
S3_REQUEST_TIMEOUT: "request_timeout",
440-
S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing",
441472
}
442473

443474
# Properties that need special handling
444475
special_properties = {
476+
S3_ACCESS_KEY_ID,
477+
S3_SECRET_ACCESS_KEY,
478+
S3_SESSION_TOKEN,
445479
S3_CONNECT_TIMEOUT,
446480
S3_REQUEST_TIMEOUT,
447481
S3_FORCE_VIRTUAL_ADDRESSING,
448482
S3_ROLE_SESSION_NAME,
449483
S3_RESOLVE_REGION,
450-
AWS_ROLE_SESSION_NAME,
484+
S3_REGION,
451485
}
452486

453-
client_kwargs: Dict[str, Any] = {}
487+
client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3")
454488

455-
for prop_name, prop_value in self.properties.items():
456-
if prop_value is None:
457-
continue
458-
459-
# Skip properties that need special handling
460-
if prop_name in special_properties:
461-
continue
462-
463-
# Map known property names to S3FileSystem parameter names
464-
if prop_name in property_mapping:
465-
param_name = property_mapping[prop_name]
466-
client_kwargs[param_name] = prop_value
467-
468-
# Pass through any other s3.* properties to S3FileSystem
469-
elif prop_name.startswith("s3."):
470-
param_name = prop_name.split(".", 1)[1]
471-
client_kwargs[param_name] = prop_value
472-
473-
# Handle properties that need first value resolution
474489
if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
475490
client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)
476491

@@ -505,57 +520,35 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
505520
else:
506521
bucket_region = provided_region
507522

508-
# Properties that need special handling
523+
# Mapping from PyIceberg properties to S3FileSystem parameter names
509524
property_mapping = {
510525
S3_ENDPOINT: "endpoint_override",
511-
S3_ACCESS_KEY_ID: "access_key",
512-
AWS_ACCESS_KEY_ID: "access_key",
513-
S3_SECRET_ACCESS_KEY: "secret_key",
514-
AWS_SECRET_ACCESS_KEY: "secret_key",
515-
S3_SESSION_TOKEN: "session_token",
516-
AWS_SESSION_TOKEN: "session_token",
517526
S3_PROXY_URI: "proxy_options",
518527
S3_CONNECT_TIMEOUT: "connect_timeout",
519528
S3_REQUEST_TIMEOUT: "request_timeout",
520-
S3_ROLE_ARN: "role_arn",
521-
AWS_ROLE_ARN: "role_arn",
522-
S3_ROLE_SESSION_NAME: "session_name",
523-
AWS_ROLE_SESSION_NAME: "session_name",
524-
S3_FORCE_VIRTUAL_ADDRESSING: "force_virtual_addressing",
525529
S3_RETRY_STRATEGY_IMPL: "retry_strategy",
526530
}
527531

528-
# Properties that need special handling and should not be passed directly
532+
# Properties that need special handling
529533
special_properties = {
534+
S3_ACCESS_KEY_ID,
535+
S3_SECRET_ACCESS_KEY,
536+
S3_SESSION_TOKEN,
537+
S3_ROLE_ARN,
538+
S3_ROLE_SESSION_NAME,
530539
S3_RESOLVE_REGION,
531540
S3_REGION,
532-
AWS_REGION,
533541
S3_RETRY_STRATEGY_IMPL,
534542
S3_CONNECT_TIMEOUT,
535543
S3_REQUEST_TIMEOUT,
544+
S3_SIGNER_ENDPOINT,
545+
S3_SIGNER_URI,
546+
S3_FORCE_VIRTUAL_ADDRESSING,
536547
}
537548

538-
client_kwargs: Dict[str, Any] = {}
539-
549+
client_kwargs = self._process_basic_properties(property_mapping, special_properties, "s3")
540550
client_kwargs["region"] = bucket_region
541-
for prop_name, prop_value in self.properties.items():
542-
if prop_value is None:
543-
continue
544-
545-
# Skip properties that need special handling
546-
if prop_name in special_properties:
547-
continue
548-
549-
if prop_name in property_mapping:
550-
param_name = property_mapping[prop_name]
551-
client_kwargs[param_name] = prop_value
552551

553-
# Pass through any other s3.* properties that might be used by S3FileSystem
554-
elif prop_name.startswith("s3."):
555-
param_name = prop_name.split(".", 1)[1]
556-
client_kwargs[param_name] = prop_value
557-
558-
# Handle properties that need first value resolution
559552
if S3_ACCESS_KEY_ID in self.properties or AWS_ACCESS_KEY_ID in self.properties:
560553
client_kwargs["access_key"] = get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID)
561554

@@ -577,6 +570,9 @@ def _initialize_s3_fs(self, netloc: Optional[str]) -> FileSystem:
577570
if request_timeout := self.properties.get(S3_REQUEST_TIMEOUT):
578571
client_kwargs["request_timeout"] = float(request_timeout)
579572

573+
if self.properties.get(S3_FORCE_VIRTUAL_ADDRESSING) is not None:
574+
client_kwargs["force_virtual_addressing"] = property_as_bool(self.properties, S3_FORCE_VIRTUAL_ADDRESSING, False)
575+
580576
# Handle retry strategy special case
581577
if (retry_strategy_impl := self.properties.get(S3_RETRY_STRATEGY_IMPL)) and (
582578
retry_instance := _import_retry_strategy(retry_strategy_impl)
@@ -606,24 +602,17 @@ def _initialize_azure_fs(self) -> FileSystem:
606602
ADLS_BLOB_STORAGE_SCHEME: "blob_storage_scheme",
607603
ADLS_DFS_STORAGE_SCHEME: "dfs_storage_scheme",
608604
ADLS_SAS_TOKEN: "sas_token",
605+
ADLS_CLIENT_ID: "client_id",
606+
ADLS_CLIENT_SECRET: "client_secret",
607+
ADLS_TENANT_ID: "tenant_id",
609608
}
610609

611-
client_kwargs: Dict[str, Any] = {}
612-
613-
for prop_name, prop_value in self.properties.items():
614-
if prop_value is None:
615-
continue
616-
617-
# Map known property names to AzureFileSystem parameter names
618-
if prop_name in property_mapping:
619-
param_name = property_mapping[prop_name]
620-
client_kwargs[param_name] = prop_value
621-
622-
# Pass through any other adls.* properties that might be used by AzureFileSystem
623-
elif prop_name.startswith("adls."):
624-
param_name = prop_name.split(".", 1)[1]
625-
client_kwargs[param_name] = prop_value
610+
special_properties = {
611+
ADLS_CONNECTION_STRING,
612+
ADLS_ACCOUNT_HOST,
613+
}
626614

615+
client_kwargs = self._process_basic_properties(property_mapping, special_properties, "adls")
627616
return AzureFileSystem(**client_kwargs)
628617

629618
def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
@@ -632,33 +621,19 @@ def _initialize_hdfs_fs(self, scheme: str, netloc: Optional[str]) -> FileSystem:
632621
if netloc:
633622
return HadoopFileSystem.from_uri(f"{scheme}://{netloc}")
634623

635-
# Mapping from PyIceberg properties to S3FileSystem parameter names
624+
# Mapping from PyIceberg properties to HadoopFileSystem parameter names
636625
property_mapping = {
637626
HDFS_HOST: "host",
638627
HDFS_PORT: "port",
639628
HDFS_USER: "user",
640629
HDFS_KERB_TICKET: "kerb_ticket",
641630
}
642631

643-
hdfs_kwargs: Dict[str, Any] = {}
632+
hdfs_kwargs = self._process_basic_properties(property_mapping, set(), "hdfs")
644633

645-
for prop_name, prop_value in self.properties.items():
646-
if prop_value is None:
647-
continue
648-
649-
# Map known property names to HadoopFileSystem parameter names
650-
if prop_name in property_mapping:
651-
param_name = property_mapping[prop_name]
652-
653-
if param_name == "port":
654-
hdfs_kwargs[param_name] = int(prop_value)
655-
else:
656-
hdfs_kwargs[param_name] = prop_value
657-
658-
# Pass through any other hdfs.* properties used to be used by HadoopFileSystem
659-
elif prop_name.startswith("hdfs."):
660-
param_name = prop_name.split(".", 1)[1]
661-
hdfs_kwargs[param_name] = prop_value
634+
# Handle port conversion to int
635+
if "port" in hdfs_kwargs:
636+
hdfs_kwargs["port"] = int(hdfs_kwargs["port"])
662637

663638
return HadoopFileSystem(**hdfs_kwargs)
664639

@@ -668,36 +643,23 @@ def _initialize_gcs_fs(self) -> FileSystem:
668643
# Mapping from PyIceberg properties to GcsFileSystem parameter names
669644
property_mapping = {
670645
GCS_TOKEN: "access_token",
671-
GCS_TOKEN_EXPIRES_AT_MS: "credential_token_expiration",
672646
GCS_DEFAULT_LOCATION: "default_bucket_location",
673-
GCS_SERVICE_HOST: "endpoint_override",
647+
GCS_PROJECT_ID: "project_id",
674648
}
675649

676650
# Properties that need special handling
677651
special_properties = {
678652
GCS_TOKEN_EXPIRES_AT_MS,
679653
GCS_SERVICE_HOST,
654+
GCS_ACCESS,
655+
GCS_CONSISTENCY,
656+
GCS_CACHE_TIMEOUT,
657+
GCS_REQUESTER_PAYS,
658+
GCS_SESSION_KWARGS,
659+
GCS_VERSION_AWARE,
680660
}
681661

682-
gcs_kwargs: Dict[str, Any] = {}
683-
684-
for prop_name, prop_value in self.properties.items():
685-
if prop_value is None:
686-
continue
687-
688-
# Skip properties that need special handling
689-
if prop_name in special_properties:
690-
continue
691-
692-
# Map known property names to GcsFileSystem parameter names
693-
if prop_name in property_mapping:
694-
param_name = property_mapping[prop_name]
695-
gcs_kwargs[param_name] = prop_value
696-
697-
# Pass through any other gcs.* properties that might be used by GcsFileSystem
698-
elif prop_name.startswith("gcs."):
699-
param_name = prop_name.split(".", 1)[1]
700-
gcs_kwargs[param_name] = prop_value
662+
gcs_kwargs = self._process_basic_properties(property_mapping, special_properties, "gcs")
701663

702664
if expiration := self.properties.get(GCS_TOKEN_EXPIRES_AT_MS):
703665
gcs_kwargs["credential_token_expiration"] = millis_to_datetime(int(expiration))
@@ -710,17 +672,7 @@ def _initialize_gcs_fs(self) -> FileSystem:
710672
return GcsFileSystem(**gcs_kwargs)
711673

712674
def _initialize_local_fs(self) -> FileSystem:
713-
local_kwargs: Dict[str, Any] = {}
714-
715-
for prop_name, prop_value in self.properties.items():
716-
if prop_value is None:
717-
continue
718-
719-
# Pass through any other file.* properties that might be used by PyArrowLocalFileSystem
720-
elif prop_name.startswith("file."):
721-
param_name = prop_name.split(".", 1)[1]
722-
local_kwargs[param_name] = prop_value
723-
675+
local_kwargs = self._process_basic_properties({}, set(), "file")
724676
return PyArrowLocalFileSystem(**local_kwargs)
725677

726678
def new_input(self, location: str) -> PyArrowFile:

0 commit comments

Comments
 (0)