diff --git a/controllers/keda/scaledjob_controller_test.go b/controllers/keda/scaledjob_controller_test.go index b08a27bd511..ae9c0ccbf6c 100644 --- a/controllers/keda/scaledjob_controller_test.go +++ b/controllers/keda/scaledjob_controller_test.go @@ -186,6 +186,74 @@ var _ = Describe("ScaledJobController", func() { } Expect(foundTriggersField).To(BeTrue()) }) + + It("ScaledJob minReplicaCount defaults to nil when not set", func() { + jobName := "use-default-minreplicacount-value" + sjName := "sj-" + jobName + sj := &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: sjName, + Namespace: "default", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + JobTargetRef: generateJobSpec(jobName), + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + pollingInterval := int32(5) + sj.Spec.PollingInterval = &pollingInterval + err := k8sClient.Create(context.Background(), sj) + Expect(err).ToNot(HaveOccurred()) + + // Confirm the minReplicaCount is nil + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + Expect(err).ToNot(HaveOccurred()) + Expect(sj.Spec.MinReplicaCount).To(BeNil()) + }) + + It("ScaledJob maxReplicaCount defaults to nil when not set", func() { + jobName := "use-default-maxreplicacount-value" + sjName := "sj-" + jobName + sj := &kedav1alpha1.ScaledJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: sjName, + Namespace: "default", + }, + Spec: kedav1alpha1.ScaledJobSpec{ + JobTargetRef: generateJobSpec(jobName), + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + }, + } + pollingInterval := int32(5) + sj.Spec.PollingInterval = &pollingInterval + err := k8sClient.Create(context.Background(), sj) + Expect(err).ToNot(HaveOccurred()) + + // Confirm the maxReplicaCount is nil + err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj) + Expect(err).ToNot(HaveOccurred()) + Expect(sj.Spec.MaxReplicaCount).To(BeNil()) + }) }) }) diff --git a/tests/helper/helper.go b/tests/helper/helper.go index 9335319ea45..9b9f3136743 100644 --- a/tests/helper/helper.go +++ b/tests/helper/helper.go @@ -490,6 +490,35 @@ func WaitForAllPodRunningInNamespace(t *testing.T, kc *kubernetes.Clientset, nam return false } +func WaitForRunningPodCount(t *testing.T, kc *kubernetes.Clientset, scaledJobName, namespace string, target, iterations, interval int) bool { + for i := 0; i < iterations; i++ { + pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: fmt.Sprintf("scaledjob.keda.sh/name=%s", scaledJobName), + }) + if err != nil { + t.Logf("cannot list pods - %s", err) + } + + runningPodCount := 0 + for _, pod := range pods.Items { + if pod.Status.Phase == corev1.PodRunning { + runningPodCount++ + } + } + + t.Logf("Waiting for running pods. Namespace - %s, Current - %d, Target - %d", + namespace, runningPodCount, target) + if runningPodCount == target { + return true + } else if runningPodCount > target { + return false + } + + time.Sleep(time.Duration(interval) * time.Second) + } + return false +} + // Waits until the Horizontal Pod Autoscaler for the scaledObject reports that it has metrics available // to calculate, or until the number of iterations are done, whichever happens first. func WaitForHPAMetricsToPopulate(t *testing.T, kc *kubernetes.Clientset, name, namespace string, iterations, intervalSeconds int) bool { diff --git a/tests/internals/scaling_strategies/accurate_scaling_strategy/accurate_scaling_strategy_test.go b/tests/internals/scaling_strategies/accurate_scaling_strategy/accurate_scaling_strategy_test.go new file mode 100644 index 00000000000..57ece055cce --- /dev/null +++ b/tests/internals/scaling_strategies/accurate_scaling_strategy/accurate_scaling_strategy_test.go @@ -0,0 +1,183 @@ +//go:build e2e +// +build e2e + +package accurate_scaling_strategy_test + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" // For helper methods +) + +var _ = godotenv.Load("../../.env") // For loading env variables from .env + +const ( + testName = "accurate-scaling-strategy-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + scaledJobName = fmt.Sprintf("%s-sj", testName) + connectionString = os.Getenv("TF_AZURE_STORAGE_CONNECTION_STRING") + queueName = fmt.Sprintf("queue-%d", GetRandomNumber()) + secretName = fmt.Sprintf("%s-secret", testName) +) + +// YAML templates for your Kubernetes resources +const ( + secretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + AzureWebJobsStorage: {{.Connection}} +` + + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} + labels: + app: {{.ScaledJobName}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: sleeper + image: docker.io/library/busybox + command: + - sleep + - "120" + imagePullPolicy: IfNotPresent + envFrom: + - secretRef: + name: {{.SecretName}} + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: 10 + scalingStrategy: + strategy: "accurate" + triggers: + - type: azure-queue + metadata: + queueName: {{.QueueName}} + connectionFromEnv: AzureWebJobsStorage + queueLength: '1' +` +) + +type templateData struct { + ScaledJobName string + TestNamespace string + QueueName string + SecretName string + Connection string +} + +func TestScalingStrategy(t *testing.T) { + // Setup + ctx := context.Background() + t.Log("--- setting up ---") + require.NotEmpty(t, connectionString, "TF_AZURE_STORAGE_CONNECTION_STRING env variable is required for azure queue test") + + queueClient, err := azqueue.NewQueueClientFromConnectionString(connectionString, queueName, nil) + assert.NoErrorf(t, err, "cannot create the queue client - %s", err) + _, err = queueClient.Create(ctx, nil) + assert.NoErrorf(t, err, "cannot create the queue - %s", err) + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + + t.Cleanup(func() { + DeleteKubernetesResources(t, testNamespace, data, templates) + _, err := queueClient.Delete(ctx, nil) + assert.NoErrorf(t, err, "cannot delete the queue - %s", err) + }) + + CreateKubernetesResources(t, kc, testNamespace, data, templates) + testAccurateScaling(ctx, t, kc, queueClient) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + // Populate fields required in YAML templates + ScaledJobName: scaledJobName, + TestNamespace: testNamespace, + QueueName: queueName, + Connection: base64.StdEncoding.EncodeToString([]byte(connectionString)), + SecretName: secretName, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "scaledJobTemplate", Config: scaledJobTemplate}, + } +} + +func testAccurateScaling(ctx context.Context, t *testing.T, kc *kubernetes.Clientset, client *azqueue.QueueClient) { + iterationCount := 60 + + // Base case (number of scale = maxScale since pendingJobs = 0). Enqueue up 4 messages + enqueueMessages(ctx, t, client, 4) + + assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1), + "job count should be %d after %d iterations", 4, iterationCount) + + // Clear the messages to simulate message consumption and wait for the job pods to be Running + _, err := client.ClearMessages(ctx, nil) + assert.NoErrorf(t, err, "cannot clear queue - %s", err) + assert.True(t, WaitForRunningPodCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1), + "running pod count should be %d after %d iterations", 4, iterationCount) + + // Wait for job completion + WaitForAllJobsSuccess(t, kc, testNamespace, 120, 1) + + // Test the cap condition (maxScale + runningJobs > maxReplicaCount). Enqueue 4 messages + enqueueMessages(ctx, t, client, 4) + assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 8, iterationCount, 1), + "job count should be %d after %d iterations", 8, iterationCount) + + // Clear the messages to simulate message consumption and wait for the job pods to be Running + _, err = client.ClearMessages(ctx, nil) + assert.NoErrorf(t, err, "cannot clear queue - %s", err) + assert.True(t, WaitForRunningPodCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1), + "running pod count should be %d after %d iterations", 4, iterationCount) + + // Enqueue 8 more messages to trigger the cap condition + enqueueMessages(ctx, t, client, 8) + assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 14, iterationCount, 1), + "job count should be %d after %d iterations", 14, iterationCount) + + // Clear the messages to simulate message consumption and wait for the job pods to be Running + _, err = client.ClearMessages(ctx, nil) + assert.NoErrorf(t, err, "cannot clear queue - %s", err) + assert.True(t, WaitForRunningPodCount(t, kc, scaledJobName, testNamespace, 10, iterationCount, 1), + "running pod count should be %d after %d iterations", 10, iterationCount) + + // Message cleanup and wait for jobs to complete + WaitForAllJobsSuccess(t, kc, testNamespace, 120, 1) +} + +func enqueueMessages(ctx context.Context, t *testing.T, client *azqueue.QueueClient, count int) { + for i := 0; i < count; i++ { + msg := fmt.Sprintf("Message - %d", i) + _, err := client.EnqueueMessage(ctx, msg, nil) + assert.NoErrorf(t, err, "cannot enqueue message - %s", err) + t.Logf("Message queued") + } +} diff --git a/tests/internals/scaling_strategies/eager_scaling_strategy_test.go b/tests/internals/scaling_strategies/eager_scaling_strategy/eager_scaling_strategy_test.go similarity index 100% rename from tests/internals/scaling_strategies/eager_scaling_strategy_test.go rename to tests/internals/scaling_strategies/eager_scaling_strategy/eager_scaling_strategy_test.go