Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ public void enhance(ComponentRegistry registry) {


UtilsKt.doOnSubModules(registry, (componentRegistry, module) -> {
// Only event processor modules expose a processorName; the doOnSubModules walker
// visits every sub-module (event-sourced entities, command/query modules, ...), so
// skipping non-processor modules here keeps AxoniqPlatformEventHandlingComponent
// from being constructed with a null processor name.
if (!(module instanceof PooledStreamingEventProcessorModule)
&& !(module instanceof SubscribingEventProcessorModule)) {
return null;
}
componentRegistry
.registerDecorator(DecoratorDefinition.forType(EventHandlingComponent.class)
.with((cc, name, delegate) ->
Expand All @@ -176,7 +184,7 @@ public void enhance(ComponentRegistry registry) {
.order(0));

return null;
});
}, true);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.eventsourcing

import org.axonframework.messaging.core.Context
import org.axonframework.messaging.core.unitofwork.ProcessingContext
import org.axonframework.messaging.eventhandling.EventMessage
import org.axonframework.modelling.EntityEvolver
import java.util.function.BiConsumer

/**
* Wraps the underlying [EntityEvolver] of an [org.axonframework.eventsourcing.EventSourcingRepository]
* so that inspection-time replay can fire BEFORE/AFTER hooks per event without reimplementing
* AF5's dispatch.
*
* The hooks are no-ops unless the matching [Context.ResourceKey] resources are present on the
* [ProcessingContext], so command handling and normal event sourcing go through unchanged.
*
* Constructed by [AxoniqPlatformModelInspectionEnhancer], which detects the inner
* [org.axonframework.eventsourcing.EventSourcingRepository] in the decorator chain and reconstructs
* it with this wrapper substituted for its `entityEvolver` argument.
*/
class AxoniqPlatformEntityEvolver<E : Any>(
private val delegate: EntityEvolver<E>,
) : EntityEvolver<E> {

companion object {
/** Called before [delegate.evolve]. Receives the event and the pre-evolve entity state. */
val BEFORE_CONSUMER: Context.ResourceKey<BiConsumer<EventMessage, Any?>> =
Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.BEFORE_CONSUMER")

/** Called after [delegate.evolve]. Receives the event and the post-evolve entity state. */
val AFTER_CONSUMER: Context.ResourceKey<BiConsumer<EventMessage, Any?>> =
Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.AFTER_CONSUMER")

/**
* Zero-based event index — when present, [evolve] returns the current entity unchanged
* once it has applied this many events. Lets inspection reconstruct state up to a given
* sequence without doing the bookkeeping outside the framework.
*/
val MAX_INDEX: Context.ResourceKey<Long> =
Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.MAX_INDEX")

/** Internal counter advanced by [evolve] when [MAX_INDEX] is set. */
val INDEX_COUNTER: Context.ResourceKey<LongCounter> =
Context.ResourceKey.withLabel("AxoniqPlatformEntityEvolver.INDEX_COUNTER")
}

/** Tiny mutable holder so we can advance the index without re-putting a Long resource each call. */
class LongCounter(var value: Long = 0)

override fun evolve(entity: E, event: EventMessage, context: ProcessingContext): E {
val maxIndex = context.getResource(MAX_INDEX)
if (maxIndex != null) {
val counter = context.computeResourceIfAbsent(INDEX_COUNTER) { LongCounter() }
if (counter.value > maxIndex) {
return entity
}
counter.value++
}
context.getResource(BEFORE_CONSUMER)?.accept(event, entity)
val result = delegate.evolve(entity, event, context)
context.getResource(AFTER_CONSUMER)?.accept(event, result)
return result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,54 @@

import io.axoniq.platform.framework.AxoniqPlatformConfigurerEnhancer;
import io.axoniq.platform.framework.client.RSocketHandlerRegistrar;
import org.axonframework.common.configuration.ComponentDefinition;
import org.axonframework.common.configuration.ComponentRegistry;
import org.axonframework.common.configuration.ConfigurationEnhancer;
import org.axonframework.common.lifecycle.Phase;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.modelling.StateManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Enhancer that registers the {@link RSocketModelInspectionResponder} when both
* {@link StateManager} and {@link EventStorageEngine} are available (AF5 applications).
* Service-loaded enhancer that wires the {@link RSocketModelInspectionResponder} when the
* application has the platform client connected ({@link RSocketHandlerRegistrar} present) and
* {@code axon-eventsourcing} is on the classpath.
*
* <p>This class is deliberately free of direct references to {@code axon-eventsourcing} types
* so it can be loaded even when event sourcing is absent from the classpath. The actual
* decorator wiring lives in {@link ModelInspectionDecorators} and is only touched after the
* runtime classpath probe succeeds.</p>
*
* <p>We deliberately do <em>not</em> probe for {@code StateManager} either: it's registered by
* {@code ModellingConfigurationDefaults} at {@link Integer#MAX_VALUE}, after this enhancer's
* order, so the probe would falsely return {@code false} during boot.</p>
*/
public class AxoniqPlatformModelInspectionEnhancer implements ConfigurationEnhancer {

private static final Logger logger = LoggerFactory.getLogger(AxoniqPlatformModelInspectionEnhancer.class);
private static final String EVENTSOURCING_PROBE_CLASS = "org.axonframework.eventsourcing.eventstore.EventStorageEngine";

@Override
public void enhance(ComponentRegistry registry) {
if (!registry.hasComponent(StateManager.class)
|| !registry.hasComponent(EventStorageEngine.class)
|| !registry.hasComponent(RSocketHandlerRegistrar.class)) {
if (!registry.hasComponent(RSocketHandlerRegistrar.class)) {
return;
}

registry.registerComponent(ComponentDefinition
.ofType(RSocketModelInspectionResponder.class)
.withBuilder(c -> new RSocketModelInspectionResponder(
c.getComponent(StateManager.class),
c.getComponent(EventStorageEngine.class),
c.getComponent(RSocketHandlerRegistrar.class),
c))
.onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start));
if (!isClasspathAvailable()) {
logger.debug("axon-eventsourcing not on classpath; skipping model inspection wiring.");
return;
}
ModelInspectionDecorators.apply(registry);
}

@Override
public int order() {
return AxoniqPlatformConfigurerEnhancer.PLATFORM_ENHANCER_ORDER + 1;
}

private static boolean isClasspathAvailable() {
try {
Class.forName(EVENTSOURCING_PROBE_CLASS, false,
AxoniqPlatformModelInspectionEnhancer.class.getClassLoader());
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright (c) 2022-2026. AxonIQ B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.platform.framework.eventsourcing;

import io.axoniq.platform.framework.ReflectionKt;
import io.axoniq.platform.framework.client.RSocketHandlerRegistrar;
import org.axonframework.common.configuration.ComponentDefinition;
import org.axonframework.common.configuration.ComponentRegistry;
import org.axonframework.common.configuration.DecoratorDefinition;
import org.axonframework.common.lifecycle.Phase;
import org.axonframework.eventsourcing.EventSourcedEntityFactory;
import org.axonframework.eventsourcing.EventSourcingRepository;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.handler.SourcingHandler;
import org.axonframework.modelling.EntityEvolver;
import org.axonframework.modelling.repository.Repository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

/**
* Holds the actual decorator and component wiring for model inspection. Kept separate from
* {@link AxoniqPlatformModelInspectionEnhancer} so the enhancer class can be loaded even when
* {@code axon-eventsourcing} is not on the classpath — this class is only touched after a
* {@code Class.forName} probe confirms the module is present.
*
* <p>We do <em>not</em> walk submodules: AF5's nested module structure shares a single
* {@link org.axonframework.common.configuration.DefaultComponentRegistry} (each {@code BaseModule}
* resolves the parent's {@code ComponentRegistry} component instead of creating its own), so a
* single {@code Repository} decorator at the top covers every event-sourced entity in the
* application — top-level or arbitrarily nested.</p>
*/
final class ModelInspectionDecorators {

private static final Logger logger = LoggerFactory.getLogger(ModelInspectionDecorators.class);

private ModelInspectionDecorators() {
}

static void apply(ComponentRegistry registry) {
if (!registry.hasComponent(EventStorageEngine.class)) {
return;
}
// The enhancer pipeline can fire multiple times against the same registry as nested
// module configurations build. Idempotency guard: once the responder is in place, the
// decorator and its lifecycle hook are already registered, so re-running would
// duplicate the wrapping and double-evolve every event.
if (registry.hasComponent(RSocketModelInspectionResponder.class)) {
return;
}

registry.registerComponent(ComponentDefinition
.ofType(RSocketModelInspectionResponder.class)
.withBuilder(c -> new RSocketModelInspectionResponder(
c.getComponent(EventStorageEngine.class),
c.getComponent(RSocketHandlerRegistrar.class),
c))
.onStart(Phase.EXTERNAL_CONNECTIONS, RSocketModelInspectionResponder::start));

// Single decorator at the top covers every Repository registered in the application,
// top-level or nested — AF5's nested modules share the same component registry.
//
// The decorator reconstructs the underlying EventSourcingRepository with entityEvolver
// wrapped in AxoniqPlatformEntityEvolver. We deliberately do NOT decorate the registered
// EntityMetamodel — AnnotatedEventSourcedEntityModule casts the registered metamodel to
// AnnotatedEntityMetamodel inside its EntityIdResolver builder, and a wrapper would
// make that cast fail at startup.
//
// The .onStart hook then registers the rebuilt repository with the responder so it
// knows about this entity for the registered-entities query.
registry.registerDecorator(DecoratorDefinition
.forType(Repository.class)
.with((config, name, delegate) -> rebuildIfEventSourcingRepository(delegate))
.onStart(Phase.LOCAL_MESSAGE_HANDLER_REGISTRATIONS, (configuration, component) -> {
configuration.getComponent(RSocketModelInspectionResponder.class)
.registerRepository(component);
return CompletableFuture.completedFuture(null);
}));
}

/**
* Walks the wrapper chain from {@code delegate} downward to find an
* {@link EventSourcingRepository}, reconstructs that ESR with {@code entityEvolver} wrapped
* in {@link AxoniqPlatformEntityEvolver}, and swaps the wrapping component's {@code delegate}
* field to point at the new ESR. The outer wrapper(s) are kept intact so any platform-side
* decoration (e.g. {@code AxoniqPlatformRepository} for metrics) still applies.
*
* <p>Why peel rather than match {@code instanceof EventSourcingRepository} on the input:
* by the time this decorator runs, lower-order decorators (notably the metrics-adding
* {@code AxoniqPlatformRepository} from the modelling layer at {@code Integer.MIN_VALUE})
* have already wrapped the ESR. Matching directly would miss every real configuration.</p>
*
* <p>Logged-and-passthrough on reflection failure: if the field layout shifts in a future
* AF release we don't want to break command handling, just lose inspection hooks.</p>
*/
@SuppressWarnings({"rawtypes", "unchecked"})
private static Repository<?, ?> rebuildIfEventSourcingRepository(Repository<?, ?> delegate) {
Object current = delegate;
Object parent = null;
while (current != null && !(current instanceof EventSourcingRepository<?, ?>)) {
parent = current;
current = ReflectionKt.getPropertyValue(current, "delegate");
}
if (!(current instanceof EventSourcingRepository<?, ?> esr)) {
return delegate;
}
try {
Class<?> idType = ReflectionKt.getPropertyValue(esr, "idType");
Class<?> entityType = ReflectionKt.getPropertyValue(esr, "entityType");
EventStore eventStore = ReflectionKt.getPropertyValue(esr, "eventStore");
EventSourcedEntityFactory factory = ReflectionKt.getPropertyValue(esr, "entityFactory");
EntityEvolver evolver = ReflectionKt.getPropertyValue(esr, "entityEvolver");
SourcingHandler sourcingHandler = ReflectionKt.getPropertyValue(esr, "sourcingHandler");

EntityEvolver wrappedEvolver = new AxoniqPlatformEntityEvolver(evolver);
EventSourcingRepository rebuilt = new EventSourcingRepository(
idType,
entityType,
eventStore,
factory,
wrappedEvolver,
sourcingHandler
);
if (parent == null) {
// No wrapper between us and the ESR — return the rebuilt ESR directly.
return rebuilt;
}
// Swap the parent wrapper's delegate to point at the rebuilt ESR. Keeps any outer
// wrappers (metrics, etc.) intact, just rewires the bottom of the chain.
ReflectionKt.setPropertyValue(parent, "delegate", rebuilt);
return delegate;
} catch (Exception e) {
logger.warn("[ModelInspection] Could not reconstruct EventSourcingRepository for [{}] — " +
"inspection hooks will be unavailable for this entity, but command handling is unaffected: {}",
esr.entityType().getName(), e.getMessage());
return delegate;
}
}
}
Loading