@@ -53,6 +53,9 @@
 import org.apache.iceberg.connect.events.Event;
 import org.apache.iceberg.connect.events.StartCommit;
 import org.apache.iceberg.connect.events.TableReference;
+import org.apache.iceberg.exceptions.CleanableFailure;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
@@ -150,12 +153,28 @@ protected boolean receive(Envelope envelope) {
   private void commit(boolean partialCommit) {
     try {
       doCommit(partialCommit);
-    } catch (Exception e) {
+    } catch (CommitFailedException | CommitStateUnknownException e) {
Review comment:

There are two different issues here, and both look bad.

CommitStateUnknownException should not be retried blindly. Its own Javadoc says we don’t know whether the commit succeeded, and retrying can create duplicates. Since the files stay in commitBuffer, the next cycle can commit the same files again. If the first commit actually landed, we get duplicate rows. This should be fatal and require a manual check of the table state.

CommitFailedException is also not something we should swallow. That is exactly the bug from #15878. A WARN + “retry next cycle” is not enough here: Kafka offsets may already be flushed, and on rebalance the in-memory commit state can be lost. Then the data is gone and the operator never sees a real failure.

Flink lets CommitFailedException propagate. I think we should do the same here.
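A minimal sketch of the behavior this comment proposes, using hypothetical stand-in exception classes (not the real Iceberg types, and not the PR's code): the unknown-state case becomes fatal with a pointer to manual verification, and the failed-commit case is simply not caught, so it propagates as in Flink.

```java
class CommitPolicyModel {
  // Hypothetical stand-ins for Iceberg's CommitStateUnknownException and
  // CommitFailedException; illustrative only.
  static class CommitStateUnknown extends RuntimeException {
    CommitStateUnknown(String msg) { super(msg); }
  }

  static class CommitFailed extends RuntimeException {
    CommitFailed(String msg) { super(msg); }
  }

  static void commit(Runnable doCommit) {
    try {
      doCommit.run();
    } catch (CommitStateUnknown e) {
      // Retrying the buffered files could write duplicate rows if the first
      // commit actually landed, so fail fast instead of retrying blindly.
      throw new IllegalStateException("fatal: verify table state manually", e);
    }
    // CommitFailed is deliberately NOT caught here: it propagates to the
    // caller, mirroring how Flink lets CommitFailedException surface.
  }
}
```

The key design point is that neither branch logs-and-continues; both paths make the failure visible to the caller.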

       LOG.warn(
-          "Coordinator {} failed to commit for commit {}, will try again next cycle",
-          taskId,
+          "Commit {} failed, will retry on next cycle: {}",
           commitState.currentCommitId(),
+          e.getMessage(),
           e);
+    } catch (RuntimeException e) {
+      if (e instanceof CleanableFailure) {
Review comment:

CleanableFailure doesn’t mean “safe to retry.”

It only means that uncommitted metadata can be cleaned up.

This branch also catches things that are clearly not retryable:

  • ForbiddenException / 403
  • BadRequestException / 400
  • NotAuthorizedException / 401
  • ValidationException

So a bad credential could keep retrying forever at WARN, with no clear signal to the operator.

If we want retry behavior here, I’d make it explicit: list the retryable exception types instead of using CleanableFailure as the check.
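A sketch of the explicit allow-list this comment suggests, with hypothetical stand-in exception types (the real list would name concrete Iceberg exception classes, and which classes belong in it is exactly the design decision being proposed). The point is that an unlisted failure such as a 403 can never loop forever at WARN.

```java
import java.util.Set;

class RetryClassifier {
  // Hypothetical stand-in exception types for illustration only.
  static class ServiceUnavailable extends RuntimeException {} // e.g. transient 5xx
  static class Forbidden extends RuntimeException {}          // e.g. HTTP 403
  static class Validation extends RuntimeException {}         // bad request state

  // Explicit allow-list: only the named types are ever retried. Anything
  // else (auth failures, validation errors) falls through as fatal.
  private static final Set<Class<? extends RuntimeException>> RETRYABLE =
      Set.of(ServiceUnavailable.class);

  static boolean isRetryable(RuntimeException e) {
    return RETRYABLE.stream().anyMatch(c -> c.isInstance(e));
  }
}
```

Compared to the `instanceof CleanableFailure` check, an allow-list makes the retry policy auditable: adding a type to the retry set is a deliberate, reviewable change.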

+        LOG.warn(
+            "Commit {} failed, will retry on next cycle: {}",
+            commitState.currentCommitId(),
+            e.getMessage(),
+            e);
+      } else {
+        LOG.error(
+            "Commit {} failed fatally for task {}: {}",
+            commitState.currentCommitId(),
+            taskId,
+            e.getMessage(),
+            e);
+        throw e;
+      }
     } finally {
       commitState.endCurrentCommit();
     }
@@ -19,12 +19,16 @@
 package org.apache.iceberg.connect.channel;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.time.OffsetDateTime;
 import java.util.List;
 import java.util.UUID;
+import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
@@ -45,6 +49,8 @@
 import org.apache.iceberg.connect.events.StartCommit;
 import org.apache.iceberg.connect.events.TableReference;
 import org.apache.iceberg.connect.events.TopicPartitionOffset;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types.StructType;
@@ -135,14 +141,50 @@ public void testCommitError() {
         .withRecordCount(5)
         .build();
 
-    coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
+    assertThatThrownBy(
Review comment:

coordinatorTest() calls coordinator.process() directly, so it skips the real production path.

In prod the flow is:

CoordinatorThread.run() catches the exception → marks terminated = true → next CommitterImpl.save() calls processControlEvents() → throws NotRunningException.

This test doesn’t cover that. It would still pass even if CoordinatorThread swallowed the failure.

I think we need an end-to-end test that goes through CoordinatorThread + CommitterImpl.
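The production flow described above can be modeled in a few lines, using made-up class and method names that only mirror the roles of CoordinatorThread, CommitterImpl.save(), and NotRunningException (this is not Iceberg code): the worker thread records the failure, and the next save() surfaces it.

```java
class CoordinatorThreadModel {
  // Stand-in for org.apache.iceberg.connect.channel's NotRunningException.
  static class NotRunningException extends RuntimeException {
    NotRunningException(String msg) { super(msg); }
  }

  private volatile boolean terminated = false;

  // Models CoordinatorThread.run(): a RuntimeException from the commit cycle
  // marks the coordinator terminated instead of being silently dropped.
  void runOneCycle(Runnable commitCycle) {
    Thread t = new Thread(() -> {
      try {
        commitCycle.run();
      } catch (RuntimeException e) {
        terminated = true;
      }
    });
    t.start();
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Models CommitterImpl.save() -> processControlEvents(): once the
  // coordinator thread has died, the next save() must raise the failure.
  void save() {
    if (terminated) {
      throw new NotRunningException("coordinator is terminated");
    }
  }
}
```

An end-to-end test built on this shape would fail if the thread swallowed the exception without setting the terminated flag, which is exactly the gap the comment points out.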

+            () -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Cannot find partition spec");
 
     // no commit messages sent
     assertThat(producer.history()).hasSize(1);
 
     assertThat(table.snapshots()).isEmpty();
   }
+
+  @Test
+  public void testCommitFailedExceptionSwallowed() {
Review comment:

The comment says “Verify issue #15878”, but the test still expects CommitFailedException to be ignored.

That was the bug in #15878: the commit failure was swallowed and data was lost.

To verify the fix, the test should assert that the next CommitterImpl.save() throws NotRunningException, meaning the task moved to FAILED and the operator can see the failure.

+    // Verify issue #15878: a CommitFailedException from the catalog (e.g., Glue concurrent
+    // update) is logged and swallowed so the coordinator retries on the next cycle, rather
+    // than killing the task permanently.
+    Table spiedTable = spy(table);
+    AppendFiles spiedAppend = spy(table.newAppend());
+    doThrow(new CommitFailedException("Glue detected concurrent update"))
+        .when(spiedAppend)
+        .commit();
+    when(spiedTable.newAppend()).thenReturn(spiedAppend);
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+    // Should not throw -- CommitFailedException is retryable
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
+  }
+
+  @Test
+  public void testCommitStateUnknownExceptionSwallowed() {
+    Table spiedTable = spy(table);
+    AppendFiles spiedAppend = spy(table.newAppend());
+    doThrow(new CommitStateUnknownException(new RuntimeException("connection reset")))
+        .when(spiedAppend)
+        .commit();
+    when(spiedTable.newAppend()).thenReturn(spiedAppend);
+    when(catalog.loadTable(TABLE_IDENTIFIER)).thenReturn(spiedTable);
+
+    // Should not throw -- CommitStateUnknownException is retryable
+    coordinatorTest(
+        ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
+  }
 
   private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) {
     byte[] bytes = producer.history().get(idx).value();
     Event commitTable = AvroUtil.decode(bytes);
@@ -289,7 +331,8 @@ public void testCoordinatorCommittedOffsetValidation() {
     Snapshot firstSnapshot = table.currentSnapshot();
     assertThat(firstSnapshot.summary()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");
 
-    // Trigger commit to the table
+    // Trigger commit to the table; the coordinator detects stale offsets via the
+    // ValidationException (a CleanableFailure), logs a warning, and retries next cycle.
     coordinatorTest(
         ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), EventTestUtil.now());
 