summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py2
-rw-r--r--synapse/app/frontend_proxy.py267
-rw-r--r--synapse/config/workers.py2
-rw-r--r--synapse/federation/transport/server.py24
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py25
-rw-r--r--synapse/push/push_rule_evaluator.py4
-rw-r--r--synapse/replication/slave/storage/client_ips.py3
-rw-r--r--synapse/replication/tcp/commands.py14
-rw-r--r--synapse/storage/account_data.py13
-rw-r--r--synapse/storage/client_ips.py8
-rw-r--r--synapse/visibility.py19
11 files changed, 325 insertions, 56 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index f8266d1c81..e3da45b416 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -209,7 +209,7 @@ class Auth(object):
             )[0]
             if user and access_token and ip_addr:
                 self.store.insert_client_ip(
-                    user=user,
+                    user_id=user.to_string(),
                     access_token=access_token,
                     ip=ip_addr,
                     user_agent=user_agent,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
new file mode 100644
index 0000000000..c8fa7854ad
--- /dev/null
+++ b/synapse/app/frontend_proxy.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.crypto import context_factory
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request,
+)
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+
+logger = logging.getLogger("synapse.app.frontend_proxy")
+
+
+class KeyUploadServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$",
+                                  releases=())
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(KeyUploadServlet, self).__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+        self.http_client = hs.get_simple_http_client()
+        self.main_uri = hs.config.worker_main_http_uri
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, device_id):
+        requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+        user_id = requester.user.to_string()
+        body = parse_json_object_from_request(request)
+
+        if device_id is not None:
+            # passing the device_id here is deprecated; however, we allow it
+            # for now for compatibility with older clients.
+            if (requester.device_id is not None and
+                    device_id != requester.device_id):
+                logger.warning("Client uploading keys for a different device "
+                               "(logged in as %s, uploading for %s)",
+                               requester.device_id, device_id)
+        else:
+            device_id = requester.device_id
+
+        if device_id is None:
+            raise SynapseError(
+                400,
+                "To upload keys, you must pass device_id when authenticating"
+            )
+
+        if body:
+            # They're actually trying to upload something, proxy to main synapse.
+            result = yield self.http_client.post_json_get_json(
+                self.main_uri + request.uri,
+                body,
+            )
+
+            defer.returnValue((200, result))
+        else:
+            # Just interested in counts.
+            result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+            defer.returnValue((200, {"one_time_key_counts": result}))
+
+
+class FrontendProxySlavedStore(
+    SlavedDeviceStore,
+    SlavedClientIpStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class FrontendProxyServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_addresses = listener_config["bind_addresses"]
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    KeyUploadServlet(self).register(resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+
+        for address in bind_addresses:
+            reactor.listenTCP(
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                interface=address
+            )
+
+        logger.info("Synapse client reader now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                bind_addresses = listener["bind_addresses"]
+
+                for address in bind_addresses:
+                    reactor.listenTCP(
+                        listener["port"],
+                        manhole(
+                            username="matrix",
+                            password="rabbithole",
+                            globals={"hs": self},
+                        ),
+                        interface=address
+                    )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse frontend proxy", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.frontend_proxy"
+
+    assert config.worker_main_http_uri is not None
+
+    setup_logging(config, use_worker_options=True)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = FrontendProxyServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_state_handler().start_caching()
+        ss.get_datastore().start_profiling()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-frontend-proxy",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ea48d931a1..99d5d8aaeb 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -32,6 +32,8 @@ class WorkerConfig(Config):
         self.worker_replication_port = config.get("worker_replication_port", None)
         self.worker_name = config.get("worker_name", self.worker_app)
 
+        self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+
         if self.worker_listeners:
             for listener in self.worker_listeners:
                 bind_address = listener.pop("bind_address", None)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3d676e7d8b..a78f01e442 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -153,12 +153,10 @@ class Authenticator(object):
 class BaseFederationServlet(object):
     REQUIRE_AUTH = True
 
-    def __init__(self, handler, authenticator, ratelimiter, server_name,
-                 room_list_handler):
+    def __init__(self, handler, authenticator, ratelimiter, server_name):
         self.handler = handler
         self.authenticator = authenticator
         self.ratelimiter = ratelimiter
-        self.room_list_handler = room_list_handler
 
     def _wrap(self, func):
         authenticator = self.authenticator
@@ -590,7 +588,7 @@ class PublicRoomList(BaseFederationServlet):
         else:
             network_tuple = ThirdPartyInstanceID(None, None)
 
-        data = yield self.room_list_handler.get_local_public_room_list(
+        data = yield self.handler.get_local_public_room_list(
             limit, since_token,
             network_tuple=network_tuple
         )
@@ -611,7 +609,7 @@ class FederationVersionServlet(BaseFederationServlet):
         }))
 
 
-SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationPullServlet,
     FederationEventServlet,
@@ -634,17 +632,27 @@ SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     OpenIdUserInfo,
-    PublicRoomList,
     FederationVersionServlet,
 )
 
+ROOM_LIST_CLASSES = (
+    PublicRoomList,
+)
+
 
 def register_servlets(hs, resource, authenticator, ratelimiter):
