summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-30 14:03:49 +0100
committerGitHub <noreply@github.com>2024-05-30 14:03:49 +0100
commit5624c8b961ed6a8310a2c6723ae13e854721756b (patch)
treee45dfc271ed2ec540170de66ccc3534b98de1992 /synapse/handlers
parentFix deduplicating of membership events to not create unused state groups. (#1... (diff)
downloadsynapse-5624c8b961ed6a8310a2c6723ae13e854721756b.tar.xz
In sync wait for worker to catch up since token (#17215)
Otherwise things will get confused.

An alternative would be to make sure that for lagging stream we don't
return anything (and make sure the returned next_batch token doesn't go
backwards). But that is a faff.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sync.py35
1 files changed, 35 insertions, 0 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ac5bddd52f..1d7d9dfdd0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -284,6 +284,23 @@ class SyncResult:
             or self.device_lists
         )
 
+    @staticmethod
+    def empty(next_batch: StreamToken) -> "SyncResult":
+        "Return a new empty result"
+        return SyncResult(
+            next_batch=next_batch,
+            presence=[],
+            account_data=[],
+            joined=[],
+            invited=[],
+            knocked=[],
+            archived=[],
+            to_device=[],
+            device_lists=DeviceListUpdates(),
+            device_one_time_keys_count={},
+            device_unused_fallback_key_types=[],
+        )
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class E2eeSyncResult:
@@ -497,6 +514,24 @@ class SyncHandler:
         if context:
             context.tag = sync_label
 
+        if since_token is not None:
+            # We need to make sure this worker has caught up with the token. If
+            # this returns false it means we timed out waiting, and we should
+            # just return an empty response.
+            start = self.clock.time_msec()
+            if not await self.notifier.wait_for_stream_token(since_token):
+                logger.warning(
+                    "Timed out waiting for worker to catch up. Returning empty response"
+                )
+                return SyncResult.empty(since_token)
+
+            # If we've spent significant time waiting to catch up, take it off
+            # the timeout.
+            now = self.clock.time_msec()
+            if now - start > 1_000:
+                timeout -= now - start
+                timeout = max(timeout, 0)
+
         # if we have a since token, delete any to-device messages before that token
         # (since we now know that the device has received them)
         if since_token is not None: