Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/event-scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,146 @@ event-driven scheduling, then a new trigger must be created.
This new trigger must inherit ``BaseEventTrigger`` and ensure it properly works with event-driven scheduling.
It might inherit from the existing trigger as well if both triggers share some common code.

Sharing one poll across sibling triggers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. versionadded:: 3.3

When several ``AssetWatcher`` instances on different assets back triggers that read from the **same upstream resource**
— a directory of flag files, a polling REST endpoint, and similar idempotent or
subscriber-side-effect sources — the triggerer would otherwise spin up one independent poll loop per trigger. For a
shared source with twenty subscribers that means twenty poll loops, twenty connections, twenty sets of API calls per
cadence. See "Suitable upstreams" below for the precise scope.
Comment thread
Lee-W marked this conversation as resolved.

``BaseEventTrigger`` supports an opt-in path so that sibling triggers share a single underlying poll, while each
trigger keeps its own DB row, its own ``run_trigger`` task, and its own per-instance filtering. To participate, a
subclass overrides three hooks:

* :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` — return a key identifying the shared
upstream (typically a tuple of strings). Triggers whose key compares equal will share one poll. Returning ``None``
(the default) opts out — the trigger runs its own independent ``run()`` loop, exactly as before. The return value
is read **once** when the triggerer starts this trigger; changing it mid-lifetime has no effect on group
membership, so siblings that should share a poll must return the same key from the outset.
The key must be deterministic — derive it from configuration fields, never from per-call values such as
``time.time()`` or ``uuid.uuid4()``, because the comparison must be stable across the lifetime of the group.

* :py:meth:`~airflow.triggers.base.BaseEventTrigger.open_shared_stream` — a ``@classmethod`` coroutine the triggerer
drives **once per shared-stream group** to yield raw events from the upstream. Because the triggerer reuses one
trigger's kwargs to drive the shared poll, only rely on fields whose values participate in ``shared_stream_key``.

* :py:meth:`~airflow.triggers.base.BaseEventTrigger.filter_shared_stream` — an instance method that consumes the
broadcast raw stream and yields the ``TriggerEvent`` instances this trigger should fire. Per-trigger filtering
(e.g. only events matching this instance's ``filename``) lives here.

Example: a ``DirectoryFileDeleteTrigger`` that fires when a per-asset flag file appears in a shared inbox directory:

.. code-block:: python

from collections.abc import AsyncIterator, Hashable
from typing import Any

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class DirectoryFileDeleteTrigger(BaseEventTrigger):
def __init__(self, *, directory, filename, poke_interval=5.0):
super().__init__()
self.directory = directory
self.filename = filename
self.poke_interval = poke_interval

def shared_stream_key(self) -> Hashable | None:
# All triggers on the same directory + cadence share one scan.
return ("directory-scan", self.directory, self.poke_interval)
Comment thread
Lee-W marked this conversation as resolved.

@classmethod
async def open_shared_stream(cls, kwargs: dict[str, Any]) -> AsyncIterator[Any]:
# Drives one directory listing loop per group.
...

async def filter_shared_stream(self, shared_stream: AsyncIterator[Any]) -> AsyncIterator[TriggerEvent]:
# Each instance fires only for its own filename.
async for snapshot in shared_stream:
if self.filename in snapshot["names"]:
yield TriggerEvent(...)
return

A complete example using this trigger ships in
``airflow.example_dags.example_asset_with_watchers``, where two sibling
``DirectoryFileDeleteTrigger`` watchers share one directory scan alongside
a standalone ``FileDeleteTrigger`` watcher in the same Dag.

What is and isn't shared
^^^^^^^^^^^^^^^^^^^^^^^^

The sharing is narrower than the name might suggest:

* **Shared** (one per ``shared_stream_key``): the ``open_shared_stream`` async generator and its upstream I/O — for
example, the actual ``iterdir`` calls on the directory or polling REST API calls.

* **Not shared** (one per trigger): the ``Trigger`` DB row, the trigger instance, the ``run_trigger``
asyncio task, and the ``filter_shared_stream`` async generator. Each ``AssetWatcher`` still appears as its own
trigger in the UI and in the metadata database.

In other words, the savings is at the poll-loop and upstream-I/O layer, not at the persistence or scheduling layer.

Suitable upstreams
^^^^^^^^^^^^^^^^^^

The shared-stream channel is **one-way** today: events flow from
``open_shared_stream`` out to each subscriber's ``filter_shared_stream``,
and there is no way for a subscriber to tell the producer "I accepted /
dropped / committed this event". That restricts the pattern to upstreams
whose consumption does **not** depend on a side effect on a handle that
only the producer holds. Good fits:

* Idempotent / read-only reads — directory scans, polling REST APIs.
* Subscriber-side-effect cleanup, where the trigger's per-event action
(``unlink``, local marking, …) goes through APIs the subscriber owns
independently of the shared producer handle.

Currently **not** in scope: Kafka consumers (regardless of commit mode),
SQS with delete-on-process or visibility extension, and any source where
progress on the producer's handle is tied to the subscriber's accept /
reject decision. These sources need a way for the subscriber to signal
acceptance back to the producer, which the current shared-stream API does
not provide.

Verifying that sharing is active
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The triggerer logs the creation of each shared-stream group, and names the poll task after its key:

.. code-block:: text

Shared stream group started key=('directory-scan', '/tmp/region-flags', 5.0)

.. code-block:: text

asyncio task name: shared-stream-poll[('directory-scan', '/tmp/region-flags', 5.0)]

If sharing is active you should see exactly one ``Shared stream group started`` line per distinct key, regardless of
how many subscribers join it. If you see one log line per subscriber instead, the keys probably do not compare equal
— verify that ``shared_stream_key`` returns identical values across the siblings.

Slow-subscriber overflow
^^^^^^^^^^^^^^^^^^^^^^^^

Each subscriber in a shared-stream group has a bounded in-memory queue. If the poll loop
produces events faster than a subscriber's ``filter_shared_stream`` can consume them, the
queue fills and that trigger is failed with ``_SubscriberOverflow`` — a deliberate fail-fast
rather than unbounded memory growth.

If subscribers repeatedly overflow, there are two ways to address this:

* Raise ``[triggerer] shared_stream_subscriber_queue_size`` to give the
filter more slack before the overflow threshold is reached.
* Redesign :py:meth:`~airflow.triggers.base.BaseEventTrigger.shared_stream_key` so fewer
sibling triggers share a single group — a narrower group reduces the rate at which any
one subscriber needs to consume events.

Both reduce the mismatch between producer throughput and per-subscriber consume rate.

Avoid infinite scheduling
~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/66584.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Sibling ``BaseEventTrigger`` instances on different ``AssetWatcher`` s can now share a single underlying poll loop in the triggerer by overriding ``shared_stream_key``, ``open_shared_stream``, and ``filter_shared_stream``. Triggers that opt out (the default) keep their existing independent ``run()`` loop behavior.
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2773,6 +2773,17 @@ triggerer:
type: boolean
example: ~
default: "False"
shared_stream_subscriber_queue_size:
description: |
Per-subscriber buffer size for shared-stream triggers (triggers that opt into a shared poll loop
via ``BaseEventTrigger.shared_stream_key``). Each subscribing trigger keeps an in-memory queue of
raw events produced by the shared poll; if a slow subscriber fills its queue, only that subscriber
fails, sibling subscribers are unaffected. Increase if a slow subscriber must tolerate bursts from
a fast upstream.
version_added: 3.3.0
type: integer
example: ~
default: "1024"
kerberos:
description: ~
options:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,55 @@
# specific language governing permissions and limitations
# under the License.
"""
Example DAG for demonstrating the usage of event driven scheduling using assets and triggers.
Example Dag for event-driven scheduling using Assets and AssetWatchers.

Three watchers demonstrate the two trigger patterns in one place:

* The first watcher uses ``FileDeleteTrigger`` for a single specific path —
one watcher, one independent poll loop in the triggerer.
* The other two use ``DirectoryFileDeleteTrigger`` with a matching
``shared_stream_key`` of ``("directory-scan", directory, poke_interval)``;
the triggerer runs **one** directory listing loop for the pair and
broadcasts the result to both. Each still fires only for its own filename.

The Dag runs when any of the three watchers' assets is updated. Touch
``/tmp/test``, ``/tmp/region-flags/us.flag``, or ``/tmp/region-flags/eu.flag``
to trigger a run.
"""

from __future__ import annotations

from airflow.providers.standard.triggers.file import FileDeleteTrigger
from airflow.providers.standard.triggers.file import (
DirectoryFileDeleteTrigger,
FileDeleteTrigger,
)
from airflow.sdk import DAG, Asset, AssetWatcher, chain, task

file_path = "/tmp/test"
# Independent single-file watcher — has its own poll loop in the triggerer.
single_file_trigger = FileDeleteTrigger(filepath="/tmp/test")
single_file_asset = Asset(
"example_asset",
watchers=[AssetWatcher(name="test_asset_watcher", trigger=single_file_trigger)],
)

trigger = FileDeleteTrigger(filepath=file_path)
asset = Asset("example_asset", watchers=[AssetWatcher(name="test_asset_watcher", trigger=trigger)])
# Shared-stream watchers — same directory + poke interval, so the triggerer
# runs one scan for both. Each watcher's ``filter_shared_stream`` matches on
# its own filename and ``unlink``s the flag file as a subscriber-side effect.
us_trigger = DirectoryFileDeleteTrigger(directory="/tmp/region-flags", filename="us.flag", poke_interval=5.0)
eu_trigger = DirectoryFileDeleteTrigger(directory="/tmp/region-flags", filename="eu.flag", poke_interval=5.0)
us_asset = Asset(
"region_us_flag",
watchers=[AssetWatcher(name="us_flag_watcher", trigger=us_trigger)],
)
eu_asset = Asset(
"region_eu_flag",
watchers=[AssetWatcher(name="eu_flag_watcher", trigger=eu_trigger)],
)


with DAG(
dag_id="example_asset_with_watchers",
schedule=[asset],
schedule=[single_file_asset, us_asset, eu_asset],
catchup=False,
):

Expand Down
57 changes: 55 additions & 2 deletions airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import threading
import time
from collections import deque
from collections.abc import Callable, Generator, Iterable, Iterator
from collections.abc import Callable, Generator, Hashable, Iterable, Iterator
from contextlib import contextmanager, suppress
from datetime import datetime
from socket import socket
Expand Down Expand Up @@ -109,6 +109,7 @@
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
from airflow.serialization.serialized_objects import DagSerialization
from airflow.triggers.base import BaseEventTrigger, BaseTrigger, DiscrimatedTriggerEvent, TriggerEvent
from airflow.triggers.shared_stream import SharedStreamManager
from airflow.utils.helpers import log_filename_template_renderer
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import create_session, provide_session
Expand Down Expand Up @@ -1069,6 +1070,10 @@ def __init__(self):
self.failed_triggers = deque()
self.job_id = None
self._stop_event = None
self._shared_streams = SharedStreamManager(
log=self.log,
max_subscriber_queue=conf.getint("triggerer", "shared_stream_subscriber_queue_size"),
)
self.blocked_main_thread_warning_threshold = conf.getfloat(
"triggerer", "blocked_main_thread_warning_threshold"
)
Expand Down Expand Up @@ -1136,6 +1141,12 @@ async def arun(self):
reader_task.cancel()
with suppress(asyncio.CancelledError):
await reader_task
# Safety net: cancel any shared-stream poll tasks whose group
# survived per-trigger cleanup. The normal eviction path is
# ``SharedStreamManager.unsubscribe`` in ``run_trigger``'s
# finally; this call only matters when that path was bypassed
# (e.g. the unsubscribe coroutine raised and was swallowed).
await self._shared_streams.stop_all()
# Wait for supporting tasks to complete
await watchdog

Expand Down Expand Up @@ -1437,12 +1448,43 @@ async def run_trigger(

name = self.triggers[trigger_id]["name"]
self.log.info("trigger %s starting", name)

# Triggers that opt into a shared underlying I/O stream
# (BaseEventTrigger.shared_stream_key returns non-None) consume a
# broadcast stream produced by SharedStreamManager and convert it
# via filter_shared_stream(). Everything else stays on the original
# standalone-run() path. The key is computed after
# render_template_fields so any templated attributes are already
# resolved when the key is constructed.
event_trigger: BaseEventTrigger | None = None
if isinstance(trigger, BaseEventTrigger):
event_trigger = trigger
shared_key: Hashable | None = None

with _make_trigger_span(ti=trigger.task_instance, trigger_id=trigger_id, name=name) as span:
try:
if context is not None:
trigger.render_template_fields(context=context)

async for event in trigger.run():
if event_trigger is not None:
try:
shared_key = event_trigger.shared_stream_key()
except Exception:
self.log.exception(
"shared_stream_key() raised; falling back to standalone run",
trigger_id=trigger_id,
)
shared_key = None

if shared_key is not None and event_trigger is not None:
shared_stream = self._shared_streams.subscribe(
trigger_id=trigger_id, trigger=event_trigger, key=shared_key
)
event_stream = event_trigger.filter_shared_stream(shared_stream)
else:
event_stream = trigger.run()

async for event in event_stream:
await self.log.ainfo(
"Trigger fired event", name=self.triggers[trigger_id]["name"], result=event
)
Expand Down Expand Up @@ -1486,6 +1528,17 @@ async def run_trigger(
# fine, the cleanup process will understand that, but we want to
# allow triggers a chance to cleanup, either in that case or if
# they exit cleanly. Exception from cleanup methods are ignored.
if shared_key is not None:
try:
await self._shared_streams.unsubscribe(trigger_id, shared_key)
except Exception:
# Best-effort cleanup, but log so we don't lose
# cancel-propagation or _handle_poll_terminate bugs.
self.log.exception(
"Failed to unsubscribe trigger from shared stream",
trigger_id=trigger_id,
key=shared_key,
)
with suppress(Exception):
await trigger.cleanup()

Expand Down
Loading