Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ var (
AnnotationEC2NodeClassHashVersion = apis.Group + "/ec2nodeclass-hash-version"
AnnotationInstanceTagged = apis.Group + "/tagged"
AnnotationInstanceProfile = apis.Group + "/instance-profile-name"
AnnotationInstanceInterrupted = apis.Group + "/instance-interrupted"

NodeClaimTagKey = coreapis.Group + "/nodeclaim"
NameTagKey = "Name"
Expand Down
27 changes: 27 additions & 0 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/utils"

Expand Down Expand Up @@ -249,6 +250,19 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)
if id := nodeClaim.Labels[cloudprovider.ReservationIDLabel]; id != "" && cloudprovider.IsNodeClaimNotFoundError(err) {
c.capacityReservationProvider.MarkTerminated(id)
}

if cloudprovider.IsNodeClaimNotFoundError(err) && isInterruptibleInstance(nodeClaim) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the metric will produce false positives on the normal Karpenter-initiated termination path. When Karpenter terminates a spot/IODCR instance (consolidation, drift, etc.), the instance can be gone by the time cloudProvider.Delete() calls DescribeInstances, which returns NodeClaimNotFoundError. Since Karpenter is the one that initiated the termination (not an interruption), there's no interruption annotation, right? So the metric increments even though no SQS message was expected?

capacityType := nodeClaim.Labels[karpv1.CapacityTypeLabelKey]
instanceInterrupted := nodeClaim.Annotations[v1.AnnotationInstanceInterrupted]
if instanceInterrupted == "" {
log.FromContext(ctx).Info("detected instance termination without interruption notification",
"capacity-type", capacityType)
interruption.MissedInterruptionTerminations.Inc(map[string]string{
"capacity_type": capacityType,
})
}
}

return err
}

Expand Down Expand Up @@ -482,3 +496,16 @@ func newTerminatingNodeClassError(name string) *errors.StatusError {
err.ErrStatus.Message = fmt.Sprintf("%s %q is terminating, treating as not found", qualifiedResource.String(), name)
return err
}

// isInterruptibleInstance reports whether the NodeClaim's labels mark it as
// backed by interruptible capacity. That is the case when the capacity-type
// label is spot, or when it is reserved and the capacity-reservation
// interruptible label is set to "true". Any other capacity type (including a
// missing label) yields false.
func isInterruptibleInstance(nodeClaim *karpv1.NodeClaim) bool {
	switch nodeClaim.Labels[karpv1.CapacityTypeLabelKey] {
	case karpv1.CapacityTypeSpot:
		return true
	case karpv1.CapacityTypeReserved:
		// A missing label reads back as "", which never equals "true", so no
		// separate presence check is required.
		return nodeClaim.Labels[v1.LabelCapacityReservationInterruptible] == "true"
	default:
		return false
	}
}
41 changes: 41 additions & 0 deletions pkg/cloudprovider/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/apis"
v1 "github.com/aws/karpenter-provider-aws/pkg/apis/v1"
"github.com/aws/karpenter-provider-aws/pkg/cloudprovider"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption"
"github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass"
"github.com/aws/karpenter-provider-aws/pkg/fake"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
Expand Down Expand Up @@ -1517,4 +1518,44 @@ var _ = Describe("CloudProvider", func() {
Entry("when the capacity reservation type is capacity-block", v1.CapacityReservationTypeCapacityBlock, false),
)
})
// Verifies that cloudProvider.Delete only counts a "missed interruption" for
// NodeClaims that do not carry the instance-interrupted annotation when the
// underlying instance is reported as not found.
Context("Interruption Metric Tracking", func() {
It("should increment MissedInterruptionTerminations metric depending on interruption annotation", func() {
// restrict the NodePool to spot capacity
nodePool.Spec.Template.Spec.Requirements = []karpv1.NodeSelectorRequirementWithMinValues{
{
Key: karpv1.CapacityTypeLabelKey,
Operator: corev1.NodeSelectorOpIn,
Values: []string{karpv1.CapacityTypeSpot},
},
}
// presumably MaxPods=1 forces one pod per node so that the two pods below
// yield two separate NodeClaims (asserted further down) -- TODO confirm
nodeClass.Spec.Kubelet = &v1.KubeletConfiguration{
MaxPods: aws.Int32(1),
}
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
// provision and schedule two pods, one at a time
for range 2 {
pod := coretest.UnschedulablePod()
ExpectApplied(ctx, env.Client, pod)
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
ExpectScheduled(ctx, env.Client, pod)
}
ncs := ExpectNodeClaims(ctx, env.Client)
Expect(ncs).To(HaveLen(2))

// add annotation to only 1 nodeclaim
ncs[0].Annotations = lo.Assign(ncs[0].Annotations, map[string]string{
v1.AnnotationInstanceInterrupted: "true",
})
ExpectApplied(ctx, env.Client, ncs[0])

// mock underlying instance as already terminated
awsEnv.EC2API.DescribeInstancesBehavior.Output.Set(&ec2.DescribeInstancesOutput{
Reservations: []ec2types.Reservation{},
})
// delete both NodeClaims; each call is expected to return a not-found error
err1 := cloudProvider.Delete(ctx, ncs[0])
err2 := cloudProvider.Delete(ctx, ncs[1])
Expect(corecloudprovider.IsNodeClaimNotFoundError(err1)).To(BeTrue())
Expect(corecloudprovider.IsNodeClaimNotFoundError(err2)).To(BeTrue())
// one nodeclaim did not have the interrupted annotation
ExpectMetricCounterValue(interruption.MissedInterruptionTerminations, 1, map[string]string{"capacity_type": karpv1.CapacityTypeSpot})
})
})
})
11 changes: 11 additions & 0 deletions pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/awslabs/operatorpkg/reconciler"
"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -241,6 +243,15 @@ func (c *Controller) handleNodeClaim(ctx context.Context, msg messages.Message,
}

if action != NoAction {
stored := nodeClaim.DeepCopy()
nodeClaim.Annotations = lo.Assign(nodeClaim.Annotations, map[string]string{
v1.AnnotationInstanceInterrupted: "true",
})
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
return fmt.Errorf("annotating nodeclaim as interrupted, %w", err)
}
}
return c.deleteNodeClaim(ctx, msg, nodeClaim, node)
}
return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/controllers/interruption/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ const (
)

var (
MissedInterruptionTerminations = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: interruptionSubsystem,
Name: "missed_termination_total",
Help: "Count of instance terminations that were not notified via SQS interruption messages",
},
[]string{metrics.CapacityTypeLabel},
)
ReceivedMessages = opmetrics.NewPrometheusCounter(
crmetrics.Registry,
prometheus.CounterOpts{
Expand Down
4 changes: 4 additions & 0 deletions website/content/en/preview/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ Amount of time an interruption message is on the queue before it is processed by
Count of messages deleted from the SQS queue.
- Stability Level: STABLE

### `karpenter_interruption_missed_termination_total`
Count of instance terminations for which no SQS interruption message was received.
- Stability Level: STABLE

## Cluster Metrics

### `karpenter_cluster_utilization_percent`
Expand Down