summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py3
-rw-r--r--synapse/app/client_reader.py4
-rw-r--r--synapse/app/media_repository.py4
-rw-r--r--synapse/app/synchrotron.py4
-rw-r--r--synapse/app/user_dir.py4
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/message.py9
-rw-r--r--synapse/replication/slave/storage/client_ips.py48
-rw-r--r--synapse/replication/tcp/client.py7
-rw-r--r--synapse/replication/tcp/commands.py32
-rw-r--r--synapse/replication/tcp/protocol.py6
-rw-r--r--synapse/replication/tcp/resource.py10
-rw-r--r--synapse/storage/__init__.py10
-rw-r--r--synapse/storage/client_ips.py127
14 files changed, 220 insertions, 52 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 0c297cb022..10f4972369 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -23,7 +23,6 @@ from synapse import event_auth
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import AuthError, Codes
 from synapse.types import UserID
-from synapse.util import logcontext
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -200,7 +199,7 @@ class Auth(object):
                 default=[""]
             )[0]
             if user and access_token and ip_addr:
-                logcontext.preserve_fn(self.store.insert_client_ip)(
+                self.store.insert_client_ip(
                     user=user,
                     access_token=access_token,
                     ip=ip_addr,
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 9b72c649ac..09bc1935f1 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -24,6 +24,7 @@ from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
@@ -33,7 +34,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
@@ -65,8 +65,8 @@ class ClientReaderSlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     TransactionStore,
+    SlavedClientIpStore,
     BaseSlavedStore,
-    ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
     pass
 
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 26c4416956..f57ec784fe 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -23,13 +23,13 @@ from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.httpresourcetree import create_resource_tree
@@ -60,10 +60,10 @@ logger = logging.getLogger("synapse.app.media_repository")
 class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    SlavedClientIpStore,
     TransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
-    ClientIpStore,
 ):
     pass
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 13c00ef2ba..4bdd99a966 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -29,6 +29,7 @@ from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -42,7 +43,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
@@ -77,9 +77,9 @@ class SynchrotronSlavedStore(
     SlavedPresenceStore,
     SlavedDeviceInboxStore,
     SlavedDeviceStore,
+    SlavedClientIpStore,
     RoomStore,
     BaseSlavedStore,
-    ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
     who_forgot_in_room = (
         RoomMemberStore.__dict__["who_forgot_in_room"]
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 6d2aebe8de..8c6300db9d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -26,12 +26,12 @@ from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.storage.engines import create_engine
-from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -58,9 +58,9 @@ class UserDirectorySlaveStore(
     SlavedEventStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
+    SlavedClientIpStore,
     UserDirectoryStore,
     BaseSlavedStore,
-    ClientIpStore,  # After BaseSlavedStore because the constructor is different
 ):
     def __init__(self, db_conn, hs):
         super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 982cda3edf..ed60d494ff 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -106,7 +106,7 @@ class DeviceHandler(BaseHandler):
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id) for device_id in device_map.keys())
+            user_id, device_id=None
         )
 
         devices = device_map.values()
@@ -133,7 +133,7 @@ class DeviceHandler(BaseHandler):
         except errors.StoreError:
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(
-            devices=((user_id, device_id),)
+            user_id, device_id,
         )
         _update_device_from_client_ips(device, ips)
         defer.returnValue(device)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a04f634c5c..24c9ffdb20 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,6 +34,7 @@ from canonicaljson import encode_canonical_json
 
 import logging
 import random
+import ujson
 
 logger = logging.getLogger(__name__)
 
@@ -498,6 +499,14 @@ class MessageHandler(BaseHandler):
             logger.warn("Denying new event %r because %s", event, err)
             raise err
 
+        # Ensure that we can round trip before trying to persist in db
+        try:
+            dump = ujson.dumps(event.content)
+            ujson.loads(dump)
+        except:
+            logger.exception("Failed to encode content: %r", event.content)
+            raise
+
         yield self.maybe_kick_guest_users(event, context)
 
         if event.type == EventTypes.CanonicalAlias:
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
new file mode 100644
index 0000000000..65250285e8
--- /dev/null
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseSlavedStore
+from synapse.storage.client_ips import LAST_SEEN_GRANULARITY
+from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches.descriptors import Cache
+
+
+class SlavedClientIpStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(SlavedClientIpStore, self).__init__(db_conn, hs)
+
+        self.client_ip_last_seen = Cache(
+            name="client_ip_last_seen",
+            keylen=4,
+            max_entries=50000 * CACHE_SIZE_FACTOR,
+        )
+
+    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+        now = int(self._clock.time_msec())
+        user_id = user.to_string()
+        key = (user_id, access_token, ip)
+
+        try:
+            last_seen = self.client_ip_last_seen.get(key)
+        except KeyError:
+            last_seen = None
+
+        # Rate-limited inserts
+        if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
+            return
+
+        self.hs.get_tcp_replication().send_user_ip(
+            user_id, access_token, ip, user_agent, device_id, now
+        )
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 90fb6c1336..6d2513c4e2 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
     FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand,
