diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 5d108fe11b..a2cabba7b1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -39,6 +39,7 @@ from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
FederationAckCommand,
+ LockReleasedCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
@@ -248,6 +249,9 @@ class ReplicationCommandHandler:
if self._is_master or self._should_insert_client_ips:
self.subscribe_to_channel("USER_IP")
+ if hs.config.redis.redis_enabled:
+ self._notifier.add_lock_released_callback(self.on_lock_released)
+
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -648,6 +652,17 @@ class ReplicationCommandHandler:
self._notifier.notify_remote_server_up(cmd.data)
+ def on_LOCK_RELEASED(
+ self, conn: IReplicationConnection, cmd: LockReleasedCommand
+ ) -> None:
+ """Called when we get a new LOCK_RELEASED command."""
+ if cmd.instance_name == self._instance_name:
+ return
+
+ self._notifier.notify_lock_released(
+ cmd.instance_name, cmd.lock_name, cmd.lock_key
+ )
+
def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
self._connections.append(connection)
@@ -754,6 +769,13 @@ class ReplicationCommandHandler:
"""
self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
+ def on_lock_released(
+ self, instance_name: str, lock_name: str, lock_key: str
+ ) -> None:
+ """Called when we released a lock and should notify other instances."""
+ if instance_name == self._instance_name:
+ self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key))
+
UpdateToken = TypeVar("UpdateToken")
UpdateRow = TypeVar("UpdateRow")
|