---
title: "Kafka Connect"
---
<!--
 - Licensed to the Apache Software Foundation (ASF) under one or more
 - contributor license agreements. See the NOTICE file distributed with
 - this work for additional information regarding copyright ownership.
 - The ASF licenses this file to You under the Apache License, Version 2.0
 - (the "License"); you may not use this file except in compliance with
 - the License. You may obtain a copy of the License at
 -
 -   http://www.apache.org/licenses/LICENSE-2.0
 -
 - Unless required by applicable law or agreed to in writing, software
 - distributed under the License is distributed on an "AS IS" BASIS,
 - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 - See the License for the specific language governing permissions and
 - limitations under the License.
 -->

# Kafka Connect

[Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) is a popular framework for moving data
in and out of Kafka via connectors. There are many different connectors available, such as the S3 sink
for writing data from Kafka to S3 and Debezium source connectors for writing change data capture records from relational
databases to Kafka.

It has a straightforward, decentralized, distributed architecture. A cluster consists of a number of worker processes,
and a connector runs tasks on these processes to perform the work. Connector deployment is configuration driven, so
generally no code needs to be written to run a connector.

## Apache Iceberg Sink Connector

The Apache Iceberg Sink Connector for Kafka Connect is a sink connector for writing data from Kafka into Iceberg tables.

## Features

* Commit coordination for centralized Iceberg commits
* Exactly-once delivery semantics
* Multi-table fan-out
* Automatic table creation and schema evolution
* Field name mapping via Iceberg’s column mapping functionality

## Installation

The connector zip archive is created as part of the Iceberg build. You can run the build via:
```bash
./gradlew -x test -x integrationTest clean build
```
The zip archive will be found under `./kafka-connect/kafka-connect-runtime/build/distributions`. There is
one distribution that bundles the Hive Metastore client and related dependencies, and one that does not.
Copy the distribution archive into the Kafka Connect plugins directory on all nodes.
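For example, a minimal sketch of installing the distribution on one worker node, assuming the worker's `plugin.path` includes `/opt/kafka-connect/plugins` (adjust the path and archive name to your environment):
```bash
# Unpack the runtime distribution into a directory on the Connect worker's plugin.path
unzip iceberg-kafka-connect-runtime-*.zip -d /opt/kafka-connect/plugins/
# Restart the Connect worker so it picks up the new plugin
```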

## Requirements

The sink relies on [KIP-447](https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics)
for exactly-once semantics. This requires Kafka 2.5 or later.

## Configuration

| Property                                    | Description                                                                                                       |
|---------------------------------------------|-------------------------------------------------------------------------------------------------------------------|
| iceberg.tables                              | Comma-separated list of destination tables                                                                        |
| iceberg.tables.dynamic-enabled              | Set to `true` to route to a table specified in `routeField` instead of using `routeRegex`, default is `false`     |
| iceberg.tables.route-field                  | For multi-table fan-out, the name of the field used to route records to tables                                    |
| iceberg.tables.default-commit-branch        | Default branch for commits, main is used if not specified                                                         |
| iceberg.tables.default-id-columns           | Default comma-separated list of columns that identify a row in tables (primary key)                               |
| iceberg.tables.default-partition-by         | Default comma-separated list of partition field names to use when creating tables                                 |
| iceberg.tables.auto-create-enabled          | Set to `true` to automatically create destination tables, default is `false`                                      |
| iceberg.tables.evolve-schema-enabled        | Set to `true` to add any missing record fields to the table schema, default is `false`                            |
| iceberg.tables.schema-force-optional        | Set to `true` to set columns as optional during table create and evolution, default is `false` to respect schema  |
| iceberg.tables.schema-case-insensitive      | Set to `true` to look up table columns by case-insensitive name, default is `false` for case-sensitive            |
| iceberg.tables.auto-create-props.*          | Properties set on new tables during auto-create                                                                   |
| iceberg.tables.write-props.*                | Properties passed through to Iceberg writer initialization, these take precedence over table properties if conflicting |
| iceberg.table.\<table name\>.commit-branch  | Table-specific branch for commits, `iceberg.tables.default-commit-branch` is used if not specified                |
| iceberg.table.\<table name\>.id-columns     | Comma-separated list of columns that identify a row in the table (primary key)                                    |
| iceberg.table.\<table name\>.partition-by   | Comma-separated list of partition fields to use when creating the table                                           |
| iceberg.table.\<table name\>.route-regex    | The regex used to match a record's `routeField` to a table                                                        |
| iceberg.control.topic                       | Name of the control topic, default is `control-iceberg`                                                           |
| iceberg.control.commit.interval-ms          | Commit interval in msec, default is 300,000 (5 min)                                                               |
| iceberg.control.commit.timeout-ms           | Commit timeout interval in msec, default is 30,000 (30 sec)                                                       |
| iceberg.control.commit.threads              | Number of threads to use for commits, default is (cores * 2)                                                      |
| iceberg.catalog                             | Name of the catalog, default is `iceberg`                                                                         |
| iceberg.catalog.*                           | Properties passed through to Iceberg catalog initialization                                                       |
| iceberg.hadoop-conf-dir                     | If specified, Hadoop config files in this directory will be loaded                                                |
| iceberg.hadoop.*                            | Properties passed through to the Hadoop configuration                                                             |
| iceberg.kafka.*                             | Properties passed through to control topic Kafka client initialization                                            |

If `iceberg.tables.dynamic-enabled` is `false` (the default) then you must specify `iceberg.tables`. If
`iceberg.tables.dynamic-enabled` is `true` then you must specify `iceberg.tables.route-field`, which will
contain the name of the table.

### Kafka configuration

By default the connector will attempt to use Kafka client config from the worker properties for connecting to
the control topic. If that config cannot be read for some reason, Kafka client settings
can be set explicitly using `iceberg.kafka.*` properties.
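For example, a sketch of explicitly overriding the control topic's Kafka client settings; the broker addresses and security settings below are placeholders for your environment, not defaults:
```
"iceberg.kafka.bootstrap.servers": "broker-1:9092,broker-2:9092",
"iceberg.kafka.security.protocol": "SASL_SSL",
"iceberg.kafka.sasl.mechanism": "PLAIN"
```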

#### Message format

Messages should be converted to a struct or map using the appropriate Kafka Connect converter.
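For example, one possible converter setup for JSON messages without embedded schemas (set in the worker or connector config; shown here as connector-level properties):
```
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
```
Other converters (Avro, for example) can be used as well, as long as each record value deserializes to a struct or map.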

### Catalog configuration

The `iceberg.catalog.*` properties are required for connecting to the Iceberg catalog. The core catalog
types are included in the default distribution: REST, Glue, DynamoDB, Hadoop, Nessie, JDBC, and Hive.
JDBC drivers are not included, so you will need to add them yourself if using the JDBC catalog. When using
a Hive catalog, you can use the distribution that includes the Hive Metastore client; otherwise you will
need to include that client yourself.

To set the catalog type, you can set `iceberg.catalog.type` to `rest`, `hive`, or `hadoop`. For other
catalog types, you need to instead set `iceberg.catalog.catalog-impl` to the name of the catalog class.

#### REST example

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>",
```

#### Hive example

NOTE: Use the distribution that includes the HMS client (or include the HMS client yourself). Use `S3FileIO` when
using S3 for storage (the default is `HadoopFileIO` with `HiveCatalog`).
```
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:9083",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.client.region": "us-east-1",
"iceberg.catalog.s3.access-key-id": "<AWS access>",
"iceberg.catalog.s3.secret-access-key": "<AWS secret>",
```

#### Glue example

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```

#### Nessie example

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
"iceberg.catalog.uri": "http://localhost:19120/api/v2",
"iceberg.catalog.ref": "main",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```

#### Notes

Depending on your setup, you may need to also set `iceberg.catalog.s3.endpoint`, `iceberg.catalog.s3.staging-dir`,
or `iceberg.catalog.s3.path-style-access`. See the [Iceberg docs](https://iceberg.apache.org/docs/latest/) for
full details on configuring catalogs.
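For example, a sketch of the extra S3 properties for an S3-compatible object store; the endpoint below is a placeholder for your environment:
```
"iceberg.catalog.s3.endpoint": "http://minio:9000",
"iceberg.catalog.s3.path-style-access": "true",
```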

### Azure ADLS configuration example

When using ADLS, Azure's Java SDK requires the `AZURE_CLIENT_ID`, `AZURE_TENANT_ID`, and `AZURE_CLIENT_SECRET` environment variables.
If you're running Kafka Connect in a container, be sure to inject those values as environment variables. See the
[Azure Identity Client library for Java](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for more information.

For example:
```
AZURE_CLIENT_ID=e564f687-7b89-4b48-80b8-111111111111
AZURE_TENANT_ID=95f2f365-f5b7-44b1-88a1-111111111111
AZURE_CLIENT_SECRET="XXX"
```
Here, `AZURE_CLIENT_ID` is the Application ID of a registered application under
[App Registrations](https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/ApplicationsListBlade), `AZURE_TENANT_ID` is
found in your [Azure Tenant Properties](https://portal.azure.com/#view/Microsoft_AAD_IAM/TenantProperties.ReactView), and
`AZURE_CLIENT_SECRET` is created within the "Certificates & Secrets" section, under "Manage", after choosing your specific
App Registration (select "Client secrets" and then "New client secret" to generate one). Be sure to set this variable to
the secret's Value, not its Id.

It's also important that the App Registration is granted the Role Assignment "Storage Blob Data Contributor" in your
Storage Account's Access Control (IAM), or it won't be able to write new files there.

Then, within the Connector's configuration, you'll want to include the following:

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "abfss://storage-container-name@storageaccount.dfs.core.windows.net/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
"iceberg.catalog.include-credentials": "true"
```

Here, `storage-container-name` is the container name within your Azure Storage Account, `/warehouse` is the location
within that container where your Apache Iceberg files will be written by default (or where tables are created if
`iceberg.tables.auto-create-enabled` is `true`), and `include-credentials` passes the Azure Java client credentials
along. This configures the Iceberg Sink connector to connect to the REST catalog implementation at `iceberg.catalog.uri`
to obtain the required connection string for the ADLSv2 client.

### Google GCS configuration example

By default, Application Default Credentials (ADC) will be used to connect to GCS. Details on how ADC works can
be found in the [Google Cloud documentation](https://cloud.google.com/docs/authentication/application-default-credentials).

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog:8181",
"iceberg.catalog.warehouse": "gs://bucket-name/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.google.gcs.GCSFileIO"
```

### Hadoop configuration

When using HDFS or Hive, the sink will initialize the Hadoop configuration. First, config files
from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified, config files
are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are
applied. When merging these, the order of precedence is sink config > config dir > classpath.
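For example, a sketch of pointing the sink at a Hadoop config directory and overriding a single Hadoop property from the sink config; the paths and hostname below are placeholders:
```
"iceberg.hadoop-conf-dir": "/etc/hadoop/conf",
"iceberg.hadoop.fs.defaultFS": "hdfs://namenode:8020"
```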

## Examples

### Initial setup

#### Source topic

This assumes the source topic already exists and is named `events`.

#### Control topic

If your Kafka cluster has `auto.create.topics.enable` set to `true` (the default), then the control topic will be
automatically created. If not, then you will need to create the topic first. The default topic name is `control-iceberg`:
```bash
bin/kafka-topics \
  --command-config command-config.props \
  --bootstrap-server ${CONNECT_BOOTSTRAP_SERVERS} \
  --create \
  --topic control-iceberg \
  --partitions 1
```
*NOTE: Clusters running on Confluent Cloud have `auto.create.topics.enable` set to `false` by default.*

#### Iceberg catalog configuration

Configuration properties with the prefix `iceberg.catalog.` will be passed to Iceberg catalog initialization.
See the [Iceberg docs](https://iceberg.apache.org/docs/latest/) for details on how to configure
a particular catalog.

### Single destination table

This example writes all incoming records to a single table.

#### Create the destination table

```sql
CREATE TABLE default.events (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts))
```

#### Connector config

This example config connects to an Iceberg REST catalog.
```json
{
"name": "events-sink",
"config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
    }
}
```
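
A quick sketch of deploying this connector config via the Kafka Connect REST API, assuming the JSON above is saved as `events-sink.json` and a Connect worker is listening on its default port 8083:
```bash
curl -X POST -H "Content-Type: application/json" \
  --data @events-sink.json \
  http://localhost:8083/connectors
```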

### Multi-table fan-out, static routing

This example writes records with `type` set to `list` to the table `default.events_list`, and
writes records with `type` set to `create` to the table `default.events_create`. Other records
will be skipped.

#### Create two destination tables

```sql
CREATE TABLE default.events_list (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE default.events_create (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));
```

#### Connector config

```json
{
"name": "events-sink",
"config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
    }
}
```

### Multi-table fan-out, dynamic routing

This example routes each record to the table named by the value of its `db_table` field. If a table with
that name does not exist, the record is skipped. For example, if a record's `db_table`
field is set to `default.events_list`, then the record is written to the `default.events_list` table.
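
A hypothetical record value that would be routed to `default.events_list` (the field names follow the example table schema above; the values are made up for illustration):
```json
{
  "id": "event-123",
  "type": "list",
  "ts": "2024-05-01T10:15:00Z",
  "payload": "{}",
  "db_table": "default.events_list"
}
```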

#### Create two destination tables

See above for creating two tables.

#### Connector config

```json
{
"name": "events-sink",
"config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
    }
}
```