Skip to content

Commit 08cb7f1

Browse files
committed
fix after review
1 parent f958baf commit 08cb7f1

17 files changed

Lines changed: 47 additions & 45 deletions

File tree

fd/file.d.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
263263
if len(deadqueueMap) > 0 {
264264
deadqueueType := deadqueue.Get("type").MustString()
265265
if deadqueueType == "" {
266-
return nil, fmt.Errorf("%s doesn't have type", pluginKind)
266+
return nil, fmt.Errorf("deadqueue of %s doesn't have type", pluginKind)
267267
}
268268
deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType)
269269
if err != nil {
@@ -274,12 +274,12 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
274274

275275
deadqueueConfigJson, err := deadqueue.Encode()
276276
if err != nil {
277-
logger.Panicf("can't create config json for %s", t)
277+
logger.Panicf("can't create config json for %s deadqueue", deadqueueType)
278278
}
279279

280280
config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values)
281281
if err != nil {
282-
logger.Fatalf("error on creating %s with type %q: %s", t, pluginKind, err.Error())
282+
logger.Fatalf("error on creating deadqueue of %s with type %q: %s", deadqueueType, pluginKind, err.Error())
283283
}
284284
deadqueueInfo.Config = config
285285

pipeline/backoff.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ type RetriableBatcher struct {
1111
outFn RetriableBatcherOutFn
1212
batcher *Batcher
1313
backoffOpts BackoffOpts
14-
DeadQueueIsAvailable bool
14+
IsDeadQueueAvailable bool
1515
onRetryError func(err error, events []*Event)
1616
}
1717

@@ -21,15 +21,15 @@ type BackoffOpts struct {
2121
MinRetention time.Duration
2222
Multiplier float64
2323
AttemptNum int
24-
DeadQueueIsAvailable bool
24+
IsDeadQueueAvailable bool
2525
}
2626

2727
func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher {
2828
batcherBackoff := &RetriableBatcher{
2929
outFn: batcherOutFn,
3030
backoffOpts: opts,
3131
onRetryError: onError,
32-
DeadQueueIsAvailable: opts.DeadQueueIsAvailable,
32+
IsDeadQueueAvailable: opts.IsDeadQueueAvailable,
3333
}
3434
batcherBackoff.setBatcher(batcherOpts)
3535
return batcherBackoff
@@ -66,7 +66,7 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) {
6666
events = batch.events
6767
}
6868
b.onRetryError(err, events)
69-
if batch != nil && b.DeadQueueIsAvailable {
69+
if batch != nil && b.IsDeadQueueAvailable {
7070
batch.reset()
7171
batch.status = BatchStatusInDeadQueue
7272
}

pipeline/backoff_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func TestBackoff(t *testing.T) {
3232
},
3333
BackoffOpts{
3434
AttemptNum: 3,
35-
DeadQueueIsAvailable: false,
35+
IsDeadQueueAvailable: false,
3636
},
3737
errorFn,
3838
)
@@ -61,7 +61,7 @@ func TestBackoffWithError(t *testing.T) {
6161
},
6262
BackoffOpts{
6363
AttemptNum: 3,
64-
DeadQueueIsAvailable: false,
64+
IsDeadQueueAvailable: false,
6565
},
6666
errorFn,
6767
)
@@ -91,7 +91,7 @@ func TestBackoffWithErrorWithDeadQueue(t *testing.T) {
9191
},
9292
BackoffOpts{
9393
AttemptNum: 3,
94-
DeadQueueIsAvailable: true,
94+
IsDeadQueueAvailable: true,
9595
},
9696
errorFn,
9797
)

