Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 88 additions & 17 deletions fd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (

func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
capacity := pipeline.DefaultCapacity
antispamThreshold := pipeline.DefaultAntispamThreshold
var antispamExceptions antispam.Exceptions
sourceNameMetaField := pipeline.DefaultSourceNameMetaField
avgInputEventSize := pipeline.DefaultAvgInputEventSize
maxInputEventSize := pipeline.DefaultMaxInputEventSize
Expand All @@ -32,6 +30,11 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
metaCacheSize := pipeline.DefaultMetaCacheSize
pool := ""

antispamThreshold := pipeline.DefaultAntispamThreshold
antispamMaintenanceInterval := pipeline.DefaultMaintenanceInterval
var antispamExceptions antispam.Exceptions
var antispamRules antispam.Rules

metricHoldDuration := pipeline.DefaultMetricHoldDuration
metricMaxLabelValueLength := pipeline.DefaultMetricMaxLabelValueLength

Expand Down Expand Up @@ -89,20 +92,40 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
eventTimeout = i
}

antispamThreshold = settings.Get("antispam_threshold").MustInt()
antispamThreshold *= int(maintenanceInterval / time.Second)
if antispamThreshold < 0 {
logger.Warn("negative antispam_threshold value, antispam disabled")
antispamThreshold = 0
}

var err error
antispamExceptions, err = extractAntispamExceptions(settings)
if err != nil {
logger.Fatalf("extract exceptions: %s", err)
}
antispamExceptions.Prepare()

antispamSettings := settings.Get("antispam")
str = antispamSettings.Get("maintenance_interval").MustString()
if str != "" {
i, err := time.ParseDuration(str)
if err != nil {
logger.Fatalf("can't parse antispam maintenance interval: %s", err.Error())
}
antispamMaintenanceInterval = i
}

antispamThreshold = antispamSettings.Get("threshold").MustInt(pipeline.DefaultAntispamThreshold)
if mp, _ := antispamSettings.Map(); mp == nil {
antispamThreshold = settings.Get("antispam_threshold").MustInt(pipeline.DefaultAntispamThreshold)
}
if antispamThreshold < pipeline.DefaultAntispamThreshold {
logger.Warn("invalid antispam_threshold value, antispam disabled")
antispamThreshold = pipeline.DefaultAntispamThreshold
}
if antispamThreshold != pipeline.DefaultAntispamThreshold {
antispamThreshold *= int(antispamMaintenanceInterval / time.Second)
}

antispamRules, err = extractAntispamRules(antispamSettings, antispamMaintenanceInterval)
if err != nil {
logger.Fatalf("extract antispam rules: %s", err)
}

sourceNameMetaField = settings.Get("source_name_meta_field").MustString()
isStrict = settings.Get("is_strict").MustBool()

