Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ IMSD
incentivizes
indexmap
inodes
insertid
installdeb
interpretervm
invtrapezium
Expand Down
5 changes: 5 additions & 0 deletions changelog.d/gcp_stackdriver_logs_insert_id.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The `gcp_stackdriver_logs` sink now supports extracting a custom `insertId` field from log events
via the new `insert_id_key` configuration option. The insertId is used by GCP for log de-duplication
and to order query results for logs that have the same `logName` and `timestamp` values.

authors: garethpelly
16 changes: 16 additions & 0 deletions src/sinks/gcp/stackdriver/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,21 @@ pub(super) struct StackdriverConfig {
#[configurable(metadata(docs::examples = "severity"))]
pub(super) severity_key: Option<ConfigValuePath>,

/// The field of the log event from which to take the outgoing log's `insertId` field.
///
/// The named field is removed from the log event if present, and its value is used as the
/// unique identifier for the log entry. The insertId is used by GCP to de-duplicate log
/// entries and to order entries with the same logName and timestamp.
///
/// If no insertId key is specified, the insertId field is omitted from the LogEntry and the
/// GCP Logging API assigns its own unique identifier in this field.
///
/// See the [GCP LogEntry insertId documentation][insertid_docs] for more details.
///
/// [insertid_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id
#[configurable(metadata(docs::examples = "insert_id"))]
pub(super) insert_id_key: Option<ConfigValuePath>,

#[serde(flatten)]
pub(super) auth: GcpAuthConfig,

Expand Down Expand Up @@ -249,6 +264,7 @@ impl SinkConfig for StackdriverConfig {
self.label_config.clone(),
self.resource.clone(),
self.severity_key.clone(),
self.insert_id_key.clone(),
),
};

Expand Down
14 changes: 14 additions & 0 deletions src/sinks/gcp/stackdriver/logs/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub(super) struct StackdriverLogsEncoder {
label_config: StackdriverLabelConfig,
resource: StackdriverResource,
severity_key: Option<ConfigValuePath>,
insert_id_key: Option<ConfigValuePath>,
}

impl StackdriverLogsEncoder {
Expand All @@ -34,6 +35,7 @@ impl StackdriverLogsEncoder {
label_config: StackdriverLabelConfig,
resource: StackdriverResource,
severity_key: Option<ConfigValuePath>,
insert_id_key: Option<ConfigValuePath>,
) -> Self {
Self {
transformer,
Expand All @@ -42,6 +44,7 @@ impl StackdriverLogsEncoder {
label_config,
resource,
severity_key,
insert_id_key,
}
}

Expand Down Expand Up @@ -93,6 +96,12 @@ impl StackdriverLogsEncoder {
.map(remap_severity)
.unwrap_or_else(|| 0.into());

let insert_id = self
.insert_id_key
.as_ref()
.and_then(|key| log.remove((PathPrefix::Event, &key.0)))
.map(|value| value.to_string_lossy().into_owned());

let default_labels_key = default_labels_key();
let labels_key = self
.label_config
Expand Down Expand Up @@ -133,6 +142,11 @@ impl StackdriverLogsEncoder {
entry.insert("timestamp".into(), json!(timestamp));
}

// If we extracted an insertId, add it to the LogEntry
if let Some(insert_id) = insert_id {
entry.insert("insertId".into(), json!(insert_id));
}

Some(json!(entry))
}

Expand Down
169 changes: 169 additions & 0 deletions src/sinks/gcp/stackdriver/logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ fn encode_valid() {
]),
},
Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()),
None,
);

let mut log = [
Expand Down Expand Up @@ -159,6 +160,7 @@ fn encode_inserts_timestamp() {
)]),
},
Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()),
None,
);

let mut log = LogEvent::default();
Expand Down Expand Up @@ -189,6 +191,172 @@ fn encode_inserts_timestamp() {
);
}

#[test]
fn encode_with_insert_id_key() {
let transformer = Transformer::default();

let encoder = StackdriverLogsEncoder::new(
transformer,
Template::try_from("testlogs").unwrap(),
StackdriverLogName::Project("project".to_owned()),
StackdriverLabelConfig {
labels_key: None,
labels: HashMap::new(),
},
StackdriverResource {
type_: "generic_node".to_owned(),
labels: HashMap::from([(
"namespace".to_owned(),
Template::try_from("office").unwrap(),
)]),
},
None,
Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()),
);

let log = [
("message", "hello world"),
("insert_id", "topic.access.log-0-12345"),
]
.iter()
.copied()
.collect::<LogEvent>();

