1 files changed, 18 insertions, 2 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 06ce04c800..459954caeb 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -763,8 +763,24 @@ class Notifier:
return result
- async def wait_for_stream_position(self, stream_token: StreamToken) -> None:
- pass
+ async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
+ """Wait for this worker to catch up with the given stream token."""
+
+ start = self.clock.time_msec()
+ while True:
+ current_token = self.event_sources.get_current_token()
+ if stream_token.is_before_or_eq(current_token):
+ return True
+
+ now = self.clock.time_msec()
+
+ if now - start > 10_000:
+ return False
+
+ logger.info("Waiting for current token to reach %s", stream_token)
+
+ # TODO: be better
+ await self.clock.sleep(0.5)
async def _get_room_ids(
self, user: UserID, explicit_room_id: Optional[str]
|