Skip to content

Commit 7ade372

Browse files
committed
Support user-defined insertId in gcp_stackdriver_logs sink
1 parent a7fae1f commit 7ade372

4 files changed

Lines changed: 207 additions & 4 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
The `gcp_stackdriver_logs` sink now supports extracting a custom `insertId` field from log events
2+
via the new `insert_id_key` configuration option. The insertId is used by GCP for log de-duplication
3+
and to order query results for logs that have the same `logName` and `timestamp` values.
4+
5+
authors: garethpelly

src/sinks/gcp/stackdriver/logs/config.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,21 @@ pub(super) struct StackdriverConfig {
8181
#[configurable(metadata(docs::examples = "severity"))]
8282
pub(super) severity_key: Option<ConfigValuePath>,
8383

84+
/// The field of the log event from which to take the outgoing log's `insertId` field.
85+
///
86+
/// The named field is removed from the log event if present, and its value is used as the
87+
/// unique identifier for the log entry. The insertId is used by GCP to de-duplicate log
88+
/// entries and to order entries with the same logName and timestamp.
89+
///
90+
/// If no insertId key is specified, the insertId field is omitted from the LogEntry and the
91+
/// GCP Logging API assigns its own unique identifier in this field.
92+
///
93+
/// See the [GCP LogEntry insertId documentation][insertid_docs] for more details.
94+
///
95+
/// [insertid_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id
96+
#[configurable(metadata(docs::examples = "insert_id"))]
97+
pub(super) insert_id_key: Option<ConfigValuePath>,
98+
8499
#[serde(flatten)]
85100
pub(super) auth: GcpAuthConfig,
86101

@@ -245,6 +260,7 @@ impl SinkConfig for StackdriverConfig {
245260
self.label_config.clone(),
246261
self.resource.clone(),
247262
self.severity_key.clone(),
263+
self.insert_id_key.clone(),
248264
),
249265
};
250266

src/sinks/gcp/stackdriver/logs/encoder.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub(super) struct StackdriverLogsEncoder {
2323
label_config: StackdriverLabelConfig,
2424
resource: StackdriverResource,
2525
severity_key: Option<ConfigValuePath>,
26+
insert_id_key: Option<ConfigValuePath>,
2627
}
2728

2829
impl StackdriverLogsEncoder {
@@ -34,6 +35,7 @@ impl StackdriverLogsEncoder {
3435
label_config: StackdriverLabelConfig,
3536
resource: StackdriverResource,
3637
severity_key: Option<ConfigValuePath>,
38+
insert_id_key: Option<ConfigValuePath>,
3739
) -> Self {
3840
Self {
3941
transformer,
@@ -42,6 +44,7 @@ impl StackdriverLogsEncoder {
4244
label_config,
4345
resource,
4446
severity_key,
47+
insert_id_key,
4548
}
4649
}
4750

@@ -93,6 +96,12 @@ impl StackdriverLogsEncoder {
9396
.map(remap_severity)
9497
.unwrap_or_else(|| 0.into());
9598

99+
let insert_id = self
100+
.insert_id_key
101+
.as_ref()
102+
.and_then(|key| log.remove((PathPrefix::Event, &key.0)))
103+
.map(|value| value.to_string_lossy().into_owned());
104+
96105
let default_labels_key = default_labels_key();
97106
let labels_key = self
98107
.label_config
@@ -133,6 +142,11 @@ impl StackdriverLogsEncoder {
133142
entry.insert("timestamp".into(), json!(timestamp));
134143
}
135144

145+
// If we extracted an insertId, add it to the LogEntry
146+
if let Some(insert_id) = insert_id {
147+
entry.insert("insertId".into(), json!(insert_id));
148+
}
149+
136150
Some(json!(entry))
137151
}
138152

src/sinks/gcp/stackdriver/logs/tests.rs

Lines changed: 172 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ fn encode_valid() {
101101
]),
102102
},
103103
Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()),
104+
None,
104105
);
105106

106107
let mut log = [
@@ -159,6 +160,7 @@ fn encode_inserts_timestamp() {
159160
)]),
160161
},
161162
Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()),
163+
None,
162164
);
163165

164166
let mut log = LogEvent::default();
@@ -189,6 +191,167 @@ fn encode_inserts_timestamp() {
189191
);
190192
}
191193

