summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/urls.py1
-rw-r--r--synapse/app/_base.py20
-rw-r--r--synapse/config/user_directory.py6
-rw-r--r--synapse/crypto/keyring.py144
-rw-r--r--synapse/federation/transport/server.py31
-rw-r--r--synapse/handlers/presence.py99
-rw-r--r--synapse/http/matrixfederationclient.py4
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/rest/admin/__init__.py12
-rw-r--r--synapse/rest/client/v1/login.py4
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py4
-rw-r--r--synapse/storage/events.py82
-rw-r--r--synapse/storage/events_worker.py50
-rw-r--r--synapse/storage/schema/delta/54/account_validity_with_renewal.sql (renamed from synapse/storage/schema/delta/54/account_validity.sql)3
-rw-r--r--synapse/storage/stats.py2
-rw-r--r--synapse/util/logcontext.py22
16 files changed, 349 insertions, 137 deletions
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 3c6bddff7a..e16c386a14 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
 FEDERATION_PREFIX = "/_matrix/federation"
 FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
 FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
+FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
 STATIC_PREFIX = "/_matrix/static"
 WEB_CLIENT_PREFIX = "/_matrix/client"
 CONTENT_REPO_PREFIX = "/_matrix/content"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 08199a5e8d..8cc990399f 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -344,15 +344,21 @@ class _LimitedHostnameResolver(object):
 
     def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
                         addressTypes=None, transportSemantics='TCP'):
-        # Note this is happening deep within the reactor, so we don't need to
-        # worry about log contexts.
-
         # We need this function to return `resolutionReceiver` so we do all the
         # actual logic involving deferreds in a separate function.
-        self._resolve(
-            resolutionReceiver, hostName, portNumber,
-            addressTypes, transportSemantics,
-        )
+
+        # even though this is happening within the depths of twisted, we need to drop
+        # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
+        # the logcontext if it returns an incomplete deferred; (b) _resolve will
+        # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
+        with PreserveLoggingContext():
+            self._resolve(
+                resolutionReceiver,
+                hostName,
+                portNumber,
+                addressTypes,
+                transportSemantics,
+            )
 
         return resolutionReceiver
 
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index 142754a7dc..023997ccde 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -43,9 +43,9 @@ class UserDirectoryConfig(Config):
         #
         # 'search_all_users' defines whether to search all users visible to your HS
         # when searching the user directory, rather than limiting to users visible
-        # in public rooms.  Defaults to false.  If you set it True, you'll have to run
-        # UPDATE user_directory_stream_pos SET stream_id = NULL;
-        # on your database to tell it to rebuild the user_directory search indexes.
+        # in public rooms.  Defaults to false.  If you set it True, you'll have to
+        # rebuild the user_directory search indexes, see
+        # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
         #
         #user_directory:
         #  enabled: true
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index eaf41b983c..c63f106cf3 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -17,6 +17,7 @@
 import logging
 from collections import namedtuple
 
+import six
 from six import raise_from
 from six.moves import urllib
 
@@ -180,9 +181,7 @@ class Keyring(object):
 
             # We want to wait for any previous lookups to complete before
             # proceeding.
-            yield self.wait_for_previous_lookups(
-                [rq.server_name for rq in verify_requests], server_to_deferred
-            )
+            yield self.wait_for_previous_lookups(server_to_deferred)
 
             # Actually start fetching keys.
             self._get_server_verify_keys(verify_requests)
@@ -215,12 +214,11 @@ class Keyring(object):
             logger.exception("Error starting key lookups")
 
     @defer.inlineCallbacks
