Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type client struct {

// inventory is the package's ctypes.Inventory implementation: the embedded
// cluster state plus the context it was created with.
type inventory struct {
inventoryV1.Cluster
// ctx is the context handed to newInventory; Metrics pulls its logger from it.
ctx context.Context
}

type inventoryState struct {
Expand Down Expand Up @@ -219,13 +220,13 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In
case inv := <-in:
pending = append(pending, inv)
if och == nil {
msg = newInventory(pending[0])
msg = newInventory(cl.ctx, pending[0])
och = out
}
case och <- msg:
pending = pending[1:]
if len(pending) > 0 {
msg = newInventory(pending[0])
msg = newInventory(cl.ctx, pending[0])
} else {
och = nil
msg = nil
Expand Down
124 changes: 124 additions & 0 deletions cluster/kube/operators/clients/inventory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -211,6 +212,7 @@ func makeInventoryScaffold(t *testing.T) *inventoryScaffold {
kc := kfake.NewClientset()
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyInventoryUnderTest, true)
ctx = logr.NewContext(ctx, logr.Discard())

gSrv := setupInventoryGRPC(ctx, group, ports[0])

Expand Down Expand Up @@ -315,6 +317,128 @@ func TestInventoryZero(t *testing.T) {
require.Len(t, inv.Metrics().Nodes, 0)
}

// makeBaseCluster returns a fresh single-node cluster fixture that individual
// test cases mutate before feeding it to the inventory client.
//
// The node is named "node" and carries CPU (milli-units), memory and ephemeral
// storage quantities, with GPU and volume quantities set to zero.
// NOTE(review): the third NewResourcePair argument appears to be the allocated
// amount (0 here, i.e. nothing allocated) — confirm against the inventoryV1 API.
func makeBaseCluster() inventoryV1.Cluster {
	resources := inventoryV1.NodeResources{
		CPU: inventoryV1.CPU{
			Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI),
		},
		Memory: inventoryV1.Memory{
			Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI),
		},
		GPU: inventoryV1.GPU{
			Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI),
		},
		EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI),
		VolumesAttached:  inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI),
		VolumesMounted:   inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI),
	}

	node := inventoryV1.Node{
		Name:         "node",
		Resources:    resources,
		Capabilities: inventoryV1.NodeCapabilities{},
	}

	return inventoryV1.Cluster{
		Nodes: inventoryV1.Nodes{node},
	}
}

func TestInventoryMetrics_negative_quantities_clamped(t *testing.T) {
tests := []struct {
name string
mutate func(c *inventoryV1.Cluster)
check func(t *testing.T, m inventoryV1.Metrics)
}{
{
name: "negative_allocatable_gpu",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.GPU.Quantity.Allocatable.Set(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Available.GPU, "negative allocatable must yield 0")
assert.Equal(t, uint64(0), m.TotalAvailable.GPU)
},
},
{
name: "negative_allocatable_cpu",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.CPU.Quantity.Allocatable.SetMilli(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.CPU)
assert.Equal(t, uint64(0), m.TotalAllocatable.CPU)
},
},
{
name: "negative_allocatable_memory",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.Memory.Quantity.Allocatable.Set(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.Memory)
assert.Equal(t, uint64(0), m.TotalAllocatable.Memory)
},
},
{
name: "negative_allocatable_storage_ephemeral",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.EphemeralStorage.Allocatable.Set(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.StorageEphemeral)
assert.Equal(t, uint64(0), m.TotalAllocatable.StorageEphemeral)
},
},
{
name: "negative_allocatable_storage_class",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Capabilities.StorageClasses = []string{"default"}
c.Storage = inventoryV1.ClusterStorage{
{
Quantity: inventoryV1.NewResourcePair(0, -1, 0, resource.DecimalSI),
Info: inventoryV1.StorageInfo{Class: "default"},
},
}
},
check: func(t *testing.T, m inventoryV1.Metrics) {
assert.Equal(t, uint64(0), m.TotalAllocatable.Storage["default"])
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
},
{
name: "allocated_exceeds_allocatable_gpu",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.GPU.Quantity = inventoryV1.NewResourcePair(4, 4, 10, resource.DecimalSI)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Available.GPU, "allocated>allocatable must yield 0 available")
assert.Equal(t, uint64(4), m.Nodes[0].Allocatable.GPU)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scaffold := makeInventoryScaffold(t)
cl, err := NewClient(scaffold.ctx)
require.NoError(t, err)
require.NotNil(t, cl)

