summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--contrib/cmdclient/http.py16
-rw-r--r--synapse/federation/federation_client.py50
-rw-r--r--synapse/federation/transport/client.py40
-rw-r--r--synapse/handlers/federation.py34
-rw-r--r--synapse/handlers/room_member.py23
-rw-r--r--synapse/http/matrixfederationclient.py21
-rw-r--r--synapse/replication/slave/storage/events.py3
-rw-r--r--synapse/rest/key/v2/local_key_resource.py5
-rw-r--r--synapse/storage/receipts.py11
-rw-r--r--synapse/storage/state.py9
-rw-r--r--synapse/util/caches/__init__.py31
-rw-r--r--synapse/util/caches/descriptors.py39
12 files changed, 173 insertions, 109 deletions
diff --git a/contrib/cmdclient/http.py b/contrib/cmdclient/http.py
index 4186897316..c833f3f318 100644
--- a/contrib/cmdclient/http.py
+++ b/contrib/cmdclient/http.py
@@ -36,15 +36,13 @@ class HttpClient(object):
                 the request body. This will be encoded as JSON.
 
         Returns:
-            Deferred: Succeeds when we get *any* HTTP response.
-
-            The result of the deferred is a tuple of `(code, response)`,
-            where `response` is a dict representing the decoded JSON body.
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
         """
         pass
 
     def get_json(self, url, args=None):
-        """ Get's some json from the given host homeserver and path
+        """ Gets some json from the given host homeserver and path
 
         Args:
             url (str): The URL to GET data from.
@@ -54,10 +52,8 @@ class HttpClient(object):
                 and *not* a string.
 
         Returns:
-            Deferred: Succeeds when we get *any* HTTP response.
-
-            The result of the deferred is a tuple of `(code, response)`,
-            where `response` is a dict representing the decoded JSON body.
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
         """
         pass
 
@@ -214,4 +210,4 @@ class _JsonProducer(object):
         pass
 
     def stopProducing(self):
-        pass
\ No newline at end of file
+        pass
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index deee0f4904..861441708b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -474,8 +474,13 @@ class FederationClient(FederationBase):
             content (object): Any additional data to put into the content field
                 of the event.
         Return:
