diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 9f24ce39b974..403cf4f2bc5a 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -42,6 +42,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
+ implementation project(path: ":model:fn-execution", configuration: "shadow")
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_69_0
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java
new file mode 100644
index 000000000000..bdce67f6bad5
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.values.CausedByDrain;
+
+/**
+ * Encapsulates metadata that propagates with elements in the pipeline.
+ *
+ *
This metadata is sent along with elements. It currently includes fields like {@link
+ * CausedByDrain}, and is designed to be extensible to support future metadata fields such as
+ * OpenTelemetry context or CDC (Change Data Capture) kind.
+ *
+ *
The purpose of this class is to group targeted metadata fields together. This makes it easier
+ * to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
+ * multiple elements are merged or grouped, without having to extend method signatures or state
+ * handling for every new metadata field.
+ */
+@AutoValue
+public abstract class CombinedMetadata {
+ public abstract CausedByDrain causedByDrain();
+
+ public static CombinedMetadata create(CausedByDrain causedByDrain) {
+ return new AutoValue_CombinedMetadata(causedByDrain);
+ }
+
+ public static CombinedMetadata createDefault() {
+ return create(CausedByDrain.NORMAL);
+ }
+
+ public static class Coder extends AtomicCoder {
+ private static final Coder INSTANCE = new Coder();
+
+ public static Coder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
+ BeamFnApi.Elements.ElementMetadata proto =
+ BeamFnApi.Elements.ElementMetadata.newBuilder()
+ .setDrain(
+ value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
+ ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
+ .build();
+ proto.writeDelimitedTo(outStream);
+ }
+
+ @Override
+ public CombinedMetadata decode(InputStream inStream) throws IOException {
+ BeamFnApi.Elements.ElementMetadata proto =
+ BeamFnApi.Elements.ElementMetadata.parseDelimitedFrom(inStream);
+ if (proto == null) {
+ return CombinedMetadata.createDefault();
+ }
+
+ CausedByDrain causedByDrain =
+ proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ ? CausedByDrain.CAUSED_BY_DRAIN
+ : CausedByDrain.NORMAL;
+
+ return CombinedMetadata.create(causedByDrain);
+ }
+ }
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java
new file mode 100644
index 000000000000..a2f3f26520ef
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadataCombiner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.values.CausedByDrain;
+
+/** Combiner for CombinedMetadata. */
+class CombinedMetadataCombiner
+ extends CombineFn {
+ private static final CombinedMetadataCombiner INSTANCE = new CombinedMetadataCombiner();
+
+ public static CombinedMetadataCombiner of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public CombinedMetadata createAccumulator() {
+ return CombinedMetadata.create(CausedByDrainCombiner.of().createAccumulator());
+ }
+
+ @Override
+ public CombinedMetadata addInput(CombinedMetadata accumulator, CombinedMetadata input) {
+ return CombinedMetadata.create(
+ CausedByDrainCombiner.of().addInput(accumulator.causedByDrain(), input.causedByDrain()));
+ }
+
+ @Override
+ public CombinedMetadata mergeAccumulators(Iterable accumulators) {
+ CombinedMetadata result = createAccumulator();
+ for (CombinedMetadata accum : accumulators) {
+ result = addInput(result, accum);
+ }
+ return result;
+ }
+
+ @Override
+ public CombinedMetadata extractOutput(CombinedMetadata accumulator) {
+ return accumulator;
+ }
+
+ /** Combiner for CausedByDrain metadata. */
+ static class CausedByDrainCombiner implements MetadataCombiner {
+ private static final CausedByDrainCombiner INSTANCE = new CausedByDrainCombiner();
+
+ public static CausedByDrainCombiner of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public CausedByDrain createAccumulator() {
+ return CausedByDrain.NORMAL;
+ }
+
+ @Override
+ public CausedByDrain addInput(CausedByDrain current, CausedByDrain input) {
+ if (current == CausedByDrain.CAUSED_BY_DRAIN || input == CausedByDrain.CAUSED_BY_DRAIN) {
+ return CausedByDrain.CAUSED_BY_DRAIN;
+ }
+ return CausedByDrain.NORMAL;
+ }
+ }
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index f5587b46598a..b206e66dcf3e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -145,7 +145,13 @@ public Iterable> filter(
} else {
nonLateElements.add(
WindowedValues.of(
- element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
+ element.getValue(),
+ element.getTimestamp(),
+ window,
+ element.getPaneInfo(),
+ element.getRecordId(),
+ element.getRecordOffset(),
+ element.causedByDrain()));
}
}
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java
new file mode 100644
index 000000000000..55884a8e43a8
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MetadataCombiner.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+/** Interface for combining pipeline metadata. */
+interface MetadataCombiner {
+ T createAccumulator();
+
+ T addInput(T accumulator, T input);
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 0721ddc4685e..1ae0c52f853a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -38,6 +38,7 @@
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -107,6 +108,12 @@ public class ReduceFnRunner {
*
It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
*
*/
+ static final StateTag>
+ METADATA_TAG =
+ StateTags.makeSystemTagInternal(
+ StateTags.combiningValue(
+ "combinedMetadata", CombinedMetadata.Coder.of(), CombinedMetadataCombiner.of()));
+
private final WindowingStrategy