Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/controller/sandbox/pod_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"context"
"reflect"

"github.com/openkruise/agents/pkg/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/openkruise/agents/pkg/controller/sandbox/core"
"github.com/openkruise/agents/pkg/utils"
)

// SandboxPodEventHandler watches Pods created by the Sandbox controller.
Expand All @@ -47,6 +49,7 @@ func (e *SandboxPodEventHandler) Update(_ context.Context, evt event.TypedUpdate

func (e *SandboxPodEventHandler) Delete(_ context.Context, evt event.TypedDeleteEvent[client.Object], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
w.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(evt.Object)})
core.ResourceVersionExpectations.Delete(evt.Object)
}

func (e *SandboxPodEventHandler) Generic(context.Context, event.TypedGenericEvent[client.Object], workqueue.TypedRateLimitingInterface[reconcile.Request]) {
Expand Down
106 changes: 73 additions & 33 deletions pkg/controller/sandbox/sandbox_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,10 @@ func (r *SandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (cr
}

logger.Info("Began to process Sandbox for reconcile")
if pod != nil {
core.ScaleExpectation.ObserveScale(utils.GetControllerKey(box), expectations.Create, pod.Name)
}
if isSatisfied, unsatisfiedDuration, _ := core.ScaleExpectation.SatisfiedExpectations(utils.GetControllerKey(box)); !isSatisfied {
if unsatisfiedDuration < expectations.ExpectationTimeout {
logger.Info("Not satisfied ScaleExpectation for Sandbox, wait for cache event")
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}
klog.InfoS("ScaleExpectation unsatisfied overtime for Sandbox, wait for cache event timeout", "timeout", unsatisfiedDuration)
core.ScaleExpectation.DeleteExpectations(utils.GetControllerKey(box))
}
// If resourceVersion expectations have not satisfied yet, just skip this reconcile
core.ResourceVersionExpectations.Observe(box)
if isSatisfied, unsatisfiedDuration := core.ResourceVersionExpectations.IsSatisfied(box); !isSatisfied {
if unsatisfiedDuration < expectations.ExpectationTimeout {
logger.Info("Not satisfied resourceVersion for Sandbox, wait for cache event")
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}
klog.InfoS("ResourceVersionExpectations unsatisfied overtime for Sandbox, wait for cache event timeout", "timeout", unsatisfiedDuration)
core.ResourceVersionExpectations.Delete(box)

// Check expectations
if requeue, result := r.checkExpectations(ctx, pod, box); requeue {
return result, nil
}

defer func() {
Expand Down Expand Up @@ -185,12 +169,36 @@ func (r *SandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (cr
}

// Check ShutdownTime and PauseTime
requeueAfter, shouldReturn, returnErr := r.checkTimedActions(ctx, box)
if shouldReturn {
return ctrl.Result{}, returnErr
}

// calculate sandbox status
var shouldRequeue bool
newStatus, shouldRequeue = calculateStatus(args)
if shouldRequeue {
return reconcile.Result{RequeueAfter: requeueAfter}, r.updateSandboxStatus(ctx, *newStatus, box)
}

return r.handlePhaseTransition(ctx, args, newStatus, requeueAfter)
}

func (r *SandboxReconciler) handleTerminating(ctx context.Context, args core.EnsureFuncArgs) (ctrl.Result, error) {
pod, _, _ := args.Pod, args.Box, args.NewStatus
return ctrl.Result{}, r.getControl(pod).EnsureSandboxTerminated(ctx, args)
}

// checkTimedActions handles ShutdownTime and PauseTime checks
func (r *SandboxReconciler) checkTimedActions(ctx context.Context, box *agentsv1alpha1.Sandbox) (time.Duration, bool, error) {
logger := logf.FromContext(ctx).WithValues("sandbox", klog.KObj(box))
now := metav1.Now()
var requeueAfter time.Duration

if box.Spec.ShutdownTime != nil && box.DeletionTimestamp == nil {
if box.Spec.ShutdownTime.Before(&now) {
logger.Info("sandbox shutdown time reached, will be deleted", "shutdownTime", box.Spec.ShutdownTime)
return ctrl.Result{}, r.Delete(ctx, box)
return 0, true, r.Delete(ctx, box)
}
requeueAfter = box.Spec.ShutdownTime.Sub(now.Time)
}
Expand All @@ -200,18 +208,18 @@ func (r *SandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (cr
modified := box.DeepCopy()
patch := client.MergeFrom(box)
modified.Spec.Paused = true
return ctrl.Result{}, r.Patch(ctx, modified, patch)
return 0, true, r.Patch(ctx, modified, patch)
}
requeueAfter = min(requeueAfter, box.Spec.PauseTime.Sub(now.Time))
}
return requeueAfter, false, nil
}

// calculate sandbox status
var shouldRequeue bool
newStatus, shouldRequeue = calculateStatus(args)
if shouldRequeue {
return reconcile.Result{RequeueAfter: requeueAfter}, r.updateSandboxStatus(ctx, *newStatus, box)
}
// handlePhaseTransition handles sandbox phase transitions
func (r *SandboxReconciler) handlePhaseTransition(ctx context.Context, args core.EnsureFuncArgs, newStatus *agentsv1alpha1.SandboxStatus, requeueAfter time.Duration) (ctrl.Result, error) {
logger := logf.FromContext(ctx).WithValues("sandbox", klog.KObj(args.Box))

var err error
switch newStatus.Phase {
case agentsv1alpha1.SandboxPending:
requeueAfter, err = r.getControl(args.Pod).EnsureSandboxRunning(ctx, args)
Expand All @@ -222,18 +230,50 @@ func (r *SandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request) (cr
case agentsv1alpha1.SandboxResuming:
err = r.getControl(args.Pod).EnsureSandboxResumed(ctx, args)
default:
logger.Info("sandbox status phase is invalid", "phase", box.Status.Phase)
logger.Info("sandbox status phase is invalid", "phase", args.Box.Status.Phase)
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
if err != nil {
return reconcile.Result{}, err
}
return ctrl.Result{RequeueAfter: requeueAfter}, r.updateSandboxStatus(ctx, *newStatus, box)
return ctrl.Result{RequeueAfter: requeueAfter}, r.updateSandboxStatus(ctx, *newStatus, args.Box)
}

func (r *SandboxReconciler) handleTerminating(ctx context.Context, args core.EnsureFuncArgs) (ctrl.Result, error) {
pod, _, _ := args.Pod, args.Box, args.NewStatus
return ctrl.Result{}, r.getControl(pod).EnsureSandboxTerminated(ctx, args)
// checkExpectations verifies scale and resource version expectations are satisfied
func (r *SandboxReconciler) checkExpectations(ctx context.Context, pod *corev1.Pod, box *agentsv1alpha1.Sandbox) (bool, ctrl.Result) {
logger := logf.FromContext(ctx).WithValues("sandbox", klog.KObj(box))

if pod != nil {
core.ScaleExpectation.ObserveScale(utils.GetControllerKey(box), expectations.Create, pod.Name)
core.ResourceVersionExpectations.Observe(pod)
if isSatisfied, unsatisfiedDuration := core.ResourceVersionExpectations.IsSatisfied(pod); !isSatisfied {
if unsatisfiedDuration < expectations.ExpectationTimeout {
logger.Info("Not satisfied resourceVersion for Pod, wait for cache event")
return true, reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}
}
klog.InfoS("ResourceVersionExpectations unsatisfied overtime for Pod, wait for cache event timeout", "timeout", unsatisfiedDuration)
core.ResourceVersionExpectations.Delete(pod)
}
}
if isSatisfied, unsatisfiedDuration, _ := core.ScaleExpectation.SatisfiedExpectations(utils.GetControllerKey(box)); !isSatisfied {
if unsatisfiedDuration < expectations.ExpectationTimeout {
logger.Info("Not satisfied ScaleExpectation for Sandbox, wait for cache event")
return true, reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}
}
klog.InfoS("ScaleExpectation unsatisfied overtime for Sandbox, wait for cache event timeout", "timeout", unsatisfiedDuration)
core.ScaleExpectation.DeleteExpectations(utils.GetControllerKey(box))
}
// If resourceVersion expectations have not satisfied yet, just skip this reconcile
core.ResourceVersionExpectations.Observe(box)
if isSatisfied, unsatisfiedDuration := core.ResourceVersionExpectations.IsSatisfied(box); !isSatisfied {
if unsatisfiedDuration < expectations.ExpectationTimeout {
logger.Info("Not satisfied resourceVersion for Sandbox, wait for cache event")
return true, reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}
}
klog.InfoS("ResourceVersionExpectations unsatisfied overtime for Sandbox, wait for cache event timeout", "timeout", unsatisfiedDuration)
core.ResourceVersionExpectations.Delete(box)
}
return false, ctrl.Result{}
}

func isSandboxCompletedPhase(phase agentsv1alpha1.SandboxPhase) bool {
Expand Down
181 changes: 181 additions & 0 deletions pkg/controller/sandbox/sandbox_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
agentsv1alpha1 "github.com/openkruise/agents/api/v1alpha1"
"github.com/openkruise/agents/pkg/controller/sandbox/core"
"github.com/openkruise/agents/pkg/utils"
"github.com/openkruise/agents/pkg/utils/expectations"
utilfeature "github.com/openkruise/agents/pkg/utils/feature"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -2321,3 +2322,183 @@ func TestSandboxReconciler_Reconcile_RateLimitFeatureGate(t *testing.T) {
})
}
}

func TestSandboxReconciler_checkExpectations(t *testing.T) {
scheme := runtime.NewScheme()
_ = clientgoscheme.AddToScheme(scheme)
_ = agentsv1alpha1.AddToScheme(scheme)

tests := []struct {
name string
pod *corev1.Pod
sandbox *agentsv1alpha1.Sandbox
setupExpect func()
expectRequeue bool
}{
{
name: "pod nil - expectations satisfied",
sandbox: &agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-1",
},
},
setupExpect: func() {
// Clean up all expectations
},
expectRequeue: false,
},
{
name: "pod exists - resourceVersion not satisfied within timeout",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-2",
ResourceVersion: "100",
},
},
sandbox: &agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-2",
},
},
setupExpect: func() {
core.ResourceVersionExpectations.Expect(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-2",
ResourceVersion: "200",
},
})
},
expectRequeue: true,
},
{
name: "pod exists - resourceVersion satisfied",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-3",
ResourceVersion: "100",
},
},
sandbox: &agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-3",
},
},
setupExpect: func() {
core.ResourceVersionExpectations.Expect(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-3",
ResourceVersion: "100",
},
})
},
expectRequeue: false,
},
{
name: "scale expectation not satisfied within timeout",
sandbox: &agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-4",
},
},
setupExpect: func() {
key := utils.GetControllerKey(&agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-4",
},
})
core.ScaleExpectation.ExpectScale(key, expectations.Create, "pod-1")
},
expectRequeue: true,
},
{
name: "sandbox resourceVersion not satisfied within timeout",
sandbox: &agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-5",
ResourceVersion: "100",
},
},
setupExpect: func() {
core.ResourceVersionExpectations.Expect(&agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-5",
ResourceVersion: "200",
},
})
},
expectRequeue: true,
},
{
name: "sandbox resourceVersion satisfied",
sandbox: &agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-6",
ResourceVersion: "100",
},
},
setupExpect: func() {
core.ResourceVersionExpectations.Expect(&agentsv1alpha1.Sandbox{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sandbox",
Namespace: "default",
UID: "test-uid-6",
ResourceVersion: "100",
},
})
},
expectRequeue: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Clean up expectations before each test
if tt.pod != nil {
core.ResourceVersionExpectations.Delete(tt.pod)
}
core.ResourceVersionExpectations.Delete(tt.sandbox)
core.ScaleExpectation.DeleteExpectations(utils.GetControllerKey(tt.sandbox))

tt.setupExpect()

reconciler := &SandboxReconciler{}
ctx := context.Background()

requeue, result := reconciler.checkExpectations(ctx, tt.pod, tt.sandbox)

if requeue != tt.expectRequeue {
t.Errorf("checkExpectations() requeue = %v, expectRequeue %v", requeue, tt.expectRequeue)
}
if tt.expectRequeue && result.RequeueAfter == 0 {
t.Errorf("expected RequeueAfter > 0, got 0")
}
if !tt.expectRequeue && result.RequeueAfter != 0 {
t.Errorf("expected RequeueAfter == 0, got %v", result.RequeueAfter)
}
})
}
}
Loading