+    UserIpCommand,
 )
 from .protocol import ClientReplicationStreamProtocol
 
@@ -178,6 +179,12 @@ class ReplicationClientHandler(object):
         cmd = InvalidateCacheCommand(cache_func.__name__, keys)
         self.send_command(cmd)
 
+    def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        """Tell the master that the user made a request.
+        """
+        cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
+        self.send_command(cmd)
+
     def await_sync(self, data):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 84d2a2272a..a009214e43 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command):
         return " ".join((self.cache_func, json.dumps(self.keys)))
 
 
+class UserIpCommand(Command):
+    """Sent periodically when a worker sees activity from a client.
+
+    Format::
+
+        USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent>
+    """
+    NAME = "USER_IP"
+
+    def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        self.user_id = user_id
+        self.access_token = access_token
+        self.ip = ip
+        self.user_agent = user_agent
+        self.device_id = device_id
+        self.last_seen = last_seen
+
+    @classmethod
+    def from_line(cls, line):
+        user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
+
+        return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
+
+    def to_line(self):
+        return " ".join((
+            self.user_id, self.access_token, self.ip, self.device_id,
+            str(self.last_seen), self.user_agent,
+        ))
+
+
 # Map of command name to command type.
 COMMAND_MAP = {
     cmd.NAME: cmd
@@ -320,6 +350,7 @@ COMMAND_MAP = {
         SyncCommand,
         RemovePusherCommand,
         InvalidateCacheCommand,
+        UserIpCommand,
     )
 }
 
@@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = (
     FederationAckCommand.NAME,
     RemovePusherCommand.NAME,
     InvalidateCacheCommand.NAME,
+    UserIpCommand.NAME,
     ErrorCommand.NAME,
 )
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 9fee2a484b..062272f8dd 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
     def on_INVALIDATE_CACHE(self, cmd):
         self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
