Skip to content

GH-11011: Fix MQTT v5 outbound payload conversion#11012

Open
d0v0c wants to merge 1 commit into
spring-projects:mainfrom
d0v0c:fix-mqttv5-outbound-conversion
Open

GH-11011: Fix MQTT v5 outbound payload conversion#11012
d0v0c wants to merge 1 commit into
spring-projects:mainfrom
d0v0c:fix-mqttv5-outbound-conversion

Conversation

@d0v0c
Copy link
Copy Markdown

@d0v0c d0v0c commented May 20, 2026

Fixes #11011

Mqttv5PahoMessageHandler#buildMqttMessage was calling MessageConverter#fromMessage(message, byte[].class) to serialize non-byte[] / non-String payloads, the wrong direction per the Spring Messaging contract. Switch to toMessage(payload, headers).

The class Javadoc gains a short note about the default converter's JSON content-type requirement.

Test plan

New Mqttv5HandlerTests covers two scenarios:

  • publishPojoWithDefaultConverter: round-trips a POJO through ConfigurableCompositeMessageConverter and verifies the published bytes are valid JSON.
  • publishPojoWithCustomConverter: verifies a user-supplied converter's toMessage is invoked on the outbound path.
  • ./gradlew :spring-integration-mqtt:test passes locally with no regressions.

@d0v0c d0v0c force-pushed the fix-mqttv5-outbound-conversion branch from 3c3b861 to 6742a8e Compare May 20, 2026 02:00
Copy link
Copy Markdown
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not critical now, but for the future contributions, please, consider to study this: https://cbea.ms/git-commit/amp/.
The point is that PR doesn’t matter for a long run. Plus in GitHub first commit message becomes a PR description with head line as a title. So, all those stories about test plan would play nice in the commit message.

thanks

MessageConverter converter = getConverter();
body = (byte[]) converter.fromMessage(message, byte[].class);
Message<?> converted = converter.toMessage(payload, message.getHeaders());
body = converted != null ? (byte[]) converted.getPayload() : null;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are you sure that all the converters are producing byte[]?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the commit guide—I've added the test plan to the commit message.

As for the cast, the default ConfigurableCompositeMessageConverter safely guarantees a byte[] output. JacksonJsonMessageConverter defaults to byte[] payloads, while the other converters reject unmatched types and return null.

However, for a custom converter, I agree with your concern. It could return a different type (such as String) and trigger a ClassCastException. Added an explicit instanceof byte[] check.

@d0v0c d0v0c force-pushed the fix-mqttv5-outbound-conversion branch from 6742a8e to b09eb0b Compare May 20, 2026 12:28
Copy link
Copy Markdown
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update!

* <p>
* Payloads that are not {@code byte[]} or {@link String} are serialized via
* the configured {@link MessageConverter}. The default converter requires
* a JSON content-type header, e.g. {@code application/json}.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct assumption.
The default one is this:

		if (getConverter() == null) {
			setConverter(getBeanFactory()
					.getBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
							MessageConverter.class));
		}

Which is by Spring Integration infrastructure:

	private void registerArgumentResolverMessageConverter() {
		if (!this.beanFactory.containsBean(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)) {
			this.registry.registerBeanDefinition(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME,
					new RootBeanDefinition(ConfigurableCompositeMessageConverter.class));
		}
	}

And that one is like this:

	private static Collection<MessageConverter> initDefaults() {
		List<MessageConverter> converters = new LinkedList<>();

		if (JacksonPresent.isJackson3Present()) {
			JacksonJsonMessageConverter jsonMessageConverter = new JacksonJsonMessageConverter();
			jsonMessageConverter.setStrictContentTypeMatch(true);
			converters.add(jsonMessageConverter);
		}
		else if (JacksonPresent.isJackson2Present()) {
			var mappingJackson2MessageConverter =
					new org.springframework.messaging.converter.MappingJackson2MessageConverter();
			mappingJackson2MessageConverter.setStrictContentTypeMatch(true);
			mappingJackson2MessageConverter.setObjectMapper(
					new org.springframework.integration.support.json.Jackson2JsonObjectMapper().getObjectMapper());
			converters.add(mappingJackson2MessageConverter);
		}
		converters.add(new ByteArrayMessageConverter());
		converters.add(new ObjectStringMessageConverter());

		// TODO do we port it together with MessageConverterUtils ?
		// converters.add(new JavaSerializationMessageConverter());

		return converters;
	}

so, if no Jackson process in classpath, there won't be any JSON conversions.
Please, revise this Javadoc to reflect reality.

