Skip to content

Commit ce70aad

Browse files
authored
Add metrics for latency of various POSIX operations (#728)
1 parent 19a6c45 commit ce70aad

2 files changed

Lines changed: 102 additions & 26 deletions

File tree

storage/posix/files.go

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/transparency-dev/tessera/internal/migrate"
3838
"github.com/transparency-dev/tessera/internal/parse"
3939
storage "github.com/transparency-dev/tessera/storage/internal"
40+
"go.opentelemetry.io/otel/metric"
4041
"k8s.io/klog/v2"
4142
)
4243

@@ -172,7 +173,9 @@ func (s *Storage) newAppender(ctx context.Context, o *logResourceStorage, opts *
172173
// (e.g. <something>.lock>) to avoid inherent brittleness of the `fcntrl` API
173174
// (*any* `Close` operation on this file (even if it's a different FD) from
174175
// this PID, or overwriting of the file by *any* process breaks the lock.)
175-
func (s *Storage) lockFile(p string) (func() error, error) {
176+
func (s *Storage) lockFile(ctx context.Context, p string) (func() error, error) {
177+
now := time.Now()
178+
176179
p = filepath.Join(s.cfg.Path, stateDir, p)
177180
f, err := os.OpenFile(p, syscall.O_CREAT|syscall.O_RDWR|syscall.O_CLOEXEC, filePerm)
178181
if err != nil {
@@ -188,9 +191,13 @@ func (s *Storage) lockFile(p string) (func() error, error) {
188191
// Keep trying until we manage to get an answer without being interrupted.
189192
for {
190193
if err := syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &flockT); err != syscall.EINTR {
194+
if err == nil {
195+
posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String(fmt.Sprintf("lock-%s", p))))
196+
}
191197
return f.Close, err
192198
}
193199
}
200+
194201
}
195202

196203
// Add takes an entry and queues it for inclusion in the log.
@@ -234,8 +241,8 @@ func (l *logResourceStorage) ReadTile(ctx context.Context, level, index uint64,
234241
})
235242
}
236243

237-
func (l *logResourceStorage) IntegratedSize(_ context.Context) (uint64, error) {
238-
size, _, err := l.s.readTreeState()
244+
func (l *logResourceStorage) IntegratedSize(ctx context.Context) (uint64, error) {
245+
size, _, err := l.s.readTreeState(ctx)
239246
return size, err
240247
}
241248

@@ -254,7 +261,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
254261
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
255262
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
256263
a.s.mu.Lock()
257-
unlock, err := a.s.lockFile(treeStateLock)
264+
unlock, err := a.s.lockFile(ctx, treeStateLock)
258265
if err != nil {
259266
panic(err)
260267
}
@@ -265,7 +272,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
265272
a.s.mu.Unlock()
266273
}()
267274

