summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-01-17 10:27:19 +0000
committerGitHub <noreply@github.com>2020-01-17 10:27:19 +0000
commita8a50f5b5746279379b4511c8ecb2a40b143fe32 (patch)
tree5a195c03f3a7d2c20af9bbcb57d1c77fb3dafc11 /synapse/replication/tcp/protocol.py
parentClarify the `account_validity` and `email` sections of the sample configurati... (diff)
downloadsynapse-a8a50f5b5746279379b4511c8ecb2a40b143fe32.tar.xz
Wake up transaction queue when remote server comes back online (#6706)
This will be used to retry outbound transactions to a remote server if
we think it might have come back up.
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py15
1 files changed, 15 insertions, 0 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 5f4bdf84d2..131e5acb09 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -76,6 +76,7 @@ from synapse.replication.tcp.commands import (
     PingCommand,
     PositionCommand,
     RdataCommand,
+    RemoteServerUpCommand,
     ReplicateCommand,
     ServerCommand,
     SyncCommand,
@@ -460,6 +461,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     async def on_INVALIDATE_CACHE(self, cmd):
         self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
+    async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
+        self.streamer.on_remote_server_up(cmd.data)
+
     async def on_USER_IP(self, cmd):
         self.streamer.on_user_ip(
             cmd.user_id,
@@ -555,6 +559,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     def send_sync(self, data):
         self.send_command(SyncCommand(data))
 
+    def send_remote_server_up(self, server: str):
+        self.send_command(RemoteServerUpCommand(server))
+
     def on_connection_closed(self):
         BaseReplicationStreamProtocol.on_connection_closed(self)
         self.streamer.lost_connection(self)
@@ -589,6 +596,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
+    async def on_remote_server_up(self, server: str):
+        """Called when get a new REMOTE_SERVER_UP command."""
+        raise NotImplementedError()
+
+    @abc.abstractmethod
     def get_streams_to_replicate(self):
         """Called when a new connection has been established and we need to
         subscribe to streams.
@@ -707,6 +719,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
     async def on_SYNC(self, cmd):
         self.handler.on_sync(cmd.data)
 
+    async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
+        self.handler.on_remote_server_up(cmd.data)
+
     def replicate(self, stream_name, token):
         """Send the subscription request to the server
         """