let json = encoder.encode_event(Event::from(log)).unwrap();

assert_eq!(
json,
serde_json::json!({
"logName": "projects/project/logs/testlogs",
"jsonPayload": {"message": "hello world"},
"severity": 0,
"labels": {},
"resource": {
"type": "generic_node",
"labels": {"namespace": "office"}
},
"insertId": "topic.access.log-0-12345"
})
);

// Verify insert_id field was removed from jsonPayload
assert!(
!json["jsonPayload"]
.as_object()
.unwrap()
.contains_key("insert_id")
);
}

#[test]
fn encode_without_insert_id_key() {
let transformer = Transformer::default();

let encoder = StackdriverLogsEncoder::new(
transformer,
Template::try_from("testlogs").unwrap(),
StackdriverLogName::Project("project".to_owned()),
StackdriverLabelConfig {
labels_key: None,
labels: HashMap::new(),
},
StackdriverResource {
type_: "generic_node".to_owned(),
labels: HashMap::from([(
"namespace".to_owned(),
Template::try_from("office").unwrap(),
)]),
},
None,
None,
);

let log = [
("message", "hello world"),
("insert_id", "should-remain-in-payload"),
]
.iter()
.copied()
.collect::<LogEvent>();

let json = encoder.encode_event(Event::from(log)).unwrap();

// insertId should NOT be in the LogEntry json
assert!(!json.as_object().unwrap().contains_key("insertId"));

// insert_id should remain in jsonPayload since we didn't configure insert_id_key
assert_eq!(
json["jsonPayload"]["insert_id"],
serde_json::json!("should-remain-in-payload")
);
}

#[test]
fn encode_insert_id_type_coercion() {
let transformer = Transformer::default();

let encoder = StackdriverLogsEncoder::new(
transformer,
Template::try_from("testlogs").unwrap(),
StackdriverLogName::Project("project".to_owned()),
StackdriverLabelConfig {
labels_key: None,
labels: HashMap::new(),
},
StackdriverResource {
type_: "generic_node".to_owned(),
labels: HashMap::from([(
"namespace".to_owned(),
Template::try_from("office").unwrap(),
)]),
},
None,
Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()),
);

let mut log = LogEvent::default();
log.insert("message", Value::Bytes("hello".into()));
log.insert("insert_id", Value::Integer(12345));

let json = encoder.encode_event(Event::from(log)).unwrap();

assert_eq!(json["insertId"], serde_json::json!("12345"));
}

#[test]
fn encode_insert_id_field_missing() {
let transformer = Transformer::default();

let encoder = StackdriverLogsEncoder::new(
transformer,
Template::try_from("testlogs").unwrap(),
StackdriverLogName::Project("project".to_owned()),
StackdriverLabelConfig {
labels_key: None,
labels: HashMap::new(),
},
StackdriverResource {
type_: "generic_node".to_owned(),
labels: HashMap::from([(
"namespace".to_owned(),
Template::try_from("office").unwrap(),
)]),
},
None,
Some(ConfigValuePath::try_from("insert_id".to_owned()).unwrap()),
);

let log = [("message", "hello world")]
.iter()
.copied()
.collect::<LogEvent>();

let json = encoder.encode_event(Event::from(log)).unwrap();

// insertId should NOT be in LogEntry if key was supplied but field was missing
assert!(!json.as_object().unwrap().contains_key("insertId"));
}

#[test]
fn severity_remaps_strings() {
for &(s, n) in &[
Expand Down Expand Up @@ -239,6 +407,7 @@ async fn correct_request() {
)]),
},
None,
None,
);

let log1 = [("message", "hello")].iter().copied().collect::<LogEvent>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ generated: components: sinks: gcp_stackdriver_logs: configuration: {
required: true
type: string: {}
}
insert_id_key: {
description: """
The field of the log event from which to take the outgoing log's `insertId` field.

The named field is removed from the log event if present, and its value is used as the
unique identifier for the log entry. The insertId is used by GCP to de-duplicate log
entries and to order entries with the same logName and timestamp.

If no insertId key is specified, the insertId field is omitted from the LogEntry and the
GCP Logging API assigns its own unique identifier in this field.

See the [GCP LogEntry insertId documentation][insertid_docs] for more details.

[insertid_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id
"""
required: false
type: string: examples: ["insert_id"]
}
labels: {
description: "A map of key, value pairs that provides additional information about the log entry."
required: false
Expand Down
Loading