Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions backends/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,13 +307,12 @@ func New(ctx context.Context, opt Options) (*Backend, error) {

if opt.CreateContainer {
// Create bucket if it does not exist
metricCalls.WithLabelValues("create-container").Inc()
metricLastCallTimestamp.WithLabelValues("create-container").SetToCurrentTime()

start := time.Now()
_, err := client.CreateContainer(ctx, opt.Container, &azblob.CreateContainerOptions{})
if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
return nil, err
}
recordCallMetrics("create-container", start)
}

b := &Backend{
Expand Down Expand Up @@ -399,6 +398,8 @@ func convertAzureError(err error) error {
}

func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
defer recordCallMetrics("list", time.Now())

var blobs simpleblob.BlobList

// Runes to strip from blob names for GlobalPrefix
Expand All @@ -414,6 +415,7 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
for blobPager.More() {
resp, err := blobPager.NextPage(ctx)
if err != nil {
recordErrorMetrics("list", err)
return nil, err
}

Expand All @@ -438,8 +440,6 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
}
}

metricLastCallTimestamp.WithLabelValues("list").SetToCurrentTime()

return blobs, nil
}

Expand All @@ -463,13 +463,14 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
}

func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) {
metricCalls.WithLabelValues("load").Inc()
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()
defer recordCallMetrics("load", time.Now())

// Download the blob's contents and ensure that the download worked properly
blobDownloadResponse, err := b.client.DownloadStream(ctx, b.opt.Container, name, nil)
if err = convertAzureError(err); err != nil {
metricCallErrors.WithLabelValues("load").Inc()
if !errors.Is(err, os.ErrNotExist) {
recordErrorMetrics("load", err)
}
return nil, err
}

Expand Down Expand Up @@ -497,14 +498,12 @@ func (b *Backend) Store(ctx context.Context, name string, data []byte) error {

// doStore is a convenience wrapper around doStoreReader.
// It exposes data through a bytes.Reader and returns whatever doStoreReader
// returns for the upload of that reader under key name.
func (b *Backend) doStore(ctx context.Context, name string, data []byte) (azblob.UploadStreamResponse, error) {
	// doStoreReader no longer takes a size argument; the reader alone is enough.
	return b.doStoreReader(ctx, name, bytes.NewReader(data))
}

// doStoreReader stores data with key name in Azure blob, using r as a source for data.
// The value of size may be -1, in case the size is not known.
func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, size int64) (azblob.UploadStreamResponse, error) {
metricCalls.WithLabelValues("store").Inc()
metricLastCallTimestamp.WithLabelValues("store").SetToCurrentTime()
func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader) (azblob.UploadStreamResponse, error) {
defer recordCallMetrics("store", time.Now())

uploadStreamOptions := &azblob.UploadStreamOptions{
Concurrency: b.opt.Concurrency,
Expand All @@ -513,7 +512,7 @@ func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, s
// Perform UploadStream
resp, err := b.client.UploadStream(ctx, b.opt.Container, name, r, uploadStreamOptions)
if err != nil {
metricCallErrors.WithLabelValues("store").Inc()
recordErrorMetrics("store", err)
return azblob.UploadStreamResponse{}, err
}

Expand All @@ -533,8 +532,7 @@ func (b *Backend) Delete(ctx context.Context, name string) error {
}

func (b *Backend) doDelete(ctx context.Context, name string) error {
metricCalls.WithLabelValues("delete").Inc()
metricLastCallTimestamp.WithLabelValues("delete").SetToCurrentTime()
defer recordCallMetrics("delete", time.Now())

_, err := b.client.DeleteBlob(ctx, b.opt.Container, name, nil)

Expand All @@ -543,7 +541,7 @@ func (b *Backend) doDelete(ctx context.Context, name string) error {
if errors.Is(err, os.ErrNotExist) {
return nil
}
metricCallErrors.WithLabelValues("delete").Inc()
recordErrorMetrics("delete", err)
return err
}

Expand Down
38 changes: 31 additions & 7 deletions backends/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
Expand All @@ -30,13 +32,6 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {
t.Fatalf("failed to start container: %v", err)
}

state, err := azuriteContainer.State(ctx)
if err != nil {
t.Fatalf("failed to get container state: %v", err)
}

t.Log(state.Running)

serviceURL, err := azuriteContainer.BlobServiceURL(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -139,3 +134,32 @@ func TestBackend_globalPrefixAndMarker(t *testing.T) {
tester.DoBackendTests(t, b)
assert.NotEmpty(t, b.lastMarker)
}

// TestMetrics checks that the per-error-type counter stays empty while the
// backend is healthy, and that a failing call adds exactly one series carrying
// the method and error labels.
func TestMetrics(t *testing.T) {
	const metricName = "storage_azure_call_error_by_type_total"

	// Ensure no metrics are left from other tests.
	metricCallErrorsType.Reset()

	ctx := t.Context()
	b := getBackend(ctx, t)

	// seriesCount reports how many series the error-type counter currently exposes.
	seriesCount := func() int {
		return testutil.CollectAndCount(metricCallErrorsType, metricName)
	}

	// Operations on a healthy backend do not collect any metrics.
	assert.Equal(t, 0, seriesCount())

	// Results are deliberately ignored: only the metric side effects matter here.
	_ = b.Store(ctx, "my-key", []byte{})
	_, _ = b.Load(ctx, "my-key")
	_ = b.Delete(ctx, "my-key")
	_, _ = b.Load(ctx, "no-such-key") // ErrNotExist is not collected
	_ = b.Delete(ctx, "no-such-key")
	_, _ = b.List(ctx, "")

	assert.Equal(t, 0, seriesCount())

	// A failed operation generates metrics.
	originalContainer := b.opt.Container
	b.opt.Container = "non-existent-container"
	storeErr := b.Store(ctx, "file-in-non-existent-container", []byte{})
	assert.Error(t, storeErr) // Fails due to missing bucket
	b.opt.Container = originalContainer

	assert.Equal(t, 1, seriesCount())

	formatted, err := testutil.CollectAndFormat(metricCallErrorsType, expfmt.TypeProtoCompact, metricName)
	require.NoError(t, err)

	output := string(formatted)
	assert.Regexp(t, `label:{name:"method"\s+value:"store"}`, output)
	assert.Regexp(t, `label:{name:"error"\s+value:"ContainerNotFound"}`, output)
}
57 changes: 57 additions & 0 deletions backends/azure/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package azure

import (
"context"
"errors"
"net"
"net/url"
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -26,10 +34,59 @@ var (
},
[]string{"method"},
)
metricCallErrorsType = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "storage_azure_call_error_by_type_total",
Help: "Azure API call errors by method and error type",
},
[]string{"method", "error"},
)
metricCallHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "storage_azure_call_duration_seconds",
Help: "Azure API call duration by method",
Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60},
},
[]string{"method"},
)
)

