diff --git a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java index ef8c24edf00..b265126cd20 100644 --- a/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java +++ b/log4j-core-test/src/test/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppenderTest.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -30,14 +31,18 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.ErrorHandler; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.impl.Log4jLogEvent; import org.apache.logging.log4j.core.test.categories.Appenders; @@ -55,6 +60,8 @@ public class KafkaAppenderTest { private static final Serializer SERIALIZER = new ByteArraySerializer(); + private static final AtomicInteger retrySendCount = new AtomicInteger(0); + private static final MockProducer kafka = new MockProducer(true, SERIALIZER, SERIALIZER) { @@ -65,12 +72,13 @@ public synchronized Future send(final ProducerRecord> history = kafka.history(); + assertEquals(2, history.size()); + assertFalse("Error should not be reported when retry succeeds", errorReported.get()); + } finally { + ((KafkaAppender) appender).setHandler(originalHandler); + } + } + @Test public void testAppenderNoEventTimestamp() { final Appender appender = ctx.getRequiredAppender("KafkaAppenderNoEventTimestamp"); diff --git a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java index a9788a2a59b..2fbccff7b04 100644 --- a/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java +++ b/log4j-core/src/main/java/org/apache/logging/log4j/core/appender/mom/kafka/KafkaAppender.java @@ -94,13 +94,7 @@ public KafkaAppender build() { } public Integer getRetryCount() { - Integer intRetryCount = null; - try { - intRetryCount = Integer.valueOf(retryCount); - } catch (NumberFormatException e) { - - } - return intRetryCount; + return retryCount; } public String getTopic() { @@ -216,16 +210,20 @@ public void append(final LogEvent event) { try { tryAppend(event); } catch (final Exception e) { - if (this.retryCount != null) { int currentRetryAttempt = 0; while (currentRetryAttempt < this.retryCount) { currentRetryAttempt++; try { tryAppend(event); - break; - } catch (Exception e1) { - + return; + } catch (final Exception retryException) { + LOGGER.debug( + "Unable to write to Kafka in appender [{}], retry attempt [{}/{}].", + getName(), + currentRetryAttempt, + this.retryCount, + retryException); } } } diff --git a/src/changelog/.2.x.x/fix_kafka_appender_retry_error_reporting.xml b/src/changelog/.2.x.x/fix_kafka_appender_retry_error_reporting.xml new file mode 100644 index 00000000000..f3884f41e8b --- /dev/null +++ b/src/changelog/.2.x.x/fix_kafka_appender_retry_error_reporting.xml @@ -0,0 +1,8 @@ + + + + Fix `KafkaAppender` reporting error to error handler even after a successful retry +