diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md index 197a018ef1f7..ed052f065a93 100644 --- a/docs/content/pypaimon/python-api.md +++ b/docs/content/pypaimon/python-api.md @@ -44,6 +44,24 @@ catalog_options = { catalog = CatalogFactory.create(catalog_options) ``` +For an S3 warehouse, pass S3 authentication options with the filesystem catalog options: + +```python +from pypaimon import CatalogFactory + +catalog_options = { + 'warehouse': 's3://bucket/path/to/warehouse', + 's3.endpoint': 'https://s3.amazonaws.com', + 's3.access-key': 'xxx', + 's3.secret-key': 'yyy', + # Optional. Required for temporary credentials. + 's3.session-token': 'zzz', + # Optional. Useful for S3-compatible stores such as MinIO. + 's3.path-style-access': 'true', +} +catalog = CatalogFactory.create(catalog_options) +``` + {{< /tab >}} {{< tab "rest catalog" >}} The sample code is as follows. The detailed meaning of option can be found in [REST]({{< ref "concepts/rest/overview" >}}). 
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index b5e03c015154..6f9f32aad9cd 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -33,6 +33,7 @@ from pypaimon.common.file_io import FileIO from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions, S3Options +from pypaimon.common.options.options_utils import OptionsUtils from pypaimon.common.uri_reader import UriReaderFactory from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE from pypaimon.schema.data_types import (AtomicType, DataField, @@ -113,6 +114,32 @@ def _create_s3_retry_config( else: return {} + def _get_property(self, *keys: str): + for key in keys: + if self.properties.contains_key(key): + return self.properties.to_map().get(key) + return None + + def _get_s3_property(self, name: str, legacy_key: str = None): + keys = [] + if legacy_key: + keys.append(legacy_key) + keys.extend([ + "s3." + name, + "s3a." + name, + "fs.s3." + name, + "fs.s3a." 
+ name, + ]) + return self._get_property(*keys) + + def _get_s3_boolean_property(self, name: str) -> bool: + value = self._get_s3_property(name) + if value is None: + return False + if isinstance(value, bool): + return value + return OptionsUtils.convert_to_boolean(value) + def _extract_oss_bucket(self, location) -> str: uri = urlparse(location) if uri.scheme and uri.scheme != "oss": @@ -169,7 +196,48 @@ def _initialize_oss_fs(self, path) -> FileSystem: return pafs.S3FileSystem(**client_kwargs) def _initialize_s3_fs(self) -> FileSystem: - if self.properties.get(S3Options.S3_ACCESS_KEY_ID): + access_key = self._get_property( + S3Options.S3_ACCESS_KEY_ID.key(), + "s3.access-key", + "s3.access.key", + "s3a.access-key", + "s3a.access.key", + "fs.s3.access-key", + "fs.s3.access.key", + "fs.s3a.access-key", + "fs.s3a.access.key") + secret_key = self._get_property( + S3Options.S3_ACCESS_KEY_SECRET.key(), + "s3.secret-key", + "s3.secret.key", + "s3a.secret-key", + "s3a.secret.key", + "fs.s3.secret-key", + "fs.s3.secret.key", + "fs.s3a.secret-key", + "fs.s3a.secret.key") + session_token = self._get_property( + S3Options.S3_SECURITY_TOKEN.key(), + "s3.session-token", + "s3.session.token", + "s3.security-token", + "s3.security.token", + "s3a.session-token", + "s3a.session.token", + "s3a.security-token", + "s3a.security.token", + "fs.s3.session-token", + "fs.s3.session.token", + "fs.s3.security-token", + "fs.s3.security.token", + "fs.s3a.session-token", + "fs.s3a.session.token", + "fs.s3a.security-token", + "fs.s3a.security.token") + endpoint = self._get_s3_property("endpoint", S3Options.S3_ENDPOINT.key()) + region = self._get_s3_property("region", S3Options.S3_REGION.key()) + + if access_key: # When explicit credentials are provided, disable the EC2 Instance Metadata # Service (IMDS) probe to avoid multi-second timeouts in non-AWS environments. # Uses setdefault so that an explicit user setting is never overridden. 
@@ -177,14 +245,17 @@ def _initialize_s3_fs(self) -> FileSystem: os.environ.setdefault("AWS_EC2_METADATA_DISABLED", "true") client_kwargs = { - "endpoint_override": self.properties.get(S3Options.S3_ENDPOINT), - "access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID), - "secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET), - "session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN), - "region": self.properties.get(S3Options.S3_REGION), + "endpoint_override": endpoint, + "access_key": access_key, + "secret_key": secret_key, + "session_token": session_token, + "region": region, } if self._pyarrow_gte_7: - client_kwargs["force_virtual_addressing"] = True + path_style_access = ( + self._get_s3_boolean_property("path-style-access") or + self._get_s3_boolean_property("path.style.access")) + client_kwargs["force_virtual_addressing"] = not path_style_access retry_config = self._create_s3_retry_config() client_kwargs.update(retry_config) diff --git a/paimon-python/pypaimon/tests/filesystem_catalog_test.py b/paimon-python/pypaimon/tests/filesystem_catalog_test.py index dbbc18bb3e00..bbad49185d58 100644 --- a/paimon-python/pypaimon/tests/filesystem_catalog_test.py +++ b/paimon-python/pypaimon/tests/filesystem_catalog_test.py @@ -17,7 +17,7 @@ import shutil import tempfile import unittest -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pyarrow as pa @@ -89,6 +89,49 @@ def test_table(self): self.assertTrue(isinstance(table.fields[2].type, AtomicType)) self.assertEqual(table.fields[2].type.type, "STRING") + def test_s3_filesystem_catalog_with_paimon_options(self): + with patch("pypaimon.filesystem.pyarrow_file_io.pafs.S3FileSystem") as s3_file_system: + CatalogFactory.create({ + "warehouse": "s3://bucket/warehouse", + "s3.endpoint": "http://localhost:9000", + "s3.access-key": "access-key", + "s3.secret-key": "secret-key", + "s3.session-token": "session-token", + "s3.region": "us-east-1", + "s3.path-style-access": 
"true", + }) + + s3_file_system.assert_called_once() + kwargs = s3_file_system.call_args[1] + self.assertEqual(kwargs["endpoint_override"], "http://localhost:9000") + self.assertEqual(kwargs["access_key"], "access-key") + self.assertEqual(kwargs["secret_key"], "secret-key") + self.assertEqual(kwargs["session_token"], "session-token") + self.assertEqual(kwargs["region"], "us-east-1") + if "force_virtual_addressing" in kwargs: + self.assertFalse(kwargs["force_virtual_addressing"]) + + def test_s3_filesystem_catalog_with_legacy_options(self): + with patch("pypaimon.filesystem.pyarrow_file_io.pafs.S3FileSystem") as s3_file_system: + CatalogFactory.create({ + "warehouse": "s3://bucket/warehouse", + "fs.s3.endpoint": "http://localhost:9000", + "fs.s3.accessKeyId": "access-key", + "fs.s3.accessKeySecret": "secret-key", + "fs.s3.securityToken": "session-token", + "fs.s3.region": "us-east-1", + }) + + s3_file_system.assert_called_once() + kwargs = s3_file_system.call_args[1] + self.assertEqual(kwargs["endpoint_override"], "http://localhost:9000") + self.assertEqual(kwargs["access_key"], "access-key") + self.assertEqual(kwargs["secret_key"], "secret-key") + self.assertEqual(kwargs["session_token"], "session-token") + self.assertEqual(kwargs["region"], "us-east-1") + if "force_virtual_addressing" in kwargs: + self.assertTrue(kwargs["force_virtual_addressing"]) + def test_alter_table(self): catalog = CatalogFactory.create({ "warehouse": self.warehouse