Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ Note that the external-resizer does not scale with more replicas. Only one exter

* `--leader-election-retry-period <duration>`: Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.

* `--node-deployment`: Enables deploying the external-resizer together with a CSI driver on nodes to manage node-local volumes. When enabled, each resizer instance only handles resize operations for PVs whose node affinity matches the local node. Requires the `NODE_NAME` environment variable to be set and is mutually exclusive with `--leader-election`. Off by default.

* `--timeout <duration>`: Timeout of all calls to CSI driver. It should be set to a value that accommodates the majority of `ControllerExpandVolume` calls. 10 seconds is used by default.

* `-kube-api-burst <int>`: Burst to use while communicating with the kubernetes apiserver. Defaults to 10.
Expand Down Expand Up @@ -104,6 +106,18 @@ Note that the external-resizer does not scale with more replicas. Only one exter

* All glog / klog arguments are supported, such as `-v <log level>` or `-alsologtostderr`.

### Distributed resizing

For CSI drivers that manage node-local volumes (e.g., LVM, hostpath), it is common to deploy the driver as a DaemonSet with sidecars running on each node. The `--node-deployment` flag enables this model for the external-resizer: each instance only processes resize requests for volumes whose PV node affinity matches the node it runs on.

To use this feature:

1. Deploy the external-resizer as a sidecar in the DaemonSet alongside the CSI driver.
2. Set `--node-deployment=true` and do not enable `--leader-election`.
3. Set the `NODE_NAME` environment variable to the node name (e.g., via the downward API `spec.nodeName`).

This feature complements the `--node-deployment` flag available in external-provisioner and external-snapshotter.

### HTTP endpoint

