diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/sync.py | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ac5bddd52f..1d7d9dfdd0 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -284,6 +284,23 @@ class SyncResult: or self.device_lists ) + @staticmethod + def empty(next_batch: StreamToken) -> "SyncResult": + "Return a new empty result" + return SyncResult( + next_batch=next_batch, + presence=[], + account_data=[], + joined=[], + invited=[], + knocked=[], + archived=[], + to_device=[], + device_lists=DeviceListUpdates(), + device_one_time_keys_count={}, + device_unused_fallback_key_types=[], + ) + @attr.s(slots=True, frozen=True, auto_attribs=True) class E2eeSyncResult: @@ -497,6 +514,24 @@ class SyncHandler: if context: context.tag = sync_label + if since_token is not None: + # We need to make sure this worker has caught up with the token. If + # this returns false it means we timed out waiting, and we should + # just return an empty response. + start = self.clock.time_msec() + if not await self.notifier.wait_for_stream_token(since_token): + logger.warning( + "Timed out waiting for worker to catch up. Returning empty response" + ) + return SyncResult.empty(since_token) + + # If we've spent significant time waiting to catch up, take it off + # the timeout. + now = self.clock.time_msec() + if now - start > 1_000: + timeout -= now - start + timeout = max(timeout, 0) + # if we have a since token, delete any to-device messages before that token # (since we now know that the device has received them) if since_token is not None: |