From 7b250f5eafee94945ceb2ed1c95d76a0768f18b0 Mon Sep 17 00:00:00 2001 From: Marco Nenciarini Date: Sat, 11 Apr 2026 17:41:06 +0200 Subject: [PATCH] Add --node-deployment flag for distributed resizing For CSI drivers deployed as DaemonSets managing node-local volumes, there is no central controller pod where a centralized external-resizer can run. The provisioner and snapshotter sidecars already support --node-deployment for this deployment model. This adds the same capability to the external-resizer. When enabled, each instance only processes resize requests for PVs whose nodeAffinity matches the local node. The flag requires NODE_NAME env var and is mutually exclusive with --leader-election. The filtering is applied in the controller (pvNeedResize and pvCanBeExpanded), covering both Resizer implementations without changes to the Resizer interface. Signed-off-by: Marco Nenciarini --- README.md | 14 +++ cmd/csi-resizer/main.go | 18 +++- pkg/controller/controller.go | 39 +++++++- pkg/controller/controller_test.go | 112 +++++++++++++++++++++- pkg/controller/expand_and_recover.go | 5 + pkg/controller/expand_and_recover_test.go | 7 +- pkg/controller/resize_status_test.go | 5 +- 7 files changed, 191 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 63600b969..e05b967a9 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,8 @@ Note that the external-resizer does not scale with more replicas. Only one exter * `--leader-election-retry-period `: Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds. +* `--node-deployment`: Enables deploying the external-resizer together with a CSI driver on nodes to manage node-local volumes. When enabled, each resizer instance only handles resize operations for PVs whose node affinity matches the local node. Requires the `NODE_NAME` environment variable to be set and is mutually exclusive with `--leader-election`. Off by default. + * `--timeout `: Timeout of all calls to CSI driver. It should be set to value that accommodates majority of `ControllerExpandVolume` calls. 10 seconds is used by default. * `-kube-api-burst ` : Burst to use while communicating with the kubernetes apiserver. Defaults to 10. (default 10). @@ -104,6 +106,18 @@ Note that the external-resizer does not scale with more replicas. Only one exter * All glog / klog arguments are supported, such as `-v ` or `-alsologtostderr`. +### Distributed resizing + +For CSI drivers that manage node-local volumes (e.g., LVM, hostpath), it is common to deploy the driver as a DaemonSet with sidecars running on each node. The `--node-deployment` flag enables this model for the external-resizer: each instance only processes resize requests for volumes whose PV node affinity matches the node it runs on. + +To use this feature: + +1. Deploy the external-resizer as a sidecar in the DaemonSet alongside the CSI driver. +2. Set `--node-deployment=true` and do not enable `--leader-election`. +3. Set the `NODE_NAME` environment variable to the node name (e.g., via the downward API `spec.nodeName`). + +This feature complements the `--node-deployment` flag available in external-provisioner and external-snapshotter. + ### HTTP endpoint The external-resizer optionally exposes an HTTP endpoint at address:port specified by `--http-endpoint` argument. When set, these two paths are exposed: diff --git a/cmd/csi-resizer/main.go b/cmd/csi-resizer/main.go index 3307e689d..2090c0fef 100644 --- a/cmd/csi-resizer/main.go +++ b/cmd/csi-resizer/main.go @@ -75,6 +75,8 @@ var ( handleVolumeInUseError = flag.Bool("handle-volume-inuse-error", true, "Flag to turn on/off capability to handle volume in use error in resizer controller. Defaults to true if not set.") + enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the external-resizer together with a CSI driver on nodes to manage node-local volumes. Requires NODE_NAME env var and is mutually exclusive with --leader-election.") + featureGates map[string]bool version = "unknown" @@ -101,6 +103,20 @@ func main() { } klog.InfoS("Version", "version", version) + var nodeName string + if *enableNodeDeployment { + if standardflags.Configuration.LeaderElection { + klog.ErrorS(nil, "Cannot use --node-deployment with --leader-election") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + nodeName = os.Getenv("NODE_NAME") + if nodeName == "" { + klog.ErrorS(nil, "NODE_NAME environment variable must be set when using --node-deployment") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + klog.InfoS("Running in node-deployment mode", "nodeName", nodeName) + } + if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" { klog.ErrorS(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set.") klog.FlushAndExit(klog.ExitFlushTimeout, 1) @@ -237,7 +253,7 @@ func main() { resizerName := csiResizer.Name() rc = controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory, workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax), - *handleVolumeInUseError, *retryIntervalMax) + *handleVolumeInUseError, *retryIntervalMax, nodeName) leaseHolder = resizerName } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 45ac5866d..19a005807 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -82,6 +82,10 @@ type resizeController struct { // a cache to store PersistentVolumeClaim objects claims cache.Store handleVolumeInUseError bool + + // nodeName is set when running in node-deployment mode. When non-empty, + // only PVs with nodeAffinity matching this node are processed. + nodeName string } // NewResizeController returns a ResizeController. @@ -93,7 +97,8 @@ func NewResizeController( informerFactory informers.SharedInformerFactory, pvcRateLimiter workqueue.TypedRateLimiter[string], handleVolumeInUseError bool, - maxRetryInterval time.Duration) ResizeController { + maxRetryInterval time.Duration, + nodeName string) ResizeController { pvInformer := informerFactory.Core().V1().PersistentVolumes() pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims() eventBroadcaster := record.NewBroadcaster() @@ -121,6 +126,7 @@ func NewResizeController( finalErrorPVCs: sets.New[string](), usedPVCs: newUsedPVCStore(), handleVolumeInUseError: handleVolumeInUseError, + nodeName: nodeName, } // Add a resync period as the PVC's request size can be resized again when we handling @@ -416,6 +422,11 @@ func (ctrl *resizeController) pvcNeedResize(pvc *v1.PersistentVolumeClaim) bool // pvNeedResize returns true if a pv supports and also requests resize. func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { + if !ctrl.pvBelongsToNode(pv) { + klog.V(4).InfoS("PV does not belong to this node, skipping", "controller", ctrl.name, "PV", klog.KObj(pv), "nodeName", ctrl.nodeName) + return false + } + if !ctrl.resizer.CanSupport(pv, pvc) { klog.V(4).InfoS("Resizer doesn't support PV", "controller", ctrl.name, "PV", klog.KObj(pv)) return false @@ -448,6 +459,32 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1 return true } +// pvBelongsToNode returns true if the PV's nodeAffinity matches the controller's +// nodeName, or if node filtering is disabled (nodeName is empty). PVs without +// nodeAffinity are not processed by any instance when node filtering is enabled, +// as they are assumed to not be node-local volumes. +func (ctrl *resizeController) pvBelongsToNode(pv *v1.PersistentVolume) bool { + if ctrl.nodeName == "" { + return true + } + na := pv.Spec.NodeAffinity + if na == nil || na.Required == nil { + return false + } + for _, term := range na.Required.NodeSelectorTerms { + for _, expr := range term.MatchExpressions { + if expr.Operator == v1.NodeSelectorOpIn { + for _, v := range expr.Values { + if v == ctrl.nodeName { + return true + } + } + } + } + } + return false +} + // isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity func (ctrl *resizeController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool { klog.V(4).InfoS("Capacity of pv and pvc", "PV", klog.KObj(pv), "pvCapacity", pv.Spec.Capacity[v1.ResourceStorage], "PVC", klog.KObj(pvc), "pvcCapacity", pvc.Status.Capacity[v1.ResourceStorage]) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 0a79862c0..1cf8ac70d 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -247,7 +247,8 @@ func TestController(t *testing.T) { kubeClient, time.Second, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](), !test.disableVolumeInUseErrorHandler, - 2*time.Minute /* maxRetryInterval */) + 2*time.Minute, /* maxRetryInterval */ + "" /* nodeName */) ctrlInstance, _ := controller.(*resizeController) @@ -409,8 +410,9 @@ func TestResizePVC(t *testing.T) { controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](), - true, /* disableVolumeInUseErrorHandler*/ - 2*time.Minute /* maxRetryInterval */) + true, /* disableVolumeInUseErrorHandler*/ + 2*time.Minute, /* maxRetryInterval */ + "" /* nodeName */) ctrlInstance, _ := controller.(*resizeController) @@ -548,3 +550,107 @@ func fakeK8s(objs []runtime.Object) (kubernetes.Interface, informers.SharedInfor informerFactory := informers.NewSharedInformerFactory(client, 0) return client, informerFactory } + +func TestPvBelongsToNode(t *testing.T) { + pvWithAffinity := func(key, value string) *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pv"}, + Spec: v1.PersistentVolumeSpec{ + NodeAffinity: &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: []string{value}, + }, + }, + }, + }, + }, + }, + }, + } + } + + pvNoAffinity := &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pv"}, + Spec: v1.PersistentVolumeSpec{}, + } + + tests := []struct { + name string + nodeName string + pv *v1.PersistentVolume + expected bool + }{ + { + name: "node filtering disabled (empty nodeName)", + nodeName: "", + pv: pvWithAffinity("topology.hostpath.csi/node", "node-a"), + expected: true, + }, + { + name: "matching node affinity", + nodeName: "node-a", + pv: pvWithAffinity("topology.hostpath.csi/node", "node-a"), + expected: true, + }, + { + name: "non-matching node affinity", + nodeName: "node-b", + pv: pvWithAffinity("topology.hostpath.csi/node", "node-a"), + expected: false, + }, + { + name: "PV without node affinity (not a node-local volume)", + nodeName: "node-a", + pv: pvNoAffinity, + expected: false, + }, + { + name: "PV without node affinity, filtering disabled", + nodeName: "", + pv: pvNoAffinity, + expected: true, + }, + { + name: "matching in second NodeSelectorTerm (OR semantics)", + nodeName: "node-a", + pv: &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pv"}, + Spec: v1.PersistentVolumeSpec{ + NodeAffinity: &v1.VolumeNodeAffinity{ + Required: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: "topology.hostpath.csi/node", Operator: v1.NodeSelectorOpIn, Values: []string{"node-b"}}, + }, + }, + { + MatchExpressions: []v1.NodeSelectorRequirement{ + {Key: "topology.hostpath.csi/node", Operator: v1.NodeSelectorOpIn, Values: []string{"node-a"}}, + }, + }, + }, + }, + }, + }, + }, + expected: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctrl := &resizeController{nodeName: tc.nodeName} + result := ctrl.pvBelongsToNode(tc.pv) + if result != tc.expected { + t.Errorf("pvBelongsToNode() = %v, expected %v", result, tc.expected) + } + }) + } +} diff --git a/pkg/controller/expand_and_recover.go b/pkg/controller/expand_and_recover.go index 589ee7eb7..8179ab2c8 100644 --- a/pkg/controller/expand_and_recover.go +++ b/pkg/controller/expand_and_recover.go @@ -291,6 +291,11 @@ func (ctrl *resizeController) callResizeOnPlugin( // checks if pv can be expanded func (ctrl *resizeController) pvCanBeExpanded(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) bool { + if !ctrl.pvBelongsToNode(pv) { + klog.V(4).InfoS("PV does not belong to this node, skipping", "controller", ctrl.name, "PV", klog.KObj(pv), "nodeName", ctrl.nodeName) + return false + } + if !ctrl.resizer.CanSupport(pv, pvc) { klog.V(4).InfoS("Resizer doesn't support PV", "controller", ctrl.name, "PV", klog.KObj(pv)) return false diff --git a/pkg/controller/expand_and_recover_test.go b/pkg/controller/expand_and_recover_test.go index 1ddef0f4d..40ad1f87a 100644 --- a/pkg/controller/expand_and_recover_test.go +++ b/pkg/controller/expand_and_recover_test.go @@ -182,7 +182,9 @@ func TestExpandAndRecover(t *testing.T) { controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string](), true /*handleVolumeInUseError*/, 2*time.Minute /*maxRetryInterval*/) + workqueue.DefaultTypedControllerRateLimiter[string](), true, /*handleVolumeInUseError*/ + 2*time.Minute, /*maxRetryInterval*/ + "" /* nodeName */) ctrlInstance, _ := controller.(*resizeController) recorder := record.NewFakeRecorder(10) @@ -253,7 +255,8 @@ func TestExpandAndRecoverConcurrent(t *testing.T) { controller := NewResizeController(driverName, csiResizer, kubeClient, time.Second, informerFactory, - workqueue.DefaultTypedControllerRateLimiter[string](), true, 2*time.Minute) + workqueue.DefaultTypedControllerRateLimiter[string](), true, + 2*time.Minute, "" /* nodeName */) ctrlInstance := controller.(*resizeController) ctrlInstance.eventRecorder = record.NewFakeRecorder(1000) diff --git a/pkg/controller/resize_status_test.go b/pkg/controller/resize_status_test.go index fe8609e32..196d00077 100644 --- a/pkg/controller/resize_status_test.go +++ b/pkg/controller/resize_status_test.go @@ -95,8 +95,9 @@ func TestResizeFunctions(t *testing.T) { csiResizer, kubeClient, time.Second, informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](), - true, /*handleVolumeInUseError*/ - 2*time.Minute /*maxRetryInterval*/) + true, /*handleVolumeInUseError*/ + 2*time.Minute, /*maxRetryInterval*/ + "" /* nodeName */) ctrlInstance, _ := controller.(*resizeController)