diff options
author | Erik Johnston <erik@matrix.org> | 2020-01-17 10:27:19 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-01-17 10:27:19 +0000 |
commit | a8a50f5b5746279379b4511c8ecb2a40b143fe32 (patch) | |
tree | 5a195c03f3a7d2c20af9bbcb57d1c77fb3dafc11 /synapse/replication/tcp/protocol.py | |
parent | Clarify the `account_validity` and `email` sections of the sample configurati... (diff) | |
download | synapse-a8a50f5b5746279379b4511c8ecb2a40b143fe32.tar.xz |
Wake up transaction queue when remote server comes back online (#6706)
This will be used to retry outbound transactions to a remote server if we think it might have come back up.
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 5f4bdf84d2..131e5acb09 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -76,6 +76,7 @@ from synapse.replication.tcp.commands import ( PingCommand, PositionCommand, RdataCommand, + RemoteServerUpCommand, ReplicateCommand, ServerCommand, SyncCommand, @@ -460,6 +461,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): async def on_INVALIDATE_CACHE(self, cmd): self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): + self.streamer.on_remote_server_up(cmd.data) + async def on_USER_IP(self, cmd): self.streamer.on_user_ip( cmd.user_id, @@ -555,6 +559,9 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def send_sync(self, data): self.send_command(SyncCommand(data)) + def send_remote_server_up(self, server: str): + self.send_command(RemoteServerUpCommand(server)) + def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) self.streamer.lost_connection(self) @@ -589,6 +596,11 @@ class AbstractReplicationClientHandler(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod + async def on_remote_server_up(self, server: str): + """Called when get a new REMOTE_SERVER_UP command.""" + raise NotImplementedError() + + @abc.abstractmethod def get_streams_to_replicate(self): """Called when a new connection has been established and we need to subscribe to streams. @@ -707,6 +719,9 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): async def on_SYNC(self, cmd): self.handler.on_sync(cmd.data) + async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): + self.handler.on_remote_server_up(cmd.data) + def replicate(self, stream_name, token): """Send the subscription request to the server """ |