diff --git a/docs/cloud/quickstart.mdx b/docs/cloud/quickstart.mdx index c0fba1ed..1769fcca 100644 --- a/docs/cloud/quickstart.mdx +++ b/docs/cloud/quickstart.mdx @@ -362,8 +362,8 @@ Now you're familiar with using InfinyOn Cloud with the [Cloud CLI], Check out ou [SmartModule Jolt]: hub/smartmodules/jolt.mdx [SmartModule Developer Kit (smdk)]: smartmodules/smdk.mdx [about Topics in the Fluvio docs]: fluvio/concepts/topics.mdx -[about Producers in the Fluvio docs]: fluvio/concepts/producers.mdx -[about Consumers in the Fluvio docs]: fluvio/concepts/consumers.mdx +[about Producers in the Fluvio docs]: fluvio/concepts/producer.mdx +[about Consumers in the Fluvio docs]: fluvio/concepts/consumer.mdx [a growing number of connectors]: hub/overview.mdx#connectors [a random quote]: https://demo-data.infinyon.com/api/quote [about Connectors in the Fluvio docs]: connectors/overview.mdx diff --git a/docs/fluvio/concepts/consumer.mdx b/docs/fluvio/concepts/consumer.mdx new file mode 100644 index 00000000..e2aa7794 --- /dev/null +++ b/docs/fluvio/concepts/consumer.mdx @@ -0,0 +1,225 @@ +--- +sidebar_position: 6 +title: "Consumer" +--- + +# Consumer + +The **Fluvio Consumer** is the component that reads and processes records from topics within the Fluvio streaming platform. It is designed to be flexible and robust, enabling applications to ingest data in real time or in batch mode with fine-grained control over partition selection, output formatting, and offset management. + +--- + +## Capabilities + +- **Record Consumption**: + The Consumer retrieves records from a specified topic and partition. By default, it reads from partition `0` but can be directed to any partition or even all partitions simultaneously. + +- **Flexible Reading Modes**: + - **From the Beginning or Latest**: Consumers can start reading at the beginning of the log (using flags such as `--from-beginning` or `-B`) or from the current offset. + - **Batch vs. 
Continuous Consumption**: The `--disable-continuous` (`-d`) flag allows the consumer to exit after reading all available records, making it ideal for batch processing. + +- **SmartModule Integration**: + Similar to producers, consumers can integrate [SmartModules](https://github.com/infinyon/fluvio/tree/master/smartmodule/examples/filter_json) (WASM modules) to perform inline transformations or filtering on records before they are delivered to the application. + +- **Custom Output Formatting**: + With the `--format` flag, consumers can tailor the output display by embedding placeholders (e.g., `{{key}}`, `{{value}}`, `{{partition}}`, `{{offset}}`, and `{{time}}`) in a format string to suit their logging or processing needs. + +- **Offset Management**: + Consumers maintain a persistent pointer for each topic partition. This offset ensures that upon restart or recovery, the consumer can resume reading from the correct position in the log. + +--- + +## Partition Consumption Strategies + +### Single Partition Consumption + +By default, when you run a command like: + +```bash +$ fluvio consume my-topic -B -d +``` + +the consumer reads from partition `0`. This is suitable for topics where records are funneled into a single partition or when targeted consumption is required. + +### Specifying a Partition + +To target a specific partition, use the `--partition` (`-p`) flag: + +```bash +$ fluvio consume my-topic -B --partition 1 +``` + +This command directs the consumer to read from partition `1`. + +### Consuming from All Partitions + +For cases where you want to aggregate records from every partition, the `-A` flag allows the consumer to consume records across all partitions: + +```bash +$ fluvio consume my-topic -B -A +``` + +> **Note:** When consuming from multiple partitions, there is no guarantee of record ordering between partitions. 
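If an application needs a repeatable order across partitions, one option is to impose it after consumption. The following is a minimal sketch, not Fluvio API: the `Rec` struct and `merge_by_time` function are hypothetical stand-ins, and it assumes each record exposes its partition, offset, and a millisecond timestamp (the same fields the `--format` placeholders show). Timestamps from different partitions are only an approximation of a global order, so treat the result as deterministic rather than authoritative.

```rust
/// Hypothetical stand-in for a consumed record; not a Fluvio type.
#[derive(Debug, Clone, PartialEq)]
struct Rec {
    partition: u32,
    offset: i64,
    timestamp_ms: i64,
    value: String,
}

/// Sort records gathered from several partitions into one deterministic order:
/// by timestamp first, with (partition, offset) as the tie-breaker.
fn merge_by_time(mut records: Vec<Rec>) -> Vec<Rec> {
    records.sort_by_key(|r| (r.timestamp_ms, r.partition, r.offset));
    records
}

fn main() {
    // Arrival order as an all-partition consumer might see it; the interleaving is arbitrary.
    let arrived = vec![
        Rec { partition: 1, offset: 0, timestamp_ms: 20, value: "b".into() },
        Rec { partition: 0, offset: 0, timestamp_ms: 10, value: "a".into() },
        Rec { partition: 1, offset: 1, timestamp_ms: 40, value: "d".into() },
        Rec { partition: 0, offset: 1, timestamp_ms: 30, value: "c".into() },
    ];
    let ordered: Vec<String> = merge_by_time(arrived).into_iter().map(|r| r.value).collect();
    println!("{}", ordered.join(","));
}
```

Within a single partition the offsets already give the order, so the sort only matters for interleaving records that came from different partitions.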
+ +--- + +## SmartModule Integration for Consumers + +Fluvio SmartModules provide an additional layer of processing by allowing you to filter or transform records in-flight. For example, you might use a SmartModule to filter out records that do not meet certain criteria: + +```bash +$ fluvio consume my-topic -B --smartmodule-path="fluvio_wasm_filter.wasm" +``` + +Alternatively, after registering a SmartModule with the cluster: + +```bash +$ fluvio smartmodule create --wasm-file="fluvio_wasm_filter.wasm" my_filter +``` + +you can apply it by name: + +```bash +$ fluvio consume my-topic -B --smartmodule="my_filter" +``` + +This integration allows for powerful, on-the-fly processing without altering the core consumer logic. + +--- + +## Custom Output Formatting + +The consumer’s default output prints only the record values. For richer debugging or logging, the `--key-value` flag displays both keys and values: + +```bash +$ fluvio consume my-topic -B --key-value +[null] This is my first record ever +[alice] Alice In Wonderland +``` + +Moreover, you can define a custom format using the `--format` option. For instance, to print records as CSV rows: + +```bash +$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}" +2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever +2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever +2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland +``` + +This flexibility in output formatting empowers users to integrate Fluvio into various monitoring, debugging, and processing workflows. + +--- + +## Consumer Offsets and Commit Strategies + +A key aspect of any streaming system is the management of **consumer offsets**. In Fluvio, these offsets serve as durable bookmarks that track which records have been processed: + +- **Persistence and Durability**: + Offsets are stored persistently per topic partition. 
They survive cluster restarts and upgrades, ensuring that consumers can reliably resume consumption. + +- **Commit Strategies**: + Fluvio supports two primary offset management strategies: + + 1. **Manual Offset Management**: + The application explicitly commits offsets after processing records. This provides full control over when an offset is considered “committed.” + + _Example in Rust:_ + ```rust + use fluvio::{ + consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy}, + Fluvio, Offset, + }; + use futures_util::StreamExt; + + async fn manual_consume(fluvio: &Fluvio) -> anyhow::Result<()> { + let mut stream = fluvio + .consumer_with_config( + ConsumerConfigExtBuilder::default() + .topic("my-topic".to_string()) + .offset_consumer("my-consumer".to_string()) + .offset_start(Offset::beginning()) + .offset_strategy(OffsetManagementStrategy::Manual) + .build()?, + ) + .await?; + + while let Some(Ok(record)) = stream.next().await { + println!("{}", String::from_utf8_lossy(record.as_ref())); + stream.offset_commit()?; + stream.offset_flush().await?; + } + Ok(()) + } + ``` + + 2. **Auto Offset Management**: + Offsets are committed automatically as records are consumed, reducing the burden on the developer. 
+ + _Example in Rust:_ + ```rust + use fluvio::{ + consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy}, + Fluvio, Offset, + }; + use futures_util::StreamExt; + + async fn auto_consume(fluvio: &Fluvio) -> anyhow::Result<()> { + let mut stream = fluvio + .consumer_with_config( + ConsumerConfigExtBuilder::default() + .topic("my-topic".to_string()) + .offset_consumer("my-consumer".to_string()) + .offset_start(Offset::beginning()) + .offset_strategy(OffsetManagementStrategy::Auto) + .build()?, + ) + .await?; + + while let Some(Ok(record)) = stream.next().await { + println!("{}", String::from_utf8_lossy(record.as_ref())); + } + Ok(()) + } + ``` + +- **Consumer Identity and Listing**: + Each consumer is identified by a unique name (set with `-c` on the CLI or with `offset_consumer` in code). You can list and manage consumers using the Fluvio CLI: + + ```bash + $ fluvio consumer list + CONSUMER TOPIC PARTITION OFFSET LAST SEEN + c1 hello-topic 0 1 4m 14s + ``` + + Consumers can also be deleted as needed: + + ```bash + $ fluvio consumer delete c1 + ``` + +Because offsets are persisted, an application can resume from its last flushed offset after a failure instead of rereading the whole log. Records processed after the last flush may be delivered again on restart, so handlers should tolerate duplicates. + +--- + +## Summary + +The **Fluvio Consumer** abstracts the complexities of reading and processing streaming data with features that include: + +- **Flexible Partition Consumption**: + Read from single or multiple partitions with configurable starting offsets. + +- **Dynamic Data Transformation**: + Integrate SmartModules for real-time filtering or transformation of records. + +- **Customizable Output**: + Format output to suit application needs using the `--key-value` and `--format` options. + +- **Robust Offset Management**: + Choose between manual and automatic strategies to maintain accurate, persistent consumption state. 
+ +By leveraging these capabilities, the Fluvio Consumer provides a powerful, scalable way to build reactive applications that respond to real-time data streams. + +For further details and practical examples, please [check out the consumer in the CLI reference]. + + +[check out the consumer in the CLI reference]: fluvio/cli/fluvio/consume.mdx \ No newline at end of file diff --git a/docs/fluvio/concepts/consumers.mdx b/docs/fluvio/concepts/consumers.mdx deleted file mode 100644 index 7b993db0..00000000 --- a/docs/fluvio/concepts/consumers.mdx +++ /dev/null @@ -1,192 +0,0 @@ ---- -sidebar_position: 6 -title: "Consumers" ---- - -# Consumers -Consumers are applications that "consume" records from a particular -topic and partition[1]. Typically, a consumer will perform actions based -on the events it receives, such as sending a notification or updating -a database. There may be many consumers reading data from a particular -partition at any given time: since the records are persisted, they do -not expire after being consumed. - --> [1] For an overview of Topics and Partitions, see the [Topic documentation] - -### Consumer Offsets -**Consumer Offset** is the [offset] value assigned to the specified consumer name. Upon consuming more records from the topic a consumer updates the offset by performing periodic commit/flush operations. Later, this offset can be retrieved from the **Fluvio** cluster to continue reading from the last record. - -**Consumer Offsets** are persistent and durable, surviving Fluvio cluster restarts and upgrades. Essentially, Fluvio provides the same storage guarantees for Consumer Offsets as it does for the data stored in topics. - -The offset value is maintained separately for each topic partition. Once created, it remains stored until explicitly deleted. -Users can delete the offset either programmatically through their application code or via the Fluvio CLI. 
- -#### Commit Strategy -Fluvio offers different approaches to offset management, allowing users to choose the one that best fits their use case and requirements. There are **manual** and **auto** offset management strategies: -1. **Manual Strategy** -In this strategy, offsets are managed manually by the user. This means that the user is responsible for committing offsets explicitly when needed. No automatic commits or flushes occur, and all offset management operations must be initiated by the user. -2. **Auto Strategy** -In contrast to the manual strategy, the auto strategy involves automatic management of offsets by the system. When using this strategy, committing offsets is triggered implicitly when reading the next record, effectively committing the offset for the previous record. Additionally, periodic flushes occur at intervals defined in the configuration. - -#### Configuration -* `offset_consumer: String` - the consumer name to uniquely identify the consumer offset. -* `offset_start: Offset` - the default offset value will be used if the consumer offset does not exist. -* `offset_strategy: OffsetManagementStrategy` - defines whether offsets are committed/flushed automatically, manually, or not. -* `offset_flush: Duration` - the period for auto flushes(only relevant for auto offset strategy). - -### Examples -#### Fluvio CLI -Imagine we have an empty topic `hello-topic`. 
Let's produce some records: - - -```bash -$ echo "One" | fluvio produce hello-topic -$ echo "Two" | fluvio produce hello-topic -``` -Ok, now let's read the topic from the beginning on behalf of consumer `c1`: - -```bash copy="fl" -$ fluvio consume hello-topic -c c1 -Bd -Consuming records from 'hello-topic' starting from the beginning of log -One -Two -``` -From now we can see our `c1` consumer in the consumers list: - -```bash copy="fl" -$ fluvio consumer list - CONSUMER TOPIC PARTITION OFFSET LAST SEEN - c1 hello-topic 0 1 4m 14s -``` -The offset here denotes the last seen offset for the given consumer in the given partition. - -Let's add another record to the topic: - -```bash copy="fl" -$ echo "Three" | fluvio produce hello-topic -``` -and read the topic again from the beginning for the same consumer: - -```bash copy="fl" -$ fluvio consume hello-topic -c c1 -Bd -Consuming records from 'hello-topic' starting from the beginning of log -Three -``` -we get only one "unseen" record which is correct. 
-Now if you try another consumer `c2`, you get all the records: - -```bash copy="fl" -$ fluvio consume hello-topic -c c2 -Bd -Consuming records from 'hello-topic' starting from the beginning of log -One -Two -Three -``` - -The consumer list now shows us two consumers: - -```bash copy="fl" -$ fluvio consumer list - CONSUMER TOPIC PARTITION OFFSET LAST SEEN - c1 hello-topic 0 2 3m 51s - c2 hello-topic 0 2 2m 21s -``` - -We can delete them now: - -```bash copy="fl" -$ fluvio consumer delete c1 -consumer "c1" on topic "hello-topic" and partition "0" deleted -$ fluvio consumer delete c2 -consumer "c2" on topic "hello-topic" and partition "0" deleted -``` - -#### Manual offset management - -This is an example of programmatic consumers with the manual offset management strategy: -```rust -use fluvio::{ - consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy}, - Fluvio, Offset, -}; -use futures_util::StreamExt; -async fn do_consume_with_manual_commits(fluvio: &Fluvio) -> anyhow::Result<()> { - let mut stream = fluvio - .consumer_with_config( - ConsumerConfigExtBuilder::default() - .topic("my-topic".to_string()) - .offset_consumer("my-consumer".to_string()) - .offset_start(Offset::beginning()) - .offset_strategy(OffsetManagementStrategy::Manual) - .build()?, - ) - .await?; - while let Some(Ok(record)) = stream.next().await { - println!("{}", String::from_utf8_lossy(record.as_ref())); - stream.offset_commit()?; - stream.offset_flush().await?; - } - Ok(()) -} -``` - -#### Auto offset management - -This is an example of programmatic consumers with the auto offset management strategy. As you can see there are no commits and flushes in the code, everything happens under the hood automatically. 
- -```rust -use fluvio::{ - consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy}, - Fluvio, Offset, -}; -use futures_util::StreamExt; -async fn do_consume_with_auto_commits(fluvio: &Fluvio) -> anyhow::Result<()> { - let mut stream = fluvio - .consumer_with_config( - ConsumerConfigExtBuilder::default() - .topic("my-topic".to_string()) - .offset_consumer("my-consumer".to_string()) - .offset_start(Offset::beginning()) - .offset_strategy(OffsetManagementStrategy::Auto) - .build()?, - ) - .await?; - while let Some(Ok(record)) = stream.next().await { - println!("{}", String::from_utf8_lossy(record.as_ref())); - } - Ok(()) -} -``` -#### Using with connectors - -The following example demonstrates how to enable **Consumer Offsets** on [Fluvio Http Sink connector]. The same configuration applies to all official Fluvio `sink` connectors. -```yaml -apiVersion: 0.2.0 -meta: - version: 0.2.7 - name: my-http-sink - type: http-sink - topic: - meta: - name: http-sink-topic - consumer: - id: my-http-sink - offset: - strategy: auto - start: beginning - flush-period: - secs: 2 - nanos: 0 -http: - endpoint: "http://127.0.0.1/post" -``` - -In this setup, the Consumer initially attempts to retrieve the offset value for the identifier `my-http-sink` upon starting. If this value not found, it will commence from the beginning. Offset flushes occur automatically every 2 seconds. -More information about [connectors configuration]. 
- -[offset]: fluvio/concepts/offsets.mdx -[Fluvio Http Sink connector]: https://github.com/infinyon/http-sink-connector -[connectors configuration]: connectors/configuration.mdx -[the Fluvio CLI]: fluvio/cli/overview.mdx -[one of the programmatic APIs]: fluvio/apis/overview.mdx -[Topic documentation]: fluvio/concepts/topics.mdx \ No newline at end of file diff --git a/docs/fluvio/concepts/producer.mdx b/docs/fluvio/concepts/producer.mdx new file mode 100644 index 00000000..b55f740c --- /dev/null +++ b/docs/fluvio/concepts/producer.mdx @@ -0,0 +1,111 @@ +--- +sidebar_position: 5 +title: "Producer" +--- + +# Producer + +The **Fluvio Producer** is a core component of the Fluvio ecosystem, designed to efficiently ingest records into a distributed streaming platform. It abstracts the complexities of record transmission, partition management, data transformation, and compression behind a simple command-line interface (CLI). This document outlines the key concepts and strategies that underpin the Fluvio Producer. + +--- + +## Capabilities + +- **Record Ingestion**: + The Producer accepts records from various input sources—such as standard input (stdin) or files—and sends them to a specified topic. This flexibility supports both interactive and batch data ingestion workflows. + +- **Destination Topics**: + Records are directed to user-defined topics, which are subdivided into partitions. Each partition has a leader that handles the write operations, ensuring scalability and ordered processing. + +- **Dynamic Data Transformation**: + Fluvio Producer integrates with [SmartModules](https://github.com/infinyon/fluvio/tree/fdcfce51067a44c06a91bc8e4aab518f0a193145/smartmodule/examples/map) (WASM modules) that can transform records on the fly. These transformations occur after the records are produced but before they are committed, enabling use cases like data capitalization, filtering, or enrichment. 
+ +- **Compression Support**: + To optimize network throughput and storage efficiency, the Producer supports various compression algorithms (e.g., GZIP). Compressed records reduce disk usage and bandwidth, though they incur additional CPU overhead for (de)compression. + +--- + +## Record Production & Partitioning Strategies + +A central function of the Producer is to distribute records among multiple partitions. This distribution follows one of two strategies, depending on whether the records carry a key: + +### 1. Producing Key/Value Records + +- **Key Extraction**: + When records are provided in a key/value format (e.g., using a delimiter such as `:`), the Producer separates the key from the value. + +- **Hash-based Partitioning**: + The extracted key is hashed to determine the target partition. This strategy ensures that **all records with the same key** are consistently sent to the same partition. + + **Example**: + When sending: + ```bash + $ fluvio produce my-topic --key-separator=":" + > alice:Hello, World! + > batman:Goodbye! + ``` + The hashes of `"alice"` and `"batman"` determine their respective partitions, so records that share a key keep their relative order. + +### 2. Producing Non Key/Value Records + +- **Round-Robin Partitioning**: + When records are not parsed as key/value pairs (i.e., no key is specified), the Producer employs a round-robin algorithm. + + **Key Characteristics**: + - **Even Load Distribution**: Records are cyclically assigned across all partitions, ensuring a balanced load. + - **Simplicity**: This method avoids the computational overhead of hashing when key-based routing is unnecessary. + + **Example**: + When producing simple text records without a key separator: + ```bash + $ fluvio produce my-topic -f records.txt + ``` + Each record will be sent to the next partition in sequence, distributing records evenly across the topic. 
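The two strategies above can be sketched in a few lines. This is an illustration under stated assumptions, not Fluvio's implementation: the `Partitioner` type and its `select` method are hypothetical, and the standard library's `DefaultHasher` stands in for whatever hash function the Producer actually uses, which this document does not specify.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical partition chooser illustrating both strategies; not a Fluvio type.
struct Partitioner {
    partitions: u32,
    next: u32, // round-robin cursor, used only for keyless records
}

impl Partitioner {
    fn new(partitions: u32) -> Self {
        assert!(partitions > 0, "a topic needs at least one partition");
        Self { partitions, next: 0 }
    }

    fn select(&mut self, key: Option<&str>) -> u32 {
        match key {
            // Keyed record: hash the key so equal keys always map to the same partition.
            // DefaultHasher is an assumption made for this sketch.
            Some(k) => {
                let mut hasher = DefaultHasher::new();
                k.hash(&mut hasher);
                (hasher.finish() % u64::from(self.partitions)) as u32
            }
            // Keyless record: hand out partitions in turn and advance the cursor.
            None => {
                let p = self.next;
                self.next = (self.next + 1) % self.partitions;
                p
            }
        }
    }
}

fn main() {
    let mut p = Partitioner::new(3);
    let first = p.select(Some("alice"));
    let again = p.select(Some("alice"));
    println!("alice -> {first}, then {again}");
    let keyless: Vec<u32> = (0..4).map(|_| p.select(None)).collect();
    println!("keyless -> {keyless:?}");
}
```

The keyed branch never touches the cursor, so mixing keyed and keyless records does not disturb the round-robin sequence. Note that a modulo-based mapping reassigns keys if the partition count changes.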
+ +--- + +## Integration with SmartModules + +Fluvio Producer supports the integration of SmartModules—WASM-based modules that allow real-time data transformation during production. + +- **How It Works**: + A SmartModule can be applied to the producer session either by: + - Directly referencing a compiled WASM file. + - Using a pre-registered module stored in the Fluvio cluster. + +- **Benefits**: + SmartModules enable on-the-fly modifications, such as transforming text (e.g., converting to uppercase), filtering unwanted records, or enriching the data with additional context—all without altering the original producer logic. + +--- + +## Compression for Enhanced Efficiency + +To balance throughput and resource utilization, the Producer offers support for data compression: + +- **Compression Algorithms**: + Algorithms like GZIP are available to compress records before they are transmitted to the streaming cluster. + +- **Operational Impact**: + - **Reduced Disk Usage**: Compressed records consume less storage on the brokers. + - **Network Efficiency**: Lower data sizes result in improved network performance. + - **Transparent Consumption**: Consumers retrieve and decompress the records without needing additional configuration. + +--- + +## Summary + +The **Fluvio Producer** is engineered to simplify the process of sending data into a distributed streaming system. Its design encompasses several abstract yet essential concepts: + +- **Flexible Record Ingestion**: Supports both interactive and batch processing. +- **Intelligent Partitioning**: + - **Hash-based** for key/value records, ensuring consistency and ordered processing. + - **Round-robin** for non key/value records, promoting even load distribution. +- **Dynamic Data Transformation**: Through SmartModule integration. +- **Optimized Data Handling**: Via built-in compression options. 
+ +By abstracting these complexities, the Fluvio Producer allows users to focus on building robust data pipelines without worrying about the underlying partitioning and transformation details. + +For further details and practical examples, please [check out the producer in the CLI reference]. + + +[check out the producer in the CLI reference]: fluvio/cli/fluvio/produce.mdx diff --git a/docs/fluvio/concepts/producers.mdx b/docs/fluvio/concepts/producers.mdx deleted file mode 100644 index 73cb7d2f..00000000 --- a/docs/fluvio/concepts/producers.mdx +++ /dev/null @@ -1,21 +0,0 @@ ---- -sidebar_position: 5 -title: "Producers and Consumers" ---- - -## Producer - -Producers are applications that "produce" streaming data. -These applications may be monitoring the status of a system, collecting sensor -data, watching traffic to a website, or otherwise observing events in some way. -Producers may be general-purpose tools such as [the Fluvio CLI], or they may be -special-purpose programs built to meet a specific need, in which case the -producer program would leverage [one of the programmatic APIs]. - -Producer performance in transmitting data can be optimized by adjusting -batching parameters. See the [Batching page] for more details. - -[Batching page]: fluvio/concepts/batching.mdx -[the Fluvio CLI]: fluvio/cli/overview.mdx -[one of the programmatic APIs]: fluvio/apis/overview.mdx -[Topic documentation]: fluvio/concepts/topics.mdx \ No newline at end of file diff --git a/docs/fluvio/config-defaults.mdx b/docs/fluvio/config-defaults.mdx index 004f9639..26069f2f 100644 --- a/docs/fluvio/config-defaults.mdx +++ b/docs/fluvio/config-defaults.mdx @@ -76,6 +76,6 @@ This document outlines the default configurations for a Fluvio Cluster. 
For information on changing the defaults see the advanced [Overriding page] -[Consumers]: fluvio/concepts/consumers.mdx +[Consumers]: fluvio/concepts/consumer.mdx [Batching page]: fluvio/concepts/batching.mdx [Overriding page]: fluvio/concepts/advanced/config-overriding.mdx diff --git a/docs/fluvio/installation/kubernetes.mdx b/docs/fluvio/installation/kubernetes.mdx index f79c1290..25a1a5fa 100644 --- a/docs/fluvio/installation/kubernetes.mdx +++ b/docs/fluvio/installation/kubernetes.mdx @@ -104,6 +104,6 @@ Way to go! You're well on your way to writing real-time distributed apps with Fl If you run into any problems along the way, make sure to check out our [troubleshooting] page to find a fix. [topic]: fluvio/concepts/architecture/sc.mdx#topics -[produce]: fluvio/concepts/producers.mdx -[consume]: fluvio/concepts/consumers.mdx +[produce]: fluvio/concepts/producer.mdx +[consume]: fluvio/concepts/consumer.mdx [troubleshooting]: fluvio/troubleshooting.mdx