cluster := makeBaseCluster()
tt.mutate(&cluster)
scaffold.gInv.invch <- cluster

inv := waitForInventory(t, cl.ResultChan())
require.NotNil(t, inv)

tt.check(t, inv.Metrics())
})
}
}

func TestInventorySingleNodeNoPods(t *testing.T) {
const expectedCPU = 13
const expectedMemory = 14
Expand Down
89 changes: 60 additions & 29 deletions cluster/kube/operators/clients/inventory/inventory.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package inventory

import (
"context"
"errors"
"fmt"
"reflect"
"strings"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/resource"

inventoryV1 "pkg.akt.dev/go/inventory/v1"
dvbeta "pkg.akt.dev/go/node/deployment/v1beta4"
attrtypes "pkg.akt.dev/go/node/types/attributes/v1"
Expand All @@ -14,24 +19,48 @@ import (
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
cinventory "github.com/akash-network/provider/cluster/types/v1beta3/clients/inventory"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
"github.com/akash-network/provider/tools/fromctx"
)

var _ ctypes.Inventory = (*inventory)(nil)
var (
_ ctypes.Inventory = (*inventory)(nil)

func newInventory(clState inventoryV1.Cluster) *inventory {
inv := &inventory{
Cluster: clState,
errNegativeQuantity = errors.New("negative resource quantity")
quantityZero = resource.NewQuantity(0, resource.DecimalSI)
)

// clampUint64 converts v to uint64, substituting 0 for negative values.
//
// A negative v is reported through log as errNegativeQuantity, tagged with
// what (a label naming the quantity) and the offending value, so the clamp is
// visible rather than silent.
func clampUint64(log logr.Logger, v int64, what string) uint64 {
	if v >= 0 {
		return uint64(v)
	}

	log.Error(errNegativeQuantity, "clamped to 0", "what", what, "value", v)

	return 0
}

// clampAvailableUint64 converts an "available" amount v to uint64, guarding
// against a negative allocatable quantity.
//
// ResourcePair.Available() treats allocatable == -1 as an "unlimited" sentinel
// and returns MaxInt64. Inputs are normally sanitized so -1 never gets this
// far, but should sanitization be skipped, the uint64 result would be
// catastrophically wrong. As defense in depth, a negative allocatable is
// logged and yields 0 regardless of v; otherwise v goes through clampUint64.
func clampAvailableUint64(log logr.Logger, allocatable *resource.Quantity, v int64, what string) uint64 {
	if allocatable.Cmp(*quantityZero) >= 0 {
		return clampUint64(log, v, what)
	}

	log.Error(errNegativeQuantity, "allocatable is negative, available clamped to 0", "what", what, "allocatable", allocatable.String())

	return 0
}

return inv
// newInventory wraps clState in an inventory and records ctx on it so that
// later method calls can reach context-scoped values.
func newInventory(ctx context.Context, clState inventoryV1.Cluster) *inventory {
	inv := new(inventory)
	inv.Cluster = clState
	inv.ctx = ctx

	return inv
}

func (inv *inventory) dup() inventory {
dup := inventory{
return inventory{
Cluster: *inv.Cluster.Dup(),
ctx: inv.ctx,
}

return dup
}

func (inv *inventory) Dup() ctypes.Inventory {
Expand Down Expand Up @@ -332,47 +361,49 @@ func (inv *inventory) Metrics() inventoryV1.Metrics {
Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.Nodes)),
}

log := fromctx.LogrFromCtx(inv.ctx).WithName("inventory.metrics")
for _, nd := range inv.Nodes {
ndLog := log.WithValues("node", nd.Name)

cpuAllocatable := clampUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable.MilliValue(), "allocatable_cpu")
gpuAllocatable := clampUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable.Value(), "allocatable_gpu")
memoryAllocatable := clampUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable.Value(), "allocatable_memory")
storageEphAllocatable := clampUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable.Value(), "allocatable_storage_ephemeral")

invNode := inventoryV1.NodeMetrics{
Name: nd.Name,
Allocatable: inventoryV1.ResourcesMetric{
CPU: uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()), // nolint: gosec
GPU: uint64(nd.Resources.GPU.Quantity.Allocatable.Value()), // nolint: gosec
Memory: uint64(nd.Resources.Memory.Quantity.Allocatable.Value()), // nolint: gosec
StorageEphemeral: uint64(nd.Resources.EphemeralStorage.Allocatable.Value()), // nolint: gosec
CPU: cpuAllocatable,
GPU: gpuAllocatable,
Memory: memoryAllocatable,
StorageEphemeral: storageEphAllocatable,
},
}

