Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions internal/impl/azure/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,55 @@ input:
)
})

// Verifies that blob index tags configured on the blob_storage output are
// applied to the uploaded blob and are readable back via the Azure GetTags API.
t.Run("blob_storage_tags", func(t *testing.T) {
// Use a random (UUID) container name so concurrent/repeated runs don't collide.
u4, err := uuid.NewV4()
require.NoError(t, err)
containerName := u4.String()

// Pre-create the container so the upload doesn't rely on the output's
// container-not-found retry path.
client, err := azblob.NewClientFromConnectionString(connString, nil)
require.NoError(t, err)
_, err = client.CreateContainer(t.Context(), containerName, nil)
require.NoError(t, err)

// Build the output config with two static tags; blob_type BLOCK exercises
// the UploadStream tagging path (append blobs tag via SetTags instead).
env := service.NewEnvironment()
outConf, err := bsoSpec().ParseYAML(fmt.Sprintf(`
storage_connection_string: %s
container: %s
path: tagged-blob.txt
blob_type: BLOCK
public_access_level: PRIVATE
tags:
Environment: production
Source: test-suite
`, connString, containerName), env)
require.NoError(t, err)

conf, err := bsoConfigFromParsed(outConf)
require.NoError(t, err)

writer, err := newAzureBlobStorageWriter(conf, service.MockResources().Logger())
require.NoError(t, err)

require.NoError(t, writer.Connect(t.Context()))
// context.Background() here: t.Context() is already cancelled by cleanup time.
t.Cleanup(func() { require.NoError(t, writer.Close(context.Background())) })

msg := service.NewMessage([]byte("hello world"))
require.NoError(t, writer.Write(t.Context(), msg))

// Read the tags back directly from the service to confirm they were stored.
resp, err := client.ServiceClient().NewContainerClient(containerName).NewBlobClient("tagged-blob.txt").GetTags(t.Context(), nil)
require.NoError(t, err)

// GetTags returns a BlobTagSet slice; flatten it into a map for comparison.
tags := make(map[string]string)
for _, tag := range resp.BlobTagSet {
tags[*tag.Key] = *tag.Value
}

assert.Equal(t, map[string]string{
"Environment": "production",
"Source": "test-suite",
}, tags)
})

t.Run("queue_storage", func(t *testing.T) {
dummyQueue := "foo"

Expand Down
75 changes: 70 additions & 5 deletions internal/impl/azure/output_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
"context"
"errors"
"fmt"
"sort"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"

Check failure on line 28 in internal/impl/azure/output_blob_storage.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not properly formatted (gofumpt)
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"

"github.com/redpanda-data/benthos/v4/public/service"
Expand All @@ -32,14 +35,21 @@
// Blob Storage Output Fields
bsoFieldContainer = "container"
bsoFieldPath = "path"
bsoFieldTags = "tags"
bsoFieldBlobType = "blob_type"
bsoFieldPublicAccessLevel = "public_access_level"
)

// bsoTagPair is a single blob index tag: a static key paired with an
// interpolated value resolved per message at write time. Tags are stored as a
// sorted slice of pairs (rather than a map) so per-message resolution happens
// in a deterministic order.
type bsoTagPair struct {
key string
value *service.InterpolatedString
}

// bsoConfig holds the parsed configuration for the Azure Blob Storage output.
// Interpolated fields are resolved against each message when it is written.
type bsoConfig struct {
client *azblob.Client
Container *service.InterpolatedString
Path *service.InterpolatedString
// Tags are blob index tags applied to each uploaded blob, sorted by key.
// At most 10 are permitted (validated during parsing).
Tags []bsoTagPair
BlobType *service.InterpolatedString
PublicAccessLevel *service.InterpolatedString
}
Expand All @@ -59,6 +69,23 @@
if conf.Path, err = pConf.FieldInterpolatedString(bsoFieldPath); err != nil {
return
}

var tagMap map[string]*service.InterpolatedString
if tagMap, err = pConf.FieldInterpolatedStringMap(bsoFieldTags); err != nil {
return
}
if len(tagMap) > 10 {
err = fmt.Errorf("at most 10 blob index tags are permitted, got %d", len(tagMap))
return
}
conf.Tags = make([]bsoTagPair, 0, len(tagMap))
for k, v := range tagMap {
conf.Tags = append(conf.Tags, bsoTagPair{key: k, value: v})
}
sort.Slice(conf.Tags, func(i, j int) bool {
return conf.Tags[i].key < conf.Tags[j].key
})

if conf.BlobType, err = pConf.FieldInterpolatedString(bsoFieldBlobType); err != nil {
return
}
Expand Down Expand Up @@ -99,6 +126,14 @@
Example(`${!meta("kafka_key")}.json`).
Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`).
Default(`${!counter()}-${!timestamp_unix_nano()}.txt`),
service.NewInterpolatedStringMapField(bsoFieldTags).
Description("Key/value pairs to store with the blob as https://learn.microsoft.com/en-us/azure/storage/blobs/storage-manage-find-blobs[blob index tags^]. A maximum of 10 tags are permitted, tag keys must be between 1 and 128 characters, and tag values must be between 0 and 256 characters. Keys and values are case-sensitive and only support string values. Not supported on storage accounts with hierarchical namespace (Data Lake Gen2).").
Default(map[string]any{}).
Example(map[string]any{
"Environment": "production",
"Source": `${!meta("kafka_topic")}`,
}).
Advanced(),
service.NewInterpolatedStringEnumField(bsoFieldBlobType, "BLOCK", "APPEND").
Description("Block and Append blobs are comprized of blocks, and each blob can support up to 50,000 blocks. The default value is `+\"`BLOCK`\"+`.`").
Advanced().
Expand Down Expand Up @@ -145,15 +180,19 @@
return nil
}

