Skip to content

Commit bb7985e

Browse files
author
Derek
committed
feat: add gRPC transport with Vector wire protocol compatibility
Replace zenoh transport with tonic-based gRPC transport supporting both DFE native protocol and Vector v2 PushEvents wire compatibility.
1 parent 5306dcf commit bb7985e

19 files changed

Lines changed: 1566 additions & 529 deletions

File tree

Cargo.toml

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ config-postgres = ["config", "sqlx", "tokio", "serde_json"]
6363
transport = ["tokio", "serde_json", "rmp-serde", "chrono", "async-trait"]
6464
transport-memory = ["transport"]
6565
transport-kafka = ["transport", "rdkafka"]
66-
transport-zenoh = ["transport", "zenoh"]
67-
transport-all = ["transport-memory", "transport-kafka", "transport-zenoh"]
66+
transport-grpc = ["transport", "dep:tonic", "dep:prost", "dep:prost-types", "dep:tonic-build", "dep:prost-build"]
67+
transport-grpc-vector-compat = ["transport-grpc"]
68+
transport-all = ["transport-memory", "transport-kafka", "transport-grpc"]
6869

6970
# Secrets management
7071
secrets = ["tokio", "serde_json", "async-trait", "parking_lot", "base64", "dirs", "tracing"]
@@ -73,7 +74,7 @@ secrets-aws = ["secrets", "aws-config", "aws-sdk-secretsmanager"]
7374
secrets-all = ["secrets-vault", "secrets-aws"]
7475

7576
# Full feature set
76-
full = ["config", "config-reload", "logger", "metrics", "otel", "otel-metrics", "env", "runtime", "http", "http-server", "spool", "tiered-sink", "resilience", "database", "cache", "transport-all", "secrets-all", "directory-config", "directory-config-git", "deployment", "version-check"]
77+
full = ["config", "config-reload", "logger", "metrics", "otel", "otel-metrics", "env", "runtime", "http", "http-server", "spool", "tiered-sink", "resilience", "database", "cache", "transport-all", "transport-grpc-vector-compat", "secrets-all", "directory-config", "directory-config-git", "deployment", "version-check"]
7778

7879
[dependencies]
7980
# Serialisation (always needed)
@@ -145,8 +146,10 @@ rmp-serde = { version = ">=1.3.1, <2", optional = true }
145146
# Kafka transport
146147
rdkafka = { version = ">=0.38.0, <0.39", optional = true }
147148

148-
# Zenoh transport
149-
zenoh = { version = ">=1.7.2, <2", optional = true }
149+
# gRPC transport (tonic + prost)
150+
tonic = { version = ">=0.12, <0.14", features = ["gzip"], optional = true }
151+
prost = { version = ">=0.13, <0.14", optional = true }
152+
prost-types = { version = ">=0.13, <0.14", optional = true }
150153

151154
# Spool (disk-backed async queue) - yaque is async-native and maintained
152155
yaque = { version = ">=0.6.6, <0.7", optional = true }
@@ -173,6 +176,10 @@ vaultrs = { version = ">=0.7, <0.8", optional = true }
173176
aws-config = { version = ">=1.6, <2", optional = true }
174177
aws-sdk-secretsmanager = { version = ">=1.72, <2", optional = true }
175178

179+
[build-dependencies]
180+
tonic-build = { version = ">=0.12, <0.14", optional = true }
181+
prost-build = { version = ">=0.13, <0.14", optional = true }
182+
176183
[dev-dependencies]
177184
tokio = { version = ">=1.49.0, <2", features = ["full", "test-util"] }
178185
tempfile = ">=3.24.0, <4"

