diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 8b2c4c3043..3ea3ca5a6f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
federation_ack_counter = metrics.register_counter("federation_ack")
remove_pusher_counter = metrics.register_counter("remove_pusher")
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
+user_ip_cache_counter = metrics.register_counter("user_ip_cache")
logger = logging.getLogger(__name__)
@@ -67,6 +68,7 @@ class ReplicationStreamer(object):
self.store = hs.get_datastore()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
# Current connections.
self.connections = []
@@ -99,7 +101,7 @@ class ReplicationStreamer(object):
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
- hs.get_notifier().add_replication_callback(self.on_notifier_poke)
+ self.notifier.add_replication_callback(self.on_notifier_poke)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
@@ -237,6 +239,15 @@ class ReplicationStreamer(object):
invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys))
+ @measure_func("repl.on_user_ip")
+ def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+ """The client saw a user request
+ """
+ user_ip_cache_counter.inc()
+ self.store.insert_client_ip(
+ user_id, access_token, ip, user_agent, device_id, last_seen,
+ )
+
def send_sync_to_all_connections(self, data):
"""Sends a SYNC command to all clients.
|