diff options
author | Erik Johnston <erik@matrix.org> | 2020-10-14 13:27:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-14 13:27:51 +0100 |
commit | 921a3f8a59da0f8fe706a22627f464a74b54c992 (patch) | |
tree | 0c1a960f010adb3156c4c0a5f6625d32d025ba80 /synapse/push/emailpusher.py | |
parent | Make sure a retention policy is a state event (#8527) (diff) | |
download | synapse-921a3f8a59da0f8fe706a22627f464a74b54c992.tar.xz |
Fix not sending events over federation when using sharded event persisters (#8536)
* Fix outbound federaion with multiple event persisters. We incorrectly notified federation senders that the minimum persisted stream position had advanced when we got an `RDATA` from an event persister. Notifying of federation senders already correctly happens in the notifier, so we just delete the offending line. * Change some interfaces to use RoomStreamToken. By enforcing use of `RoomStreamTokens` we make it less likely that people pass in random ints that they got from somewhere random.
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r-- | synapse/push/emailpusher.py | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 28bd8ab748..c6763971ee 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,6 +18,7 @@ import logging from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -91,7 +92,12 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_stream_ordering): + def on_new_notifications(self, max_token: RoomStreamToken): + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + max_stream_ordering = max_token.stream + if self.max_stream_ordering: self.max_stream_ordering = max( max_stream_ordering, self.max_stream_ordering |