From ffd1546562e67b7911eab2ea3279dd604685baf0 Mon Sep 17 00:00:00 2001 From: "pingshan.wj" Date: Wed, 15 Apr 2026 11:53:45 +0800 Subject: [PATCH 1/3] feat(k8s): add breakdown latency log & metric in pool controller --- kubernetes/internal/controller/metrics.go | 76 +++++++++++++++++++ .../internal/controller/pool_controller.go | 41 +++++++--- 2 files changed, 106 insertions(+), 11 deletions(-) create mode 100644 kubernetes/internal/controller/metrics.go diff --git a/kubernetes/internal/controller/metrics.go b/kubernetes/internal/controller/metrics.go new file mode 100644 index 000000000..b310e92d1 --- /dev/null +++ b/kubernetes/internal/controller/metrics.go @@ -0,0 +1,76 @@ +package controller + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +const ( + summaryMaxAge = time.Second * 30 + summaryAgeBuckets uint32 = 3 + metricNamespace = "opensandbox-controller" +) + +var ( + allocatorScheduleDurationSummary = func() *prometheus.SummaryVec { + return prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: metricNamespace, + Subsystem: "allocator", + Name: "schedule_duration_ms", + MaxAge: summaryMaxAge, + AgeBuckets: summaryAgeBuckets, + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, + []string{"namespace", "pool_name", "success"}, + ) + }() + allocatorPersistAllocationStateDurationSummary = func() *prometheus.SummaryVec { + return prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: metricNamespace, + Subsystem: "allocator", + Name: "persist_alloc_state_duration_ms", + MaxAge: summaryMaxAge, + AgeBuckets: summaryAgeBuckets, + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, + []string{"namespace", "pool_name", "success"}, + ) + }() + allocatorSyncAllocResultDurationSummary = func() *prometheus.SummaryVec { + return prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: metricNamespace, + Subsystem: "allocator", + Name: "sync_alloc_result_duration_ms", + MaxAge: summaryMaxAge, + AgeBuckets: summaryAgeBuckets, + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, + []string{"namespace", "pool_name", "success"}, + ) + }() + allocatorSyncSingleAllocResultDurationSummary = func() *prometheus.SummaryVec { + return prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: metricNamespace, + Subsystem: "allocator", + Name: "sync_single_alloc_result_duration_ms", + MaxAge: summaryMaxAge, + AgeBuckets: summaryAgeBuckets, + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, + []string{"namespace", "pool_name", "sandbox_name", "success"}, + ) + }() +) + +func init() { + metrics.Registry.MustRegister(allocatorScheduleDurationSummary) + metrics.Registry.MustRegister(allocatorPersistAllocationStateDurationSummary) + metrics.Registry.MustRegister(allocatorSyncAllocResultDurationSummary) + metrics.Registry.MustRegister(allocatorSyncSingleAllocResultDurationSummary) +} diff --git a/kubernetes/internal/controller/pool_controller.go b/kubernetes/internal/controller/pool_controller.go index 867b57f27..53f107203 100644 --- a/kubernetes/internal/controller/pool_controller.go +++ b/kubernetes/internal/controller/pool_controller.go @@ -101,6 +101,10 @@ type PoolReconciler struct { func (r *PoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := logf.FromContext(ctx) + start := time.Now() + defer func() { + log.Info("Reconcile finished", "latencyMs", time.Since(start).Milliseconds()) + }() // Fetch the Pool instance pool := &sandboxv1alpha1.Pool{} if err := r.Get(ctx, req.NamespacedName, pool); err != nil { @@ -319,7 +323,10 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp Pool: pool, Pods: pods, } + start := time.Now() allocStatus, pendingSyncs, poolDirty, err := r.Allocator.Schedule(ctx, spec) + schLatency := time.Since(start).Milliseconds() + allocatorScheduleDurationSummary.WithLabelValues(pool.Namespace, pool.Name, strconv.FormatBool(err == nil)).Observe(float64(schLatency)) if err != nil { return nil, err } @@ -329,8 +336,9 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp idlePods = append(idlePods, pod.Name) } } - log.Info("Schedule result", "pool", pool.Name, "allocated", len(allocStatus.PodAllocation), - "idlePods", len(idlePods), "supplement", allocStatus.PodSupplement, "pendingSyncs", len(pendingSyncs), "poolDirty", poolDirty) + log.Info("Allocator schedule result", "pool", pool.Name, "sandboxSize", len(batchSandboxes), "podSize", len(pods), + "allocated", len(allocStatus.PodAllocation), "idlePods", len(idlePods), "supplement", allocStatus.PodSupplement, "pendingSyncs", len(pendingSyncs), "poolDirty", poolDirty, + "latencyMs", schLatency) schedResult := &ScheduleResult{ PodAllocation: allocStatus.PodAllocation, @@ -341,10 +349,14 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp // Persist allocation to memory store if poolDirty { - if err := r.Allocator.PersistPoolAllocation(ctx, pool, &AllocStatus{PodAllocation: allocStatus.PodAllocation}); err != nil { - log.Error(err, "Failed to persist pool allocation") - return nil, err + start = time.Now() + err := r.Allocator.PersistPoolAllocation(ctx, pool, &AllocStatus{PodAllocation: allocStatus.PodAllocation}) + persistLatency := time.Since(start).Milliseconds() + allocatorPersistAllocationStateDurationSummary.WithLabelValues(pool.Namespace, pool.Name, strconv.FormatBool(err == nil)).Observe(float64(persistLatency)) + if err != nil { + return nil, fmt.Errorf("Failed to persist pool allocation, err %w", err) } + log.Info("Allocator persist allocation state", "pool", pool.Name, "allocation", len(allocStatus.PodAllocation), "latencyMs", persistLatency) } // Sync to each BatchSandbox concurrently @@ -352,17 +364,22 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp var wg sync.WaitGroup sem := make(chan struct{}, syncSandboxAllocConcurrency) + start = time.Now() for _, syncInfo := range pendingSyncs { wg.Add(1) sem <- struct{}{} go func(info SandboxSyncInfo) { defer wg.Done() defer func() { <-sem }() - if err := r.Allocator.SyncSandboxAllocation(ctx, info.Sandbox, info.Pods); err != nil { + start0 := time.Now() + err := r.Allocator.SyncSandboxAllocation(ctx, info.Sandbox, info.Pods) + latencyMs0 := time.Since(start0).Milliseconds() + allocatorSyncSingleAllocResultDurationSummary.WithLabelValues(info.Sandbox.Namespace, info.Sandbox.Spec.PoolRef, info.Sandbox.Name, strconv.FormatBool(err == nil)).Observe(float64(latencyMs0)) + if err != nil { log.Error(err, "Failed to sync sandbox allocation", "sandbox", info.SandboxName) errCh <- fmt.Errorf("failed to sync sandbox %s: %w", info.SandboxName, err) } else { - log.Info("Successfully sync Sandbox allocation", "sandbox", info.SandboxName, "pods", info.Pods) + log.Info("Successfully sync Sandbox allocation", "sandbox", info.SandboxName, "pods", info.Pods, "latencyMs", latencyMs0) } }(syncInfo) } @@ -373,11 +390,13 @@ func (r *PoolReconciler) scheduleSandbox(ctx context.Context, pool *sandboxv1alp for err := range errCh { syncErrs = append(syncErrs, err) } - - if err := gerrors.Join(syncErrs...); err != nil { - return nil, err + syncAggErr := gerrors.Join(syncErrs...) + syncAllocLatency := time.Since(start).Milliseconds() + log.Info("Sync sandbox allocation", "total", len(pendingSyncs), "failed", len(syncErrs), "latencyMs", syncAllocLatency) + allocatorSyncAllocResultDurationSummary.WithLabelValues(pool.Namespace, pool.Name, strconv.FormatBool(syncAggErr == nil)).Observe(float64(syncAllocLatency)) + if syncAggErr != nil { + return nil, syncAggErr } - return schedResult, nil } From 8136cee79fcb93329f449f095e2dee35c4c420c6 Mon Sep 17 00:00:00 2001 From: "pingshan.wj" Date: Wed, 15 Apr 2026 12:22:51 +0800 Subject: [PATCH 2/3] feat(k8s): add breakdown log& endpoints-last-trans-time in batchsandbox controller --- kubernetes/internal/controller/apis.go | 7 ++++--- .../controller/batchsandbox_controller.go | 21 ++++++++++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/kubernetes/internal/controller/apis.go b/kubernetes/internal/controller/apis.go index 461acfd96..a15fca789 100644 --- a/kubernetes/internal/controller/apis.go +++ b/kubernetes/internal/controller/apis.go @@ -24,9 +24,10 @@ import ( ) const ( - AnnoAllocStatusKey = "sandbox.opensandbox.io/alloc-status" - AnnoAllocReleaseKey = "sandbox.opensandbox.io/alloc-release" - LabelBatchSandboxPodIndexKey = "batch-sandbox.sandbox.opensandbox.io/pod-index" + AnnoAllocStatusKey = "sandbox.opensandbox.io/alloc-status" + AnnoAllocReleaseKey = "sandbox.opensandbox.io/alloc-release" + AnnotationEndpointsLastTransTime = "sandbox.opensandbox.io/endpoints-last-trans-time" + LabelBatchSandboxPodIndexKey = "batch-sandbox.sandbox.opensandbox.io/pod-index" FinalizerTaskCleanup = "batch-sandbox.sandbox.opensandbox.io/task-cleanup" ) diff --git a/kubernetes/internal/controller/batchsandbox_controller.go b/kubernetes/internal/controller/batchsandbox_controller.go index f7b418663..188114769 100644 --- a/kubernetes/internal/controller/batchsandbox_controller.go +++ b/kubernetes/internal/controller/batchsandbox_controller.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -86,6 +87,10 @@ type BatchSandboxReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := logf.FromContext(ctx) + start := time.Now() + defer func() { + log.Info("Reconcile finished", "latencyMs", time.Since(start).Milliseconds()) + }() var aggErrors []error defer func() { _ = DurationStore.Pop(req.String()) @@ -186,11 +191,16 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request "metadata": map[string]any{ "annotations": map[string]string{ AnnotationSandboxEndpoints: string(raw), + // example 2026-04-15T12:19:11.696+08:00, use ms precision + AnnotationEndpointsLastTransTime: time.Now().Format("2006-01-02T15:04:05.000Z07:00"), }, }, }) obj := &sandboxv1alpha1.BatchSandbox{ObjectMeta: metav1.ObjectMeta{Namespace: batchSbx.Namespace, Name: batchSbx.Name}} - if err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, patchData)); err != nil { + start := time.Now() + err := r.Patch(ctx, obj, client.RawPatch(types.MergePatchType, patchData)) + log.Info("Sync sandbox endpoint", "sandbox", klog.KObj(batchSbx), "latencyMs", time.Since(start).Milliseconds(), "success", strconv.FormatBool(err == nil)) + if err != nil { log.Error(err, "failed to patch annotation", "annotation", AnnotationSandboxEndpoints, "body", string(patchData)) aggErrors = append(aggErrors, err) } @@ -226,8 +236,13 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request }) if err != nil { aggErrors = append(aggErrors, err) - } else if err := r.Status().Patch(ctx, batchSbx, client.RawPatch(types.MergePatchType, patchData)); err != nil { - aggErrors = append(aggErrors, err) + } else { + start := time.Now() + err := r.Status().Patch(ctx, batchSbx, client.RawPatch(types.MergePatchType, patchData)) + log.Info("Update sandbox status", "sandbox", klog.KObj(batchSbx), "latencyMs", time.Since(start).Milliseconds(), "success", strconv.FormatBool(err == nil)) + if err != nil { + aggErrors = append(aggErrors, err) + } } } From cf761255beb34faaf60b5cb36d7810a3d0512ab6 Mon Sep 17 00:00:00 2001 From: "pingshan.wj" Date: Wed, 15 Apr 2026 19:39:50 +0800 Subject: [PATCH 3/3] feat(k8s): update logger to support nansec --- kubernetes/internal/utils/logging/logger.go | 28 ++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/kubernetes/internal/utils/logging/logger.go b/kubernetes/internal/utils/logging/logger.go index 5b74759e4..7d8f10822 100644 --- a/kubernetes/internal/utils/logging/logger.go +++ b/kubernetes/internal/utils/logging/logger.go @@ -16,6 +16,7 @@ package logging import ( "os" + "time" "github.com/go-logr/logr" zap2 "go.uber.org/zap" @@ -24,6 +25,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" ) +// localTimeEncoder encodes time in local timezone with nanosecond precision +func localTimeEncoder(t time.Time, enc zapcore.PrimitiveArrayEncoder) { + enc.AppendString(t.Local().Format("2006-01-02T15:04:05.000000000Z07:00")) +} + // Options contains configuration for the logger type Options struct { // Development configures the logger to use a development config @@ -63,6 +69,22 @@ func DefaultOptions() Options { // NewLoggerWithZapOptions creates a logger using controller-runtime's zap options // and adds file output support func NewLoggerWithZapOptions(opts Options) logr.Logger { + // Create encoder config with nanosecond timestamp + encoderConfig := zapcore.EncoderConfig{ + MessageKey: "msg", + LevelKey: "level", + TimeKey: "ts", + NameKey: "logger", + CallerKey: "caller", + FunctionKey: zapcore.OmitKey, + StacktraceKey: "stacktrace", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeTime: localTimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + } + // Add AddCaller option to include file and line number in logs if opts.ZapOptions.ZapOpts == nil { opts.ZapOptions.ZapOpts = []zap2.Option{} @@ -71,7 +93,10 @@ func NewLoggerWithZapOptions(opts Options) logr.Logger { // If file output is not enabled, use the default zap logger if !opts.EnableFileOutput { - return zap.New(zap.UseFlagOptions(&opts.ZapOptions)) + return zap.New( + zap.UseFlagOptions(&opts.ZapOptions), + zap.Encoder(zapcore.NewJSONEncoder(encoderConfig)), + ) } // Create file writer with rotation @@ -93,6 +118,7 @@ func NewLoggerWithZapOptions(opts Options) logr.Logger { // Create logger with multi-writer return zap.New( zap.UseFlagOptions(&opts.ZapOptions), + zap.Encoder(zapcore.NewJSONEncoder(encoderConfig)), zap.WriteTo(multiWriter), ) }