Skip to content

Commit b990d94

Browse files
api-clients-generation-pipeline[bot] and ci.datadog-api-spec authored
Adding compression optional field to Amazon S3 source (DataDog#3939)
Co-authored-by: ci.datadog-api-spec <packages@datadoghq.com>
1 parent 92e3758 commit b990d94

7 files changed

Lines changed: 244 additions & 9 deletions

.generator/schemas/v2/openapi.yaml

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43045,12 +43045,14 @@ components:
4304543045
ObservabilityPipelineAmazonS3Source:
4304643046
description: |-
4304743047
The `amazon_s3` source ingests logs from an Amazon S3 bucket.
43048-
It supports AWS authentication and TLS encryption.
43048+
It supports AWS authentication, TLS encryption, and configurable compression.
4304943049

4305043050
**Supported pipeline types:** logs
4305143051
properties:
4305243052
auth:
4305343053
$ref: "#/components/schemas/ObservabilityPipelineAwsAuth"
43054+
compression:
43055+
$ref: "#/components/schemas/ObservabilityPipelineAmazonS3SourceCompression"
4305443056
id:
4305543057
description: The unique identifier for this component. Used in other parts of the pipeline to reference this component (for example, as the `input` to downstream components).
4305643058
example: aws-s3-source
@@ -43073,6 +43075,20 @@ components:
4307343075
- region
4307443076
type: object
4307543077
x-pipeline-types: [logs]
43078+
ObservabilityPipelineAmazonS3SourceCompression:
43079+
description: Compression format for objects retrieved from the S3 bucket. Use `auto` to detect compression from the object's Content-Encoding header or file extension.
43080+
enum:
43081+
- auto
43082+
- none
43083+
- gzip
43084+
- zstd
43085+
example: gzip
43086+
type: string
43087+
x-enum-varnames:
43088+
- AUTO
43089+
- NONE
43090+
- GZIP
43091+
- ZSTD
4307643092
ObservabilityPipelineAmazonS3SourceType:
4307743093
default: amazon_s3
4307843094
description: The source type. Always `amazon_s3`.

api/datadogV2/model_observability_pipeline_amazon_s3_source.go

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ import (
1111
)
1212

1313
// ObservabilityPipelineAmazonS3Source The `amazon_s3` source ingests logs from an Amazon S3 bucket.
14-
// It supports AWS authentication and TLS encryption.
14+
// It supports AWS authentication, TLS encryption, and configurable compression.
1515
//
1616
// **Supported pipeline types:** logs
1717
type ObservabilityPipelineAmazonS3Source struct {
1818
// AWS authentication credentials used for accessing AWS services such as S3.
1919
// If omitted, the system’s default credentials are used (for example, the IAM role and environment variables).
2020
Auth *ObservabilityPipelineAwsAuth `json:"auth,omitempty"`
21+
// Compression format for objects retrieved from the S3 bucket. Use `auto` to detect compression from the object's Content-Encoding header or file extension.
22+
Compression *ObservabilityPipelineAmazonS3SourceCompression `json:"compression,omitempty"`
2123
// The unique identifier for this component. Used in other parts of the pipeline to reference this component (for example, as the `input` to downstream components).
2224
Id string `json:"id"`
2325
// AWS region where the S3 bucket resides.
@@ -83,6 +85,34 @@ func (o *ObservabilityPipelineAmazonS3Source) SetAuth(v ObservabilityPipelineAws
8385
o.Auth = &v
8486
}
8587

88+
// GetCompression returns the Compression field value if set, zero value otherwise.
89+
func (o *ObservabilityPipelineAmazonS3Source) GetCompression() ObservabilityPipelineAmazonS3SourceCompression {
90+
if o == nil || o.Compression == nil {
91+
var ret ObservabilityPipelineAmazonS3SourceCompression
92+
return ret
93+
}
94+
return *o.Compression
95+
}
96+
97+
// GetCompressionOk returns a tuple with the Compression field value if set, nil otherwise
98+
// and a boolean to check if the value has been set.
99+
func (o *ObservabilityPipelineAmazonS3Source) GetCompressionOk() (*ObservabilityPipelineAmazonS3SourceCompression, bool) {
100+
if o == nil || o.Compression == nil {
101+
return nil, false
102+
}
103+
return o.Compression, true
104+
}
105+
106+
// HasCompression returns a boolean if a field has been set.
107+
func (o *ObservabilityPipelineAmazonS3Source) HasCompression() bool {
108+
return o != nil && o.Compression != nil
109+
}
110+
111+
// SetCompression gets a reference to the given ObservabilityPipelineAmazonS3SourceCompression and assigns it to the Compression field.
112+
func (o *ObservabilityPipelineAmazonS3Source) SetCompression(v ObservabilityPipelineAmazonS3SourceCompression) {
113+
o.Compression = &v
114+
}
115+
86116
// GetId returns the Id field value.
87117
func (o *ObservabilityPipelineAmazonS3Source) GetId() string {
88118
if o == nil {
@@ -217,6 +247,9 @@ func (o ObservabilityPipelineAmazonS3Source) MarshalJSON() ([]byte, error) {
217247
if o.Auth != nil {
218248
toSerialize["auth"] = o.Auth
219249
}
250+
if o.Compression != nil {
251+
toSerialize["compression"] = o.Compression
252+
}
220253
toSerialize["id"] = o.Id
221254
toSerialize["region"] = o.Region
222255
if o.Tls != nil {
@@ -236,12 +269,13 @@ func (o ObservabilityPipelineAmazonS3Source) MarshalJSON() ([]byte, error) {
236269
// UnmarshalJSON deserializes the given payload.
237270
func (o *ObservabilityPipelineAmazonS3Source) UnmarshalJSON(bytes []byte) (err error) {
238271
all := struct {
239-
Auth *ObservabilityPipelineAwsAuth `json:"auth,omitempty"`
240-
Id *string `json:"id"`
241-
Region *string `json:"region"`
242-
Tls *ObservabilityPipelineTls `json:"tls,omitempty"`
243-
Type *ObservabilityPipelineAmazonS3SourceType `json:"type"`
244-
UrlKey *string `json:"url_key,omitempty"`
272+
Auth *ObservabilityPipelineAwsAuth `json:"auth,omitempty"`
273+
Compression *ObservabilityPipelineAmazonS3SourceCompression `json:"compression,omitempty"`
274+
Id *string `json:"id"`
275+
Region *string `json:"region"`
276+
Tls *ObservabilityPipelineTls `json:"tls,omitempty"`
277+
Type *ObservabilityPipelineAmazonS3SourceType `json:"type"`
278+
UrlKey *string `json:"url_key,omitempty"`
245279
}{}
246280
if err = datadog.Unmarshal(bytes, &all); err != nil {
247281
return datadog.Unmarshal(bytes, &o.UnparsedObject)
@@ -257,7 +291,7 @@ func (o *ObservabilityPipelineAmazonS3Source) UnmarshalJSON(bytes []byte) (err e
257291
}
258292
additionalProperties := make(map[string]interface{})
259293
if err = datadog.Unmarshal(bytes, &additionalProperties); err == nil {
260-
datadog.DeleteKeys(additionalProperties, &[]string{"auth", "id", "region", "tls", "type", "url_key"})
294+
datadog.DeleteKeys(additionalProperties, &[]string{"auth", "compression", "id", "region", "tls", "type", "url_key"})
261295
} else {
262296
return err
263297
}
@@ -267,6 +301,11 @@ func (o *ObservabilityPipelineAmazonS3Source) UnmarshalJSON(bytes []byte) (err e
267301
hasInvalidField = true
268302
}
269303
o.Auth = all.Auth
304+
if all.Compression != nil && !all.Compression.IsValid() {
305+
hasInvalidField = true
306+
} else {
307+
o.Compression = all.Compression
308+
}
270309
o.Id = *all.Id
271310
o.Region = *all.Region
272311
if all.Tls != nil && all.Tls.UnparsedObject != nil && o.UnparsedObject == nil {
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache-2.0 License.
2+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
3+
// Copyright 2019-Present Datadog, Inc.
4+
5+
package datadogV2
6+
7+
import (
8+
"fmt"
9+
10+
"github.com/DataDog/datadog-api-client-go/v2/api/datadog"
11+
)
12+
13+
// ObservabilityPipelineAmazonS3SourceCompression Compression format for objects retrieved from the S3 bucket. Use `auto` to detect compression from the object's Content-Encoding header or file extension.
type ObservabilityPipelineAmazonS3SourceCompression string

// List of ObservabilityPipelineAmazonS3SourceCompression.
const (
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_AUTO ObservabilityPipelineAmazonS3SourceCompression = "auto"
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_NONE ObservabilityPipelineAmazonS3SourceCompression = "none"
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_GZIP ObservabilityPipelineAmazonS3SourceCompression = "gzip"
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_ZSTD ObservabilityPipelineAmazonS3SourceCompression = "zstd"
)

// allowedObservabilityPipelineAmazonS3SourceCompressionEnumValues lists every
// value accepted by IsValid and reported in validation error messages.
var allowedObservabilityPipelineAmazonS3SourceCompressionEnumValues = []ObservabilityPipelineAmazonS3SourceCompression{
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_AUTO,
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_NONE,
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_GZIP,
	OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_ZSTD,
}
30+
31+
// GetAllowedValues reeturns the list of possible values.
32+
func (v *ObservabilityPipelineAmazonS3SourceCompression) GetAllowedValues() []ObservabilityPipelineAmazonS3SourceCompression {
33+
return allowedObservabilityPipelineAmazonS3SourceCompressionEnumValues
34+
}
35+
36+
// UnmarshalJSON deserializes the given payload.
37+
func (v *ObservabilityPipelineAmazonS3SourceCompression) UnmarshalJSON(src []byte) error {
38+
var value string
39+
err := datadog.Unmarshal(src, &value)
40+
if err != nil {
41+
return err
42+
}
43+
*v = ObservabilityPipelineAmazonS3SourceCompression(value)
44+
return nil
45+
}
46+
47+
// NewObservabilityPipelineAmazonS3SourceCompressionFromValue returns a pointer to a valid ObservabilityPipelineAmazonS3SourceCompression
48+
// for the value passed as argument, or an error if the value passed is not allowed by the enum.
49+
func NewObservabilityPipelineAmazonS3SourceCompressionFromValue(v string) (*ObservabilityPipelineAmazonS3SourceCompression, error) {
50+
ev := ObservabilityPipelineAmazonS3SourceCompression(v)
51+
if ev.IsValid() {
52+
return &ev, nil
53+
}
54+
return nil, fmt.Errorf("invalid value '%v' for ObservabilityPipelineAmazonS3SourceCompression: valid values are %v", v, allowedObservabilityPipelineAmazonS3SourceCompressionEnumValues)
55+
}
56+
57+
// IsValid return true if the value is valid for the enum, false otherwise.
58+
func (v ObservabilityPipelineAmazonS3SourceCompression) IsValid() bool {
59+
for _, existing := range allowedObservabilityPipelineAmazonS3SourceCompressionEnumValues {
60+
if existing == v {
61+
return true
62+
}
63+
}
64+
return false
65+
}
66+
67+
// Ptr returns reference to ObservabilityPipelineAmazonS3SourceCompression value.
68+
func (v ObservabilityPipelineAmazonS3SourceCompression) Ptr() *ObservabilityPipelineAmazonS3SourceCompression {
69+
return &v
70+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Validate an observability pipeline with amazon S3 source compression returns "OK" response
2+
3+
package main
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
"fmt"
9+
"os"
10+
11+
"github.com/DataDog/datadog-api-client-go/v2/api/datadog"
12+
"github.com/DataDog/datadog-api-client-go/v2/api/datadogV2"
13+
)
14+
15+
func main() {
16+
body := datadogV2.ObservabilityPipelineSpec{
17+
Data: datadogV2.ObservabilityPipelineSpecData{
18+
Attributes: datadogV2.ObservabilityPipelineDataAttributes{
19+
Config: datadogV2.ObservabilityPipelineConfig{
20+
Destinations: []datadogV2.ObservabilityPipelineConfigDestinationItem{
21+
datadogV2.ObservabilityPipelineConfigDestinationItem{
22+
ObservabilityPipelineDatadogLogsDestination: &datadogV2.ObservabilityPipelineDatadogLogsDestination{
23+
Id: "datadog-logs-destination",
24+
Inputs: []string{
25+
"my-processor-group",
26+
},
27+
Type: datadogV2.OBSERVABILITYPIPELINEDATADOGLOGSDESTINATIONTYPE_DATADOG_LOGS,
28+
}},
29+
},
30+
ProcessorGroups: []datadogV2.ObservabilityPipelineConfigProcessorGroup{
31+
{
32+
Enabled: true,
33+
Id: "my-processor-group",
34+
Include: "service:my-service",
35+
Inputs: []string{
36+
"amazon-s3-source",
37+
},
38+
Processors: []datadogV2.ObservabilityPipelineConfigProcessorItem{
39+
datadogV2.ObservabilityPipelineConfigProcessorItem{
40+
ObservabilityPipelineFilterProcessor: &datadogV2.ObservabilityPipelineFilterProcessor{
41+
Enabled: true,
42+
Id: "filter-processor",
43+
Include: "service:my-service",
44+
Type: datadogV2.OBSERVABILITYPIPELINEFILTERPROCESSORTYPE_FILTER,
45+
}},
46+
},
47+
},
48+
},
49+
Sources: []datadogV2.ObservabilityPipelineConfigSourceItem{
50+
datadogV2.ObservabilityPipelineConfigSourceItem{
51+
ObservabilityPipelineAmazonS3Source: &datadogV2.ObservabilityPipelineAmazonS3Source{
52+
Id: "amazon-s3-source",
53+
Type: datadogV2.OBSERVABILITYPIPELINEAMAZONS3SOURCETYPE_AMAZON_S3,
54+
Region: "us-east-1",
55+
Compression: datadogV2.OBSERVABILITYPIPELINEAMAZONS3SOURCECOMPRESSION_GZIP.Ptr(),
56+
}},
57+
},
58+
},
59+
Name: "Pipeline with S3 Source Compression",
60+
},
61+
Type: "pipelines",
62+
},
63+
}
64+
ctx := datadog.NewDefaultContext(context.Background())
65+
configuration := datadog.NewConfiguration()
66+
apiClient := datadog.NewAPIClient(configuration)
67+
api := datadogV2.NewObservabilityPipelinesApi(apiClient)
68+
resp, r, err := api.ValidatePipeline(ctx, body)
69+
70+
if err != nil {
71+
fmt.Fprintf(os.Stderr, "Error when calling `ObservabilityPipelinesApi.ValidatePipeline`: %v\n", err)
72+
fmt.Fprintf(os.Stderr, "Full HTTP response: %v\n", r)
73+
}
74+
75+
responseContent, _ := json.MarshalIndent(resp, "", " ")
76+
fmt.Fprintf(os.Stdout, "Response from `ObservabilityPipelinesApi.ValidatePipeline`:\n%s\n", responseContent)
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
2026-04-08T12:44:25.060Z
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
interactions:
2+
- request:
3+
body: |
4+
{"data":{"attributes":{"config":{"destinations":[{"id":"datadog-logs-destination","inputs":["my-processor-group"],"type":"datadog_logs"}],"processor_groups":[{"enabled":true,"id":"my-processor-group","include":"service:my-service","inputs":["amazon-s3-source"],"processors":[{"enabled":true,"id":"filter-processor","include":"service:my-service","type":"filter"}]}],"sources":[{"compression":"gzip","id":"amazon-s3-source","region":"us-east-1","type":"amazon_s3"}]},"name":"Pipeline with S3 Source Compression"},"type":"pipelines"}}
5+
form: {}
6+
headers:
7+
Accept:
8+
- application/json
9+
Content-Type:
10+
- application/json
11+
id: 0
12+
method: POST
13+
url: https://api.datadoghq.com/api/v2/obs-pipelines/pipelines/validate
14+
response:
15+
body: '{"errors":[]}
16+
17+
'
18+
code: 200
19+
duration: 0ms
20+
headers:
21+
Content-Type:
22+
- application/vnd.api+json
23+
status: 200 OK
24+
version: 2

tests/scenarios/features/v2/observability_pipelines.feature

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ Feature: Observability Pipelines
207207
Then the response status is 200 OK
208208
And the response "errors" has length 0
209209

210+
@team:DataDog/observability-pipelines
211+
Scenario: Validate an observability pipeline with amazon S3 source compression returns "OK" response
212+
Given new "ValidatePipeline" request
213+
And body with value {"data": {"attributes": {"config": {"destinations": [{"id": "datadog-logs-destination", "inputs": ["my-processor-group"], "type": "datadog_logs"}], "processor_groups": [{"enabled": true, "id": "my-processor-group", "include": "service:my-service", "inputs": ["amazon-s3-source"], "processors": [{"enabled": true, "id": "filter-processor", "include": "service:my-service", "type": "filter"}]}], "sources": [{"id": "amazon-s3-source", "type": "amazon_s3", "region": "us-east-1", "compression": "gzip"}]}, "name": "Pipeline with S3 Source Compression"}, "type": "pipelines"}}
214+
When the request is sent
215+
Then the response status is 200 OK
216+
And the response "errors" has length 0
217+
210218
@team:DataDog/observability-pipelines
211219
Scenario: Validate an observability pipeline with destination secret key returns "OK" response
212220
Given new "ValidatePipeline" request

0 commit comments

Comments
 (0)