diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java index b3846e25..4ff12820 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/AxoniqPlatformConfigurerEnhancer.java @@ -165,6 +165,14 @@ public void enhance(ComponentRegistry registry) { UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> { + // Only event processor modules expose a processorName; the doOnSubModules walker + // visits every sub-module (event-sourced entities, command/query modules, ...), so + // skipping non-processor modules here keeps AxoniqPlatformEventHandlingComponent + // from being constructed with a null processor name. + if (!(module instanceof PooledStreamingEventProcessorModule) + && !(module instanceof SubscribingEventProcessorModule)) { + return null; + } componentRegistry .registerDecorator(DecoratorDefinition.forType(EventHandlingComponent.class) .with((cc, name, delegate) -> @@ -176,7 +184,7 @@ public void enhance(ComponentRegistry registry) { .order(0)); return null; - }); + }, true); } /** diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt new file mode 100644 index 00000000..757a3bf6 --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformEntityEvolver.kt @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import org.axonframework.messaging.core.Context +import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.eventhandling.EventMessage +import org.axonframework.modelling.EntityEvolver +import java.util.function.BiConsumer + +/** + * Wraps the underlying [EntityEvolver] of an [org.axonframework.eventsourcing.EventSourcingRepository] + * so that inspection-time replay can fire BEFORE/AFTER hooks per event without reimplementing + * AF5's dispatch. + * + * The hooks are no-ops unless the matching [Context.ResourceKey] resources are present on the + * [ProcessingContext], so command handling and normal event sourcing go through unchanged. + * + * Constructed by [AxoniqPlatformModelInspectionEnhancer], which detects the inner + * [org.axonframework.eventsourcing.EventSourcingRepository] in the decorator chain and reconstructs + * it with this wrapper substituted for its `entityEvolver` argument. + */ +class AxoniqPlatformEntityEvolver( + private val delegate: EntityEvolver, +) : EntityEvolver { + + companion object { + /** Called before [delegate.evolve]. Receives the event and the pre-evolve entity state. */ + val BEFORE_CONSUMER: Context.ResourceKey> = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.BEFORE_CONSUMER") + + /** Called after [delegate.evolve]. Receives the event and the post-evolve entity state. */ + val AFTER_CONSUMER: Context.ResourceKey> = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.AFTER_CONSUMER") + + /** + * Zero-based event index — when present, [evolve] returns the current entity unchanged + * once it has applied this many events. Lets inspection reconstruct state up to a given + * sequence without doing the bookkeeping outside the framework. + */ + val MAX_INDEX: Context.ResourceKey = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.MAX_INDEX") + + /** Internal counter advanced by [evolve] when [MAX_INDEX] is set. */ + val INDEX_COUNTER: Context.ResourceKey = + Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.INDEX_COUNTER") + } + + /** Tiny mutable holder so we can advance the index without re-putting a Long resource each call. */ + class LongCounter(var value: Long = 0) + + override fun evolve(entity: E, event: EventMessage, context: ProcessingContext): E { + val maxIndex = context.getResource(MAX_INDEX) + if (maxIndex != null) { + val counter = context.computeResourceIfAbsent(INDEX_COUNTER) { LongCounter() } + if (counter.value > maxIndex) { + return entity + } + counter.value++ + } + context.getResource(BEFORE_CONSUMER)?.accept(event, entity) + val result = delegate.evolve(entity, event, context) + context.getResource(AFTER_CONSUMER)?.accept(event, result) + return result + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java index 2d0534ed..07eb91f4 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancer.java @@ -18,39 +18,54 @@ import io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer; import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; -import org.axonframework.common.configuration.ComponentDefinition; import org.axonframework.common.configuration.ComponentRegistry; import org.axonframework.common.configuration.ConfigurationEnhancer; -import org.axonframework.common.lifecycle.Phase; -import org.axonframework.eventsourcing.eventstore.EventStorageEngine; -import org.axonframework.modelling.StateManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * Enhancer that registers the {@link RSocketModelInspectionResponder} when both - * {@link StateManager} and {@link EventStorageEngine} are available (AF5 applications). + * Service-loaded enhancer that wires the {@link RSocketModelInspectionResponder} when the + * application has the platform client connected ({@link RSocketHandlerRegistrar} present) and + * {@code axon-eventsourcing} is on the classpath. + * + *

This class is deliberately free of direct references to {@code axon-eventsourcing} types + * so it can be loaded even when event sourcing is absent from the classpath. The actual + * decorator wiring lives in {@link ModelInspectionDecorators} and is only touched after the + * runtime classpath probe succeeds.

+ * + *

We deliberately do not probe for {@code StateManager} either: it's registered by + * {@code ModellingConfigurationDefaults} at {@link Integer#MAX_VALUE}, after this enhancer's + * order, so the probe would falsely return {@code false} during boot.

*/ public class AxoniqPlatformModelInspectionEnhancer implements ConfigurationEnhancer { + private static final Logger logger = LoggerFactory.getLogger(AxoniqPlatformModelInspectionEnhancer.class); + private static final String EVENTSOURCING_PROBE_CLASS = "org.axonframework.eventsourcing.eventstore.EventStorageEngine"; + @Override public void enhance(ComponentRegistry registry) { - if (!registry.hasComponent(StateManager.class) - || !registry.hasComponent(EventStorageEngine.class) - || !registry.hasComponent(RSocketHandlerRegistrar.class)) { + if (!registry.hasComponent(RSocketHandlerRegistrar.class)) { return; } - - registry.registerComponent(ComponentDefinition - .ofType(RSocketModelInspectionResponder.class) - .withBuilder(c -> new RSocketModelInspectionResponder( - c.getComponent(StateManager.class), - c.getComponent(EventStorageEngine.class), - c.getComponent(RSocketHandlerRegistrar.class), - c)) - .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start)); + if (!isClasspathAvailable()) { + logger.debug("axon-eventsourcing not on classpath; skipping model inspection wiring."); + return; + } + ModelInspectionDecorators.apply(registry); } @Override public int order() { return AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + 1; } + + private static boolean isClasspathAvailable() { + try { + Class.forName(EVENTSOURCING_PROBE_CLASS, false, + AxoniqPlatformModelInspectionEnhancer.class.getClassLoader()); + return true; + } catch (ClassNotFoundException e) { + return false; + } + } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java new file mode 100644 index 00000000..c1c8d2ea --- /dev/null +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/ModelInspectionDecorators.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing; + +import io.axoniq.platform.framework.ReflectionKt; +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar; +import org.axonframework.common.configuration.ComponentDefinition; +import org.axonframework.common.configuration.ComponentRegistry; +import org.axonframework.common.configuration.DecoratorDefinition; +import org.axonframework.common.lifecycle.Phase; +import org.axonframework.eventsourcing.EventSourcedEntityFactory; +import org.axonframework.eventsourcing.EventSourcingRepository; +import org.axonframework.eventsourcing.eventstore.EventStorageEngine; +import org.axonframework.eventsourcing.eventstore.EventStore; +import org.axonframework.eventsourcing.handler.SourcingHandler; +import org.axonframework.modelling.EntityEvolver; +import org.axonframework.modelling.repository.Repository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +/** + * Holds the actual decorator and component wiring for model inspection. Kept separate from + * {@link AxoniqPlatformModelInspectionEnhancer} so the enhancer class can be loaded even when + * {@code axon-eventsourcing} is not on the classpath — this class is only touched after a + * {@code Class.forName} probe confirms the module is present. + * + *

We do not walk submodules: AF5's nested module structure shares a single + * {@link org.axonframework.common.configuration.DefaultComponentRegistry} (each {@code BaseModule} + * resolves the parent's {@code ComponentRegistry} component instead of creating its own), so a + * single {@code Repository} decorator at the top covers every event-sourced entity in the + * application — top-level or arbitrarily nested.

+ */ +final class ModelInspectionDecorators { + + private static final Logger logger = LoggerFactory.getLogger(ModelInspectionDecorators.class); + + private ModelInspectionDecorators() { + } + + static void apply(ComponentRegistry registry) { + if (!registry.hasComponent(EventStorageEngine.class)) { + return; + } + // The enhancer pipeline can fire multiple times against the same registry as nested + // module configurations build. Idempotency guard: once the responder is in place, the + // decorator and its lifecycle hook are already registered, so re-running would + // duplicate the wrapping and double-evolve every event. + if (registry.hasComponent(RSocketModelInspectionResponder.class)) { + return; + } + + registry.registerComponent(ComponentDefinition + .ofType(RSocketModelInspectionResponder.class) + .withBuilder(c -> new RSocketModelInspectionResponder( + c.getComponent(EventStorageEngine.class), + c.getComponent(RSocketHandlerRegistrar.class), + c)) + .onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start)); + + // Single decorator at the top covers every Repository registered in the application, + // top-level or nested — AF5's nested modules share the same component registry. + // + // The decorator reconstructs the underlying EventSourcingRepository with entityEvolver + // wrapped in AxoniqPlatformEntityEvolver. We deliberately do NOT decorate the registered + // EntityMetamodel — AnnotatedEventSourcedEntityModule casts the registered metamodel to + // AnnotatedEntityMetamodel inside its EntityIdResolver builder, and a wrapper would + // make that cast fail at startup. + // + // The .onStart hook then registers the rebuilt repository with the responder so it + // knows about this entity for the registered-entities query. + registry.registerDecorator(DecoratorDefinition + .forType(Repository.class) + .with((config, name, delegate) -> rebuildIfEventSourcingRepository(delegate)) + .onStart(Phase.LOCAL_MESSAGE_HANDLER_REGISTRATIONS, (configuration, component) -> { + configuration.getComponent(RSocketModelInspectionResponder.class) + .registerRepository(component); + return CompletableFuture.completedFuture(null); + })); + } + + /** + * Walks the wrapper chain from {@code delegate} downward to find an + * {@link EventSourcingRepository}, reconstructs that ESR with {@code entityEvolver} wrapped + * in {@link AxoniqPlatformEntityEvolver}, and swaps the wrapping component's {@code delegate} + * field to point at the new ESR. The outer wrapper(s) are kept intact so any platform-side + * decoration (e.g. {@code AxoniqPlatformRepository} for metrics) still applies. + * + *

Why peel rather than match {@code instanceof EventSourcingRepository} on the input: + * by the time this decorator runs, lower-order decorators (notably the metrics-adding + * {@code AxoniqPlatformRepository} from the modelling layer at {@code Integer.MIN_VALUE}) + * have already wrapped the ESR. Matching directly would miss every real configuration.

+ * + *

Logged-and-passthrough on reflection failure: if the field layout shifts in a future + * AF release we don't want to break command handling, just lose inspection hooks.

+ */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static Repository rebuildIfEventSourcingRepository(Repository delegate) { + Object current = delegate; + Object parent = null; + while (current != null && !(current instanceof EventSourcingRepository)) { + parent = current; + current = ReflectionKt.getPropertyValue(current, "delegate"); + } + if (!(current instanceof EventSourcingRepository esr)) { + return delegate; + } + try { + Class idType = ReflectionKt.getPropertyValue(esr, "idType"); + Class entityType = ReflectionKt.getPropertyValue(esr, "entityType"); + EventStore eventStore = ReflectionKt.getPropertyValue(esr, "eventStore"); + EventSourcedEntityFactory factory = ReflectionKt.getPropertyValue(esr, "entityFactory"); + EntityEvolver evolver = ReflectionKt.getPropertyValue(esr, "entityEvolver"); + SourcingHandler sourcingHandler = ReflectionKt.getPropertyValue(esr, "sourcingHandler"); + + EntityEvolver wrappedEvolver = new AxoniqPlatformEntityEvolver(evolver); + EventSourcingRepository rebuilt = new EventSourcingRepository( + idType, + entityType, + eventStore, + factory, + wrappedEvolver, + sourcingHandler + ); + if (parent == null) { + // No wrapper between us and the ESR — return the rebuilt ESR directly. + return rebuilt; + } + // Swap the parent wrapper's delegate to point at the rebuilt ESR. Keeps any outer + // wrappers (metrics, etc.) intact, just rewires the bottom of the chain. + ReflectionKt.setPropertyValue(parent, "delegate", rebuilt); + return delegate; + } catch (Exception e) { + logger.warn("[ModelInspection] Could not reconstruct EventSourcingRepository for [{}] — " + + "inspection hooks will be unavailable for this entity, but command handling is unaffected: {}", + esr.entityType().getName(), e.getMessage()); + return delegate; + } + } +} diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt index 18d879fc..0188391e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponder.kt @@ -24,179 +24,73 @@ import io.axoniq.platform.framework.api.* import io.axoniq.platform.framework.client.RSocketHandlerRegistrar import io.axoniq.platform.framework.truncateToBytes import org.axonframework.common.configuration.Configuration -import org.axonframework.common.infra.ComponentDescriptor -import org.axonframework.common.infra.DescribableComponent -import org.axonframework.conversion.Converter -import org.axonframework.eventsourcing.CriteriaResolver -import org.axonframework.eventsourcing.EventSourcedEntityFactory -import org.axonframework.eventsourcing.annotation.AnnotationBasedEventCriteriaResolver -import org.axonframework.eventsourcing.annotation.EventSourcingHandler import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.eventsourcing.eventstore.SourcingCondition -import org.axonframework.eventsourcing.handler.InitializingEntityEvolver import org.axonframework.messaging.core.unitofwork.ProcessingContext +import org.axonframework.messaging.core.unitofwork.UnitOfWorkFactory import org.axonframework.messaging.eventhandling.EventMessage -import org.axonframework.messaging.eventhandling.GenericEventMessage -import org.axonframework.messaging.eventhandling.TerminalEventMessage -import org.axonframework.messaging.eventstreaming.EventCriteria -import org.axonframework.modelling.EntityEvolver -import org.axonframework.modelling.StateManager -import java.lang.reflect.Proxy +import org.axonframework.modelling.repository.Repository import org.slf4j.LoggerFactory import java.util.* +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap +import java.util.function.BiConsumer open class RSocketModelInspectionResponder( - private val stateManager: StateManager, - private val eventStorageEngine: EventStorageEngine, + @Suppress("unused") private val eventStorageEngine: EventStorageEngine, private val registrar: RSocketHandlerRegistrar, - private val configuration: Configuration + private val configuration: Configuration, ) { private val logger = LoggerFactory.getLogger(this::class.java) + private val objectMapper = ObjectMapper().apply { findAndRegisterModules() disable(SerializationFeature.FAIL_ON_EMPTY_BEANS) - // AF5 entities are typically Kotlin data classes / Java records with private fields and no - // public getters in the bean-getter sense. Enable direct field access so Jackson surfaces - // the entity's actual state instead of emitting `{}` for "no discoverable properties". + // AF5 entities are typically Kotlin data classes / Java records with private fields and + // no public bean-style getters. Field access lets Jackson surface the actual state + // instead of `{}`. setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) setVisibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE) setVisibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE) } - /** - * Cache of [CriteriaResolver] instances per (entityType, idType) pair. Resolvers are - * stateless and obtaining one via describe-walking or reflection is not free, so caching - * avoids repeating the work on every query. Keyed by class identity so redeploys with - * different classloaders get fresh entries naturally. - */ - private val criteriaResolverCache = ConcurrentHashMap, Class<*>>, CriteriaResolver>() - - /** - * Cache of [InitializingEntityEvolver] instances per (entityType, idType) pair. The - * initializer combines the entity factory and the raw evolver: if the current state is - * null on a given event, it creates a fresh entity via the factory; otherwise it - * delegates to the evolver. This is exactly what we need for ad-hoc state replay - * starting from a null state. Cache is safe because the initializer is stateless. - */ - private val initializingEvolverCache = ConcurrentHashMap, Class<*>>, InitializingEntityEvolver>() - - /** - * The framework-wide [Converter] used to deserialize raw event payloads (read from the event - * store as `byte[]`) into typed event objects, so the evolver's `@EventSourcingHandler` - * methods can match them by parameter class. Resolved lazily from the [Configuration]. - */ - private val payloadConverter: Converter? by lazy { - try { - configuration.getComponent(Converter::class.java).also { - logger.info("[ModelInspection] Using payload converter [{}] for event deserialization", it.javaClass.simpleName) - } - } catch (e: Exception) { - logger.warn("[ModelInspection] Could not obtain Converter from configuration — events will be passed to evolver with raw payloads (state replay will likely be empty): {}", - e.message) - null - } + private val unitOfWorkFactory: UnitOfWorkFactory by lazy { + configuration.getComponent(UnitOfWorkFactory::class.java) } /** - * Deserializes the event message's raw `byte[]` payload into a typed instance of the event - * class indicated by `message.type().name()`. Without this, the evolver receives `byte[]` - * payloads and no `@EventSourcingHandler(EventClass)` method matches, so state never advances - * past entity creation defaults. - * - * Why not [EventMessage.withConvertedPayload]: that API short-circuits via - * `convertedPayload.class.isAssignableFrom(payloadType())`. When the converter quietly - * returns the same `byte[]` (no registered conversion path for `byte[] -> EventClass`), - * the assignability check passes and the original message is returned untouched — so - * `payloadType()` stays `byte[]` and the evolver can't dispatch to a typed handler. - * - * Instead, we explicitly call [Message.payloadAs] (forces conversion via the converter) - * and reconstruct a [GenericEventMessage] with the typed payload, so the new - * `payloadType()` is the actual event class — exactly what `AnnotatedEntityMetamodel.evolve` - * expects to find a matching `@EventSourcingHandler`. - * - * Returns the original message untouched when the converter is unavailable, the type can't - * be resolved on the classpath, conversion fails, or the result is unexpectedly null. + * Repositories collected at boot from each event-sourced submodule via the decorator hook + * in [AxoniqPlatformModelInspectionEnhancer]. Replaces walking the top-level state manager, + * which only sees the top-level state manager and misses everything registered in submodules. */ - private fun deserializePayload(message: EventMessage): EventMessage { - val converter = payloadConverter ?: return message - val typeName = message.type()?.name() ?: return message - return try { - val cls = Class.forName(typeName) - // Force the converter to produce a typed instance. This bypasses - // withConvertedPayload's assignability short-circuit which keeps payloadType=byte[] - // when the converter returns the input unchanged. - val typedPayload: Any? = message.payloadAs(cls, converter) - if (typedPayload == null || cls.isInstance(typedPayload).not()) { - logger.debug("[ModelInspection] Converter returned unexpected payload for type [{}]: actual=[{}] — keeping original", - typeName, typedPayload?.javaClass?.name) - return message - } - logger.info("[ModelInspection] Converted payload: msgType.name=[{}] msgType.version=[{}] payloadCls=[{}]", - message.type().name(), - message.type()?.version(), - typedPayload.javaClass.name) - // Build a fresh GenericEventMessage whose payloadType() is the typed class. - // GenericMessage's 4-arg constructor derives payloadType from payload.getClass(), - // so the resulting message advertises the correct event class to the evolver. - // Metadata extends Map in AF5 but Kotlin needs an explicit cast - // because of its stricter generics treatment of the Java collection. - @Suppress("UNCHECKED_CAST") - GenericEventMessage( - message.identifier(), - message.type(), - typedPayload, - message.metadata() as Map, - message.timestamp(), - ) - } catch (e: ClassNotFoundException) { - logger.debug("Event type [{}] not on classpath — leaving payload as raw bytes", typeName) - message - } catch (e: Exception) { - logger.debug("Could not convert event payload of type [{}]: {}", typeName, e.message) - message - } - } + private val repositories = ConcurrentHashMap, Class<*>>, Repository>() - /** - * No-op [ProcessingContext] proxy used for ad-hoc evolve calls outside a real unit of work. - * `AnnotatedEntityMetamodel.evolve` requires a non-null context (defensive `Objects.requireNonNull`), - * but during model inspection replay we have no active processing context. The proxy returns - * sane defaults: null for resource lookups, `false` for predicates, `this` for fluent setters, - * and a no-op for void methods. Any method that would actually need a resource will see null - * and either no-op or throw — caught at the [evolveSafely] boundary. - */ - private val noOpProcessingContext: ProcessingContext = Proxy.newProxyInstance( - ProcessingContext::class.java.classLoader, - arrayOf(ProcessingContext::class.java) - ) { proxy, method, _ -> - when (method.returnType) { - java.lang.Boolean.TYPE -> false - ProcessingContext::class.java -> proxy - Void.TYPE -> null - else -> null - } - } as ProcessingContext + @Suppress("UNCHECKED_CAST") + fun registerRepository(repository: Repository<*, *>) { + val key = repository.entityType() to repository.idType() + repositories[key] = repository as Repository + logger.info("[ModelInspection] Registered repository for entity=[{}] id=[{}]", + repository.entityType().name, repository.idType().name) + } fun start() { registrar.registerHandlerWithoutPayload( Routes.Model.REGISTERED_ENTITIES, - this::handleRegisteredEntities + this::handleRegisteredEntities, ) registrar.registerHandlerWithPayload( Routes.Model.DOMAIN_EVENTS, ModelDomainEventsQuery::class.java, - this::handleDomainEvents + this::handleDomainEvents, ) registrar.registerHandlerWithPayload( Routes.Model.ENTITY_STATE_AT_SEQUENCE, ModelEntityStateAtSequenceQuery::class.java, - this::handleEntityStateAtSequence + this::handleEntityStateAtSequence, ) registrar.registerHandlerWithPayload( Routes.Model.REPLAY_TIMELINE, ModelTimelineQuery::class.java, - this::handleTimelineReplay + this::handleTimelineReplay, ) } @@ -204,28 +98,29 @@ open class RSocketModelInspectionResponder( // Registered entities introspection // ------------------------------------------------------------------------------------------ - private fun handleRegisteredEntities(): RegisteredEntitiesResult { + internal fun handleRegisteredEntities(): RegisteredEntitiesResult { logger.debug("Handling Axoniq Platform MODEL_REGISTERED_ENTITIES query") - val entities = stateManager.registeredEntities().map { entityType -> - val idTypeInfos = stateManager.registeredIdsFor(entityType).map { idClass -> - IdType( - type = idClass.name, - idFields = describeIdFields(idClass), - ) - } + val grouped: Map, List>> = repositories.keys + .groupBy({ it.first }, { it.second }) + + val entities = grouped.map { (entityType, idClasses) -> RegisteredEntityInfo( entityType = entityType.name, - idTypes = idTypeInfos, + idTypes = idClasses.map { idClass -> + IdType( + type = idClass.name, + idFields = describeIdFields(idClass), + ) + }, ) } + logger.debug("Found entities: {}", entities) return RegisteredEntitiesResult(entities = entities) } /** - * Returns structural descriptors for the given id class. Empty for "simple" types where - * the frontend should show a single text input (String, primitives, UUID); populated for - * records / data classes / plain objects where each property should get its own input. - * Only 1-deep properties are described — nested types surface as type = "object". + * Returns structural descriptors for the given id class. Empty for "simple" types (single + * text input on the frontend); populated for compound types (one input per descriptor). */ internal fun describeIdFields(idClass: Class<*>): List { if (isSimpleIdType(idClass)) { @@ -236,7 +131,7 @@ open class RSocketModelInspectionResponder( IdFieldDescriptor( name = component.name, type = normalizedType(component.type), - javaType = component.type.name + javaType = component.type.name, ) } } @@ -248,7 +143,7 @@ open class RSocketModelInspectionResponder( IdFieldDescriptor( name = field.name, type = normalizedType(field.type), - javaType = field.type.name + javaType = field.type.name, ) } } @@ -297,39 +192,9 @@ open class RSocketModelInspectionResponder( } // ------------------------------------------------------------------------------------------ - // Criteria resolution + id deserialization + // Id deserialization // ------------------------------------------------------------------------------------------ - /** - * Resolves the [EventCriteria] for a given (entityType, idType, entityId) triple by - * obtaining the registered [CriteriaResolver] for the chosen id type and invoking it - * with the deserialized typed id. Multi-tag and compound-id entities produce the - * correct criteria automatically — no tag-key resolution needed. - */ - private fun resolveCriteria(entityType: Class<*>, idClass: Class<*>, entityId: String): EventCriteria { - val typedId = deserializeEntityId(entityId, idClass) - ?: throw IllegalArgumentException("Could not deserialize id [$entityId] as type [${idClass.name}]") - return resolveCriteriaWithTypedId(entityType, idClass, typedId) - } - - /** - * Same as [resolveCriteria] but skips id deserialization — used by handlers that already - * have the typed id (e.g. for state reconstruction via [InitializingEntityEvolver]). - */ - private fun resolveCriteriaWithTypedId(entityType: Class<*>, idClass: Class<*>, typedId: Any): EventCriteria { - val resolver = obtainCriteriaResolver(entityType, idClass) - // ProcessingContext is @NonNull via JSpecify @NullMarked, but the default - // AnnotationBasedEventCriteriaResolver never reads it. We bypass Kotlin's nullability - // check via reflection; resolvers that actually rely on the context will throw NPE - // which propagates to the handler-level catch and surfaces a clear error. - return invokeResolveWithNullContext(resolver, typedId) - } - - /** - * Parses the incoming [entityId] wire string into the entity's id type. For simple id - * types we parse directly; for compound types the wire format is a JSON object whose - * keys match the id's property names. - */ private fun deserializeEntityId(entityId: String, idClass: Class<*>): Any? { val trimmed = entityId.trim() return when { @@ -367,204 +232,43 @@ open class RSocketModelInspectionResponder( } } - /** - * Obtains a [CriteriaResolver] for the given (entityType, idType) pair. Strategy: - * 1. Walk the registered repository via `describeTo` and pick out the first matching - * [CriteriaResolver] — this honors any custom resolver an application may have wired in. - * 2. Fall back to constructing a fresh [AnnotationBasedEventCriteriaResolver] from the - * available [Configuration], which covers the default annotation-driven setup. - * Results are cached per (entityType, idType) pair. - */ - @Suppress("UNCHECKED_CAST") - private fun obtainCriteriaResolver(entityClass: Class<*>, idClass: Class<*>): CriteriaResolver { - return criteriaResolverCache.getOrPut(entityClass to idClass) { - findInRepository(entityClass, idClass, CriteriaResolver::class.java) as CriteriaResolver? - ?: AnnotationBasedEventCriteriaResolver( - entityClass as Class, - idClass as Class, - configuration - ) as CriteriaResolver - } - } - - /** - * Invokes [CriteriaResolver.resolve] with a no-op [ProcessingContext] via reflection, - * bypassing the JSpecify/Kotlin non-null check on the parameter. The default - * `AnnotationBasedEventCriteriaResolver` doesn't read the context; resolvers that do - * may interact with the proxy (returning null/false defaults) and either no-op or throw, - * which propagates to the handler-level catch. - */ - private fun invokeResolveWithNullContext(resolver: CriteriaResolver, typedId: Any): EventCriteria { - val method = resolver.javaClass.methods.first { it.name == "resolve" && it.parameterCount == 2 } - return method.invoke(resolver, typedId, noOpProcessingContext) as EventCriteria - } - // ------------------------------------------------------------------------------------------ - // Entity evolver lookup + state reconstruction + // UoW-driven inspection load // ------------------------------------------------------------------------------------------ /** - * Obtains an [InitializingEntityEvolver] for the given (entityType, idType). AF5 - * repositories don't pre-instantiate `InitializingEntityEvolver`; they expose the - * `entityFactory` and `entityEvolver` as separate describable properties. We find both - * via the describe tree and construct the initializer manually — same constructor the - * framework uses internally — so null initial state on the first event triggers entity - * creation via the factory rather than no-op'ing. - * - * Returns null when either piece can't be located. + * Resolves the (entityType, idType) pair to the registered repository. Returns null if no + * matching repository was registered at boot — e.g. the entity exists in a non-event-sourced + * module, or the user-supplied class names don't resolve. */ - @Suppress("UNCHECKED_CAST") - private fun obtainInitializingEvolver(entityClass: Class<*>, idClass: Class<*>): InitializingEntityEvolver? { - initializingEvolverCache[entityClass to idClass]?.let { return it } - val factory = findInRepository(entityClass, idClass, EventSourcedEntityFactory::class.java) - as EventSourcedEntityFactory? - val evolver = findInRepository(entityClass, idClass, EntityEvolver::class.java, preferredName = "entityEvolver") - as EntityEvolver? - if (factory == null || evolver == null) { - logger.warn("Could not assemble InitializingEntityEvolver for [{}] / [{}] — factory={}, evolver={}", - entityClass.name, idClass.name, factory?.javaClass?.name, evolver?.javaClass?.name) - return null - } - logger.info("Assembled InitializingEntityEvolver for [{}] / [{}] — factory=[{}], evolver=[{}]", - entityClass.simpleName, idClass.simpleName, factory.javaClass.simpleName, evolver.javaClass.simpleName) - val initializer = InitializingEntityEvolver(factory, evolver) - initializingEvolverCache[entityClass to idClass] = initializer - return initializer + private fun lookupRepository(entityType: Class<*>, idType: Class<*>): Repository? { + return repositories[entityType to idType] } /** - * Invokes [InitializingEntityEvolver.evolve] with a null [ProcessingContext] via reflection - * — same trick as [invokeResolveWithNullContext]. The initializer handles null state by - * delegating to the entity factory before evolving. Custom code that relies on the context - * will throw NPE; we catch at the caller and keep the previous state. - * - * The 4-arg signature is `(I id, E currentState, EventMessage event, ProcessingContext ctx)`. + * Runs [block] inside a real [org.axonframework.messaging.core.unitofwork.UnitOfWork], wiring + * the supplied hooks onto the [ProcessingContext] so the framework's own event-sourcing + * pipeline drives state replay. The repository's load is invoked through the framework path — + * criteria resolution, payload conversion, and `@EventSourcingHandler` dispatch all happen as + * they would in a real command flow. We never append events, so commit is a no-op for storage. */ - private fun invokeEvolveWithNullContext( - evolver: InitializingEntityEvolver, + private fun withInspectionUoW( + repository: Repository, typedId: Any, - currentState: Any?, - message: EventMessage, - ): Any? { - val method = evolver.javaClass.methods.first { it.name == "evolve" && it.parameterCount == 4 } - return method.invoke(evolver, typedId, currentState, message, noOpProcessingContext) - } - - /** - * Walks the repository registered for (entityClass, idClass) via `describeTo` and returns - * the first instance of [target] that surfaces. When [preferredName] is set, the property - * with that exact name wins over any other match (used to prefer raw evolvers over - * lifecycle-wrapped ones). - */ - private fun findInRepository( - entityClass: Class<*>, - idClass: Class<*>, - target: Class, - preferredName: String? = null, - ): T? { - return try { - val repository = stateManager.repository(entityClass, idClass) ?: return null - val repoClass = repository.javaClass.name - // Diagnostic dump (DEBUG): on every lookup, list every describable property — useful - // when debugging an unfamiliar repository implementation, but too noisy for INFO. - if (logger.isDebugEnabled) { - val dump = DescribePropertyDump() - (repository as? DescribableComponent)?.describeTo(dump) - logger.debug("[ModelInspection] Looking for [{}] in repository [{}] for [{}] / [{}]; describe tree:\n{}", - target.simpleName, repoClass, entityClass.simpleName, idClass.simpleName, dump.formatted()) + beforeConsumer: BiConsumer? = null, + afterConsumer: BiConsumer? = null, + maxIndex: Long? = null, + extract: (entity: Any?) -> R, + ): R { + return unitOfWorkFactory.create().executeWithResult { ctx: ProcessingContext -> + beforeConsumer?.let { ctx.putResource(AxoniqPlatformEntityEvolver.BEFORE_CONSUMER, it) } + afterConsumer?.let { ctx.putResource(AxoniqPlatformEntityEvolver.AFTER_CONSUMER, it) } + maxIndex?.let { ctx.putResource(AxoniqPlatformEntityEvolver.MAX_INDEX, it) } + + repository.load(typedId, ctx).thenApply { managed -> + extract(managed?.entity()) } - val finder = NamedDescribablePropertyFinder(target, preferredName) - (repository as? DescribableComponent)?.describeTo(finder) ?: return null - finder.found - } catch (e: Exception) { - logger.debug("describeTo lookup of [{}] for entity [{}] / id [{}] failed: {}", - target.simpleName, entityClass.name, idClass.name, e.message) - null - } - } - - /** Diagnostic descriptor that records every property name + value class seen, recursing into describable nested components. */ - private class DescribePropertyDump : ComponentDescriptor { - private val lines = mutableListOf() - private var depth = 0 - private val visited = mutableSetOf() // identity-based cycle break - - private fun indent() = " ".repeat(depth) - - override fun describeProperty(name: String, value: Any?) { - val cls = value?.javaClass?.name ?: "null" - lines += "${indent()}- $name : $cls" - if (value is DescribableComponent && visited.add(System.identityHashCode(value))) { - depth++ - try { - value.describeTo(this) - } catch (_: Exception) { /* swallow — diagnostic only */ } - depth-- - } - } - - override fun describeProperty(name: String, value: Collection<*>?) { - lines += "${indent()}- $name : Collection(size=${value?.size ?: 0})" - } - - override fun describeProperty(name: String, value: Map<*, *>?) { - lines += "${indent()}- $name : Map(size=${value?.size ?: 0})" - } - - override fun describeProperty(name: String, value: String?) { - lines += "${indent()}- $name : String = $value" - } - - override fun describeProperty(name: String, value: Long?) { - lines += "${indent()}- $name : Long = $value" - } - - override fun describeProperty(name: String, value: Boolean?) { - lines += "${indent()}- $name : Boolean = $value" - } - - override fun describe(): String = "DescribePropertyDump" - - fun formatted(): String = lines.joinToString("\n") - } - - /** - * Drills through a repository's `describeTo` tree looking for a single instance of [target]. - * Property-name preference lets callers pick the canonical match (e.g. "entityEvolver") - * over wrapper variants like "initializingEntityEvolver". - */ - private class NamedDescribablePropertyFinder( - private val target: Class, - private val preferredName: String?, - ) : ComponentDescriptor { - private var preferred: T? = null - private var fallback: T? = null - - @Suppress("UNCHECKED_CAST") - override fun describeProperty(name: String, value: Any?) { - if (value == null) return - if (target.isInstance(value)) { - if (preferredName != null && name == preferredName && preferred == null) { - preferred = value as T - } else if (fallback == null) { - fallback = value as T - } - // Don't recurse into matched component — we already have what we need at this level. - return - } - if (value is DescribableComponent) { - value.describeTo(this) - } - } - - override fun describeProperty(name: String, value: Collection<*>?) { /* not needed */ } - override fun describeProperty(name: String, value: Map<*, *>?) { /* not needed */ } - override fun describeProperty(name: String, value: String?) { /* not needed */ } - override fun describeProperty(name: String, value: Long?) { /* not needed */ } - override fun describeProperty(name: String, value: Boolean?) { /* not needed */ } - override fun describe(): String = "NamedDescribablePropertyFinder<${target.simpleName}>" - - val found: T? get() = preferred ?: fallback + }.get() } // ------------------------------------------------------------------------------------------ @@ -572,9 +276,9 @@ open class RSocketModelInspectionResponder( // ------------------------------------------------------------------------------------------ /** - * Extracts a human-readable type name for the event. Events read directly from the - * [EventStorageEngine] often have a raw byte[] payload whose `payloadType()` returns `[B`. - * In that case the proper event type is available via `message.type().name()`. + * Extracts a human-readable type name. Events read from [EventStorageEngine] often have a + * raw `byte[]` payload whose `payloadType()` returns `[B`; the proper event type is in + * `message.type().name()`. */ private fun extractPayloadTypeName(message: EventMessage): String { return try { @@ -585,9 +289,9 @@ open class RSocketModelInspectionResponder( } /** - * Converts the event payload to a String. When reading directly from [EventStorageEngine], - * payloads are usually raw byte[] containing JSON or CBOR. We try UTF-8 decoding first - * (works for JSON), falling back to Jackson serialization for typed payloads. + * Converts the event payload to a String. Payloads sourced from the event store are + * usually raw `byte[]` containing JSON or CBOR. UTF-8 first (works for JSON) and Jackson as + * fallback for typed payloads. */ private fun extractPayloadAsString(message: EventMessage): String? { val payload = message.payload() ?: return null @@ -621,42 +325,48 @@ open class RSocketModelInspectionResponder( // Query handlers // ------------------------------------------------------------------------------------------ - private fun handleDomainEvents(query: ModelDomainEventsQuery): DomainEventsResult { + internal fun handleDomainEvents(query: ModelDomainEventsQuery): DomainEventsResult { logger.info("Handling Axoniq Platform MODEL_DOMAIN_EVENTS query for entity [{}] id [{}] idType [{}]", query.entityType, query.entityId, query.idType) val entityClass = Class.forName(query.entityType) val idClass = Class.forName(query.idType) - val criteria = resolveCriteria(entityClass, idClass, query.entityId) - val condition = SourcingCondition.conditionFor(criteria) - - val stream = eventStorageEngine.source(condition) - val allEvents = try { - stream.reduce(mutableListOf()) { acc, entry -> - val message = entry.message() - if (message != null && message !is TerminalEventMessage) { - acc.add(DomainEvent( - sequenceNumber = acc.size.toLong(), - timestamp = message.timestamp(), - payloadType = extractPayloadTypeName(message), - payload = extractPayloadAsString(message) - )) - } - acc - }.get() + val repository = lookupRepository(entityClass, idClass) + ?: return DomainEventsResult( + entityId = query.entityId, + entityType = query.entityType, + domainEvents = emptyList(), + page = query.page, + pageSize = query.pageSize, + totalCount = 0L, + ) + val typedId = deserializeEntityId(query.entityId, idClass) + ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") + + val collected = mutableListOf() + try { + withInspectionUoW( + repository = repository, + typedId = typedId, + beforeConsumer = { event, _ -> + collected += DomainEvent( + sequenceNumber = collected.size.toLong(), + timestamp = event.timestamp(), + payloadType = extractPayloadTypeName(event), + payload = extractPayloadAsString(event), + ) + }, + extract = {}, + ) } catch (e: Exception) { logger.error("Error while sourcing events for entity [{}] id [{}]", query.entityType, query.entityId, e) - mutableListOf() } - logger.info("Sourced [{}] events for entity [{}] id [{}]", - allEvents.size, query.entityType, query.entityId) - - val totalCount = allEvents.size.toLong() + val totalCount = collected.size.toLong() val start = query.page * query.pageSize - val end = minOf(start + query.pageSize, allEvents.size) - val pagedEvents = if (start < allEvents.size) allEvents.subList(start, end) else emptyList() + val end = minOf(start + query.pageSize, collected.size) + val pagedEvents = if (start < collected.size) collected.subList(start, end) else emptyList() return DomainEventsResult( entityId = query.entityId, @@ -668,47 +378,30 @@ open class RSocketModelInspectionResponder( ) } - /** - * Reconstructs the entity's state at a specific sequence number by replaying all events - * up to (and including) that sequence through the registered [EntityEvolver]. Returns - * the JSON-serialized state. If no EntityEvolver is registered for the entity, returns - * null state — frontend can treat that as "state reconstruction not available". - */ - private fun handleEntityStateAtSequence(query: ModelEntityStateAtSequenceQuery): EntityStateResult { + internal fun handleEntityStateAtSequence(query: ModelEntityStateAtSequenceQuery): EntityStateResult { logger.info("Handling Axoniq Platform MODEL_ENTITY_STATE_AT_SEQUENCE query for entity [{}] id [{}] idType [{}] seq [{}]", query.entityType, query.entityId, query.idType, query.maxSequenceNumber) val entityClass = Class.forName(query.entityType) val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return EntityStateResult( + type = query.entityType, + entityId = query.entityId, + maxSequenceNumber = query.maxSequenceNumber, + state = null, + ) val typedId = deserializeEntityId(query.entityId, idClass) ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") - val criteria = resolveCriteriaWithTypedId(entityClass, idClass, typedId) - val condition = SourcingCondition.conditionFor(criteria) - val evolver = obtainInitializingEvolver(entityClass, idClass) - - if (evolver == null) { - logger.warn("No InitializingEntityEvolver found for entity [{}] / id [{}] — returning null state", - query.entityType, query.idType) - return EntityStateResult( - type = query.entityType, - entityId = query.entityId, - maxSequenceNumber = query.maxSequenceNumber, - state = null, - ) - } - // Accumulator: (eventsConsumedSoFar, currentState). MessageStream doesn't expose the - // entry index, so we maintain it inline. - val stream = eventStorageEngine.source(condition) val finalState = try { - stream.reduce(StateHolder(0L, null)) { holder, entry -> - val message = entry.message() - if (message == null || message is TerminalEventMessage) return@reduce holder - // Sequence numbers are 0-indexed by event order. Negative maxSequenceNumber = "all events". - val withinWindow = query.maxSequenceNumber < 0 || holder.index <= query.maxSequenceNumber - if (!withinWindow) return@reduce holder - StateHolder(holder.index + 1, evolveSafely(evolver, typedId, holder.state, deserializePayload(message))) - }.get().state + withInspectionUoW( + repository = repository, + typedId = typedId, + // Negative sequence = "all events" — don't set MAX_INDEX so nothing is skipped. + maxIndex = if (query.maxSequenceNumber < 0) null else query.maxSequenceNumber, + extract = { entity -> entity }, + ) } catch (e: Exception) { logger.error("Error while reconstructing state for entity [{}] id [{}]", query.entityType, query.entityId, e) @@ -723,268 +416,77 @@ open class RSocketModelInspectionResponder( ) } - /** Reduce accumulator that tracks the running event index alongside the evolved state. */ - private data class StateHolder(val index: Long, val state: Any?) - - private fun evolveSafely( - evolver: InitializingEntityEvolver, - typedId: Any, - currentState: Any?, - message: EventMessage, - ): Any? { - return try { - // Capture JSON before AF5 dispatch so we can detect whether the metamodel actually - // mutated the entity, and so the [Evolve] log shows the real pre-mutation state. - // Without this snapshot, since entities mutate in place via @EventSourcingHandler, - // re-serializing currentState after evolve would just print the post-mutation state. - val beforeJson = if (currentState != null) safeJson(currentState) else "null" - - val result = invokeEvolveWithNullContext(evolver, typedId, currentState, message) - - val afterJsonInitial = if (result != null) safeJson(result) else "null" - val mutatedByMetamodel = currentState != null && beforeJson != afterJsonInitial - // If state already changed, the metamodel did its job — don't double-apply via - // reflection. If no change AND we have a non-null entity AND a typed payload, try - // direct reflection dispatch on the entity class. - val finalResult = if (!mutatedByMetamodel && result != null && message.payload() != null) { - applyEventViaReflection(result, message) - result - } else { - result - } - - if (logger.isInfoEnabled) { - logger.info("[Evolve] event=[{}] before=[{}] after=[{}] beforeIs=[{}] afterIs=[{}] reflectionFallback=[{}]", - message.payloadType().simpleName, - beforeJson, - safeJson(finalResult), - currentState?.let { System.identityHashCode(it) } ?: 0, - finalResult?.let { System.identityHashCode(it) } ?: 0, - !mutatedByMetamodel && result != null) - } - finalResult - } catch (e: Exception) { - logger.warn("EntityEvolver.evolve threw for event [{}]: {} — keeping previous state", - message.payloadType().name, e.cause?.message ?: e.message, e) - currentState - } - } - - /** - * Reflection-based fallback for entities where AF5's [AnnotationBasedEntityEvolvingComponent] - * dispatch silently no-ops in an ad-hoc inspection context (no real - * [org.axonframework.messaging.core.unitofwork.ProcessingContext], no message handler - * interceptor chain, etc.). - * - * Walks the entity's class hierarchy looking for methods annotated with - * [EventSourcingHandler] whose first parameter type accepts the message payload. Each match - * is invoked directly on the entity. The method is expected to mutate `this` in place - * (typical AF5 pattern: `void on(SomeEvent e) { this.field = e.field(); }`). For methods - * with extra parameters (e.g. for parameter resolvers), passes `null` so we don't break - * compilation, but those handlers may NPE — caught at the boundary. - * - * Idempotent re-invocation of an event handler is the caller's contract; we only invoke this - * path when the AF5 dispatch produced no observable mutation, so we won't re-apply changes. - */ - internal fun applyEventViaReflection(entity: Any, message: EventMessage) { - val payload = message.payload() ?: return - val payloadCls = payload.javaClass - val handlerCandidates = collectEventSourcingHandlers(entity.javaClass) - - // Path A: payload was already deserialized by [deserializePayload] (works when - // event.type().name() is a FQN that Class.forName resolves, e.g. with - // ClassBasedMessageTypeResolver). Match handlers whose first parameter accepts the - // typed payload directly. - if (payloadCls != ByteArray::class.java) { - for ((declaringClass, method) in handlerCandidates) { - val paramType = method.parameterTypes[0] - if (!paramType.isInstance(payload)) continue - invokeHandlerSafely(entity, declaringClass, method, payload, paramType) - return - } - return - } - - // Path B: payload is still raw byte[] because Class.forName couldn't resolve the type - // name (common with @Event(namespace=...) which produces names like - // "quickstart.OrderCreatedEvent" that don't match a real class). Resolve the handler by - // matching the simple class name extracted from message.type().name() against each - // handler's parameter simple name — picking exactly one. Convert byte[] to that - // specific type only. This avoids Jackson's permissive deserialization successfully - // returning a partially-filled instance of the wrong event class (e.g. OrderShippedEvent - // accepting an OrderCreatedEvent JSON because they share an `orderId` field). - val converter = payloadConverter ?: return - val typeName = message.type()?.name() ?: return - val expectedSimpleName = simpleNameFromMessageType(typeName) - val match = handlerCandidates.firstOrNull { (_, method) -> - method.parameterTypes[0].simpleName == expectedSimpleName - } ?: run { - logger.debug("[ModelInspection] No @EventSourcingHandler param simpleName matches [{}] (from type=[{}]) on [{}]", - expectedSimpleName, typeName, entity.javaClass.simpleName) - return - } - val (declaringClass, method) = match - val paramType = method.parameterTypes[0] - val typedArg: Any? = try { - message.payloadAs(paramType, converter) - } catch (e: Exception) { - logger.debug("[ModelInspection] Converter failed for type=[{}] -> param=[{}]: {}", - typeName, paramType.simpleName, e.message) - null - } - if (typedArg == null || !paramType.isInstance(typedArg)) { - logger.debug("[ModelInspection] Converter returned non-matching payload for type=[{}] (target=[{}])", - typeName, paramType.simpleName) - return - } - invokeHandlerSafely(entity, declaringClass, method, typedArg, paramType) - } - - /** - * Extracts the simple Java class name from an [org.axonframework.messaging.core.MessageType] - * name string. Handles both formats: - * - FQN with optional `$` (nested class): `io.axoniq.quickstart.reservation.event.ReservationEvents$SeatReservedEvent` - * → `SeatReservedEvent` - * - Namespaced short form from `@Event(namespace=...)`: `quickstart.OrderCreatedEvent` - * → `OrderCreatedEvent` - */ - internal fun simpleNameFromMessageType(typeName: String): String { - val afterDollar = typeName.substringAfterLast('$') - return afterDollar.substringAfterLast('.') - } - - private fun invokeHandlerSafely( - entity: Any, - declaringClass: Class<*>, - method: java.lang.reflect.Method, - payload: Any, - paramType: Class<*>, - ) { - try { - method.isAccessible = true - val args: Array = if (method.parameterCount == 1) { - arrayOf(payload) - } else { - Array(method.parameterCount) { idx -> if (idx == 0) payload else null } - } - method.invoke(entity, *args) - logger.debug("[ModelInspection] Reflection dispatch invoked [{}#{}] for param=[{}]", - declaringClass.simpleName, method.name, paramType.simpleName) - } catch (e: Exception) { - logger.debug("[ModelInspection] Reflection dispatch failed for [{}#{}]: {}", - declaringClass.simpleName, method.name, e.cause?.message ?: e.message) - } - } - - /** - * Walks the entity's class hierarchy collecting `@EventSourcingHandler` methods with at - * least one parameter (the event). Returned in declaring-class-first order. - */ - private fun collectEventSourcingHandlers(entityClass: Class<*>): List, java.lang.reflect.Method>> { - val result = mutableListOf, java.lang.reflect.Method>>() - var current: Class<*>? = entityClass - while (current != null && current != Any::class.java) { - for (method in current.declaredMethods) { - if (!method.isAnnotationPresent(EventSourcingHandler::class.java)) continue - if (method.parameterCount < 1) continue - result.add(current to method) - } - current = current.superclass - } - return result - } - - private fun safeJson(value: Any?): String = - if (value == null) "null" - else try { - objectMapper.writeValueAsString(value) - } catch (e: Exception) { - "" - } - - /** - * Replays all events for the entity through the registered [EntityEvolver] and emits a - * timeline of `(sequence, event, stateBefore, stateAfter)` entries within the requested window. - * - * State serialization is JSON via Jackson (handles records, Kotlin data classes, POJOs). - * State strings are byte-truncated via [String.truncateToBytes] so a single oversize entity - * cannot blow gRPC/RSocket message size limits. - * - * If no EntityEvolver is registered for the entity (e.g. non-event-sourced repositories), - * `stateBefore`/`stateAfter` are emitted as null and the frontend can collapse that section. - */ - private fun handleTimelineReplay(query: ModelTimelineQuery): ModelTimelineResult { + internal fun handleTimelineReplay(query: ModelTimelineQuery): ModelTimelineResult { logger.info("Handling Axoniq Platform MODEL_REPLAY_TIMELINE query for entity [{}] id [{}] idType [{}] offset [{}] limit [{}]", query.entityType, query.entityId, query.idType, query.offset, query.limit) val entityClass = Class.forName(query.entityType) val idClass = Class.forName(query.idType) + val repository = lookupRepository(entityClass, idClass) + ?: return ModelTimelineResult( + entityType = query.entityType, + entityId = query.entityId, + entries = emptyList(), + offset = query.offset, + totalEvents = 0, + truncated = false, + ) val typedId = deserializeEntityId(query.entityId, idClass) ?: throw IllegalArgumentException("Could not deserialize id [${query.entityId}] as [${query.idType}]") - val criteria = resolveCriteriaWithTypedId(entityClass, idClass, typedId) - val condition = SourcingCondition.conditionFor(criteria) - val evolver = obtainInitializingEvolver(entityClass, idClass) - if (evolver == null) { - logger.warn("No InitializingEntityEvolver found for entity [{}] / id [{}] — timeline state fields will be null", - query.entityType, query.idType) - } val offset = maxOf(0, query.offset) val limit = if (query.limit <= 0) 100 else query.limit - val maxStateSizeBytes = 100 * 1024 // 100 KB per state snapshot before truncation - val maxEventSizeBytes = 50 * 1024 // 50 KB per event payload before truncation + val maxStateSizeBytes = 100 * 1024 + val maxEventSizeBytes = 50 * 1024 val entries = mutableListOf() - var totalEvents = 0 - var currentState: Any? = null + val totalEvents = intArrayOf(0) + // BEFORE/AFTER snapshots have to be paired — capture stateBefore in BEFORE_CONSUMER so it + // reflects the pre-evolve state even when the entity mutates in place. + val pending = arrayOfNulls(2) // [eventMessage, stateBeforeJson] - val stream = eventStorageEngine.source(condition) try { - stream.reduce(Unit) { _, entry -> - val message = entry.message() - if (message == null || message is TerminalEventMessage) return@reduce Unit - - val seq = totalEvents.toLong() - totalEvents++ - - // Capture stateBefore JSON eagerly. Reservation-style entities mutate in place - // via reflection fallback (see [evolveSafely]) so currentState and nextState are - // the same instance. If we serialize stateBefore after evolveSafely, both fields - // would show the post-mutation state and the timeline UI couldn't diff between - // events. Snapshot the JSON now while currentState is still pre-mutation. - val stateBeforeJson = stateAsJson(currentState).truncateToBytes(maxStateSizeBytes) - - val nextState = evolver?.let { evolveSafely(it, typedId, currentState, deserializePayload(message)) } ?: currentState - - if (seq >= offset && entries.size < limit) { - entries.add(ModelTimelineEntry( - sequenceNumber = seq, - timestamp = message.timestamp().toString(), - eventType = extractPayloadTypeName(message), - eventPayload = extractPayloadAsString(message).truncateToBytes(maxEventSizeBytes), - stateBefore = stateBeforeJson, - stateAfter = stateAsJson(nextState).truncateToBytes(maxStateSizeBytes), - )) - } - currentState = nextState - Unit - }.get() + withInspectionUoW( + repository = repository, + typedId = typedId, + beforeConsumer = { event, stateBefore -> + pending[0] = event + pending[1] = stateAsJson(stateBefore).truncateToBytes(maxStateSizeBytes) + }, + afterConsumer = { event, stateAfter -> + val seq = totalEvents[0].toLong() + totalEvents[0]++ + if (seq >= offset && entries.size < limit) { + entries += ModelTimelineEntry( + sequenceNumber = seq, + timestamp = event.timestamp().toString(), + eventType = extractPayloadTypeName(event), + eventPayload = extractPayloadAsString(event).truncateToBytes(maxEventSizeBytes), + stateBefore = pending[1] as String?, + stateAfter = stateAsJson(stateAfter).truncateToBytes(maxStateSizeBytes), + ) + } + pending[0] = null + pending[1] = null + }, + extract = {}, + ) } catch (e: Exception) { logger.error("Error while sourcing events for timeline of entity [{}] id [{}]", query.entityType, query.entityId, e) } - val remainingAfterWindow = maxOf(0, totalEvents - offset - entries.size) + val remainingAfterWindow = maxOf(0, totalEvents[0] - offset - entries.size) val truncated = remainingAfterWindow > 0 logger.info("Sourced [{}] events for timeline of [{}] id [{}] (returning [{}] from offset [{}], truncated={})", - totalEvents, query.entityType, query.entityId, entries.size, offset, truncated) + totalEvents[0], query.entityType, query.entityId, entries.size, offset, truncated) return ModelTimelineResult( entityType = query.entityType, entityId = query.entityId, entries = entries, offset = offset, - totalEvents = totalEvents, + totalEvents = totalEvents[0], truncated = truncated, ) } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt index 4d3241e8..60d5a25e 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/messaging/HandlerMeasurement.kt @@ -58,10 +58,6 @@ class HandlerMeasurement( } fun registerMetricValue(metric: Metric, timeInNs: Long) { - if (completedTime != null) { - logger.warn { "HandlerMeasurement for handler [${message.type()}] is already completed. Can not register metric [$metric] with value [$timeInNs]. Ignoring." } - return - } registeredMetrics.compute(metric) { _, it -> // Sum the metric if it was already registered (it ?: 0L) + timeInNs @@ -69,10 +65,6 @@ class HandlerMeasurement( } fun reportMessageDispatched(messageIdentifier: MessageIdentifier) { - if (completedTime != null) { - logger.warn { "HandlerMeasurement for handler [${message.type()}] is already completed. Can not report dispatched message [$messageIdentifier]. Ignoring." } - return - } dispatchedMessages.add(messageIdentifier) } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt index 8ac2f69b..f029c5bd 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/AxoniqPlatformStateManager.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2025. AxonIQ B.V. + * Copyright (c) 2022-2026. AxonIQ B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,10 @@ class AxoniqPlatformStateManager( private val delegate: StateManager ): StateManager { override fun register(repository: Repository): StateManager { + if(repository is AxoniqPlatformRepository) { + delegate.register(repository) + return this + } delegate.register(AxoniqPlatformRepository(repository)) return this } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java index 6c80d206..3a7c4e59 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java +++ b/framework-client/src/main/java/io/axoniq/platform/framework/modelling/ModellingDecorators.java @@ -35,21 +35,27 @@ private ModellingDecorators() { static void apply(ComponentRegistry registry) { registry.registerDecorator(DecoratorDefinition.forType(StateManager.class) - .with((cc, name, delegate) -> - new AxoniqPlatformStateManager(delegate)) + .with((cc, name, delegate) -> { + if(delegate instanceof AxoniqPlatformStateManager) { + return delegate; + } + return new AxoniqPlatformStateManager(delegate); + }) .order(Integer.MAX_VALUE)); UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> { componentRegistry .registerDecorator(DecoratorDefinition.forType(Repository.class) .with((cc, name, delegate) -> + delegate instanceof AxoniqPlatformRepository ? delegate : new AxoniqPlatformRepository<>(delegate)) .order(Integer.MIN_VALUE)) .registerDecorator(DecoratorDefinition.forType(StateManager.class) .with((cc, name, delegate) -> + delegate instanceof AxoniqPlatformStateManager ? delegate : new AxoniqPlatformStateManager(delegate)) .order(Integer.MAX_VALUE)); return null; - }); + }, true); } } diff --git a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt index 6eefda30..2fcfbbc3 100644 --- a/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt +++ b/framework-client/src/main/java/io/axoniq/platform/framework/utils.kt @@ -18,6 +18,7 @@ package io.axoniq.platform.framework import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer import org.axonframework.common.ReflectionUtils +import org.axonframework.common.configuration.BaseModule import org.axonframework.common.configuration.ComponentRegistry import org.axonframework.common.configuration.Module import java.lang.reflect.Field @@ -188,13 +189,19 @@ fun String?.truncateToBytes(maxBytes: Int): String? { return truncatedContent + truncationMessage } -fun ComponentRegistry.doOnSubModules(block: (ComponentRegistry, Module?) -> Unit) { +fun ComponentRegistry.doOnSubModules(block: (ComponentRegistry, Module?) -> Unit, recursive: Boolean = true) { val modules = this.getPropertyValue>("modules") modules?.forEach { entry -> val module = entry.value - module.getPropertyValue("componentRegistry")?.let { cr -> - block(cr, module) - cr.doOnSubModules(block) + block(this, module) + if (recursive && module is BaseModule<*>) { + // BaseModule's nested registry only materialises when the module is built. Defer + // the inner walk via the public componentRegistry(action) API; the action runs on + // the module's own registry at build time, with arbitrary-depth nesting visible to + // recursive doOnSubModules calls inside. + module.componentRegistry { innerRegistry -> + innerRegistry.doOnSubModules(block, recursive) + } } } } \ No newline at end of file diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt index d67617de..dca2a611 100644 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/AxoniqPlatformModelInspectionEnhancerTest.kt @@ -23,24 +23,32 @@ import io.mockk.verify import org.axonframework.common.configuration.ComponentDefinition import org.axonframework.common.configuration.ComponentRegistry import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.modelling.StateManager import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test /** - * Verifies the registration guard in [AxoniqPlatformModelInspectionEnhancer]: the - * inspection responder must register only when the application has all three required - * components (StateManager + EventStorageEngine + RSocketHandlerRegistrar). Missing any - * of them is the AF4 / non-event-sourced case and registering would NPE on first request. + * Verifies the guard in [AxoniqPlatformModelInspectionEnhancer]: the responder must register + * only when the platform client is wired ([RSocketHandlerRegistrar] present) and an + * [EventStorageEngine] is available. Missing either is the no-event-sourcing / no-platform-client + * case where registering would have nothing to act on. + * + * The enhancer itself does not probe for [EventStorageEngine] directly — it first probes for + * {@code axon-eventsourcing} on the classpath and delegates the rest of the wiring to + * [ModelInspectionDecorators], which is where the [EventStorageEngine] check lives. In tests + * the classpath probe always succeeds (axon-eventsourcing is a test dependency), so we exercise + * both branches via the registry mocks. + * + * StateManager is intentionally NOT probed: ModellingConfigurationDefaults registers it at + * Integer.MAX_VALUE, after this enhancer's order, so the probe would falsely return false + * during a real boot. */ class AxoniqPlatformModelInspectionEnhancerTest { private val enhancer = AxoniqPlatformModelInspectionEnhancer() @Test - fun `registers the responder when StateManager, EventStorageEngine and RSocketHandlerRegistrar are all present`() { + fun `registers the responder when EventStorageEngine and RSocketHandlerRegistrar are both present`() { val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns true every { registry.hasComponent(EventStorageEngine::class.java) } returns true every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true @@ -50,21 +58,8 @@ class AxoniqPlatformModelInspectionEnhancerTest { } @Test - fun `skips registration when StateManager is missing — typical AF4 application`() { + fun `skips registration when EventStorageEngine is missing — typical AF4 application`() { val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns false - every { registry.hasComponent(EventStorageEngine::class.java) } returns true - every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true - - enhancer.enhance(registry) - - verify(exactly = 0) { registry.registerComponent(any>()) } - } - - @Test - fun `skips registration when EventStorageEngine is missing`() { - val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns true every { registry.hasComponent(EventStorageEngine::class.java) } returns false every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns true @@ -76,8 +71,6 @@ class AxoniqPlatformModelInspectionEnhancerTest { @Test fun `skips registration when RSocketHandlerRegistrar is missing — console client not wired`() { val registry = mockk(relaxed = true) - every { registry.hasComponent(StateManager::class.java) } returns true - every { registry.hasComponent(EventStorageEngine::class.java) } returns true every { registry.hasComponent(RSocketHandlerRegistrar::class.java) } returns false enhancer.enhance(registry) diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt index 32d13030..3152e327 100644 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderHelpersTest.kt @@ -20,7 +20,6 @@ import io.axoniq.platform.framework.client.RSocketHandlerRegistrar import io.mockk.mockk import org.axonframework.common.configuration.Configuration import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.modelling.StateManager import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue @@ -32,9 +31,8 @@ import java.util.UUID /** * Unit tests for the pure-logic helpers on [RSocketModelInspectionResponder] that don't - * require a live AF5 configuration / event store. These helpers govern public-facing - * behaviour (id-type description for the FE form, MessageType-name parsing for handler - * dispatch) so regressing them silently breaks the inspection UI. + * require a live AF5 configuration / event store. These helpers govern the FE id-type form, + * so regressing them silently breaks the inspection UI. */ class RSocketModelInspectionResponderHelpersTest { @@ -45,46 +43,12 @@ class RSocketModelInspectionResponderHelpersTest { // The helpers under test never reach into these dependencies, so simple unrecorded // mocks are enough — we don't need MockK relaxed mocks elsewhere. responder = RSocketModelInspectionResponder( - stateManager = mockk(), eventStorageEngine = mockk(), registrar = mockk(), configuration = mockk(), ) } - // --------------------------------------------------------------------------------------- - // simpleNameFromMessageType - // - // Drives Path B handler resolution in applyEventViaReflection. Must produce the same - // simple name regardless of whether the MessageType.name() is a fully qualified class - // name (default ClassBasedMessageTypeResolver) or a namespaced short name from - // @Event(namespace = ...). - // --------------------------------------------------------------------------------------- - - @Test - fun `simpleNameFromMessageType strips package for fully qualified class name`() { - val name = "io.axoniq.quickstart.reservation.event.ReservationEvents\$SeatReservedEvent" - assertEquals("SeatReservedEvent", responder.simpleNameFromMessageType(name)) - } - - @Test - fun `simpleNameFromMessageType strips namespace for short namespaced form`() { - // Format produced by @Event(namespace = "quickstart") on a record class - assertEquals("OrderCreatedEvent", responder.simpleNameFromMessageType("quickstart.OrderCreatedEvent")) - } - - @Test - fun `simpleNameFromMessageType is identity for a single segment`() { - assertEquals("Foo", responder.simpleNameFromMessageType("Foo")) - } - - @Test - fun `simpleNameFromMessageType prefers dollar over dot when both present`() { - // A nested class with a namespaced prefix would be a strange case but the heuristic - // still produces the right simple class name. - assertEquals("Inner", responder.simpleNameFromMessageType("quickstart.Outer\$Inner")) - } - // --------------------------------------------------------------------------------------- // isSimpleIdType // diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt new file mode 100644 index 00000000..646f5411 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderIntegrationTest.kt @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelDomainEventsQuery +import io.axoniq.platform.framework.api.ModelEntityStateAtSequenceQuery +import io.axoniq.platform.framework.api.ModelTimelineQuery +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.BaseModule +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.common.configuration.Configuration +import org.axonframework.common.configuration.LifecycleRegistry +import org.axonframework.common.configuration.Module +import org.axonframework.eventsourcing.annotation.EventSourcedEntity +import org.axonframework.eventsourcing.annotation.EventTag +import org.axonframework.eventsourcing.annotation.reflection.EntityCreator +import org.axonframework.eventsourcing.annotation.reflection.InjectEntityId +import org.axonframework.eventsourcing.configuration.EventSourcedEntityModule +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.eventsourcing.annotation.EventSourcingHandler +import org.axonframework.messaging.commandhandling.configuration.CommandHandlingModule +import org.axonframework.messaging.eventhandling.EventSink +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.messaging.core.MessageType +import org.axonframework.modelling.SimpleStateManager +import org.axonframework.modelling.StateManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * End-to-end test for the model inspection feature: spins up a real + * [EventSourcingConfigurer] with the inspection enhancer wired in, registers an + * event-sourced entity in a sub-module, publishes events through the in-memory event + * store, then drives the responder's query handlers and asserts on their outputs. + * + * The submodule structure is what makes this interesting: the entity is registered as a + * nested module (via [EventSourcedEntityModule.autodetected]) so the enhancer's + * `doOnSubModules` walker has to drill in to find it. A bug there would cause the + * registered-entities query to come back empty. + */ +class RSocketModelInspectionResponderIntegrationTest { + + private lateinit var configuration: AxonConfiguration + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + // Build a minimal AF5 application: + // - InMemoryEventStorageEngine (default) + // - one annotated event-sourced entity registered as a sub-module + // - a stub RSocketHandlerRegistrar (the inspection enhancer requires its presence, + // but we don't exercise the RSocket transport — we call responder methods directly) + // - the AxoniqPlatformModelInspectionEnhancer registered manually + // The AxoniqPlatformModelInspectionEnhancer is auto-discovered via the + // META-INF/services SPI registration — no need to register it manually. + configuration = EventSourcingConfigurer.create() + .registerEntity(EventSourcedEntityModule.autodetected(String::class.java, Reservation::class.java)) + .componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(RSocketHandlerRegistrar::class.java) + .withBuilder { RSocketHandlerRegistrar(CborJackson3EncodingStrategy()) }) + } + .start() + + responder = configuration.getComponent(RSocketModelInspectionResponder::class.java) + + // Publish a known sequence of events for entity "RES-1". + val sink = configuration.getComponent(EventSink::class.java) + sink.publish( + null, + event(ReservationCreated("RES-1", "alice")), + event(ReservationConfirmed("RES-1")), + event(ReservationCancelled("RES-1", "double-booked")), + ).get() + } + + @AfterEach + fun tearDown() { + configuration.shutdown() + } + + private fun event(payload: Any) = GenericEventMessage( + MessageType(payload.javaClass), + payload, + ) + + // ------------------------------------------------------------------------------------------ + // Registered entities + // ------------------------------------------------------------------------------------------ + + @Test + fun `registered entities query discovers entities defined in nested modules`() { + val result = invokeRegisteredEntities() + assertEquals(1, result.entities.size, "expected the Reservation entity to be visible") + + val entity = result.entities.first() + assertEquals(Reservation::class.java.name, entity.entityType) + assertEquals(1, entity.idTypes.size) + assertEquals(String::class.java.name, entity.idTypes.first().type) + // String is a simple id type — no sub-fields surface to the FE. + assertTrue(entity.idTypes.first().idFields.isEmpty()) + } + + // ------------------------------------------------------------------------------------------ + // Domain events listing + // ------------------------------------------------------------------------------------------ + + @Test + fun `domain events query returns the published events in publication order with typed names`() { + val result = responder.handleDomainEvents(domainEventsQuery("RES-1")) + + assertEquals(3, result.totalCount, "all three events should be returned") + assertEquals(3, result.domainEvents.size) + + val payloadTypes = result.domainEvents.map { it.payloadType } + assertEquals( + listOf( + ReservationCreated::class.java.name, + ReservationConfirmed::class.java.name, + ReservationCancelled::class.java.name, + ), + payloadTypes, + ) + + // Sequence numbers are 0-indexed and dense across the listed events. + assertEquals(listOf(0L, 1L, 2L), result.domainEvents.map { it.sequenceNumber }) + } + + @Test + fun `domain events query returns empty result for an unknown entity id without throwing`() { + val result = responder.handleDomainEvents(domainEventsQuery("RES-DOES-NOT-EXIST")) + assertEquals(0, result.totalCount) + assertTrue(result.domainEvents.isEmpty()) + } + + // ------------------------------------------------------------------------------------------ + // Entity state at sequence + // ------------------------------------------------------------------------------------------ + + @Test + fun `entity state at sequence reconstructs intermediate state by replaying through the metamodel`() { + // After event 0 (created): status = CREATED, eventCount = 1 + val afterCreated = responder.handleEntityStateAtSequence(stateQuery("RES-1", 0)) + assertNotNull(afterCreated.state) + assertTrue(afterCreated.state!!.contains("\"status\":\"CREATED\""), afterCreated.state) + assertTrue(afterCreated.state!!.contains("\"eventCount\":1"), afterCreated.state) + + // After event 1 (confirmed): status = CONFIRMED, eventCount = 2 + val afterConfirmed = responder.handleEntityStateAtSequence(stateQuery("RES-1", 1)) + assertNotNull(afterConfirmed.state) + assertTrue(afterConfirmed.state!!.contains("\"status\":\"CONFIRMED\""), afterConfirmed.state) + assertTrue(afterConfirmed.state!!.contains("\"eventCount\":2"), afterConfirmed.state) + + // After event 2 (cancelled): status = CANCELLED, eventCount = 3, reason captured + val afterCancelled = responder.handleEntityStateAtSequence(stateQuery("RES-1", 2)) + assertNotNull(afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"status\":\"CANCELLED\""), afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"eventCount\":3"), afterCancelled.state) + assertTrue(afterCancelled.state!!.contains("\"cancelReason\":\"double-booked\""), afterCancelled.state) + } + + @Test + fun `entity state at negative sequence replays all events`() { + val result = responder.handleEntityStateAtSequence(stateQuery("RES-1", -1)) + assertNotNull(result.state) + assertTrue(result.state!!.contains("\"status\":\"CANCELLED\"")) + assertTrue(result.state!!.contains("\"eventCount\":3")) + } + + @Test + fun `entity state for unknown id returns null state`() { + val result = responder.handleEntityStateAtSequence(stateQuery("RES-MISSING", -1)) + assertNull(result.state) + } + + // ------------------------------------------------------------------------------------------ + // Timeline replay + // ------------------------------------------------------------------------------------------ + + @Test + fun `timeline replay produces stateBefore and stateAfter pairs for every event`() { + val result = responder.handleTimelineReplay(timelineQuery("RES-1")) + + assertEquals(3, result.totalEvents) + assertEquals(3, result.entries.size) + + // Event 0 — AF5's factory has just created the entity (defaults applied: status=CREATED, + // eventCount=0, customerId=null) BEFORE the @EventSourcingHandler runs. So stateBefore + // captures that just-constructed shape; stateAfter captures the post-handler state with + // customerId set and eventCount=1. + val first = result.entries[0] + assertEquals(0L, first.sequenceNumber) + assertEquals(ReservationCreated::class.java.name, first.eventType) + assertNotNull(first.stateBefore) + assertTrue(first.stateBefore!!.contains("\"eventCount\":0")) + assertTrue(first.stateBefore!!.contains("\"customerId\":null")) + assertNotNull(first.stateAfter) + assertTrue(first.stateAfter!!.contains("\"eventCount\":1")) + assertTrue(first.stateAfter!!.contains("\"customerId\":\"alice\"")) + + // Event 1 — stateBefore = CREATED, stateAfter = CONFIRMED. + val second = result.entries[1] + assertEquals(1L, second.sequenceNumber) + assertTrue(second.stateBefore!!.contains("\"status\":\"CREATED\"")) + assertTrue(second.stateAfter!!.contains("\"status\":\"CONFIRMED\"")) + + // Event 2 — stateBefore = CONFIRMED, stateAfter = CANCELLED. + val third = result.entries[2] + assertEquals(2L, third.sequenceNumber) + assertTrue(third.stateBefore!!.contains("\"status\":\"CONFIRMED\"")) + assertTrue(third.stateAfter!!.contains("\"status\":\"CANCELLED\"")) + } + + @Test + fun `timeline replay honours offset and limit`() { + val result = responder.handleTimelineReplay(timelineQuery("RES-1", offset = 1, limit = 1)) + + // Total still reflects the full stream so the FE can drive pagination. + assertEquals(3, result.totalEvents) + assertEquals(1, result.entries.size) + assertEquals(1L, result.entries.first().sequenceNumber) + assertEquals(ReservationConfirmed::class.java.name, result.entries.first().eventType) + assertTrue(result.truncated, "events remain after the requested window") + } + + // ------------------------------------------------------------------------------------------ + // Helpers + // ------------------------------------------------------------------------------------------ + + private fun invokeRegisteredEntities() = responder.handleRegisteredEntities() + + private fun domainEventsQuery(id: String) = ModelDomainEventsQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + ) + + private fun stateQuery(id: String, maxSeq: Long) = ModelEntityStateAtSequenceQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + maxSequenceNumber = maxSeq, + ) + + private fun timelineQuery(id: String, offset: Int = 0, limit: Int = 100) = ModelTimelineQuery( + entityType = Reservation::class.java.name, + entityId = id, + idType = String::class.java.name, + offset = offset, + limit = limit, + ) + + // ------------------------------------------------------------------------------------------ + // Test fixture: entity + events + // ------------------------------------------------------------------------------------------ + + /** + * Status enum (declared as a top-level type within the test file) — using an enum gives us + * a state field whose JSON representation is a clean string we can assert against. + */ + enum class Status { CREATED, CONFIRMED, CANCELLED } + + /** + * Mutable event-sourced entity with `@EventSourcingHandler` methods. We mutate in place + * (the AF5 default for annotated entities) so the test exercises the same dispatch path + * a real application uses. + */ + @EventSourcedEntity(tagKey = "reservationId") + class Reservation @EntityCreator constructor( + // @InjectEntityId disambiguates the id from the payload parameter — without it, + // AF5 treats the first ctor arg as the event payload type and no match exists. + @Suppress("unused") @InjectEntityId val reservationId: String, + ) { + var status: Status = Status.CREATED + var customerId: String? = null + var cancelReason: String? = null + var eventCount: Int = 0 + + @EventSourcingHandler + fun on(event: ReservationCreated) { + customerId = event.customerId + status = Status.CREATED + eventCount++ + } + + @EventSourcingHandler + fun on(@Suppress("unused") event: ReservationConfirmed) { + status = Status.CONFIRMED + eventCount++ + } + + @EventSourcingHandler + fun on(event: ReservationCancelled) { + status = Status.CANCELLED + cancelReason = event.reason + eventCount++ + } + } + + data class ReservationCreated( + @field:EventTag(key = "reservationId") val reservationId: String, + val customerId: String, + ) + + data class ReservationConfirmed( + @field:EventTag(key = "reservationId") val reservationId: String, + ) + + data class ReservationCancelled( + @field:EventTag(key = "reservationId") val reservationId: String, + val reason: String, + ) +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt new file mode 100644 index 00000000..363d4098 --- /dev/null +++ b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderNestedModuleIntegrationTest.kt @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2022-2026. AxonIQ B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.axoniq.platform.framework.eventsourcing + +import io.axoniq.platform.framework.api.ModelEntityStateAtSequenceQuery +import io.axoniq.platform.framework.client.RSocketHandlerRegistrar +import io.axoniq.platform.framework.client.strategy.CborJackson3EncodingStrategy +import org.axonframework.common.configuration.AxonConfiguration +import org.axonframework.common.configuration.BaseModule +import org.axonframework.common.configuration.ComponentDefinition +import org.axonframework.eventsourcing.annotation.EventSourcedEntity +import org.axonframework.eventsourcing.annotation.EventTag +import org.axonframework.eventsourcing.annotation.reflection.EntityCreator +import org.axonframework.eventsourcing.annotation.reflection.InjectEntityId +import org.axonframework.eventsourcing.annotation.EventSourcingHandler +import org.axonframework.eventsourcing.configuration.EventSourcedEntityModule +import org.axonframework.eventsourcing.configuration.EventSourcingConfigurer +import org.axonframework.messaging.core.MessageType +import org.axonframework.messaging.eventhandling.EventSink +import org.axonframework.messaging.eventhandling.GenericEventMessage +import org.axonframework.modelling.SimpleStateManager +import org.axonframework.modelling.StateManager +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +/** + * End-to-end test for the case the existing integration test doesn't cover: an event-sourced + * entity buried inside a custom user-defined [BaseModule]. Without this, you can't tell whether + * inspection works only for the conveniently top-level entity registration that + * `registerEntity(...)` produces, or also for arbitrary user nesting. + * + * The setup: + * - {@link OuterEntity} registered at the top level via the usual `registerEntity(...)` path. + * - [MySubModule] — a hand-rolled [BaseModule] that registers its own [StateManager] and an + * {@link InnerEntity} inside it via `registerModule(EventSourcedEntityModule.autodetected(...))`. + * + * Both entities must surface in the registered-entities query, and queries against the inner one + * must reconstruct state correctly — proving the model-inspection enhancer's submodule walk + * reaches arbitrary depth, not just one level. + */ +class RSocketModelInspectionResponderNestedModuleIntegrationTest { + + private lateinit var configuration: AxonConfiguration + private lateinit var responder: RSocketModelInspectionResponder + + @BeforeEach + fun setUp() { + configuration = EventSourcingConfigurer.create() + .registerEntity(EventSourcedEntityModule.autodetected(String::class.java, OuterEntity::class.java)) + .componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(RSocketHandlerRegistrar::class.java) + .withBuilder { RSocketHandlerRegistrar(CborJackson3EncodingStrategy()) }) + // The custom BaseModule lives directly under the root component registry. + // Inside it, a further sub-module registers InnerEntity — two levels deep. + cr.registerModule(MySubModule()) + } + .start() + + responder = configuration.getComponent(RSocketModelInspectionResponder::class.java) + + val sink = configuration.getComponent(EventSink::class.java) + sink.publish( + null, + event(OuterCreated("OUTER-1", "blue")), + event(InnerOpened("INNER-1", 42)), + event(InnerClosed("INNER-1")), + ).get() + } + + @AfterEach + fun tearDown() { + configuration.shutdown() + } + + private fun event(payload: Any) = GenericEventMessage(MessageType(payload.javaClass), payload) + + @Test + fun `registered entities query surfaces both top-level and deeply nested entities`() { + val result = responder.handleRegisteredEntities() + val typeNames = result.entities.map { it.entityType }.toSet() + + assertTrue(typeNames.contains(OuterEntity::class.java.name), + "expected OuterEntity (top-level) to be registered") + assertTrue(typeNames.contains(InnerEntity::class.java.name), + "expected InnerEntity (nested inside MySubModule) to be registered — submodule walker must reach it") + } + + @Test + fun `state at sequence reconstructs the inner entity in the nested module`() { + val result = responder.handleEntityStateAtSequence(ModelEntityStateAtSequenceQuery( + entityType = InnerEntity::class.java.name, + entityId = "INNER-1", + idType = String::class.java.name, + maxSequenceNumber = -1, + )) + + assertNotNull(result.state, "state must be reconstructed for the inner entity") + assertTrue(result.state!!.contains("\"open\":false"), result.state) + assertTrue(result.state!!.contains("\"value\":42"), result.state) + } + + @Test + fun `state at sequence reconstructs the outer entity registered at the root`() { + val result = responder.handleEntityStateAtSequence(ModelEntityStateAtSequenceQuery( + entityType = OuterEntity::class.java.name, + entityId = "OUTER-1", + idType = String::class.java.name, + maxSequenceNumber = -1, + )) + + assertNotNull(result.state) + assertTrue(result.state!!.contains("\"colour\":\"blue\""), result.state) + } + + // ------------------------------------------------------------------------------------------ + // Test fixtures + // ------------------------------------------------------------------------------------------ + + /** + * Custom user module that owns its own [StateManager] and registers an event-sourced entity + * as a sub-module. Mirrors how a real application might package a bounded context. + */ + class MySubModule : BaseModule("MySubModule") { + init { + componentRegistry { cr -> + cr.registerComponent(ComponentDefinition.ofType(StateManager::class.java) + .withBuilder { SimpleStateManager.named("MySubModuleStateManager") }) + cr.registerModule(EventSourcedEntityModule.autodetected(String::class.java, InnerEntity::class.java)) + } + } + } + + @EventSourcedEntity(tagKey = "outerId") + class OuterEntity @EntityCreator constructor( + @Suppress("unused") @InjectEntityId val outerId: String, + ) { + var colour: String = "" + + @EventSourcingHandler + fun on(event: OuterCreated) { + colour = event.colour + } + } + + data class OuterCreated( + @field:EventTag(key = "outerId") val outerId: String, + val colour: String, + ) + + @EventSourcedEntity(tagKey = "innerId") + class InnerEntity @EntityCreator constructor( + @Suppress("unused") @InjectEntityId val innerId: String, + ) { + var open: Boolean = false + var value: Int = 0 + + @EventSourcingHandler + fun on(event: InnerOpened) { + open = true + value = event.value + } + + @EventSourcingHandler + fun on(@Suppress("unused") event: InnerClosed) { + open = false + } + } + + data class InnerOpened( + @field:EventTag(key = "innerId") val innerId: String, + val value: Int, + ) + + data class InnerClosed( + @field:EventTag(key = "innerId") val innerId: String, + ) +} diff --git a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt b/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt deleted file mode 100644 index ccbafd53..00000000 --- a/framework-client/src/test/kotlin/io/axoniq/platform/framework/eventsourcing/RSocketModelInspectionResponderReflectionDispatchTest.kt +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Copyright (c) 2022-2026. AxonIQ B.V. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.axoniq.platform.framework.eventsourcing - -import io.axoniq.platform.framework.client.RSocketHandlerRegistrar -import io.mockk.every -import io.mockk.mockk -import org.axonframework.common.configuration.Configuration -import org.axonframework.conversion.Converter -import org.axonframework.eventsourcing.annotation.EventSourcingHandler -import org.axonframework.eventsourcing.eventstore.EventStorageEngine -import org.axonframework.messaging.core.MessageType -import org.axonframework.messaging.eventhandling.EventMessage -import org.axonframework.modelling.StateManager -import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.Assertions.assertNull -import org.junit.jupiter.api.BeforeEach -import org.junit.jupiter.api.Test - -/** - * Tests the reflection-based `@EventSourcingHandler` dispatch fallback used when AF5's - * own metamodel dispatch silently no-ops in our ad-hoc inspection context (no real - * `ProcessingContext`, no interceptor chain). - * - * Two dispatch paths are exercised: - * - * - **Path A**: payload is already a typed instance (the FQN [MessageType.name] case where - * [RSocketModelInspectionResponder.deserializePayload] succeeded). The matching handler - * is found by parameter-type assignability. - * - * - **Path B**: payload is still raw `byte[]` because the [MessageType.name] is a short - * namespaced form (e.g. `quickstart.OrderCreatedEvent` from `@Event(namespace=...)`) - * that `Class.forName` can't resolve. The handler is selected by simple-name match - * against [MessageType.name] segments, then the [Converter] is invoked to turn the - * `byte[]` into the handler's parameter type. - * - * Critically: under Path B, Jackson permissive deserialization could happily build any of - * the entity's event classes from any JSON byte sequence (filling whatever fields match, - * defaulting the rest). We must pick the handler **before** invoking the converter, - * otherwise the wrong handler fires (e.g. an `OrderCreated` event mutating the entity as - * if it were `OrderShipped`). - */ -class RSocketModelInspectionResponderReflectionDispatchTest { - - private lateinit var responder: RSocketModelInspectionResponder - private lateinit var configuration: Configuration - private lateinit var converter: Converter - - @BeforeEach - fun setUp() { - configuration = mockk() - converter = mockk() - - // Lazy `payloadConverter` reads `configuration.getComponent(Converter::class.java)`. - // Stub it so Path B can use the converter. - every { configuration.getComponent(Converter::class.java) } returns converter - - responder = RSocketModelInspectionResponder( - stateManager = mockk(), - eventStorageEngine = mockk(), - registrar = mockk(), - configuration = configuration, - ) - } - - // --------------------------------------------------------------------------------------- - // Path A — payload already deserialized - // --------------------------------------------------------------------------------------- - - @Test - fun `Path A invokes the @EventSourcingHandler whose param matches the typed payload`() { - val entity = TestOrder() - val event = TestOrder.OrderCreatedEvent("order-1", "Alice") - val message = mockk() - every { message.payload() } returns event - // Path A doesn't read message.type() at all — we never reach the converter call. - - responder.applyEventViaReflection(entity, message) - - assertEquals("order-1", entity.orderId) - assertEquals("Alice", entity.customerName) - assertEquals(TestOrder.Status.CREATED, entity.status) - } - - @Test - fun `Path A is a no-op when no handler matches the typed payload`() { - val entity = TestOrder() - // Use an event class the entity has no handler for. - val unrelatedEvent = UnrelatedEvent("payload-content") - val message = mockk() - every { message.payload() } returns unrelatedEvent - - responder.applyEventViaReflection(entity, message) - - // Entity untouched because no handler accepted UnrelatedEvent. - assertNull(entity.orderId) - assertEquals(TestOrder.Status.DRAFT, entity.status) - } - - // --------------------------------------------------------------------------------------- - // Path B — raw byte[] payload, simple-name handler resolution - // --------------------------------------------------------------------------------------- - - @Test - fun `Path B selects the handler by simple-name match and converts raw byte payload to it`() { - val entity = TestOrder() - val rawBytes = "{\"orderId\":\"order-2\",\"customerName\":\"Bob\"}".toByteArray() - val typedEvent = TestOrder.OrderCreatedEvent("order-2", "Bob") - val message = mockk() - every { message.payload() } returns rawBytes - every { message.type() } returns MessageType("quickstart.OrderCreatedEvent") - // Converter sees the request for the handler's param type and returns a typed instance. - // Note: the simple-name resolver picks OrderCreatedEvent from `quickstart.OrderCreatedEvent`, - // so the converter is invoked with that type — never with OrderShippedEvent or any other. - every { - message.payloadAs(TestOrder.OrderCreatedEvent::class.java, converter) - } returns typedEvent - - responder.applyEventViaReflection(entity, message) - - assertEquals("order-2", entity.orderId) - assertEquals("Bob", entity.customerName) - assertEquals(TestOrder.Status.CREATED, entity.status) - } - - @Test - fun `Path B does not fire any handler when no entity handler param matches the simple-name`() { - val entity = TestOrder() - val rawBytes = "{\"foo\":\"bar\"}".toByteArray() - val message = mockk() - every { message.payload() } returns rawBytes - // Simple-name "Mystery" doesn't match any of TestOrder's handler param types. - every { message.type() } returns MessageType("some.namespace.Mystery") - - responder.applyEventViaReflection(entity, message) - - // No handler fired → entity stays at defaults. Crucially, the converter was never - // invoked either, because the resolver short-circuited on the missing simple-name. - assertNull(entity.orderId) - assertEquals(TestOrder.Status.DRAFT, entity.status) - } - - @Test - fun `Path B picks correct handler when multiple share an overlapping JSON shape`() { - // Regression guard: under the previous "try every handler with Jackson" approach, - // an OrderCreatedEvent JSON could permissively deserialize to OrderShippedEvent - // (sharing only `orderId`), causing the OrderShipped handler to fire and set - // status=SHIPPED instead of CREATED. The simple-name matcher prevents this. - val entity = TestOrder() - val createdJsonBytes = "{\"orderId\":\"order-3\",\"customerName\":\"Carol\"}".toByteArray() - val message = mockk() - every { message.payload() } returns createdJsonBytes - every { message.type() } returns MessageType("quickstart.OrderCreatedEvent") - - // Only the OrderCreated path should be exercised. We stub it; if the responder - // mistakenly tried OrderShipped, MockK would throw on the unstubbed call. - every { - message.payloadAs(TestOrder.OrderCreatedEvent::class.java, converter) - } returns TestOrder.OrderCreatedEvent("order-3", "Carol") - - responder.applyEventViaReflection(entity, message) - - assertEquals(TestOrder.Status.CREATED, entity.status) // not SHIPPED - assertEquals("order-3", entity.orderId) - assertEquals("Carol", entity.customerName) - } - - // --------------------------------------------------------------------------------------- - // Test fixtures - // --------------------------------------------------------------------------------------- - - /** - * Mirrors the AF5 entity pattern under test: no-arg constructor + several - * `@EventSourcingHandler` methods that mutate `this` in place. - */ - class TestOrder { - var orderId: String? = null - var customerName: String? = null - var carrier: String? = null - var status: Status = Status.DRAFT - - enum class Status { DRAFT, CREATED, SHIPPED } - - @EventSourcingHandler - fun on(event: OrderCreatedEvent) { - this.orderId = event.orderId - this.customerName = event.customerName - this.status = Status.CREATED - } - - @EventSourcingHandler - fun on(event: OrderShippedEvent) { - // If this handler ever fires on an OrderCreated payload (the "Jackson permissive" - // bug we guard against), `status` would jump straight to SHIPPED. - this.orderId = event.orderId - this.carrier = event.carrier - this.status = Status.SHIPPED - } - - @JvmRecord - data class OrderCreatedEvent(val orderId: String, val customerName: String) - - @JvmRecord - data class OrderShippedEvent(val orderId: String, val carrier: String) - } - - /** A class no `TestOrder` handler accepts — used to assert no-op behaviour. */ - @JvmRecord - data class UnrelatedEvent(val payload: String) -}