Skip to content

Commit 6a09d5c

Browse files
committed
feat: first commit
0 parents  commit 6a09d5c

22 files changed

Lines changed: 2777 additions & 0 deletions

.chglog/CHANGELOG.tpl.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
{{ range .Versions }}
2+
<a name="{{ .Tag.Name }}"></a>
3+
## {{ if .Tag.Previous }}[{{ .Tag.Name }}]({{ $.Info.RepositoryURL }}/compare/{{ .Tag.Previous.Name }}...{{ .Tag.Name }}){{ else }}{{ .Tag.Name }}{{ end }} ({{ datetime "2006-01-02" .Tag.Date }})
4+
5+
{{ range .CommitGroups -}}
6+
### {{ .Title }}
7+
8+
{{ range .Commits -}}
9+
* {{ if .Scope }}**{{ .Scope }}:** {{ end }}{{ .Subject }} (@[{{ .Author.Name }}]({{ .Author.Email }}))
10+
{{ end }}
11+
{{ end -}}
12+
13+
{{- if .RevertCommits -}}
14+
### Reverts
15+
16+
{{ range .RevertCommits -}}
17+
* {{ .Revert.Header }}
18+
{{ end }}
19+
{{ end -}}
20+
21+
{{- if .MergeCommits -}}
22+
### Pull Requests
23+
24+
{{ range .MergeCommits -}}
25+
* {{ .Header }}
26+
{{ end }}
27+
{{ end -}}
28+
29+
{{- if .NoteGroups -}}
30+
{{ range .NoteGroups -}}
31+
### {{ .Title }}
32+
33+
{{ range .Notes }}
34+
{{ .Body }}
35+
{{ end }}
36+
{{ end -}}
37+
{{ end -}}
38+
{{ end -}}

.chglog/config.yml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
style: github
2+
template: CHANGELOG.tpl.md
3+
info:
4+
title: CHANGELOG
5+
repository_url: https://github.com/DoNewsCode/core-otfranz
6+
options:
7+
commits:
8+
filters:
9+
Type:
10+
- feat
11+
- fix
12+
- perf
13+
- refactor
14+
commit_groups:
15+
title_maps:
16+
feat: ✨ Features
17+
fix: 🐛 Bug Fixes
18+
perf: ⚡️ Performance
19+
refactor: ♻️ Code Refactoring
20+
doc: 📚 Documentation
21+
docs: 📚 Documentation
22+
chore: 🏗 Chore
23+
test: 🚦 Test
24+
style: 🎨 Style
25+
header:
26+
pattern: "^(\\w*)(?:\\(([\\w\\$\\.\\-\\*\\s]*)\\))?\\:\\s(.*)$"
27+
pattern_maps:
28+
- Type
29+
- Scope
30+
- Subject
31+
notes:
32+
keywords:
33+
- BREAKING CHANGE

.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# kafka configs
2+
KAFKA_ADDR=127.0.0.1:9092
3+

