Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions controllers/keda/scaledjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,74 @@ var _ = Describe("ScaledJobController", func() {
}
Expect(foundTriggersField).To(BeTrue())
})

It("ScaledJob minReplicaCount defaults to nil when not set", func() {
	// Create a ScaledJob that omits minReplicaCount, then read it back and
	// verify the field stays nil (i.e. no value is defaulted in for us).
	jobName := "use-default-minreplicacount-value"
	sjName := "sj-" + jobName
	interval := int32(5)

	sj := &kedav1alpha1.ScaledJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      sjName,
			Namespace: "default",
		},
		Spec: kedav1alpha1.ScaledJobSpec{
			JobTargetRef:    generateJobSpec(jobName),
			PollingInterval: &interval,
			Triggers: []kedav1alpha1.ScaleTriggers{{
				Type: "cron",
				Metadata: map[string]string{
					"timezone":        "UTC",
					"start":           "0 * * * *",
					"end":             "1 * * * *",
					"desiredReplicas": "1",
				},
			}},
		},
	}
	Expect(k8sClient.Create(context.Background(), sj)).ToNot(HaveOccurred())

	// Fetch the stored object and confirm minReplicaCount remained unset.
	Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)).ToNot(HaveOccurred())
	Expect(sj.Spec.MinReplicaCount).To(BeNil())
})

It("ScaledJob maxReplicaCount defaults to nil when not set", func() {
	// Create a ScaledJob that omits maxReplicaCount, then read it back and
	// verify the field stays nil (i.e. no value is defaulted in for us).
	jobName := "use-default-maxreplicacount-value"
	sjName := "sj-" + jobName
	interval := int32(5)

	sj := &kedav1alpha1.ScaledJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      sjName,
			Namespace: "default",
		},
		Spec: kedav1alpha1.ScaledJobSpec{
			JobTargetRef:    generateJobSpec(jobName),
			PollingInterval: &interval,
			Triggers: []kedav1alpha1.ScaleTriggers{{
				Type: "cron",
				Metadata: map[string]string{
					"timezone":        "UTC",
					"start":           "0 * * * *",
					"end":             "1 * * * *",
					"desiredReplicas": "1",
				},
			}},
		},
	}
	Expect(k8sClient.Create(context.Background(), sj)).ToNot(HaveOccurred())

	// Fetch the stored object and confirm maxReplicaCount remained unset.
	Expect(k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)).ToNot(HaveOccurred())
	Expect(sj.Spec.MaxReplicaCount).To(BeNil())
})
})
})

Expand Down
29 changes: 29 additions & 0 deletions tests/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,35 @@ func WaitForAllPodRunningInNamespace(t *testing.T, kc *kubernetes.Clientset, nam
return false
}

// WaitForRunningPodCount polls the pods labeled with the given ScaledJob name
// until exactly `target` of them are in the Running phase, checking every
// `interval` seconds for at most `iterations` attempts.
//
// It returns true as soon as the running-pod count equals target, and false
// either immediately if the count ever exceeds target (an overshoot is treated
// as a hard failure) or once all iterations are exhausted.
func WaitForRunningPodCount(t *testing.T, kc *kubernetes.Clientset, scaledJobName, namespace string, target, iterations, interval int) bool {
	for i := 0; i < iterations; i++ {
		pods, err := kc.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
			LabelSelector: fmt.Sprintf("scaledjob.keda.sh/name=%s", scaledJobName),
		})
		if err != nil {
			// A transient API error must not crash the helper: without this
			// continue, `pods` could be nil and the range below would panic.
			t.Logf("cannot list pods - %s", err)
			time.Sleep(time.Duration(interval) * time.Second)
			continue
		}

		runningPodCount := 0
		for _, pod := range pods.Items {
			if pod.Status.Phase == corev1.PodRunning {
				runningPodCount++
			}
		}

		t.Logf("Waiting for running pods. Namespace - %s, Current - %d, Target - %d",
			namespace, runningPodCount, target)
		if runningPodCount == target {
			return true
		}
		if runningPodCount > target {
			// More running pods than requested means the scaler overshot;
			// report failure right away instead of waiting out the loop.
			return false
		}

		time.Sleep(time.Duration(interval) * time.Second)
	}
	return false
}

