summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py20
1 files changed, 18 insertions, 2 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py

index 06ce04c800..459954caeb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -763,8 +763,24 @@ class Notifier: return result - async def wait_for_stream_position(self, stream_token: StreamToken) -> None: - pass + async def wait_for_stream_token(self, stream_token: StreamToken) -> bool: + """Wait for this worker to catch up with the given stream token.""" + + start = self.clock.time_msec() + while True: + current_token = self.event_sources.get_current_token() + if stream_token.is_before_or_eq(current_token): + return True + + now = self.clock.time_msec() + + if now - start > 10_000: + return False + + logger.info("Waiting for current token to reach %s", stream_token) + + # TODO: be better + await self.clock.sleep(0.5) async def _get_room_ids( self, user: UserID, explicit_room_id: Optional[str]