# File: pipelines.template.yaml
# Forked from opensearch-project/observability-stack
# Data Prepper Pipeline Configuration
# Receives logs and traces over OTLP and writes them to OpenSearch; RED metrics
# derived from traces are pushed to Cortex via Prometheus remote write.
# Main OTLP pipeline - entry point for telemetry sent by the OpenTelemetry
# Collector. Events are routed by signal type to the matching sub-pipeline.
# NOTE(review): only LOG and TRACE routes are defined below; confirm how
# events of any other type (e.g. metrics sent over OTLP) should be handled.
otlp-pipeline:
  # Worker delay between buffer reads (milliseconds per the Data Prepper
  # pipeline options).
  delay: 10
  source:
    # OTLP source receives telemetry via OpenTelemetry Protocol
    otlp:
      # Listen on port 21890 for gRPC connections from OpenTelemetry Collector
      port: 21890
      # Disable SSL for development
      ssl: false
  # Route telemetry by signal type. Each entry maps a route name to a
  # Data Prepper expression; sinks opt in to a route by name.
  route:
    - logs: 'getEventType() == "LOG"'
    - traces: 'getEventType() == "TRACE"'
  # Send each route to its sub-pipeline
  sink:
    - pipeline:
        name: "otel-logs-pipeline"
        routes:
          - "logs"
    - pipeline:
        name: "otel-traces-pipeline"
        routes:
          - "traces"
# Log processing pipeline
# Receives log events from otlp-pipeline, copies the event time into
# @timestamp, and writes the result to OpenSearch.
otel-logs-pipeline:
  workers: 5
  delay: 10
  source:
    pipeline:
      name: "otlp-pipeline"
  buffer:
    # No settings are given here; presumably the buffer plugin's defaults apply.
    bounded_blocking:
  processor:
    # Copy (not move) the "time" field into "@timestamp"; "time" is kept.
    - copy_values:
        entries:
          - from_key: "time"
            to_key: "@timestamp"
  # Write processed logs to OpenSearch
  sink:
    - opensearch:
        # OPENSEARCH_* tokens are template placeholders substituted before
        # this file is loaded.
        hosts: ["OPENSEARCH_PROTOCOL://OPENSEARCH_HOST:OPENSEARCH_PORT"]
        # Quoted so a substituted value containing YAML-significant characters
        # (leading '@', '!', '*', ' #', ': ', digits only) stays a string.
        username: 'OPENSEARCH_USER'
        password: 'OPENSEARCH_PASSWORD'
        # Disable SSL verification for development
        insecure: true
        # Use log analytics index type for automatic index management
        index_type: log-analytics-plain
# Trace processing pipeline
# Receives trace events from otlp-pipeline and fans them out, unmodified, to
# the raw-span storage pipeline and the service-map pipeline.
otel-traces-pipeline:
  delay: 100
  source:
    pipeline:
      name: "otlp-pipeline"
  # Neither sink declares routes, so both sub-pipelines are fed from this one.
  sink:
    - pipeline:
        name: "traces-raw-pipeline"
    - pipeline:
        name: "service-map-pipeline"
# Raw trace storage pipeline
# Stores individual trace spans in OpenSearch
traces-raw-pipeline:
  source:
    pipeline:
      name: "otel-traces-pipeline"
  processor:
    # Process raw trace data for OpenSearch storage (no options set)
    - otel_traces:
  sink:
    - opensearch:
        # OPENSEARCH_* tokens are template placeholders substituted before
        # this file is loaded.
        hosts: ["OPENSEARCH_PROTOCOL://OPENSEARCH_HOST:OPENSEARCH_PORT"]
        # Quoted so a substituted value containing YAML-significant characters
        # stays a string.
        username: 'OPENSEARCH_USER'
        password: 'OPENSEARCH_PASSWORD'
        # Disable SSL verification for development
        insecure: true
        # Use trace analytics index type for automatic index management
        index_type: trace-analytics-plain-raw
# Service map generation pipeline (APM)
# Builds service dependency maps and RED metrics from trace relationships.
# Output is split by event type: service-map events are written straight to
# OpenSearch, while metric events go to a dedicated sub-pipeline so the
# high-cardinality `randomKey` label can be stripped on the Cortex branch
# without touching the service-map branch.
service-map-pipeline:
  delay: 100
  source:
    pipeline:
      name: "otel-traces-pipeline"
  processor:
    # APM service map processor - generates topology, service details, and
    # RED metrics
    - otel_apm_service_map:
        group_by_attributes: [telemetry.sdk.language]
        window_duration: 10s
  # Named routes keyed on the event type emitted by the processor above
  route:
    - otel_apm_service_map_route: 'getEventType() == "SERVICE_MAP"'
    - service_processed_metrics: 'getEventType() == "METRIC"'
  sink:
    - opensearch:
        # OPENSEARCH_* tokens are template placeholders substituted before
        # this file is loaded.
        hosts: ["OPENSEARCH_PROTOCOL://OPENSEARCH_HOST:OPENSEARCH_PORT"]
        # Quoted so a substituted value containing YAML-significant characters
        # stays a string.
        username: 'OPENSEARCH_USER'
        password: 'OPENSEARCH_PASSWORD'
        index_type: otel-v2-apm-service-map
        routes: [otel_apm_service_map_route]
        # Disable SSL verification for development
        insecure: true
    # Fan out service-derived RED metrics to a dedicated pipeline that strips
    # the per-event randomKey UUID before Cortex rejects it for cardinality.
    - pipeline:
        name: "service-metrics-cortex-pipeline"
        routes: [service_processed_metrics]
# Strips the per-event `randomKey` UUID from span-derived RED metrics
# before the Cortex remote-write sink. Without this, data-prepper tags
# every latency bucket with a fresh UUID, blowing past Cortex's default
# 50k series-per-metric limit in minutes.
service-metrics-cortex-pipeline:
  delay: 100
  source:
    pipeline:
      name: "service-map-pipeline"
  processor:
    # Drop only the per-event randomKey. Do NOT strip telemetry.sdk.language:
    # otel_apm_service_map is configured to group_by it and emits one sample
    # per (service, operation, remoteService, sdk.language) per window, so
    # removing the label collapses multi-language services onto the same
    # series+timestamp and Cortex rejects them as duplicate samples.
    # Note: data-prepper event keys use JSON-pointer-style paths; labels
    # set by otel_apm_service_map land under /attributes/<key>, so both
    # the top-level and attributes-scoped paths are listed to be safe.
    - delete_entries:
        with_keys:
          - "/attributes/randomKey"
          - "randomKey"
  sink:
    # Route RED metrics to local Cortex via remote write.
    # Cortex's distributor push endpoint is /api/v1/push (not Prometheus's
    # /api/v1/write).
    - prometheus:
        # PROMETHEUS_* tokens are template placeholders substituted before
        # this file is loaded.
        url: "http://PROMETHEUS_HOST:PROMETHEUS_PORT/api/v1/push"
        insecure: true
        # Batching limits for the remote-write sink: an event-count cap of
        # 500 and a 5s flush interval.
        threshold:
          max_events: 500
          flush_interval: 5s