diff options
author | Erik Johnston <erikj@element.io> | 2024-05-30 14:03:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-30 14:03:49 +0100 |
commit | 5624c8b961ed6a8310a2c6723ae13e854721756b (patch) | |
tree | e45dfc271ed2ec540170de66ccc3534b98de1992 /synapse/handlers | |
parent | Fix deduplicating of membership events to not create unused state groups. (#1... (diff) | |
download | synapse-5624c8b961ed6a8310a2c6723ae13e854721756b.tar.xz |
In sync wait for worker to catch up since token (#17215)
Otherwise things will get confused. An alternative would be to make sure that for lagging stream we don't return anything (and make sure the returned next_batch token doesn't go backwards). But that is a faff.
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/sync.py | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ac5bddd52f..1d7d9dfdd0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -284,6 +284,23 @@ class SyncResult: or self.device_lists ) + @staticmethod + def empty(next_batch: StreamToken) -> "SyncResult": + "Return a new empty result" + return SyncResult( + next_batch=next_batch, + presence=[], + account_data=[], + joined=[], + invited=[], + knocked=[], + archived=[], + to_device=[], + device_lists=DeviceListUpdates(), + device_one_time_keys_count={}, + device_unused_fallback_key_types=[], + ) + @attr.s(slots=True, frozen=True, auto_attribs=True) class E2eeSyncResult: @@ -497,6 +514,24 @@ class SyncHandler: if context: context.tag = sync_label + if since_token is not None: + # We need to make sure this worker has caught up with the token. If + # this returns false it means we timed out waiting, and we should + # just return an empty response. + start = self.clock.time_msec() + if not await self.notifier.wait_for_stream_token(since_token): + logger.warning( + "Timed out waiting for worker to catch up. Returning empty response" + ) + return SyncResult.empty(since_token) + + # If we've spent significant time waiting to catch up, take it off + # the timeout. + now = self.clock.time_msec() + if now - start > 1_000: + timeout -= now - start + timeout = max(timeout, 0) + # if we have a since token, delete any to-device messages before that token # (since we now know that the device has received them) if since_token is not None: |