diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index beee750db8d5b..98ce2419d8980 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -272,6 +272,7 @@ IMSD incentivizes indexmap inodes +insertid installdeb interpretervm invtrapezium diff --git a/changelog.d/gcp_stackdriver_logs_insert_id.feature.md b/changelog.d/gcp_stackdriver_logs_insert_id.feature.md new file mode 100644 index 0000000000000..e28435328e70f --- /dev/null +++ b/changelog.d/gcp_stackdriver_logs_insert_id.feature.md @@ -0,0 +1,5 @@ +The `gcp_stackdriver_logs` sink now supports extracting a custom `insertId` field from log events +via the new `insert_id_key` configuration option. The insertId is used by GCP for log de-duplication +and to order query results for logs that have the same `logName` and `timestamp` values. + +authors: garethpelly diff --git a/src/sinks/gcp/stackdriver/logs/config.rs b/src/sinks/gcp/stackdriver/logs/config.rs index addb45163df76..13d7764d0b53c 100644 --- a/src/sinks/gcp/stackdriver/logs/config.rs +++ b/src/sinks/gcp/stackdriver/logs/config.rs @@ -81,6 +81,21 @@ pub(super) struct StackdriverConfig { #[configurable(metadata(docs::examples = "severity"))] pub(super) severity_key: Option, + /// The field of the log event from which to take the outgoing log's `insertId` field. + /// + /// The named field is removed from the log event if present, and its value is used as the + /// unique identifier for the log entry. The insertId is used by GCP to de-duplicate log + /// entries and to order entries with the same logName and timestamp. + /// + /// If no insertId key is specified, the insertId field is omitted from the LogEntry and the + /// GCP Logging API assigns its own unique identifier in this field. + /// + /// See the [GCP LogEntry insertId documentation][insertid_docs] for more details. + /// + /// [insertid_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id + #[configurable(metadata(docs::examples = "insert_id"))] + pub(super) insert_id_key: Option, + #[serde(flatten)] pub(super) auth: GcpAuthConfig, @@ -249,6 +264,7 @@ impl SinkConfig for StackdriverConfig { self.label_config.clone(), self.resource.clone(), self.severity_key.clone(), + self.insert_id_key.clone(), ), }; diff --git a/src/sinks/gcp/stackdriver/logs/encoder.rs b/src/sinks/gcp/stackdriver/logs/encoder.rs index b03a9c0140181..5c7a0fb35d364 100644 --- a/src/sinks/gcp/stackdriver/logs/encoder.rs +++ b/src/sinks/gcp/stackdriver/logs/encoder.rs @@ -23,6 +23,7 @@ pub(super) struct StackdriverLogsEncoder { label_config: StackdriverLabelConfig, resource: StackdriverResource, severity_key: Option, + insert_id_key: Option, } impl StackdriverLogsEncoder { @@ -34,6 +35,7 @@ impl StackdriverLogsEncoder { label_config: StackdriverLabelConfig, resource: StackdriverResource, severity_key: Option, + insert_id_key: Option, ) -> Self { Self { transformer, @@ -42,6 +44,7 @@ impl StackdriverLogsEncoder { label_config, resource, severity_key, + insert_id_key, } } @@ -93,6 +96,12 @@ impl StackdriverLogsEncoder { .map(remap_severity) .unwrap_or_else(|| 0.into()); + let insert_id = self + .insert_id_key + .as_ref() + .and_then(|key| log.remove((PathPrefix::Event, &key.0))) + .map(|value| value.to_string_lossy().into_owned()); + let default_labels_key = default_labels_key(); let labels_key = self .label_config @@ -133,6 +142,11 @@ impl StackdriverLogsEncoder { entry.insert("timestamp".into(), json!(timestamp)); } + // If we extracted an insertId, add it to the LogEntry + if let Some(insert_id) = insert_id { + entry.insert("insertId".into(), json!(insert_id)); + } + Some(json!(entry)) } diff --git a/src/sinks/gcp/stackdriver/logs/tests.rs b/src/sinks/gcp/stackdriver/logs/tests.rs index 89f3db2419f43..d953df3541397 100644 --- a/src/sinks/gcp/stackdriver/logs/tests.rs +++ b/src/sinks/gcp/stackdriver/logs/tests.rs @@ -101,6 +101,7 @@ fn encode_valid() { ]), }, Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()), + None, ); let mut log = [ @@ -159,6 +160,7 @@ fn encode_inserts_timestamp() { )]), }, Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()), + None, ); let mut log = LogEvent::default(); @@ -189,6 +191,172 @@ fn encode_inserts_timestamp() { ); } +#[test] +fn encode_with_insert_id_key() { + let transformer = Transformer::default(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverLabelConfig { + labels_key: None, + labels: HashMap::new(), + }, + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, + None, + Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()), + ); + + let log = [ + ("message", "hello world"), + ("insert_id", "topic.access.log-0-12345"), + ] + .iter() + .copied() + .collect::(); + + let json = encoder.encode_event(Event::from(log)).unwrap(); + + assert_eq!( + json, + serde_json::json!({ + "logName": "projects/project/logs/testlogs", + "jsonPayload": {"message": "hello world"}, + "severity": 0, + "labels": {}, + "resource": { + "type": "generic_node", + "labels": {"namespace": "office"} + }, + "insertId": "topic.access.log-0-12345" + }) + ); + + // Verify insert_id field was removed from jsonPayload + assert!( + !json["jsonPayload"] + .as_object() + .unwrap() + .contains_key("insert_id") + ); +} + +#[test] +fn encode_without_insert_id_key() { + let transformer = Transformer::default(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverLabelConfig { + labels_key: None, + labels: HashMap::new(), + }, + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, + None, + None, + ); + + let log = [ + ("message", "hello world"), + ("insert_id", "should-remain-in-payload"), + ] + .iter() + .copied() + .collect::(); + + let json = encoder.encode_event(Event::from(log)).unwrap(); + + // insertId should NOT be in the LogEntry json + assert!(!json.as_object().unwrap().contains_key("insertId")); + + // insert_id should remain in jsonPayload since we didn't configure insert_id_key + assert_eq!( + json["jsonPayload"]["insert_id"], + serde_json::json!("should-remain-in-payload") + ); +} + +#[test] +fn encode_insert_id_type_coercion() { + let transformer = Transformer::default(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverLabelConfig { + labels_key: None, + labels: HashMap::new(), + }, + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, + None, + Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()), + ); + + let mut log = LogEvent::default(); + log.insert("message", Value::Bytes("hello".into())); + log.insert("insert_id", Value::Integer(12345)); + + let json = encoder.encode_event(Event::from(log)).unwrap(); + + assert_eq!(json["insertId"], serde_json::json!("12345")); +} + +#[test] +fn encode_insert_id_field_missing() { + let transformer = Transformer::default(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverLabelConfig { + labels_key: None, + labels: HashMap::new(), + }, + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, + None, + Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()), + ); + + let log = [("message", "hello world")] + .iter() + .copied() + .collect::(); + + let json = encoder.encode_event(Event::from(log)).unwrap(); + + // insertId should NOT be in LogEntry if key was supplied but field was missing + assert!(!json.as_object().unwrap().contains_key("insertId")); +} + #[test] fn severity_remaps_strings() { for &(s, n) in &[ @@ -239,6 +407,7 @@ async fn correct_request() { )]), }, None, + None, ); let log1 = [("message", "hello")].iter().copied().collect::(); diff --git a/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue b/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue index 2685eda9ffb10..2acfa3b3543a2 100644 --- a/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue +++ b/website/cue/reference/components/sinks/generated/gcp_stackdriver_logs.cue @@ -141,6 +141,24 @@ generated: components: sinks: gcp_stackdriver_logs: configuration: { required: true type: string: {} } + insert_id_key: { + description: """ + The field of the log event from which to take the outgoing log's `insertId` field. + + The named field is removed from the log event if present, and its value is used as the + unique identifier for the log entry. The insertId is used by GCP to de-duplicate log + entries and to order entries with the same logName and timestamp. + + If no insertId key is specified, the insertId field is omitted from the LogEntry and the + GCP Logging API assigns its own unique identifier in this field. + + See the [GCP LogEntry insertId documentation][insertid_docs] for more details. + + [insertid_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id + """ + required: false + type: string: examples: ["insert_id"] + } labels: { description: "A map of key, value pairs that provides additional information about the log entry." required: false