Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## v1.11.0

- chore: improve kakfa component

## v1.10.0

- feat: support async flush log to disk
Expand Down
22 changes: 22 additions & 0 deletions config/local/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
default: # 实例名称
Version: "2.8.1"
RequiredAcks: 1
Topic: "test-topic"
ConsumeTopic:
- "test-topic1"
- "test-topic2"
Brokers:
- "localhost:9092"
GroupID: "test-group"
Partitioner: "hash"

order: # 另一个实例配置
Version: "2.8.1"
RequiredAcks: 1
Topic: "order-topic"
ConsumeTopic:
- "order-topic"
Brokers:
- "localhost:9092"
GroupID: "order-group"
Partitioner: "random"
22 changes: 22 additions & 0 deletions examples/queue/kafka/config/app.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Name: eagle
Version: 1.0.0
PprofPort: :5555
Mode: debug # debug, release, test
JwtSecret: JWT_SECRET
JwtTimeout: 86400
CookieName: jwt-token
SSL: true
CtxDefaultTimeout: 12
CSRF: true
Debug: false
EnableTrace: false
EnablePprof: true

HTTP:
Addr: :8080
ReadTimeout: 3s
WriteTimeout: 3s
GRPC:
Addr: :9090
ReadTimeout: 5s
WriteTimeout: 5s
22 changes: 22 additions & 0 deletions examples/queue/kafka/config/kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
default: # 实例名称
Version: "2.8.1"
RequiredAcks: 1
Topic: "test-topic"
ConsumeTopic:
- "test-topic1"
- "test-topic2"
Brokers:
- "localhost:9092"
GroupID: "test-group"
Partitioner: "hash"

order: # 另一个实例配置
Version: "2.8.1"
RequiredAcks: 1
Topic: "order-topic"
ConsumeTopic:
- "order-topic"
Brokers:
- "localhost:9092"
GroupID: "order-group"
Partitioner: "random"
14 changes: 14 additions & 0 deletions examples/queue/kafka/config/logger.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Development: false
DisableCaller: false
DisableStacktrace: false
Encoding: json # json or console
Level: info # 日志级别,INFO, WARN, ERROR
Name: eagle
Writers: console # 有2个可选项:file,console 选择file会将日志记录到logger_file指定的日志文件中,选择console会将日志输出到标准输出,当然也可以两者同时选择
LoggerFile: /tmp/log/eagle.log
LoggerWarnFile: /tmp/log/eagle.wf.log
LoggerErrorFile: /tmp/log/eagle.err.log
LogRollingPolicy: daily
LogRotateDate: 1
LogRotateSize: 1
LogBackupCount: 7
52 changes: 38 additions & 14 deletions examples/queue/kafka/main.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,51 @@
package main

import (
"context"
"log"
"os"

"github.com/Shopify/sarama"
"github.com/go-eagle/eagle/pkg/queue/kafka"
)

func main() {
var (
config = sarama.NewConfig()
logger = log.New(os.Stderr, "[sarama_logger]", log.LstdFlags)
groupID = "sarama_consumer"
topic = "go-message-broker-topic"
brokers = []string{"localhost:9093"}
message = "Hello World Kafka!"
)
// 1. 初始化配置
kafka.Load()
defer kafka.Close()

// kafka publish message
kafka.NewProducer(config, logger, topic, brokers).Publish(message)
// 2. 获取配置信息(可选)
configs := kafka.GetConfig()
if len(configs) == 0 {
log.Fatal("No kafka config found")
}

// kafka consume message
kafka.NewConsumer(config, logger, topic, groupID, brokers).Consume()
// 3. 使用配置进行消息发布
ctx := context.Background()
err := kafka.Publish(ctx, "default", "test-topic", "hello world")
if err != nil {
log.Printf("Failed to publish message: %v", err)
}

// 4. 使用配置进行消息消费
handler := func(data []byte) error {
log.Printf("Received message: %s", string(data))
return nil
}

// 从默认实例消费
go func() {
err := kafka.ConsumePartition(ctx, "default", "test-topic", handler)
if err != nil {
log.Printf("Failed to consume message: %v", err)
}
}()

// 从order实例消费
go func() {
err := kafka.ConsumePartition(ctx, "order", "order-topic", handler)
if err != nil {
log.Printf("Failed to consume message: %v", err)
}
}()

select {}
}
34 changes: 20 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/go-eagle/eagle
go 1.22

require (
github.com/Shopify/sarama v1.19.0
github.com/IBM/sarama v1.45.1
github.com/alicebob/miniredis/v2 v2.15.1
github.com/cenkalti/backoff/v4 v4.2.1
github.com/dgraph-io/ristretto v0.1.0
Expand Down Expand Up @@ -44,7 +44,7 @@ require (
github.com/spf13/cast v1.4.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.0
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
github.com/swaggo/gin-swagger v1.2.0
github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf
github.com/toolkits/net v0.0.0-20160910085801-3f39ab6fe3ce
Expand All @@ -66,8 +66,8 @@ require (
go.opentelemetry.io/otel/trace v1.26.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.31.0
golang.org/x/sync v0.10.0
golang.org/x/crypto v0.33.0
golang.org/x/sync v0.11.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.33.0
gorm.io/driver/clickhouse v0.6.1
Expand All @@ -82,7 +82,6 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.23.2 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/KyleBanks/depth v1.2.1 // indirect
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
Expand All @@ -98,8 +97,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.1.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
Expand All @@ -120,10 +119,13 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.2.4 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -132,11 +134,16 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.5.4 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/lestrrat-go/strftime v1.0.5 // indirect
Expand All @@ -157,14 +164,13 @@ require (
github.com/paulmach/orb v0.11.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/pierrec/lz4 v2.5.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/extra/rediscmd/v9 v9.0.5 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
Expand All @@ -190,9 +196,9 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
Loading
Loading