build.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// SPDX-License-Identifier: FSL-1.1-ALv2
2+
// Copyright (c) 2026 HYPERI PTY LIMITED
3+
//
4+
// Protobuf code generation for gRPC transport.
5+
// Only runs when transport-grpc or transport-grpc-vector-compat features are enabled.
6+
7+
fn main() -> Result<(), Box<dyn std::error::Error>> {
8+
// DFE native transport proto
9+
#[cfg(feature = "transport-grpc")]
10+
{
11+
tonic_build::configure()
12+
.build_server(true)
13+
.build_client(true)
14+
.compile_protos(&["proto/dfe/transport/v1/dfe_transport.proto"], &["proto"])?;
15+
}
16+
17+
// Vector wire protocol compat (vendored protos)
18+
#[cfg(feature = "transport-grpc-vector-compat")]
19+
{
20+
// Compile event.proto first (message types only, no services)
21+
prost_build::Config::new()
22+
.compile_protos(&["proto/vector/event.proto"], &["proto/vector"])?;
23+
24+
// Compile vector.proto (service + request/response types)
25+
// extern_path tells prost the event types live in the sibling module
26+
tonic_build::configure()
27+
.build_server(true)
28+
.build_client(true)
29+
.extern_path(".event", "crate::transport::vector_compat::proto::event")
30+
.compile_protos(&["proto/vector/vector.proto"], &["proto/vector"])?;
31+
}
32+
33+
Ok(())
34+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// SPDX-License-Identifier: FSL-1.1-ALv2
2+
// Copyright (c) 2026 HYPERI PTY LIMITED
3+
//
4+
// DFE native transport protocol.
5+
// Lightweight bulk bytes transfer with format hints.
6+
7+
syntax = "proto3";
8+
9+
package dfe.transport.v1;
10+
11+
// DFE inter-service transport.
12+
service DfeTransport {
13+
// Push a batch of data to the receiver.
14+
rpc Push(PushRequest) returns (PushResponse);
15+
16+
// Health check.
17+
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
18+
}
19+
20+
// Data format hint for the receiver.
21+
enum Format {
22+
FORMAT_AUTO = 0;
23+
FORMAT_JSON = 1;
24+
FORMAT_MSGPACK = 2;
25+
FORMAT_ARROW_IPC = 3;
26+
}
27+
28+
message PushRequest {
29+
// Raw payload bytes (JSON lines, MsgPack, or Arrow IPC batch).
30+
bytes payload = 1;
31+
32+
// Format hint so the receiver can skip detection.
33+
Format format = 2;
34+
35+
// Routing metadata (e.g. topic, org_id, source).
36+
map<string, string> metadata = 3;
37+
}
38+
39+
message PushResponse {
40+
// Number of events accepted by the receiver.
41+
uint64 accepted = 1;
42+
}
43+
44+
// Serving status.
45+
enum ServingStatus {
46+
SERVING = 0;
47+
NOT_SERVING = 1;
48+
}
49+
50+
message HealthCheckRequest {}
51+
52+
message HealthCheckResponse {
53+
ServingStatus status = 1;
54+
}

proto/vector/event.proto

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
// Vendored from https://github.com/vectordotdev/vector
2+
// Source: lib/vector-core/proto/event.proto
3+
// Pinned: 2026-03-02 (stable since 2021, rarely changes)
4+
5+
syntax = "proto3";
6+
7+
package event;
8+
9+
import "google/protobuf/timestamp.proto";
10+
11+
message EventArray {
12+
oneof events {
13+
LogArray logs = 1;
14+
MetricArray metrics = 2;
15+
TraceArray traces = 3;
16+
}
17+
}
18+
19+
message LogArray {
20+
repeated Log logs = 1;
21+
}
22+
23+
message MetricArray {
24+
repeated Metric metrics = 1;
25+
}
26+
27+
message TraceArray {
28+
repeated Trace traces = 1;
29+
}
30+
31+
message EventWrapper {
32+
oneof event {
33+
Log log = 1;
34+
Metric metric = 2;
35+
Trace trace = 3;
36+
}
37+
}
38+
39+
message Log {
40+
// Deprecated, use value instead
41+
map<string, Value> fields = 1;
42+
Value value = 2;
43+
Value metadata = 3 [deprecated = true];
44+
Metadata metadata_full = 4;
45+
}
46+
47+
message Trace {
48+
map<string, Value> fields = 1;
49+
Value metadata = 2 [deprecated = true];
50+
Metadata metadata_full = 3;
51+
}
52+
53+
message ValueMap {
54+
map<string, Value> fields = 1;
55+
}
56+
57+
message ValueArray {
58+
repeated Value items = 1;
59+
}
60+
61+
enum ValueNull {
62+
NULL_VALUE = 0;
63+
}
64+
65+
message Value {
66+
reserved 3;
67+
oneof kind {
68+
bytes raw_bytes = 1;
69+
google.protobuf.Timestamp timestamp = 2;
70+
int64 integer = 4;
71+
double float = 5;
72+
bool boolean = 6;
73+
ValueMap map = 7;
74+
ValueArray array = 8;
75+
ValueNull null = 9;
76+
}
77+
}
78+
79+
message DatadogOriginMetadata {
80+
optional uint32 origin_product = 1;
81+
optional uint32 origin_category = 2;
82+
optional uint32 origin_service = 3;
83+
}
84+
85+
message Secrets {
86+
map<string, string> entries = 1;
87+
}
88+
89+
message OutputId {
90+
string component = 1;
91+
optional string port = 2;
92+
}
93+
94+
message Metadata {
95+
Value value = 1;
96+
DatadogOriginMetadata datadog_origin_metadata = 2;
97+
optional string source_id = 3;
98+
optional string source_type = 4;
99+
OutputId upstream_id = 5;
100+
Secrets secrets = 6;
101+
bytes source_event_id = 7;
102+
}
103+
104+
message Metric {
105+
string name = 1;
106+
google.protobuf.Timestamp timestamp = 2;
107+
map<string, string> tags_v1 = 3;
108+
map<string, TagValues> tags_v2 = 20;
109+
enum Kind {
110+
Incremental = 0;
111+
Absolute = 1;
112+
}
113+
Kind kind = 4;
114+
oneof value {
115+
Counter counter = 5;
116+
Gauge gauge = 6;
117+
Set set = 7;
118+
Distribution1 distribution1 = 8;
119+
AggregatedHistogram1 aggregated_histogram1 = 9;
120+
AggregatedSummary1 aggregated_summary1 = 10;
121+
Distribution2 distribution2 = 12;
122+
AggregatedHistogram2 aggregated_histogram2 = 13;
123+
AggregatedSummary2 aggregated_summary2 = 14;
124+
Sketch sketch = 15;
125+
AggregatedHistogram3 aggregated_histogram3 = 16;
126+
AggregatedSummary3 aggregated_summary3 = 17;
127+
}
128+
string namespace = 11;
129+
uint32 interval_ms = 18;
130+
Value metadata = 19 [deprecated = true];
131+
Metadata metadata_full = 21;
132+
}
133+
134+
message TagValues {
135+
repeated TagValue values = 1;
136+
}
137+
138+
message TagValue {
139+
optional string value = 1;
140+
}
141+
142+
message Counter {
143+
double value = 1;
144+
}
145+
146+
message Gauge {
147+
double value = 1;
148+
}
149+
150+
message Set {
151+
repeated string values = 1;
152+
}
153+
154+
enum StatisticKind {
155+
Histogram = 0;
156+
Summary = 1;
157+
}
158+
159+
message Distribution1 {
160+
repeated double values = 1;
161+
repeated uint32 sample_rates = 2;
162+
StatisticKind statistic = 3;
163+
}
164+
165+
message Distribution2 {
166+
repeated DistributionSample samples = 1;
167+
StatisticKind statistic = 2;
168+
}
169+
170+
message DistributionSample {
171+
double value = 1;
172+
uint32 rate = 2;
173+
}
174+
175+
message AggregatedHistogram1 {
176+
repeated double buckets = 1;
177+
repeated uint32 counts = 2;
178+
uint32 count = 3;
179+
double sum = 4;
180+
}
181+
182+
message AggregatedHistogram2 {
183+
repeated HistogramBucket buckets = 1;
184+
uint32 count = 2;
185+
double sum = 3;
186+
}
187+
188+
message AggregatedHistogram3 {
189+
repeated HistogramBucket3 buckets = 1;
190+
uint64 count = 2;
191+
double sum = 3;
192+
}
193+
194+
message HistogramBucket {
195+
double upper_limit = 1;
196+
uint32 count = 2;
197+
}
198+
199+
message HistogramBucket3 {
200+
double upper_limit = 1;
201+
uint64 count = 2;
202+
}
203+
204+
message AggregatedSummary1 {
205+
repeated double quantiles = 1;
206+
repeated double values = 2;
207+
uint32 count = 3;
208+
double sum = 4;
209+
}
210+
211+
message AggregatedSummary2 {
212+
repeated SummaryQuantile quantiles = 1;
213+
uint32 count = 2;
214+
double sum = 3;
215+
}
216+
217+
message AggregatedSummary3 {
218+
repeated SummaryQuantile quantiles = 1;
219+
uint64 count = 2;
220+
double sum = 3;
221+
}
222+
223+
message SummaryQuantile {
224+
double quantile = 1;
225+
double value = 2;
226+
}
227+
228+
message Sketch {
229+
message AgentDDSketch {
230+
uint32 count = 1;
231+
double min = 2;
232+
double max = 3;
233+
double sum = 4;
234+
double avg = 5;
235+
repeated sint32 k = 6;
236+
repeated uint32 n = 7;
237+
}
238+
239+
oneof sketch {
240+
AgentDDSketch agent_dd_sketch = 1;
241+
}
242+
}

0 commit comments

Comments
 (0)