-    def wait_for_previous_lookups(self, server_names, server_to_deferred):
+    def wait_for_previous_lookups(self, server_to_deferred):
         """Waits for any previous key lookups for the given servers to finish.
 
         Args:
-            server_names (list): list of server_names we want to lookup
-            server_to_deferred (dict): server_name to deferred which gets
+            server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
                 resolved once we've finished looking up keys for that server.
                 The Deferreds should be regular twisted ones which call their
                 callbacks with no logcontext.
@@ -233,7 +231,7 @@ class Keyring(object):
         while True:
             wait_on = [
                 (server_name, self.key_downloads[server_name])
-                for server_name in server_names
+                for server_name in server_to_deferred.keys()
                 if server_name in self.key_downloads
             ]
             if not wait_on:
@@ -349,6 +347,7 @@ class KeyFetcher(object):
         Args:
             server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]):
                 list of (server_name, iterable[key_id]) tuples to fetch keys for
+                Note that the iterables may be iterated more than once.
 
         Returns:
             Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
@@ -394,8 +393,7 @@ class BaseV2KeyFetcher(object):
         POST /_matrix/key/v2/query.
 
         Checks that each signature in the response that claims to come from the origin
-        server is valid. (Does not check that there actually is such a signature, for
-        some reason.)
+        server is valid, and that there is at least one such signature.
 
         Stores the json in server_keys_json so that it can be used for future responses
         to /_matrix/key/v2/query.
@@ -430,16 +428,25 @@ class BaseV2KeyFetcher(object):
                     verify_key=verify_key, valid_until_ts=ts_valid_until_ms
                 )
 
-        # TODO: improve this signature checking
         server_name = response_json["server_name"]
+        verified = False
         for key_id in response_json["signatures"].get(server_name, {}):
-            if key_id not in verify_keys:
+            # each of the keys used for the signature must be present in the response
+            # json.
+            key = verify_keys.get(key_id)
+            if not key:
                 raise KeyLookupError(
-                    "Key response must include verification keys for all signatures"
+                    "Key response is signed by key id %s:%s but that key is not "
+                    "present in the response" % (server_name, key_id)
                 )
 
-            verify_signed_json(
-                response_json, server_name, verify_keys[key_id].verify_key
+            verify_signed_json(response_json, server_name, key.verify_key)
+            verified = True
+
+        if not verified:
+            raise KeyLookupError(
+                "Key response for %s is not signed by the origin server"
+                % (server_name,)
             )
 
         for key_id, key_data in response_json["old_verify_keys"].items():
@@ -549,7 +556,16 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
         Returns:
             Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map
                 from server_name -> key_id -> FetchKeyResult
+
+        Raises:
+            KeyLookupError if there was an error processing the entire response from
+                the server
         """
+        logger.info(
+            "Requesting keys %s from notary server %s",
+            server_names_and_key_ids,
+            perspective_name,
+        )
         # TODO(mark): Set the minimum_valid_until_ts to that needed by
         # the events being validated or the current time if validating
         # an incoming request.
@@ -578,40 +594,31 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
         time_now_ms = self.clock.time_msec()
 
         for response in query_response["server_keys"]:
-            if (
-                u"signatures" not in response
-                or perspective_name not in response[u"signatures"]
-            ):
+            # do this first, so that we can give useful errors thereafter
+            server_name = response.get("server_name")
+            if not isinstance(server_name, six.string_types):
                 raise KeyLookupError(
-                    "Key response not signed by perspective server"
-                    " %r" % (perspective_name,)
+                    "Malformed response from key notary server %s: invalid server_name"
+                    % (perspective_name,)
                 )
 
