Skip to content

Commit 7016700

Browse files
committed
export assign/integrate latency
1 parent 8693957 commit 7016700

3 files changed

Lines changed: 16 additions & 0 deletions

File tree

storage/aws/aws.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ func (a *Appender) integrateEntriesJob(ctx context.Context) {
297297
}
298298

299299
if err := otel.TraceErr(ctx, "tessera.storage.aws.integrateEntriesJob", tracer, func(ctx context.Context, span trace.Span) error {
300+
start := time.Now()
301+
defer opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("integrateEntries")))
302+
300303
ctx, cancel := context.WithTimeout(ctx, defaultIntegrationTimeout)
301304
defer cancel() // Note: ok because we're in a func passed to TraceErr here!
302305

@@ -1054,7 +1057,11 @@ func (s *mySQLSequencer) initDB(ctx context.Context) error {
10541057
// index assigned to the first entry in the batch.
10551058
func (s *mySQLSequencer) assignEntries(ctx context.Context, entries []*tessera.Entry) error {
10561059
return otel.TraceErr(ctx, "tessera.storage.gcp.assignEntries", tracer, func(ctx context.Context, span trace.Span) error {
1060+
start := time.Now()
1061+
defer opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("assignEntries")))
1062+
10571063
span.SetAttributes(numEntriesKey.Int(len(entries)))
1064+
10581065
// First grab the treeSize in a non-locking read-only fashion (we don't want to block/collide with integration).
10591066
// We'll use this value to determine whether we need to apply back-pressure.
10601067
var treeSize uint64

storage/gcp/gcp.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ func (a *Appender) integrateEntriesJob(ctx context.Context) {
345345
}
346346

347347
if err := otel.TraceErr(ctx, "tessera.storage.gcp.integrateEntriesJob", tracer, func(ctx context.Context, span trace.Span) error {
348+
start := time.Now()
349+
defer opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("integrateEntries")))
350+
348351
ctx, cancel := context.WithTimeout(ctx, defaultIntegrationTimeout)
349352
defer cancel() // Note: ok because we're in a func passed to TraceErr here!
350353

@@ -833,6 +836,9 @@ func (s *spannerCoordinator) checkDataCompatibility(ctx context.Context) error {
833836
// index assigned to the first entry in the batch.
834837
func (s *spannerCoordinator) assignEntries(ctx context.Context, entries []*tessera.Entry) error {
835838
return otel.TraceErr(ctx, "tessera.storage.gcp.assignEntries", tracer, func(ctx context.Context, span trace.Span) error {
839+
start := time.Now()
840+
defer opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("assignEntries")))
841+
836842
span.SetAttributes(numEntriesKey.Int(len(entries)))
837843

838844
span.AddEvent("Reading IntCoord:seq")

storage/posix/files.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,8 @@ func (l *logResourceStorage) NextIndex(ctx context.Context) (uint64, error) {
306306
// than one-by-one.
307307
func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry) error {
308308
return otel.TraceErr(ctx, "tessera.storage.posix.assignEntries", tracer, func(ctx context.Context, span trace.Span) error {
309+
start := time.Now()
310+
defer posixOpsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("assignIntegrateBatch")))
309311
span.SetAttributes(numEntriesKey.Int(len(entries)))
310312

311313
// Double locking:
@@ -398,6 +400,7 @@ func (a *appender) sequenceBatch(ctx context.Context, entries []*tessera.Entry)
398400
if err := a.s.writeTreeState(ctx, newSize, newRoot); err != nil {
399401
return fmt.Errorf("failed to write new tree state: %v", err)
400402
}
403+
401404
// Notify that we know for sure there's a new checkpoint, but don't block if there's already
402405
// an outstanding notification in the channel.
403406
select {

0 commit comments

Comments
 (0)