Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ KARPENTER_IAM_ROLE_ARN ?= arn:aws:iam::${AWS_ACCOUNT_ID}:role/${CLUSTER_NAME}-ka
HELM_OPTS ?= --set serviceAccount.annotations.eks\\.amazonaws\\.com/role-arn=${KARPENTER_IAM_ROLE_ARN} \
--set settings.clusterName=${CLUSTER_NAME} \
--set settings.interruptionQueue=${CLUSTER_NAME} \
--set settings.enableZonalShift=${ENABLE_ZONAL_SHIFT}\
--set controller.resources.requests.cpu=1 \
--set controller.resources.requests.memory=1Gi \
--set controller.resources.limits.cpu=1 \
Expand Down Expand Up @@ -60,6 +61,7 @@ run: ## Run Karpenter controller binary against your local cluster with latest C
DISABLE_LEADER_ELECTION=true \
CLUSTER_NAME=${CLUSTER_NAME} \
INTERRUPTION_QUEUE=${CLUSTER_NAME} \
ENABLE_ZONAL_SHIFT=true \
FEATURE_GATES="SpotToSpotConsolidation=true,NodeOverlay=true,StaticCapacity=true" \
LOG_LEVEL="debug" \
go run ./cmd/controller/main.go
Expand Down
2 changes: 2 additions & 0 deletions charts/karpenter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ settings:
# Interruption handling is disabled if not specified. Enabling interruption handling may
# require additional permissions on the controller service account. Additional permissions are outlined in the docs.
interruptionQueue: ""
# -- Marking this true signals EKS to onboard the cluster to Zonal Shift.
enableZonalShift: false
# -- Reserved ENIs are not included in the calculations for max-pods or kube-reserved.
# This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.
reservedENIs: "0"
Expand Down
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func main() {
op.InstanceTypesProvider,
op.CapacityReservationProvider,
op.AMIResolver,
op.ZonalShiftProvider,
)...).
Start(ctx)
}
2 changes: 2 additions & 0 deletions hack/docs/instancetypes_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"

