summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.md12
-rw-r--r--changelog.d/5216.misc1
-rw-r--r--changelog.d/5256.bugfix1
-rw-r--r--changelog.d/5258.bugfix1
-rw-r--r--changelog.d/5274.bugfix1
-rw-r--r--changelog.d/5278.bugfix1
-rw-r--r--changelog.d/5283.misc1
-rw-r--r--changelog.d/5286.feature1
-rw-r--r--changelog.d/5287.misc1
-rw-r--r--changelog.d/5288.misc1
-rw-r--r--changelog.d/5291.bugfix1
-rw-r--r--changelog.d/5293.bugfix1
-rw-r--r--changelog.d/5294.bugfix1
-rw-r--r--changelog.d/5303.misc1
-rw-r--r--debian/changelog6
-rw-r--r--docs/CAPTCHA_SETUP.rst1
-rw-r--r--docs/admin_api/user_admin_api.rst2
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/urls.py1
-rw-r--r--synapse/federation/transport/server.py31
-rw-r--r--synapse/http/matrixfederationclient.py4
-rw-r--r--synapse/rest/admin/__init__.py12
-rw-r--r--synapse/rest/client/v1/login.py4
-rw-r--r--synapse/rest/client/v1/logout.py27
-rw-r--r--synapse/rest/client/v1/room.py2
-rw-r--r--synapse/rest/media/v1/thumbnail_resource.py4
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py12
-rw-r--r--synapse/storage/events.py262
-rw-r--r--synapse/storage/events_bg_updates.py401
-rw-r--r--synapse/storage/events_worker.py57
-rw-r--r--synapse/storage/roommember.py33
-rw-r--r--synapse/storage/schema/delta/54/delete_forward_extremities.sql23
-rw-r--r--synapse/storage/schema/delta/54/stats2.sql28
-rw-r--r--synapse/storage/stats.py130
-rw-r--r--synapse/util/logcontext.py22
-rw-r--r--tests/federation/test_complexity.py90
-rw-r--r--tests/storage/test_cleanup_extrems.py248
38 files changed, 1114 insertions, 315 deletions
diff --git a/CHANGES.md b/CHANGES.md
index 6bdfdd6d70..0ffdf1aaef 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,8 +1,16 @@
-Synapse 0.99.5.1 (2019-05-22)
+Synapse 0.99.5.2 (2019-05-30)
 =============================
 
-No significant changes.
+Bugfixes
+--------
+
+- Fix bug where we leaked extremities when we soft failed events, leading to performance degradation. ([\#5274](https://github.com/matrix-org/synapse/issues/5274), [\#5278](https://github.com/matrix-org/synapse/issues/5278), [\#5291](https://github.com/matrix-org/synapse/issues/5291))
+
+
+Synapse 0.99.5.1 (2019-05-22)
+=============================
 
+0.99.5.1 supersedes 0.99.5 due to malformed debian changelog - no functional changes.
 
 Synapse 0.99.5 (2019-05-22)
 ===========================
diff --git a/changelog.d/5216.misc b/changelog.d/5216.misc
new file mode 100644
index 0000000000..dbfa29475f
--- /dev/null
+++ b/changelog.d/5216.misc
@@ -0,0 +1 @@
+Synapse will now serve the experimental "room complexity" API endpoint.
diff --git a/changelog.d/5256.bugfix b/changelog.d/5256.bugfix
new file mode 100644
index 0000000000..86316ab5dd
--- /dev/null
+++ b/changelog.d/5256.bugfix
@@ -0,0 +1 @@
+Show the correct error when logging out and access token is missing.
diff --git a/changelog.d/5258.bugfix b/changelog.d/5258.bugfix
new file mode 100644
index 0000000000..fb5d44aedb
--- /dev/null
+++ b/changelog.d/5258.bugfix
@@ -0,0 +1 @@
+Fix error when downloading thumbnail with missing width/height parameter.
diff --git a/changelog.d/5274.bugfix b/changelog.d/5274.bugfix
new file mode 100644
index 0000000000..9e14d20289
--- /dev/null
+++ b/changelog.d/5274.bugfix
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
diff --git a/changelog.d/5278.bugfix b/changelog.d/5278.bugfix
new file mode 100644
index 0000000000..9e14d20289
--- /dev/null
+++ b/changelog.d/5278.bugfix
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
diff --git a/changelog.d/5283.misc b/changelog.d/5283.misc
new file mode 100644
index 0000000000..002721e566
--- /dev/null
+++ b/changelog.d/5283.misc
@@ -0,0 +1 @@
+Specify the type of reCAPTCHA key to use.
diff --git a/changelog.d/5286.feature b/changelog.d/5286.feature
new file mode 100644
index 0000000000..81860279a3
--- /dev/null
+++ b/changelog.d/5286.feature
@@ -0,0 +1 @@
+CAS login will now hit the r0 API, not the deprecated v1 one.
diff --git a/changelog.d/5287.misc b/changelog.d/5287.misc
new file mode 100644
index 0000000000..1286f1dd08
--- /dev/null
+++ b/changelog.d/5287.misc
@@ -0,0 +1 @@
+Remove spurious debug from MatrixFederationHttpClient.get_json.
diff --git a/changelog.d/5288.misc b/changelog.d/5288.misc
new file mode 100644
index 0000000000..fbf049ba6a
--- /dev/null
+++ b/changelog.d/5288.misc
@@ -0,0 +1 @@
+Improve logging for logcontext leaks.
diff --git a/changelog.d/5291.bugfix b/changelog.d/5291.bugfix
new file mode 100644
index 0000000000..9e14d20289
--- /dev/null
+++ b/changelog.d/5291.bugfix
@@ -0,0 +1 @@
+Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.
diff --git a/changelog.d/5293.bugfix b/changelog.d/5293.bugfix
new file mode 100644
index 0000000000..aa519a8433
--- /dev/null
+++ b/changelog.d/5293.bugfix
@@ -0,0 +1 @@
+Fix a bug where it is not possible to get events in the federation format with the request `GET /_matrix/client/r0/rooms/{roomId}/messages`.
diff --git a/changelog.d/5294.bugfix b/changelog.d/5294.bugfix
new file mode 100644
index 0000000000..5924bda319
--- /dev/null
+++ b/changelog.d/5294.bugfix
@@ -0,0 +1 @@
+Fix performance problems with the rooms stats background update.
diff --git a/changelog.d/5303.misc b/changelog.d/5303.misc
new file mode 100644
index 0000000000..f6a7f1f8e3
--- /dev/null
+++ b/changelog.d/5303.misc
@@ -0,0 +1 @@
+Clarify that the admin change password API logs the user out.
diff --git a/debian/changelog b/debian/changelog
index 90c6b86c5b..6a1a72c0e3 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+matrix-synapse-py3 (0.99.5.2) stable; urgency=medium
+
+  * New synapse release 0.99.5.2.
+
+ -- Synapse Packaging team <packages@matrix.org>  Thu, 30 May 2019 16:28:07 +0100
+
 matrix-synapse-py3 (0.99.5.1) stable; urgency=medium
 
   * New synapse release 0.99.5.1.
diff --git a/docs/CAPTCHA_SETUP.rst b/docs/CAPTCHA_SETUP.rst
index 19a204d9ce..0c22ee4ff6 100644
--- a/docs/CAPTCHA_SETUP.rst
+++ b/docs/CAPTCHA_SETUP.rst
@@ -7,6 +7,7 @@ Requires a public/private key pair from:
 
 https://developers.google.com/recaptcha/
 
+Must be a reCAPTCHA v2 key using the "I'm not a robot" Checkbox option
 
 Setting ReCaptcha Keys
 ----------------------
diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst
index 8aca4f158d..213359d0c0 100644
--- a/docs/admin_api/user_admin_api.rst
+++ b/docs/admin_api/user_admin_api.rst
@@ -69,7 +69,7 @@ An empty body may be passed for backwards compatibility.
 Reset password
 ==============
 
-Changes the password of another user.
+Changes the password of another user. This will automatically log the user out of all their devices.
 
 The api is::
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 4f95778eea..d0e8d7c21b 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -27,4 +27,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "0.99.5.1"
+__version__ = "0.99.5.2"
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 3c6bddff7a..e16c386a14 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
 FEDERATION_PREFIX = "/_matrix/federation"
 FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
 FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
+FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
 STATIC_PREFIX = "/_matrix/static"
 WEB_CLIENT_PREFIX = "/_matrix/client"
 CONTENT_REPO_PREFIX = "/_matrix/content"
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 385eda2dca..d0efc4e0d3 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer
 import synapse
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+    FEDERATION_UNSTABLE_PREFIX,
+    FEDERATION_V1_PREFIX,
+    FEDERATION_V2_PREFIX,
+)
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
@@ -1304,6 +1308,30 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class RoomComplexityServlet(BaseFederationServlet):
+    """
+    Indicates to other servers how complex (and therefore likely
+    resource-intensive) a public room this server knows about is.
+    """
+    PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX
+
+    @defer.inlineCallbacks
+    def on_GET(self, origin, content, query, room_id):
+
+        store = self.handler.hs.get_datastore()
+
+        is_public = yield store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
+
+        complexity = yield store.get_room_complexity(room_id)
+        defer.returnValue((200, complexity))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationEventServlet,
@@ -1327,6 +1355,7 @@ FEDERATION_SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     FederationVersionServlet,
+    RoomComplexityServlet,
 )
 
 OPENID_SERVLET_CLASSES = (
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 7eefc7b1fc..8197619a78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -711,10 +711,6 @@ class MatrixFederationHttpClient(object):
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
-        logger.debug("get_json args: %s", args)
-
-        logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
-
         request = MatrixFederationRequest(
             method="GET",
             destination=destination,
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 744d85594f..d6c4dcdb18 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
 
     def __init__(self, hs):
         JsonResource.__init__(self, hs, canonical_json=False)
+        register_servlets(hs, self)
 
-        register_servlets_for_client_rest_resource(hs, self)
-        SendServerNoticeServlet(hs).register(self)
-        VersionServlet(hs).register(self)
+
+def register_servlets(hs, http_server):
+    """
+    Register all the admin servlets.
+    """
+    register_servlets_for_client_rest_resource(hs, http_server)
+    SendServerNoticeServlet(hs).register(http_server)
+    VersionServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 5180e9eaf1..029039c162 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -386,7 +386,7 @@ class CasRedirectServlet(RestServlet):
             b"redirectUrl": args[b"redirectUrl"][0]
         }).encode('ascii')
         hs_redirect_url = (self.cas_service_url +
-                           b"/_matrix/client/api/v1/login/cas/ticket")
+                           b"/_matrix/client/r0/login/cas/ticket")
         service_param = urllib.parse.urlencode({
             b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)
         }).encode('ascii')
