Skip to content

Commit cf6e915

Browse files
authored
Kafka Reporter for Enhanced Data Reporting (#224)
1 parent 123380c commit cf6e915

60 files changed

Lines changed: 2832 additions & 384 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/e2e.yaml

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
name: E2E
17+
18+
on:
19+
pull_request:
20+
21+
jobs:
22+
build:
23+
name: Build SkyWalking Go agent
24+
runs-on: ubuntu-latest
25+
steps:
26+
- name: Set up Go
27+
uses: actions/setup-go@v2
28+
with:
29+
go-version: 1.19
30+
- uses: actions/checkout@v2
31+
with:
32+
submodules: true
33+
- name: Build SkyWalking Go agent
34+
run: make -C test/e2e build
35+
- name: Upload agent binary
36+
uses: actions/upload-artifact@v4
37+
with:
38+
name: skywalking-go-agent
39+
path: test/e2e/dist/
40+
e2e:
41+
name: Basic function + ${{ matrix.case.name }} reporter
42+
runs-on: ubuntu-latest
43+
needs: build
44+
timeout-minutes: 90
45+
strategy:
46+
matrix:
47+
case:
48+
- name: gRPC
49+
path: test/e2e/case/grpc/e2e.yaml
50+
- name: Kafka
51+
path: test/e2e/case/kafka/e2e.yaml
52+
steps:
53+
- uses: actions/checkout@v2
54+
with:
55+
submodules: true
56+
- name: Download agent binary
57+
uses: actions/download-artifact@v4
58+
with:
59+
name: skywalking-go-agent
60+
path: test/e2e/dist/
61+
- name: Run E2E Tests
62+
uses: apache/skywalking-infra-e2e@7e4b5b68716fdb7b79b21fa8908f9db497e1b115
63+
with:
64+
e2e-file: ${{ matrix.case.path }}

.github/workflows/windows-plugin-test.yaml

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,17 @@ jobs:
3838
Enable-WindowsOptionalFeature -Online -FeatureName Microsoft-Windows-Subsystem-Linux
3939
Enable-WindowsOptionalFeature -Online -FeatureName VirtualMachinePlatform
4040
41-
Invoke-WebRequest -Uri https://wslstorestorage.blob.core.windows.net/wslblob/wsl_update_x64.msi -OutFile wsl_update.msi
42-
Start-Process msiexec.exe -Wait -ArgumentList '/i wsl_update.msi /quiet'
43-
4441
wsl --set-default-version 2
4542
wsl --update
46-
wsl --install -d Ubuntu
43+
44+
$distros = (Invoke-RestMethod -Uri https://raw.githubusercontent.com/microsoft/WSL/refs/heads/master/distributions/DistributionInfo.json).Distributions
45+
$distroDownloadUrl = $distros.Where({ $_.Name -eq 'Ubuntu' }).Amd64PackageUrl
46+
Set-Location $env:GITHUB_WORKSPACE\..\..
47+
$filename = "Ubuntu$([System.IO.Path]::GetExtension($distroDownloadUrl))"
48+
Invoke-WebRequest -Uri $distroDownloadUrl -OutFile $filename
49+
Expand-Archive -Path $filename -DestinationPath .\
50+
$distroExe = (Get-ChildItem -Path . -Filter *.exe).Where({ $_.Name -notmatch 'splash|setup' }).FullName
51+
& $distroExe install --root
4752
4853
@'
4954
@echo off
@@ -56,19 +61,27 @@ jobs:
5661
- name: WSL - Setup Docker
5762
shell: wsl-run {0}
5863
run: |
59-
# Add Docker's official GPG key:
60-
sudo apt-get update
64+
for i in {1..3}; do
65+
sudo rm -r /var/lib/apt/lists/*
66+
sudo mkdir /var/lib/apt/lists/partial
67+
sudo apt-get update -qq && break || echo "apt-get update failed, retrying ($i)…"
68+
done
69+
6170
sudo apt-get install -y ca-certificates curl
6271
sudo install -m 0755 -d /etc/apt/keyrings
6372
sudo curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc
6473
sudo chmod a+r /etc/apt/keyrings/docker.asc
6574
66-
# Add the repository to Apt sources:
6775
echo \
6876
"deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \
6977
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
7078
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
71-
sudo apt-get update
79+
80+
for i in {1..3}; do
81+
sudo rm -r /var/lib/apt/lists/*
82+
sudo mkdir /var/lib/apt/lists/partial
83+
sudo apt-get update -qq && break || echo "apt-get update failed, retrying ($i)…"
84+
done
7285
7386
sudo apt-get install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
7487

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
.cache/
66
/bin
77
coverage.txt
8+
/test/e2e/dist/
89
/test/plugins/workspace
910
/test/plugins/dist/
1011
*.pb.go

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Release Notes.
77
#### Features
88

99
* Support Windows plugin test.
10+
* Support Kafka reporter.
1011

1112
#### Plugins
1213

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ test: ## Run E2E scenario tests
8787
lint: linter ## Run golangci-lint linter
8888
@$(LOG_TARGET)
8989
@for dir in $$(find . -name go.mod -exec dirname {} \; ); do \
90-
if [[ $$dir == "./test/plugins/scenarios/"* ]]; then \
90+
if [[ $$dir == "./test/plugins/scenarios/"* ]] || [[ $$dir == "./test/e2e/"* ]]; then \
9191
continue; \
9292
fi; \
9393
echo "Linting $$dir"; \

agent/reporter/imports.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
_ "os"
2828
_ "strconv"
2929
_ "strings"
30+
_ "sync"
3031
_ "time"
3132

3233
// imports the logs for reporter
@@ -55,6 +56,11 @@ import (
5556
_ "google.golang.org/grpc/stats"
5657
_ "google.golang.org/grpc/status"
5758

59+
// imports required packages for Kafka reporter
60+
_ "github.com/segmentio/kafka-go"
61+
_ "github.com/segmentio/kafka-go/compress"
62+
_ "google.golang.org/protobuf/proto"
63+
5864
// imports protocols between agent and backend
5965
_ "skywalking.apache.org/repo/goapi/collect/agent/configuration/v3"
6066
_ "skywalking.apache.org/repo/goapi/collect/common/v3"
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Kafka Reporter
2+
3+
This document describes how to configure and use the Kafka reporter in the Apache SkyWalking Go agent. The Kafka reporter provides an alternative to the default gRPC reporter, allowing you to send trace, metrics, and log data to Apache Kafka.
4+
5+
## Overview
6+
7+
The SkyWalking Go agent can be configured to report collected telemetry data (traces, metrics, logs) to a Kafka cluster. This is useful for scenarios where Kafka is already part of your infrastructure or when you prefer Kafka's buffering and scalability features for handling observability data.
8+
9+
**Note:** Even when the primary data reporting is set to Kafka (`reporter.type: kafka`), the CDS functionality itself still relies on gRPC communication with the SkyWalking OAP (Observability Analysis Platform). Therefore, you **must** also configure the relevant gRPC settings under `reporter.grpc` (or their corresponding environment variables like `SW_AGENT_REPORTER_GRPC_BACKEND_SERVICE`) for CDS to work correctly.
10+
11+
12+
## Enabling Kafka Reporter
13+
14+
You can enable the Kafka reporter either through environment variables or by configuring the `agent.default.yaml` file.
15+
16+
**Using Environment Variables:**
17+
18+
Set the `SW_AGENT_REPORTER_TYPE` environment variable to `kafka`:
19+
```bash
20+
export SW_AGENT_REPORTER_TYPE=kafka
21+
```
22+
23+
**Using `agent.default.yaml`:**
24+
25+
Modify the `reporter.type` setting in your `agent.default.yaml` configuration file:
26+
```yaml
27+
reporter:
28+
type: kafka # or grpc
29+
# ... other global reporter settings
30+
```
31+
32+
## Configuration
33+
34+
The Kafka reporter requires specific configurations for connecting to your Kafka cluster and specifying topics for different data types. These can be set via environment variables or in the `agent.default.yaml` file. Environment variable names generally follow the pattern `SW_AGENT_REPORTER_KAFKA_OPTION_NAME_IN_UPPERCASE`.
35+
36+
### Core Kafka Configuration
37+
38+
These settings are typically found under the `reporter.kafka` section in `agent.default.yaml` or can be set using the corresponding environment variables.
39+
40+
* **Broker Addresses:**
41+
A comma-separated list of Kafka broker addresses.
42+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_BROKERS`
43+
* YAML: `reporter.kafka.brokers`
44+
* Example: `kafka1:9092,kafka2:9092`
45+
46+
47+
* **Topic for Segments:**
48+
The Kafka topic where trace segments will be sent.
49+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_TOPIC_SEGMENT`
50+
* YAML: `reporter.kafka.topic_segment`
51+
* Example: `skywalking-segments`
52+
53+
54+
* **Topic for Metrics:**
55+
The Kafka topic where metrics data will be sent.
56+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_TOPIC_METER`
57+
* YAML: `reporter.kafka.topic_meter`
58+
* Example: `skywalking-meters`
59+
60+
61+
* **Topic for Logs:**
62+
The Kafka topic where log data will be sent.
63+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_TOPIC_LOGGING`
64+
* YAML: `reporter.kafka.topic_logging`
65+
* Example: `skywalking-logs`
66+
67+
68+
* **Topic for Management:** (Optional)
69+
The Kafka topic for management-related messages (e.g., potentially for configurations or commands in future use).
70+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_TOPIC_MANAGEMENT`
71+
* YAML: `reporter.kafka.topic_management`
72+
* Example: `skywalking-management`
73+
74+
75+
### Advanced Kafka Configuration
76+
77+
These options allow fine-tuning of the Kafka producer behavior. They can also be configured via environment variables or in `agent.default.yaml` under `reporter.kafka`.
78+
79+
* **Send Queue Size (Tracing, Metrics, Logs):**
80+
The maximum size of the internal queue for buffering data before sending for each data type.
81+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_MAX_SEND_QUEUE`
82+
* YAML: `reporter.kafka.max_send_queue`
83+
* Default: `5000`
84+
85+
86+
* **Batch Size:**
87+
The maximum number of messages batched before being sent to a partition.
88+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_BATCH_SIZE`
89+
* YAML: `reporter.kafka.batchSize`
90+
* Default: `1000`
91+
92+
93+
* **Batch Bytes:**
94+
The maximum total bytes batched before being sent to a partition.
95+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_BATCH_BYTES`
96+
* YAML: `reporter.kafka.batchBytes`
97+
* Default: `1048576` (1MB)
98+
99+
100+
* **Batch Timeout:**
101+
The maximum time the producer will wait before sending a batch, even if `batchSize` or `batchBytes` is not met. (e.g., "1s", "500ms").
102+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_BATCH_TIMEOUT`
103+
* YAML: `reporter.kafka.batchTimeout`
104+
* Default: `1s` (1 second)
105+
106+
107+
* **Acknowledgement (Acks):**
108+
The level of acknowledgement required from Kafka brokers.
109+
* `0`: No acknowledgement (producer does not wait).
110+
* `1`: Leader acknowledgement (producer waits for the leader to write the record).
111+
* `-1`: All in-sync replicas acknowledgement.
112+
* Environment Variable: `SW_AGENT_REPORTER_KAFKA_ACKS`
113+
* YAML: `reporter.kafka.acks`
114+
* Default: `1` (leader)
115+
116+
117+
### Example `agent.default.yaml` Snippet for Kafka:
118+
119+
```yaml
120+
reporter:
121+
# Reporter type: "grpc" or "kafka"
122+
type: ${SW_AGENT_REPORTER_TYPE:kafka}
123+
kafka:
124+
# Kafka broker addresses, comma separated
125+
brokers: ${SW_AGENT_REPORTER_KAFKA_BROKERS:127.0.0.1:9092}
126+
# Topic for segments data
127+
topic_segment: ${SW_AGENT_REPORTER_KAFKA_TOPIC_SEGMENT:skywalking-segments}
128+
# Topic for meters data
129+
topic_meter: ${SW_AGENT_REPORTER_KAFKA_TOPIC_METER:skywalking-meters}
130+
# Topic for logging data
131+
topic_logging: ${SW_AGENT_REPORTER_KAFKA_TOPIC_LOGGING:skywalking-logs}
132+
# Topic for management data
133+
topic_management: ${SW_AGENT_REPORTER_KAFKA_TOPIC_MANAGEMENT:skywalking-managements}
134+
# Send queue size
135+
max_send_queue: ${SW_AGENT_REPORTER_KAFKA_MAX_SEND_QUEUE:5000}
136+
# Batch size
137+
batch_size: ${SW_AGENT_REPORTER_KAFKA_BATCH_SIZE:1000}
138+
# Batch bytes
139+
batch_bytes: ${SW_AGENT_REPORTER_KAFKA_BATCH_BYTES:1048576}
140+
# Batch timeout millis
141+
batch_timeout_millis: ${SW_AGENT_REPORTER_KAFKA_BATCH_TIMEOUT_MILLIS:1000}
142+
# Acknowledge, 0: none, 1: leader, -1: all
143+
acks: ${SW_AGENT_REPORTER_KAFKA_ACKS:1}
144+
```
145+
146+
## Data Format
147+
148+
The agent transforms the collected spans, metrics, and logs into the standard Apache SkyWalking data format before sending them to Kafka. The SkyWalking OAP (Observability Analysis Platform) should be configured with Kafka fetchers to consume and process this data from the specified Kafka topics. Ensure your OAP version is compatible and configured correctly to ingest data reported via Kafka.

docs/menu.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ catalog:
4242
path: /en/agent/plugin-configurations
4343
- name: Transport Layer Security (TLS)
4444
path: /en/advanced-features/grpc-tls
45+
- name: Kafka Reporter
46+
path: /en/advanced-features/kafka-reporter
4547
- name: Manual APIs
4648
catalog:
4749
- name: Tracing APIs

go.work

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ use (
3636
./test/plugins/runner-helper
3737

3838
// define the plugin test scenarios
39+
./test/e2e/base/consumer
40+
./test/e2e/base/provider
41+
3942
./test/plugins/scenarios/dubbo
4043
./test/plugins/scenarios/gin
4144
./test/plugins/scenarios/short_versions_gin

go.work.sum

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,7 @@ github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4x
508508
github.com/apache/dubbo-getty v1.4.9-0.20221022181821-4dc6252ce98c h1:2LE4IlyVBBlMo0ZDI+vq9YIb35dyij1YR5EnNWVPnNQ=
509509
github.com/apache/dubbo-go-hessian2 v1.11.5 h1:rcK22+yMw2Hejm6GRG7WrdZ0DinW2QMZc01c7YVZjcQ=
510510
github.com/apache/skywalking-go v0.0.0-20230411034404-b9270e98036b/go.mod h1:K0KR1RwXnGRlhw/ANpJ4Ex2CzhT4ST78MxmgHNi3/Z4=
511+
github.com/apache/skywalking-go v0.6.0/go.mod h1:vrfjgJ+a1R4N8v3s/1w7IuZrQoXsREnuZUQef+bzJ9c=
511512
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
512513
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
513514
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
@@ -904,8 +905,6 @@ github.com/go-kratos/swagger-api v1.0.0 h1:R1JcAqLSvfDzqHoDnpK3Jt1MEJAXGNifV7AnB
904905
github.com/go-ldap/ldap v3.0.2+incompatible h1:kD5HQcAzlQ7yrhfn+h+MSABeAy/jAJhvIJ/QDllP44g=
905906
github.com/go-ldap/ldap/v3 v3.1.10 h1:7WsKqasmPThNvdl0Q5GPpbTDD/ZD98CfuawrMIuh7qQ=
906907
github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
907-
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
908-
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
909908
github.com/go-micro/plugins/v4/auth/jwt v1.2.0 h1:+dJHFnobRhX3QvTlHKCj5Sn2eeFsfNFzmFCc4HX5ULQ=
910909
github.com/go-micro/plugins/v4/auth/jwt v1.2.0/go.mod h1:6tHH9WrpbEwmNQKJj8DutZtZdvY9gl0XBRJPnzEwl6Q=
911910
github.com/go-micro/plugins/v4/broker/nats v1.1.0 h1:JCo2JksH0M6HC8q8L+x9QCqGRr5M0BADQw2Q50TWCas=
@@ -1223,7 +1222,6 @@ github.com/lyft/protoc-gen-validate v0.0.13 h1:KNt/RhmQTOLr7Aj8PsJ7mTronaFyx80mR
12231222
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
12241223
github.com/magiconair/properties v1.8.4/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
12251224
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
1226-
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
12271225
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
12281226
github.com/marstr/guid v1.1.0 h1:/M4H/1G4avsieL6BbUwCOBzulmoeKVP5ux/3mQNnbyI=
12291227
github.com/marstr/guid v1.1.0/go.mod h1:74gB1z2wpxxInTG6yaqA7KrtM0NZ+RbrcqDvYHefzho=
@@ -1355,7 +1353,6 @@ github.com/oklog/oklog v0.3.2 h1:wVfs8F+in6nTBMkA7CbRw+zZMIB7nNM825cM1wuzoTk=
13551353
github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw=
13561354
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
13571355
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5 h1:58+kh9C6jJVXYjt8IE48G2eWl6BjwU5Gj0gqY84fy78=
1358-
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
13591356
github.com/oliveagle/jsonpath v0.0.0-20180606110733-2e52cf6e6852 h1:Yl0tPBa8QPjGmesFh1D0rDy+q1Twx6FyU7VWHi8wZbI=
13601357
github.com/onsi/ginkgo v0.0.0-20151202141238-7f8ab55aaf3b/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
13611358
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -1543,7 +1540,6 @@ github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+
15431540
github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
15441541
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
15451542
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
1546-
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
15471543
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
15481544
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
15491545
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=

0 commit comments

Comments
 (0)