diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index a59ab01471..b620b29dfb 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -17,7 +17,7 @@
"""
import logging
-from typing import Any, Dict, List
+from typing import Any, Callable, Dict, List
from prometheus_client import Counter
@@ -63,7 +63,7 @@ class ReplicationClientHandler:
self.presence_handler = hs.get_presence_handler()
self.instance_id = hs.get_instance_id()
- self.connections = []
+ self.connections = [] # type: List[Any]
self.streams = {
stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
@@ -186,6 +186,8 @@ class ReplicationClientHandler:
cmd.device_id,
cmd.last_seen,
)
+
+ if self._server_notices_sender:
await self._server_notices_sender.on_user_ip(cmd.user_id)
async def on_RDATA(self, cmd: RdataCommand):
@@ -259,7 +261,7 @@ class ReplicationClientHandler:
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
"""Called when get a new REMOTE_SERVER_UP command."""
if self.is_master:
- self.notifier.notify_remote_server_up(cmd.server)
+ self.notifier.notify_remote_server_up(cmd.data)
def get_currently_syncing_users(self):
"""Get the list of currently syncing users (if any). This is called
@@ -296,7 +298,7 @@ class ReplicationClientHandler:
cmd = RemovePusherCommand(app_id, push_key, user_id)
self.send_command(cmd)
- def send_invalidate_cache(self, cache_func: str, keys: tuple):
+ def send_invalidate_cache(self, cache_func: Callable, keys: tuple):
"""Poke the master to invalidate a cache.
"""
cmd = InvalidateCacheCommand(cache_func.__name__, keys)
|