summary refs log tree commit diff
path: root/synapse/push/pusherpool.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-14 13:27:51 +0100
committerGitHub <noreply@github.com>2020-10-14 13:27:51 +0100
commit921a3f8a59da0f8fe706a22627f464a74b54c992 (patch)
tree0c1a960f010adb3156c4c0a5f6625d32d025ba80 /synapse/push/pusherpool.py
parentMake sure a retention policy is a state event (#8527) (diff)
downloadsynapse-921a3f8a59da0f8fe706a22627f464a74b54c992.tar.xz
Fix not sending events over federation when using sharded event persisters (#8536)
* Fix outbound federaion with multiple event persisters.

We incorrectly notified federation senders that the minimum persisted
stream position had advanced when we got an `RDATA` from an event
persister.

Notifying of federation senders already correctly happens in the
notifier, so we just delete the offending line.

* Change some interfaces to use RoomStreamToken.

By enforcing use of `RoomStreamTokens` we make it less likely that
people pass in random ints that they got from somewhere random.
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r--synapse/push/pusherpool.py10
1 files changed, 8 insertions, 2 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 76150e117b..0080c68ce2 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -24,6 +24,7 @@ from synapse.push import PusherConfigException
 from synapse.push.emailpusher import EmailPusher
 from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
 if TYPE_CHECKING:
@@ -186,11 +187,16 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, max_stream_id: int):
+    async def on_new_notifications(self, max_token: RoomStreamToken):
         if not self.pushers:
             # nothing to do here.
             return
 
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         if max_stream_id < self._last_room_stream_id_seen:
             # Nothing to do
             return
@@ -214,7 +220,7 @@ class PusherPool:
 
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(max_stream_id)
+                        p.on_new_notifications(max_token)
 
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")