Skip to content

Commit c155d00

Browse files
committed
in_amqp: Add documentation for in_amqp plugin
Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com>
1 parent 4a18664 commit c155d00

2 files changed

Lines changed: 144 additions & 0 deletions

File tree

SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
## Data pipeline
7777

7878
* [Inputs](pipeline/inputs.md)
79+
* [AMQP](pipeline/inputs/amqp.md)
7980
* [Blob](pipeline/inputs/blob.md)
8081
* [Collectd](pipeline/inputs/collectd.md)
8182
* [CPU metrics](pipeline/inputs/cpu-metrics.md)

pipeline/inputs/amqp.md

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# AMQP (Advanced Message Queuing Protocol)
2+
3+
The _AMQP_ (Advanced Message Queuing Protocol) input plugin allows Fluent Bit to consume messages from an AMQP broker such as RabbitMQ. It connects to the specified broker, consumes messages from a queue, and processes them as log records.
4+
5+
## Configuration parameters
6+
7+
The plugin supports the following configuration parameters:
8+
9+
| Key | Description | Default |
10+
|:---|:---|:---|
11+
| `uri` | Specify an AMQP URI to connect to the broker | `amqp://` |
12+
| `queue` | Specify an AMQP queue name to consume from | _none_ (required) |
13+
| `parser` | Specify a parser to process the message payload | _none_ |
14+
| `reconnect.retry_limits` | Maximum number of retries to connect to the broker | `5` |
15+
| `reconnect.retry_interval` | Retry interval (in seconds) to connect to the broker | `60` |
16+
| `thread.ring_buffer.capacity` | Set the capacity of the ring buffer | _default_ |
17+
| `thread.ring_buffer.window` | Set the window size of the ring buffer | _default_ |
18+
| `thread.ring_buffer.retry_limit` | Set the retry limit for the ring buffer | _default_ |
19+
20+
## How it works
21+
22+
The AMQP input plugin connects to an AMQP broker and consumes messages from a specified queue. Each message is processed and converted into a Fluent Bit log record with the following characteristics:
23+
24+
1. The message body becomes the main content of the record
25+
2. AMQP message properties (like content type, routing key, and so on) are added as metadata
26+
3. AMQP message headers are added as a nested metadata field
27+
4. If a parser is specified, it's applied to the message body
28+
29+
### Message Properties Mapping
30+
31+
The following AMQP message properties are mapped to Fluent Bit record metadata:
32+
33+
- `routing_key` - The routing key used to route the message to the queue
34+
- `content_type` - The MIME content type of the message
35+
- `content_encoding` - The content encoding of the message
36+
- `correlation_id` - Application correlation identifier
37+
- `reply_to` - Address to reply to
38+
39+
### Message Headers
40+
41+
AMQP message headers are mapped to a `headers` field in the record metadata as a key-value map.
42+
43+
## Get started
44+
45+
To consume messages from an AMQP broker, you can run the plugin from the command line or through the configuration file.
46+
47+
### Command line
48+
49+
The following command will start Fluent Bit with the AMQP input plugin:
50+
51+
```shell
52+
fluent-bit -i amqp -p queue=my_queue -o stdout
53+
```
54+
55+
### Configuration file
56+
57+
In your main configuration file, append the following sections:
58+
59+
{% tabs %}
60+
{% tab title="fluent-bit.yaml" %}
61+
62+
```yaml
63+
pipeline:
64+
inputs:
65+
- name: amqp
66+
queue: my_queue
67+
uri: amqp://guest:guest@localhost:5672/%2F
68+
69+
outputs:
70+
- name: stdout
71+
match: '*'
72+
```
73+
74+
{% endtab %}
75+
{% tab title="fluent-bit.conf" %}
76+
77+
```text
78+
[INPUT]
79+
Name amqp
80+
Queue my_queue
81+
URI amqp://guest:guest@localhost:5672/%2F
82+
83+
[OUTPUT]
84+
Name stdout
85+
Match *
86+
```
87+
88+
{% endtab %}
89+
{% endtabs %}
90+
91+
## Example: Consuming JSON messages
92+
93+
If your AMQP messages contain JSON data, you can use a parser to process them:
94+
95+
{% tabs %}
96+
{% tab title="fluent-bit.yaml" %}
97+
98+
```yaml
99+
parsers:
100+
- name: json
101+
format: json
102+
103+
pipeline:
104+
inputs:
105+
- name: amqp
106+
queue: json_messages
107+
parser: json
108+
uri: amqp://
109+
110+
outputs:
111+
- name: stdout
112+
match: '*'
113+
```
114+
115+
{% endtab %}
116+
{% tab title="fluent-bit.conf" %}
117+
118+
```text
119+
[INPUT]
120+
Name amqp
121+
Queue json_messages
122+
Parser json
123+
URI amqp://
124+
125+
[PARSER]
126+
Name json
127+
Format json
128+
129+
[OUTPUT]
130+
Name stdout
131+
Match *
132+
```
133+
134+
{% endtab %}
135+
{% endtabs %}
136+
137+
## Connection Management
138+
139+
If a connection is lost during operation or startup process, plugin will automatically attempt to reconnect based on `reconnect.retry_limits` and `reconnect.retry_interval` properties.
140+
141+
## Requirements
142+
143+
The AMQP input plugin requires the RabbitMQ C client library (rabbitmq-c) to be installed on the system.

0 commit comments

Comments
 (0)