From fa1def912434421079a5185c4ac1dc319d45210c Mon Sep 17 00:00:00 2001 From: bussyjd Date: Sat, 30 May 2026 02:41:28 +0400 Subject: [PATCH] Fix agent-backed service readiness reconciliation --- cmd/obol/sell.go | 7 +- cmd/obol/sell_agent.go | 4 +- .../infrastructure/base/templates/x402.yaml | 27 ++++--- internal/serviceoffercontroller/agent.go | 44 ++++++++++- internal/serviceoffercontroller/agent_test.go | 55 +++++++++++++ .../serviceoffercontroller/agent_wallet.go | 77 +++++++++++++------ .../agent_wallet_test.go | 8 ++ 7 files changed, 183 insertions(+), 39 deletions(-) diff --git a/cmd/obol/sell.go b/cmd/obol/sell.go index b06508f7..a2dd00ac 100644 --- a/cmd/obol/sell.go +++ b/cmd/obol/sell.go @@ -1588,7 +1588,11 @@ func waitForOfferReady(cfg *config.Config, u *ui.UI, name, ns string, timeout ti } var ready bool - _ = u.RunWithSpinner("Waiting for service to be Ready (up to 60s)", func() error { + timeoutLabel := fmt.Sprintf("%ds", int(timeout.Round(time.Second).Seconds())) + if timeout >= time.Minute && timeout%time.Minute == 0 { + timeoutLabel = fmt.Sprintf("%dm", int(timeout.Minutes())) + } + _ = u.RunWithSpinner("Waiting for service to be Ready (up to "+timeoutLabel+")", func() error { for time.Now().Before(deadline) { if check() { ready = true @@ -4168,4 +4172,3 @@ func manifestNSName(manifest map[string]any) (string, string) { name, _ := md["name"].(string) return ns, name } - diff --git a/cmd/obol/sell_agent.go b/cmd/obol/sell_agent.go index 80b77315..0f196791 100644 --- a/cmd/obol/sell_agent.go +++ b/cmd/obol/sell_agent.go @@ -430,8 +430,10 @@ func runAgentBackedDemo( u.Dim(" Run on-chain discovery later: obol sell register --chain " + chain) } + ready := waitForOfferReady(cfg, u, name, offerNs, 2*time.Minute) + u.Blank() - printDemoTryIt(u, name, typeName, price, symbol, chain, tunnelURL, false) + printDemoTryIt(u, name, typeName, price, symbol, chain, tunnelURL, ready) return nil } diff --git a/internal/embed/infrastructure/base/templates/x402.yaml b/internal/embed/infrastructure/base/templates/x402.yaml index c64a8b3f..1d882ccf 100644 --- a/internal/embed/infrastructure/base/templates/x402.yaml +++ b/internal/embed/infrastructure/base/templates/x402.yaml @@ -140,21 +140,30 @@ rules: resources: ["namespaces"] verbs: ["get", "list", "watch", "create"] # Per-agent runtime resources written by the Agent reconciler. - # ServiceAccount + PVC are namespace-scoped child resources; secrets - # are limited to creating the agent's API-server token (random) and - # reading litellm-secrets for the master key. Reading all secrets - # cluster-wide is intentionally not granted; the agent reconciler only - # needs read access on litellm-secrets, but the dynamic client makes - # narrowing harder than it's worth for v1. + # `create` cannot be safely resourceName-scoped in Kubernetes RBAC, + # but follow-up reads/updates/deletes are limited to the fixed child + # object names the controller owns. - apiGroups: [""] resources: ["serviceaccounts"] - verbs: ["get", "create", "update", "patch"] + verbs: ["create"] + - apiGroups: [""] + resources: ["serviceaccounts"] + resourceNames: ["hermes"] + verbs: ["get", "update", "delete"] + - apiGroups: [""] + resources: ["persistentvolumeclaims"] + verbs: ["create"] - apiGroups: [""] resources: ["persistentvolumeclaims"] - verbs: ["get", "create", "update", "patch"] + resourceNames: ["hermes-data"] + verbs: ["get", "delete"] + - apiGroups: [""] + resources: ["secrets"] + verbs: ["create"] - apiGroups: [""] resources: ["secrets"] - verbs: ["get", "create", "update", "patch"] + resourceNames: ["hermes-api-server", "remote-signer-keystore"] + verbs: ["get", "update", "delete"] - apiGroups: [""] resources: ["pods"] verbs: ["get", "list"] diff --git a/internal/serviceoffercontroller/agent.go b/internal/serviceoffercontroller/agent.go index 9235121a..03735fcb 100644 --- a/internal/serviceoffercontroller/agent.go +++ b/internal/serviceoffercontroller/agent.go @@ -41,6 +41,8 @@ const ( agentFinalizer = "agents.obol.org/finalizer" ) +var agentReadinessRequeueDelay = 5 * time.Second + func (c *Controller) enqueueAgent(obj any) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil { @@ -227,11 +229,23 @@ func (c *Controller) reconcileAgent(ctx context.Context, key string) error { setAgentCondition(&status, agentConditionProvisioned, "False", "WaitingForDeployment", "Hermes deployment has no ready replicas yet") setAgentCondition(&status, agentConditionReady, "False", "WaitingForDeployment", "Hermes deployment has no ready replicas yet") status.Phase = monetizeapi.AgentPhaseProvisioning + if err := c.updateAgentStatus(ctx, raw, status); err != nil { + return err + } + c.requeueAgentAfter(key, agentReadinessRequeueDelay) + return nil } return c.updateAgentStatus(ctx, raw, status) } +func (c *Controller) requeueAgentAfter(key string, delay time.Duration) { + if c.agentQueue == nil { + return + } + c.agentQueue.AddAfter(key, delay) +} + // validateAgentSpec returns ok=false with a short reason+message when the // CR cannot be reconciled regardless of cluster state. CRD-level OpenAPI // validation already rejects most malformed specs at admission; this is a @@ -387,8 +401,8 @@ func (c *Controller) tearDownAgent(ctx context.Context, agent *monetizeapi.Agent {"ServiceAccount/hermes", dynNs(c.client.Resource(monetizeapi.ServiceAccountGVR)), hermesServiceName}, } for _, s := range steps { - if err := s.resource.Delete(ctx, s.name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf("delete %s: %w", s.label, err) + if err := c.deleteAgentChild(ctx, agent, s.label, s.resource, s.name); err != nil { + return err } } @@ -400,6 +414,32 @@ func (c *Controller) tearDownAgent(ctx context.Context, agent *monetizeapi.Agent return nil } +func (c *Controller) deleteAgentChild(ctx context.Context, agent *monetizeapi.Agent, label string, resource dynamic.ResourceInterface, name string) error { + child, err := resource.Get(ctx, name, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return nil + } + if err != nil { + return fmt.Errorf("get %s before delete: %w", label, err) + } + if !agentOwnsChild(agent, child) { + log.Printf("serviceoffer-controller: skip deleting %s for Agent %s/%s: child lacks matching controller ownership labels", label, agent.Namespace, agent.Name) + return nil + } + if err := resource.Delete(ctx, name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("delete %s: %w", label, err) + } + return nil +} + +func agentOwnsChild(agent *monetizeapi.Agent, child *unstructured.Unstructured) bool { + labels := child.GetLabels() + if labels["app.kubernetes.io/managed-by"] != "serviceoffer-controller" { + return false + } + return labels["obol.org/agent"] == agent.Name || labels["app.kubernetes.io/instance"] == agent.Name +} + // applyAgentObject is a get-or-create-or-update wrapper used for agent // provisioning. The existing applyObject in controller.go relies on // server-side apply (ApplyPatchType), which the fake dynamic client used diff --git a/internal/serviceoffercontroller/agent_test.go b/internal/serviceoffercontroller/agent_test.go index d6717a07..b513da9f 100644 --- a/internal/serviceoffercontroller/agent_test.go +++ b/internal/serviceoffercontroller/agent_test.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/util/workqueue" ) func TestValidateAgentSpec_Happy(t *testing.T) { @@ -208,6 +209,45 @@ func TestReconcileAgent_HappyPath_ProvisionsAndPins(t *testing.T) { } } +func TestReconcileAgent_WaitingForDeploymentRequeues(t *testing.T) { + oldDelay := agentReadinessRequeueDelay + agentReadinessRequeueDelay = 0 + defer func() { agentReadinessRequeueDelay = oldDelay }() + + agent := &monetizeapi.Agent{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "obol.org/v1alpha1", + Kind: "Agent", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "quant", + Namespace: "agent-quant", + }, + Spec: monetizeapi.AgentSpec{ + Runtime: "hermes", + Model: "qwen3.5:9b", + Skills: []string{"addresses"}, + }, + } + c := newProvisioningTestController(t, agent, litellmSecretObject(t, "test-master-key")) + + if err := c.reconcileAgent(context.Background(), "agent-quant/quant"); err != nil { + t.Fatalf("reconcileAgent (finalizer): %v", err) + } + if err := c.reconcileAgent(context.Background(), "agent-quant/quant"); err != nil { + t.Fatalf("reconcileAgent (waiting): %v", err) + } + + key, shutdown := c.agentQueue.Get() + if shutdown { + t.Fatal("agent queue unexpectedly shut down") + } + if key != "agent-quant/quant" { + t.Fatalf("requeued key = %q, want agent-quant/quant", key) + } + c.agentQueue.Done(key) +} + func TestReconcileAgent_NoModel_ParksAtProvisioning(t *testing.T) { // EffectiveModel returns "" when neither spec nor status carries one. // The reconciler must surface that via a clear ModelUnpinned condition @@ -326,6 +366,20 @@ func TestReconcileAgent_DeletionTriggersTeardown(t *testing.T) { } } +func TestTearDownAgent_SkipsUnownedFixedNameChildren(t *testing.T) { + agent := agentWithWallet(t, "doomed", "agent-doomed", true) + unowned := buildAgentAPISecret(agent, "secret") + unowned.SetLabels(map[string]string{"app.kubernetes.io/name": hermesServiceName}) + c := newProvisioningTestController(t, agent, litellmSecretObject(t, "key"), unowned) + + if err := c.tearDownAgent(context.Background(), agent); err != nil { + t.Fatalf("tearDownAgent: %v", err) + } + if !resourceExists(t, c, "secrets", "agent-doomed", hermesAPISecret) { + t.Fatal("unowned fixed-name Secret should not be deleted") + } +} + func newAgentTestController(t *testing.T, agents ...*monetizeapi.Agent) *Controller { t.Helper() @@ -383,6 +437,7 @@ func newProvisioningTestController(t *testing.T, agent *monetizeapi.Agent, seedO services: dynClient.Resource(monetizeapi.ServiceGVR), configMaps: dynClient.Resource(monetizeapi.ConfigMapGVR), deployments: dynClient.Resource(monetizeapi.DeploymentGVR), + agentQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), } } diff --git a/internal/serviceoffercontroller/agent_wallet.go b/internal/serviceoffercontroller/agent_wallet.go index 6b589fe2..18802d9d 100644 --- a/internal/serviceoffercontroller/agent_wallet.go +++ b/internal/serviceoffercontroller/agent_wallet.go @@ -49,7 +49,7 @@ func (c *Controller) ensureAgentWallet(ctx context.Context, agent *monetizeapi.A return "", nil } - address, err := c.ensureSignerKeystore(ctx, agent.Namespace) + address, err := c.ensureSignerKeystore(ctx, agent) if err != nil { return "", fmt.Errorf("ensure keystore: %w", err) } @@ -74,12 +74,13 @@ const signerKeystoreAddressAnnotation = "obol.org/wallet-address" // ensureSignerKeystore is the keystore-side half of ensureAgentWallet. // Either reads an existing keystore Secret's address annotation, or // mints a new keypair, encrypts to V3, and writes the Secret. -func (c *Controller) ensureSignerKeystore(ctx context.Context, namespace string) (string, error) { +func (c *Controller) ensureSignerKeystore(ctx context.Context, agent *monetizeapi.Agent) (string, error) { + namespace := agent.Namespace existing, err := c.client.Resource(monetizeapi.SecretGVR).Namespace(namespace).Get(ctx, remoteSignerSecretName, metav1.GetOptions{}) if err == nil { annotations := existing.GetAnnotations() if addr := annotations[signerKeystoreAddressAnnotation]; addr != "" { - if err := c.ensureCanonicalKeystoreKey(ctx, namespace, existing); err != nil { + if err := c.ensureCanonicalKeystoreKeyAndLabels(ctx, agent, existing); err != nil { return "", err } return addr, nil @@ -100,49 +101,75 @@ func (c *Controller) ensureSignerKeystore(ctx context.Context, namespace string) } secret := buildSignerKeystoreSecret(namespace, mat) + ensureRemoteSignerSecretLabels(secret, agent.Name) if err := c.applyAgentObject(ctx, c.client.Resource(monetizeapi.SecretGVR).Namespace(namespace), secret); err != nil { return "", err } return mat.Address, nil } -func (c *Controller) ensureCanonicalKeystoreKey(ctx context.Context, namespace string, secret *unstructured.Unstructured) error { +func (c *Controller) ensureCanonicalKeystoreKeyAndLabels(ctx context.Context, agent *monetizeapi.Agent, secret *unstructured.Unstructured) error { + changed := ensureRemoteSignerSecretLabels(secret, agent.Name) data, _, err := unstructured.NestedStringMap(secret.Object, "data") if err != nil { return fmt.Errorf("read %s data: %w", remoteSignerSecretName, err) } - if data[remoteSignerKeystoreKey] != "" { - return nil - } - - var candidateKey, candidateValue string - for key, value := range data { - if key == "password" || !strings.HasSuffix(key, ".json") || value == "" { - continue + if data[remoteSignerKeystoreKey] == "" { + var candidateKey, candidateValue string + for key, value := range data { + if key == "password" || !strings.HasSuffix(key, ".json") || value == "" { + continue + } + if candidateKey != "" { + return fmt.Errorf("secret %s/%s has multiple legacy keystore JSON data keys (%q and %q); refusing to choose one", agent.Namespace, remoteSignerSecretName, candidateKey, key) + } + candidateKey = key + candidateValue = value } - if candidateKey != "" { - return fmt.Errorf("secret %s/%s has multiple legacy keystore JSON data keys (%q and %q); refusing to choose one", namespace, remoteSignerSecretName, candidateKey, key) + if candidateKey == "" { + return fmt.Errorf("secret %s/%s has wallet annotation but no keystore JSON data", agent.Namespace, remoteSignerSecretName) } - candidateKey = key - candidateValue = value - } - if candidateKey == "" { - return fmt.Errorf("secret %s/%s has wallet annotation but no keystore JSON data", namespace, remoteSignerSecretName) + data[remoteSignerKeystoreKey] = candidateValue + if err := unstructured.SetNestedStringMap(secret.Object, data, "data"); err != nil { + return fmt.Errorf("set canonical keystore key: %w", err) + } + changed = true } - - data[remoteSignerKeystoreKey] = candidateValue - if err := unstructured.SetNestedStringMap(secret.Object, data, "data"); err != nil { - return fmt.Errorf("set canonical keystore key: %w", err) + if !changed { + return nil } - _, err = c.client.Resource(monetizeapi.SecretGVR).Namespace(namespace).Update(ctx, secret, metav1.UpdateOptions{ + _, err = c.client.Resource(monetizeapi.SecretGVR).Namespace(agent.Namespace).Update(ctx, secret, metav1.UpdateOptions{ FieldManager: controllerFieldManager, }) if err != nil { - return fmt.Errorf("update %s/%s with canonical keystore key: %w", namespace, remoteSignerSecretName, err) + return fmt.Errorf("update %s/%s with canonical keystore metadata: %w", agent.Namespace, remoteSignerSecretName, err) } return nil } +func ensureRemoteSignerSecretLabels(secret *unstructured.Unstructured, agentName string) bool { + labels := secret.GetLabels() + if labels == nil { + labels = map[string]string{} + } + changed := false + for key, value := range map[string]string{ + "app.kubernetes.io/name": remoteSignerName, + "app.kubernetes.io/managed-by": "serviceoffer-controller", + "app.kubernetes.io/instance": agentName, + "obol.org/agent": agentName, + } { + if labels[key] != value { + labels[key] = value + changed = true + } + } + if changed { + secret.SetLabels(labels) + } + return changed +} + func buildSignerKeystoreSecret(namespace string, mat *openclaw.KeystoreMaterial) *unstructured.Unstructured { u := &unstructured.Unstructured{} u.SetUnstructuredContent(map[string]any{ diff --git a/internal/serviceoffercontroller/agent_wallet_test.go b/internal/serviceoffercontroller/agent_wallet_test.go index 037bf4f2..f742aee6 100644 --- a/internal/serviceoffercontroller/agent_wallet_test.go +++ b/internal/serviceoffercontroller/agent_wallet_test.go @@ -35,6 +35,10 @@ func TestEnsureAgentWallet_FreshKeystore(t *testing.T) { if annotations[signerKeystoreAddressAnnotation] != address { t.Errorf("address annotation = %q, want %q", annotations[signerKeystoreAddressAnnotation], address) } + labels := secret.GetLabels() + if labels["app.kubernetes.io/instance"] != agent.Name || labels["obol.org/agent"] != agent.Name { + t.Errorf("agent ownership labels missing from remote-signer Secret: %+v", labels) + } dataMap, _, _ := unstructured.NestedStringMap(secret.Object, "data") pwd, ok := dataMap["password"] if !ok || pwd == "" { @@ -94,6 +98,10 @@ func TestEnsureAgentWallet_ReusesExistingKeystore(t *testing.T) { t.Errorf("address = %q, want pre-seeded value", address) } secret := getRemoteSignerSecret(t, c, "agent-quant") + labels := secret.GetLabels() + if labels["app.kubernetes.io/instance"] != agent.Name || labels["obol.org/agent"] != agent.Name { + t.Errorf("existing remote-signer Secret was not labeled for safe teardown: %+v", labels) + } dataMap := remoteSignerSecretData(t, secret) wantData := remoteSignerSecretData(t, preSeeded) if dataMap[remoteSignerKeystoreKey] != wantData[remoteSignerKeystoreKey] {