Skip to content

Commit 54afd00

Browse files
authored
Merge pull request #919 from Spground/feature/public-k8s-perf-opt
feat(k8s): Add some logs & perf opt
2 parents 8fab5d8 + 6a2edc1 commit 54afd00

11 files changed

Lines changed: 241 additions & 111 deletions

kubernetes/Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,16 @@ COPY internal/ internal/
4848
# by leaving it empty we can ensure that the container and binary shipped on it will have the same platform.
4949
RUN echo "Building for $TARGETOS/$TARGETARCH"
5050
ARG PACKAGE=./cmd/controller
51+
ARG COMMIT_ID=unknown
52+
ARG BUILD_DATE=unknown
5153
RUN if [ -n "${CC}" ]; then export CC; fi; \
5254
if [ -n "${CXX}" ]; then export CXX; fi; \
5355
export CGO_ENABLED="${CGO_ENABLED}" GOOS="${TARGETOS:-linux}" GOARCH="${TARGETARCH}" \
5456
CGO_CFLAGS="${CGO_CFLAGS:-${CFLAGS}}" \
5557
CGO_CXXFLAGS="${CGO_CXXFLAGS:-${CXXFLAGS}}" \
5658
CGO_LDFLAGS="${CGO_LDFLAGS}"; \
5759
go build ${GOFLAGS} -trimpath -buildvcs=false \
58-
-ldflags "${LDFLAGS} -buildid= -B none" \
60+
-ldflags "${LDFLAGS} -buildid= -B none -X main.commitID=${COMMIT_ID} -X main.buildDate=${BUILD_DATE}" \
5961
-o server ${PACKAGE}
6062

6163
# Use golang image as base to ensure nsenter (util-linux) is available

kubernetes/build.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ BUILD_ARGS=()
3131
for name in GOFLAGS LDFLAGS CGO_ENABLED CC CXX CFLAGS CXXFLAGS CGO_CFLAGS CGO_CXXFLAGS CGO_LDFLAGS; do
3232
build_arg_if_set "${name}"
3333
done
34+
BUILD_ARGS+=(--build-arg "COMMIT_ID=$(git rev-parse --short HEAD)")
35+
BUILD_ARGS+=(--build-arg "BUILD_DATE=$(date -u +%Y-%m-%dT%H:%M:%SZ)")
3436
mkdir -p "$(dirname "${BUILD_METADATA_FILE}")"
3537

3638
DOCKERHUB_REPO="opensandbox"

kubernetes/cmd/controller/main.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,17 @@ import (
4343
"github.com/alibaba/OpenSandbox/sandbox-k8s/internal/controller"
4444
poolassign "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/controller/poolassign"
4545
cryptoutil "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/crypto"
46+
"github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/expectations"
4647
"github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/fieldindex"
4748
"github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils/logging"
4849
// +kubebuilder:scaffold:imports
4950
)
5051

52+
var (
53+
commitID = "unknown"
54+
buildDate = "unknown"
55+
)
56+
5157
const (
5258
defaultBatchSandboxConcurrency = 32
5359
defaultPoolConcurrency = 16
@@ -237,6 +243,8 @@ func main() {
237243
logger := logging.NewLoggerWithZapOptions(logOpts)
238244
ctrl.SetLogger(logger)
239245

246+
setupLog.Info("Starting controller", "commitID", commitID, "buildDate", buildDate)
247+
240248
// if the enable-http2 flag is false (the default), http/2 should be disabled
241249
// due to its vulnerabilities. More specifically, disabling http/2 will
242250
// prevent from being vulnerable to the HTTP/2 Stream Cancellation and
@@ -375,11 +383,12 @@ func main() {
375383
}
376384

377385
config := ctrl.GetConfigOrDie()
386+
config.UserAgent = "sandbox-k8s-controller/1.0"
378387
// Set client rate limiter if specified
379-
if kubeClientQPS > 0 {
388+
if kubeClientQPS != 0 {
380389
config.QPS = float32(kubeClientQPS)
381390
}
382-
if kubeClientBurst > 0 {
391+
if kubeClientBurst != 0 {
383392
config.Burst = kubeClientBurst
384393
}
385394

@@ -422,11 +431,12 @@ func main() {
422431
}
423432

424433
if err := (&controller.BatchSandboxReconciler{
425-
Client: mgr.GetClient(),
426-
Scheme: mgr.GetScheme(),
427-
Recorder: mgr.GetEventRecorderFor("batchsandbox-controller"),
428-
ResumePullSecret: resumePullSecret,
429-
ProfileStore: profileStore,
434+
Client: mgr.GetClient(),
435+
Scheme: mgr.GetScheme(),
436+
Recorder: mgr.GetEventRecorderFor("batchsandbox-controller"),
437+
ResumePullSecret: resumePullSecret,
438+
ProfileStore: profileStore,
439+
StatusRVExpectation: expectations.NewResourceVersionExpectation(),
430440
}).SetupWithManager(mgr, batchSandboxConcurrency); err != nil {
431441
setupLog.Error(err, "unable to create controller", "controller", "BatchSandbox")
432442
os.Exit(1)

kubernetes/internal/controller/allocator.go

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync"
2323

2424
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
2526
"sigs.k8s.io/controller-runtime/pkg/client"
2627
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
2728
logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -268,24 +269,38 @@ func NewAnnoAllocationSyncer(client client.Client) AllocationSyncer {
268269
}
269270

270271
func (syncer *annoAllocationSyncer) SetAllocation(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, allocation *SandboxAllocation) error {
271-
old, ok := sandbox.DeepCopyObject().(*sandboxv1alpha1.BatchSandbox)
272-
if !ok {
273-
return fmt.Errorf("invalid object")
272+
js, err := json.Marshal(allocation)
273+
if err != nil {
274+
return err
274275
}
275276
anno := sandbox.GetAnnotations()
276277
if anno == nil {
277278
anno = make(map[string]string)
278279
}
279-
js, err := json.Marshal(allocation)
280+
anno[AnnoAllocStatusKey] = string(js)
281+
sandbox.SetAnnotations(anno)
282+
283+
needAddFinalizer := !controllerutil.ContainsFinalizer(sandbox, FinalizerPoolAllocation)
284+
if needAddFinalizer {
285+
sandbox.SetFinalizers(append(sandbox.GetFinalizers(), FinalizerPoolAllocation))
286+
}
287+
288+
meta := map[string]any{
289+
"annotations": map[string]string{
290+
AnnoAllocStatusKey: string(js),
291+
},
292+
}
293+
if needAddFinalizer {
294+
meta["finalizers"] = sandbox.GetFinalizers()
295+
}
296+
patchData, err := json.Marshal(map[string]any{"metadata": meta})
280297
if err != nil {
281298
return err
282299
}
283-
anno[AnnoAllocStatusKey] = string(js)
284-
sandbox.SetAnnotations(anno)
285-
// Add finalizer to ensure the sandbox is not deleted before all pods are recycled.
286-
controllerutil.AddFinalizer(sandbox, FinalizerPoolAllocation)
287-
patch := client.MergeFrom(old)
288-
return syncer.client.Patch(ctx, sandbox, patch)
300+
obj := &sandboxv1alpha1.BatchSandbox{}
301+
obj.Name = sandbox.Name
302+
obj.Namespace = sandbox.Namespace
303+
return syncer.client.Patch(ctx, obj, client.RawPatch(types.MergePatchType, patchData))
289304
}
290305

291306
func (syncer *annoAllocationSyncer) GetAllocation(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox) (*SandboxAllocation, error) {
@@ -340,20 +355,18 @@ func (syncer *annoAllocationSyncer) GetReleased(ctx context.Context, sandbox *sa
340355
}
341356

342357
func (syncer *annoAllocationSyncer) SetReleased(ctx context.Context, sandbox *sandboxv1alpha1.BatchSandbox, released *AllocationReleased) error {
343-
old, ok := sandbox.DeepCopyObject().(*sandboxv1alpha1.BatchSandbox)
344-
if !ok {
345-
return fmt.Errorf("invalid object")
358+
js, err := json.Marshal(released)
359+
if err != nil {
360+
return err
346361
}
347362
anno := sandbox.GetAnnotations()
348363
if anno == nil {
349364
anno = make(map[string]string)
350365
}
351-
js, err := json.Marshal(released)
352-
if err != nil {
353-
return err
354-
}
355366
anno[AnnoAllocReleasedKey] = string(js)
356367
sandbox.SetAnnotations(anno)
368+
369+
needRemoveFinalizer := false
357370
// If the sandbox is being deleted and all allocated pods have been released,
358371
// remove the finalizer so the sandbox can be garbage collected.
359372
if !sandbox.DeletionTimestamp.IsZero() {
@@ -372,12 +385,34 @@ func (syncer *annoAllocationSyncer) SetReleased(ctx context.Context, sandbox *sa
372385
break
373386
}
374387
}
375-
if allReleased {
376-
controllerutil.RemoveFinalizer(sandbox, FinalizerPoolAllocation)
388+
if allReleased && controllerutil.ContainsFinalizer(sandbox, FinalizerPoolAllocation) {
389+
needRemoveFinalizer = true
390+
filtered := make([]string, 0, len(sandbox.GetFinalizers()))
391+
for _, f := range sandbox.GetFinalizers() {
392+
if f != FinalizerPoolAllocation {
393+
filtered = append(filtered, f)
394+
}
395+
}
396+
sandbox.SetFinalizers(filtered)
377397
}
378398
}
379-
patch := client.MergeFrom(old)
380-
return syncer.client.Patch(ctx, sandbox, patch)
399+
400+
meta := map[string]any{
401+
"annotations": map[string]string{
402+
AnnoAllocReleasedKey: string(js),
403+
},
404+
}
405+
if needRemoveFinalizer {
406+
meta["finalizers"] = sandbox.GetFinalizers()
407+
}
408+
patchData, err := json.Marshal(map[string]any{"metadata": meta})
409+
if err != nil {
410+
return err
411+
}
412+
obj := &sandboxv1alpha1.BatchSandbox{}
413+
obj.Name = sandbox.Name
414+
obj.Namespace = sandbox.Namespace
415+
return syncer.client.Patch(ctx, obj, client.RawPatch(types.MergePatchType, patchData))
381416
}
382417

383418
type AllocSpec struct {

kubernetes/internal/controller/batchsandbox_controller.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,11 @@ type taskScheduleResult struct {
6666
// BatchSandboxReconciler reconciles a BatchSandbox object
6767
type BatchSandboxReconciler struct {
6868
client.Client
69-
Scheme *runtime.Scheme
70-
Recorder record.EventRecorder
71-
ProfileStore *poolassign.ProfileStore
72-
taskSchedulers sync.Map
69+
Scheme *runtime.Scheme
70+
Recorder record.EventRecorder
71+
ProfileStore *poolassign.ProfileStore
72+
taskSchedulers sync.Map
73+
StatusRVExpectation expectations.ResourceVersionExpectation
7374
// ResumePullSecret is the K8s Secret name for pulling snapshot images during resume.
7475
ResumePullSecret string
7576
}
@@ -90,11 +91,13 @@ type BatchSandboxReconciler struct {
9091
//
9192
// For more details, check Reconcile and its Result here:
9293
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile
93-
func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
94+
func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, retErr error) {
9495
log := logf.FromContext(ctx)
96+
start := time.Now()
9597
var aggErrors []error
9698
defer func() {
9799
_ = DurationStore.Pop(req.String())
100+
log.Info("Reconcile finished", "duration", time.Since(start).String(), "requeueAfter", result.RequeueAfter.String(), "error", retErr)
98101
}()
99102
batchSbx := &sandboxv1alpha1.BatchSandbox{}
100103
if err := r.Get(ctx, client.ObjectKey{
@@ -192,6 +195,14 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
192195
}
193196

194197
runtimeView := buildRuntimeView(batchSbx, pods)
198+
// Ensure PauseObservedGeneration is up-to-date so the status patch ACKs the
199+
// current generation without requiring a dedicated API call.
200+
// Skip during Resuming: a newer generation may carry a queued pause request
201+
// that must remain unacknowledged until resume completes and handlePause runs.
202+
if batchSbx.Status.Phase != sandboxv1alpha1.BatchSandboxPhaseResuming &&
203+
runtimeView.status.PauseObservedGeneration < batchSbx.Generation {
204+
runtimeView.status.PauseObservedGeneration = batchSbx.Generation
205+
}
195206

196207
if batchSbx.Status.Phase == sandboxv1alpha1.BatchSandboxPhasePaused {
197208
r.deleteTaskScheduler(ctx, batchSbx)
@@ -210,9 +221,14 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
210221
}
211222
}
212223

213-
aggErrors = append(aggErrors, r.persistRuntimeView(ctx, batchSbx, runtimeView)...)
224+
requeue, persistErrors := r.persistRuntimeView(ctx, batchSbx, runtimeView)
225+
aggErrors = append(aggErrors, persistErrors...)
214226

215-
return reconcile.Result{RequeueAfter: DurationStore.Pop(req.String())}, gerrors.Join(aggErrors...)
227+
requeueAfter := DurationStore.Pop(req.String())
228+
if requeue > 0 && (requeueAfter == 0 || requeue < requeueAfter) {
229+
requeueAfter = requeue
230+
}
231+
return reconcile.Result{RequeueAfter: requeueAfter}, gerrors.Join(aggErrors...)
216232
}
217233

218234
func calPodIndex(poolStrategy strategy.PoolStrategy, batchSbx *sandboxv1alpha1.BatchSandbox, pods []*corev1.Pod) (map[string]int, error) {

kubernetes/internal/controller/batchsandbox_pause_resume.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,9 @@ func (r *BatchSandboxReconciler) dispatchPauseResume(ctx context.Context, bs *sa
209209
result, err := r.handleResume(ctx, bs)
210210
return result, true, err
211211
}
212-
log.Info("Dispatch: ACK only", "generation", generation, "pauseObservedGeneration", pauseObservedGen)
213-
if err := r.ackPauseGeneration(ctx, bs); err != nil {
214-
return ctrl.Result{}, true, err
215-
}
212+
// No pause intent — skip the dedicated ACK API call. The normal flow's
213+
// persistRuntimeView will update PauseObservedGeneration in its status patch.
214+
log.Info("Dispatch: no pause intent, deferring ACK to status patch", "generation", generation, "pauseObservedGeneration", pauseObservedGen)
216215
return ctrl.Result{}, false, nil
217216
}
218217

@@ -479,8 +478,9 @@ func (r *BatchSandboxReconciler) completePause(ctx context.Context, bs *sandboxv
479478

480479
r.deleteTaskScheduler(ctx, bs)
481480

481+
var latest *sandboxv1alpha1.BatchSandbox
482482
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
483-
latest := &sandboxv1alpha1.BatchSandbox{}
483+
latest = &sandboxv1alpha1.BatchSandbox{}
484484
if err := r.Get(ctx, types.NamespacedName{Namespace: bs.Namespace, Name: bs.Name}, latest); err != nil {
485485
return err
486486
}
@@ -496,6 +496,7 @@ func (r *BatchSandboxReconciler) completePause(ctx context.Context, bs *sandboxv
496496
}); err != nil {
497497
return err
498498
}
499+
r.StatusRVExpectation.Expect(latest)
499500

500501
return nil
501502
}
@@ -584,8 +585,9 @@ func (r *BatchSandboxReconciler) continueResume(ctx context.Context, bs *sandbox
584585
}
585586

586587
func (r *BatchSandboxReconciler) ackPauseGeneration(ctx context.Context, bs *sandboxv1alpha1.BatchSandbox) error {
588+
var latest *sandboxv1alpha1.BatchSandbox
587589
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
588-
latest := &sandboxv1alpha1.BatchSandbox{}
590+
latest = &sandboxv1alpha1.BatchSandbox{}
589591
if err := r.Get(ctx, types.NamespacedName{Namespace: bs.Namespace, Name: bs.Name}, latest); err != nil {
590592
return err
591593
}
@@ -595,14 +597,16 @@ func (r *BatchSandboxReconciler) ackPauseGeneration(ctx context.Context, bs *san
595597
}); err != nil {
596598
return err
597599
}
600+
r.StatusRVExpectation.Expect(latest)
598601
bs.Status.PauseObservedGeneration = bs.Generation
599602
applyBatchSandboxPhaseConditions(&bs.Status)
600603
return nil
601604
}
602605

603606
func (r *BatchSandboxReconciler) ackPauseWithPhase(ctx context.Context, bs *sandboxv1alpha1.BatchSandbox, phase sandboxv1alpha1.BatchSandboxPhase, _ string) error {
607+
var latest *sandboxv1alpha1.BatchSandbox
604608
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
605-
latest := &sandboxv1alpha1.BatchSandbox{}
609+
latest = &sandboxv1alpha1.BatchSandbox{}
606610
if err := r.Get(ctx, types.NamespacedName{Namespace: bs.Namespace, Name: bs.Name}, latest); err != nil {
607611
return err
608612
}
@@ -613,6 +617,7 @@ func (r *BatchSandboxReconciler) ackPauseWithPhase(ctx context.Context, bs *sand
613617
}); err != nil {
614618
return err
615619
}
620+
r.StatusRVExpectation.Expect(latest)
616621
bs.Status.PauseObservedGeneration = bs.Generation
617622
bs.Status.Phase = phase
618623
applyBatchSandboxPhaseConditions(&bs.Status)
@@ -640,8 +645,9 @@ func (r *BatchSandboxReconciler) setCondition(
640645
reason string,
641646
message string,
642647
) error {
643-
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
644-
latest := &sandboxv1alpha1.BatchSandbox{}
648+
var latest *sandboxv1alpha1.BatchSandbox
649+
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
650+
latest = &sandboxv1alpha1.BatchSandbox{}
645651
if err := r.Get(ctx, types.NamespacedName{Namespace: bs.Namespace, Name: bs.Name}, latest); err != nil {
646652
return err
647653
}
@@ -674,5 +680,9 @@ func (r *BatchSandboxReconciler) setCondition(
674680

675681
latest.Status.Conditions = conditions
676682
return r.Status().Update(ctx, latest)
677-
})
683+
}); err != nil {
684+
return err
685+
}
686+
r.StatusRVExpectation.Expect(latest)
687+
return nil
678688
}

0 commit comments

Comments
 (0)