Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/content/pypaimon/python-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ catalog_options = {
catalog = CatalogFactory.create(catalog_options)
```

For an S3 warehouse, pass S3 authentication options with the filesystem catalog options:

```python
from pypaimon import CatalogFactory

catalog_options = {
'warehouse': 's3://bucket/path/to/warehouse',
's3.endpoint': 'https://s3.amazonaws.com',
's3.access-key': 'xxx',
's3.secret-key': 'yyy',
# Optional. Required for temporary credentials.
's3.session-token': 'zzz',
# Optional. Useful for S3 compatible stores such as MinIO.
's3.path-style-access': 'true',
}
catalog = CatalogFactory.create(catalog_options)
```

{{< /tab >}}
{{< tab "rest catalog" >}}
The sample code is as follows. The detailed meaning of option can be found in [REST]({{< ref "concepts/rest/overview" >}}).
Expand Down
85 changes: 78 additions & 7 deletions paimon-python/pypaimon/filesystem/pyarrow_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.options.options_utils import OptionsUtils
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE
from pypaimon.schema.data_types import (AtomicType, DataField,
Expand Down Expand Up @@ -113,6 +114,32 @@ def _create_s3_retry_config(
else:
return {}

def _get_property(self, *keys: str):
for key in keys:
if self.properties.contains_key(key):
return self.properties.to_map().get(key)
return None

def _get_s3_property(self, name: str, legacy_key: str = None):
keys = []
if legacy_key:
keys.append(legacy_key)
keys.extend([
"s3." + name,
"s3a." + name,
"fs.s3." + name,
"fs.s3a." + name,
])
return self._get_property(*keys)

def _get_s3_boolean_property(self, name: str) -> bool:
value = self._get_s3_property(name)
if value is None:
return False
if isinstance(value, bool):
return value
return OptionsUtils.convert_to_boolean(value)

def _extract_oss_bucket(self, location) -> str:
uri = urlparse(location)
if uri.scheme and uri.scheme != "oss":
Expand Down Expand Up @@ -169,22 +196,66 @@ def _initialize_oss_fs(self, path) -> FileSystem:
return pafs.S3FileSystem(**client_kwargs)

def _initialize_s3_fs(self) -> FileSystem:
if self.properties.get(S3Options.S3_ACCESS_KEY_ID):
access_key = self._get_property(
S3Options.S3_ACCESS_KEY_ID.key(),
"s3.access-key",
"s3.access.key",
"s3a.access-key",
"s3a.access.key",
"fs.s3.access-key",
"fs.s3.access.key",
"fs.s3a.access-key",
"fs.s3a.access.key")
secret_key = self._get_property(
S3Options.S3_ACCESS_KEY_SECRET.key(),
"s3.secret-key",
"s3.secret.key",
"s3a.secret-key",
"s3a.secret.key",
"fs.s3.secret-key",
"fs.s3.secret.key",
"fs.s3a.secret-key",
"fs.s3a.secret.key")
session_token = self._get_property(
S3Options.S3_SECURITY_TOKEN.key(),
"s3.session-token",
"s3.session.token",
"s3.security-token",
"s3.security.token",
"s3a.session-token",
"s3a.session.token",
"s3a.security-token",
"s3a.security.token",
"fs.s3.session-token",
"fs.s3.session.token",
"fs.s3.security-token",
"fs.s3.security.token",
"fs.s3a.session-token",
"fs.s3a.session.token",
"fs.s3a.security-token",
"fs.s3a.security.token")
endpoint = self._get_s3_property("endpoint", S3Options.S3_ENDPOINT.key())
region = self._get_s3_property("region", S3Options.S3_REGION.key())

if access_key:
# When explicit credentials are provided, disable the EC2 Instance Metadata
# Service (IMDS) probe to avoid multi-second timeouts in non-AWS environments.
# Uses setdefault so that an explicit user setting is never overridden.
# Note: this is process-wide and affects all AWS SDK clients.
os.environ.setdefault("AWS_EC2_METADATA_DISABLED", "true")

client_kwargs = {
"endpoint_override": self.properties.get(S3Options.S3_ENDPOINT),
"access_key": self.properties.get(S3Options.S3_ACCESS_KEY_ID),
"secret_key": self.properties.get(S3Options.S3_ACCESS_KEY_SECRET),
"session_token": self.properties.get(S3Options.S3_SECURITY_TOKEN),
"region": self.properties.get(S3Options.S3_REGION),
"endpoint_override": endpoint,
"access_key": access_key,
"secret_key": secret_key,
"session_token": session_token,
"region": region,
}
if self._pyarrow_gte_7:
client_kwargs["force_virtual_addressing"] = True
path_style_access = (
self._get_s3_boolean_property("path-style-access") or
self._get_s3_boolean_property("path.style.access"))
client_kwargs["force_virtual_addressing"] = not path_style_access

retry_config = self._create_s3_retry_config()
client_kwargs.update(retry_config)
Expand Down
45 changes: 44 additions & 1 deletion paimon-python/pypaimon/tests/filesystem_catalog_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import shutil
import tempfile
import unittest
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pyarrow as pa

Expand Down Expand Up @@ -89,6 +89,49 @@ def test_table(self):
self.assertTrue(isinstance(table.fields[2].type, AtomicType))
self.assertEqual(table.fields[2].type.type, "STRING")

def test_s3_filesystem_catalog_with_paimon_options(self):
    """Paimon-style `s3.*` options must be forwarded to pyarrow's S3FileSystem."""
    catalog_options = {
        "warehouse": "s3://bucket/warehouse",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key": "access-key",
        "s3.secret-key": "secret-key",
        "s3.session-token": "session-token",
        "s3.region": "us-east-1",
        "s3.path-style-access": "true",
    }
    with patch("pypaimon.filesystem.pyarrow_file_io.pafs.S3FileSystem") as mock_fs:
        CatalogFactory.create(catalog_options)

    mock_fs.assert_called_once()
    passed = mock_fs.call_args[1]
    expected = {
        "endpoint_override": "http://localhost:9000",
        "access_key": "access-key",
        "secret_key": "secret-key",
        "session_token": "session-token",
        "region": "us-east-1",
    }
    for key, value in expected.items():
        self.assertEqual(passed[key], value)
    # force_virtual_addressing is only passed on pyarrow >= 7; with
    # path-style access requested it must be off when present.
    if "force_virtual_addressing" in passed:
        self.assertFalse(passed["force_virtual_addressing"])

def test_s3_filesystem_catalog_with_legacy_options(self):
    """Legacy `fs.s3.*` option keys must also reach pyarrow's S3FileSystem."""
    catalog_options = {
        "warehouse": "s3://bucket/warehouse",
        "fs.s3.endpoint": "http://localhost:9000",
        "fs.s3.accessKeyId": "access-key",
        "fs.s3.accessKeySecret": "secret-key",
        "fs.s3.securityToken": "session-token",
        "fs.s3.region": "us-east-1",
    }
    with patch("pypaimon.filesystem.pyarrow_file_io.pafs.S3FileSystem") as mock_fs:
        CatalogFactory.create(catalog_options)

    mock_fs.assert_called_once()
    passed = mock_fs.call_args[1]
    expected = {
        "endpoint_override": "http://localhost:9000",
        "access_key": "access-key",
        "secret_key": "secret-key",
        "session_token": "session-token",
        "region": "us-east-1",
    }
    for key, value in expected.items():
        self.assertEqual(passed[key], value)
    # Without path-style access requested, virtual addressing stays on
    # whenever the kwarg is supported by the installed pyarrow.
    if "force_virtual_addressing" in passed:
        self.assertTrue(passed["force_virtual_addressing"])

def test_alter_table(self):
catalog = CatalogFactory.create({
"warehouse": self.warehouse
Expand Down
Loading