Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pkg/sandbox-manager/infra/sandboxcr/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

checkpointUtils "github.com/openkruise/agents/pkg/utils/checkpoint"

"github.com/openkruise/agents/api/v1alpha1"
"github.com/openkruise/agents/pkg/sandbox-manager/clients"
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/sandbox-manager/consts"
"github.com/openkruise/agents/pkg/sandbox-manager/infra"
"github.com/openkruise/agents/pkg/utils"
"github.com/openkruise/agents/pkg/utils/runtime"
stateutils "github.com/openkruise/agents/pkg/utils/sandboxutils"
)

Expand Down Expand Up @@ -99,6 +102,14 @@ func CloneSandbox(ctx context.Context, opts infra.CloneSandboxOptions, cache *Ca
}

// Step 5: csi mount
// If opts.CSIMount is not provided from request, try to resolve mount options from sandbox annotation.
if opts.CSIMount == nil {
var resolveErr error
opts.CSIMount, resolveErr = runtime.ResolveCSIMountFromAnnotation(ctx, sbx.Sandbox, sbx.Client, sbx.Cache, sbx.storageRegistry)
if resolveErr != nil {
return nil, metrics, resolveErr
}
}
if opts.CSIMount != nil {
log.Info("starting to perform csi mount")
metrics.CSIMount, err = processCSIMounts(ctx, sbx, *opts.CSIMount)
Expand Down Expand Up @@ -168,6 +179,8 @@ func createSandboxFromCheckpoint(ctx context.Context, opts infra.CloneSandboxOpt
sbx.Annotations[v1alpha1.AnnotationRuntimeAccessToken] = initRuntimeOpts.AccessToken
sbx.Annotations[v1alpha1.AnnotationInitRuntimeRequest] = cp.Annotations[v1alpha1.AnnotationInitRuntimeRequest]
}
// e.g., copy csi mount config from checkpoint to sandbox obj
checkpointUtils.RestoreAnnotationsFromCheckpoint(cp, sbx.Sandbox)
DefaultPostProcessClonedSandbox(sbx.Sandbox)
log.Info("creating new sandbox from checkpoint")
sbx.Sandbox, err = DefaultCreateSandbox(ctx, sbx.Sandbox, client, cache)
Expand Down Expand Up @@ -318,6 +331,8 @@ func CreateCheckpoint(ctx context.Context, sbx *v1alpha1.Sandbox, client clients
}
}
}
// to make sure the sandbox annotations are propagated to the checkpoint
checkpointUtils.PropagateAnnotationsToCheckpoint(sbx, cp)
cp, err = DefaultCreateCheckpoint(ctx, client, cp)
if err != nil {
log.Error(err, "failed to create checkpoint")
Expand Down
141 changes: 127 additions & 14 deletions pkg/sandbox-manager/infra/sandboxcr/clone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"testing"
"time"

"github.com/openkruise/agents/pkg/utils/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
Expand All @@ -16,11 +15,14 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

"github.com/openkruise/agents/pkg/utils/runtime"

"github.com/openkruise/agents/api/v1alpha1"
"github.com/openkruise/agents/pkg/sandbox-manager/clients"
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/sandbox-manager/consts"
"github.com/openkruise/agents/pkg/sandbox-manager/infra"
"github.com/openkruise/agents/pkg/servers/e2b/models"
utils "github.com/openkruise/agents/pkg/utils/sandbox-manager"
testutils "github.com/openkruise/agents/test/utils"
)
Expand Down Expand Up @@ -186,14 +188,15 @@ func TestCloneSandbox(t *testing.T) {
t.Cleanup(func() { DefaultCreateSandbox = origCreateSandbox })

tests := []struct {
name string
opts infra.CloneSandboxOptions
serverOpts testutils.TestRuntimeServerOptions
initRuntime *config.InitRuntimeOptions
sbxOverride sbxOverride
preProcess func(t *testing.T, cache *Cache, client *clients.ClientSet)
postCheck func(t *testing.T, sbx infra.Sandbox, metrics infra.CloneMetrics)
expectError string
name string
opts infra.CloneSandboxOptions
serverOpts testutils.TestRuntimeServerOptions
initRuntime *config.InitRuntimeOptions
sbxOverride sbxOverride
checkpointAnnotations map[string]string
preProcess func(t *testing.T, cache *Cache, client *clients.ClientSet)
postCheck func(t *testing.T, sbx infra.Sandbox, metrics infra.CloneMetrics)
expectError string
}{
{
name: "successful clone",
Expand Down Expand Up @@ -480,6 +483,70 @@ func TestCloneSandbox(t *testing.T) {
sbxOverride: sbxOverride{Name: "test-sandbox-csi-mount-2", AccessToken: runtime.AccessToken},
expectError: "failed to perform csi mount",
},
{
name: "annotation fallback - invalid json in checkpoint annotation",
opts: infra.CloneSandboxOptions{
User: user,
CheckPointID: checkpointID,
WaitReadyTimeout: 30 * time.Second,
},
serverOpts: testutils.TestRuntimeServerOptions{
RunCommandResult: runtime.RunCommandResult{
PID: 1,
Exited: true,
},
RunCommandImmediately: true,
},
sbxOverride: sbxOverride{Name: "test-sandbox-anno-invalid-json"},
checkpointAnnotations: map[string]string{
models.ExtensionKeyClaimWithCSIMount_MountConfig: "not-valid-json",
},
expectError: "failed to parse csi mount config from annotation",
},
{
name: "annotation fallback - valid json but pv not found",
opts: infra.CloneSandboxOptions{
User: user,
CheckPointID: checkpointID,
WaitReadyTimeout: 30 * time.Second,
},
serverOpts: testutils.TestRuntimeServerOptions{
RunCommandResult: runtime.RunCommandResult{
PID: 1,
Exited: true,
},
RunCommandImmediately: true,
},
sbxOverride: sbxOverride{Name: "test-sandbox-anno-pv-missing"},
checkpointAnnotations: map[string]string{
models.ExtensionKeyClaimWithCSIMount_MountConfig: `[{"pvName":"non-existent-pv","mountPath":"/data"}]`,
},
expectError: "failed to generate csi mount options config",
},
{
name: "annotation fallback - no csi annotation in checkpoint",
opts: infra.CloneSandboxOptions{
User: user,
CheckPointID: checkpointID,
WaitReadyTimeout: 30 * time.Second,
},
serverOpts: testutils.TestRuntimeServerOptions{
RunCommandResult: runtime.RunCommandResult{
PID: 1,
Exited: true,
},
RunCommandImmediately: true,
},
sbxOverride: sbxOverride{Name: "test-sandbox-anno-none"},
checkpointAnnotations: map[string]string{
"some-other-annotation": "value",
},
postCheck: func(t *testing.T, sbx infra.Sandbox, metrics infra.CloneMetrics) {
assert.NotNil(t, sbx)
// No CSI mount should have happened
assert.Equal(t, time.Duration(0), metrics.CSIMount)
},
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -525,10 +592,15 @@ func TestCloneSandbox(t *testing.T) {

// Create Checkpoint with same name as SandboxTemplate
if tt.opts.CheckPointID != "non-existent-checkpoint" && tt.name != "checkpoint without template label" && tt.name != "template not found" {
cpAnnotations := make(map[string]string)
for k, v := range tt.checkpointAnnotations {
cpAnnotations[k] = v
}
cp := &v1alpha1.Checkpoint{
ObjectMeta: metav1.ObjectMeta{
Name: checkpointID,
Namespace: "default",
Name: checkpointID,
Namespace: "default",
Annotations: cpAnnotations,
Labels: map[string]string{
v1alpha1.LabelSandboxTemplate: checkpointID,
},
Expand All @@ -540,9 +612,7 @@ func TestCloneSandbox(t *testing.T) {
if tt.initRuntime != nil {
initRuntimeAnnotation, err := json.Marshal(tt.initRuntime)
require.NoError(t, err)
cp.Annotations = map[string]string{
v1alpha1.AnnotationInitRuntimeRequest: string(initRuntimeAnnotation),
}
cp.Annotations[v1alpha1.AnnotationInitRuntimeRequest] = string(initRuntimeAnnotation)
}
_, err = client.ApiV1alpha1().Checkpoints("default").Create(context.Background(), cp, metav1.CreateOptions{})
require.NoError(t, err)
Expand Down Expand Up @@ -1097,6 +1167,49 @@ func TestCreateCheckPoint(t *testing.T) {
assert.Nil(t, tmpl.Spec.Runtimes, "template Runtimes should be nil when sandbox has no Runtimes")
},
},
{
name: "checkpoint with CSI mount annotation - propagated to checkpoint",
sandbox: func() *v1alpha1.Sandbox {
sbx := newTestSandbox("test-sandbox-csi")
sbx.Annotations[models.ExtensionKeyClaimWithCSIMount_MountConfig] = `[{"driver":"nfs","source":"/data"}]`
return sbx
}(),
cpStatus: v1alpha1.CheckpointStatus{
Phase: v1alpha1.CheckpointSucceeded,
CheckpointId: "cp-id-csi",
},
tmplOverride: tmplOverride{Name: "tmpl-csi", UID: "uid-csi"},
opts: infra.CreateCheckpointOptions{
WaitSuccessTimeout: 5 * time.Second,
},
postCheck: func(t *testing.T, id string, clientSet *clients.ClientSet) {
assert.Equal(t, "cp-id-csi", id)
// Verify CSI mount annotation is propagated to checkpoint
cp, err := clientSet.ApiV1alpha1().Checkpoints("default").Get(context.Background(), "tmpl-csi", metav1.GetOptions{})
require.NoError(t, err)
assert.Equal(t, `[{"driver":"nfs","source":"/data"}]`, cp.Annotations[models.ExtensionKeyClaimWithCSIMount_MountConfig],
"CSI mount annotation should be propagated to checkpoint")
},
},
{
name: "checkpoint without CSI mount annotation - checkpoint has no CSI annotation",
sandbox: newTestSandbox("test-sandbox-no-csi"),
cpStatus: v1alpha1.CheckpointStatus{
Phase: v1alpha1.CheckpointSucceeded,
CheckpointId: "cp-id-no-csi",
},
tmplOverride: tmplOverride{Name: "tmpl-no-csi", UID: "uid-no-csi"},
opts: infra.CreateCheckpointOptions{
WaitSuccessTimeout: 5 * time.Second,
},
postCheck: func(t *testing.T, id string, clientSet *clients.ClientSet) {
assert.Equal(t, "cp-id-no-csi", id)
cp, err := clientSet.ApiV1alpha1().Checkpoints("default").Get(context.Background(), "tmpl-no-csi", metav1.GetOptions{})
require.NoError(t, err)
assert.Empty(t, cp.Annotations[models.ExtensionKeyClaimWithCSIMount_MountConfig],
"checkpoint should not have CSI mount annotation when sandbox doesn't have one")
},
},
{
name: "checkpoint with sandbox PersistentContents - inherit from template",
sandbox: func() *v1alpha1.Sandbox {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sandbox-manager/infra/sandboxcr/sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"net/http"
"time"

"github.com/openkruise/agents/pkg/utils/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"github.com/openkruise/agents/pkg/utils/runtime"

agentsv1alpha1 "github.com/openkruise/agents/api/v1alpha1"
"github.com/openkruise/agents/pkg/agent-runtime/storages"
"github.com/openkruise/agents/pkg/proxy"
Expand Down Expand Up @@ -308,7 +309,7 @@ func (s *Sandbox) Resume(ctx context.Context) error {
}

// Perform csi mount after resume
csiMountConfigRequests, err := getCsiMountExtensionRequest(s.Sandbox)
csiMountConfigRequests, err := runtime.GetCsiMountExtensionRequest(s.Sandbox)
if err != nil {
log.Error(err, "failed to get csi mount request")
return fmt.Errorf("failed to get csi mount request: %w", err)
Expand Down
13 changes: 0 additions & 13 deletions pkg/sandbox-manager/infra/sandboxcr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/openkruise/agents/api/v1alpha1"
"github.com/openkruise/agents/pkg/sandbox-manager/config"
"github.com/openkruise/agents/pkg/servers/e2b/models"
)

func SetSandboxCondition(sbx *v1alpha1.Sandbox, tp string, status metav1.ConditionStatus, reason, message string) {
Expand Down Expand Up @@ -57,15 +56,3 @@ func getInitRuntimeRequest(s metav1.Object) (*config.InitRuntimeOptions, error)
}
return initRuntimeOpts, nil
}

func getCsiMountExtensionRequest(s metav1.Object) ([]v1alpha1.CSIMountConfig, error) {
var csiMountRequests []v1alpha1.CSIMountConfig
csiMountRequestsRaw := s.GetAnnotations()[models.ExtensionKeyClaimWithCSIMount_MountConfig]
if csiMountRequestsRaw == "" {
return nil, nil
}
if err := json.Unmarshal([]byte(csiMountRequestsRaw), &csiMountRequests); err != nil {
return nil, fmt.Errorf("failed to unmarshal csi mount options: %v", err)
}
return csiMountRequests, nil
}
Loading
Loading