Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
Expand All @@ -30,6 +31,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand All @@ -38,6 +41,7 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.ErrorHandler;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.test.categories.Appenders;
Expand All @@ -55,6 +59,8 @@ public class KafkaAppenderTest {

private static final Serializer<byte[]> SERIALIZER = new ByteArraySerializer();

private static final AtomicInteger retrySendCount = new AtomicInteger(0);

private static final MockProducer<byte[], byte[]> kafka =
new MockProducer<byte[], byte[]>(true, SERIALIZER, SERIALIZER) {

Expand All @@ -65,12 +71,13 @@ public synchronized Future<RecordMetadata> send(final ProducerRecord<byte[], byt

final boolean isRetryTest = "true".equals(ThreadContext.get("KafkaAppenderWithRetryCount"));
if (isRetryTest) {
try {
throw new TimeoutException();
} catch (TimeoutException e) {
// TODO Auto-generated catch block
throw new RuntimeException(e);
}
throw new RuntimeException(new TimeoutException());
}

final boolean isRetrySuccessTest =
"true".equals(ThreadContext.get("KafkaAppenderRetrySuccessTest"));
if (isRetrySuccessTest && retrySendCount.getAndIncrement() == 0) {
throw new RuntimeException(new TimeoutException());
}

return retVal;
Expand Down Expand Up @@ -206,6 +213,40 @@ public void testAppendWithRetryCount() {
}
}

@Test
public void testRetrySuccessDoesNotReportError() {
retrySendCount.set(0);
final AtomicBoolean errorReported = new AtomicBoolean(false);
final Appender appender = ctx.getRequiredAppender("KafkaAppenderWithRetryCount");
final ErrorHandler originalHandler = ((KafkaAppender) appender).getHandler();
try {
ThreadContext.put("KafkaAppenderRetrySuccessTest", "true");
Comment thread
ramanathan1504 marked this conversation as resolved.
Outdated
((KafkaAppender) appender).setHandler(new ErrorHandler() {
@Override
public void error(final String msg) {
errorReported.set(true);
}

@Override
public void error(final String msg, final Throwable t) {
errorReported.set(true);
}

@Override
public void error(final String msg, final LogEvent event, final Throwable t) {
errorReported.set(true);
}
});
appender.append(createLogEvent());
final List<ProducerRecord<byte[], byte[]>> history = kafka.history();
assertEquals(2, history.size());
assertFalse("Error should not be reported when retry succeeds", errorReported.get());
} finally {
ThreadContext.clearMap();
((KafkaAppender) appender).setHandler(originalHandler);
}
}

@Test
public void testAppenderNoEventTimestamp() {
final Appender appender = ctx.getRequiredAppender("KafkaAppenderNoEventTimestamp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,7 @@ public KafkaAppender build() {
}

public Integer getRetryCount() {
Integer intRetryCount = null;
try {
intRetryCount = Integer.valueOf(retryCount);
} catch (NumberFormatException e) {

}
return intRetryCount;
return retryCount;
}

public String getTopic() {
Expand Down Expand Up @@ -216,16 +210,20 @@ public void append(final LogEvent event) {
try {
tryAppend(event);
} catch (final Exception e) {

if (this.retryCount != null) {
int currentRetryAttempt = 0;
while (currentRetryAttempt < this.retryCount) {
currentRetryAttempt++;
try {
tryAppend(event);
break;
} catch (Exception e1) {

return;
} catch (final Exception retryException) {
LOGGER.debug(
"Unable to write to Kafka in appender [{}], retry attempt [{}/{}].",
getName(),
currentRetryAttempt,
this.retryCount,
retryException);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<entry xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://logging.apache.org/xml/ns"
xsi:schemaLocation="https://logging.apache.org/xml/ns https://logging.apache.org/xml/ns/log4j-changelog-0.xsd"
type="fixed">
<issue id="4125" link="https://github.com/apache/logging-log4j2/pull/4125"/>
<description format="asciidoc">Fix `KafkaAppender` reporting error to error handler even after a successful retry</description>
</entry>