summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/_base.py62
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py19
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py6
-rw-r--r--synapse/storage/data_stores/main/events.py357
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py70
-rw-r--r--synapse/storage/data_stores/main/filtering.py2
-rw-r--r--synapse/storage/data_stores/main/group_server.py15
-rw-r--r--synapse/storage/data_stores/main/media_repository.py6
-rw-r--r--synapse/storage/data_stores/main/receipts.py2
-rw-r--r--synapse/storage/data_stores/main/registration.py12
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql17
-rw-r--r--synapse/storage/data_stores/main/state.py31
-rw-r--r--synapse/storage/data_stores/main/stream.py42
-rw-r--r--synapse/storage/data_stores/main/tags.py4
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/purge_events.py117
17 files changed, 540 insertions, 226 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0a1a8cc1e5..0460fe8cc9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
 from synapse.storage.data_stores import DataStores
 from synapse.storage.data_stores.main import DataStore
 from synapse.storage.persist_events import EventsPersistenceStorage
+from synapse.storage.purge_events import PurgeEventsStorage
 from synapse.storage.state import StateGroupStorage
 
 __all__ = ["DataStores", "DataStore"]
@@ -46,6 +47,7 @@ class Storage(object):
         self.main = stores.main
 
         self.persistence = EventsPersistenceStorage(hs, stores)
+        self.purge_events = PurgeEventsStorage(hs, stores)
         self.state = StateGroupStorage(hs, stores)
 
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1a2b7ebe25..459901ac60 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -361,14 +361,11 @@ class SQLBaseStore(object):
                 expiration_ts,
             )
 
-        self._simple_insert_txn(
+        self._simple_upsert_txn(
             txn,
             "account_validity",
-            values={
-                "user_id": user_id,
-                "expiration_ts_ms": expiration_ts,
-                "email_sent": False,
-            },
+            keyvalues={"user_id": user_id},
+            values={"expiration_ts_ms": expiration_ts, "email_sent": False},
         )
 
     def start_profiling(self):
@@ -412,16 +409,15 @@ class SQLBaseStore(object):
             i = 0
             N = 5
             while True:
+                cursor = LoggingTransaction(
+                    conn.cursor(),
+                    name,
+                    self.database_engine,
+                    after_callbacks,
+                    exception_callbacks,
+                )
                 try:
-                    txn = conn.cursor()
-                    txn = LoggingTransaction(
-                        txn,
-                        name,
-                        self.database_engine,
-                        after_callbacks,
-                        exception_callbacks,
-                    )
-                    r = func(txn, *args, **kwargs)
+                    r = func(cursor, *args, **kwargs)
                     conn.commit()
                     return r
                 except self.database_engine.module.OperationalError as e:
@@ -459,6 +455,40 @@ class SQLBaseStore(object):
                                 )
                             continue
                     raise
+                finally:
+                    # we're either about to retry with a new cursor, or we're about to
+                    # release the connection. Once we release the connection, it could
+                    # get used for another query, which might do a conn.rollback().
+                    #
+                    # In the latter case, even though that probably wouldn't affect the
+                    # results of this transaction, python's sqlite will reset all
+                    # statements on the connection [1], which will make our cursor
+                    # invalid [2].
+                    #
+                    # In any case, continuing to read rows after commit()ing seems
+                    # dubious from the PoV of ACID transactional semantics
+                    # (sqlite explicitly says that once you commit, you may see rows
+                    # from subsequent updates.)
+                    #
+                    # In psycopg2, cursors are essentially a client-side fabrication -
+                    # all the data is transferred to the client side when the statement
+                    # finishes executing - so in theory we could go on streaming results
+                    # from the cursor, but attempting to do so would make us
+                    # incompatible with sqlite, so let's make sure we're not doing that
+                    # by closing the cursor.
+                    #
+                    # (*named* cursors in psycopg2 are different and are proper server-
+                    # side things, but (a) we don't use them and (b) they are implicitly
+                    # closed by ending the transaction anyway.)
+                    #
+                    # In short, if we haven't finished with the cursor yet, that's a
+                    # problem waiting to bite us.
+                    #
+                    # TL;DR: we're done with the cursor, so we can close it.
+                    #
+                    # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
+                    # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
+                    cursor.close()
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
@@ -854,7 +884,7 @@ class SQLBaseStore(object):
             allvalues.update(values)
             latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
 
