diff --git a/api/v1alpha1/source_webhook_types.go b/api/v1alpha1/source_webhook_types.go index 7cb6d93..3fdd273 100644 --- a/api/v1alpha1/source_webhook_types.go +++ b/api/v1alpha1/source_webhook_types.go @@ -2,16 +2,6 @@ package v1alpha1 // WebhookConfig contains configuration for Webhook notifications. type WebhookConfig struct { - // Path that the webhook will receive the notifications. - // If not present `/webhook` will be used. The path always expects a POST and this is not configurable - // +optional - Path string `json:"path"` - - // Address is the address where the webhook will be served in your infrastructure. - // If not present, defaults to `:8090` - // +optional - Address string `json:"address"` - // SecretIdentifierOnPayload is the key that the reloader will look for in the payload. // The value of this key should be the same name as in the external secret. It will default to `0.data.ObjectName` if not set // +optional diff --git a/cmd/main.go b/cmd/main.go index 6ac58f5..aa514ba 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -31,13 +31,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - "sigs.k8s.io/controller-runtime/pkg/webhook" + crtwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" externalsecrets "github.com/external-secrets/external-secrets/apis/externalsecrets/v1" pushsecrets "github.com/external-secrets/external-secrets/apis/externalsecrets/v1alpha1" "github.com/external-secrets/reloader/api/v1alpha1" "github.com/external-secrets/reloader/internal/controller" + "github.com/external-secrets/reloader/internal/listener/webhook" // +kubebuilder:scaffold:imports ) @@ -99,10 +100,12 @@ func main() { tlsOpts = append(tlsOpts, disableHTTP2) } - webhookServer := webhook.NewServer(webhook.Options{ + crdWebhookServer := crtwebhook.NewServer(crtwebhook.Options{ TLSOpts: tlsOpts, }) + notificationWebhook := webhook.NewWebhookServer(webhookAddr, ctrl.Log.WithName("notification")) + metricsServerOptions := metricsserver.Options{ BindAddress: metricsAddr, SecureServing: secureMetrics, @@ -118,7 +121,7 @@ func main() { mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, Metrics: metricsServerOptions, - WebhookServer: webhookServer, + WebhookServer: crdWebhookServer, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "0cd7d2f7.externalsecrets.com", @@ -128,9 +131,15 @@ func main() { os.Exit(1) } + if err := mgr.Add(notificationWebhook); err != nil { + setupLog.Error(err, "unable to add notification webhook server") + os.Exit(1) + } + if err = (controller.NewReloaderReconciler( mgr.GetClient(), mgr.GetScheme(), + notificationWebhook, )).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Reloader") os.Exit(1) diff --git a/deploy/charts/reloader/crds/reloader.external-secrets.io_configs.yaml b/deploy/charts/reloader/crds/reloader.external-secrets.io_configs.yaml index 88b28fd..0122f03 100644 --- a/deploy/charts/reloader/crds/reloader.external-secrets.io_configs.yaml +++ b/deploy/charts/reloader/crds/reloader.external-secrets.io_configs.yaml @@ -1184,21 +1184,11 @@ spec: webhook: description: Webhook configuration (required if Type is Webhook). properties: - address: - description: |- - Address is the address where the webhook will be served in your infrastructure. - If not present, defaults to `:8090` - type: string identifierPathOnPayload: description: |- SecretIdentifierOnPayload is the key that the reloader will look for in the payload. The value of this key should be the same name as in the external secret. It will default to `0.data.ObjectName` if not set type: string - path: - description: |- - Path that the webhook will receive the notifications. - If not present `/webhook` will be used. The path always expects a POST and this is not configurable - type: string retryPolicy: description: |- RetryPolicy represents the policy to retry when a message fails. diff --git a/deploy/charts/reloader/templates/deployment.yaml b/deploy/charts/reloader/templates/deployment.yaml index b29d04d..c63e4d1 100644 --- a/deploy/charts/reloader/templates/deployment.yaml +++ b/deploy/charts/reloader/templates/deployment.yaml @@ -51,6 +51,9 @@ spec: args: - --health-probe-bind-address=:8081 - --metrics-bind-address=:{{ .Values.metrics.listen.port }} + {{- if .Values.service.webhook.enabled }} + - --webhook-bind-address=:{{ .Values.service.webhook.listenPort }} + {{- end }} {{- if .Values.leaderElect }} - --leader-elect {{- end }} @@ -66,15 +69,17 @@ spec: {{- toYaml . | nindent 12 }} {{- end }} ports: - - name: http - containerPort: {{ .Values.service.port }} - protocol: TCP - name: healthz containerPort: 8081 protocol: TCP - name: metrics containerPort: {{ .Values.metrics.listen.port }} protocol: TCP + {{- if .Values.service.webhook.enabled }} + - name: webhook + containerPort: {{ .Values.service.webhook.listenPort }} + protocol: TCP + {{- end }} livenessProbe: {{- toYaml .Values.livenessProbe | nindent 12 }} readinessProbe: diff --git a/deploy/charts/reloader/templates/ingress.yaml b/deploy/charts/reloader/templates/ingress.yaml index 796e535..2cb3488 100644 --- a/deploy/charts/reloader/templates/ingress.yaml +++ b/deploy/charts/reloader/templates/ingress.yaml @@ -1,10 +1,13 @@ {{- if .Values.ingress.enabled -}} +{{- if not .Values.service.webhook.enabled }} +{{- fail "ingress.enabled requires service.webhook.enabled (Ingress routes to the webhook Service)" }} +{{- end }} {{- $fullName := include "reloader.fullname" . -}} -{{- $svcPort := .Values.service.port -}} +{{- $svcPort := .Values.service.webhook.port -}} apiVersion: networking.k8s.io/v1 kind: Ingress metadata: - name: {{ $fullName }} + name: {{ $fullName }}-webhook namespace: {{ include "reloader.namespace" . }} labels: {{- include "reloader.labels" . | nindent 4 }} @@ -33,10 +36,10 @@ spec: paths: {{- range .paths }} - path: {{ .path }} - pathType: {{ .pathType | default "ImplementationSpecific" }} + pathType: {{ .pathType | default "Prefix" }} backend: service: - name: {{ $fullName }} + name: {{ $fullName }}-webhook port: number: {{ $svcPort }} {{- end }} diff --git a/deploy/charts/reloader/templates/service.yaml b/deploy/charts/reloader/templates/service.yaml index 3f2705e..d4bcf98 100644 --- a/deploy/charts/reloader/templates/service.yaml +++ b/deploy/charts/reloader/templates/service.yaml @@ -1,20 +1,3 @@ -apiVersion: v1 -kind: Service -metadata: - name: {{ include "reloader.fullname" . }} - namespace: {{ include "reloader.namespace" . }} - labels: - {{- include "reloader.labels" . | nindent 4 }} -spec: - type: {{ .Values.service.type }} - ports: - - port: {{ .Values.service.port }} - targetPort: http - protocol: TCP - name: http - selector: - {{- include "reloader.selectorLabels" . | nindent 4 }} ---- {{- if .Values.service.webhook.enabled }} apiVersion: v1 kind: Service @@ -33,8 +16,8 @@ spec: selector: {{- include "reloader.selectorLabels" . | nindent 4 }} {{- end }} ---- {{- if .Values.service.socket.enabled }} +--- apiVersion: v1 kind: Service metadata: diff --git a/deploy/charts/reloader/values.yaml b/deploy/charts/reloader/values.yaml index 674a405..a87f70e 100644 --- a/deploy/charts/reloader/values.yaml +++ b/deploy/charts/reloader/values.yaml @@ -49,14 +49,14 @@ securityContext: type: RuntimeDefault service: - type: ClusterIP - port: 8080 - + # When enabled, the chart sets --webhook-bind-address and exposes the webhook port on the Deployment. + # Send POST to http://-webhook..svc:/webhook/ (cluster-scoped Config name). webhook: enabled: false type: ClusterIP port: 8090 - targetPort: 8090 + listenPort: 8090 + targetPort: webhook socket: enabled: false @@ -126,11 +126,12 @@ certificate: ingress: enabled: false + # Routes to the webhook Service (-webhook). Requires service.webhook.enabled. className: "" annotations: {} hosts: - host: chart-example.local paths: - - path: / - pathType: ImplementationSpecific + - path: /webhook + pathType: Prefix tls: [] diff --git a/docs/reference/api-reference.md b/docs/reference/api-reference.md index d845f62..3898f8b 100644 --- a/docs/reference/api-reference.md +++ b/docs/reference/api-reference.md @@ -410,8 +410,8 @@ WebhookConfig contains configuration for Webhook notifications. | Field | Type | Description | Validation | |--------------------------|-------------------------------|-------------------------------------------------------------------------------------------------------|------------| -| `path` | string | Endpoint path (default: `/webhook`). Always expects a POST request. | | -| `address` | string | Address where the webhook is served. Defaults to `:8090`. | | | `identifierPathOnPayload`| string | Key in the payload used to identify the secret. Defaults to `0.data.ObjectName` if not set. | | | `webhookAuth` | [WebhookAuth](#webhookauth) | Authentication method for the webhook. | | | `retryPolicy` | [RetryPolicy](#retrypolicy) | Policy to retry failed messages. If not set, 4xx will be returned and no retry will be attempted. | | + +The controller serves all webhook `Config` resources on one HTTP listener (`--webhook-bind-address`). Each `Config` is reachable at `POST /webhook/`. diff --git a/docs/sources/webhook.md b/docs/sources/webhook.md index 9831ede..2c6dcd3 100644 --- a/docs/sources/webhook.md +++ b/docs/sources/webhook.md @@ -1,21 +1,30 @@ # Webhook Source -This guide explains how to set up the Webhook notification source for the Reloader component in your environment. Using Webhooks as a notification source allows you to trigger secret rotation events via HTTP calls to your Webhook endpoint. +This guide explains how to set up the Webhook notification source for the Reloader component in your environment. Using webhooks as a notification source lets you trigger secret rotation events by sending HTTP POST requests to the Reloader process. + +## How it works + +The controller runs a **single shared HTTP server** for all `Config` resources. The listen address is set with the controller flag **`--webhook-bind-address`** (default `:8082`). Each cluster-scoped `Config` is exposed at: + +`POST /webhook/` + +There is no per-CR URL path or bind address; callers use the `Config` name in the path. ## Configuration -To configure a Webhook as a notification source, the Reloader needs to be provided with the URL path to listen on, as well as the identifier in the payload that refers to the secret being rotated. +Configure a `NotificationSource` with `type: Webhook` and a `webhook` block. The main field is **`identifierPathOnPayload`** (JSON path in the body where the secret name appears). -### Key Fields +### Key fields -* **path**: Specifies the Webhook path that the Reloader will listen to. This is the endpoint where Webhook notifications will be received. -* **identifierPathOnPayload**: Defines the key in the payload that contains the secret identifier. The identifier must match the name of the secret being rotated. By default, the path is `0.data.ObjectName`. +* **identifierPathOnPayload**: JSON path in the POST body for the secret identifier. It must match the name of the secret being rotated. If omitted, the default path is `0.data.ObjectName`. +* **webhookAuth** (optional): Basic or bearer authentication for incoming requests. +* **retryPolicy** (optional): Retry failed publishes to the internal event channel. -### Payload Structure +### Payload structure -The Webhook notification must contain a payload with a secret identifier. The Reloader will extract this identifier based on the path defined in the configuration. +The POST body must be JSON containing the secret identifier at the configured path. -#### Example Payload +#### Example payload ```json { @@ -27,14 +36,14 @@ The Webhook notification must contain a payload with a secret identifier. The Re } ``` -In this example, the Webhook payload contains a secret identifier at `0.data.ObjectName`, which corresponds to the secret named `my-secret`. The Reloader will use this identifier to rotate the appropriate secret. +Here the identifier is at `0.data.ObjectName`, matching the secret name `my-secret`. ### Triggering a webhook notification -To trigger a secret rotation, send an HTTP POST request to the Webhook endpoint you've configured. +Send an HTTP POST to the Reloader webhook base URL with path `/webhook/`. ```bash -curl -X POST https://your-rotator-endpoint/webhook \ +curl -X POST "http://:/webhook/my-reloader-config" \ -H "Content-Type: application/json" \ -d '{ "0": { @@ -45,6 +54,12 @@ curl -X POST https://your-rotator-endpoint/webhook \ }' ``` -Once this request is received by the Reloader, it will extract the secret identifier and proceed with the rotation process for the specified secret. +Replace `my-reloader-config` with the `metadata.name` of your `Config` CR. + +### Helm + +If you use the chart under `deploy/charts/reloader`, set **`service.webhook.enabled: true`**. The chart then adds **`--webhook-bind-address`** and a **`webhook`** container port using **`service.webhook.listenPort`** (default `8090`, aligned with the optional `*-webhook` Service). You can still override the flag with **`extraArgs`** if needed. + +There is no default “main” HTTP `Service` on port 8080. **`ingress.enabled`** requires **`service.webhook.enabled`**: the Ingress targets the **`{{ release }}-webhook`** Service on **`service.webhook.port`** (paths such as **`/webhook/...`**). -Any service that can call an endpoint can trigger the rotation as long as you configure the keys accordingly. +Any client that can reach the Service or host on that port can trigger rotation as long as the JSON path and optional auth match your `Config`. diff --git a/examples/manifests/keeper-security-webhook-to-external-secrets/config.yaml b/examples/manifests/keeper-security-webhook-to-external-secrets/config.yaml new file mode 100644 index 0000000..0bfc669 --- /dev/null +++ b/examples/manifests/keeper-security-webhook-to-external-secrets/config.yaml @@ -0,0 +1,14 @@ +apiVersion: reloader.external-secrets.io/v1alpha1 +kind: Config +metadata: + name: keeper # will listen on the webhook at /webhook/keeper +spec: + notificationSources: + - type: Webhook + webhook: + identifierPathOnPayload: "record_uid" + destinationsToWatch: + - type: ExternalSecret + externalSecret: + labelSelectors: + matchLabels: {} diff --git a/go.mod b/go.mod index 8037a40..0c1b0a3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/external-secrets/reloader -go 1.26.1 +go 1.26.2 require ( cloud.google.com/go/iam v1.8.0 diff --git a/internal/controller/reloader_controller.go b/internal/controller/reloader_controller.go index 8140e61..ac2d6fa 100644 --- a/internal/controller/reloader_controller.go +++ b/internal/controller/reloader_controller.go @@ -11,6 +11,7 @@ import ( "github.com/external-secrets/reloader/internal/events" "github.com/external-secrets/reloader/internal/handler" "github.com/external-secrets/reloader/internal/listener" + "github.com/external-secrets/reloader/internal/listener/webhook" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -38,6 +39,7 @@ type ReloaderReconciler struct { // Internal fields listenerManager *listener.Manager + webhookServer *webhook.WebhookServer // eventChan is a channel that transports SecretRotationEvent instances between various parts of the system, such as event handlers and listeners. eventChan chan events.SecretRotationEvent @@ -45,19 +47,20 @@ type ReloaderReconciler struct { } // NewReloaderReconciler creates a new ReloaderReconciler with the default factory. -func NewReloaderReconciler(client client.Client, scheme *runtime.Scheme) *ReloaderReconciler { +func NewReloaderReconciler(client client.Client, scheme *runtime.Scheme, hook *webhook.WebhookServer) *ReloaderReconciler { return &ReloaderReconciler{ - Client: client, - Scheme: scheme, - eventChan: make(chan events.SecretRotationEvent), - eventHandler: handler.NewEventHandler(client), + Client: client, + Scheme: scheme, + webhookServer: hook, + eventChan: make(chan events.SecretRotationEvent), + eventHandler: handler.NewEventHandler(client), } } // SetupWithManager sets up the controller with the Manager. func (r *ReloaderReconciler) SetupWithManager(mgr ctrl.Manager) error { ctx, cancel := context.WithCancel(context.Background()) - r.listenerManager = listener.NewListenerManager(ctx, r.eventChan, r.Client, log.FromContext(ctx)) + r.listenerManager = listener.NewListenerManager(ctx, r.eventChan, r.Client, log.FromContext(ctx), r.webhookServer) // Start a goroutine to process events go r.processEvents(ctx) @@ -107,17 +110,18 @@ func (r *ReloaderReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if err := r.Get(ctx, req.NamespacedName, &cfg); err != nil { if apierrors.IsNotFound(err) { - if err := r.listenerManager.StopAll(); err != nil { + // Object is gone (e.g. after finalizer). Only tear down listeners for this Config — not all Configs. + manifestName := types.NamespacedName{ + Namespace: req.Namespace, + Name: req.Name, + } + if err := r.listenerManager.ManageListeners(manifestName, nil); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } - - // Error reading the object - requeue the request. - if !apierrors.IsNotFound(err) { - logger.Error(err, "unable to fetch Reloader deployment") - return ctrl.Result{}, err - } + logger.Error(err, "unable to fetch Config") + return ctrl.Result{}, err } if cfg.DeletionTimestamp != nil && controllerutil.ContainsFinalizer(&cfg, reloaderFinalizer) { // Handle any cleanup logic here, as this is a DELETE request diff --git a/internal/controller/reloader_controller_test.go b/internal/controller/reloader_controller_test.go index 57e04c7..7c215ed 100644 --- a/internal/controller/reloader_controller_test.go +++ b/internal/controller/reloader_controller_test.go @@ -57,7 +57,7 @@ var _ = Describe("Reloader Controller", func() { fakeClient = fake.NewClientBuilder().WithScheme(scheme).Build() eventChan = make(chan events.SecretRotationEvent, 10) - manager := listener.NewListenerManager(ctx, eventChan, fakeClient, log.FromContext(ctx)) + manager := listener.NewListenerManager(ctx, eventChan, fakeClient, log.FromContext(ctx), nil) eventHandler := handler.NewEventHandler(fakeClient) reconciler = &ReloaderReconciler{ diff --git a/internal/listener/manager.go b/internal/listener/manager.go index d1b8417..ce06e7d 100644 --- a/internal/listener/manager.go +++ b/internal/listener/manager.go @@ -11,6 +11,7 @@ import ( esov1alpha1 "github.com/external-secrets/reloader/api/v1alpha1" "github.com/external-secrets/reloader/internal/events" "github.com/external-secrets/reloader/internal/listener/schema" + "github.com/external-secrets/reloader/internal/listener/webhook" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -18,21 +19,23 @@ import ( // Manager manages event listeners for secret rotation events. It coordinates the creation, starting, and stopping of listeners. type Manager struct { - context context.Context - client client.Client - eventChan chan events.SecretRotationEvent - listeners map[types.NamespacedName]map[string]schema.Listener - mu sync.Mutex - logger logr.Logger + context context.Context + client client.Client + eventChan chan events.SecretRotationEvent + listeners map[types.NamespacedName]map[string]schema.Listener + mu sync.Mutex + logger logr.Logger + webhookServer *webhook.WebhookServer } -func NewListenerManager(ctx context.Context, eventChan chan events.SecretRotationEvent, client client.Client, logger logr.Logger) *Manager { +func NewListenerManager(ctx context.Context, eventChan chan events.SecretRotationEvent, client client.Client, logger logr.Logger, hook *webhook.WebhookServer) *Manager { return &Manager{ - context: ctx, - eventChan: eventChan, - client: client, - listeners: make(map[types.NamespacedName]map[string]schema.Listener), - logger: logger, + context: ctx, + eventChan: eventChan, + client: client, + listeners: make(map[types.NamespacedName]map[string]schema.Listener), + logger: logger, + webhookServer: hook, } } @@ -69,26 +72,57 @@ func (lm *Manager) ManageListeners(manifestName types.NamespacedName, sources [] // Add new listeners for key, source := range desiredListeners { - if _, exists := lm.listeners[manifestName][key]; !exists { - lm.logger.Info("Creating new eventListener", "key", key, "type", source.Type) - prov := schema.GetProvider(source.Type) - if prov == nil { - lm.logger.Error(nil, "failed to get provider", "type", source.Type) + if _, exists := lm.listeners[manifestName][key]; exists { + if source.Type == schema.WEBHOOK && lm.webhookServer != nil && source.Webhook != nil { + lm.webhookServer.Register(manifestName.Name, lm.context, source.Webhook, lm.client, lm.eventChan, lm.logger) + lm.logger.V(1).Info("updated webhook registration", "manifest", manifestName, "key", key) + } else { + lm.logger.V(1).Info("listener already exists", "key", key) + } + continue + } + lm.logger.Info("Creating new eventListener", "key", key, "type", source.Type) + if source.Type == schema.WEBHOOK { + if lm.webhookServer == nil { + lm.logger.Error(nil, "webhook server is not configured; cannot start Webhook listener") continue } - eventListener, err := prov.CreateListener(lm.context, &source, lm.client, lm.eventChan, lm.logger) - if err != nil { - lm.logger.Error(err, "failed to create listener", "key", key) + if source.Webhook == nil { + lm.logger.Error(nil, "webhook config is nil") continue } + eventListener := webhook.NewWebhookListener( + lm.webhookServer, + manifestName.Name, + lm.context, + source.Webhook, + lm.client, + lm.eventChan, + lm.logger, + ) if err := eventListener.Start(); err != nil { lm.logger.Error(err, "failed to start listener", "key", key) continue } lm.listeners[manifestName][key] = eventListener - } else { - lm.logger.V(1).Info("listener already exists", "key", key) + continue + } + + prov := schema.GetProvider(source.Type) + if prov == nil { + lm.logger.Error(nil, "failed to get provider", "type", source.Type) + continue } + eventListener, err := prov.CreateListener(lm.context, &source, lm.client, lm.eventChan, lm.logger) + if err != nil { + lm.logger.Error(err, "failed to create listener", "key", key) + continue + } + if err := eventListener.Start(); err != nil { + lm.logger.Error(err, "failed to start listener", "key", key) + continue + } + lm.listeners[manifestName][key] = eventListener } // cleanup if empty if len(lm.listeners[manifestName]) == 0 { @@ -122,6 +156,10 @@ func (lm *Manager) StopAll() error { // generateListenerKey creates a unique key for a NotificationSource based on its Type and configuration. func generateListenerKey(source esov1alpha1.NotificationSource) (string, error) { + if source.Type == schema.WEBHOOK { + return schema.WEBHOOK, nil + } + // Marshal the specific configuration based on the Type var config any switch source.Type { @@ -131,8 +169,6 @@ func generateListenerKey(source esov1alpha1.NotificationSource) (string, error) config = source.AzureEventGrid case schema.GOOGLE_PUB_SUB: config = source.GooglePubSub - case schema.WEBHOOK: - config = source.Webhook case schema.HASHICORP_VAULT: config = source.HashicorpVault case schema.TCP_SOCKET: diff --git a/internal/listener/manager_test.go b/internal/listener/manager_test.go index 1c4126a..79cec1a 100644 --- a/internal/listener/manager_test.go +++ b/internal/listener/manager_test.go @@ -80,3 +80,32 @@ func TestGenerateListenerKey_KubernetesConfigMap_UnsupportedType(t *testing.T) { t.Fatal("expected error for unsupported type") } } + +func TestGenerateListenerKey_Webhook(t *testing.T) { + s1 := esov1alpha1.NotificationSource{ + Type: schema.WEBHOOK, + Webhook: &esov1alpha1.WebhookConfig{ + SecretIdentifierOnPayload: "custom.path", + }, + } + s2 := esov1alpha1.NotificationSource{ + Type: schema.WEBHOOK, + Webhook: &esov1alpha1.WebhookConfig{ + SecretIdentifierOnPayload: "other.path", + }, + } + key1, err := generateListenerKey(s1) + if err != nil { + t.Fatalf("generateListenerKey: %v", err) + } + key2, err := generateListenerKey(s2) + if err != nil { + t.Fatalf("generateListenerKey s2: %v", err) + } + if key1 != schema.WEBHOOK || key2 != schema.WEBHOOK { + t.Errorf("expected key %q, got %q and %q", schema.WEBHOOK, key1, key2) + } + if key1 != key2 { + t.Errorf("webhook key should be stable for a shared route: %q vs %q", key1, key2) + } +} diff --git a/internal/listener/register.go b/internal/listener/register.go index 407aa5c..d93a737 100644 --- a/internal/listener/register.go +++ b/internal/listener/register.go @@ -9,5 +9,4 @@ import ( _ "github.com/external-secrets/reloader/internal/listener/pubsub" _ "github.com/external-secrets/reloader/internal/listener/sqs" _ "github.com/external-secrets/reloader/internal/listener/tcp" - _ "github.com/external-secrets/reloader/internal/listener/webhook" ) diff --git a/internal/listener/webhook/listener.go b/internal/listener/webhook/listener.go index 44c6bbb..f024099 100644 --- a/internal/listener/webhook/listener.go +++ b/internal/listener/webhook/listener.go @@ -2,361 +2,54 @@ package webhook import ( "context" - "encoding/base64" - "errors" - "fmt" - "io" - "math" - "net/http" - "strconv" - "strings" - "time" v1alpha1 "github.com/external-secrets/reloader/api/v1alpha1" "github.com/external-secrets/reloader/internal/events" "github.com/external-secrets/reloader/internal/listener/schema" - "github.com/external-secrets/reloader/internal/util" "github.com/go-logr/logr" - "github.com/tidwall/gjson" "sigs.k8s.io/controller-runtime/pkg/client" ) -const defautlIdentifierPath = "0.data.ObjectName" -const defaultServerAddress = ":8090" -const defaultPath = "/webhook" -const maxPortNumber = 65535 -const defaultMaxRetries = 10 - -type RetryMessage struct { - event events.SecretRotationEvent - currentRun int - retryAt time.Time -} - -// WebhookListener listens for webhook notifications to handle secret rotation events. +// WebhookListener implements schema.Listener for one Config's webhook route on the shared WebhookServer. type WebhookListener struct { - ctx context.Context - cancel context.CancelFunc - config *v1alpha1.WebhookConfig + server *WebhookServer + configName string + routeCtx context.Context + cfg *v1alpha1.WebhookConfig + client client.Client eventChan chan events.SecretRotationEvent - server *http.Server logger logr.Logger - client client.Client - retryQueue chan *RetryMessage -} - -// Start initiates the WebhookListener to begin listening for incoming webhook requests. -func (h *WebhookListener) Start() error { - h.logger.Info("Starting Webhook Listener...") - - // Only handle errors if policy is configured - if h.config != nil && h.config.RetryPolicy != nil { - go h.handleErrors() - } - go func() { - defer h.logger.Info("Stopping Webhook Listener...") - err := h.server.ListenAndServe() - if err == http.ErrServerClosed { - return - } - - <-h.ctx.Done() - }() - return nil -} - -// Stop gracefully shuts down the WebhookListener by closing the stopChan channel which triggers the termination process. -func (h *WebhookListener) Stop() error { - close(h.retryQueue) - h.stopRetryQueue() - h.cancel() - return h.server.Close() -} - -func (h *WebhookListener) webhookHandler(w http.ResponseWriter, r *http.Request) { - var err error - err = h.authenticate(r.Header) - if err != nil { - h.logger.Error(err, "Couldn't authenticate request") - w.WriteHeader(http.StatusUnauthorized) - _, _ = fmt.Fprintln(w, "Couldn't authenticate request") - return - } - - payload, err := parsePayloadToString(r.Body) - if err != nil { - h.logger.Error(err, "Couldn't parse event payload") - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintln(w, "Couldn't decode webhook payload. Send a valid json") - return - } - - err = r.Body.Close() - if err != nil { - h.logger.Error(err, "Error closing the payload body") - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintln(w, "Couldn't decode webhook payload. Send a valid json") - return - } - - identifierPath := h.getIdentifierPath() - - secretIdentifier, err := getSecretIdentifierFromPayload(payload, identifierPath) - if err != nil { - message := fmt.Sprintf("Secret Identifier not found on payload."+ - "Ensure that your secret is on the following path: %s", identifierPath) - h.logger.Error(err, message) - - w.WriteHeader(http.StatusBadRequest) - _, _ = fmt.Fprintln(w, message) - return - } - - if event, err := h.processSecret(secretIdentifier); err != nil { - message := "Failed to process event" - h.logger.Error(err, message) - w.WriteHeader(http.StatusInternalServerError) - _, _ = fmt.Fprintln(w, message) - - if h.config != nil && h.config.RetryPolicy != nil { - h.retryQueue <- &RetryMessage{event: event, currentRun: 1, retryAt: time.Now()} - } - return - } - - w.WriteHeader(http.StatusNoContent) - _, _ = fmt.Fprintln(w, "") -} - -func (h *WebhookListener) authenticate(header http.Header) error { - if h.config == nil || h.config.Auth == nil { - return nil - } - - basicAuth := h.config.Auth.BasicAuth - bearer := h.config.Auth.BearerToken - - if basicAuth == nil && bearer == nil { - return nil - } - - authHeader := strings.Split(header.Get("Authorization"), " ") - if len(authHeader) != 2 { - return errors.New("malformed authorization header. Use `Bearer ` or `Basic `") - } - - if basicAuth != nil && strings.Contains(strings.ToLower(authHeader[0]), "basic") { - return authenticateWithBasicAuth(h.ctx, h.client, authHeader[1], basicAuth, h.logger) - } - - return authenticateWithBearer(h.ctx, h.client, authHeader[1], bearer, h.logger) -} - -func (h *WebhookListener) createHandler() { - path := defaultPath - if h.config != nil && h.config.Path != "" { - path = h.config.Path - if path[0] != '/' { - path = "/" + path - } - } - mux := http.NewServeMux() - mux.HandleFunc(fmt.Sprintf("POST %s", path), h.webhookHandler) - h.server.Handler = mux -} - -func (h *WebhookListener) getIdentifierPath() string { - identifierPath := defautlIdentifierPath - if h.config != nil && h.config.SecretIdentifierOnPayload != "" { - identifierPath = h.config.SecretIdentifierOnPayload - } - - return identifierPath -} - -func (h *WebhookListener) handleErrors() { - maxRetries := min(h.config.RetryPolicy.MaxRetries, defaultMaxRetries) - for message := range h.retryQueue { - beforeOrNow := message.retryAt.Compare(time.Now()) <= 0 - - if beforeOrNow { - err := h.processEvent(message.event) - if err == nil { - h.logger.Info(fmt.Sprintf( - "Message for '%s' successfully processed after %d retries", - message.event.SecretIdentifier, - message.currentRun, - )) - // if message was processed, drop it - continue - } - - if message.currentRun >= maxRetries { - h.logger.Error(err, fmt.Sprintf( - "Message for '%s' was not processed after %d retries", - message.event.SecretIdentifier, - message.currentRun, - )) - // if message was retried (successfully or not) up to max times, drop it - - continue - } - - message.currentRun++ - message.retryAt = getNextRetryAt(h.config.RetryPolicy.Algorithm, message.currentRun) - } - h.retryQueue <- message - } -} - -func (h *WebhookListener) stopRetryQueue() { - h.logger.Info("Processing all messages left for retry ignoring algorithm") - for message := range h.retryQueue { - - err := h.processEvent(message.event) - if err == nil { - h.logger.Info(fmt.Sprintf( - "Message for '%s' successfully processed after %d retries", - message.event.SecretIdentifier, - message.currentRun, - )) - } else { - h.logger.Info("Message for '%s' was not processed") - - } - } -} - -func getNextRetryAt(algorithm string, currentRun int) time.Time { - if algorithm == "linear" { - return time.Now().Add(time.Second) - } - - duration := time.Duration(math.Exp2(float64(currentRun))) - return time.Now().Add(time.Second * duration) -} - -func parsePayloadToString(body io.ReadCloser) (string, error) { - b, err := io.ReadAll(body) - if err != nil { - return "", err - } - - return string(b), nil -} - -func getSecretIdentifierFromPayload(payload, identifierPath string) (string, error) { - secretIdentifier := gjson.Get(payload, identifierPath) - - if !secretIdentifier.Exists() { - err := errors.New("secret not found on event") - return "", err - } - - return secretIdentifier.String(), nil -} - -func (h *WebhookListener) processSecret(secretIdentifier string) (events.SecretRotationEvent, error) { - event := events.SecretRotationEvent{ - SecretIdentifier: secretIdentifier, - RotationTimestamp: time.Now().Format("2006-01-02-15-04-05.000"), - TriggerSource: schema.WEBHOOK, - } - return event, h.processEvent(event) -} - -func (h *WebhookListener) processEvent(event events.SecretRotationEvent) error { - select { - case h.eventChan <- event: - h.logger.Info("Published event to eventChan", "Event", event) - return nil - case <-h.ctx.Done(): - return h.ctx.Err() - } } -func createServer(config *v1alpha1.WebhookConfig) (*http.Server, error) { - address := defaultServerAddress - if config != nil && config.Address != "" { - err := validateAddress(config.Address) - if err != nil { - return nil, err - } - address = config.Address - } - - return &http.Server{Addr: address}, nil -} - -func validateAddress(address string) error { - splitAddress := strings.Split(address, ":") - lengthSplit := len(splitAddress) - if lengthSplit > 2 { - return errors.New("address should contain single colon. Use the format `[host]:port`, with optional host") - } - - port := splitAddress[0] - if lengthSplit == 2 { - port = splitAddress[1] - } - - intPort, err := strconv.Atoi(port) - if err != nil || intPort > maxPortNumber || intPort < 1 { - return fmt.Errorf("port should be an integer between 1 and %d", maxPortNumber) - } - +// NewWebhookListener returns a schema.Listener that registers/unregisters one route on the shared server. +func NewWebhookListener( + server *WebhookServer, + configName string, + routeCtx context.Context, + cfg *v1alpha1.WebhookConfig, + k8sClient client.Client, + eventChan chan events.SecretRotationEvent, + logger logr.Logger, +) schema.Listener { + return &WebhookListener{ + server: server, + configName: configName, + routeCtx: routeCtx, + cfg: cfg, + client: k8sClient, + eventChan: eventChan, + logger: logger, + } +} + +// Start registers the route on the shared server. +func (l *WebhookListener) Start() error { + l.server.Register(l.configName, l.routeCtx, l.cfg, l.client, l.eventChan, l.logger) return nil } -func authenticateWithBasicAuth(ctx context.Context, k8sClient client.Client, requestToken string, basicAuth *v1alpha1.BasicAuth, logger logr.Logger) error { - username, err := decodeSecret(ctx, k8sClient, &basicAuth.UsernameSecretRef, logger) - if err != nil { - return err - } - - password, err := decodeSecret(ctx, k8sClient, &basicAuth.PasswordSecretRef, logger) - if err != nil { - return err - } - - userPwOnRequest, err := base64.StdEncoding.DecodeString(requestToken) - if err != nil { - return err - } - - storedUserPw := fmt.Sprintf("%s:%s", username, password) - - if storedUserPw != string(userPwOnRequest) { - return errors.New("invalid token. unauthenticated request") - } - +// Stop unregisters the route. +func (l *WebhookListener) Stop() error { + l.server.Unregister(l.configName) return nil } - -func authenticateWithBearer(ctx context.Context, k8sClient client.Client, requestToken string, bearer *v1alpha1.BearerToken, logger logr.Logger) error { - token, err := decodeSecret(ctx, k8sClient, &bearer.BearerTokenSecretRef, logger) - if err != nil { - return err - } - - if token != requestToken { - return errors.New("invalid token. unauthenticated request") - } - - return nil -} - -func decodeSecret(ctx context.Context, k8sClient client.Client, config *v1alpha1.SecretKeySelector, logger logr.Logger) (string, error) { - secret, err := util.GetSecret(ctx, k8sClient, config.Name, config.Namespace, logger) - if err != nil { - return "", err - } - - secretBytes, ok := secret.Data[config.Key] - if !ok { - return "", fmt.Errorf("%s not found in secret %s", config.Key, config.Name) - - } - - return string(secretBytes), nil -} diff --git a/internal/listener/webhook/provider.go b/internal/listener/webhook/provider.go deleted file mode 100644 index bbd58d1..0000000 --- a/internal/listener/webhook/provider.go +++ /dev/null @@ -1,49 +0,0 @@ -package webhook - -import ( - "context" - "errors" - "fmt" - - v1alpha1 "github.com/external-secrets/reloader/api/v1alpha1" - "github.com/external-secrets/reloader/internal/events" - "github.com/external-secrets/reloader/internal/listener/schema" - "github.com/go-logr/logr" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -type Provider struct { -} - -// NewWebhookListener creates a new Listener that listens for webhook notifications based on the provided configuration and event channel. -func (p *Provider) CreateListener(ctx context.Context, config *v1alpha1.NotificationSource, client client.Client, eventChan chan events.SecretRotationEvent, logger logr.Logger) (schema.Listener, error) { - if config == nil || config.Webhook == nil { - return nil, errors.New("webhook config is nil") - } - server, err := createServer(config.Webhook) - if err != nil { - logger.Error(err, "failed to create webhook server") - return nil, fmt.Errorf("failed to create webhook server: %w", err) - } - - childCtx, cancel := context.WithCancel(ctx) - - listener := &WebhookListener{ - config: config.Webhook, - eventChan: eventChan, - ctx: childCtx, - cancel: cancel, - logger: logger, - server: server, - client: client, - retryQueue: make(chan *RetryMessage), - } - - listener.createHandler() - - return listener, nil -} - -func init() { - schema.RegisterProvider(schema.WEBHOOK, &Provider{}) -} diff --git a/internal/listener/webhook/route.go b/internal/listener/webhook/route.go new file mode 100644 index 0000000..616eb66 --- /dev/null +++ b/internal/listener/webhook/route.go @@ -0,0 +1,308 @@ +package webhook + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "math" + "net/http" + "strings" + "sync" + "time" + + v1alpha1 "github.com/external-secrets/reloader/api/v1alpha1" + "github.com/external-secrets/reloader/internal/events" + "github.com/external-secrets/reloader/internal/listener/schema" + "github.com/external-secrets/reloader/internal/util" + "github.com/go-logr/logr" + "github.com/tidwall/gjson" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const defaultIdentifierPath = "0.data.ObjectName" +const defaultMaxRetries = 10 + +// route holds the per-Config handler state for a single webhook endpoint. +type route struct { + configName string + config *v1alpha1.WebhookConfig + eventChan chan events.SecretRotationEvent + client client.Client + logger logr.Logger + retryQueue chan *retryMessage + ctx context.Context + cancel context.CancelFunc + retryWG sync.WaitGroup +} + +type retryMessage struct { + event events.SecretRotationEvent + currentRun int + retryAt time.Time +} + +func newRoute(parentCtx context.Context, configName string, cfg *v1alpha1.WebhookConfig, k8sClient client.Client, eventChan chan events.SecretRotationEvent, logger logr.Logger) *route { + ctx, cancel := context.WithCancel(parentCtx) + r := &route{ + configName: configName, + config: cfg, + eventChan: eventChan, + client: k8sClient, + logger: logger.WithName("webhook-route").WithValues("config", configName), + retryQueue: make(chan *retryMessage), + ctx: ctx, + cancel: cancel, + } + if cfg != nil && cfg.RetryPolicy != nil { + r.retryWG.Add(1) + go func() { + defer r.retryWG.Done() + r.handleRetries() + }() + } + return r +} + +func (r *route) shutdown() { + r.cancel() + r.retryWG.Wait() + close(r.retryQueue) + r.drainRetryQueue() +} + +func (r *route) handle(w http.ResponseWriter, req *http.Request) { + if err := r.authenticate(req.Header); err != nil { + r.logger.Error(err, "Couldn't authenticate request") + w.WriteHeader(http.StatusUnauthorized) + _, _ = fmt.Fprintln(w, "Couldn't authenticate request") + return + } + + payload, err := parsePayloadToString(req.Body) + if err != nil { + r.logger.Error(err, "Couldn't parse event payload") + w.WriteHeader(http.StatusBadRequest) + _, _ = fmt.Fprintln(w, "Couldn't decode webhook payload. Send a valid json") + return + } + _ = req.Body.Close() + + identifierPath := r.getIdentifierPath() + + secretIdentifier, err := getSecretIdentifierFromPayload(payload, identifierPath) + if err != nil { + message := fmt.Sprintf("Secret Identifier not found on payload. "+ + "Ensure that your secret is on the following path: %s", identifierPath) + r.logger.Error(err, message) + w.WriteHeader(http.StatusBadRequest) + _, _ = fmt.Fprintln(w, message) + return + } + + if event, err := r.processSecret(secretIdentifier); err != nil { + r.logger.Error(err, "Failed to process event") + w.WriteHeader(http.StatusInternalServerError) + _, _ = fmt.Fprintln(w, "Failed to process event") + + if r.config != nil && r.config.RetryPolicy != nil { + select { + case r.retryQueue <- &retryMessage{event: event, currentRun: 1, retryAt: time.Now()}: + case <-r.ctx.Done(): + } + } + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (r *route) authenticate(header http.Header) error { + if r.config == nil || r.config.Auth == nil { + return nil + } + + basicAuth := r.config.Auth.BasicAuth + bearer := r.config.Auth.BearerToken + + if basicAuth == nil && bearer == nil { + return nil + } + + authHeader := strings.SplitN(header.Get("Authorization"), " ", 2) + if len(authHeader) != 2 { + return errors.New("malformed authorization header. Use `Bearer ` or `Basic `") + } + + if basicAuth != nil && strings.EqualFold(authHeader[0], "basic") { + return authenticateWithBasicAuth(r.ctx, r.client, authHeader[1], basicAuth, r.logger) + } + + return authenticateWithBearer(r.ctx, r.client, authHeader[1], bearer, r.logger) +} + +func (r *route) getIdentifierPath() string { + if r.config != nil && r.config.SecretIdentifierOnPayload != "" { + return r.config.SecretIdentifierOnPayload + } + return defaultIdentifierPath +} + +func (r *route) processSecret(secretIdentifier string) (events.SecretRotationEvent, error) { + event := events.SecretRotationEvent{ + SecretIdentifier: secretIdentifier, + RotationTimestamp: time.Now().Format("2006-01-02-15-04-05.000"), + TriggerSource: schema.WEBHOOK, + } + return event, r.processEvent(event) +} + +func (r *route) processEvent(event events.SecretRotationEvent) error { + select { + case r.eventChan <- event: + r.logger.Info("Published event to eventChan", "Event", event) + return nil + case <-r.ctx.Done(): + return r.ctx.Err() + } +} + +func (r *route) handleRetries() { + maxRetries := min(r.config.RetryPolicy.MaxRetries, defaultMaxRetries) + for { + select { + case <-r.ctx.Done(): + return + case message, ok := <-r.retryQueue: + if !ok { + return + } + if r.ctx.Err() != nil { + return + } + + beforeOrNow := message.retryAt.Compare(time.Now()) <= 0 + if beforeOrNow { + err := r.processEvent(message.event) + if err == nil { + r.logger.Info(fmt.Sprintf( + "Message for '%s' successfully processed after %d retries", + message.event.SecretIdentifier, + message.currentRun, + )) + continue + } + + if message.currentRun >= maxRetries { + r.logger.Error(err, fmt.Sprintf( + "Message for '%s' was not processed after %d retries", + message.event.SecretIdentifier, + message.currentRun, + )) + continue + } + + message.currentRun++ + message.retryAt = getNextRetryAt(r.config.RetryPolicy.Algorithm, message.currentRun) + } + select { + case r.retryQueue <- message: + case <-r.ctx.Done(): + return + } + } + } +} + +func (r *route) drainRetryQueue() { + r.logger.Info("Draining retry queue") + for message := range r.retryQueue { + err := r.processEvent(message.event) + if err == nil { + r.logger.Info(fmt.Sprintf( + "Message for '%s' successfully processed after %d retries", + message.event.SecretIdentifier, + message.currentRun, + )) + } else { + r.logger.Info(fmt.Sprintf("Message for '%s' was not processed", message.event.SecretIdentifier)) + } + } +} + +func getNextRetryAt(algorithm string, currentRun int) time.Time { + if algorithm == "linear" { + return time.Now().Add(time.Second) + } + duration := time.Duration(math.Exp2(float64(currentRun))) + return time.Now().Add(time.Second * duration) +} + +func parsePayloadToString(body io.ReadCloser) (string, error) { + b, err := io.ReadAll(body) + if err != nil { + return "", err + } + return string(b), nil +} + +func getSecretIdentifierFromPayload(payload, identifierPath string) (string, error) { + secretIdentifier := gjson.Get(payload, identifierPath) + if !secretIdentifier.Exists() { + return "", errors.New("secret not found on event") + } + return secretIdentifier.String(), nil +} + +func authenticateWithBasicAuth(ctx context.Context, k8sClient client.Client, requestToken string, basicAuth *v1alpha1.BasicAuth, logger logr.Logger) error { + username, err := decodeSecret(ctx, k8sClient, &basicAuth.UsernameSecretRef, logger) + if err != nil { + return err + } + + password, err := decodeSecret(ctx, k8sClient, &basicAuth.PasswordSecretRef, logger) + if err != nil { + return err + } + + userPwOnRequest, err := base64.StdEncoding.DecodeString(requestToken) + if err != nil { + return err + } + + storedUserPw := fmt.Sprintf("%s:%s", username, password) + + if storedUserPw != string(userPwOnRequest) { + return errors.New("invalid token. unauthenticated request") + } + + return nil +} + +func authenticateWithBearer(ctx context.Context, k8sClient client.Client, requestToken string, bearer *v1alpha1.BearerToken, logger logr.Logger) error { + token, err := decodeSecret(ctx, k8sClient, &bearer.BearerTokenSecretRef, logger) + if err != nil { + return err + } + + if token != requestToken { + return errors.New("invalid token. unauthenticated request") + } + + return nil +} + +func decodeSecret(ctx context.Context, k8sClient client.Client, config *v1alpha1.SecretKeySelector, logger logr.Logger) (string, error) { + secret, err := util.GetSecret(ctx, k8sClient, config.Name, config.Namespace, logger) + if err != nil { + return "", err + } + + secretBytes, ok := secret.Data[config.Key] + if !ok { + return "", fmt.Errorf("%s not found in secret %s", config.Key, config.Name) + } + + return string(secretBytes), nil +} diff --git a/internal/listener/webhook/server.go b/internal/listener/webhook/server.go new file mode 100644 index 0000000..b853ca6 --- /dev/null +++ b/internal/listener/webhook/server.go @@ -0,0 +1,135 @@ +package webhook + +import ( + "context" + "fmt" + "net/http" + "sync" + "time" + + v1alpha1 "github.com/external-secrets/reloader/api/v1alpha1" + "github.com/external-secrets/reloader/internal/events" + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const webhookRoutePrefix = "/webhook/" + +// WebhookServer manages a single shared HTTP server for all webhook Config CRs. +// Each Config gets a route at POST /webhook/{config-name}. +type WebhookServer struct { + addr string + server *http.Server + routes map[string]*route + mu sync.RWMutex + logger logr.Logger + httpShutdownMu sync.Once +} + +// NewWebhookServer constructs a server; call Start to listen. +func NewWebhookServer(addr string, logger logr.Logger) *WebhookServer { + s := &WebhookServer{ + addr: addr, + routes: make(map[string]*route), + logger: logger.WithName("webhook-server"), + } + s.server = &http.Server{ + Addr: addr, + Handler: s.buildMux(), + } + return s +} + +// Start implements manager.Runnable: listens until ctx is cancelled, then shuts down gracefully. +func (s *WebhookServer) Start(ctx context.Context) error { + s.logger.Info("Starting shared webhook server", "addr", s.addr) + errCh := make(chan error, 1) + go func() { + err := s.server.ListenAndServe() + errCh <- err + }() + select { + case err := <-errCh: + if err != nil && err != http.ErrServerClosed { + return err + } + return nil + case <-ctx.Done(): + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = s.closeHTTPServer(shutdownCtx) + err := <-errCh + if err != nil && err != http.ErrServerClosed { + return err + } + return nil + } +} + +func (s *WebhookServer) closeHTTPServer(ctx context.Context) error { + var shutdownErr error + s.httpShutdownMu.Do(func() { + shutdownErr = s.server.Shutdown(ctx) + }) + return shutdownErr +} + +// Register adds or replaces the route for the given Config name. +func (s *WebhookServer) Register(configName string, routeCtx context.Context, cfg *v1alpha1.WebhookConfig, k8sClient client.Client, eventChan chan events.SecretRotationEvent, logger logr.Logger) { + s.mu.Lock() + defer s.mu.Unlock() + + if existing, ok := s.routes[configName]; ok { + existing.shutdown() + } + + r := newRoute(routeCtx, configName, cfg, k8sClient, eventChan, logger) + s.routes[configName] = r + s.server.Handler = s.buildMux() + + s.logger.Info("Registered webhook route", "config", configName, "path", webhookRoutePrefix+configName) +} + +// Unregister removes the route for the given Config name. +func (s *WebhookServer) Unregister(configName string) { + s.mu.Lock() + defer s.mu.Unlock() + + if r, ok := s.routes[configName]; ok { + r.shutdown() + delete(s.routes, configName) + s.server.Handler = s.buildMux() + s.logger.Info("Unregistered webhook route", "config", configName) + } +} + +// HasRoute reports whether a route exists for configName. +func (s *WebhookServer) HasRoute(configName string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + _, ok := s.routes[configName] + return ok +} + +// buildMux rebuilds the mux from current routes. Caller must hold s.mu (write lock). +func (s *WebhookServer) buildMux() *http.ServeMux { + mux := http.NewServeMux() + for name, r := range s.routes { + pattern := fmt.Sprintf("POST %s%s", webhookRoutePrefix, name) + h := r + mux.HandleFunc(pattern, recoverMiddleware(h.handle, s.logger)) + } + return mux +} + +func recoverMiddleware(next http.HandlerFunc, logger logr.Logger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer func() { + if rec := recover(); rec != nil { + logger.Error(fmt.Errorf("panic: %v", rec), "Recovered from panic in webhook handler") + w.WriteHeader(http.StatusInternalServerError) + } + }() + next(w, r) + } +} diff --git a/internal/listener/webhook/server_test.go b/internal/listener/webhook/server_test.go new file mode 100644 index 0000000..5fdd556 --- /dev/null +++ b/internal/listener/webhook/server_test.go @@ -0,0 +1,122 @@ +package webhook + +import ( + "context" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + v1alpha1 "github.com/external-secrets/reloader/api/v1alpha1" + "github.com/external-secrets/reloader/internal/events" + "github.com/go-logr/logr" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestRecoverMiddleware(t *testing.T) { + h := recoverMiddleware(func(http.ResponseWriter, *http.Request) { + panic("boom") + }, logr.Discard()) + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, "/", nil) + h(rec, req) + if rec.Code != http.StatusInternalServerError { + t.Fatalf("expected 500, got %d", rec.Code) + } +} + +func TestWebhookServer_RegisterHasRouteUnregister(t *testing.T) { + s := NewWebhookServer("127.0.0.1:1", logr.Discard()) + ctx := context.Background() + s.Register("cfg-a", ctx, &v1alpha1.WebhookConfig{}, nil, nil, logr.Discard()) + if !s.HasRoute("cfg-a") { + t.Fatal("expected route cfg-a") + } + s.Unregister("cfg-a") + if s.HasRoute("cfg-a") { + t.Fatal("expected route removed") + } +} + +func TestWebhookServer_ConcurrentRegister(t *testing.T) { + s := NewWebhookServer("127.0.0.1:1", logr.Discard()) + ctx := context.Background() + const n = 32 + done := make(chan struct{}) + for i := 0; i < n; i++ { + go func() { + defer func() { done <- struct{}{} }() + s.Register("same", ctx, &v1alpha1.WebhookConfig{ + SecretIdentifierOnPayload: "id", + }, nil, nil, logr.Discard()) + }() + } + for i := 0; i < n; i++ { + <-done + } + if !s.HasRoute("same") { + t.Fatal("expected one stable route") + } +} + +func waitTCP(t *testing.T, addr string) { + t.Helper() + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + c, err := net.DialTimeout("tcp", addr, 50*time.Millisecond) + if err == nil { + _ = c.Close() + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("address %s did not become reachable", addr) +} + +func TestWebhookServer_HTTPPostNoAuth(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + addr := ln.Addr().String() + _ = ln.Close() + + s := NewWebhookServer(addr, logr.Discard()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { _ = s.Start(ctx) }() + waitTCP(t, addr) + + eventCh := make(chan events.SecretRotationEvent, 2) + cl := fake.NewClientBuilder().Build() + + s.Register("myconfig", ctx, &v1alpha1.WebhookConfig{}, cl, eventCh, logr.Discard()) + + body := `{"0":{"data":{"ObjectName":"secret-one"}}}` + req, err := http.NewRequest(http.MethodPost, "http://"+addr+"/webhook/myconfig", strings.NewReader(body)) + if err != nil { + t.Fatal(err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + t.Fatalf("expected 204, got %d", resp.StatusCode) + } + select { + case ev := <-eventCh: + if ev.SecretIdentifier != "secret-one" { + t.Fatalf("unexpected event: %+v", ev) + } + default: + t.Fatal("expected event on channel") + } + + cancel() +}