@@ -395,7 +395,7 @@ class CasRedirectServlet(RestServlet):
 
 
 class CasTicketServlet(ClientV1RestServlet):
-    PATTERNS = client_path_patterns("/login/cas/ticket", releases=())
+    PATTERNS = client_path_patterns("/login/cas/ticket")
 
     def __init__(self, hs):
         super(CasTicketServlet, self).__init__(hs)
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index 430c692336..ba20e75033 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -17,8 +17,6 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.api.errors import AuthError
-
 from .base import ClientV1RestServlet, client_path_patterns
 
 logger = logging.getLogger(__name__)
@@ -38,23 +36,16 @@ class LogoutRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        try:
-            requester = yield self.auth.get_user_by_req(request)
-        except AuthError:
-            # this implies the access token has already been deleted.
-            defer.returnValue((401, {
-                "errcode": "M_UNKNOWN_TOKEN",
-                "error": "Access Token unknown or expired"
-            }))
+        requester = yield self.auth.get_user_by_req(request)
+
+        if requester.device_id is None:
+            # the acccess token wasn't associated with a device.
+            # Just delete the access token
+            access_token = self._auth.get_access_token_from_request(request)
+            yield self._auth_handler.delete_access_token(access_token)
         else:
-            if requester.device_id is None:
-                # the acccess token wasn't associated with a device.
-                # Just delete the access token
-                access_token = self._auth.get_access_token_from_request(request)
-                yield self._auth_handler.delete_access_token(access_token)
-            else:
-                yield self._device_handler.delete_device(
-                    requester.user.to_string(), requester.device_id)
+            yield self._device_handler.delete_device(
+                requester.user.to_string(), requester.device_id)
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 255a85c588..b92c6a9a9c 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -475,6 +475,8 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
         if filter_bytes:
             filter_json = urlparse.unquote(filter_bytes.decode("UTF-8"))
             event_filter = Filter(json.loads(filter_json))
+            if event_filter.filter_json.get("event_format", "client") == "federation":
+                as_client_event = False
         else:
             event_filter = None
         msgs = yield self.pagination_handler.get_messages(
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 5305e9175f..35a750923b 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -56,8 +56,8 @@ class ThumbnailResource(Resource):
     def _async_render_GET(self, request):
         set_cors_headers(request)
         server_name, media_id, _ = parse_media_id(request)
-        width = parse_integer(request, "width")
-        height = parse_integer(request, "height")
+        width = parse_integer(request, "width", required=True)
+        height = parse_integer(request, "height", required=True)
         method = parse_string(request, "method", "scale")
         m_type = parse_string(request, "type", "image/png")
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 66675d08ae..71316f7d09 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -36,6 +36,7 @@ from .engines import PostgresEngine
 from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events import EventsStore
+from .events_bg_updates import EventsBackgroundUpdatesStore
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
@@ -66,6 +67,7 @@ logger = logging.getLogger(__name__)
 
 
 class DataStore(
+    EventsBackgroundUpdatesStore,
     RoomMemberStore,
     RoomStore,
     RegistrationStore,
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index fa6839ceca..3fe827cd43 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -1261,7 +1261,8 @@ class SQLBaseStore(object):
             " AND ".join("%s = ?" % (k,) for k in keyvalues),
         )
 
-        return txn.execute(sql, list(keyvalues.values()))
+        txn.execute(sql, list(keyvalues.values()))
+        return txn.rowcount
 
     def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
         return self.runInteraction(
@@ -1280,9 +1281,12 @@ class SQLBaseStore(object):
             column : column name to test for inclusion against `iterable`
             iterable : list
             keyvalues : dict of column names and values to select the rows with
+
+        Returns:
+            int: Number rows deleted
         """
         if not iterable:
-            return
+            return 0
 
         sql = "DELETE FROM %s" % table
 
@@ -1297,7 +1301,9 @@ class SQLBaseStore(object):
 
         if clauses:
             sql = "%s WHERE %s" % (sql, " AND ".join(clauses))
-        return txn.execute(sql, values)
+        txn.execute(sql, values)
+
+        return txn.rowcount
 
     def _get_cache_dict(
         self, db_conn, table, entity_column, stream_column, max_value, limit=100000
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2ffc27ff41..f9162be9b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -219,41 +220,11 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
-    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
-        )
-        self.register_background_update_handler(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
-            self._background_reindex_fields_sender,
-        )
-
-        self.register_background_index_update(
-            "event_contains_url_index",
-            index_name="event_contains_url_index",
-            table="events",
-            columns=["room_id", "topological_ordering", "stream_ordering"],
-            where_clause="contains_url = true AND outlier = false",
-        )
-
-        # an event_id index on event_search is useful for the purge_history
-        # api. Plus it means we get to enforce some integrity with a UNIQUE
-        # clause
-        self.register_background_index_update(
-            "event_search_event_id_idx",
-            index_name="event_search_event_id_idx",
-            table="event_search",
-            columns=["event_id"],
-            unique=True,
-            psql_only=True,
-        )
 
         self._event_persist_queue = _EventPeristenceQueue()
-
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
     @defer.inlineCallbacks
@@ -554,10 +525,18 @@ class EventsStore(
             e_id for event in new_events for e_id in event.prev_event_ids()
         )
 
-        # Finally, remove any events which are prev_events of any existing events.
+        # Remove any events which are prev_events of any existing events.
         existing_prevs = yield self._get_events_which_are_prevs(result)
         result.difference_update(existing_prevs)
 
+        # Finally handle the case where the new events have soft-failed prev
+        # events. If they do we need to remove them and their prev events,
+        # otherwise we end up with dangling extremities.
+        existing_prevs = yield self._get_prevs_before_rejected(
+            e_id for event in new_events for e_id in event.prev_event_ids()
+        )
+        result.difference_update(existing_prevs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -573,7 +552,7 @@ class EventsStore(
         """
         results = []
 
-        def _get_events(txn, batch):
+        def _get_events_which_are_prevs_txn(txn, batch):
             sql = """
             SELECT prev_event_id, internal_metadata
             FROM event_edges
@@ -596,11 +575,79 @@ class EventsStore(
             )
 
         for chunk in batch_iter(event_ids, 100):
-            yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
+            yield self.runInteraction(
+                "_get_events_which_are_prevs",
+                _get_events_which_are_prevs_txn,
+                chunk,
+            )
 
         defer.returnValue(results)
 
     @defer.inlineCallbacks
+    def _get_prevs_before_rejected(self, event_ids):
+        """Get soft-failed ancestors to remove from the extremities.
+
+        Given a set of events, find all those that have been soft-failed or
+        rejected. Returns those soft failed/rejected events and their prev
+        events (whether soft-failed/rejected or not), and recurses up the
+        prev-event graph until it finds no more soft-failed/rejected events.
+
+        This is used to find extremities that are ancestors of new events, but
+        are separated by soft failed events.
+
+        Args:
+            event_ids (Iterable[str]): Events to find prev events for. Note
+                that these must have already been persisted.
+
+        Returns:
+            Deferred[set[str]]
+        """
+
+        # The set of event_ids to return. This includes all soft-failed events
+        # and their prev events.
+        existing_prevs = set()
+
+        def _get_prevs_before_rejected_txn(txn, batch):
+            to_recursively_check = batch
+
+            while to_recursively_check:
+                sql = """
+                SELECT
+                    event_id, prev_event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL
+                FROM event_edges
+                    INNER JOIN events USING (event_id)
+                    LEFT JOIN rejections USING (event_id)
+                    LEFT JOIN event_json USING (event_id)
+                WHERE
+                    event_id IN (%s)
+                    AND NOT events.outlier
+                """ % (
+                    ",".join("?" for _ in to_recursively_check),
+                )
+
+                txn.execute(sql, to_recursively_check)
+                to_recursively_check = []
+
+                for event_id, prev_event_id, metadata, rejected in txn:
+                    if prev_event_id in existing_prevs:
+                        continue
+
+                    soft_failed = json.loads(metadata).get("soft_failed")
+                    if soft_failed or rejected:
+                        to_recursively_check.append(prev_event_id)
+                        existing_prevs.add(prev_event_id)
+
+        for chunk in batch_iter(event_ids, 100):
+            yield self.runInteraction(
+                "_get_prevs_before_rejected",
+                _get_prevs_before_rejected_txn,
+                chunk,
+            )
+
+        defer.returnValue(existing_prevs)
+
+    @defer.inlineCallbacks
     def _get_new_state_after_events(
         self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
     ):
@@ -1503,153 +1550,6 @@ class EventsStore(
         ret = yield self.runInteraction("count_daily_active_rooms", _count)
         defer.returnValue(ret)
 
-    @defer.inlineCallbacks
-    def _background_reindex_fields_sender(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id_inclusive"]
-        max_stream_id = progress["max_stream_id_exclusive"]
-        rows_inserted = progress.get("rows_inserted", 0)
-
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_txn(txn):
-            sql = (
-                "SELECT stream_ordering, event_id, json FROM events"
-                " INNER JOIN event_json USING (event_id)"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            min_stream_id = rows[-1][0]
-
-            update_rows = []
-            for row in rows:
-                try:
-                    event_id = row[1]
-                    event_json = json.loads(row[2])
-                    sender = event_json["sender"]
-                    content = event_json["content"]
-
-                    contains_url = "url" in content
-                    if contains_url:
-                        contains_url &= isinstance(content["url"], text_type)
-                except (KeyError, AttributeError):
-                    # If the event is missing a necessary field then
-                    # skip over it.
-                    continue
-
-                update_rows.append((sender, contains_url, event_id))
-
-            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
-
-            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
-            progress = {
-                "target_min_stream_id_inclusive": target_min_stream_id,
-                "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows),
-            }
-
-            self._background_update_progress_txn(
-                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
-            )
-
-            return len(rows)
-
-        result = yield self.runInteraction(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
-        )
-
-        if not result:
-            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def _background_reindex_origin_server_ts(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id_inclusive"]
-        max_stream_id = progress["max_stream_id_exclusive"]
-        rows_inserted = progress.get("rows_inserted", 0)
-
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_search_txn(txn):
-            sql = (
-                "SELECT stream_ordering, event_id FROM events"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            min_stream_id = rows[-1][0]
-            event_ids = [row[1] for row in rows]
-
-            rows_to_update = []
-
-            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
-            for chunk in chunks:
-                ev_rows = self._simple_select_many_txn(
-                    txn,
-                    table="event_json",
-                    column="event_id",
-                    iterable=chunk,
-                    retcols=["event_id", "json"],
-                    keyvalues={},
-                )
-
-                for row in ev_rows:
-                    event_id = row["event_id"]
-                    event_json = json.loads(row["json"])
-                    try:
-                        origin_server_ts = event_json["origin_server_ts"]
-                    except (KeyError, AttributeError):
-                        # If the event is missing a necessary field then
-                        # skip over it.
-                        continue
-
-                    rows_to_update.append((origin_server_ts, event_id))
-
-            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
-
-            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
-            progress = {
-                "target_min_stream_id_inclusive": target_min_stream_id,
-                "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows_to_update),
-            }
-
-            self._background_update_progress_txn(
-                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
-            )
-
-            return len(rows_to_update)
-
-        result = yield self.runInteraction(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
-        )
-
-        if not result:
-            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
-
-        defer.returnValue(result)
-
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py
new file mode 100644
index 0000000000..75c1935bf3
--- /dev/null
+++ b/synapse/storage/events_bg_updates.py
@@ -0,0 +1,401 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from six import text_type
+
+from canonicaljson import json
+
+from twisted.internet import defer
+
+from synapse.storage.background_updates import BackgroundUpdateStore
+
+logger = logging.getLogger(__name__)
+
+
+class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
+
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
+
+    def __init__(self, db_conn, hs):
+        super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)
+
+        self.register_background_update_handler(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+        )
+        self.register_background_update_handler(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+            self._background_reindex_fields_sender,
+        )
+
+        self.register_background_index_update(
+            "event_contains_url_index",
+            index_name="event_contains_url_index",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+
+        # an event_id index on event_search is useful for the purge_history
+        # api. Plus it means we get to enforce some integrity with a UNIQUE
+        # clause
+        self.register_background_index_update(
+            "event_search_event_id_idx",
+            index_name="event_search_event_id_idx",
+            table="event_search",
+            columns=["event_id"],
+            unique=True,
+            psql_only=True,
+        )
+
+        self.register_background_update_handler(
+            self.DELETE_SOFT_FAILED_EXTREMITIES,
+            self._cleanup_extremities_bg_update,
+        )
+
+    @defer.inlineCallbacks
+    def _background_reindex_fields_sender(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id, json FROM events"
+                " INNER JOIN event_json USING (event_id)"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+
+            update_rows = []
+            for row in rows:
+                try:
+                    event_id = row[1]
+                    event_json = json.loads(row[2])
+                    sender = event_json["sender"]
+                    content = event_json["content"]
+
+                    contains_url = "url" in content
+                    if contains_url:
+                        contains_url &= isinstance(content["url"], text_type)
+                except (KeyError, AttributeError):
+                    # If the event is missing a necessary field then
+                    # skip over it.
+                    continue
+
+                update_rows.append((sender, contains_url, event_id))
+
+            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
+
+            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
+                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows),
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+            )
+
+            return len(rows)
+
+        result = yield self.runInteraction(
+            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _background_reindex_origin_server_ts(self, progress, batch_size):
+        target_min_stream_id = progress["target_min_stream_id_inclusive"]
+        max_stream_id = progress["max_stream_id_exclusive"]
+        rows_inserted = progress.get("rows_inserted", 0)
+
+        INSERT_CLUMP_SIZE = 1000
+
+        def reindex_search_txn(txn):
+            sql = (
+                "SELECT stream_ordering, event_id FROM events"
+                " WHERE ? <= stream_ordering AND stream_ordering < ?"
+                " ORDER BY stream_ordering DESC"
+                " LIMIT ?"
+            )
+
+            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            min_stream_id = rows[-1][0]
+            event_ids = [row[1] for row in rows]
+
+            rows_to_update = []
+
+            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
+            for chunk in chunks:
+                ev_rows = self._simple_select_many_txn(
+                    txn,
+                    table="event_json",
+                    column="event_id",
+                    iterable=chunk,
+                    retcols=["event_id", "json"],
+                    keyvalues={},
+                )
+
+                for row in ev_rows:
+                    event_id = row["event_id"]
+                    event_json = json.loads(row["json"])
+                    try:
+                        origin_server_ts = event_json["origin_server_ts"]
+                    except (KeyError, AttributeError):
+                        # If the event is missing a necessary field then
+                        # skip over it.
+                        continue
+
+                    rows_to_update.append((origin_server_ts, event_id))
+
+            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
+
+            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
+                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
+                txn.executemany(sql, clump)
+
+            progress = {
+                "target_min_stream_id_inclusive": target_min_stream_id,
+                "max_stream_id_exclusive": min_stream_id,
+                "rows_inserted": rows_inserted + len(rows_to_update),
+            }
+
+            self._background_update_progress_txn(
+                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+            )
+
+            return len(rows_to_update)
+
+        result = yield self.runInteraction(
+            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+        )
+
+        if not result:
+            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _cleanup_extremities_bg_update(self, progress, batch_size):
+        """Background update to clean out extremities that should have been
+        deleted previously.
+
+        Mainly used to deal with the aftermath of #5269.
+        """
+
+        # This works by first copying all existing forward extremities into the
+        # `_extremities_to_check` table at start up, and then checking each
+        # event in that table whether we have any descendants that are not
+        # soft-failed/rejected. If that is the case then we delete that event
+        # from the forward extremities table.
+        #
+        # For efficiency, we do this in batches by recursively pulling out all
+        # descendants of a batch until we find the non soft-failed/rejected
+        # events, i.e. the set of descendants whose chain of prev events back
+        # to the batch of extremities are all soft-failed or rejected.
+        # Typically, we won't find any such events as extremities will rarely
+        # have any descendants, but if they do then we should delete those
+        # extremities.
+
+        def _cleanup_extremities_bg_update_txn(txn):
+            # The set of extremity event IDs that we're checking this round
+            original_set = set()
+
+            # A dict[str, set[str]] of event ID to their prev events.
+            graph = {}
+
+            # The set of descendants of the original set that are not rejected
+            # nor soft-failed. Ancestors of these events should be removed
+            # from the forward extremities table.
+            non_rejected_leaves = set()
+
+            # Set of event IDs that have been soft failed, and for which we
+            # should check if they have descendants which haven't been soft
+            # failed.
+            soft_failed_events_to_lookup = set()
+
+            # First, we get `batch_size` events from the table, pulling out
+            # their successor events, if any, and the successor events'
+            # rejection status.
+            txn.execute(
+                """SELECT prev_event_id, event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL, events.outlier
+                FROM (
+                    SELECT event_id AS prev_event_id
+                    FROM _extremities_to_check
+                    LIMIT ?
+                ) AS f
+                LEFT JOIN event_edges USING (prev_event_id)
+                LEFT JOIN events USING (event_id)
+                LEFT JOIN event_json USING (event_id)
+                LEFT JOIN rejections USING (event_id)
+                """, (batch_size,)
+            )
+
+            for prev_event_id, event_id, metadata, rejected, outlier in txn:
+                original_set.add(prev_event_id)
+
+                if not event_id or outlier:
+                    # Common case where the forward extremity doesn't have any
+                    # descendants.
+                    continue
+
+                graph.setdefault(event_id, set()).add(prev_event_id)
+
+                soft_failed = False
+                if metadata:
+                    soft_failed = json.loads(metadata).get("soft_failed")
+
+                if soft_failed or rejected:
+                    soft_failed_events_to_lookup.add(event_id)
+                else:
+                    non_rejected_leaves.add(event_id)
+
+            # Now we recursively check all the soft-failed descendants we
+            # found above in the same way, until we have nothing left to
+            # check.
+            while soft_failed_events_to_lookup:
+                # We only want to do 100 at a time, so we split given list
+                # into two.
+                batch = list(soft_failed_events_to_lookup)
+                to_check, to_defer = batch[:100], batch[100:]
+                soft_failed_events_to_lookup = set(to_defer)
+
+                sql = """SELECT prev_event_id, event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL
+                    FROM event_edges
+                    INNER JOIN events USING (event_id)
+                    INNER JOIN event_json USING (event_id)
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE
+                        prev_event_id IN (%s)
+                        AND NOT events.outlier
+                """ % (
+                    ",".join("?" for _ in to_check),
+                )
+                txn.execute(sql, to_check)
+
+                for prev_event_id, event_id, metadata, rejected in txn:
+                    if event_id in graph:
+                        # Already handled this event previously, but we still
+                        # want to record the edge.
+                        graph[event_id].add(prev_event_id)
+                        continue
+
+                    graph[event_id] = {prev_event_id}
+
+                    soft_failed = json.loads(metadata).get("soft_failed")
+                    if soft_failed or rejected:
+                        soft_failed_events_to_lookup.add(event_id)
+                    else:
+                        non_rejected_leaves.add(event_id)
+
+            # We have a set of non-soft-failed descendants, so we recurse up
+            # the graph to find all ancestors and add them to the set of event
+            # IDs that we can delete from forward extremities table.
+            to_delete = set()
+            while non_rejected_leaves:
+                event_id = non_rejected_leaves.pop()
+                prev_event_ids = graph.get(event_id, set())
+                non_rejected_leaves.update(prev_event_ids)
+                to_delete.update(prev_event_ids)
+
+            to_delete.intersection_update(original_set)
+
+            deleted = self._simple_delete_many_txn(
+                txn=txn,
+                table="event_forward_extremities",
+                column="event_id",
+                iterable=to_delete,
+                keyvalues={},
+            )
+
+            logger.info(
+                "Deleted %d forward extremities of %d checked, to clean up #5269",
+                deleted,
+                len(original_set),
+            )
+
+            if deleted:
+                # We now need to invalidate the caches of these rooms
+                rows = self._simple_select_many_txn(
+                    txn,
+                    table="events",
+                    column="event_id",
+                    iterable=to_delete,
+                    keyvalues={},
+                    retcols=("room_id",)
+                )
+                room_ids = set(row["room_id"] for row in rows)
+                for room_id in room_ids:
+                    txn.call_after(
+                        self.get_latest_event_ids_in_room.invalidate,
+                        (room_id,)
+                    )
+
+            self._simple_delete_many_txn(
+                txn=txn,
+                table="_extremities_to_check",
+                column="event_id",
+                iterable=original_set,
+                keyvalues={},
+            )
+
+            return len(original_set)
+
+        num_handled = yield self.runInteraction(
+            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+        )
+
+        if not num_handled:
+            yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)
+
+            def _drop_table_txn(txn):
+                txn.execute("DROP TABLE _extremities_to_check")
+
+            yield self.runInteraction(
+                "_cleanup_extremities_bg_update_drop_table",
+                _drop_table_txn,
+            )
+
+        defer.returnValue(num_handled)
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 21b353cad3..1782428048 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import division
+
 import itertools
 import logging
 from collections import namedtuple
@@ -614,9 +616,14 @@ class EventsWorkerStore(SQLBaseStore):
 
     def _get_total_state_event_counts_txn(self, txn, room_id):
         """
-        See get_state_event_counts.
+        See get_total_state_event_counts.
+        """
+        # We join against the events table as that has an index on room_id
+        sql = """
+            SELECT COUNT(*) FROM state_events
+            INNER JOIN events USING (room_id, event_id)
+            WHERE room_id=?
         """
-        sql = "SELECT COUNT(*) FROM state_events WHERE room_id=?"
         txn.execute(sql, (room_id,))
         row = txn.fetchone()
         return row[0] if row else 0
@@ -635,3 +642,49 @@ class EventsWorkerStore(SQLBaseStore):
             "get_total_state_event_counts",
             self._get_total_state_event_counts_txn, room_id
         )
+
+    def _get_current_state_event_counts_txn(self, txn, room_id):
+        """
+        See get_current_state_event_counts.
+        """
+        sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        return row[0] if row else 0
+
+    def get_current_state_event_counts(self, room_id):
+        """
+        Gets the current number of state events in a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_current_state_event_counts",
+            self._get_current_state_event_counts_txn, room_id
+        )
+
+    @defer.inlineCallbacks
+    def get_room_complexity(self, room_id):
+        """
+        Get a rough approximation of the complexity of the room. This is used by
+        remote servers to decide whether they wish to join the room or not.
+        Higher complexity value indicates that being in the room will consume
+        more resources.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[dict[str:int]] of complexity version to complexity.
+        """
+        state_events = yield self.get_current_state_event_counts(room_id)
+
+        # Call this one "v1", so we can introduce new ones as we want to develop
+        # it.
+        complexity_v1 = round(state_events / 500, 2)
+
+        defer.returnValue({"v1": complexity_v1})
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 4bd1669458..7617913326 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -142,26 +142,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return self.runInteraction("get_room_summary", _get_room_summary_txn)
 
-    def _get_user_count_in_room_txn(self, txn, room_id, membership):
+    def _get_user_counts_in_room_txn(self, txn, room_id):
         """
-        See get_user_count_in_room.
-        """
-        sql = (
-            "SELECT count(*) FROM room_memberships as m"
-            " INNER JOIN current_state_events as c"
-            " ON m.event_id = c.event_id "
-            " AND m.room_id = c.room_id "
-            " AND m.user_id = c.state_key"
-            " WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?"
-        )
-
-        txn.execute(sql, (room_id, membership))
-        row = txn.fetchone()
-        return row[0]
-
-    def get_user_count_in_room(self, room_id, membership):
-        """
-        Get the user count in a room with a particular membership.
+        Get the user count in a room by membership.
 
         Args:
             room_id (str)
@@ -170,9 +153,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         Returns:
             Deferred[int]
         """
-        return self.runInteraction(
-            "get_users_in_room", self._get_user_count_in_room_txn, room_id, membership
-        )
+        sql = """
+        SELECT m.membership, count(*) FROM room_memberships as m
+            INNER JOIN current_state_events as c USING(event_id)
+            WHERE c.type = 'm.room.member' AND c.room_id = ?
+            GROUP BY m.membership
+        """
+
+        txn.execute(sql, (room_id,))
+        return {row[0]: row[1] for row in txn}
 
     @cached()
     def get_invited_rooms_for_user(self, user_id):
diff --git a/synapse/storage/schema/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/delta/54/delete_forward_extremities.sql
new file mode 100644
index 0000000000..b062ec840c
--- /dev/null
+++ b/synapse/storage/schema/delta/54/delete_forward_extremities.sql
@@ -0,0 +1,23 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Start a background job to cleanup extremities that were incorrectly added
+-- by bug #5269.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('delete_soft_failed_extremities', '{}');
+
+DROP TABLE IF EXISTS _extremities_to_check;  -- To make this delta schema file idempotent.
+CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities;
+CREATE INDEX _extremities_to_check_id ON _extremities_to_check(event_id);
diff --git a/synapse/storage/schema/delta/54/stats2.sql b/synapse/storage/schema/delta/54/stats2.sql
new file mode 100644
index 0000000000..3b2d48447f
--- /dev/null
+++ b/synapse/storage/schema/delta/54/stats2.sql
@@ -0,0 +1,28 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This delta file gets run after `54/stats.sql` delta.
+
+-- We want to add some indices to the temporary stats table, so we re-insert
+-- 'populate_stats_createtables' if we are still processing the rooms update.
+INSERT INTO background_updates (update_name, progress_json)
+    SELECT 'populate_stats_createtables', '{}'
+    WHERE
+        'populate_stats_process_rooms' IN (
+            SELECT update_name FROM background_updates
+        )
+        AND 'populate_stats_createtables' NOT IN (  -- don't insert if already exists
+            SELECT update_name FROM background_updates
+        );
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index eb0ced5b5e..1c0b183a56 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -18,6 +18,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.storage.prepare_database import get_statements
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
@@ -69,12 +70,25 @@ class StatsStore(StateDeltasStore):
 
         # Get all the rooms that we want to process.
         def _make_staging_area(txn):
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
-            )
-            txn.execute(sql)
+            # Create the temporary tables
+            stmts = get_statements("""
+                -- We just recreate the table, we'll be reinserting the
+                -- correct entries again later anyway.
+                DROP TABLE IF EXISTS {temp}_rooms;
+
+                CREATE TABLE IF NOT EXISTS {temp}_rooms(
+                    room_id TEXT NOT NULL,
+                    events BIGINT NOT NULL
+                );
+
+                CREATE INDEX {temp}_rooms_events
+                    ON {temp}_rooms(events);
+                CREATE INDEX {temp}_rooms_id
+                    ON {temp}_rooms(room_id);
+            """.format(temp=TEMP_TABLE).splitlines())
+
+            for statement in stmts:
+                txn.execute(statement)
 
             sql = (
                 "CREATE TABLE IF NOT EXISTS "
@@ -83,15 +97,16 @@ class StatsStore(StateDeltasStore):
             )
             txn.execute(sql)
 