-        sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
+        sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index f04aad0743..a23744f11c 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     def _add_messages_to_local_device_inbox_txn(
         self, txn, stream_id, messages_by_user_then_device
     ):
-        sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
-        txn.execute(sql, (stream_id, stream_id))
+        # Compatible method of performing an upsert
+        sql = "SELECT stream_id FROM device_max_stream_id"
+
+        txn.execute(sql)
+        rows = txn.fetchone()
+        if rows:
+            db_stream_id = rows[0]
+            if db_stream_id < stream_id:
+                # Insert the new stream_id
+                sql = "UPDATE device_max_stream_id SET stream_id = ?"
+        else:
+            # No rows, perform an insert
+            sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
+
+        txn.execute(sql, (stream_id,))
 
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
@@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
+                sql = "SELECT device_id FROM devices WHERE user_id = ?"
                 txn.execute(sql, (user_id,))
                 message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 073412a78d..d8ad59ad93 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 result.setdefault(user_id, {})[device_id] = None
 
         # get signatures on the device
-        signature_sql = (
-            "SELECT * " "  FROM e2e_cross_signing_signatures " " WHERE %s"
-        ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
+        signature_sql = ("SELECT *  FROM e2e_cross_signing_signatures WHERE %s") % (
+            " OR ".join("(" + q + ")" for q in signature_query_clauses)
+        )
 
         txn.execute(signature_sql, signature_query_params)
         rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 301f8ea128..627c0b67f1 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -713,9 +713,7 @@ class EventsStore(
 
                 metadata_json = encode_json(event.internal_metadata.get_dict())
 
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
-                )
+                sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
                 txn.execute(sql, (metadata_json, event.event_id))
 
                 # Add an entry to the ex_outlier_stream table to replicate the
@@ -732,7 +730,7 @@ class EventsStore(
                     },
                 )
 
-                sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
+                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
                 txn.execute(sql, (False, event.event_id))
 
                 # Update the event_backward_extremities table now that this
