summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-14 13:27:51 +0100
committerGitHub <noreply@github.com>2020-10-14 13:27:51 +0100
commit921a3f8a59da0f8fe706a22627f464a74b54c992 (patch)
tree0c1a960f010adb3156c4c0a5f6625d32d025ba80 /synapse/federation
parentMake sure a retention policy is a state event (#8527) (diff)
downloadsynapse-921a3f8a59da0f8fe706a22627f464a74b54c992.tar.xz
Fix not sending events over federation when using sharded event persisters (#8536)
* Fix outbound federaion with multiple event persisters.

We incorrectly notified federation senders that the minimum persisted
stream position had advanced when we got an `RDATA` from an event
persister.

Notifying of federation senders already correctly happens in the
notifier, so we just delete the offending line.

* Change some interfaces to use RoomStreamToken.

By enforcing use of `RoomStreamTokens` we make it less likely that
people pass in random ints that they got from somewhere random.
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py2
-rw-r--r--synapse/federation/sender/__init__.py9
2 files changed, 8 insertions, 3 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py

index 8e46957d15..5f1bf492c1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ class FederationRemoteSendQueue: for key in keys[:i]: del self.edus[key] - def notify_new_events(self, current_id): + def notify_new_events(self, max_token): """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index e33b29a42c..604cfd1935 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -154,10 +154,15 @@ class FederationSender: self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id: int) -> None: + def notify_new_events(self, max_token: RoomStreamToken) -> None: """This gets called when we have some new events we might want to send out to other servers. """ + # We just use the minimum stream ordering and ignore the vector clock + # component. This is safe to do as long as we *always* ignore the vector + # clock components. + current_id = max_token.stream + self._last_poked_id = max(current_id, self._last_poked_id) if self._is_processing: