Skip to content

Commit dbef5ef

Browse files
authored
(monitoring) Added GCS/NFS in-flight read counters (#2672)
1 parent 0cd0346 commit dbef5ef

3 files changed

Lines changed: 40 additions & 2 deletions

File tree

packages/shared/pkg/storage/storage_cache_seekable.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/google/uuid"
1515
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
1616
"go.opentelemetry.io/otel/attribute"
17+
"go.opentelemetry.io/otel/metric"
1718
"go.opentelemetry.io/otel/trace"
1819
"go.uber.org/zap"
1920

@@ -52,6 +53,11 @@ var (
5253
"Total bytes written to NFS",
5354
"Total writes to NFS",
5455
))
56+
nfsCacheConcurrentReads = utils.Must(meter.Int64UpDownCounter(
57+
"orchestrator.storage.slab.nfs.read.concurrent",
58+
metric.WithDescription("Number of NFS cache range readers currently open"),
59+
metric.WithUnit("{read}"),
60+
))
5561
)
5662

5763
type featureFlagsClient interface {
@@ -119,7 +125,7 @@ func (c *cachedSeekable) OpenRangeReader(ctx context.Context, off int64, length
119125
rc := io.ReadCloser(&fsRangeReadCloser{Reader: io.NewSectionReader(fp, 0, length), file: fp})
120126
rc = withSpan(rc, span)
121127

122-
return rc, nil
128+
return withNFSGauge(ctx, rc), nil
123129
}
124130

125131
if !os.IsNotExist(err) {
@@ -169,6 +175,26 @@ func (r *spanReadCloser) Close() error {
169175
return err
170176
}
171177

178+
// nfsGaugeReadCloser wraps a reader and decrements the NFS concurrent reads
179+
// gauge on Close.
180+
type nfsGaugeReadCloser struct {
181+
io.ReadCloser
182+
183+
ctx context.Context //nolint:containedctx // needed for gauge decrement in Close
184+
}
185+
186+
func (r *nfsGaugeReadCloser) Close() error {
187+
nfsCacheConcurrentReads.Add(r.ctx, -1)
188+
189+
return r.ReadCloser.Close()
190+
}
191+
192+
func withNFSGauge(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
193+
nfsCacheConcurrentReads.Add(ctx, 1)
194+
195+
return &nfsGaugeReadCloser{ReadCloser: rc, ctx: ctx}
196+
}
197+
172198
// newCacheWriteThroughReader wraps a reader, buffering all data read through it.
173199
// On Close, it asynchronously writes the buffered data to the NFS cache only
174200
// if the total bytes read match the expected length (to avoid caching truncated data).

packages/shared/pkg/storage/storage_cache_seekable_compressed.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (c *cachedSeekable) openReaderCompressed(ctx context.Context, offsetU int64
4444
return nil, fmt.Errorf("decompress cached frame: %w", err)
4545
}
4646

47-
return decompressed, nil
47+
return withNFSGauge(ctx, decompressed), nil
4848
case !os.IsNotExist(err):
4949
recordCacheReadError(ctx, cacheTypeSeekable, cacheOpOpenRangeReader, err)
5050
}

packages/shared/pkg/storage/storage_google.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"cloud.google.com/go/storage"
1818
"github.com/googleapis/gax-go/v2"
1919
"go.opentelemetry.io/otel/attribute"
20+
"go.opentelemetry.io/otel/metric"
2021
"go.uber.org/zap"
2122
"google.golang.org/api/iterator"
2223
"google.golang.org/api/option"
@@ -68,6 +69,11 @@ var (
6869
"Total bytes written to GCS",
6970
"Total writes to GCS",
7071
))
72+
gcsConcurrentReads = utils.Must(meter.Int64UpDownCounter(
73+
"orchestrator.storage.gcs.read.concurrent",
74+
metric.WithDescription("Number of GCS range readers currently open"),
75+
metric.WithUnit("{read}"),
76+
))
7177
)
7278

7379
type gcpStorage struct {
@@ -559,6 +565,8 @@ func (o *gcpObject) OpenRangeReader(ctx context.Context, offsetU int64, length i
559565
return nil, err
560566
}
561567

568+
gcsConcurrentReads.Add(ctx, 1)
569+
562570
return &timedReadCloser{inner: rc, timer: timer, ctx: ctx}, nil
563571
}
564572

@@ -584,6 +592,8 @@ func (o *gcpObject) OpenRangeReader(ctx context.Context, offsetU int64, length i
584592
return nil, err
585593
}
586594

595+
gcsConcurrentReads.Add(ctx, 1)
596+
587597
return &timedReadCloser{inner: decompressed, timer: timer, ctx: ctx}, nil
588598
}
589599

@@ -609,6 +619,8 @@ func (r *timedReadCloser) Read(p []byte) (int, error) {
609619
}
610620

611621
func (r *timedReadCloser) Close() error {
622+
gcsConcurrentReads.Add(r.ctx, -1)
623+
612624
err := r.inner.Close()
613625

614626
if r.closeErr != nil || err != nil {

0 commit comments

Comments
 (0)