Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions internal/metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,40 @@ package metrics

import (
"context"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"

operatorv1alpha1 "github.com/kcp-dev/kcp-operator/sdk/apis/operator/v1alpha1"
)

const (
// UnknownPhase is used when the phase of a resource is empty.
UnknownPhase = "Unknown"
)

// MetricsCollector collects metrics for kcp-operator resources.
type MetricsCollector struct {
client ctrlruntimeclient.Client
mu sync.RWMutex
}

// NewMetricsCollector creates a new MetricsCollector.
func NewMetricsCollector(client ctrlruntimeclient.Client) *MetricsCollector {
return &MetricsCollector{
client: client,
}
}

// Start begins periodic metrics updates every 30 seconds until ctx is canceled.
func (mc *MetricsCollector) Start(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)

// Initial update
mc.updateObjectCounts(ctx)

for {
Expand All @@ -55,6 +64,7 @@ func (mc *MetricsCollector) Start(ctx context.Context) {
}
}

// recordConditionStatuses sets Prometheus metrics for resource conditions.
func recordConditionStatuses(resourceType, name, namespace string, conditions []metav1.Condition) {
for _, condition := range conditions {
ConditionStatus.
Expand All @@ -63,7 +73,11 @@ func recordConditionStatuses(resourceType, name, namespace string, conditions []
}
}

// updateObjectCounts resets and repopulates all resource metrics.
func (mc *MetricsCollector) updateObjectCounts(ctx context.Context) {
mc.mu.Lock()
defer mc.mu.Unlock()

ConditionStatus.Reset()
mc.updateRootShardCounts(ctx)
mc.updateShardCounts(ctx)
Expand All @@ -73,6 +87,7 @@ func (mc *MetricsCollector) updateObjectCounts(ctx context.Context) {
mc.updateVirtualWorkspaceCounts(ctx)
}

// updateRootSharedCounts updates metrics for RootShard resources.
func (mc *MetricsCollector) updateRootShardCounts(ctx context.Context) {
var rootShards operatorv1alpha1.RootShardList
if err := mc.client.List(ctx, &rootShards); err != nil {
Expand Down Expand Up @@ -102,6 +117,7 @@ func (mc *MetricsCollector) updateRootShardCounts(ctx context.Context) {
}
}

// updateShardCounts updates metrics for Shard resources.
func (mc *MetricsCollector) updateShardCounts(ctx context.Context) {
var shards operatorv1alpha1.ShardList
if err := mc.client.List(ctx, &shards); err != nil {
Expand Down Expand Up @@ -131,6 +147,7 @@ func (mc *MetricsCollector) updateShardCounts(ctx context.Context) {
}
}

// updateFrontProxyCounts updates metrics for FrontProxy resources.
func (mc *MetricsCollector) updateFrontProxyCounts(ctx context.Context) {
var frontProxies operatorv1alpha1.FrontProxyList
if err := mc.client.List(ctx, &frontProxies); err != nil {
Expand Down Expand Up @@ -160,6 +177,7 @@ func (mc *MetricsCollector) updateFrontProxyCounts(ctx context.Context) {
}
}

// updateCacheServerCounts updates metrics for CacheServer resources.
func (mc *MetricsCollector) updateCacheServerCounts(ctx context.Context) {
var cacheServers operatorv1alpha1.CacheServerList
if err := mc.client.List(ctx, &cacheServers); err != nil {
Expand All @@ -178,6 +196,7 @@ func (mc *MetricsCollector) updateCacheServerCounts(ctx context.Context) {
}
}

// updateKubeconfigCounts updates metrics for Kubeconfig resources.
func (mc *MetricsCollector) updateKubeconfigCounts(ctx context.Context) {
var kubeconfigs operatorv1alpha1.KubeconfigList
if err := mc.client.List(ctx, &kubeconfigs); err != nil {
Expand All @@ -198,6 +217,7 @@ func (mc *MetricsCollector) updateKubeconfigCounts(ctx context.Context) {
}
}

// updateVirtualWorkspaceCounts updates metrics for VirtualWorkspace resources.
func (mc *MetricsCollector) updateVirtualWorkspaceCounts(ctx context.Context) {
var virtualWorkspaces operatorv1alpha1.VirtualWorkspaceList
if err := mc.client.List(ctx, &virtualWorkspaces); err != nil {
Expand All @@ -215,3 +235,16 @@ func (mc *MetricsCollector) updateVirtualWorkspaceCounts(ctx context.Context) {
VirtualWorkspaceCount.WithLabelValues(namespace).Set(float64(count))
}
}

// Collect safely exposes a consistent snapshot of metrics to Prometheus.
func (mc *MetricsCollector) Collect(ch chan<- prometheus.Metric) {
mc.mu.RLock()
defer mc.mu.RUnlock()

ConditionStatus.Collect(ch)
RootShardCount.Collect(ch)
ShardCount.Collect(ch)
FrontProxyCount.Collect(ch)
CacheServerCount.Collect(ch)
KubeconfigCount.Collect(ch)
}