"github.com/patrickmn/go-cache"
"github.com/samber/lo"
Expand Down Expand Up @@ -147,6 +148,7 @@ below are the resources available with some assumptions and after the instance o
instancetype.NewDefaultResolver(
region,
),
arczonalshift.NewNoopProvider(),
)
if err = instanceTypeProvider.UpdateInstanceTypes(ctx); err != nil {
log.Fatalf("updating instance types, %s", err)
Expand Down
2 changes: 2 additions & 0 deletions hack/tools/launchtemplate_counter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -74,6 +75,7 @@ func main() {
instancetype.NewDefaultResolver(
region,
),
arczonalshift.NewNoopProvider(),
)
if err := instanceTypeProvider.UpdateInstanceTypes(ctx); err != nil {
log.Fatalf("updating instance types, %s", err)
Expand Down
1 change: 1 addition & 0 deletions kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func main() {
op.InstanceTypesProvider,
op.CapacityReservationProvider,
op.AMIResolver,
op.ZonalShiftProvider,
)...).
Start(ctx)
wg.Wait()
Expand Down
34 changes: 34 additions & 0 deletions kwok/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/arczonalshift"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/eks"
"github.com/aws/aws-sdk-go-v2/service/iam"
Expand All @@ -33,6 +34,8 @@ import (
"github.com/awslabs/operatorpkg/option"
"sigs.k8s.io/controller-runtime/pkg/manager"

arczonalshiftProvider "github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
arczonalshiftProvider "github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
ZonalShiftProvider arconalshiftprovider.Provider

We don't use camel case in imports


"github.com/aws/smithy-go"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"
Expand Down Expand Up @@ -94,6 +97,7 @@ type Operator struct {
SSMProvider ssmp.Provider
CapacityReservationProvider capacityreservation.Provider
EC2API *kwokec2.Client
ZonalShiftProvider arczonalshiftProvider.Provider
}

func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) {
Expand All @@ -118,6 +122,25 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
} else {
log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
}
var zsProvider arczonalshiftProvider.Provider
if options.FromContext(ctx).EnableZonalShift {
input := eks.DescribeClusterInput{Name: &options.FromContext(ctx).ClusterName}
output, _ := eksapi.DescribeCluster(ctx, &input)
clusterArn := output.Cluster.Arn
arczonalshiftAPI := arczonalshift.NewFromConfig(cfg)
inputGMR := arczonalshift.GetManagedResourceInput{ResourceIdentifier: clusterArn}
_, getManagedResourceErr := arczonalshiftAPI.GetManagedResource(ctx, &inputGMR)
if getManagedResourceErr != nil {
// Resource is not found/registered in Zonal Shift. Log a message and use the NoopProvider so we don't block starting up.
log.FromContext(ctx).WithValues("Cluster", clusterArn).V(1).Info("Cluster not found in Zonal Shift")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error not Info

zsProvider = arczonalshiftProvider.NewNoopProvider()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably panic, I don't think we should fail with just a log

} else {
azMap := GetAvailablityZoneMapping(ctx, ec2api)
zsProvider = arczonalshiftProvider.NewProvider(arczonalshiftAPI, operator.Clock, *clusterArn, azMap)
}
} else {
zsProvider = arczonalshiftProvider.NewNoopProvider()
}
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)
validationCache := cache.New(awscache.ValidationTTL, awscache.DefaultCleanupInterval)
Expand Down Expand Up @@ -175,6 +198,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
capacityReservationProvider,
unavailableOfferingsCache,
instancetype.NewDefaultResolver(cfg.Region),
zsProvider,
)
// Ensure we're able to hydrate instance types before starting any reliant controllers.
// Instance type updates are hydrated asynchronously after this by controllers.
Expand Down Expand Up @@ -216,6 +240,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
SSMProvider: ssmProvider,
CapacityReservationProvider: capacityReservationProvider,
EC2API: ec2api,
ZonalShiftProvider: zsProvider,
}
}

Expand Down Expand Up @@ -311,3 +336,12 @@ func SetupIndexers(ctx context.Context, mgr manager.Manager) {
return []string{id}
}), "failed to setup node instanceID indexer")
}