pipeline/plugin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type ActionPluginParams struct {
5959
type OutputPluginParams struct {
6060
PluginDefaultParams
6161
Controller OutputPluginController
62-
Router Router
62+
Router *Router
6363
Logger *zap.SugaredLogger
6464
}
6565

pipeline/processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func allEventStatuses() []eventStatus {
5555
type processor struct {
5656
id int
5757
streamer *streamer
58-
router Router
58+
router *Router
5959
finalize finalizeFn
6060

6161
activeCounter *atomic.Int32
@@ -88,7 +88,7 @@ func newProcessor(
8888
id: id,
8989
streamer: streamer,
9090
actionMetrics: actionMetrics,
91-
router: *router,
91+
router: router,
9292
finalize: finalizeFn,
9393

9494
activeCounter: activeCounter,

pipeline/router.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (r *Router) Ack(event *Event) {
2727
}
2828

2929
func (r *Router) Fail(event *Event) {
30-
if r.DeadQueueIsAvailable() {
30+
if r.IsDeadQueueAvailable() {
3131
r.deadQueue.Out(event)
3232
}
3333
}
@@ -38,19 +38,19 @@ func (r *Router) Out(event *Event) {
3838

3939
func (r *Router) Stop() {
4040
r.output.Stop()
41-
if r.DeadQueueIsAvailable() {
41+
if r.IsDeadQueueAvailable() {
4242
r.deadQueue.Stop()
4343
}
4444
}
4545

46-
func (r *Router) DeadQueueIsAvailable() bool {
46+
func (r *Router) IsDeadQueueAvailable() bool {
4747
return r.deadQueue != nil
4848
}
4949

5050
func (r *Router) Start(params *OutputPluginParams) {
51-
params.Router = *r
51+
params.Router = r
5252
r.output.Start(r.outputInfo.Config, params)
53-
if r.DeadQueueIsAvailable() {
53+
if r.IsDeadQueueAvailable() {
5454
r.deadQueue.Start(r.deadQueueInfo.Config, params)
5555
}
5656
}

pipeline/router_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestRouterNormalProcessing(t *testing.T) {
7272

7373
params := test.NewEmptyOutputPluginParams()
7474
params.PipelineName = "test_pipeline"
75-
params.Router = *r
75+
params.Router = r
7676
params.Controller = controller
7777
r.Start(params)
7878
defer r.Stop()
@@ -122,7 +122,7 @@ func TestRouterDeadQueueProcessing(t *testing.T) {
122122

123123
params := test.NewEmptyOutputPluginParams()
124124
params.PipelineName = "test_pipeline"
125-
params.Router = *r
125+
params.Router = r
126126
params.Controller = controller
127127
r.Start(params)
128128
defer r.Stop()

plugin/output/clickhouse/clickhouse.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ type Plugin struct {
5959
insertErrorsMetric prometheus.Counter
6060
queriesCountMetric prometheus.Counter
6161

62-
router pipeline.Router
62+
router *pipeline.Router
6363
}
6464

6565
type Setting struct {
@@ -444,12 +444,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
444444
MinRetention: p.config.Retention_,
445445
Multiplier: float64(p.config.RetentionExponentMultiplier),
446446
AttemptNum: p.config.Retry,
447-
DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(),
447+
IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
448448
}
449449

450450
onError := func(err error, events []*pipeline.Event) {
451451
var level zapcore.Level
452-
if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() {
452+
if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
453453
level = zapcore.FatalLevel
454454
} else {
455455
level = zapcore.ErrorLevel

plugin/output/devnull/devnull.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ It provides an API to test pipelines and other plugins.
1212

1313
type Plugin struct {
1414
controller pipeline.OutputPluginController
15-
router pipeline.Router
15+
router *pipeline.Router
1616
outFn func(event *pipeline.Event)
1717
total *atomic.Int64
1818
}

plugin/output/elasticsearch/elasticsearch.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Plugin struct {
5151
sendErrorMetric *prometheus.CounterVec
5252
indexingErrorsMetric prometheus.Counter
5353

54-
router pipeline.Router
54+
router *pipeline.Router
5555
}
5656

5757
// ! config-params
@@ -263,12 +263,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP
263263
MinRetention: p.config.Retention_,
264264
Multiplier: float64(p.config.RetentionExponentMultiplier),
265265
AttemptNum: p.config.Retry,
266-
DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(),
266+
IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
267267
}
268268

269269
onError := func(err error, events []*pipeline.Event) {
270270
var level zapcore.Level
271-
if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() {
271+
if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
272272
level = zapcore.FatalLevel
273273
} else {
274274
level = zapcore.ErrorLevel

0 commit comments

Comments
 (0)