cpuTotal += uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()) // nolint: gosec
gpuTotal += uint64(nd.Resources.GPU.Quantity.Allocatable.Value()) // nolint: gosec
memoryTotal += uint64(nd.Resources.Memory.Quantity.Allocatable.Value()) // nolint: gosec
storageEphemeralTotal += uint64(nd.Resources.EphemeralStorage.Allocatable.Value()) // nolint: gosec
cpuTotal += cpuAllocatable
gpuTotal += gpuAllocatable
memoryTotal += memoryAllocatable
storageEphemeralTotal += storageEphAllocatable

avail := nd.Resources.CPU.Quantity.Available()
invNode.Available.CPU = uint64(avail.MilliValue()) // nolint: gosec
invNode.Available.CPU = clampAvailableUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, nd.Resources.CPU.Quantity.Available().MilliValue(), "available_cpu")
cpuAvailable += invNode.Available.CPU

avail = nd.Resources.GPU.Quantity.Available()
invNode.Available.GPU = uint64(avail.Value()) // nolint: gosec
invNode.Available.GPU = clampAvailableUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable, nd.Resources.GPU.Quantity.Available().Value(), "available_gpu")
gpuAvailable += invNode.Available.GPU

avail = nd.Resources.Memory.Quantity.Available()
invNode.Available.Memory = uint64(avail.Value()) // nolint: gosec
invNode.Available.Memory = clampAvailableUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, nd.Resources.Memory.Quantity.Available().Value(), "available_memory")
memoryAvailable += invNode.Available.Memory

avail = nd.Resources.EphemeralStorage.Available()
invNode.Available.StorageEphemeral = uint64(avail.Value()) // nolint: gosec
invNode.Available.StorageEphemeral = clampAvailableUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, nd.Resources.EphemeralStorage.Available().Value(), "available_storage_ephemeral")
storageEphemeralAvailable += invNode.Available.StorageEphemeral

ret.Nodes = append(ret.Nodes, invNode)
}

for _, class := range inv.Storage {
tmp := class.Quantity.Allocatable.DeepCopy()
storageTotal[class.Info.Class] = uint64(tmp.Value()) //nolint: gosec

tmp = *class.Quantity.Available()
storageAvailable[class.Info.Class] = uint64(tmp.Value()) //nolint: gosec
classLog := log.WithValues("storageClass", class.Info.Class)
storageTotal[class.Info.Class] = clampUint64(classLog, class.Quantity.Allocatable.Value(), "allocatable_storage")
storageAvailable[class.Info.Class] = clampAvailableUint64(classLog, class.Quantity.Allocatable, class.Quantity.Available().Value(), "available_storage")
}

ret.TotalAllocatable = inventoryV1.MetricTotal{
Expand Down
Loading
Loading