summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/federation_sender.py')
-rw-r--r--synapse/app/federation_sender.py17
1 files changed, 13 insertions, 4 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 83c436229c..38d11fdd0f 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -145,9 +145,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
         self.send_handler = FederationSenderHandler(hs, self)
 
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
-        yield super(FederationSenderReplicationHandler, self).on_rdata(
+    async def on_rdata(self, stream_name, token, rows):
+        await super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -159,6 +158,13 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         args.update(self.send_handler.stream_positions())
         return args
 
+    def on_remote_server_up(self, server: str):
+        """Called when get a new REMOTE_SERVER_UP command."""
+
+        # Let's wake up the transaction queue for the server in case we have
+        # pending stuff to send to it.
+        self.send_handler.wake_destination(server)
+
 
 def start(config_options):
     try:
@@ -206,7 +212,7 @@ class FederationSenderHandler(object):
     to the federation sender.
     """
 
-    def __init__(self, hs, replication_client):
+    def __init__(self, hs: FederationSenderServer, replication_client):
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
@@ -227,6 +233,9 @@ class FederationSenderHandler(object):
             self.store.get_room_max_stream_ordering()
         )
 
+    def wake_destination(self, server: str):
+        self.federation_sender.wake_destination(server)
+
     def stream_positions(self):
         return {"federation": self.federation_position}