diff --git a/backends/azure/azure.go b/backends/azure/azure.go index 2932c08..3197615 100644 --- a/backends/azure/azure.go +++ b/backends/azure/azure.go @@ -307,13 +307,12 @@ func New(ctx context.Context, opt Options) (*Backend, error) { if opt.CreateContainer { // Create bucket if it does not exist - metricCalls.WithLabelValues("create-container").Inc() - metricLastCallTimestamp.WithLabelValues("create-container").SetToCurrentTime() - + start := time.Now() _, err := client.CreateContainer(ctx, opt.Container, &azblob.CreateContainerOptions{}) if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) { return nil, err } + recordCallMetrics("create-container", start) } b := &Backend{ @@ -399,6 +398,8 @@ func convertAzureError(err error) error { } func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) { + defer recordCallMetrics("list", time.Now()) + var blobs simpleblob.BlobList // Runes to strip from blob names for GlobalPrefix @@ -414,6 +415,7 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis for blobPager.More() { resp, err := blobPager.NextPage(ctx) if err != nil { + recordErrorMetrics("list", err) return nil, err } @@ -438,8 +440,6 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis } } - metricLastCallTimestamp.WithLabelValues("list").SetToCurrentTime() - return blobs, nil } @@ -463,13 +463,14 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) { } func (b *Backend) doLoadReader(ctx context.Context, name string) (io.ReadCloser, error) { - metricCalls.WithLabelValues("load").Inc() - metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime() + defer recordCallMetrics("load", time.Now()) // Download the blob's contents and ensure that the download worked properly blobDownloadResponse, err := b.client.DownloadStream(ctx, b.opt.Container, name, nil) if err = convertAzureError(err); err != nil { - metricCallErrors.WithLabelValues("load").Inc() + if !errors.Is(err, os.ErrNotExist) { + recordErrorMetrics("load", err) + } return nil, err } @@ -497,14 +498,12 @@ func (b *Backend) Store(ctx context.Context, name string, data []byte) error { // doStore is a convenience wrapper around doStoreReader. func (b *Backend) doStore(ctx context.Context, name string, data []byte) (azblob.UploadStreamResponse, error) { - return b.doStoreReader(ctx, name, bytes.NewReader(data), int64(len(data))) + return b.doStoreReader(ctx, name, bytes.NewReader(data)) } // doStoreReader stores data with key name in Azure blob, using r as a source for data. -// The value of size may be -1, in case the size is not known. -func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, size int64) (azblob.UploadStreamResponse, error) { - metricCalls.WithLabelValues("store").Inc() - metricLastCallTimestamp.WithLabelValues("store").SetToCurrentTime() +func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader) (azblob.UploadStreamResponse, error) { + defer recordCallMetrics("store", time.Now()) uploadStreamOptions := &azblob.UploadStreamOptions{ Concurrency: b.opt.Concurrency, @@ -513,7 +512,7 @@ func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, s // Perform UploadStream resp, err := b.client.UploadStream(ctx, b.opt.Container, name, r, uploadStreamOptions) if err != nil { - metricCallErrors.WithLabelValues("store").Inc() + recordErrorMetrics("store", err) return azblob.UploadStreamResponse{}, err } @@ -533,8 +532,7 @@ func (b *Backend) Delete(ctx context.Context, name string) error { } func (b *Backend) doDelete(ctx context.Context, name string) error { - metricCalls.WithLabelValues("delete").Inc() - metricLastCallTimestamp.WithLabelValues("delete").SetToCurrentTime() + defer recordCallMetrics("delete", time.Now()) _, err := b.client.DeleteBlob(ctx, b.opt.Container, name, nil) @@ -543,7 +541,7 @@ func (b *Backend) doDelete(ctx context.Context, name string) error { if errors.Is(err, os.ErrNotExist) { return nil } - metricCallErrors.WithLabelValues("delete").Inc() + recordErrorMetrics("delete", err) return err } diff --git a/backends/azure/azure_test.go b/backends/azure/azure_test.go index 5c7fa23..9ecbbd9 100644 --- a/backends/azure/azure_test.go +++ b/backends/azure/azure_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/expfmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" @@ -30,13 +32,6 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) { t.Fatalf("failed to start container: %v", err) } - state, err := azuriteContainer.State(ctx) - if err != nil { - t.Fatalf("failed to get container state: %v", err) - } - - t.Log(state.Running) - serviceURL, err := azuriteContainer.BlobServiceURL(ctx) require.NoError(t, err) @@ -139,3 +134,32 @@ func TestBackend_globalPrefixAndMarker(t *testing.T) { tester.DoBackendTests(t, b) assert.NotEmpty(t, b.lastMarker) } + +func TestMetrics(t *testing.T) { + metricName := "storage_azure_call_error_by_type_total" + metricCallErrorsType.Reset() // Ensure no metrics are left from other tests + b := getBackend(t.Context(), t) + + // Operations on a healthy backend do not collect any metrics. + assert.Equal(t, 0, testutil.CollectAndCount(metricCallErrorsType, metricName)) + var ( + _ = b.Store(t.Context(), "my-key", []byte{}) + _, _ = b.Load(t.Context(), "my-key") + _ = b.Delete(t.Context(), "my-key") + _, _ = b.Load(t.Context(), "no-such-key") // ErrNotExist is not collected + _ = b.Delete(t.Context(), "no-such-key") + _, _ = b.List(t.Context(), "") + ) + assert.Equal(t, 0, testutil.CollectAndCount(metricCallErrorsType, metricName)) + + // A failed operation generates metrics. + savedContainer := b.opt.Container + b.opt.Container = "non-existent-container" + assert.Error(t, b.Store(t.Context(), "file-in-non-existent-container", []byte{})) // Fails due to missing bucket + b.opt.Container = savedContainer + assert.Equal(t, 1, testutil.CollectAndCount(metricCallErrorsType, metricName)) + p, err := testutil.CollectAndFormat(metricCallErrorsType, expfmt.TypeProtoCompact, metricName) + require.NoError(t, err) + assert.Regexp(t, `label:{name:"method"\s+value:"store"}`, string(p)) + assert.Regexp(t, `label:{name:"error"\s+value:"ContainerNotFound"}`, string(p)) +} diff --git a/backends/azure/metrics.go b/backends/azure/metrics.go index e38ca3c..df5082e 100644 --- a/backends/azure/metrics.go +++ b/backends/azure/metrics.go @@ -1,6 +1,14 @@ package azure import ( + "context" + "errors" + "net" + "net/url" + "os" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/prometheus/client_golang/prometheus" ) @@ -26,10 +34,59 @@ var ( }, []string{"method"}, ) + metricCallErrorsType = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "storage_azure_call_error_by_type_total", + Help: "Azure API call errors by method and error type", + }, + []string{"method", "error"}, + ) + metricCallHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "storage_azure_call_duration_seconds", + Help: "Azure API call duration by method", + Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30, 60}, + }, + []string{"method"}, + ) ) +func recordCallMetrics(method string, start time.Time) { + metricCalls.WithLabelValues(method).Inc() + metricLastCallTimestamp.WithLabelValues(method).SetToCurrentTime() + metricCallHistogram.WithLabelValues(method).Observe(time.Since(start).Seconds()) +} + +func recordErrorMetrics(method string, err error) { + if err == nil { + return + } + errorLabel := "Unknown" + var netErr *net.OpError + var dnsErr *net.DNSError + var urlErr *url.Error + var azErr *azcore.ResponseError + switch { + case errors.Is(err, os.ErrNotExist): + errorLabel = "NotFound" + case errors.Is(err, context.DeadlineExceeded), + (errors.As(err, &netErr) && netErr.Timeout()): + errorLabel = "Timeout" + case errors.As(err, &dnsErr): + errorLabel = "DNSError" + case errors.As(err, &urlErr): + errorLabel = "URLError" + case errors.As(err, &azErr): + errorLabel = azErr.ErrorCode + } + metricCallErrors.WithLabelValues(method).Inc() + metricCallErrorsType.WithLabelValues(method, errorLabel).Inc() +} + func init() { prometheus.MustRegister(metricLastCallTimestamp) prometheus.MustRegister(metricCalls) prometheus.MustRegister(metricCallErrors) + prometheus.MustRegister(metricCallErrorsType) + prometheus.MustRegister(metricCallHistogram) } diff --git a/backends/azure/stream.go b/backends/azure/stream.go index 2308d29..eb32057 100644 --- a/backends/azure/stream.go +++ b/backends/azure/stream.go @@ -40,7 +40,7 @@ func (b *Backend) NewWriter(ctx context.Context, name string) (io.WriteCloser, e // if the writing end of the pipe is closed. // It is okay to write to w.info from this goroutine // because it will only be used after w.donePipe is closed. - w.info, err = w.backend.doStoreReader(w.ctx, w.name, pr, -1) + w.info, err = w.backend.doStoreReader(w.ctx, w.name, pr) _ = pr.CloseWithError(err) // Always returns nil. close(w.donePipe) }() diff --git a/backends/s3/metrics.go b/backends/s3/metrics.go index a935fa7..c93e57d 100644 --- a/backends/s3/metrics.go +++ b/backends/s3/metrics.go @@ -1,6 +1,14 @@ package s3 import ( + "context" + "errors" + "net" + "net/url" + "os" + "time" + + "github.com/minio/minio-go/v7" "github.com/prometheus/client_golang/prometheus" ) @@ -43,6 +51,39 @@ var ( ) ) +func recordCallMetrics(method string, start time.Time) { + metricCalls.WithLabelValues(method).Inc() + metricLastCallTimestamp.WithLabelValues(method).SetToCurrentTime() + metricCallHistogram.WithLabelValues(method).Observe(time.Since(start).Seconds()) +} + +func recordErrorMetrics(method string, err error) { + if err == nil { + return + } + errorLabel := "Unknown" + var netErr *net.OpError + var dnsErr *net.DNSError + var urlErr *url.Error + errRes := minio.ToErrorResponse(err) + switch { + case errors.Is(err, os.ErrNotExist): + errorLabel = "NotFound" + case errors.Is(err, context.DeadlineExceeded), + errors.Is(err, ErrClientTimeout), + (errors.As(err, &netErr) && netErr.Timeout()): + errorLabel = "Timeout" + case errors.As(err, &dnsErr): + errorLabel = "DNSError" + case errors.As(err, &urlErr): + errorLabel = "URLError" + case errRes.Code != "": + errorLabel = errRes.Code + } + metricCallErrors.WithLabelValues(method).Inc() + metricCallErrorsType.WithLabelValues(method, errorLabel).Inc() +} + func init() { prometheus.MustRegister(metricLastCallTimestamp) prometheus.MustRegister(metricCalls) diff --git a/backends/s3/s3.go b/backends/s3/s3.go index 385a05f..5c1a3e2 100644 --- a/backends/s3/s3.go +++ b/backends/s3/s3.go @@ -247,15 +247,8 @@ func (b *Backend) List(ctx context.Context, prefix string) (blobList simpleblob. return blobs.WithPrefix(prefix), nil } -func recordMinioDurationMetric(method string, start time.Time) { - elapsed := time.Since(start) - metricCallHistogram.WithLabelValues(method).Observe(elapsed.Seconds()) -} - func (b *Backend) doList(ctx context.Context, prefix string) (blobs simpleblob.BlobList, err error) { - metricCalls.WithLabelValues("list").Inc() - metricLastCallTimestamp.WithLabelValues("list").SetToCurrentTime() - defer recordMinioDurationMetric("list", time.Now()) + defer recordCallMetrics("list", time.Now()) // Runes to strip from blob names for GlobalPrefix // This is fine, because we can trust the API to only return with the prefix. @@ -267,8 +260,7 @@ func (b *Backend) doList(ctx context.Context, prefix string) (blobs simpleblob.B }) for obj := range objIter { if err = convertError(ctx, obj.Err, true); err != nil { - metricCallErrors.WithLabelValues("list").Inc() - metricCallErrorsType.WithLabelValues("list", errorToMetricsLabel(err)).Inc() + recordErrorMetrics("list", err) return nil, err } @@ -317,15 +309,12 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) { } func (b *Backend) doLoadReader(ctx context.Context, name string) (*minio.Object, error) { - metricCalls.WithLabelValues("load").Inc() - metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime() - defer recordMinioDurationMetric("load", time.Now()) + defer recordCallMetrics("load", time.Now()) obj, err := b.client.GetObject(ctx, b.opt.Bucket, name, minio.GetObjectOptions{}) if err = convertError(ctx, err, false); err != nil { if !errors.Is(err, os.ErrNotExist) { - metricCallErrors.WithLabelValues("load").Inc() - metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc() + recordErrorMetrics("load", err) } return nil, err } @@ -335,8 +324,7 @@ func (b *Backend) doLoadReader(ctx context.Context, name string) (*minio.Object, info, err := obj.Stat() if err = convertError(ctx, err, false); err != nil { if !errors.Is(err, os.ErrNotExist) { - metricCallErrors.WithLabelValues("load").Inc() - metricCallErrorsType.WithLabelValues("load", errorToMetricsLabel(err)).Inc() + recordErrorMetrics("load", err) } return nil, err } @@ -369,9 +357,7 @@ func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio. // doStoreReader stores data with key name in S3, using r as a source for data. // The value of size may be -1, in case the size is not known. func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, size int64) (minio.UploadInfo, error) { - metricCalls.WithLabelValues("store").Inc() - metricLastCallTimestamp.WithLabelValues("store").SetToCurrentTime() - defer recordMinioDurationMetric("store", time.Now()) + defer recordCallMetrics("store", time.Now()) putObjectOptions := minio.PutObjectOptions{ NumThreads: b.opt.NumMinioThreads, @@ -382,8 +368,7 @@ func (b *Backend) doStoreReader(ctx context.Context, name string, r io.Reader, s // minio accepts size == -1, meaning the size is unknown. info, err := b.client.PutObject(ctx, b.opt.Bucket, name, r, size, putObjectOptions) if err = convertError(ctx, err, false); err != nil { - metricCallErrors.WithLabelValues("store").Inc() - metricCallErrorsType.WithLabelValues("store", errorToMetricsLabel(err)).Inc() + recordErrorMetrics("store", err) return info, err } return info, nil @@ -402,14 +387,11 @@ func (b *Backend) Delete(ctx context.Context, name string) error { } func (b *Backend) doDelete(ctx context.Context, name string) error { - metricCalls.WithLabelValues("delete").Inc() - metricLastCallTimestamp.WithLabelValues("delete").SetToCurrentTime() - defer recordMinioDurationMetric("delete", time.Now()) + defer recordCallMetrics("delete", time.Now()) err := b.client.RemoveObject(ctx, b.opt.Bucket, name, minio.RemoveObjectOptions{}) if err = convertError(ctx, err, false); err != nil { - metricCallErrors.WithLabelValues("delete").Inc() - metricCallErrorsType.WithLabelValues("delete", errorToMetricsLabel(err)).Inc() + recordErrorMetrics("delete", err) return err } return nil @@ -565,15 +547,14 @@ func New(ctx context.Context, opt Options) (*Backend, error) { if opt.CreateBucket { // Create bucket if it does not exist - metricCalls.WithLabelValues("create-bucket").Inc() - metricLastCallTimestamp.WithLabelValues("create-bucket").SetToCurrentTime() - + start := time.Now() err := client.MakeBucket(ctx, opt.Bucket, minio.MakeBucketOptions{Region: opt.Region}) if err != nil { if err := convertError(ctx, err, false); err != nil { return nil, err } } + recordCallMetrics("create-bucket", start) } b := &Backend{ @@ -593,41 +574,6 @@ func (b *Backend) setGlobalPrefix(prefix string) { b.markerName = b.prependGlobalPrefix(UpdateMarkerFilename) } -// errorToMetricsLabel converts an error into a prometheus label. -// If error is a NotExist error, "NotFound" is returned. -// If error is a timeout, "Timeout" is returned. -// If error is a DNS error, the DNS error is returned. -// If error is a URL error, the URL error is returned. -// If error is a MinIO error, the MinIO error code is returned. -// Otherwise "Unknown" is returned. -func errorToMetricsLabel(err error) string { - if err == nil { - return "ok" - } - if errors.Is(err, os.ErrNotExist) { - return "NotFound" - } - var netError *net.OpError - if errors.Is(err, context.DeadlineExceeded) || - errors.Is(err, ErrClientTimeout) || - (errors.As(err, &netError) && netError.Timeout()) { - return "Timeout" - } - var dnsErr *net.DNSError - if errors.As(err, &dnsErr) { - return "DNSError" - } - var urlErr *url.Error - if errors.As(err, &urlErr) { - return "URLError" - } - errRes := minio.ToErrorResponse(err) - if errRes.Code != "" { - return errRes.Code - } - return "Unknown" -} - func getOpt[T comparable](optVal, defaultVal T) T { var zero T if optVal == zero {