author     Amber Brown <hawkowl@atleastfornow.net>  2018-11-19 12:54:29 -0600
committer  Amber Brown <hawkowl@atleastfornow.net>  2018-11-19 12:54:29 -0600
commit     678ad155a2eecc344d93c6849f7bd887bcc42191 (patch)
tree       f591ac30afc0e589c483421cbf8061fb56e27504 /synapse/storage
parent     Merge branch 'release-v0.33.8' (diff)
parent     towncrier (diff)
download   synapse-678ad155a2eecc344d93c6849f7bd887bcc42191.tar.xz

Merge tag 'v0.33.9' (0.34.0rc2)
Features
--------

- Include flags to optionally add `m.login.terms` to the registration flow when consent tracking is enabled.
([\#4004](https://github.com/matrix-org/synapse/issues/4004), [\#4133](https://github.com/matrix-org/synapse/issues/4133),
[\#4142](https://github.com/matrix-org/synapse/issues/4142), [\#4184](https://github.com/matrix-org/synapse/issues/4184))
- Support for replacing rooms with new ones. ([\#4091](https://github.com/matrix-org/synapse/issues/4091), [\#4099](https://github.com/matrix-org/synapse/issues/4099),
[\#4100](https://github.com/matrix-org/synapse/issues/4100), [\#4101](https://github.com/matrix-org/synapse/issues/4101))

Bugfixes
--------

- Fix exceptions when using the email mailer on Python 3. ([\#4095](https://github.com/matrix-org/synapse/issues/4095))
- Fix e2e key backup with more than 9 backup versions. ([\#4113](https://github.com/matrix-org/synapse/issues/4113))
- Searches that request profile info now no longer fail with a 500. ([\#4122](https://github.com/matrix-org/synapse/issues/4122))
- Fix the return code of empty key backups. ([\#4123](https://github.com/matrix-org/synapse/issues/4123))
- If the typing stream ID goes backwards (as on a worker when the master restarts), the worker's typing handler will no longer erroneously report rooms containing new
typing events. ([\#4127](https://github.com/matrix-org/synapse/issues/4127))
- Fix a table lock on device_lists_remote_cache which could freeze the application. ([\#4132](https://github.com/matrix-org/synapse/issues/4132))
- Fix an exception when using the state res v2 algorithm. ([\#4135](https://github.com/matrix-org/synapse/issues/4135))
- Generating the user consent URI no longer fails on Python 3. ([\#4140](https://github.com/matrix-org/synapse/issues/4140),
[\#4163](https://github.com/matrix-org/synapse/issues/4163))
- Loading URL previews from the DB cache on Postgres will no longer cause Unicode type errors when responding to the request, and URL previews will no longer fail if
the remote server returns a Content-Type header with the charset in quotes. ([\#4157](https://github.com/matrix-org/synapse/issues/4157))
- The hash_password script now works on Python 3. ([\#4161](https://github.com/matrix-org/synapse/issues/4161))
- Fix no-op checks when updating device keys, reducing spurious device list update notifications. ([\#4164](https://github.com/matrix-org/synapse/issues/4164))

Deprecations and Removals
-------------------------

- The disused and un-specced identicon generator has been removed. ([\#4106](https://github.com/matrix-org/synapse/issues/4106))
- The obsolete and non-functional /pull federation endpoint has been removed. ([\#4118](https://github.com/matrix-org/synapse/issues/4118))
- The deprecated v1 key exchange endpoints have been removed. ([\#4119](https://github.com/matrix-org/synapse/issues/4119))
- Synapse will no longer fall back to the deprecated v1 key exchange method when fetching keys, and will now always use v2.
([\#4120](https://github.com/matrix-org/synapse/issues/4120))

Internal Changes
----------------

- Fix the build of the Docker image with docker-compose. ([\#3778](https://github.com/matrix-org/synapse/issues/3778))
- Delete unreferenced state groups during history purge. ([\#4006](https://github.com/matrix-org/synapse/issues/4006))
- The "Received rdata" log messages on workers are now logged at DEBUG, not INFO. ([\#4108](https://github.com/matrix-org/synapse/issues/4108))
- Reduce replication traffic for device lists. ([\#4109](https://github.com/matrix-org/synapse/issues/4109))
- Fix the `synapse_replication_tcp_protocol_*_commands` metric label to be the full command name, rather than just the first character.
([\#4110](https://github.com/matrix-org/synapse/issues/4110))
- Log some details about room creation. ([\#4121](https://github.com/matrix-org/synapse/issues/4121))
- Fix `tox` failure on old systems. ([\#4124](https://github.com/matrix-org/synapse/issues/4124))
- Add the STATE_V2_TEST room version. ([\#4128](https://github.com/matrix-org/synapse/issues/4128))
- Clean up event accesses and tests. ([\#4137](https://github.com/matrix-org/synapse/issues/4137))
- The default logging config will now set an explicit log file encoding of UTF-8. ([\#4138](https://github.com/matrix-org/synapse/issues/4138))
- Add helper functions for getting the prev and auth events of an event. ([\#4139](https://github.com/matrix-org/synapse/issues/4139))
- Add some tests for the HTTP pusher. ([\#4149](https://github.com/matrix-org/synapse/issues/4149))
- Add purge_history.sh and purge_remote_media.sh scripts to contrib/. ([\#4155](https://github.com/matrix-org/synapse/issues/4155))
- HTTP tests have been refactored to contain less boilerplate. ([\#4156](https://github.com/matrix-org/synapse/issues/4156))
- Drop incoming events from federation for unknown rooms. ([\#4165](https://github.com/matrix-org/synapse/issues/4165))
Diffstat (limited to 'synapse/storage')
 synapse/storage/devices.py                                          | 100
 synapse/storage/e2e_room_keys.py                                    |  19
 synapse/storage/end_to_end_keys.py                                  |   5
 synapse/storage/event_federation.py                                 |   4
 synapse/storage/events.py                                           | 165
 synapse/storage/prepare_database.py                                 |   2
 synapse/storage/room.py                                             |   2
 synapse/storage/schema/delta/40/device_list_streams.sql             |   9
 synapse/storage/schema/delta/52/add_event_to_state_group_index.sql  |  19
 synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql  |  36
 synapse/storage/schema/delta/52/e2e_room_keys.sql                   |  53
 synapse/storage/state.py                                            |   7
 12 files changed, 351 insertions(+), 70 deletions(-)
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index d10ff9e4b9..ecdab34e7d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -22,14 +22,19 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 
-from ._base import Cache, SQLBaseStore, db_to_json
+from ._base import Cache, db_to_json
 
 logger = logging.getLogger(__name__)
 
+DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
+    "drop_device_list_streams_non_unique_indexes"
+)
 
-class DeviceStore(SQLBaseStore):
+
+class DeviceStore(BackgroundUpdateStore):
     def __init__(self, db_conn, hs):
         super(DeviceStore, self).__init__(db_conn, hs)
 
@@ -52,6 +57,30 @@ class DeviceStore(SQLBaseStore):
             columns=["user_id", "device_id"],
         )
 
+        # create a unique index on device_lists_remote_cache
+        self.register_background_index_update(
+            "device_lists_remote_cache_unique_idx",
+            index_name="device_lists_remote_cache_unique_id",
+            table="device_lists_remote_cache",
+            columns=["user_id", "device_id"],
+            unique=True,
+        )
+
+        # And one on device_lists_remote_extremeties
+        self.register_background_index_update(
+            "device_lists_remote_extremeties_unique_idx",
+            index_name="device_lists_remote_extremeties_unique_idx",
+            table="device_lists_remote_extremeties",
+            columns=["user_id"],
+            unique=True,
+        )
+
+        # once they complete, we can remove the old non-unique indexes.
+        self.register_background_update_handler(
+            DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
+            self._drop_device_list_streams_non_unique_indexes,
+        )
+
     @defer.inlineCallbacks
     def store_device(self, user_id, device_id,
                      initial_device_display_name):
@@ -239,7 +268,19 @@ class DeviceStore(SQLBaseStore):
 
     def update_remote_device_list_cache_entry(self, user_id, device_id, content,
                                               stream_id):
-        """Updates a single user's device in the cache.
+        """Updates a single device in the cache of a remote user's device list.
+
+        Note: assumes that we are the only thread that can be updating this user's
+        device list.
+
+        Args:
+            user_id (str): User to update device list for
+            device_id (str): ID of device being updated
+            content (dict): new data on this device
+            stream_id (int): the version of the device list
+
+        Returns:
+            Deferred[None]
         """
         return self.runInteraction(
             "update_remote_device_list_cache_entry",
@@ -272,7 +313,11 @@ class DeviceStore(SQLBaseStore):
                 },
                 values={
                     "content": json.dumps(content),
-                }
+                },
+
+                # we don't need to lock, because we assume we are the only thread
+                # updating this user's devices.
+                lock=False,
             )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
@@ -289,11 +334,26 @@ class DeviceStore(SQLBaseStore):
             },
             values={
                 "stream_id": stream_id,
-            }
+            },
+
+            # again, we can assume we are the only thread updating this user's
+            # extremity.
+            lock=False,
         )
 
     def update_remote_device_list_cache(self, user_id, devices, stream_id):
-        """Replace the cache of the remote user's devices.
+        """Replace the entire cache of the remote user's devices.
+
+        Note: assumes that we are the only thread that can be updating this user's
+        device list.
+
+        Args:
+            user_id (str): User to update device list for
+            devices (list[dict]): list of device objects supplied over federation
+            stream_id (int): the version of the device list
+
+        Returns:
+            Deferred[None]
         """
         return self.runInteraction(
             "update_remote_device_list_cache",
@@ -338,7 +398,11 @@ class DeviceStore(SQLBaseStore):
             },
             values={
                 "stream_id": stream_id,
-            }
+            },
+
+            # we don't need to lock, because we can assume we are the only thread
+            # updating this user's extremity.
+            lock=False,
         )
 
     def get_devices_by_remote(self, destination, from_stream_id):
@@ -589,10 +653,14 @@ class DeviceStore(SQLBaseStore):
         combined list of changes to devices, and which destinations need to be
         poked. `destination` may be None if no destinations need to be poked.
         """
+        # We do a group by here as there can be a large number of duplicate
+        # entries, since we throw away device IDs.
         sql = """
-            SELECT stream_id, user_id, destination FROM device_lists_stream
+            SELECT MAX(stream_id) AS stream_id, user_id, destination
+            FROM device_lists_stream
             LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
             WHERE ? < stream_id AND stream_id <= ?
+            GROUP BY user_id, destination
         """
         return self._execute(
             "get_all_device_list_changes_for_remotes", None,
@@ -718,3 +786,19 @@ class DeviceStore(SQLBaseStore):
             "_prune_old_outbound_device_pokes",
             _prune_txn,
         )
+
+    @defer.inlineCallbacks
+    def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
+        def f(conn):
+            txn = conn.cursor()
+            txn.execute(
+                "DROP INDEX IF EXISTS device_lists_remote_cache_id"
+            )
+            txn.execute(
+                "DROP INDEX IF EXISTS device_lists_remote_extremeties_id"
+            )
+            txn.close()
+
+        yield self.runWithConnection(f)
+        yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+        defer.returnValue(1)
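
The `get_all_device_list_changes_for_remotes` hunk above collapses duplicate rows by keeping only the
highest stream ID per `(user_id, destination)` pair, since the device IDs are thrown away anyway. A
minimal in-memory sketch of that deduplication; the row shape and sample values are illustrative, not
taken from the schema:

```python
# In-memory sketch of the MAX(stream_id)/GROUP BY deduplication used above.
def dedup_device_list_changes(rows):
    """Keep only the highest stream_id per (user_id, destination)."""
    latest = {}
    for stream_id, user_id, destination in rows:
        key = (user_id, destination)
        if key not in latest or stream_id > latest[key]:
            latest[key] = stream_id
    return sorted((sid, u, d) for (u, d), sid in latest.items())

rows = [
    (1, "@alice:remote", "other.example.com"),  # alice's first device changed
    (2, "@alice:remote", "other.example.com"),  # alice's second device changed
    (3, "@bob:remote", None),                   # no destination needs poking
]
print(dedup_device_list_changes(rows))
# [(2, '@alice:remote', 'other.example.com'), (3, '@bob:remote', None)]
```
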
diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py
index f25ded2295..16b7f005aa 100644
--- a/synapse/storage/e2e_room_keys.py
+++ b/synapse/storage/e2e_room_keys.py
@@ -118,6 +118,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             these room keys.
         """
 
+        try:
+            version = int(version)
+        except ValueError:
+            defer.returnValue({'rooms': {}})
+
         keyvalues = {
             "user_id": user_id,
             "version": version,
@@ -212,14 +217,23 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         Raises:
             StoreError: with code 404 if there are no e2e_room_keys_versions present
         Returns:
-            A deferred dict giving the info metadata for this backup version
+            A deferred dict giving the info metadata for this backup version, with
+            fields including:
+                version(str)
+                algorithm(str)
+                auth_data(object): opaque dict supplied by the client
         """
 
         def _get_e2e_room_keys_version_info_txn(txn):
             if version is None:
                 this_version = self._get_current_version(txn, user_id)
             else:
-                this_version = version
+                try:
+                    this_version = int(version)
+                except ValueError:
+                    # Our versions are all ints so if we can't convert it to an integer,
+                    # it isn't there.
+                    raise StoreError(404, "No row found")
 
             result = self._simple_select_one_txn(
                 txn,
@@ -236,6 +250,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 ),
             )
             result["auth_data"] = json.loads(result["auth_data"])
+            result["version"] = str(result["version"])
             return result
 
         return self.runInteraction(
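
The e2e_room_keys.py hunks above coerce the requested backup `version` to an integer and treat anything
non-numeric as "not found", matching the new integer column added by the 52/e2e_room_keys.sql delta
below. A small self-contained sketch of that behaviour, with a stand-in `StoreError` class (the real
one lives in `synapse.api.errors`):

```python
class StoreError(Exception):
    """Stand-in for synapse.api.errors.StoreError."""
    def __init__(self, code, msg):
        super().__init__(msg)
        self.code = code

def parse_backup_version(version):
    """Backup versions are stored as integers, so a non-numeric string cannot match any row."""
    try:
        return int(version)
    except ValueError:
        raise StoreError(404, "No row found")

print(parse_backup_version("2"))   # 2
try:
    parse_backup_version("bogus")
except StoreError as e:
    print(e.code, e)               # 404 No row found
```
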
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 1f1721e820..2a0f6cfca9 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -40,7 +40,10 @@ class EndToEndKeyStore(SQLBaseStore):
                 allow_none=True,
             )
 
-            new_key_json = encode_canonical_json(device_keys)
+            # In py3 we need old_key_json to match new_key_json type. The DB
+            # returns unicode while encode_canonical_json returns bytes.
+            new_key_json = encode_canonical_json(device_keys).decode("utf-8")
+
             if old_key_json == new_key_json:
                 return False
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 3faca2a042..d3b9dea1d6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -477,7 +477,7 @@ class EventFederationStore(EventFederationWorkerStore):
                     "is_state": False,
                 }
                 for ev in events
-                for e_id, _ in ev.prev_events
+                for e_id in ev.prev_event_ids()
             ],
         )
 
@@ -510,7 +510,7 @@ class EventFederationStore(EventFederationWorkerStore):
 
         txn.executemany(query, [
             (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
-            for ev in events for e_id, _ in ev.prev_events
+            for ev in events for e_id in ev.prev_event_ids()
             if not ev.internal_metadata.is_outlier()
         ])
 
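
These hunks swap tuple unpacking of `ev.prev_events` for the new `prev_event_ids()` accessor (the
"helper functions for getting the prev and auth events of an event" noted in the changelog). A rough
sketch of what such helpers might look like; the exact implementation on the real event class is an
assumption here:

```python
class Event:
    """Toy event carrying (event_id, hashes) pairs, as in pre-room-v3 event formats."""
    def __init__(self, prev_events, auth_events):
        self.prev_events = prev_events
        self.auth_events = auth_events

    def prev_event_ids(self):
        return [event_id for event_id, _hashes in self.prev_events]

    def auth_event_ids(self):
        return [event_id for event_id, _hashes in self.auth_events]

ev = Event(prev_events=[("$abc:hs", {"sha256": "..."})],
           auth_events=[("$def:hs", {"sha256": "..."})])
print(ev.prev_event_ids())   # ['$abc:hs']
print(ev.auth_event_ids())   # ['$def:hs']
```
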
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 8881b009df..2047110b1d 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -38,6 +38,7 @@ from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.event_federation import EventFederationStore
 from synapse.storage.events_worker import EventsWorkerStore
+from synapse.storage.state import StateGroupWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import batch_iter
 from synapse.util.async_helpers import ObservableDeferred
@@ -205,7 +206,8 @@ def _retry_on_integrity_error(func):
 
 # inherits from EventFederationStore so that we can call _update_backward_extremities
 # and _handle_mult_prev_events (though arguably those could both be moved in here)
-class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
+class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
+                  BackgroundUpdateStore):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
@@ -414,7 +416,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                             )
                             if len_1:
                                 all_single_prev_not_state = all(
-                                    len(event.prev_events) == 1
+                                    len(event.prev_event_ids()) == 1
                                     and not event.is_state()
                                     for event, ctx in ev_ctx_rm
                                 )
@@ -438,7 +440,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                                 # guess this by looking at the prev_events and checking
                                 # if they match the current forward extremities.
                                 for ev, _ in ev_ctx_rm:
-                                    prev_event_ids = set(e for e, _ in ev.prev_events)
+                                    prev_event_ids = set(ev.prev_event_ids())
                                     if latest_event_ids == prev_event_ids:
                                         state_delta_reuse_delta_counter.inc()
                                         break
@@ -549,7 +551,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         result.difference_update(
             e_id
             for event in new_events
-            for e_id, _ in event.prev_events
+            for e_id in event.prev_event_ids()
         )
 
         # Finally, remove any events which are prev_events of any existing events.
@@ -867,7 +869,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
                     "auth_id": auth_id,
                 }
                 for event, _ in events_and_contexts
-                for auth_id, _ in event.auth_events
+                for auth_id in event.auth_event_ids()
                 if event.is_state()
             ],
         )
@@ -2034,55 +2036,37 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
         logger.info("[purge] finding redundant state groups")
 
-        # Get all state groups that are only referenced by events that are
-        # to be deleted.
-        # This works by first getting state groups that we may want to delete,
-        # joining against event_to_state_groups to get events that use that
-        # state group, then left joining against events_to_purge again. Any
-        # state group where the left join produce *no nulls* are referenced
-        # only by events that are going to be purged.
+        # Get all state groups that are referenced by events that are to be
+        # deleted. We then go and check if they are referenced by other events
+        # or state groups, and if not we delete them.
         txn.execute("""
-            SELECT state_group FROM
-            (
-                SELECT DISTINCT state_group FROM events_to_purge
-                INNER JOIN event_to_state_groups USING (event_id)
-            ) AS sp
-            INNER JOIN event_to_state_groups USING (state_group)
-            LEFT JOIN events_to_purge AS ep USING (event_id)
-            GROUP BY state_group
-            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+            SELECT DISTINCT state_group FROM events_to_purge
+            INNER JOIN event_to_state_groups USING (event_id)
         """)
 
-        state_rows = txn.fetchall()
-        logger.info("[purge] found %i redundant state groups", len(state_rows))
-
-        # make a set of the redundant state groups, so that we can look them up
-        # efficiently
-        state_groups_to_delete = set([sg for sg, in state_rows])
-
-        # Now we get all the state groups that rely on these state groups
-        logger.info("[purge] finding state groups which depend on redundant"
-                    " state groups")
-        remaining_state_groups = []
-        for i in range(0, len(state_rows), 100):
-            chunk = [sg for sg, in state_rows[i:i + 100]]
-            # look for state groups whose prev_state_group is one we are about
-            # to delete
-            rows = self._simple_select_many_txn(
-                txn,
-                table="state_group_edges",
-                column="prev_state_group",
-                iterable=chunk,
-                retcols=["state_group"],
-                keyvalues={},
-            )
-            remaining_state_groups.extend(
-                row["state_group"] for row in rows
+        referenced_state_groups = set(sg for sg, in txn)
+        logger.info(
+            "[purge] found %i referenced state groups",
+            len(referenced_state_groups),
+        )
 
-                # exclude state groups we are about to delete: no point in
-                # updating them
-                if row["state_group"] not in state_groups_to_delete
+        logger.info("[purge] finding state groups that can be deleted")
+
+        state_groups_to_delete, remaining_state_groups = (
+            self._find_unreferenced_groups_during_purge(
+                txn, referenced_state_groups,
             )
+        )
+
+        logger.info(
+            "[purge] found %i state groups to delete",
+            len(state_groups_to_delete),
+        )
+
+        logger.info(
+            "[purge] de-delta-ing %i remaining state groups",
+            len(remaining_state_groups),
+        )
 
         # Now we turn the state groups that reference to-be-deleted state
         # groups to non delta versions.
@@ -2127,11 +2111,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
         logger.info("[purge] removing redundant state groups")
         txn.executemany(
             "DELETE FROM state_groups_state WHERE state_group = ?",
-            state_rows
+            ((sg,) for sg in state_groups_to_delete),
         )
         txn.executemany(
             "DELETE FROM state_groups WHERE id = ?",
-            state_rows
+            ((sg,) for sg in state_groups_to_delete),
         )
 
         logger.info("[purge] removing events from event_to_state_groups")
@@ -2227,6 +2211,85 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
 
         logger.info("[purge] done")
 
+    def _find_unreferenced_groups_during_purge(self, txn, state_groups):
+        """Used when purging history to figure out which state groups can be
+        deleted and which need to be de-delta'ed (due to one of its prev groups
+        being scheduled for deletion).
+
+        Args:
+            txn
+            state_groups (set[int]): Set of state groups referenced by events
+                that are going to be deleted.
+
+        Returns:
+            tuple[set[int], set[int]]: The set of state groups that can be
+            deleted and the set of state groups that need to be de-delta'ed
+        """
+        # Graph of state group -> previous group
+        graph = {}
+
+        # State groups that we have found to be referenced by unpurged events
+        referenced_groups = set()
+
+        # Set of state groups we've already seen
+        state_groups_seen = set(state_groups)
+
+        # Set of state groups to handle next.
+        next_to_search = set(state_groups)
+        while next_to_search:
+            # We bound size of groups we're looking up at once, to stop the
+            # SQL query getting too big
+            if len(next_to_search) < 100:
+                current_search = next_to_search
+                next_to_search = set()
+            else:
+                current_search = set(itertools.islice(next_to_search, 100))
+                next_to_search -= current_search
+
+            # Check if state groups are referenced
+            sql = """
+                SELECT DISTINCT state_group FROM event_to_state_groups
+                LEFT JOIN events_to_purge AS ep USING (event_id)
+                WHERE state_group IN (%s) AND ep.event_id IS NULL
+            """ % (",".join("?" for _ in current_search),)
+            txn.execute(sql, list(current_search))
+
+            referenced = set(sg for sg, in txn)
+            referenced_groups |= referenced
+
+            # We don't continue iterating up the state group graphs for state
+            # groups that are referenced.
+            current_search -= referenced
+
+            rows = self._simple_select_many_txn(
+                txn,
+                table="state_group_edges",
+                column="prev_state_group",
+                iterable=current_search,
+                keyvalues={},
+                retcols=("prev_state_group", "state_group",),
+            )
+
+            prevs = set(row["state_group"] for row in rows)
+            # We don't bother re-handling groups we've already seen
+            prevs -= state_groups_seen
+            next_to_search |= prevs
+            state_groups_seen |= prevs
+
+            for row in rows:
+                # Note: Each state group can have at most one prev group
+                graph[row["state_group"]] = row["prev_state_group"]
+
+        to_delete = state_groups_seen - referenced_groups
+
+        to_dedelta = set()
+        for sg in referenced_groups:
+            prev_sg = graph.get(sg)
+            if prev_sg and prev_sg in to_delete:
+                to_dedelta.add(sg)
+
+        return to_delete, to_dedelta
+
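
As a companion to the function above, here is a self-contained sketch of the search that
`_find_unreferenced_groups_during_purge` performs, with in-memory sets standing in for the
`event_to_state_groups` and `state_group_edges` tables; names and data are illustrative only:

```python
def find_unreferenced_groups(start_groups, kept_refs, edges):
    """Split state groups into (deletable, needs de-delta-ing).

    start_groups: groups referenced by the events being purged
    kept_refs:    groups referenced by events that survive the purge
    edges:        dict of state_group -> prev_state_group (at most one prev each)
    """
    # Invert the edges so we can walk from a group to the groups that delta
    # against it, as the batched state_group_edges queries do above.
    children = {}
    for sg, prev in edges.items():
        children.setdefault(prev, set()).add(sg)

    graph = {}            # state_group -> prev_state_group, as discovered
    referenced = set()    # groups still needed by surviving events
    seen = set(start_groups)
    to_search = set(start_groups)
    while to_search:
        group = to_search.pop()
        if group in kept_refs:
            referenced.add(group)
            continue      # don't walk past groups we have to keep
        for child in children.get(group, ()):
            graph[child] = group
            if child not in seen:
                seen.add(child)
                to_search.add(child)

    to_delete = seen - referenced
    to_dedelta = {sg for sg in referenced if graph.get(sg) in to_delete}
    return to_delete, to_dedelta

# Group 1 is referenced only by purged events; group 2 deltas against it but is
# still referenced by a surviving event, so it must be de-delta'ed first.
print(find_unreferenced_groups({1}, kept_refs={2}, edges={2: 1}))
# ({1}, {2})
```
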
     @defer.inlineCallbacks
     def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b364719312..bd740e1e45 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 51
+SCHEMA_VERSION = 52
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 61013b8919..41c65e112a 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -47,7 +47,7 @@ class RoomWorkerStore(SQLBaseStore):
         Args:
             room_id (str): The ID of the room to retrieve.
         Returns:
-            A namedtuple containing the room information, or an empty list.
+            A dict containing the room information, or None if the room is unknown.
         """
         return self._simple_select_one(
             table="rooms",
diff --git a/synapse/storage/schema/delta/40/device_list_streams.sql b/synapse/storage/schema/delta/40/device_list_streams.sql
index 54841b3843..dd6dcb65f1 100644
--- a/synapse/storage/schema/delta/40/device_list_streams.sql
+++ b/synapse/storage/schema/delta/40/device_list_streams.sql
@@ -20,9 +20,6 @@ CREATE TABLE device_lists_remote_cache (
     content TEXT NOT NULL
 );
 
-CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
-
-
 -- The last update we got for a user. Empty if we're not receiving updates for
 -- that user.
 CREATE TABLE device_lists_remote_extremeties (
@@ -30,7 +27,11 @@ CREATE TABLE device_lists_remote_extremeties (
     stream_id TEXT NOT NULL
 );
 
-CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
+-- we used to create non-unique indexes on these tables, but as of update 52 we create
+-- unique indexes concurrently:
+--
+-- CREATE INDEX device_lists_remote_cache_id ON device_lists_remote_cache(user_id, device_id);
+-- CREATE INDEX device_lists_remote_extremeties_id ON device_lists_remote_extremeties(user_id, stream_id);
 
 
 -- Stream of device lists updates. Includes both local and remotes
diff --git a/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
new file mode 100644
index 0000000000..91e03d13e1
--- /dev/null
+++ b/synapse/storage/schema/delta/52/add_event_to_state_group_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is needed to efficiently check for unreferenced state groups during
+-- purge. Adds an index on event_to_state_groups(state_group).
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('event_to_state_groups_sg_index', '{}');
diff --git a/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
new file mode 100644
index 0000000000..bfa49e6f92
--- /dev/null
+++ b/synapse/storage/schema/delta/52/device_list_streams_unique_idx.sql
@@ -0,0 +1,36 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- register a background update which will create a unique index on
+-- device_lists_remote_cache
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('device_lists_remote_cache_unique_idx', '{}');
+
+-- and one on device_lists_remote_extremeties
+INSERT into background_updates (update_name, progress_json, depends_on)
+    VALUES (
+        'device_lists_remote_extremeties_unique_idx', '{}',
+
+        -- doesn't really depend on this, but we need to make sure both happen
+        -- before we drop the old indexes.
+        'device_lists_remote_cache_unique_idx'
+    );
+
+-- once they complete, we can drop the old indexes.
+INSERT into background_updates (update_name, progress_json, depends_on)
+    VALUES (
+        'drop_device_list_streams_non_unique_indexes', '{}',
+        'device_lists_remote_extremeties_unique_idx'
+    );
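
The `depends_on` values above exist only to serialise these three background updates: the old
non-unique indexes may be dropped only after both unique indexes have been built. A toy illustration of
the intended ordering (this is not the real background-updates scheduler):

```python
# Each update names the update it must wait for; None means it can run immediately.
updates = {
    "device_lists_remote_cache_unique_idx": None,
    "device_lists_remote_extremeties_unique_idx": "device_lists_remote_cache_unique_idx",
    "drop_device_list_streams_non_unique_indexes": "device_lists_remote_extremeties_unique_idx",
}

done, order = set(), []
while len(done) < len(updates):
    for name, dep in updates.items():
        if name not in done and (dep is None or dep in done):
            order.append(name)
            done.add(name)

print(order)
# ['device_lists_remote_cache_unique_idx',
#  'device_lists_remote_extremeties_unique_idx',
#  'drop_device_list_streams_non_unique_indexes']
```
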
diff --git a/synapse/storage/schema/delta/52/e2e_room_keys.sql b/synapse/storage/schema/delta/52/e2e_room_keys.sql
new file mode 100644
index 0000000000..db687cccae
--- /dev/null
+++ b/synapse/storage/schema/delta/52/e2e_room_keys.sql
@@ -0,0 +1,53 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* Change version column to an integer so we can do MAX() sensibly
+ */
+CREATE TABLE e2e_room_keys_versions_new (
+    user_id TEXT NOT NULL,
+    version BIGINT NOT NULL,
+    algorithm TEXT NOT NULL,
+    auth_data TEXT NOT NULL,
+    deleted SMALLINT DEFAULT 0 NOT NULL
+);
+
+INSERT INTO e2e_room_keys_versions_new
+    SELECT user_id, CAST(version as BIGINT), algorithm, auth_data, deleted FROM e2e_room_keys_versions;
+
+DROP TABLE e2e_room_keys_versions;
+ALTER TABLE e2e_room_keys_versions_new RENAME TO e2e_room_keys_versions;
+
+CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version);
+
+/* Change e2e_room_keys to match
+ */
+CREATE TABLE e2e_room_keys_new (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    session_id TEXT NOT NULL,
+    version BIGINT NOT NULL,
+    first_message_index INT,
+    forwarded_count INT,
+    is_verified BOOLEAN,
+    session_data TEXT NOT NULL
+);
+
+INSERT INTO e2e_room_keys_new
+    SELECT user_id, room_id, session_id, CAST(version as BIGINT), first_message_index, forwarded_count, is_verified, session_data FROM e2e_room_keys;
+
+DROP TABLE e2e_room_keys;
+ALTER TABLE e2e_room_keys_new RENAME TO e2e_room_keys;
+
+CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id);
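
This delta changes the `version` columns to integers using the create-new/copy/drop/rename pattern,
since a column's type cannot be altered in place on SQLite. A toy sqlite3 sketch of the same pattern,
trimmed to two columns, which also shows why the "more than 9 backup versions" bugfix needed it: string
`MAX()` picks the wrong backup once versions pass 9.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE e2e_room_keys_versions (user_id TEXT NOT NULL, version TEXT NOT NULL);
    INSERT INTO e2e_room_keys_versions VALUES ('@alice:hs', '9'), ('@alice:hs', '10');
""")
print(conn.execute("SELECT MAX(version) FROM e2e_room_keys_versions").fetchone())
# ('9',)  -> lexicographic comparison picks the wrong backup

# The same dance as the delta above: create the new table, copy with a CAST,
# drop the old table, rename the new one into place.
conn.executescript("""
    CREATE TABLE e2e_room_keys_versions_new (user_id TEXT NOT NULL, version BIGINT NOT NULL);
    INSERT INTO e2e_room_keys_versions_new
        SELECT user_id, CAST(version AS BIGINT) FROM e2e_room_keys_versions;
    DROP TABLE e2e_room_keys_versions;
    ALTER TABLE e2e_room_keys_versions_new RENAME TO e2e_room_keys_versions;
""")
print(conn.execute("SELECT MAX(version) FROM e2e_room_keys_versions").fetchone())
# (10,)  -> numeric comparison after the migration
```
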
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ef65929bb2..d737bd6778 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -1257,6 +1257,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
     STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
+    EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
 
     def __init__(self, db_conn, hs):
         super(StateStore, self).__init__(db_conn, hs)
@@ -1275,6 +1276,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
             columns=["state_key"],
             where_clause="type='m.room.member'",
         )
+        self.register_background_index_update(
+            self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
+            index_name="event_to_state_groups_sg_index",
+            table="event_to_state_groups",
+            columns=["state_group"],
+        )
 
     def _store_event_state_mappings_txn(self, txn, events_and_contexts):
         state_groups = {}