-            A tuple of (origin (str), event (object)) where origin is the remote
-            homeserver which generated the event.
+            Deferred: resolves to a tuple of (origin (str), event (object))
+            where origin is the remote homeserver which generated the event.
+
+            Fails with a ``CodeMessageException`` if the chosen remote server
+            returns a 300/400 code.
+
+            Fails with a ``RuntimeError`` if no servers were reachable.
         """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
@@ -528,6 +533,27 @@ class FederationClient(FederationBase):
 
     @defer.inlineCallbacks
     def send_join(self, destinations, pdu):
+        """Sends a join event to one of a list of homeservers.
+
+        Doing so will cause the remote server to add the event to the graph,
+        and send the event out to the rest of the federation.
+
+        Args:
+            destinations (str): Candidate homeservers which are probably
+                participating in the room.
+            pdu (BaseEvent): event to be sent
+
+        Return:
+            Deferred: resolves to a dict with members ``origin`` (a string
+            giving the serer the event was sent to, ``state`` (?) and
+            ``auth_chain``.
+
+            Fails with a ``CodeMessageException`` if the chosen remote server
+            returns a 300/400 code.
+
+            Fails with a ``RuntimeError`` if no servers were reachable.
+        """
+
         for destination in destinations:
             if destination == self.server_name:
                 continue
@@ -635,6 +661,26 @@ class FederationClient(FederationBase):
 
     @defer.inlineCallbacks
     def send_leave(self, destinations, pdu):
+        """Sends a leave event to one of a list of homeservers.
+
+        Doing so will cause the remote server to add the event to the graph,
+        and send the event out to the rest of the federation.
+
+        This is mostly useful to reject received invites.
+
+        Args:
+            destinations (str): Candidate homeservers which are probably
+                participating in the room.
+            pdu (BaseEvent): event to be sent
+
+        Return:
+            Deferred: resolves to None.
+
+            Fails with a ``CodeMessageException`` if the chosen remote server
+            returns a non-200 code.
+
+            Fails with a ``RuntimeError`` if no servers were reachable.
+        """
         for destination in destinations:
             if destination == self.server_name:
                 continue
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 15a03378f5..52b2a717d2 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -193,6 +193,26 @@ class TransportLayerClient(object):
     @defer.inlineCallbacks
     @log_function
     def make_membership_event(self, destination, room_id, user_id, membership):
+        """Asks a remote server to build and sign us a membership event
+
+        Note that this does not append any events to any graphs.
+
+        Args:
+            destination (str): address of remote homeserver
+            room_id (str): room to join/leave
+            user_id (str): user to be joined/left
+            membership (str): one of join/leave
+
+        Returns:
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body (ie, the new event).
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response
+            code >= 300.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
+        """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
             raise RuntimeError(
@@ -201,11 +221,23 @@ class TransportLayerClient(object):
             )
         path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id)
 
+        ignore_backoff = False
+        retry_on_dns_fail = False
+
+        if membership == Membership.LEAVE:
+            # we particularly want to do our best to send leave events. The
+            # problem is that if it fails, we won't retry it later, so if the
+            # remote server was just having a momentary blip, the room will be
+            # out of sync.
+            ignore_backoff = True
+            retry_on_dns_fail = True
+
         content = yield self.client.get_json(
             destination=destination,
             path=path,
-            retry_on_dns_fail=False,
+            retry_on_dns_fail=retry_on_dns_fail,
             timeout=20000,
+            ignore_backoff=ignore_backoff,
         )
 
         defer.returnValue(content)
@@ -232,6 +264,12 @@ class TransportLayerClient(object):
             destination=destination,
             path=path,
             data=content,
+
+            # we want to do our best to send this through. The problem is
+            # that if it fails, we won't retry it later, so if the remote
+            # server was just having a momentary blip, the room will be out of
+            # sync.
+            ignore_backoff=True,
         )
 
         defer.returnValue(response)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2d9126dd86..52be5a402d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1090,19 +1090,13 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        try:
-            origin, event = yield self._make_and_verify_event(
-                target_hosts,
-                room_id,
-                user_id,
-                "leave"
-            )
-            event = self._sign_event(event)
-        except SynapseError:
-            raise
-        except CodeMessageException as e:
-            logger.warn("Failed to reject invite: %s", e)
-            raise SynapseError(500, "Failed to reject invite")
+        origin, event = yield self._make_and_verify_event(
+            target_hosts,
+            room_id,
+            user_id,
+            "leave"
+        )
+        event = self._sign_event(event)
 
         # Try the host that we succesfully called /make_leave/ on first for
         # the /send_leave/ request.
@@ -1112,16 +1106,10 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        try:
-            yield self.replication_layer.send_leave(
-                target_hosts,
-                event
-            )
-        except SynapseError:
-            raise
-        except CodeMessageException as e:
-            logger.warn("Failed to reject invite: %s", e)
-            raise SynapseError(500, "Failed to reject invite")
+        yield self.replication_layer.send_leave(
+            target_hosts,
+            event
+        )
 
         context = yield self.state_handler.compute_event_context(event)
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2052d6d05f..28b2c80a93 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -139,13 +139,6 @@ class RoomMemberHandler(BaseHandler):
         )
         yield user_joined_room(self.distributor, user, room_id)
 
-    def reject_remote_invite(self, user_id, room_id, remote_room_hosts):
-        return self.hs.get_handlers().federation_handler.do_remotely_reject_invite(
-            remote_room_hosts,
-            room_id,
-            user_id
-        )
-
     @defer.inlineCallbacks
     def update_membership(
             self,
@@ -286,13 +279,21 @@ class RoomMemberHandler(BaseHandler):
                 else:
                     # send the rejection to the inviter's HS.
                     remote_room_hosts = remote_room_hosts + [inviter.domain]
-
+                    fed_handler = self.hs.get_handlers().federation_handler
                     try:
-                        ret = yield self.reject_remote_invite(
-                            target.to_string(), room_id, remote_room_hosts
+                        ret = yield fed_handler.do_remotely_reject_invite(
+                            remote_room_hosts,
+                            room_id,
+                            target.to_string(),
                         )
                         defer.returnValue(ret)
-                    except SynapseError as e:
+                    except Exception as e:
+                        # if we were unable to reject the exception, just mark
+                        # it as rejected on our end and plough ahead.
+                        #
+                        # The 'except' clause is very broad, but we need to
+                        # capture everything from DNS failures upwards
+                        #
                         logger.warn("Failed to reject invite: %s", e)
 
                         yield self.store.locally_reject_invite(
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 62b4d7e93d..747a791f83 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -125,6 +125,8 @@ class MatrixFederationHttpClient(object):
                 code >= 300.
             Fails with ``NotRetryingDestination`` if we are not yet ready
                 to retry this server.
+            (May also fail with plenty of other Exceptions for things like DNS
+                failures, connection failures, SSL failures.)
         """
         limiter = yield synapse.util.retryutils.get_retry_limiter(
             destination,
@@ -302,8 +304,10 @@ class MatrixFederationHttpClient(object):
 
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
-            will be the decoded JSON body. On a 4xx or 5xx error response a
-            CodeMessageException is raised.
+            will be the decoded JSON body.
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response
+            code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
@@ -360,8 +364,10 @@ class MatrixFederationHttpClient(object):
                 try the request anyway.
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
-            will be the decoded JSON body. On a 4xx or 5xx error response a
-            CodeMessageException is raised.
+            will be the decoded JSON body.
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response
+            code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
@@ -410,10 +416,11 @@ class MatrixFederationHttpClient(object):
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
         Returns:
-            Deferred: Succeeds when we get *any* HTTP response.
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
 
-            The result of the deferred is a tuple of `(code, response)`,
-            where `response` is a dict representing the decoded JSON body.
+            Fails with ``HTTPRequestException`` if we get an HTTP response
+            code >= 300.
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 4ca1e5aa8c..ab48ff925e 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -102,9 +102,6 @@ class SlavedEventStore(BaseSlavedStore):
     _get_state_groups_from_groups_txn = (
         DataStore._get_state_groups_from_groups_txn.__func__
     )
-    _get_state_group_from_group = (
-        StateStore.__dict__["_get_state_group_from_group"]
-    )
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
diff --git a/synapse/rest/key/v2/local_key_resource.py b/synapse/rest/key/v2/local_key_resource.py
index ff95269ba8..be68d9a096 100644
--- a/synapse/rest/key/v2/local_key_resource.py
+++ b/synapse/rest/key/v2/local_key_resource.py
@@ -84,12 +84,11 @@ class LocalKey(Resource):
             }
 
         old_verify_keys = {}
-        for key in self.config.old_signing_keys:
-            key_id = "%s:%s" % (key.alg, key.version)
+        for key_id, key in self.config.old_signing_keys.items():
             verify_key_bytes = key.encode()
             old_verify_keys[key_id] = {
                 u"key": encode_base64(verify_key_bytes),
-                u"expired_ts": key.expired,
+                u"expired_ts": key.expired_ts,
             }
 
         tls_fingerprints = self.config.tls_fingerprints
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b0f8c2787..efb90c3c91 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore):
         # Returns an ObservableDeferred
         res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
 