// Waits until the Horizontal Pod Autoscaler for the scaledObject reports that it has metrics available
// to calculate, or until the number of iterations are done, whichever happens first.
func WaitForHPAMetricsToPopulate(t *testing.T, kc *kubernetes.Clientset, name, namespace string, iterations, intervalSeconds int) bool {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
//go:build e2e
// +build e2e

package accurate_scaling_strategy_test

import (
"context"
"encoding/base64"
"fmt"
"os"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/joho/godotenv"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes"

. "github.com/kedacore/keda/v2/tests/helper" // For helper methods
)

var _ = godotenv.Load("../../.env") // For loading env variables from .env

const (
	// testName prefixes every Kubernetes resource created by this test.
	testName = "accurate-scaling-strategy-test"
)

var (
	testNamespace = fmt.Sprintf("%s-ns", testName)
	scaledJobName = fmt.Sprintf("%s-sj", testName)
	// connectionString must be supplied via the environment; the test fails
	// early in TestScalingStrategy when it is empty.
	connectionString = os.Getenv("TF_AZURE_STORAGE_CONNECTION_STRING")
	// queueName is randomized so concurrent test runs do not share a queue.
	queueName  = fmt.Sprintf("queue-%d", GetRandomNumber())
	secretName = fmt.Sprintf("%s-secret", testName)
)

// YAML templates for the Kubernetes resources created by this test:
// a Secret holding the (base64-encoded) storage connection string, and a
// ScaledJob using the "accurate" scaling strategy against an Azure queue.
const (
	secretTemplate = `
apiVersion: v1
kind: Secret
metadata:
  name: {{.SecretName}}
  namespace: {{.TestNamespace}}
data:
  AzureWebJobsStorage: {{.Connection}}
`

	// The job sleeps 120s so pods stay Running long enough for the test's
	// running-pod-count assertions; maxReplicaCount caps total jobs at 10.
	scaledJobTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: {{.ScaledJobName}}
  namespace: {{.TestNamespace}}
  labels:
    app: {{.ScaledJobName}}
spec:
  jobTargetRef:
    template:
      spec:
        containers:
          - name: sleeper
            image: docker.io/library/busybox
            command:
              - sleep
              - "120"
            imagePullPolicy: IfNotPresent
            envFrom:
              - secretRef:
                  name: {{.SecretName}}
        restartPolicy: Never
    backoffLimit: 1
  pollingInterval: 5
  maxReplicaCount: 10
  scalingStrategy:
    strategy: "accurate"
  triggers:
    - type: azure-queue
      metadata:
        queueName: {{.QueueName}}
        connectionFromEnv: AzureWebJobsStorage
        queueLength: '1'
`
)

// templateData holds the values substituted into the YAML templates above.
type templateData struct {
	ScaledJobName string // name of the ScaledJob resource
	TestNamespace string // namespace all resources are created in
	QueueName     string // Azure Storage queue the trigger watches
	SecretName    string // Secret carrying the storage connection string
	Connection    string // base64-encoded storage connection string
}

// TestScalingStrategy exercises the ScaledJob "accurate" scaling strategy
// end-to-end against a real Azure Storage queue: jobs are created to match
// the queue depth, capped by maxReplicaCount (see the ScaledJob template).
func TestScalingStrategy(t *testing.T) {
	// Setup
	ctx := context.Background()
	t.Log("--- setting up ---")
	require.NotEmpty(t, connectionString, "TF_AZURE_STORAGE_CONNECTION_STRING env variable is required for azure queue test")

	// Setup steps use require (not assert): if the queue client or the queue
	// itself cannot be created, continuing would make every later step fail
	// against a broken client instead of stopping with the root cause.
	queueClient, err := azqueue.NewQueueClientFromConnectionString(connectionString, queueName, nil)
	require.NoErrorf(t, err, "cannot create the queue client - %s", err)
	_, err = queueClient.Create(ctx, nil)
	require.NoErrorf(t, err, "cannot create the queue - %s", err)

	// Create kubernetes resources
	kc := GetKubernetesClient(t)
	data, templates := getTemplateData()

	// Cleanup is registered before resource creation so a partially completed
	// setup is still torn down; cleanup failures are non-fatal (assert).
	t.Cleanup(func() {
		DeleteKubernetesResources(t, testNamespace, data, templates)
		_, err := queueClient.Delete(ctx, nil)
		assert.NoErrorf(t, err, "cannot delete the queue - %s", err)
	})

	CreateKubernetesResources(t, kc, testNamespace, data, templates)
	testAccurateScaling(ctx, t, kc, queueClient)
}

// getTemplateData assembles the substitution values for the YAML templates
// and the ordered list of templates to apply (Secret first, then ScaledJob).
func getTemplateData() (templateData, []Template) {
	encodedConnection := base64.StdEncoding.EncodeToString([]byte(connectionString))

	data := templateData{
		ScaledJobName: scaledJobName,
		TestNamespace: testNamespace,
		QueueName:     queueName,
		Connection:    encodedConnection,
		SecretName:    secretName,
	}
	templates := []Template{
		{Name: "secretTemplate", Config: secretTemplate},
		{Name: "scaledJobTemplate", Config: scaledJobTemplate},
	}
	return data, templates
}

// testAccurateScaling drives the "accurate" scaling strategy through three
// phases by enqueueing messages and asserting job/pod counts. NOTE(review):
// the expected totals (4, 8, 14 jobs) presumably reflect WaitForScaledJobCount
// counting all jobs created so far, not just live ones — confirm against the
// helper's implementation.
func testAccurateScaling(ctx context.Context, t *testing.T, kc *kubernetes.Clientset, client *azqueue.QueueClient) {
	iterationCount := 60

	// Base case (number of scale = maxScale since pendingJobs = 0). Enqueue up 4 messages
	enqueueMessages(ctx, t, client, 4)

	assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1),
		"job count should be %d after %d iterations", 4, iterationCount)

	// Clear the messages to simulate message consumption and wait for the job pods to be Running
	_, err := client.ClearMessages(ctx, nil)
	assert.NoErrorf(t, err, "cannot clear queue - %s", err)
	assert.True(t, WaitForRunningPodCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1),
		"running pod count should be %d after %d iterations", 4, iterationCount)

	// Wait for job completion
	WaitForAllJobsSuccess(t, kc, testNamespace, 120, 1)

	// Test the cap condition (maxScale + runningJobs > maxReplicaCount). Enqueue 4 messages
	enqueueMessages(ctx, t, client, 4)
	assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 8, iterationCount, 1),
		"job count should be %d after %d iterations", 8, iterationCount)

	// Clear the messages to simulate message consumption and wait for the job pods to be Running
	_, err = client.ClearMessages(ctx, nil)
	assert.NoErrorf(t, err, "cannot clear queue - %s", err)
	assert.True(t, WaitForRunningPodCount(t, kc, scaledJobName, testNamespace, 4, iterationCount, 1),
		"running pod count should be %d after %d iterations", 4, iterationCount)

	// Enqueue 8 more messages to trigger the cap condition
	// (only 6 new jobs fit: 8 existing + 6 = 14, while maxReplicaCount is 10
	// for concurrently running jobs — TODO confirm against strategy docs)
	enqueueMessages(ctx, t, client, 8)
	assert.True(t, WaitForScaledJobCount(t, kc, scaledJobName, testNamespace, 14, iterationCount, 1),
		"job count should be %d after %d iterations", 14, iterationCount)

	// Clear the messages to simulate message consumption and wait for the job pods to be Running
	_, err = client.ClearMessages(ctx, nil)
	assert.NoErrorf(t, err, "cannot clear queue - %s", err)
	assert.True(t, WaitForRunningPodCount(t, kc, scaledJobName, testNamespace, 10, iterationCount, 1),
		"running pod count should be %d after %d iterations", 10, iterationCount)

	// Message cleanup and wait for jobs to complete
	WaitForAllJobsSuccess(t, kc, testNamespace, 120, 1)
}

// enqueueMessages pushes `count` numbered messages onto the Azure queue,
// asserting (non-fatally) that each enqueue succeeds.
func enqueueMessages(ctx context.Context, t *testing.T, client *azqueue.QueueClient, count int) {
	for n := 0; n < count; n++ {
		body := fmt.Sprintf("Message - %d", n)
		_, err := client.EnqueueMessage(ctx, body, nil)
		assert.NoErrorf(t, err, "cannot enqueue message - %s", err)
		t.Logf("Message queued")
	}
}
Loading