Skip to content

Commit 380774e

Browse files
api-clients-generation-pipeline[bot] and ci.datadog-api-spec authored
Adding compression optional field to Amazon S3 source (#1467)
Co-authored-by: ci.datadog-api-spec <packages@datadoghq.com>
1 parent df078ae commit 380774e

8 files changed

Lines changed: 242 additions & 2 deletions

.generator/schemas/v2/openapi.yaml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43045,12 +43045,14 @@ components:
4304543045
ObservabilityPipelineAmazonS3Source:
4304643046
description: |-
4304743047
The `amazon_s3` source ingests logs from an Amazon S3 bucket.
43048-
It supports AWS authentication and TLS encryption.
43048+
It supports AWS authentication, TLS encryption, and configurable compression.
4304943049

4305043050
**Supported pipeline types:** logs
4305143051
properties:
4305243052
auth:
4305343053
$ref: "#/components/schemas/ObservabilityPipelineAwsAuth"
43054+
compression:
43055+
$ref: "#/components/schemas/ObservabilityPipelineAmazonS3SourceCompression"
4305443056
id:
4305543057
description: The unique identifier for this component. Used in other parts of the pipeline to reference this component (for example, as the `input` to downstream components).
4305643058
example: aws-s3-source
@@ -43073,6 +43075,20 @@ components:
4307343075
- region
4307443076
type: object
4307543077
x-pipeline-types: [logs]
43078+
ObservabilityPipelineAmazonS3SourceCompression:
43079+
description: Compression format for objects retrieved from the S3 bucket. Use `auto` to detect compression from the object's Content-Encoding header or file extension.
43080+
enum:
43081+
- auto
43082+
- none
43083+
- gzip
43084+
- zstd
43085+
example: gzip
43086+
type: string
43087+
x-enum-varnames:
43088+
- AUTO
43089+
- NONE
43090+
- GZIP
43091+
- ZSTD
4307643092
ObservabilityPipelineAmazonS3SourceType:
4307743093
default: amazon_s3
4307843094
description: The source type. Always `amazon_s3`.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Validate an observability pipeline with amazon S3 source compression returns
2+
// "OK" response
3+
use datadog_api_client::datadog;
4+
use datadog_api_client::datadogV2::api_observability_pipelines::ObservabilityPipelinesAPI;
5+
use datadog_api_client::datadogV2::model::ObservabilityPipelineAmazonS3Source;
6+
use datadog_api_client::datadogV2::model::ObservabilityPipelineAmazonS3SourceCompression;
7+
use datadog_api_client::datadogV2::model::ObservabilityPipelineAmazonS3SourceType;
8+
use datadog_api_client::datadogV2::model::ObservabilityPipelineConfig;
9+
use datadog_api_client::datadogV2::model::ObservabilityPipelineConfigDestinationItem;
10+
use datadog_api_client::datadogV2::model::ObservabilityPipelineConfigProcessorGroup;
11+
use datadog_api_client::datadogV2::model::ObservabilityPipelineConfigProcessorItem;
12+
use datadog_api_client::datadogV2::model::ObservabilityPipelineConfigSourceItem;
13+
use datadog_api_client::datadogV2::model::ObservabilityPipelineDataAttributes;
14+
use datadog_api_client::datadogV2::model::ObservabilityPipelineDatadogLogsDestination;
15+
use datadog_api_client::datadogV2::model::ObservabilityPipelineDatadogLogsDestinationType;
16+
use datadog_api_client::datadogV2::model::ObservabilityPipelineFilterProcessor;
17+
use datadog_api_client::datadogV2::model::ObservabilityPipelineFilterProcessorType;
18+
use datadog_api_client::datadogV2::model::ObservabilityPipelineSpec;
19+
use datadog_api_client::datadogV2::model::ObservabilityPipelineSpecData;
20+
21+
#[tokio::main]
22+
async fn main() {
23+
let body =
24+
ObservabilityPipelineSpec::new(
25+
ObservabilityPipelineSpecData::new(
26+
ObservabilityPipelineDataAttributes::new(
27+
ObservabilityPipelineConfig::new(
28+
vec![
29+
ObservabilityPipelineConfigDestinationItem::ObservabilityPipelineDatadogLogsDestination(
30+
Box::new(
31+
ObservabilityPipelineDatadogLogsDestination::new(
32+
"datadog-logs-destination".to_string(),
33+
vec!["my-processor-group".to_string()],
34+
ObservabilityPipelineDatadogLogsDestinationType::DATADOG_LOGS,
35+
),
36+
),
37+
)
38+
],
39+
vec![
40+
ObservabilityPipelineConfigSourceItem::ObservabilityPipelineAmazonS3Source(
41+
Box::new(
42+
ObservabilityPipelineAmazonS3Source::new(
43+
"amazon-s3-source".to_string(),
44+
"us-east-1".to_string(),
45+
ObservabilityPipelineAmazonS3SourceType::AMAZON_S3,
46+
).compression(ObservabilityPipelineAmazonS3SourceCompression::GZIP),
47+
),
48+
)
49+
],
50+
).processor_groups(
51+
vec![
52+
ObservabilityPipelineConfigProcessorGroup::new(
53+
true,
54+
"my-processor-group".to_string(),
55+
"service:my-service".to_string(),
56+
vec!["amazon-s3-source".to_string()],
57+
vec![
58+
ObservabilityPipelineConfigProcessorItem::ObservabilityPipelineFilterProcessor(
59+
Box::new(
60+
ObservabilityPipelineFilterProcessor::new(
61+
true,
62+
"filter-processor".to_string(),
63+
"service:my-service".to_string(),
64+
ObservabilityPipelineFilterProcessorType::FILTER,
65+
),
66+
),
67+
)
68+
],
69+
)
70+
],
71+
),
72+
"Pipeline with S3 Source Compression".to_string(),
73+
),
74+
"pipelines".to_string(),
75+
),
76+
);
77+
let configuration = datadog::Configuration::new();
78+
let api = ObservabilityPipelinesAPI::with_config(configuration);
79+
let resp = api.validate_pipeline(body).await;
80+
if let Ok(value) = resp {
81+
println!("{:#?}", value);
82+
} else {
83+
println!("{:#?}", resp.unwrap_err());
84+
}
85+
}

src/datadogV2/model/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5452,6 +5452,8 @@ pub mod model_observability_pipeline_amazon_data_firehose_source_type;
54525452
pub use self::model_observability_pipeline_amazon_data_firehose_source_type::ObservabilityPipelineAmazonDataFirehoseSourceType;
54535453
pub mod model_observability_pipeline_amazon_s3_source;
54545454
pub use self::model_observability_pipeline_amazon_s3_source::ObservabilityPipelineAmazonS3Source;
5455+
pub mod model_observability_pipeline_amazon_s3_source_compression;
5456+
pub use self::model_observability_pipeline_amazon_s3_source_compression::ObservabilityPipelineAmazonS3SourceCompression;
54555457
pub mod model_observability_pipeline_amazon_s3_source_type;
54565458
pub use self::model_observability_pipeline_amazon_s3_source_type::ObservabilityPipelineAmazonS3SourceType;
54575459
pub mod model_observability_pipeline_fluent_bit_source;

src/datadogV2/model/model_observability_pipeline_amazon_s3_source.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use serde_with::skip_serializing_none;
77
use std::fmt::{self, Formatter};
88

99
/// The `amazon_s3` source ingests logs from an Amazon S3 bucket.
10-
/// It supports AWS authentication and TLS encryption.
10+
/// It supports AWS authentication, TLS encryption, and configurable compression.
1111
///
1212
/// **Supported pipeline types:** logs
1313
#[non_exhaustive]
@@ -18,6 +18,10 @@ pub struct ObservabilityPipelineAmazonS3Source {
1818
/// If omitted, the system’s default credentials are used (for example, the IAM role and environment variables).
1919
#[serde(rename = "auth")]
2020
pub auth: Option<crate::datadogV2::model::ObservabilityPipelineAwsAuth>,
21+
/// Compression format for objects retrieved from the S3 bucket. Use `auto` to detect compression from the object's Content-Encoding header or file extension.
22+
#[serde(rename = "compression")]
23+
pub compression:
24+
Option<crate::datadogV2::model::ObservabilityPipelineAmazonS3SourceCompression>,
2125
/// The unique identifier for this component. Used in other parts of the pipeline to reference this component (for example, as the `input` to downstream components).
2226
#[serde(rename = "id")]
2327
pub id: String,
@@ -48,6 +52,7 @@ impl ObservabilityPipelineAmazonS3Source {
4852
) -> ObservabilityPipelineAmazonS3Source {
4953
ObservabilityPipelineAmazonS3Source {
5054
auth: None,
55+
compression: None,
5156
id,
5257
region,
5358
tls: None,
@@ -63,6 +68,14 @@ impl ObservabilityPipelineAmazonS3Source {
6368
self
6469
}
6570

71+
pub fn compression(
72+
mut self,
73+
value: crate::datadogV2::model::ObservabilityPipelineAmazonS3SourceCompression,
74+
) -> Self {
75+
self.compression = Some(value);
76+
self
77+
}
78+
6679
pub fn tls(mut self, value: crate::datadogV2::model::ObservabilityPipelineTls) -> Self {
6780
self.tls = Some(value);
6881
self
@@ -100,6 +113,9 @@ impl<'de> Deserialize<'de> for ObservabilityPipelineAmazonS3Source {
100113
M: MapAccess<'a>,
101114
{
102115
let mut auth: Option<crate::datadogV2::model::ObservabilityPipelineAwsAuth> = None;
116+
let mut compression: Option<
117+
crate::datadogV2::model::ObservabilityPipelineAmazonS3SourceCompression,
118+
> = None;
103119
let mut id: Option<String> = None;
104120
let mut region: Option<String> = None;
105121
let mut tls: Option<crate::datadogV2::model::ObservabilityPipelineTls> = None;
@@ -121,6 +137,21 @@ impl<'de> Deserialize<'de> for ObservabilityPipelineAmazonS3Source {
121137
}
122138
auth = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
123139
}
140+
"compression" => {
141+
if v.is_null() {
142+
continue;
143+
}
144+
compression =
145+
Some(serde_json::from_value(v).map_err(M::Error::custom)?);
146+
if let Some(ref _compression) = compression {
147+
match _compression {
148+
crate::datadogV2::model::ObservabilityPipelineAmazonS3SourceCompression::UnparsedObject(_compression) => {
149+
_unparsed = true;
150+
},
151+
_ => {}
152+
}
153+
}
154+
}
124155
"id" => {
125156
id = Some(serde_json::from_value(v).map_err(M::Error::custom)?);
126157
}
@@ -163,6 +194,7 @@ impl<'de> Deserialize<'de> for ObservabilityPipelineAmazonS3Source {
163194

164195
let content = ObservabilityPipelineAmazonS3Source {
165196
auth,
197+
compression,
166198
id,
167199
region,
168200
tls,
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2.0 License.
2+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
3+
// Copyright 2019-Present Datadog, Inc.
4+
5+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
6+
7+
#[non_exhaustive]
8+
#[derive(Clone, Debug, Eq, PartialEq)]
9+
pub enum ObservabilityPipelineAmazonS3SourceCompression {
10+
AUTO,
11+
NONE,
12+
GZIP,
13+
ZSTD,
14+
UnparsedObject(crate::datadog::UnparsedObject),
15+
}
16+
17+
impl ToString for ObservabilityPipelineAmazonS3SourceCompression {
18+
fn to_string(&self) -> String {
19+
match self {
20+
Self::AUTO => String::from("auto"),
21+
Self::NONE => String::from("none"),
22+
Self::GZIP => String::from("gzip"),
23+
Self::ZSTD => String::from("zstd"),
24+
Self::UnparsedObject(v) => v.value.to_string(),
25+
}
26+
}
27+
}
28+
29+
impl Serialize for ObservabilityPipelineAmazonS3SourceCompression {
30+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
31+
where
32+
S: Serializer,
33+
{
34+
match self {
35+
Self::UnparsedObject(v) => v.serialize(serializer),
36+
_ => serializer.serialize_str(self.to_string().as_str()),
37+
}
38+
}
39+
}
40+
41+
impl<'de> Deserialize<'de> for ObservabilityPipelineAmazonS3SourceCompression {
42+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
43+
where
44+
D: Deserializer<'de>,
45+
{
46+
let s: String = String::deserialize(deserializer)?;
47+
Ok(match s.as_str() {
48+
"auto" => Self::AUTO,
49+
"none" => Self::NONE,
50+
"gzip" => Self::GZIP,
51+
"zstd" => Self::ZSTD,
52+
_ => Self::UnparsedObject(crate::datadog::UnparsedObject {
53+
value: serde_json::Value::String(s.into()),
54+
}),
55+
})
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2026-04-08T12:44:25.060Z
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"http_interactions": [
3+
{
4+
"request": {
5+
"body": {
6+
"string": "{\"data\":{\"attributes\":{\"config\":{\"destinations\":[{\"id\":\"datadog-logs-destination\",\"inputs\":[\"my-processor-group\"],\"type\":\"datadog_logs\"}],\"processor_groups\":[{\"enabled\":true,\"id\":\"my-processor-group\",\"include\":\"service:my-service\",\"inputs\":[\"amazon-s3-source\"],\"processors\":[{\"enabled\":true,\"id\":\"filter-processor\",\"include\":\"service:my-service\",\"type\":\"filter\"}]}],\"sources\":[{\"compression\":\"gzip\",\"id\":\"amazon-s3-source\",\"region\":\"us-east-1\",\"type\":\"amazon_s3\"}]},\"name\":\"Pipeline with S3 Source Compression\"},\"type\":\"pipelines\"}}",
7+
"encoding": null
8+
},
9+
"headers": {
10+
"Accept": [
11+
"application/json"
12+
],
13+
"Content-Type": [
14+
"application/json"
15+
]
16+
},
17+
"method": "post",
18+
"uri": "https://api.datadoghq.com/api/v2/obs-pipelines/pipelines/validate"
19+
},
20+
"response": {
21+
"body": {
22+
"string": "{\"errors\":[]}\n",
23+
"encoding": null
24+
},
25+
"headers": {
26+
"Content-Type": [
27+
"application/vnd.api+json"
28+
]
29+
},
30+
"status": {
31+
"code": 200,
32+
"message": "OK"
33+
}
34+
},
35+
"recorded_at": "Wed, 08 Apr 2026 12:44:25 GMT"
36+
}
37+
],
38+
"recorded_with": "VCR 6.0.0"
39+
}

tests/scenarios/features/v2/observability_pipelines.feature

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ Feature: Observability Pipelines
207207
Then the response status is 200 OK
208208
And the response "errors" has length 0
209209

210+
@team:DataDog/observability-pipelines
211+
Scenario: Validate an observability pipeline with amazon S3 source compression returns "OK" response
212+
Given new "ValidatePipeline" request
213+
And body with value {"data": {"attributes": {"config": {"destinations": [{"id": "datadog-logs-destination", "inputs": ["my-processor-group"], "type": "datadog_logs"}], "processor_groups": [{"enabled": true, "id": "my-processor-group", "include": "service:my-service", "inputs": ["amazon-s3-source"], "processors": [{"enabled": true, "id": "filter-processor", "include": "service:my-service", "type": "filter"}]}], "sources": [{"id": "amazon-s3-source", "type": "amazon_s3", "region": "us-east-1", "compression": "gzip"}]}, "name": "Pipeline with S3 Source Compression"}, "type": "pipelines"}}
214+
When the request is sent
215+
Then the response status is 200 OK
216+
And the response "errors" has length 0
217+
210218
@team:DataDog/observability-pipelines
211219
Scenario: Validate an observability pipeline with destination secret key returns "OK" response
212220
Given new "ValidatePipeline" request

0 commit comments

Comments
 (0)