summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-09-09 16:56:08 +0100
committerGitHub <noreply@github.com>2020-09-09 16:56:08 +0100
commitc9dbee50aefc22390f600a0219ca7fa1ae9acd88 (patch)
tree4cdba63f3398d455aec93f9735618d86aa57072d /synapse/push
parentRevert "Fixup pusher pool notifications" (diff)
downloadsynapse-c9dbee50aefc22390f600a0219ca7fa1ae9acd88.tar.xz
Fixup pusher pool notifications (#8287)
`pusher_pool.on_new_notifications` expected a min and max stream ID, however that was not what we were passing in. Instead, let's just pass it the current max stream ID and have it track the last stream ID it got passed.

I believe that it mostly worked as we called the function for every event. However, it would break for events that got persisted out of order, i.e, that were persisted but the max stream ID wasn't incremented as not all preceding events had finished persisting, and push for that event would be delayed until another event got pushed to the effected users.
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/emailpusher.py2
-rw-r--r--synapse/push/httppusher.py2
-rw-r--r--synapse/push/pusherpool.py19
3 files changed, 18 insertions, 5 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index b7ea4438e0..28bd8ab748 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -91,7 +91,7 @@ class EmailPusher:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+    def on_new_notifications(self, max_stream_ordering):
         if self.max_stream_ordering:
             self.max_stream_ordering = max(
                 max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f21fa9b659..26706bf3e1 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -114,7 +114,7 @@ class HttpPusher:
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+    def on_new_notifications(self, max_stream_ordering):
         self.max_stream_ordering = max(
             max_stream_ordering, self.max_stream_ordering or 0
         )
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3c3262a88c..fa8473bf8d 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -64,6 +64,12 @@ class PusherPool:
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
 
+        # Record the last stream ID that we were poked about so we can get
+        # changes since then. We set this to the current max stream ID on
+        # startup as every individual pusher will have checked for changes on
+        # startup.
+        self._last_room_stream_id_seen = self.store.get_room_max_stream_ordering()
+
         # map from user id to app_id:pushkey to pusher
         self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
 
@@ -178,20 +184,27 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, min_stream_id, max_stream_id):
+    async def on_new_notifications(self, max_stream_id):
         if not self.pushers:
             # nothing to do here.
             return
 
+        if max_stream_id < self._last_room_stream_id_seen:
+            # Nothing to do
+            return
+
+        prev_stream_id = self._last_room_stream_id_seen
+        self._last_room_stream_id_seen = max_stream_id
+
         try:
             users_affected = await self.store.get_push_action_users_in_range(
-                min_stream_id, max_stream_id
+                prev_stream_id, max_stream_id
             )
 
             for u in users_affected:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(min_stream_id, max_stream_id)
+                        p.on_new_notifications(max_stream_id)
 
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")