diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d3d40e8682..37d5890c65 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -279,6 +279,23 @@ class SyncResult:
or self.device_lists
)
+ @staticmethod
+ def empty(next_batch: StreamToken) -> "SyncResult":
+ "Return a new empty result"
+ return SyncResult(
+ next_batch=next_batch,
+ presence=[],
+ account_data=[],
+ joined=[],
+ invited=[],
+ knocked=[],
+ archived=[],
+ to_device=[],
+ device_lists=DeviceListUpdates(),
+ device_one_time_keys_count={},
+ device_unused_fallback_key_types=[],
+ )
+
class SyncHandler:
def __init__(self, hs: "HomeServer"):
@@ -401,6 +418,24 @@ class SyncHandler:
if context:
context.tag = sync_label
+ if since_token is not None:
+ # We need to make sure this worker has caught up with the token. If
+ # this returns false it means we timed out waiting, and we should
+ # just return an empty response.
+ start = self.clock.time_msec()
+ if not await self.notifier.wait_for_stream_token(since_token):
+ logger.warning(
+ "Timed out waiting for worker to catch up. Returning empty response"
+ )
+ return SyncResult.empty(since_token)
+
+ # If we've spent significant time waiting to catch up, take it off
+ # the timeout.
+ now = self.clock.time_msec()
+ if now - start > 1_000:
+ timeout -= now - start
+ timeout = max(timeout, 0)
+
# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
if since_token is not None:
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 06ce04c800..459954caeb 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -763,8 +763,24 @@ class Notifier:
return result
- async def wait_for_stream_position(self, stream_token: StreamToken) -> None:
- pass
+ async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
+ """Wait for this worker to catch up with the given stream token."""
+
+ start = self.clock.time_msec()
+ while True:
+ current_token = self.event_sources.get_current_token()
+ if stream_token.is_before_or_eq(current_token):
+ return True
+
+ now = self.clock.time_msec()
+
+ if now - start > 10_000:
+ return False
+
+ logger.info("Waiting for current token to reach %s", stream_token)
+
+ # TODO: be better
+ await self.clock.sleep(0.5)
async def _get_room_ids(
self, user: UserID, explicit_room_id: Optional[str]
|