| 1 | +--- |
| 2 | +sidebar_position: 6 |
| 3 | +title: "Consumer" |
| 4 | +--- |
| 5 | + |
| 6 | +# Consumer |
| 7 | + |
| 8 | +The **Fluvio Consumer** is the component that reads and processes records from topics within the Fluvio streaming platform. It is designed to be flexible and robust, enabling applications to ingest data in real time or in batch mode with fine-grained control over partition selection, output formatting, and offset management. |
| 9 | + |
| 10 | +--- |
| 11 | + |
| 12 | +## Capabilities |
| 13 | + |
| 14 | +- **Record Consumption**: |
| 15 | + The Consumer retrieves records from a specified topic and partition. By default, it reads from partition `0` but can be directed to any partition or even all partitions simultaneously. |
| 16 | + |
| 17 | +- **Flexible Reading Modes**: |
| 18 | + - **From the Beginning or Latest**: Consumers can start reading at the beginning of the log (with `--from-beginning` or `-B`) or, by default, from the end of the log, so that only newly produced records are delivered. |
| 19 | + - **Batch vs. Continuous Consumption**: The `--disable-continuous` (`-d`) flag allows the consumer to exit after reading all available records, making it ideal for batch processing. |
| 20 | + |
| 21 | +- **SmartModule Integration**: |
| 22 | + Similar to producers, consumers can integrate [SmartModules](https://github.com/infinyon/fluvio/tree/master/smartmodule/examples/filter_json) (WASM modules) to perform inline transformations or filtering on records before they are delivered to the application. |
| 23 | + |
| 24 | +- **Custom Output Formatting**: |
| 25 | + With the `--format` flag, consumers can tailor the output display by embedding placeholders (e.g., `{{key}}`, `{{value}}`, `{{partition}}`, `{{offset}}`, and `{{time}}`) in a format string to suit their logging or processing needs. |
| 26 | + |
| 27 | +- **Offset Management**: |
| 28 | + Consumers maintain a persistent pointer for each topic partition. This offset ensures that upon restart or recovery, the consumer can resume reading from the correct position in the log. |
| 29 | + |
| 30 | +--- |
| 31 | + |
| 32 | +## Partition Consumption Strategies |
| 33 | + |
| 34 | +### Single Partition Consumption |
| 35 | + |
| 36 | +By default, when you run a command like: |
| 37 | + |
| 38 | +```bash |
| 39 | +$ fluvio consume my-topic -B -d |
| 40 | +``` |
| 41 | + |
| 42 | +the consumer reads from partition `0`. This is suitable for topics where records are funneled into a single partition or when targeted consumption is required. |
| 43 | + |
| 44 | +### Specifying a Partition |
| 45 | + |
| 46 | +To target a specific partition, use the `--partition` (`-p`) flag: |
| 47 | + |
| 48 | +```bash |
| 49 | +$ fluvio consume my-topic -B --partition 1 |
| 50 | +``` |
| 51 | + |
| 52 | +This command directs the consumer to read from partition `1`. |
| 53 | + |
| 54 | +### Consuming from All Partitions |
| 55 | + |
| 56 | +For cases where you want to aggregate records from every partition, the `-A` flag allows the consumer to consume records across all partitions: |
| 57 | + |
| 58 | +```bash |
| 59 | +$ fluvio consume my-topic -B -A |
| 60 | +``` |
| 61 | + |
| 62 | +> **Note:** When consuming from multiple partitions, there is no guarantee of record ordering between partitions. |
| 63 | + |
| 64 | +--- |
| 65 | + |
| 66 | +## SmartModule Integration for Consumers |
| 67 | + |
| 68 | +Fluvio SmartModules provide an additional layer of processing by allowing you to filter or transform records in-flight. For example, you might use a SmartModule to filter out records that do not meet certain criteria: |
| 69 | + |
| 70 | +```bash |
| 71 | +$ fluvio consume my-topic -B --smartmodule-path="fluvio_wasm_filter.wasm" |
| 72 | +``` |
| 73 | + |
| 74 | +Alternatively, after registering a SmartModule with the cluster: |
| 75 | + |
| 76 | +```bash |
| 77 | +$ fluvio smartmodule create --wasm-file="fluvio_wasm_filter.wasm" my_filter |
| 78 | +``` |
| 79 | + |
| 80 | +you can apply it by name: |
| 81 | + |
| 82 | +```bash |
| 83 | +$ fluvio consume my-topic -B --smartmodule="my_filter" |
| 84 | +``` |
| 85 | + |
| 86 | +This integration allows for powerful, on-the-fly processing without altering the core consumer logic. |
| 87 | + |
| 88 | +--- |
| 89 | + |
| 90 | +## Custom Output Formatting |
| 91 | + |
| 92 | +The consumer’s default output prints only the record values. For richer debugging or logging, the `--key-value` flag displays both keys and values: |
| 93 | + |
| 94 | +```bash |
| 95 | +$ fluvio consume my-topic -B --key-value |
| 96 | +[null] This is my first record ever |
| 97 | +[alice] Alice In Wonderland |
| 98 | +``` |
| 99 | + |
| 100 | +Moreover, you can define a custom format using the `--format` option. For instance, to print records as CSV rows: |
| 101 | + |
| 102 | +```bash |
| 103 | +$ fluvio consume my-topic -B --format="{{time}},{{partition}},{{offset}},{{key}},{{value}}" |
| 104 | +2022-05-04T15:35:49.244Z,0,0,null,This is my first record ever |
| 105 | +2022-05-04T15:35:49.244Z,0,1,null,This is my second record ever |
| 106 | +2022-05-04T15:52:19.963Z,0,2,alice,Alice In Wonderland |
| 107 | +``` |
| 108 | + |
| 109 | +This flexibility in output formatting empowers users to integrate Fluvio into various monitoring, debugging, and processing workflows. |
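To make the placeholder mechanism concrete, here is a minimal sketch of how a `--format` string could be expanded for one record. It uses only the Rust standard library; `RecordView` and `render` are hypothetical names chosen for illustration and are not part of the Fluvio crate or CLI, whose actual implementation may differ.

```rust
/// Fields a consumed record might expose to a format string.
/// Hypothetical type for illustration; not a type from the Fluvio crate.
struct RecordView<'a> {
    time: &'a str,
    partition: u32,
    offset: i64,
    key: Option<&'a str>,
    value: &'a str,
}

/// Replace each `{{name}}` placeholder with the matching record field.
/// A missing key is rendered as `null`, as in the sample output above.
/// Limitation of this sketch: replacements are chained, so a key that
/// itself contains the text `{{value}}` would be expanded again.
fn render(template: &str, rec: &RecordView) -> String {
    template
        .replace("{{time}}", rec.time)
        .replace("{{partition}}", &rec.partition.to_string())
        .replace("{{offset}}", &rec.offset.to_string())
        .replace("{{key}}", rec.key.unwrap_or("null"))
        .replace("{{value}}", rec.value)
}
```

Calling `render` with the CSV template from the example and a record holding key `alice` at offset `2` yields the third row of the sample output.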
| 110 | + |
| 111 | +--- |
| 112 | + |
| 113 | +## Consumer Offsets and Commit Strategies |
| 114 | + |
| 115 | +A key aspect of any streaming system is the management of **consumer offsets**. In Fluvio, these offsets serve as durable bookmarks that track which records have been processed: |
| 116 | + |
| 117 | +- **Persistence and Durability**: |
| 118 | + Offsets are stored persistently per topic partition. They survive cluster restarts and upgrades, ensuring that consumers can reliably resume consumption. |
| 119 | + |
| 120 | +- **Commit Strategies**: |
| 121 | + Fluvio supports two primary offset management strategies: |
| 122 | + |
| 123 | + 1. **Manual Offset Management**: |
| 124 | + The application explicitly commits offsets after processing records. This provides full control over when an offset is considered “committed.” |
| 125 | + |
| 126 | + _Example in Rust:_ |
| 127 | + ```rust |
| 128 | + use fluvio::{ |
| 129 | + consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy}, |
| 130 | + Fluvio, Offset, |
| 131 | + }; |
| 132 | + use futures_util::StreamExt; |
| 133 | + |
| 134 | + async fn manual_consume(fluvio: &Fluvio) -> anyhow::Result<()> { |
| 135 | + let mut stream = fluvio |
| 136 | + .consumer_with_config( |
| 137 | + ConsumerConfigExtBuilder::default() |
| 138 | + .topic("my-topic".to_string()) |
| 139 | + .offset_consumer("my-consumer".to_string()) |
| 140 | + .offset_start(Offset::beginning()) |
| 141 | + .offset_strategy(OffsetManagementStrategy::Manual) |
| 142 | + .build()?, |
| 143 | + ) |
| 144 | + .await?; |
| 145 | + |
| 146 | + while let Some(Ok(record)) = stream.next().await { |
| 147 | + println!("{}", String::from_utf8_lossy(record.as_ref())); |
| 148 | + stream.offset_commit()?; |
| 149 | + stream.offset_flush().await?; |
| 150 | + } |
| 151 | + Ok(()) |
| 152 | + } |
| 153 | + ``` |
| 154 | + |
| 155 | + 2. **Auto Offset Management**: |
| 156 | + Offsets are committed automatically as records are consumed, reducing the burden on the developer. |
| 157 | + |
| 158 | + _Example in Rust:_ |
| 159 | + ```rust |
| 160 | + use fluvio::{ |
| 161 | + consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy}, |
| 162 | + Fluvio, Offset, |
| 163 | + }; |
| 164 | + use futures_util::StreamExt; |
| 165 | + |
| 166 | + async fn auto_consume(fluvio: &Fluvio) -> anyhow::Result<()> { |
| 167 | + let mut stream = fluvio |
| 168 | + .consumer_with_config( |
| 169 | + ConsumerConfigExtBuilder::default() |
| 170 | + .topic("my-topic".to_string()) |
| 171 | + .offset_consumer("my-consumer".to_string()) |
| 172 | + .offset_start(Offset::beginning()) |
| 173 | + .offset_strategy(OffsetManagementStrategy::Auto) |
| 174 | + .build()?, |
| 175 | + ) |
| 176 | + .await?; |
| 177 | + |
| 178 | + while let Some(Ok(record)) = stream.next().await { |
| 179 | + println!("{}", String::from_utf8_lossy(record.as_ref())); |
| 180 | + } |
| 181 | + Ok(()) |
| 182 | + } |
| 183 | + ``` |
| 184 | + |
| 185 | +- **Consumer Identity and Listing**: |
| 186 | + Each consumer is identified by a unique name (specified via the CLI with `-c` or in code). You can list and manage consumers using the Fluvio CLI: |
| 187 | + |
| 188 | + ```bash |
| 189 | + $ fluvio consumer list |
| 190 | + CONSUMER TOPIC PARTITION OFFSET LAST SEEN |
| 191 | + c1 hello-topic 0 1 4m 14s |
| 192 | + ``` |
| 193 | + |
| 194 | + Consumers can also be deleted as needed: |
| 195 | + |
| 196 | + ```bash |
| 197 | + $ fluvio consumer delete c1 |
| 198 | + ``` |
| 199 | + |
| 200 | +This offset management system lets applications resume from the last committed position after a failure, so records are not skipped. Records that were processed but not yet committed may be delivered again after a restart. |
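The difference between committing and flushing in the manual strategy can be pictured with a small toy model. The sketch below uses only the standard library and assumes that a commit marks the last delivered offset locally while a flush makes the committed offset durable. `OffsetTracker` and its methods are hypothetical and are not the Fluvio API.

```rust
/// Toy model of one consumer's offset state for a single partition.
/// Hypothetical type for illustration; not part of the Fluvio crate.
#[derive(Default)]
struct OffsetTracker {
    last_seen: Option<i64>, // offset of the most recently delivered record
    committed: Option<i64>, // marked as processed, but only locally
    flushed: Option<i64>,   // durably stored; survives a restart
}

impl OffsetTracker {
    /// Record that the stream yielded the record at `offset`.
    fn deliver(&mut self, offset: i64) {
        self.last_seen = Some(offset);
    }

    /// Mark the last delivered record as processed (local only).
    fn commit(&mut self) {
        self.committed = self.last_seen;
    }

    /// Persist the committed offset (assumed durable in this model).
    fn flush(&mut self) {
        self.flushed = self.committed;
    }

    /// Offset to resume from after a restart: one past the last
    /// flushed offset, or 0 if nothing was ever flushed.
    fn resume_from(&self) -> i64 {
        self.flushed.map_or(0, |o| o + 1)
    }
}
```

In this model, a record that was committed but not flushed before a crash is delivered again on restart, which is why the manual example calls both `offset_commit` and `offset_flush` after each record.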
| 201 | + |
| 202 | +--- |
| 203 | + |
| 204 | +## Summary |
| 205 | + |
| 206 | +The **Fluvio Consumer** abstracts the complexities of reading and processing streaming data with features that include: |
| 207 | + |
| 208 | +- **Flexible Partition Consumption**: |
| 209 | + Read from single or multiple partitions with configurable starting offsets. |
| 210 | + |
| 211 | +- **Dynamic Data Transformation**: |
| 212 | + Integrate SmartModules for real-time filtering or transformation of records. |
| 213 | + |
| 214 | +- **Customizable Output**: |
| 215 | + Format output to suit application needs using the `--key-value` and `--format` options. |
| 216 | + |
| 217 | +- **Robust Offset Management**: |
| 218 | + Choose between manual and automatic strategies to maintain accurate, persistent consumption state. |
| 219 | + |
| 220 | +By leveraging these capabilities, the Fluvio Consumer provides a powerful, scalable way to build reactive applications that respond to real-time data streams. |
| 221 | + |
| 222 | +For further details and practical examples, please [check out the consumer in the CLI reference]. |
| 223 | + |
| 224 | + |
| 225 | +[check out the consumer in the CLI reference]: fluvio/cli/fluvio/consume.mdx |