func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, blobName, blobType string, message []byte) error {
func (a *azureBlobStorageWriter) uploadBlob(ctx context.Context, containerName, blobName, blobType string, message []byte, tags map[string]string) error {
containerClient := a.conf.client.ServiceClient().NewContainerClient(containerName)
var err error
if blobType == "APPEND" {
appendBlobClient := containerClient.NewAppendBlobClient(blobName)
_, err = appendBlobClient.AppendBlock(ctx, streaming.NopCloser(bytes.NewReader(message)), nil)
if err != nil {
if isErrorCode(err, bloberror.BlobNotFound) {
_, err := appendBlobClient.Create(ctx, nil)
var createOpts *appendblob.CreateOptions
if len(tags) > 0 {
createOpts = &appendblob.CreateOptions{Tags: tags}
}
_, err := appendBlobClient.Create(ctx, createOpts)
if err != nil && !isErrorCode(err, bloberror.BlobAlreadyExists) {
return fmt.Errorf("creating append blob: %w", err)
}
Expand All @@ -167,8 +206,22 @@
return fmt.Errorf("appending block to blob: %w", err)
}
}
// Tags must be set separately for append blobs since AppendBlock does not
// support tags. On creation they are set via CreateOptions, but when
// appending to an existing blob we need an explicit SetTags call to ensure
// tags reflect the latest configured values.
if len(tags) > 0 {
blobClient := containerClient.NewBlobClient(blobName)
if _, err = blobClient.SetTags(ctx, tags, nil); err != nil {
return fmt.Errorf("setting blob tags: %w", err)
}
}
} else {
_, err = containerClient.NewBlockBlobClient(blobName).UploadStream(ctx, bytes.NewReader(message), nil)
var uploadOpts *blockblob.UploadStreamOptions
if len(tags) > 0 {
uploadOpts = &blockblob.UploadStreamOptions{Tags: tags}
}
_, err = containerClient.NewBlockBlobClient(blobName).UploadStream(ctx, bytes.NewReader(message), uploadOpts)
if err != nil {
return fmt.Errorf("pushing block to blob: %w", err)
}
Expand Down Expand Up @@ -211,7 +264,19 @@
return err
}

if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes); err != nil {
var blobTags map[string]string
if len(a.conf.Tags) > 0 {
blobTags = make(map[string]string, len(a.conf.Tags))
for _, pair := range a.conf.Tags {
tagVal, err := pair.value.TryString(msg)
if err != nil {
return fmt.Errorf("tag %v interpolation: %w", pair.key, err)
}
blobTags[pair.key] = tagVal
}
}

if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes, blobTags); err != nil {
if isErrorCode(err, bloberror.ContainerNotFound) {
var accessLevel string
if accessLevel, err = a.conf.PublicAccessLevel.TryString(msg); err != nil {
Expand All @@ -224,7 +289,7 @@
}
}

if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes); err != nil {
if err := a.uploadBlob(ctx, containerName, blobName, blobType, mBytes, blobTags); err != nil {
return fmt.Errorf("error retrying to upload blob: %s", err)
}
} else {
Expand Down
Loading