From f1eb79d2787f3271074800e5cf8200c44f593285 Mon Sep 17 00:00:00 2001 From: Jordan Evans Date: Wed, 22 Apr 2026 11:26:27 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20add=20NATS=20KV=20cache=20bucket=20for?= =?UTF-8?q?=20username=E2=86=92sub=20lookups?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce the auth-service-username-sub-cache KV bucket (168h TTL) to cache Auth0 username→sub results, reducing Management API pressure and lookup latency. Cache is checked before each Auth0 call; a miss falls through to Auth0 and populates the cache on success. Issue: LFXV2-1561 Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: Jordan Evans --- .../templates/nats-kv-buckets.yaml | 20 ++++ charts/lfx-v2-auth-service/values.yaml | 20 ++++ cmd/server/service/providers.go | 4 + internal/infrastructure/nats/client.go | 35 +++--- internal/service/message_handler.go | 31 +++++ internal/service/message_handler_test.go | 106 ++++++++++++++++++ pkg/constants/storage.go | 3 + 7 files changed, 206 insertions(+), 13 deletions(-) diff --git a/charts/lfx-v2-auth-service/templates/nats-kv-buckets.yaml b/charts/lfx-v2-auth-service/templates/nats-kv-buckets.yaml index 23b288e..f199bfa 100644 --- a/charts/lfx-v2-auth-service/templates/nats-kv-buckets.yaml +++ b/charts/lfx-v2-auth-service/templates/nats-kv-buckets.yaml @@ -40,4 +40,24 @@ spec: maxBytes: {{ .Values.nats.authelia_email_otp_kv_bucket.maxBytes }} compression: {{ .Values.nats.authelia_email_otp_kv_bucket.compression }} ttl: {{ .Values.nats.authelia_email_otp_kv_bucket.ttl }} +{{- end }} +--- +{{- if .Values.nats.username_sub_cache_kv_bucket.creation }} +apiVersion: jetstream.nats.io/v1beta2 +kind: KeyValue +metadata: + name: {{ .Values.nats.username_sub_cache_kv_bucket.name }} + namespace: {{ .Release.Namespace }} + {{- if .Values.nats.username_sub_cache_kv_bucket.keep }} + annotations: + "helm.sh/resource-policy": keep + {{- end }} +spec: + bucket: {{ .Values.nats.username_sub_cache_kv_bucket.name }} + history: {{ .Values.nats.username_sub_cache_kv_bucket.history }} + storage: {{ .Values.nats.username_sub_cache_kv_bucket.storage }} + maxValueSize: {{ .Values.nats.username_sub_cache_kv_bucket.maxValueSize }} + maxBytes: {{ .Values.nats.username_sub_cache_kv_bucket.maxBytes }} + compression: {{ .Values.nats.username_sub_cache_kv_bucket.compression }} + ttl: {{ .Values.nats.username_sub_cache_kv_bucket.ttl }} {{- end }} \ No newline at end of file diff --git a/charts/lfx-v2-auth-service/values.yaml b/charts/lfx-v2-auth-service/values.yaml index d494ad8..142f0ab 100644 --- a/charts/lfx-v2-auth-service/values.yaml +++ b/charts/lfx-v2-auth-service/values.yaml @@ -95,6 +95,26 @@ nats: # ttl is the time-to-live for entries in the bucket (5 minutes for OTPs) ttl: 5m + username_sub_cache_kv_bucket: + # creation is a boolean to determine if the KV bucket should be created via the helm chart. + creation: true + # keep is a boolean to determine if the KV bucket should be preserved during helm uninstall + keep: true + # name is the name of the KV bucket for caching username → sub lookups + name: auth-service-username-sub-cache + # history is the number of history entries to keep (1 is sufficient for a cache) + history: 1 + # storage is the storage type for the KV bucket + storage: file + # maxValueSize is the maximum size of a value (sub strings are short) + maxValueSize: 512 # 512B + # maxBytes is the maximum number of bytes in the KV bucket + maxBytes: 10485760 # 10MB + # compression is a boolean to determine if the KV bucket should be compressed + compression: true + # ttl is the time-to-live for cache entries (7 days) + ttl: 168h + # serviceAccount is the configuration for the Kubernetes service account ## This will be used only if the USER_REPOSITORY_TYPE is authelia serviceAccount: diff --git a/cmd/server/service/providers.go b/cmd/server/service/providers.go index 47b2b2b..775591b 100644 --- a/cmd/server/service/providers.go +++ b/cmd/server/service/providers.go @@ -196,6 +196,10 @@ func QueueSubscriptions(ctx context.Context) error { service.WithEventPublisherForMessageHandler(natsClient), } + if kvStore, ok := natsClient.GetKVStore(constants.KVBucketNameUsernameSubCache); ok { + opts = append(opts, service.WithUsernameSubCacheForMessageHandler(kvStore)) + } + if os.Getenv(constants.UserRepositoryTypeEnvKey) == constants.UserRepositoryTypeAuth0 { auth0Domain := os.Getenv(constants.Auth0DomainEnvKey) if auth0Domain == "" { diff --git a/internal/infrastructure/nats/client.go b/internal/infrastructure/nats/client.go index 63885db..01a6939 100644 --- a/internal/infrastructure/nats/client.go +++ b/internal/infrastructure/nats/client.go @@ -168,24 +168,33 @@ func NewClient(ctx context.Context, config Config) (*NATSClient, error) { timeout: config.Timeout, } - var buckets []string - // Check if Authelia is enabled by checking the environment variable directly - if os.Getenv(constants.UserRepositoryTypeEnvKey) == constants.UserRepositoryTypeAuthelia { - buckets = append(buckets, constants.KVBucketNameAutheliaUsers) - buckets = append(buckets, constants.KVBucketNameAutheliaEmailOTP) + // The username→sub cache bucket is best-effort: if it doesn't exist or is + // unavailable the service still works, just without caching. + if err := client.KeyValueStore(ctx, constants.KVBucketNameUsernameSubCache); err != nil { + slog.WarnContext(ctx, "username→sub cache bucket unavailable, caching disabled", + "error", err, + "bucket", constants.KVBucketNameUsernameSubCache, + ) + } else { + slog.InfoContext(ctx, "NATS key-value store initialized", + "bucket", constants.KVBucketNameUsernameSubCache, + ) } - for _, bucketName := range buckets { - if err := client.KeyValueStore(ctx, bucketName); err != nil { - slog.ErrorContext(ctx, "failed to initialize NATS key-value store", - "error", err, + // Authelia-specific buckets are required when Authelia is the user repository. + if os.Getenv(constants.UserRepositoryTypeEnvKey) == constants.UserRepositoryTypeAuthelia { + for _, bucketName := range []string{constants.KVBucketNameAutheliaUsers, constants.KVBucketNameAutheliaEmailOTP} { + if err := client.KeyValueStore(ctx, bucketName); err != nil { + slog.ErrorContext(ctx, "failed to initialize NATS key-value store", + "error", err, + "bucket", bucketName, + ) + return nil, errors.NewServiceUnavailable("failed to initialize NATS key-value store", err) + } + slog.InfoContext(ctx, "NATS key-value store initialized", "bucket", bucketName, ) - return nil, errors.NewServiceUnavailable("failed to initialize NATS key-value store", err) } - slog.InfoContext(ctx, "NATS key-value store initialized", - "bucket", bucketName, - ) } slog.InfoContext(ctx, "NATS client created successfully", diff --git a/internal/service/message_handler.go b/internal/service/message_handler.go index 248acc3..69aad22 100644 --- a/internal/service/message_handler.go +++ b/internal/service/message_handler.go @@ -16,6 +16,7 @@ import ( "github.com/linuxfoundation/lfx-v2-auth-service/pkg/constants" errs "github.com/linuxfoundation/lfx-v2-auth-service/pkg/errors" "github.com/linuxfoundation/lfx-v2-auth-service/pkg/redaction" + "github.com/nats-io/nats.go/jetstream" ) // UserProfileUpdatedEvent is published after a successful user_metadata update. @@ -28,6 +29,12 @@ type UserProfileUpdatedEvent struct { Timestamp time.Time `json:"timestamp"` } +// kvCache is the minimal interface used for username→sub caching. +type kvCache interface { + Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error) + Put(ctx context.Context, key string, value []byte) (uint64, error) +} + // UserDataResponse represents the response structure for user update operations type UserDataResponse struct { Success bool `json:"success"` @@ -46,6 +53,7 @@ type messageHandlerOrchestrator struct { passwordHandler port.PasswordHandler impersonator port.Impersonator eventPublisher port.EventPublisher + usernameSubCache kvCache } // MessageHandlerOrchestratorOption defines a function type for setting options @@ -107,6 +115,13 @@ func WithEventPublisherForMessageHandler(eventPublisher port.EventPublisher) Mes } } +// WithUsernameSubCacheForMessageHandler sets the KV cache used for username→sub lookups +func WithUsernameSubCacheForMessageHandler(cache kvCache) MessageHandlerOrchestratorOption { + return func(m *messageHandlerOrchestrator) { + m.usernameSubCache = cache + } +} + func (m *messageHandlerOrchestrator) errorResponse(error string) []byte { response := UserDataResponse{ Success: false, @@ -192,11 +207,27 @@ func (m *messageHandlerOrchestrator) UsernameToSub(ctx context.Context, msg port return m.errorResponse("username is required"), nil } + // Cache-aside: check the KV cache before hitting Auth0. + if m.usernameSubCache != nil { + if entry, err := m.usernameSubCache.Get(ctx, username); err == nil { + slog.DebugContext(ctx, "username→sub cache hit", "username", redaction.Redact(username)) + return entry.Value(), nil + } + } + user := &model.User{Username: username} user, err := m.userReader.SearchUser(ctx, user, constants.CriteriaTypeUsername) if err != nil { return m.errorResponse(err.Error()), nil } + + // Populate the cache for future lookups. + if m.usernameSubCache != nil && user.UserID != "" { + if _, putErr := m.usernameSubCache.Put(ctx, username, []byte(user.UserID)); putErr != nil { + slog.WarnContext(ctx, "failed to cache username→sub", "username", redaction.Redact(username), "error", putErr) + } + } + return []byte(user.UserID), nil } diff --git a/internal/service/message_handler_test.go b/internal/service/message_handler_test.go index 90eeb9e..e4ce40a 100644 --- a/internal/service/message_handler_test.go +++ b/internal/service/message_handler_test.go @@ -9,11 +9,13 @@ import ( "fmt" "strings" "testing" + "time" "github.com/linuxfoundation/lfx-v2-auth-service/internal/domain/model" "github.com/linuxfoundation/lfx-v2-auth-service/pkg/constants" "github.com/linuxfoundation/lfx-v2-auth-service/pkg/converters" "github.com/linuxfoundation/lfx-v2-auth-service/pkg/errors" + natsjetstream "github.com/nats-io/nats.go/jetstream" ) // mockTransportMessenger is a mock implementation of port.TransportMessenger for testing @@ -1395,6 +1397,110 @@ func TestMessageHandlerOrchestrator_UpdateUser_EventPublishing(t *testing.T) { }) } +// mockKVEntry is a minimal implementation of jetstream.KeyValueEntry for testing. +type mockKVEntry struct { + value []byte +} + +func (e *mockKVEntry) Value() []byte { return e.value } +func (e *mockKVEntry) Key() string { return "" } +func (e *mockKVEntry) Bucket() string { return "" } +func (e *mockKVEntry) Revision() uint64 { return 0 } +func (e *mockKVEntry) Delta() uint64 { return 0 } +func (e *mockKVEntry) Created() time.Time { return time.Time{} } +func (e *mockKVEntry) Operation() natsjetstream.KeyValueOp { return 0 } + +// mockKVCache is a minimal mock for the kvCache interface. +type mockKVCache struct { + getFunc func(ctx context.Context, key string) (natsjetstream.KeyValueEntry, error) + putFunc func(ctx context.Context, key string, value []byte) (uint64, error) +} + +func (m *mockKVCache) Get(ctx context.Context, key string) (natsjetstream.KeyValueEntry, error) { + if m.getFunc != nil { + return m.getFunc(ctx, key) + } + return nil, natsjetstream.ErrKeyNotFound +} + +func (m *mockKVCache) Put(ctx context.Context, key string, value []byte) (uint64, error) { + if m.putFunc != nil { + return m.putFunc(ctx, key, value) + } + return 0, nil +} + +func TestMessageHandlerOrchestrator_UsernameToSub_CacheHit(t *testing.T) { + ctx := context.Background() + + searchCalled := false + cache := &mockKVCache{ + getFunc: func(_ context.Context, key string) (natsjetstream.KeyValueEntry, error) { + if key == "cacheduser" { + return &mockKVEntry{value: []byte("auth0|cached001")}, nil + } + return nil, natsjetstream.ErrKeyNotFound + }, + } + + orchestrator := NewMessageHandlerOrchestrator( + WithUserReaderForMessageHandler(&mockUserServiceReader{ + searchUserFunc: func(_ context.Context, _ *model.User, _ string) (*model.User, error) { + searchCalled = true + return nil, errors.NewUnexpected("should not be called") + }, + }), + WithUsernameSubCacheForMessageHandler(cache), + ) + + result, err := orchestrator.UsernameToSub(ctx, &mockTransportMessenger{data: []byte("cacheduser")}) + if err != nil { + t.Fatalf("UsernameToSub() unexpected error: %v", err) + } + if string(result) != "auth0|cached001" { + t.Errorf("UsernameToSub() = %q, want %q", string(result), "auth0|cached001") + } + if searchCalled { + t.Error("Auth0 SearchUser should not be called on cache hit") + } +} + +func TestMessageHandlerOrchestrator_UsernameToSub_CacheMiss(t *testing.T) { + ctx := context.Background() + + putKey, putValue := "", "" + cache := &mockKVCache{ + getFunc: func(_ context.Context, _ string) (natsjetstream.KeyValueEntry, error) { + return nil, natsjetstream.ErrKeyNotFound + }, + putFunc: func(_ context.Context, key string, value []byte) (uint64, error) { + putKey = key + putValue = string(value) + return 1, nil + }, + } + + orchestrator := NewMessageHandlerOrchestrator( + WithUserReaderForMessageHandler(&mockUserServiceReader{ + searchUserFunc: func(_ context.Context, user *model.User, _ string) (*model.User, error) { + return &model.User{UserID: "auth0|fresh001", Username: user.Username}, nil + }, + }), + WithUsernameSubCacheForMessageHandler(cache), + ) + + result, err := orchestrator.UsernameToSub(ctx, &mockTransportMessenger{data: []byte("freshuser")}) + if err != nil { + t.Fatalf("UsernameToSub() unexpected error: %v", err) + } + if string(result) != "auth0|fresh001" { + t.Errorf("UsernameToSub() = %q, want %q", string(result), "auth0|fresh001") + } + if putKey != "freshuser" || putValue != "auth0|fresh001" { + t.Errorf("cache.Put() called with key=%q value=%q, want key=%q value=%q", putKey, putValue, "freshuser", "auth0|fresh001") + } +} + func TestNewMessageHandlerOrchestrator(t *testing.T) { t.Run("create orchestrator with options", func(t *testing.T) { mockWriter := &mockUserServiceWriter{} diff --git a/pkg/constants/storage.go b/pkg/constants/storage.go index a84cf63..44e9971 100644 --- a/pkg/constants/storage.go +++ b/pkg/constants/storage.go @@ -11,6 +11,9 @@ const ( // KVBucketNameAutheliaEmailOTP is the name of the KV bucket for authelia email OTPs. KVBucketNameAutheliaEmailOTP = "authelia-email-otp" + // KVBucketNameUsernameSubCache is the name of the KV bucket for caching username → sub lookups. + KVBucketNameUsernameSubCache = "auth-service-username-sub-cache" + // KVLookupPrefixAuthelia is the prefix for lookup keys in the KV store. KVLookupPrefixAuthelia = "lookup/authelia-users/%s" )