summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/server.py3
-rw-r--r--synapse/config/tls.py15
-rw-r--r--synapse/crypto/context_factory.py5
-rw-r--r--synapse/crypto/keyring.py13
-rw-r--r--synapse/federation/federation_client.py121
-rw-r--r--synapse/federation/federation_server.py8
-rw-r--r--synapse/handlers/events.py9
-rw-r--r--synapse/http/server.py44
-rw-r--r--synapse/notifier.py16
-rw-r--r--synapse/util/lrucache.py7
10 files changed, 188 insertions, 53 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4e4892d40b..b042d4eed9 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -30,7 +30,6 @@ class ServerConfig(Config):
         self.pid_file = self.abspath(args.pid_file)
         self.webclient = True
         self.manhole = args.manhole
-        self.no_tls = args.no_tls
         self.soft_file_limit = args.soft_file_limit
 
         if not args.content_addr:
@@ -76,8 +75,6 @@ class ServerConfig(Config):
         server_group.add_argument("--content-addr", default=None,
                                   help="The host and scheme to use for the "
                                   "content repository")
-        server_group.add_argument("--no-tls", action='store_true',
-                                  help="Don't bind to the https port.")
         server_group.add_argument("--soft-file-limit", type=int, default=0,
                                   help="Set the soft limit on the number of "
                                        "file descriptors synapse can use. "
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 384b29e7ba..034f9a7bf0 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -28,9 +28,16 @@ class TlsConfig(Config):
         self.tls_certificate = self.read_tls_certificate(
             args.tls_certificate_path
         )
-        self.tls_private_key = self.read_tls_private_key(
-            args.tls_private_key_path
-        )
+
+        self.no_tls = args.no_tls
+
+        if self.no_tls:
+            self.tls_private_key = None
+        else:
+            self.tls_private_key = self.read_tls_private_key(
+                args.tls_private_key_path
+            )
+
         self.tls_dh_params_path = self.check_file(
             args.tls_dh_params_path, "tls_dh_params"
         )
@@ -45,6 +52,8 @@ class TlsConfig(Config):
                                help="PEM encoded private key for TLS")
         tls_group.add_argument("--tls-dh-params-path",
                                help="PEM dh parameters for ephemeral keys")
+        tls_group.add_argument("--no-tls", action='store_true',
+                               help="Don't bind to the https port.")
 
     def read_tls_certificate(self, cert_path):
         cert_pem = self.read_file(cert_path, "tls_certificate")
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 24d4abf3e9..2f8618a0df 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -38,7 +38,10 @@ class ServerContextFactory(ssl.ContextFactory):
             logger.exception("Failed to enable eliptic curve for TLS")
         context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
         context.use_certificate(config.tls_certificate)
-        context.use_privatekey(config.tls_private_key)
+
+        if not config.no_tls:
+            context.use_privatekey(config.tls_private_key)
+
         context.load_tmp_dh(config.tls_dh_params_path)
         context.set_cipher_list("!ADH:HIGH+kEDH:!AECDH:HIGH+kEECDH")
 
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 828aced44a..f4db7b8a05 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -50,18 +50,27 @@ class Keyring(object):
             )
         try:
             verify_key = yield self.get_server_verify_key(server_name, key_ids)
-        except IOError:
+        except IOError as e:
+            logger.warn(
+                "Got IOError when downloading keys for %s: %s %s",
+                server_name, type(e).__name__, str(e.message),
+            )
             raise SynapseError(
                 502,
                 "Error downloading keys for %s" % (server_name,),
                 Codes.UNAUTHORIZED,
             )
-        except:
+        except Exception as e:
+            logger.warn(
+                "Got Exception when downloading keys for %s: %s %s",
+                server_name, type(e).__name__, str(e.message),
+            )
             raise SynapseError(
                 401,
                 "No key for %s with id %s" % (server_name, key_ids),
                 Codes.UNAUTHORIZED,
             )
+
         try:
             verify_signed_json(json_object, server_name, verify_key)
         except:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ca89a0787c..f131941f45 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,14 +19,18 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Edu
 
-from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.api.errors import (
+    CodeMessageException, HttpResponseException, SynapseError,
+)
 from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
 from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
 
+import itertools
 import logging
+import random
 
 
 logger = logging.getLogger(__name__)