func GetAvailablityZoneMapping(ctx context.Context, ec2Api sdk.EC2API) map[string]string {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to exist, we have the mapping from subnets already, more below

azMap := make(map[string]string)
output, _ := ec2Api.DescribeAvailabilityZones(ctx, &ec2.DescribeAvailabilityZonesInput{})
for _, az := range output.AvailabilityZones {
azMap[*az.ZoneName] = *az.ZoneId
}
return azMap
}
7 changes: 7 additions & 0 deletions pkg/aws/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package sdk
import (
"context"

"github.com/aws/aws-sdk-go-v2/service/arczonalshift"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/eks"
"github.com/aws/aws-sdk-go-v2/service/iam"
Expand All @@ -27,6 +28,7 @@ import (
)

type EC2API interface {
DescribeAvailabilityZones(context.Context, *ec2.DescribeAvailabilityZonesInput, ...func(*ec2.Options)) (*ec2.DescribeAvailabilityZonesOutput, error)
DescribeCapacityReservations(context.Context, *ec2.DescribeCapacityReservationsInput, ...func(*ec2.Options)) (*ec2.DescribeCapacityReservationsOutput, error)
DescribeImages(context.Context, *ec2.DescribeImagesInput, ...func(*ec2.Options)) (*ec2.DescribeImagesOutput, error)
DescribeLaunchTemplates(context.Context, *ec2.DescribeLaunchTemplatesInput, ...func(*ec2.Options)) (*ec2.DescribeLaunchTemplatesOutput, error)
Expand Down Expand Up @@ -72,6 +74,11 @@ type SQSAPI interface {
SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
}

type ARCZonalShiftAPI interface {
ListZonalShifts(context.Context, *arczonalshift.ListZonalShiftsInput, ...func(*arczonalshift.Options)) (*arczonalshift.ListZonalShiftsOutput, error)
GetManagedResource(context.Context, *arczonalshift.GetManagedResourceInput, ...func(*arczonalshift.Options)) (*arczonalshift.GetManagedResourceOutput, error)
}

type TimestreamWriteAPI interface {
WriteRecords(ctx context.Context, params *timestreamwrite.WriteRecordsInput, optFns ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error)
}
55 changes: 55 additions & 0 deletions pkg/controllers/arczonalshift/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package arczonalshift

import (
"context"
"fmt"
"time"

"github.com/awslabs/operatorpkg/reconciler"
"github.com/awslabs/operatorpkg/singleton"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/karpenter/pkg/operator/injection"

"github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
)

type Controller struct {
arczonalshiftProvider arczonalshift.Provider
}

func NewController(
arczonalshiftProvider arczonalshift.Provider,
) *Controller {
return &Controller{
arczonalshiftProvider: arczonalshiftProvider,
}
}

func (c *Controller) Reconcile(ctx context.Context) (reconciler.Result, error) {
ctx = injection.WithControllerName(ctx, "zonalshift")
err := c.arczonalshiftProvider.UpdateZonalShifts(ctx)
if err != nil {
return reconciler.Result{}, fmt.Errorf("updating zonal shifts: %w", err)
}

return reconciler.Result{RequeueAfter: 30 * time.Second}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).Named("zonalshift").WatchesRawSource(singleton.Source()).Complete(singleton.AsReconciler(c))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can put these . calls on new lines

}
5 changes: 5 additions & 0 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/karpenter/pkg/cloudprovider"

arczonalshiftcontroller "github.com/aws/karpenter-provider-aws/pkg/controllers/arczonalshift"

"github.com/aws/aws-sdk-go-v2/aws"
servicesqs "github.com/aws/aws-sdk-go-v2/service/sqs"

Expand Down Expand Up @@ -54,6 +56,7 @@ import (
nodeclassgarbagecollection "github.com/aws/karpenter-provider-aws/pkg/controllers/nodeclass/garbagecollection"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
"github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
"github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancetype"
Expand Down Expand Up @@ -87,6 +90,7 @@ func NewControllers(
instanceTypeProvider *instancetype.DefaultProvider,
capacityReservationProvider capacityreservationprovider.Provider,
amiResolver amifamily.Resolver,
zonalshiftProvider arczonalshift.Provider,
) []controller.Controller {
controllers := []controller.Controller{
nodeclasshash.NewController(kubeClient),
Expand All @@ -102,6 +106,7 @@ func NewControllers(
crcapacitytype.NewController(kubeClient, cloudProvider),
crexpiration.NewController(clk, kubeClient, cloudProvider, capacityReservationProvider),
metrics.NewController(kubeClient, cloudProvider),
arczonalshiftcontroller.NewController(zonalshiftProvider),
}
// Instance profile garbage collection requires IAM API access. Skip registering the controller when running
// in isolated VPC mode to avoid initiating calls to public AWS endpoints that won’t be reachable.
Expand Down
33 changes: 33 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
awsmiddleware "github.com/aws/aws-sdk-go-v2/aws/middleware"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/arczonalshift"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/eks"
"github.com/aws/aws-sdk-go-v2/service/iam"
Expand Down Expand Up @@ -54,6 +55,7 @@ import (
awscache "github.com/aws/karpenter-provider-aws/pkg/cache"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/providers/amifamily"
arczonalshiftProvider "github.com/aws/karpenter-provider-aws/pkg/providers/arczonalshift"
"github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation"
"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
"github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile"
Expand Down Expand Up @@ -92,6 +94,7 @@ type Operator struct {
SSMProvider ssmp.Provider
CapacityReservationProvider capacityreservation.Provider
EC2API *ec2.Client
ZonalShiftProvider arczonalshiftProvider.Provider
}

func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) {
Expand Down Expand Up @@ -127,6 +130,25 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
} else {
log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
}
var zsProvider arczonalshiftProvider.Provider
if options.FromContext(ctx).EnableZonalShift {
inputzs := eks.DescribeClusterInput{Name: &options.FromContext(ctx).ClusterName}
outputzs, _ := eksapi.DescribeCluster(ctx, &inputzs)
clusterArn := outputzs.Cluster.Arn
arczonalshiftAPI := arczonalshift.NewFromConfig(cfg)
inputGMR := arczonalshift.GetManagedResourceInput{ResourceIdentifier: clusterArn}
_, getManagedResourceErr := arczonalshiftAPI.GetManagedResource(ctx, &inputGMR)
if getManagedResourceErr != nil {
// Resource is not found/registered in Zonal Shift. Log a message and use the NoopProvider so we don't block starting up.
log.FromContext(ctx).WithValues("Cluster", clusterArn).V(1).Info("Cluster not found in Zonal Shift")
zsProvider = arczonalshiftProvider.NewNoopProvider()
} else {
azMap := GetAvailablityZoneMapping(ctx, ec2api)
zsProvider = arczonalshiftProvider.NewProvider(arczonalshiftAPI, operator.Clock, *clusterArn, azMap)
}
} else {
zsProvider = arczonalshiftProvider.NewNoopProvider()
}
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)
validationCache := cache.New(awscache.ValidationTTL, awscache.DefaultCleanupInterval)
Expand Down Expand Up @@ -184,6 +206,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
capacityReservationProvider,
unavailableOfferingsCache,
instancetype.NewDefaultResolver(cfg.Region),
zsProvider,
)
// Ensure we're able to hydrate instance types before starting any reliant controllers.
// Instance type updates are hydrated asynchronously after this by controllers.
Expand Down Expand Up @@ -225,6 +248,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
SSMProvider: ssmProvider,
CapacityReservationProvider: capacityReservationProvider,
EC2API: ec2api,
ZonalShiftProvider: zsProvider,
}
}

