Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions charts/lfx-v2-auth-service/templates/nats-kv-buckets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,24 @@ spec:
maxBytes: {{ .Values.nats.authelia_email_otp_kv_bucket.maxBytes }}
compression: {{ .Values.nats.authelia_email_otp_kv_bucket.compression }}
ttl: {{ .Values.nats.authelia_email_otp_kv_bucket.ttl }}
{{- end }}
---
{{- if .Values.nats.username_sub_cache_kv_bucket.creation }}
apiVersion: jetstream.nats.io/v1beta2
kind: KeyValue
metadata:
name: {{ .Values.nats.username_sub_cache_kv_bucket.name }}
namespace: {{ .Release.Namespace }}
{{- if .Values.nats.username_sub_cache_kv_bucket.keep }}
annotations:
"helm.sh/resource-policy": keep
{{- end }}
spec:
bucket: {{ .Values.nats.username_sub_cache_kv_bucket.name }}
history: {{ .Values.nats.username_sub_cache_kv_bucket.history }}
storage: {{ .Values.nats.username_sub_cache_kv_bucket.storage }}
maxValueSize: {{ .Values.nats.username_sub_cache_kv_bucket.maxValueSize }}
maxBytes: {{ .Values.nats.username_sub_cache_kv_bucket.maxBytes }}
compression: {{ .Values.nats.username_sub_cache_kv_bucket.compression }}
ttl: {{ .Values.nats.username_sub_cache_kv_bucket.ttl }}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include replicas here as well please?

{{- end }}
20 changes: 20 additions & 0 deletions charts/lfx-v2-auth-service/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ nats:
# ttl is the time-to-live for entries in the bucket (5 minutes for OTPs)
ttl: 5m

username_sub_cache_kv_bucket:
# creation is a boolean to determine if the KV bucket should be created via the helm chart.
creation: true
# keep is a boolean to determine if the KV bucket should be preserved during helm uninstall
keep: true
# name is the name of the KV bucket for caching username → sub lookups
name: auth-service-username-sub-cache
# history is the number of history entries to keep (1 is sufficient for a cache)
history: 1
# storage is the storage type for the KV bucket
storage: file
# maxValueSize is the maximum size of a value (sub strings are short)
maxValueSize: 512 # 512B
# maxBytes is the maximum number of bytes in the KV bucket
maxBytes: 10485760 # 10MB
# compression is a boolean to determine if the KV bucket should be compressed
compression: true
# ttl is the time-to-live for cache entries (7 days)
ttl: 168h

# serviceAccount is the configuration for the Kubernetes service account
## This will be used only if the USER_REPOSITORY_TYPE is authelia
serviceAccount:
Expand Down
4 changes: 4 additions & 0 deletions cmd/server/service/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func QueueSubscriptions(ctx context.Context) error {
service.WithEventPublisherForMessageHandler(natsClient),
}

if kvStore, ok := natsClient.GetKVStore(constants.KVBucketNameUsernameSubCache); ok {
opts = append(opts, service.WithUsernameSubCacheForMessageHandler(kvStore))
}

