From 1fd8769b9301a9f18e17d9eeba54afc795e078df Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Fri, 20 Mar 2026 17:40:14 +0100 Subject: [PATCH 1/6] fix: gpu count sanitizing --- .../operators/clients/inventory/client.go | 6 +- .../clients/inventory/client_test.go | 221 ++++++++++++++++++ .../operators/clients/inventory/inventory.go | 110 +++++++-- operator/inventory/node-discovery.go | 24 +- operator/inventory/node-discovery_test.go | 19 ++ 5 files changed, 349 insertions(+), 31 deletions(-) create mode 100644 operator/inventory/node-discovery_test.go diff --git a/cluster/kube/operators/clients/inventory/client.go b/cluster/kube/operators/clients/inventory/client.go index 0b8e8ee0..52f5684a 100644 --- a/cluster/kube/operators/clients/inventory/client.go +++ b/cluster/kube/operators/clients/inventory/client.go @@ -15,6 +15,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" + "github.com/go-logr/logr" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" "pkg.akt.dev/go/util/ctxlog" @@ -46,6 +47,7 @@ type client struct { type inventory struct { inventoryV1.Cluster + log logr.Logger } type inventoryState struct { @@ -219,13 +221,13 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In case inv := <-in: pending = append(pending, inv) if och == nil { - msg = newInventory(pending[0]) + msg = newInventory(cl.ctx, pending[0]) och = out } case och <- msg: pending = pending[1:] if len(pending) > 0 { - msg = newInventory(pending[0]) + msg = newInventory(cl.ctx, pending[0]) } else { och = nil msg = nil diff --git a/cluster/kube/operators/clients/inventory/client_test.go b/cluster/kube/operators/clients/inventory/client_test.go index b94d10db..4a1cfb8b 100644 --- a/cluster/kube/operators/clients/inventory/client_test.go +++ b/cluster/kube/operators/clients/inventory/client_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/go-logr/logr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -211,6 +212,7 @@ func makeInventoryScaffold(t *testing.T) *inventoryScaffold { kc := kfake.NewClientset() ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc)) ctx = context.WithValue(ctx, fromctx.CtxKeyInventoryUnderTest, true) + ctx = logr.NewContext(ctx, logr.Discard()) gSrv := setupInventoryGRPC(ctx, group, ports[0]) @@ -315,6 +317,225 @@ func TestInventoryZero(t *testing.T) { require.Len(t, inv.Metrics().Nodes, 0) } +func TestInventoryMetrics_bad_gpu_values_no_overflow(t *testing.T) { + scaffold := makeInventoryScaffold(t) + cl, err := NewClient(scaffold.ctx) + require.NoError(t, err) + require.NotNil(t, cl) + + scaffold.gInv.invch <- inventoryV1.Cluster{ + Nodes: inventoryV1.Nodes{ + inventoryV1.Node{ + Name: "bad-gpu-node", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(0, -1, 0, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + }, + }, + } + + inv := waitForInventory(t, cl.ResultChan()) + require.NotNil(t, inv) + + metrics := inv.Metrics() + require.Len(t, metrics.Nodes, 1) + assert.Equal(t, uint64(0), metrics.Nodes[0].Available.GPU, "negative allocatable must yield 0, not uint64 overflow") + assert.Equal(t, uint64(0), metrics.TotalAvailable.GPU) +} + +func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) { + tests := []struct { + name string + cluster inventoryV1.Cluster + check func(t *testing.T, m inventoryV1.Metrics) + }{ + { + name: "negative_allocatable_cpu", + cluster: inventoryV1.Cluster{ + Nodes: inventoryV1.Nodes{ + inventoryV1.Node{ + Name: "bad-cpu-node", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(1000, -1, 0, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + }, + }, + }, + check: func(t *testing.T, m inventoryV1.Metrics) { + require.Len(t, m.Nodes, 1) + assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.CPU) + assert.Equal(t, uint64(0), m.TotalAllocatable.CPU) + }, + }, + { + name: "negative_allocatable_memory", + cluster: inventoryV1.Cluster{ + Nodes: inventoryV1.Nodes{ + inventoryV1.Node{ + Name: "bad-memory-node", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(1024, -1, 0, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + }, + }, + }, + check: func(t *testing.T, m inventoryV1.Metrics) { + require.Len(t, m.Nodes, 1) + assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.Memory) + assert.Equal(t, uint64(0), m.TotalAllocatable.Memory) + }, + }, + { + name: "negative_allocatable_storage_ephemeral", + cluster: inventoryV1.Cluster{ + Nodes: inventoryV1.Nodes{ + inventoryV1.Node{ + Name: "bad-storage-node", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(100, -1, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + }, + }, + }, + check: func(t *testing.T, m inventoryV1.Metrics) { + require.Len(t, m.Nodes, 1) + assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.StorageEphemeral) + assert.Equal(t, uint64(0), m.TotalAllocatable.StorageEphemeral) + }, + }, + { + name: "negative_allocatable_storage_class", + cluster: inventoryV1.Cluster{ + Nodes: inventoryV1.Nodes{ + inventoryV1.Node{ + Name: "node", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{ + StorageClasses: []string{"default"}, + }, + }, + }, + Storage: inventoryV1.ClusterStorage{ + { + Quantity: inventoryV1.NewResourcePair(0, -1, 0, resource.DecimalSI), + Info: inventoryV1.StorageInfo{Class: "default"}, + }, + }, + }, + check: func(t *testing.T, m inventoryV1.Metrics) { + assert.Equal(t, uint64(0), m.TotalAllocatable.Storage["default"]) + }, + }, + { + name: "allocated_exceeds_allocatable_gpu", + cluster: inventoryV1.Cluster{ + Nodes: inventoryV1.Nodes{ + inventoryV1.Node{ + Name: "overalloc-node", + Resources: inventoryV1.NodeResources{ + CPU: inventoryV1.CPU{ + Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), + }, + Memory: inventoryV1.Memory{ + Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), + }, + GPU: inventoryV1.GPU{ + Quantity: inventoryV1.NewResourcePair(4, 4, 10, resource.DecimalSI), + }, + EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), + VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), + }, + Capabilities: inventoryV1.NodeCapabilities{}, + }, + }, + }, + check: func(t *testing.T, m inventoryV1.Metrics) { + require.Len(t, m.Nodes, 1) + assert.Equal(t, uint64(0), m.Nodes[0].Available.GPU, "allocated>allocatable must yield 0 available") + assert.Equal(t, uint64(4), m.Nodes[0].Allocatable.GPU) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scaffold := makeInventoryScaffold(t) + cl, err := NewClient(scaffold.ctx) + require.NoError(t, err) + require.NotNil(t, cl) + + scaffold.gInv.invch <- tt.cluster + + inv := waitForInventory(t, cl.ResultChan()) + require.NotNil(t, inv) + + tt.check(t, inv.Metrics()) + }) + } +} + func TestInventorySingleNodeNoPods(t *testing.T) { const expectedCPU = 13 const expectedMemory = 14 diff --git a/cluster/kube/operators/clients/inventory/inventory.go b/cluster/kube/operators/clients/inventory/inventory.go index 2b415bc2..24569b8a 100644 --- a/cluster/kube/operators/clients/inventory/inventory.go +++ b/cluster/kube/operators/clients/inventory/inventory.go @@ -1,10 +1,15 @@ package inventory import ( + "context" + "errors" "fmt" "reflect" "strings" + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/resource" + inventoryV1 "pkg.akt.dev/go/inventory/v1" dvbeta "pkg.akt.dev/go/node/deployment/v1beta4" attrtypes "pkg.akt.dev/go/node/types/attributes/v1" @@ -14,24 +19,67 @@ import ( ctypes "github.com/akash-network/provider/cluster/types/v1beta3" cinventory "github.com/akash-network/provider/cluster/types/v1beta3/clients/inventory" crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2" + "github.com/akash-network/provider/tools/fromctx" +) + +var ( + errNegativeQuantityClamped = errors.New("inventory resource quantity was negative (device plugin or allocation bug), clamped to 0 to prevent uint64 overflow") +) + +const ( + stageAllocatableCPU = "allocatable_cpu" + stageAllocatableGPU = "allocatable_gpu" + stageAllocatableMemory = "allocatable_memory" + stageAllocatableStorage = "allocatable_storage" + stageAllocatableStorageEphemeral = "allocatable_storage_ephemeral" + stageAvailableCPU = "available_cpu" + stageAvailableGPU = "available_gpu" + stageAvailableMemory = "available_memory" + stageAvailableStorage = "available_storage" + stageAvailableStorageEphemeral = "available_storage_ephemeral" ) var _ ctypes.Inventory = (*inventory)(nil) -func newInventory(clState inventoryV1.Cluster) *inventory { - inv := &inventory{ - Cluster: clState, +func safeQuantityToUint64(log logr.Logger, q *resource.Quantity, stage string, keysAndValues ...interface{}) uint64 { + if q == nil { + return 0 + } + v := q.Value() + if v < 0 { + kvs := append([]interface{}{"stage", stage, "value", v}, keysAndValues...) + log.Error(errNegativeQuantityClamped, "negative quantity clamped", kvs...) + return 0 + } + return uint64(v) +} + +func safeMilliQuantityToUint64(log logr.Logger, q *resource.Quantity, stage string, keysAndValues ...interface{}) uint64 { + if q == nil { + return 0 + } + v := q.MilliValue() + if v < 0 { + kvs := append([]interface{}{"stage", stage, "milliValue", v}, keysAndValues...) + log.Error(errNegativeQuantityClamped, "negative quantity clamped", kvs...) + return 0 } + return uint64(v) +} - return inv +func newInventory(ctx context.Context, clState inventoryV1.Cluster) *inventory { + log := fromctx.LogrFromCtx(ctx).WithName("inventory") + return &inventory{ + Cluster: clState, + log: log, + } } func (inv *inventory) dup() inventory { - dup := inventory{ + return inventory{ Cluster: *inv.Cluster.Dup(), + log: inv.log, } - - return dup } func (inv *inventory) Dup() ctypes.Inventory { @@ -332,47 +380,63 @@ func (inv *inventory) Metrics() inventoryV1.Metrics { Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.Nodes)), } + log := inv.log for _, nd := range inv.Nodes { + ndLog := log.WithValues("node", nd.Name) + gpuAllocatable := safeQuantityToUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable, stageAllocatableGPU) invNode := inventoryV1.NodeMetrics{ Name: nd.Name, Allocatable: inventoryV1.ResourcesMetric{ - CPU: uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()), // nolint: gosec - GPU: uint64(nd.Resources.GPU.Quantity.Allocatable.Value()), // nolint: gosec - Memory: uint64(nd.Resources.Memory.Quantity.Allocatable.Value()), // nolint: gosec - StorageEphemeral: uint64(nd.Resources.EphemeralStorage.Allocatable.Value()), // nolint: gosec + CPU: safeMilliQuantityToUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, stageAllocatableCPU), + GPU: gpuAllocatable, + Memory: safeQuantityToUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, stageAllocatableMemory), + StorageEphemeral: safeQuantityToUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, stageAllocatableStorageEphemeral), }, } - cpuTotal += uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()) // nolint: gosec - gpuTotal += uint64(nd.Resources.GPU.Quantity.Allocatable.Value()) // nolint: gosec - memoryTotal += uint64(nd.Resources.Memory.Quantity.Allocatable.Value()) // nolint: gosec - storageEphemeralTotal += uint64(nd.Resources.EphemeralStorage.Allocatable.Value()) // nolint: gosec + cpuTotal += safeMilliQuantityToUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, stageAllocatableCPU) + gpuTotal += gpuAllocatable + memoryTotal += safeQuantityToUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, stageAllocatableMemory) + storageEphemeralTotal += safeQuantityToUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, stageAllocatableStorageEphemeral) avail := nd.Resources.CPU.Quantity.Available() - invNode.Available.CPU = uint64(avail.MilliValue()) // nolint: gosec + invNode.Available.CPU = safeMilliQuantityToUint64(ndLog, avail, stageAvailableCPU) cpuAvailable += invNode.Available.CPU avail = nd.Resources.GPU.Quantity.Available() - invNode.Available.GPU = uint64(avail.Value()) // nolint: gosec + if nd.Resources.GPU.Quantity.Allocatable != nil && nd.Resources.GPU.Quantity.Allocatable.Value() < 0 { + ndLog.Error(errNegativeQuantityClamped, stageAvailableGPU+" clamped: allocatable negative (device plugin)", "allocatable", nd.Resources.GPU.Quantity.Allocatable.Value()) + invNode.Available.GPU = 0 + } else { + allocatableVal := int64(0) + allocatedVal := int64(0) + if nd.Resources.GPU.Quantity.Allocatable != nil { + allocatableVal = nd.Resources.GPU.Quantity.Allocatable.Value() + } + if nd.Resources.GPU.Quantity.Allocated != nil { + allocatedVal = nd.Resources.GPU.Quantity.Allocated.Value() + } + invNode.Available.GPU = safeQuantityToUint64(ndLog, avail, stageAvailableGPU, "allocatable", allocatableVal, "allocated", allocatedVal) + } gpuAvailable += invNode.Available.GPU avail = nd.Resources.Memory.Quantity.Available() - invNode.Available.Memory = uint64(avail.Value()) // nolint: gosec + invNode.Available.Memory = safeQuantityToUint64(ndLog, avail, stageAvailableMemory) memoryAvailable += invNode.Available.Memory avail = nd.Resources.EphemeralStorage.Available() - invNode.Available.StorageEphemeral = uint64(avail.Value()) // nolint: gosec + invNode.Available.StorageEphemeral = safeQuantityToUint64(ndLog, avail, stageAvailableStorageEphemeral) storageEphemeralAvailable += invNode.Available.StorageEphemeral ret.Nodes = append(ret.Nodes, invNode) } for _, class := range inv.Storage { - tmp := class.Quantity.Allocatable.DeepCopy() - storageTotal[class.Info.Class] = uint64(tmp.Value()) //nolint: gosec + classLog := log.WithValues("storageClass", class.Info.Class) + storageTotal[class.Info.Class] = safeQuantityToUint64(classLog, class.Quantity.Allocatable, stageAllocatableStorage) - tmp = *class.Quantity.Available() - storageAvailable[class.Info.Class] = uint64(tmp.Value()) //nolint: gosec + tmp := class.Quantity.Available() + storageAvailable[class.Info.Class] = safeQuantityToUint64(classLog, tmp, stageAvailableStorage) } ret.TotalAllocatable = inventoryV1.MetricTotal{ diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index 140082fb..eee55414 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/go-logr/logr" "github.com/jaypipes/ghw/pkg/cpu" "github.com/jaypipes/ghw/pkg/gpu" "github.com/jaypipes/ghw/pkg/memory" @@ -32,11 +33,20 @@ import ( ) var ( - errWorkerExit = errors.New("worker finished") + errWorkerExit = errors.New("worker finished") + errInvalidGPUQuantity = errors.New("invalid GPU quantity from node, clamping to 0") labelNvidiaComGPUPresent = fmt.Sprintf("%s.present", builder.ResourceGPUNvidia) ) +func sanitizeGPUQuantity(log logr.Logger, nodeName, resourceName string, val int64) int64 { + if val < 0 { + log.Error(errInvalidGPUQuantity, "invalid GPU quantity from node, clamping to 0", "node", nodeName, "resource", resourceName, "value", val) + return 0 + } + return val +} + type k8sPatch struct { Op string `json:"op"` Path string `json:"path"` @@ -522,7 +532,7 @@ func (dp *nodeDiscovery) monitor() error { switch evt.Type { case watch.Modified: if nodeAllocatableChanged(knode, obj) { - updateNodeInfo(obj, &node) + updateNodeInfo(ctx, obj, &node) if err = restartPodsWatcher(); err != nil { return err } @@ -659,12 +669,14 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors, knode *corev1. }, } - updateNodeInfo(knode, &res) + updateNodeInfo(dp.ctx, knode, &res) return res } -func updateNodeInfo(knode *corev1.Node, node *v1.Node) { +func updateNodeInfo(ctx context.Context, knode *corev1.Node, node *v1.Node) { + log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + for name, r := range knode.Status.Allocatable { switch name { case corev1.ResourceCPU: @@ -676,7 +688,7 @@ func updateNodeInfo(knode *corev1.Node, node *v1.Node) { case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Allocatable.Set(r.Value()) + node.Resources.GPU.Quantity.Allocatable.Set(sanitizeGPUQuantity(log, knode.Name, string(name), r.Value())) } } @@ -691,7 +703,7 @@ func updateNodeInfo(knode *corev1.Node, node *v1.Node) { case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Capacity.Set(r.Value()) + node.Resources.GPU.Quantity.Capacity.Set(sanitizeGPUQuantity(log, knode.Name, string(name), r.Value())) } } } diff --git a/operator/inventory/node-discovery_test.go b/operator/inventory/node-discovery_test.go new file mode 100644 index 00000000..d4c47236 --- /dev/null +++ b/operator/inventory/node-discovery_test.go @@ -0,0 +1,19 @@ +package inventory + +import ( + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" +) + +func TestSanitizeGPUQuantity(t *testing.T) { + log := logr.Discard() + + // positive values should be returned as is + assert.Equal(t, int64(10000), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", 10000)) + + // negative values should be clamped to 0 + assert.Equal(t, int64(0), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", -1)) + assert.Equal(t, int64(0), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", -1000)) +} From 1a01ea0280207d5cd20e576e764743727546ed30 Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Fri, 20 Mar 2026 19:14:18 +0100 Subject: [PATCH 2/6] fix: handle watch.Added event --- operator/inventory/node-discovery.go | 79 ++++++++++++----------- operator/inventory/node-discovery_test.go | 16 ++++- 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index eee55414..265e898d 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -33,8 +33,9 @@ import ( ) var ( - errWorkerExit = errors.New("worker finished") - errInvalidGPUQuantity = errors.New("invalid GPU quantity from node, clamping to 0") + errWorkerExit = errors.New("worker finished") + errInvalidGPUQuantity = errors.New("invalid GPU quantity from node, clamping to 0") + errAllocatedUnderflowClamp = errors.New("allocated underflow (event reorder or duplicate delete), clamped to 0") labelNvidiaComGPUPresent = fmt.Sprintf("%s.present", builder.ResourceGPUNvidia) ) @@ -351,6 +352,10 @@ initloop: } } +func podKey(pod *corev1.Pod) string { + return pod.Namespace + "/" + pod.Name +} + func isPodAllocated(status corev1.PodStatus) bool { for _, condition := range status.Conditions { if condition.Type == corev1.PodScheduled { @@ -429,10 +434,8 @@ func (dp *nodeDiscovery) monitor() error { restartPodsWatcher := func() error { if podsWatch != nil { - select { - case <-podsWatch.ResultChan(): - default: - } + podsWatch.Stop() + podsWatch = nil } var terr error @@ -465,7 +468,7 @@ func (dp *nodeDiscovery) monitor() error { addPodAllocatedResources(&node, pod) - currPods[pod.Name] = *pod + currPods[podKey(pod)] = *pod } return nil @@ -528,21 +531,22 @@ func (dp *nodeDiscovery) monitor() error { evt := rEvt.(watch.Event) switch obj := evt.Object.(type) { case *corev1.Node: - if obj.Name == dp.name { - switch evt.Type { - case watch.Modified: - if nodeAllocatableChanged(knode, obj) { - updateNodeInfo(ctx, obj, &node) - if err = restartPodsWatcher(); err != nil { - return err - } + if obj.Name != dp.name { + continue + } + switch evt.Type { + case watch.Added: + fallthrough + case watch.Modified: + if evt.Type == watch.Added || (knode != nil && nodeAllocatableChanged(knode, obj)) { + updateNodeInfo(ctx, obj, &node) + if err = restartPodsWatcher(); err != nil { + return err } - - signalLabels() } - - knode = obj.DeepCopy() + signalLabels() } + knode = obj.DeepCopy() } case res, isopen := <-podsWatch.ResultChan(): if !isopen { @@ -555,24 +559,25 @@ func (dp *nodeDiscovery) monitor() error { } obj := res.Object.(*corev1.Pod) + key := podKey(obj) switch res.Type { case watch.Added: fallthrough case watch.Modified: - if _, exists := currPods[obj.Name]; !exists && isPodAllocated(obj.Status) { - currPods[obj.Name] = *obj.DeepCopy() + if _, exists := currPods[key]; !exists && isPodAllocated(obj.Status) { + currPods[key] = *obj.DeepCopy() addPodAllocatedResources(&node, obj) } case watch.Deleted: - pod, exists := currPods[obj.Name] + pod, exists := currPods[key] if !exists { log.Info("received pod delete event for item that does not exist. check node inventory logic, it's might have bug in it!") break } - subPodAllocatedResources(&node, &pod) + subPodAllocatedResources(log, &node, &pod) - delete(currPods, obj.Name) + delete(currPods, key) } signalState() case <-statech: @@ -745,29 +750,31 @@ func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { } } -func subAllocatedNLZ(allocated *resource.Quantity, val resource.Quantity) { - newVal := allocated.Value() - val.Value() - if newVal < 0 { - newVal = 0 +func subAllocatedNLZ(log logr.Logger, nodeName, resourceName string, allocated *resource.Quantity, val resource.Quantity) { + before := allocated.DeepCopy() + allocated.Sub(val) + zero := resource.NewQuantity(0, resource.DecimalSI) + if allocated.Cmp(*zero) < 0 { + log.Error(errAllocatedUnderflowClamp, "allocated underflow clamped", "node", nodeName, "resource", resourceName, "allocated_before", before.String(), "subtracted", val.String()) + allocated.Set(0) } - - allocated.Set(newVal) } -func subPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { +func subPodAllocatedResources(log logr.Logger, node *v1.Node, pod *corev1.Pod) { + nodeName := node.Name for _, container := range pod.Spec.Containers { for name, quantity := range container.Resources.Requests { switch name { case corev1.ResourceCPU: - subAllocatedNLZ(node.Resources.CPU.Quantity.Allocated, quantity) + subAllocatedNLZ(log, nodeName, string(corev1.ResourceCPU), node.Resources.CPU.Quantity.Allocated, quantity) case corev1.ResourceMemory: - subAllocatedNLZ(node.Resources.Memory.Quantity.Allocated, quantity) + subAllocatedNLZ(log, nodeName, string(corev1.ResourceMemory), node.Resources.Memory.Quantity.Allocated, quantity) case corev1.ResourceEphemeralStorage: - subAllocatedNLZ(node.Resources.EphemeralStorage.Allocated, quantity) + subAllocatedNLZ(log, nodeName, string(corev1.ResourceEphemeralStorage), node.Resources.EphemeralStorage.Allocated, quantity) case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - subAllocatedNLZ(node.Resources.GPU.Quantity.Allocated, quantity) + subAllocatedNLZ(log, nodeName, string(name), node.Resources.GPU.Quantity.Allocated, quantity) } } @@ -776,7 +783,7 @@ func subPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { continue } - subAllocatedNLZ(node.Resources.Memory.Quantity.Allocated, *vol.EmptyDir.SizeLimit) + subAllocatedNLZ(log, nodeName, "memory.emptydir", node.Resources.Memory.Quantity.Allocated, *vol.EmptyDir.SizeLimit) } } } diff --git a/operator/inventory/node-discovery_test.go b/operator/inventory/node-discovery_test.go index d4c47236..d9e68cf9 100644 --- a/operator/inventory/node-discovery_test.go +++ b/operator/inventory/node-discovery_test.go @@ -5,15 +5,25 @@ import ( "github.com/go-logr/logr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/resource" ) func TestSanitizeGPUQuantity(t *testing.T) { log := logr.Discard() - // positive values should be returned as is assert.Equal(t, int64(10000), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", 10000)) - - // negative values should be clamped to 0 assert.Equal(t, int64(0), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", -1)) assert.Equal(t, int64(0), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", -1000)) } + +func TestSubAllocatedNLZ_underflow_clamped(t *testing.T) { + log := logr.Discard() + + allocated := resource.NewQuantity(2, resource.DecimalSI) + val := resource.NewQuantity(5, resource.DecimalSI) + + subAllocatedNLZ(log, "node1", "nvidia.com/gpu", allocated, *val) + + require.Equal(t, int64(0), allocated.Value(), "allocated underflow must be clamped to 0") +} From 69a7bf0b4abdc932fb8dd44acdbe3ea647c2c538 Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Mon, 23 Mar 2026 12:44:19 +0100 Subject: [PATCH 3/6] chore: simplified code --- .../operators/clients/inventory/inventory.go | 93 ++++++------------- operator/inventory/node-discovery.go | 36 ++++--- operator/inventory/node-discovery_test.go | 9 +- 3 files changed, 58 insertions(+), 80 deletions(-) diff --git a/cluster/kube/operators/clients/inventory/inventory.go b/cluster/kube/operators/clients/inventory/inventory.go index 24569b8a..b003540f 100644 --- a/cluster/kube/operators/clients/inventory/inventory.go +++ b/cluster/kube/operators/clients/inventory/inventory.go @@ -23,48 +23,29 @@ import ( ) var ( - errNegativeQuantityClamped = errors.New("inventory resource quantity was negative (device plugin or allocation bug), clamped to 0 to prevent uint64 overflow") -) + _ ctypes.Inventory = (*inventory)(nil) -const ( - stageAllocatableCPU = "allocatable_cpu" - stageAllocatableGPU = "allocatable_gpu" - stageAllocatableMemory = "allocatable_memory" - stageAllocatableStorage = "allocatable_storage" - stageAllocatableStorageEphemeral = "allocatable_storage_ephemeral" - stageAvailableCPU = "available_cpu" - stageAvailableGPU = "available_gpu" - stageAvailableMemory = "available_memory" - stageAvailableStorage = "available_storage" - stageAvailableStorageEphemeral = "available_storage_ephemeral" + errNegativeQuantity = errors.New("negative resource quantity clamped to 0") + quantityZero = resource.NewQuantity(0, resource.DecimalSI) ) -var _ ctypes.Inventory = (*inventory)(nil) - -func safeQuantityToUint64(log logr.Logger, q *resource.Quantity, stage string, keysAndValues ...interface{}) uint64 { - if q == nil { - return 0 - } - v := q.Value() +func clampUint64(log logr.Logger, v int64, what string) uint64 { if v < 0 { - kvs := append([]interface{}{"stage", stage, "value", v}, keysAndValues...) - log.Error(errNegativeQuantityClamped, "negative quantity clamped", kvs...) + log.Error(errNegativeQuantity, "clamped to 0", "what", what, "value", v) return 0 } return uint64(v) } -func safeMilliQuantityToUint64(log logr.Logger, q *resource.Quantity, stage string, keysAndValues ...interface{}) uint64 { - if q == nil { - return 0 - } - v := q.MilliValue() - if v < 0 { - kvs := append([]interface{}{"stage", stage, "milliValue", v}, keysAndValues...) - log.Error(errNegativeQuantityClamped, "negative quantity clamped", kvs...) +// clampAvailableUint64 guards against ResourcePair.Available() returning MaxInt64 +// when allocatable is -1 (the "unlimited" sentinel). If allocatable is negative +// for any reason, the Available() result is unreliable. +func clampAvailableUint64(log logr.Logger, allocatable *resource.Quantity, v int64, what string) uint64 { + if allocatable.Cmp(*quantityZero) < 0 { + log.Error(errNegativeQuantity, "available clamped: negative allocatable", "what", what) return 0 } - return uint64(v) + return clampUint64(log, v, what) } func newInventory(ctx context.Context, clState inventoryV1.Cluster) *inventory { @@ -383,49 +364,37 @@ func (inv *inventory) Metrics() inventoryV1.Metrics { log := inv.log for _, nd := range inv.Nodes { ndLog := log.WithValues("node", nd.Name) - gpuAllocatable := safeQuantityToUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable, stageAllocatableGPU) + + cpuAllocatable := clampUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable.MilliValue(), "allocatable_cpu") + gpuAllocatable := clampUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable.Value(), "allocatable_gpu") + memoryAllocatable := clampUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable.Value(), "allocatable_memory") + storageEphAllocatable := clampUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable.Value(), "allocatable_storage_ephemeral") + invNode := inventoryV1.NodeMetrics{ Name: nd.Name, Allocatable: inventoryV1.ResourcesMetric{ - CPU: safeMilliQuantityToUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, stageAllocatableCPU), + CPU: cpuAllocatable, GPU: gpuAllocatable, - Memory: safeQuantityToUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, stageAllocatableMemory), - StorageEphemeral: safeQuantityToUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, stageAllocatableStorageEphemeral), + Memory: memoryAllocatable, + StorageEphemeral: storageEphAllocatable, }, } - cpuTotal += safeMilliQuantityToUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, stageAllocatableCPU) + cpuTotal += cpuAllocatable gpuTotal += gpuAllocatable - memoryTotal += safeQuantityToUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, stageAllocatableMemory) - storageEphemeralTotal += safeQuantityToUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, stageAllocatableStorageEphemeral) + memoryTotal += memoryAllocatable + storageEphemeralTotal += storageEphAllocatable - avail := nd.Resources.CPU.Quantity.Available() - invNode.Available.CPU = safeMilliQuantityToUint64(ndLog, avail, stageAvailableCPU) + invNode.Available.CPU = clampAvailableUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, nd.Resources.CPU.Quantity.Available().MilliValue(), "available_cpu") cpuAvailable += invNode.Available.CPU - avail = nd.Resources.GPU.Quantity.Available() - if nd.Resources.GPU.Quantity.Allocatable != nil && nd.Resources.GPU.Quantity.Allocatable.Value() < 0 { - ndLog.Error(errNegativeQuantityClamped, stageAvailableGPU+" clamped: allocatable negative (device plugin)", "allocatable", nd.Resources.GPU.Quantity.Allocatable.Value()) - invNode.Available.GPU = 0 - } else { - allocatableVal := int64(0) - allocatedVal := int64(0) - if nd.Resources.GPU.Quantity.Allocatable != nil { - allocatableVal = nd.Resources.GPU.Quantity.Allocatable.Value() - } - if nd.Resources.GPU.Quantity.Allocated != nil { - allocatedVal = nd.Resources.GPU.Quantity.Allocated.Value() - } - invNode.Available.GPU = safeQuantityToUint64(ndLog, avail, stageAvailableGPU, "allocatable", allocatableVal, "allocated", allocatedVal) - } + invNode.Available.GPU = clampAvailableUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable, nd.Resources.GPU.Quantity.Available().Value(), "available_gpu") gpuAvailable += invNode.Available.GPU - avail = nd.Resources.Memory.Quantity.Available() - invNode.Available.Memory = safeQuantityToUint64(ndLog, avail, stageAvailableMemory) + invNode.Available.Memory = clampAvailableUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, nd.Resources.Memory.Quantity.Available().Value(), "available_memory") memoryAvailable += invNode.Available.Memory - avail = nd.Resources.EphemeralStorage.Available() - invNode.Available.StorageEphemeral = safeQuantityToUint64(ndLog, avail, stageAvailableStorageEphemeral) + invNode.Available.StorageEphemeral = clampAvailableUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, nd.Resources.EphemeralStorage.Available().Value(), "available_storage_ephemeral") storageEphemeralAvailable += invNode.Available.StorageEphemeral ret.Nodes = append(ret.Nodes, invNode) @@ -433,10 +402,8 @@ func (inv *inventory) Metrics() inventoryV1.Metrics { for _, class := range inv.Storage { classLog := log.WithValues("storageClass", class.Info.Class) - storageTotal[class.Info.Class] = safeQuantityToUint64(classLog, class.Quantity.Allocatable, stageAllocatableStorage) - - tmp := class.Quantity.Available() - storageAvailable[class.Info.Class] = safeQuantityToUint64(classLog, tmp, stageAvailableStorage) + storageTotal[class.Info.Class] = clampUint64(classLog, class.Quantity.Allocatable.Value(), "allocatable_storage") + storageAvailable[class.Info.Class] = clampAvailableUint64(classLog, class.Quantity.Allocatable, class.Quantity.Available().Value(), "available_storage") } ret.TotalAllocatable = inventoryV1.MetricTotal{ diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index 265e898d..799b3395 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -34,15 +34,18 @@ import ( var ( errWorkerExit = errors.New("worker finished") - errInvalidGPUQuantity = errors.New("invalid GPU quantity from node, clamping to 0") + errInvalidResourceQuantity = errors.New("invalid resource quantity from node, clamping to 0") errAllocatedUnderflowClamp = errors.New("allocated underflow (event reorder or duplicate delete), clamped to 0") + errGPUExceedsCapacity = errors.New("GPU allocatable exceeds capacity") + + quantityZero = resource.NewQuantity(0, resource.DecimalSI) labelNvidiaComGPUPresent = fmt.Sprintf("%s.present", builder.ResourceGPUNvidia) ) -func sanitizeGPUQuantity(log logr.Logger, nodeName, resourceName string, val int64) int64 { +func sanitizeResourceQuantity(log logr.Logger, nodeName, resourceName string, val int64) int64 { if val < 0 { - log.Error(errInvalidGPUQuantity, "invalid GPU quantity from node, clamping to 0", "node", nodeName, "resource", resourceName, "value", val) + log.Error(errInvalidResourceQuantity, "clamping to 0", "node", nodeName, "resource", resourceName, "value", val) return 0 } return val @@ -685,32 +688,40 @@ func updateNodeInfo(ctx context.Context, knode *corev1.Node, node *v1.Node) { for name, r := range knode.Status.Allocatable { switch name { case corev1.ResourceCPU: - node.Resources.CPU.Quantity.Allocatable.SetMilli(r.MilliValue()) + node.Resources.CPU.Quantity.Allocatable.SetMilli(sanitizeResourceQuantity(log, knode.Name, string(name), r.MilliValue())) case corev1.ResourceMemory: - node.Resources.Memory.Quantity.Allocatable.Set(r.Value()) + node.Resources.Memory.Quantity.Allocatable.Set(sanitizeResourceQuantity(log, knode.Name, string(name), r.Value())) case corev1.ResourceEphemeralStorage: - node.Resources.EphemeralStorage.Allocatable.Set(r.Value()) + node.Resources.EphemeralStorage.Allocatable.Set(sanitizeResourceQuantity(log, knode.Name, string(name), r.Value())) case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Allocatable.Set(sanitizeGPUQuantity(log, knode.Name, string(name), r.Value())) + node.Resources.GPU.Quantity.Allocatable.Set(sanitizeResourceQuantity(log, knode.Name, string(name), r.Value())) } } for name, r := range knode.Status.Capacity { switch name { case corev1.ResourceCPU: - node.Resources.CPU.Quantity.Capacity.SetMilli(r.MilliValue()) + node.Resources.CPU.Quantity.Capacity.SetMilli(sanitizeResourceQuantity(log, knode.Name, string(name), r.MilliValue())) case corev1.ResourceMemory: - node.Resources.Memory.Quantity.Capacity.Set(r.Value()) + node.Resources.Memory.Quantity.Capacity.Set(sanitizeResourceQuantity(log, knode.Name, string(name), r.Value())) case corev1.ResourceEphemeralStorage: - node.Resources.EphemeralStorage.Capacity.Set(r.Value()) + node.Resources.EphemeralStorage.Capacity.Set(sanitizeResourceQuantity(log, knode.Name, string(name), r.Value())) case builder.ResourceGPUNvidia: fallthrough case builder.ResourceGPUAMD: - node.Resources.GPU.Quantity.Capacity.Set(sanitizeGPUQuantity(log, knode.Name, string(name), r.Value())) + node.Resources.GPU.Quantity.Capacity.Set(sanitizeResourceQuantity(log, knode.Name, string(name), r.Value())) } } + + gpuAllocatable := node.Resources.GPU.Quantity.Allocatable.Value() + gpuCapacity := node.Resources.GPU.Quantity.Capacity.Value() + if gpuCapacity > 0 && gpuAllocatable > gpuCapacity { + log.Error(errGPUExceedsCapacity, "clamping allocatable to capacity", + "node", knode.Name, "allocatable", gpuAllocatable, "capacity", gpuCapacity) + node.Resources.GPU.Quantity.Allocatable.Set(gpuCapacity) + } } func nodeResetAllocated(node *v1.Node) { @@ -753,8 +764,7 @@ func addPodAllocatedResources(node *v1.Node, pod *corev1.Pod) { func subAllocatedNLZ(log logr.Logger, nodeName, resourceName string, allocated *resource.Quantity, val resource.Quantity) { before := allocated.DeepCopy() allocated.Sub(val) - zero := resource.NewQuantity(0, resource.DecimalSI) - if allocated.Cmp(*zero) < 0 { + if allocated.Cmp(*quantityZero) < 0 { log.Error(errAllocatedUnderflowClamp, "allocated underflow clamped", "node", nodeName, "resource", resourceName, "allocated_before", before.String(), "subtracted", val.String()) allocated.Set(0) } diff --git a/operator/inventory/node-discovery_test.go b/operator/inventory/node-discovery_test.go index d9e68cf9..eb841595 100644 --- a/operator/inventory/node-discovery_test.go +++ b/operator/inventory/node-discovery_test.go @@ -9,12 +9,13 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) -func TestSanitizeGPUQuantity(t *testing.T) { +func TestSanitizeResourceQuantity(t *testing.T) { log := logr.Discard() - assert.Equal(t, int64(10000), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", 10000)) - assert.Equal(t, int64(0), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", -1)) - assert.Equal(t, int64(0), sanitizeGPUQuantity(log, "node1", "nvidia.com/gpu", -1000)) + assert.Equal(t, int64(10000), sanitizeResourceQuantity(log, "node1", "nvidia.com/gpu", 10000)) + assert.Equal(t, int64(0), sanitizeResourceQuantity(log, "node1", "nvidia.com/gpu", -1)) + assert.Equal(t, int64(0), sanitizeResourceQuantity(log, "node1", "memory", -1000)) + assert.Equal(t, int64(0), sanitizeResourceQuantity(log, "node1", "cpu", 0)) } func TestSubAllocatedNLZ_underflow_clamped(t *testing.T) { From f578883f691d48a63f4e5085ee6cfd25c207be0f Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Mon, 23 Mar 2026 18:54:33 +0100 Subject: [PATCH 4/6] chore: logger --- cluster/kube/operators/clients/inventory/client.go | 3 +-- cluster/kube/operators/clients/inventory/inventory.go | 7 +++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/cluster/kube/operators/clients/inventory/client.go b/cluster/kube/operators/clients/inventory/client.go index 52f5684a..9cc8f38f 100644 --- a/cluster/kube/operators/clients/inventory/client.go +++ b/cluster/kube/operators/clients/inventory/client.go @@ -15,7 +15,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" - "github.com/go-logr/logr" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" "pkg.akt.dev/go/util/ctxlog" @@ -47,7 +46,7 @@ type client struct { type inventory struct { inventoryV1.Cluster - log logr.Logger + ctx context.Context } type inventoryState struct { diff --git a/cluster/kube/operators/clients/inventory/inventory.go b/cluster/kube/operators/clients/inventory/inventory.go index b003540f..8b8e5733 100644 --- a/cluster/kube/operators/clients/inventory/inventory.go +++ b/cluster/kube/operators/clients/inventory/inventory.go @@ -49,17 +49,16 @@ func clampAvailableUint64(log logr.Logger, allocatable *resource.Quantity, v int } func newInventory(ctx context.Context, clState inventoryV1.Cluster) *inventory { - log := fromctx.LogrFromCtx(ctx).WithName("inventory") return &inventory{ Cluster: clState, - log: log, + ctx: ctx, } } func (inv *inventory) dup() inventory { return inventory{ Cluster: *inv.Cluster.Dup(), - log: inv.log, + ctx: inv.ctx, } } @@ -361,7 +360,7 @@ func (inv *inventory) Metrics() inventoryV1.Metrics { Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.Nodes)), } - log := inv.log + log := fromctx.LogrFromCtx(inv.ctx).WithName("inventory.metrics") for _, nd := range inv.Nodes { ndLog := log.WithValues("node", nd.Name) From 25ecd45e684b208fc6e9bd4c671ad21e6f33fc8f Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Wed, 25 Mar 2026 14:29:46 +0100 Subject: [PATCH 5/6] chore: reduced boilerplate code in tests --- .../clients/inventory/client_test.go | 165 ++++-------------- .../operators/clients/inventory/inventory.go | 11 +- 2 files changed, 40 insertions(+), 136 deletions(-) diff --git a/cluster/kube/operators/clients/inventory/client_test.go b/cluster/kube/operators/clients/inventory/client_test.go index 4a1cfb8b..6fb93d37 100644 --- a/cluster/kube/operators/clients/inventory/client_test.go +++ b/cluster/kube/operators/clients/inventory/client_test.go @@ -317,16 +317,11 @@ func TestInventoryZero(t *testing.T) { require.Len(t, inv.Metrics().Nodes, 0) } -func TestInventoryMetrics_bad_gpu_values_no_overflow(t *testing.T) { - scaffold := makeInventoryScaffold(t) - cl, err := NewClient(scaffold.ctx) - require.NoError(t, err) - require.NotNil(t, cl) - - scaffold.gInv.invch <- inventoryV1.Cluster{ +func makeBaseCluster() inventoryV1.Cluster { + return inventoryV1.Cluster{ Nodes: inventoryV1.Nodes{ inventoryV1.Node{ - Name: "bad-gpu-node", + Name: "node", Resources: inventoryV1.NodeResources{ CPU: inventoryV1.CPU{ Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), @@ -335,7 +330,7 @@ func TestInventoryMetrics_bad_gpu_values_no_overflow(t *testing.T) { Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), }, GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, -1, 0, resource.DecimalSI), + Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), }, EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), @@ -345,45 +340,29 @@ func TestInventoryMetrics_bad_gpu_values_no_overflow(t *testing.T) { }, }, } - - inv := waitForInventory(t, cl.ResultChan()) - require.NotNil(t, inv) - - metrics := inv.Metrics() - require.Len(t, metrics.Nodes, 1) - assert.Equal(t, uint64(0), metrics.Nodes[0].Available.GPU, "negative allocatable must yield 0, not uint64 overflow") - assert.Equal(t, uint64(0), metrics.TotalAvailable.GPU) } -func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) { +func TestInventoryMetrics_negative_quantities_clamped(t *testing.T) { tests := []struct { - name string - cluster inventoryV1.Cluster - check func(t *testing.T, m inventoryV1.Metrics) + name string + mutate func(c *inventoryV1.Cluster) + check func(t *testing.T, m inventoryV1.Metrics) }{ + { + name: "negative_allocatable_gpu", + mutate: func(c *inventoryV1.Cluster) { + c.Nodes[0].Resources.GPU.Quantity.Allocatable.Set(-1) + }, + check: func(t *testing.T, m inventoryV1.Metrics) { + require.Len(t, m.Nodes, 1) + assert.Equal(t, uint64(0), m.Nodes[0].Available.GPU, "negative allocatable must yield 0") + assert.Equal(t, uint64(0), m.TotalAvailable.GPU) + }, + }, { name: "negative_allocatable_cpu", - cluster: inventoryV1.Cluster{ - Nodes: inventoryV1.Nodes{ - inventoryV1.Node{ - Name: "bad-cpu-node", - Resources: inventoryV1.NodeResources{ - CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(1000, -1, 0, resource.DecimalSI), - }, - Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), - }, - GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - Capabilities: inventoryV1.NodeCapabilities{}, - }, - }, + mutate: func(c *inventoryV1.Cluster) { + c.Nodes[0].Resources.CPU.Quantity.Allocatable.SetMilli(-1) }, check: func(t *testing.T, m inventoryV1.Metrics) { require.Len(t, m.Nodes, 1) @@ -393,27 +372,8 @@ func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) }, { name: "negative_allocatable_memory", - cluster: inventoryV1.Cluster{ - Nodes: inventoryV1.Nodes{ - inventoryV1.Node{ - Name: "bad-memory-node", - Resources: inventoryV1.NodeResources{ - CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), - }, - Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(1024, -1, 0, resource.DecimalSI), - }, - GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - Capabilities: inventoryV1.NodeCapabilities{}, - }, - }, + mutate: func(c *inventoryV1.Cluster) { + c.Nodes[0].Resources.Memory.Quantity.Allocatable.Set(-1) }, check: func(t *testing.T, m inventoryV1.Metrics) { require.Len(t, m.Nodes, 1) @@ -423,27 +383,8 @@ func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) }, { name: "negative_allocatable_storage_ephemeral", - cluster: inventoryV1.Cluster{ - Nodes: inventoryV1.Nodes{ - inventoryV1.Node{ - Name: "bad-storage-node", - Resources: inventoryV1.NodeResources{ - CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), - }, - Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), - }, - GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - EphemeralStorage: inventoryV1.NewResourcePair(100, -1, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - Capabilities: inventoryV1.NodeCapabilities{}, - }, - }, + mutate: func(c *inventoryV1.Cluster) { + c.Nodes[0].Resources.EphemeralStorage.Allocatable.Set(-1) }, check: func(t *testing.T, m inventoryV1.Metrics) { require.Len(t, m.Nodes, 1) @@ -453,35 +394,14 @@ func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) }, { name: "negative_allocatable_storage_class", - cluster: inventoryV1.Cluster{ - Nodes: inventoryV1.Nodes{ - inventoryV1.Node{ - Name: "node", - Resources: inventoryV1.NodeResources{ - CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), - }, - Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), - }, - GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - Capabilities: inventoryV1.NodeCapabilities{ - StorageClasses: []string{"default"}, - }, - }, - }, - Storage: inventoryV1.ClusterStorage{ + mutate: func(c *inventoryV1.Cluster) { + c.Nodes[0].Capabilities.StorageClasses = []string{"default"} + c.Storage = inventoryV1.ClusterStorage{ { Quantity: inventoryV1.NewResourcePair(0, -1, 0, resource.DecimalSI), Info: inventoryV1.StorageInfo{Class: "default"}, }, - }, + } }, check: func(t *testing.T, m inventoryV1.Metrics) { assert.Equal(t, uint64(0), m.TotalAllocatable.Storage["default"]) @@ -489,27 +409,8 @@ func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) }, { name: "allocated_exceeds_allocatable_gpu", - cluster: inventoryV1.Cluster{ - Nodes: inventoryV1.Nodes{ - inventoryV1.Node{ - Name: "overalloc-node", - Resources: inventoryV1.NodeResources{ - CPU: inventoryV1.CPU{ - Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI), - }, - Memory: inventoryV1.Memory{ - Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI), - }, - GPU: inventoryV1.GPU{ - Quantity: inventoryV1.NewResourcePair(4, 4, 10, resource.DecimalSI), - }, - EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI), - VolumesAttached: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - VolumesMounted: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI), - }, - Capabilities: inventoryV1.NodeCapabilities{}, - }, - }, + mutate: func(c *inventoryV1.Cluster) { + c.Nodes[0].Resources.GPU.Quantity = inventoryV1.NewResourcePair(4, 4, 10, resource.DecimalSI) }, check: func(t *testing.T, m inventoryV1.Metrics) { require.Len(t, m.Nodes, 1) @@ -526,7 +427,9 @@ func TestInventoryMetrics_negative_quantities_clamped_no_overflow(t *testing.T) require.NoError(t, err) require.NotNil(t, cl) - scaffold.gInv.invch <- tt.cluster + cluster := makeBaseCluster() + tt.mutate(&cluster) + scaffold.gInv.invch <- cluster inv := waitForInventory(t, cl.ResultChan()) require.NotNil(t, inv) diff --git a/cluster/kube/operators/clients/inventory/inventory.go b/cluster/kube/operators/clients/inventory/inventory.go index 8b8e5733..717d0dc6 100644 --- a/cluster/kube/operators/clients/inventory/inventory.go +++ b/cluster/kube/operators/clients/inventory/inventory.go @@ -25,7 +25,7 @@ import ( var ( _ ctypes.Inventory = (*inventory)(nil) - errNegativeQuantity = errors.New("negative resource quantity clamped to 0") + errNegativeQuantity = errors.New("negative resource quantity") quantityZero = resource.NewQuantity(0, resource.DecimalSI) ) @@ -37,12 +37,13 @@ func clampUint64(log logr.Logger, v int64, what string) uint64 { return uint64(v) } -// clampAvailableUint64 guards against ResourcePair.Available() returning MaxInt64 -// when allocatable is -1 (the "unlimited" sentinel). If allocatable is negative -// for any reason, the Available() result is unreliable. +// clampAvailableUint64 is a defense-in-depth guard for ResourcePair.Available(). +// Available() treats allocatable == -1 as an "unlimited" sentinel and returns MaxInt64. +// Normal code path sanitizes inputs so -1 never reaches here, but if it did +// (e.g. skipped sanitization), the uint64 result would be catastrophically wrong. func clampAvailableUint64(log logr.Logger, allocatable *resource.Quantity, v int64, what string) uint64 { if allocatable.Cmp(*quantityZero) < 0 { - log.Error(errNegativeQuantity, "available clamped: negative allocatable", "what", what) + log.Error(errNegativeQuantity, "allocatable is negative, available clamped to 0", "what", what, "allocatable", allocatable.String()) return 0 } return clampUint64(log, v, what) From 824e56535ae55831147a6bb8de82d921fd297a1f Mon Sep 17 00:00:00 2001 From: Artem Shcherbatiuk Date: Fri, 27 Mar 2026 13:57:03 +0100 Subject: [PATCH 6/6] fix: clamp stale resources on node update and tighten GPU capacity guard --- .../kube/operators/clients/inventory/client_test.go | 4 +++- operator/inventory/node-discovery.go | 13 ++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/cluster/kube/operators/clients/inventory/client_test.go b/cluster/kube/operators/clients/inventory/client_test.go index 6fb93d37..4ec84b46 100644 --- a/cluster/kube/operators/clients/inventory/client_test.go +++ b/cluster/kube/operators/clients/inventory/client_test.go @@ -404,7 +404,9 @@ func TestInventoryMetrics_negative_quantities_clamped(t *testing.T) { } }, check: func(t *testing.T, m inventoryV1.Metrics) { - assert.Equal(t, uint64(0), m.TotalAllocatable.Storage["default"]) + v, exists := m.TotalAllocatable.Storage["default"] + require.True(t, exists, "storage class key must be present") + assert.Equal(t, uint64(0), v) }, }, { diff --git a/operator/inventory/node-discovery.go b/operator/inventory/node-discovery.go index 799b3395..8b004237 100644 --- a/operator/inventory/node-discovery.go +++ b/operator/inventory/node-discovery.go @@ -685,6 +685,17 @@ func (dp *nodeDiscovery) initNodeInfo(gpusIDs RegistryGPUVendors, knode *corev1. func updateNodeInfo(ctx context.Context, knode *corev1.Node, node *v1.Node) { log := fromctx.LogrFromCtx(ctx).WithName("node.monitor") + // Reset all fields before repopulating from knode so that resources + // removed between updates (e.g. GPU device plugin unregistered) don't leave stale values. + node.Resources.CPU.Quantity.Allocatable.SetMilli(0) + node.Resources.CPU.Quantity.Capacity.SetMilli(0) + node.Resources.Memory.Quantity.Allocatable.Set(0) + node.Resources.Memory.Quantity.Capacity.Set(0) + node.Resources.EphemeralStorage.Allocatable.Set(0) + node.Resources.EphemeralStorage.Capacity.Set(0) + node.Resources.GPU.Quantity.Allocatable.Set(0) + node.Resources.GPU.Quantity.Capacity.Set(0) + for name, r := range knode.Status.Allocatable { switch name { case corev1.ResourceCPU: @@ -717,7 +728,7 @@ func updateNodeInfo(ctx context.Context, knode *corev1.Node, node *v1.Node) { gpuAllocatable := node.Resources.GPU.Quantity.Allocatable.Value() gpuCapacity := node.Resources.GPU.Quantity.Capacity.Value() - if gpuCapacity > 0 && gpuAllocatable > gpuCapacity { + if gpuAllocatable > gpuCapacity { log.Error(errGPUExceedsCapacity, "clamping allocatable to capacity", "node", knode.Name, "allocatable", gpuAllocatable, "capacity", gpuCapacity) node.Resources.GPU.Quantity.Allocatable.Set(gpuCapacity)