Expand Down Expand Up @@ -320,3 +344,12 @@ func SetupIndexers(ctx context.Context, mgr manager.Manager) {
return []string{id}
}), "failed to setup node instanceID indexer")
}

func GetAvailablityZoneMapping(ctx context.Context, ec2Api sdk.EC2API) map[string]string {
azMap := make(map[string]string)
output, _ := ec2Api.DescribeAvailabilityZones(ctx, &ec2.DescribeAvailabilityZonesInput{})
for _, az := range output.AvailabilityZones {
azMap[*az.ZoneName] = *az.ZoneId
}
return azMap
}
2 changes: 2 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Options struct {
InterruptionQueue string
ReservedENIs int
DisableDryRun bool
EnableZonalShift bool
}

func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
Expand All @@ -55,6 +56,7 @@ func (o *Options) AddFlags(fs *coreoptions.FlagSet) {
fs.StringVar(&o.InterruptionQueue, "interruption-queue", env.WithDefaultString("INTERRUPTION_QUEUE", ""), "Interruption queue is the name of the SQS queue used for processing interruption events from EC2. Interruption handling is disabled if not specified. Enabling interruption handling may require additional permissions on the controller service account. Additional permissions are outlined in the docs.")
fs.IntVar(&o.ReservedENIs, "reserved-enis", env.WithDefaultInt("RESERVED_ENIS", 0), "Reserved ENIs are not included in the calculations for max-pods or kube-reserved. This is most often used in the VPC CNI custom networking setup https://docs.aws.amazon.com/eks/latest/userguide/cni-custom-network.html.")
fs.BoolVarWithEnv(&o.DisableDryRun, "disable-dry-run", "DISABLE_DRY_RUN", false, "If true, then disable dry run validation for EC2NodeClasses.")
fs.BoolVarWithEnv(&o.EnableZonalShift, "enable-zonal-shift", "ENABLE_ZONAL_SHIFT", false, "If true, then enable zonal shifting feature.")
}

func (o *Options) Parse(fs *coreoptions.FlagSet, args ...string) error {
Expand Down
Loading