-        if res and res.called and user_id in res.result:
-            # We'd only be adding to the set, so no point invalidating if the
-            # user is already there
-            return
+        if res:
+            if isinstance(res, defer.Deferred) and res.called:
+                res = res.result
+            if user_id in res:
+                # We'd only be adding to the set, so no point invalidating if the
+                # user is already there
+                return
 
         self.get_users_with_read_receipts_in_room.invalidate((room_id,))
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index acd69944c4..e89001d994 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -279,12 +279,7 @@ class StateStore(SQLBaseStore):
 
             return count
 
-    @cached(num_args=2, max_entries=100000, iterable=True)
-    def _get_state_group_from_group(self, group, types):
-        raise NotImplementedError()
-
-    @cachedList(cached_method_name="_get_state_group_from_group",
-                list_name="groups", num_args=2, inlineCallbacks=True)
+    @defer.inlineCallbacks
     def _get_state_groups_from_groups(self, groups, types):
         """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
         """
@@ -512,7 +507,7 @@ class StateStore(SQLBaseStore):
         state_map = yield self.get_state_ids_for_events([event_id], types)
         defer.returnValue(state_map[event_id])
 
-    @cached(num_args=2, max_entries=100000)
+    @cached(num_args=2, max_entries=50000)
     def _get_state_group_for_event(self, room_id, event_id):
         return self._simple_select_one_onecol(
             table="event_to_state_groups",
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 8a7774a88e..4a83c46d98 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -14,13 +14,10 @@
 # limitations under the License.
 
 import synapse.metrics
-from lrucache import LruCache
 import os
 
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
-DEBUG_CACHES = False
-
 metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
 
 caches_by_name = {}
@@ -40,10 +37,6 @@ def register_cache(name, cache):
     )
 
 