.github/workflows/go.yml

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
name: Go
2+
on:
3+
push:
4+
branches:
5+
- master
6+
pull_request:
7+
workflow_dispatch:
8+
jobs:
9+
build:
10+
strategy:
11+
matrix:
12+
go-version: [1.16.x,1.17.x]
13+
runs-on: ubuntu-latest
14+
services:
15+
zookeeper:
16+
image: wurstmeister/zookeeper:latest
17+
ports:
18+
- 2181:2181
19+
kafka:
20+
image: wurstmeister/kafka:latest
21+
ports:
22+
- 9092:9092
23+
env:
24+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
25+
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
26+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
27+
KAFKA_CREATE_TOPICS: "test:1:1,trace:1:1,example:1:1,foo:1:1,bar:1:1"
28+
steps:
29+
- uses: actions/checkout@v2
30+
31+
- name: Set up Go
32+
uses: actions/setup-go@v2
33+
with:
34+
go-version: ${{ matrix.go-version }}
35+
36+
- uses: actions/cache@v2
37+
id: cache-go-mod
38+
with:
39+
path: ~/go/pkg/mod
40+
key: ${{ runner.os }}-go-${{ matrix.go-version }}-${{ hashFiles('go.mod') }}
41+
restore-keys: |
42+
${{ runner.os }}-go-${{ matrix.go-version }}
43+
44+
- name: Environment Variables from Dotenv
45+
uses: c-py/action-dotenv-to-setenv@v3
46+
47+
- name: Build
48+
if: steps.cache-go-mod.outputs.cache-hit != 'true'
49+
run: go build -v ./...
50+
51+
- name: Test
52+
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...
53+
54+
- name: Upload coverage
55+
uses: codecov/codecov-action@v1
56+
with:
57+
token: ${{ secrets.CODECOV_TOKEN }}
58+
file: coverage.txt
59+
lint:
60+
name: lint
61+
runs-on: ubuntu-latest
62+
steps:
63+
- uses: actions/checkout@v2
64+
- name: Lint
65+
uses: golangci/golangci-lint-action@v2
66+
with:
67+
version: v1.41
68+
args: --disable errcheck --timeout 5m0s
69+
only-new-issues: true

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*/.DS_Store
2+
core.iml
3+
.idea/*
4+
/.idea/
5+
6+
config/testdata/module_test_partial.json

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2021 DoNews
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
### core-otfranz
2+
3+
Using [franz-go](github.com/twmb/franz-go) as kafka backend of [core](https://github.com/DoNewsCode/core).
4+
5+
The core/otkafka is based on [kafka-go](github.com/segmentio/kafka-go).

client.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package otfranz
2+
3+
import (
4+
"context"
5+
6+
"github.com/opentracing/opentracing-go"
7+
"github.com/opentracing/opentracing-go/ext"
8+
"github.com/twmb/franz-go/pkg/kgo"
9+
)
10+
11+
// Client is a decorator around *kgo.Client that provides tracing capabilities.
12+
type Client struct {
13+
*kgo.Client
14+
tracer opentracing.Tracer
15+
}
16+
17+
// NewClient takes a *kgo.Client and returns a decorated Client.
18+
func NewClient(client *kgo.Client, tracer opentracing.Tracer) *Client {
19+
return &Client{Client: client, tracer: tracer}
20+
}
21+
22+
// ProduceWithTracing wrap Produce method with tracing.
23+
func (c *Client) ProduceWithTracing(ctx context.Context, r *kgo.Record, promise func(*kgo.Record, error)) {
24+
if c.tracer == nil {
25+
c.Produce(ctx, r, promise)
26+
return
27+
}
28+
29+
span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, c.tracer, "kafka producer")
30+
defer span.Finish()
31+
32+
ext.SpanKind.Set(span, ext.SpanKindProducerEnum)
33+
34+
c.Produce(ctx, r, func(record *kgo.Record, err error) {
35+
if err != nil {
36+
ext.LogError(span, err)
37+
}
38+
if promise != nil {
39+
promise(record, err)
40+
}
41+
})
42+
}
43+
44+
// ProduceSyncWithTracing wrap ProduceSync method with tracing.
45+
func (c *Client) ProduceSyncWithTracing(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults {
46+
if c.tracer == nil {
47+
return c.ProduceSync(ctx, rs...)
48+
}
49+
50+
span, ctx := opentracing.StartSpanFromContextWithTracer(ctx, c.tracer, "kafka producer")
51+
defer span.Finish()
52+
53+
ext.SpanKind.Set(span, ext.SpanKindProducerEnum)
54+
55+
return c.ProduceSync(ctx, rs...)
56+
}

client_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package otfranz
2+
3+
import (
4+
"context"
5+
"os"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/DoNewsCode/core/config"
11+
"github.com/go-kit/log"
12+
"github.com/opentracing/opentracing-go"
13+
"github.com/opentracing/opentracing-go/mocktracer"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/twmb/franz-go/pkg/kgo"
16+
)
17+
18+
func TestClient_ProduceWithTracing(t *testing.T) {
19+
if os.Getenv("KAFKA_ADDR") == "" {
20+
t.Skip("set KAFKA_ADDR to run TestProvideFactory")
21+
return
22+
}
23+
addrs := strings.Split(os.Getenv("KAFKA_ADDR"), ",")
24+
factory, cleanup := provideFactory(factoryIn{
25+
Logger: log.NewNopLogger(),
26+
Conf: config.MapAdapter{"kafka": map[string]Config{
27+
"default": {
28+
SeedBrokers: addrs,
29+
DefaultProduceTopic: "tracing",
30+
},
31+
}},
32+
}, func(name string, config *Config) {})
33+
defer cleanup()
34+
cli, err := factory.Make("default")
35+
assert.NoError(t, err)
36+
assert.NotNil(t, cli)
37+
38+
tracer := mocktracer.New()
39+
40+
clientWithTrace := NewClient(cli, tracer)
41+
42+
span, ctx := opentracing.StartSpanFromContextWithTracer(context.Background(), tracer, "test")
43+
record := &kgo.Record{Value: []byte("bar")}
44+
clientWithTrace.ProduceWithTracing(ctx, record, func(r *kgo.Record, err error) {
45+
if err != nil {
46+
t.Fatalf("produce error: %v\n", err)
47+
}
48+
})
49+
time.Sleep(time.Second)
50+
51+
if err := clientWithTrace.ProduceSyncWithTracing(ctx, record).FirstErr(); err != nil {
52+
t.Fatalf("produce sync error: %v\n", err)
53+
}
54+
55+
assert.Len(t, tracer.FinishedSpans(), 2)
56+
span.Finish()
57+
}
58+
59+
func TestClient_ProduceWithOutTracing(t *testing.T) {
60+
if os.Getenv("KAFKA_ADDR") == "" {
61+
t.Skip("set KAFKA_ADDR to run TestProvideFactory")
62+
return
63+
}
64+
addrs := strings.Split(os.Getenv("KAFKA_ADDR"), ",")
65+
factory, cleanup := provideFactory(factoryIn{
66+
Logger: log.NewNopLogger(),
67+
Conf: config.MapAdapter{"kafka": map[string]Config{
68+
"default": {
69+
SeedBrokers: addrs,
70+
DefaultProduceTopic: "tracing",
71+
},
72+
}},
73+
}, func(name string, config *Config) {})
74+
defer cleanup()
75+
cli, err := factory.Make("default")
76+
assert.NoError(t, err)
77+
assert.NotNil(t, cli)
78+
79+
clientWithTrace := NewClient(cli, nil)
80+
81+
record := &kgo.Record{Value: []byte("bar")}
82+
83+
tracer := mocktracer.New()
84+
span, ctx := opentracing.StartSpanFromContextWithTracer(context.Background(), tracer, "test")
85+
86+
clientWithTrace.ProduceWithTracing(ctx, record, func(r *kgo.Record, err error) {
87+
if err != nil {
88+
t.Fatalf("produce error: %v\n", err)
89+
}
90+
})
91+
time.Sleep(time.Second)
92+
93+
if err := clientWithTrace.ProduceSyncWithTracing(ctx, record).FirstErr(); err != nil {
94+
t.Fatalf("produce sync error: %v\n", err)
95+
}
96+
assert.Len(t, tracer.FinishedSpans(), 0)
97+
span.Finish()
98+
}

0 commit comments

Comments
 (0)