Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
The CloudWatch Logs sink now accepts a `group_class` option that selects which
log class to create (`STANDARD` or `INFREQUENT_ACCESS`). Defaults to `STANDARD`.

authors: vivshaw
38 changes: 36 additions & 2 deletions src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use aws_sdk_cloudwatchlogs::{Client as CloudwatchLogsClient, types::LogGroupClass};
use futures::FutureExt;
use serde::{Deserialize, Deserializer, de};
use tower::ServiceBuilder;
Expand Down Expand Up @@ -55,6 +55,30 @@ pub struct Retention {
pub days: u32,
}

/// CloudWatch Logs [log class][log_class] used when creating a new log group.
///
/// [log_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did this to match how y'all are handling S3 storage classes, which you have as SCREAMING_SNAKE

pub enum CloudwatchLogsGroupClass {
/// For real-time monitoring and frequently-accessed logs.
#[default]
Standard,

/// For cost-effective consolidation of logs that are queried only occasionally.
InfrequentAccess,
}

impl From<CloudwatchLogsGroupClass> for LogGroupClass {
fn from(value: CloudwatchLogsGroupClass) -> Self {
match value {
CloudwatchLogsGroupClass::Standard => LogGroupClass::Standard,
CloudwatchLogsGroupClass::InfrequentAccess => LogGroupClass::InfrequentAccess,
}
}
}

fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
D: Deserializer<'de>,
Expand Down Expand Up @@ -109,7 +133,8 @@ pub struct CloudwatchLogsSinkConfig {
#[serde(flatten)]
pub region: RegionOrEndpoint,

/// Dynamically create a [log group][log_group] if it does not already exist.
/// Dynamically create a [log group][log_group] if it does not already exist. Its group
/// class is determined by `group_class`.
///
/// This ignores `create_missing_stream` directly after creating the group and creates
/// the first stream.
Expand All @@ -118,6 +143,14 @@ pub struct CloudwatchLogsSinkConfig {
#[serde(default = "crate::serde::default_true")]
pub create_missing_group: bool,

/// The [log class][log_class] used when dynamically creating a log group via
/// `create_missing_group`.
///
/// [log_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
#[serde(default)]
#[configurable(derived)]
pub group_class: CloudwatchLogsGroupClass,

/// Dynamically create a [log stream][log_stream] if it does not already exist.
///
/// [log_stream]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
Expand Down Expand Up @@ -257,6 +290,7 @@ fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
stream_name: Default::default(),
region: Default::default(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down
8 changes: 8 additions & 0 deletions src/sinks/aws_cloudwatch_logs/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn cloudwatch_insert_log_event() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -107,6 +108,7 @@ async fn cloudwatch_insert_log_events_sorted() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -185,6 +187,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -264,6 +267,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -317,6 +321,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation_with_kms_key_and_tags() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -406,6 +411,7 @@ async fn cloudwatch_insert_log_event_batched() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -459,6 +465,7 @@ async fn cloudwatch_insert_log_event_partitioned() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down Expand Up @@ -554,6 +561,7 @@ async fn cloudwatch_healthcheck() {
region: RegionOrEndpoint::with_both("us-east-1", cloudwatch_address().as_str()),
encoding: TextSerializerConfig::default().into(),
create_missing_group: true,
group_class: Default::default(),
create_missing_stream: true,
retention: Default::default(),
compression: Default::default(),
Expand Down
10 changes: 9 additions & 1 deletion src/sinks/aws_cloudwatch_logs/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use http::{HeaderValue, header::HeaderName};
use indexmap::IndexMap;
use tokio::sync::oneshot;

use crate::sinks::aws_cloudwatch_logs::{config::Retention, service::CloudwatchError};
use crate::sinks::aws_cloudwatch_logs::{
config::{CloudwatchLogsGroupClass, Retention},
service::CloudwatchError,
};

pub struct CloudwatchFuture {
client: Client,
Expand All @@ -42,6 +45,7 @@ struct Client {
retention_days: u32,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
group_class: CloudwatchLogsGroupClass,
}

type ClientResult<T, E> = BoxFuture<'static, Result<T, SdkError<E, HttpResponse>>>;
Expand All @@ -67,6 +71,7 @@ impl CloudwatchFuture {
retention: Retention,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
group_class: CloudwatchLogsGroupClass,
mut events: Vec<Vec<InputLogEvent>>,
token: Option<String>,
token_tx: oneshot::Sender<Option<String>>,
Expand All @@ -80,6 +85,7 @@ impl CloudwatchFuture {
retention_days,
kms_key,
tags,
group_class,
};

let state = if let Some(token) = token {
Expand Down Expand Up @@ -296,12 +302,14 @@ impl Client {
let group_name = self.group_name.clone();
let kms_key = self.kms_key.clone();
let tags = self.tags.clone();
let log_group_class = self.group_class.into();
Box::pin(async move {
client
.create_log_group()
.log_group_name(group_name)
.set_kms_key_id(kms_key)
.set_tags(tags)
.log_group_class(log_group_class)
.send()
.await?;
Ok(())
Expand Down
6 changes: 5 additions & 1 deletion src/sinks/aws_cloudwatch_logs/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use vector_lib::{
use crate::sinks::{
aws_cloudwatch_logs::{
CloudwatchKey,
config::{CloudwatchLogsSinkConfig, Retention},
config::{CloudwatchLogsGroupClass, CloudwatchLogsSinkConfig, Retention},
request,
retry::CloudwatchRetryLogic,
sink::BatchCloudwatchRequest,
Expand Down Expand Up @@ -248,6 +248,7 @@ impl CloudwatchLogsSvc {

let kms_key = config.kms_key.clone();
let tags = config.tags.clone();
let group_class = config.group_class;

CloudwatchLogsSvc {
headers,
Expand All @@ -259,6 +260,7 @@ impl CloudwatchLogsSvc {
retention,
kms_key,
tags,
group_class,
token: None,
token_rx: None,
}
Expand Down Expand Up @@ -335,6 +337,7 @@ impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
self.retention.clone(),
self.kms_key.clone(),
self.tags.clone(),
self.group_class,
event_batches,
self.token.take(),
tx,
Expand All @@ -355,6 +358,7 @@ pub struct CloudwatchLogsSvc {
retention: Retention,
kms_key: Option<String>,
tags: Option<HashMap<String, String>>,
group_class: CloudwatchLogsGroupClass,
token: Option<String>,
token_rx: Option<oneshot::Receiver<Option<String>>>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: {
}
create_missing_group: {
description: """
Dynamically create a [log group][log_group] if it does not already exist.
Dynamically create a [log group][log_group] if it does not already exist. Its group
class is determined by `group_class`.

This ignores `create_missing_stream` directly after creating the group and creates
the first stream.
Expand Down Expand Up @@ -679,6 +680,22 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: {
required: false
type: string: examples: ["http://127.0.0.0:5000/path/to/service"]
}
group_class: {
description: """
The [log class][log_class] used when dynamically creating a log group via
`create_missing_group`.

[log_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html
"""
required: false
type: string: {
default: "STANDARD"
enum: {
INFREQUENT_ACCESS: "For cost-effective consolidation of logs that are queried only occasionally."
STANDARD: "For real-time monitoring and frequently-accessed logs."
}
}
}
group_name: {
description: """
The [group name][group_name] of the target CloudWatch Logs stream.
Expand Down
Loading