Skip to content

Commit aa3e104

Browse files
committed
add global option to loosen ordering guarantees in stateless transforms in exchange for potential performance benefits
1 parent 5ab0ba0 commit aa3e104

10 files changed

Lines changed: 502 additions & 3 deletions

File tree

lib/vector-core/src/config/global_options.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,15 @@ pub struct GlobalOptions {
190190
/// `find_vector_metrics`, and `aggregate_vector_metrics` functions.
191191
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
192192
pub metrics_storage_refresh_period: Option<f64>,
193+
194+
/// Whether to preserve event ordering for concurrent stateless transforms.
195+
///
196+
/// When `true` (the default), concurrent stateless transforms use `FuturesOrdered`,
197+
/// guaranteeing output order matches input order. When `false`, `FuturesUnordered`
198+
/// is used, which may improve throughput at the cost of ordering guarantees.
199+
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
200+
#[configurable(metadata(docs::common = false, docs::required = false))]
201+
pub preserve_ordering_stateless_transforms: Option<bool>,
193202
}
194203

195204
impl_generate_config_from_default!(GlobalOptions);
@@ -295,6 +304,16 @@ impl GlobalOptions {
295304
errors.push("conflicting values for 'expire_metrics_secs' found".to_owned());
296305
}
297306

307+
if conflicts(
308+
self.preserve_ordering_stateless_transforms.as_ref(),
309+
with.preserve_ordering_stateless_transforms.as_ref(),
310+
) {
311+
errors.push(
312+
"conflicting values for 'preserve_ordering_stateless_transforms' found"
313+
.to_owned(),
314+
);
315+
}
316+
298317
let data_dir = if self.data_dir.is_none() || self.data_dir == default_data_dir() {
299318
with.data_dir
300319
} else if with.data_dir != default_data_dir() && self.data_dir != with.data_dir {
@@ -345,6 +364,9 @@ impl GlobalOptions {
345364
metrics_storage_refresh_period: self
346365
.metrics_storage_refresh_period
347366
.or(with.metrics_storage_refresh_period),
367+
preserve_ordering_stateless_transforms: self
368+
.preserve_ordering_stateless_transforms
369+
.or(with.preserve_ordering_stateless_transforms),
348370
})
349371
} else {
350372
Err(errors)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
optimization_goal: ingress_throughput
2+
3+
target:
4+
name: vector
5+
command: /usr/bin/vector
6+
cpu_allotment: 6
7+
memory_allotment: 8GiB
8+
9+
environment:
10+
VECTOR_THREADS: 4
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
generator:
2+
- http:
3+
seed:
4+
[
5+
2,
6+
3,
7+
5,
8+
7,
9+
11,
10+
13,
11+
17,
12+
19,
13+
23,
14+
29,
15+
31,
16+
37,
17+
41,
18+
43,
19+
47,
20+
53,
21+
59,
22+
61,
23+
67,
24+
71,
25+
73,
26+
79,
27+
83,
28+
89,
29+
97,
30+
101,
31+
103,
32+
107,
33+
109,
34+
113,
35+
127,
36+
131
37+
]
38+
headers:
39+
dd-api-key: "DEADBEEF"
40+
target_uri: "http://localhost:8282/v1/input"
41+
bytes_per_second: "500 Mb"
42+
parallel_connections: 10
43+
method:
44+
post:
45+
variant: "datadog_log"
46+
maximum_prebuild_cache_size_bytes: "256 Mb"
47+
target_metrics:
48+
- prometheus: # internal telemetry
49+
uri: "http://127.0.0.1:9090/metrics"
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
data_dir: "/var/lib/vector"
2+
3+
##
4+
## Sources
5+
##
6+
7+
sources:
8+
internal_metrics:
9+
type: "internal_metrics"
10+
11+
datadog_agent:
12+
type: "datadog_agent"
13+
acknowledgements: false
14+
address: "0.0.0.0:8282"
15+
16+
##
17+
## Transforms
18+
##
19+
20+
transforms:
21+
remap_regex:
22+
type: "remap"
23+
inputs: ["datadog_agent"]
24+
source: |
25+
msg = string!(.message)
26+
27+
# Credit card patterns
28+
.match_visa = match(msg, r'^4[0-9]{12}(?:[0-9]{3})?$')
29+
.match_mastercard = match(msg, r'^5[1-5][0-9]{14}$')
30+
.match_amex = match(msg, r'^3[47][0-9]{13}$')
31+
.match_discover = match(msg, r'^6(?:011|5[0-9]{2})[0-9]{12}$')
32+
.match_diners = match(msg, r'^3(?:0[0-5]|[68][0-9])[0-9]{11}$')
33+
34+
# Network patterns
35+
.match_ipv4 = match(msg, r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
36+
.match_ipv6 = match(msg, r'(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}')
37+
.match_mac_addr = match(msg, r'^([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$')
38+
.match_cidr = match(msg, r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}\b')
39+
.match_port = match(msg, r':[0-9]{1,5}\b')
40+
41+
# Email and URL patterns
42+
.match_email = match(msg, r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
43+
.match_url = match(msg, r'https?://[^\s/$.?#].[^\s]*')
44+
.match_domain = match(msg, r'\b[a-zA-Z0-9-]+\.[a-zA-Z]{2,}\b')
45+
.match_uri_path = match(msg, r'/[a-zA-Z0-9._~:/?#\[\]@!$&()*+,;=-]+')
46+
.match_query_string = match(msg, r'\?[a-zA-Z0-9_]+=[a-zA-Z0-9_%+-]+')
47+
48+
# Date and time patterns
49+
.match_iso8601 = match(msg, r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}')
50+
.match_date_slash = match(msg, r'\d{2}/\d{2}/\d{4}')
51+
.match_date_dash = match(msg, r'\d{4}-\d{2}-\d{2}')
52+
.match_time_24h = match(msg, r'\b([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]\b')
53+
.match_timestamp_unix = match(msg, r'\b1[0-9]{9}\b')
54+
55+
# Log level patterns
56+
.match_log_error = match(msg, r'(?i)\b(error|err|fatal|critical|crit)\b')
57+
.match_log_warn = match(msg, r'(?i)\b(warn|warning)\b')
58+
.match_log_info = match(msg, r'(?i)\b(info|notice)\b')
59+
.match_log_debug = match(msg, r'(?i)\b(debug|trace|verbose)\b')
60+
.match_log_level_bracket = match(msg, r'\[(ERROR|WARN|INFO|DEBUG|TRACE)\]')
61+
62+
# Identifier patterns
63+
.match_uuid = match(msg, r'[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}')
64+
.match_hex_string = match(msg, r'\b[0-9a-fA-F]{16,}\b')
65+
.match_sha256 = match(msg, r'\b[0-9a-fA-F]{64}\b')
66+
.match_md5 = match(msg, r'\b[0-9a-fA-F]{32}\b')
67+
.match_base64 = match(msg, r'[A-Za-z0-9+/]{20,}={0,2}')
68+
69+
# HTTP patterns
70+
.match_http_method = match(msg, r'\b(GET|POST|PUT|DELETE|PATCH|HEAD|OPTIONS)\b')
71+
.match_http_status = match(msg, r'\b[1-5][0-9]{2}\b')
72+
.match_user_agent = match(msg, r'Mozilla/[0-9.]+')
73+
.match_content_type = match(msg, r'(?i)content-type:\s*[a-z]+/[a-z0-9.+-]+')
74+
.match_auth_bearer = match(msg, r'(?i)bearer\s+[a-zA-Z0-9._~+/=-]+')
75+
76+
# AWS patterns
77+
.match_aws_arn = match(msg, r'arn:aws:[a-zA-Z0-9-]+:[a-z0-9-]*:[0-9]*:[a-zA-Z0-9/_-]+')
78+
.match_aws_account = match(msg, r'\b[0-9]{12}\b')
79+
.match_aws_region = match(msg, r'\b(us|eu|ap|sa|ca|me|af)-(north|south|east|west|central|northeast|southeast)-[0-9]\b')
80+
.match_s3_bucket = match(msg, r's3://[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]')
81+
.match_aws_access_key = match(msg, r'\bAKIA[0-9A-Z]{16}\b')
82+
83+
# Kubernetes patterns
84+
.match_k8s_pod = match(msg, r'[a-z0-9]+(-[a-z0-9]+)*-[a-z0-9]{5,10}')
85+
.match_k8s_namespace = match(msg, r'(?i)namespace[=:/]\s*[a-z0-9-]+')
86+
.match_k8s_container = match(msg, r'(?i)container[=:/]\s*[a-z0-9-]+')
87+
.match_k8s_image = match(msg, r'[a-z0-9.-]+/[a-z0-9._-]+:[a-z0-9._-]+')
88+
.match_k8s_label = match(msg, r'[a-z0-9A-Z._-]+/[a-z0-9A-Z._-]+')
89+
90+
# JSON and structured data patterns
91+
.match_json_obj = match(msg, r'\{[^{}]*"[^"]+"\s*:')
92+
.match_json_array = match(msg, r'\[[^\[\]]*\]')
93+
.match_key_value = match(msg, r'[a-zA-Z_][a-zA-Z0-9_]*=[^\s,]+')
94+
.match_quoted_string = match(msg, r'"[^"\\]*(?:\\.[^"\\]*)*"')
95+
.match_numeric_value = match(msg, r'\b-?[0-9]+\.?[0-9]*(?:[eE][+-]?[0-9]+)?\b')
96+
97+
# File path patterns
98+
.match_unix_path = match(msg, r'/(?:[a-zA-Z0-9._-]+/)*[a-zA-Z0-9._-]+')
99+
.match_windows_path = match(msg, r'[A-Z]:\\(?:[a-zA-Z0-9._-]+\\)*[a-zA-Z0-9._-]+')
100+
.match_file_ext = match(msg, r'\.[a-zA-Z0-9]{1,10}$')
101+
.match_log_file = match(msg, r'[a-zA-Z0-9._-]+\.log(?:\.[0-9]+)?')
102+
.match_pid_file = match(msg, r'/var/run/[a-zA-Z0-9._-]+\.pid')
103+
104+
# Error and exception patterns
105+
.match_stack_trace = match(msg, r'at [a-zA-Z0-9.$_]+\([A-Za-z0-9._]+:[0-9]+\)')
106+
.match_exception_class = match(msg, r'\b[A-Z][a-zA-Z0-9]*(?:Error|Exception|Fault)\b')
107+
.match_errno = match(msg, r'(?i)errno[=:\s]+[0-9]+')
108+
.match_segfault = match(msg, r'(?i)segmentation fault|sigsegv|signal 11')
109+
.match_oom = match(msg, r'(?i)out of memory|oom|cannot allocate')
110+
111+
# Database patterns
112+
.match_sql_select = match(msg, r'(?i)\bSELECT\b.+\bFROM\b')
113+
.match_sql_insert = match(msg, r'(?i)\bINSERT\s+INTO\b')
114+
.match_sql_update = match(msg, r'(?i)\bUPDATE\b.+\bSET\b')
115+
.match_sql_delete = match(msg, r'(?i)\bDELETE\s+FROM\b')
116+
.match_connection_string = match(msg, r'(?i)(mysql|postgres|mongodb|redis)://[^\s]+')
117+
118+
# Security patterns
119+
.match_ssn = match(msg, r'\b[0-9]{3}-[0-9]{2}-[0-9]{4}\b')
120+
.match_phone_us = match(msg, r'\b\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b')
121+
.match_jwt = match(msg, r'eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+')
122+
.match_private_key = match(msg, r'-----BEGIN (?:RSA )?PRIVATE KEY-----')
123+
.match_api_key_generic = match(msg, r'(?i)(api[_-]?key|apikey|secret[_-]?key)\s*[=:]\s*\S+')
124+
125+
# Encoding and format patterns
126+
.match_xml_tag = match(msg, r'<[a-zA-Z][a-zA-Z0-9]*(?:\s[^>]*)?\s*/?>')
127+
.match_html_entity = match(msg, r'&(?:#[0-9]+|#x[0-9a-fA-F]+|[a-zA-Z]+);')
128+
.match_csv_row = match(msg, r'^(?:"[^"]*"|[^,]*)(,(?:"[^"]*"|[^,]*))*$')
129+
.match_semver = match(msg, r'\bv?[0-9]+\.[0-9]+\.[0-9]+(?:-[a-zA-Z0-9.]+)?(?:\+[a-zA-Z0-9.]+)?\b')
130+
.match_mime_type = match(msg, r'\b[a-z]+/[a-z0-9][a-z0-9!#$&\-^_.+]*\b')
131+
132+
##
133+
## Sinks
134+
##
135+
136+
sinks:
137+
prometheus:
138+
type: "prometheus_exporter"
139+
inputs: ["internal_metrics"]
140+
address: "0.0.0.0:9090"
141+
142+
blackhole:
143+
type: "blackhole"
144+
print_interval_secs: 0
145+
inputs: ["remap_regex"]
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
optimization_goal: ingress_throughput
2+
3+
target:
4+
name: vector
5+
command: /usr/bin/vector
6+
cpu_allotment: 6
7+
memory_allotment: 8GiB
8+
9+
environment:
10+
VECTOR_THREADS: 4
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
generator:
2+
- http:
3+
seed:
4+
[
5+
2,
6+
3,
7+
5,
8+
7,
9+
11,
10+
13,
11+
17,
12+
19,
13+
23,
14+
29,
15+
31,
16+
37,
17+
41,
18+
43,
19+
47,
20+
53,
21+
59,
22+
61,
23+
67,
24+
71,
25+
73,
26+
79,
27+
83,
28+
89,
29+
97,
30+
101,
31+
103,
32+
107,
33+
109,
34+
113,
35+
127,
36+
131
37+
]
38+
headers:
39+
dd-api-key: "DEADBEEF"
40+
target_uri: "http://localhost:8282/v1/input"
41+
bytes_per_second: "500 Mb"
42+
parallel_connections: 10
43+
method:
44+
post:
45+
variant: "datadog_log"
46+
maximum_prebuild_cache_size_bytes: "256 Mb"
47+
target_metrics:
48+
- prometheus: # internal telemetry
49+
uri: "http://127.0.0.1:9090/metrics"

0 commit comments

Comments
 (0)