Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cluster/kube/operators/clients/inventory/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type client struct {

// inventory is a cluster inventory snapshot (the embedded inventoryV1.Cluster)
// paired with the context that was handed to newInventory when it was built.
type inventory struct {
inventoryV1.Cluster
// ctx is read by Metrics to obtain a logger via fromctx.LogrFromCtx.
ctx context.Context
}

type inventoryState struct {
Expand Down Expand Up @@ -219,13 +220,13 @@ func (cl *client) subscriber(in <-chan inventoryV1.Cluster, out chan<- ctypes.In
case inv := <-in:
pending = append(pending, inv)
if och == nil {
msg = newInventory(pending[0])
msg = newInventory(cl.ctx, pending[0])
och = out
}
case och <- msg:
pending = pending[1:]
if len(pending) > 0 {
msg = newInventory(pending[0])
msg = newInventory(cl.ctx, pending[0])
} else {
och = nil
msg = nil
Expand Down
126 changes: 126 additions & 0 deletions cluster/kube/operators/clients/inventory/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -211,6 +212,7 @@ func makeInventoryScaffold(t *testing.T) *inventoryScaffold {
kc := kfake.NewClientset()
ctx = context.WithValue(ctx, fromctx.CtxKeyKubeClientSet, kubernetes.Interface(kc))
ctx = context.WithValue(ctx, fromctx.CtxKeyInventoryUnderTest, true)
ctx = logr.NewContext(ctx, logr.Discard())

gSrv := setupInventoryGRPC(ctx, group, ports[0])

Expand Down Expand Up @@ -315,6 +317,130 @@ func TestInventoryZero(t *testing.T) {
require.Len(t, inv.Metrics().Nodes, 0)
}

// makeBaseCluster builds the fixture the clamping tests start from: a cluster
// with exactly one node, named "node", and no storage classes.
//
// Every resource pair is created through its own constructor call so that no
// two fields share a value; test cases are expected to mutate the returned
// cluster in place.
func makeBaseCluster() inventoryV1.Cluster {
	resources := inventoryV1.NodeResources{
		CPU: inventoryV1.CPU{
			Quantity: inventoryV1.NewResourcePairMilli(1000, 1000, 0, resource.DecimalSI),
		},
		Memory: inventoryV1.Memory{
			Quantity: inventoryV1.NewResourcePair(1024, 1024, 0, resource.DecimalSI),
		},
		GPU: inventoryV1.GPU{
			Quantity: inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI),
		},
		EphemeralStorage: inventoryV1.NewResourcePair(100, 100, 0, resource.DecimalSI),
		VolumesAttached:  inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI),
		VolumesMounted:   inventoryV1.NewResourcePair(0, 0, 0, resource.DecimalSI),
	}

	node := inventoryV1.Node{
		Name:         "node",
		Resources:    resources,
		Capabilities: inventoryV1.NodeCapabilities{},
	}

	return inventoryV1.Cluster{
		Nodes: inventoryV1.Nodes{node},
	}
}

func TestInventoryMetrics_negative_quantities_clamped(t *testing.T) {
tests := []struct {
name string
mutate func(c *inventoryV1.Cluster)
check func(t *testing.T, m inventoryV1.Metrics)
}{
{
name: "negative_allocatable_gpu",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.GPU.Quantity.Allocatable.Set(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Available.GPU, "negative allocatable must yield 0")
assert.Equal(t, uint64(0), m.TotalAvailable.GPU)
},
},
{
name: "negative_allocatable_cpu",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.CPU.Quantity.Allocatable.SetMilli(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.CPU)
assert.Equal(t, uint64(0), m.TotalAllocatable.CPU)
},
},
{
name: "negative_allocatable_memory",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.Memory.Quantity.Allocatable.Set(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.Memory)
assert.Equal(t, uint64(0), m.TotalAllocatable.Memory)
},
},
{
name: "negative_allocatable_storage_ephemeral",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.EphemeralStorage.Allocatable.Set(-1)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Allocatable.StorageEphemeral)
assert.Equal(t, uint64(0), m.TotalAllocatable.StorageEphemeral)
},
},
{
name: "negative_allocatable_storage_class",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Capabilities.StorageClasses = []string{"default"}
c.Storage = inventoryV1.ClusterStorage{
{
Quantity: inventoryV1.NewResourcePair(0, -1, 0, resource.DecimalSI),
Info: inventoryV1.StorageInfo{Class: "default"},
},
}
},
check: func(t *testing.T, m inventoryV1.Metrics) {
v, exists := m.TotalAllocatable.Storage["default"]
require.True(t, exists, "storage class key must be present")
assert.Equal(t, uint64(0), v)
},
Comment thread
coderabbitai[bot] marked this conversation as resolved.
},
{
name: "allocated_exceeds_allocatable_gpu",
mutate: func(c *inventoryV1.Cluster) {
c.Nodes[0].Resources.GPU.Quantity = inventoryV1.NewResourcePair(4, 4, 10, resource.DecimalSI)
},
check: func(t *testing.T, m inventoryV1.Metrics) {
require.Len(t, m.Nodes, 1)
assert.Equal(t, uint64(0), m.Nodes[0].Available.GPU, "allocated>allocatable must yield 0 available")
assert.Equal(t, uint64(4), m.Nodes[0].Allocatable.GPU)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
scaffold := makeInventoryScaffold(t)
cl, err := NewClient(scaffold.ctx)
require.NoError(t, err)
require.NotNil(t, cl)

cluster := makeBaseCluster()
tt.mutate(&cluster)
scaffold.gInv.invch <- cluster

inv := waitForInventory(t, cl.ResultChan())
require.NotNil(t, inv)

tt.check(t, inv.Metrics())
})
}
}

