diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1c7946522a..1312b6f21e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -126,12 +126,11 @@ class ReplicationDataHandler:
StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
)
elif stream_name == ReceiptsStream.NAME:
+ new_token = self.store.get_max_receipt_stream_id()
self.notifier.on_new_event(
- StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows]
- )
- await self._pusher_pool.on_new_receipts(
- token, token, {row.room_id for row in rows}
+ StreamKeyType.RECEIPT, new_token, rooms=[row.room_id for row in rows]
)
+ await self._pusher_pool.on_new_receipts({row.user_id for row in rows})
elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
@@ -281,14 +280,6 @@ class ReplicationDataHandler:
# may be streaming.
self.notifier.notify_replication()
- def on_remote_server_up(self, server: str) -> None:
- """Called when get a new REMOTE_SERVER_UP command."""
-
- # Let's wake up the transaction queue for the server in case we have
- # pending stuff to send to it.
- if self.send_handler:
- self.send_handler.wake_destination(server)
-
async def wait_for_stream_position(
self,
instance_name: str,
@@ -339,7 +330,7 @@ class ReplicationDataHandler:
try:
await make_deferred_yieldable(deferred)
except defer.TimeoutError:
- logger.error(
+ logger.warning(
"Timed out waiting for repl stream %r to reach %s (%s)"
"; currently at: %s",
stream_name,
@@ -407,9 +398,6 @@ class FederationSenderHandler:
self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
- def wake_destination(self, server: str) -> None:
- self.federation_sender.wake_destination(server)
-
async def process_replication_rows(
self, stream_name: str, token: int, rows: list
) -> None:
|