The external-resizer optionally exposes an HTTP endpoint at the address:port specified by the `--http-endpoint` argument. When set, these two paths are exposed:
Expand Down
18 changes: 17 additions & 1 deletion cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ var (

handleVolumeInUseError = flag.Bool("handle-volume-inuse-error", true, "Flag to turn on/off capability to handle volume in use error in resizer controller. Defaults to true if not set.")

enableNodeDeployment = flag.Bool("node-deployment", false, "Enables deploying the external-resizer together with a CSI driver on nodes to manage node-local volumes. Requires NODE_NAME env var and is mutually exclusive with --leader-election.")

featureGates map[string]bool

version = "unknown"
Expand All @@ -101,6 +103,20 @@ func main() {
}
klog.InfoS("Version", "version", version)

var nodeName string
if *enableNodeDeployment {
if standardflags.Configuration.LeaderElection {
klog.ErrorS(nil, "Cannot use --node-deployment with --leader-election")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
nodeName = os.Getenv("NODE_NAME")
if nodeName == "" {
klog.ErrorS(nil, "NODE_NAME environment variable must be set when using --node-deployment")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
klog.InfoS("Running in node-deployment mode", "nodeName", nodeName)
}

if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" {
klog.ErrorS(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set.")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
Expand Down Expand Up @@ -237,7 +253,7 @@ func main() {
resizerName := csiResizer.Name()
rc = controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory,
workqueue.NewTypedItemExponentialFailureRateLimiter[string](*retryIntervalStart, *retryIntervalMax),
*handleVolumeInUseError, *retryIntervalMax)
*handleVolumeInUseError, *retryIntervalMax, nodeName)

leaseHolder = resizerName
}
Expand Down
39 changes: 38 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type resizeController struct {
// a cache to store PersistentVolumeClaim objects
claims cache.Store
handleVolumeInUseError bool

// nodeName is set when running in node-deployment mode. When non-empty,
// only PVs with nodeAffinity matching this node are processed.
nodeName string
}

// NewResizeController returns a ResizeController.
Expand All @@ -93,7 +97,8 @@ func NewResizeController(
informerFactory informers.SharedInformerFactory,
pvcRateLimiter workqueue.TypedRateLimiter[string],
handleVolumeInUseError bool,
maxRetryInterval time.Duration) ResizeController {
maxRetryInterval time.Duration,
nodeName string) ResizeController {
pvInformer := informerFactory.Core().V1().PersistentVolumes()
pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
eventBroadcaster := record.NewBroadcaster()
Expand Down Expand Up @@ -121,6 +126,7 @@ func NewResizeController(
finalErrorPVCs: sets.New[string](),
usedPVCs: newUsedPVCStore(),
handleVolumeInUseError: handleVolumeInUseError,
nodeName: nodeName,
}

// Add a resync period as the PVC's request size can be resized again when we handling
Expand Down Expand Up @@ -416,6 +422,11 @@ func (ctrl *resizeController) pvcNeedResize(pvc *v1.PersistentVolumeClaim) bool

// pvNeedResize returns true if a pv supports and also requests resize.
func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
if !ctrl.pvBelongsToNode(pv) {
klog.V(4).InfoS("PV does not belong to this node, skipping", "controller", ctrl.name, "PV", klog.KObj(pv), "nodeName", ctrl.nodeName)
return false
}

if !ctrl.resizer.CanSupport(pv, pvc) {
klog.V(4).InfoS("Resizer doesn't support PV", "controller", ctrl.name, "PV", klog.KObj(pv))
return false
Expand Down Expand Up @@ -448,6 +459,32 @@ func (ctrl *resizeController) pvNeedResize(pvc *v1.PersistentVolumeClaim, pv *v1
return true
}

// pvBelongsToNode reports whether pv should be handled by this controller
// instance.
//
// When node filtering is disabled (ctrl.nodeName is empty) every PV belongs to
// this instance. Otherwise the PV must carry a required node affinity in which
// at least one node selector term names ctrl.nodeName, either
//   - as a value of an "In" match expression (the key is not inspected), or
//   - as a value of an "In" match field on "metadata.name".
//
// PVs without nodeAffinity are not processed by any instance when node
// filtering is enabled, as they are assumed to not be node-local volumes.
func (ctrl *resizeController) pvBelongsToNode(pv *v1.PersistentVolume) bool {
	if ctrl.nodeName == "" {
		return true
	}
	affinity := pv.Spec.NodeAffinity
	if affinity == nil || affinity.Required == nil {
		return false
	}
	// Selector terms are alternatives, so one term naming this node suffices.
	for _, term := range affinity.Required.NodeSelectorTerms {
		if requirementsNameNode(term.MatchExpressions, "", ctrl.nodeName) ||
			requirementsNameNode(term.MatchFields, "metadata.name", ctrl.nodeName) {
			return true
		}
	}
	return false
}

// requirementsNameNode returns true if any requirement in reqs uses the "In"
// operator and lists nodeName among its values. When key is non-empty, only
// requirements with exactly that key are considered.
func requirementsNameNode(reqs []v1.NodeSelectorRequirement, key, nodeName string) bool {
	for _, req := range reqs {
		if req.Operator != v1.NodeSelectorOpIn {
			continue
		}
		if key != "" && req.Key != key {
			continue
		}
		for _, value := range req.Values {
			if value == nodeName {
				return true
			}
		}
	}
	return false
}

// isNodeExpandComplete returns true if pvc.Status.Capacity >= pv.Spec.Capacity
func (ctrl *resizeController) isNodeExpandComplete(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) bool {
klog.V(4).InfoS("Capacity of pv and pvc", "PV", klog.KObj(pv), "pvCapacity", pv.Spec.Capacity[v1.ResourceStorage], "PVC", klog.KObj(pvc), "pvcCapacity", pvc.Status.Capacity[v1.ResourceStorage])
Expand Down
112 changes: 109 additions & 3 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ func TestController(t *testing.T) {
kubeClient, time.Second,
informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](),
!test.disableVolumeInUseErrorHandler,
2*time.Minute /* maxRetryInterval */)
2*time.Minute, /* maxRetryInterval */
"" /* nodeName */)

ctrlInstance, _ := controller.(*resizeController)

Expand Down Expand Up @@ -409,8 +410,9 @@ func TestResizePVC(t *testing.T) {
controller := NewResizeController(driverName, csiResizer,
kubeClient, time.Second,
informerFactory, workqueue.DefaultTypedControllerRateLimiter[string](),
true, /* disableVolumeInUseErrorHandler*/
2*time.Minute /* maxRetryInterval */)
true, /* disableVolumeInUseErrorHandler*/
2*time.Minute, /* maxRetryInterval */
"" /* nodeName */)

ctrlInstance, _ := controller.(*resizeController)

Expand Down Expand Up @@ -548,3 +550,107 @@ func fakeK8s(objs []runtime.Object) (kubernetes.Interface, informers.SharedInfor
informerFactory := informers.NewSharedInformerFactory(client, 0)
return client, informerFactory
}

// TestPvBelongsToNode exercises resizeController.pvBelongsToNode with node
// filtering both disabled (empty nodeName) and enabled, against PVs that have
// no node affinity, a single selector term, or several selector terms.
func TestPvBelongsToNode(t *testing.T) {
	const topologyKey = "topology.hostpath.csi/node"

	// pinnedTo builds a PV whose required node affinity holds one selector
	// term per given node, each term being a single "In" expression on
	// topologyKey with that node as its only value.
	pinnedTo := func(nodes ...string) *v1.PersistentVolume {
		terms := make([]v1.NodeSelectorTerm, 0, len(nodes))
		for _, node := range nodes {
			terms = append(terms, v1.NodeSelectorTerm{
				MatchExpressions: []v1.NodeSelectorRequirement{
					{Key: topologyKey, Operator: v1.NodeSelectorOpIn, Values: []string{node}},
				},
			})
		}
		return &v1.PersistentVolume{
			ObjectMeta: metav1.ObjectMeta{Name: "test-pv"},
			Spec: v1.PersistentVolumeSpec{
				NodeAffinity: &v1.VolumeNodeAffinity{
					Required: &v1.NodeSelector{NodeSelectorTerms: terms},
				},
			},
		}
	}

	// unpinned is a PV that carries no node affinity at all.
	unpinned := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{Name: "test-pv"},
		Spec:       v1.PersistentVolumeSpec{},
	}

	cases := []struct {
		name     string
		nodeName string
		pv       *v1.PersistentVolume
		expected bool
	}{
		{name: "node filtering disabled (empty nodeName)", nodeName: "", pv: pinnedTo("node-a"), expected: true},
		{name: "matching node affinity", nodeName: "node-a", pv: pinnedTo("node-a"), expected: true},
		{name: "non-matching node affinity", nodeName: "node-b", pv: pinnedTo("node-a"), expected: false},
		{name: "PV without node affinity (not a node-local volume)", nodeName: "node-a", pv: unpinned, expected: false},
		{name: "PV without node affinity, filtering disabled", nodeName: "", pv: unpinned, expected: true},
		{name: "matching in second NodeSelectorTerm (OR semantics)", nodeName: "node-a", pv: pinnedTo("node-b", "node-a"), expected: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Only nodeName is read by pvBelongsToNode, so a bare
			// controller value is enough.
			c := &resizeController{nodeName: tc.nodeName}
			if got := c.pvBelongsToNode(tc.pv); got != tc.expected {
				t.Errorf("pvBelongsToNode() = %v, expected %v", got, tc.expected)
			}
		})
	}
}
5 changes: 5 additions & 0 deletions pkg/controller/expand_and_recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func (ctrl *resizeController) callResizeOnPlugin(

// checks if pv can be expanded
func (ctrl *resizeController) pvCanBeExpanded(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) bool {
if !ctrl.pvBelongsToNode(pv) {
klog.V(4).InfoS("PV does not belong to this node, skipping", "controller", ctrl.name, "PV", klog.KObj(pv), "nodeName", ctrl.nodeName)
return false
}

if !ctrl.resizer.CanSupport(pv, pvc) {
klog.V(4).InfoS("Resizer doesn't support PV", "controller", ctrl.name, "PV", klog.KObj(pv))
return false
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/expand_and_recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ func TestExpandAndRecover(t *testing.T) {
controller := NewResizeController(driverName,
csiResizer, kubeClient,
time.Second, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string](), true /*handleVolumeInUseError*/, 2*time.Minute /*maxRetryInterval*/)
workqueue.DefaultTypedControllerRateLimiter[string](), true, /*handleVolumeInUseError*/
2*time.Minute, /*maxRetryInterval*/
"" /* nodeName */)

ctrlInstance, _ := controller.(*resizeController)
recorder := record.NewFakeRecorder(10)
Expand Down Expand Up @@ -253,7 +255,8 @@ func TestExpandAndRecoverConcurrent(t *testing.T) {
controller := NewResizeController(driverName,
csiResizer, kubeClient,
time.Second, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string](), true, 2*time.Minute)
workqueue.DefaultTypedControllerRateLimiter[string](), true,
2*time.Minute, "" /* nodeName */)

ctrlInstance := controller.(*resizeController)
ctrlInstance.eventRecorder = record.NewFakeRecorder(1000)
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/resize_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ func TestResizeFunctions(t *testing.T) {
csiResizer, kubeClient,
time.Second, informerFactory,
workqueue.DefaultTypedControllerRateLimiter[string](),
true, /*handleVolumeInUseError*/
2*time.Minute /*maxRetryInterval*/)
true, /*handleVolumeInUseError*/
2*time.Minute, /*maxRetryInterval*/
"" /* nodeName */)

ctrlInstance, _ := controller.(*resizeController)

Expand Down