
Update wait_for_stream_token(...) patterns and fix sync fetching with unbounded token #19644

Open
MadLittleMods wants to merge 23 commits into develop from madlittlemods/wait_for_multi_writer_stream_token

Conversation

@MadLittleMods
Contributor

@MadLittleMods MadLittleMods commented Apr 2, 2026

Spawning from trying to find the proper way to wait for a token, see #19558 (comment)

Part of #19647

Dev notes


```shell
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sync
```

Pull Request Checklist

  • Pull request is based on the develop branch
  • Pull request includes a changelog file. The entry should:
    • Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from EventStore to EventWorkerStore.".
    • Use markdown where necessary, mostly for code blocks.
    • End with either a period (.) or an exclamation mark (!).
    • Start with a capital letter.
    • Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
  • Code style is correct (run the linters)

Previously, we would wait for the bounded token and then still use the
unbounded `since_token` for all of the queries (flawed).
Comment thread synapse/notifier.py Outdated
# TODO: be better
await self.clock.sleep(Duration(milliseconds=500))

async def wait_for_multi_writer_stream_token(
Contributor Author

This was added in order to support the use case in #19558 (comment)

Currently unused in this PR, but this concept is complicated enough to deserve its own PR.

Member

Hmm, that PR doesn't use a MultiWriterStreamToken though? As it uses an int?

The nice thing with the StreamToken above is that you don't need to worry about the mapping from ID gen to token, as it happens automatically. Otherwise to type it you have to use generic params and a Protocol I think?

Contributor Author

You can craft a new MultiWriterStreamToken(stream=id) with the int position as the minimum position.

Ideally, the endpoint would use MultiWriterStreamToken though

Member

Hmm, I suppose. Though by the same measure we could craft a StreamToken (with 0 for the other streams), and then we a) wouldn't have two similar functions, and b) wouldn't have to worry about how to ensure the token and ID generator match.

I think that would basically look something like adding a def empty() -> StreamToken function, and then doing: stream_token = StreamToken.empty().copy_and_replace(...). Though I guess the main advantage there is purely avoiding a duplicate function.

Contributor Author
@MadLittleMods MadLittleMods May 7, 2026

Sounds good, this aligns with my idea that all endpoints should be using StreamToken

Removed wait_for_multi_writer_stream_token(...)

We can already do StreamToken.START.copy_and_replace(StreamKeyType.RECEIPT, receipt_token)
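That pattern can be sketched with minimal stand-in classes (hypothetical, not Synapse's real StreamToken, which tracks many more streams): start from the START sentinel and swap in the single stream position you care about.

```python
# Hypothetical stand-in for Synapse's StreamToken; only the pattern
# matters: start from the START sentinel and replace the one stream
# position we care about.
from dataclasses import dataclass, replace
from typing import ClassVar


@dataclass(frozen=True)
class StreamToken:
    room_key: int = 0
    receipt_key: int = 0

    # Stand-in for Synapse's StreamToken.START sentinel (assigned below).
    START: ClassVar["StreamToken"]

    def copy_and_replace(self, key: str, new_value: int) -> "StreamToken":
        # dataclasses.replace returns a new frozen instance with one
        # field swapped out.
        return replace(self, **{key: new_value})


StreamToken.START = StreamToken()

# Build a full token whose only non-zero position is the receipt stream,
# e.g. to wait for a particular receipt position to be persisted.
token = StreamToken.START.copy_and_replace("receipt_key", 42)
print(token.receipt_key, token.room_key)  # 42 0
```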

Contributor Author
@MadLittleMods MadLittleMods May 7, 2026

Ugh, actually. The problem is that the quarantined_media stream (the use case we want to use this for) is currently not part of the StreamToken, as it's not something we currently care about /sync'ing (it has its own API).

But by the same logic that every endpoint should be using StreamToken, the quarantined_media stream probably should be part of the StreamToken 🤔. Handling this in #19764

This PR is still a good incremental improvement on top of everything. Any further decision can be made in a follow-up PR.

Comment thread synapse/notifier.py Outdated
Comment on lines +865 to +870
original_stream_token = stream_token
max_token = await self.event_sources.bound_future_token(stream_token)
assert stream_token.is_before_or_eq(max_token), (
f"Unable to wait for invalid future stream token (token={original_stream_token} has positions "
f"ahead of our max persisted position {max_token})"
)
Contributor Author

We're just trying to assert 'stream_token is not an invalid future token'

This logic is a bit obtuse but seemed to be the easiest way to accomplish this kind of thing for StreamToken.

See wait_for_multi_writer_stream_token(...) below for a more straightforward version.

Member

I think we should raise a proper exception here, and not mark it as a programming error. I think there are two legitimate causes of this:

  1. User manually fiddles with the token (unlikely but we shouldn't rule it out)
  2. The server operator has restored the DB from backup.

That last case is particularly annoying, and we should in future do better (e.g. ideally we'd automatically clear all SSS connections, etc).

Contributor Author

The token should already be sanitized by this point:

synapse/synapse/notifier.py

Lines 855 to 857 in 9cc939a

# Assert as we consider this a Synapse programming error. We shouldn't be
# handing out invalid future tokens and tokens should be validated before it
# reaches this point.

In other words, we handle sanitization (bounding or validation with better error) in the layer above.

Member

Hmmm, right. Seems odd to validate the thing twice, given bound_future_token calls multiple DB functions and is on the hot path.

I think I'd be in favour of either a) removing the check here or b) removing the check higher up and have this function do the validation.

Contributor Author
@MadLittleMods MadLittleMods May 7, 2026

Updated to remove the assert within wait_for_stream_token(...)

This is the better way, as we will sanitize/validate differently depending on the endpoint (i.e. bound_future_token for /sync but raise M_UNKNOWN_POS for Sliding Sync), and it helps avoid people falling into bad patterns (using the unbounded token on the outside).
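A rough sketch of the two per-endpoint strategies just described (the names here — bound_future_token, UnknownPosError, MAX_PERSISTED_POSITION — are illustrative stand-ins, not Synapse's real signatures):

```python
# Illustrative sketch of the two validation strategies; names are
# assumptions for this example, not Synapse's actual API.
MAX_PERSISTED_POSITION = 100


class UnknownPosError(Exception):
    """Stand-in for Sliding Sync's M_UNKNOWN_POS Matrix error."""


def bound_future_token(token: int) -> int:
    # /sync strategy: recover gracefully by clamping an ahead-of-persisted
    # ("future") token down to the max persisted position.
    return min(token, MAX_PERSISTED_POSITION)


def validate_or_raise(token: int) -> int:
    # Sliding Sync strategy: a future token means the client's position
    # is unknown/invalid, so reject it and let the connection reset.
    if token > MAX_PERSISTED_POSITION:
        raise UnknownPosError(f"token {token} is ahead of the persisted position")
    return token


print(bound_future_token(150))  # 100
```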

Comment thread synapse/notifier.py Outdated
# TODO: be better
await self.clock.sleep(Duration(milliseconds=500))

async def wait_for_multi_writer_stream_token(
Contributor Author

Based on wait_for_stream_token(...) above

Comment thread synapse/handlers/sync.py
Comment on lines +416 to +424
# Work around a bug where older Synapse versions gave out tokens "from the
# future", i.e. that are ahead of the tokens persisted in the DB. This could
# also happen if a user is intentionally messing with the token so this also
# acts as sanitization/validation.
#
# If the token has positions ahead of our persisted positions in the
# database (invalid), then we simply use our max persisted position (recover
# gracefully); instead of waiting for a position that may never come around.
since_token = await self.event_sources.bound_future_token(since_token)
Contributor Author
@MadLittleMods MadLittleMods Apr 2, 2026

This was done in order to fix sync waiting for a bounded token (wait_for_stream_token(...) did the bounding previously) but using the unbounded version to fetch data. Noticed while working on adding the new wait_for_multi_writer_stream_token(...) method.

We moved the token bounding outside as it encourages people to update the token before waiting and use the updated token afterwards. Otherwise, it's too easy to carry on with the foot-guns like we had before.
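The fixed ordering can be sketched as follows (a self-contained asyncio stand-in for the real sync handler, with hypothetical helper names; the key point is rebinding since_token to the bounded value before both waiting and querying):

```python
# Self-contained sketch (not the real Synapse handlers) of the fix:
# bind since_token to the *bounded* value first, so the wait and every
# later query use the same valid token.
import asyncio

MAX_PERSISTED_POSITION = 100


def bound_future_token(token: int) -> int:
    # Stand-in for EventSources.bound_future_token: clamp future tokens.
    return min(token, MAX_PERSISTED_POSITION)


async def wait_for_stream_token(token: int) -> None:
    # Stand-in for Notifier.wait_for_stream_token; with bounding moved
    # to the caller, this only ever sees valid tokens.
    assert token <= MAX_PERSISTED_POSITION


async def fetch_since(token: int) -> str:
    return f"data since {token}"


async def sync(since_token: int) -> str:
    # The fix: rebind since_token before waiting. Previously the bounding
    # happened *inside* wait_for_stream_token, so the unbounded token
    # leaked into the data queries below.
    since_token = bound_future_token(since_token)
    await wait_for_stream_token(since_token)
    return await fetch_since(since_token)


print(asyncio.run(sync(150)))  # data since 100
```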

Comment on lines +161 to +166
from_token = SlidingSyncStreamToken(
stream_token=await self.event_sources.bound_future_token(
from_token.stream_token
),
connection_position=from_token.connection_position,
)
Contributor Author

Same concept as the fix for synapse/handlers/sync.py below. Jump down to that one first as it's simpler to understand.

Member

I wonder if instead we should reset the connection, since this shouldn't happen and that is a clearer way of restoring things?

Contributor Author

I think that sounds good but should be tackled in another PR where it can get its own dedicated place to lay out the reasoning ⏩ - I've added a FIXME comment to mark the plans

It also gets to the point behind why we tried to gracefully handle this situation for /sync in the first place? I would have gone the route of blowing up the requests so clients can just restart their sync loop. Depends if we trust clients to restart on Matrix errors like M_INVALID_PARAM 🤷 which they probably should.

In the case of Sliding Sync (which spec'ed M_UNKNOWN_POS), sending M_UNKNOWN_POS (resetting the connection) fits perfectly for this scenario to convey what we're running into 👍

Member

It also gets to the point behind why we tried to gracefully handle this situation for /sync in the first place? I would have gone the route of blowing up the requests so clients can just restart their sync loop. Depends if we trust clients to restart on Matrix errors like M_INVALID_PARAM 🤷 which they probably should.

Yes, we didn't do this on /sync because we didn't have a mechanism to signal to clients that they should clear their cache and restart (and that is a much more invasive thing to do in the v2 API)

Comment thread synapse/types/__init__.py Outdated
See #19644 (comment)

Example test failure:
```shell
$ SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sync.SyncTestCase_state.test_wait_for_invalid_future_sync_token_ROOM
tests.handlers.test_sync
  SyncTestCase_state
    test_wait_for_invalid_future_sync_token_ROOM ...                     [FAIL]

===============================================================================
[FAIL]
Traceback (most recent call last):
  File "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.14/lib/python3.14/site-packages/parameterized/parameterized.py", line 620, in standalone_func
    return func(*(a + p.args), **p.kwargs, **kw)
  File "/home/eric/Documents/github/element/synapse/tests/handlers/test_sync.py", line 1115, in test_wait_for_invalid_future_sync_token
    self.get_success(sync_d)
  File "/home/eric/Documents/github/element/synapse/tests/unittest.py", line 742, in get_success
    return self.successResultOf(deferred)
  File "/home/eric/.cache/pypoetry/virtualenvs/matrix-synapse-xCtC9ulO-py3.14/lib/python3.14/site-packages/twisted/trial/_synctest.py", line 723, in successResultOf
    self.fail(
twisted.trial.unittest.FailTest: Success result expected on <Deferred at 0x7f7579165b50>, found no result instead

tests.handlers.test_sync.SyncTestCase_state.test_wait_for_invalid_future_sync_token_ROOM
-------------------------------------------------------------------------------
Ran 1 tests in 0.163s

FAILED (failures=1)
```
Comment thread synapse/notifier.py Outdated

async def wait_for_multi_writer_stream_token(
self,
token: MultiWriterStreamToken,
Contributor Author
@MadLittleMods MadLittleMods Apr 3, 2026

Is there a way to make the typing better here?

I want to accept token: AbstractMultiWriterStreamToken but then how do I manage the usage AbstractMultiWriterStreamToken.from_generator(id_gen) (this isn't right)? I need the type of whatever was passed in.

)

# This should block waiting for the presence stream to update
self.pump()
Contributor Author

This pump doesn't actually do anything (doesn't even advance time at all since the default is 0)

But it's good practice for us to advance time and actually stress the sleep loop to make sure we're actually waiting so I've carried that forward with self.reactor.advance(Duration(seconds=2).as_secs())

# Marking the stream ID as persisted should unblock the request.
self.get_success(ctx_mgr.__aexit__(None, None, None))

self.get_success(sync_d, by=1.0)
Contributor Author

Instead of relying on by=1.0, I just updated to use an explicit self.reactor.advance(Duration(seconds=1).as_secs())

Comment thread tests/test_notifier.py Outdated
Comment thread tests/test_notifier.py Outdated
Comment thread synapse/notifier.py
@MadLittleMods MadLittleMods marked this pull request as ready for review April 3, 2026 14:44
@MadLittleMods MadLittleMods requested a review from a team as a code owner April 3, 2026 14:44
@anoadragon453 anoadragon453 removed the request for review from a team May 1, 2026 11:32
@anoadragon453
Member

Taking off the general review queue as @erikjohnston said he has an in-progress review on this one already.

@MadLittleMods MadLittleMods changed the title Add wait_for_multi_writer_stream_token(...) and fix sync fetching with unbounded token Update wait_for_stream_token(...) patterns and fix sync fetching with unbounded token May 7, 2026

3 participants