diff --git a/pkg/apis/v1/labels.go b/pkg/apis/v1/labels.go index c2740def4df5..6ddaa412f73c 100644 --- a/pkg/apis/v1/labels.go +++ b/pkg/apis/v1/labels.go @@ -171,6 +171,7 @@ var ( AnnotationEC2NodeClassHashVersion = apis.Group + "/ec2nodeclass-hash-version" AnnotationInstanceTagged = apis.Group + "/tagged" AnnotationInstanceProfile = apis.Group + "/instance-profile-name" + AnnotationInstanceInterrupted = apis.Group + "/instance-interrupted" NodeClaimTagKey = coreapis.Group + "/nodeclaim" NameTagKey = "Name" diff --git a/pkg/cloudprovider/cloudprovider.go b/pkg/cloudprovider/cloudprovider.go index 7e2ca909ec69..926ff113f9d1 100644 --- a/pkg/cloudprovider/cloudprovider.go +++ b/pkg/cloudprovider/cloudprovider.go @@ -36,6 +36,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/apis" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" + "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption" "github.com/aws/karpenter-provider-aws/pkg/operator/options" "github.com/aws/karpenter-provider-aws/pkg/utils" @@ -249,6 +250,19 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim) if id := nodeClaim.Labels[cloudprovider.ReservationIDLabel]; id != "" && cloudprovider.IsNodeClaimNotFoundError(err) { c.capacityReservationProvider.MarkTerminated(id) } + + if cloudprovider.IsNodeClaimNotFoundError(err) && isInterruptibleInstance(nodeClaim) { + capacityType := nodeClaim.Labels[karpv1.CapacityTypeLabelKey] + instanceInterrupted := nodeClaim.Annotations[v1.AnnotationInstanceInterrupted] + if instanceInterrupted == "" { + log.FromContext(ctx).Info("detected instance termination without interruption notification", + "capacity-type", capacityType) + interruption.MissedInterruptionTerminations.Inc(map[string]string{ + "capacity_type": capacityType, + }) + } + } + return err } @@ -482,3 +496,16 @@ func newTerminatingNodeClassError(name string) *errors.StatusError { err.ErrStatus.Message = fmt.Sprintf("%s %q is terminating, treating as not 
found", qualifiedResource.String(), name) return err } + +// isInterruptibleInstance reports whether the NodeClaim is labelled as spot capacity, or as reserved +// capacity whose v1.LabelCapacityReservationInterruptible label is set to "true". +func isInterruptibleInstance(nodeClaim *karpv1.NodeClaim) bool { + switch nodeClaim.Labels[karpv1.CapacityTypeLabelKey] { + case karpv1.CapacityTypeSpot: + return true + case karpv1.CapacityTypeReserved: + // a missing label reads as "", so it can never compare equal to "true" + return nodeClaim.Labels[v1.LabelCapacityReservationInterruptible] == "true" + } + return false +} diff --git a/pkg/cloudprovider/suite_test.go b/pkg/cloudprovider/suite_test.go index 1ffd14a4bea6..04e7070ccd84 100644 --- a/pkg/cloudprovider/suite_test.go +++ b/pkg/cloudprovider/suite_test.go @@ -43,6 +43,7 @@ import ( "github.com/aws/karpenter-provider-aws/pkg/apis" v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1" "github.com/aws/karpenter-provider-aws/pkg/cloudprovider" + "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption" "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass" "github.com/aws/karpenter-provider-aws/pkg/fake" "github.com/aws/karpenter-provider-aws/pkg/operator/options" @@ -1517,4 +1518,44 @@ var _ = Describe("CloudProvider", func() { Entry("when the capacity reservation type is capacity-block", v1.CapacityReservationTypeCapacityBlock, false), ) }) + Context("Interruption Metric Tracking", func() { + It("should increment MissedInterruptionTerminations metric depending on interruption annotation", func() { + nodePool.Spec.Template.Spec.Requirements = []karpv1.NodeSelectorRequirementWithMinValues{ + { + Key: karpv1.CapacityTypeLabelKey, + Operator: corev1.NodeSelectorOpIn, + Values: []string{karpv1.CapacityTypeSpot}, + }, + } + nodeClass.Spec.Kubelet = &v1.KubeletConfiguration{ + MaxPods: aws.Int32(1), + } + ExpectApplied(ctx, env.Client, nodePool, nodeClass) + for range 2 { + pod := coretest.UnschedulablePod() + ExpectApplied(ctx, env.Client, pod) + ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod) + ExpectScheduled(ctx, 
env.Client, pod) + } + ncs := ExpectNodeClaims(ctx, env.Client) + Expect(ncs).To(HaveLen(2)) + + // add annotation to only 1 nodeclaim + ncs[0].Annotations = lo.Assign(ncs[0].Annotations, map[string]string{ + v1.AnnotationInstanceInterrupted: "true", + }) + ExpectApplied(ctx, env.Client, ncs[0]) + + // mock underlying instance as already terminated + awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(&ec2.DescribeInstancesOutput{ + Reservations: []ec2types.Reservation{}, + }) + err1 := cloudProvider.Delete(ctx, ncs[0]) + err2 := cloudProvider.Delete(ctx, ncs[1]) + Expect(corecloudprovider.IsNodeClaimNotFoundError(err1)).To(BeTrue()) + Expect(corecloudprovider.IsNodeClaimNotFoundError(err2)).To(BeTrue()) + // one nodeclaim did not have the interrupted annotation + ExpectMetricCounterValue(interruption.MissedInterruptionTerminations, 1, map[string]string{"capacity_type": karpv1.CapacityTypeSpot}) + }) + }) }) diff --git a/pkg/controllers/interruption/controller.go b/pkg/controllers/interruption/controller.go index 70f05c10a64c..43758153b42d 100644 --- a/pkg/controllers/interruption/controller.go +++ b/pkg/controllers/interruption/controller.go @@ -27,8 +27,10 @@ import ( sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/awslabs/operatorpkg/reconciler" "github.com/awslabs/operatorpkg/singleton" + "github.com/samber/lo" "go.uber.org/multierr" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/clock" @@ -241,6 +243,15 @@ func (c *Controller) handleNodeClaim(ctx context.Context, msg messages.Message, } if action != NoAction { + stored := nodeClaim.DeepCopy() + nodeClaim.Annotations = lo.Assign(nodeClaim.Annotations, map[string]string{ + v1.AnnotationInstanceInterrupted: "true", + }) + if !equality.Semantic.DeepEqual(stored, nodeClaim) { + if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil { + return fmt.Errorf("annotating nodeclaim as 
interrupted, %w", err) + } + } return c.deleteNodeClaim(ctx, msg, nodeClaim, node) } return nil diff --git a/pkg/controllers/interruption/metrics.go b/pkg/controllers/interruption/metrics.go index 655183f4190b..2fa33876bd32 100644 --- a/pkg/controllers/interruption/metrics.go +++ b/pkg/controllers/interruption/metrics.go @@ -28,6 +28,16 @@ const ( ) var ( + MissedInterruptionTerminations = opmetrics.NewPrometheusCounter( + crmetrics.Registry, + prometheus.CounterOpts{ + Namespace: metrics.Namespace, + Subsystem: interruptionSubsystem, + Name: "missed_termination_total", + Help: "Count of instance terminations that were not notified via SQS interruption messages", + }, + []string{metrics.CapacityTypeLabel}, + ) ReceivedMessages = opmetrics.NewPrometheusCounter( crmetrics.Registry, prometheus.CounterOpts{ diff --git a/website/content/en/preview/reference/metrics.md b/website/content/en/preview/reference/metrics.md index 386ee4f8e912..ac7fe702ab5f 100644 --- a/website/content/en/preview/reference/metrics.md +++ b/website/content/en/preview/reference/metrics.md @@ -275,6 +275,10 @@ Amount of time an interruption message is on the queue before it is processed by Count of messages deleted from the SQS queue. - Stability Level: STABLE +### `karpenter_interruption_missed_termination_total` +Count of instance terminations that were not notified via SQS interruption messages +- Stability Level: STABLE + ## Cluster Metrics ### `karpenter_cluster_utilization_percent`