Skip to content

Commit f9bd787

Browse files
authored
Timeplus inputs (#582)
1 parent b617718 commit f9bd787

11 files changed

Lines changed: 1219 additions & 3 deletions

docs/datadog-input.md

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
# Datadog Input
2+
3+
## Overview
4+
5+
The **Datadog Input** emulates Datadog Intake endpoints for **metrics, logs and processes** by listening on a HTTP port and receiving events posted by Datadog clients / Agents. Datadog users can simply configure their clients / agents to use Datadog Input IP address and port in Timeplus, and the input will handle the rest.
6+
7+
## Create Datadog Input
8+
9+
```sql
10+
CREATE INPUT <datadog_input>
11+
SETTINGS
12+
type='datadog',
13+
metrics_target_stream=<metrics_target_stream_name>,
14+
process_target_stream=<process_target_stream_name>,
15+
logs_target_stream=<logs_target_stream_name>,
16+
tcp_port=<bind_tcp_port>,
17+
listen_host=<listen_host>
18+
COMMENT '<comments>';
19+
```
20+
21+
**Settings**
22+
- `type`: Type to indicate the input protocol. `'datadog'`.
23+
- `metrics_target_stream`: The name of the target stream that stores incoming metric data. Optional. If it is empty, the incoming metrics data will be discarded.
24+
- `processes_target_stream`: The name of the target stream that stores incoming process data. Optional. If it is empty, the incoming process data will be discarded.
25+
- `logs_target_stream`: The name of the target stream that stores incoming log data. Optional. If it is empty, the incoming logs data will be discarded.
26+
- `tcp_port`: The TCP port on which the input server listens for incoming connections. Default is **9997**.
27+
- `listen_host`: The network interface or host address on which the input server listens. Default is **'0.0.0.0'**.
28+
29+
:::note
30+
The target streams for Datadog Input requires fixed schema (without [retention policies](/append-stream-ttl)), shown below.
31+
32+
```sql
33+
-- Metrics
34+
CREATE STREAM datadog_metrics_target_stream
35+
(
36+
metric string, -- required
37+
`points.value` float64, -- required
38+
`points.timestamp` int64,
39+
type enum8( -- required
40+
'UNSPECIFIED'=0,
41+
'COUNT' = 1,
42+
'RATE' = 2,
43+
'GAUGE' = 3,
44+
),
45+
`resources.type` array(low_cardinality(string)),
46+
`resources.name` array(low_cardinality(string)),
47+
tags array(low_cardinality(string)),
48+
unit string,
49+
source_type_name string,
50+
interval int64,
51+
`metadata.origin.origin_product` uint32,
52+
`metadata.origin.origin_category` uint32,
53+
`metadata.origin.origin_service` uint32
54+
);
55+
56+
-- Logs
57+
CREATE STREAM datadog_logs_target_stream (
58+
message string, -- required
59+
status float64, -- required
60+
timestamp int64,
61+
hostname low_cardinality(string), -- required
62+
service low_cardinality(string),
63+
ddtags array(low_cardinality(string))
64+
)
65+
66+
-- Processes.
67+
-- It is a very sparse stream. User can remove the optional columns
68+
-- according to the real Datadog agent configuration / deployment
69+
CREATE STREAM datadog_processes_target_stream (
70+
hostname string,
71+
`processes.key` uint32,
72+
`processes.pid` int32, -- required
73+
`processes.host.id` int64,
74+
`processes.host.org_id` int32,
75+
`processes.host.name` string,
76+
`processes.host.all_tags` array(string),
77+
`processes.host.num_cpus` int32,
78+
`processes.host.total_memory` int64,
79+
`processes.host.tag_index` int32,
80+
`processes.host.tags_modified` int64,
81+
`processes.command.args` array(string),
82+
`processes.command.cwd` string,
83+
`processes.command.root` string,
84+
`processes.command.on_disk` bool,
85+
`processes.command.ppid` int32,
86+
`processes.command.pgroup` int32,
87+
`processes.command.exe` string,
88+
`processes.command.comm` string,
89+
`processes.user.name` string,
90+
`processes.user.uid` int32,
91+
`processes.user.gid` int32,
92+
`processes.user.euid` int32,
93+
`processes.user.egid` int32,
94+
`processes.user.suid` int32,
95+
`processes.user.sgid` int32,
96+
`processes.memory.rss` uint64,
97+
`processes.memory.vms` uint64,
98+
`processes.memory.swap` uint64,
99+
`processes.memory.shared` uint64,
100+
`processes.memory.text` uint64,
101+
`processes.memory.lib` uint64,
102+
`processes.memory.data` uint64,
103+
`processes.memory.dirty` uint64,
104+
`processes.cpu.last_cpu` string,
105+
`processes.cpu.total_pct` float64,
106+
`processes.cpu.user_pct` float64,
107+
`processes.cpu.system_pct` float64,
108+
`processes.cpu.num_threads` int32,
109+
`processes.cpu.nice` int32,
110+
`processes.cpu.user_time` int64,
111+
`processes.cpu.system_time` int64,
112+
`processes.create_time` int64,
113+
`processes.open_fd_count` int32,
114+
`processes.state` enum8('U' = 0, 'D' = 1, 'R' = 2, 'S' = 3, 'T' = 4, 'W' = 5, 'X' = 6, 'Z' = 7),
115+
`processes.io_stat.read_rate` float64,
116+
`processes.io_stat.write_rate` float64,
117+
`processes.io_stat.read_bytes_rate` float64,
118+
`processes.io_stat.write_bytes_rate` float64,
119+
`processes.container_id` string,
120+
`processes.container_key` uint32,
121+
`processes.voluntary_ctx_switches` uint64,
122+
`processes.involuntary_ctx_switches` uint64,
123+
`processes.byte_key` bytes,
124+
`processes.container_byte_key` bytes,
125+
`processes.ns_pid` int32,
126+
`processes.networks.connection_rate` float64,
127+
`processes.networks.bytes_rate` float64,
128+
`processes.process_context` array(string),
129+
`processes.tags` array(string),
130+
`processes.language` enum8(
131+
'LANGUAGE_UNKNOWN' = 0,
132+
'LANGUAGE_JAVA' = 1,
133+
'LANGUAGE_NODE' = 2,
134+
'LANGUAGE_PYTHON' = 3,
135+
'LANGUAGE_RUBY' = 4,
136+
'LANGUAGE_DOTNET' = 5,
137+
'LANGUAGE_GO' = 6,
138+
'LANGUAGE_CPP' = 7,
139+
'LANGUAGE_PHP' = 8
140+
),
141+
`processes.port_info.tcp` array(int32),
142+
`processes.port_info.udp` array(int32),
143+
`processes.service_discovery.generated_service_name.name` string,
144+
`processes.service_discovery.generated_service_name.source` int32,
145+
`processes.service_discovery.dd_service_name.name` string,
146+
`processes.service_discovery.dd_service_name.source` int32,
147+
`processes.service_discovery.additional_generated_names.name` array(string),
148+
`processes.service_discovery.additional_generated_names.source` array(uint8),
149+
`processes.service_discovery.tracer_metadata.runtime_id` array(string),
150+
`processes.service_discovery.tracer_metadata.service_name` array(string),
151+
`processes.service_discovery.apm_instrumentation` bool,
152+
`processes.injection_state` uint8,
153+
`host.id` int64,
154+
`host.org_id` int32,
155+
`host.name` string,
156+
`host.all_tags` array(string),
157+
`host.num_cpus` int32,
158+
`host.total_memory` int64,
159+
`host.tag_index` int32,
160+
`host.tags_modified` int64,
161+
`info.uuid` string,
162+
`info.os.name` string,
163+
`info.os.platform` string,
164+
`info.os.family` string,
165+
`info.os.version` string,
166+
`info.os.kernel_version` string,
167+
`info.cpus.number` array(int32),
168+
`info.cpus.vendor` array(low_cardinality(string)),
169+
`info.cpus.family` array(low_cardinality(string)),
170+
`info.cpus.model` array(low_cardinality(string)),
171+
`info.cpus.physical_id` array(string),
172+
`info.cpus.core_id` array(low_cardinality(string)),
173+
`info.cpus.cores` array(int32),
174+
`info.cpus.mhz` array(int64),
175+
`info.cpus.cache_size` array(int32),
176+
`info.total_memory` int64,
177+
group_id int32,
178+
group_size int32,
179+
containers.type string,
180+
containers.id string,
181+
containers.cpu_limit float64,
182+
containers.memory_limit uint64,
183+
containers.state enum8('unknown' = 0, 'created' = 1, 'restarting' = 2, 'running' = 3, 'paused' = 4, 'exited' = 5, 'dead' = 6),
184+
containers.health enum8('unknownHealth' = 0, 'starting' = 1, 'healthy' = 2, 'unhealthy' = 3),
185+
containers.created int64,
186+
containers.rbps float64,
187+
containers.wbps float64,
188+
containers.key uint32,
189+
containers.net_rcvd_ps float64,
190+
containers.net_sent_ps float64,
191+
containers.net_rcvd_bps float64,
192+
containers.net_sent_bps float64,
193+
containers.user_pct float64,
194+
containers.system_pct float64,
195+
containers.total_pct float64,
196+
containers.mem_rss uint64,
197+
containers.mem_cache uint64,
198+
`containers.host.id` int64,
199+
`containers.host.org_id` int32,
200+
`containers.host.name` string,
201+
`containers.host.all_tags` array(string),
202+
`containers.host.num_cpus` int32,
203+
`containers.host.total_memory` int64,
204+
`containers.host.tag_index` int32,
205+
`containers.host.tags_modified` int64,
206+
containers.started int64,
207+
containers.byte_key bytes,
208+
containers.tags array(string),
209+
containers.addresses.ip array(string),
210+
containers.addresses.port array(int32),
211+
containers.addresses.protocol array(uint8),
212+
containers.thread_count uint64,
213+
containers.thread_limit uint64,
214+
containers.mem_usage uint64,
215+
containers.cpu_usage_ns float64,
216+
containers.mem_accounted uint64,
217+
containers.cpu_request float64,
218+
containers.memory_request uint64,
219+
containers.repo_digest string,
220+
network_id string,
221+
container_host_type int32,
222+
hint_mask int32
223+
);
224+
```
225+
:::
226+
227+
:::info
228+
You may want to fine-tune the target stream when provisioning it, especially if its historical store is enabled and it will serve applications. This includes settings such as:
229+
230+
- [Compression codec](/append-stream-codecs)
231+
- [Retention policies](/append-stream-ttl)
232+
- [Indexes](/append-stream-indexes)
233+
234+
Alternatively, you can disable the historical store entirely by using `SETTIGNS storage_type='streaming'` and use the target stream as a persistent, queryable queue. In this scenario, fine-tuning compression, retention, and indexes is not necessary.
235+
:::
236+
237+
**Example**:
238+
239+
```sql
240+
-- Create Datadog input and write the incoming data to target streams
241+
CREATE INPUT datadog_input
242+
SETTINGS
243+
type = 'datadog',
244+
tcp_port = 9090,
245+
metrics_target_stream = 'datadog_metrics_target_stream',
246+
logs_target_stream = 'datadog_logs_target_stream',
247+
process_target_stream = 'datadog_processes_target_stream'
248+
COMMNET 'Datadog Input Test'
249+
```
250+
251+
## DatadogAgent Configuration
252+
253+
DatadogAgent users can simply update `datadog.yaml` to point to the Datadog input in Timeplus.
254+
255+
**Example**
256+
```
257+
% cat /etc/datadog-agent/datadog.yaml
258+
259+
dd_url: http://32q80q8.timeplus:9090
260+
logs_enabled: true
261+
logs_config:
262+
container_collect_all: true
263+
logs_dd_url: http://32q80q8.timeplus:9090
264+
process_config: enabled: "true"
265+
process_dd_url: http://32q80q8.timeplus:9090
266+
```

docs/elastic-input.md

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# ElasticSearch HTTP Input
2+
3+
## Overview
4+
5+
The **ElasticSearch HTTP Input** emulates ElasticSearch `/_bulk` endpoint by listening on a HTTP port and receiving events posted by ElasticSearch clients / Elastic Beats. ElasticSearch users can simply configure their clients / Beats use ElasticSearch HTTP Input IP address and port in Timeplus, and the input will handle the rest.
6+
7+
## Create ElasticSearch HTTP Input
8+
9+
```sql
10+
CREATE INPUT <elastic_input>
11+
SETTINGS
12+
type='elastic',
13+
target_stream=<target_stream_name>,
14+
tcp_port=<bind_tcp_port>,
15+
listen_host=<listen_host>
16+
COMMENT '<comments>';
17+
```
18+
19+
**Settings**
20+
- `type`: Type to indicate the input protocol. `'elastic'`.
21+
- `target_stream`: The name of the target stream that stores incoming data after ElasticSearch `/_bulk` endpoint protocol parsing.
22+
- `tcp_port`: The TCP port on which the input server listens for incoming connections. Default is **9997**.
23+
- `listen_host`: The network interface or host address on which the input server listens. Default is **'0.0.0.0'**.
24+
25+
:::note
26+
The target stream for ElasticSearch HTTP Input requires a fixed schema, shown below.
27+
28+
```sql
29+
CREATE STREAM elastic_target_stream
30+
(
31+
_raw string, -- required
32+
_index string, -- required
33+
_id string -- required
34+
);
35+
```
36+
:::
37+
38+
:::info
39+
You may want to fine-tune the target stream when provisioning it, especially if its historical store is enabled and it will serve applications. This includes settings such as:
40+
41+
- [Compression codec](/append-stream-codecs)
42+
- [Retention policies](/append-stream-ttl)
43+
- [Indexes](/append-stream-indexes)
44+
45+
Alternatively, you can disable the historical store entirely by using `SETTIGNS storage_type='streaming'` and use the target stream as a persistent, queryable queue. In this scenario, fine-tuning compression, retention, and indexes is not necessary.
46+
:::
47+
48+
**Example**:
49+
50+
```sql
51+
-- Create ElasticSearch input and write the incoming data to target stream
52+
CREATE INPUT splunk_hec_input
53+
SETTINGS
54+
type = 'elastic',
55+
tcp_port = 9200,
56+
target_stream = 'elastic_target_stream'
57+
COMMNET 'Elastic HTTP Input Test'
58+
```
59+
60+
## ElasticSearch Clients / Beats Configuration
61+
62+
ElasticSearch users can simply update the `/_bulk` endpoint clients / beats to use the Elastic HTTP input IP:Port in Timeplus to post events.
63+
64+
**Example**
65+
66+
The following curl emulates a Elastic client to post events to Elastic HTTP Input in Timeplus.
67+
68+
```
69+
curl -X POST "4qfar3.timeplus:9200/_bulk" \
70+
-H "Content-Type: application/x-ndjson" \
71+
-d '{"index":{"_index":"test","_id":"1"}}\n{"field1":"value1"}\n
72+
{"delete":{"_index":"test","_id":"2"}}\n
73+
{"create":{"_index":"test","_id":"3"}}\n{"field1":"value3"}\n
74+
{"update":{"_id":"1","_index":"test"}}\n{"doc":{"field2":"value2"}}\n'
75+
```
76+
77+
The following illustrate filebeat configuration change to use Elastic Input in Timeplus.
78+
79+
```
80+
% /etc/filebeat/filebeat.yml
81+
...
82+
output.elasticsearch:
83+
hosts: ["http://4qfar3.timeplus:9200"] # <<< Point to Elastic Input in Timeplus
84+
username: "timeplus"
85+
password: "timeplus_password"
86+
```

docs/enterprise-v3.1.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Key highlights of the Timeplus 3.1 release include:
66

77
- **Timeplus Inputs**
88

9-
Timeplus input is a new concept which allows users to push / stream data to the inputs by leveraging existing data ecosystem and tools.
9+
Timeplus [input](/inputs) is a new concept which allows users to push / stream data to the inputs by leveraging existing data ecosystem and tools.
1010

1111
In this release, the following inputs are supported.
1212

@@ -25,7 +25,7 @@ Key highlights of the Timeplus 3.1 release include:
2525
- **Performance Enhancements**
2626

2727
- Bidirectional direct join for Mutable streams
28-
- Historical data backfill concurrency control backfill_max_threads query setting
28+
- Historical data backfill concurrency control `backfill_max_threads` query setting
2929
- Big performance improvements on Protobuf Kafka record streaming parsing.
3030
- Better HTTP Connection Pooling
3131
- Better Materialized View Workload Rebalance

0 commit comments

Comments
 (0)