summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2023-11-09 09:48:24 -0500
committerPatrick Cloke <patrickc@matrix.org>2023-11-09 09:48:24 -0500
commit396fa974a10a393be068ff2ab87134e89567b808 (patch)
treefa9cc3cd57a77b71b68dd5f22afe952220ed936a /synapse/replication/tcp/client.py
parentDon't use separate copy_read method. (diff)
parentBump pyicu from 2.11 to 2.12 (#16603) (diff)
downloadsynapse-396fa974a10a393be068ff2ab87134e89567b808.tar.xz
Merge remote-tracking branch 'origin/develop' into clokep/psycopg3
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py20
1 files changed, 4 insertions, 16 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index 1c7946522a..1312b6f21e 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -126,12 +126,11 @@ class ReplicationDataHandler: StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows] ) elif stream_name == ReceiptsStream.NAME: + new_token = self.store.get_max_receipt_stream_id() self.notifier.on_new_event( - StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] - ) - await self._pusher_pool.on_new_receipts( - token, token, {row.room_id for row in rows} + StreamKeyType.RECEIPT, new_token, rooms=[row.room_id for row in rows] ) + await self._pusher_pool.on_new_receipts({row.user_id for row in rows}) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: @@ -281,14 +280,6 @@ class ReplicationDataHandler: # may be streaming. self.notifier.notify_replication() - def on_remote_server_up(self, server: str) -> None: - """Called when get a new REMOTE_SERVER_UP command.""" - - # Let's wake up the transaction queue for the server in case we have - # pending stuff to send to it. - if self.send_handler: - self.send_handler.wake_destination(server) - async def wait_for_stream_position( self, instance_name: str, @@ -339,7 +330,7 @@ class ReplicationDataHandler: try: await make_deferred_yieldable(deferred) except defer.TimeoutError: - logger.error( + logger.warning( "Timed out waiting for repl stream %r to reach %s (%s)" "; currently at: %s", stream_name, @@ -407,9 +398,6 @@ class FederationSenderHandler: self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") - def wake_destination(self, server: str) -> None: - self.federation_sender.wake_destination(server) - async def process_replication_rows( self, stream_name: str, token: int, rows: list ) -> None: