summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/slave/storage/client_ips.py48
-rw-r--r--synapse/replication/tcp/client.py7
-rw-r--r--synapse/replication/tcp/commands.py32
-rw-r--r--synapse/replication/tcp/protocol.py6
-rw-r--r--synapse/replication/tcp/resource.py10
5 files changed, 103 insertions, 0 deletions
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
new file mode 100644
index 0000000000..65250285e8
--- /dev/null
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
+from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches.descriptors import Cache
+
+
+class SlavedClientIpStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedClientIpStore, self).__init__(db_conn, hs)
+
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen",
+            keylen=4,
+            max_entries=50000 * CACHE_SIZE_FACTOR,
+        )
+
+    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+        now = int(self._clock.time_msec())
+        user_id = user.to_string()
+        key = (user_id, access_token, ip)
+
+        try:
+            last_seen = self.client_ip_last_seen.get(key)
+        except KeyError:
+            last_seen = None
+
+        # Rate-limited inserts
+        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+            return
+
+        self.hs.get_tcp_replication().send_user_ip(
+            user_id, access_token, ip, user_agent, device_id, now
+        )
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 90fb6c1336..6d2513c4e2 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
     FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+    UserIpCommand,
 )
 from .protocol import ClientReplicationStreamProtocol
 
@@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
         cmd = InvalidateCacheCommand(cache_func.__name__, keys)
         self.send_command(cmd)
 
+    def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        """Tell the master that the user made a request.
+        """
+        cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
+        self.send_command(cmd)
+
     def await_sync(self, data):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 84d2a2272a..a009214e43 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command):
         return " ".join((self.cache_func, json.dumps(self.keys)))
 
 
+class UserIpCommand(Command):
+    """Sent periodically when a worker sees activity from a client.
+
+    Format::
+
+        USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
+    """
+    NAME = "USER_IP"
+
+    def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        self.user_id = user_id
+        self.access_token = access_token
+        self.ip = ip
+        self.user_agent = user_agent
+        self.device_id = device_id
+        self.last_seen = last_seen
+
+    @classmethod
+    def from_line(cls, line):
+        user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
+
+        return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
+
+    def to_line(self):
+        return " ".join((
+            self.user_id, self.access_token, self.ip, self.device_id,
+            str(self.last_seen), self.user_agent,
+        ))
+
+
 # Map of command name to command type.
 COMMAND_MAP = {
     cmd.NAME: cmd
@@ -320,6 +350,7 @@ COMMAND_MAP = {
         SyncCommand,
         RemovePusherCommand,
         InvalidateCacheCommand,
+        UserIpCommand,
     )
 }
 
@@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = (
     FederationAckCommand.NAME,
     RemovePusherCommand.NAME,
     InvalidateCacheCommand.NAME,
+    UserIpCommand.NAME,
     ErrorCommand.NAME,
 )
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9fee2a484b..062272f8dd 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     def on_INVALIDATE_CACHE(self, cmd):
         self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
+    def on_USER_IP(self, cmd):
+        self.streamer.on_user_ip(
+            cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
+            cmd.last_seen,
+        )
+
     @defer.inlineCallbacks
     def subscribe_to_stream(self, stream_name, token):
         """Subscribe the remote to a streams.
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 69c46911ec..3ea3ca5a6f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
 federation_ack_counter = metrics.register_counter("federation_ack")
 remove_pusher_counter = metrics.register_counter("remove_pusher")
 invalidate_cache_counter = metrics.register_counter("invalidate_cache")
+user_ip_cache_counter = metrics.register_counter("user_ip_cache")
 
 logger = logging.getLogger(__name__)
 
@@ -238,6 +239,15 @@ class ReplicationStreamer(object):
         invalidate_cache_counter.inc()
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
+    @measure_func("repl.on_user_ip")
+    def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        """The client saw a user request
+        """
+        user_ip_cache_counter.inc()
+        self.store.insert_client_ip(
+            user_id, access_token, ip, user_agent, device_id, last_seen,
+        )
+
     def send_sync_to_all_connections(self, data):
         """Sends a SYNC command to all clients.