feat: Introduce new Azure Cosmos DB Change Feed Scaler #7557

yash2710 wants to merge 13 commits into kedacore:main
Conversation
Thank you for your contribution! 🙏 Please understand that we will do our best to review your PR and give you feedback as soon as possible, but please bear with us if it takes a little longer than expected. While you are waiting, make sure to:

Once the initial tests are successful, a KEDA member will ensure that the e2e tests are run. Once the e2e tests have been successfully completed, the PR may be merged at a later date. Please be patient. Learn more about our contribution guide.
✅ Snyk checks have passed. No issues have been found so far.
Force-pushed from 2203d23 to f6277f1
Force-pushed from f6277f1 to 5291ee4
TF_AZURE_APP_INSIGHTS_NAME=
TF_AZURE_DATA_EXPLORER_DB=
TF_AZURE_DATA_EXPLORER_ENDPOINT=
TF_AZURE_COSMOSDB_CONNECTION_STRING=
Questions for the reviewer/maintainer:

- Where do I need to define the connection string?
- Which subscription needs to hold the test Cosmos DB account?
@JorTurFer Can you help with this? I bet you can do this faster than me.
Hello @zroubalik, I wonder if you can review this or point us to folks who can help? Much appreciated!
Hi @rickbrouwer, appreciate your help with the reviews so far! Can you please help us finish the final set of changes as well (hopefully!)? @yash2710 has them ready for review. Thanks!
Here is a somewhat longer review. I haven't gone through everything by a long shot yet, but here is a first part of my input.

I see you're using …

Next, I see you've built a full Cosmos DB REST client in this file (connection string parsing, session token parsing, 410 retry). The other Azure scalers all seem to delegate this to the official SDKs (eventhubs, servicebus, blob). Microsoft also ships github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos, which handles auth, cloud environments, and token refresh. Is there a reason we can't use the SDK here?

In all the other scalers we put validation in a Validate() method on the metadata struct (see for example azureBlobMetadata.Validate() and azureServiceBusMetadata.Validate()). We should move the validation over to match that pattern for consistency.

Last for now: I see you are using a …
Hi @rickbrouwer! To set some context, both @yash2710 and I work on the Cosmos DB SDKs team and own the Go SDK you referenced.

Yes, we could use the current v1 of the SDK here. However, the Cosmos DB SDK is quite complicated when implemented fully, with a lot of failover policies, cross-partition query support, and even custom TCP protocols. The v1 Go SDK does not implement most of this functionality. Fortunately, the functionality needed to read change feed processor state is minimal and is indeed currently supported by that v1 Go SDK.

The problem isn't now, it's the future roadmap. As part of an effort to make our SDK maintenance more sustainable (we maintain 6 SDKs right now, each of which has to implement this complex logic the same way), we are moving down a path towards using a core native "driver" component (like …). Our current plan is to shift to wrapping this native component in a v2 Go SDK, which means that SDK depends on CGo. Obviously, that has a lot of risk and challenges to it, but most critical right now is that using this v2 SDK would require KEDA to build with CGo and link to this native component. We don't believe this is a reasonable thing to ask KEDA to do.

Fortunately, the functionality needed for KEDA here is, as I mentioned before, minimal and expressed in the fully supported, documented, and highly stable REST API (which documents authentication, basic retry policies, and the "streaming" queries needed for reading this state). So, rather than add a dependency on the v1 Go SDK that would likely end up trapped on v1 (even after v1 goes out of support), our thinking was to build this component independent of the Go SDK. We're happy to take on maintenance of this component, as we believe that burden to be minimal and much easier than trying to maintain support for the entire v1 Go SDK indefinitely.

This is the minimum required version from the servers to access the APIs we need; it's quite common to reference the lowest necessary version in this case. Hope that clarifies things a bit! As for the other questions, I'll leave those to @yash2710!
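For readers less familiar with the REST surface being discussed: every Cosmos DB REST request pins a server API version via the `x-ms-version` header, which is why referencing the lowest version that exposes the needed APIs is the compatibility-friendly choice. A minimal sketch; the endpoint, resource path, and version string are illustrative, not taken from this PR:

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholders: the account endpoint and resource path are illustrative.
	req, err := http.NewRequest(http.MethodGet,
		"https://myaccount.documents.azure.com/dbs/mydb/colls/leases/docs", nil)
	if err != nil {
		panic(err)
	}
	// Pin the server API version; using the lowest version that supports
	// the needed APIs keeps the client working against older server
	// deployments.
	req.Header.Set("x-ms-version", "2018-12-31") // example version, not the PR's
	fmt.Println(req.Header.Get("x-ms-version"))
}
```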
That certainly clarifies it! Thanks for the detailed answer. Then that seems perfectly fine to me.
Add internal KEDA scaler for Azure Cosmos DB change feed processor lag estimation. Translates the existing C# external scaler to a native Go internal scaler.

- REST API client with HMAC-SHA256 and workload identity auth
- Supports .NET and Java SDK lease formats (PK range and EPK based)
- Configurable lag and activation thresholds
- Separate data and lease container connection support
- Partition split detection with automatic retry
- Unit tests with httptest mocks for all lease formats
- E2E test scaffold
- Auto-generated scaler metadata schema

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Implement addDocuments using Cosmos DB REST API with HMAC-SHA256 auth. Fix changelog format and golangci-lint errcheck/staticcheck/unparam issues.

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Change the scaler metric from counting partitions-with-lag to summing total estimated lag across all partitions. This matches the EventHub scaler's approach and provides better scaling behavior:

- Small lag across many partitions no longer over-provisions replicas
- HPA formula: replicas = ceil(totalLag / changeFeedLagThreshold)
- Capped at partition count to prevent over-scaling
- Renamed metadata: lagThreshold -> changeFeedLagThreshold (default: 100)
- Added getChangeFeedTotalLagRelatedToPartitionAmount partition cap

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
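For illustration, a minimal sketch of the capped total-lag metric this commit describes; the function and parameter names are mine, not necessarily the PR's:

```go
// totalLagMetric sums per-partition lag and caps the result at
// partitionCount * threshold, so ceil(metric / threshold) never asks the
// HPA for more replicas than there are partitions.
func totalLagMetric(perPartitionLag []int64, threshold int64) int64 {
	var total int64
	for _, lag := range perPartitionLag {
		total += lag
	}
	maxLag := int64(len(perPartitionLag)) * threshold
	if total > maxLag {
		return maxLag
	}
	return total
}
```

With 4 partitions and a threshold of 100, a total lag of 1,000 would be reported as 400, which the HPA turns into ceil(400/100) = 4 replicas.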
On failure to read lease documents or change feed, return partitions * threshold as the metric (scale to max replicas) instead of propagating the error. This ensures the system is not under-provisioned during transient failures. Caches the last known partition count for use during errors, and falls back to the threshold value if no partition count is cached.

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
When an error occurs with no prior successful poll (e.g. a fresh operator restart with bad credentials), return 100 * threshold instead of just the threshold to ensure the HPA scales to maxReplicaCount.

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Log partition count and per-partition lag at debug level
- Log partition split detection as warning
- Log which error fallback path is taken (cached vs uncached)
- Log fallback lag value used during errors
- Log unparseable session tokens and LSN values
- Log empty lease container at debug level

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Instead of returning a large fallback value when there is no cached partition count, propagate the error to KEDA. This lets KEDA's standard error handling (keep current replicas) and optional fallback config handle the situation. The cached-partition fallback still returns max lag when available.

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Add setupCosmosDB function that creates the database, data container, and lease container via REST API before running tests. Handles 409 Conflict (already exists) gracefully.

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
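A sketch of the 409-tolerant setup call the commit describes, assuming a hypothetical helper; the endpoint and names are illustrative, and auth headers are omitted for brevity:

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// createDatabaseIfNotExists POSTs the database resource and treats
// 409 Conflict ("already exists") the same as 201 Created, mirroring the
// setupCosmosDB behavior described above. Illustrative only: real calls
// also need Authorization, x-ms-date, and x-ms-version headers.
func createDatabaseIfNotExists(client *http.Client, endpoint, dbID string) error {
	body := strings.NewReader(fmt.Sprintf(`{"id": %q}`, dbID))
	req, err := http.NewRequest(http.MethodPost, endpoint+"/dbs", body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusCreated || resp.StatusCode == http.StatusConflict {
		return nil // created, or already exists
	}
	return fmt.Errorf("create database: unexpected status %d", resp.StatusCode)
}

func main() {
	_ = createDatabaseIfNotExists(http.DefaultClient,
		"https://myaccount.documents.azure.com", "keda-test-db")
}
```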
- generateCosmosDBAuthToken returns (string, error) instead of silently returning empty string on base64 decode failure
- Add url.QueryEscape to AAD bearer token Authorization header
- Filter lease documents by processorName using parameterized STARTSWITH query to prevent over-counting when multiple processors share a lease container
- Add processorName field to cosmosDBClient struct

Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
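To make the parameterized-query point concrete: the Cosmos DB REST query API accepts parameters in the request body, so the processor name is never spliced into the SQL text. A sketch with struct and variable names of my own choosing:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Shapes follow the documented Cosmos DB REST query body; names are mine.
type queryParameter struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type queryBody struct {
	Query      string           `json:"query"`
	Parameters []queryParameter `json:"parameters"`
}

func main() {
	processorName := `my"tricky"processor` // special characters stay safe
	body, err := json.Marshal(queryBody{
		Query: "SELECT * FROM c WHERE STARTSWITH(c.id, @prefix)",
		Parameters: []queryParameter{
			{Name: "@prefix", Value: processorName},
		},
	})
	if err != nil {
		panic(err)
	}
	// The real client POSTs this with Content-Type: application/query+json.
	fmt.Println(string(body))
}
```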
…ardcoding

Replace hardcoded azure.PublicCloud.ResourceIdentifiers.CosmosDB with cloud-aware resolution using azure.ParseEnvironmentProperty, consistent with azure_eventhub_scaler.go and azure_servicebus_scaler.go. This ensures workload identity authentication uses the correct token scope for sovereign clouds (Azure China, US Gov, German cloud) and supports Private cloud configurations via the 'cosmosDBResourceURL' metadata key.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
Follow the convention used by azureBlobMetadata.Validate() and azureServiceBusMetadata.Validate(): cross-field normalization lives in Validate(), which is called automatically by TypedConfig via the CustomValidator interface.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
…410, propagate errors

- Use json.Marshal for processorName in query body to safely handle special characters (e.g. quotes) in processor names
- Return 0 instead of -1 for partition lag on 410 Gone (split/merge)
- Remove custom error fallback logic in GetMetricsAndActivity; just propagate errors to let KEDA's fallback spec on ScaledObject handle it, consistent with EventHub, ServiceBus, and Blob scalers
- Remove lastPartitionCount field that was only used for the removed fallback logic

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Signed-off-by: Yash Trivedi <yash2710@users.noreply.github.com>
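A sketch of the status handling this commit describes, assuming an illustrative function around the change feed HTTP response (the name and surrounding client are mine, not the PR's):

```go
package main

import (
	"fmt"
	"net/http"
)

// partitionLagFromResponse sketches the per-partition status handling
// described above. Illustrative only.
func partitionLagFromResponse(resp *http.Response) (int64, error) {
	switch resp.StatusCode {
	case http.StatusNotModified: // 304: no new changes, caught up
		return 0, nil
	case http.StatusGone: // 410: partition split/merge; report 0, not -1
		return 0, nil
	case http.StatusOK:
		// Placeholder: the real code computes lag from the change feed
		// page, e.g. sessionLSN - firstItem._lsn + 1.
		return 0, nil
	default:
		return 0, fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
}

func main() {
	lag, err := partitionLagFromResponse(&http.Response{StatusCode: http.StatusGone})
	fmt.Println(lag, err) // 0 <nil>
}
```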
Force-pushed from a3e7605 to 0805d72
@rickbrouwer addressed the comments. Also, can you help answer #7557 (comment)?
I see that in estimateOnce a single partition error causes the whole estimation to fail. If you have a container with many partitions and one has a transient issue, you lose all metrics for that polling interval. Just checking whether that is correct and intended.

Further, the lease query uses STARTSWITH(c.id, @prefix) with just the processor name, which means a processor named app would also match leases from a processor named app-extended. Should we append a separator like . to the prefix to avoid this collision?

Also, the static check is failing; please run go fmt on the affected file.
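To make the collision concrete, a small sketch using Go's strings.HasPrefix as a stand-in for the server-side STARTSWITH; the lease id values are made up for the demo and do not reflect the real lease id format:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	leaseIDs := []string{"app.0", "app.1", "app-extended.0"}
	for _, prefix := range []string{"app", "app."} {
		var matched []string
		for _, id := range leaseIDs {
			if strings.HasPrefix(id, prefix) { // STARTSWITH(c.id, @prefix) behaves the same
				matched = append(matched, id)
			}
		}
		fmt.Printf("%-5s -> %v\n", prefix, matched)
	}
	// app   -> [app.0 app.1 app-extended.0]  (collision)
	// app.  -> [app.0 app.1]                 (separator avoids it)
}
```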
// 2. Extract latest LSN from session token
// 3. If items present: lag = sessionLSN - firstItem._lsn + 1
// 4. If no items (304): lag = 0 (caught up)
// 5. If 410 Gone: report lag = -1 (split/merge)
Suggested change:

- // 5. If 410 Gone: report lag = -1 (split/merge)
+ // 5. If 410 Gone: report lag = 0 (split/merge)
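For context on step 2 of the algorithm quoted above, a sketch of extracting the latest LSN from a session token. The two token shapes shown ("0:12345" simple form, "0:-1#12345" vector form) follow the documented Cosmos DB session token formats; the function name and parsing details are mine, and the PR's parser may differ:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSessionLSN extracts the global LSN from a Cosmos DB session token.
// Simple form: "<pkRangeId>:<lsn>"; vector form: "<pkRangeId>:<version>#<globalLsn>".
func parseSessionLSN(token string) (int64, error) {
	_, rest, ok := strings.Cut(token, ":")
	if !ok {
		return 0, fmt.Errorf("malformed session token %q", token)
	}
	// In the vector form the global LSN is the segment after '#'.
	parts := strings.Split(rest, "#")
	return strconv.ParseInt(parts[len(parts)-1], 10, 64)
}

func main() {
	for _, tok := range []string{"0:12345", "0:-1#12345"} {
		lsn, err := parseSessionLSN(tok)
		fmt.Println(tok, "->", lsn, err)
		// With items present, lag = sessionLSN - firstItem._lsn + 1 (step 3).
	}
}
```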
Pull request overview
This PR introduces a new native (internal) KEDA scaler type (azure-cosmosdb) that estimates Azure Cosmos DB Change Feed Processor lag by querying lease documents and reading the change feed via the Cosmos DB REST API, enabling scaling without an external gRPC scaler.
Changes:
- Added the internal azure-cosmosdb scaler implementation and unit tests (including lease format variants and split retry behavior).
- Registered the scaler in the scaler builder and updated the generated scaler metadata schema (YAML/JSON).
- Added an e2e test scaffold and a new .env variable placeholder, plus a changelog entry.
Reviewed changes
Copilot reviewed 6 out of 8 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| pkg/scalers/azure_cosmosdb_scaler.go | Implements Cosmos DB change feed lag estimation (REST client + auth) and exposes metrics for KEDA scaling. |
| pkg/scalers/azure_cosmosdb_scaler_test.go | Adds unit tests for metadata parsing, auth token generation, lease parsing, lag estimation, and split retry. |
| pkg/scaling/scalers_builder.go | Registers the new azure-cosmosdb scaler type in the scaler factory. |
| schema/generated/scalers-schema.yaml | Adds schema entry for azure-cosmosdb trigger metadata/auth parameters. |
| schema/generated/scalers-schema.json | Adds schema entry for azure-cosmosdb trigger metadata/auth parameters. |
| tests/scalers/azure/azure_cosmosdb/azure_cosmosdb_test.go | Adds an e2e test scaffold for the new scaler and helper REST calls to create resources and inject documents. |
| tests/.env | Adds TF_AZURE_COSMOSDB_CONNECTION_STRING placeholder for e2e runs. |
| CHANGELOG.md | Announces the new Azure Cosmos DB Change Feed scaler. |
func (m *azureCosmosDBMetadata) Validate() error {
	// Default lease settings to data settings if not specified
	if m.LeaseConnection == "" {
		m.LeaseConnection = m.Connection
	}
	if m.LeaseEndpoint == "" {
		m.LeaseEndpoint = m.Endpoint
	}
	if m.LeaseCosmosDBKey == "" {
		m.LeaseCosmosDBKey = m.CosmosDBKey
	}
	return nil
}
// Set up workload identity credential for bearer token auth
if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload && client.dataKey == "" {
	cosmosDBResourceURLProvider := func(env azure.AzEnvironment) (string, error) {
		return env.ResourceIdentifiers.CosmosDB, nil
	}
	cosmosDBResourceURL, err := azure.ParseEnvironmentProperty(triggerMetadata, "cosmosDBResourceURL", cosmosDBResourceURLProvider)
	if err != nil {
		return nil, fmt.Errorf("error resolving cosmos db resource URL: %w", err)
	}
	client.cosmosDBResourceURL = cosmosDBResourceURL

	cred, err := azure.NewChainedCredential(logger, podIdentity)
	if err != nil {
		return nil, fmt.Errorf("error creating azure credential for workload identity: %w", err)
	}
	client.credential = cred
}
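For context, a credential like the one above is typically exchanged for a bearer token per request. A minimal sketch using the azcore API; the scope construction (resource URL plus "/.default", the common AAD v2 convention) and the resource URL value are assumptions of mine, not taken from the PR:

```go
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

func main() {
	// Reads the AZURE_* env vars injected by workload identity; a stand-in
	// for KEDA's chained credential.
	cred, err := azidentity.NewWorkloadIdentityCredential(nil)
	if err != nil {
		panic(err)
	}
	// Assumption: the resolved resource URL becomes the token scope by
	// appending "/.default". The URL below is illustrative.
	resourceURL := "https://cosmos.azure.com"
	tok, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{
		Scopes: []string{resourceURL + "/.default"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(tok.Token)) // the bearer token sent in the Authorization header
}
```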
| name: "podIdentity azure-workload with endpoint", | ||
| metadata: map[string]string{ | ||
| "endpoint": "https://test.documents.azure.com:443/", | ||
| "databaseId": "testdb", | ||
| "containerId": "testcontainer", | ||
| "leaseDatabaseId": "testdb", | ||
| "leaseContainerId": "leases", | ||
| "processorName": "testprocessor", | ||
| }, | ||
| isError: false, | ||
| resolvedEnv: map[string]string{}, | ||
| authParams: map[string]string{ | ||
| "cosmosDBKey": "dGVzdGtleQ==", | ||
| }, | ||
| podIdentity: kedav1alpha1.PodIdentityProviderAzureWorkload, | ||
| }, |
connectionString = os.Getenv("TF_AZURE_COSMOSDB_CONNECTION_STRING")
testNamespace    = fmt.Sprintf("%s-ns", testName)
secretName       = fmt.Sprintf("%s-secret", testName)
deploymentName   = fmt.Sprintf("%s-deployment", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
databaseID       = "keda-test-db"
containerID      = "keda-test-container"
leaseDatabaseID  = "keda-test-db"
leaseContainerID = "keda-test-leases"
processorName    = "keda-test-processor"
// cosmosAuthToken builds a Cosmos DB master-key Authorization token by
// HMAC-SHA256-signing the canonical request string with the base64-decoded
// account key, then URL-escaping the "type=master&ver=1.0&sig=..." value.
func cosmosAuthToken(verb, resourceType, resourceLink, date, key string) string {
	keyBytes, err := base64.StdEncoding.DecodeString(key)
	if err != nil {
		return ""
	}
	// Canonical string-to-sign: lowercase verb, resource type, and date;
	// the resource link is used verbatim, followed by two newlines.
	text := fmt.Sprintf("%s\n%s\n%s\n%s\n\n",
		strings.ToLower(verb),
		strings.ToLower(resourceType),
		resourceLink,
		strings.ToLower(date))
	h := hmac.New(sha256.New, keyBytes)
	h.Write([]byte(text))
	sig := base64.StdEncoding.EncodeToString(h.Sum(nil))
	return url.QueryEscape(fmt.Sprintf("type=master&ver=1.0&sig=%s", sig))
}
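A usage sketch for the helper above; the endpoint, key, and resource link are placeholders, and the x-ms-date header must match the signed date exactly:

```go
package main

import (
	"net/http"
	"time"
)

// Assumes cosmosAuthToken from the test helper quoted above is in scope.
func main() {
	date := time.Now().UTC().Format(http.TimeFormat) // RFC1123 GMT, what x-ms-date expects
	resourceLink := "dbs/keda-test-db/colls/keda-test-leases"
	token := cosmosAuthToken("GET", "docs", resourceLink, date, "dGVzdGtleQ==")

	req, err := http.NewRequest(http.MethodGet,
		"https://test.documents.azure.com/"+resourceLink+"/docs", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", token)       // helper already query-escaped it
	req.Header.Set("x-ms-date", date)            // must equal the date that was signed
	req.Header.Set("x-ms-version", "2018-12-31") // example version
	_ = req
}
```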
Add internal KEDA scaler for Azure Cosmos DB change feed processor lag estimation. Translates the existing C# external scaler to a native Go internal scaler.
- HPA formula: replicas = ceil(totalLag / changeFeedLagThreshold)
- Reported lag is capped at partitionCount * threshold to prevent over-scaling

Example with 4 partitions and changeFeedLagThreshold: 100:

- total lag 30 → ceil(30/100) = 1 replica
- total lag 150 → ceil(150/100) = 2 replicas
- total lag 350 → ceil(350/100) = 4 replicas
- anything higher is capped at partition count: 4 * 100 = 400 reported lag → 4 replicas

This was validated on AKS with gradual scale-up (1→2→3→4) and scale-down to zero with 300s cooldown.
Checklist

- make generate-scalers-schema has been run to update any outdated generated files

Fixes #7556
Relates to kedacore/keda-docs#1721