
[Kafka Connect] Duplicate file registration across snapshots during coordinator recovery (append-only, IKC 1.10.1) #16282

@tongwai-wong-appier

Description


Apache Iceberg version

1.10.1 (latest release)

Query engine

Kafka Connect

Please describe the bug 🐞

Summary

When a "Connection timed out" error in a Kafka Connect worker triggers a task crash and a coordinator switchover, the same Parquet data file can be registered in two different Iceberg snapshots, duplicating every row in that file. This occurs on append-only tables with no equality deletes.

We have observed this in production across two independent incidents on IKC 1.10.1 with a REST catalog (backed by GCS storage).

Mechanism

The failure sequence is:

  1. Coordinator C1 initiates a commit cycle (commitId A).
  2. Worker writes data file X to storage and sends DATA_WRITTEN to the control topic.
  3. C1 receives DATA_WRITTEN, commits file X to the Iceberg table → Snapshot S1 is created successfully.
  4. Before C1 can advance the control-topic consumer offset past the DATA_WRITTEN event, the worker hits a "Connection timed out" error → task crash → coordinator shutdown.
  5. A new coordinator C2 starts. Its control-topic consumer position is before the DATA_WRITTEN event for file X (since C1 never advanced the offset).
  6. C2 re-consumes the stale DATA_WRITTEN for file X, buffers it, and commits it to the table → Snapshot S2 is created with the same file X.
  7. Result: file X appears in both S1 and S2. Every row in that file is now read twice.
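To make the ordering problem concrete, the steps above can be modelled in a few lines of Python. This is a toy sketch, not IKC code: Table, ControlTopic and run_coordinator are hypothetical stand-ins for the Iceberg table, the control topic and the coordinator's commit cycle. The only property they try to capture is that the Iceberg commit happens before the control-topic offset is advanced.

```python
from dataclasses import dataclass, field


@dataclass
class Table:
    """Toy table: each snapshot records the file paths it added. Like AppendFiles,
    append() performs no check for paths that are already registered."""
    snapshots: list = field(default_factory=list)

    def append(self, files):
        self.snapshots.append(list(files))

    def live_files(self):
        return [path for snapshot in self.snapshots for path in snapshot]


@dataclass
class ControlTopic:
    events: list = field(default_factory=list)
    committed_offset: int = 0  # next offset the coordinator's consumer will read


def run_coordinator(table, topic, crash_before_offset_commit):
    """One commit cycle: read DATA_WRITTEN events from the committed offset,
    commit them to the table, then (unless it crashes) advance the offset."""
    pending = topic.events[topic.committed_offset:]
    if not pending:
        return
    table.append(event["file"] for event in pending)  # steps 3 and 6: snapshot created
    if crash_before_offset_commit:
        return  # step 4: crash before the offset moves past the event
    topic.committed_offset = len(topic.events)


table = Table()
topic = ControlTopic(events=[{"commit_id": "A", "type": "DATA_WRITTEN", "file": "X.parquet"}])
run_coordinator(table, topic, crash_before_offset_commit=True)   # C1
run_coordinator(table, topic, crash_before_offset_commit=False)  # C2 after failover
print(len(table.snapshots), table.live_files())  # 2 ['X.parquet', 'X.parquet']
```

Because the offset is the only record of what has been consumed, and it is advanced separately from the Iceberg commit, C2 has no way in this model to tell that file X is already live.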

Key observation: this is NOT fixed by PR #15710 / PR #15651

Both PRs address a related but distinct problem: stale and current DataWritten events being mixed in a single RowDelta, which causes a sequence-number collision that breaks equality-delete semantics.

In our append-only scenario, sequence numbers are irrelevant. The problem is that Iceberg's AppendFiles does not check whether a file_path is already registered in the table. Even with per-commitId RowDelta grouping, the stale event would still be committed as a separate RowDelta, and the file would be registered a second time.
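A similar toy model illustrates why grouping by commitId does not help an append-only table: each group becomes its own append, and none of those appends checks whether a path is already live. The function and event shapes below are hypothetical and are not taken from either PR.

```python
def commit_grouped_by_commit_id(events, append):
    """Hypothetical per-commitId grouping: one append per commitId, in arrival order,
    with no check against files that are already registered."""
    groups = {}
    for event in events:
        groups.setdefault(event["commit_id"], []).append(event["file"])
    for files in groups.values():
        append(files)


snapshots = [["X.parquet"]]  # S1, committed by C1 before the crash


def append(files):
    snapshots.append(list(files))


events = [
    {"commit_id": "A", "file": "X.parquet"},  # stale event re-consumed by C2
    {"commit_id": "B", "file": "Y.parquet"},  # C2's own commit
]
commit_grouped_by_commit_id(events, append)
live = [path for snapshot in snapshots for path in snapshot]
print(live.count("X.parquet"))  # 2
```

Separating the stale group from the current one fixes sequence-number ordering for deletes, but the stale group is still appended, so X ends up live twice.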

The root issue is the lack of idempotent file registration in the coordinator's commit path, combined with non-atomic offset advancement: the Iceberg commit succeeds, but the control-topic offset is not advanced before the crash.

Evidence

Observed on IKC 1.10.1 with a JDBC catalog and GCS storage, on an append-only table with no equality deletes.

Incident — 2026-05-07 13:00 UTC

  • Duplicate groups: 4,873
  • Duplicate rows: 9,746 (extra: 4,873)
  • Same physical file registered in two snapshots:
      Snapshot 8264179290764999750  @ 2026-05-07T13:14:36 UTC
      Snapshot  314603727896153559  @ 2026-05-07T13:36:44 UTC
  • The file was written by the worker at ~13:14 (confirmed via the timestamp embedded in the file name).
  • Between 13:15 and 13:35, the log shows 20 consecutive "Commit timeout reached" messages, one per minute, while two coordinators raced for the optimistic lock on the table metadata file:
      13:12:40  Cannot commit: metadata location ...532037... has changed from ...532038...
      13:12:47  Cannot commit: metadata location ...532038... has changed from ...532039...
      ...
      13:14:22  WorkerSinkTask ... Error: Connection timed out
      13:14:22  Committer lost leader partition. Stopping Coordinator.
      13:14:33  Cannot commit: metadata location ...532046... has changed from ...532047...
      13:14:42  Commit a52ae0ca complete, snapshot 8264179290764999750, valid-through 2026-05-07T13:14:02.856Z
                ← Snapshot S1 succeeds (old coordinator's commit)
      13:15:33 – 13:34:34  Commit timeout reached (×20, one per minute)
                ← New coordinator cannot commit because old coordinator's zombie is still racing
      13:35:02  New coordinator discovers group coordinator, resets offsets
      13:36:44  Snapshot 314603727896153559 committed
                ← Snapshot S2: same file re-registered
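The duplicate counts reported for this incident are internally consistent with every affected row being read exactly twice: 9,746 = 2 × 4,873, so each duplicate group holds exactly one extra row. The sketch below shows how such counts can be derived from row keys. The key list is synthetic, and the assumption that rows were unique by key before the file was re-registered is ours, not part of the incident data.

```python
from collections import Counter


def duplicate_stats(keys):
    """Return (duplicate groups, rows in those groups, extra rows) for a list of row keys."""
    counts = Counter(keys)
    dup_counts = [c for c in counts.values() if c >= 2]
    groups = len(dup_counts)
    rows = sum(dup_counts)
    extra = sum(c - 1 for c in dup_counts)  # rows beyond the first copy in each group
    return groups, rows, extra


# Synthetic assumption: 4,873 distinct rows, each now read twice.
keys = [f"row-{i}" for i in range(4873)] * 2
print(duplicate_stats(keys))  # (4873, 9746, 4873)
```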

Confirmed via metadata inspection

Using the data_files metadata table with time travel:

WITH base AS (
    SELECT *,
        COUNT(1) OVER (PARTITION BY file_path) AS _file_count
    FROM <table>.data_files
    TIMESTAMP AS OF '<snapshot_timestamp>'
)
SELECT file_path, record_count, _file_count
FROM base
WHERE _file_count >= 2

The result confirms it: the same file_path, with an identical record_count, is listed twice, having been registered in two different snapshots.
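The same check can be expressed over manifest entries instead of the flattened file listing, which also shows which snapshots added each duplicate. In Spark this information should be available from the entries metadata table (its snapshot_id and data_file columns). The sketch below assumes those rows have already been collected as (snapshot_id, file_path, record_count) tuples for live entries. The snapshot IDs are the ones from the incident; the file paths and record counts are placeholders.

```python
from collections import defaultdict


def files_registered_more_than_once(entries):
    """entries: iterable of (snapshot_id, file_path, record_count) for live data-file entries.
    Returns {file_path: [(snapshot_id, record_count), ...]} for paths listed two or more times."""
    by_path = defaultdict(list)
    for snapshot_id, file_path, record_count in entries:
        by_path[file_path].append((snapshot_id, record_count))
    return {path: regs for path, regs in by_path.items() if len(regs) >= 2}


entries = [
    (8264179290764999750, "data/X.parquet", 100),  # placeholder path and count
    (314603727896153559, "data/X.parquet", 100),   # same file, second snapshot
    (314603727896153559, "data/Y.parquet", 50),
]
for path, regs in files_registered_more_than_once(entries).items():
    print(path, sorted(snapshot_id for snapshot_id, _ in regs))
```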

Possible fix directions (our speculation — happy to discuss)

From our analysis, the fundamental issue is that file registration via AppendFiles / RowDelta is not idempotent — appending a file that is already in the table creates a duplicate manifest entry rather than a no-op. Combined with the non-atomic relationship between Iceberg commit and control-topic offset advancement, any coordinator crash between these two steps will cause re-registration on recovery.

We are not sure which approach would be most appropriate within the IKC codebase, but some directions we considered:

  1. Commit-level file dedup: Before committing, check if any of the pending file_paths are already registered in the current table snapshot. Skip files that are already present.
  2. Stale event filtering by committed offset: On coordinator startup, read the table's last committed offsets and discard any DataWritten events whose offsets fall within the already-committed range.
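Both directions can be sketched as filters applied to the coordinator's buffered events before the commit. This is an illustration under assumptions, not a proposed patch: DataWritten here is a hypothetical record carrying the control-topic partition and offset the event was read from, registered_paths is the set of file paths live in the table's current snapshot, and committed_offsets is a hypothetical partition-to-next-offset map recorded by the last successful Iceberg commit.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DataWritten:
    partition: int  # control-topic partition the event was read from
    offset: int     # control-topic offset of the event
    commit_id: str
    file_path: str


def dedup_against_table(pending, registered_paths):
    """Direction 1: drop events whose file path is already registered in the table."""
    kept, seen = [], set(registered_paths)
    for event in pending:
        if event.file_path in seen:
            continue
        seen.add(event.file_path)  # also guards against duplicates within the same batch
        kept.append(event)
    return kept


def drop_already_committed(pending, committed_offsets):
    """Direction 2: drop events below the next offset recorded by the last Iceberg commit.
    committed_offsets maps partition -> next offset to consume (hypothetical representation)."""
    return [e for e in pending if e.offset >= committed_offsets.get(e.partition, 0)]


stale = DataWritten(partition=0, offset=41, commit_id="A", file_path="data/X.parquet")
fresh = DataWritten(partition=0, offset=57, commit_id="B", file_path="data/Y.parquet")
pending = [stale, fresh]

print([e.file_path for e in dedup_against_table(pending, {"data/X.parquet"})])  # ['data/Y.parquet']
print([e.file_path for e in drop_already_committed(pending, {0: 42})])          # ['data/Y.parquet']
```

Direction 2 avoids scanning manifests, but it only closes the crash window if the offsets it relies on are written atomically with the Iceberg commit (for example, in the snapshot summary) rather than advanced separately afterwards. Direction 1 is racy on its own when another coordinator commits concurrently, as in the incident above; the check would presumably need to be re-evaluated against the refreshed table metadata on every commit retry.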

We'd appreciate guidance from maintainers on the preferred approach.

Related issues

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