194+
#[test]
195+
fn encode_with_insert_id_key() {
196+
let transformer = Transformer::default();
197+
198+
let encoder = StackdriverLogsEncoder::new(
199+
transformer,
200+
Template::try_from("testlogs").unwrap(),
201+
StackdriverLogName::Project("project".to_owned()),
202+
StackdriverLabelConfig {
203+
labels_key: None,
204+
labels: HashMap::new(),
205+
},
206+
StackdriverResource {
207+
type_: "generic_node".to_owned(),
208+
labels: HashMap::from([(
209+
"namespace".to_owned(),
210+
Template::try_from("office").unwrap(),
211+
)]),
212+
},
213+
None,
214+
Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()),
215+
);
216+
217+
let log = [
218+
("message", "hello world"),
219+
("insert_id", "topic.access.log-0-12345"),
220+
]
221+
.iter()
222+
.copied()
223+
.collect::<LogEvent>();
224+
225+
let json = encoder.encode_event(Event::from(log)).unwrap();
226+
227+
assert_eq!(
228+
json,
229+
serde_json::json!({
230+
"logName": "projects/project/logs/testlogs",
231+
"jsonPayload": {"message": "hello world"},
232+
"severity": 0,
233+
"labels": {},
234+
"resource": {
235+
"type": "generic_node",
236+
"labels": {"namespace": "office"}
237+
},
238+
"insertId": "topic.access.log-0-12345"
239+
})
240+
);
241+
242+
// Verify insert_id field was removed from jsonPayload
243+
assert!(!json["jsonPayload"].as_object().unwrap().contains_key("insert_id"));
244+
}
245+
246+
#[test]
247+
fn encode_without_insert_id_key() {
248+
let transformer = Transformer::default();
249+
250+
let encoder = StackdriverLogsEncoder::new(
251+
transformer,
252+
Template::try_from("testlogs").unwrap(),
253+
StackdriverLogName::Project("project".to_owned()),
254+
StackdriverLabelConfig {
255+
labels_key: None,
256+
labels: HashMap::new(),
257+
},
258+
StackdriverResource {
259+
type_: "generic_node".to_owned(),
260+
labels: HashMap::from([(
261+
"namespace".to_owned(),
262+
Template::try_from("office").unwrap(),
263+
)]),
264+
},
265+
None,
266+
None,
267+
);
268+
269+
let log = [
270+
("message", "hello world"),
271+
("insert_id", "should-remain-in-payload"),
272+
]
273+
.iter()
274+
.copied()
275+
.collect::<LogEvent>();
276+
277+
let json = encoder.encode_event(Event::from(log)).unwrap();
278+
279+
// insertId should NOT be in the LogEntry json
280+
assert!(!json.as_object().unwrap().contains_key("insertId"));
281+
282+
// insert_id should remain in jsonPayload since we didn't configure insert_id_key
283+
assert_eq!(
284+
json["jsonPayload"]["insert_id"],
285+
serde_json::json!("should-remain-in-payload")
286+
);
287+
}
288+
289+
#[test]
290+
fn encode_insert_id_type_coercion() {
291+
let transformer = Transformer::default();
292+
293+
let encoder = StackdriverLogsEncoder::new(
294+
transformer,
295+
Template::try_from("testlogs").unwrap(),
296+
StackdriverLogName::Project("project".to_owned()),
297+
StackdriverLabelConfig {
298+
labels_key: None,
299+
labels: HashMap::new(),
300+
},
301+
StackdriverResource {
302+
type_: "generic_node".to_owned(),
303+
labels: HashMap::from([(
304+
"namespace".to_owned(),
305+
Template::try_from("office").unwrap(),
306+
)]),
307+
},
308+
None,
309+
Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()),
310+
);
311+
312+
let mut log = LogEvent::default();
313+
log.insert("message", Value::Bytes("hello".into()));
314+
log.insert("insert_id", Value::Integer(12345));
315+
316+
let json = encoder.encode_event(Event::from(log)).unwrap();
317+
318+
assert_eq!(json["insertId"], serde_json::json!("12345"));
319+
}
320+
321+
#[test]
322+
fn encode_insert_id_field_missing() {
323+
let transformer = Transformer::default();
324+
325+
let encoder = StackdriverLogsEncoder::new(
326+
transformer,
327+
Template::try_from("testlogs").unwrap(),
328+
StackdriverLogName::Project("project".to_owned()),
329+
StackdriverLabelConfig {
330+
labels_key: None,
331+
labels: HashMap::new(),
332+
},
333+
StackdriverResource {
334+
type_: "generic_node".to_owned(),
335+
labels: HashMap::from([(
336+
"namespace".to_owned(),
337+
Template::try_from("office").unwrap(),
338+
)]),
339+
},
340+
None,
341+
Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()),
342+
);
343+
344+
let log = [("message", "hello world")]
345+
.iter()
346+
.copied()
347+
.collect::<LogEvent>();
348+
349+
let json = encoder.encode_event(Event::from(log)).unwrap();
350+
351+
// insertId should NOT be in LogEntry if key was supplied but field was missing
352+
assert!(!json.as_object().unwrap().contains_key("insertId"));
353+
}
354+
192355
#[test]
193356
fn severity_remaps_strings() {
194357
for &(s, n) in &[
@@ -239,6 +402,7 @@ async fn correct_request() {
239402
)]),
240403
},
241404
None,
405+
None,
242406
);
243407

244408
let log1 = [("message", "hello")].iter().copied().collect::<LogEvent>();
@@ -322,11 +486,15 @@ async fn fails_missing_creds() {
322486
log_id = "testlogs"
323487
resource.type = "generic_node"
324488
resource.namespace = "office"
325-
"#})
489+
credentials_path = {missing_credentials_path:?}
490+
"#}))
326491
.unwrap();
327-
if config.build(SinkContext::default()).await.is_ok() {
328-
panic!("config.build failed to error");
329-
}
492+
493+
let error = config
494+
.build(SinkContext::default())
495+
.await
496+
.expect_err("config.build failed to error");
497+
assert_downcast_matches!(error, GcpError, GcpError::InvalidCredentials { .. });
330498
}
331499

332500
#[test]

0 commit comments

Comments
 (0)