summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py24
1 files changed, 23 insertions, 1 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7c1cd3b5f2..c87eb748c0 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -721,7 +721,6 @@ class Notifier:
                         user.to_string(),
                         new_events,
                         is_peeking=is_peeking,
-                        msc4115_membership_on_events=self.hs.config.experimental.msc4115_membership_on_events,
                     )
                 elif keyname == StreamKeyType.PRESENCE:
                     now = self.clock.time_msec()
@@ -763,6 +762,29 @@ class Notifier:
 
         return result
 
+    async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
+        """Wait for this worker to catch up with the given stream token."""
+
+        start = self.clock.time_msec()
+        while True:
+            current_token = self.event_sources.get_current_token()
+            if stream_token.is_before_or_eq(current_token):
+                return True
+
+            now = self.clock.time_msec()
+
+            if now - start > 10_000:
+                return False
+
+            logger.info(
+                "Waiting for current token to reach %s; currently at %s",
+                stream_token,
+                current_token,
+            )
+
+            # TODO: be better
+            await self.clock.sleep(0.5)
+
     async def _get_room_ids(
         self, user: UserID, explicit_room_id: Optional[str]
     ) -> Tuple[StrCollection, bool]: