diff options
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 22 |
1 files changed, 22 insertions, 0 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 5d108fe11b..a2cabba7b1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -39,6 +39,7 @@ from synapse.replication.tcp.commands import ( ClearUserSyncsCommand, Command, FederationAckCommand, + LockReleasedCommand, PositionCommand, RdataCommand, RemoteServerUpCommand, @@ -248,6 +249,9 @@ class ReplicationCommandHandler: if self._is_master or self._should_insert_client_ips: self.subscribe_to_channel("USER_IP") + if hs.config.redis.redis_enabled: + self._notifier.add_lock_released_callback(self.on_lock_released) + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -648,6 +652,17 @@ class ReplicationCommandHandler: self._notifier.notify_remote_server_up(cmd.data) + def on_LOCK_RELEASED( + self, conn: IReplicationConnection, cmd: LockReleasedCommand + ) -> None: + """Called when we get a new LOCK_RELEASED command.""" + if cmd.instance_name == self._instance_name: + return + + self._notifier.notify_lock_released( + cmd.instance_name, cmd.lock_name, cmd.lock_key + ) + def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" self._connections.append(connection) @@ -754,6 +769,13 @@ class ReplicationCommandHandler: """ self.send_command(RdataCommand(stream_name, self._instance_name, token, data)) + def on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: + """Called when we released a lock and should notify other instances.""" + if instance_name == self._instance_name: + self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key)) + UpdateToken = TypeVar("UpdateToken") UpdateRow = TypeVar("UpdateRow") |