-    for servletclass in SERVLET_CLASSES:
+    for servletclass in FEDERATION_SERVLET_CLASSES:
         servletclass(
             handler=hs.get_replication_layer(),
             authenticator=authenticator,
             ratelimiter=ratelimiter,
             server_name=hs.hostname,
-            room_list_handler=hs.get_room_list_handler(),
+        ).register(resource)
+
+    for servletclass in ROOM_LIST_CLASSES:
+        servletclass(
+            handler=hs.get_room_list_handler(),
+            authenticator=authenticator,
+            ratelimiter=ratelimiter,
+            server_name=hs.hostname,
         ).register(resource)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..803ac3e75b 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,7 +19,6 @@ from twisted.internet import defer
 
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
-from synapse.visibility import filter_events_for_clients_context
 from synapse.api.constants import EventTypes, Membership
 from synapse.util.caches.descriptors import cached
 from synapse.util.async import Linearizer
@@ -92,15 +91,6 @@ class BulkPushRuleEvaluator(object):
         rules_by_user = yield self._get_rules_for_event(event, context)
         actions_by_user = {}
 
-        # None of these users can be peeking since this list of users comes
-        # from the set of users in the room, so we know for sure they're all
-        # actually in the room.
-        user_tuples = [(u, False) for u in rules_by_user]
-
-        filtered_by_user = yield filter_events_for_clients_context(
-            self.store, user_tuples, [event], {event.event_id: context}
-        )
-
         room_members = yield self.store.get_joined_users_from_context(
             event, context
         )
@@ -110,6 +100,14 @@ class BulkPushRuleEvaluator(object):
         condition_cache = {}
 
         for uid, rules in rules_by_user.iteritems():
+            if event.sender == uid:
+                continue
+
+            if not event.is_state():
+                is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+                if is_ignored:
+                    continue
+
             display_name = None
             profile_info = room_members.get(uid)
             if profile_info:
@@ -121,13 +119,6 @@ class BulkPushRuleEvaluator(object):
                 if event.type == EventTypes.Member and event.state_key == uid:
                     display_name = event.content.get("displayname", None)
 
-            filtered = filtered_by_user[uid]
-            if len(filtered) == 0:
-                continue
-
-            if filtered[0].sender == uid:
-                continue
-
             for rule in rules:
                 if 'enabled' in rule and not rule['enabled']:
                     continue
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 4d88046579..172c27c137 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -200,7 +200,9 @@ def _glob_to_re(glob, word_boundary):
         return re.compile(r, flags=re.IGNORECASE)
 
 
-def _flatten_dict(d, prefix=[], result={}):
+def _flatten_dict(d, prefix=[], result=None):
+    if result is None:
+        result = {}
     for key, value in d.items():
         if isinstance(value, basestring):
             result[".".join(prefix + [key])] = value.lower()
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 65250285e8..352c9a2aa8 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -29,9 +29,8 @@ class SlavedClientIpStore(BaseSlavedStore):
             max_entries=50000 * CACHE_SIZE_FACTOR,
         )
 
-    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
+    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
-        user_id = user.to_string()
         key = (user_id, access_token, ip)
 
         try:
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index a009214e43..171227cce2 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -323,14 +323,18 @@ class UserIpCommand(Command):
 
     @classmethod
     def from_line(cls, line):
-        user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5)
+        user_id, jsn = line.split(" ", 1)
 
-        return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen))
+        access_token, ip, user_agent, device_id, last_seen = json.loads(jsn)
+
+        return cls(
+            user_id, access_token, ip, user_agent, device_id, last_seen
+        )
 
     def to_line(self):
-        return " ".join((
-            self.user_id, self.access_token, self.ip, self.device_id,
-            str(self.last_seen), self.user_agent,
+        return self.user_id + " " + json.dumps((
+            self.access_token, self.ip, self.user_agent, self.device_id,
+            self.last_seen,
         ))
 
 
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index aa84ffc2b0..ff14e54c11 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore):
             " WHERE stream_id < ?"
         )
         txn.execute(update_max_id_sql, (next_id, next_id))
+
+    @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
+    def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
+        ignored_account_data = yield self.get_global_account_data_by_type_for_user(
+            "m.ignored_user_list", ignorer_user_id,
+            on_invalidate=cache_context.invalidate,
+        )
+        if not ignored_account_data:
+            defer.returnValue(False)
+
+        defer.returnValue(
+            ignored_user_id in ignored_account_data.get("ignored_users", {})
+        )
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index fc468ea185..3c95e90eca 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -56,9 +56,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         )
         reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
 
-    def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
-        now = int(self._clock.time_msec())
-        key = (user.to_string(), access_token, ip)
+    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id,
+                         now=None):
+        if not now:
+            now = int(self._clock.time_msec())
+        key = (user_id, access_token, ip)
 
         try:
             last_seen = self.client_ip_last_seen.get(key)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index c4dd9ae2c7..5590b866ed 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
 
 
 @defer.inlineCallbacks
-def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context):
-    user_ids = set(u[0] for u in user_tuples)
-    event_id_to_state = {}
-    for event_id, context in event_id_to_context.items():
-        state = yield store.get_events([
-            e_id
-            for key, e_id in context.current_state_ids.iteritems()
-            if key == (EventTypes.RoomHistoryVisibility, "")
-            or (key[0] == EventTypes.Member and key[1] in user_ids)
-        ])
-        event_id_to_state[event_id] = state
-
-    res = yield filter_events_for_clients(
-        store, user_tuples, events, event_id_to_state
-    )
-    defer.returnValue(res)
-
-
-@defer.inlineCallbacks
 def filter_events_for_client(store, user_id, events, is_peeking=False):
     """
     Check which events a user is allowed to see