268-
size, _, err := a.s.readTreeState()
275+
size, _, err := a.s.readTreeState(ctx)
269276
if err != nil {
270277
if !errors.Is(err, os.ErrNotExist) {
271278
return err
@@ -337,7 +344,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
337344
klog.Errorf("Integrate failed: %v", err)
338345
return err
339346
}
340-
if err := a.s.writeTreeState(newSize, newRoot); err != nil {
347+
if err := a.s.writeTreeState(ctx, newSize, newRoot); err != nil {
341348
return fmt.Errorf("failed to write new tree state: %v", err)
342349
}
343350
// Notify that we know for sure there's a new checkpoint, but don't block if there's already
@@ -391,6 +398,8 @@ func (lrs *logResourceStorage) readTiles(ctx context.Context, tileIDs []storage.
391398
// If no complete tile exists at that location, it will attempt to find a
392399
// partial tile for the given tree size at that location.
393400
func (lrs *logResourceStorage) readTile(ctx context.Context, level, index uint64, p uint8) (*api.HashTile, error) {
401+
now := time.Now()
402+
394403
t, err := lrs.ReadTile(ctx, level, index, p)
395404
if err != nil {
396405
if errors.Is(err, os.ErrNotExist) {
@@ -404,6 +413,8 @@ func (lrs *logResourceStorage) readTile(ctx context.Context, level, index uint64
404413
if err := tile.UnmarshalText(t); err != nil {
405414
return nil, fmt.Errorf("failed to parse tile: %w", err)
406415
}
416+
417+
posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("readTile")))
407418
return &tile, nil
408419
}
409420

@@ -425,7 +436,9 @@ func (lrs *logResourceStorage) storeTile(ctx context.Context, level, index, logS
425436
return lrs.writeTile(ctx, level, index, layout.PartialTileSize(level, index, logSize), t)
426437
}
427438

428-
func (lrs *logResourceStorage) writeTile(_ context.Context, level, index uint64, partial uint8, t []byte) error {
439+
func (lrs *logResourceStorage) writeTile(ctx context.Context, level, index uint64, partial uint8, t []byte) error {
440+
now := time.Now()
441+
429442
tPath := layout.TilePath(level, index, partial)
430443

431444
if err := lrs.s.createOverwrite(tPath, t); err != nil {
@@ -454,6 +467,7 @@ func (lrs *logResourceStorage) writeTile(_ context.Context, level, index uint64,
454467
}
455468
}
456469

470+
posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("writeTile")))
457471
return nil
458472
}
459473

@@ -479,7 +493,7 @@ func (a *appender) initialise(ctx context.Context) error {
479493
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
480494
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
481495
a.s.mu.Lock()
482-
unlock, err := a.s.lockFile(treeStateLock)
496+
unlock, err := a.s.lockFile(ctx, treeStateLock)
483497
if err != nil {
484498
panic(err)
485499
}
@@ -493,14 +507,14 @@ func (a *appender) initialise(ctx context.Context) error {
493507
if err := a.s.ensureVersion(compatibilityVersion); err != nil {
494508
return err
495509
}
496-
curSize, _, err := a.s.readTreeState()
510+
curSize, _, err := a.s.readTreeState(ctx)
497511
if err != nil {
498512
if !errors.Is(err, os.ErrNotExist) {
499513
return fmt.Errorf("failed to load checkpoint for log: %v", err)
500514
}
501515
// Create the directory structure and write out an empty checkpoint
502516
klog.Infof("Initializing directory for POSIX log at %q (this should only happen ONCE per log!)", a.s.cfg.Path)
503-
if err := a.s.writeTreeState(0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
517+
if err := a.s.writeTreeState(ctx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
504518
return fmt.Errorf("failed to write tree-state checkpoint: %v", err)
505519
}
506520
if a.newCP != nil {
@@ -551,7 +565,9 @@ func (s *Storage) ensureVersion(version uint16) error {
551565
}
552566

553567
// writeTreeState stores the current tree size and root hash on disk.
554-
func (s *Storage) writeTreeState(size uint64, root []byte) error {
568+
func (s *Storage) writeTreeState(ctx context.Context, size uint64, root []byte) error {
569+
now := time.Now()
570+
555571
raw, err := json.Marshal(treeState{Size: size, Root: root})
556572
if err != nil {
557573
return fmt.Errorf("error in Marshal: %v", err)
@@ -560,11 +576,15 @@ func (s *Storage) writeTreeState(size uint64, root []byte) error {
560576
if err := s.createOverwrite(filepath.Join(stateDir, treeStateFile), raw); err != nil {
561577
return fmt.Errorf("failed to create/overwrite private tree state file: %w", err)
562578
}
579+
580+
posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("writeTreeState")))
563581
return nil
564582
}
565583

566584
// readTreeState reads and returns the currently stored tree state.
567-
func (s *Storage) readTreeState() (uint64, []byte, error) {
585+
func (s *Storage) readTreeState(ctx context.Context) (uint64, []byte, error) {
586+
now := time.Now()
587+
568588
p := filepath.Join(s.cfg.Path, stateDir, treeStateFile)
569589
raw, err := os.ReadFile(p)
570590
if err != nil {
@@ -574,15 +594,19 @@ func (s *Storage) readTreeState() (uint64, []byte, error) {
574594
if err := json.Unmarshal(raw, ts); err != nil {
575595
return 0, nil, fmt.Errorf("error in Unmarshal: %v", err)
576596
}
597+
598+
posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("readTreeState")))
577599
return ts.Size, ts.Root, nil
578600
}
579601

580602
// publishCheckpoint checks whether the currently published checkpoint (if any) is more than
581603
// minStaleness old, and, if so, creates and published a fresh checkpoint from the current
582604
// stored tree state.
583605
func (a *appender) publishCheckpoint(ctx context.Context, minStaleness time.Duration) error {
606+
now := time.Now()
607+
584608
// Lock the destination "published" checkpoint location:
585-
unlock, err := a.s.lockFile(publishLock)
609+
unlock, err := a.s.lockFile(ctx, publishLock)
586610
if err != nil {
587611
return fmt.Errorf("lockFile(%s): %v", publishLock, err)
588612
}
@@ -603,7 +627,7 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStaleness time.Dura
603627
return nil
604628
}
605629
}
606-
size, root, err := a.s.readTreeState()
630+
size, root, err := a.s.readTreeState(ctx)
607631
if err != nil {
608632
return fmt.Errorf("readTreeState: %v", err)
609633
}
@@ -618,6 +642,8 @@ func (a *appender) publishCheckpoint(ctx context.Context, minStaleness time.Dura
618642

619643
klog.V(2).Infof("Published latest checkpoint: %d, %x", size, root)
620644

645+
posixOpsHistogram.Record(ctx, time.Since(now).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint")))
646+
621647
return nil
622648
}
623649

@@ -701,7 +727,7 @@ func (s *Storage) readGCState() (uint64, error) {
701727

702728
func (s *Storage) garbageCollect(ctx context.Context, treeSize uint64, maxBundles uint) error {
703729
// Lock the gc location:
704-
unlock, err := s.lockFile(gcStateLock)
730+
unlock, err := s.lockFile(ctx, gcStateLock)
705731
if err != nil {
706732
return fmt.Errorf("lockFile(%s): %v", gcStateLock, err)
707733
}
@@ -813,7 +839,7 @@ func (s *Storage) MigrationWriter(ctx context.Context, opts *tessera.MigrationOp
813839
},
814840
bundleHasher: opts.LeafHasher(),
815841
}
816-
if err := r.initialise(); err != nil {
842+
if err := r.initialise(ctx); err != nil {
817843
return nil, nil, err
818844
}
819845
return r, r.logStorage, nil
@@ -841,7 +867,7 @@ func (m *MigrationStorage) AwaitIntegration(ctx context.Context, sourceSize uint
841867
if err := m.buildTree(ctx, sourceSize); err != nil {
842868
klog.Warningf("buildTree: %v", err)
843869
}
844-
s, r, err := m.s.readTreeState()
870+
s, r, err := m.s.readTreeState(ctx)
845871
if err != nil {
846872
klog.Warningf("readTreeState: %v", err)
847873
}
@@ -851,7 +877,7 @@ func (m *MigrationStorage) AwaitIntegration(ctx context.Context, sourceSize uint
851877
}
852878
}
853879

854-
func (m *MigrationStorage) initialise() error {
880+
func (m *MigrationStorage) initialise(ctx context.Context) error {
855881
// Idempotent: If folder exists, nothing happens.
856882
if err := mkdirAll(filepath.Join(m.s.cfg.Path, stateDir), dirPerm); err != nil {
857883
return fmt.Errorf("failed to create log directory: %q", err)
@@ -860,7 +886,7 @@ func (m *MigrationStorage) initialise() error {
860886
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
861887
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
862888
m.s.mu.Lock()
863-
unlock, err := m.s.lockFile(treeStateLock)
889+
unlock, err := m.s.lockFile(ctx, treeStateLock)
864890
if err != nil {
865891
panic(err)
866892
}
@@ -874,14 +900,14 @@ func (m *MigrationStorage) initialise() error {
874900
if err := m.s.ensureVersion(compatibilityVersion); err != nil {
875901
return err
876902
}
877-
curSize, _, err := m.s.readTreeState()
903+
curSize, _, err := m.s.readTreeState(ctx)
878904
if err != nil {
879905
if !errors.Is(err, os.ErrNotExist) {
880906
return fmt.Errorf("failed to load checkpoint for log: %v", err)
881907
}
882908
// Create the directory structure and write out an empty checkpoint
883909
klog.Infof("Initializing directory for POSIX log at %q (this should only happen ONCE per log!)", m.s.cfg.Path)
884-
if err := m.s.writeTreeState(0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
910+
if err := m.s.writeTreeState(ctx, 0, rfc6962.DefaultHasher.EmptyRoot()); err != nil {
885911
return fmt.Errorf("failed to write tree-state checkpoint: %v", err)
886912
}
887913
return nil
@@ -895,8 +921,8 @@ func (m *MigrationStorage) SetEntryBundle(ctx context.Context, index uint64, par
895921
return m.logStorage.writeBundle(ctx, index, partial, bundle)
896922
}
897923

898-
func (m *MigrationStorage) IntegratedSize(_ context.Context) (uint64, error) {
899-
sz, _, err := m.s.readTreeState()
924+
func (m *MigrationStorage) IntegratedSize(ctx context.Context) (uint64, error) {
925+
sz, _, err := m.s.readTreeState(ctx)
900926
return sz, err
901927
}
902928

@@ -905,7 +931,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
905931
// - The mutex `Lock()` ensures that multiple concurrent calls to this function within a task are serialised.
906932
// - The POSIX `lockFile()` ensures that distinct tasks are serialised.
907933
m.s.mu.Lock()
908-
unlock, err := m.s.lockFile(treeStateLock)
934+
unlock, err := m.s.lockFile(ctx, treeStateLock)
909935
if err != nil {
910936
panic(err)
911937
}
@@ -916,7 +942,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
916942
m.s.mu.Unlock()
917943
}()
918944

919-
size, _, err := m.s.readTreeState()
945+
size, _, err := m.s.readTreeState(ctx)
920946
if err != nil {
921947
if !errors.Is(err, os.ErrNotExist) {
922948
return err
@@ -941,7 +967,7 @@ func (m *MigrationStorage) buildTree(ctx context.Context, targetSize uint64) err
941967
if err != nil {
942968
return fmt.Errorf("doIntegrate(%d, ...): %v", size, err)
943969
}
944-
if err := m.s.writeTreeState(newSize, newRoot); err != nil {
970+
if err := m.s.writeTreeState(ctx, newSize, newRoot); err != nil {
945971
return fmt.Errorf("failed to write new tree state: %v", err)
946972
}
947973

storage/posix/otel.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2025 The Tessera authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package posix
16+
17+
import (
18+
"go.opentelemetry.io/otel"
19+
"go.opentelemetry.io/otel/attribute"
20+
"go.opentelemetry.io/otel/metric"
21+
"k8s.io/klog/v2"
22+
)
23+
24+
const name = "github.com/transparency-dev/tessera/storage/posix"
25+
26+
var (
27+
meter = otel.Meter(name)
28+
29+
opNameKey = attribute.Key("op_name")
30+
)
31+
32+
var (
33+
posixOpsHistogram metric.Int64Histogram
34+
35+
// Custom histogram buckets as we're interested in low-millis upto low-seconds.
36+
histogramBuckets = []float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000}
37+
)
38+
39+
func init() {
40+
var err error
41+
42+
posixOpsHistogram, err = meter.Int64Histogram(
43+
"tessera.appender.ops.duration",
44+
metric.WithDescription("Duration of calls to POSIX file operations"),
45+
metric.WithUnit("ms"),
46+
metric.WithExplicitBucketBoundaries(histogramBuckets...))
47+
if err != nil {
48+
klog.Exitf("Failed to create posixOptsHistogram metric: %v", err)
49+
}
50+
}

0 commit comments

Comments
 (0)