From aaa229f55e999899fe66219ff40bd3b97e48083f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:10:22 +0000 Subject: [PATCH 01/47] Add MSC4354 experimental feature flag --- docker/complement/conf/workers-shared-extra.yaml.j2 | 2 ++ synapse/config/experimental.py | 3 +++ 2 files changed, 5 insertions(+) diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2 index 101ff153a5a..120b3b94962 100644 --- a/docker/complement/conf/workers-shared-extra.yaml.j2 +++ b/docker/complement/conf/workers-shared-extra.yaml.j2 @@ -139,6 +139,8 @@ experimental_features: msc4155_enabled: true # Thread Subscriptions msc4306_enabled: true + # Sticky Events + msc4354_enabled: true server_notices: system_mxid_localpart: _server diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index dc5e096791a..a1a537545d7 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -597,5 +597,8 @@ def read_config( # (and MSC4308: Thread Subscriptions extension to Sliding Sync) self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False) + # MSC4354: Sticky Events + self.msc4354_enabled: bool = experimental.get("msc4354_enabled", False) + # MSC4380: Invite blocking self.msc4380_enabled: bool = experimental.get("msc4380_enabled", False) From a25a5b553b9dae63a9ba116c4e42d10128ac8517 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:33:47 +0000 Subject: [PATCH 02/47] Expose MSC4354 enablement on /versions --- synapse/rest/client/versions.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 75f27c98dea..89458495311 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -182,6 +182,8 @@ async def on_GET(self, request: SynapseRequest) -> tuple[int, JsonDict]: "org.matrix.msc4306": self.config.experimental.msc4306_enabled, # MSC4169: Backwards-compatible redaction sending using `/send` "com.beeper.msc4169": self.config.experimental.msc4169_enabled, + # MSC4354: Sticky events + "org.matrix.msc4354": self.config.experimental.msc4354_enabled, # MSC4380: Invite blocking "org.matrix.msc4380": self.config.experimental.msc4380_enabled, }, From 7abcf6d48b6bd5238b3b7394da668e285681b240 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:13:03 +0000 Subject: [PATCH 03/47] Add constants for sticky events --- synapse/api/constants.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 9b6a68e929c..8d87fc42089 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -24,7 +24,7 @@ """Contains constants from the specification.""" import enum -from typing import Final +from typing import Final, TypedDict # the max size of a (canonical-json-encoded) event MAX_PDU_SIZE = 65536 @@ -292,6 +292,8 @@ class EventUnsignedContentFields: # Requesting user's membership, per MSC4115 MEMBERSHIP: Final = "membership" + STICKY_TTL: Final = "msc4354_sticky_duration_ttl_ms" + class MTextFields: """Fields found inside m.text content blocks.""" @@ -377,3 +379,18 @@ class Direction(enum.Enum): class ProfileFields: DISPLAYNAME: Final = "displayname" AVATAR_URL: Final = "avatar_url" + + +class StickyEventField(TypedDict): + duration_ms: int + + +class StickyEvent: + QUERY_PARAM_NAME: Final = "org.matrix.msc4354.sticky_duration_ms" + FIELD_NAME: Final = "msc4354_sticky" + MAX_DURATION_MS: Final = 3600000 # 1 hour + """ + Maximum stickiness duration as specified in MSC4354. + Ensures that data in the /sync response can go down and not grow unbounded. + """ + MAX_EVENTS_IN_SYNC: Final = 100 From c2d4cf490ca76b5e4441ee3d54aec4e4a6da0300 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:10:36 +0000 Subject: [PATCH 04/47] Add sticky_events table --- synapse/_scripts/synapse_port_db.py | 1 + .../schema/main/delta/93/01_sticky_events.sql | 51 +++++++++++++++++++ .../93/01_sticky_events_seq.sql.postgres | 18 +++++++ 3 files changed, 70 insertions(+) create mode 100644 synapse/storage/schema/main/delta/93/01_sticky_events.sql create mode 100644 synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 79b2a0c528e..4fa3def624e 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -132,6 +132,7 @@ "has_known_state", "is_encrypted", ], + "sticky_events": ["soft_failed"], "thread_subscriptions": ["subscribed", "automatic"], "users": ["shadow_banned", "approved", "locked", "suspended"], "un_partial_stated_event_stream": ["rejection_status_changed"], diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql new file mode 100644 index 00000000000..1acd706bc06 --- /dev/null +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -0,0 +1,51 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +CREATE TABLE sticky_events ( + -- Position in the sticky events stream + stream_id INTEGER NOT NULL PRIMARY KEY, + + -- Name of the worker sending this. (This makes the stream compatible with multiple writers.) + instance_name TEXT NOT NULL, + + -- The event ID of the sticky event itself. + event_id TEXT NOT NULL, + + -- The room ID that the sticky event is in. + -- Denormalised for performance. + room_id TEXT NOT NULL, + + -- The stream_ordering of the event. + -- Denormalised for performance since we will want to sort these by stream_ordering + -- when fetching them. + event_stream_ordering INTEGER NOT NULL UNIQUE, + + -- Sender of the sticky event. + -- Denormalised for performance so we can query only for sticky events originating + -- from our homeserver. + sender TEXT NOT NULL, + + -- When the sticky event expires. + expires_at BIGINT NOT NULL, + + -- Whether the event is soft-failed. + -- Denormalised for performance when we want to re-evaluate the soft-failed state of sticky events. + soft_failed BOOLEAN NOT NULL +); + +-- 1. For pulling out sticky events by room at send time, obeying stream ordering range limits. +-- 2. For pulling out soft failed events by room +CREATE INDEX sticky_events_room_idx ON sticky_events (room_id, event_stream_ordering, soft_failed); + +-- A optional integer for combining sticky events with delayed events. Used at send time. +ALTER TABLE delayed_events ADD COLUMN sticky_duration_ms BIGINT; diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres b/synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres new file mode 100644 index 00000000000..9ba72856bc9 --- /dev/null +++ b/synapse/storage/schema/main/delta/93/01_sticky_events_seq.sql.postgres @@ -0,0 +1,18 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +CREATE SEQUENCE sticky_events_sequence; +-- Synapse streams start at 2, because the default position is 1 +-- so any item inserted at position 1 is ignored. +-- We have to use nextval not START WITH 2, see https://github.com/element-hq/synapse/issues/18712 +SELECT nextval('sticky_events_sequence'); From ec5f2f8f394425a6a345d25b2c26c6dfcdbab7b9 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:49:08 +0000 Subject: [PATCH 05/47] Add sticky events store and stream --- synapse/app/generic_worker.py | 2 + synapse/config/workers.py | 2 +- synapse/notifier.py | 1 + synapse/replication/tcp/client.py | 11 +- synapse/replication/tcp/handler.py | 7 + synapse/replication/tcp/streams/__init__.py | 3 + synapse/replication/tcp/streams/_base.py | 45 ++++++ synapse/storage/databases/main/__init__.py | 2 + .../storage/databases/main/events_worker.py | 9 ++ .../storage/databases/main/sticky_events.py | 153 ++++++++++++++++++ synapse/streams/events.py | 3 + synapse/types/__init__.py | 10 +- tests/rest/admin/test_room.py | 4 +- tests/rest/client/test_rooms.py | 4 +- 14 files changed, 249 insertions(+), 7 deletions(-) create mode 100644 synapse/storage/databases/main/sticky_events.py diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 0a4abd18393..159cd44237e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -102,6 +102,7 @@ from synapse.storage.databases.main.sliding_sync import SlidingSyncStore from synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.storage.databases.main.stats import StatsStore +from synapse.storage.databases.main.sticky_events import StickyEventsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.tags import TagsWorkerStore from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore @@ -137,6 +138,7 @@ class GenericWorkerStore( RoomWorkerStore, DirectoryWorkerStore, ThreadSubscriptionsWorkerStore, + StickyEventsWorkerStore, PushRulesWorkerStore, ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ec8ab9506bc..42f11b15d8f 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -127,7 +127,7 @@ class WriterLocations: """Specifies the instances that write various streams. Attributes: - events: The instances that write to the event and backfill streams. + events: The instances that write to the event, backfill and sticky events streams. typing: The instances that write to the typing stream. Currently can only be a single instance. to_device: The instances that write to the to_device stream. Currently diff --git a/synapse/notifier.py b/synapse/notifier.py index cf3923110e3..93d438def71 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -526,6 +526,7 @@ def on_new_event( StreamKeyType.TYPING, StreamKeyType.UN_PARTIAL_STATED_ROOMS, StreamKeyType.THREAD_SUBSCRIPTIONS, + StreamKeyType.STICKY_EVENTS, ], new_token: int, users: Collection[str | UserID] | None = None, diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fdda932ead2..bc7e46d4c92 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -43,7 +43,10 @@ UnPartialStatedEventStream, UnPartialStatedRoomStream, ) -from synapse.replication.tcp.streams._base import ThreadSubscriptionsStream +from synapse.replication.tcp.streams._base import ( + StickyEventsStream, + ThreadSubscriptionsStream, +) from synapse.replication.tcp.streams.events import ( EventsStream, EventsStreamEventRow, @@ -262,6 +265,12 @@ async def on_rdata( token, users=[row.user_id for row in rows], ) + elif stream_name == StickyEventsStream.NAME: + self.notifier.on_new_event( + StreamKeyType.STICKY_EVENTS, + token, + rooms=[row.room_id for row in rows], + ) await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 05370045e6b..d8c42b6a344 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -66,6 +66,7 @@ ) from synapse.replication.tcp.streams._base import ( DeviceListsStream, + StickyEventsStream, ThreadSubscriptionsStream, ) from synapse.util.background_queue import BackgroundQueue @@ -216,6 +217,12 @@ def __init__(self, hs: "HomeServer"): continue + if isinstance(stream, StickyEventsStream): + if hs.get_instance_name() in hs.config.worker.writers.events: + self._streams_to_replicate.append(stream) + + continue + if isinstance(stream, DeviceListsStream): if hs.get_instance_name() in hs.config.worker.writers.device_lists: self._streams_to_replicate.append(stream) diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 87ac0a5ae17..067847617fa 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -40,6 +40,7 @@ PushersStream, PushRulesStream, ReceiptsStream, + StickyEventsStream, Stream, ThreadSubscriptionsStream, ToDeviceStream, @@ -68,6 +69,7 @@ ToDeviceStream, FederationStream, AccountDataStream, + StickyEventsStream, ThreadSubscriptionsStream, UnPartialStatedRoomStream, UnPartialStatedEventStream, @@ -90,6 +92,7 @@ "ToDeviceStream", "FederationStream", "AccountDataStream", + "StickyEventsStream", "ThreadSubscriptionsStream", "UnPartialStatedRoomStream", "UnPartialStatedEventStream", diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 4fb2aac2029..336b50160b5 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -763,3 +763,48 @@ async def _update_function( return [], to_token, False return rows, rows[-1][0], len(updates) == limit + + +@attr.s(slots=True, auto_attribs=True) +class StickyEventsStreamRow: + """Stream to inform workers about changes to sticky events.""" + + room_id: str + + event_id: str + """The sticky event ID""" + + +class StickyEventsStream(_StreamFromIdGen): + """A sticky event was changed.""" + + NAME = "sticky_events" + ROW_TYPE = StickyEventsStreamRow + + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastores().main + super().__init__( + hs.get_instance_name(), + self._update_function, + self.store._sticky_events_id_gen, + ) + + async def _update_function( + self, instance_name: str, from_token: int, to_token: int, limit: int + ) -> StreamUpdateResult: + updates = await self.store.get_updated_sticky_events( + from_id=from_token, to_id=to_token, limit=limit + ) + rows = [ + ( + stream_id, + # These are the args to `StickyEventsStreamRow` + (room_id, event_id), + ) + for stream_id, room_id, event_id, _ in updates + ] + + if not rows: + return [], to_token, False + + return rows, rows[-1][0], len(updates) == limit diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 12593094f18..9f8d4debbe0 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -34,6 +34,7 @@ ) from synapse.storage.databases.main.sliding_sync import SlidingSyncStore from synapse.storage.databases.main.stats import UserSortOrder +from synapse.storage.databases.main.sticky_events import StickyEventsWorkerStore from synapse.storage.databases.main.thread_subscriptions import ( ThreadSubscriptionsWorkerStore, ) @@ -144,6 +145,7 @@ class DataStore( TagsStore, AccountDataStore, ThreadSubscriptionsWorkerStore, + StickyEventsWorkerStore, PushRulesWorkerStore, StreamWorkerStore, OpenIdStore, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ae6ee50dc24..e13f807148f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -68,6 +68,10 @@ wrap_as_background_process, ) from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream +from synapse.replication.tcp.streams._base import ( + StickyEventsStream, + StickyEventsStreamRow, +) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -459,6 +463,11 @@ def process_replication_rows( # If the partial-stated event became rejected or unrejected # when it wasn't before, we need to invalidate this cache. self._invalidate_local_get_event_cache(row.event_id) + elif stream_name == StickyEventsStream.NAME: + for row in rows: + assert isinstance(row, StickyEventsStreamRow) + # In case soft-failure status changed, invalidate the cache. + self._invalidate_local_get_event_cache(row.event_id) super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py new file mode 100644 index 00000000000..d12a633e07b --- /dev/null +++ b/synapse/storage/databases/main/sticky_events.py @@ -0,0 +1,153 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +import logging +from typing import ( + TYPE_CHECKING, + cast, +) + +from twisted.internet.defer import Deferred + +from synapse.events import EventBase +from synapse.replication.tcp.streams._base import StickyEventsStream +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) +from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore +from synapse.storage.databases.main.state import StateGroupWorkerStore +from synapse.storage.util.id_generators import MultiWriterIdGenerator +from synapse.util.duration import Duration + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + +# Remove entries from the sticky_events table at this frequency. +# Note: this does NOT mean we don't honour shorter expiration timeouts. +# Consumers call 'get_sticky_events_in_rooms' which has `WHERE expires_at > ?` +# to filter out expired sticky events that have yet to be deleted. +DELETE_EXPIRED_STICKY_EVENTS_INTERVAL = Duration(hours=1) + + +class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._can_write_to_sticky_events = ( + self._instance_name in hs.config.worker.writers.events + ) + + # Technically this means we will cleanup N times, once per event persister, maybe put on master? + if self._can_write_to_sticky_events: + self.clock.looping_call( + self._run_background_cleanup, DELETE_EXPIRED_STICKY_EVENTS_INTERVAL + ) + + self._sticky_events_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + notifier=hs.get_replication_notifier(), + stream_name="sticky_events", + server_name=self.server_name, + instance_name=self._instance_name, + tables=[ + ("sticky_events", "instance_name", "stream_id"), + ], + sequence_name="sticky_events_sequence", + writers=hs.config.worker.writers.events, + ) + + def process_replication_position( + self, stream_name: str, instance_name: str, token: int + ) -> None: + if stream_name == StickyEventsStream.NAME: + self._sticky_events_id_gen.advance(instance_name, token) + super().process_replication_position(stream_name, instance_name, token) + + def get_max_sticky_events_stream_id(self) -> int: + """Get the current maximum stream_id for thread subscriptions. + + Returns: + The maximum stream_id + """ + return self._sticky_events_id_gen.get_current_token() + + def get_sticky_events_stream_id_generator(self) -> MultiWriterIdGenerator: + return self._sticky_events_id_gen + + async def get_updated_sticky_events( + self, from_id: int, to_id: int, limit: int + ) -> list[tuple[int, str, str, bool]]: + """Get updates to sticky events between two stream IDs. + + Args: + from_id: The starting stream ID (exclusive) + to_id: The ending stream ID (inclusive) + limit: The maximum number of rows to return + + Returns: + list of (stream_id, room_id, event_id, soft_failed) tuples + """ + return await self.db_pool.runInteraction( + "get_updated_sticky_events", + self._get_updated_sticky_events_txn, + from_id, + to_id, + limit, + ) + + def _get_updated_sticky_events_txn( + self, txn: LoggingTransaction, from_id: int, to_id: int, limit: int + ) -> list[tuple[int, str, str, bool]]: + txn.execute( + """ + SELECT stream_id, room_id, event_id, soft_failed + FROM sticky_events + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """, + (from_id, to_id, limit), + ) + return cast(list[tuple[int, str, str, bool]], txn.fetchall()) + + async def _delete_expired_sticky_events(self) -> None: + logger.info("delete_expired_sticky_events") + await self.db_pool.runInteraction( + "_delete_expired_sticky_events", + self._delete_expired_sticky_events_txn, + self.clock.time_msec(), + ) + + def _delete_expired_sticky_events_txn( + self, txn: LoggingTransaction, now: int + ) -> None: + txn.execute( + """ + DELETE FROM sticky_events WHERE expires_at < ? + """, + (now,), + ) + + def _run_background_cleanup(self) -> Deferred: + return self.hs.run_as_background_process( + "delete_expired_sticky_events", + self._delete_expired_sticky_events, + ) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 143f659499b..d2720fb9592 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -84,6 +84,7 @@ def get_current_token(self) -> StreamToken: self._instance_name ) thread_subscriptions_key = self.store.get_max_thread_subscriptions_stream_id() + sticky_events_key = self.store.get_max_sticky_events_stream_id() token = StreamToken( room_key=self.sources.room.get_current_key(), @@ -98,6 +99,7 @@ def get_current_token(self) -> StreamToken: groups_key=0, un_partial_stated_rooms_key=un_partial_stated_rooms_key, thread_subscriptions_key=thread_subscriptions_key, + sticky_events_key=sticky_events_key, ) return token @@ -125,6 +127,7 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken: StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(), StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(), StreamKeyType.THREAD_SUBSCRIPTIONS: self.store.get_thread_subscriptions_stream_id_generator(), + StreamKeyType.STICKY_EVENTS: self.store.get_sticky_events_stream_id_generator(), } for _, key in StreamKeyType.__members__.items(): diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 16892b37c0b..b9e9c21741e 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1006,6 +1006,7 @@ class StreamKeyType(Enum): DEVICE_LIST = "device_list_key" UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key" THREAD_SUBSCRIPTIONS = "thread_subscriptions_key" + STICKY_EVENTS = "sticky_events_key" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -1027,6 +1028,7 @@ class StreamToken: 9. `groups_key`: `1` (note that this key is now unused) 10. `un_partial_stated_rooms_key`: `379` 11. `thread_subscriptions_key`: 4242 + 12. `sticky_events_key`: 4141 You can see how many of these keys correspond to the various fields in a "/sync" response: @@ -1086,6 +1088,7 @@ class StreamToken: groups_key: int un_partial_stated_rooms_key: int thread_subscriptions_key: int + sticky_events_key: int _SEPARATOR = "_" START: ClassVar["StreamToken"] @@ -1114,6 +1117,7 @@ async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": groups_key, un_partial_stated_rooms_key, thread_subscriptions_key, + sticky_events_key, ) = keys return cls( @@ -1130,6 +1134,7 @@ async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": groups_key=int(groups_key), un_partial_stated_rooms_key=int(un_partial_stated_rooms_key), thread_subscriptions_key=int(thread_subscriptions_key), + sticky_events_key=int(sticky_events_key), ) except CancelledError: raise @@ -1153,6 +1158,7 @@ async def to_string(self, store: "DataStore") -> str: str(self.groups_key), str(self.un_partial_stated_rooms_key), str(self.thread_subscriptions_key), + str(self.sticky_events_key), ] ) @@ -1218,6 +1224,7 @@ def get_field( StreamKeyType.TYPING, StreamKeyType.UN_PARTIAL_STATED_ROOMS, StreamKeyType.THREAD_SUBSCRIPTIONS, + StreamKeyType.STICKY_EVENTS, ], ) -> int: ... @@ -1274,7 +1281,7 @@ def __str__(self) -> str: f"account_data: {self.account_data_key}, push_rules: {self.push_rules_key}, " f"to_device: {self.to_device_key}, device_list: {self.device_list_key}, " f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key}," - f"thread_subscriptions: {self.thread_subscriptions_key})" + f"thread_subscriptions: {self.thread_subscriptions_key}, sticky_events: {self.sticky_events_key})" ) @@ -1290,6 +1297,7 @@ def __str__(self) -> str: groups_key=0, un_partial_stated_rooms_key=0, thread_subscriptions_key=0, + sticky_events_key=0, ) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index ad713b4da4d..5956a43d71e 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -2545,7 +2545,7 @@ def test_timestamp_to_event(self) -> None: def test_topo_token_is_accepted(self) -> None: """Test Topo Token is accepted.""" - token = "t1-0_0_0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), @@ -2559,7 +2559,7 @@ def test_topo_token_is_accepted(self) -> None: def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: """Test that stream token is accepted for forward pagination.""" - token = "s0_0_0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token), diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 926560afd6b..f85c9939ce4 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -2245,7 +2245,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.room_id = self.helper.create_room_as(self.user_id) def test_topo_token_is_accepted(self) -> None: - token = "t1-0_0_0_0_0_0_0_0_0_0_0" + token = "t1-0_0_0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) @@ -2256,7 +2256,7 @@ def test_topo_token_is_accepted(self) -> None: self.assertTrue("end" in channel.json_body) def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None: - token = "s0_0_0_0_0_0_0_0_0_0_0" + token = "s0_0_0_0_0_0_0_0_0_0_0_0" channel = self.make_request( "GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token) ) From 003fe1f5357bc13842086b78f36868f4dc85dc79 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:15:18 +0000 Subject: [PATCH 06/47] EventBase: add the concept of sticky_duration --- synapse/events/__init__.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index c7eaf7eda2b..83916211af8 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -36,7 +36,12 @@ import attr from unpaddedbase64 import encode_base64 -from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + RelationTypes, + StickyEvent, +) from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.synapse_rust.events import EventInternalMetadata from synapse.types import ( @@ -318,6 +323,23 @@ def freeze(self) -> None: # this will be a no-op if the event dict is already frozen. self._dict = freeze(self._dict) + def sticky_duration(self) -> int | None: + """ + Returns the effective sticky duration of this event, or None + if the event does not have a sticky duration. + (Sticky Events are a MSC4354 feature.) + + Clamps the sticky duration to the maximum allowed duration. + """ + sticky_obj = self.get_dict().get(StickyEvent.FIELD_NAME, None) + if type(sticky_obj) is not dict: + return None + sticky_duration_ms = sticky_obj.get("duration_ms", None) + # MSC: Valid values are the integer range 0-MAX_DURATION_MS + if type(sticky_duration_ms) is int and sticky_duration_ms >= 0: + return min(sticky_duration_ms, StickyEvent.MAX_DURATION_MS) + return None + def __str__(self) -> str: return self.__repr__() From c7075d64d1882a5402f3eeb9394f27c2de7d2432 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:15:49 +0000 Subject: [PATCH 07/47] EventBuilder: allow building events with sticky event fields --- synapse/events/builder.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 6a2812109d0..1490aa57527 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -24,7 +24,7 @@ import attr from signedjson.types import SigningKey -from synapse.api.constants import MAX_DEPTH, EventTypes +from synapse.api.constants import MAX_DEPTH, EventTypes, StickyEvent, StickyEventField from synapse.api.room_versions import ( KNOWN_EVENT_FORMAT_VERSIONS, EventFormatVersions, @@ -89,6 +89,10 @@ class EventBuilder: content: JsonDict = attr.Factory(dict) unsigned: JsonDict = attr.Factory(dict) + sticky: StickyEventField | None = None + """ + Fields for MSC4354: Sticky Events + """ # These only exist on a subset of events, so they raise AttributeError if # someone tries to get them when they don't exist. @@ -269,6 +273,9 @@ async def build( if self._origin_server_ts is not None: event_dict["origin_server_ts"] = self._origin_server_ts + if self.sticky is not None: + event_dict[StickyEvent.FIELD_NAME] = self.sticky + return create_local_event_from_event_dict( clock=self._clock, hostname=self._hostname, @@ -318,6 +325,7 @@ def for_room_version( unsigned=key_values.get("unsigned", {}), redacts=key_values.get("redacts", None), origin_server_ts=key_values.get("origin_server_ts", None), + sticky=key_values.get(StickyEvent.FIELD_NAME, None), ) From 0a4f7e65a72646d413f2f8e49e89501e27164e12 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 18:25:41 +0000 Subject: [PATCH 08/47] store method: insert_sticky_events_txn --- .../storage/databases/main/sticky_events.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index d12a633e07b..3efae45b518 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -128,6 +128,79 @@ def _get_updated_sticky_events_txn( ) return cast(list[tuple[int, str, str, bool]], txn.fetchall()) + def insert_sticky_events_txn( + self, + txn: LoggingTransaction, + events: list[EventBase], + ) -> None: + now_ms = self.clock.time_msec() + # event, expires_at + sticky_events: list[tuple[EventBase, int]] = [] + for ev in events: + # MSC: Note: policy servers and other similar antispam techniques still apply to these events. + if ev.internal_metadata.policy_server_spammy: + continue + # We shouldn't be passed rejected events, but if we do, we filter them out too. + if ev.rejected_reason is not None: + continue + # We can't persist outlier sticky events as we don't know the room state at that event + if ev.internal_metadata.is_outlier(): + continue + sticky_duration = ev.sticky_duration() + if sticky_duration is None: + continue + # Calculate the end time as start_time + effecitve sticky duration + expires_at = min(ev.origin_server_ts, now_ms) + sticky_duration + # Filter out already expired sticky events + if expires_at <= now_ms: + continue + + sticky_events.append((ev, expires_at)) + + if len(sticky_events) == 0: + return + + logger.info( + "inserting %d sticky events in room %s", + len(sticky_events), + sticky_events[0][0].room_id, + ) + + # Generate stream_ids in one go + sticky_events_with_ids = zip( + sticky_events, + self._sticky_events_id_gen.get_next_mult_txn(txn, len(sticky_events)), + strict=True, + ) + + self.db_pool.simple_insert_many_txn( + txn, + "sticky_events", + keys=( + "instance_name", + "stream_id", + "room_id", + "event_id", + "event_stream_ordering", + "sender", + "expires_at", + "soft_failed", + ), + values=[ + ( + self._instance_name, + stream_id, + ev.room_id, + ev.event_id, + ev.internal_metadata.stream_ordering, + ev.sender, + expires_at, + ev.internal_metadata.is_soft_failed(), + ) + for (ev, expires_at), stream_id in sticky_events_with_ids + ], + ) + async def _delete_expired_sticky_events(self) -> None: logger.info("delete_expired_sticky_events") await self.db_pool.runInteraction( From 53e4968220d4d821190daf8e8cc056a42f5b8ea7 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:41:16 +0000 Subject: [PATCH 09/47] When persisting currently-sticky events, add to sticky event stream --- synapse/storage/databases/main/events.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 60fc884c3a6..cb452dbc9b1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -264,6 +264,7 @@ def __init__( self.database_engine = db.engine self._clock = hs.get_clock() self._instance_name = hs.get_instance_name() + self._msc4354_enabled = hs.config.experimental.msc4354_enabled self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id @@ -1185,6 +1186,11 @@ def _persist_events_txn( sliding_sync_table_changes, ) + if self._msc4354_enabled: + self.store.insert_sticky_events_txn( + txn, [ev for ev, _ in events_and_contexts] + ) + # We only update the sliding sync tables for non-backfilled events. self._update_sliding_sync_tables_with_new_persisted_events_txn( txn, room_id, events_and_contexts @@ -2646,6 +2652,11 @@ def _update_outliers_txn( # event isn't an outlier any more. self._update_backward_extremeties(txn, [event]) + if self._msc4354_enabled and event.sticky_duration(): + # The de-outliered event is sticky. Update the sticky events table to ensure + # we deliver this down /sync. + self.store.insert_sticky_events_txn(txn, [event]) + return [ec for ec in events_and_contexts if ec[0] not in to_remove] def _store_event_txn( From 137c457ca9e1ac6362e2a2decedab528772027cd Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:34:24 +0000 Subject: [PATCH 10/47] Allow clients to send sticky events Including delayed events --- synapse/handlers/delayed_events.py | 13 ++++++++--- synapse/rest/client/room.py | 23 ++++++++++++++++++- .../storage/databases/main/delayed_events.py | 9 +++++++- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index c58d1d42bcf..a9b32b6fb03 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -17,7 +17,7 @@ from twisted.internet.interfaces import IDelayedCall -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, StickyEvent from synapse.api.errors import ShadowBanError, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME @@ -333,6 +333,7 @@ async def add( origin_server_ts: int | None, content: JsonDict, delay: int, + sticky_duration_ms: int | None, ) -> str: """ Creates a new delayed event and schedules its delivery. @@ -346,7 +347,9 @@ async def add( If None, the timestamp will be the actual time when the event is sent. content: The content of the event to be sent. delay: How long (in milliseconds) to wait before automatically sending the event. - + sticky_duration_ms: If an MSC4354 sticky event: the sticky duration (in milliseconds). + The event will be attempted to be reliably delivered to clients and remote servers + during its sticky period. Returns: The ID of the added delayed event. Raises: @@ -382,6 +385,7 @@ async def add( origin_server_ts=origin_server_ts, content=content, delay=delay, + sticky_duration_ms=sticky_duration_ms, ) if self._repl_client is not None: @@ -570,7 +574,10 @@ async def _send_event( if event.state_key is not None: event_dict["state_key"] = event.state_key - + if event.sticky_duration_ms is not None: + event_dict[StickyEvent.FIELD_NAME] = { + "duration_ms": event.sticky_duration_ms, + } ( sent_event, _, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 5e7dcb01911..d09425a1869 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -34,7 +34,7 @@ from twisted.web.server import Request from synapse import event_auth -from synapse.api.constants import Direction, EventTypes, Membership +from synapse.api.constants import Direction, EventTypes, Membership, StickyEvent from synapse.api.errors import ( AuthError, Codes, @@ -210,6 +210,7 @@ def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self._max_event_delay_ms = hs.config.server.max_event_delay_ms self._spam_checker_module_callbacks = hs.get_module_api_callbacks().spam_checker + self._msc4354_enabled = hs.config.experimental.msc4354_enabled def register(self, http_server: HttpServer) -> None: # /rooms/$roomid/state/$eventtype @@ -331,6 +332,10 @@ async def on_PUT( if requester.app_service: origin_server_ts = parse_integer(request, "ts") + sticky_duration_ms: int | None = None + if self._msc4354_enabled: + sticky_duration_ms = parse_integer(request, StickyEvent.QUERY_PARAM_NAME) + delay = _parse_request_delay(request, self._max_event_delay_ms) if delay is not None: delay_id = await self.delayed_events_handler.add( @@ -341,6 +346,7 @@ async def on_PUT( origin_server_ts=origin_server_ts, content=content, delay=delay, + sticky_duration_ms=sticky_duration_ms, ) set_tag("delay_id", delay_id) @@ -368,6 +374,10 @@ async def on_PUT( "room_id": room_id, "sender": requester.user.to_string(), } + if sticky_duration_ms is not None: + event_dict[StickyEvent.FIELD_NAME] = { + "duration_ms": sticky_duration_ms, + } if state_key is not None: event_dict["state_key"] = state_key @@ -400,6 +410,7 @@ def __init__(self, hs: "HomeServer"): self.delayed_events_handler = hs.get_delayed_events_handler() self.auth = hs.get_auth() self._max_event_delay_ms = hs.config.server.max_event_delay_ms + self._msc4354_enabled = hs.config.experimental.msc4354_enabled def register(self, http_server: HttpServer) -> None: # /rooms/$roomid/send/$event_type[/$txn_id] @@ -420,6 +431,10 @@ async def _do( if requester.app_service: origin_server_ts = parse_integer(request, "ts") + sticky_duration_ms: int | None = None + if self._msc4354_enabled: + sticky_duration_ms = parse_integer(request, StickyEvent.QUERY_PARAM_NAME) + delay = _parse_request_delay(request, self._max_event_delay_ms) if delay is not None: delay_id = await self.delayed_events_handler.add( @@ -430,6 +445,7 @@ async def _do( origin_server_ts=origin_server_ts, content=content, delay=delay, + sticky_duration_ms=sticky_duration_ms, ) set_tag("delay_id", delay_id) @@ -446,6 +462,11 @@ async def _do( if origin_server_ts is not None: event_dict["origin_server_ts"] = origin_server_ts + if sticky_duration_ms is not None: + event_dict[StickyEvent.FIELD_NAME] = { + "duration_ms": sticky_duration_ms, + } + try: ( event, diff --git a/synapse/storage/databases/main/delayed_events.py b/synapse/storage/databases/main/delayed_events.py index 55471505154..1727f589e2a 100644 --- a/synapse/storage/databases/main/delayed_events.py +++ b/synapse/storage/databases/main/delayed_events.py @@ -54,6 +54,7 @@ class EventDetails: origin_server_ts: Timestamp | None content: JsonDict device_id: DeviceID | None + sticky_duration_ms: int | None @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -122,6 +123,7 @@ async def add_delayed_event( origin_server_ts: int | None, content: JsonDict, delay: int, + sticky_duration_ms: int | None, ) -> tuple[DelayID, Timestamp]: """ Inserts a new delayed event in the DB. @@ -148,6 +150,7 @@ def add_delayed_event_txn(txn: LoggingTransaction) -> Timestamp: "state_key": state_key, "origin_server_ts": origin_server_ts, "content": json_encoder.encode(content), + "sticky_duration_ms": sticky_duration_ms, }, ) @@ -299,6 +302,7 @@ def process_timeout_delayed_events_txn( "send_ts", "content", "device_id", + "sticky_duration_ms", ) ) sql_update = "UPDATE delayed_events SET is_processed = TRUE" @@ -344,6 +348,7 @@ def process_timeout_delayed_events_txn( Timestamp(row[5] if row[5] is not None else row[6]), db_to_json(row[7]), DeviceID(row[8]) if row[8] is not None else None, + int(row[9]) if row[9] is not None else None, DelayID(row[0]), UserLocalpart(row[1]), ) @@ -392,6 +397,7 @@ def process_target_delayed_event_txn( origin_server_ts, content, device_id, + sticky_duration_ms, user_localpart """, (delay_id,), @@ -407,8 +413,9 @@ def process_target_delayed_event_txn( Timestamp(row[3]) if row[3] is not None else None, db_to_json(row[4]), DeviceID(row[5]) if row[5] is not None else None, + int(row[6]) if row[6] is not None else None, DelayID(delay_id), - UserLocalpart(row[6]), + UserLocalpart(row[7]), ) return event, self._get_next_delayed_event_send_ts_txn(txn) From 90e9539c47d7a4ef80e014e59759316fceadb511 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:27:14 +0000 Subject: [PATCH 11/47] Add test helper for sending sticky events --- tests/rest/client/utils.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index b3808d75bb9..81a95461cff 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -453,6 +453,40 @@ def send_event( return channel.json_body + def send_sticky_event( + self, + room_id: str, + type: str, + *, + duration_ms: int, + content: dict | None = None, + txn_id: str | None = None, + tok: str | None = None, + expect_code: int = HTTPStatus.OK, + custom_headers: Iterable[tuple[AnyStr, AnyStr]] | None = None, + ) -> JsonDict: + if txn_id is None: + txn_id = f"m{time.time()}" + + path = f"/_matrix/client/r0/rooms/{room_id}/send/{type}/{txn_id}?org.matrix.msc4354.sticky_duration_ms={duration_ms}" + if tok: + path = path + f"&access_token={tok}" + + channel = make_request( + self.reactor, + self.site, + "PUT", + path, + content or {}, + custom_headers=custom_headers, + ) + + assert channel.code == expect_code, ( + f"Expected: {expect_code}, got: {channel.code}, resp: {channel.result['body']!r}" + ) + + return channel.json_body + def get_event( self, room_id: str, From 9a0b28e81007f5453cdcb045de1953a29db3dff1 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 22 Dec 2025 14:27:59 +0000 Subject: [PATCH 12/47] Expose the sticky event TTL to clients --- synapse/visibility.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/visibility.py b/synapse/visibility.py index 452a2d50fbb..bfde05fe81f 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -237,6 +237,15 @@ def allowed(event: EventBase) -> EventBase | None: # to the cache! cloned = clone_event(filtered) cloned.unsigned[EventUnsignedContentFields.MEMBERSHIP] = user_membership + if storage.main.config.experimental.msc4354_enabled: + sticky_duration = cloned.sticky_duration() + if sticky_duration: + now = storage.main.clock.time_msec() + expires_at = min(cloned.origin_server_ts, now) + sticky_duration + if expires_at > now: + cloned.unsigned[EventUnsignedContentFields.STICKY_TTL] = ( + expires_at - now + ) return cloned From b41ef410025bc4bc9727938a45de7597dae9e88b Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 5 Jan 2026 14:03:56 +0000 Subject: [PATCH 13/47] Add a test for sticky TTL calculation and exposure to clients --- tests/rest/client/test_sticky_events.py | 117 ++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 tests/rest/client/test_sticky_events.py diff --git a/tests/rest/client/test_sticky_events.py b/tests/rest/client/test_sticky_events.py new file mode 100644 index 00000000000..a16f74c4256 --- /dev/null +++ b/tests/rest/client/test_sticky_events.py @@ -0,0 +1,117 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +# +# + +from twisted.internet.testing import MemoryReactor + +from synapse.api.constants import EventTypes, EventUnsignedContentFields +from synapse.rest import admin +from synapse.rest.client import login, register, room +from synapse.server import HomeServer +from synapse.types import JsonDict +from synapse.util.clock import Clock + +from tests import unittest + + +class StickyEventsClientTestCase(unittest.HomeserverTestCase): + """Tests sticky events retrieved via the /event/ endpoint.""" + + servlets = [ + room.register_servlets, + login.register_servlets, + register.register_servlets, + admin.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + config["experimental_features"] = {"msc4354_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + # Arrange: Register an account + self.user_id = self.register_user("user1", "pass") + self.token = self.login(self.user_id, "pass") + + # Arrange: Create a room + self.room_id = self.helper.create_room_as(self.user_id, tok=self.token) + + def _assert_event_sticky_for(self, event_id: str, sticky_ttl: int) -> None: + channel = self.make_request( + "GET", + f"/rooms/{self.room_id}/event/{event_id}", + access_token=self.token, + ) + + self.assertEqual( + channel.code, 200, f"could not retrieve event {event_id}: {channel.result}" + ) + event = channel.json_body + + self.assertIn( + EventUnsignedContentFields.STICKY_TTL, + event["unsigned"], + f"No {EventUnsignedContentFields.STICKY_TTL} field in {event_id}; event not sticky: {event}", + ) + self.assertEqual( + event["unsigned"][EventUnsignedContentFields.STICKY_TTL], + sticky_ttl, + f"{event_id} had an unexpected sticky TTL: {event}", + ) + + def _assert_event_not_sticky(self, event_id: str) -> None: + channel = self.make_request( + "GET", + f"/rooms/{self.room_id}/event/{event_id}", + access_token=self.token, + ) + + self.assertEqual( + channel.code, 200, f"could not retrieve event {event_id}: {channel.result}" + ) + event = channel.json_body + + self.assertNotIn( + EventUnsignedContentFields.STICKY_TTL, + event["unsigned"], + f"{EventUnsignedContentFields.STICKY_TTL} field unexpectedly found in {event_id}: {event}", + ) + + def test_sticky_event_via_event_endpoint(self) -> None: + # Arrange: Send a sticky event with a specific duration + sticky_event_response = self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + # sticky duration is 1 minute + duration_ms=60_000, + content={"body": "sticky message", "msgtype": "m.text"}, + tok=self.token, + ) + event_id = sticky_event_response["event_id"] + + # If we request the event immediately, it will still have + # 1 minute of stickiness + # The other 100 ms is advanced in FakeChannel.await_result. + self._assert_event_sticky_for(event_id, 59_900) + + # But if we advance time by 59.799 seconds... + # we will get the event on its last millisecond of stickiness + # The other 100 ms is advanced in FakeChannel.await_result. + self.reactor.advance(59.799) + self._assert_event_sticky_for(event_id, 1) + + # Advancing time any more, the event is no longer sticky + self.reactor.advance(0.001) + self._assert_event_not_sticky(event_id) From fb67e9a77b443f10f41011abae0674f1cebede86 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Fri, 9 Jan 2026 16:36:20 +0000 Subject: [PATCH 14/47] Newsfile Signed-off-by: Olivier 'reivilibre --- changelog.d/19365.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19365.feature diff --git a/changelog.d/19365.feature b/changelog.d/19365.feature new file mode 100644 index 00000000000..c35afdc179e --- /dev/null +++ b/changelog.d/19365.feature @@ -0,0 +1 @@ +Support sending and receiving [MSC4354 Sticky Event](https://github.com/matrix-org/matrix-spec-proposals/pull/4354) metadata. \ No newline at end of file From 383d97c432215ece416841ff78f08f602c656f16 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 22 Jan 2026 20:14:49 +0000 Subject: [PATCH 15/47] Use Duration --- synapse/api/constants.py | 4 +++- synapse/events/__init__.py | 11 ++++++++--- synapse/storage/databases/main/sticky_events.py | 2 +- synapse/visibility.py | 10 ++++++---- tests/rest/client/test_sticky_events.py | 4 ++-- tests/rest/client/utils.py | 5 +++-- 6 files changed, 23 insertions(+), 13 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 8d87fc42089..a478846f52d 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -26,6 +26,8 @@ import enum from typing import Final, TypedDict +from synapse.util.duration import Duration + # the max size of a (canonical-json-encoded) event MAX_PDU_SIZE = 65536 @@ -388,7 +390,7 @@ class StickyEventField(TypedDict): class StickyEvent: QUERY_PARAM_NAME: Final = "org.matrix.msc4354.sticky_duration_ms" FIELD_NAME: Final = "msc4354_sticky" - MAX_DURATION_MS: Final = 3600000 # 1 hour + MAX_DURATION: Duration = Duration(hours=1) """ Maximum stickiness duration as specified in MSC4354. Ensures that data in the /sync response can go down and not grow unbounded. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 83916211af8..f8f388c3427 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -49,6 +49,7 @@ StrCollection, ) from synapse.util.caches import intern_dict +from synapse.util.duration import Duration from synapse.util.frozenutils import freeze if TYPE_CHECKING: @@ -323,7 +324,7 @@ def freeze(self) -> None: # this will be a no-op if the event dict is already frozen. self._dict = freeze(self._dict) - def sticky_duration(self) -> int | None: + def sticky_duration(self) -> Duration | None: """ Returns the effective sticky duration of this event, or None if the event does not have a sticky duration. @@ -335,9 +336,13 @@ def sticky_duration(self) -> int | None: if type(sticky_obj) is not dict: return None sticky_duration_ms = sticky_obj.get("duration_ms", None) - # MSC: Valid values are the integer range 0-MAX_DURATION_MS + # MSC: Clamp to 0 and MAX_DURATION (1 hour) + # `type(..) is int` needed to avoid accepting bool if type(sticky_duration_ms) is int and sticky_duration_ms >= 0: - return min(sticky_duration_ms, StickyEvent.MAX_DURATION_MS) + return min( + Duration(milliseconds=sticky_duration_ms), + StickyEvent.MAX_DURATION, + ) return None def __str__(self) -> str: diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 3efae45b518..f12ff96e23a 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -150,7 +150,7 @@ def insert_sticky_events_txn( if sticky_duration is None: continue # Calculate the end time as start_time + effecitve sticky duration - expires_at = min(ev.origin_server_ts, now_ms) + sticky_duration + expires_at = min(ev.origin_server_ts, now_ms) + sticky_duration.as_millis() # Filter out already expired sticky events if expires_at <= now_ms: continue diff --git a/synapse/visibility.py b/synapse/visibility.py index bfde05fe81f..28fb319e6c8 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -240,11 +240,13 @@ def allowed(event: EventBase) -> EventBase | None: if storage.main.config.experimental.msc4354_enabled: sticky_duration = cloned.sticky_duration() if sticky_duration: - now = storage.main.clock.time_msec() - expires_at = min(cloned.origin_server_ts, now) + sticky_duration - if expires_at > now: + now_ms = storage.main.clock.time_msec() + expires_at = ( + min(cloned.origin_server_ts, now_ms) + sticky_duration.as_millis() + ) + if expires_at > now_ms: cloned.unsigned[EventUnsignedContentFields.STICKY_TTL] = ( - expires_at - now + expires_at - now_ms ) return cloned diff --git a/tests/rest/client/test_sticky_events.py b/tests/rest/client/test_sticky_events.py index a16f74c4256..bec5a4b6f7b 100644 --- a/tests/rest/client/test_sticky_events.py +++ b/tests/rest/client/test_sticky_events.py @@ -21,6 +21,7 @@ from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.duration import Duration from tests import unittest @@ -94,8 +95,7 @@ def test_sticky_event_via_event_endpoint(self) -> None: sticky_event_response = self.helper.send_sticky_event( self.room_id, EventTypes.Message, - # sticky duration is 1 minute - duration_ms=60_000, + duration=Duration(minutes=1), content={"body": "sticky message", "msgtype": "m.text"}, tok=self.token, ) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 81a95461cff..886a73230f3 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -48,6 +48,7 @@ from synapse.api.errors import Codes from synapse.server import HomeServer from synapse.types import JsonDict +from synapse.util.duration import Duration from tests.server import FakeChannel, make_request from tests.test_utils.html_parsers import TestHtmlParser @@ -458,7 +459,7 @@ def send_sticky_event( room_id: str, type: str, *, - duration_ms: int, + duration: Duration, content: dict | None = None, txn_id: str | None = None, tok: str | None = None, @@ -468,7 +469,7 @@ def send_sticky_event( if txn_id is None: txn_id = f"m{time.time()}" - path = f"/_matrix/client/r0/rooms/{room_id}/send/{type}/{txn_id}?org.matrix.msc4354.sticky_duration_ms={duration_ms}" + path = f"/_matrix/client/r0/rooms/{room_id}/send/{type}/{txn_id}?org.matrix.msc4354.sticky_duration_ms={duration.as_millis()}" if tok: path = path + f"&access_token={tok}" From 47306d2efca729379e947d8a7d371816a17685ab Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 22 Jan 2026 20:16:34 +0000 Subject: [PATCH 16/47] Tweak comment on SQL column --- synapse/storage/schema/main/delta/93/01_sticky_events.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index 1acd706bc06..bcd44060bcd 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -35,7 +35,7 @@ CREATE TABLE sticky_events ( -- from our homeserver. sender TEXT NOT NULL, - -- When the sticky event expires. + -- When the sticky event expires, in milliseconds since the Unix epoch. expires_at BIGINT NOT NULL, -- Whether the event is soft-failed. From beead8a1c867fbf846b85e04933422f3ead4be8f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 22 Jan 2026 20:17:44 +0000 Subject: [PATCH 17/47] Update synapse/config/workers.py Co-authored-by: Eric Eastwood --- synapse/config/workers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 42f11b15d8f..0a9f3aeba76 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -127,7 +127,7 @@ class WriterLocations: """Specifies the instances that write various streams. Attributes: - events: The instances that write to the event, backfill and sticky events streams. + events: The instances that write to the event, backfill and `sticky_events` streams. typing: The instances that write to the typing stream. Currently can only be a single instance. to_device: The instances that write to the to_device stream. Currently From 1353ec341e48df738d28655ad0b2da8d6243adfe Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 12:51:28 +0000 Subject: [PATCH 18/47] Comments on constants --- synapse/api/constants.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index a478846f52d..7a42b1564ed 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -384,15 +384,37 @@ class ProfileFields: class StickyEventField(TypedDict): + """ + Dict content of the `sticky` part of an event. + """ + duration_ms: int class StickyEvent: QUERY_PARAM_NAME: Final = "org.matrix.msc4354.sticky_duration_ms" + """ + Query parameter used by clients for setting the sticky duration of an event they are sending. + + Applies to: + - /rooms/.../send/... + - /rooms/.../state/... + """ + FIELD_NAME: Final = "msc4354_sticky" + """ + Name of the field in the top-level event dict that contains the sticky event dict. + """ + MAX_DURATION: Duration = Duration(hours=1) """ Maximum stickiness duration as specified in MSC4354. Ensures that data in the /sync response can go down and not grow unbounded. """ + MAX_EVENTS_IN_SYNC: Final = 100 + """ + Maximum number of sticky events to include in /sync. + + This is the default specified in the MSC. Chosen arbitrarily. + """ From a424310d7e18ed7b555d7c5941594fe4b30714b1 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 12:51:43 +0000 Subject: [PATCH 19/47] FIELD_NAME -> EVENT_FIELD_NAME --- synapse/api/constants.py | 2 +- synapse/events/__init__.py | 2 +- synapse/events/builder.py | 4 ++-- synapse/handlers/delayed_events.py | 2 +- synapse/rest/client/room.py | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 7a42b1564ed..b8ef5dac50f 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -401,7 +401,7 @@ class StickyEvent: - /rooms/.../state/... """ - FIELD_NAME: Final = "msc4354_sticky" + EVENT_FIELD_NAME: Final = "msc4354_sticky" """ Name of the field in the top-level event dict that contains the sticky event dict. """ diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index f8f388c3427..9ef0334167f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -332,7 +332,7 @@ def sticky_duration(self) -> Duration | None: Clamps the sticky duration to the maximum allowed duration. """ - sticky_obj = self.get_dict().get(StickyEvent.FIELD_NAME, None) + sticky_obj = self.get_dict().get(StickyEvent.EVENT_FIELD_NAME, None) if type(sticky_obj) is not dict: return None sticky_duration_ms = sticky_obj.get("duration_ms", None) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 1490aa57527..2cd1bf6106f 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -274,7 +274,7 @@ async def build( event_dict["origin_server_ts"] = self._origin_server_ts if self.sticky is not None: - event_dict[StickyEvent.FIELD_NAME] = self.sticky + event_dict[StickyEvent.EVENT_FIELD_NAME] = self.sticky return create_local_event_from_event_dict( clock=self._clock, @@ -325,7 +325,7 @@ def for_room_version( unsigned=key_values.get("unsigned", {}), redacts=key_values.get("redacts", None), origin_server_ts=key_values.get("origin_server_ts", None), - sticky=key_values.get(StickyEvent.FIELD_NAME, None), + sticky=key_values.get(StickyEvent.EVENT_FIELD_NAME, None), ) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index a9b32b6fb03..c667470a825 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -575,7 +575,7 @@ async def _send_event( if event.state_key is not None: event_dict["state_key"] = event.state_key if event.sticky_duration_ms is not None: - event_dict[StickyEvent.FIELD_NAME] = { + event_dict[StickyEvent.EVENT_FIELD_NAME] = { "duration_ms": event.sticky_duration_ms, } ( diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index d09425a1869..0797d08d763 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -375,7 +375,7 @@ async def on_PUT( "sender": requester.user.to_string(), } if sticky_duration_ms is not None: - event_dict[StickyEvent.FIELD_NAME] = { + event_dict[StickyEvent.EVENT_FIELD_NAME] = { "duration_ms": sticky_duration_ms, } @@ -463,7 +463,7 @@ async def _do( event_dict["origin_server_ts"] = origin_server_ts if sticky_duration_ms is not None: - event_dict[StickyEvent.FIELD_NAME] = { + event_dict[StickyEvent.EVENT_FIELD_NAME] = { "duration_ms": sticky_duration_ms, } From 2522c38f09056e67fcdc1c9852f8ab17074253ef Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 12:59:17 +0000 Subject: [PATCH 20/47] Docstring explain stream writers for events and sticky_events --- synapse/config/workers.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 0a9f3aeba76..996be88cb26 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -128,6 +128,8 @@ class WriterLocations: Attributes: events: The instances that write to the event, backfill and `sticky_events` streams. + (`sticky_events` is written to during event persistence so must be handled by the + same stream writers.) typing: The instances that write to the typing stream. Currently can only be a single instance. to_device: The instances that write to the to_device stream. Currently From 450d5ee6485a1b4304ecaacf0bf9847c44cdfead Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:04:37 +0000 Subject: [PATCH 21/47] Make explanation not double negative --- synapse/storage/databases/main/sticky_events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index f12ff96e23a..92013e36fc4 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -36,9 +36,9 @@ logger = logging.getLogger(__name__) # Remove entries from the sticky_events table at this frequency. -# Note: this does NOT mean we don't honour shorter expiration timeouts. -# Consumers call 'get_sticky_events_in_rooms' which has `WHERE expires_at > ?` -# to filter out expired sticky events that have yet to be deleted. +# Note: don't be misled, we still honour shorter expiration timeouts, +# because readers of the sticky_events table filter out expired sticky events +# themselves, even if they aren't deleted from the table yet. DELETE_EXPIRED_STICKY_EVENTS_INTERVAL = Duration(hours=1) From 34d23efb86ba46df6c1232ac2c025523895d3cce Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:07:32 +0000 Subject: [PATCH 22/47] Comment why 1 hour --- synapse/storage/databases/main/sticky_events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 92013e36fc4..a7fde3ca10f 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -39,6 +39,10 @@ # Note: don't be misled, we still honour shorter expiration timeouts, # because readers of the sticky_events table filter out expired sticky events # themselves, even if they aren't deleted from the table yet. +# +# Currently just an arbitrary choice. +# Frequent enough to clean up expired sticky events promptly, +# especially given the short cap on the lifetime of sticky events. DELETE_EXPIRED_STICKY_EVENTS_INTERVAL = Duration(hours=1) From f1ec22ba094765e9738e3830f92c5230c4bc9ef3 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:15:51 +0000 Subject: [PATCH 23/47] Stagger start of cleanup looping calls --- synapse/storage/databases/main/sticky_events.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index a7fde3ca10f..36c9d092e7b 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -11,6 +11,7 @@ # See the GNU Affero General Public License for more details: # . import logging +import random from typing import ( TYPE_CHECKING, cast, @@ -61,8 +62,18 @@ def __init__( # Technically this means we will cleanup N times, once per event persister, maybe put on master? if self._can_write_to_sticky_events: - self.clock.looping_call( - self._run_background_cleanup, DELETE_EXPIRED_STICKY_EVENTS_INTERVAL + # Start a looping call to clean up the sticky_events table + # Because this will run once per event persister, + # randomly stagger the initial time so that they don't all + # coincide with each other if the workers are deployed at the + # same time. + # It's not critical, this is just best-effort. + self.clock.call_later( + # random() is 0.0 to 1.0 + DELETE_EXPIRED_STICKY_EVENTS_INTERVAL * random.random(), + self.clock.looping_call, + self._run_background_cleanup, + DELETE_EXPIRED_STICKY_EVENTS_INTERVAL, ) self._sticky_events_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator( From d28f17ace5241be71f433ff103d47b990f12fffa Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:17:05 +0000 Subject: [PATCH 24/47] Specify bounds --- synapse/storage/databases/main/sticky_events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 36c9d092e7b..507f59386d3 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -113,6 +113,8 @@ async def get_updated_sticky_events( ) -> list[tuple[int, str, str, bool]]: """Get updates to sticky events between two stream IDs. + Bounds: from_id < ... <= to_id + Args: from_id: The starting stream ID (exclusive) to_id: The ending stream ID (inclusive) From 33f953e141d83e28398b1892da5b77bdc665d40c Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:21:46 +0000 Subject: [PATCH 25/47] Introduce StickyEventUpdate dataclass --- synapse/replication/tcp/streams/_base.py | 6 ++--- .../storage/databases/main/sticky_events.py | 26 +++++++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 336b50160b5..1ea6b4fa857 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -797,11 +797,11 @@ async def _update_function( ) rows = [ ( - stream_id, + update.stream_id, # These are the args to `StickyEventsStreamRow` - (room_id, event_id), + (update.room_id, update.event_id), ) - for stream_id, room_id, event_id, _ in updates + for update in updates ] if not rows: diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 507f59386d3..54e82654432 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -12,9 +12,9 @@ # . import logging import random +from dataclasses import dataclass from typing import ( TYPE_CHECKING, - cast, ) from twisted.internet.defer import Deferred @@ -47,6 +47,14 @@ DELETE_EXPIRED_STICKY_EVENTS_INTERVAL = Duration(hours=1) +@dataclass(frozen=True) +class StickyEventUpdate: + stream_id: int + room_id: str + event_id: str + soft_failed: bool + + class StickyEventsWorkerStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): def __init__( self, @@ -110,7 +118,7 @@ def get_sticky_events_stream_id_generator(self) -> MultiWriterIdGenerator: async def get_updated_sticky_events( self, from_id: int, to_id: int, limit: int - ) -> list[tuple[int, str, str, bool]]: + ) -> list[StickyEventUpdate]: """Get updates to sticky events between two stream IDs. Bounds: from_id < ... <= to_id @@ -121,7 +129,7 @@ async def get_updated_sticky_events( limit: The maximum number of rows to return Returns: - list of (stream_id, room_id, event_id, soft_failed) tuples + list of StickyEventUpdate update rows """ return await self.db_pool.runInteraction( "get_updated_sticky_events", @@ -133,7 +141,7 @@ async def get_updated_sticky_events( def _get_updated_sticky_events_txn( self, txn: LoggingTransaction, from_id: int, to_id: int, limit: int - ) -> list[tuple[int, str, str, bool]]: + ) -> list[StickyEventUpdate]: txn.execute( """ SELECT stream_id, room_id, event_id, soft_failed @@ -143,7 +151,15 @@ def _get_updated_sticky_events_txn( """, (from_id, to_id, limit), ) - return cast(list[tuple[int, str, str, bool]], txn.fetchall()) + return [ + StickyEventUpdate( + stream_id=stream_id, + room_id=room_id, + event_id=event_id, + soft_failed=soft_failed, + ) + for stream_id, room_id, event_id, soft_failed in txn + ] def insert_sticky_events_txn( self, From 5f8f30245224f97e8e8eb8bd028dfce36f4b6d8b Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:25:44 +0000 Subject: [PATCH 26/47] Remove stale log --- synapse/storage/databases/main/sticky_events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 54e82654432..f423fd77ccd 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -235,7 +235,6 @@ def insert_sticky_events_txn( ) async def _delete_expired_sticky_events(self) -> None: - logger.info("delete_expired_sticky_events") await self.db_pool.runInteraction( "_delete_expired_sticky_events", self._delete_expired_sticky_events_txn, From 015819eb8534c0340e28c4dd8e62929f99662adf Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:29:40 +0000 Subject: [PATCH 27/47] Docstring on delete_expired_sticky_events --- synapse/storage/databases/main/sticky_events.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index f423fd77ccd..d80ed2249f8 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -244,6 +244,13 @@ async def _delete_expired_sticky_events(self) -> None: def _delete_expired_sticky_events_txn( self, txn: LoggingTransaction, now: int ) -> None: + """ + From the `sticky_events` table, deletes all entries whose expiry is in the past + (older than `now`). + + This is fine because we don't consider the events as sticky anymore when that's + happened. + """ txn.execute( """ DELETE FROM sticky_events WHERE expires_at < ? From 3c51231cd5e03c7009e828666dd8d4f5a2de2fdb Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 13:34:24 +0000 Subject: [PATCH 28/47] Use StickyEventField --- synapse/handlers/delayed_events.py | 8 ++++---- synapse/rest/client/room.py | 20 +++++++++++++------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/delayed_events.py b/synapse/handlers/delayed_events.py index c667470a825..7e41716f1e2 100644 --- a/synapse/handlers/delayed_events.py +++ b/synapse/handlers/delayed_events.py @@ -17,7 +17,7 @@ from twisted.internet.interfaces import IDelayedCall -from synapse.api.constants import EventTypes, StickyEvent +from synapse.api.constants import EventTypes, StickyEvent, StickyEventField from synapse.api.errors import ShadowBanError, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME @@ -575,9 +575,9 @@ async def _send_event( if event.state_key is not None: event_dict["state_key"] = event.state_key if event.sticky_duration_ms is not None: - event_dict[StickyEvent.EVENT_FIELD_NAME] = { - "duration_ms": event.sticky_duration_ms, - } + event_dict[StickyEvent.EVENT_FIELD_NAME] = StickyEventField( + duration_ms=event.sticky_duration_ms + ) ( sent_event, _, diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 0797d08d763..9172bfcb4ed 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -34,7 +34,13 @@ from twisted.web.server import Request from synapse import event_auth -from synapse.api.constants import Direction, EventTypes, Membership, StickyEvent +from synapse.api.constants import ( + Direction, + EventTypes, + Membership, + StickyEvent, + StickyEventField, +) from synapse.api.errors import ( AuthError, Codes, @@ -375,9 +381,9 @@ async def on_PUT( "sender": requester.user.to_string(), } if sticky_duration_ms is not None: - event_dict[StickyEvent.EVENT_FIELD_NAME] = { - "duration_ms": sticky_duration_ms, - } + event_dict[StickyEvent.EVENT_FIELD_NAME] = StickyEventField( + duration_ms=sticky_duration_ms + ) if state_key is not None: event_dict["state_key"] = state_key @@ -463,9 +469,9 @@ async def _do( event_dict["origin_server_ts"] = origin_server_ts if sticky_duration_ms is not None: - event_dict[StickyEvent.EVENT_FIELD_NAME] = { - "duration_ms": sticky_duration_ms, - } + event_dict[StickyEvent.EVENT_FIELD_NAME] = StickyEventField( + duration_ms=sticky_duration_ms + ) try: ( From 11378acf14a3af31fb53599277190fc6cf4592d2 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 14:18:16 +0000 Subject: [PATCH 29/47] Remove un-soft-failing related handling --- synapse/storage/databases/main/events_worker.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e13f807148f..ae6ee50dc24 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -68,10 +68,6 @@ wrap_as_background_process, ) from synapse.replication.tcp.streams import BackfillStream, UnPartialStatedEventStream -from synapse.replication.tcp.streams._base import ( - StickyEventsStream, - StickyEventsStreamRow, -) from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.partial_state import UnPartialStatedEventStreamRow from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -463,11 +459,6 @@ def process_replication_rows( # If the partial-stated event became rejected or unrejected # when it wasn't before, we need to invalidate this cache. self._invalidate_local_get_event_cache(row.event_id) - elif stream_name == StickyEventsStream.NAME: - for row in rows: - assert isinstance(row, StickyEventsStreamRow) - # In case soft-failure status changed, invalidate the cache. - self._invalidate_local_get_event_cache(row.event_id) super().process_replication_rows(stream_name, instance_name, token, rows) From db9504fb558353495c6715d848bd254d6e3a576b Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Mon, 26 Jan 2026 16:32:24 +0000 Subject: [PATCH 30/47] Add tracking issue --- synapse/config/experimental.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index a1a537545d7..0d45a44b58e 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -598,6 +598,7 @@ def read_config( self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False) # MSC4354: Sticky Events + # Tracked in: https://github.com/element-hq/synapse/issues/19409 self.msc4354_enabled: bool = experimental.get("msc4354_enabled", False) # MSC4380: Invite blocking From ae5f1c512598373b39fb42156611b1c9fa09ff7a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 27 Jan 2026 14:25:48 +0000 Subject: [PATCH 31/47] Don't denormalise `soft_failed` as it could become mutable --- synapse/_scripts/synapse_port_db.py | 1 - .../storage/databases/main/sticky_events.py | 21 ++++++++++++------- .../schema/main/delta/93/01_sticky_events.sql | 11 +++------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 4fa3def624e..79b2a0c528e 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -132,7 +132,6 @@ "has_known_state", "is_encrypted", ], - "sticky_events": ["soft_failed"], "thread_subscriptions": ["subscribed", "automatic"], "users": ["shadow_banned", "approved", "locked", "suspended"], "un_partial_stated_event_stream": ["rejection_status_changed"], diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index d80ed2249f8..87a8e6e2560 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -28,6 +28,7 @@ ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.state import StateGroupWorkerStore +from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.duration import Duration @@ -117,7 +118,7 @@ def get_sticky_events_stream_id_generator(self) -> MultiWriterIdGenerator: return self._sticky_events_id_gen async def get_updated_sticky_events( - self, from_id: int, to_id: int, limit: int + self, *, from_id: int, to_id: int, limit: int ) -> list[StickyEventUpdate]: """Get updates to sticky events between two stream IDs. @@ -142,21 +143,29 @@ async def get_updated_sticky_events( def _get_updated_sticky_events_txn( self, txn: LoggingTransaction, from_id: int, to_id: int, limit: int ) -> list[StickyEventUpdate]: + if isinstance(self.database_engine, PostgresEngine): + expr_soft_failed = "COALESCE(((ej.internal_metadata::jsonb)->>'soft_failed')::boolean, FALSE)" + else: + expr_soft_failed = "COALESCE(ej.internal_metadata->>'soft_failed', FALSE)" + txn.execute( - """ - SELECT stream_id, room_id, event_id, soft_failed - FROM sticky_events + f""" + SELECT se.stream_id, se.room_id, se.event_id, + {expr_soft_failed} AS "soft_failed" + FROM sticky_events se + INNER JOIN event_json ej USING (event_id) WHERE ? < stream_id AND stream_id <= ? LIMIT ? """, (from_id, to_id, limit), ) + return [ StickyEventUpdate( stream_id=stream_id, room_id=room_id, event_id=event_id, - soft_failed=soft_failed, + soft_failed=bool(soft_failed), ) for stream_id, room_id, event_id, soft_failed in txn ] @@ -217,7 +226,6 @@ def insert_sticky_events_txn( "event_stream_ordering", "sender", "expires_at", - "soft_failed", ), values=[ ( @@ -228,7 +236,6 @@ def insert_sticky_events_txn( ev.internal_metadata.stream_ordering, ev.sender, expires_at, - ev.internal_metadata.is_soft_failed(), ) for (ev, expires_at), stream_id in sticky_events_with_ids ], diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index bcd44060bcd..52afe734558 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -36,16 +36,11 @@ CREATE TABLE sticky_events ( sender TEXT NOT NULL, -- When the sticky event expires, in milliseconds since the Unix epoch. - expires_at BIGINT NOT NULL, - - -- Whether the event is soft-failed. - -- Denormalised for performance when we want to re-evaluate the soft-failed state of sticky events. - soft_failed BOOLEAN NOT NULL + expires_at BIGINT NOT NULL ); --- 1. For pulling out sticky events by room at send time, obeying stream ordering range limits. --- 2. For pulling out soft failed events by room -CREATE INDEX sticky_events_room_idx ON sticky_events (room_id, event_stream_ordering, soft_failed); +-- For pulling out sticky events by room at send time, obeying stream ordering range limits. +CREATE INDEX sticky_events_room_idx ON sticky_events (room_id, event_stream_ordering); -- A optional integer for combining sticky events with delayed events. Used at send time. ALTER TABLE delayed_events ADD COLUMN sticky_duration_ms BIGINT; From 65273730e69b9aab66a2850625cbd1c741f9719f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 27 Jan 2026 16:29:39 +0000 Subject: [PATCH 32/47] Add tests for the sticky events store methods --- tests/storage/test_sticky_events.py | 154 ++++++++++++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 tests/storage/test_sticky_events.py diff --git a/tests/storage/test_sticky_events.py b/tests/storage/test_sticky_events.py new file mode 100644 index 00000000000..d08b622ac2e --- /dev/null +++ b/tests/storage/test_sticky_events.py @@ -0,0 +1,154 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2026 Element Creations Ltd. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . +from twisted.internet.testing import MemoryReactor + +from synapse.api.constants import EventTypes +from synapse.rest import admin +from synapse.rest.client import login, register, room +from synapse.server import HomeServer +from synapse.types import JsonDict +from synapse.util.clock import Clock +from synapse.util.duration import Duration + +from tests import unittest + + +class StickyEventsTestCase(unittest.HomeserverTestCase): + servlets = [ + room.register_servlets, + login.register_servlets, + register.register_servlets, + admin.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + config["experimental_features"] = {"msc4354_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = self.hs.get_datastores().main + + # Register an account and create a room + self.user_id = self.register_user("user", "pass") + self.token = self.login(self.user_id, "pass") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.token) + + def test_get_updated_sticky_events(self) -> None: + """Test getting updated sticky events between stream IDs.""" + # Get the starting stream_id + start_id = self.store.get_max_sticky_events_stream_id() + + event_id_1 = self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(minutes=1), + content={"body": "message 1", "msgtype": "m.text"}, + tok=self.token, + )["event_id"] + + event_id_2 = self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(minutes=1), + content={"body": "message 2", "msgtype": "m.text"}, + tok=self.token, + )["event_id"] + + # Get all updates + updates = self.get_success( + self.store.get_updated_sticky_events( + from_id=start_id, to_id=start_id + 2, limit=10 + ) + ) + self.assertEqual(len(updates), 2) + self.assertEqual(updates[0].event_id, event_id_1) + self.assertEqual(updates[0].soft_failed, False) + self.assertEqual(updates[1].event_id, event_id_2) + self.assertEqual(updates[1].soft_failed, False) + + # Get only the second update + updates = self.get_success( + self.store.get_updated_sticky_events( + from_id=start_id + 1, to_id=start_id + 2, limit=10 + ) + ) + self.assertEqual(len(updates), 1) + self.assertEqual(updates[0].event_id, event_id_2) + self.assertEqual(updates[0].soft_failed, False) + + def test_delete_expired_sticky_events(self) -> None: + """Test deletion of expired sticky events.""" + # Get the starting stream_id + start_id = self.store.get_max_sticky_events_stream_id() + + # Insert an expired event by advancing time past its duration + self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(milliseconds=1), + content={"body": "expired message", "msgtype": "m.text"}, + tok=self.token, + ) + self.reactor.advance(0.002) + + # Insert a non-expired event + event_id_2 = self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(minutes=1), + content={"body": "non-expired message", "msgtype": "m.text"}, + tok=self.token, + )["event_id"] + + # Delete expired events + self.get_success(self.store._delete_expired_sticky_events()) + + # Check that only the non-expired event remains + updates = self.get_success( + self.store.get_updated_sticky_events( + from_id=start_id, to_id=start_id + 2, limit=10 + ) + ) + self.assertEqual(len(updates), 1) + self.assertEqual(updates[0].event_id, event_id_2) + + def test_get_updated_sticky_events_with_limit(self) -> None: + """Test that the limit parameter works correctly.""" + # Get the starting stream_id + start_id = self.store.get_max_sticky_events_stream_id() + + event_id_1 = self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(minutes=1), + content={"body": "message 1", "msgtype": "m.text"}, + tok=self.token, + )["event_id"] + + self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(minutes=1), + content={"body": "message 2", "msgtype": "m.text"}, + tok=self.token, + ) + + # Get only the first update + updates = self.get_success( + self.store.get_updated_sticky_events( + from_id=start_id, to_id=start_id + 2, limit=1 + ) + ) + self.assertEqual(len(updates), 1) + self.assertEqual(updates[0].event_id, event_id_1) From d80d2c4efd18e2ef236eef1a9d032059fc3ddffb Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 27 Jan 2026 16:35:37 +0000 Subject: [PATCH 33/47] Describe what types of events do and don't live in sticky_events table --- synapse/storage/databases/main/sticky_events.py | 11 +++++++++++ .../storage/schema/main/delta/93/01_sticky_events.sql | 4 ++++ 2 files changed, 15 insertions(+) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 87a8e6e2560..d76cfce6307 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -175,6 +175,17 @@ def insert_sticky_events_txn( txn: LoggingTransaction, events: list[EventBase], ) -> None: + """ + Insert events into the sticky_events table. + + Skips inserting events: + - if they are considered spammy by the policy server; + (unsure if correct, track: https://github.com/matrix-org/matrix-spec-proposals/pull/4354#discussion_r2727593350) + - if they are rejected; + - if they are outliers (they should be reconsidered for insertion when de-outliered); or + - if they are not sticky (e.g. if the stickiness expired). + """ + now_ms = self.clock.time_msec() # event, expires_at sticky_events: list[tuple[EventBase, int]] = [] diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index 52afe734558..88dc4b4b515 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -11,6 +11,10 @@ -- See the GNU Affero General Public License for more details: -- . +-- Tracks sticky events. +-- Excludes 'polisy_server_spammy' events, outliers, rejected events. +-- May contain sticky events that have expired since being inserted, +-- although they will be periodically cleaned up in the background. CREATE TABLE sticky_events ( -- Position in the sticky events stream stream_id INTEGER NOT NULL PRIMARY KEY, From e3a8f19a30f2bac568116791d79510aad2272add Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 27 Jan 2026 16:40:37 +0000 Subject: [PATCH 34/47] Note that events persisted before enablement won't be considered sticky --- synapse/config/experimental.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 0d45a44b58e..71b91e8b3ab 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -599,6 +599,8 @@ def read_config( # MSC4354: Sticky Events # Tracked in: https://github.com/element-hq/synapse/issues/19409 + # Note that sticky events persisted before this feature is enabled will not be + # considered sticky by the local homeserver. self.msc4354_enabled: bool = experimental.get("msc4354_enabled", False) # MSC4380: Invite blocking From f42af6a9be10f519ce9e70935d304087abdb02dc Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 29 Jan 2026 14:16:06 +0000 Subject: [PATCH 35/47] Update synapse/storage/schema/main/delta/93/01_sticky_events.sql Co-authored-by: Eric Eastwood --- synapse/storage/schema/main/delta/93/01_sticky_events.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index 88dc4b4b515..6e023e3fdf5 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -12,7 +12,7 @@ -- . -- Tracks sticky events. --- Excludes 'polisy_server_spammy' events, outliers, rejected events. +-- Excludes 'policy_server_spammy' events, outliers, rejected events. -- May contain sticky events that have expired since being inserted, -- although they will be periodically cleaned up in the background. CREATE TABLE sticky_events ( From 3be0e879ecdf3764b0e12be6fa8162cf146b783c Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:36:32 +0000 Subject: [PATCH 36/47] tests: just get the stream IDs between insertions --- tests/storage/test_sticky_events.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/storage/test_sticky_events.py b/tests/storage/test_sticky_events.py index d08b622ac2e..65df7870013 100644 --- a/tests/storage/test_sticky_events.py +++ b/tests/storage/test_sticky_events.py @@ -57,6 +57,8 @@ def test_get_updated_sticky_events(self) -> None: tok=self.token, )["event_id"] + mid_id = self.store.get_max_sticky_events_stream_id() + event_id_2 = self.helper.send_sticky_event( self.room_id, EventTypes.Message, @@ -65,10 +67,12 @@ def test_get_updated_sticky_events(self) -> None: tok=self.token, )["event_id"] + end_id = self.store.get_max_sticky_events_stream_id() + # Get all updates updates = self.get_success( self.store.get_updated_sticky_events( - from_id=start_id, to_id=start_id + 2, limit=10 + from_id=start_id, to_id=end_id, limit=10 ) ) self.assertEqual(len(updates), 2) @@ -79,9 +83,7 @@ def test_get_updated_sticky_events(self) -> None: # Get only the second update updates = self.get_success( - self.store.get_updated_sticky_events( - from_id=start_id + 1, to_id=start_id + 2, limit=10 - ) + self.store.get_updated_sticky_events(from_id=mid_id, to_id=end_id, limit=10) ) self.assertEqual(len(updates), 1) self.assertEqual(updates[0].event_id, event_id_2) From efc08d2128dfb9e4c1df2cf66cd9bc3576733776 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:37:17 +0000 Subject: [PATCH 37/47] tests: Read raw table for checking expired sticky events are deleted --- tests/storage/test_sticky_events.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/tests/storage/test_sticky_events.py b/tests/storage/test_sticky_events.py index 65df7870013..bebfba0c7c5 100644 --- a/tests/storage/test_sticky_events.py +++ b/tests/storage/test_sticky_events.py @@ -91,9 +91,6 @@ def test_get_updated_sticky_events(self) -> None: def test_delete_expired_sticky_events(self) -> None: """Test deletion of expired sticky events.""" - # Get the starting stream_id - start_id = self.store.get_max_sticky_events_stream_id() - # Insert an expired event by advancing time past its duration self.helper.send_sticky_event( self.room_id, @@ -113,17 +110,23 @@ def test_delete_expired_sticky_events(self) -> None: tok=self.token, )["event_id"] + end_id = self.store.get_max_sticky_events_stream_id() + # Delete expired events self.get_success(self.store._delete_expired_sticky_events()) # Check that only the non-expired event remains - updates = self.get_success( - self.store.get_updated_sticky_events( - from_id=start_id, to_id=start_id + 2, limit=10 + sticky_events = self.get_success( + self.store.db_pool.simple_select_list( + table="sticky_events", keyvalues=None, retcols=("stream_id", "event_id") ) ) - self.assertEqual(len(updates), 1) - self.assertEqual(updates[0].event_id, event_id_2) + self.assertEqual( + sticky_events, + [ + (end_id, event_id_2), + ], + ) def test_get_updated_sticky_events_with_limit(self) -> None: """Test that the limit parameter works correctly.""" From 5a5efc50df4b6ae3dc856194561f9199a662e0ee Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:39:43 +0000 Subject: [PATCH 38/47] Comment why we min(origin_server_ts, now_ms) --- synapse/visibility.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/visibility.py b/synapse/visibility.py index 28fb319e6c8..5ba2a14a24a 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -242,6 +242,9 @@ def allowed(event: EventBase) -> EventBase | None: if sticky_duration: now_ms = storage.main.clock.time_msec() expires_at = ( + # min() ensures that the origin server can't lie about the time and + # send the event 'in the future', as that would allow them to exceed + # the 1 hour limit on stickiness duration. min(cloned.origin_server_ts, now_ms) + sticky_duration.as_millis() ) if expires_at > now_ms: From d83656a8d652c86adf37181e20816391142500f2 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:51:25 +0000 Subject: [PATCH 39/47] Explain skip rationale --- synapse/storage/databases/main/sticky_events.py | 9 +++++++++ .../storage/schema/main/delta/93/01_sticky_events.sql | 10 ++++++++++ 2 files changed, 19 insertions(+) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index d76cfce6307..a7d5351cd4c 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -184,6 +184,15 @@ def insert_sticky_events_txn( - if they are rejected; - if they are outliers (they should be reconsidered for insertion when de-outliered); or - if they are not sticky (e.g. if the stickiness expired). + + Skipping the insertion of these types of 'invalid' events is useful for performance reasons because + they would fill up the table yet we wouldn't show them to clients anyway. + + Since syncing clients can't (easily?) 'skip over' sticky events (due to being in-order, reliably delivered), + tracking loads of invalid events in the table could make it expensive for servers to retrieve the sticky events that are actually valid. + + For instance, someone spamming 1000s of rejected or 'policy_server_spammy' events could clog up this table in a way that means we either + have to deliver empty payloads to syncing clients, or consider substantially more than 100 events in order to gather a 100-sized batch to send down. """ now_ms = self.clock.time_msec() diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index 6e023e3fdf5..f92336c21bd 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -13,6 +13,16 @@ -- Tracks sticky events. -- Excludes 'policy_server_spammy' events, outliers, rejected events. +-- +-- Skipping the insertion of these types of 'invalid' events is useful for performance reasons because +-- they would fill up the table yet we wouldn't show them to clients anyway. +-- +-- Since syncing clients can't (easily?) 'skip over' sticky events (due to being in-order, reliably delivered), +-- tracking loads of invalid events in the table could make it expensive for servers to retrieve the sticky events that are actually valid. +-- +-- For instance, someone spamming 1000s of rejected or 'policy_server_spammy' events could clog up this table in a way that means we either +-- have to deliver empty payloads to syncing clients, or consider substantially more than 100 events in order to gather a 100-sized batch to send down. +-- -- May contain sticky events that have expired since being inserted, -- although they will be periodically cleaned up in the background. CREATE TABLE sticky_events ( From cf162930992c4e5c545f174769b5f85cd4901b0f Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:53:19 +0000 Subject: [PATCH 40/47] Expand comment on staggering --- synapse/storage/databases/main/sticky_events.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index a7d5351cd4c..2461e1be6d5 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -71,12 +71,21 @@ def __init__( # Technically this means we will cleanup N times, once per event persister, maybe put on master? if self._can_write_to_sticky_events: - # Start a looping call to clean up the sticky_events table - # Because this will run once per event persister, + # Start a looping call to clean up the `sticky_events` table + # + # Because this will run once per event persister (for now), # randomly stagger the initial time so that they don't all # coincide with each other if the workers are deployed at the - # same time. - # It's not critical, this is just best-effort. + # same time. This allows each cleanup to be somewhat more effective + # than if they all started at the same time, as they would all be + # cleaning up the same thing whereas each worker gets to clean up a little + # throughout the hour when they're staggered. + # + # Concurrent execution of the same deletions could also lead to + # repeatable serialisation violations in the database transaction, + # meaning we'd have to retry the transaction several times. + # + # This staggering is not critical, it's just best-effort. self.clock.call_later( # random() is 0.0 to 1.0 DELETE_EXPIRED_STICKY_EVENTS_INTERVAL * random.random(), From b0693b5f71963a81ff59dd91ec2844017fad41e7 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:54:22 +0000 Subject: [PATCH 41/47] bool subclass of int clarification --- synapse/events/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 9ef0334167f..e6162997dd7 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -337,7 +337,8 @@ def sticky_duration(self) -> Duration | None: return None sticky_duration_ms = sticky_obj.get("duration_ms", None) # MSC: Clamp to 0 and MAX_DURATION (1 hour) - # `type(..) is int` needed to avoid accepting bool + # We use `type(...) is int` to avoid accepting bools as `isinstance(True, int)` + # (bool is a subclass of int) if type(sticky_duration_ms) is int and sticky_duration_ms >= 0: return min( Duration(milliseconds=sticky_duration_ms), From c0c78dca8d54e74b7e8d4e1987785bf299742d3d Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 13:57:56 +0000 Subject: [PATCH 42/47] Add a test for sticky events not being sticky when feature disabled --- tests/rest/client/test_sticky_events.py | 59 +++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 4 deletions(-) diff --git a/tests/rest/client/test_sticky_events.py b/tests/rest/client/test_sticky_events.py index bec5a4b6f7b..7fbb8ff2edf 100644 --- a/tests/rest/client/test_sticky_events.py +++ b/tests/rest/client/test_sticky_events.py @@ -27,8 +27,6 @@ class StickyEventsClientTestCase(unittest.HomeserverTestCase): - """Tests sticky events retrieved via the /event/ endpoint.""" - servlets = [ room.register_servlets, login.register_servlets, @@ -42,11 +40,11 @@ def default_config(self) -> JsonDict: return config def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - # Arrange: Register an account + # Register an account self.user_id = self.register_user("user1", "pass") self.token = self.login(self.user_id, "pass") - # Arrange: Create a room + # Create a room self.room_id = self.helper.create_room_as(self.user_id, tok=self.token) def _assert_event_sticky_for(self, event_id: str, sticky_ttl: int) -> None: @@ -115,3 +113,56 @@ def test_sticky_event_via_event_endpoint(self) -> None: # Advancing time any more, the event is no longer sticky self.reactor.advance(0.001) self._assert_event_not_sticky(event_id) + + +class StickyEventsDisabledClientTestCase(unittest.HomeserverTestCase): + """ + Tests client-facing behaviour of sticky events when the feature is + disabled. + """ + + servlets = [ + room.register_servlets, + login.register_servlets, + register.register_servlets, + admin.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + # Register an account + self.user_id = self.register_user("user1", "pass") + self.token = self.login(self.user_id, "pass") + + # Create a room + self.room_id = self.helper.create_room_as(self.user_id, tok=self.token) + + def _assert_event_not_sticky(self, event_id: str) -> None: + channel = self.make_request( + "GET", + f"/rooms/{self.room_id}/event/{event_id}", + access_token=self.token, + ) + + self.assertEqual( + channel.code, 200, f"could not retrieve event {event_id}: {channel.result}" + ) + event = channel.json_body + + self.assertNotIn( + EventUnsignedContentFields.STICKY_TTL, + event["unsigned"], + f"{EventUnsignedContentFields.STICKY_TTL} field unexpectedly found in {event_id}: {event}", + ) + + def test_sticky_event_via_event_endpoint(self) -> None: + sticky_event_response = self.helper.send_sticky_event( + self.room_id, + EventTypes.Message, + duration=Duration(minutes=1), + content={"body": "sticky message", "msgtype": "m.text"}, + tok=self.token, + ) + event_id = sticky_event_response["event_id"] + + # Since the feature is disabled, the event isn't sticky + self._assert_event_not_sticky(event_id) From 2b822ddf82fdb4107f8c30394664261b5509f1f5 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Tue, 3 Feb 2026 15:01:45 +0000 Subject: [PATCH 43/47] Apparently a `?` in a SQL comment causes the migration to fail :D --- synapse/storage/schema/main/delta/93/01_sticky_events.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index f92336c21bd..3f32c7d24f2 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -17,7 +17,7 @@ -- Skipping the insertion of these types of 'invalid' events is useful for performance reasons because -- they would fill up the table yet we wouldn't show them to clients anyway. -- --- Since syncing clients can't (easily?) 'skip over' sticky events (due to being in-order, reliably delivered), +-- Since syncing clients can't (easily) 'skip over' sticky events (due to being in-order, reliably delivered), -- tracking loads of invalid events in the table could make it expensive for servers to retrieve the sticky events that are actually valid. -- -- For instance, someone spamming 1000s of rejected or 'policy_server_spammy' events could clog up this table in a way that means we either From 601b1f39c19fad7bdf501ffb93ee53cd6750631a Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Wed, 4 Feb 2026 11:14:03 +0000 Subject: [PATCH 44/47] Suggested doc tweaks --- .../storage/databases/main/sticky_events.py | 18 ++++++++++-------- .../schema/main/delta/93/01_sticky_events.sql | 6 +++--- tests/rest/client/test_sticky_events.py | 6 +++++- tests/rest/client/utils.py | 4 ++++ tests/storage/test_sticky_events.py | 4 ++++ 5 files changed, 26 insertions(+), 12 deletions(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 2461e1be6d5..399af2b4393 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -37,15 +37,17 @@ logger = logging.getLogger(__name__) -# Remove entries from the sticky_events table at this frequency. -# Note: don't be misled, we still honour shorter expiration timeouts, -# because readers of the sticky_events table filter out expired sticky events -# themselves, even if they aren't deleted from the table yet. -# -# Currently just an arbitrary choice. -# Frequent enough to clean up expired sticky events promptly, -# especially given the short cap on the lifetime of sticky events. DELETE_EXPIRED_STICKY_EVENTS_INTERVAL = Duration(hours=1) +""" +Remove entries from the sticky_events table at this frequency. +Note: don't be misled, we still honour shorter expiration timeouts, +because readers of the sticky_events table filter out expired sticky events +themselves, even if they aren't deleted from the table yet. + +Currently just an arbitrary choice. +Frequent enough to clean up expired sticky events promptly, +especially given the short cap on the lifetime of sticky events. +""" @dataclass(frozen=True) diff --git a/synapse/storage/schema/main/delta/93/01_sticky_events.sql b/synapse/storage/schema/main/delta/93/01_sticky_events.sql index 3f32c7d24f2..59fded5959a 100644 --- a/synapse/storage/schema/main/delta/93/01_sticky_events.sql +++ b/synapse/storage/schema/main/delta/93/01_sticky_events.sql @@ -36,17 +36,17 @@ CREATE TABLE sticky_events ( event_id TEXT NOT NULL, -- The room ID that the sticky event is in. - -- Denormalised for performance. + -- Denormalised for performance. (Safe as it's an immutable property of the event.) room_id TEXT NOT NULL, -- The stream_ordering of the event. -- Denormalised for performance since we will want to sort these by stream_ordering - -- when fetching them. + -- when fetching them. (Safe as it's an immutable property of the event.) event_stream_ordering INTEGER NOT NULL UNIQUE, -- Sender of the sticky event. -- Denormalised for performance so we can query only for sticky events originating - -- from our homeserver. + -- from our homeserver. (Safe as it's an immutable property of the event.) sender TEXT NOT NULL, -- When the sticky event expires, in milliseconds since the Unix epoch. diff --git a/tests/rest/client/test_sticky_events.py b/tests/rest/client/test_sticky_events.py index 7fbb8ff2edf..4e822246bb7 100644 --- a/tests/rest/client/test_sticky_events.py +++ b/tests/rest/client/test_sticky_events.py @@ -27,6 +27,10 @@ class StickyEventsClientTestCase(unittest.HomeserverTestCase): + """ + Tests for the client-server API parts of MSC4354: Sticky Events + """ + servlets = [ room.register_servlets, login.register_servlets, @@ -117,7 +121,7 @@ def test_sticky_event_via_event_endpoint(self) -> None: class StickyEventsDisabledClientTestCase(unittest.HomeserverTestCase): """ - Tests client-facing behaviour of sticky events when the feature is + Tests client-facing behaviour of MSC4354: Sticky Events when the feature is disabled. """ diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 886a73230f3..bfa8e6f3d84 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -466,6 +466,10 @@ def send_sticky_event( expect_code: int = HTTPStatus.OK, custom_headers: Iterable[tuple[AnyStr, AnyStr]] | None = None, ) -> JsonDict: + """ + Send an event that has a sticky duration according to MSC4354. + """ + if txn_id is None: txn_id = f"m{time.time()}" diff --git a/tests/storage/test_sticky_events.py b/tests/storage/test_sticky_events.py index bebfba0c7c5..93ca17835ab 100644 --- a/tests/storage/test_sticky_events.py +++ b/tests/storage/test_sticky_events.py @@ -24,6 +24,10 @@ class StickyEventsTestCase(unittest.HomeserverTestCase): + """ + Tests for the storage functions related to MSC4354: Sticky Events + """ + servlets = [ room.register_servlets, login.register_servlets, From 02d245f210802c4a61532248b8571a11e941bbdf Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 5 Feb 2026 11:55:35 +0000 Subject: [PATCH 45/47] Prevent starting up with feature when SQLite is too old --- .../storage/databases/main/sticky_events.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/sticky_events.py b/synapse/storage/databases/main/sticky_events.py index 399af2b4393..101306296e4 100644 --- a/synapse/storage/databases/main/sticky_events.py +++ b/synapse/storage/databases/main/sticky_events.py @@ -28,7 +28,7 @@ ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.state import StateGroupWorkerStore -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.duration import Duration @@ -110,6 +110,16 @@ def __init__( writers=hs.config.worker.writers.events, ) + if hs.config.experimental.msc4354_enabled and isinstance( + self.database_engine, Sqlite3Engine + ): + import sqlite3 + + if sqlite3.sqlite_version_info < (3, 40, 0): + raise RuntimeError( + f"Experimental MSC4354 Sticky Events enabled but SQLite3 version is too old: {sqlite3.sqlite_version_info}, must be at least 3.40. Disable MSC4354 Sticky Events, switch to Postgres, or upgrade SQLite. See https://github.com/element-hq/synapse/issues/19428" + ) + def process_replication_position( self, stream_name: str, instance_name: str, token: int ) -> None: @@ -143,6 +153,15 @@ async def get_updated_sticky_events( Returns: list of StickyEventUpdate update rows """ + + if not self.hs.config.experimental.msc4354_enabled: + # We need to prevent `_get_updated_sticky_events_txn` + # from running when MSC4354 is turned off, because the query used + # for SQLite is not compatible with Ubuntu 22.04 (as used in our CI olddeps run). + # It's technically out of support. + # See: https://github.com/element-hq/synapse/issues/19428 + return [] + return await self.db_pool.runInteraction( "get_updated_sticky_events", self._get_updated_sticky_events_txn, From 737d4823d32cffbb0c56e57f8bf93c34de235eac Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 5 Feb 2026 18:31:25 +0000 Subject: [PATCH 46/47] Skip the test on old SQLite --- tests/rest/client/test_sticky_events.py | 7 +++++++ tests/storage/test_sticky_events.py | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/tests/rest/client/test_sticky_events.py b/tests/rest/client/test_sticky_events.py index 4e822246bb7..a6e704fe8c2 100644 --- a/tests/rest/client/test_sticky_events.py +++ b/tests/rest/client/test_sticky_events.py @@ -13,6 +13,8 @@ # # +import sqlite3 + from twisted.internet.testing import MemoryReactor from synapse.api.constants import EventTypes, EventUnsignedContentFields @@ -24,6 +26,7 @@ from synapse.util.duration import Duration from tests import unittest +from tests.utils import USE_POSTGRES_FOR_TESTS class StickyEventsClientTestCase(unittest.HomeserverTestCase): @@ -31,6 +34,10 @@ class StickyEventsClientTestCase(unittest.HomeserverTestCase): Tests for the client-server API parts of MSC4354: Sticky Events """ + if not USE_POSTGRES_FOR_TESTS and sqlite3.sqlite_version_info < (3, 40, 0): + # We need the JSON functionality in SQLite + skip = f"SQLite version is too old to support sticky events: {sqlite3.sqlite_version_info} (See https://github.com/element-hq/synapse/issues/19428)" + servlets = [ room.register_servlets, login.register_servlets, diff --git a/tests/storage/test_sticky_events.py b/tests/storage/test_sticky_events.py index 93ca17835ab..a29731cc885 100644 --- a/tests/storage/test_sticky_events.py +++ b/tests/storage/test_sticky_events.py @@ -10,6 +10,8 @@ # # See the GNU Affero General Public License for more details: # . +import sqlite3 + from twisted.internet.testing import MemoryReactor from synapse.api.constants import EventTypes @@ -21,6 +23,7 @@ from synapse.util.duration import Duration from tests import unittest +from tests.utils import USE_POSTGRES_FOR_TESTS class StickyEventsTestCase(unittest.HomeserverTestCase): @@ -28,6 +31,10 @@ class StickyEventsTestCase(unittest.HomeserverTestCase): Tests for the storage functions related to MSC4354: Sticky Events """ + if not USE_POSTGRES_FOR_TESTS and sqlite3.sqlite_version_info < (3, 40, 0): + # We need the JSON functionality in SQLite + skip = f"SQLite version is too old to support sticky events: {sqlite3.sqlite_version_info} (See https://github.com/element-hq/synapse/issues/19428)" + servlets = [ room.register_servlets, login.register_servlets, From a5354771821f3407110bfb7fdcf3b76c2a670a92 Mon Sep 17 00:00:00 2001 From: Olivier 'reivilibre Date: Thu, 5 Feb 2026 18:33:56 +0000 Subject: [PATCH 47/47] Add test for de-outliering events --- tests/storage/test_sticky_events.py | 112 +++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 2 deletions(-) diff --git a/tests/storage/test_sticky_events.py b/tests/storage/test_sticky_events.py index a29731cc885..60243cb2f40 100644 --- a/tests/storage/test_sticky_events.py +++ b/tests/storage/test_sticky_events.py @@ -14,11 +14,18 @@ from twisted.internet.testing import MemoryReactor -from synapse.api.constants import EventTypes +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + StickyEvent, + StickyEventField, +) +from synapse.api.room_versions import RoomVersions from synapse.rest import admin from synapse.rest.client import login, register, room from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, create_requester from synapse.util.clock import Clock from synapse.util.duration import Duration @@ -168,3 +175,104 @@ def test_get_updated_sticky_events_with_limit(self) -> None: ) self.assertEqual(len(updates), 1) self.assertEqual(updates[0].event_id, event_id_1) + + def test_outlier_events_not_in_table(self) -> None: + """ + Tests the behaviour of outliered and then de-outliered events in the + sticky_events table: they should only be added once they are de-outliered. + """ + persist_controller = self.hs.get_storage_controllers().persistence + assert persist_controller is not None + + user1_id = self.register_user("user1", "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + start_id = self.store.get_max_sticky_events_stream_id() + + room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, room_version=RoomVersions.V10.identifier + ) + + # Create a membership event + event_dict = { + "type": EventTypes.Member, + "state_key": user1_id, + "sender": user1_id, + "room_id": room_id, + "content": {EventContentFields.MEMBERSHIP: Membership.JOIN}, + StickyEvent.EVENT_FIELD_NAME: StickyEventField( + duration_ms=Duration(hours=1).as_millis() + ), + } + + # Create the event twice: once as an outlier, once as a non-outlier. + # It's not at all obvious, but event creation before is deterministic + # (provided we don't change the forward extremities of the room!), + # so these two events are actually the same event with the same event ID. + ( + event_outlier, + unpersisted_context_outlier, + ) = self.get_success( + self.hs.get_event_creation_handler().create_event( + requester=create_requester(user1_id), + event_dict=event_dict, + outlier=True, + ) + ) + ( + event_non_outlier, + unpersisted_context_non_outlier, + ) = self.get_success( + self.hs.get_event_creation_handler().create_event( + requester=create_requester(user1_id), + event_dict=event_dict, + outlier=False, + ) + ) + + # Safety check that we're testing what we think we are + self.assertEqual(event_outlier.event_id, event_non_outlier.event_id) + + # Now persist the event as an outlier first of all + # FIXME: Should we use an `EventContext.for_outlier(...)` here? + # Doesn't seem to matter for this test. + context_outlier = self.get_success( + unpersisted_context_outlier.persist(event_outlier) + ) + self.get_success( + persist_controller.persist_event( + event_outlier, + context_outlier, + ) + ) + + # Since the event is outliered, it won't show up in the sticky_events table... + sticky_events = self.get_success( + self.store.db_pool.simple_select_list( + table="sticky_events", keyvalues=None, retcols=("stream_id", "event_id") + ) + ) + self.assertEqual(len(sticky_events), 0) + + # Now persist the event properly so that it gets de-outliered. + context_non_outlier = self.get_success( + unpersisted_context_non_outlier.persist(event_non_outlier) + ) + self.get_success( + persist_controller.persist_event( + event_non_outlier, + context_non_outlier, + ) + ) + + end_id = self.store.get_max_sticky_events_stream_id() + + # Check the event made it into the sticky_events table + updates = self.get_success( + self.store.get_updated_sticky_events( + from_id=start_id, to_id=end_id, limit=10 + ) + ) + self.assertEqual(len(updates), 1) + self.assertEqual(updates[0].event_id, event_non_outlier.event_id)