@@ -40,7 +40,7 @@ Fill out the form by providing your ClickPipe with a name, a description (option
<Image img={cp_step2} alt="Fill out connection details" size="md"/>

## Configure a schema registry (optional) {#4-configure-your-schema-registry}
A valid schema is required for Avro streams. See [Schema registries](./02_schema-registries.md) for more details on how to configure a schema registry.
A valid schema is required for Avro and Protobuf streams. See [Schema registries](./02_schema-registries.md) for more details on how to configure a schema registry.

## Configure a reverse private endpoint (optional) {#5-configure-reverse-private-endpoint}
Configure a Reverse Private Endpoint to allow ClickPipes to connect to your Kafka cluster using AWS PrivateLink.
@@ -13,7 +13,7 @@

# Schema registries {#schema-registries}

ClickPipes supports schema registries for Avro data streams.
ClickPipes supports schema registries for Avro and Protobuf data streams.

## Supported registries for Kafka ClickPipes {#supported-schema-registries}

@@ -26,7 +26,7 @@

## Configuration {#schema-registry-configuration}

ClickPipes with Avro data require a schema registry. This can be configured in one of three ways:
ClickPipes with Avro or Protobuf data require a schema registry. This can be configured in one of three ways:

1. Providing a complete path to the schema subject (e.g. `https://registry.example.com/subjects/events`)
- Optionally, a specific version can be referenced by appending `/versions/[version]` to the url (otherwise ClickPipes will retrieve the latest version).
@@ -35,16 +35,17 @@

## How it works {#how-schema-registries-work}

ClickPipes dynamically retrieves and applies the Avro schema from the configured schema registry.
ClickPipes dynamically retrieves and applies the schema from the configured schema registry.
- If there's a schema id embedded in the message, it will use that to retrieve the schema.
- If there's no schema id embedded in the message, it will use the schema id or subject name specified in the ClickPipe configuration to retrieve the schema.
- If the message is written without an embedded schema id, and no schema id or subject name is specified in the ClickPipe configuration, then the schema will not be retrieved and the message will be skipped with a `SOURCE_SCHEMA_ERROR` logged in the ClickPipes errors table.
- If the message does not conform to the schema, then the message will be skipped with a `DATA_PARSING_ERROR` logged in the ClickPipes errors table.
- For Protobuf schemas only, ClickPipes will load any imported schemas defined as dependencies. Avro schemas with external references aren't yet supported.
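The embedded-schema-id lookup described above commonly relies on the Confluent wire format, where a framed message starts with magic byte `0x00` followed by a 4-byte big-endian schema id. A minimal sketch of that check (the helper name is illustrative, not ClickPipes code):

```python
import struct

def extract_schema_id(message: bytes):
    """Return the embedded registry schema id, or None if absent.

    Confluent-framed messages start with magic byte 0x00 followed by a
    4-byte big-endian schema id; the encoded payload follows. This is a
    sketch of the lookup order described above, not ClickPipes code.
    """
    if len(message) >= 5 and message[0] == 0:
        return struct.unpack(">I", message[1:5])[0]
    return None  # fall back to the configured schema id or subject name

extract_schema_id(b"\x00\x00\x00\x00\x2a" + b"payload")  # 42
extract_schema_id(b"plain")                              # None
```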

## Schema mapping {#schema-mapping}

The following rules are applied to the mapping between the retrieved Avro schema and the ClickHouse destination table:
The following rules are applied to the mapping between the retrieved Avro or Protobuf schema and the ClickHouse destination table:

- If the Avro schema contains a field that is not included in the ClickHouse destination mapping, that field is ignored.
- If the Avro schema is missing a field defined in the ClickHouse destination mapping, the ClickHouse column will be populated with a "zero" value, such as 0 or an empty string. Note that DEFAULT expressions are not currently evaluated for ClickPipes inserts (this is temporary limitation pending updates to the ClickHouse server default processing).
- If the Avro schema field and the ClickHouse column are incompatible, inserts of that row/message will fail, and the failure will be recorded in the ClickPipes errors table. Note that several implicit conversions are supported (like between numeric types), but not all (for example, an Avro record field can not be inserted into an Int32 ClickHouse column).
- If the schema contains a field that isn't included in the ClickHouse destination mapping, that field is ignored.

- If the schema is missing a field defined in the ClickHouse destination mapping, the ClickHouse column will be populated with a "zero" value, such as 0 or an empty string. Note that DEFAULT expressions are not currently evaluated for ClickPipes inserts (this is a temporary limitation pending updates to the ClickHouse server default processing).
- If the schema field and the ClickHouse column are incompatible, inserts of that row/message will fail, and the failure will be recorded in the ClickPipes errors table. Note that several implicit conversions are supported (like between numeric types), but not all (for example, an Avro record field cannot be inserted into an Int32 ClickHouse column).
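The drop/zero-fill rules above can be sketched with a hypothetical helper and a two-column destination table:

```python
# Hypothetical helper applying the rules above to one decoded record:
# unknown source fields are dropped, missing columns get a zero value
# (DEFAULT expressions are not evaluated, per the note above).
def map_row(record: dict, columns: dict) -> dict:
    zero = {"Int64": 0, "String": "", "Float64": 0.0}
    return {col: record.get(col, zero[typ]) for col, typ in columns.items()}

cols = {"id": "Int64", "name": "String"}
map_row({"id": 7, "name": "a", "extra": True}, cols)  # {'id': 7, 'name': 'a'}
map_row({"id": 7}, cols)                              # {'id': 7, 'name': ''}
```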
49 changes: 35 additions & 14 deletions docs/integrations/data-ingestion/clickpipes/kafka/03_reference.md
@@ -38,6 +38,7 @@
The supported formats are:
- [JSON](/integrations/data-formats/json/overview)
- [AvroConfluent](/interfaces/formats/AvroConfluent)
- [Protobuf](/interfaces/formats/Protobuf)

## Supported data types {#supported-data-types}

@@ -57,15 +58,34 @@
- UUID
- IPv4
- IPv6
- Time, Time64
- JSON
- all ClickHouse LowCardinality types
- Map with keys and values using any of the above types (including Nullables)
- Tuple and Array with elements using any of the above types (including Nullables, one level depth only)
- SimpleAggregateFunction types (for AggregatingMergeTree or SummingMergeTree destinations)

### Variant type support {#variant-type-support}
ClickPipes supports the Variant type in the following circumstances:
- Avro Unions. If your Avro schema contains a union with multiple non-null types, ClickPipes will infer the
appropriate variant type. Variant types are not otherwise supported for Avro data.
- JSON fields. You can manually specify a Variant type (such as `Variant(String, Int64, DateTime)`) for any JSON field
in the source data stream. Complex subtypes (arrays/maps/tuples) are not supported. In addition, because of the way ClickPipes determines
the correct variant subtype to use, only one integer or datetime type can be used in the Variant definition - for example, `Variant(Int64, UInt32)` is not supported.
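The union-to-Variant inference described above can be sketched as follows (the helper and its Avro-to-ClickHouse type table are illustrative, not exhaustive; a union of null plus one type maps to an ordinary Nullable column instead):

```python
# Hypothetical sketch of the union-to-Variant inference described above;
# the Avro-to-ClickHouse type table here is illustrative, not exhaustive.
AVRO_TO_CH = {"string": "String", "long": "Int64", "double": "Float64", "boolean": "Bool"}

def infer_column(avro_union: list) -> str:
    non_null = [t for t in avro_union if t != "null"]
    if len(non_null) == 1:
        # ["null", T] is an ordinary nullable column, not a Variant
        ch = AVRO_TO_CH[non_null[0]]
        return f"Nullable({ch})" if "null" in avro_union else ch
    return f"Variant({', '.join(AVRO_TO_CH[t] for t in non_null)})"

infer_column(["null", "string"])          # 'Nullable(String)'
infer_column(["string", "long", "null"])  # 'Variant(String, Int64)'
```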

### JSON type support {#json-type-support}
ClickPipes supports the JSON type in the following circumstances:
- Avro Record and Protobuf Message fields can always be assigned to a JSON column.
- Avro String and Bytes fields can be assigned to a JSON column if the Avro field actually contains JSON String objects.
- Protobuf string and bytes Kinds can be assigned to a JSON column if the Protobuf field actually contains JSON String objects.
- JSON fields that are always a JSON object can be assigned to a JSON destination column.

Note that you will have to manually change the destination column to the desired JSON type, including any fixed or skipped paths.

### Avro {#avro}

#### Supported Avro data types {#supported-avro-data-types}
ClickPipes supports all Avro Primitive and Complex types, and all Avro Logical types except `time-millis`, `time-micros`, `local-timestamp-millis`, `local_timestamp-micros`, and `duration`. Avro `record` types are converted to Tuple, `array` types to Array, and `map` to Map (string keys only). In general the conversions listed [here](/interfaces/formats/Avro#data-type-mapping) are available. We recommend using exact type matching for Avro numeric types, as ClickPipes does not check for overflow or precision loss on type conversion.
ClickPipes supports all Avro Primitive and Complex types, and all Avro Logical types except `local-timestamp-millis` and `local-timestamp-micros`. Avro `record` types are converted to Tuple, `array` types to Array, and `map` to Map (string keys only). In general the conversions listed [here](/interfaces/schema-inference#avro) are available. We recommend using exact type matching for Avro numeric types, as ClickPipes doesn't check for overflow or precision loss on type conversion.

Alternatively, all Avro types can be inserted into a `String` column, and will be represented as a valid JSON string in that case.

#### Nullable types and Avro unions {#nullable-types-and-avro-unions}
@@ -75,21 +95,22 @@
- An empty Map for a null Avro Map
- A named Tuple with all default/zero values for a null Avro Record
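A sketch of the zero values quoted above, substituted when a null union value lands in a non-nullable column (the shapes here are assumptions based only on the cases listed):

```python
def zero_for_null(ch_type: str):
    """Substitute used when a null union value lands in a non-nullable
    column -- only the cases quoted above, with assumed shapes."""
    if ch_type.startswith("Map("):
        return {}  # empty Map for a null Avro Map
    if ch_type.startswith("Tuple("):
        # named Tuple: every member gets its own zero value
        members = ch_type[len("Tuple("):-1].split(",")
        return tuple("" if m.strip() == "String" else 0 for m in members)
    raise ValueError(f"case not covered in this sketch: {ch_type}")

zero_for_null("Tuple(Int64, String)")  # (0, '')
```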

#### Variant type support {#variant-type-support}
ClickPipes supports the Variant type in the following circumstances:
- Avro Unions. If your Avro schema contains a union with multiple non-null types, ClickPipes will infer the
appropriate variant type. Variant types are not otherwise supported for Avro data.
- JSON fields. You can manually specify a Variant type (such as `Variant(String, Int64, DateTime)`) for any JSON field
in the source data stream. Complex subtypes (arrays/maps/tuples) are not supported. In addition, because of the way ClickPipes determines
the correct variant subtype to use, only one integer or datetime type can be used in the Variant definition - for example, `Variant(Int64, UInt32)` is not supported.
### Protobuf {#protobuf}

#### JSON type support {#json-type-support}
ClickPipes support the JSON type in the following circumstances:
- Avro Record types can always be assigned to a JSON column.
- Avro String and Bytes types can be assigned to a JSON column if the column actually holds JSON String objects.
- JSON fields that are always a JSON object can be assigned to a JSON destination column.
#### Supported Protobuf data types {#supported-protobuf-data-types}

ClickPipes supports all Protobuf version 2 and 3 types (except the long-deprecated proto2 `group` type). Basic conversions are identical to those used for the ClickHouse Protobuf format listed [here](/interfaces/schema-inference#protobuf).
We recommend exact type matching for Protobuf numeric types, as type conversion can result in overflow or precision loss. Protobuf maps, arrays, and Nullable variations of basic types are also supported. ClickPipes also recognizes a limited set of Google "well-known types": Timestamp, Duration, and "wrapper" messages. Timestamps can be accurately mapped to DateTime or DateTime64 types, Durations to Time or Time64 types, and wrapper messages to the underlying type. All Protobuf types can also be mapped to a ClickHouse `String` column and will be represented by a JSON string in that case.
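As an illustration of the well-known `Timestamp` mapping above: a `google.protobuf.Timestamp` carries `seconds` and `nanos` fields, and converting it to a DateTime64-style value might look like this (a sketch using plain integers, not the protobuf library):

```python
from datetime import datetime, timezone

def timestamp_to_datetime64(seconds: int, nanos: int):
    """Map a google.protobuf.Timestamp-style (seconds, nanos) pair to a
    DateTime64-like value: a UTC datetime at microsecond precision plus
    the leftover nanoseconds (illustrative only)."""
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=nanos // 1000)
    return dt, nanos % 1000

dt, rest = timestamp_to_datetime64(1700000000, 123456789)
dt.isoformat()  # '2023-11-14T22:13:20.123456+00:00'
```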

Note that you will have to manually change the destination column to the desired JSON type, including any fixed or skipped paths.
#### Protobuf One-Ofs {#protobuf-one-ofs}
During schema inference, Protobuf "One Of" special fields will normally be mapped to a named Tuple, where only one of the fields will have a "non-default" value. Alternatively, some "One Ofs" may be automatically mapped to a Variant field named after the "One Of", with a value represented using one of the valid types of the constituent fields. Finally, each "One Of" constituent field can be manually mapped to a ClickHouse column, where only one of the constituent fields will ever be populated during processing.
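The named-Tuple mapping of a "One Of" can be sketched like this (field names and types are made up; only the branch that was set carries a non-default value):

```python
# Illustrative "One Of" -> named Tuple mapping: one slot per constituent
# field, and only the branch that was set carries a non-default value.
def oneof_to_tuple(set_field: str, value, fields: dict) -> dict:
    zero = {"Int64": 0, "String": ""}
    return {name: (value if name == set_field else zero[typ])
            for name, typ in fields.items()}

oneof_to_tuple("email", "a@b.co", {"email": "String", "user_id": "Int64"})
# {'email': 'a@b.co', 'user_id': 0}
```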

#### Message Lists (Envelopes) {#protobuf-message-lists}
If the top-level Protobuf schema defined for the ClickPipe contains a single repeated field that is itself a Protobuf Message, schema inference and column mapping will be based on the "contained" Message field. The Kafka message will be processed as a list of such messages, and a single Kafka message will generate multiple ClickHouse rows.
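The envelope behavior above amounts to flattening the single repeated field into rows, roughly (the `events` field name is hypothetical):

```python
# Sketch of envelope flattening: one Kafka message whose single repeated
# field holds N inner Messages becomes N ClickHouse rows ("events" is a
# made-up field name).
def flatten_envelope(envelope: dict, repeated_field: str = "events") -> list:
    return list(envelope[repeated_field])

flatten_envelope({"events": [{"id": 1}, {"id": 2}]})  # [{'id': 1}, {'id': 2}]
```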

## Kafka virtual columns {#kafka-virtual-columns}

@@ -21,7 +21,7 @@
## Limitations {#limitations}

- [`DEFAULT`](/sql-reference/statements/create/table#default) is not supported.
- Individual messages are limited to 8MB (uncompressed) by default when running with the smallest (XS) replica size, and 16MB (uncompressed) with larger replicas. Messages that exceed this limit will be rejected with an error. If you have a need for larger messages, please contact support.
- Individual messages are limited to 2 MB (uncompressed) by default when running with the smallest (XS) replica size, and 8 MB (uncompressed) with larger replicas. Messages that exceed this limit will be rejected with an error. If you need larger messages, [contact support](https://clickhouse.com/support/program).
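A hypothetical pre-flight check against these per-message limits (interpreting MB as mebibytes, which is an assumption):

```python
# Hypothetical pre-flight size check against the uncompressed limits
# above; MB is read as mebibytes here, which is an assumption.
LIMITS = {"XS": 2 * 1024 * 1024}   # smallest replica size
DEFAULT_LIMIT = 8 * 1024 * 1024    # larger replica sizes

def within_limit(payload: bytes, replica_size: str) -> bool:
    return len(payload) <= LIMITS.get(replica_size, DEFAULT_LIMIT)

within_limit(b"x" * 1_000_000, "XS")  # True
```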

## Delivery semantics {#delivery-semantics}
ClickPipes for Kafka provides `at-least-once` delivery semantics (as one of the most commonly used approaches). We'd love to hear your feedback on delivery semantics via our [contact form](https://clickhouse.com/company/contact?loc=clickpipes). If you need exactly-once semantics, we recommend using our official [`clickhouse-kafka-connect`](https://clickhouse.com/blog/real-time-event-streaming-with-kafka-connect-confluent-cloud-clickhouse) sink.
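Because at-least-once delivery can redeliver a message, downstream consumers sometimes deduplicate on `(topic, partition, offset)`; a sketch of that pattern (not part of ClickPipes itself):

```python
# Sketch of downstream deduplication under at-least-once delivery,
# keyed on (topic, partition, offset); not part of ClickPipes itself.
def dedupe(messages: list) -> list:
    seen, out = set(), []
    for m in messages:
        key = (m["topic"], m["partition"], m["offset"])
        if key not in seen:
            seen.add(key)
            out.append(m)
    return out

dedupe([{"topic": "t", "partition": 0, "offset": 1},
        {"topic": "t", "partition": 0, "offset": 1}])  # one row survives
```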
@@ -119,7 +119,7 @@
ClickPipes inserts data into ClickHouse in batches. This is to avoid creating too many parts in the database, which can lead to performance issues in the cluster.

Batches are inserted when one of the following criteria has been met:
- The batch size has reached the maximum size (100,000 rows or 32MB per 1GB of pod memory)
- The batch size has reached the maximum size (100,000 rows or 28 MB per 1 GB of pod memory)

- The batch has been open for a maximum amount of time (5 seconds)
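The flush criteria above can be sketched as a small batcher (the 28 MB per 1 GB scaling comes from the bullet; everything else is illustrative):

```python
import time

# Sketch of the flush criteria above. The 28 MB per 1 GB scaling comes
# from the bullet; the class itself is illustrative, not ClickPipes code.
class Batch:
    MAX_ROWS = 100_000
    MAX_AGE_S = 5.0

    def __init__(self, pod_memory_gb: float):
        self.max_bytes = int(28 * 1024 * 1024 * pod_memory_gb)
        self.rows, self.bytes, self.opened = 0, 0, time.monotonic()

    def add(self, row_bytes: int):
        self.rows += 1
        self.bytes += row_bytes

    def should_flush(self) -> bool:
        return (self.rows >= self.MAX_ROWS
                or self.bytes >= self.max_bytes
                or time.monotonic() - self.opened >= self.MAX_AGE_S)

b = Batch(pod_memory_gb=1)
b.add(512)
b.should_flush()  # False
```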

### Latency {#latency}