diff --git a/docs/content.zh/docs/deployment/filesystems/s3.md b/docs/content.zh/docs/deployment/filesystems/s3.md
index c1515617ba9f8..caaad68d3f814 100644
--- a/docs/content.zh/docs/deployment/filesystems/s3.md
+++ b/docs/content.zh/docs/deployment/filesystems/s3.md
@@ -27,20 +27,18 @@ under the License.
 # Amazon S3
 
-[Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3) 提供用于多种场景的云对象存储。S3 可与 Flink 一起使用以读取、写入数据,并可与 [流的 **State backends**]({{< ref "docs/ops/state/state_backends" >}}) 相结合使用。
+[Amazon Simple Storage Service](http://aws.amazon.com/s3/) (Amazon S3) provides cloud object storage for a variety of use cases. You can use S3 with Flink for **reading** and **writing data**, as well as in conjunction with the [streaming **state backends**]({{< ref "docs/ops/state/state_backends" >}}).
 
-
-通过以下格式指定路径,S3 对象可类似于普通文件使用:
+You can use S3 objects like regular files by specifying paths in the following format:
 
 ```plain
 s3://<your-bucket>/<endpoint>
 ```
 
-Endpoint 可以是一个文件或目录,例如:
+The endpoint can either be a single file or a directory, for example:
 
 ```java
-// 读取 S3 bucket
+// Read from S3 bucket
 FileSource<String> fileSource = FileSource.forRecordStreamFormat(
         new TextLineInputFormat(), new Path("s3://<bucket>/<endpoint>")
 ).build();
@@ -50,112 +48,236 @@ env.fromSource(
     "s3-input"
 );
 
-// 写入 S3 bucket
+// Write to S3 bucket
 stream.sinkTo(
-  FileSink.forRowFormat(
-    new Path("s3://<bucket>/<endpoint>"), new SimpleStringEncoder<>()
-  ).build()
+    FileSink.forRowFormat(
+        new Path("s3://<bucket>/<endpoint>"), new SimpleStringEncoder<>()
+    ).build()
 );
-
-// 使用 S3 作为 checkpoint storage
+// Use S3 as checkpoint storage
 Configuration config = new Configuration();
 config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
 config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "s3://<bucket>/<endpoint>");
 env.configure(config);
 ```
 
-注意这些例子并*不详尽*,S3 同样可以用在其他场景,包括 [JobManager 高可用配置]({{< ref "docs/deployment/ha/overview" >}}) 或 [RocksDBStateBackend]({{< ref "docs/ops/state/state_backends" >}}#the-rocksdbstatebackend),以及所有 Flink 需要使用文件系统 URI 的位置。
+Note that 
these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup]({{< ref "docs/deployment/ha/overview" >}}) or the [EmbeddedRocksDBStateBackend]({{< ref "docs/ops/state/state_backends" >}}#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI (unless otherwise stated). -在大部分使用场景下,可使用 `flink-s3-fs-hadoop` 或 `flink-s3-fs-presto` 两个独立且易于设置的 S3 文件系统插件。然而在某些情况下,例如使用 S3 作为 YARN 的资源存储目录时,可能需要配置 Hadoop S3 文件系统。 +## S3 FileSystem Implementations -### Hadoop/Presto S3 文件系统插件 +Flink provides three independent S3 filesystem implementations: -{{< hint info >}} -如果您在使用 [Flink on EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html),您无需手动对此进行配置。 -{{< /hint >}} +| Implementation | Checkpointing | FileSink | Notes | +|---------------|:---:|:---:|-------| +| **Native S3** (`flink-s3-fs-native`) | ✓ | ✓ | **Experimental** in Flink 2.3. Built on AWS SDK v2; no Hadoop dependency. | +| **Presto S3** (`flink-s3-fs-presto`) | ✓ | x | Production-proven for checkpointing. | +| **Hadoop S3** (`flink-s3-fs-hadoop`) | ✓ | ✓ | Mature; the only stable implementation that provides `RecoverableWriter` for the FileSink. | -Flink 提供两种文件系统用来与 S3 交互:`flink-s3-fs-presto` 和 `flink-s3-fs-hadoop`。两种实现都是独立的且没有依赖项,因此使用时无需将 Hadoop 添加至 classpath。 +Previously, users had to choose between Presto (recommended for checkpointing throughput) and Hadoop (the only implementation with `RecoverableWriter`, required by the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}})). The Native S3 implementation unifies both capabilities in a single plugin and measurements show significant checkpoint throughput improvements over the Presto implementation. 
- - `flink-s3-fs-presto`,通过 *s3://* 和 *s3p://* 两种 scheme 使用,基于 [Presto project](https://prestodb.io/)。 - 可以使用[和 Presto 文件系统相同的配置项](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration)进行配置,方式为将配置添加到 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}})中。如果要在 S3 中使用 checkpoint,推荐使用 Presto S3 文件系统。 +All three are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. - - `flink-s3-fs-hadoop`,通过 *s3://* 和 *s3a://* 两种 scheme 使用, 基于 [Hadoop Project](https://hadoop.apache.org/)。 - 本文件系统可以使用类似 [Hadoop S3A 的配置项](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A)进行配置,方式为将配置添加到 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}})中。 - - 例如,Hadoop 有 `fs.s3a.connection.maximum` 的配置选项。 如果你想在 Flink 程序中改变该配置的值,你需要将配置 `s3.connection.maximum: xyz` 添加到 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}})中。Flink 会内部将其转换成配置 `fs.s3a.connection.maximum`。 而无需通过 Hadoop 的 XML 配置文件来传递参数。 - - 另外,它是唯一支持 [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) 的 S3 文件系统。 - -`flink-s3-fs-hadoop` 和 `flink-s3-fs-presto` 都为 *s3://* scheme 注册了默认的文件系统包装器,`flink-s3-fs-hadoop` 另外注册了 *s3a://*,`flink-s3-fs-presto` 注册了 *s3p://*,因此二者可以同时使用。 -例如某作业使用了 [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}),它仅支持 Hadoop,但建立 checkpoint 使用 Presto。在这种情况下,建议明确地使用 *s3a://* 作为 sink (Hadoop) 的 scheme,checkpoint (Presto) 使用 *s3p://*。这一点对于 [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) 同样成立。 +## Common Configuration -在启动 Flink 之前,将对应的 JAR 文件从 `opt` 复制到 Flink 发行版的 `plugins` 目录下,以使用 `flink-s3-fs-hadoop` 或 `flink-s3-fs-presto`。 +### Configure Access Credentials -```bash -mkdir ./plugins/s3-fs-presto -cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/ -``` +After setting up the S3 FileSystem implementation, you need to make sure that Flink is allowed to access your S3 buckets. 
The following three approaches are **independent alternatives** — choose the one that fits your environment: -#### 配置访问凭据 +#### Identity and Access Management (IAM) (Recommended) -在设置好 S3 文件系统包装器后,您需要确认 Flink 具有访问 S3 Bucket 的权限。 +The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). -##### Identity and Access Management (IAM)(推荐使用) +If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. -建议通过 [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html) 来配置 AWS 凭据。可使用 IAM 功能为 Flink 实例安全地提供访问 S3 Bucket 所需的凭据。关于配置的细节超出了本文档的范围,请参考 AWS 用户手册中的 [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) 部分。 +#### Delegation Tokens -如果配置正确,则可在 AWS 中管理对 S3 的访问,而无需为 Flink 分发任何访问密钥(Access Key)。 +[Delegation tokens]({{< ref "docs/deployment/security/security-delegation-token" >}}) provide time-bounded, automatically negotiated credentials. The JobManager uses long-lived credentials (access key and secret key) to call [AWS STS](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html) and obtain short-lived session tokens, which are then automatically distributed to TaskManagers. -##### 访问密钥(Access Key)(不推荐) +Each S3 implementation has its own delegation token provider with a dedicated configuration prefix. 
You must set the `access-key`, `secret-key`, and `region` under the corresponding prefix for the implementation you are using: -可以通过**访问密钥对(access and secret key)**授予 S3 访问权限。请注意,根据 [Introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2),不推荐使用该方法。 +```yaml +# For Native S3 implementation +security.delegation.token.provider.s3-native.access-key: your-access-key +security.delegation.token.provider.s3-native.secret-key: your-secret-key +security.delegation.token.provider.s3-native.region: us-east-1 + +# For Hadoop implementation +security.delegation.token.provider.s3-hadoop.access-key: your-access-key +security.delegation.token.provider.s3-hadoop.secret-key: your-secret-key +security.delegation.token.provider.s3-hadoop.region: us-east-1 + +# For Presto implementation +security.delegation.token.provider.s3-presto.access-key: your-access-key +security.delegation.token.provider.s3-presto.secret-key: your-secret-key +security.delegation.token.provider.s3-presto.region: us-east-1 +``` - `s3.access-key` 和 `s3.secret-key` 均需要在 Flink 的 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}}) 中进行配置: +All three values (`access-key`, `secret-key`, `region`) must be set for delegation tokens to be issued. The `DynamicTemporaryAWSCredentialsProvider` is automatically included in the credentials provider chain for each implementation, so TaskManagers will consume the distributed tokens without additional configuration. + +#### Access Keys + +Access to S3 can be granted via your **access and secret key pair**. While access keys are not inherently insecure, IAM roles are preferred as they avoid the need to manage and distribute static credentials. See the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2) for more context. 
+ +You need to configure both `s3.access-key` and `s3.secret-key` in Flink's [configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml s3.access-key: your-access-key s3.secret-key: your-secret-key ``` -You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}). +### Configure Non-S3 Endpoint + +The S3 filesystems also support using S3 compliant object stores. +To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml -# flink-s3-fs-hadoop -fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider -# flink-s3-fs-presto -presto.s3.credentials-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider +s3.endpoint: your-endpoint-hostname ``` -## 配置非 S3 访问点 +### Configure Path Style Access -S3 文件系统还支持兼容 S3 的对象存储服务,如 [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) 和 [Minio](https://min.io/)。可在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}}) 中配置使用的访问点: +Some S3 compliant object stores might not have virtual host style addressing enabled by default. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml -s3.endpoint: your-endpoint-hostname +s3.path-style-access: true +``` + +{{< hint info >}} +The legacy configuration key `s3.path.style.access` is still supported as a fallback for backward compatibility. +{{< /hint >}} + +## Implementation Details + +### Native S3 FileSystem (Experimental) + +{{< hint warning >}} +**Experimental**: The Native S3 FileSystem is experimental in Flink 2.3. It is functionally complete and has demonstrated strong performance in benchmarks. 
+{{< /hint >}}
+
+The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2, completely removing the dependency on Hadoop. It is registered under the schemes *s3://* and *s3a://*. It provides a drop-in replacement for the Presto and Hadoop implementations, supporting checkpointing, the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}}) (via `RecoverableWriter`), server-side encryption (SSE-S3, SSE-KMS), cross-account access via IAM role assumption, entropy injection, and bulk copy via S3TransferManager.
+
+#### Setup
+
+To use the Native S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-native
+cp ./opt/flink-s3-fs-native-{{< version >}}.jar ./plugins/s3-fs-native/
+```
+
+#### Configuration
+
+In addition to the [common configuration](#common-configuration) options (`s3.access-key`, `s3.secret-key`, `s3.endpoint`, `s3.path-style-access`), the Native S3 FileSystem supports the following options:
+
+```yaml
+# Server-side encryption
+s3.sse.type: sse-s3 # or sse-kms, aws:kms, AES256, none (default)
+s3.sse.kms.key-id: arn:aws:kms:region:account:key/id # Required for SSE-KMS
+
+# IAM role assumption for cross-account access
+s3.assume-role.arn: arn:aws:iam::account:role/RoleName
+s3.assume-role.external-id: external-id-if-required
+s3.assume-role.session-name: flink-s3-session
+s3.assume-role.session-duration: 3600
+
+# Performance tuning
+s3.upload.min.part.size: 5242880 # 5 MB default
+s3.upload.max.concurrent.uploads: 4 # Based on CPU cores
+s3.read.buffer.size: 262144 # 256 KB default
+s3.async.enabled: true # Async read/write operations
+s3.bulk-copy.enabled: true # Bulk copy via S3TransferManager
+s3.bulk-copy.max-concurrent: 16 # Max concurrent copy ops
+```
+
+When 
`fs.s3.aws.credentials.provider` is not set, the Native S3 FileSystem automatically builds a credentials chain in the following order: delegation tokens, static credentials (if `s3.access-key` and `s3.secret-key` are configured), and the AWS SDK v2 `DefaultCredentialsProvider` (environment variables, instance profiles, etc.). You only need to set this option if you require a custom provider chain. -内置的 S3 文件系统 (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) 支持熵注入。熵注入是通过在关键字开头附近添加随机字符,以提高 AWS S3 bucket 可扩展性的技术。 +--- -如果熵注入被启用,路径中配置好的字串将会被随机字符所替换。例如路径 `s3://my-bucket/_entropy_/checkpoints/dashboard-job/` 将会被替换成类似于 `s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/` 的路径。 -**这仅在使用熵注入选项创建文件时启用!** -否则将完全删除文件路径中的 entropy key。更多细节请参见 [FileSystem.create(Path, WriteOption)](https://nightlies.apache.org/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)。 +### Presto S3 FileSystem {{< hint info >}} -目前 Flink 运行时仅对 checkpoint 数据文件使用熵注入选项。所有其他文件包括 checkpoint 元数据与外部 URI 都不使用熵注入,以保证 checkpoint URI 的可预测性。 +You don't have to configure this manually if you are running [Flink on EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html). {{< /hint >}} -配置 *entropy key* 与 *entropy length* 参数以启用熵注入: +The Presto S3 FileSystem is based on code from the [Presto project](https://prestodb.io/). It is registered under the schemes *s3://* and *s3p://*. It is the production-proven choice for checkpointing to S3. It does not support the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}}) (`createRecoverableWriter` throws `UnsupportedOperationException`). 
+
+#### Setup
+
+To use the Presto S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-presto
+cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/
+```
+
+#### Configuration
+
+The [common configuration](#common-configuration) options apply. In addition, Presto-specific keys are supported via the [Presto file system configuration](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration).
+
+---
+
+### Hadoop S3 FileSystem
+
+The Hadoop S3 FileSystem is based on code from the [Hadoop Project](https://hadoop.apache.org/). It is registered under the schemes *s3://* and *s3a://*. It is the only stable implementation that supports the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}}) (via `RecoverableWriter`).
+
+#### Setup
+
+To use the Hadoop S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory:
+
+```bash
+mkdir -p ./plugins/s3-fs-hadoop
+cp ./opt/flink-s3-fs-hadoop-{{< version >}}.jar ./plugins/s3-fs-hadoop/
+```
+
+#### Configuration
+
+The [common configuration](#common-configuration) options apply. In addition, [Hadoop's s3a configuration keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) are supported. Flink translates its own keys to Hadoop's automatically: for example, setting `s3.connection.maximum` in the Flink configuration is applied internally as `fs.s3a.connection.maximum`, with no need for Hadoop XML configuration files.
+
+---
+
+## Using Multiple S3 Implementations
+
+All three S3 implementations register as handlers for the *s3://* scheme. Additionally, each implementation supports alternative schemes:
+
+| Implementation | Schemes |
+|---------------|---------|
+| Native S3 | *s3://*, *s3a://* |
+| Presto | *s3://*, *s3p://* |
+| Hadoop | *s3://*, *s3a://* |
+
+It is safe to load multiple S3 plugin JARs simultaneously — the priority mechanism ensures only one factory handles each scheme. 
The Native S3 implementation has the lowest priority (`-1` vs the default `0`), so when another implementation is present, that implementation takes precedence for all overlapping schemes (e.g., *s3://* and *s3a://*). You can override factory priorities via the `fs..priority.` configuration option.
+
+You can use multiple S3 implementations simultaneously by leveraging their different URI schemes. For example, if a job uses the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) sink with Hadoop but Presto for checkpointing:
+
+- Use the *s3a://* scheme for the sink (Hadoop)
+- Use the *s3p://* scheme for checkpointing (Presto)
+
+{{< hint info >}}
+The Native S3 implementation does not introduce a new URI scheme. It supports the existing *s3://* and *s3a://* schemes. Since both the Native S3 and Hadoop implementations register for the same schemes, Flink uses a priority-based mechanism to select which factory handles each scheme. By default, Native S3 has the lowest priority and will **not** be selected when another implementation is present for the same scheme.
+
+To use the Native S3 implementation, either place only the `flink-s3-fs-native` plugin JAR in the `plugins` directory, or use the `fs..priority.` configuration to raise its priority while other implementations are present in `plugins`.
+{{< /hint >}}
+
+---
+
+## Advanced Features
+
+### Entropy Injection
+
+All S3 file systems support entropy injection, a technique to improve the scalability of AWS S3 buckets through adding random characters near the beginning of the key.
+
+If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, path
+`s3://my-bucket/_entropy_/checkpoints/dashboard-job/` would be replaced by something like `s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/`.
+**This only happens when the file creation passes the option to inject entropy!**
+Otherwise, the entropy key substring is simply removed from the file path. 
See [FileSystem.create(Path, WriteOption)](https://nightlies.apache.org/flink/flink-docs-release-1.6/api/java/org/apache/flink/core/fs/FileSystem.html#create-org.apache.flink.core.fs.Path-org.apache.flink.core.fs.FileSystem.WriteOptions-)
+for details.
+
+{{< hint info >}}
+The Flink runtime currently passes the option to inject entropy only to checkpoint data files. All other files, including checkpoint metadata and external URIs, are written without entropy to keep checkpoint URIs predictable.
+{{< /hint >}}
+
+To enable entropy injection, configure the *entropy key* and the *entropy length* parameters.
 
 ```
 s3.entropy.key: _entropy_
@@ -163,14 +285,16 @@ s3.entropy.length: 4 (default)
 ```
 
-`s3.entropy.key` 定义了路径中被随机字符替换掉的字符串。不包含 entropy key 路径将保持不变。
-如果文件系统操作没有经过 *"熵注入"* 写入,entropy key 字串将被直接移除。
-`s3.entropy.length` 定义了用于熵注入的随机字母/数字字符的数量。
+The `s3.entropy.key` defines the string in paths that is replaced by the random characters. Paths that do not contain the entropy key are left unchanged.
+If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed.
+The `s3.entropy.length` defines the number of random alphanumeric characters used for entropy.
 
-## s5cmd
+### s5cmd
+
+**Supported by**: Presto S3 FileSystem, Hadoop S3 FileSystem
 
 Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` can be configured to use the [s5cmd tool](https://github.com/peak/s5cmd) for faster file upload and download.
-[Benchmark results](https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support) are showing that `s5cmd` can be over 2 times more CPU efficient.
+[Benchmark results](https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support) show that `s5cmd` can be over 2 times more CPU efficient, which means either using half the CPU to upload or download the same set of files, or doing so twice as fast with the same amount of available CPU. 
In order to use this feature, the `s5cmd` binary has to be present and accessible to the Flink's task managers, for example via embedding it in the used docker image. @@ -179,7 +303,10 @@ Secondly, the path to the `s5cmd` has to be configured via: s3.s5cmd.path: /path/to/the/s5cmd ``` +#### Configuration + The remaining configuration options (with their default value listed below) are: + ```yaml # Extra arguments that will be passed directly to the s5cmd call. Please refer to the s5cmd's official documentation. s3.s5cmd.args: -r 0 @@ -188,17 +315,19 @@ s3.s5cmd.batch.max-size: 1024mb # Maximum number of files that will be uploaded via a single s5cmd call. s3.s5cmd.batch.max-files: 100 ``` -Both `s3.s5cmd.batch.max-size` and `s3.s5cmd.batch.max-files` are used to control resource usage of the `s5cmd` binary, to prevent it from overloading the task manager. -It is recommended to first configure and making sure Flink works without using `s5cmd` and only then enabling this feature. +Both `s3.s5cmd.batch.max-size` and `s3.s5cmd.batch.max-files` control resource usage of the `s5cmd` binary to prevent it from overloading the task manager. + +It is recommended to first configure and verify that Flink works without using `s5cmd`, then enable this feature. -### Credentials +#### Credentials -If you are using [access keys](#access-keys-discouraged), they will be passed to the `s5cmd`. -Apart from that `s5cmd` has its own independent (but similar) of Flink way of [using credentials](https://github.com/peak/s5cmd?tab=readme-ov-file#specifying-credentials). +If you are using [access keys](#access-keys), they will be passed to `s5cmd`. +Apart from that, `s5cmd` has its own independent way of [using credentials](https://github.com/peak/s5cmd?tab=readme-ov-file#specifying-credentials). -### Limitations +#### Limitations -Currently, Flink will use `s5cmd` only during recovery, when downloading state files from S3 and using RocksDB. 
+Currently, `flink-s3-fs-hadoop` and `flink-s3-fs-presto` use `s5cmd` only during recovery, when downloading state files from S3 and using RocksDB. +`flink-s3-fs-native` uses `S3TransferManager` when enabled via `s3.bulk-copy.enabled` (default: `true`) for bulk copy operations and `s3.async.enabled` (default: `true`) for async read/write, providing similar performance benefits. {{< top >}} diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md index 965cc3c0130f9..9b96d4519d585 100644 --- a/docs/content/docs/deployment/filesystems/s3.md +++ b/docs/content/docs/deployment/filesystems/s3.md @@ -64,94 +64,208 @@ env.configure(config); Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup]({{< ref "docs/deployment/ha/overview" >}}) or the [EmbeddedRocksDBStateBackend]({{< ref "docs/ops/state/state_backends" >}}#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI (unless otherwise stated). -For most use cases, you may use one of our `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 filesystem plugins which are self-contained and easy to set up. -For some cases, however, e.g., for using S3 as YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 filesystem implementation. +## S3 FileSystem Implementations -### Hadoop/Presto S3 File Systems plugins +Flink provides three independent S3 filesystem implementations: -{{< hint info >}} -You don't have to configure this manually if you are running [Flink on EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html). -{{< /hint >}} +| Implementation | Checkpointing | FileSink | Notes | +|---------------|:---:|:---:|-------| +| **Native S3** (`flink-s3-fs-native`) | ✓ | ✓ | **Experimental** in Flink 2.3. Built on AWS SDK v2; no Hadoop dependency. | +| **Presto S3** (`flink-s3-fs-presto`) | ✓ | x | Production-proven for checkpointing. 
| +| **Hadoop S3** (`flink-s3-fs-hadoop`) | ✓ | ✓ | Mature; the only stable implementation that provides `RecoverableWriter` for the FileSink. | -Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. -Both implementations are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. +Previously, users had to choose between Presto (recommended for checkpointing throughput) and Hadoop (the only implementation with `RecoverableWriter`, required by the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}})). The Native S3 implementation unifies both capabilities in a single plugin and measurements show significant checkpoint throughput improvements over the Presto implementation. - - `flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is based on code from the [Presto project](https://prestodb.io/). - You can configure it using [the same configuration keys as the Presto file system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration), by adding the configurations to your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). The Presto S3 implementation is the recommended file system for checkpointing to S3. +All three are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. - - `flink-s3-fs-hadoop`, registered under *s3://* and *s3a://*, based on code from the [Hadoop Project](https://hadoop.apache.org/). - The file system can be [configured using Hadoop's s3a configuration keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by adding the configurations to your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). - - For example, Hadoop has a `fs.s3a.connection.maximum` configuration key. 
If you want to change it, you need to put `s3.connection.maximum: xyz` to the [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). Flink will internally translate this back to `fs.s3a.connection.maximum`. There is no need to pass configuration parameters using Hadoop's XML configuration files. - - It is the only S3 file system with support for the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}). - +## Common Configuration -Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem -wrappers for URIs with the *s3://* scheme, `flink-s3-fs-hadoop` also registers -for *s3a://* and `flink-s3-fs-presto` also registers for *s3p://*, so you can -use this to use both at the same time. -For example, the job uses the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) which only supports Hadoop, but uses Presto for checkpointing. -In this case, you should explicitly use *s3a://* as a scheme for the sink (Hadoop) and *s3p://* for checkpointing (Presto). +### Configure Access Credentials -To use `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the `opt` directory to the `plugins` directory of your Flink distribution before starting Flink, e.g. +After setting up the S3 FileSystem implementation, you need to make sure that Flink is allowed to access your S3 buckets. The following three approaches are **independent alternatives** — choose the one that fits your environment: -```bash -mkdir ./plugins/s3-fs-presto -cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/ -``` +#### Identity and Access Management (IAM) (Recommended) -#### Configure Access Credentials +The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. 
Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). -After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets. +If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. -##### Identity and Access Management (IAM) (Recommended) +#### Delegation Tokens -The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). +[Delegation tokens]({{< ref "docs/deployment/security/security-delegation-token" >}}) provide time-bounded, automatically negotiated credentials. The JobManager uses long-lived credentials (access key and secret key) to call [AWS STS](https://docs.aws.amazon.com/STS/latest/APIReference/welcome.html) and obtain short-lived session tokens, which are then automatically distributed to TaskManagers. -If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. +Each S3 implementation has its own delegation token provider with a dedicated configuration prefix. 
You must set the `access-key`, `secret-key`, and `region` under the corresponding prefix for the implementation you are using: -##### Access Keys (Discouraged) +```yaml +# For Native S3 implementation +security.delegation.token.provider.s3-native.access-key: your-access-key +security.delegation.token.provider.s3-native.secret-key: your-secret-key +security.delegation.token.provider.s3-native.region: us-east-1 + +# For Hadoop implementation +security.delegation.token.provider.s3-hadoop.access-key: your-access-key +security.delegation.token.provider.s3-hadoop.secret-key: your-secret-key +security.delegation.token.provider.s3-hadoop.region: us-east-1 + +# For Presto implementation +security.delegation.token.provider.s3-presto.access-key: your-access-key +security.delegation.token.provider.s3-presto.secret-key: your-secret-key +security.delegation.token.provider.s3-presto.region: us-east-1 +``` + +All three values (`access-key`, `secret-key`, `region`) must be set for delegation tokens to be issued. The `DynamicTemporaryAWSCredentialsProvider` is automatically included in the credentials provider chain for each implementation, so TaskManagers will consume the distributed tokens without additional configuration. -Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). +#### Access Keys -You need to configure both `s3.access-key` and `s3.secret-key` in Flink's [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): +Access to S3 can be granted via your **access and secret key pair**. While access keys are not inherently insecure, IAM roles are preferred as they avoid the need to manage and distribute static credentials. 
See the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2) for more context. + +You need to configure both `s3.access-key` and `s3.secret-key` in Flink's [configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml s3.access-key: your-access-key s3.secret-key: your-secret-key ``` -You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}). +### Configure Non-S3 Endpoint + +The S3 filesystems also support using S3 compliant object stores. +To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml -# flink-s3-fs-hadoop -fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider -# flink-s3-fs-presto -presto.s3.credentials-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider +s3.endpoint: your-endpoint-hostname ``` -## Configure Non-S3 Endpoint +### Configure Path Style Access -The S3 Filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [MinIO](https://min.io/). -To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). +Some S3 compliant object stores might not have virtual host style addressing enabled by default. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml -s3.endpoint: your-endpoint-hostname +s3.path-style-access: true ``` -## Configure Path Style Access +{{< hint info >}} +The legacy configuration key `s3.path.style.access` is still supported as a fallback for backward compatibility. 
+{{< /hint >}}
+
+## Implementation Details
-Some S3 compliant object stores might not have virtual host style addressing enabled by default, for example when using Standalone MinIO for testing purpose. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}).
+
+### Native S3 FileSystem (Experimental)
+
+{{< hint warning >}}
+**Experimental**: The Native S3 FileSystem is experimental in Flink 2.3. It is functionally complete and has demonstrated strong performance in benchmarks.
+{{< /hint >}}
+
+The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2, completely removing the dependency on Hadoop. It is registered under the schemes *s3://* and *s3a://*. It provides a drop-in replacement for the Presto and Hadoop implementations, supporting checkpointing, the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}}) (via `RecoverableWriter`), server-side encryption (SSE-S3, SSE-KMS), cross-account access via IAM role assumption, entropy injection, and bulk copy via S3TransferManager.
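+
+As an illustrative sketch of what that unification means in practice (the bucket name and path below are placeholders, and the `execution.checkpointing.*` key names are assumed from the Flink 2.x configuration), a single Native S3 plugin can back both checkpointing and a FileSink over the same *s3://* scheme:
+
+```yaml
+# Illustrative values only; replace the bucket and path with your own.
+# Key names assume the Flink 2.x configuration (execution.checkpointing.*).
+execution.checkpointing.interval: 1min
+execution.checkpointing.dir: s3://my-bucket/checkpoints
+```
+
+A FileSink in the same job writing to, for example, `s3://my-bucket/output` then goes through the same plugin, so there is no need to split schemes between implementations.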
+ +#### Setup + +To use the Native S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory: + +```bash +mkdir -p ./plugins/s3-fs-native +cp ./opt/flink-s3-fs-native-{{< version >}}.jar ./plugins/s3-fs-native/ +``` + +#### Configuration + +In addition to the [common configuration](#common-configuration) options (`s3.access-key`, `s3.secret-key`, `s3.endpoint`, `s3.path-style-access`), the Native S3 FileSystem supports the following options: ```yaml -s3.path.style.access: true +# Server-side encryption +s3.sse.type: sse-s3 # or sse-kms, aws:kms, AES256, none (default) +s3.sse.kms.key-id: arn:aws:kms:region:account:key/id # Required for SSE-KMS + +# IAM role assumption for cross-account access +s3.assume-role.arn: arn:aws:iam::account:role/RoleName +s3.assume-role.external-id: external-id-if-required +s3.assume-role.session-name: flink-s3-session +s3.assume-role.session-duration: 3600 + +# Performance tuning +s3.upload.min.part.size: 5242880 # 5 MB default +s3.upload.max.concurrent.uploads: 4 # Based on CPU cores +s3.read.buffer.size: 262144 # 256 KB default +s3.async.enabled: true # Async read/write operations +s3.bulk-copy.enabled: true # Bulk copy via S3TransferManager +s3.bulk-copy.max-concurrent: 16 # Max concurrent copy ops ``` -## Entropy injection for S3 file systems +When `fs.s3.aws.credentials.provider` is not set, the Native S3 FileSystem automatically builds a credentials chain in the following order: delegation tokens, static credentials (if `s3.access-key` and `s3.secret-key` are configured), and the AWS SDK v2 `DefaultCredentialsProvider` (environment variables, instance profiles, etc.). You only need to set this option if you require a custom provider chain. + +--- + +### Presto S3 FileSystem + +{{< hint info >}} +You don't have to configure this manually if you are running [Flink on EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html). 
+{{< /hint >}} + +The Presto S3 FileSystem is based on code from the [Presto project](https://prestodb.io/). It is registered under the schemes *s3://* and *s3p://*. It is the production-proven choice for checkpointing to S3. It does not support the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}}) (`createRecoverableWriter` throws `UnsupportedOperationException`). + +#### Setup -The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is -a technique to improve the scalability of AWS S3 buckets through adding some random characters near the beginning of the key. +To use the Presto S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory: + +```bash +mkdir -p ./plugins/s3-fs-presto +cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/ +``` + +#### Configuration + +The [common configuration](#common-configuration) options apply. In addition, Presto-specific keys are supported via the [Presto file system configuration](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration). + +--- + +### Hadoop S3 FileSystem + +The Hadoop S3 FileSystem is based on code from the [Hadoop Project](https://hadoop.apache.org/). It is registered under the schemes *s3://* and *s3a://*. It is the only stable implementation that supports the [FileSink]({{< ref "docs/connectors/datastream/filesystem" >}}) (via `RecoverableWriter`). + +#### Setup + +To use the Hadoop S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory: + +```bash +mkdir -p ./plugins/s3-fs-hadoop +cp ./opt/flink-s3-fs-hadoop-{{< version >}}.jar ./plugins/s3-fs-hadoop/ +``` + +#### Configuration + +The [common configuration](#common-configuration) options apply. In addition, [Hadoop's s3a configuration keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) are supported. 
Hadoop configuration keys are automatically translated: setting `s3.connection.maximum` in the Flink configuration, for example, configures Hadoop's `fs.s3a.connection.maximum`, as Flink internally maps the `s3.` prefix back to `fs.s3a.`.
+
+---
+
+## Using Multiple S3 Implementations
+
+All three S3 implementations register as handlers for the *s3://* scheme. Additionally, each implementation supports alternative schemes:
+
+| Implementation | Schemes |
+|---------------|---------|
+| Native S3 | *s3://*, *s3a://* |
+| Presto | *s3://*, *s3p://* |
+| Hadoop | *s3://*, *s3a://* |
+
+It is safe to load multiple S3 plugin JARs simultaneously; the priority mechanism ensures that only one factory handles each scheme. The Native S3 implementation has the lowest priority (`-1` versus the default `0`), so when another implementation is present, that implementation takes precedence for all overlapping schemes (e.g., *s3://* and *s3a://*). You can override factory priorities via the `fs..priority.` configuration option.
+
+You can use multiple S3 implementations simultaneously by leveraging their different URI schemes. For example, if a job uses the Hadoop implementation for the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) sink and the Presto implementation for checkpointing:
+
+- Use the *s3a://* scheme for the sink (Hadoop)
+- Use the *s3p://* scheme for checkpointing (Presto)
+
+{{< hint info >}}
+The Native S3 implementation does not introduce a new URI scheme. It supports the existing *s3://* and *s3a://* schemes. Since both the Native S3 and Hadoop implementations register for the same schemes, Flink uses a priority-based mechanism to select which factory handles each scheme. By default, Native S3 has the lowest priority and will **not** be selected when another implementation is present for the same scheme.
+
+To use the Native S3 implementation, either place only the `flink-s3-fs-native` plugin JAR in the `plugins` directory, or use the `fs..priority.` configuration to raise its priority while other implementations are present in `plugins`.
+{{< /hint >}} + +--- + +## Advanced Features + +### Entropy Injection + +All S3 file systems support entropy injection, a technique to improve the scalability of AWS S3 buckets through adding random characters near the beginning of the key. If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, path `s3://my-bucket/_entropy_/checkpoints/dashboard-job/` would be replaced by something like `s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/`. @@ -175,7 +289,9 @@ The `s3.entropy.key` defines the string in paths that is replaced by the random If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed. The `s3.entropy.length` defines the number of random alphanumeric characters used for entropy. -## s5cmd +### s5cmd + +**Supported by**: Presto S3 FileSystem, Hadoop S3 FileSystem Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` can be configured to use the [s5cmd tool](https://github.com/peak/s5cmd) for faster file upload and download. [Benchmark results](https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support) are showing that `s5cmd` can be over 2 times more CPU efficient. @@ -187,7 +303,10 @@ Secondly, the path to the `s5cmd` has to be configured via: s3.s5cmd.path: /path/to/the/s5cmd ``` +#### Configuration + The remaining configuration options (with their default value listed below) are: + ```yaml # Extra arguments that will be passed directly to the s5cmd call. Please refer to the s5cmd's official documentation. s3.s5cmd.args: -r 0 @@ -196,17 +315,19 @@ s3.s5cmd.batch.max-size: 1024mb # Maximum number of files that will be uploaded via a single s5cmd call. s3.s5cmd.batch.max-files: 100 ``` -Both `s3.s5cmd.batch.max-size` and `s3.s5cmd.batch.max-files` are used to control resource usage of the `s5cmd` binary, to prevent it from overloading the task manager. 
-It is recommended to first configure and making sure Flink works without using `s5cmd` and only then enabling this feature.
+Both `s3.s5cmd.batch.max-size` and `s3.s5cmd.batch.max-files` control resource usage of the `s5cmd` binary to prevent it from overloading the task manager.
+
+It is recommended to first configure Flink and verify that it works without `s5cmd`, and only then enable this feature.

-### Credentials
+#### Credentials

-If you are using [access keys](#access-keys-discouraged), they will be passed to the `s5cmd`.
-Apart from that `s5cmd` has its own independent (but similar) of Flink way of [using credentials](https://github.com/peak/s5cmd?tab=readme-ov-file#specifying-credentials).
+If you are using [access keys](#access-keys), they will be passed to `s5cmd`.
+Apart from that, `s5cmd` has its own independent way of [using credentials](https://github.com/peak/s5cmd?tab=readme-ov-file#specifying-credentials).

-### Limitations
+#### Limitations

-Currently, Flink will use `s5cmd` only during recovery, when downloading state files from S3 and using RocksDB.
+Currently, `flink-s3-fs-hadoop` and `flink-s3-fs-presto` use `s5cmd` only during recovery, when downloading state files from S3 while using RocksDB.
+`flink-s3-fs-native` instead uses the `S3TransferManager` for bulk copy operations (enabled via `s3.bulk-copy.enabled`, default `true`) and for asynchronous reads and writes (enabled via `s3.async.enabled`, default `true`), providing similar performance benefits.

{{< top >}}