Expand Down Expand Up @@ -139,14 +162,18 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
MaxEventSize: maxInputEventSize,
CutOffEventByLimit: cutOffEventByLimit,
CutOffEventByLimitField: cutOffEventByLimitField,
AntispamThreshold: antispamThreshold,
AntispamExceptions: antispamExceptions,
SourceNameMetaField: sourceNameMetaField,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
StreamField: streamField,
IsStrict: isStrict,
Pool: pipeline.PoolType(pool),
Antispam: pipeline.AntispamSettings{
Threshold: antispamThreshold,
Rules: antispamRules,
Exceptions: antispamExceptions,
MaintenanceInterval: antispamMaintenanceInterval,
},
SourceNameMetaField: sourceNameMetaField,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
StreamField: streamField,
IsStrict: isStrict,
Pool: pipeline.PoolType(pool),
Metric: &pipeline.MetricSettings{
HoldDuration: metricHoldDuration,
MaxLabelValueLength: metricMaxLabelValueLength,
Expand All @@ -171,6 +198,50 @@ func extractAntispamExceptions(settings *simplejson.Json) (antispam.Exceptions,
return exceptions, nil
}

// extractAntispamRules builds antispam.Rules from the "rules" array found in
// the given antispam settings node.
//
// Every rule must provide a non-empty "name" and a "do_if" section. A rule
// threshold lower than pipeline.DefaultAntispamThreshold is reported with a
// warning and replaced by pipeline.DefaultAntispamThreshold; any other
// threshold that differs from pipeline.DefaultAntispamThreshold is multiplied
// by antispamMaintenanceInterval expressed in whole seconds.
//
// It returns (nil, nil) when "rules" is absent or empty. It returns an error
// that names the offending rule index when a rule has no name, when
// extractDoIfChecker fails, or when the "do_if" section is missing.
func extractAntispamRules(settings *simplejson.Json, antispamMaintenanceInterval time.Duration) (antispam.Rules, error) {
	rulesJSON := settings.Get("rules")
	rulesRaw := rulesJSON.MustArray()
	if len(rulesRaw) == 0 {
		return nil, nil
	}

	// The scaling factor is the same for every rule, so compute it once.
	intervalSeconds := int(antispamMaintenanceInterval / time.Second)

	rules := make(antispam.Rules, 0, len(rulesRaw))
	for i := range rulesRaw {
		ruleJSON := rulesJSON.GetIndex(i)

		name := ruleJSON.Get("name").MustString()
		if name == "" {
			return nil, fmt.Errorf("name must be set, rule #%d", i)
		}

		// NOTE(review): MustInt() is called without a default, so a rule that
		// omits "threshold" presumably gets 0 rather than
		// pipeline.DefaultAntispamThreshold — confirm this is intended.
		threshold := ruleJSON.Get("threshold").MustInt()
		if threshold < pipeline.DefaultAntispamThreshold {
			logger.Warnf("invalid threshold value, antispam disabled for rule #%d", i)
			threshold = pipeline.DefaultAntispamThreshold
		}
		// The default threshold is left unscaled; only explicit values are
		// multiplied by the maintenance interval.
		if threshold != pipeline.DefaultAntispamThreshold {
			threshold *= intervalSeconds
		}

		doIfChecker, err := extractDoIfChecker(ruleJSON.Get("do_if"))
		if err != nil {
			// Add the rule index so the caller's log points at the broken rule.
			return nil, fmt.Errorf("extract do_if, rule #%d: %w", i, err)
		}

		if doIfChecker == nil {
			return nil, fmt.Errorf("missing do_if section, rule #%d", i)
		}

		rules = append(rules, antispam.Rule{
			Name:        name,
			Threshold:   threshold,
			DoIfChecker: doIfChecker,
		})
	}

	return rules, nil
}

func extractMatchMode(actionJSON *simplejson.Json) pipeline.MatchMode {
mm := actionJSON.Get("match_mode").MustString()
return pipeline.MatchModeFromString(mm)
Expand Down
65 changes: 62 additions & 3 deletions pipeline/README.idoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Which field in the log indicates `stream`. Mostly used for distinguishing `stdou

**`maintenance_interval`** *`string`* *`default=5s`*

How often to perform maintenance. Maintenance includes antispammer maintenance and metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (`<number>(ms|s|m|h)`).
How often to perform maintenance. Maintenance includes metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

Expand All @@ -64,16 +64,20 @@ How long the event can process in action plugins and block stream in streamer un

<br>

**`antispam_threshold`** *`int`* *`default=0`*
**`antispam_threshold`** *`int`* *`default=-1`*

Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to 0 antispammer is disabled. If set to the value greater than 0 antispammer is enabled and bans sources which write `antispam_threshold` or more logs in `maintenance_interval` time.
Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to -1, the antispammer is disabled. If set to a value greater than -1, the antispammer is enabled and bans sources which write `antispam_threshold` or more logs in `maintenance_interval` time.

> ⚠ DEPRECATED. Use `threshold` in `antispam` instead.

<br>

**`antispam_exceptions`** *`[]`[antispam.Exception](/pipeline/antispam/README.md#exception-parameters)*

The list of antispammer exceptions. If the log matches at least one of the exceptions it is not accounted in antispammer.

> ⚠ DEPRECATED. Use `rules` in `antispam` instead.

<br>

**`meta_cache_size`** *`int`* *`default=1024`*
Expand Down Expand Up @@ -146,6 +150,8 @@ pipelines:
max_label_value_length: 100
```

<br>

**`hold_duration`** *`string`* *`default=30m`*

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).
Expand All @@ -158,6 +164,59 @@ Maximum length of custom metric labels in action plugins. If zero, no limit is s

<br>

## Antispam

Section for antispam in settings. Example:

```yaml
pipelines:
test:
settings:
antispam:
threshold: 3000
maintenance_interval: 5s
rules:
- name: ban_all
threshold: 0
do_if:
op: equal
field: source_name
values: ["test.log"]
- name: custom_threshold
threshold: 5000
do_if:
op: and
operands:
- op: contains
data: meta.service
values:
- test_service
- op: prefix
data: event
values:
- '{"level":"debug"'
```

<br>

**`maintenance_interval`** *`string`* *`default=5s`*

How often to perform antispammer maintenance. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

**`threshold`** *`int`* *`default=-1`*

Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to -1, the antispammer is disabled. If set to a value greater than -1, the antispammer is enabled and bans sources which write `threshold` or more logs in `maintenance_interval` time. If the `antispam` section is not specified, `antispam_threshold` is used instead.

<br>

**`rules`** *`[]`[antispam.Rule](/pipeline/antispam/README.md#rules)*

The list of antispammer rules. Each rule has a `name`, a `threshold` and a `do_if` condition; the rule's `threshold` is applied to logs that match its `do_if` condition.

<br>

## Datetime parse formats

Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go [time.Parse](https://pkg.go.dev/time#Parse) (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases.
Expand Down
65 changes: 62 additions & 3 deletions pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Which field in the log indicates `stream`. Mostly used for distinguishing `stdou

**`maintenance_interval`** *`string`* *`default=5s`*

How often to perform maintenance. Maintenance includes antispammer maintenance and metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (`<number>(ms|s|m|h)`).
How often to perform maintenance. Maintenance includes metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

Expand All @@ -64,16 +64,20 @@ How long the event can process in action plugins and block stream in streamer un

<br>

**`antispam_threshold`** *`int`* *`default=0`*
**`antispam_threshold`** *`int`* *`default=-1`*

Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to 0 antispammer is disabled. If set to the value greater than 0 antispammer is enabled and bans sources which write `antispam_threshold` or more logs in `maintenance_interval` time.
Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to -1, the antispammer is disabled. If set to a value greater than -1, the antispammer is enabled and bans sources which write `antispam_threshold` or more logs in `maintenance_interval` time.

> ⚠ DEPRECATED. Use `threshold` in `antispam` instead.

<br>

**`antispam_exceptions`** *`[]`[antispam.Exception](/pipeline/antispam/README.md#exception-parameters)*

The list of antispammer exceptions. If the log matches at least one of the exceptions it is not accounted in antispammer.

> ⚠ DEPRECATED. Use `rules` in `antispam` instead.

<br>

**`meta_cache_size`** *`int`* *`default=1024`*
Expand Down Expand Up @@ -146,6 +150,8 @@ pipelines:
max_label_value_length: 100
```

<br>

**`hold_duration`** *`string`* *`default=30m`*

The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`<number>(ms|s|m|h)`).
Expand All @@ -158,6 +164,59 @@ Maximum length of custom metric labels in action plugins. If zero, no limit is s

<br>

## Antispam

Section for antispam in settings. Example:

```yaml
pipelines:
test:
settings:
antispam:
threshold: 3000
maintenance_interval: 5s
rules:
- name: ban_all
threshold: 0
do_if:
op: equal
field: source_name
values: ["test.log"]
- name: custom_threshold
threshold: 5000
do_if:
op: and
operands:
- op: contains
data: meta.service
values:
- test_service
- op: prefix
data: event
values:
- '{"level":"debug"'
```

<br>

**`maintenance_interval`** *`string`* *`default=5s`*

How often to perform antispammer maintenance. The value must be passed in format of duration (`<number>(ms|s|m|h)`).

<br>

**`threshold`** *`int`* *`default=-1`*

Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to -1, the antispammer is disabled. If set to a value greater than -1, the antispammer is enabled and bans sources which write `threshold` or more logs in `maintenance_interval` time. If the `antispam` section is not specified, `antispam_threshold` is used instead.

<br>

**`rules`** *`[]`[antispam.Rule](/pipeline/antispam/README.md#rules)*

The list of antispammer rules. Each rule has a `name`, a `threshold` and a `do_if` condition; the rule's `threshold` is applied to logs that match its `do_if` condition.

<br>

## Datetime parse formats

Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go [time.Parse](https://pkg.go.dev/time#Parse) (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases.
Expand Down
Loading
Loading