-            verified = False
-            for key_id in response[u"signatures"][perspective_name]:
-                if key_id in perspective_keys:
-                    verify_signed_json(
-                        response, perspective_name, perspective_keys[key_id]
-                    )
-                    verified = True
-
-            if not verified:
-                logging.info(
-                    "Response from perspective server %r not signed with a"
-                    " known key, signed with: %r, known keys: %r",
+            try:
+                processed_response = yield self._process_perspectives_response(
                     perspective_name,
-                    list(response[u"signatures"][perspective_name]),
-                    list(perspective_keys),
+                    perspective_keys,
+                    response,
+                    time_added_ms=time_now_ms,
                 )
-                raise KeyLookupError(
-                    "Response not signed with a known key for perspective"
-                    " server %r" % (perspective_name,)
+            except KeyLookupError as e:
+                logger.warning(
+                    "Error processing response from key notary server %s for origin "
+                    "server %s: %s",
+                    perspective_name,
+                    server_name,
+                    e,
                 )
-
-            processed_response = yield self.process_v2_response(
-                perspective_name, response, time_added_ms=time_now_ms
-            )
-            server_name = response["server_name"]
+                # we continue to process the rest of the response
+                continue
 
             added_keys.extend(
                 (server_name, key_id, key) for key_id, key in processed_response.items()
@@ -624,6 +631,53 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
 
         defer.returnValue(keys)
 
+    def _process_perspectives_response(
+        self, perspective_name, perspective_keys, response, time_added_ms
+    ):
+        """Parse a 'Server Keys' structure from the result of a /key/query request
+
+        Checks that the entry is correctly signed by the perspectives server, and then
+        passes over to process_v2_response
+
+        Args:
+            perspective_name (str): the name of the notary server that produced this
+                result
+
+            perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
+                notary server
+
+            response (dict): the json-decoded Server Keys response object
+
+            time_added_ms (int): the timestamp to record in server_keys_json
+
+        Returns:
+            Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
+        """
+        if (
+            u"signatures" not in response
+            or perspective_name not in response[u"signatures"]
+        ):
+            raise KeyLookupError("Response not signed by the notary server")
+
+        verified = False
+        for key_id in response[u"signatures"][perspective_name]:
+            if key_id in perspective_keys:
+                verify_signed_json(response, perspective_name, perspective_keys[key_id])
+                verified = True
+
+        if not verified:
+            raise KeyLookupError(
+                "Response not signed with a known key: signed with: %r, known keys: %r"
+                % (
+                    list(response[u"signatures"][perspective_name].keys()),
+                    list(perspective_keys.keys()),
+                )
+            )
+
+        return self.process_v2_response(
+            perspective_name, response, time_added_ms=time_added_ms
+        )
+
 
 class ServerKeyFetcher(BaseV2KeyFetcher):
     """KeyFetcher impl which fetches keys from the origin servers"""
@@ -677,12 +731,6 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
             except HttpResponseException as e:
                 raise_from(KeyLookupError("Remote server returned an error"), e)
 
-            if (
-                u"signatures" not in response
-                or server_name not in response[u"signatures"]
-            ):
-                raise KeyLookupError("Key response not signed by remote server")
-
             if response["server_name"] != server_name:
                 raise KeyLookupError(
                     "Expected a response for server %r not %r"
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 385eda2dca..d0efc4e0d3 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer
 import synapse
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+    FEDERATION_UNSTABLE_PREFIX,
+    FEDERATION_V1_PREFIX,
+    FEDERATION_V2_PREFIX,
+)
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
@@ -1304,6 +1308,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class RoomComplexityServlet(BaseFederationServlet):
+    """
+    Indicates to other servers how complex (and therefore likely
+    resource-intensive) a public room this server knows about is.
+    """
+    PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX
+
+    @defer.inlineCallbacks
+    def on_GET(self, origin, content, query, room_id):
+
+        store = self.handler.hs.get_datastore()
+
+        is_public = yield store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
+
+        complexity = yield store.get_room_complexity(room_id)
+        defer.returnValue((200, complexity))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationEventServlet,
@@ -1327,6 +1355,7 @@ FEDERATION_SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     FederationVersionServlet,
+    RoomComplexityServlet,
 )
 
 OPENID_SERVLET_CLASSES = (
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59d53f1050..6209858bbb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -182,17 +182,27 @@ class PresenceHandler(object):
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
+        def run_timeout_handler():
+            return run_as_background_process(
+                "handle_presence_timeouts", self._handle_timeouts
+            )
+
         self.clock.call_later(
             30,
             self.clock.looping_call,
-            self._handle_timeouts,
+            run_timeout_handler,
             5000,
         )
 
+        def run_persister():
+            return run_as_background_process(
+                "persist_presence_changes", self._persist_unpersisted_changes
+            )
+
         self.clock.call_later(
             60,
             self.clock.looping_call,
-            self._persist_unpersisted_changes,
+            run_persister,
             60 * 1000,
         )
 
@@ -229,6 +239,7 @@ class PresenceHandler(object):
         )
 
         if self.unpersisted_users_changes:
+
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in self.unpersisted_users_changes
@@ -240,30 +251,18 @@ class PresenceHandler(object):
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
-        logger.info(
-            "Performing _persist_unpersisted_changes. Persisting %d unpersisted changes",
-            len(self.unpersisted_users_changes)
-        )
-
         unpersisted = self.unpersisted_users_changes
         self.unpersisted_users_changes = set()
 
         if unpersisted:
+            logger.info(
+                "Persisting %d upersisted presence updates", len(unpersisted)
+            )
             yield self.store.update_presence([
                 self.user_to_current_state[user_id]
                 for user_id in unpersisted
             ])
 
-        logger.info("Finished _persist_unpersisted_changes")
-
-    @defer.inlineCallbacks
-    def _update_states_and_catch_exception(self, new_states):
-        try:
-            res = yield self._update_states(new_states)
-            defer.returnValue(res)
-        except Exception:
-            logger.exception("Error updating presence")
-
     @defer.inlineCallbacks
     def _update_states(self, new_states):
         """Updates presence of users. Sets the appropriate timeouts. Pokes
@@ -338,45 +337,41 @@ class PresenceHandler(object):
         logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        try:
-            with Measure(self.clock, "presence_handle_timeouts"):
-                # Fetch the list of users that *may* have timed out. Things may have
-                # changed since the timeout was set, so we won't necessarily have to
-                # take any action.
-                users_to_check = set(self.wheel_timer.fetch(now))
-
-                # Check whether the lists of syncing processes from an external
-                # process have expired.
-                expired_process_ids = [
-                    process_id for process_id, last_update
-                    in self.external_process_last_updated_ms.items()
-                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
-                ]
-                for process_id in expired_process_ids:
-                    users_to_check.update(
-                        self.external_process_last_updated_ms.pop(process_id, ())
-                    )
-                    self.external_process_last_update.pop(process_id)
+        # Fetch the list of users that *may* have timed out. Things may have
+        # changed since the timeout was set, so we won't necessarily have to
+        # take any action.
+        users_to_check = set(self.wheel_timer.fetch(now))
+
+        # Check whether the lists of syncing processes from an external
+        # process have expired.
+        expired_process_ids = [
+            process_id for process_id, last_update
+            in self.external_process_last_updated_ms.items()
+            if now - last_update > EXTERNAL_PROCESS_EXPIRY
+        ]
+        for process_id in expired_process_ids:
+            users_to_check.update(
+                self.external_process_last_updated_ms.pop(process_id, ())
+            )
+            self.external_process_last_update.pop(process_id)
 
-                states = [
-                    self.user_to_current_state.get(
-                        user_id, UserPresenceState.default(user_id)
-                    )
-                    for user_id in users_to_check
-                ]
+        states = [
+            self.user_to_current_state.get(
+                user_id, UserPresenceState.default(user_id)
+            )
+            for user_id in users_to_check
+        ]
 
-                timers_fired_counter.inc(len(states))
+        timers_fired_counter.inc(len(states))
 
-                changes = handle_timeouts(
-                    states,
-                    is_mine_fn=self.is_mine_id,
-                    syncing_user_ids=self.get_currently_syncing_users(),
-                    now=now,
-                )
+        changes = handle_timeouts(
+            states,
+            is_mine_fn=self.is_mine_id,
+            syncing_user_ids=self.get_currently_syncing_users(),
+            now=now,
+        )
 
-            run_in_background(self._update_states_and_catch_exception, changes)
-        except Exception:
-            logger.exception("Exception in _handle_timeouts loop")
+        return self._update_states(changes)
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 7eefc7b1fc..8197619a78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -711,10 +711,6 @@ class MatrixFederationHttpClient(object):
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
-        logger.debug("get_json args: %s", args)
-
-        logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
-
         request = MatrixFederationRequest(
             method="GET",
             destination=destination,
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 528125e737..197c652850 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -55,7 +55,7 @@ def parse_integer_from_args(args, name, default=None, required=False):
             return int(args[name][0])
         except Exception:
             message = "Query parameter %r must be an integer" % (name,)
-            raise SynapseError(400, message)
+            raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
     else:
         if required:
             message = "Missing integer query parameter %r" % (name,)
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 744d85594f..d6c4dcdb18 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
 
     def __init__(self, hs):
         JsonResource.__init__(self, hs, canonical_json=False)
+        register_servlets(hs, self)
 
-        register_servlets_for_client_rest_resource(hs, self)
-        SendServerNoticeServlet(hs).register(self)
-        VersionServlet(hs).register(self)
+
+def register_servlets(hs, http_server):
+    """
+    Register all the admin servlets.
+    """
+    register_servlets_for_client_rest_resource(hs, http_server)
+    SendServerNoticeServlet(hs).register(http_server)
+    VersionServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 5180e9eaf1..029039c162 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -386,7 +386,7 @@ class CasRedirectServlet(RestServlet):
             b"redirectUrl": args[b"redirectUrl"][0]
         }).encode('ascii')
         hs_redirect_url = (self.cas_service_url +
-                           b"/_matrix/client/api/v1/login/cas/ticket")
+                           b"/_matrix/client/r0/login/cas/ticket")
         service_param = urllib.parse.urlencode({
             b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)
         }).encode('ascii')
@@ -395,7 +395,7 @@ class CasRedirectServlet(RestServlet):
 
 
 class CasTicketServlet(ClientV1RestServlet):
-    PATTERNS = client_path_patterns("/login/cas/ticket", releases=())
+    PATTERNS = client_path_patterns("/login/cas/ticket")
 
     def __init__(self, hs):
         super(CasTicketServlet, self).__init__(hs)
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 5305e9175f..35a750923b 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -56,8 +56,8 @@ class ThumbnailResource(Resource):
     def _async_render_GET(self, request):
         set_cors_headers(request)
         server_name, media_id, _ = parse_media_id(request)
-        width = parse_integer(request, "width")
-        height = parse_integer(request, "height")
+        width = parse_integer(request, "width", required=True)
+        height = parse_integer(request, "height", required=True)
         method = parse_string(request, "method", "scale")
         m_type = parse_string(request, "type", "image/png")
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2ffc27ff41..6e9f3d1dc0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -554,10 +554,18 @@ class EventsStore(
             e_id for event in new_events for e_id in event.prev_event_ids()
         )
 
-        # Finally, remove any events which are prev_events of any existing events.
+        # Remove any events which are prev_events of any existing events.
         existing_prevs = yield self._get_events_which_are_prevs(result)
         result.difference_update(existing_prevs)
 
+        # Finally handle the case where the new events have soft-failed prev
+        # events. If they do we need to remove them and their prev events,
+        # otherwise we end up with dangling extremities.
+        existing_prevs = yield self._get_prevs_before_rejected(
+            e_id for event in new_events for e_id in event.prev_event_ids()
+        )
+        result.difference_update(existing_prevs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -573,7 +581,7 @@ class EventsStore(
         """
         results = []
 
-        def _get_events(txn, batch):
+        def _get_events_which_are_prevs_txn(txn, batch):
             sql = """
             SELECT prev_event_id, internal_metadata
             FROM event_edges
@@ -596,11 +604,79 @@ class EventsStore(
             )
 
         for chunk in batch_iter(event_ids, 100):
-            yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
+            yield self.runInteraction(
+                "_get_events_which_are_prevs",
+                _get_events_which_are_prevs_txn,
+                chunk,
+            )
 
         defer.returnValue(results)
 
     @defer.inlineCallbacks
+    def _get_prevs_before_rejected(self, event_ids):
+        """Get soft-failed ancestors to remove from the extremities.
+
+        Given a set of events, find all those that have been soft-failed or
+        rejected. Returns those soft failed/rejected events and their prev
+        events (whether soft-failed/rejected or not), and recurses up the
+        prev-event graph until it finds no more soft-failed/rejected events.
+
+        This is used to find extremities that are ancestors of new events, but
+        are separated by soft failed events.
+
+        Args:
+            event_ids (Iterable[str]): Events to find prev events for. Note
+                that these must have already been persisted.
+
+        Returns:
+            Deferred[set[str]]
+        """
+
+        # The set of event_ids to return. This includes all soft-failed events
+        # and their prev events.
+        existing_prevs = set()
+
+        def _get_prevs_before_rejected_txn(txn, batch):
+            to_recursively_check = batch
+
+            while to_recursively_check:
+                sql = """
+                SELECT
+                    event_id, prev_event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL
+                FROM event_edges
+                    INNER JOIN events USING (event_id)
+                    LEFT JOIN rejections USING (event_id)
+                    LEFT JOIN event_json USING (event_id)
+                WHERE
+                    event_id IN (%s)
+                    AND NOT events.outlier
+                """ % (
+                    ",".join("?" for _ in to_recursively_check),
+                )
+
+                txn.execute(sql, to_recursively_check)
+                to_recursively_check = []
+
+                for event_id, prev_event_id, metadata, rejected in txn:
+                    if prev_event_id in existing_prevs:
+                        continue
+
+                    soft_failed = json.loads(metadata).get("soft_failed")
+                    if soft_failed or rejected:
+                        to_recursively_check.append(prev_event_id)
+                        existing_prevs.add(prev_event_id)
+
+        for chunk in batch_iter(event_ids, 100):
+            yield self.runInteraction(
+                "_get_prevs_before_rejected",
+                _get_prevs_before_rejected_txn,
+                chunk,
+            )
+
+        defer.returnValue(existing_prevs)
+
+    @defer.inlineCallbacks
     def _get_new_state_after_events(
         self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
     ):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 21b353cad3..b56c83e460 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import division
+
 import itertools
 import logging
 from collections import namedtuple
@@ -614,7 +616,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     def _get_total_state_event_counts_txn(self, txn, room_id):
         """
-        See get_state_event_counts.
+        See get_total_state_event_counts.
         """
         sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
         txn.execute(sql, (room_id,))
@@ -635,3 +637,49 @@ class EventsWorkerStore(SQLBaseStore):
             "get_total_state_event_counts",
             self._get_total_state_event_counts_txn, room_id
         )
+
+    def _get_current_state_event_counts_txn(self, txn, room_id):
+        """
+        See get_current_state_event_counts.
+        """
+        sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        return row[0] if row else 0
+
+    def get_current_state_event_counts(self, room_id):
+        """
+        Gets the current number of state events in a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_current_state_event_counts",
+            self._get_current_state_event_counts_txn, room_id
+        )
+
+    @defer.inlineCallbacks
+    def get_room_complexity(self, room_id):
+        """
+        Get a rough approximation of the complexity of the room. This is used by
+        remote servers to decide whether they wish to join the room or not.
+        Higher complexity value indicates that being in the room will consume
+        more resources.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[dict[str:int]] of complexity version to complexity.
+        """
+        state_events = yield self.get_current_state_event_counts(room_id)
+
+        # Call this one "v1", so we can introduce new ones as we want to develop
+        # it.
+        complexity_v1 = round(state_events / 500, 2)
+
+        defer.returnValue({"v1": complexity_v1})
diff --git a/synapse/storage/schema/delta/54/account_validity.sql b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
index 2357626000..0adb2ad55e 100644
--- a/synapse/storage/schema/delta/54/account_validity.sql
+++ b/synapse/storage/schema/delta/54/account_validity_with_renewal.sql
@@ -13,6 +13,9 @@
  * limitations under the License.
  */
 
+-- We previously changed the schema for this table without renaming the file, which means
+-- that some databases might still be using the old schema. This ensures Synapse uses the
+-- right schema for the table.
 DROP TABLE IF EXISTS account_validity;
 
 -- Track what users are in public rooms.
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 71b80a891d..eb0ced5b5e 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -169,7 +169,7 @@ class StatsStore(StateDeltasStore):
 
         logger.info(
             "Processing the next %d rooms of %d remaining",
-            (len(rooms_to_work_on), progress["remaining"]),
+            len(rooms_to_work_on), progress["remaining"],
         )
 
         # Number of state events we've processed by going through each room
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 311b49e18a..fe412355d8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -226,6 +226,8 @@ class LoggingContext(object):
             self.request = request
 
     def __str__(self):
+        if self.request:
+            return str(self.request)
         return "%s@%x" % (self.name, id(self))
 
     @classmethod
@@ -274,12 +276,10 @@ class LoggingContext(object):
         current = self.set_current_context(self.previous_context)
         if current is not self:
             if current is self.sentinel:
-                logger.warn("Expected logging context %s has been lost", self)
+                logger.warning("Expected logging context %s was lost", self)
             else:
-                logger.warn(
-                    "Current logging context %s is not expected context %s",
-                    current,
-                    self
+                logger.warning(
+                    "Expected logging context %s but found %s", self, current
                 )
         self.previous_context = None
         self.alive = False
@@ -433,10 +433,14 @@ class PreserveLoggingContext(object):
         context = LoggingContext.set_current_context(self.current_context)
 
         if context != self.new_context:
-            logger.warn(
-                "Unexpected logging context: %s is not %s",
-                context, self.new_context,
-            )
+            if context is LoggingContext.sentinel:
+                logger.warning("Expected logging context %s was lost", self.new_context)
+            else:
+                logger.warning(
+                    "Expected logging context %s but found %s",
+                    self.new_context,
+                    context,
+                )
 
         if self.current_context is not LoggingContext.sentinel:
             if not self.current_context.alive: