summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorBen Banfield-Zanin <benbz@matrix.org>2021-02-16 13:33:20 +0000
committerBen Banfield-Zanin <benbz@matrix.org>2021-02-16 13:33:20 +0000
commitdcf1b9c276e22bb6f5200fc029301c4d40e87a1f (patch)
tree1f5badce24645d99534133a7a989069906088fff /synapse/storage
parentMerge remote-tracking branch 'origin/release-v1.24.0' into bbz/info-mainline-... (diff)
parentFixup CHANGES (diff)
downloadsynapse-bbz/info-mainline-1.27.0.tar.xz
Merge remote-tracking branch 'origin/release-v1.27.0' into bbz/info-mainline-1.27.0 github/bbz/info-mainline-1.27.0 bbz/info-mainline-1.27.0
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py9
-rw-r--r--synapse/storage/_base.py36
-rw-r--r--synapse/storage/background_updates.py111
-rw-r--r--synapse/storage/database.py52
-rw-r--r--synapse/storage/databases/main/__init__.py57
-rw-r--r--synapse/storage/databases/main/account_data.py254
-rw-r--r--synapse/storage/databases/main/client_ips.py102
-rw-r--r--synapse/storage/databases/main/deviceinbox.py250
-rw-r--r--synapse/storage/databases/main/devices.py36
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py133
-rw-r--r--synapse/storage/databases/main/event_federation.py189
-rw-r--r--synapse/storage/databases/main/event_push_actions.py106
-rw-r--r--synapse/storage/databases/main/events.py703
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py360
-rw-r--r--synapse/storage/databases/main/events_forward_extremities.py101
-rw-r--r--synapse/storage/databases/main/events_worker.py8
-rw-r--r--synapse/storage/databases/main/keys.py10
-rw-r--r--synapse/storage/databases/main/media_repository.py13
-rw-r--r--synapse/storage/databases/main/metrics.py56
-rw-r--r--synapse/storage/databases/main/profile.py2
-rw-r--r--synapse/storage/databases/main/purge_events.py2
-rw-r--r--synapse/storage/databases/main/pusher.py102
-rw-r--r--synapse/storage/databases/main/receipts.py108
-rw-r--r--synapse/storage/databases/main/registration.py134
-rw-r--r--synapse/storage/databases/main/room.py61
-rw-r--r--synapse/storage/databases/main/roommember.py6
-rw-r--r--synapse/storage/databases/main/schema/delta/58/25user_external_ids_user_id_idx.sql17
-rw-r--r--synapse/storage/databases/main/schema/delta/58/26access_token_last_validated.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/58/27local_invites.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres16
-rw-r--r--synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite62
-rw-r--r--synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql17
-rw-r--r--synapse/storage/databases/main/schema/delta/59/01ignored_user.py82
-rw-r--r--synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql18
-rw-r--r--synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres25
-rw-r--r--synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql52
-rw-r--r--synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres16
-rw-r--r--synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql17
-rw-r--r--synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql17
-rw-r--r--synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql17
-rw-r--r--synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql20
-rw-r--r--synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql.postgres32
-rw-r--r--synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql18
-rw-r--r--synapse/storage/databases/main/search.py7
-rw-r--r--synapse/storage/databases/main/stats.py22
-rw-r--r--synapse/storage/databases/main/tags.py20
-rw-r--r--synapse/storage/databases/main/transactions.py24
-rw-r--r--synapse/storage/databases/main/user_directory.py33
-rw-r--r--synapse/storage/databases/state/store.py4
-rw-r--r--synapse/storage/keys.py5
-rw-r--r--synapse/storage/persist_events.py200
-rw-r--r--synapse/storage/prepare_database.py126
-rw-r--r--synapse/storage/purge_events.py11
-rw-r--r--synapse/storage/relations.py44
-rw-r--r--synapse/storage/state.py35
-rw-r--r--synapse/storage/util/id_generators.py113
-rw-r--r--synapse/storage/util/sequence.py82
57 files changed, 3333 insertions, 856 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index bbff3c8d5b..c0d9d1240f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -27,6 +27,7 @@ There are also schemas that get applied to every database, regardless of the
 data stores associated with them (e.g. the schema version tables), which are
 stored in `synapse.storage.schema`.
 """
+from typing import TYPE_CHECKING
 
 from synapse.storage.databases import Databases
 from synapse.storage.databases.main import DataStore
@@ -34,14 +35,18 @@ from synapse.storage.persist_events import EventsPersistenceStorage
 from synapse.storage.purge_events import PurgeEventsStorage
 from synapse.storage.state import StateGroupStorage
 
-__all__ = ["DataStores", "DataStore"]
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
+
+__all__ = ["Databases", "DataStore"]
 
 
 class Storage:
     """The high level interfaces for talking to various storage layers.
     """
 
-    def __init__(self, hs, stores: Databases):
+    def __init__(self, hs: "HomeServer", stores: Databases):
         # We include the main data store here mainly so that we don't have to
         # rewrite all the existing code to split it into high vs low level
         # interfaces.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2b196ded1b..a25c4093bc 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -17,14 +17,18 @@
 import logging
 import random
 from abc import ABCMeta
-from typing import Any, Optional
+from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
 
 from synapse.storage.database import LoggingTransaction  # noqa: F401
 from synapse.storage.database import make_in_list_sql_clause  # noqa: F401
 from synapse.storage.database import DatabasePool
-from synapse.types import Collection, get_domain_from_id
+from synapse.storage.types import Connection
+from synapse.types import Collection, StreamToken, get_domain_from_id
 from synapse.util import json_decoder
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -36,24 +40,31 @@ class SQLBaseStore(metaclass=ABCMeta):
     per data store (and not one per physical database).
     """
 
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         self.hs = hs
         self._clock = hs.get_clock()
         self.database_engine = database.engine
         self.db_pool = database
         self.rand = random.SystemRandom()
 
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
+    def process_replication_rows(
+        self,
+        stream_name: str,
+        instance_name: str,
+        token: StreamToken,
+        rows: Iterable[Any],
+    ) -> None:
         pass
 
-    def _invalidate_state_caches(self, room_id, members_changed):
+    def _invalidate_state_caches(
+        self, room_id: str, members_changed: Iterable[str]
+    ) -> None:
         """Invalidates caches that are based on the current state, but does
         not stream invalidations down replication.
 
         Args:
-            room_id (str): Room where state changed
-            members_changed (iterable[str]): The user_ids of members that have
-                changed
+            room_id: Room where state changed
+            members_changed: The user_ids of members that have changed
         """
         for host in {get_domain_from_id(u) for u in members_changed}:
             self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
@@ -64,7 +75,7 @@ class SQLBaseStore(metaclass=ABCMeta):
 
     def _attempt_to_invalidate_cache(
         self, cache_name: str, key: Optional[Collection[Any]]
-    ):
+    ) -> None:
         """Attempts to invalidate the cache of the given name, ignoring if the
         cache doesn't exist. Mainly used for invalidating caches on workers,
         where they may not have the cache.
@@ -88,12 +99,15 @@ class SQLBaseStore(metaclass=ABCMeta):
             cache.invalidate(tuple(key))
 
 
-def db_to_json(db_content):
+def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
     """
     Take some data from a database row and return a JSON-decoded object.
 
     Args:
-        db_content (memoryview|buffer|bytes|bytearray|unicode)
+        db_content: The JSON-encoded contents from the database.
+
+    Returns:
+        The object decoded from JSON.
     """
     # psycopg2 on Python 3 returns memoryview objects, which we need to
     # cast to bytes to decode
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 810721ebe9..29b8ca676a 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,29 +12,34 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
-from typing import Optional
+from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Iterable, Optional
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.types import Connection
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 
 from . import engines
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+    from synapse.storage.database import DatabasePool, LoggingTransaction
+
 logger = logging.getLogger(__name__)
 
 
 class BackgroundUpdatePerformance:
     """Tracks the how long a background update is taking to update its items"""
 
-    def __init__(self, name):
+    def __init__(self, name: str):
         self.name = name
         self.total_item_count = 0
-        self.total_duration_ms = 0
-        self.avg_item_count = 0
-        self.avg_duration_ms = 0
+        self.total_duration_ms = 0.0
+        self.avg_item_count = 0.0
+        self.avg_duration_ms = 0.0
 
-    def update(self, item_count, duration_ms):
+    def update(self, item_count: int, duration_ms: float) -> None:
         """Update the stats after doing an update"""
         self.total_item_count += item_count
         self.total_duration_ms += duration_ms
@@ -44,7 +49,7 @@ class BackgroundUpdatePerformance:
         self.avg_item_count += 0.1 * (item_count - self.avg_item_count)
         self.avg_duration_ms += 0.1 * (duration_ms - self.avg_duration_ms)
 
-    def average_items_per_ms(self):
+    def average_items_per_ms(self) -> Optional[float]:
         """An estimate of how long it takes to do a single update.
         Returns:
             A duration in ms as a float
@@ -58,7 +63,7 @@ class BackgroundUpdatePerformance:
             # changes in how long the update process takes.
             return float(self.avg_item_count) / float(self.avg_duration_ms)
 
-    def total_items_per_ms(self):
+    def total_items_per_ms(self) -> Optional[float]:
         """An estimate of how long it takes to do a single update.
         Returns:
             A duration in ms as a float
@@ -83,21 +88,25 @@ class BackgroundUpdater:
     BACKGROUND_UPDATE_INTERVAL_MS = 1000
     BACKGROUND_UPDATE_DURATION_MS = 100
 
-    def __init__(self, hs, database):
+    def __init__(self, hs: "HomeServer", database: "DatabasePool"):
         self._clock = hs.get_clock()
         self.db_pool = database
 
         # if a background update is currently running, its name.
         self._current_background_update = None  # type: Optional[str]
 
-        self._background_update_performance = {}
-        self._background_update_handlers = {}
+        self._background_update_performance = (
+            {}
+        )  # type: Dict[str, BackgroundUpdatePerformance]
+        self._background_update_handlers = (
+            {}
+        )  # type: Dict[str, Callable[[JsonDict, int], Awaitable[int]]]
         self._all_done = False
 
-    def start_doing_background_updates(self):
+    def start_doing_background_updates(self) -> None:
         run_as_background_process("background_updates", self.run_background_updates)
 
-    async def run_background_updates(self, sleep=True):
+    async def run_background_updates(self, sleep: bool = True) -> None:
         logger.info("Starting background schema updates")
         while True:
             if sleep:
@@ -148,7 +157,7 @@ class BackgroundUpdater:
 
         return False
 
-    async def has_completed_background_update(self, update_name) -> bool:
+    async def has_completed_background_update(self, update_name: str) -> bool:
         """Check if the given background update has finished running.
         """
         if self._all_done:
@@ -173,8 +182,7 @@ class BackgroundUpdater:
         Returns once some amount of work is done.
 
         Args:
-            desired_duration_ms(float): How long we want to spend
-                updating.
+            desired_duration_ms: How long we want to spend updating.
         Returns:
             True if we have finished running all the background updates, otherwise False
         """
@@ -220,6 +228,7 @@ class BackgroundUpdater:
         return False
 
     async def _do_background_update(self, desired_duration_ms: float) -> int:
+        assert self._current_background_update is not None
         update_name = self._current_background_update
         logger.info("Starting update batch on background update '%s'", update_name)
 
@@ -273,7 +282,11 @@ class BackgroundUpdater:
 
         return len(self._background_update_performance)
 
-    def register_background_update_handler(self, update_name, update_handler):
+    def register_background_update_handler(
+        self,
+        update_name: str,
+        update_handler: Callable[[JsonDict, int], Awaitable[int]],
+    ):
         """Register a handler for doing a background update.
 
         The handler should take two arguments:
@@ -287,12 +300,12 @@ class BackgroundUpdater:
         The handler is responsible for updating the progress of the update.
 
         Args:
-            update_name(str): The name of the update that this code handles.
-            update_handler(function): The function that does the update.
+            update_name: The name of the update that this code handles.
+            update_handler: The function that does the update.
         """
         self._background_update_handlers[update_name] = update_handler
 
-    def register_noop_background_update(self, update_name):
+    def register_noop_background_update(self, update_name: str) -> None:
         """Register a noop handler for a background update.
 
         This is useful when we previously did a background update, but no
@@ -302,10 +315,10 @@ class BackgroundUpdater:
         also be called to clear the update.
 
         Args:
-            update_name (str): Name of update
+            update_name: Name of update
         """
 
-        async def noop_update(progress, batch_size):
+        async def noop_update(progress: JsonDict, batch_size: int) -> int:
             await self._end_background_update(update_name)
             return 1
 
@@ -313,14 +326,14 @@ class BackgroundUpdater:
 
     def register_background_index_update(
         self,
-        update_name,
-        index_name,
-        table,
-        columns,
-        where_clause=None,
-        unique=False,
-        psql_only=False,
-    ):
+        update_name: str,
+        index_name: str,
+        table: str,
+        columns: Iterable[str],
+        where_clause: Optional[str] = None,
+        unique: bool = False,
+        psql_only: bool = False,
+    ) -> None:
         """Helper for store classes to do a background index addition
 
         To use:
@@ -332,19 +345,19 @@ class BackgroundUpdater:
         2. In the Store constructor, call this method
 
         Args:
-            update_name (str): update_name to register for
-            index_name (str): name of index to add
-            table (str): table to add index to
-            columns (list[str]): columns/expressions to include in index
-            unique (bool): true to make a UNIQUE index
+            update_name: update_name to register for
+            index_name: name of index to add
+            table: table to add index to
+            columns: columns/expressions to include in index
+            unique: true to make a UNIQUE index
             psql_only: true to only create this index on psql databases (useful
                 for virtual sqlite tables)
         """
 
-        def create_index_psql(conn):
+        def create_index_psql(conn: Connection) -> None:
             conn.rollback()
             # postgres insists on autocommit for the index
-            conn.set_session(autocommit=True)
+            conn.set_session(autocommit=True)  # type: ignore
 
             try:
                 c = conn.cursor()
@@ -371,9 +384,9 @@ class BackgroundUpdater:
                 logger.debug("[SQL] %s", sql)
                 c.execute(sql)
             finally:
-                conn.set_session(autocommit=False)
+                conn.set_session(autocommit=False)  # type: ignore
 
-        def create_index_sqlite(conn):
+        def create_index_sqlite(conn: Connection) -> None:
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We don't use partial indices on SQLite as it wasn't introduced
@@ -399,7 +412,7 @@ class BackgroundUpdater:
             c.execute(sql)
 
         if isinstance(self.db_pool.engine, engines.PostgresEngine):
-            runner = create_index_psql
+            runner = create_index_psql  # type: Optional[Callable[[Connection], None]]
         elif psql_only:
             runner = None
         else:
@@ -433,7 +446,9 @@ class BackgroundUpdater:
             "background_updates", keyvalues={"update_name": update_name}
         )
 
-    async def _background_update_progress(self, update_name: str, progress: dict):
+    async def _background_update_progress(
+        self, update_name: str, progress: dict
+    ) -> None:
         """Update the progress of a background update
 
         Args:
@@ -441,20 +456,22 @@ class BackgroundUpdater:
             progress: The progress of the update.
         """
 
-        return await self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "background_update_progress",
             self._background_update_progress_txn,
             update_name,
             progress,
         )
 
-    def _background_update_progress_txn(self, txn, update_name, progress):
+    def _background_update_progress_txn(
+        self, txn: "LoggingTransaction", update_name: str, progress: JsonDict
+    ) -> None:
         """Update the progress of a background update
 
         Args:
-            txn(cursor): The transaction.
-            update_name(str): The name of the background update task
-            progress(dict): The progress of the update.
+            txn: The transaction.
+            update_name: The name of the background update task
+            progress: The progress of the update.
         """
 
         progress_json = json_encoder.encode(progress)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index d1b5760c2c..d2ba4bd2fc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -42,7 +42,6 @@ from synapse.api.errors import StoreError
 from synapse.config.database import DatabaseConnectionConfig
 from synapse.logging.context import (
     LoggingContext,
-    LoggingContextOrSentinel,
     current_context,
     make_deferred_yieldable,
 )
@@ -50,6 +49,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.types import Connection, Cursor
+from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import Collection
 
 # python 3 does not have a maximum int value
@@ -180,6 +180,9 @@ class LoggingDatabaseConnection:
 _CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
 
 
+R = TypeVar("R")
+
+
 class LoggingTransaction:
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging and metrics to the .execute()
@@ -259,13 +262,32 @@ class LoggingTransaction:
         return self.txn.description
 
     def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
+        """Similar to `executemany`, except `txn.rowcount` will not be correct
+        afterwards.
+
+        More efficient than `executemany` on PostgreSQL
+        """
+
         if isinstance(self.database_engine, PostgresEngine):
             from psycopg2.extras import execute_batch  # type: ignore
 
             self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
         else:
-            for val in args:
-                self.execute(sql, val)
+            self.executemany(sql, args)
+
+    def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
+        """Corresponds to psycopg2.extras.execute_values. Only available when
+        using postgres.
+
+        Always sets fetch=True when caling `execute_values`, so will return the
+        results.
+        """
+        assert isinstance(self.database_engine, PostgresEngine)
+        from psycopg2.extras import execute_values  # type: ignore
+
+        return self._do_execute(
+            lambda *x: execute_values(self.txn, *x, fetch=True), sql, *args
+        )
 
     def execute(self, sql: str, *args: Any) -> None:
         self._do_execute(self.txn.execute, sql, *args)
@@ -277,7 +299,7 @@ class LoggingTransaction:
         "Strip newlines out of SQL so that the loggers in the DB are on one line"
         return " ".join(line.strip() for line in sql.splitlines() if line.strip())
 
-    def _do_execute(self, func, sql: str, *args: Any) -> None:
+    def _do_execute(self, func: Callable[..., R], sql: str, *args: Any) -> R:
         sql = self._make_sql_one_line(sql)
 
         # TODO(paul): Maybe use 'info' and 'debug' for values?
@@ -348,9 +370,6 @@ class PerformanceCounters:
         return top_n_counters
 
 
-R = TypeVar("R")
-
-
 class DatabasePool:
     """Wraps a single physical database and connection pool.
 
@@ -399,6 +418,16 @@ class DatabasePool:
                 self._check_safe_to_upsert,
             )
 
+        # We define this sequence here so that it can be referenced from both
+        # the DataStore and PersistEventStore.
+        def get_chain_id_txn(txn):
+            txn.execute("SELECT COALESCE(max(chain_id), 0) FROM event_auth_chains")
+            return txn.fetchone()[0]
+
+        self.event_chain_id_gen = build_sequence_generator(
+            engine, get_chain_id_txn, "event_auth_chain_id"
+        )
+
     def is_running(self) -> bool:
         """Is the database pool currently running
         """
@@ -671,12 +700,15 @@ class DatabasePool:
         Returns:
             The result of func
         """
-        parent_context = current_context()  # type: Optional[LoggingContextOrSentinel]
-        if not parent_context:
+        curr_context = current_context()
+        if not curr_context:
             logger.warning(
                 "Starting db connection from sentinel context: metrics will be lost"
             )
             parent_context = None
+        else:
+            assert isinstance(curr_context, LoggingContext)
+            parent_context = curr_context
 
         start_time = monotonic_time()
 
@@ -861,7 +893,7 @@ class DatabasePool:
             ", ".join("?" for _ in keys[0]),
         )
 
