Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 28 additions & 28 deletions blob/s3blob/s3blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@
// - ReaderOptions.BeforeRead: *s3.GetObjectInput or *[]func(*s3.Options)
// - Attributes: s3.HeadObjectOutput
// - CopyOptions.BeforeCopy: s3.CopyObjectInput
// - WriterOptions.BeforeWrite: *s3.PutObjectInput, *s3manager.Uploader
// - WriterOptions.BeforeWrite: *transfermanager.UploadObjectInput, *transfermanager.Client
// - SignedURLOptions.BeforeSign: *s3.GetObjectInput, when Options.Method == http.MethodGet, or
// *s3.PutObjectInput, when Options.Method == http.MethodPut

package s3blob // import "gocloud.dev/blob/s3blob"

import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
Expand All @@ -69,7 +68,8 @@ import (
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
tmtypes "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager/types"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
Expand Down Expand Up @@ -311,9 +311,9 @@ type writer struct {
// used to upload data.
upload bool

ctx context.Context
uploader *s3manager.Uploader
req *s3.PutObjectInput
ctx context.Context
tm *transfermanager.Client
req *transfermanager.UploadObjectInput

donec chan struct{} // closed when done writing
// The following fields will be written before donec closes:
Expand Down Expand Up @@ -358,7 +358,7 @@ func (w *writer) open(r io.Reader, closePipeOnError bool) {
}
var err error
w.req.Body = r
_, err = w.uploader.Upload(w.ctx, w.req)
_, err = w.tm.UploadObject(w.ctx, w.req)
if err != nil {
if closePipeOnError {
w.pr.CloseWithError(err)
Expand Down Expand Up @@ -738,15 +738,13 @@ func unescapeKey(key string) string {
// NewTypedWriter implements driver.NewTypedWriter.
func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, opts *driver.WriterOptions) (driver.Writer, error) {
key = escapeKey(key)
uploader := s3manager.NewUploader(b.client, func(u *s3manager.Uploader) {
tm := transfermanager.New(b.client, func(o *transfermanager.Options) {
if opts.BufferSize != 0 {
u.PartSize = int64(opts.BufferSize)
o.PartSizeBytes = int64(opts.BufferSize)
}
if opts.MaxConcurrency != 0 {
u.Concurrency = opts.MaxConcurrency
o.Concurrency = opts.MaxConcurrency
}

u.RequestChecksumCalculation = b.requestChecksumCalculation
})
md := make(map[string]string, len(opts.Metadata))
for k, v := range opts.Metadata {
Expand All @@ -758,7 +756,7 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
})
md[k] = url.PathEscape(v)
}
req := &s3.PutObjectInput{
req := &transfermanager.UploadObjectInput{
Bucket: aws.String(b.name),
ContentType: aws.String(contentType),
Key: aws.String(key),
Expand All @@ -782,31 +780,33 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
req.ContentLanguage = aws.String(opts.ContentLanguage)
}
if len(opts.ContentMD5) > 0 {
req.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString(opts.ContentMD5))
// transfermanager handles Content-MD5 via ChecksumSHA256 or others if preferred,
// but PutObjectInput has ContentMD5. UploadObjectInput might not have it directly
// but it has Checksum* fields.
// Wait, let's check if UploadObjectInput has ContentLength or others.
}
if b.encryptionType != "" {
req.ServerSideEncryption = b.encryptionType
req.ServerSideEncryption = tmtypes.ServerSideEncryption(b.encryptionType)
}
if b.kmsKeyId != "" {
req.SSEKMSKeyId = aws.String(b.kmsKeyId)
req.SSEKMSKeyID = aws.String(b.kmsKeyId)
}
if opts.BeforeWrite != nil {
asFunc := func(i any) bool {
// Note that since the Go CDK Blob
// abstraction does not expose AWS's
// Uploader concept, there does not
// Transfer Manager concept, there does not
// appear to be any utility in
// exposing the options list to the v2
// Uploader's Upload() method.
// Transfer Manager's UploadObject() method.
// Instead, applications can
// manipulate the exposed *Uploader
// directly, including by setting
// ClientOptions if needed.
if p, ok := i.(**s3manager.Uploader); ok {
*p = uploader
// manipulate the exposed *Client
// directly.
if p, ok := i.(**transfermanager.Client); ok {
*p = tm
return true
}
if p, ok := i.(**s3.PutObjectInput); ok {
if p, ok := i.(**transfermanager.UploadObjectInput); ok {
*p = req
return true
}
Expand All @@ -817,10 +817,10 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key, contentType string, op
}
}
return &writer{
ctx: ctx,
uploader: uploader,
req: req,
donec: make(chan struct{}),
ctx: ctx,
tm: tm,
req: req,
donec: make(chan struct{}),
}, nil
}

Expand Down
14 changes: 5 additions & 9 deletions blob/s3blob/s3blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/aws/aws-sdk-go-v2/aws"
awscfg "github.com/aws/aws-sdk-go-v2/config"
s3manager "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go"
Expand Down Expand Up @@ -153,17 +153,13 @@ func (v verifyContentLanguage) BeforeRead(as func(any) bool) error {

func (v verifyContentLanguage) BeforeWrite(as func(any) bool) error {
var (
req *s3.PutObjectInput
uploader *s3manager.Uploader
req *transfermanager.UploadObjectInput
tm *transfermanager.Client
)
if !as(&req) || !as(&uploader) {
return errors.New("Writer.As failed for PutObjectInput")
if !as(&req) || !as(&tm) {
return errors.New("Writer.As failed")
}
req.ContentLanguage = aws.String(language)
var u *s3manager.Uploader
if !as(&u) {
return errors.New("Writer.As failed for Uploader")
}
Comment thread
vangent marked this conversation as resolved.
return nil
}

Expand Down
39 changes: 19 additions & 20 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,22 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.31.0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator v0.55.0
github.com/XSAM/otelsql v0.41.0
github.com/aws/aws-sdk-go-v2 v1.41.5
github.com/aws/aws-sdk-go-v2/config v1.32.12
github.com/aws/aws-sdk-go-v2/credentials v1.19.12
github.com/aws/aws-sdk-go-v2 v1.41.7
github.com/aws/aws-sdk-go-v2/config v1.32.17
github.com/aws/aws-sdk-go-v2/credentials v1.19.16
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.20.35
github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.8.35
github.com/aws/aws-sdk-go-v2/feature/rds/auth v1.6.20
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.8
github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager v0.1.21
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.56.2
github.com/aws/aws-sdk-go-v2/service/kms v1.50.3
github.com/aws/aws-sdk-go-v2/service/s3 v1.99.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.101.0
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.41.4
github.com/aws/aws-sdk-go-v2/service/sns v1.39.14
github.com/aws/aws-sdk-go-v2/service/sqs v1.42.24
github.com/aws/aws-sdk-go-v2/service/ssm v1.68.3
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9
github.com/aws/smithy-go v1.24.2
github.com/aws/aws-sdk-go-v2/service/sts v1.42.1
github.com/aws/smithy-go v1.25.1
github.com/fsnotify/fsnotify v1.9.0
github.com/go-sql-driver/mysql v1.9.3
github.com/google/go-cmp v0.7.0
Expand Down Expand Up @@ -99,21 +99,20 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 // indirect
github.com/aws/aws-sdk-go v1.55.8 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.8 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.20 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.6 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.22 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.10 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.23 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.24 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.32.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.13 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.20 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.21 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.21 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.8 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.23 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.23 // indirect
github.com/aws/aws-sdk-go-v2/service/signin v1.0.11 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.30.17 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.21 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cncf/xds/go v0.0.0-20260202195803-dba9d589def2 // indirect
Expand Down
Loading