-            # Get rooms we want to process from the database
+            # Get rooms we want to process from the database, only adding
+            # those that we haven't (i.e. those not in room_stats_earliest_token)
             sql = """
-                SELECT room_id, count(*) FROM current_state_events
-                GROUP BY room_id
-            """
+                INSERT INTO %s_rooms (room_id, events)
+                SELECT c.room_id, count(*) FROM current_state_events AS c
+                LEFT JOIN room_stats_earliest_token AS t USING (room_id)
+                WHERE t.room_id IS NULL
+                GROUP BY c.room_id
+            """ % (TEMP_TABLE,)
             txn.execute(sql)
-            rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()]
-            self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms)
-            del rooms
 
         new_pos = yield self.get_max_stream_id_in_current_state_deltas()
         yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
@@ -179,46 +194,39 @@ class StatsStore(StateDeltasStore):
 
             current_state_ids = yield self.get_current_state_ids(room_id)
 
-            join_rules = yield self.get_event(
-                current_state_ids.get((EventTypes.JoinRules, "")), allow_none=True
-            )
-            history_visibility = yield self.get_event(
-                current_state_ids.get((EventTypes.RoomHistoryVisibility, "")),
-                allow_none=True,
-            )
-            encryption = yield self.get_event(
-                current_state_ids.get((EventTypes.RoomEncryption, "")), allow_none=True
-            )
-            name = yield self.get_event(
-                current_state_ids.get((EventTypes.Name, "")), allow_none=True
-            )
-            topic = yield self.get_event(
-                current_state_ids.get((EventTypes.Topic, "")), allow_none=True
-            )
-            avatar = yield self.get_event(
-                current_state_ids.get((EventTypes.RoomAvatar, "")), allow_none=True
+            join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
+            history_visibility_id = current_state_ids.get(
+                (EventTypes.RoomHistoryVisibility, "")
             )
-            canonical_alias = yield self.get_event(
-                current_state_ids.get((EventTypes.CanonicalAlias, "")), allow_none=True
-            )
-
-            def _or_none(x, arg):
-                if x:
-                    return x.content.get(arg)
+            encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
+            name_id = current_state_ids.get((EventTypes.Name, ""))
+            topic_id = current_state_ids.get((EventTypes.Topic, ""))
+            avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
+            canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
+
+            state_events = yield self.get_events([
+                join_rules_id, history_visibility_id, encryption_id, name_id,
+                topic_id, avatar_id, canonical_alias_id,
+            ])
+
+            def _get_or_none(event_id, arg):
+                event = state_events.get(event_id)
+                if event:
+                    return event.content.get(arg)
                 return None
 
             yield self.update_room_state(
                 room_id,
                 {
-                    "join_rules": _or_none(join_rules, "join_rule"),
-                    "history_visibility": _or_none(
-                        history_visibility, "history_visibility"
+                    "join_rules": _get_or_none(join_rules_id, "join_rule"),
+                    "history_visibility": _get_or_none(
+                        history_visibility_id, "history_visibility"
                     ),
-                    "encryption": _or_none(encryption, "algorithm"),
-                    "name": _or_none(name, "name"),
-                    "topic": _or_none(topic, "topic"),
-                    "avatar": _or_none(avatar, "url"),
-                    "canonical_alias": _or_none(canonical_alias, "alias"),
+                    "encryption": _get_or_none(encryption_id, "algorithm"),
+                    "name": _get_or_none(name_id, "name"),
+                    "topic": _get_or_none(topic_id, "topic"),
+                    "avatar": _get_or_none(avatar_id, "url"),
+                    "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
                 },
             )
 
@@ -233,18 +241,9 @@ class StatsStore(StateDeltasStore):
                 current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
 
                 current_state_events = len(current_state_ids)
-                joined_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.JOIN
-                )
-                invited_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.INVITE
-                )
-                left_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.LEAVE
-                )
-                banned_members = self._get_user_count_in_room_txn(
-                    txn, room_id, Membership.BAN
-                )
+
+                membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
+
                 total_state_events = self._get_total_state_event_counts_txn(
                     txn, room_id
                 )
@@ -257,10 +256,10 @@ class StatsStore(StateDeltasStore):
                     {
                         "bucket_size": self.stats_bucket_size,
                         "current_state_events": current_state_events,
-                        "joined_members": joined_members,
-                        "invited_members": invited_members,
-                        "left_members": left_members,
-                        "banned_members": banned_members,
+                        "joined_members": membership_counts.get(Membership.JOIN, 0),
+                        "invited_members": membership_counts.get(Membership.INVITE, 0),
+                        "left_members": membership_counts.get(Membership.LEAVE, 0),
+                        "banned_members": membership_counts.get(Membership.BAN, 0),
                         "state_events": total_state_events,
                     },
                 )
@@ -270,10 +269,13 @@ class StatsStore(StateDeltasStore):
                     {"room_id": room_id, "token": current_token},
                 )
 
+                # We've finished a room. Delete it from the table.
+                self._simple_delete_one_txn(
+                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id},
+                )
+
             yield self.runInteraction("update_room_stats", _fetch_data)
 
-            # We've finished a room. Delete it from the table.
-            yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id})
             # Update the remaining counter.
             progress["remaining"] -= 1
             yield self.runInteraction(
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 311b49e18a..fe412355d8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -226,6 +226,8 @@ class LoggingContext(object):
             self.request = request
 
     def __str__(self):
+        if self.request:
+            return str(self.request)
         return "%s@%x" % (self.name, id(self))
 
     @classmethod
@@ -274,12 +276,10 @@ class LoggingContext(object):
         current = self.set_current_context(self.previous_context)
         if current is not self:
             if current is self.sentinel:
-                logger.warn("Expected logging context %s has been lost", self)
+                logger.warning("Expected logging context %s was lost", self)
             else:
-                logger.warn(
-                    "Current logging context %s is not expected context %s",
-                    current,
-                    self
+                logger.warning(
+                    "Expected logging context %s but found %s", self, current
                 )
         self.previous_context = None
         self.alive = False
@@ -433,10 +433,14 @@ class PreserveLoggingContext(object):
         context = LoggingContext.set_current_context(self.current_context)
 
         if context != self.new_context:
-            logger.warn(
-                "Unexpected logging context: %s is not %s",
-                context, self.new_context,
-            )
+            if context is LoggingContext.sentinel:
+                logger.warning("Expected logging context %s was lost", self.new_context)
+            else:
+                logger.warning(
+                    "Expected logging context %s but found %s",
+                    self.new_context,
+                    context,
+                )
 
         if self.current_context is not LoggingContext.sentinel:
             if not self.current_context.alive:
diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py
new file mode 100644
index 0000000000..1e3e5aec66
--- /dev/null
+++ b/tests/federation/test_complexity.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 Matrix.org Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.config.ratelimiting import FederationRateLimitConfig
+from synapse.federation.transport import server
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+from synapse.util.ratelimitutils import FederationRateLimiter
+
+from tests import unittest
+
+
+class RoomComplexityTests(unittest.HomeserverTestCase):
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def default_config(self, name='test'):
+        config = super(RoomComplexityTests, self).default_config(name=name)
+        config["limit_large_remote_room_joins"] = True
+        config["limit_large_remote_room_complexity"] = 0.05
+        return config
+
+    def prepare(self, reactor, clock, homeserver):
+        class Authenticator(object):
+            def authenticate_request(self, request, content):
+                return defer.succeed("otherserver.nottld")
+
+        ratelimiter = FederationRateLimiter(
+            clock,
+            FederationRateLimitConfig(
+                window_size=1,
+                sleep_limit=1,
+                sleep_msec=1,
+                reject_limit=1000,
+                concurrent_requests=1000,
+            ),
+        )
+        server.register_servlets(
+            homeserver, self.resource, Authenticator(), ratelimiter
+        )
+
+    def test_complexity_simple(self):
+
+        u1 = self.register_user("u1", "pass")
+        u1_token = self.login("u1", "pass")
+
+        room_1 = self.helper.create_room_as(u1, tok=u1_token)
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
+        )
+
+        # Get the room complexity
+        request, channel = self.make_request(
+            "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code)
+        complexity = channel.json_body["v1"]
+        self.assertTrue(complexity > 0, complexity)
+
+        # Artificially raise the complexity
+        store = self.hs.get_datastore()
+        store.get_current_state_event_counts = lambda x: defer.succeed(500 * 1.23)
+
+        # Get the room complexity again -- make sure it's our artificial value
+        request, channel = self.make_request(
+            "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code)
+        complexity = channel.json_body["v1"]
+        self.assertEqual(complexity, 1.23)
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
new file mode 100644
index 0000000000..6dda66ecd3
--- /dev/null
+++ b/tests/storage/test_cleanup_extrems.py
@@ -0,0 +1,248 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os.path
+
+from synapse.api.constants import EventTypes
+from synapse.storage import prepare_database
+from synapse.types import Requester, UserID
+
+from tests.unittest import HomeserverTestCase
+
+
+class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
+    """Test the background update to clean forward extremities table.
+    """
+
+    def prepare(self, reactor, clock, homeserver):
+        self.store = homeserver.get_datastore()
+        self.event_creator = homeserver.get_event_creation_handler()
+        self.room_creator = homeserver.get_room_creation_handler()
+
+        # Create a test user and room
+        self.user = UserID("alice", "test")
+        self.requester = Requester(self.user, None, False, None, None)
+        info = self.get_success(self.room_creator.create_room(self.requester, {}))
+        self.room_id = info["room_id"]
+
+    def create_and_send_event(self, soft_failed=False, prev_event_ids=None):
+        """Create and send an event.
+
+        Args:
+            soft_failed (bool): Whether to create a soft failed event or not
+            prev_event_ids (list[str]|None): Explicitly set the prev events,
+                or if None just use the default
+
+        Returns:
+            str: The new event's ID.
+        """
+        prev_events_and_hashes = None
+        if prev_event_ids:
+            prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
+
+        event, context = self.get_success(
+            self.event_creator.create_event(
+                self.requester,
+                {
+                    "type": EventTypes.Message,
+                    "room_id": self.room_id,
+                    "sender": self.user.to_string(),
+                    "content": {"body": "", "msgtype": "m.text"},
+                },
+                prev_events_and_hashes=prev_events_and_hashes,
+            )
+        )
+
+        if soft_failed:
+            event.internal_metadata.soft_failed = True
+
+        self.get_success(
+            self.event_creator.send_nonmember_event(self.requester, event, context)
+        )
+
+        return event.event_id
+
+    def add_extremity(self, event_id):
+        """Add the given event as an extremity to the room.
+        """
+        self.get_success(
+            self.store._simple_insert(
+                table="event_forward_extremities",
+                values={"room_id": self.room_id, "event_id": event_id},
+                desc="test_add_extremity",
+            )
+        )
+
+        self.store.get_latest_event_ids_in_room.invalidate((self.room_id,))
+
+    def run_background_update(self):
+        """Re run the background update to clean up the extremities.
+        """
+        # Make sure we don't clash with in progress updates.
+        self.assertTrue(self.store._all_done, "Background updates are still ongoing")
+
+        schema_path = os.path.join(
+            prepare_database.dir_path,
+            "schema",
+            "delta",
+            "54",
+            "delete_forward_extremities.sql",
+        )
+
+        def run_delta_file(txn):
+            prepare_database.executescript(txn, schema_path)
+
+        self.get_success(
+            self.store.runInteraction("test_delete_forward_extremities", run_delta_file)
+        )
+
+        # Ugh, have to reset this flag
+        self.store._all_done = False
+
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+    def test_soft_failed_extremities_handled_correctly(self):
+        """Test that extremities are correctly calculated in the presence of
+        soft failed events.
+
+        Tests a graph like:
+
+            A <- SF1 <- SF2 <- B
+
+        Where SF* are soft failed.
+        """
+
+        # Create the room graph
+        event_id_1 = self.create_and_send_event()
+        event_id_2 = self.create_and_send_event(True, [event_id_1])
+        event_id_3 = self.create_and_send_event(True, [event_id_2])
+        event_id_4 = self.create_and_send_event(False, [event_id_3])
+
+        # Check the latest events are as expected
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+
+        self.assertEqual(latest_event_ids, [event_id_4])
+
+    def test_basic_cleanup(self):
+        """Test that extremities are correctly calculated in the presence of
+        soft failed events.
+
+        Tests a graph like:
+
+            A <- SF1 <- B
+
+        Where SF* are soft failed, and with extremities of A and B
+        """
+        # Create the room graph
+        event_id_a = self.create_and_send_event()
+        event_id_sf1 = self.create_and_send_event(True, [event_id_a])
+        event_id_b = self.create_and_send_event(False, [event_id_sf1])
+
+        # Add the new extremity and check the latest events are as expected
+        self.add_extremity(event_id_a)
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))
+
+        # Run the background update and check it did the right thing
+        self.run_background_update()
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(latest_event_ids, [event_id_b])
+
+    def test_chain_of_fail_cleanup(self):
+        """Test that extremities are correctly calculated in the presence of
+        soft failed events.
+
+        Tests a graph like:
+
+            A <- SF1 <- SF2 <- B
+
+        Where SF* are soft failed, and with extremities of A and B
+        """
+        # Create the room graph
+        event_id_a = self.create_and_send_event()
+        event_id_sf1 = self.create_and_send_event(True, [event_id_a])
+        event_id_sf2 = self.create_and_send_event(True, [event_id_sf1])
+        event_id_b = self.create_and_send_event(False, [event_id_sf2])
+
+        # Add the new extremity and check the latest events are as expected
+        self.add_extremity(event_id_a)
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b)))
+
+        # Run the background update and check it did the right thing
+        self.run_background_update()
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(latest_event_ids, [event_id_b])
+
+    def test_forked_graph_cleanup(self):
+        r"""Test that extremities are correctly calculated in the presence of
+        soft failed events.
+
+        Tests a graph like, where time flows down the page:
+
+                A     B
+               / \   /
+              /   \ /
+            SF1   SF2
+             |     |
+            SF3    |
+           /   \   |
+           |    \  |
+           C     SF4
+
+        Where SF* are soft failed, and with them A, B and C marked as
+        extremities. This should resolve to B and C being marked as extremity.
+        """
+        # Create the room graph
+        event_id_a = self.create_and_send_event()
+        event_id_b = self.create_and_send_event()
+        event_id_sf1 = self.create_and_send_event(True, [event_id_a])
+        event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b])
+        event_id_sf3 = self.create_and_send_event(True, [event_id_sf1])
+        self.create_and_send_event(True, [event_id_sf2, event_id_sf3])  # SF4
+        event_id_c = self.create_and_send_event(False, [event_id_sf3])
+
+        # Add the new extremity and check the latest events are as expected
+        self.add_extremity(event_id_a)
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(
+            set(latest_event_ids), set((event_id_a, event_id_b, event_id_c))
+        )
+
+        # Run the background update and check it did the right thing
+        self.run_background_update()
+
+        latest_event_ids = self.get_success(
+            self.store.get_latest_event_ids_in_room(self.room_id)
+        )
+        self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))