2 files changed, 8 insertions, 3 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 8e46957d15..5f1bf492c1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ class FederationRemoteSendQueue:
for key in keys[:i]:
del self.edus[key]
- def notify_new_events(self, current_id):
+ def notify_new_events(self, max_token):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index e33b29a42c..604cfd1935 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ class FederationSender:
self._per_destination_queues[destination] = queue
return queue
- def notify_new_events(self, current_id: int) -> None:
+ def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ current_id = max_token.stream
+
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
|