func recordCallMetrics(method string, start time.Time) {
metricCalls.WithLabelValues(method).Inc()
metricLastCallTimestamp.WithLabelValues(method).SetToCurrentTime()
Comment thread
ahouene marked this conversation as resolved.
metricCallHistogram.WithLabelValues(method).Observe(time.Since(start).Seconds())
}

// recordErrorMetrics counts a failed call to method. It increments the
// per-method error counter and the per-method, per-error-type counter.
// A nil err is ignored, so nothing is recorded for it.
//
// The error-type label is the first match of:
//   - "NotFound" for errors matching os.ErrNotExist,
//   - "Timeout" for context.DeadlineExceeded or a *net.OpError reporting a timeout,
//   - "DNSError" for a *net.DNSError,
//   - "URLError" for a *url.Error,
//   - the Azure error code for an *azcore.ResponseError that carries one,
//   - "Unknown" otherwise.
func recordErrorMetrics(method string, err error) {
	if err == nil {
		return
	}
	errorLabel := "Unknown"
	var netErr *net.OpError
	var dnsErr *net.DNSError
	var urlErr *url.Error
	var azErr *azcore.ResponseError
	switch {
	case errors.Is(err, os.ErrNotExist):
		errorLabel = "NotFound"
	case errors.Is(err, context.DeadlineExceeded),
		errors.As(err, &netErr) && netErr.Timeout():
		errorLabel = "Timeout"
	case errors.As(err, &dnsErr):
		errorLabel = "DNSError"
	case errors.As(err, &urlErr):
		errorLabel = "URLError"
	case errors.As(err, &azErr) && azErr.ErrorCode != "":
		// A response error without a service error code keeps the "Unknown"
		// label rather than producing a series with an empty label value.
		errorLabel = azErr.ErrorCode
	}
	metricCallErrors.WithLabelValues(method).Inc()
	metricCallErrorsType.WithLabelValues(method, errorLabel).Inc()
}

// init registers all of this package's collectors with the default Prometheus
// registry. MustRegister panics if any registration fails.
func init() {
	prometheus.MustRegister(
		metricLastCallTimestamp,
		metricCalls,
		metricCallErrors,
		metricCallErrorsType,
		metricCallHistogram,
	)
}
2 changes: 1 addition & 1 deletion backends/azure/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, e
// if the writing end of the pipe is closed.
// It is okay to write to w.info from this goroutine
// because it will only be used after w.donePipe is closed.
w.info, err = w.backend.doStoreReader(w.ctx, w.name, pr, -1)
w.info, err = w.backend.doStoreReader(w.ctx, w.name, pr)
_ = pr.CloseWithError(err) // Always returns nil.
close(w.donePipe)
}()
Expand Down
41 changes: 41 additions & 0 deletions backends/s3/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package s3

import (
"context"
"errors"
"net"
"net/url"
"os"
"time"

"github.com/minio/minio-go/v7"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -43,6 +51,39 @@ var (
)
)

func recordCallMetrics(method string, start time.Time) {
metricCalls.WithLabelValues(method).Inc()
metricLastCallTimestamp.WithLabelValues(method).SetToCurrentTime()
Comment thread
ahouene marked this conversation as resolved.
metricCallHistogram.WithLabelValues(method).Observe(time.Since(start).Seconds())
}

// recordErrorMetrics counts a failed call to method, both in the per-method
// error counter and in the counter broken down by error type. Nothing is
// recorded when err is nil.
//
// The type label is chosen by the first matching rule: "NotFound", "Timeout",
// "DNSError", "URLError", the non-empty code of the minio error response, and
// finally "Unknown".
func recordErrorMetrics(method string, err error) {
	if err == nil {
		return
	}

	var (
		opErr     *net.OpError
		lookupErr *net.DNSError
		reqErr    *url.Error
	)
	s3Code := minio.ToErrorResponse(err).Code

	var label string
	switch {
	case errors.Is(err, os.ErrNotExist):
		label = "NotFound"
	case errors.Is(err, context.DeadlineExceeded) ||
		errors.Is(err, ErrClientTimeout) ||
		(errors.As(err, &opErr) && opErr.Timeout()):
		label = "Timeout"
	case errors.As(err, &lookupErr):
		label = "DNSError"
	case errors.As(err, &reqErr):
		label = "URLError"
	case s3Code != "":
		label = s3Code
	default:
		label = "Unknown"
	}

	metricCallErrors.WithLabelValues(method).Inc()
	metricCallErrorsType.WithLabelValues(method, label).Inc()
}

func init() {
prometheus.MustRegister(metricLastCallTimestamp)
prometheus.MustRegister(metricCalls)
Expand Down
Loading