@@ -1375,6 +1373,10 @@ class EventsStore(
                 if True, we will delete local events as well as remote ones
                 (instead of just marking them as outliers and deleting their
                 state groups).
+
+        Returns:
+            Deferred[set[int]]: The set of state groups that are referenced by
+            deleted events.
         """
 
         return self.runInteraction(
@@ -1475,7 +1477,7 @@ class EventsStore(
 
         # We do joins against events_to_purge for e.g. calculating state
         # groups to purge, etc., so lets make an index.
-        txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
+        txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
 
         txn.execute("SELECT event_id, should_delete FROM events_to_purge")
         event_rows = txn.fetchall()
@@ -1511,11 +1513,10 @@ class EventsStore(
             [(room_id, event_id) for event_id, in new_backwards_extrems],
         )
 
-        logger.info("[purge] finding redundant state groups")
+        logger.info("[purge] finding state groups referenced by deleted events")
 
         # Get all state groups that are referenced by events that are to be
-        # deleted. We then go and check if they are referenced by other events
-        # or state groups, and if not we delete them.
+        # deleted.
         txn.execute(
             """
             SELECT DISTINCT state_group FROM events_to_purge
@@ -1528,60 +1529,6 @@ class EventsStore(
             "[purge] found %i referenced state groups", len(referenced_state_groups)
         )
 
-        logger.info("[purge] finding state groups that can be deleted")
-
-        _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
-        state_groups_to_delete, remaining_state_groups = _
-
-        logger.info(
-            "[purge] found %i state groups to delete", len(state_groups_to_delete)
-        )
-
-        logger.info(
-            "[purge] de-delta-ing %i remaining state groups",
-            len(remaining_state_groups),
-        )
-
-        # Now we turn the state groups that reference to-be-deleted state
-        # groups to non delta versions.
-        for sg in remaining_state_groups:
-            logger.info("[purge] de-delta-ing remaining state group %s", sg)
-            curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
-            curr_state = curr_state[sg]
-
-            self._simple_delete_txn(
-                txn, table="state_groups_state", keyvalues={"state_group": sg}
-            )
-
-            self._simple_delete_txn(
-                txn, table="state_group_edges", keyvalues={"state_group": sg}
-            )
-
-            self._simple_insert_many_txn(
-                txn,
-                table="state_groups_state",
-                values=[
-                    {
-                        "state_group": sg,
-                        "room_id": room_id,
-                        "type": key[0],
-                        "state_key": key[1],
-                        "event_id": state_id,
-                    }
-                    for key, state_id in iteritems(curr_state)
-                ],
-            )
-
-        logger.info("[purge] removing redundant state groups")
-        txn.executemany(
-            "DELETE FROM state_groups_state WHERE state_group = ?",
-            ((sg,) for sg in state_groups_to_delete),
-        )
-        txn.executemany(
-            "DELETE FROM state_groups WHERE id = ?",
-            ((sg,) for sg in state_groups_to_delete),
-        )
-
         logger.info("[purge] removing events from event_to_state_groups")
         txn.execute(
             "DELETE FROM event_to_state_groups "
@@ -1668,138 +1615,35 @@ class EventsStore(
 
         logger.info("[purge] done")
 
-    def _find_unreferenced_groups_during_purge(self, txn, state_groups):
-        """Used when purging history to figure out which state groups can be
-        deleted and which need to be de-delta'ed (due to one of its prev groups
-        being scheduled for deletion).
-
-        Args:
-            txn
-            state_groups (set[int]): Set of state groups referenced by events
-                that are going to be deleted.
-
-        Returns:
-            tuple[set[int], set[int]]: The set of state groups that can be
-            deleted and the set of state groups that need to be de-delta'ed
-        """
-        # Graph of state group -> previous group
-        graph = {}
-
-        # Set of events that we have found to be referenced by events
-        referenced_groups = set()
-
-        # Set of state groups we've already seen
-        state_groups_seen = set(state_groups)
-
-        # Set of state groups to handle next.
-        next_to_search = set(state_groups)
-        while next_to_search:
-            # We bound size of groups we're looking up at once, to stop the
-            # SQL query getting too big
-            if len(next_to_search) < 100:
-                current_search = next_to_search
-                next_to_search = set()
-            else:
-                current_search = set(itertools.islice(next_to_search, 100))
-                next_to_search -= current_search
-
-            # Check if state groups are referenced
-            sql = """
-                SELECT DISTINCT state_group FROM event_to_state_groups
-                LEFT JOIN events_to_purge AS ep USING (event_id)
-                WHERE ep.event_id IS NULL AND
-            """
-            clause, args = make_in_list_sql_clause(
-                txn.database_engine, "state_group", current_search
-            )
-            txn.execute(sql + clause, list(args))
-
-            referenced = set(sg for sg, in txn)
-            referenced_groups |= referenced
-
-            # We don't continue iterating up the state group graphs for state
-            # groups that are referenced.
-            current_search -= referenced
-
-            rows = self._simple_select_many_txn(
-                txn,
-                table="state_group_edges",
-                column="prev_state_group",
-                iterable=current_search,
-                keyvalues={},
-                retcols=("prev_state_group", "state_group"),
-            )
-
-            prevs = set(row["state_group"] for row in rows)
-            # We don't bother re-handling groups we've already seen
-            prevs -= state_groups_seen
-            next_to_search |= prevs
-            state_groups_seen |= prevs
-
-            for row in rows:
-                # Note: Each state group can have at most one prev group
-                graph[row["state_group"]] = row["prev_state_group"]
-
-        to_delete = state_groups_seen - referenced_groups
-
-        to_dedelta = set()
-        for sg in referenced_groups:
-            prev_sg = graph.get(sg)
-            if prev_sg and prev_sg in to_delete:
-                to_dedelta.add(sg)
-
-        return to_delete, to_dedelta
+        return referenced_state_groups
 
     def purge_room(self, room_id):
         """Deletes all record of a room
 
         Args:
-            room_id (str):
+            room_id (str)
+
+        Returns:
+            Deferred[List[int]]: The list of state groups to delete.
         """
 
         return self.runInteraction("purge_room", self._purge_room_txn, room_id)
 
     def _purge_room_txn(self, txn, room_id):
-        # first we have to delete the state groups states
-        logger.info("[purge] removing %s from state_groups_state", room_id)
-
+        # First we fetch all the state groups that should be deleted, before
+        # we delete that information.
         txn.execute(
             """
-            DELETE FROM state_groups_state WHERE state_group IN (
-              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
-              WHERE events.room_id=?
-            )
+                SELECT DISTINCT state_group FROM events
+                INNER JOIN event_to_state_groups USING(event_id)
+                WHERE events.room_id = ?
             """,
             (room_id,),
         )
 
-        # ... and the state group edges
-        logger.info("[purge] removing %s from state_group_edges", room_id)
+        state_groups = [row[0] for row in txn]
 
-        txn.execute(
-            """
-            DELETE FROM state_group_edges WHERE state_group IN (
-              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
-              WHERE events.room_id=?
-            )
-            """,
-            (room_id,),
-        )
-
-        # ... and the state groups
-        logger.info("[purge] removing %s from state_groups", room_id)
-
-        txn.execute(
-            """
-            DELETE FROM state_groups WHERE id IN (
-              SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
-              WHERE events.room_id=?
-            )
-            """,
-            (room_id,),
-        )
-
-        # and then tables which lack an index on room_id but have one on event_id
+        # Now we delete tables which lack an index on room_id but have one on event_id
         for table in (
             "event_auth",
             "event_edges",
@@ -1887,6 +1731,165 @@ class EventsStore(
 
         logger.info("[purge] done")
 
+        return state_groups
+
+    def purge_unreferenced_state_groups(
+        self, room_id: str, state_groups_to_delete
+    ) -> defer.Deferred:
+        """Deletes no longer referenced state groups and de-deltas any state
+        groups that reference them.
+
+        Args:
+            room_id: The room the state groups belong to (must all be in the
+                same room).
+            state_groups_to_delete (Collection[int]): Set of all state groups
+                to delete.
+        """
+
+        return self.runInteraction(
+            "purge_unreferenced_state_groups",
+            self._purge_unreferenced_state_groups,
+            room_id,
+            state_groups_to_delete,
+        )
+
+    def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
+        logger.info(
+            "[purge] found %i state groups to delete", len(state_groups_to_delete)
+        )
+
+        rows = self._simple_select_many_txn(
+            txn,
+            table="state_group_edges",
+            column="prev_state_group",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+            retcols=("state_group",),
+        )
+
+        remaining_state_groups = set(
+            row["state_group"]
+            for row in rows
+            if row["state_group"] not in state_groups_to_delete
+        )
+
+        logger.info(
+            "[purge] de-delta-ing %i remaining state groups",
+            len(remaining_state_groups),
+        )
+
+        # Now we turn the state groups that reference to-be-deleted state
+        # groups to non delta versions.
+        for sg in remaining_state_groups:
+            logger.info("[purge] de-delta-ing remaining state group %s", sg)
+            curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
+            curr_state = curr_state[sg]
+
+            self._simple_delete_txn(
+                txn, table="state_groups_state", keyvalues={"state_group": sg}
+            )
+
+            self._simple_delete_txn(
+                txn, table="state_group_edges", keyvalues={"state_group": sg}
+            )
+
+            self._simple_insert_many_txn(
+                txn,
+                table="state_groups_state",
+                values=[
+                    {
+                        "state_group": sg,
+                        "room_id": room_id,
+                        "type": key[0],
+                        "state_key": key[1],
+                        "event_id": state_id,
+                    }
+                    for key, state_id in iteritems(curr_state)
+                ],
+            )
+
+        logger.info("[purge] removing redundant state groups")
+        txn.executemany(
+            "DELETE FROM state_groups_state WHERE state_group = ?",
+            ((sg,) for sg in state_groups_to_delete),
+        )
+        txn.executemany(
+            "DELETE FROM state_groups WHERE id = ?",
+            ((sg,) for sg in state_groups_to_delete),
+        )
+
+    @defer.inlineCallbacks
+    def get_previous_state_groups(self, state_groups):
+        """Fetch the previous groups of the given state groups.
+
+        Args:
+            state_groups (Iterable[int])
+
+        Returns:
+            Deferred[dict[int, int]]: mapping from state group to previous
+            state group.
+        """
+
+        rows = yield self._simple_select_many_batch(
+            table="state_group_edges",
+            column="prev_state_group",
+            iterable=state_groups,
+            keyvalues={},
+            retcols=("prev_state_group", "state_group"),
+            desc="get_previous_state_groups",
+        )
+
+        return {row["state_group"]: row["prev_state_group"] for row in rows}
+
+    def purge_room_state(self, room_id, state_groups_to_delete):
+        """Deletes all record of a room from state tables
+
+        Args:
+            room_id (str):
+            state_groups_to_delete (list[int]): State groups to delete
+        """
+
+        return self.runInteraction(
+            "purge_room_state",
+            self._purge_room_state_txn,
+            room_id,
+            state_groups_to_delete,
+        )
+
+    def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
+        # first we have to delete the state groups states
+        logger.info("[purge] removing %s from state_groups_state", room_id)
+
+        self._simple_delete_many_txn(
+            txn,
+            table="state_groups_state",
+            column="state_group",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+        )
+
+        # ... and the state group edges
+        logger.info("[purge] removing %s from state_group_edges", room_id)
+
+        self._simple_delete_many_txn(
+            txn,
+            table="state_group_edges",
+            column="state_group",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+        )
+
+        # ... and the state groups
+        logger.info("[purge] removing %s from state_groups", room_id)
+
+        self._simple_delete_many_txn(
+            txn,
+            table="state_groups",
+            column="id",
+            iterable=state_groups_to_delete,
+            keyvalues={},
+        )
+
     async def is_event_after(self, event_id1, event_id2):
         """Returns True if event_id1 is after event_id2 in the stream
         """
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 51352b9966..aa87f9abc5 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -21,6 +21,7 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import EventContentFields
 from synapse.storage._base import make_in_list_sql_clause
 from synapse.storage.background_updates import BackgroundUpdateStore
 
@@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             "event_fix_redactions_bytes", self._event_fix_redactions_bytes
         )
 
+        self.register_background_update_handler(
+            "event_store_labels", self._event_store_labels
+        )
+
     @defer.inlineCallbacks
     def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
@@ -503,3 +508,68 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         yield self._end_background_update("event_fix_redactions_bytes")
 
         return 1
+
+    @defer.inlineCallbacks
+    def _event_store_labels(self, progress, batch_size):
+        """Background update handler which will store labels for existing events."""
+        last_event_id = progress.get("last_event_id", "")
+
+        def _event_store_labels_txn(txn):
+            txn.execute(
+                """
+                SELECT event_id, json FROM event_json
+                LEFT JOIN event_labels USING (event_id)
+                WHERE event_id > ? AND label IS NULL
+                ORDER BY event_id LIMIT ?
+                """,
+                (last_event_id, batch_size),
+            )
+
+            results = list(txn)
+
+            nbrows = 0
+            last_row_event_id = ""
+            for (event_id, event_json_raw) in results:
+                try:
+                    event_json = json.loads(event_json_raw)
+
+                    self._simple_insert_many_txn(
+                        txn=txn,
+                        table="event_labels",
+                        values=[
+                            {
+                                "event_id": event_id,
+                                "label": label,
+                                "room_id": event_json["room_id"],
+                                "topological_ordering": event_json["depth"],
+                            }
+                            for label in event_json["content"].get(
+                                EventContentFields.LABELS, []
+                            )
+                            if isinstance(label, str)
+                        ],
+                    )
+                except Exception as e:
+                    logger.warning(
+                        "Unable to load event %s (no labels will be imported): %s",
+                        event_id,
+                        e,
+                    )
+
+                nbrows += 1
+                last_row_event_id = event_id
+
+            self._background_update_progress_txn(
+                txn, "event_store_labels", {"last_event_id": last_row_event_id}
+            )
+
+            return nbrows
+
+        num_rows = yield self.runInteraction(
+            desc="event_store_labels", func=_event_store_labels_txn
+        )
+
+        if not num_rows:
+            yield self._end_background_update("event_store_labels")
+
+        return num_rows
diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py
index a2a2a67927..f05ace299a 100644
--- a/synapse/storage/data_stores/main/filtering.py
+++ b/synapse/storage/data_stores/main/filtering.py
@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
             if filter_id_response is not None:
                 return filter_id_response[0]
 
-            sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
+            sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
             txn.execute(sql, (user_localpart,))
             max_id = txn.fetchone()[0]
             if max_id is None:
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index b3a2771f1b..5ded539af8 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -553,6 +553,21 @@ class GroupServerStore(SQLBaseStore):
             desc="remove_user_from_summary",
         )
 
+    def get_local_groups_for_room(self, room_id):
+        """Get all of the local group that contain a given room
+        Args:
+            room_id (str): The ID of a room
+        Returns:
+            Deferred[list[str]]: A twisted.Deferred containing a list of group ids
+                containing this room
+        """
+        return self._simple_select_onecol(
+            table="group_rooms",
+            keyvalues={"room_id": room_id},
+            retcol="group_id",
+            desc="get_local_groups_for_room",
+        )
+
     def get_users_for_summary_by_role(self, group_id, include_private=False):
         """Get the users and roles that should be included in a summary request
 
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 84b5f3ad5e..0f2887bdce 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         if len(media_ids) == 0:
             return
 
-        sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
+        sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             return
 
         def _delete_url_cache_media_txn(txn):
-            sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
-            sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0c24430f28..8b17334ff4 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 args.append(limit)
             txn.execute(sql, args)
 
-            return (r[0:5] + (json.loads(r[5]),) for r in txn)
+            return list(r[0:5] + (json.loads(r[5]),) for r in txn)
 
         return self.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index f70d41ecab..6a594c160c 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -377,9 +377,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
 
         def f(txn):
-            sql = (
-                "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
-            )
+            sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
             txn.execute(sql, (user_id,))
             return dict(txn)
 
@@ -488,14 +486,14 @@ class RegistrationWorkerStore(SQLBaseStore):
         we can. Unfortunately, it's possible some of them are already taken by
         existing users, and there may be gaps in the already taken range. This
         function returns the start of the first allocatable gap. This is to
-        avoid the case of ID 10000000 being pre-allocated, so us wasting the
-        first (and shortest) many generated user IDs.
+        avoid the case of ID 1000 being pre-allocated and starting at 1001 while
+        0-999 are available.
         """
 
         def _find_next_generated_user_id(txn):
-            # We bound between '@1' and '@a' to avoid pulling the entire table
+            # We bound between '@0' and '@a' to avoid pulling the entire table
             # out.
-            txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
+            txn.execute("SELECT name FROM users WHERE '@0' <= name AND name < '@a'")
 
             regex = re.compile(r"^@(\d+):")
 
diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql
new file mode 100644
index 0000000000..5f5e0499ae
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('event_store_labels', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 3132848034..6a90daea31 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -285,7 +285,11 @@ class StateGroupWorkerStore(
             room_id (str)
 
         Returns:
-            Deferred[unicode|None]: predecessor room id
+            Deferred[dict|None]: A dictionary containing the structure of the predecessor
+                field from the room's create event. The structure is subject to other servers,
+                but it is expected to be:
+                    * room_id (str): The room ID of the predecessor room
+                    * event_id (str): The ID of the tombstone event in the predecessor room
 
         Raises:
             NotFoundError if the room is unknown
@@ -991,6 +995,29 @@ class StateGroupWorkerStore(
 
         return self.runInteraction("store_state_group", _store_state_group_txn)
 
+    @defer.inlineCallbacks
+    def get_referenced_state_groups(self, state_groups):
+        """Check if the state groups are referenced by events.
+
+        Args:
+            state_groups (Iterable[int])
+
+        Returns:
+            Deferred[set[int]]: The subset of state groups that are
+            referenced.
+        """
+
+        rows = yield self._simple_select_many_batch(
+            table="event_to_state_groups",
+            column="state_group",
+            iterable=state_groups,
+            keyvalues={},
+            retcols=("DISTINCT state_group",),
+            desc="get_referenced_state_groups",
+        )
+
+        return set(row["state_group"] for row in rows)
+
 
 class StateBackgroundUpdateStore(
     StateGroupBackgroundUpdateStore, BackgroundUpdateStore
@@ -1231,7 +1258,7 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
             # if the event was rejected, just give it the same state as its
             # predecessor.
             if context.rejected:
-                state_groups[event.event_id] = context.prev_group
+                state_groups[event.event_id] = context.state_group_before_event
                 continue
 
             state_groups[event.event_id] = context.state_group
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 616ef91d4e..9ae4a913a1 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     def _get_max_topological_txn(self, txn, room_id):
         txn.execute(
-            "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
+            "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
             (room_id,),
         )
 
@@ -871,14 +871,38 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         args.append(int(limit))
 
-        sql = (
-            "SELECT DISTINCT event_id, topological_ordering, stream_ordering"
-            " FROM events"
-            " LEFT JOIN event_labels USING (event_id, room_id, topological_ordering)"
-            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
-            " ORDER BY topological_ordering %(order)s,"
-            " stream_ordering %(order)s LIMIT ?"
-        ) % {"bounds": bounds, "order": order}
+        select_keywords = "SELECT"
+        join_clause = ""
+        if event_filter and event_filter.labels:
+            # If we're not filtering on a label, then joining on event_labels will
+            # return as many row for a single event as the number of labels it has. To
+            # avoid this, only join if we're filtering on at least one label.
+            join_clause = """
+                LEFT JOIN event_labels
+                USING (event_id, room_id, topological_ordering)
+            """
+            if len(event_filter.labels) > 1:
+                # Using DISTINCT in this SELECT query is quite expensive, because it
+                # requires the engine to sort on the entire (not limited) result set,
+                # i.e. the entire events table. We only need to use it when we're
+                # filtering on more than two labels, because that's the only scenario
+                # in which we can possibly to get multiple times the same event ID in
+                # the results.
+                select_keywords += "DISTINCT"
+
+        sql = """
+            %(select_keywords)s event_id, topological_ordering, stream_ordering
+            FROM events
+            %(join_clause)s
+            WHERE outlier = ? AND room_id = ? AND %(bounds)s
+            ORDER BY topological_ordering %(order)s,
+            stream_ordering %(order)s LIMIT ?
+        """ % {
+            "select_keywords": select_keywords,
+            "join_clause": join_clause,
+            "bounds": bounds,
+            "order": order,
+        }
 
         txn.execute(sql, args)
 
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 10d1887f75..aa24339717 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
         )
 
         def get_tag_content(txn, tag_ids):
-            sql = (
-                "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
-            )
+            sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
             results = []
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 2e7753820e..731e1c9d9c 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
         # Mark as done.
         cur.execute(
             database_engine.convert_param_style(
-                "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
+                "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
             ),
             (modname, name),
         )
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
new file mode 100644
index 0000000000..a368182034
--- /dev/null
+++ b/synapse/storage/purge_events.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+
+from twisted.internet import defer
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeEventsStorage(object):
+    """High level interface for purging rooms and event history.
+    """
+
+    def __init__(self, hs, stores):
+        self.stores = stores
+
+    @defer.inlineCallbacks
+    def purge_room(self, room_id: str):
+        """Deletes all record of a room
+        """
+
+        state_groups_to_delete = yield self.stores.main.purge_room(room_id)
+        yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)
+
+    @defer.inlineCallbacks
+    def purge_history(self, room_id, token, delete_local_events):
+        """Deletes room history before a certain point
+
+        Args:
+            room_id (str):
+
+            token (str): A topological token to delete events before
+
+            delete_local_events (bool):
+                if True, we will delete local events as well as remote ones
+                (instead of just marking them as outliers and deleting their
+                state groups).
+        """
+        state_groups = yield self.stores.main.purge_history(
+            room_id, token, delete_local_events
+        )
+
+        logger.info("[purge] finding state groups that can be deleted")
+
+        sg_to_delete = yield self._find_unreferenced_groups(state_groups)
+
+        yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)
+
+    @defer.inlineCallbacks
+    def _find_unreferenced_groups(self, state_groups):
+        """Used when purging history to figure out which state groups can be
+        deleted.
+
+        Args:
+            state_groups (set[int]): Set of state groups referenced by events
+                that are going to be deleted.
+
+        Returns:
+            Deferred[set[int]] The set of state groups that can be deleted.
+        """
+        # Graph of state group -> previous group
+        graph = {}
+
+        # Set of events that we have found to be referenced by events
+        referenced_groups = set()
+
+        # Set of state groups we've already seen
+        state_groups_seen = set(state_groups)
+
+        # Set of state groups to handle next.
+        next_to_search = set(state_groups)
+        while next_to_search:
+            # We bound size of groups we're looking up at once, to stop the
+            # SQL query getting too big
+            if len(next_to_search) < 100:
+                current_search = next_to_search
+                next_to_search = set()
+            else:
+                current_search = set(itertools.islice(next_to_search, 100))
+                next_to_search -= current_search
+
+            referenced = yield self.stores.main.get_referenced_state_groups(
+                current_search
+            )
+            referenced_groups |= referenced
+
+            # We don't continue iterating up the state group graphs for state
+            # groups that are referenced.
+            current_search -= referenced
+
+            edges = yield self.stores.main.get_previous_state_groups(current_search)
+
+            prevs = set(edges.values())
+            # We don't bother re-handling groups we've already seen
+            prevs -= state_groups_seen
+            next_to_search |= prevs
+            state_groups_seen |= prevs
+
+            graph.update(edges)
+
+        to_delete = state_groups_seen - referenced_groups
+
+        return to_delete