Assert.state(converted != null,
() -> "The MQTT payload cannot be null. The '" + converter + "' returned null for: " + message);
Object convertedPayload = converted.getPayload();
Assert.state(convertedPayload instanceof byte[],
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can perform body = ((String) payload).getBytes(StandardCharsets.UTF_8); one more time since since some converters may return a String.
Might be the case that this has to be mentioned in Javadoc above, too.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the String fallback in the else branch as suggested.

Regarding the Javadoc, I'd prefer to treat this fallback as an internal implementation detail rather than explicitly documenting it. My concern is that putting it in the public Javadoc might turn a defensive safeguard into a guaranteed API contract. Keeping it undocumented encourages users to stick to the correct approach (passing byte[]).

*/
public class Mqttv5HandlerTests {

record TestPojo(String name, int value) { }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer to put supporting types in the and of test class.
Well, always in the end.

handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();

ReflectionTestUtils.setField(handler, "mqttClient", mockClient);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot we use ClientManager abstraction instead of this reflection?

*
* @author Deng Pan
*
* @since 7.1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are going to back-port this down to 6.5.9.
Even if it might be some kind of breaking change, it is better to be consistent with an API we rely on as you have pointed out in the issue.

@d0v0c d0v0c force-pushed the fix-mqttv5-outbound-conversion branch from b09eb0b to 784cee3 Compare May 20, 2026 20:05
Copy link
Copy Markdown
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build has failed for Checkstyle:

Error: eckstyle] [ERROR] /home/runner/work/spring-integration/spring-integration/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5HandlerTests.java:29:1: Wrong order for 'org.springframework.integration.mqtt.core.ClientManager' import. [ImportOrder]
> Task :spring-integration-mqtt:checkstyleTest

Please, make sure you use ./gradlew :spring-integration-mqtt:check locally before pushing to the PR.

* The {@link AbstractMqttMessageHandler} implementation for MQTT v5.
* <p>
* Payloads other than {@code byte[]} or {@link String} use the configured {@link MessageConverter}.
* The default converter only supports JSON (requires Jackson on the classpath and
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have discussed with you that this is not correct assumption if Jackson is not on classpath.
Please, revise this Javadoc.
I would fully remove this second sentence you suggest.

Assert.state(converted != null,
() -> "The MQTT payload cannot be null. The '" + converter + "' returned null for: " + message);
Object convertedPayload = converted.getPayload();
if (convertedPayload instanceof byte[]) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use pattern expression instead of cast:

if (convertedPayload instanceof byte[] bytes) {
    body = bytes;
}


// Outbound serialization must go through toMessage(...), never fromMessage(...).
verify(converter).toMessage(any(), any(MessageHeaders.class));
verify(converter, never()).fromMessage(any(), any());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think assertion and that comment above don't make sense.
Please, remove them.

*
* @since 6.5.9
*/
public class Mqttv5HandlerTests {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With JUnit 5 we don't need public/private modifier for classes and methods in tests: makes code much easier to read.

@d0v0c d0v0c force-pushed the fix-mqttv5-outbound-conversion branch from 784cee3 to b8a475e Compare May 21, 2026 14:50
Copy link
Copy Markdown
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good!
Very close to be merged.

Thanks

}
else if (payload instanceof String) {
body = ((String) payload).getBytes(StandardCharsets.UTF_8);
else if (payload instanceof String s) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, our preference is to never use single character variable names.
Please, revise this into something like stringValue if just string doesn't work.


handler.handleMessage(MessageBuilder.withPayload(new TestPojo("test", 2)).build());

// Custom converter returns a String; verify the handler applies the String → bytes safeguard.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that your is UTF-8 symbol to be transferred correctly between different OSs.

Mqttv5PahoMessageHandler#buildMqttMessage was calling
MessageConverter#fromMessage(message, byte[].class) to serialize
non-byte[] / non-String payloads. This was the wrong direction per
the Spring Messaging contract. Switch to toMessage(payload, headers).

Additionally, the class Javadoc gains a short note about the default
converter's JSON content-type requirement.

Test Plan:
New Mqttv5HandlerTests covers two scenarios:
- publishPojoWithDefaultConverter: round-trips a POJO through
  ConfigurableCompositeMessageConverter and verifies the published
  bytes are valid JSON.
- publishPojoWithCustomConverter: verifies a user-supplied
  converter's toMessage is invoked on the outbound path.

Fixes spring-projects#11011
Signed-off-by: d0v0c <dan.deng.pan@gmail.com>
@d0v0c d0v0c force-pushed the fix-mqttv5-outbound-conversion branch from b8a475e to 2f5d3e6 Compare May 21, 2026 15:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Mqttv5PahoMessageHandler fails due to incorrect MessageConverter.fromMessage usage

2 participants