if os.Getenv(constants.UserRepositoryTypeEnvKey) == constants.UserRepositoryTypeAuth0 {
auth0Domain := os.Getenv(constants.Auth0DomainEnvKey)
if auth0Domain == "" {
Expand Down
35 changes: 22 additions & 13 deletions internal/infrastructure/nats/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,24 +168,33 @@ func NewClient(ctx context.Context, config Config) (*NATSClient, error) {
timeout: config.Timeout,
}

var buckets []string
// Check if Authelia is enabled by checking the environment variable directly
if os.Getenv(constants.UserRepositoryTypeEnvKey) == constants.UserRepositoryTypeAuthelia {
buckets = append(buckets, constants.KVBucketNameAutheliaUsers)
buckets = append(buckets, constants.KVBucketNameAutheliaEmailOTP)
// The username→sub cache bucket is best-effort: if it doesn't exist or is
// unavailable the service still works, just without caching.
if err := client.KeyValueStore(ctx, constants.KVBucketNameUsernameSubCache); err != nil {
slog.WarnContext(ctx, "username→sub cache bucket unavailable, caching disabled",
"error", err,
"bucket", constants.KVBucketNameUsernameSubCache,
)
} else {
Comment on lines +171 to +178
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This best-effort initialization still emits an error-level log because KeyValueStore() logs slog.ErrorContext before returning the error. As a result, when the bucket is missing/unavailable startup will log both an ERROR and this WARN, which contradicts the PR description (“logs a warning and continues”) and may trigger error-based alerting. Consider adding a non-error logging path for optional buckets (e.g., detect/handle missing-bucket/JetStream-unavailable errors here, or add a KeyValueStore variant/flag that doesn’t log at error level for expected best-effort failures).

Copilot uses AI. Check for mistakes.
slog.InfoContext(ctx, "NATS key-value store initialized",
"bucket", constants.KVBucketNameUsernameSubCache,
)
}

for _, bucketName := range buckets {
if err := client.KeyValueStore(ctx, bucketName); err != nil {
slog.ErrorContext(ctx, "failed to initialize NATS key-value store",
"error", err,
// Authelia-specific buckets are required when Authelia is the user repository.
if os.Getenv(constants.UserRepositoryTypeEnvKey) == constants.UserRepositoryTypeAuthelia {
for _, bucketName := range []string{constants.KVBucketNameAutheliaUsers, constants.KVBucketNameAutheliaEmailOTP} {
if err := client.KeyValueStore(ctx, bucketName); err != nil {
slog.ErrorContext(ctx, "failed to initialize NATS key-value store",
"error", err,
"bucket", bucketName,
)
return nil, errors.NewServiceUnavailable("failed to initialize NATS key-value store", err)
}
slog.InfoContext(ctx, "NATS key-value store initialized",
"bucket", bucketName,
)
return nil, errors.NewServiceUnavailable("failed to initialize NATS key-value store", err)
}
slog.InfoContext(ctx, "NATS key-value store initialized",
"bucket", bucketName,
)
}

slog.InfoContext(ctx, "NATS client created successfully",
Expand Down
31 changes: 31 additions & 0 deletions internal/service/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/constants"
errs "github.com/linuxfoundation/lfx-v2-auth-service/pkg/errors"
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/redaction"
"github.com/nats-io/nats.go/jetstream"
)

// UserProfileUpdatedEvent is published after a successful user_metadata update.
Expand All @@ -28,6 +29,12 @@ type UserProfileUpdatedEvent struct {
Timestamp time.Time `json:"timestamp"`
}

// kvCache is the minimal interface used for username→sub caching.
type kvCache interface {
Get(ctx context.Context, key string) (jetstream.KeyValueEntry, error)
Put(ctx context.Context, key string, value []byte) (uint64, error)
}

// UserDataResponse represents the response structure for user update operations
type UserDataResponse struct {
Success bool `json:"success"`
Expand All @@ -46,6 +53,7 @@ type messageHandlerOrchestrator struct {
passwordHandler port.PasswordHandler
impersonator port.Impersonator
eventPublisher port.EventPublisher
usernameSubCache kvCache
}

// MessageHandlerOrchestratorOption defines a function type for setting options
Expand Down Expand Up @@ -107,6 +115,13 @@ func WithEventPublisherForMessageHandler(eventPublisher port.EventPublisher) Mes
}
}

// WithUsernameSubCacheForMessageHandler sets the KV cache used for username→sub lookups
func WithUsernameSubCacheForMessageHandler(cache kvCache) MessageHandlerOrchestratorOption {
return func(m *messageHandlerOrchestrator) {
m.usernameSubCache = cache
}
}

func (m *messageHandlerOrchestrator) errorResponse(error string) []byte {
response := UserDataResponse{
Success: false,
Expand Down Expand Up @@ -192,11 +207,27 @@ func (m *messageHandlerOrchestrator) UsernameToSub(ctx context.Context, msg port
return m.errorResponse("username is required"), nil
}

// Cache-aside: check the KV cache before hitting Auth0.
if m.usernameSubCache != nil {
if entry, err := m.usernameSubCache.Get(ctx, username); err == nil {
slog.DebugContext(ctx, "username→sub cache hit", "username", redaction.Redact(username))
return entry.Value(), nil
}
}

user := &model.User{Username: username}
user, err := m.userReader.SearchUser(ctx, user, constants.CriteriaTypeUsername)
if err != nil {
return m.errorResponse(err.Error()), nil
}

// Populate the cache for future lookups.
if m.usernameSubCache != nil && user.UserID != "" {
if _, putErr := m.usernameSubCache.Put(ctx, username, []byte(user.UserID)); putErr != nil {
slog.WarnContext(ctx, "failed to cache username→sub", "username", redaction.Redact(username), "error", putErr)
}
}

return []byte(user.UserID), nil
}

Expand Down
106 changes: 106 additions & 0 deletions internal/service/message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/linuxfoundation/lfx-v2-auth-service/internal/domain/model"
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/constants"
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/converters"
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/errors"
natsjetstream "github.com/nats-io/nats.go/jetstream"
)

// mockTransportMessenger is a mock implementation of port.TransportMessenger for testing
Expand Down Expand Up @@ -1395,6 +1397,110 @@ func TestMessageHandlerOrchestrator_UpdateUser_EventPublishing(t *testing.T) {
})
}

// mockKVEntry is a minimal implementation of jetstream.KeyValueEntry for testing.
type mockKVEntry struct {
value []byte
}

func (e *mockKVEntry) Value() []byte { return e.value }
func (e *mockKVEntry) Key() string { return "" }
func (e *mockKVEntry) Bucket() string { return "" }
func (e *mockKVEntry) Revision() uint64 { return 0 }
func (e *mockKVEntry) Delta() uint64 { return 0 }
func (e *mockKVEntry) Created() time.Time { return time.Time{} }
func (e *mockKVEntry) Operation() natsjetstream.KeyValueOp { return 0 }

// mockKVCache is a minimal mock for the kvCache interface.
type mockKVCache struct {
getFunc func(ctx context.Context, key string) (natsjetstream.KeyValueEntry, error)
putFunc func(ctx context.Context, key string, value []byte) (uint64, error)
}

func (m *mockKVCache) Get(ctx context.Context, key string) (natsjetstream.KeyValueEntry, error) {
if m.getFunc != nil {
return m.getFunc(ctx, key)
}
return nil, natsjetstream.ErrKeyNotFound
}

func (m *mockKVCache) Put(ctx context.Context, key string, value []byte) (uint64, error) {
if m.putFunc != nil {
return m.putFunc(ctx, key, value)
}
return 0, nil
}

func TestMessageHandlerOrchestrator_UsernameToSub_CacheHit(t *testing.T) {
ctx := context.Background()

searchCalled := false
cache := &mockKVCache{
getFunc: func(_ context.Context, key string) (natsjetstream.KeyValueEntry, error) {
if key == "cacheduser" {
return &mockKVEntry{value: []byte("auth0|cached001")}, nil
}
return nil, natsjetstream.ErrKeyNotFound
},
}

orchestrator := NewMessageHandlerOrchestrator(
WithUserReaderForMessageHandler(&mockUserServiceReader{
searchUserFunc: func(_ context.Context, _ *model.User, _ string) (*model.User, error) {
searchCalled = true
return nil, errors.NewUnexpected("should not be called")
},
}),
WithUsernameSubCacheForMessageHandler(cache),
)

result, err := orchestrator.UsernameToSub(ctx, &mockTransportMessenger{data: []byte("cacheduser")})
if err != nil {
t.Fatalf("UsernameToSub() unexpected error: %v", err)
}
if string(result) != "auth0|cached001" {
t.Errorf("UsernameToSub() = %q, want %q", string(result), "auth0|cached001")
}
if searchCalled {
t.Error("Auth0 SearchUser should not be called on cache hit")
}
}

func TestMessageHandlerOrchestrator_UsernameToSub_CacheMiss(t *testing.T) {
ctx := context.Background()

putKey, putValue := "", ""
cache := &mockKVCache{
getFunc: func(_ context.Context, _ string) (natsjetstream.KeyValueEntry, error) {
return nil, natsjetstream.ErrKeyNotFound
},
putFunc: func(_ context.Context, key string, value []byte) (uint64, error) {
putKey = key
putValue = string(value)
return 1, nil
},
}

orchestrator := NewMessageHandlerOrchestrator(
WithUserReaderForMessageHandler(&mockUserServiceReader{
searchUserFunc: func(_ context.Context, user *model.User, _ string) (*model.User, error) {
return &model.User{UserID: "auth0|fresh001", Username: user.Username}, nil
},
}),
WithUsernameSubCacheForMessageHandler(cache),
)

result, err := orchestrator.UsernameToSub(ctx, &mockTransportMessenger{data: []byte("freshuser")})
if err != nil {
t.Fatalf("UsernameToSub() unexpected error: %v", err)
}
if string(result) != "auth0|fresh001" {
t.Errorf("UsernameToSub() = %q, want %q", string(result), "auth0|fresh001")
}
if putKey != "freshuser" || putValue != "auth0|fresh001" {
t.Errorf("cache.Put() called with key=%q value=%q, want key=%q value=%q", putKey, putValue, "freshuser", "auth0|fresh001")
}
}

func TestNewMessageHandlerOrchestrator(t *testing.T) {
t.Run("create orchestrator with options", func(t *testing.T) {
mockWriter := &mockUserServiceWriter{}
Expand Down
3 changes: 3 additions & 0 deletions pkg/constants/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const (
// KVBucketNameAutheliaEmailOTP is the name of the KV bucket for authelia email OTPs.
KVBucketNameAutheliaEmailOTP = "authelia-email-otp"

// KVBucketNameUsernameSubCache is the name of the KV bucket for caching username → sub lookups.
KVBucketNameUsernameSubCache = "auth-service-username-sub-cache"

// KVLookupPrefixAuthelia is the prefix for lookup keys in the KV store.
KVLookupPrefixAuthelia = "lookup/authelia-users/%s"
)
Loading