-_string_cache = LruCache(int(100000 * CACHE_SIZE_FACTOR))
-_stirng_cache_metrics = register_cache("string_cache", _string_cache)
-
-
 KNOWN_KEYS = {
     key: key for key in
     (
@@ -67,14 +60,16 @@ KNOWN_KEYS = {
 
 
 def intern_string(string):
-    """Takes a (potentially) unicode string and interns using custom cache
+    """Takes a (potentially) unicode string and interns it if it's ascii
     """
-    new_str = _string_cache.setdefault(string, string)
-    if new_str is string:
-        _stirng_cache_metrics.inc_hits()
-    else:
-        _stirng_cache_metrics.inc_misses()
-    return new_str
+    if string is None:
+        return None
+
+    try:
+        string = string.encode("ascii")
+        return intern(string)
+    except UnicodeEncodeError:
+        return string
 
 
 def intern_dict(dictionary):
@@ -87,13 +82,9 @@ def intern_dict(dictionary):
 
 
 def _intern_known_values(key, value):
-    intern_str_keys = ("event_id", "room_id")
-    intern_unicode_keys = ("sender", "user_id", "type", "state_key")
-
-    if key in intern_str_keys:
-        return intern(value.encode('ascii'))
+    intern_keys = ("event_id", "room_id", "sender", "user_id", "type", "state_key",)
 
-    if key in intern_unicode_keys:
+    if key in intern_keys:
         return intern_string(value)
 
     return value
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 9d0d0be1f9..807e147657 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 
-from . import DEBUG_CACHES, register_cache
+from . import register_cache
 
 from twisted.internet import defer
 from collections import namedtuple
@@ -76,7 +76,7 @@ class Cache(object):
 
         self.cache = LruCache(
             max_size=max_entries, keylen=keylen, cache_type=cache_type,
-            size_callback=(lambda d: len(d.result)) if iterable else None,
+            size_callback=(lambda d: len(d)) if iterable else None,
         )
 
         self.name = name
@@ -96,6 +96,17 @@ class Cache(object):
                 )
 
     def get(self, key, default=_CacheSentinel, callback=None):
+        """Looks the key up in the caches.
+
+        Args:
+            key(tuple)
+            default: What is returned if key is not in the caches. If not
+                specified then function throws KeyError instead
+            callback(fn): Gets called when the entry in the cache is invalidated
+
+        Returns:
+            Either a Deferred or the raw result
+        """
         callbacks = [callback] if callback else []
         val = self._pending_deferred_cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
@@ -137,7 +148,7 @@ class Cache(object):
             if self.sequence == entry.sequence:
                 existing_entry = self._pending_deferred_cache.pop(key, None)
                 if existing_entry is entry:
-                    self.cache.set(key, entry.deferred, entry.callbacks)
+                    self.cache.set(key, result, entry.callbacks)
                 else:
                     entry.invalidate()
             else:
@@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase):
             try:
                 cached_result_d = cache.get(cache_key, callback=invalidate_callback)
 
-                observer = cached_result_d.observe()
-                if DEBUG_CACHES:
-                    @defer.inlineCallbacks
-                    def check_result(cached_result):
-                        actual_result = yield self.function_to_call(obj, *args, **kwargs)
-                        if actual_result != cached_result:
-                            logger.error(
-                                "Stale cache entry %s%r: cached: %r, actual %r",
-                                self.orig.__name__, cache_key,
-                                cached_result, actual_result,
-                            )
-                            raise ValueError("Stale cache entry")
-                        defer.returnValue(cached_result)
-                    observer.addCallback(check_result)
+                if isinstance(cached_result_d, ObservableDeferred):
+                    observer = cached_result_d.observe()
+                else:
+                    observer = cached_result_d
 
             except KeyError:
                 ret = defer.maybeDeferred(
@@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
 
                 try:
                     res = cache.get(tuple(key), callback=invalidate_callback)
-                    if not res.has_succeeded():
+                    if not isinstance(res, ObservableDeferred):
+                        results[arg] = res
+                    elif not res.has_succeeded():
                         res = res.observe()
                         res.addCallback(lambda r, arg: (arg, r), arg)
                         cached_defers[arg] = res