func TestInventorySingleNodeNoPods(t *testing.T) {
const expectedCPU = 13
const expectedMemory = 14
Expand Down
89 changes: 60 additions & 29 deletions cluster/kube/operators/clients/inventory/inventory.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package inventory

import (
"context"
"errors"
"fmt"
"reflect"
"strings"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/resource"

inventoryV1 "pkg.akt.dev/go/inventory/v1"
dvbeta "pkg.akt.dev/go/node/deployment/v1beta4"
attrtypes "pkg.akt.dev/go/node/types/attributes/v1"
Expand All @@ -14,24 +19,48 @@ import (
ctypes "github.com/akash-network/provider/cluster/types/v1beta3"
cinventory "github.com/akash-network/provider/cluster/types/v1beta3/clients/inventory"
crd "github.com/akash-network/provider/pkg/apis/akash.network/v2beta2"
"github.com/akash-network/provider/tools/fromctx"
)

var _ ctypes.Inventory = (*inventory)(nil)
var (
_ ctypes.Inventory = (*inventory)(nil)

func newInventory(clState inventoryV1.Cluster) *inventory {
inv := &inventory{
Cluster: clState,
errNegativeQuantity = errors.New("negative resource quantity")
quantityZero = resource.NewQuantity(0, resource.DecimalSI)
)

// clampUint64 converts v to uint64 without letting a negative value wrap
// around. Non-negative values are returned unchanged; a negative value is
// reported on log as errNegativeQuantity (tagged with what and the offending
// value) and 0 is returned in its place.
func clampUint64(log logr.Logger, v int64, what string) uint64 {
	if v >= 0 {
		return uint64(v)
	}

	log.Error(errNegativeQuantity, "clamped to 0", "what", what, "value", v)

	return 0
}

// clampAvailableUint64 converts an "available" amount v to uint64, guarding
// against a negative allocatable quantity as well as a negative v.
//
// It is a defense-in-depth check around ResourcePair.Available(): that method
// treats allocatable == -1 as an "unlimited" sentinel and returns MaxInt64.
// Inputs are normally sanitized before they get here, but if a negative
// allocatable slipped through, converting the result to uint64 would report a
// wildly wrong amount. In that case the condition is logged as
// errNegativeQuantity and 0 is returned; otherwise v goes through clampUint64.
func clampAvailableUint64(log logr.Logger, allocatable *resource.Quantity, v int64, what string) uint64 {
	if negative := allocatable.Cmp(*quantityZero) < 0; !negative {
		return clampUint64(log, v, what)
	}

	log.Error(errNegativeQuantity, "allocatable is negative, available clamped to 0", "what", what, "allocatable", allocatable.String())

	return 0
}

return inv
func newInventory(ctx context.Context, clState inventoryV1.Cluster) *inventory {
return &inventory{
Cluster: clState,
ctx: ctx,
}
}

func (inv *inventory) dup() inventory {
dup := inventory{
return inventory{
Cluster: *inv.Cluster.Dup(),
ctx: inv.ctx,
}

return dup
}

func (inv *inventory) Dup() ctypes.Inventory {
Expand Down Expand Up @@ -332,47 +361,49 @@ func (inv *inventory) Metrics() inventoryV1.Metrics {
Nodes: make([]inventoryV1.NodeMetrics, 0, len(inv.Nodes)),
}

log := fromctx.LogrFromCtx(inv.ctx).WithName("inventory.metrics")
for _, nd := range inv.Nodes {
ndLog := log.WithValues("node", nd.Name)

cpuAllocatable := clampUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable.MilliValue(), "allocatable_cpu")
gpuAllocatable := clampUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable.Value(), "allocatable_gpu")
memoryAllocatable := clampUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable.Value(), "allocatable_memory")
storageEphAllocatable := clampUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable.Value(), "allocatable_storage_ephemeral")

invNode := inventoryV1.NodeMetrics{
Name: nd.Name,
Allocatable: inventoryV1.ResourcesMetric{
CPU: uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()), // nolint: gosec
GPU: uint64(nd.Resources.GPU.Quantity.Allocatable.Value()), // nolint: gosec
Memory: uint64(nd.Resources.Memory.Quantity.Allocatable.Value()), // nolint: gosec
StorageEphemeral: uint64(nd.Resources.EphemeralStorage.Allocatable.Value()), // nolint: gosec
CPU: cpuAllocatable,
GPU: gpuAllocatable,
Memory: memoryAllocatable,
StorageEphemeral: storageEphAllocatable,
},
}

cpuTotal += uint64(nd.Resources.CPU.Quantity.Allocatable.MilliValue()) // nolint: gosec
gpuTotal += uint64(nd.Resources.GPU.Quantity.Allocatable.Value()) // nolint: gosec
memoryTotal += uint64(nd.Resources.Memory.Quantity.Allocatable.Value()) // nolint: gosec
storageEphemeralTotal += uint64(nd.Resources.EphemeralStorage.Allocatable.Value()) // nolint: gosec
cpuTotal += cpuAllocatable
gpuTotal += gpuAllocatable
memoryTotal += memoryAllocatable
storageEphemeralTotal += storageEphAllocatable

avail := nd.Resources.CPU.Quantity.Available()
invNode.Available.CPU = uint64(avail.MilliValue()) // nolint: gosec
invNode.Available.CPU = clampAvailableUint64(ndLog, nd.Resources.CPU.Quantity.Allocatable, nd.Resources.CPU.Quantity.Available().MilliValue(), "available_cpu")
cpuAvailable += invNode.Available.CPU

avail = nd.Resources.GPU.Quantity.Available()
invNode.Available.GPU = uint64(avail.Value()) // nolint: gosec
invNode.Available.GPU = clampAvailableUint64(ndLog, nd.Resources.GPU.Quantity.Allocatable, nd.Resources.GPU.Quantity.Available().Value(), "available_gpu")
gpuAvailable += invNode.Available.GPU

avail = nd.Resources.Memory.Quantity.Available()
invNode.Available.Memory = uint64(avail.Value()) // nolint: gosec
invNode.Available.Memory = clampAvailableUint64(ndLog, nd.Resources.Memory.Quantity.Allocatable, nd.Resources.Memory.Quantity.Available().Value(), "available_memory")
memoryAvailable += invNode.Available.Memory

avail = nd.Resources.EphemeralStorage.Available()
invNode.Available.StorageEphemeral = uint64(avail.Value()) // nolint: gosec
invNode.Available.StorageEphemeral = clampAvailableUint64(ndLog, nd.Resources.EphemeralStorage.Allocatable, nd.Resources.EphemeralStorage.Available().Value(), "available_storage_ephemeral")
storageEphemeralAvailable += invNode.Available.StorageEphemeral

ret.Nodes = append(ret.Nodes, invNode)
}

for _, class := range inv.Storage {
tmp := class.Quantity.Allocatable.DeepCopy()
storageTotal[class.Info.Class] = uint64(tmp.Value()) //nolint: gosec

tmp = *class.Quantity.Available()
storageAvailable[class.Info.Class] = uint64(tmp.Value()) //nolint: gosec
classLog := log.WithValues("storageClass", class.Info.Class)
storageTotal[class.Info.Class] = clampUint64(classLog, class.Quantity.Allocatable.Value(), "allocatable_storage")
storageAvailable[class.Info.Class] = clampAvailableUint64(classLog, class.Quantity.Allocatable, class.Quantity.Available().Value(), "available_storage")
}

ret.TotalAllocatable = inventoryV1.MetricTotal{
Expand Down
Loading
Loading