@@ -440,21 +444,112 @@ class FederationClient(FederationBase):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_missing_events(self, destination, room_id, earliest_events,
+    def get_missing_events(self, destination, room_id, earliest_events_ids,
                            latest_events, limit, min_depth):
-        content = yield self.transport_layer.get_missing_events(
-            destination, room_id, earliest_events, latest_events, limit,
-            min_depth,
-        )
+        """Tries to fetch events we are missing. This is called when we receive
+        an event without having received all of its ancestors.
 
-        events = [
-            self.event_from_pdu_json(e)
-            for e in content.get("events", [])
-        ]
+        Args:
+            destination (str)
+            room_id (str)
+            earliest_events_ids (list): List of event ids. Effectively the
+                events we expected to receive, but haven't. `get_missing_events`
+                should only return events that didn't happen before these.
+            latest_events (list): List of events we have received that we don't
+                have all previous events for.
+            limit (int): Maximum number of events to return.
+            min_depth (int): Minimum depth of events tor return.
+        """
+        try:
+            content = yield self.transport_layer.get_missing_events(
+                destination=destination,
+                room_id=room_id,
+                earliest_events=earliest_events_ids,
+                latest_events=[e.event_id for e in latest_events],
+                limit=limit,
+                min_depth=min_depth,
+            )
+
+            events = [
+                self.event_from_pdu_json(e)
+                for e in content.get("events", [])
+            ]
+
+            signed_events = yield self._check_sigs_and_hash_and_fetch(
+                destination, events, outlier=True
+            )
+
+            have_gotten_all_from_destination = True
+        except HttpResponseException as e:
+            if not e.code == 400:
+                raise
 
-        signed_events = yield self._check_sigs_and_hash_and_fetch(
-            destination, events, outlier=True
-        )
+            # We are probably hitting an old server that doesn't support
+            # get_missing_events
+            signed_events = []
+            have_gotten_all_from_destination = False
+
+        if len(signed_events) >= limit:
+            defer.returnValue(signed_events)
+
+        servers = yield self.store.get_joined_hosts_for_room(room_id)
+
+        servers = set(servers)
+        servers.discard(self.server_name)
+
+        failed_to_fetch = set()
+
+        while len(signed_events) < limit:
+            # Are we missing any?
+
+            seen_events = set(earliest_events_ids)
+            seen_events.update(e.event_id for e in signed_events)
+
+            missing_events = {}
+            for e in itertools.chain(latest_events, signed_events):
+                if e.depth > min_depth:
+                    missing_events.update({
+                        e_id: e.depth for e_id, _ in e.prev_events
+                        if e_id not in seen_events
+                        and e_id not in failed_to_fetch
+                    })
+
+            if not missing_events:
+                break
+
+            have_seen = yield self.store.have_events(missing_events)
+
+            for k in have_seen:
+                missing_events.pop(k, None)
+
+            if not missing_events:
+                break
+
+            # Okay, we haven't gotten everything yet. Lets get them.
+            ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
+
+            if have_gotten_all_from_destination:
+                servers.discard(destination)
+
+            def random_server_list():
+                srvs = list(servers)
+                random.shuffle(srvs)
+                return srvs
+
+            deferreds = [
+                self.get_pdu(
+                    destinations=random_server_list(),
+                    event_id=e_id,
+                )
+                for e_id, depth in ordered_missing[:limit - len(signed_events)]
+            ]
+
+            res = yield defer.DeferredList(deferreds, consumeErrors=True)
+            for (result, val), (e_id, _) in zip(res, ordered_missing):
+                if result:
+                    signed_events.append(val)
+                else:
+                    failed_to_fetch.add(e_id)
 
         defer.returnValue(signed_events)
 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 4264d857be..9c7dcdba96 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -413,12 +413,16 @@ class FederationServer(FederationBase):
                     missing_events = yield self.get_missing_events(
                         origin,
                         pdu.room_id,
-                        earliest_events=list(latest),
-                        latest_events=[pdu.event_id],
+                        earliest_events_ids=list(latest),
+                        latest_events=[pdu],
                         limit=10,
                         min_depth=min_depth,
                     )
 
+                    # We want to sort these by depth so we process them and
+                    # tell clients about them in order.
+                    missing_events.sort(key=lambda x: x.depth)
+
                     for e in missing_events:
                         yield self._handle_new_pdu(
                             origin,
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 8d5f5c8499..d3297b7292 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -23,6 +23,7 @@ from synapse.events.utils import serialize_event
 from ._base import BaseHandler
 
 import logging
+import random
 
 
 logger = logging.getLogger(__name__)
@@ -72,6 +73,14 @@ class EventStreamHandler(BaseHandler):
             rm_handler = self.hs.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
+            if timeout:
+                # If they've set a timeout set a minimum limit.
+                timeout = max(timeout, 500)
+
+                # Add some randomness to this value to try and mitigate against
+                # thundering herds on restart.
+                timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+
             with PreserveLoggingContext():
                 events, tokens = yield self.notifier.get_events_for(
                     auth_user, room_ids, pagin_config, timeout
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 74a101a5d7..767c3ef79b 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -124,27 +124,29 @@ class JsonResource(HttpServer, resource.Resource):
             # and path regex match
             for path_entry in self.path_regexs.get(request.method, []):
                 m = path_entry.pattern.match(request.path)
-                if m:
-                    # We found a match! Trigger callback and then return the
-                    # returned response. We pass both the request and any
-                    # matched groups from the regex to the callback.
-
-                    args = [
-                        urllib.unquote(u).decode("UTF-8") for u in m.groups()
-                    ]
-
-                    logger.info(
-                        "Received request: %s %s",
-                        request.method, request.path
-                    )
-
-                    code, response = yield path_entry.callback(
-                        request,
-                        *args
-                    )
-
-                    self._send_response(request, code, response)
-                    return
+                if not m:
+                    continue
+
+                # We found a match! Trigger callback and then return the
+                # returned response. We pass both the request and any
+                # matched groups from the regex to the callback.
+
+                args = [
+                    urllib.unquote(u).decode("UTF-8") for u in m.groups()
+                ]
+
+                logger.info(
+                    "Received request: %s %s",
+                    request.method, request.path
+                )
+
+                code, response = yield path_entry.callback(
+                    request,
+                    *args
+                )
+
+                self._send_response(request, code, response)
+                return
 
             # Huh. No one wanted to handle that? Fiiiiiine. Send 400.
             raise UnrecognizedRequestError()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 09d23e79b8..df13e8ddb6 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -63,7 +63,7 @@ class _NotificationListener(object):
             pass
 
         for room in self.rooms:
-            lst = notifier.rooms_to_listeners.get(room, set())
+            lst = notifier.room_to_listeners.get(room, set())
             lst.discard(self)
 
         notifier.user_to_listeners.get(self.user, set()).discard(self)
@@ -83,7 +83,7 @@ class Notifier(object):
     def __init__(self, hs):
         self.hs = hs
 
-        self.rooms_to_listeners = {}
+        self.room_to_listeners = {}
         self.user_to_listeners = {}
         self.appservice_to_listeners = {}
 
@@ -116,17 +116,17 @@ class Notifier(object):
 
         room_source = self.event_sources.sources["room"]
 
-        listeners = self.rooms_to_listeners.get(room_id, set()).copy()
+        listeners = self.room_to_listeners.get(room_id, set()).copy()
 
         for user in extra_users:
             listeners |= self.user_to_listeners.get(user, set()).copy()
 
         for appservice in self.appservice_to_listeners:
             # TODO (kegan): Redundant appservice listener checks?
-            # App services will already be in the rooms_to_listeners set, but
+            # App services will already be in the room_to_listeners set, but
             # that isn't enough. They need to be checked here in order to
             # receive *invites* for users they are interested in. Does this
-            # make the rooms_to_listeners check somewhat obselete?
+            # make the room_to_listeners check somewhat obselete?
             if appservice.is_interested(event):
                 listeners |= self.appservice_to_listeners.get(
                     appservice, set()
@@ -184,7 +184,7 @@ class Notifier(object):
             listeners |= self.user_to_listeners.get(user, set()).copy()
 
         for room in rooms:
-            listeners |= self.rooms_to_listeners.get(room, set()).copy()
+            listeners |= self.room_to_listeners.get(room, set()).copy()
 
         @defer.inlineCallbacks
         def notify(listener):
@@ -337,7 +337,7 @@ class Notifier(object):
     @log_function
     def _register_with_keys(self, listener):
         for room in listener.rooms:
-            s = self.rooms_to_listeners.setdefault(room, set())
+            s = self.room_to_listeners.setdefault(room, set())
             s.add(listener)
 
         self.user_to_listeners.setdefault(listener.user, set()).add(listener)
@@ -380,5 +380,5 @@ class Notifier(object):
     def _user_joined_room(self, user, room_id):
         new_listeners = self.user_to_listeners.get(user, set())
 
-        listeners = self.rooms_to_listeners.setdefault(room_id, set())
+        listeners = self.room_to_listeners.setdefault(room_id, set())
         listeners |= new_listeners
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index a45c673d32..f115f50e50 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -88,11 +88,15 @@ class LruCache(object):
             else:
                 return default
 
+        def cache_len():
+            return len(cache)
+
         self.sentinel = object()
         self.get = cache_get
         self.set = cache_set
         self.setdefault = cache_set_default
         self.pop = cache_pop
+        self.len = cache_len
 
     def __getitem__(self, key):
         result = self.get(key, self.sentinel)
@@ -108,3 +112,6 @@ class LruCache(object):
         result = self.pop(key, self.sentinel)
         if result is self.sentinel:
             raise KeyError()
+
+    def __len__(self):
+        return self.len()