-        txn.executemany(sql, vals)
+        txn.execute_batch(sql, vals)
 
     async def simple_upsert(
         self,
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 43660ec4fb..5d0845588c 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@ from .end_to_end_keys import EndToEndKeyStore
 from .event_federation import EventFederationStore
 from .event_push_actions import EventPushActionsStore
 from .events_bg_updates import EventsBackgroundUpdatesStore
+from .events_forward_extremities import EventForwardExtremitiesStore
 from .filtering import FilteringStore
 from .group_server import GroupServerStore
 from .keys import KeyStore
@@ -118,6 +119,7 @@ class DataStore(
     UIAuthStore,
     CacheInvalidationWorkerStore,
     ServerMetricsStore,
+    EventForwardExtremitiesStore,
 ):
     def __init__(self, database: DatabasePool, db_conn, hs):
         self.hs = hs
@@ -127,9 +129,6 @@ class DataStore(
         self._presence_id_gen = StreamIdGenerator(
             db_conn, "presence_stream", "stream_id"
         )
-        self._device_inbox_id_gen = StreamIdGenerator(
-            db_conn, "device_inbox", "stream_id"
-        )
         self._public_room_id_gen = StreamIdGenerator(
             db_conn, "public_room_list_stream", "stream_id"
         )
@@ -149,9 +148,6 @@ class DataStore(
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
-        self._pushers_id_gen = StreamIdGenerator(
-            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
-        )
         self._group_updates_id_gen = StreamIdGenerator(
             db_conn, "local_group_updates", "stream_id"
         )
@@ -166,9 +162,13 @@ class DataStore(
                 database,
                 stream_name="caches",
                 instance_name=hs.get_instance_name(),
-                table="cache_invalidation_stream_by_instance",
-                instance_column="instance_name",
-                id_column="stream_id",
+                tables=[
+                    (
+                        "cache_invalidation_stream_by_instance",
+                        "instance_name",
+                        "stream_id",
+                    )
+                ],
                 sequence_name="cache_invalidation_stream_seq",
                 writers=[],
             )
@@ -192,36 +192,6 @@ class DataStore(
             prefilled_cache=presence_cache_prefill,
         )
 
-        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
-        device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_inbox",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=max_device_inbox_id,
-            limit=1000,
-        )
-        self._device_inbox_stream_cache = StreamChangeCache(
-            "DeviceInboxStreamChangeCache",
-            min_device_inbox_id,
-            prefilled_cache=device_inbox_prefill,
-        )
-        # The federation outbox and the local device inbox uses the same
-        # stream_id generator.
-        device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_federation_outbox",
-            entity_column="destination",
-            stream_column="stream_id",
-            max_value=max_device_inbox_id,
-            limit=1000,
-        )
-        self._device_federation_outbox_stream_cache = StreamChangeCache(
-            "DeviceFederationOutboxStreamChangeCache",
-            min_device_outbox_id,
-            prefilled_cache=device_outbox_prefill,
-        )
-
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
             "DeviceListStreamChangeCache", device_list_max
@@ -342,12 +312,13 @@ class DataStore(
             filters = []
             args = [self.hs.config.server_name]
 
+            # `name` is in database already in lower case
             if name:
-                filters.append("(name LIKE ? OR displayname LIKE ?)")
-                args.extend(["@%" + name + "%:%", "%" + name + "%"])
+                filters.append("(name LIKE ? OR LOWER(displayname) LIKE ?)")
+                args.extend(["@%" + name.lower() + "%:%", "%" + name.lower() + "%"])
             elif user_id:
                 filters.append("name LIKE ?")
-                args.extend(["%" + user_id + "%"])
+                args.extend(["%" + user_id.lower() + "%"])
 
             if not guests:
                 filters.append("is_guest = 0")
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 49ee23470d..a277a1ef13 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -14,30 +14,75 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import abc
 import logging
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, Set, Tuple
 
 from synapse.api.constants import AccountDataTypes
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.types import JsonDict
 from synapse.util import json_encoder
-from synapse.util.caches.descriptors import _CacheContext, cached
+from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
 
 
-# The ABCMeta metaclass ensures that it cannot be instantiated without
-# the abstract methods being implemented.
-class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
+class AccountDataWorkerStore(SQLBaseStore):
     """This is an abstract base class where subclasses must implement
     `get_max_account_data_stream_id` which can be called in the initializer.
     """
 
     def __init__(self, database: DatabasePool, db_conn, hs):
+        self._instance_name = hs.get_instance_name()
+
+        if isinstance(database.engine, PostgresEngine):
+            self._can_write_to_account_data = (
+                self._instance_name in hs.config.worker.writers.account_data
+            )
+
+            self._account_data_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="account_data",
+                instance_name=self._instance_name,
+                tables=[
+                    ("room_account_data", "instance_name", "stream_id"),
+                    ("room_tags_revisions", "instance_name", "stream_id"),
+                    ("account_data", "instance_name", "stream_id"),
+                ],
+                sequence_name="account_data_sequence",
+                writers=hs.config.worker.writers.account_data,
+            )
+        else:
+            self._can_write_to_account_data = True
+
+            # We shouldn't be running in worker mode with SQLite, but its useful
+            # to support it for unit tests.
+            #
+            # If this process is the writer than we need to use
+            # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
+            # updated over replication. (Multiple writers are not supported for
+            # SQLite).
+            if hs.get_instance_name() in hs.config.worker.writers.account_data:
+                self._account_data_id_gen = StreamIdGenerator(
+                    db_conn,
+                    "room_account_data",
+                    "stream_id",
+                    extra_tables=[("room_tags_revisions", "stream_id")],
+                )
+            else:
+                self._account_data_id_gen = SlavedIdTracker(
+                    db_conn,
+                    "room_account_data",
+                    "stream_id",
+                    extra_tables=[("room_tags_revisions", "stream_id")],
+                )
+
         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(
             "AccountDataAndTagsChangeCache", account_max
@@ -45,14 +90,13 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
 
         super().__init__(database, db_conn, hs)
 
-    @abc.abstractmethod
-    def get_max_account_data_stream_id(self):
+    def get_max_account_data_stream_id(self) -> int:
         """Get the current max stream ID for account data stream
 
         Returns:
             int
         """
-        raise NotImplementedError()
+        return self._account_data_id_gen.get_current_token()
 
     @cached()
     async def get_account_data_for_user(
@@ -287,46 +331,46 @@ class AccountDataWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
             "get_updated_account_data_for_user", get_updated_account_data_for_user_txn
         )
 
-    @cached(num_args=2, cache_context=True, max_entries=5000)
-    async def is_ignored_by(
-        self, ignored_user_id: str, ignorer_user_id: str, cache_context: _CacheContext
-    ) -> bool:
-        ignored_account_data = await self.get_global_account_data_by_type_for_user(
-            AccountDataTypes.IGNORED_USER_LIST,
-            ignorer_user_id,
-            on_invalidate=cache_context.invalidate,
-        )
-        if not ignored_account_data:
-            return False
-
-        try:
-            return ignored_user_id in ignored_account_data.get("ignored_users", {})
-        except TypeError:
-            # The type of the ignored_users field is invalid.
-            return False
+    @cached(max_entries=5000, iterable=True)
+    async def ignored_by(self, user_id: str) -> Set[str]:
+        """
+        Get users which ignore the given user.
 
+        Params:
+            user_id: The user ID which might be ignored.
 
-class AccountDataStore(AccountDataWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        self._account_data_id_gen = StreamIdGenerator(
-            db_conn,
-            "account_data_max_stream_id",
-            "stream_id",
-            extra_tables=[
-                ("room_account_data", "stream_id"),
-                ("room_tags_revisions", "stream_id"),
-            ],
+        Return:
+            The user IDs which ignore the given user.
+        """
+        return set(
+            await self.db_pool.simple_select_onecol(
+                table="ignored_users",
+                keyvalues={"ignored_user_id": user_id},
+                retcol="ignorer_user_id",
+                desc="ignored_by",
+            )
         )
 
-        super().__init__(database, db_conn, hs)
-
-    def get_max_account_data_stream_id(self) -> int:
-        """Get the current max stream id for the private user data stream
-
-        Returns:
-            The maximum stream ID.
-        """
-        return self._account_data_id_gen.get_current_token()
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == TagAccountDataStream.NAME:
+            self._account_data_id_gen.advance(instance_name, token)
+            for row in rows:
+                self.get_tags_for_user.invalidate((row.user_id,))
+                self._account_data_stream_cache.entity_has_changed(row.user_id, token)
+        elif stream_name == AccountDataStream.NAME:
+            self._account_data_id_gen.advance(instance_name, token)
+            for row in rows:
+                if not row.room_id:
+                    self.get_global_account_data_by_type_for_user.invalidate(
+                        (row.data_type, row.user_id)
+                    )
+                self.get_account_data_for_user.invalidate((row.user_id,))
+                self.get_account_data_for_room.invalidate((row.user_id, row.room_id))
+                self.get_account_data_for_room_and_type.invalidate(
+                    (row.user_id, row.room_id, row.data_type)
+                )
+                self._account_data_stream_cache.entity_has_changed(row.user_id, token)
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
 
     async def add_account_data_to_room(
         self, user_id: str, room_id: str, account_data_type: str, content: JsonDict
@@ -342,6 +386,8 @@ class AccountDataStore(AccountDataWorkerStore):
         Returns:
             The maximum stream ID.
         """
+        assert self._can_write_to_account_data
+
         content_json = json_encoder.encode(content)
 
         async with self._account_data_id_gen.get_next() as next_id:
@@ -360,14 +406,6 @@ class AccountDataStore(AccountDataWorkerStore):
                 lock=False,
             )
 
-            # it's theoretically possible for the above to succeed and the
-            # below to fail - in which case we might reuse a stream id on
-            # restart, and the above update might not get propagated. That
-            # doesn't sound any worse than the whole update getting lost,
-            # which is what would happen if we combined the two into one
-            # transaction.
-            await self._update_max_stream_id(next_id)
-
             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
             self.get_account_data_for_user.invalidate((user_id,))
             self.get_account_data_for_room.invalidate((user_id, room_id))
@@ -390,32 +428,18 @@ class AccountDataStore(AccountDataWorkerStore):
         Returns:
             The maximum stream ID.
         """
-        content_json = json_encoder.encode(content)
+        assert self._can_write_to_account_data
 
         async with self._account_data_id_gen.get_next() as next_id:
-            # no need to lock here as account_data has a unique constraint on
-            # (user_id, account_data_type) so simple_upsert will retry if
-            # there is a conflict.
-            await self.db_pool.simple_upsert(
-                desc="add_user_account_data",
-                table="account_data",
-                keyvalues={"user_id": user_id, "account_data_type": account_data_type},
-                values={"stream_id": next_id, "content": content_json},
-                lock=False,
+            await self.db_pool.runInteraction(
+                "add_user_account_data",
+                self._add_account_data_for_user,
+                next_id,
+                user_id,
+                account_data_type,
+                content,
             )
 
-            # it's theoretically possible for the above to succeed and the
-            # below to fail - in which case we might reuse a stream id on
-            # restart, and the above update might not get propagated. That
-            # doesn't sound any worse than the whole update getting lost,
-            # which is what would happen if we combined the two into one
-            # transaction.
-            #
-            # Note: This is only here for backwards compat to allow admins to
-            # roll back to a previous Synapse version. Next time we update the
-            # database version we can remove this table.
-            await self._update_max_stream_id(next_id)
-
             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
             self.get_account_data_for_user.invalidate((user_id,))
             self.get_global_account_data_by_type_for_user.invalidate(
@@ -424,23 +448,71 @@ class AccountDataStore(AccountDataWorkerStore):
 
         return self._account_data_id_gen.get_current_token()
 
-    async def _update_max_stream_id(self, next_id: int) -> None:
-        """Update the max stream_id
+    def _add_account_data_for_user(
+        self,
+        txn,
+        next_id: int,
+        user_id: str,
+        account_data_type: str,
+        content: JsonDict,
+    ) -> None:
+        content_json = json_encoder.encode(content)
 
-        Args:
-            next_id: The the revision to advance to.
-        """
+        # no need to lock here as account_data has a unique constraint on
+        # (user_id, account_data_type) so simple_upsert will retry if
+        # there is a conflict.
+        self.db_pool.simple_upsert_txn(
+            txn,
+            table="account_data",
+            keyvalues={"user_id": user_id, "account_data_type": account_data_type},
+            values={"stream_id": next_id, "content": content_json},
+            lock=False,
+        )
 
-        # Note: This is only here for backwards compat to allow admins to
-        # roll back to a previous Synapse version. Next time we update the
-        # database version we can remove this table.
+        # Ignored users get denormalized into a separate table as an optimisation.
+        if account_data_type != AccountDataTypes.IGNORED_USER_LIST:
+            return
 
-        def _update(txn):
-            update_max_id_sql = (
-                "UPDATE account_data_max_stream_id"
-                " SET stream_id = ?"
-                " WHERE stream_id < ?"
+        # Insert / delete to sync the list of ignored users.
+        previously_ignored_users = set(
+            self.db_pool.simple_select_onecol_txn(
+                txn,
+                table="ignored_users",
+                keyvalues={"ignorer_user_id": user_id},
+                retcol="ignored_user_id",
             )
-            txn.execute(update_max_id_sql, (next_id, next_id))
+        )
+
+        # If the data is invalid, no one is ignored.
+        ignored_users_content = content.get("ignored_users", {})
+        if isinstance(ignored_users_content, dict):
+            currently_ignored_users = set(ignored_users_content)
+        else:
+            currently_ignored_users = set()
+
+        # Delete entries which are no longer ignored.
+        self.db_pool.simple_delete_many_txn(
+            txn,
+            table="ignored_users",
+            column="ignored_user_id",
+            iterable=previously_ignored_users - currently_ignored_users,
+            keyvalues={"ignorer_user_id": user_id},
+        )
 
-        await self.db_pool.runInteraction("update_account_data_max_stream_id", _update)
+        # Add entries which are newly ignored.
+        self.db_pool.simple_insert_many_txn(
+            txn,
+            table="ignored_users",
+            values=[
+                {"ignorer_user_id": user_id, "ignored_user_id": u}
+                for u in currently_ignored_users - previously_ignored_users
+            ],
+        )
+
+        # Invalidate the cache for any ignored users which were added or removed.
+        for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
+            self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+
+
+class AccountDataStore(AccountDataWorkerStore):
+    pass
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 339bd691a4..ea1e8fb580 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -14,11 +14,12 @@
 # limitations under the License.
 
 import logging
-from typing import Dict, Optional, Tuple
+from typing import Dict, List, Optional, Tuple, Union
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
+from synapse.types import UserID
 from synapse.util.caches.lrucache import LruCache
 
 logger = logging.getLogger(__name__)
@@ -406,6 +407,34 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
             "_prune_old_user_ips", _prune_old_user_ips_txn
         )
 
+    async def get_last_client_ip_by_device(
+        self, user_id: str, device_id: Optional[str]
+    ) -> Dict[Tuple[str, str], dict]:
+        """For each device_id listed, give the user_ip it was last seen on.
+
+        The result might be slightly out of date as client IPs are inserted in batches.
+
+        Args:
+            user_id: The user to fetch devices for.
+            device_id: If None fetches all devices for the user
+
+        Returns:
+            A dictionary mapping a tuple of (user_id, device_id) to dicts, with
+            keys giving the column names from the devices table.
+        """
+
+        keyvalues = {"user_id": user_id}
+        if device_id is not None:
+            keyvalues["device_id"] = device_id
+
+        res = await self.db_pool.simple_select_list(
+            table="devices",
+            keyvalues=keyvalues,
+            retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
+        )
+
+        return {(d["user_id"], d["device_id"]): d for d in res}
+
 
 class ClientIpStore(ClientIpWorkerStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
@@ -469,43 +498,35 @@ class ClientIpStore(ClientIpWorkerStore):
         for entry in to_update.items():
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
-            try:
-                self.db_pool.simple_upsert_txn(
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="user_ips",
+                keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
+                values={
+                    "user_agent": user_agent,
+                    "device_id": device_id,
+                    "last_seen": last_seen,
+                },
+                lock=False,
+            )
+
+            # Technically an access token might not be associated with
+            # a device so we need to check.
+            if device_id:
+                # this is always an update rather than an upsert: the row should
+                # already exist, and if it doesn't, that may be because it has been
+                # deleted, and we don't want to re-create it.
+                self.db_pool.simple_update_txn(
                     txn,
-                    table="user_ips",
-                    keyvalues={
-                        "user_id": user_id,
-                        "access_token": access_token,
-                        "ip": ip,
-                    },
-                    values={
+                    table="devices",
+                    keyvalues={"user_id": user_id, "device_id": device_id},
+                    updatevalues={
                         "user_agent": user_agent,
-                        "device_id": device_id,
                         "last_seen": last_seen,
+                        "ip": ip,
                     },
-                    lock=False,
                 )
 
-                # Technically an access token might not be associated with
-                # a device so we need to check.
-                if device_id:
-                    # this is always an update rather than an upsert: the row should
-                    # already exist, and if it doesn't, that may be because it has been
-                    # deleted, and we don't want to re-create it.
-                    self.db_pool.simple_update_txn(
-                        txn,
-                        table="devices",
-                        keyvalues={"user_id": user_id, "device_id": device_id},
-                        updatevalues={
-                            "user_agent": user_agent,
-                            "last_seen": last_seen,
-                            "ip": ip,
-                        },
-                    )
-            except Exception as e:
-                # Failed to upsert, log and continue
-                logger.error("Failed to insert client IP %r: %r", entry, e)
-
     async def get_last_client_ip_by_device(
         self, user_id: str, device_id: Optional[str]
     ) -> Dict[Tuple[str, str], dict]:
@@ -519,18 +540,9 @@ class ClientIpStore(ClientIpWorkerStore):
             A dictionary mapping a tuple of (user_id, device_id) to dicts, with
             keys giving the column names from the devices table.
         """
+        ret = await super().get_last_client_ip_by_device(user_id, device_id)
 
-        keyvalues = {"user_id": user_id}
-        if device_id is not None:
-            keyvalues["device_id"] = device_id
-
-        res = await self.db_pool.simple_select_list(
-            table="devices",
-            keyvalues=keyvalues,
-            retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
-        )
-
-        ret = {(d["user_id"], d["device_id"]): d for d in res}
+        # Update what is retrieved from the database with data which is pending insertion.
         for key in self._batch_row_update:
             uid, access_token, ip = key
             if uid == user_id:
@@ -546,7 +558,9 @@ class ClientIpStore(ClientIpWorkerStore):
                     }
         return ret
 
-    async def get_user_ip_and_agents(self, user):
+    async def get_user_ip_and_agents(
+        self, user: UserID
+    ) -> List[Dict[str, Union[str, int]]]:
         user_id = user.to_string()
         results = {}
 
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index d42faa3f1f..31f70ac5ef 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -17,15 +17,98 @@ import logging
 from typing import List, Tuple
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
+from synapse.replication.tcp.streams import ToDeviceStream
+from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.util import json_encoder
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
 
 
 class DeviceInboxWorkerStore(SQLBaseStore):
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        self._instance_name = hs.get_instance_name()
+
+        # Map of (user_id, device_id) to the last stream_id that has been
+        # deleted up to. This is so that we can no op deletions.
+        self._last_device_delete_cache = ExpiringCache(
+            cache_name="last_device_delete_cache",
+            clock=self._clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+        )
+
+        if isinstance(database.engine, PostgresEngine):
+            self._can_write_to_device = (
+                self._instance_name in hs.config.worker.writers.to_device
+            )
+
+            self._device_inbox_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="to_device",
+                instance_name=self._instance_name,
+                tables=[("device_inbox", "instance_name", "stream_id")],
+                sequence_name="device_inbox_sequence",
+                writers=hs.config.worker.writers.to_device,
+            )
+        else:
+            self._can_write_to_device = True
+            self._device_inbox_id_gen = StreamIdGenerator(
+                db_conn, "device_inbox", "stream_id"
+            )
+
+        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
+        device_inbox_prefill, min_device_inbox_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_inbox",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=max_device_inbox_id,
+            limit=1000,
+        )
+        self._device_inbox_stream_cache = StreamChangeCache(
+            "DeviceInboxStreamChangeCache",
+            min_device_inbox_id,
+            prefilled_cache=device_inbox_prefill,
+        )
+
+        # The federation outbox and the local device inbox uses the same
+        # stream_id generator.
+        device_outbox_prefill, min_device_outbox_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_federation_outbox",
+            entity_column="destination",
+            stream_column="stream_id",
+            max_value=max_device_inbox_id,
+            limit=1000,
+        )
+        self._device_federation_outbox_stream_cache = StreamChangeCache(
+            "DeviceFederationOutboxStreamChangeCache",
+            min_device_outbox_id,
+            prefilled_cache=device_outbox_prefill,
+        )
+
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == ToDeviceStream.NAME:
+            self._device_inbox_id_gen.advance(instance_name, token)
+            for row in rows:
+                if row.entity.startswith("@"):
+                    self._device_inbox_stream_cache.entity_has_changed(
+                        row.entity, token
+                    )
+                else:
+                    self._device_federation_outbox_stream_cache.entity_has_changed(
+                        row.entity, token
+                    )
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
+
     def get_to_device_stream_token(self):
         return self._device_inbox_id_gen.get_current_token()
 
@@ -278,52 +361,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
 
-
-class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
-    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
-
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-
-        self.db_pool.updates.register_background_index_update(
-            "device_inbox_stream_index",
-            index_name="device_inbox_stream_id_user_id",
-            table="device_inbox",
-            columns=["stream_id", "user_id"],
-        )
-
-        self.db_pool.updates.register_background_update_handler(
-            self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
-        )
-
-    async def _background_drop_index_device_inbox(self, progress, batch_size):
-        def reindex_txn(conn):
-            txn = conn.cursor()
-            txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
-            txn.close()
-
-        await self.db_pool.runWithConnection(reindex_txn)
-
-        await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
-
-        return 1
-
-
-class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
-    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
-
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-
-        # Map of (user_id, device_id) to the last stream_id that has been
-        # deleted up to. This is so that we can no op deletions.
-        self._last_device_delete_cache = ExpiringCache(
-            cache_name="last_device_delete_cache",
-            clock=self._clock,
-            max_len=10000,
-            expiry_ms=30 * 60 * 1000,
-        )
-
     @trace
     async def add_messages_to_device_inbox(
         self,
@@ -342,6 +379,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             The new stream_id.
         """
 
+        assert self._can_write_to_device
+
         def add_messages_txn(txn, now_ms, stream_id):
             # Add the local messages directly to the local inbox.
             self._add_messages_to_local_device_inbox_txn(
@@ -351,16 +390,20 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             # Add the remote messages to the federation outbox.
             # We'll send them to a remote server when we next send a
             # federation transaction to that destination.
-            sql = (
-                "INSERT INTO device_federation_outbox"
-                " (destination, stream_id, queued_ts, messages_json)"
-                " VALUES (?,?,?,?)"
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="device_federation_outbox",
+                values=[
+                    {
+                        "destination": destination,
+                        "stream_id": stream_id,
+                        "queued_ts": now_ms,
+                        "messages_json": json_encoder.encode(edu),
+                        "instance_name": self._instance_name,
+                    }
+                    for destination, edu in remote_messages_by_destination.items()
+                ],
             )
-            rows = []
-            for destination, edu in remote_messages_by_destination.items():
-                edu_json = json_encoder.encode(edu)
-                rows.append((destination, stream_id, now_ms, edu_json))
-            txn.executemany(sql, rows)
 
         async with self._device_inbox_id_gen.get_next() as stream_id:
             now_ms = self.clock.time_msec()
@@ -379,6 +422,8 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     async def add_messages_from_remote_to_device_inbox(
         self, origin: str, message_id: str, local_messages_by_user_then_device: dict
     ) -> int:
+        assert self._can_write_to_device
+
         def add_messages_txn(txn, now_ms, stream_id):
             # Check if we've already inserted a matching message_id for that
             # origin. This can happen if the origin doesn't receive our
@@ -427,38 +472,45 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     def _add_messages_to_local_device_inbox_txn(
         self, txn, stream_id, messages_by_user_then_device
     ):
+        assert self._can_write_to_device
+
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
             messages_json_for_user = {}
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = "SELECT device_id FROM devices WHERE user_id = ?"
-                txn.execute(sql, (user_id,))
+                devices = self.db_pool.simple_select_onecol_txn(
+                    txn,
+                    table="devices",
+                    keyvalues={"user_id": user_id},
+                    retcol="device_id",
+                )
+
                 message_json = json_encoder.encode(messages_by_device["*"])
-                for row in txn:
+                for device_id in devices:
                     # Add the message for all devices for this user on this
                     # server.
-                    device = row[0]
-                    messages_json_for_user[device] = message_json
+                    messages_json_for_user[device_id] = message_json
             else:
                 if not devices:
                     continue
 
-                clause, args = make_in_list_sql_clause(
-                    txn.database_engine, "device_id", devices
+                rows = self.db_pool.simple_select_many_txn(
+                    txn,
+                    table="devices",
+                    keyvalues={"user_id": user_id},
+                    column="device_id",
+                    iterable=devices,
+                    retcols=("device_id",),
                 )
-                sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause
 
-                # TODO: Maybe this needs to be done in batches if there are
-                # too many local devices for a given user.
-                txn.execute(sql, [user_id] + list(args))
-                for row in txn:
+                for row in rows:
                     # Only insert into the local inbox if the device exists on
                     # this server
-                    device = row[0]
-                    message_json = json_encoder.encode(messages_by_device[device])
-                    messages_json_for_user[device] = message_json
+                    device_id = row["device_id"]
+                    message_json = json_encoder.encode(messages_by_device[device_id])
+                    messages_json_for_user[device_id] = message_json
 
             if messages_json_for_user:
                 local_by_user_then_device[user_id] = messages_json_for_user
@@ -466,14 +518,52 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
         if not local_by_user_then_device:
             return
 
-        sql = (
-            "INSERT INTO device_inbox"
-            " (user_id, device_id, stream_id, message_json)"
-            " VALUES (?,?,?,?)"
+        self.db_pool.simple_insert_many_txn(
+            txn,
+            table="device_inbox",
+            values=[
+                {
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "stream_id": stream_id,
+                    "message_json": message_json,
+                    "instance_name": self._instance_name,
+                }
+                for user_id, messages_by_device in local_by_user_then_device.items()
+                for device_id, message_json in messages_by_device.items()
+            ],
         )
-        rows = []
-        for user_id, messages_by_device in local_by_user_then_device.items():
-            for device_id, message_json in messages_by_device.items():
-                rows.append((user_id, device_id, stream_id, message_json))
 
-        txn.executemany(sql, rows)
+
+class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
+    DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
+
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_index_update(
+            "device_inbox_stream_index",
+            index_name="device_inbox_stream_id_user_id",
+            table="device_inbox",
+            columns=["stream_id", "user_id"],
+        )
+
+        self.db_pool.updates.register_background_update_handler(
+            self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
+        )
+
+    async def _background_drop_index_device_inbox(self, progress, batch_size):
+        def reindex_txn(conn):
+            txn = conn.cursor()
+            txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
+            txn.close()
+
+        await self.db_pool.runWithConnection(reindex_txn)
+
+        await self.db_pool.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
+
+        return 1
+
+
+class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
+    pass
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index dfb4f87b8f..659d8f245f 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -57,6 +57,38 @@ class DeviceWorkerStore(SQLBaseStore):
                 self._prune_old_outbound_device_pokes, 60 * 60 * 1000
             )
 
+    async def count_devices_by_users(self, user_ids: Optional[List[str]] = None) -> int:
+        """Retrieve number of all devices of given users.
+        Only returns number of devices that are not marked as hidden.
+
+        Args:
+            user_ids: The IDs of the users which owns devices
+        Returns:
+            Number of devices of this users.
+        """
+
+        def count_devices_by_users_txn(txn, user_ids):
+            sql = """
+                SELECT count(*)
+                FROM devices
+                WHERE
+                    hidden = '0' AND
+            """
+
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "user_id", user_ids
+            )
+
+            txn.execute(sql + clause, args)
+            return txn.fetchone()[0]
+
+        if not user_ids:
+            return 0
+
+        return await self.db_pool.runInteraction(
+            "count_devices_by_users", count_devices_by_users_txn, user_ids
+        )
+
     async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:
         """Retrieve a device. Only returns devices that are not marked as
         hidden.
@@ -865,7 +897,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 DELETE FROM device_lists_outbound_last_success
                 WHERE destination = ? AND user_id = ?
             """
-            txn.executemany(sql, ((row[0], row[1]) for row in rows))
+            txn.execute_batch(sql, ((row[0], row[1]) for row in rows))
 
             logger.info("Pruned %d device list outbound pokes", count)
 
@@ -1311,7 +1343,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         # Delete older entries in the table, as we really only care about
         # when the latest change happened.
-        txn.executemany(
+        txn.execute_batch(
             """
             DELETE FROM device_lists_stream
             WHERE user_id = ? AND device_id = ? AND stream_id < ?
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 4d1b92d1aa..309f1e865b 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -25,6 +25,7 @@ from twisted.enterprise.adbapi import Connection
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, make_in_list_sql_clause
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict
 from synapse.util import json_encoder
@@ -513,21 +514,35 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
 
         for user_chunk in batch_iter(user_ids, 100):
             clause, params = make_in_list_sql_clause(
-                txn.database_engine, "k.user_id", user_chunk
-            )
-            sql = (
-                """
-                SELECT k.user_id, k.keytype, k.keydata, k.stream_id
-                  FROM e2e_cross_signing_keys k
-                  INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id
-                                FROM e2e_cross_signing_keys
-                               GROUP BY user_id, keytype) s
-                 USING (user_id, stream_id, keytype)
-                 WHERE
-            """
-                + clause
+                txn.database_engine, "user_id", user_chunk
             )
 
+            # Fetch the latest key for each type per user.
+            if isinstance(self.database_engine, PostgresEngine):
+                # The `DISTINCT ON` clause will pick the *first* row it
+                # encounters, so ordering by stream ID desc will ensure we get
+                # the latest key.
+                sql = """
+                    SELECT DISTINCT ON (user_id, keytype) user_id, keytype, keydata, stream_id
+                        FROM e2e_cross_signing_keys
+                        WHERE %(clause)s
+                        ORDER BY user_id, keytype, stream_id DESC
+                """ % {
+                    "clause": clause
+                }
+            else:
+                # SQLite has special handling for bare columns when using
+                # MIN/MAX with a `GROUP BY` clause where it picks the value from
+                # a row that matches the MIN/MAX.
+                sql = """
+                    SELECT user_id, keytype, keydata, MAX(stream_id)
+                        FROM e2e_cross_signing_keys
+                        WHERE %(clause)s
+                        GROUP BY user_id, keytype
+                """ % {
+                    "clause": clause
+                }
+
             txn.execute(sql, params)
             rows = self.db_pool.cursor_to_dict(txn)
 
@@ -619,7 +634,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
 
     async def get_e2e_cross_signing_keys_bulk(
         self, user_ids: List[str], from_user_id: Optional[str] = None
-    ) -> Dict[str, Dict[str, dict]]:
+    ) -> Dict[str, Optional[Dict[str, dict]]]:
         """Returns the cross-signing keys for a set of users.
 
         Args:
@@ -707,53 +722,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
         """Get the current stream id from the _device_list_id_gen"""
         ...
 
-
-class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
-    async def set_e2e_device_keys(
-        self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
-    ) -> bool:
-        """Stores device keys for a device. Returns whether there was a change
-        or the keys were already in the database.
-        """
-
-        def _set_e2e_device_keys_txn(txn):
-            set_tag("user_id", user_id)
-            set_tag("device_id", device_id)
-            set_tag("time_now", time_now)
-            set_tag("device_keys", device_keys)
-
-            old_key_json = self.db_pool.simple_select_one_onecol_txn(
-                txn,
-                table="e2e_device_keys_json",
-                keyvalues={"user_id": user_id, "device_id": device_id},
-                retcol="key_json",
-                allow_none=True,
-            )
-
-            # In py3 we need old_key_json to match new_key_json type. The DB
-            # returns unicode while encode_canonical_json returns bytes.
-            new_key_json = encode_canonical_json(device_keys).decode("utf-8")
-
-            if old_key_json == new_key_json:
-                log_kv({"Message": "Device key already stored."})
-                return False
-
-            self.db_pool.simple_upsert_txn(
-                txn,
-                table="e2e_device_keys_json",
-                keyvalues={"user_id": user_id, "device_id": device_id},
-                values={"ts_added_ms": time_now, "key_json": new_key_json},
-            )
-            log_kv({"message": "Device keys stored."})
-            return True
-
-        return await self.db_pool.runInteraction(
-            "set_e2e_device_keys", _set_e2e_device_keys_txn
-        )
-
     async def claim_e2e_one_time_keys(
         self, query_list: Iterable[Tuple[str, str, str]]
-    ) -> Dict[str, Dict[str, Dict[str, bytes]]]:
+    ) -> Dict[str, Dict[str, Dict[str, str]]]:
         """Take a list of one time keys out of the database.
 
         Args:
@@ -840,6 +811,50 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
         )
 
+
+class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
+    async def set_e2e_device_keys(
+        self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
+    ) -> bool:
+        """Stores device keys for a device. Returns whether there was a change
+        or the keys were already in the database.
+        """
+
+        def _set_e2e_device_keys_txn(txn):
+            set_tag("user_id", user_id)
+            set_tag("device_id", device_id)
+            set_tag("time_now", time_now)
+            set_tag("device_keys", device_keys)
+
+            old_key_json = self.db_pool.simple_select_one_onecol_txn(
+                txn,
+                table="e2e_device_keys_json",
+                keyvalues={"user_id": user_id, "device_id": device_id},
+                retcol="key_json",
+                allow_none=True,
+            )
+
+            # In py3 we need old_key_json to match new_key_json type. The DB
+            # returns unicode while encode_canonical_json returns bytes.
+            new_key_json = encode_canonical_json(device_keys).decode("utf-8")
+
+            if old_key_json == new_key_json:
+                log_kv({"Message": "Device key already stored."})
+                return False
+
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="e2e_device_keys_json",
+                keyvalues={"user_id": user_id, "device_id": device_id},
+                values={"ts_added_ms": time_now, "key_json": new_key_json},
+            )
+            log_kv({"message": "Device keys stored."})
+            return True
+
+        return await self.db_pool.runInteraction(
+            "set_e2e_device_keys", _set_e2e_device_keys_txn
+        )
+
     async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
         def delete_e2e_keys_by_device_txn(txn):
             log_kv(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 2e07c37340..8326640d20 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -24,6 +24,8 @@ from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.types import Cursor
 from synapse.types import Collection
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
@@ -32,6 +34,11 @@ from synapse.util.iterutils import batch_iter
 logger = logging.getLogger(__name__)
 
 
+class _NoChainCoverIndex(Exception):
+    def __init__(self, room_id: str):
+        super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
+
+
 class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
@@ -137,7 +144,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return list(results)
 
-    async def get_auth_chain_difference(self, state_sets: List[Set[str]]) -> Set[str]:
+    async def get_auth_chain_difference(
+        self, room_id: str, state_sets: List[Set[str]]
+    ) -> Set[str]:
         """Given sets of state events figure out the auth chain difference (as
         per state res v2 algorithm).
 
@@ -149,15 +158,193 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             The set of the difference in auth chains.
         """
 
+        # Check if we have indexed the room so we can use the chain cover
+        # algorithm.
+        room = await self.get_room(room_id)
+        if room["has_auth_chain_index"]:
+            try:
+                return await self.db_pool.runInteraction(
+                    "get_auth_chain_difference_chains",
+                    self._get_auth_chain_difference_using_cover_index_txn,
+                    room_id,
+                    state_sets,
+                )
+            except _NoChainCoverIndex:
+                # For whatever reason we don't actually have a chain cover index
+                # for the events in question, so we fall back to the old method.
+                pass
+
         return await self.db_pool.runInteraction(
             "get_auth_chain_difference",
             self._get_auth_chain_difference_txn,
             state_sets,
         )
 
+    def _get_auth_chain_difference_using_cover_index_txn(
+        self, txn: Cursor, room_id: str, state_sets: List[Set[str]]
+    ) -> Set[str]:
+        """Calculates the auth chain difference using the chain index.
+
+        See docs/auth_chain_difference_algorithm.md for details
+        """
+
+        # First we look up the chain ID/sequence numbers for all the events, and
+        # work out the chain/sequence numbers reachable from each state set.
+
+        initial_events = set(state_sets[0]).union(*state_sets[1:])
+
+        # Map from event_id -> (chain ID, seq no)
+        chain_info = {}  # type: Dict[str, Tuple[int, int]]
+
+        # Map from chain ID -> seq no -> event Id
+        chain_to_event = {}  # type: Dict[int, Dict[int, str]]
+
+        # All the chains that we've found that are reachable from the state
+        # sets.
+        seen_chains = set()  # type: Set[int]
+
+        sql = """
+            SELECT event_id, chain_id, sequence_number
+            FROM event_auth_chains
+            WHERE %s
+        """
+        for batch in batch_iter(initial_events, 1000):
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "event_id", batch
+            )
+            txn.execute(sql % (clause,), args)
+
+            for event_id, chain_id, sequence_number in txn:
+                chain_info[event_id] = (chain_id, sequence_number)
+                seen_chains.add(chain_id)
+                chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
+
+        # Check that we actually have a chain ID for all the events.
+        events_missing_chain_info = initial_events.difference(chain_info)
+        if events_missing_chain_info:
+            # This can happen due to e.g. downgrade/upgrade of the server. We
+            # raise an exception and fall back to the previous algorithm.
+            logger.info(
+                "Unexpectedly found that events don't have chain IDs in room %s: %s",
+                room_id,
+                events_missing_chain_info,
+            )
+            raise _NoChainCoverIndex(room_id)
+
+        # Corresponds to `state_sets`, except as a map from chain ID to max
+        # sequence number reachable from the state set.
+        set_to_chain = []  # type: List[Dict[int, int]]
+        for state_set in state_sets:
+            chains = {}  # type: Dict[int, int]
+            set_to_chain.append(chains)
+
+            for event_id in state_set:
+                chain_id, seq_no = chain_info[event_id]
+
+                chains[chain_id] = max(seq_no, chains.get(chain_id, 0))
+
+        # Now we look up all links for the chains we have, adding chains to
+        # set_to_chain that are reachable from each set.
+        sql = """
+            SELECT
+                origin_chain_id, origin_sequence_number,
+                target_chain_id, target_sequence_number
+            FROM event_auth_chain_links
+            WHERE %s
+        """
+
+        # (We need to take a copy of `seen_chains` as we want to mutate it in
+        # the loop)
+        for batch in batch_iter(set(seen_chains), 1000):
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "origin_chain_id", batch
+            )
+            txn.execute(sql % (clause,), args)
+
+            for (
+                origin_chain_id,
+                origin_sequence_number,
+                target_chain_id,
+                target_sequence_number,
+            ) in txn:
+                for chains in set_to_chain:
+                    # chains are only reachable if the origin sequence number of
+                    # the link is less than the max sequence number in the
+                    # origin chain.
+                    if origin_sequence_number <= chains.get(origin_chain_id, 0):
+                        chains[target_chain_id] = max(
+                            target_sequence_number, chains.get(target_chain_id, 0),
+                        )
+
+                seen_chains.add(target_chain_id)
+
+        # Now for each chain we figure out the maximum sequence number reachable
+        # from *any* state set and the minimum sequence number reachable from
+        # *all* state sets. Events in that range are in the auth chain
+        # difference.
+        result = set()
+
+        # Mapping from chain ID to the range of sequence numbers that should be
+        # pulled from the database.
+        chain_to_gap = {}  # type: Dict[int, Tuple[int, int]]
+
+        for chain_id in seen_chains:
+            min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
+            max_seq_no = max(chains.get(chain_id, 0) for chains in set_to_chain)
+
+            if min_seq_no < max_seq_no:
+                # We have a non empty gap, try and fill it from the events that
+                # we have, otherwise add them to the list of gaps to pull out
+                # from the DB.
+                for seq_no in range(min_seq_no + 1, max_seq_no + 1):
+                    event_id = chain_to_event.get(chain_id, {}).get(seq_no)
+                    if event_id:
+                        result.add(event_id)
+                    else:
+                        chain_to_gap[chain_id] = (min_seq_no, max_seq_no)
+                        break
+
+        if not chain_to_gap:
+            # If there are no gaps to fetch, we're done!
+            return result
+
+        if isinstance(self.database_engine, PostgresEngine):
+            # We can use `execute_values` to efficiently fetch the gaps when
+            # using postgres.
+            sql = """
+                SELECT event_id
+                FROM event_auth_chains AS c, (VALUES ?) AS l(chain_id, min_seq, max_seq)
+                WHERE
+                    c.chain_id = l.chain_id
+                    AND min_seq < sequence_number AND sequence_number <= max_seq
+            """
+
+            args = [
+                (chain_id, min_no, max_no)
+                for chain_id, (min_no, max_no) in chain_to_gap.items()
+            ]
+
+            rows = txn.execute_values(sql, args)
+            result.update(r for r, in rows)
+        else:
+            # For SQLite we just fall back to doing a noddy for loop.
+            sql = """
+                SELECT event_id FROM event_auth_chains
+                WHERE chain_id = ? AND ? < sequence_number AND sequence_number <= ?
+            """
+            for chain_id, (min_no, max_no) in chain_to_gap.items():
+                txn.execute(sql, (chain_id, min_no, max_no))
+                result.update(r for r, in txn)
+
+        return result
+
     def _get_auth_chain_difference_txn(
         self, txn, state_sets: List[Set[str]]
     ) -> Set[str]:
+        """Calculates the auth chain difference using a breadth first search.
+
+        This is used when we don't have a cover index for the room.
+        """
 
         # Algorithm Description
         # ~~~~~~~~~~~~~~~~~~~~~
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 2e56dfaf31..438383abe1 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -487,7 +487,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 VALUES (?, ?, ?, ?, ?, ?)
             """
 
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     _gen_entry(user_id, actions)
@@ -803,7 +803,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             ],
         )
 
-        txn.executemany(
+        txn.execute_batch(
             """
                 UPDATE event_push_summary
                 SET notif_count = ?, unread_count = ?, stream_ordering = ?
@@ -835,6 +835,52 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             (rotate_to_stream_ordering,),
         )
 
+    def _remove_old_push_actions_before_txn(
+        self, txn, room_id, user_id, stream_ordering
+    ):
+        """
+        Purges old push actions for a user and room before a given
+        stream_ordering.
+
+        We however keep a months worth of highlighted notifications, so that
+        users can still get a list of recent highlights.
+
+        Args:
+            txn: The transcation
+            room_id: Room ID to delete from
+            user_id: user ID to delete for
+            stream_ordering: The lowest stream ordering which will
+                                  not be deleted.
+        """
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id, user_id),
+        )
+
+        # We need to join on the events table to get the received_ts for
+        # event_push_actions and sqlite won't let us use a join in a delete so
+        # we can't just delete where received_ts < x. Furthermore we can
+        # only identify event_push_actions by a tuple of room_id, event_id
+        # we we can't use a subquery.
+        # Instead, we look up the stream ordering for the last event in that
+        # room received before the threshold time and delete event_push_actions
+        # in the room with a stream_odering before that.
+        txn.execute(
+            "DELETE FROM event_push_actions "
+            " WHERE user_id = ? AND room_id = ? AND "
+            " stream_ordering <= ?"
+            " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
+            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
+        )
+
+        txn.execute(
+            """
+            DELETE FROM event_push_summary
+            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
+        """,
+            (room_id, user_id, stream_ordering),
+        )
+
 
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -894,62 +940,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
         return push_actions
 
-    async def get_latest_push_action_stream_ordering(self):
-        def f(txn):
-            txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
-            return txn.fetchone()
-
-        result = await self.db_pool.runInteraction(
-            "get_latest_push_action_stream_ordering", f
-        )
-        return result[0] or 0
-
-    def _remove_old_push_actions_before_txn(
-        self, txn, room_id, user_id, stream_ordering
-    ):
-        """
-        Purges old push actions for a user and room before a given
-        stream_ordering.
-
-        We however keep a months worth of highlighted notifications, so that
-        users can still get a list of recent highlights.
-
-        Args:
-            txn: The transcation
-            room_id: Room ID to delete from
-            user_id: user ID to delete for
-            stream_ordering: The lowest stream ordering which will
-                                  not be deleted.
-        """
-        txn.call_after(
-            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-            (room_id, user_id),
-        )
-
-        # We need to join on the events table to get the received_ts for
-        # event_push_actions and sqlite won't let us use a join in a delete so
-        # we can't just delete where received_ts < x. Furthermore we can
-        # only identify event_push_actions by a tuple of room_id, event_id
-        # we we can't use a subquery.
-        # Instead, we look up the stream ordering for the last event in that
-        # room received before the threshold time and delete event_push_actions
-        # in the room with a stream_odering before that.
-        txn.execute(
-            "DELETE FROM event_push_actions "
-            " WHERE user_id = ? AND room_id = ? AND "
-            " stream_ordering <= ?"
-            " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
-            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
-        )
-
-        txn.execute(
-            """
-            DELETE FROM event_push_summary
-            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
-        """,
-            (room_id, user_id, stream_ordering),
-        )
-
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 90fb1a1f00..ccda9f1caa 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -17,7 +17,17 @@
 import itertools
 import logging
 from collections import OrderedDict, namedtuple
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Generator,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+)
 
 import attr
 from prometheus_client import Counter
@@ -35,7 +45,7 @@ from synapse.storage.databases.main.search import SearchEntry
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import StateMap, get_domain_from_id
 from synapse.util import json_encoder
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, sorted_topologically
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -366,6 +376,36 @@ class PersistEventsStore:
         # Insert into event_to_state_groups.
         self._store_event_state_mappings_txn(txn, events_and_contexts)
 
+        self._persist_event_auth_chain_txn(txn, [e for e, _ in events_and_contexts])
+
+        # _store_rejected_events_txn filters out any events which were
+        # rejected, and returns the filtered list.
+        events_and_contexts = self._store_rejected_events_txn(
+            txn, events_and_contexts=events_and_contexts
+        )
+
+        # From this point onwards the events are only ones that weren't
+        # rejected.
+
+        self._update_metadata_tables_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
+            backfilled=backfilled,
+        )
+
+        # We call this last as it assumes we've inserted the events into
+        # room_memberships, where applicable.
+        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+
+    def _persist_event_auth_chain_txn(
+        self, txn: LoggingTransaction, events: List[EventBase],
+    ) -> None:
+
+        # We only care about state events, so this if there are no state events.
+        if not any(e.is_state() for e in events):
+            return
+
         # We want to store event_auth mappings for rejected events, as they're
         # used in state res v2.
         # This is only necessary if the rejected event appears in an accepted
@@ -381,31 +421,467 @@ class PersistEventsStore:
                     "room_id": event.room_id,
                     "auth_id": auth_id,
                 }
-                for event, _ in events_and_contexts
+                for event in events
                 for auth_id in event.auth_event_ids()
                 if event.is_state()
             ],
         )
 
-        # _store_rejected_events_txn filters out any events which were
-        # rejected, and returns the filtered list.
-        events_and_contexts = self._store_rejected_events_txn(
-            txn, events_and_contexts=events_and_contexts
+        # We now calculate chain ID/sequence numbers for any state events we're
+        # persisting. We ignore out of band memberships as we're not in the room
+        # and won't have their auth chain (we'll fix it up later if we join the
+        # room).
+        #
+        # See: docs/auth_chain_difference_algorithm.md
+
+        # We ignore legacy rooms that we aren't filling the chain cover index
+        # for.
+        rows = self.db_pool.simple_select_many_txn(
+            txn,
+            table="rooms",
+            column="room_id",
+            iterable={event.room_id for event in events if event.is_state()},
+            keyvalues={},
+            retcols=("room_id", "has_auth_chain_index"),
         )
+        rooms_using_chain_index = {
+            row["room_id"] for row in rows if row["has_auth_chain_index"]
+        }
 
-        # From this point onwards the events are only ones that weren't
-        # rejected.
+        state_events = {
+            event.event_id: event
+            for event in events
+            if event.is_state() and event.room_id in rooms_using_chain_index
+        }
 
-        self._update_metadata_tables_txn(
+        if not state_events:
+            return
+
+        # We need to know the type/state_key and auth events of the events we're
+        # calculating chain IDs for. We don't rely on having the full Event
+        # instances as we'll potentially be pulling more events from the DB and
+        # we don't need the overhead of fetching/parsing the full event JSON.
+        event_to_types = {
+            e.event_id: (e.type, e.state_key) for e in state_events.values()
+        }
+        event_to_auth_chain = {
+            e.event_id: e.auth_event_ids() for e in state_events.values()
+        }
+        event_to_room_id = {e.event_id: e.room_id for e in state_events.values()}
+
+        self._add_chain_cover_index(
+            txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
+        )
+
+    @classmethod
+    def _add_chain_cover_index(
+        cls,
+        txn,
+        db_pool: DatabasePool,
+        event_to_room_id: Dict[str, str],
+        event_to_types: Dict[str, Tuple[str, str]],
+        event_to_auth_chain: Dict[str, List[str]],
+    ) -> None:
+        """Calculate the chain cover index for the given events.
+
+        Args:
+            event_to_room_id: Event ID to the room ID of the event
+            event_to_types: Event ID to type and state_key of the event
+            event_to_auth_chain: Event ID to list of auth event IDs of the
+                event (events with no auth events can be excluded).
+        """
+
+        # Map from event ID to chain ID/sequence number.
+        chain_map = {}  # type: Dict[str, Tuple[int, int]]
+
+        # Set of event IDs to calculate chain ID/seq numbers for.
+        events_to_calc_chain_id_for = set(event_to_room_id)
+
+        # We check if there are any events that need to be handled in the rooms
+        # we're looking at. These should just be out of band memberships, where
+        # we didn't have the auth chain when we first persisted.
+        rows = db_pool.simple_select_many_txn(
             txn,
-            events_and_contexts=events_and_contexts,
-            all_events_and_contexts=all_events_and_contexts,
-            backfilled=backfilled,
+            table="event_auth_chain_to_calculate",
+            keyvalues={},
+            column="room_id",
+            iterable=set(event_to_room_id.values()),
+            retcols=("event_id", "type", "state_key"),
         )
+        for row in rows:
+            event_id = row["event_id"]
+            event_type = row["type"]
+            state_key = row["state_key"]
+
+            # (We could pull out the auth events for all rows at once using
+            # simple_select_many, but this case happens rarely and almost always
+            # with a single row.)
+            auth_events = db_pool.simple_select_onecol_txn(
+                txn, "event_auth", keyvalues={"event_id": event_id}, retcol="auth_id",
+            )
 
-        # We call this last as it assumes we've inserted the events into
-        # room_memberships, where applicable.
-        self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
+            events_to_calc_chain_id_for.add(event_id)
+            event_to_types[event_id] = (event_type, state_key)
+            event_to_auth_chain[event_id] = auth_events
+
+        # First we get the chain ID and sequence numbers for the events'
+        # auth events (that aren't also currently being persisted).
+        #
+        # Note that there there is an edge case here where we might not have
+        # calculated chains and sequence numbers for events that were "out
+        # of band". We handle this case by fetching the necessary info and
+        # adding it to the set of events to calculate chain IDs for.
+
+        missing_auth_chains = {
+            a_id
+            for auth_events in event_to_auth_chain.values()
+            for a_id in auth_events
+            if a_id not in events_to_calc_chain_id_for
+        }
+
+        # We loop here in case we find an out of band membership and need to
+        # fetch their auth event info.
+        while missing_auth_chains:
+            sql = """
+                SELECT event_id, events.type, state_key, chain_id, sequence_number
+                FROM events
+                INNER JOIN state_events USING (event_id)
+                LEFT JOIN event_auth_chains USING (event_id)
+                WHERE
+            """
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "event_id", missing_auth_chains,
+            )
+            txn.execute(sql + clause, args)
+
+            missing_auth_chains.clear()
+
+            for auth_id, event_type, state_key, chain_id, sequence_number in txn:
+                event_to_types[auth_id] = (event_type, state_key)
+
+                if chain_id is None:
+                    # No chain ID, so the event was persisted out of band.
+                    # We add to list of events to calculate auth chains for.
+
+                    events_to_calc_chain_id_for.add(auth_id)
+
+                    event_to_auth_chain[auth_id] = db_pool.simple_select_onecol_txn(
+                        txn,
+                        "event_auth",
+                        keyvalues={"event_id": auth_id},
+                        retcol="auth_id",
+                    )
+
+                    missing_auth_chains.update(
+                        e
+                        for e in event_to_auth_chain[auth_id]
+                        if e not in event_to_types
+                    )
+                else:
+                    chain_map[auth_id] = (chain_id, sequence_number)
+
+        # Now we check if we have any events where we don't have auth chain,
+        # this should only be out of band memberships.
+        for event_id in sorted_topologically(event_to_auth_chain, event_to_auth_chain):
+            for auth_id in event_to_auth_chain[event_id]:
+                if (
+                    auth_id not in chain_map
+                    and auth_id not in events_to_calc_chain_id_for
+                ):
+                    events_to_calc_chain_id_for.discard(event_id)
+
+                    # If this is an event we're trying to persist we add it to
+                    # the list of events to calculate chain IDs for next time
+                    # around. (Otherwise we will have already added it to the
+                    # table).
+                    room_id = event_to_room_id.get(event_id)
+                    if room_id:
+                        e_type, state_key = event_to_types[event_id]
+                        db_pool.simple_insert_txn(
+                            txn,
+                            table="event_auth_chain_to_calculate",
+                            values={
+                                "event_id": event_id,
+                                "room_id": room_id,
+                                "type": e_type,
+                                "state_key": state_key,
+                            },
+                        )
+
+                    # We stop checking the event's auth events since we've
+                    # discarded it.
+                    break
+
+        if not events_to_calc_chain_id_for:
+            return
+
+        # Allocate chain ID/sequence numbers to each new event.
+        new_chain_tuples = cls._allocate_chain_ids(
+            txn,
+            db_pool,
+            event_to_room_id,
+            event_to_types,
+            event_to_auth_chain,
+            events_to_calc_chain_id_for,
+            chain_map,
+        )
+        chain_map.update(new_chain_tuples)
+
+        db_pool.simple_insert_many_txn(
+            txn,
+            table="event_auth_chains",
+            values=[
+                {"event_id": event_id, "chain_id": c_id, "sequence_number": seq}
+                for event_id, (c_id, seq) in new_chain_tuples.items()
+            ],
+        )
+
+        db_pool.simple_delete_many_txn(
+            txn,
+            table="event_auth_chain_to_calculate",
+            keyvalues={},
+            column="event_id",
+            iterable=new_chain_tuples,
+        )
+
+        # Now we need to calculate any new links between chains caused by
+        # the new events.
+        #
+        # Links are pairs of chain ID/sequence numbers such that for any
+        # event A (CA, SA) and any event B (CB, SB), B is in A's auth chain
+        # if and only if there is at least one link (CA, S1) -> (CB, S2)
+        # where SA >= S1 and S2 >= SB.
+        #
+        # We try and avoid adding redundant links to the table, e.g. if we
+        # have two links between two chains which both start/end at the
+        # sequence number event (or cross) then one can be safely dropped.
+        #
+        # To calculate new links we look at every new event and:
+        #   1. Fetch the chain ID/sequence numbers of its auth events,
+        #      discarding any that are reachable by other auth events, or
+        #      that have the same chain ID as the event.
+        #   2. For each retained auth event we:
+        #       a. Add a link from the event's to the auth event's chain
+        #          ID/sequence number; and
+        #       b. Add a link from the event to every chain reachable by the
+        #          auth event.
+
+        # Step 1, fetch all existing links from all the chains we've seen
+        # referenced.
+        chain_links = _LinkMap()
+        rows = db_pool.simple_select_many_txn(
+            txn,
+            table="event_auth_chain_links",
+            column="origin_chain_id",
+            iterable={chain_id for chain_id, _ in chain_map.values()},
+            keyvalues={},
+            retcols=(
+                "origin_chain_id",
+                "origin_sequence_number",
+                "target_chain_id",
+                "target_sequence_number",
+            ),
+        )
+        for row in rows:
+            chain_links.add_link(
+                (row["origin_chain_id"], row["origin_sequence_number"]),
+                (row["target_chain_id"], row["target_sequence_number"]),
+                new=False,
+            )
+
+        # We do this in toplogical order to avoid adding redundant links.
+        for event_id in sorted_topologically(
+            events_to_calc_chain_id_for, event_to_auth_chain
+        ):
+            chain_id, sequence_number = chain_map[event_id]
+
+            # Filter out auth events that are reachable by other auth
+            # events. We do this by looking at every permutation of pairs of
+            # auth events (A, B) to check if B is reachable from A.
+            reduction = {
+                a_id
+                for a_id in event_to_auth_chain.get(event_id, [])
+                if chain_map[a_id][0] != chain_id
+            }
+            for start_auth_id, end_auth_id in itertools.permutations(
+                event_to_auth_chain.get(event_id, []), r=2,
+            ):
+                if chain_links.exists_path_from(
+                    chain_map[start_auth_id], chain_map[end_auth_id]
+                ):
+                    reduction.discard(end_auth_id)
+
+            # Step 2, figure out what the new links are from the reduced
+            # list of auth events.
+            for auth_id in reduction:
+                auth_chain_id, auth_sequence_number = chain_map[auth_id]
+
+                # Step 2a, add link between the event and auth event
+                chain_links.add_link(
+                    (chain_id, sequence_number), (auth_chain_id, auth_sequence_number)
+                )
+
+                # Step 2b, add a link to chains reachable from the auth
+                # event.
+                for target_id, target_seq in chain_links.get_links_from(
+                    (auth_chain_id, auth_sequence_number)
+                ):
+                    if target_id == chain_id:
+                        continue
+
+                    chain_links.add_link(
+                        (chain_id, sequence_number), (target_id, target_seq)
+                    )
+
+        db_pool.simple_insert_many_txn(
+            txn,
+            table="event_auth_chain_links",
+            values=[
+                {
+                    "origin_chain_id": source_id,
+                    "origin_sequence_number": source_seq,
+                    "target_chain_id": target_id,
+                    "target_sequence_number": target_seq,
+                }
+                for (
+                    source_id,
+                    source_seq,
+                    target_id,
+                    target_seq,
+                ) in chain_links.get_additions()
+            ],
+        )
+
+    @staticmethod
+    def _allocate_chain_ids(
+        txn,
+        db_pool: DatabasePool,
+        event_to_room_id: Dict[str, str],
+        event_to_types: Dict[str, Tuple[str, str]],
+        event_to_auth_chain: Dict[str, List[str]],
+        events_to_calc_chain_id_for: Set[str],
+        chain_map: Dict[str, Tuple[int, int]],
+    ) -> Dict[str, Tuple[int, int]]:
+        """Allocates, but does not persist, chain ID/sequence numbers for the
+        events in `events_to_calc_chain_id_for`. (c.f. _add_chain_cover_index
+        for info on args)
+        """
+
+        # We now calculate the chain IDs/sequence numbers for the events. We do
+        # this by looking at the chain ID and sequence number of any auth event
+        # with the same type/state_key and incrementing the sequence number by
+        # one. If there was no match or the chain ID/sequence number is already
+        # taken we generate a new chain.
+        #
+        # We try to reduce the number of times that we hit the database by
+        # batching up calls, to make this more efficient when persisting large
+        # numbers of state events (e.g. during joins).
+        #
+        # We do this by:
+        #   1. Calculating for each event which auth event will be used to
+        #      inherit the chain ID, i.e. converting the auth chain graph to a
+        #      tree that we can allocate chains on. We also keep track of which
+        #      existing chain IDs have been referenced.
+        #   2. Fetching the max allocated sequence number for each referenced
+        #      existing chain ID, generating a map from chain ID to the max
+        #      allocated sequence number.
+        #   3. Iterating over the tree and allocating a chain ID/seq no. to the
+        #      new event, by incrementing the sequence number from the
+        #      referenced event's chain ID/seq no. and checking that the
+        #      incremented sequence number hasn't already been allocated (by
+        #      looking in the map generated in the previous step). We generate a
+        #      new chain if the sequence number has already been allocated.
+        #
+
+        existing_chains = set()  # type: Set[int]
+        tree = []  # type: List[Tuple[str, Optional[str]]]
+
+        # We need to do this in a topologically sorted order as we want to
+        # generate chain IDs/sequence numbers of an event's auth events before
+        # the event itself.
+        for event_id in sorted_topologically(
+            events_to_calc_chain_id_for, event_to_auth_chain
+        ):
+            for auth_id in event_to_auth_chain.get(event_id, []):
+                if event_to_types.get(event_id) == event_to_types.get(auth_id):
+                    existing_chain_id = chain_map.get(auth_id)
+                    if existing_chain_id:
+                        existing_chains.add(existing_chain_id[0])
+
+                    tree.append((event_id, auth_id))
+                    break
+            else:
+                tree.append((event_id, None))
+
+        # Fetch the current max sequence number for each existing referenced chain.
+        sql = """
+            SELECT chain_id, MAX(sequence_number) FROM event_auth_chains
+            WHERE %s
+            GROUP BY chain_id
+        """
+        clause, args = make_in_list_sql_clause(
+            db_pool.engine, "chain_id", existing_chains
+        )
+        txn.execute(sql % (clause,), args)
+
+        chain_to_max_seq_no = {row[0]: row[1] for row in txn}  # type: Dict[Any, int]
+
+        # Allocate the new events chain ID/sequence numbers.
+        #
+        # To reduce the number of calls to the database we don't allocate a
+        # chain ID number in the loop, instead we use a temporary `object()` for
+        # each new chain ID. Once we've done the loop we generate the necessary
+        # number of new chain IDs in one call, replacing all temporary
+        # objects with real allocated chain IDs.
+
+        unallocated_chain_ids = set()  # type: Set[object]
+        new_chain_tuples = {}  # type: Dict[str, Tuple[Any, int]]
+        for event_id, auth_event_id in tree:
+            # If we reference an auth_event_id we fetch the allocated chain ID,
+            # either from the existing `chain_map` or the newly generated
+            # `new_chain_tuples` map.
+            existing_chain_id = None
+            if auth_event_id:
+                existing_chain_id = new_chain_tuples.get(auth_event_id)
+                if not existing_chain_id:
+                    existing_chain_id = chain_map[auth_event_id]
+
+            new_chain_tuple = None  # type: Optional[Tuple[Any, int]]
+            if existing_chain_id:
+                # We found a chain ID/sequence number candidate, check its
+                # not already taken.
+                proposed_new_id = existing_chain_id[0]
+                proposed_new_seq = existing_chain_id[1] + 1
+
+                if chain_to_max_seq_no[proposed_new_id] < proposed_new_seq:
+                    new_chain_tuple = (
+                        proposed_new_id,
+                        proposed_new_seq,
+                    )
+
+            # If we need to start a new chain we allocate a temporary chain ID.
+            if not new_chain_tuple:
+                new_chain_tuple = (object(), 1)
+                unallocated_chain_ids.add(new_chain_tuple[0])
+
+            new_chain_tuples[event_id] = new_chain_tuple
+            chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1]
+
+        # Generate new chain IDs for all unallocated chain IDs.
+        newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn(
+            txn, len(unallocated_chain_ids)
+        )
+
+        # Map from potentially temporary chain ID to real chain ID
+        chain_id_to_allocated_map = dict(
+            zip(unallocated_chain_ids, newly_allocated_chain_ids)
+        )  # type: Dict[Any, int]
+        chain_id_to_allocated_map.update((c, c) for c in existing_chains)
+
+        return {
+            event_id: (chain_id_to_allocated_map[chain_id], seq)
+            for event_id, (chain_id, seq) in new_chain_tuples.items()
+        }
 
     def _persist_transaction_ids_txn(
         self,
@@ -489,7 +965,7 @@ class PersistEventsStore:
                         WHERE room_id = ? AND type = ? AND state_key = ?
                     )
                 """
-                txn.executemany(
+                txn.execute_batch(
                     sql,
                     (
                         (
@@ -508,7 +984,7 @@ class PersistEventsStore:
                 )
                 # Now we actually update the current_state_events table
 
-                txn.executemany(
+                txn.execute_batch(
                     "DELETE FROM current_state_events"
                     " WHERE room_id = ? AND type = ? AND state_key = ?",
                     (
@@ -520,7 +996,7 @@ class PersistEventsStore:
                 # We include the membership in the current state table, hence we do
                 # a lookup when we insert. This assumes that all events have already
                 # been inserted into room_memberships.
-                txn.executemany(
+                txn.execute_batch(
                     """INSERT INTO current_state_events
                         (room_id, type, state_key, event_id, membership)
                     VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
@@ -540,7 +1016,7 @@ class PersistEventsStore:
             # we have no record of the fact the user *was* a member of the
             # room but got, say, state reset out of it.
             if to_delete or to_insert:
-                txn.executemany(
+                txn.execute_batch(
                     "DELETE FROM local_current_membership"
                     " WHERE room_id = ? AND user_id = ?",
                     (
@@ -551,7 +1027,7 @@ class PersistEventsStore:
                 )
 
             if to_insert:
-                txn.executemany(
+                txn.execute_batch(
                     """INSERT INTO local_current_membership
                         (room_id, user_id, event_id, membership)
                     VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
@@ -799,7 +1275,8 @@ class PersistEventsStore:
         return [ec for ec in events_and_contexts if ec[0] not in to_remove]
 
     def _store_event_txn(self, txn, events_and_contexts):
-        """Insert new events into the event and event_json tables
+        """Insert new events into the event, event_json, redaction and
+        state_events tables.
 
         Args:
             txn (twisted.enterprise.adbapi.Connection): db connection
@@ -871,6 +1348,29 @@ class PersistEventsStore:
                     updatevalues={"have_censored": False},
                 )
 
+        state_events_and_contexts = [
+            ec for ec in events_and_contexts if ec[0].is_state()
+        ]
+
+        state_values = []
+        for event, context in state_events_and_contexts:
+            vals = {
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "type": event.type,
+                "state_key": event.state_key,
+            }
+
+            # TODO: How does this work with backfilling?
+            if hasattr(event, "replaces_state"):
+                vals["prev_state"] = event.replaces_state
+
+            state_values.append(vals)
+
+        self.db_pool.simple_insert_many_txn(
+            txn, table="state_events", values=state_values
+        )
+
     def _store_rejected_events_txn(self, txn, events_and_contexts):
         """Add rows to the 'rejections' table for received events which were
         rejected
@@ -987,29 +1487,6 @@ class PersistEventsStore:
             txn, [event for event, _ in events_and_contexts]
         )
 
-        state_events_and_contexts = [
-            ec for ec in events_and_contexts if ec[0].is_state()
-        ]
-
-        state_values = []
-        for event, context in state_events_and_contexts:
-            vals = {
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "type": event.type,
-                "state_key": event.state_key,
-            }
-
-            # TODO: How does this work with backfilling?
-            if hasattr(event, "replaces_state"):
-                vals["prev_state"] = event.replaces_state
-
-            state_values.append(vals)
-
-        self.db_pool.simple_insert_many_txn(
-            txn, table="state_events", values=state_values
-        )
-
         # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)
 
@@ -1350,7 +1827,7 @@ class PersistEventsStore:
         """
 
         if events_and_contexts:
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     (
@@ -1379,7 +1856,7 @@ class PersistEventsStore:
 
         # Now we delete the staging area for *all* events that were being
         # persisted.
-        txn.executemany(
+        txn.execute_batch(
             "DELETE FROM event_push_actions_staging WHERE event_id = ?",
             ((event.event_id,) for event, _ in all_events_and_contexts),
         )
@@ -1498,7 +1975,7 @@ class PersistEventsStore:
             " )"
         )
 
-        txn.executemany(
+        txn.execute_batch(
             query,
             [
                 (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
@@ -1512,7 +1989,7 @@ class PersistEventsStore:
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
         )
-        txn.executemany(
+        txn.execute_batch(
             query,
             [
                 (ev.event_id, ev.room_id)
@@ -1520,3 +1997,131 @@ class PersistEventsStore:
                 if not ev.internal_metadata.is_outlier()
             ],
         )
+
+
+@attr.s(slots=True)
+class _LinkMap:
+    """A helper type for tracking links between chains.
+    """
+
+    # Stores the set of links as nested maps: source chain ID -> target chain ID
+    # -> source sequence number -> target sequence number.
+    maps = attr.ib(type=Dict[int, Dict[int, Dict[int, int]]], factory=dict)
+
+    # Stores the links that have been added (with new set to true), as tuples of
+    # `(source chain ID, source sequence no, target chain ID, target sequence no.)`
+    additions = attr.ib(type=Set[Tuple[int, int, int, int]], factory=set)
+
+    def add_link(
+        self,
+        src_tuple: Tuple[int, int],
+        target_tuple: Tuple[int, int],
+        new: bool = True,
+    ) -> bool:
+        """Add a new link between two chains, ensuring no redundant links are added.
+
+        New links should be added in topological order.
+
+        Args:
+            src_tuple: The chain ID/sequence number of the source of the link.
+            target_tuple: The chain ID/sequence number of the target of the link.
+            new: Whether this is a "new" link, i.e. should it be returned
+                by `get_additions`.
+
+        Returns:
+            True if a link was added, false if the given link was dropped as redundant
+        """
+        src_chain, src_seq = src_tuple
+        target_chain, target_seq = target_tuple
+
+        current_links = self.maps.setdefault(src_chain, {}).setdefault(target_chain, {})
+
+        assert src_chain != target_chain
+
+        if new:
+            # Check if the new link is redundant
+            for current_seq_src, current_seq_target in current_links.items():
+                # If a link "crosses" another link then its redundant. For example
+                # in the following link 1 (L1) is redundant, as any event reachable
+                # via L1 is *also* reachable via L2.
+                #
+                #   Chain A     Chain B
+                #      |          |
+                #   L1 |------    |
+                #      |     |    |
+                #   L2 |---- | -->|
+                #      |     |    |
+                #      |     |--->|
+                #      |          |
+                #      |          |
+                #
+                # So we only need to keep links which *do not* cross, i.e. links
+                # that both start and end above or below an existing link.
+                #
+                # Note, since we add links in topological ordering we should never
+                # see `src_seq` less than `current_seq_src`.
+
+                if current_seq_src <= src_seq and target_seq <= current_seq_target:
+                    # This new link is redundant, nothing to do.
+                    return False
+
+            self.additions.add((src_chain, src_seq, target_chain, target_seq))
+
+        current_links[src_seq] = target_seq
+        return True
+
+    def get_links_from(
+        self, src_tuple: Tuple[int, int]
+    ) -> Generator[Tuple[int, int], None, None]:
+        """Gets the chains reachable from the given chain/sequence number.
+
+        Yields:
+            The chain ID and sequence number the link points to.
+        """
+        src_chain, src_seq = src_tuple
+        for target_id, sequence_numbers in self.maps.get(src_chain, {}).items():
+            for link_src_seq, target_seq in sequence_numbers.items():
+                if link_src_seq <= src_seq:
+                    yield target_id, target_seq
+
+    def get_links_between(
+        self, source_chain: int, target_chain: int
+    ) -> Generator[Tuple[int, int], None, None]:
+        """Gets the links between two chains.
+
+        Yields:
+            The source and target sequence numbers.
+        """
+
+        yield from self.maps.get(source_chain, {}).get(target_chain, {}).items()
+
+    def get_additions(self) -> Generator[Tuple[int, int, int, int], None, None]:
+        """Gets any newly added links.
+
+        Yields:
+            The source chain ID/sequence number and target chain ID/sequence number
+        """
+
+        for src_chain, src_seq, target_chain, _ in self.additions:
+            target_seq = self.maps.get(src_chain, {}).get(target_chain, {}).get(src_seq)
+            if target_seq is not None:
+                yield (src_chain, src_seq, target_chain, target_seq)
+
+    def exists_path_from(
+        self, src_tuple: Tuple[int, int], target_tuple: Tuple[int, int],
+    ) -> bool:
+        """Checks if there is a path between the source chain ID/sequence and
+        target chain ID/sequence.
+        """
+        src_chain, src_seq = src_tuple
+        target_chain, target_seq = target_tuple
+
+        if src_chain == target_chain:
+            return target_seq <= src_seq
+
+        links = self.get_links_between(src_chain, target_chain)
+        for link_start_seq, link_end_seq in links:
+            if link_start_seq <= src_seq and target_seq <= link_end_seq:
+                return True
+
+        return False
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 97b6754846..5ca4fa6817 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -14,14 +14,41 @@
 # limitations under the License.
 
 import logging
+from typing import Dict, List, Optional, Tuple
+
+import attr
 
 from synapse.api.constants import EventContentFields
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import make_event_from_dict
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
+from synapse.storage.databases.main.events import PersistEventsStore
+from synapse.storage.types import Cursor
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
 
+@attr.s(slots=True, frozen=True)
+class _CalculateChainCover:
+    """Return value for _calculate_chain_cover_txn.
+    """
+
+    # The last room_id/depth/stream processed.
+    room_id = attr.ib(type=str)
+    depth = attr.ib(type=int)
+    stream = attr.ib(type=int)
+
+    # Number of rows processed
+    processed_count = attr.ib(type=int)
+
+    # Map from room_id to last depth/stream processed for each room that we have
+    # processed all events for (i.e. the rooms we can flip the
+    # `has_auth_chain_index` for)
+    finished_room_map = attr.ib(type=Dict[str, Tuple[int, int]])
+
+
 class EventsBackgroundUpdatesStore(SQLBaseStore):
 
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
@@ -99,13 +126,19 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             columns=["user_id", "created_ts"],
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "rejected_events_metadata", self._rejected_events_metadata,
+        )
+
+        self.db_pool.updates.register_background_update_handler(
+            "chain_cover", self._chain_cover_index,
+        )
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
-        INSERT_CLUMP_SIZE = 1000
-
         def reindex_txn(txn):
             sql = (
                 "SELECT stream_ordering, event_id, json FROM events"
@@ -143,9 +176,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
 
-            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            txn.execute_batch(sql, update_rows)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
@@ -175,8 +206,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
-        INSERT_CLUMP_SIZE = 1000
-
         def reindex_search_txn(txn):
             sql = (
                 "SELECT stream_ordering, event_id FROM events"
@@ -221,9 +250,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
             sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
 
-            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            txn.execute_batch(sql, rows_to_update)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
@@ -582,3 +609,314 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             await self.db_pool.updates._end_background_update("event_store_labels")
 
         return num_rows
+
+    async def _rejected_events_metadata(self, progress: dict, batch_size: int) -> int:
+        """Adds rejected events to the `state_events` and `event_auth` metadata
+        tables.
+        """
+
+        last_event_id = progress.get("last_event_id", "")
+
+        def get_rejected_events(
+            txn: Cursor,
+        ) -> List[Tuple[str, str, JsonDict, bool, bool]]:
+            # Fetch rejected event json, their room version and whether we have
+            # inserted them into the state_events or auth_events tables.
+            #
+            # Note we can assume that events that don't have a corresponding
+            # room version are V1 rooms.
+            sql = """
+                SELECT DISTINCT
+                    event_id,
+                    COALESCE(room_version, '1'),
+                    json,
+                    state_events.event_id IS NOT NULL,
+                    event_auth.event_id IS NOT NULL
+                FROM rejections
+                INNER JOIN event_json USING (event_id)
+                LEFT JOIN rooms USING (room_id)
+                LEFT JOIN state_events USING (event_id)
+                LEFT JOIN event_auth USING (event_id)
+                WHERE event_id > ?
+                ORDER BY event_id
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_event_id, batch_size,))
+
+            return [(row[0], row[1], db_to_json(row[2]), row[3], row[4]) for row in txn]  # type: ignore
+
+        results = await self.db_pool.runInteraction(
+            desc="_rejected_events_metadata_get", func=get_rejected_events
+        )
+
+        if not results:
+            await self.db_pool.updates._end_background_update(
+                "rejected_events_metadata"
+            )
+            return 0
+
+        state_events = []
+        auth_events = []
+        for event_id, room_version, event_json, has_state, has_event_auth in results:
+            last_event_id = event_id
+
+            if has_state and has_event_auth:
+                continue
+
+            room_version_obj = KNOWN_ROOM_VERSIONS.get(room_version)
+            if not room_version_obj:
+                # We no longer support this room version, so we just ignore the
+                # events entirely.
+                logger.info(
+                    "Ignoring event with unknown room version %r: %r",
+                    room_version,
+                    event_id,
+                )
+                continue
+
+            event = make_event_from_dict(event_json, room_version_obj)
+
+            if not event.is_state():
+                continue
+
+            if not has_state:
+                state_events.append(
+                    {
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "type": event.type,
+                        "state_key": event.state_key,
+                    }
+                )
+
+            if not has_event_auth:
+                for auth_id in event.auth_event_ids():
+                    auth_events.append(
+                        {
+                            "room_id": event.room_id,
+                            "event_id": event.event_id,
+                            "auth_id": auth_id,
+                        }
+                    )
+
+        if state_events:
+            await self.db_pool.simple_insert_many(
+                table="state_events",
+                values=state_events,
+                desc="_rejected_events_metadata_state_events",
+            )
+
+        if auth_events:
+            await self.db_pool.simple_insert_many(
+                table="event_auth",
+                values=auth_events,
+                desc="_rejected_events_metadata_event_auth",
+            )
+
+        await self.db_pool.updates._background_update_progress(
+            "rejected_events_metadata", {"last_event_id": last_event_id}
+        )
+
+        if len(results) < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "rejected_events_metadata"
+            )
+
+        return len(results)
+
+    async def _chain_cover_index(self, progress: dict, batch_size: int) -> int:
+        """A background updates that iterates over all rooms and generates the
+        chain cover index for them.
+        """
+
+        current_room_id = progress.get("current_room_id", "")
+
+        # Where we've processed up to in the room, defaults to the start of the
+        # room.
+        last_depth = progress.get("last_depth", -1)
+        last_stream = progress.get("last_stream", -1)
+
+        result = await self.db_pool.runInteraction(
+            "_chain_cover_index",
+            self._calculate_chain_cover_txn,
+            current_room_id,
+            last_depth,
+            last_stream,
+            batch_size,
+            single_room=False,
+        )
+
+        finished = result.processed_count == 0
+
+        total_rows_processed = result.processed_count
+        current_room_id = result.room_id
+        last_depth = result.depth
+        last_stream = result.stream
+
+        for room_id, (depth, stream) in result.finished_room_map.items():
+            # If we've done all the events in the room we flip the
+            # `has_auth_chain_index` in the DB. Note that its possible for
+            # further events to be persisted between the above and setting the
+            # flag without having the chain cover calculated for them. This is
+            # fine as a) the code gracefully handles these cases and b) we'll
+            # calculate them below.
+
+            await self.db_pool.simple_update(
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                updatevalues={"has_auth_chain_index": True},
+                desc="_chain_cover_index",
+            )
+
+            # Handle any events that might have raced with us flipping the
+            # bit above.
+            result = await self.db_pool.runInteraction(
+                "_chain_cover_index",
+                self._calculate_chain_cover_txn,
+                room_id,
+                depth,
+                stream,
+                batch_size=None,
+                single_room=True,
+            )
+
+            total_rows_processed += result.processed_count
+
+        if finished:
+            await self.db_pool.updates._end_background_update("chain_cover")
+            return total_rows_processed
+
+        await self.db_pool.updates._background_update_progress(
+            "chain_cover",
+            {
+                "current_room_id": current_room_id,
+                "last_depth": last_depth,
+                "last_stream": last_stream,
+            },
+        )
+
+        return total_rows_processed
+
+    def _calculate_chain_cover_txn(
+        self,
+        txn: Cursor,
+        last_room_id: str,
+        last_depth: int,
+        last_stream: int,
+        batch_size: Optional[int],
+        single_room: bool,
+    ) -> _CalculateChainCover:
+        """Calculate the chain cover for `batch_size` events, ordered by
+        `(room_id, depth, stream)`.
+
+        Args:
+            txn,
+            last_room_id, last_depth, last_stream: The `(room_id, depth, stream)`
+                tuple to fetch results after.
+            batch_size: The maximum number of events to process. If None then
+                no limit.
+            single_room: Whether to calculate the index for just the given
+                room.
+        """
+
+        # Get the next set of events in the room (that we haven't already
+        # computed chain cover for). We do this in topological order.
+
+        # We want to do a `(topological_ordering, stream_ordering) > (?,?)`
+        # comparison, but that is not supported on older SQLite versions
+        tuple_clause, tuple_args = make_tuple_comparison_clause(
+            self.database_engine,
+            [
+                ("events.room_id", last_room_id),
+                ("topological_ordering", last_depth),
+                ("stream_ordering", last_stream),
+            ],
+        )
+
+        extra_clause = ""
+        if single_room:
+            extra_clause = "AND events.room_id = ?"
+            tuple_args.append(last_room_id)
+
+        sql = """
+            SELECT
+                event_id, state_events.type, state_events.state_key,
+                topological_ordering, stream_ordering,
+                events.room_id
+            FROM events
+            INNER JOIN state_events USING (event_id)
+            LEFT JOIN event_auth_chains USING (event_id)
+            LEFT JOIN event_auth_chain_to_calculate USING (event_id)
+            WHERE event_auth_chains.event_id IS NULL
+                AND event_auth_chain_to_calculate.event_id IS NULL
+                AND %(tuple_cmp)s
+                %(extra)s
+            ORDER BY events.room_id, topological_ordering, stream_ordering
+            %(limit)s
+        """ % {
+            "tuple_cmp": tuple_clause,
+            "limit": "LIMIT ?" if batch_size is not None else "",
+            "extra": extra_clause,
+        }
+
+        if batch_size is not None:
+            tuple_args.append(batch_size)
+
+        txn.execute(sql, tuple_args)
+        rows = txn.fetchall()
+
+        # Put the results in the necessary format for
+        # `_add_chain_cover_index`
+        event_to_room_id = {row[0]: row[5] for row in rows}
+        event_to_types = {row[0]: (row[1], row[2]) for row in rows}
+
+        # Calculate the new last position we've processed up to.
+        new_last_depth = rows[-1][3] if rows else last_depth  # type: int
+        new_last_stream = rows[-1][4] if rows else last_stream  # type: int
+        new_last_room_id = rows[-1][5] if rows else ""  # type: str
+
+        # Map from room_id to last depth/stream_ordering processed for the room,
+        # excluding the last room (which we're likely still processing). We also
+        # need to include the room passed in if it's not included in the result
+        # set (as we then know we've processed all events in said room).
+        #
+        # This is the set of rooms that we can now safely flip the
+        # `has_auth_chain_index` bit for.
+        finished_rooms = {
+            row[5]: (row[3], row[4]) for row in rows if row[5] != new_last_room_id
+        }
+        if last_room_id not in finished_rooms and last_room_id != new_last_room_id:
+            finished_rooms[last_room_id] = (last_depth, last_stream)
+
+        count = len(rows)
+
+        # We also need to fetch the auth events for them.
+        auth_events = self.db_pool.simple_select_many_txn(
+            txn,
+            table="event_auth",
+            column="event_id",
+            iterable=event_to_room_id,
+            keyvalues={},
+            retcols=("event_id", "auth_id"),
+        )
+
+        event_to_auth_chain = {}  # type: Dict[str, List[str]]
+        for row in auth_events:
+            event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
+
+        # Calculate and persist the chain cover index for this set of events.
+        #
+        # Annoyingly we need to gut wrench into the persit event store so that
+        # we can reuse the function to calculate the chain cover for rooms.
+        PersistEventsStore._add_chain_cover_index(
+            txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain,
+        )
+
+        return _CalculateChainCover(
+            room_id=new_last_room_id,
+            depth=new_last_depth,
+            stream=new_last_stream,
+            processed_count=count,
+            finished_room_map=finished_rooms,
+        )
diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py
new file mode 100644
index 0000000000..0ac1da9c35
--- /dev/null
+++ b/synapse/storage/databases/main/events_forward_extremities.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import Dict, List
+
+from synapse.api.errors import SynapseError
+from synapse.storage._base import SQLBaseStore
+
+logger = logging.getLogger(__name__)
+
+
+class EventForwardExtremitiesStore(SQLBaseStore):
+    async def delete_forward_extremities_for_room(self, room_id: str) -> int:
+        """Delete any extra forward extremities for a room.
+
+        Invalidates the "get_latest_event_ids_in_room" cache if any forward
+        extremities were deleted.
+
+        Returns count deleted.
+        """
+
+        def delete_forward_extremities_for_room_txn(txn):
+            # First we need to get the event_id to not delete
+            sql = """
+                SELECT event_id FROM event_forward_extremities
+                INNER JOIN events USING (room_id, event_id)
+                WHERE room_id = ?
+                ORDER BY stream_ordering DESC
+                LIMIT 1
+            """
+            txn.execute(sql, (room_id,))
+            rows = txn.fetchall()
+            try:
+                event_id = rows[0][0]
+                logger.debug(
+                    "Found event_id %s as the forward extremity to keep for room %s",
+                    event_id,
+                    room_id,
+                )
+            except KeyError:
+                msg = "No forward extremity event found for room %s" % room_id
+                logger.warning(msg)
+                raise SynapseError(400, msg)
+
+            # Now delete the extra forward extremities
+            sql = """
+                DELETE FROM event_forward_extremities
+                WHERE event_id != ? AND room_id = ?
+            """
+
+            txn.execute(sql, (event_id, room_id))
+            logger.info(
+                "Deleted %s extra forward extremities for room %s",
+                txn.rowcount,
+                room_id,
+            )
+
+            if txn.rowcount > 0:
+                # Invalidate the cache
+                self._invalidate_cache_and_stream(
+                    txn, self.get_latest_event_ids_in_room, (room_id,),
+                )
+
+            return txn.rowcount
+
+        return await self.db_pool.runInteraction(
+            "delete_forward_extremities_for_room",
+            delete_forward_extremities_for_room_txn,
+        )
+
+    async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]:
+        """Get list of forward extremities for a room."""
+
+        def get_forward_extremities_for_room_txn(txn):
+            sql = """
+                SELECT event_id, state_group, depth, received_ts
+                FROM event_forward_extremities
+                INNER JOIN event_to_state_groups USING (event_id)
+                INNER JOIN events USING (room_id, event_id)
+                WHERE room_id = ?
+            """
+
+            txn.execute(sql, (room_id,))
+            return self.db_pool.cursor_to_dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_forward_extremities_for_room", get_forward_extremities_for_room_txn,
+        )
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4732685f6e..71d823be72 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -96,9 +96,7 @@ class EventsWorkerStore(SQLBaseStore):
                 db=database,
                 stream_name="events",
                 instance_name=hs.get_instance_name(),
-                table="events",
-                instance_column="instance_name",
-                id_column="stream_ordering",
+                tables=[("events", "instance_name", "stream_ordering")],
                 sequence_name="events_stream_seq",
                 writers=hs.config.worker.writers.events,
             )
@@ -107,9 +105,7 @@ class EventsWorkerStore(SQLBaseStore):
                 db=database,
                 stream_name="backfill",
                 instance_name=hs.get_instance_name(),
-                table="events",
-                instance_column="instance_name",
-                id_column="stream_ordering",
+                tables=[("events", "instance_name", "stream_ordering")],
                 sequence_name="events_backfill_stream_seq",
                 positive=False,
                 writers=hs.config.worker.writers.events,
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index f8f4bb9b3f..04ac2d0ced 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -22,6 +22,7 @@ from signedjson.key import decode_verify_key_bytes
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.keys import FetchKeyResult
+from synapse.storage.types import Cursor
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.iterutils import batch_iter
 
@@ -44,7 +45,7 @@ class KeyStore(SQLBaseStore):
     )
     async def get_server_verify_keys(
         self, server_name_and_key_ids: Iterable[Tuple[str, str]]
-    ) -> Dict[Tuple[str, str], Optional[FetchKeyResult]]:
+    ) -> Dict[Tuple[str, str], FetchKeyResult]:
         """
         Args:
             server_name_and_key_ids:
@@ -56,7 +57,7 @@ class KeyStore(SQLBaseStore):
         """
         keys = {}
 
-        def _get_keys(txn, batch):
+        def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str]]) -> None:
             """Processes a batch of keys to fetch, and adds the result to `keys`."""
 
             # batch_iter always returns tuples so it's safe to do len(batch)
@@ -77,13 +78,12 @@ class KeyStore(SQLBaseStore):
                     # `ts_valid_until_ms`.
                     ts_valid_until_ms = 0
 
-                res = FetchKeyResult(
+                keys[(server_name, key_id)] = FetchKeyResult(
                     verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
                     valid_until_ts=ts_valid_until_ms,
                 )
-                keys[(server_name, key_id)] = res
 
-        def _txn(txn):
+        def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]:
             for batch in batch_iter(server_name_and_key_ids, 50):
                 _get_keys(txn, batch)
             return keys
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 4b2f224718..e017177655 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -169,7 +170,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
 
     async def get_local_media_before(
         self, before_ts: int, size_gt: int, keep_profiles: bool,
-    ) -> Optional[List[str]]:
+    ) -> List[str]:
 
         # to find files that have never been accessed (last_access_ts IS NULL)
         # compare with `created_ts`
@@ -416,7 +417,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 " WHERE media_origin = ? AND media_id = ?"
             )
 
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     (time_ms, media_origin, media_id)
@@ -429,7 +430,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 " WHERE media_id = ?"
             )
 
-            txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
+            txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media))
 
         return await self.db_pool.runInteraction(
             "update_cached_last_access_time", update_cache_txn
@@ -556,7 +557,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
         return await self.db_pool.runInteraction(
             "delete_url_cache", _delete_url_cache_txn
@@ -585,11 +586,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         def _delete_url_cache_media_txn(txn):
             sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
             sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
         return await self.db_pool.runInteraction(
             "delete_url_cache_media", _delete_url_cache_media_txn
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index ab18cc4d79..92e65aa640 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -88,6 +88,62 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
             (x[0] - 1) * x[1] for x in res if x[1]
         )
 
+    async def count_daily_e2ee_messages(self):
+        """
+        Returns an estimate of the number of messages sent in the last day.
+
+        If it has been significantly less or more than one day since the last
+        call to this function, it will return None.
+        """
+
+        def _count_messages(txn):
+            sql = """
+                SELECT COALESCE(COUNT(*), 0) FROM events
+                WHERE type = 'm.room.encrypted'
+                AND stream_ordering > ?
+            """
+            txn.execute(sql, (self.stream_ordering_day_ago,))
+            (count,) = txn.fetchone()
+            return count
+
+        return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
+
+    async def count_daily_sent_e2ee_messages(self):
+        def _count_messages(txn):
+            # This is good enough as if you have silly characters in your own
+            # hostname then thats your own fault.
+            like_clause = "%:" + self.hs.hostname
+
+            sql = """
+                SELECT COALESCE(COUNT(*), 0) FROM events
+                WHERE type = 'm.room.encrypted'
+                    AND sender LIKE ?
+                AND stream_ordering > ?
+            """
+
+            txn.execute(sql, (like_clause, self.stream_ordering_day_ago))
+            (count,) = txn.fetchone()
+            return count
+
+        return await self.db_pool.runInteraction(
+            "count_daily_sent_e2ee_messages", _count_messages
+        )
+
+    async def count_daily_active_e2ee_rooms(self):
+        def _count(txn):
+            sql = """
+                SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events
+                WHERE type = 'm.room.encrypted'
+                AND stream_ordering > ?
+            """
+            txn.execute(sql, (self.stream_ordering_day_ago,))
+            (count,) = txn.fetchone()
+            return count
+
+        return await self.db_pool.runInteraction(
+            "count_daily_active_e2ee_rooms", _count
+        )
+
     async def count_daily_messages(self):
         """
         Returns an estimate of the number of messages sent in the last day.
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 0e25ca3d7a..54ef0f1f54 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -82,7 +82,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     async def set_profile_avatar_url(
-        self, user_localpart: str, new_avatar_url: str
+        self, user_localpart: str, new_avatar_url: Optional[str]
     ) -> None:
         await self.db_pool.simple_update_one(
             table="profiles",
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 5d668aadb2..ecfc9f20b1 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -172,7 +172,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         )
 
         # Update backward extremeties
-        txn.executemany(
+        txn.execute_batch(
             "INSERT INTO event_backward_extremities (room_id, event_id)"
             " VALUES (?, ?)",
             [(room_id, event_id) for event_id, in new_backwards_extrems],
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 7997242d90..2687ef3e43 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -15,18 +15,31 @@
 # limitations under the License.
 
 import logging
-from typing import Iterable, Iterator, List, Tuple
-
-from canonicaljson import encode_canonical_json
+from typing import TYPE_CHECKING, Any, Dict, Iterable, Iterator, List, Optional, Tuple
 
+from synapse.push import PusherConfig, ThrottleParams
 from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage.database import DatabasePool
+from synapse.storage.types import Connection
+from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.types import JsonDict
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class PusherWorkerStore(SQLBaseStore):
-    def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[dict]:
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+        super().__init__(database, db_conn, hs)
+        self._pushers_id_gen = StreamIdGenerator(
+            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
+        )
+
+    def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]:
         """JSON-decode the data in the rows returned from the `pushers` table
 
         Drops any rows whose data cannot be decoded
@@ -44,21 +57,23 @@ class PusherWorkerStore(SQLBaseStore):
                 )
                 continue
 
-            yield r
+            yield PusherConfig(**r)
 
-    async def user_has_pusher(self, user_id):
+    async def user_has_pusher(self, user_id: str) -> bool:
         ret = await self.db_pool.simple_select_one_onecol(
             "pushers", {"user_name": user_id}, "id", allow_none=True
         )
         return ret is not None
 
-    def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
-        return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
+    async def get_pushers_by_app_id_and_pushkey(
+        self, app_id: str, pushkey: str
+    ) -> Iterator[PusherConfig]:
+        return await self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
 
-    def get_pushers_by_user_id(self, user_id):
-        return self.get_pushers_by({"user_name": user_id})
+    async def get_pushers_by_user_id(self, user_id: str) -> Iterator[PusherConfig]:
+        return await self.get_pushers_by({"user_name": user_id})
 
-    async def get_pushers_by(self, keyvalues):
+    async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConfig]:
         ret = await self.db_pool.simple_select_list(
             "pushers",
             keyvalues,
@@ -83,7 +98,7 @@ class PusherWorkerStore(SQLBaseStore):
         )
         return self._decode_pushers_rows(ret)
 
-    async def get_all_pushers(self):
+    async def get_all_pushers(self) -> Iterator[PusherConfig]:
         def get_pushers(txn):
             txn.execute("SELECT * FROM pushers")
             rows = self.db_pool.cursor_to_dict(txn)
@@ -159,14 +174,16 @@ class PusherWorkerStore(SQLBaseStore):
         )
 
     @cached(num_args=1, max_entries=15000)
-    async def get_if_user_has_pusher(self, user_id):
+    async def get_if_user_has_pusher(self, user_id: str):
         # This only exists for the cachedList decorator
         raise NotImplementedError()
 
     @cachedList(
         cached_method_name="get_if_user_has_pusher", list_name="user_ids", num_args=1,
     )
-    async def get_if_users_have_pushers(self, user_ids):
+    async def get_if_users_have_pushers(
+        self, user_ids: Iterable[str]
+    ) -> Dict[str, bool]:
         rows = await self.db_pool.simple_select_many_batch(
             table="pushers",
             column="user_name",
@@ -224,7 +241,7 @@ class PusherWorkerStore(SQLBaseStore):
         return bool(updated)
 
     async def update_pusher_failing_since(
-        self, app_id, pushkey, user_id, failing_since
+        self, app_id: str, pushkey: str, user_id: str, failing_since: Optional[int]
     ) -> None:
         await self.db_pool.simple_update(
             table="pushers",
@@ -233,7 +250,9 @@ class PusherWorkerStore(SQLBaseStore):
             desc="update_pusher_failing_since",
         )
 
-    async def get_throttle_params_by_room(self, pusher_id):
+    async def get_throttle_params_by_room(
+        self, pusher_id: str
+    ) -> Dict[str, ThrottleParams]:
         res = await self.db_pool.simple_select_list(
             "pusher_throttle",
             {"pusher": pusher_id},
@@ -243,43 +262,44 @@ class PusherWorkerStore(SQLBaseStore):
 
         params_by_room = {}
         for row in res:
-            params_by_room[row["room_id"]] = {
-                "last_sent_ts": row["last_sent_ts"],
-                "throttle_ms": row["throttle_ms"],
-            }
+            params_by_room[row["room_id"]] = ThrottleParams(
+                row["last_sent_ts"], row["throttle_ms"],
+            )
 
         return params_by_room
 
-    async def set_throttle_params(self, pusher_id, room_id, params) -> None:
+    async def set_throttle_params(
+        self, pusher_id: str, room_id: str, params: ThrottleParams
+    ) -> None:
         # no need to lock because `pusher_throttle` has a primary key on
         # (pusher, room_id) so simple_upsert will retry
         await self.db_pool.simple_upsert(
             "pusher_throttle",
             {"pusher": pusher_id, "room_id": room_id},
-            params,
+            {"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
             desc="set_throttle_params",
             lock=False,
         )
 
 
 class PusherStore(PusherWorkerStore):
-    def get_pushers_stream_token(self):
+    def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
     async def add_pusher(
         self,
-        user_id,
-        access_token,
-        kind,
-        app_id,
-        app_display_name,
-        device_display_name,
-        pushkey,
-        pushkey_ts,
-        lang,
-        data,
-        last_stream_ordering,
-        profile_tag="",
+        user_id: str,
+        access_token: Optional[int],
+        kind: str,
+        app_id: str,
+        app_display_name: str,
+        device_display_name: str,
+        pushkey: str,
+        pushkey_ts: int,
+        lang: Optional[str],
+        data: Optional[JsonDict],
+        last_stream_ordering: int,
+        profile_tag: str = "",
     ) -> None:
         async with self._pushers_id_gen.get_next() as stream_id:
             # no need to lock because `pushers` has a unique key on
@@ -294,7 +314,7 @@ class PusherStore(PusherWorkerStore):
                     "device_display_name": device_display_name,
                     "ts": pushkey_ts,
                     "lang": lang,
-                    "data": bytearray(encode_canonical_json(data)),
+                    "data": json_encoder.encode(data),
                     "last_stream_ordering": last_stream_ordering,
                     "profile_tag": profile_tag,
                     "id": stream_id,
@@ -311,20 +331,22 @@ class PusherStore(PusherWorkerStore):
                 # invalidate, since we the user might not have had a pusher before
                 await self.db_pool.runInteraction(
                     "add_pusher",
-                    self._invalidate_cache_and_stream,
+                    self._invalidate_cache_and_stream,  # type: ignore
                     self.get_if_user_has_pusher,
                     (user_id,),
                 )
 
     async def delete_pusher_by_app_id_pushkey_user_id(
-        self, app_id, pushkey, user_id
+        self, app_id: str, pushkey: str, user_id: str
     ) -> None:
         def delete_pusher_txn(txn, stream_id):
-            self._invalidate_cache_and_stream(
+            self._invalidate_cache_and_stream(  # type: ignore
                 txn, self.get_if_user_has_pusher, (user_id,)
             )
 
-            self.db_pool.simple_delete_one_txn(
+            # It is expected that there is exactly one pusher to delete, but
+            # if it isn't there (or there are multiple) delete them all.
+            self.db_pool.simple_delete_txn(
                 txn,
                 "pushers",
                 {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 1e7949a323..e4843a202c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -14,15 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import abc
 import logging
 from typing import Any, Dict, List, Optional, Tuple
 
 from twisted.internet import defer
 
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
-from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
@@ -31,28 +33,56 @@ from synapse.util.caches.stream_change_cache import StreamChangeCache
 logger = logging.getLogger(__name__)
 
 
-# The ABCMeta metaclass ensures that it cannot be instantiated without
-# the abstract methods being implemented.
-class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
-    """This is an abstract base class where subclasses must implement
-    `get_max_receipt_stream_id` which can be called in the initializer.
-    """
-
+class ReceiptsWorkerStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
+        self._instance_name = hs.get_instance_name()
+
+        if isinstance(database.engine, PostgresEngine):
+            self._can_write_to_receipts = (
+                self._instance_name in hs.config.worker.writers.receipts
+            )
+
+            self._receipts_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="receipts",
+                instance_name=self._instance_name,
+                tables=[("receipts_linearized", "instance_name", "stream_id")],
+                sequence_name="receipts_sequence",
+                writers=hs.config.worker.writers.receipts,
+            )
+        else:
+            self._can_write_to_receipts = True
+
+            # We shouldn't be running in worker mode with SQLite, but its useful
+            # to support it for unit tests.
+            #
+            # If this process is the writer than we need to use
+            # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
+            # updated over replication. (Multiple writers are not supported for
+            # SQLite).
+            if hs.get_instance_name() in hs.config.worker.writers.receipts:
+                self._receipts_id_gen = StreamIdGenerator(
+                    db_conn, "receipts_linearized", "stream_id"
+                )
+            else:
+                self._receipts_id_gen = SlavedIdTracker(
+                    db_conn, "receipts_linearized", "stream_id"
+                )
+
         super().__init__(database, db_conn, hs)
 
         self._receipts_stream_cache = StreamChangeCache(
             "ReceiptsRoomChangeCache", self.get_max_receipt_stream_id()
         )
 
-    @abc.abstractmethod
     def get_max_receipt_stream_id(self):
         """Get the current max stream ID for receipts stream
 
         Returns:
             int
         """
-        raise NotImplementedError()
+        return self._receipts_id_gen.get_current_token()
 
     @cached()
     async def get_users_with_read_receipts_in_room(self, room_id):
@@ -428,19 +458,25 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
 
         self.get_users_with_read_receipts_in_room.invalidate((room_id,))
 
-
-class ReceiptsStore(ReceiptsWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        # We instantiate this first as the ReceiptsWorkerStore constructor
-        # needs to be able to call get_max_receipt_stream_id
-        self._receipts_id_gen = StreamIdGenerator(
-            db_conn, "receipts_linearized", "stream_id"
+    def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
+        self.get_receipts_for_user.invalidate((user_id, receipt_type))
+        self._get_linearized_receipts_for_room.invalidate_many((room_id,))
+        self.get_last_receipt_event_id_for_user.invalidate(
+            (user_id, room_id, receipt_type)
         )
+        self._invalidate_get_users_with_receipts_in_room(room_id, receipt_type, user_id)
+        self.get_receipts_for_room.invalidate((room_id, receipt_type))
+
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == ReceiptsStream.NAME:
+            self._receipts_id_gen.advance(instance_name, token)
+            for row in rows:
+                self.invalidate_caches_for_receipt(
+                    row.room_id, row.receipt_type, row.user_id
+                )
+                self._receipts_stream_cache.entity_has_changed(row.room_id, token)
 
-        super().__init__(database, db_conn, hs)
-
-    def get_max_receipt_stream_id(self):
-        return self._receipts_id_gen.get_current_token()
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
 
     def insert_linearized_receipt_txn(
         self, txn, room_id, receipt_type, user_id, event_id, data, stream_id
@@ -452,6 +488,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
             otherwise, the rx timestamp of the event that the RR corresponds to
                 (or 0 if the event is unknown)
         """
+        assert self._can_write_to_receipts
+
         res = self.db_pool.simple_select_one_txn(
             txn,
             table="events",
@@ -483,28 +521,14 @@ class ReceiptsStore(ReceiptsWorkerStore):
                     )
                     return None
 
-        txn.call_after(self.get_receipts_for_room.invalidate, (room_id, receipt_type))
-        txn.call_after(
-            self._invalidate_get_users_with_receipts_in_room,
-            room_id,
-            receipt_type,
-            user_id,
-        )
-        txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
-        # FIXME: This shouldn't invalidate the whole cache
         txn.call_after(
-            self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
+            self.invalidate_caches_for_receipt, room_id, receipt_type, user_id
         )
 
         txn.call_after(
             self._receipts_stream_cache.entity_has_changed, room_id, stream_id
         )
 
-        txn.call_after(
-            self.get_last_receipt_event_id_for_user.invalidate,
-            (user_id, room_id, receipt_type),
-        )
-
         self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_linearized",
@@ -543,6 +567,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
         Automatically does conversion between linearized and graph
         representations.
         """
+        assert self._can_write_to_receipts
+
         if not event_ids:
             return None
 
@@ -607,6 +633,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
     async def insert_graph_receipt(
         self, room_id, receipt_type, user_id, event_ids, data
     ):
+        assert self._can_write_to_receipts
+
         return await self.db_pool.runInteraction(
             "insert_graph_receipt",
             self.insert_graph_receipt_txn,
@@ -620,6 +648,8 @@ class ReceiptsStore(ReceiptsWorkerStore):
     def insert_graph_receipt_txn(
         self, txn, room_id, receipt_type, user_id, event_ids, data
     ):
+        assert self._can_write_to_receipts
+
         txn.call_after(self.get_receipts_for_room.invalidate, (room_id, receipt_type))
         txn.call_after(
             self._invalidate_get_users_with_receipts_in_room,
@@ -653,3 +683,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 "data": json_encoder.encode(data),
             },
         )
+
+
+class ReceiptsStore(ReceiptsWorkerStore):
+    pass
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index fedb8a6c26..8405dd460f 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -360,6 +360,35 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
 
         await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
 
+    async def set_shadow_banned(self, user: UserID, shadow_banned: bool) -> None:
+        """Sets whether a user shadow-banned.
+
+        Args:
+            user: user ID of the user to test
+            shadow_banned: true iff the user is to be shadow-banned, false otherwise.
+        """
+
+        def set_shadow_banned_txn(txn):
+            self.db_pool.simple_update_one_txn(
+                txn,
+                table="users",
+                keyvalues={"name": user.to_string()},
+                updatevalues={"shadow_banned": shadow_banned},
+            )
+            # In order for this to apply immediately, clear the cache for this user.
+            tokens = self.db_pool.simple_select_onecol_txn(
+                txn,
+                table="access_tokens",
+                keyvalues={"user_id": user.to_string()},
+                retcol="token",
+            )
+            for token in tokens:
+                self._invalidate_cache_and_stream(
+                    txn, self.get_user_by_access_token, (token,)
+                )
+
+        await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
+
     def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]:
         sql = """
             SELECT users.name as user_id,
@@ -443,6 +472,26 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
 
         return await self.db_pool.runInteraction("get_users_by_id_case_insensitive", f)
 
+    async def record_user_external_id(
+        self, auth_provider: str, external_id: str, user_id: str
+    ) -> None:
+        """Record a mapping from an external user id to a mxid
+
+        Args:
+            auth_provider: identifier for the remote auth provider
+            external_id: id on that system
+            user_id: complete mxid that it is mapped to
+        """
+        await self.db_pool.simple_insert(
+            table="user_external_ids",
+            values={
+                "auth_provider": auth_provider,
+                "external_id": external_id,
+                "user_id": user_id,
+            },
+            desc="record_user_external_id",
+        )
+
     async def get_user_by_external_id(
         self, auth_provider: str, external_id: str
     ) -> Optional[str]:
@@ -463,6 +512,23 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             desc="get_user_by_external_id",
         )
 
+    async def get_external_ids_by_user(self, mxid: str) -> List[Tuple[str, str]]:
+        """Look up external ids for the given user
+
+        Args:
+            mxid: the MXID to be looked up
+
+        Returns:
+            Tuples of (auth_provider, external_id)
+        """
+        res = await self.db_pool.simple_select_list(
+            table="user_external_ids",
+            keyvalues={"user_id": mxid},
+            retcols=("auth_provider", "external_id"),
+            desc="get_external_ids_by_user",
+        )
+        return [(r["auth_provider"], r["external_id"]) for r in res]
+
     async def count_all_users(self):
         """Counts all users registered on the homeserver."""
 
@@ -926,6 +992,42 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             desc="del_user_pending_deactivation",
         )
 
+    async def get_access_token_last_validated(self, token_id: int) -> int:
+        """Retrieves the time (in milliseconds) of the last validation of an access token.
+
+        Args:
+            token_id: The ID of the access token to update.
+        Raises:
+            StoreError if the access token was not found.
+
+        Returns:
+            The last validation time.
+        """
+        result = await self.db_pool.simple_select_one_onecol(
+            "access_tokens", {"id": token_id}, "last_validated"
+        )
+
+        # If this token has not been validated (since starting to track this),
+        # return 0 instead of None.
+        return result or 0
+
+    async def update_access_token_last_validated(self, token_id: int) -> None:
+        """Updates the last time an access token was validated.
+
+        Args:
+            token_id: The ID of the access token to update.
+        Raises:
+            StoreError if there was a problem updating this.
+        """
+        now = self._clock.time_msec()
+
+        await self.db_pool.simple_update_one(
+            "access_tokens",
+            {"id": token_id},
+            {"last_validated": now},
+            desc="update_access_token_last_validated",
+        )
+
 
 class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
     def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
@@ -963,6 +1065,14 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
             "users_set_deactivated_flag", self._background_update_set_deactivated_flag
         )
 
+        self.db_pool.updates.register_background_index_update(
+            "user_external_ids_user_id_idx",
+            index_name="user_external_ids_user_id_idx",
+            table="user_external_ids",
+            columns=["user_id"],
+            unique=False,
+        )
+
     async def _background_update_set_deactivated_flag(self, progress, batch_size):
         """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
         for each of them.
@@ -1043,7 +1153,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
                 FROM user_threepids
             """
 
-            txn.executemany(sql, [(id_server,) for id_server in id_servers])
+            txn.execute_batch(sql, [(id_server,) for id_server in id_servers])
 
         if id_servers:
             await self.db_pool.runInteraction(
@@ -1125,6 +1235,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
             The token ID
         """
         next_id = self._access_tokens_id_gen.get_next()
+        now = self._clock.time_msec()
 
         await self.db_pool.simple_insert(
             "access_tokens",
@@ -1135,6 +1246,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
                 "device_id": device_id,
                 "valid_until_ms": valid_until_ms,
                 "puppets_user_id": puppets_user_id,
+                "last_validated": now,
             },
             desc="add_access_token_to_user",
         )
@@ -1308,26 +1420,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
 
         self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
-    async def record_user_external_id(
-        self, auth_provider: str, external_id: str, user_id: str
-    ) -> None:
-        """Record a mapping from an external user id to a mxid
-
-        Args:
-            auth_provider: identifier for the remote auth provider
-            external_id: id on that system
-            user_id: complete mxid that it is mapped to
-        """
-        await self.db_pool.simple_insert(
-            table="user_external_ids",
-            values={
-                "auth_provider": auth_provider,
-                "external_id": external_id,
-                "user_id": user_id,
-            },
-            desc="record_user_external_id",
-        )
-
     async def user_set_password_hash(
         self, user_id: str, password_hash: Optional[str]
     ) -> None:
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 6b89db15c9..a9fcb5f59c 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -16,7 +16,6 @@
 
 import collections
 import logging
-import re
 from abc import abstractmethod
 from enum import Enum
 from typing import Any, Dict, List, Optional, Tuple
@@ -30,6 +29,7 @@ from synapse.storage.databases.main.search import SearchStore
 from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
+from synapse.util.stringutils import MXC_REGEX
 
 logger = logging.getLogger(__name__)
 
@@ -84,7 +84,7 @@ class RoomWorkerStore(SQLBaseStore):
         return await self.db_pool.simple_select_one(
             table="rooms",
             keyvalues={"room_id": room_id},
-            retcols=("room_id", "is_public", "creator"),
+            retcols=("room_id", "is_public", "creator", "has_auth_chain_index"),
             desc="get_room",
             allow_none=True,
         )
@@ -379,14 +379,14 @@ class RoomWorkerStore(SQLBaseStore):
         # Filter room names by a string
         where_statement = ""
         if search_term:
-            where_statement = "WHERE state.name LIKE ?"
+            where_statement = "WHERE LOWER(state.name) LIKE ?"
 
             # Our postgres db driver converts ? -> %s in SQL strings as that's the
             # placeholder for postgres.
             # HOWEVER, if you put a % into your SQL then everything goes wibbly.
             # To get around this, we're going to surround search_term with %'s
             # before giving it to the database in python instead
-            search_term = "%" + search_term + "%"
+            search_term = "%" + search_term.lower() + "%"
 
         # Set ordering
         if RoomSortOrder(order_by) == RoomSortOrder.SIZE:
@@ -660,8 +660,6 @@ class RoomWorkerStore(SQLBaseStore):
             The local and remote media as a lists of tuples where the key is
             the hostname and the value is the media ID.
         """
-        mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
-
         sql = """
             SELECT stream_ordering, json FROM events
             JOIN event_json USING (room_id, event_id)
@@ -688,7 +686,7 @@ class RoomWorkerStore(SQLBaseStore):
                 for url in (content_url, thumbnail_url):
                     if not url:
                         continue
-                    matches = mxc_re.match(url)
+                    matches = MXC_REGEX.match(url)
                     if matches:
                         hostname = matches.group(1)
                         media_id = matches.group(2)
@@ -1166,6 +1164,37 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
         # It's overridden by RoomStore for the synapse master.
         raise NotImplementedError()
 
+    async def has_auth_chain_index(self, room_id: str) -> bool:
+        """Check if the room has (or can have) a chain cover index.
+
+        Defaults to True if we don't have an entry in `rooms` table nor any
+        events for the room.
+        """
+
+        has_auth_chain_index = await self.db_pool.simple_select_one_onecol(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcol="has_auth_chain_index",
+            desc="has_auth_chain_index",
+            allow_none=True,
+        )
+
+        if has_auth_chain_index:
+            return True
+
+        # It's possible that we already have events for the room in our DB
+        # without a corresponding room entry. If we do then we don't want to
+        # mark the room as having an auth chain cover index.
+        max_ordering = await self.db_pool.simple_select_one_onecol(
+            table="events",
+            keyvalues={"room_id": room_id},
+            retcol="MAX(stream_ordering)",
+            allow_none=True,
+            desc="upsert_room_on_join",
+        )
+
+        return max_ordering is None
+
 
 class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
@@ -1179,12 +1208,21 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         Called when we join a room over federation, and overwrites any room version
         currently in the table.
         """
+        # It's possible that we already have events for the room in our DB
+        # without a corresponding room entry. If we do then we don't want to
+        # mark the room as having an auth chain cover index.
+        has_auth_chain_index = await self.has_auth_chain_index(room_id)
+
         await self.db_pool.simple_upsert(
             desc="upsert_room_on_join",
             table="rooms",
             keyvalues={"room_id": room_id},
             values={"room_version": room_version.identifier},
-            insertion_values={"is_public": False, "creator": ""},
+            insertion_values={
+                "is_public": False,
+                "creator": "",
+                "has_auth_chain_index": has_auth_chain_index,
+            },
             # rooms has a unique constraint on room_id, so no need to lock when doing an
             # emulated upsert.
             lock=False,
@@ -1219,6 +1257,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                         "creator": room_creator_user_id,
                         "is_public": is_public,
                         "room_version": room_version.identifier,
+                        "has_auth_chain_index": True,
                     },
                 )
                 if is_public:
@@ -1247,6 +1286,11 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         When we receive an invite or any other event over federation that may relate to a room
         we are not in, store the version of the room if we don't already know the room version.
         """
+        # It's possible that we already have events for the room in our DB
+        # without a corresponding room entry. If we do then we don't want to
+        # mark the room as having an auth chain cover index.
+        has_auth_chain_index = await self.has_auth_chain_index(room_id)
+
         await self.db_pool.simple_upsert(
             desc="maybe_store_room_on_outlier_membership",
             table="rooms",
@@ -1256,6 +1300,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                 "room_version": room_version.identifier,
                 "is_public": False,
                 "creator": "",
+                "has_auth_chain_index": has_auth_chain_index,
             },
             # rooms has a unique constraint on room_id, so no need to lock when doing an
             # emulated upsert.
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index dcdaf09682..92382bed28 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -873,8 +873,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
             "max_stream_id_exclusive", self._stream_order_on_start + 1
         )
 
-        INSERT_CLUMP_SIZE = 1000
-
         def add_membership_profile_txn(txn):
             sql = """
                 SELECT stream_ordering, event_id, events.room_id, event_json.json
@@ -915,9 +913,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
                 UPDATE room_memberships SET display_name = ?, avatar_url = ?
                 WHERE event_id = ? AND room_id = ?
             """
-            for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
-                clump = to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(to_update_sql, clump)
+            txn.execute_batch(to_update_sql, to_update)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
diff --git a/synapse/storage/databases/main/schema/delta/58/25user_external_ids_user_id_idx.sql b/synapse/storage/databases/main/schema/delta/58/25user_external_ids_user_id_idx.sql
new file mode 100644
index 0000000000..8f5e65aa71
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/25user_external_ids_user_id_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (5825, 'user_external_ids_user_id_idx', '{}');
diff --git a/synapse/storage/databases/main/schema/delta/58/26access_token_last_validated.sql b/synapse/storage/databases/main/schema/delta/58/26access_token_last_validated.sql
new file mode 100644
index 0000000000..1a101cd5eb
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/26access_token_last_validated.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The last time this access token was "validated" (i.e. logged in or succeeded
+-- at user-interactive authentication).
+ALTER TABLE access_tokens ADD COLUMN last_validated BIGINT;
diff --git a/synapse/storage/databases/main/schema/delta/58/27local_invites.sql b/synapse/storage/databases/main/schema/delta/58/27local_invites.sql
new file mode 100644
index 0000000000..44b2a0572f
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/27local_invites.sql
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is unused since Synapse v1.17.0.
+DROP TABLE local_invites;
diff --git a/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres
new file mode 100644
index 0000000000..de57645019
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.postgres
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE access_tokens DROP COLUMN last_used;
\ No newline at end of file
diff --git a/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite
new file mode 100644
index 0000000000..ee0e3521bf
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/28drop_last_used_column.sql.sqlite
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ -- Dropping last_used column from access_tokens table.
+
+CREATE TABLE access_tokens2 (
+    id BIGINT PRIMARY KEY, 
+    user_id TEXT NOT NULL, 
+    device_id TEXT, 
+    token TEXT NOT NULL,
+    valid_until_ms BIGINT,
+    puppets_user_id TEXT,
+    last_validated BIGINT,
+    UNIQUE(token) 
+);
+
+INSERT INTO access_tokens2(id, user_id, device_id, token)
+    SELECT id, user_id, device_id, token FROM access_tokens;
+
+DROP TABLE access_tokens;
+ALTER TABLE access_tokens2 RENAME TO access_tokens;
+
+CREATE INDEX access_tokens_device_id ON access_tokens (user_id, device_id);
+
+
+-- Re-adding foreign key reference in event_txn_id table
+
+CREATE TABLE event_txn_id2 (
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    token_id BIGINT NOT NULL,
+    txn_id TEXT NOT NULL,
+    inserted_ts BIGINT NOT NULL,
+    FOREIGN KEY (event_id)
+        REFERENCES events (event_id) ON DELETE CASCADE,
+    FOREIGN KEY (token_id)
+        REFERENCES access_tokens (id) ON DELETE CASCADE
+);
+
+INSERT INTO event_txn_id2(event_id, room_id, user_id, token_id, txn_id, inserted_ts)
+    SELECT event_id, room_id, user_id, token_id, txn_id, inserted_ts FROM event_txn_id;
+
+DROP TABLE event_txn_id;
+ALTER TABLE event_txn_id2 RENAME TO event_txn_id;
+
+CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id);
+CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
+CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts);
\ No newline at end of file
diff --git a/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql
new file mode 100644
index 0000000000..9c95646281
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/28rejected_events_metadata.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (5828, 'rejected_events_metadata', '{}');
diff --git a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
new file mode 100644
index 0000000000..9e8f35c1d2
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py
@@ -0,0 +1,82 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration denormalises the account_data table into an ignored users table.
+"""
+
+import logging
+from io import StringIO
+
+from synapse.storage._base import db_to_json
+from synapse.storage.engines import BaseDatabaseEngine
+from synapse.storage.prepare_database import execute_statements_from_stream
+from synapse.storage.types import Cursor
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    pass
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    logger.info("Creating ignored_users table")
+    execute_statements_from_stream(cur, StringIO(_create_commands))
+
+    # We now upgrade existing data, if any. We don't do this in `run_upgrade` as
+    # we a) want to run these before adding constraints and b) `run_upgrade` is
+    # not run on empty databases.
+    insert_sql = """
+    INSERT INTO ignored_users (ignorer_user_id, ignored_user_id) VALUES (?, ?)
+    """
+
+    logger.info("Converting existing ignore lists")
+    cur.execute(
+        "SELECT user_id, content FROM account_data WHERE account_data_type = 'm.ignored_user_list'"
+    )
+    for user_id, content_json in cur.fetchall():
+        content = db_to_json(content_json)
+
+        # The content should be the form of a dictionary with a key
+        # "ignored_users" pointing to a dictionary with keys of ignored users.
+        #
+        # { "ignored_users": "@someone:example.org": {} }
+        ignored_users = content.get("ignored_users", {})
+        if isinstance(ignored_users, dict) and ignored_users:
+            cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users])
+
+    # Add indexes after inserting data for efficiency.
+    logger.info("Adding constraints to ignored_users table")
+    execute_statements_from_stream(cur, StringIO(_constraints_commands))
+
+
+# there might be duplicates, so the easiest way to achieve this is to create a new
+# table with the right data, and renaming it into place
+
+_create_commands = """
+-- Users which are ignored when calculating push notifications. This data is
+-- denormalized from account data.
+CREATE TABLE IF NOT EXISTS ignored_users(
+    ignorer_user_id TEXT NOT NULL,  -- The user ID of the user who is ignoring another user. (This is a local user.)
+    ignored_user_id TEXT NOT NULL  -- The user ID of the user who is being ignored. (This is a local or remote user.)
+);
+"""
+
+_constraints_commands = """
+CREATE UNIQUE INDEX ignored_users_uniqueness ON ignored_users (ignorer_user_id, ignored_user_id);
+
+-- Add an index on ignored_users since look-ups are done to get all ignorers of an ignored user.
+CREATE INDEX ignored_users_ignored_user_id ON ignored_users (ignored_user_id);
+"""
diff --git a/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql
new file mode 100644
index 0000000000..d781a92fec
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/02shard_send_to_device.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_inbox ADD COLUMN instance_name TEXT;
+ALTER TABLE device_federation_inbox ADD COLUMN instance_name TEXT;
+ALTER TABLE device_federation_outbox ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres
new file mode 100644
index 0000000000..45a845a3a5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/03shard_send_to_device_sequence.sql.postgres
@@ -0,0 +1,25 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS device_inbox_sequence;
+
+-- We need to take the max across both device_inbox and device_federation_outbox
+-- tables as they share the ID generator
+SELECT setval('device_inbox_sequence', (
+    SELECT GREATEST(
+        (SELECT COALESCE(MAX(stream_id), 1) FROM device_inbox),
+        (SELECT COALESCE(MAX(stream_id), 1) FROM device_federation_outbox)
+    )
+));
diff --git a/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql
new file mode 100644
index 0000000000..729196cfd5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql
@@ -0,0 +1,52 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- See docs/auth_chain_difference_algorithm.md
+
+CREATE TABLE event_auth_chains (
+  event_id TEXT PRIMARY KEY,
+  chain_id BIGINT NOT NULL,
+  sequence_number BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX event_auth_chains_c_seq_index ON event_auth_chains (chain_id, sequence_number);
+
+
+CREATE TABLE event_auth_chain_links (
+  origin_chain_id BIGINT NOT NULL,
+  origin_sequence_number BIGINT NOT NULL,
+
+  target_chain_id BIGINT NOT NULL,
+  target_sequence_number BIGINT NOT NULL
+);
+
+
+CREATE INDEX event_auth_chain_links_idx ON event_auth_chain_links (origin_chain_id, target_chain_id);
+
+
+-- Events that we have persisted but not calculated auth chains for,
+-- e.g. out of band memberships (where we don't have the auth chain)
+CREATE TABLE event_auth_chain_to_calculate (
+  event_id TEXT PRIMARY KEY,
+  room_id TEXT NOT NULL,
+  type TEXT NOT NULL,
+  state_key TEXT NOT NULL
+);
+
+CREATE INDEX event_auth_chain_to_calculate_rm_id ON event_auth_chain_to_calculate(room_id);
+
+
+-- Whether we've calculated the above index for a room.
+ALTER TABLE rooms ADD COLUMN has_auth_chain_index BOOLEAN;
diff --git a/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres
new file mode 100644
index 0000000000..e8a035bbeb
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04_event_auth_chains.sql.postgres
@@ -0,0 +1,16 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS event_auth_chain_id;
diff --git a/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql b/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql
new file mode 100644
index 0000000000..64ab696cfe
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/04drop_account_data.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is no longer used and was only kept until we bumped the schema version.
+DROP TABLE IF EXISTS account_data_max_stream_id;
diff --git a/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql b/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql
new file mode 100644
index 0000000000..fb71b360a0
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/05cache_invalidation.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This is no longer used and was only kept until we bumped the schema version.
+DROP TABLE IF EXISTS cache_invalidation_stream;
diff --git a/synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql b/synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql
new file mode 100644
index 0000000000..fe3dca71dd
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+  (5906, 'chain_cover', '{}', 'rejected_events_metadata');
diff --git a/synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql b/synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql
new file mode 100644
index 0000000000..46abf8d562
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql
@@ -0,0 +1,20 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE room_account_data ADD COLUMN instance_name TEXT;
+ALTER TABLE room_tags_revisions ADD COLUMN instance_name TEXT;
+ALTER TABLE account_data ADD COLUMN instance_name TEXT;
+
+ALTER TABLE receipts_linearized ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql.postgres b/synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql.postgres
new file mode 100644
index 0000000000..4a6e6c74f5
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/06shard_account_data.sql.postgres
@@ -0,0 +1,32 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS account_data_sequence;
+
+-- We need to take the max across all the account_data tables as they share the
+-- ID generator
+SELECT setval('account_data_sequence', (
+    SELECT GREATEST(
+        (SELECT COALESCE(MAX(stream_id), 1) FROM room_account_data),
+        (SELECT COALESCE(MAX(stream_id), 1) FROM room_tags_revisions),
+        (SELECT COALESCE(MAX(stream_id), 1) FROM account_data)
+    )
+));
+
+CREATE SEQUENCE IF NOT EXISTS receipts_sequence;
+
+SELECT setval('receipts_sequence', (
+    SELECT COALESCE(MAX(stream_id), 1) FROM receipts_linearized
+));
diff --git a/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql b/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql
new file mode 100644
index 0000000000..9f2b5ebc5a
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/07shard_account_data_fix.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We incorrectly populated these, so we delete them and let the
+-- MultiWriterIdGenerator repopulate it.
+DELETE FROM stream_positions WHERE stream_name = 'receipts' OR stream_name = 'account_data';
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index e34fce6281..f5e7d9ef98 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -24,6 +24,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.types import Collection
 
 logger = logging.getLogger(__name__)
 
@@ -63,7 +64,7 @@ class SearchWorkerStore(SQLBaseStore):
                 for entry in entries
             )
 
-            txn.executemany(sql, args)
+            txn.execute_batch(sql, args)
 
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
@@ -75,7 +76,7 @@ class SearchWorkerStore(SQLBaseStore):
                 for entry in entries
             )
 
-            txn.executemany(sql, args)
+            txn.execute_batch(sql, args)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
@@ -460,7 +461,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
     async def search_rooms(
         self,
-        room_ids: List[str],
+        room_ids: Collection[str],
         search_term: str,
         keys: List[str],
         limit,
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 0cdb3ec1f7..d421d18f8d 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -15,11 +15,12 @@
 # limitations under the License.
 
 import logging
-from collections import Counter
 from enum import Enum
 from itertools import chain
 from typing import Any, Dict, List, Optional, Tuple
 
+from typing_extensions import Counter
+
 from twisted.internet.defer import DeferredLock
 
 from synapse.api.constants import EventTypes, Membership
@@ -319,7 +320,9 @@ class StatsStore(StateDeltasStore):
         return slice_list
 
     @cached()
-    async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
+    async def get_earliest_token_for_stats(
+        self, stats_type: str, id: str
+    ) -> Optional[int]:
         """
         Fetch the "earliest token". This is used by the room stats delta
         processor to ignore deltas that have been processed between the
@@ -339,7 +342,7 @@ class StatsStore(StateDeltasStore):
         )
 
     async def bulk_update_stats_delta(
-        self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
+        self, ts: int, updates: Dict[str, Dict[str, Counter[str]]], stream_id: int
     ) -> None:
         """Bulk update stats tables for a given stream_id and updates the stats
         incremental position.
@@ -665,7 +668,7 @@ class StatsStore(StateDeltasStore):
 
     async def get_changes_room_total_events_and_bytes(
         self, min_pos: int, max_pos: int
-    ) -> Dict[str, Dict[str, int]]:
+    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
         """Fetches the counts of events in the given range of stream IDs.
 
         Args:
@@ -683,18 +686,19 @@ class StatsStore(StateDeltasStore):
             max_pos,
         )
 
-    def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
+    def get_changes_room_total_events_and_bytes_txn(
+        self, txn, low_pos: int, high_pos: int
+    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
         """Gets the total_events and total_event_bytes counts for rooms and
         senders, in a range of stream_orderings (including backfilled events).
 
         Args:
             txn
-            low_pos (int): Low stream ordering
-            high_pos (int): High stream ordering
+            low_pos: Low stream ordering
+            high_pos: High stream ordering
 
         Returns:
-            tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
-            room and user deltas for total_events/total_event_bytes in the
+            The room and user deltas for total_events/total_event_bytes in the
             format of `stats_id` -> fields
         """
 
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 9f120d3cb6..50067eabfc 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -183,8 +183,6 @@ class TagsWorkerStore(AccountDataWorkerStore):
         )
         return {row["tag"]: db_to_json(row["content"]) for row in rows}
 
-
-class TagsStore(TagsWorkerStore):
     async def add_tag_to_room(
         self, user_id: str, room_id: str, tag: str, content: JsonDict
     ) -> int:
@@ -199,6 +197,8 @@ class TagsStore(TagsWorkerStore):
         Returns:
             The next account data ID.
         """
+        assert self._can_write_to_account_data
+
         content_json = json_encoder.encode(content)
 
         def add_tag_txn(txn, next_id):
@@ -223,6 +223,7 @@ class TagsStore(TagsWorkerStore):
         Returns:
             The next account data ID.
         """
+        assert self._can_write_to_account_data
 
         def remove_tag_txn(txn, next_id):
             sql = (
@@ -250,21 +251,12 @@ class TagsStore(TagsWorkerStore):
             room_id: The ID of the room.
             next_id: The the revision to advance to.
         """
+        assert self._can_write_to_account_data
 
         txn.call_after(
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )
 
-        # Note: This is only here for backwards compat to allow admins to
-        # roll back to a previous Synapse version. Next time we update the
-        # database version we can remove this table.
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
-        )
-        txn.execute(update_max_id_sql, (next_id, next_id))
-
         update_sql = (
             "UPDATE room_tags_revisions"
             " SET stream_id = ?"
@@ -288,3 +280,7 @@ class TagsStore(TagsWorkerStore):
                 # which stream_id ends up in the table, as long as it is higher
                 # than the id that the client has.
                 pass
+
+
+class TagsStore(TagsWorkerStore):
+    pass
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 59207cadd4..cea595ff19 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -464,19 +464,17 @@ class TransactionStore(TransactionWorkerStore):
         txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
     ) -> List[str]:
         q = """
-            SELECT destination FROM destinations
-                WHERE destination IN (
-                    SELECT destination FROM destination_rooms
-                        WHERE destination_rooms.stream_ordering >
-                            destinations.last_successful_stream_ordering
-                )
-                AND destination > ?
-                AND (
-                    retry_last_ts IS NULL OR
-                    retry_last_ts + retry_interval < ?
-                )
-                ORDER BY destination
-                LIMIT 25
+            SELECT DISTINCT destination FROM destinations
+            INNER JOIN destination_rooms USING (destination)
+                WHERE
+                    stream_ordering > last_successful_stream_ordering
+                    AND destination > ?
+                    AND (
+                        retry_last_ts IS NULL OR
+                        retry_last_ts + retry_interval < ?
+                    )
+                    ORDER BY destination
+                    LIMIT 25
         """
         txn.execute(
             q,
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index d87ceec6da..7b9729da09 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -17,7 +17,7 @@ import logging
 import re
 from typing import Any, Dict, Iterable, Optional, Set, Tuple
 
-from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.state import StateFilter
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
@@ -360,7 +360,10 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         if hist_vis_id:
             hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True)
             if hist_vis_ev:
-                if hist_vis_ev.content.get("history_visibility") == "world_readable":
+                if (
+                    hist_vis_ev.content.get("history_visibility")
+                    == HistoryVisibility.WORLD_READABLE
+                ):
                     return True
 
         return False
@@ -393,9 +396,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                     sql = """
                         INSERT INTO user_directory_search(user_id, vector)
                         VALUES (?,
-                            setweight(to_tsvector('english', ?), 'A')
-                            || setweight(to_tsvector('english', ?), 'D')
-                            || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                            setweight(to_tsvector('simple', ?), 'A')
+                            || setweight(to_tsvector('simple', ?), 'D')
+                            || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
                         ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
                     """
                     txn.execute(
@@ -415,9 +418,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                         sql = """
                             INSERT INTO user_directory_search(user_id, vector)
                             VALUES (?,
-                                setweight(to_tsvector('english', ?), 'A')
-                                || setweight(to_tsvector('english', ?), 'D')
-                                || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                                setweight(to_tsvector('simple', ?), 'A')
+                                || setweight(to_tsvector('simple', ?), 'D')
+                                || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
                             )
                         """
                         txn.execute(
@@ -432,9 +435,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                     elif new_entry is False:
                         sql = """
                             UPDATE user_directory_search
-                            SET vector = setweight(to_tsvector('english', ?), 'A')
-                                || setweight(to_tsvector('english', ?), 'D')
-                                || setweight(to_tsvector('english', COALESCE(?, '')), 'B')
+                            SET vector = setweight(to_tsvector('simple', ?), 'A')
+                                || setweight(to_tsvector('simple', ?), 'D')
+                                || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
                             WHERE user_id = ?
                         """
                         txn.execute(
@@ -537,7 +540,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             desc="get_user_in_directory",
         )
 
-    async def update_user_directory_stream_pos(self, stream_id: str) -> None:
+    async def update_user_directory_stream_pos(self, stream_id: int) -> None:
         await self.db_pool.simple_update_one(
             table="user_directory_stream_pos",
             keyvalues={},
@@ -761,7 +764,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
                 INNER JOIN user_directory AS d USING (user_id)
                 WHERE
                     %s
-                    AND vector @@ to_tsquery('english', ?)
+                    AND vector @@ to_tsquery('simple', ?)
                 ORDER BY
                     (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
                     * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
@@ -770,13 +773,13 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
                         3 * ts_rank_cd(
                             '{0.1, 0.1, 0.9, 1.0}',
                             vector,
-                            to_tsquery('english', ?),
+                            to_tsquery('simple', ?),
                             8
                         )
                         + ts_rank_cd(
                             '{0.1, 0.1, 0.9, 1.0}',
                             vector,
-                            to_tsquery('english', ?),
+                            to_tsquery('simple', ?),
                             8
                         )
                     )
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 0e31cc811a..89cdc84a9c 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -565,11 +565,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             )
 
         logger.info("[purge] removing redundant state groups")
-        txn.executemany(
+        txn.execute_batch(
             "DELETE FROM state_groups_state WHERE state_group = ?",
             ((sg,) for sg in state_groups_to_delete),
         )
-        txn.executemany(
+        txn.execute_batch(
             "DELETE FROM state_groups WHERE id = ?",
             ((sg,) for sg in state_groups_to_delete),
         )
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index afd10f7bae..c03871f393 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -17,11 +17,12 @@
 import logging
 
 import attr
+from signedjson.types import VerifyKey
 
 logger = logging.getLogger(__name__)
 
 
 @attr.s(slots=True, frozen=True)
 class FetchKeyResult:
-    verify_key = attr.ib()  # VerifyKey: the key itself
-    valid_until_ts = attr.ib()  # int: how long we can use this key for
+    verify_key = attr.ib(type=VerifyKey)  # the key itself
+    valid_until_ts = attr.ib(type=int)  # how long we can use this key for
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 70e636b0ba..61fc49c69c 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -31,7 +31,14 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases import Databases
 from synapse.storage.databases.main.events import DeltaState
-from synapse.types import Collection, PersistedEventPosition, RoomStreamToken, StateMap
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
+from synapse.types import (
+    Collection,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StateMap,
+    get_domain_from_id,
+)
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.metrics import Measure
 
@@ -68,6 +75,21 @@ stale_forward_extremities_counter = Histogram(
     buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
 )
 
+state_resolutions_during_persistence = Counter(
+    "synapse_storage_events_state_resolutions_during_persistence",
+    "Number of times we had to do state res to calculate new current state",
+)
+
+potential_times_prune_extremities = Counter(
+    "synapse_storage_events_potential_times_prune_extremities",
+    "Number of times we might be able to prune extremities",
+)
+
+times_pruned_extremities = Counter(
+    "synapse_storage_events_times_pruned_extremities",
+    "Number of times we were actually be able to prune extremities",
+)
+
 
 class _EventPeristenceQueue:
     """Queues up events so that they can be persisted in bulk with only one
@@ -454,7 +476,15 @@ class EventsPersistenceStorage:
                                 latest_event_ids,
                                 new_latest_event_ids,
                             )
-                            current_state, delta_ids = res
+                            current_state, delta_ids, new_latest_event_ids = res
+
+                            # there should always be at least one forward extremity.
+                            # (except during the initial persistence of the send_join
+                            # results, in which case there will be no existing
+                            # extremities, so we'll `continue` above and skip this bit.)
+                            assert new_latest_event_ids, "No forward extremities left!"
+
+                            new_forward_extremeties[room_id] = new_latest_event_ids
 
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
@@ -573,29 +603,35 @@ class EventsPersistenceStorage:
         self,
         room_id: str,
         events_context: List[Tuple[EventBase, EventContext]],
-        old_latest_event_ids: Iterable[str],
-        new_latest_event_ids: Iterable[str],
-    ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]]]:
+        old_latest_event_ids: Set[str],
+        new_latest_event_ids: Set[str],
+    ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]:
         """Calculate the current state dict after adding some new events to
         a room
 
         Args:
-            room_id (str):
+            room_id:
                 room to which the events are being added. Used for logging etc
 
-            events_context (list[(EventBase, EventContext)]):
+            events_context:
                 events and contexts which are being added to the room
 
-            old_latest_event_ids (iterable[str]):
+            old_latest_event_ids:
                 the old forward extremities for the room.
 
-            new_latest_event_ids (iterable[str]):
+            new_latest_event_ids :
                 the new forward extremities for the room.
 
         Returns:
-            Returns a tuple of two state maps, the first being the full new current
-            state and the second being the delta to the existing current state.
-            If both are None then there has been no change.
+            Returns a tuple of two state maps and a set of new forward
+            extremities.
+
+            The first state map is the full new current state and the second
+            is the delta to the existing current state. If both are None then
+            there has been no change.
+
+            The function may prune some old entries from the set of new
+            forward extremities if it's safe to do so.
 
             If there has been a change then we only return the delta if its
             already been calculated. Conversely if we do know the delta then
@@ -672,7 +708,7 @@ class EventsPersistenceStorage:
         # If they old and new groups are the same then we don't need to do
         # anything.
         if old_state_groups == new_state_groups:
-            return None, None
+            return None, None, new_latest_event_ids
 
         if len(new_state_groups) == 1 and len(old_state_groups) == 1:
             # If we're going from one state group to another, lets check if
@@ -689,7 +725,7 @@ class EventsPersistenceStorage:
                 # the current state in memory then lets also return that,
                 # but it doesn't matter if we don't.
                 new_state = state_groups_map.get(new_state_group)
-                return new_state, delta_ids
+                return new_state, delta_ids, new_latest_event_ids
 
         # Now that we have calculated new_state_groups we need to get
         # their state IDs so we can resolve to a single state set.
@@ -701,7 +737,7 @@ class EventsPersistenceStorage:
         if len(new_state_groups) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            return state_groups_map[new_state_groups.pop()], None
+            return state_groups_map[new_state_groups.pop()], None, new_latest_event_ids
 
         # Ok, we need to defer to the state handler to resolve our state sets.
 
@@ -734,7 +770,139 @@ class EventsPersistenceStorage:
             state_res_store=StateResolutionStore(self.main_store),
         )
 
-        return res.state, None
+        state_resolutions_during_persistence.inc()
+
+        # If the returned state matches the state group of one of the new
+        # forward extremities then we check if we are able to prune some state
+        # extremities.
+        if res.state_group and res.state_group in new_state_groups:
+            new_latest_event_ids = await self._prune_extremities(
+                room_id,
+                new_latest_event_ids,
+                res.state_group,
+                event_id_to_state_group,
+                events_context,
+            )
+
+        return res.state, None, new_latest_event_ids
+
+    async def _prune_extremities(
+        self,
+        room_id: str,
+        new_latest_event_ids: Set[str],
+        resolved_state_group: int,
+        event_id_to_state_group: Dict[str, int],
+        events_context: List[Tuple[EventBase, EventContext]],
+    ) -> Set[str]:
+        """See if we can prune any of the extremities after calculating the
+        resolved state.
+        """
+        potential_times_prune_extremities.inc()
+
+        # We keep all the extremities that have the same state group, and
+        # see if we can drop the others.
+        new_new_extrems = {
+            e
+            for e in new_latest_event_ids
+            if event_id_to_state_group[e] == resolved_state_group
+        }
+
+        dropped_extrems = set(new_latest_event_ids) - new_new_extrems
+
+        logger.debug("Might drop extremities: %s", dropped_extrems)
+
+        # We only drop events from the extremities list if:
+        #   1. we're not currently persisting them;
+        #   2. they're not our own events (or are dummy events); and
+        #   3. they're either:
+        #       1. over N hours old and more than N events ago (we use depth to
+        #          calculate); or
+        #       2. we are persisting an event from the same domain and more than
+        #          M events ago.
+        #
+        # The idea is that we don't want to drop events that are "legitimate"
+        # extremities (that we would want to include as prev events), only
+        # "stuck" extremities that are e.g. due to a gap in the graph.
+        #
+        # Note that we either drop all of them or none of them. If we only drop
+        # some of the events we don't know if state res would come to the same
+        # conclusion.
+
+        for ev, _ in events_context:
+            if ev.event_id in dropped_extrems:
+                logger.debug(
+                    "Not dropping extremities: %s is being persisted", ev.event_id
+                )
+                return new_latest_event_ids
+
+        dropped_events = await self.main_store.get_events(
+            dropped_extrems,
+            allow_rejected=True,
+            redact_behaviour=EventRedactBehaviour.AS_IS,
+        )
+
+        new_senders = {get_domain_from_id(e.sender) for e, _ in events_context}
+
+        one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
+        current_depth = max(e.depth for e, _ in events_context)
+        for event in dropped_events.values():
+            # If the event is a local dummy event then we should check it
+            # doesn't reference any local events, as we want to reference those
+            # if we send any new events.
+            #
+            # Note we do this recursively to handle the case where a dummy event
+            # references a dummy event that only references remote events.
+            #
+            # Ideally we'd figure out a way of still being able to drop old
+            # dummy events that reference local events, but this is good enough
+            # as a first cut.
+            events_to_check = [event]
+            while events_to_check:
+                new_events = set()
+                for event_to_check in events_to_check:
+                    if self.is_mine_id(event_to_check.sender):
+                        if event_to_check.type != EventTypes.Dummy:
+                            logger.debug("Not dropping own event")
+                            return new_latest_event_ids
+                        new_events.update(event_to_check.prev_event_ids())
+
+                prev_events = await self.main_store.get_events(
+                    new_events,
+                    allow_rejected=True,
+                    redact_behaviour=EventRedactBehaviour.AS_IS,
+                )
+                events_to_check = prev_events.values()
+
+            if (
+                event.origin_server_ts < one_day_ago
+                and event.depth < current_depth - 100
+            ):
+                continue
+
+            # We can be less conservative about dropping extremities from the
+            # same domain, though we do want to wait a little bit (otherwise
+            # we'll immediately remove all extremities from a given server).
+            if (
+                get_domain_from_id(event.sender) in new_senders
+                and event.depth < current_depth - 20
+            ):
+                continue
+
+            logger.debug(
+                "Not dropping as too new and not in new_senders: %s", new_senders,
+            )
+
+            return new_latest_event_ids
+
+        times_pruned_extremities.inc()
+
+        logger.info(
+            "Pruning forward extremities in room %s: from %s -> %s",
+            room_id,
+            new_latest_event_ids,
+            new_new_extrems,
+        )
+        return new_new_extrems
 
     async def _calculate_state_delta(
         self, room_id: str, current_state: StateMap[str]
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 459754feab..566ea19bae 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -18,9 +18,10 @@ import logging
 import os
 import re
 from collections import Counter
-from typing import Optional, TextIO
+from typing import Generator, Iterable, List, Optional, TextIO, Tuple
 
 import attr
+from typing_extensions import Counter as CounterType
 
 from synapse.config.homeserver import HomeServerConfig
 from synapse.storage.database import LoggingDatabaseConnection
@@ -34,10 +35,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-# XXX: If you're about to bump this to 59 (or higher) please create an update
-# that drops the unused `cache_invalidation_stream` table, as per #7436!
-# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
-SCHEMA_VERSION = 58
+SCHEMA_VERSION = 59
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
@@ -70,7 +68,7 @@ def prepare_database(
     db_conn: LoggingDatabaseConnection,
     database_engine: BaseDatabaseEngine,
     config: Optional[HomeServerConfig],
-    databases: Collection[str] = ["main", "state"],
+    databases: Collection[str] = ("main", "state"),
 ):
     """Prepares a physical database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
@@ -155,7 +153,9 @@ def prepare_database(
         raise
 
 
-def _setup_new_database(cur, database_engine, databases):
+def _setup_new_database(
+    cur: Cursor, database_engine: BaseDatabaseEngine, databases: Collection[str]
+) -> None:
     """Sets up the physical database by finding a base set of "full schemas" and
     then applying any necessary deltas, including schemas from the given data
     stores.
@@ -188,10 +188,9 @@ def _setup_new_database(cur, database_engine, databases):
     folder as well those in the data stores specified.
 
     Args:
-        cur (Cursor): a database cursor
-        database_engine (DatabaseEngine)
-        databases (list[str]): The names of the databases to instantiate
-            on the given physical database.
+        cur: a database cursor
+        database_engine
+        databases: The names of the databases to instantiate on the given physical database.
     """
 
     # We're about to set up a brand new database so we check that its
@@ -199,12 +198,11 @@ def _setup_new_database(cur, database_engine, databases):
     database_engine.check_new_database(cur)
 
     current_dir = os.path.join(dir_path, "schema", "full_schemas")
-    directory_entries = os.listdir(current_dir)
 
     # First we find the highest full schema version we have
     valid_versions = []
 
-    for filename in directory_entries:
+    for filename in os.listdir(current_dir):
         try:
             ver = int(filename)
         except ValueError:
@@ -237,7 +235,7 @@ def _setup_new_database(cur, database_engine, databases):
         for database in databases
     )
 
-    directory_entries = []
+    directory_entries = []  # type: List[_DirectoryListing]
     for directory in directories:
         directory_entries.extend(
             _DirectoryListing(file_name, os.path.join(directory, file_name))
@@ -275,15 +273,15 @@ def _setup_new_database(cur, database_engine, databases):
 
 
 def _upgrade_existing_database(
-    cur,
-    current_version,
-    applied_delta_files,
-    upgraded,
-    database_engine,
-    config,
-    databases,
-    is_empty=False,
-):
+    cur: Cursor,
+    current_version: int,
+    applied_delta_files: List[str],
+    upgraded: bool,
+    database_engine: BaseDatabaseEngine,
+    config: Optional[HomeServerConfig],
+    databases: Collection[str],
+    is_empty: bool = False,
+) -> None:
     """Upgrades an existing physical database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -323,21 +321,20 @@ def _upgrade_existing_database(
     for a version before applying those in the next version.
 
     Args:
-        cur (Cursor)
-        current_version (int): The current version of the schema.
-        applied_delta_files (list): A list of deltas that have already been
-            applied.
-        upgraded (bool): Whether the current version was generated by having
+        cur
+        current_version: The current version of the schema.
+        applied_delta_files: A list of deltas that have already been applied.
+        upgraded: Whether the current version was generated by having
             applied deltas or from full schema file. If `True` the function
             will never apply delta files for the given `current_version`, since
             the current_version wasn't generated by applying those delta files.
-        database_engine (DatabaseEngine)
-        config (synapse.config.homeserver.HomeServerConfig|None):
+        database_engine
+        config:
             None if we are initialising a blank database, otherwise the application
             config
-        databases (list[str]): The names of the databases to instantiate
+        databases: The names of the databases to instantiate
             on the given physical database.
-        is_empty (bool): Is this a blank database? I.e. do we need to run the
+        is_empty: Is this a blank database? I.e. do we need to run the
             upgrade portions of the delta scripts.
     """
     if is_empty:
@@ -358,6 +355,7 @@ def _upgrade_existing_database(
     if not is_empty and "main" in databases:
         from synapse.storage.databases.main import check_database_before_upgrade
 
+        assert config is not None
         check_database_before_upgrade(cur, database_engine, config)
 
     start_ver = current_version
@@ -374,7 +372,16 @@ def _upgrade_existing_database(
     specific_engine_extensions = (".sqlite", ".postgres")
 
     for v in range(start_ver, SCHEMA_VERSION + 1):
-        logger.info("Applying schema deltas for v%d", v)
+        if not is_worker:
+            logger.info("Applying schema deltas for v%d", v)
+
+            cur.execute("DELETE FROM schema_version")
+            cur.execute(
+                "INSERT INTO schema_version (version, upgraded) VALUES (?,?)",
+                (v, True),
+            )
+        else:
+            logger.info("Checking schema deltas for v%d", v)
 
         # We need to search both the global and per data store schema
         # directories for schema updates.
@@ -388,10 +395,10 @@ def _upgrade_existing_database(
             )
 
         # Used to check if we have any duplicate file names
-        file_name_counter = Counter()
+        file_name_counter = Counter()  # type: CounterType[str]
 
         # Now find which directories have anything of interest.
-        directory_entries = []
+        directory_entries = []  # type: List[_DirectoryListing]
         for directory in directories:
             logger.debug("Looking for schema deltas in %s", directory)
             try:
@@ -445,11 +452,11 @@ def _upgrade_existing_database(
 
                 module_name = "synapse.storage.v%d_%s" % (v, root_name)
                 with open(absolute_path) as python_file:
-                    module = imp.load_source(module_name, absolute_path, python_file)
+                    module = imp.load_source(module_name, absolute_path, python_file)  # type: ignore
                 logger.info("Running script %s", relative_path)
-                module.run_create(cur, database_engine)
+                module.run_create(cur, database_engine)  # type: ignore
                 if not is_empty:
-                    module.run_upgrade(cur, database_engine, config=config)
+                    module.run_upgrade(cur, database_engine, config=config)  # type: ignore
             elif ext == ".pyc" or file_name == "__pycache__":
                 # Sometimes .pyc files turn up anyway even though we've
                 # disabled their generation; e.g. from distribution package
@@ -488,23 +495,18 @@ def _upgrade_existing_database(
                 (v, relative_path),
             )
 
-            cur.execute("DELETE FROM schema_version")
-            cur.execute(
-                "INSERT INTO schema_version (version, upgraded) VALUES (?,?)",
-                (v, True),
-            )
-
     logger.info("Schema now up to date")
 
 
-def _apply_module_schemas(txn, database_engine, config):
+def _apply_module_schemas(
+    txn: Cursor, database_engine: BaseDatabaseEngine, config: HomeServerConfig
+) -> None:
     """Apply the module schemas for the dynamic modules, if any
 
     Args:
         cur: database cursor
-        database_engine: synapse database engine class
-        config (synapse.config.homeserver.HomeServerConfig):
-            application config
+        database_engine:
+        config: application config
     """
     for (mod, _config) in config.password_providers:
         if not hasattr(mod, "get_db_schema_files"):
@@ -515,15 +517,19 @@ def _apply_module_schemas(txn, database_engine, config):
         )
 
 
-def _apply_module_schema_files(cur, database_engine, modname, names_and_streams):
+def _apply_module_schema_files(
+    cur: Cursor,
+    database_engine: BaseDatabaseEngine,
+    modname: str,
+    names_and_streams: Iterable[Tuple[str, TextIO]],
+) -> None:
     """Apply the module schemas for a single module
 
     Args:
         cur: database cursor
         database_engine: synapse database engine class
-        modname (str): fully qualified name of the module
-        names_and_streams (Iterable[(str, file)]): the names and streams of
-            schemas to be applied
+        modname: fully qualified name of the module
+        names_and_streams: the names and streams of schemas to be applied
     """
     cur.execute(
         "SELECT file FROM applied_module_schemas WHERE module_name = ?", (modname,),
@@ -549,7 +555,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
         )
 
 
-def get_statements(f):
+def get_statements(f: Iterable[str]) -> Generator[str, None, None]:
     statement_buffer = ""
     in_comment = False  # If we're in a /* ... */ style comment
 
@@ -594,17 +600,19 @@ def get_statements(f):
         statement_buffer = statements[-1].strip()
 
 
-def executescript(txn, schema_path):
+def executescript(txn: Cursor, schema_path: str) -> None:
     with open(schema_path, "r") as f:
         execute_statements_from_stream(txn, f)
 
 
-def execute_statements_from_stream(cur: Cursor, f: TextIO):
+def execute_statements_from_stream(cur: Cursor, f: TextIO) -> None:
     for statement in get_statements(f):
         cur.execute(statement)
 
 
-def _get_or_create_schema_state(txn, database_engine):
+def _get_or_create_schema_state(
+    txn: Cursor, database_engine: BaseDatabaseEngine
+) -> Optional[Tuple[int, List[str], bool]]:
     # Bluntly try creating the schema_version tables.
     schema_path = os.path.join(dir_path, "schema", "schema_version.sql")
     executescript(txn, schema_path)
@@ -612,7 +620,6 @@ def _get_or_create_schema_state(txn, database_engine):
     txn.execute("SELECT version, upgraded FROM schema_version")
     row = txn.fetchone()
     current_version = int(row[0]) if row else None
-    upgraded = bool(row[1]) if row else None
 
     if current_version:
         txn.execute(
@@ -620,6 +627,7 @@ def _get_or_create_schema_state(txn, database_engine):
             (current_version,),
         )
         applied_deltas = [d for d, in txn]
+        upgraded = bool(row[1])
         return current_version, applied_deltas, upgraded
 
     return None
@@ -634,5 +642,5 @@ class _DirectoryListing:
     `file_name` attr is kept first.
     """
 
-    file_name = attr.ib()
-    absolute_path = attr.ib()
+    file_name = attr.ib(type=str)
+    absolute_path = attr.ib(type=str)
diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py
index bfa0a9fd06..6c359c1aae 100644
--- a/synapse/storage/purge_events.py
+++ b/synapse/storage/purge_events.py
@@ -15,7 +15,12 @@
 
 import itertools
 import logging
-from typing import Set
+from typing import TYPE_CHECKING, Set
+
+from synapse.storage.databases import Databases
+
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
 
 logger = logging.getLogger(__name__)
 
@@ -24,10 +29,10 @@ class PurgeEventsStorage:
     """High level interface for purging rooms and event history.
     """
 
-    def __init__(self, hs, stores):
+    def __init__(self, hs: "HomeServer", stores: Databases):
         self.stores = stores
 
-    async def purge_room(self, room_id: str):
+    async def purge_room(self, room_id: str) -> None:
         """Deletes all record of a room
         """
 
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index cec96ad6a7..2564f34b47 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -14,10 +14,12 @@
 # limitations under the License.
 
 import logging
+from typing import Any, Dict, List, Optional, Tuple
 
 import attr
 
 from synapse.api.errors import SynapseError
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
@@ -27,18 +29,18 @@ class PaginationChunk:
     """Returned by relation pagination APIs.
 
     Attributes:
-        chunk (list): The rows returned by pagination
-        next_batch (Any|None): Token to fetch next set of results with, if
+        chunk: The rows returned by pagination
+        next_batch: Token to fetch next set of results with, if
             None then there are no more results.
-        prev_batch (Any|None): Token to fetch previous set of results with, if
+        prev_batch: Token to fetch previous set of results with, if
             None then there are no previous results.
     """
 
-    chunk = attr.ib()
-    next_batch = attr.ib(default=None)
-    prev_batch = attr.ib(default=None)
+    chunk = attr.ib(type=List[JsonDict])
+    next_batch = attr.ib(type=Optional[Any], default=None)
+    prev_batch = attr.ib(type=Optional[Any], default=None)
 
-    def to_dict(self):
+    def to_dict(self) -> Dict[str, Any]:
         d = {"chunk": self.chunk}
 
         if self.next_batch:
@@ -59,25 +61,25 @@ class RelationPaginationToken:
     boundaries of the chunk as pagination tokens.
 
     Attributes:
-        topological (int): The topological ordering of the boundary event
-        stream (int): The stream ordering of the boundary event.
+        topological: The topological ordering of the boundary event
+        stream: The stream ordering of the boundary event.
     """
 
-    topological = attr.ib()
-    stream = attr.ib()
+    topological = attr.ib(type=int)
+    stream = attr.ib(type=int)
 
     @staticmethod
-    def from_string(string):
+    def from_string(string: str) -> "RelationPaginationToken":
         try:
             t, s = string.split("-")
             return RelationPaginationToken(int(t), int(s))
         except ValueError:
             raise SynapseError(400, "Invalid token")
 
-    def to_string(self):
+    def to_string(self) -> str:
         return "%d-%d" % (self.topological, self.stream)
 
-    def as_tuple(self):
+    def as_tuple(self) -> Tuple[Any, ...]:
         return attr.astuple(self)
 
 
@@ -89,23 +91,23 @@ class AggregationPaginationToken:
     aggregation groups, we can just use them as our pagination token.
 
     Attributes:
-        count (int): The count of relations in the boundar group.
-        stream (int): The MAX stream ordering in the boundary group.
+        count: The count of relations in the boundary group.
+        stream: The MAX stream ordering in the boundary group.
     """
 
-    count = attr.ib()
-    stream = attr.ib()
+    count = attr.ib(type=int)
+    stream = attr.ib(type=int)
 
     @staticmethod
-    def from_string(string):
+    def from_string(string: str) -> "AggregationPaginationToken":
         try:
             c, s = string.split("-")
             return AggregationPaginationToken(int(c), int(s))
         except ValueError:
             raise SynapseError(400, "Invalid token")
 
-    def to_string(self):
+    def to_string(self) -> str:
         return "%d-%d" % (self.count, self.stream)
 
-    def as_tuple(self):
+    def as_tuple(self) -> Tuple[Any, ...]:
         return attr.astuple(self)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 08a69f2f96..31ccbf23dc 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -12,9 +12,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
-from typing import Awaitable, Dict, Iterable, List, Optional, Set, Tuple, TypeVar
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    TypeVar,
+)
 
 import attr
 
@@ -22,6 +31,10 @@ from synapse.api.constants import EventTypes
 from synapse.events import EventBase
 from synapse.types import MutableStateMap, StateMap
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+    from synapse.storage.databases import Databases
+
 logger = logging.getLogger(__name__)
 
 # Used for generic functions below
@@ -330,10 +343,12 @@ class StateGroupStorage:
     """High level interface to fetching state for event.
     """
 
-    def __init__(self, hs, stores):
+    def __init__(self, hs: "HomeServer", stores: "Databases"):
         self.stores = stores
 
-    async def get_state_group_delta(self, state_group: int):
+    async def get_state_group_delta(
+        self, state_group: int
+    ) -> Tuple[Optional[int], Optional[StateMap[str]]]:
         """Given a state group try to return a previous group and a delta between
         the old and the new.
 
@@ -341,8 +356,8 @@ class StateGroupStorage:
             state_group: The state group used to retrieve state deltas.
 
         Returns:
-            Tuple[Optional[int], Optional[StateMap[str]]]:
-                (prev_group, delta_ids)
+            A tuple of the previous group and a state map of the event IDs which
+            make up the delta between the old and new state groups.
         """
 
         return await self.stores.state.get_state_group_delta(state_group)
@@ -436,7 +451,7 @@ class StateGroupStorage:
 
     async def get_state_for_events(
         self, event_ids: List[str], state_filter: StateFilter = StateFilter.all()
-    ):
+    ) -> Dict[str, StateMap[EventBase]]:
         """Given a list of event_ids and type tuples, return a list of state
         dicts for each event.
 
@@ -472,7 +487,7 @@ class StateGroupStorage:
 
     async def get_state_ids_for_events(
         self, event_ids: List[str], state_filter: StateFilter = StateFilter.all()
-    ):
+    ) -> Dict[str, StateMap[str]]:
         """
         Get the state dicts corresponding to a list of events, containing the event_ids
         of the state events (as opposed to the events themselves)
@@ -500,7 +515,7 @@ class StateGroupStorage:
 
     async def get_state_for_event(
         self, event_id: str, state_filter: StateFilter = StateFilter.all()
-    ):
+    ) -> StateMap[EventBase]:
         """
         Get the state dict corresponding to a particular event
 
@@ -516,7 +531,7 @@ class StateGroupStorage:
 
     async def get_state_ids_for_event(
         self, event_id: str, state_filter: StateFilter = StateFilter.all()
-    ):
+    ) -> StateMap[str]:
         """
         Get the state dict corresponding to a particular event
 
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 02d71302ea..71ef5a72dc 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -15,12 +15,11 @@
 import heapq
 import logging
 import threading
-from collections import deque
+from collections import OrderedDict
 from contextlib import contextmanager
-from typing import Dict, List, Optional, Set, Union
+from typing import Dict, List, Optional, Set, Tuple, Union
 
 import attr
-from typing_extensions import Deque
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.database import DatabasePool, LoggingTransaction
@@ -101,7 +100,13 @@ class StreamIdGenerator:
             self._current = (max if step > 0 else min)(
                 self._current, _load_current_id(db_conn, table, column, step)
             )
-        self._unfinished_ids = deque()  # type: Deque[int]
+
+        # We use this as an ordered set, as we want to efficiently append items,
+        # remove items and get the first item. Since we insert IDs in order, the
+        # insertion ordering will ensure its in the correct ordering.
+        #
+        # The key and values are the same, but we never look at the values.
+        self._unfinished_ids = OrderedDict()  # type: OrderedDict[int, int]
 
     def get_next(self):
         """
@@ -113,7 +118,7 @@ class StreamIdGenerator:
             self._current += self._step
             next_id = self._current
 
-            self._unfinished_ids.append(next_id)
+            self._unfinished_ids[next_id] = next_id
 
         @contextmanager
         def manager():
@@ -121,7 +126,7 @@ class StreamIdGenerator:
                 yield next_id
             finally:
                 with self._lock:
-                    self._unfinished_ids.remove(next_id)
+                    self._unfinished_ids.pop(next_id)
 
         return _AsyncCtxManagerWrapper(manager())
 
@@ -140,7 +145,7 @@ class StreamIdGenerator:
             self._current += n * self._step
 
             for next_id in next_ids:
-                self._unfinished_ids.append(next_id)
+                self._unfinished_ids[next_id] = next_id
 
         @contextmanager
         def manager():
@@ -149,20 +154,20 @@ class StreamIdGenerator:
             finally:
                 with self._lock:
                     for next_id in next_ids:
-                        self._unfinished_ids.remove(next_id)
+                        self._unfinished_ids.pop(next_id)
 
         return _AsyncCtxManagerWrapper(manager())
 
-    def get_current_token(self):
+    def get_current_token(self) -> int:
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
 
         Returns:
-            int
+            The maximum stream id.
         """
         with self._lock:
             if self._unfinished_ids:
-                return self._unfinished_ids[0] - self._step
+                return next(iter(self._unfinished_ids)) - self._step
 
             return self._current
 
@@ -186,11 +191,12 @@ class MultiWriterIdGenerator:
     Args:
         db_conn
         db
-        stream_name: A name for the stream.
+        stream_name: A name for the stream, for use in the `stream_positions`
+            table. (Does not need to be the same as the replication stream name)
         instance_name: The name of this instance.
-        table: Database table associated with stream.
-        instance_column: Column that stores the row's writer's instance name
-        id_column: Column that stores the stream ID.
+        tables: List of tables associated with the stream. Tuple of table
+            name, column name that stores the writer's instance name, and
+            column name that stores the stream ID.
         sequence_name: The name of the postgres sequence used to generate new
             IDs.
         writers: A list of known writers to use to populate current positions
@@ -206,9 +212,7 @@ class MultiWriterIdGenerator:
         db: DatabasePool,
         stream_name: str,
         instance_name: str,
-        table: str,
-        instance_column: str,
-        id_column: str,
+        tables: List[Tuple[str, str, str]],
         sequence_name: str,
         writers: List[str],
         positive: bool = True,
@@ -260,15 +264,20 @@ class MultiWriterIdGenerator:
         self._sequence_gen = PostgresSequenceGenerator(sequence_name)
 
         # We check that the table and sequence haven't diverged.
-        self._sequence_gen.check_consistency(
-            db_conn, table=table, id_column=id_column, positive=positive
-        )
+        for table, _, id_column in tables:
+            self._sequence_gen.check_consistency(
+                db_conn,
+                table=table,
+                id_column=id_column,
+                stream_name=stream_name,
+                positive=positive,
+            )
 
         # This goes and fills out the above state from the database.
-        self._load_current_ids(db_conn, table, instance_column, id_column)
+        self._load_current_ids(db_conn, tables)
 
     def _load_current_ids(
-        self, db_conn, table: str, instance_column: str, id_column: str
+        self, db_conn, tables: List[Tuple[str, str, str]],
     ):
         cur = db_conn.cursor(txn_name="_load_current_ids")
 
@@ -306,17 +315,22 @@ class MultiWriterIdGenerator:
             # We add a GREATEST here to ensure that the result is always
             # positive. (This can be a problem for e.g. backfill streams where
             # the server has never backfilled).
-            sql = """
-                SELECT GREATEST(COALESCE(%(agg)s(%(id)s), 1), 1)
-                FROM %(table)s
-            """ % {
-                "id": id_column,
-                "table": table,
-                "agg": "MAX" if self._positive else "-MIN",
-            }
-            cur.execute(sql)
-            (stream_id,) = cur.fetchone()
-            self._persisted_upto_position = stream_id
+            max_stream_id = 1
+            for table, _, id_column in tables:
+                sql = """
+                    SELECT GREATEST(COALESCE(%(agg)s(%(id)s), 1), 1)
+                    FROM %(table)s
+                """ % {
+                    "id": id_column,
+                    "table": table,
+                    "agg": "MAX" if self._positive else "-MIN",
+                }
+                cur.execute(sql)
+                (stream_id,) = cur.fetchone()
+
+                max_stream_id = max(max_stream_id, stream_id)
+
+            self._persisted_upto_position = max_stream_id
         else:
             # If we have a min_stream_id then we pull out everything greater
             # than it from the DB so that we can prefill
@@ -329,21 +343,28 @@ class MultiWriterIdGenerator:
             # stream positions table before restart (or the stream position
             # table otherwise got out of date).
 
-            sql = """
-                SELECT %(instance)s, %(id)s FROM %(table)s
-                WHERE ? %(cmp)s %(id)s
-            """ % {
-                "id": id_column,
-                "table": table,
-                "instance": instance_column,
-                "cmp": "<=" if self._positive else ">=",
-            }
-            cur.execute(sql, (min_stream_id * self._return_factor,))
-
             self._persisted_upto_position = min_stream_id
 
+            rows = []
+            for table, instance_column, id_column in tables:
+                sql = """
+                    SELECT %(instance)s, %(id)s FROM %(table)s
+                    WHERE ? %(cmp)s %(id)s
+                """ % {
+                    "id": id_column,
+                    "table": table,
+                    "instance": instance_column,
+                    "cmp": "<=" if self._positive else ">=",
+                }
+                cur.execute(sql, (min_stream_id * self._return_factor,))
+
+                rows.extend(cur)
+
+            # Sort so that we handle rows in order for each instance.
+            rows.sort()
+
             with self._lock:
-                for (instance, stream_id,) in cur:
+                for (instance, stream_id,) in rows:
                     stream_id = self._return_factor * stream_id
                     self._add_persisted_position(stream_id)
 
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 4386b6101e..0ec4dc2918 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -15,9 +15,8 @@
 import abc
 import logging
 import threading
-from typing import Callable, List, Optional
+from typing import TYPE_CHECKING, Callable, List, Optional
 
-from synapse.storage.database import LoggingDatabaseConnection
 from synapse.storage.engines import (
     BaseDatabaseEngine,
     IncorrectDatabaseSetup,
@@ -25,6 +24,9 @@ from synapse.storage.engines import (
 )
 from synapse.storage.types import Connection, Cursor
 
+if TYPE_CHECKING:
+    from synapse.storage.database import LoggingDatabaseConnection
+
 logger = logging.getLogger(__name__)
 
 
@@ -43,6 +45,21 @@ and run the following SQL:
 See docs/postgres.md for more information.
 """
 
+_INCONSISTENT_STREAM_ERROR = """
+Postgres sequence '%(seq)s' is inconsistent with associated stream position
+of '%(stream_name)s' in the 'stream_positions' table.
+
+This is likely a programming error and should be reported at
+https://github.com/matrix-org/synapse.
+
+A temporary workaround to fix this error is to shut down Synapse (including
+any and all workers) and run the following SQL:
+
+    DELETE FROM stream_positions WHERE stream_name = '%(stream_name)s';
+
+This will need to be done every time the server is restarted.
+"""
+
 
 class SequenceGenerator(metaclass=abc.ABCMeta):
     """A class which generates a unique sequence of integers"""
@@ -53,19 +70,30 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
         ...
 
     @abc.abstractmethod
+    def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
+        """Get the next `n` IDs in the sequence"""
+        ...
+
+    @abc.abstractmethod
     def check_consistency(
         self,
-        db_conn: LoggingDatabaseConnection,
+        db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
         """Should be called during start up to test that the current value of
         the sequence is greater than or equal to the maximum ID in the table.
 
-        This is to handle various cases where the sequence value can get out
-        of sync with the table, e.g. if Synapse gets rolled back to a previous
+        This is to handle various cases where the sequence value can get out of
+        sync with the table, e.g. if Synapse gets rolled back to a previous
         version and the rolled forwards again.
+
+        If a stream name is given then this will check that any value in the
+        `stream_positions` table is less than or equal to the current sequence
+        value. If it isn't then it's likely that streams have been crossed
+        somewhere (e.g. two ID generators have the same stream name).
         """
         ...
 
@@ -88,11 +116,15 @@ class PostgresSequenceGenerator(SequenceGenerator):
 
     def check_consistency(
         self,
-        db_conn: LoggingDatabaseConnection,
+        db_conn: "LoggingDatabaseConnection",
         table: str,
         id_column: str,
+        stream_name: Optional[str] = None,
         positive: bool = True,
     ):
+        """See SequenceGenerator.check_consistency for docstring.
+        """
+
         txn = db_conn.cursor(txn_name="sequence.check_consistency")
 
         # First we get the current max ID from the table.
@@ -116,6 +148,18 @@ class PostgresSequenceGenerator(SequenceGenerator):
             "SELECT last_value, is_called FROM %(seq)s" % {"seq": self._sequence_name}
         )
         last_value, is_called = txn.fetchone()
+
+        # If we have an associated stream check the stream_positions table.
+        max_in_stream_positions = None
+        if stream_name:
+            txn.execute(
+                "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?",
+                (stream_name,),
+            )
+            row = txn.fetchone()
+            if row:
+                max_in_stream_positions = row[0]
+
         txn.close()
 
         # If `is_called` is False then `last_value` is actually the value that
@@ -136,6 +180,14 @@ class PostgresSequenceGenerator(SequenceGenerator):
                 % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql}
             )
 
+        # If we have values in the stream positions table then they have to be
+        # less than or equal to `last_value`
+        if max_in_stream_positions and max_in_stream_positions > last_value:
+            raise IncorrectDatabaseSetup(
+                _INCONSISTENT_STREAM_ERROR
+                % {"seq": self._sequence_name, "stream_name": stream_name}
+            )
+
 
 GetFirstCallbackType = Callable[[Cursor], int]
 
@@ -172,8 +224,24 @@ class LocalSequenceGenerator(SequenceGenerator):
             self._current_max_id += 1
             return self._current_max_id
 
+    def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]:
+        with self._lock:
+            if self._current_max_id is None:
+                assert self._callback is not None
+                self._current_max_id = self._callback(txn)
+                self._callback = None
+
+            first_id = self._current_max_id + 1
+            self._current_max_id += n
+            return [first_id + i for i in range(n)]
+
     def check_consistency(
-        self, db_conn: Connection, table: str, id_column: str, positive: bool = True
+        self,
+        db_conn: Connection,
+        table: str,
+        id_column: str,
+        stream_name: Optional[str] = None,
+        positive: bool = True,
     ):
         # There is nothing to do for in memory sequences
         pass