diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java index c986f8afc2eb..4b46b941f4f6 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java @@ -150,12 +150,17 @@ protected boolean receive(Envelope envelope) { private void commit(boolean partialCommit) { try { doCommit(partialCommit); - } catch (Exception e) { - LOG.warn( - "Coordinator {} failed to commit for commit {}, will try again next cycle", - taskId, - commitState.currentCommitId(), - e); + } catch (RuntimeException e) { + if (partialCommit) { + LOG.warn( + "Partial commit {} failed for task {}, will retry", + commitState.currentCommitId(), + taskId, + e); + } else { + LOG.error("Commit {} failed for task {}", commitState.currentCommitId(), taskId, e); + throw e; + } } finally { commitState.endCurrentCommit(); } diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java index ed370fcdad35..0b5553e127ef 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCoordinator.java @@ -19,12 +19,16 @@ package org.apache.iceberg.connect.channel; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import java.time.OffsetDateTime; import java.util.List; import java.util.UUID; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DataOperations; @@ -45,6 +49,8 @@ import org.apache.iceberg.connect.events.StartCommit; import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.connect.events.TopicPartitionOffset; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types.StructType; @@ -135,14 +141,35 @@ public void testCommitError() { .withRecordCount(5) .build(); - coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null); + assertThatThrownBy( + () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Cannot find partition spec"); - // no commit messages sent assertThat(producer.history()).hasSize(1); - assertThat(table.snapshots()).isEmpty(); } + @Test + public void testCommitFailedExceptionPropagates() { + Table spiedTable = spy(table); + AppendFiles spiedAppend = spy(table.newAppend()); + doThrow(new CommitFailedException("Glue detected concurrent update")) + .when(spiedAppend) + .commit(); + when(spiedTable.newAppend()).thenReturn(spiedAppend); + when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable); + + assertThatThrownBy( + () -> + coordinatorTest( + ImmutableList.of(EventTestUtil.createDataFile()), + ImmutableList.of(), + EventTestUtil.now())) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Glue detected concurrent update"); + } + private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) { byte[] bytes = producer.history().get(idx).value(); Event commitTable = AvroUtil.decode(bytes); @@ -289,13 +316,18 @@ public void testCoordinatorCommittedOffsetValidation() { Snapshot firstSnapshot = table.currentSnapshot(); assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); - // Trigger commit to the table - coordinatorTest( - ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now()); + // Trigger commit to the table - should throw ValidationException + assertThatThrownBy( + () -> + coordinatorTest( + ImmutableList.of(EventTestUtil.createDataFile()), + ImmutableList.of(), + EventTestUtil.now())) + .isInstanceOf(ValidationException.class) + .hasMessageContaining("stale offsets"); - // Assert that the table was not updated and offsets remain table.refresh(); assertThat(table.snapshots()).hasSize(2); - assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); + assertThat(table.currentSnapshot().summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}"); } }