+    def on_USER_IP(self, cmd):
+        self.streamer.on_user_ip(
+            cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id,
+            cmd.last_seen,
+        )
+
     @defer.inlineCallbacks
     def subscribe_to_stream(self, stream_name, token):
         """Subscribe the remote to a streams.
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 69c46911ec..3ea3ca5a6f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync")
 federation_ack_counter = metrics.register_counter("federation_ack")
 remove_pusher_counter = metrics.register_counter("remove_pusher")
 invalidate_cache_counter = metrics.register_counter("invalidate_cache")
+user_ip_cache_counter = metrics.register_counter("user_ip_cache")
 
 logger = logging.getLogger(__name__)
 
@@ -238,6 +239,15 @@ class ReplicationStreamer(object):
         invalidate_cache_counter.inc()
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
+    @measure_func("repl.on_user_ip")
+    def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
+        """The client saw a user request
+        """
+        user_ip_cache_counter.inc()
+        self.store.insert_client_ip(
+            user_id, access_token, ip, user_agent, device_id, last_seen,
+        )
+
     def send_sync_to_all_connections(self, data):
         """Sends a SYNC command to all clients.
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index f119c5a758..b92472df33 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -304,16 +304,6 @@ class DataStore(RoomMemberStore, RoomStore,
         ret = yield self.runInteraction("count_users", _count_users)
         defer.returnValue(ret)
 
-    def get_user_ip_and_agents(self, user):
-        return self._simple_select_list(
-            table="user_ips",
-            keyvalues={"user_id": user.to_string()},
-            retcols=[
-                "access_token", "ip", "user_agent", "last_seen"
-            ],
-            desc="get_user_ip_and_agents",
-        )
-
     def get_users(self):
         """Function to reterive a list of users in users table.
 
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 014ab635b7..88a5eb232f 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -15,7 +15,7 @@
 
 import logging
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 
 from ._base import Cache
 from . import background_updates
@@ -50,7 +50,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["user_id", "device_id", "last_seen"],
         )
 
-    @defer.inlineCallbacks
+        # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
+        self._batch_row_update = {}
+
+        self._client_ip_looper = self._clock.looping_call(
+            self._update_client_ips_batch, 5 * 1000
+        )
+        reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+
     def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
         key = (user.to_string(), access_token, ip)
@@ -62,34 +69,48 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
         # Rate-limited inserts
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
-            defer.returnValue(None)
+            return
 
         self.client_ip_last_seen.prefill(key, now)
 
-        # It's safe not to lock here: a) no unique constraint,
-        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
-        yield self._simple_upsert(
-            "user_ips",
-            keyvalues={
-                "user_id": user.to_string(),
-                "access_token": access_token,
-                "ip": ip,
-                "user_agent": user_agent,
-                "device_id": device_id,
-            },
-            values={
-                "last_seen": now,
-            },
-            desc="insert_client_ip",
-            lock=False,
+        self._batch_row_update[key] = (user_agent, device_id, now)
+
+    def _update_client_ips_batch(self):
+        to_update = self._batch_row_update
+        self._batch_row_update = {}
+        return self.runInteraction(
+            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
         )
 
+    def _update_client_ips_batch_txn(self, txn, to_update):
+        self.database_engine.lock_table(txn, "user_ips")
+
+        for entry in to_update.iteritems():
+            (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
+
+            self._simple_upsert_txn(
+                txn,
+                table="user_ips",
+                keyvalues={
+                    "user_id": user_id,
+                    "access_token": access_token,
+                    "ip": ip,
+                    "user_agent": user_agent,
+                    "device_id": device_id,
+                },
+                values={
+                    "last_seen": last_seen,
+                },
+                lock=False,
+            )
+
     @defer.inlineCallbacks
-    def get_last_client_ip_by_device(self, devices):
+    def get_last_client_ip_by_device(self, user_id, device_id):
         """For each device_id listed, give the user_ip it was last seen on
 
         Args:
-            devices (iterable[(str, str)]):  list of (user_id, device_id) pairs
+            user_id (str)
+            device_id (str): If None fetches all devices for the user
 
         Returns:
             defer.Deferred: resolves to a dict, where the keys
@@ -100,6 +121,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         res = yield self.runInteraction(
             "get_last_client_ip_by_device",
             self._get_last_client_ip_by_device_txn,
+            user_id, device_id,
             retcols=(
                 "user_id",
                 "access_token",
@@ -108,23 +130,34 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 "device_id",
                 "last_seen",
             ),
-            devices=devices
         )
 
         ret = {(d["user_id"], d["device_id"]): d for d in res}
+        for key in self._batch_row_update:
+            uid, access_token, ip = key
+            if uid == user_id:
+                user_agent, did, last_seen = self._batch_row_update[key]
+                if not device_id or did == device_id:
+                    ret[(user_id, device_id)] = {
+                        "user_id": user_id,
+                        "access_token": access_token,
+                        "ip": ip,
+                        "user_agent": user_agent,
+                        "device_id": did,
+                        "last_seen": last_seen,
+                    }
         defer.returnValue(ret)
 
     @classmethod
-    def _get_last_client_ip_by_device_txn(cls, txn, devices, retcols):
+    def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
         where_clauses = []
         bindings = []
-        for (user_id, device_id) in devices:
-            if device_id is None:
-                where_clauses.append("(user_id = ? AND device_id IS NULL)")
-                bindings.extend((user_id, ))
-            else:
-                where_clauses.append("(user_id = ? AND device_id = ?)")
-                bindings.extend((user_id, device_id))
+        if device_id is None:
+            where_clauses.append("user_id = ?")
+            bindings.extend((user_id, ))
+        else:
+            where_clauses.append("(user_id = ? AND device_id = ?)")
+            bindings.extend((user_id, device_id))
 
         if not where_clauses:
             return []
@@ -152,3 +185,37 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
         txn.execute(sql, bindings)
         return cls.cursor_to_dict(txn)
+
+    @defer.inlineCallbacks
+    def get_user_ip_and_agents(self, user):
+        user_id = user.to_string()
+        results = {}
+
+        for key in self._batch_row_update:
+            uid, access_token, ip = key
+            if uid == user_id:
+                user_agent, _, last_seen = self._batch_row_update[key]
+                results[(access_token, ip)] = (user_agent, last_seen)
+
+        rows = yield self._simple_select_list(
+            table="user_ips",
+            keyvalues={"user_id": user_id},
+            retcols=[
+                "access_token", "ip", "user_agent", "last_seen"
+            ],
+            desc="get_user_ip_and_agents",
+        )
+
+        results.update(
+            ((row["access_token"], row["ip"]), (row["user_agent"], row["last_seen"]))
+            for row in rows
+        )
+        defer.returnValue(list(
+            {
+                "access_token": access_token,
+                "ip": ip,
+                "user_agent": user_agent,
+                "last_seen": last_seen,
+            }
+            for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+        ))