Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/background_updates.py | 16
-rw-r--r--  synapse/storage/database.py | 18
-rw-r--r--  synapse/storage/databases/main/appservice.py | 4
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py | 4
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 2
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 142
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py | 2
-rw-r--r--  synapse/storage/databases/main/events.py | 44
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py | 68
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 6
-rw-r--r--  synapse/storage/databases/main/group_server.py | 6
-rw-r--r--  synapse/storage/databases/main/lock.py | 15
-rw-r--r--  synapse/storage/databases/main/metrics.py | 133
-rw-r--r--  synapse/storage/databases/main/profile.py | 8
-rw-r--r--  synapse/storage/databases/main/purge_events.py | 4
-rw-r--r--  synapse/storage/databases/main/push_rule.py | 6
-rw-r--r--  synapse/storage/databases/main/registration.py | 2
-rw-r--r--  synapse/storage/databases/main/room.py | 104
-rw-r--r--  synapse/storage/databases/main/roommember.py | 15
-rw-r--r--  synapse/storage/databases/main/stats.py | 299
-rw-r--r--  synapse/storage/databases/main/stream.py | 6
-rw-r--r--  synapse/storage/databases/main/tags.py | 2
-rw-r--r--  synapse/storage/databases/main/ui_auth.py | 4
-rw-r--r--  synapse/storage/engines/_base.py | 6
-rw-r--r--  synapse/storage/engines/postgres.py | 5
-rw-r--r--  synapse/storage/engines/sqlite.py | 5
-rw-r--r--  synapse/storage/persist_events.py | 16
-rw-r--r--  synapse/storage/prepare_database.py | 8
-rw-r--r--  synapse/storage/schema/__init__.py | 6
-rw-r--r--  synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres | 11
-rw-r--r--  synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres | 30
-rw-r--r--  synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres | 23
-rw-r--r--  synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql | 18
-rw-r--r--  synapse/storage/schema/main/delta/61/03recreate_min_depth.py | 70
-rw-r--r--  synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres | 34
-rw-r--r--  synapse/storage/state.py | 4
-rw-r--r--  synapse/storage/util/id_generators.py | 12
-rw-r--r--  synapse/storage/util/sequence.py | 6
38 files changed, 728 insertions, 436 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 142787fdfd..82b31d24f1 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -92,14 +92,12 @@ class BackgroundUpdater:
         self.db_pool = database
 
         # if a background update is currently running, its name.
-        self._current_background_update = None  # type: Optional[str]
-
-        self._background_update_performance = (
-            {}
-        )  # type: Dict[str, BackgroundUpdatePerformance]
-        self._background_update_handlers = (
-            {}
-        )  # type: Dict[str, Callable[[JsonDict, int], Awaitable[int]]]
+        self._current_background_update: Optional[str] = None
+
+        self._background_update_performance: Dict[str, BackgroundUpdatePerformance] = {}
+        self._background_update_handlers: Dict[
+            str, Callable[[JsonDict, int], Awaitable[int]]
+        ] = {}
         self._all_done = False
 
     def start_doing_background_updates(self) -> None:
@@ -411,7 +409,7 @@ class BackgroundUpdater:
             c.execute(sql)
 
         if isinstance(self.db_pool.engine, engines.PostgresEngine):
-            runner = create_index_psql  # type: Optional[Callable[[Connection], None]]
+            runner: Optional[Callable[[Connection], None]] = create_index_psql
         elif psql_only:
             runner = None
         else:
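
Many of the hunks in this commit are mechanical conversions from trailing "# type:" comments to inline (PEP 526) variable annotations, as in the hunk above. A minimal standalone sketch of the two styles (the names here are illustrative, not Synapse's):

    from typing import Dict, Optional

    # Old style: the annotation lives in a trailing comment.
    current_update = None  # type: Optional[str]

    # New style: the annotation is part of the assignment itself, so type
    # checkers read it without special-casing comments, and long generic
    # types can be wrapped across lines instead of forcing awkward layout.
    current_update2: Optional[str] = None
    update_durations: Dict[str, float] = {}
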
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index d470cdacde..ccf9ac51ef 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -111,7 +111,7 @@ def make_conn(
     db_config: DatabaseConnectionConfig,
     engine: BaseDatabaseEngine,
     default_txn_name: str,
-) -> Connection:
+) -> "LoggingDatabaseConnection":
     """Make a new connection to the database and return it.
 
     Returns:
@@ -670,8 +670,8 @@ class DatabasePool:
         Returns:
             The result of func
         """
-        after_callbacks = []  # type: List[_CallbackListEntry]
-        exception_callbacks = []  # type: List[_CallbackListEntry]
+        after_callbacks: List[_CallbackListEntry] = []
+        exception_callbacks: List[_CallbackListEntry] = []
 
         if not current_context():
             logger.warning("Starting db txn '%s' from sentinel context", desc)
@@ -907,7 +907,7 @@ class DatabasePool:
         # The sort is to ensure that we don't rely on dictionary iteration
         # order.
         keys, vals = zip(
-            *[zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i]
+            *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i)
         )
 
         for k in keys:
@@ -1090,7 +1090,7 @@ class DatabasePool:
                 return False
 
         # We didn't find any existing rows, so insert a new one
-        allvalues = {}  # type: Dict[str, Any]
+        allvalues: Dict[str, Any] = {}
         allvalues.update(keyvalues)
         allvalues.update(values)
         allvalues.update(insertion_values)
@@ -1121,7 +1121,7 @@ class DatabasePool:
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
         """
-        allvalues = {}  # type: Dict[str, Any]
+        allvalues: Dict[str, Any] = {}
         allvalues.update(keyvalues)
         allvalues.update(insertion_values or {})
 
@@ -1257,7 +1257,7 @@ class DatabasePool:
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
         """
-        allnames = []  # type: List[str]
+        allnames: List[str] = []
         allnames.extend(key_names)
         allnames.extend(value_names)
 
@@ -1566,7 +1566,7 @@ class DatabasePool:
         """
         keyvalues = keyvalues or {}
 
-        results = []  # type: List[Dict[str, Any]]
+        results: List[Dict[str, Any]] = []
 
         if not iterable:
             return results
@@ -1978,7 +1978,7 @@ class DatabasePool:
             raise ValueError("order_direction must be one of 'ASC' or 'DESC'.")
 
         where_clause = "WHERE " if filters or keyvalues or exclude_keyvalues else ""
-        arg_list = []  # type: List[Any]
+        arg_list: List[Any] = []
         if filters:
             where_clause += " AND ".join("%s LIKE ?" % (k,) for k in filters)
             arg_list += list(filters.values())
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 9f182c2a89..e2d1b758bd 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -48,9 +48,7 @@ def _make_exclusive_regex(
     ]
     if exclusive_user_regexes:
         exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
-        exclusive_user_pattern = re.compile(
-            exclusive_user_regex
-        )  # type: Optional[Pattern]
+        exclusive_user_pattern: Optional[Pattern] = re.compile(exclusive_user_regex)
     else:
         # We handle this case specially otherwise the constructed regex
         # will always match
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 50e7ddd735..c55508867d 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -203,9 +203,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
-        log_kv(
-            {"message": "deleted {} messages for device".format(count), "count": count}
-        )
+        log_kv({"message": f"deleted {count} messages for device", "count": count})
 
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 0e3dd4e9ca..78ae68ec68 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -247,7 +247,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
 
         txn.execute(sql, query_params)
 
-        result = {}  # type: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]
+        result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
         for (user_id, device_id, display_name, key_json) in txn:
             if include_deleted_devices:
                 deleted_devices.remove((user_id, device_id))
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..d39368c20e 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,8 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
 
+from prometheus_client import Gauge
+
 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion
@@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 
+oldest_pdu_in_federation_staging = Gauge(
+    "synapse_federation_server_oldest_inbound_pdu_in_staging",
+    "The age in seconds since we received the oldest pdu in the federation staging area",
+)
+
+number_pdus_in_federation_queue = Gauge(
+    "synapse_federation_server_number_inbound_pdu_in_staging",
+    "The total number of events in the inbound federation staging",
+)
+
 logger = logging.getLogger(__name__)
 
 
@@ -50,9 +62,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             )
 
         # Cache of event ID to list of auth event IDs and their depths.
-        self._event_auth_cache = LruCache(
+        self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache(
             500000, "_event_auth_cache", size_callback=len
-        )  # type: LruCache[str, List[Tuple[str, int]]]
+        )
+
+        self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
 
     async def get_auth_chain(
         self, room_id: str, event_ids: Collection[str], include_given: bool = False
@@ -123,10 +137,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         initial_events = set(event_ids)
 
         # All the events that we've found that are reachable from the events.
-        seen_events = set()  # type: Set[str]
+        seen_events: Set[str] = set()
 
         # A map from chain ID to max sequence number of the given events.
-        event_chains = {}  # type: Dict[int, int]
+        event_chains: Dict[int, int] = {}
 
         sql = """
             SELECT event_id, chain_id, sequence_number
@@ -168,7 +182,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         # A map from chain ID to max sequence number *reachable* from any event ID.
-        chains = {}  # type: Dict[int, int]
+        chains: Dict[int, int] = {}
 
         # Add all linked chains reachable from initial set of chains.
         for batch in batch_iter(event_chains, 1000):
@@ -339,14 +353,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         initial_events = set(state_sets[0]).union(*state_sets[1:])
 
         # Map from event_id -> (chain ID, seq no)
-        chain_info = {}  # type: Dict[str, Tuple[int, int]]
+        chain_info: Dict[str, Tuple[int, int]] = {}
 
         # Map from chain ID -> seq no -> event Id
-        chain_to_event = {}  # type: Dict[int, Dict[int, str]]
+        chain_to_event: Dict[int, Dict[int, str]] = {}
 
         # All the chains that we've found that are reachable from the state
         # sets.
-        seen_chains = set()  # type: Set[int]
+        seen_chains: Set[int] = set()
 
         sql = """
             SELECT event_id, chain_id, sequence_number
@@ -378,9 +392,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Corresponds to `state_sets`, except as a map from chain ID to max
         # sequence number reachable from the state set.
-        set_to_chain = []  # type: List[Dict[int, int]]
+        set_to_chain: List[Dict[int, int]] = []
         for state_set in state_sets:
-            chains = {}  # type: Dict[int, int]
+            chains: Dict[int, int] = {}
             set_to_chain.append(chains)
 
             for event_id in state_set:
@@ -432,7 +446,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Mapping from chain ID to the range of sequence numbers that should be
         # pulled from the database.
-        chain_to_gap = {}  # type: Dict[int, Tuple[int, int]]
+        chain_to_gap: Dict[int, Tuple[int, int]] = {}
 
         for chain_id in seen_chains:
             min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
@@ -541,7 +555,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         }
 
         # The sorted list of events whose auth chains we should walk.
-        search = []  # type: List[Tuple[int, str]]
+        search: List[Tuple[int, str]] = []
 
         # We need to get the depth of the initial events for sorting purposes.
         sql = """
@@ -564,7 +578,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         search.sort()
 
         # Map from event to its auth events
-        event_to_auth_events = {}  # type: Dict[str, Set[str]]
+        event_to_auth_events: Dict[str, Set[str]] = {}
 
         base_sql = """
             SELECT a.event_id, auth_id, depth
@@ -1075,16 +1089,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         self,
         origin: str,
         event_id: str,
-    ) -> None:
-        """Remove the given event from the staging area"""
-        await self.db_pool.simple_delete(
-            table="federation_inbound_events_staging",
-            keyvalues={
-                "origin": origin,
-                "event_id": event_id,
-            },
-            desc="remove_received_event_from_staging",
-        )
+    ) -> Optional[int]:
+        """Remove the given event from the staging area.
+
+        Returns:
+            The received_ts of the row that was deleted, if any.
+        """
+        if self.db_pool.engine.supports_returning:
+
+            def _remove_received_event_from_staging_txn(txn):
+                sql = """
+                    DELETE FROM federation_inbound_events_staging
+                    WHERE origin = ? AND event_id = ?
+                    RETURNING received_ts
+                """
+
+                txn.execute(sql, (origin, event_id))
+                return txn.fetchone()
+
+            row = await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+                db_autocommit=True,
+            )
+            if row is None:
+                return None
+
+            return row[0]
+
+        else:
+
+            def _remove_received_event_from_staging_txn(txn):
+                received_ts = self.db_pool.simple_select_one_onecol_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                    retcol="received_ts",
+                    allow_none=True,
+                )
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                )
+
+                return received_ts
+
+            return await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+            )
 
     async def get_next_staged_event_id_for_room(
         self,
@@ -1147,6 +1207,42 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return origin, event
 
+    async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
+        """Get the room IDs of all events currently staged."""
+        return await self.db_pool.simple_select_onecol(
+            table="federation_inbound_events_staging",
+            keyvalues={},
+            retcol="DISTINCT room_id",
+            desc="get_all_rooms_with_staged_incoming_events",
+        )
+
+    @wrap_as_background_process("_get_stats_for_federation_staging")
+    async def _get_stats_for_federation_staging(self):
+        """Update the prometheus metrics for the inbound federation staging area."""
+
+        def _get_stats_for_federation_staging_txn(txn):
+            txn.execute(
+                "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
+            )
+            (count,) = txn.fetchone()
+
+            txn.execute(
+                "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
+            )
+
+            (received_ts,) = txn.fetchone()
+
+            age = self._clock.time_msec() - received_ts
+
+            return count, age
+
+        count, age = await self.db_pool.runInteraction(
+            "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
+        )
+
+        number_pdus_in_federation_queue.set(count)
+        oldest_pdu_in_federation_staging.set(age)
+
 
 class EventFederationStore(EventFederationWorkerStore):
     """Responsible for storing and serving up the various graphs associated
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index d1237c65cc..55caa6bbe7 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -759,7 +759,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # object because we might not have the same amount of rows in each of them. To do
         # this, we use a dict indexed on the user ID and room ID to make it easier to
         # populate.
-        summaries = {}  # type: Dict[Tuple[str, str], _EventPushSummary]
+        summaries: Dict[Tuple[str, str], _EventPushSummary] = {}
         for row in txn:
             summaries[(row[0], row[1])] = _EventPushSummary(
                 unread_count=row[2],
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 897fa06639..a396a201d4 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -109,10 +109,8 @@ class PersistEventsStore:
 
         # Ideally we'd move these ID gens here, unfortunately some other ID
         # generators are chained off them so doing so is a bit of a PITA.
-        self._backfill_id_gen = (
-            self.store._backfill_id_gen
-        )  # type: MultiWriterIdGenerator
-        self._stream_id_gen = self.store._stream_id_gen  # type: MultiWriterIdGenerator
+        self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen
+        self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen
 
         # This should only exist on instances that are configured to write
         assert (
@@ -221,7 +219,7 @@ class PersistEventsStore:
         Returns:
             Filtered event ids
         """
-        results = []  # type: List[str]
+        results: List[str] = []
 
         def _get_events_which_are_prevs_txn(txn, batch):
             sql = """
@@ -508,7 +506,7 @@ class PersistEventsStore:
         """
 
         # Map from event ID to chain ID/sequence number.
-        chain_map = {}  # type: Dict[str, Tuple[int, int]]
+        chain_map: Dict[str, Tuple[int, int]] = {}
 
         # Set of event IDs to calculate chain ID/seq numbers for.
         events_to_calc_chain_id_for = set(event_to_room_id)
@@ -817,8 +815,8 @@ class PersistEventsStore:
         #      new chain if the sequence number has already been allocated.
         #
 
-        existing_chains = set()  # type: Set[int]
-        tree = []  # type: List[Tuple[str, Optional[str]]]
+        existing_chains: Set[int] = set()
+        tree: List[Tuple[str, Optional[str]]] = []
 
         # We need to do this in a topologically sorted order as we want to
         # generate chain IDs/sequence numbers of an event's auth events before
@@ -848,7 +846,7 @@ class PersistEventsStore:
         )
         txn.execute(sql % (clause,), args)
 
-        chain_to_max_seq_no = {row[0]: row[1] for row in txn}  # type: Dict[Any, int]
+        chain_to_max_seq_no: Dict[Any, int] = {row[0]: row[1] for row in txn}
 
         # Allocate the new events chain ID/sequence numbers.
         #
@@ -858,8 +856,8 @@ class PersistEventsStore:
         # number of new chain IDs in one call, replacing all temporary
         # objects with real allocated chain IDs.
 
-        unallocated_chain_ids = set()  # type: Set[object]
-        new_chain_tuples = {}  # type: Dict[str, Tuple[Any, int]]
+        unallocated_chain_ids: Set[object] = set()
+        new_chain_tuples: Dict[str, Tuple[Any, int]] = {}
         for event_id, auth_event_id in tree:
             # If we reference an auth_event_id we fetch the allocated chain ID,
             # either from the existing `chain_map` or the newly generated
@@ -870,7 +868,7 @@ class PersistEventsStore:
                 if not existing_chain_id:
                     existing_chain_id = chain_map[auth_event_id]
 
-            new_chain_tuple = None  # type: Optional[Tuple[Any, int]]
+            new_chain_tuple: Optional[Tuple[Any, int]] = None
             if existing_chain_id:
                 # We found a chain ID/sequence number candidate, check its
                 # not already taken.
@@ -897,9 +895,9 @@ class PersistEventsStore:
         )
 
         # Map from potentially temporary chain ID to real chain ID
-        chain_id_to_allocated_map = dict(
+        chain_id_to_allocated_map: Dict[Any, int] = dict(
             zip(unallocated_chain_ids, newly_allocated_chain_ids)
-        )  # type: Dict[Any, int]
+        )
         chain_id_to_allocated_map.update((c, c) for c in existing_chains)
 
         return {
@@ -1175,9 +1173,9 @@ class PersistEventsStore:
         Returns:
             list[(EventBase, EventContext)]: filtered list
         """
-        new_events_and_contexts = (
-            OrderedDict()
-        )  # type: OrderedDict[str, Tuple[EventBase, EventContext]]
+        new_events_and_contexts: OrderedDict[
+            str, Tuple[EventBase, EventContext]
+        ] = OrderedDict()
         for event, context in events_and_contexts:
             prev_event_context = new_events_and_contexts.get(event.event_id)
             if prev_event_context:
@@ -1205,7 +1203,7 @@ class PersistEventsStore:
                 we are persisting
             backfilled (bool): True if the events were backfilled
         """
-        depth_updates = {}  # type: Dict[str, int]
+        depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
             txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
@@ -1580,11 +1578,11 @@ class PersistEventsStore:
         # invalidate the cache for the redacted event
         txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
 
-        self.db_pool.simple_insert_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="redactions",
+            keyvalues={"event_id": event.event_id},
             values={
-                "event_id": event.event_id,
                 "redacts": event.redacts,
                 "received_ts": self._clock.time_msec(),
             },
@@ -1885,7 +1883,7 @@ class PersistEventsStore:
                 ),
             )
 
-            room_to_event_ids = {}  # type: Dict[str, List[str]]
+            room_to_event_ids: Dict[str, List[str]] = {}
             for e, _ in events_and_contexts:
                 room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
 
@@ -2012,10 +2010,6 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
-        events_by_room = {}  # type: Dict[str, List[EventBase]]
-        for ev in events:
-            events_by_room.setdefault(ev.room_id, []).append(ev)
-
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index da3a7df27b..6fcb2b8353 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,13 +29,18 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
     # there should be no leftover rows without a stream_ordering2, but just in case...
     "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
-    # finally, we can drop the rule and switch the columns
+    # now we can drop the rule and switch the columns
     "DROP RULE populate_stream_ordering2 ON events",
     "ALTER TABLE events DROP COLUMN stream_ordering",
     "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+    # ... and finally, rename the indexes into place for consistency with sqlite
+    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+    "ALTER INDEX events_ts2 RENAME TO events_ts",
 )
 
 
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
     DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
     POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
     INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
 
@@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        ################################################################################
+
         # bg updates for replacing stream_ordering with a BIGINT
         # (these only run on postgres.)
+
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
             self._background_populate_stream_ordering2,
         )
+        # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.INDEX_STREAM_ORDERING2,
             index_name="events_stream_ordering",
@@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             columns=["stream_ordering2"],
             unique=True,
         )
+        # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+            index_name="event_contains_url_index2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+        # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+            index_name="events_order_room2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+        )
+        # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+            index_name="events_room_stream2",
+            table="events",
+            columns=["room_id", "stream_ordering2"],
+        )
+        # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+            index_name="events_ts2",
+            table="events",
+            columns=["origin_server_ts", "stream_ordering2"],
+        )
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
             self._background_replace_stream_ordering_column,
         )
 
+        ################################################################################
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -916,9 +960,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         event_to_types = {row[0]: (row[1], row[2]) for row in rows}
 
         # Calculate the new last position we've processed up to.
-        new_last_depth = rows[-1][3] if rows else last_depth  # type: int
-        new_last_stream = rows[-1][4] if rows else last_stream  # type: int
-        new_last_room_id = rows[-1][5] if rows else ""  # type: str
+        new_last_depth: int = rows[-1][3] if rows else last_depth
+        new_last_stream: int = rows[-1][4] if rows else last_stream
+        new_last_room_id: str = rows[-1][5] if rows else ""
 
         # Map from room_id to last depth/stream_ordering processed for the room,
         # excluding the last room (which we're likely still processing). We also
@@ -945,7 +989,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             retcols=("event_id", "auth_id"),
         )
 
-        event_to_auth_chain = {}  # type: Dict[str, List[str]]
+        event_to_auth_chain: Dict[str, List[str]] = {}
         for row in auth_events:
             event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
 
@@ -1098,10 +1142,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
 
         def process(txn: Cursor) -> None:
-            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
                 logger.info("completing stream_ordering migration: %s", sql)
                 txn.execute(sql)
 
+        # ANALYZE the new column to build stats on it, to encourage PostgreSQL to use the
+        # indexes on it.
+        # We need to pass execute a dummy function to handle the txn's result otherwise
+        # it tries to call fetchall() on it and fails because there's no result to fetch.
+        await self.db_pool.execute(
+            "background_analyze_new_stream_ordering_column",
+            lambda txn: None,
+            "ANALYZE events(stream_ordering2)",
+        )
+
         await self.db_pool.runInteraction(
             "_background_replace_stream_ordering_column", process
         )
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 86fd79f3a6..5ff29530bb 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1468,10 +1468,10 @@ class EventsWorkerStore(SQLBaseStore):
         # we need to make sure that, for every stream id in the results, we get *all*
         # the rows with that stream id.
 
-        rows = await self.db_pool.runInteraction(
+        rows: List[Tuple] = await self.db_pool.runInteraction(
             "get_all_updated_current_state_deltas",
             get_all_updated_current_state_deltas_txn,
-        )  # type: List[Tuple]
+        )
 
         # if we've got fewer rows than the limit, we're good
         if len(rows) < target_row_count:
@@ -1572,7 +1572,7 @@ class EventsWorkerStore(SQLBaseStore):
         """
 
         mapping = {}
-        txn_id_to_event = {}  # type: Dict[Tuple[str, int, str], str]
+        txn_id_to_event: Dict[Tuple[str, int, str], str] = {}
 
         for event in events:
             token_id = getattr(event.internal_metadata, "token_id", None)
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index 66ad363bfb..e70d3649ff 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -27,8 +27,11 @@ from synapse.util import json_encoder
 _DEFAULT_CATEGORY_ID = ""
 _DEFAULT_ROLE_ID = ""
 
+
 # A room in a group.
-_RoomInGroup = TypedDict("_RoomInGroup", {"room_id": str, "is_public": bool})
+class _RoomInGroup(TypedDict):
+    room_id: str
+    is_public: bool
 
 
 class GroupServerWorkerStore(SQLBaseStore):
@@ -92,6 +95,7 @@ class GroupServerWorkerStore(SQLBaseStore):
               "is_public": False                    # Whether this is a public room or not
             }
         """
+
         # TODO: Pagination
 
         def _get_rooms_in_group_txn(txn):
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index e76188328c..774861074c 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -310,14 +310,25 @@ class Lock:
         _excinst: Optional[BaseException],
         _exctb: Optional[TracebackType],
     ) -> bool:
+        await self.release()
+
+        return False
+
+    async def release(self) -> None:
+        """Release the lock.
+
+        This is automatically called when using the lock as a context manager.
+        """
+
+        if self._dropped:
+            return
+
         if self._looping_call.running:
             self._looping_call.stop()
 
         await self._store._drop_lock(self._lock_name, self._lock_key, self._token)
         self._dropped = True
 
-        return False
-
     def __del__(self) -> None:
         if not self._dropped:
             # We should not be dropped without the lock being released (unless
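
The Lock class gains an explicit release() method, with __aexit__ delegating to it and a _dropped flag making release idempotent, so __del__ can tell whether the lock was ever let go. A toy sketch of that shape (not Synapse's Lock, which additionally stops a renewal looping call and deletes the backing database row):

    import asyncio

    class ToyLock:
        def __init__(self) -> None:
            self._dropped = False

        async def __aenter__(self) -> "ToyLock":
            return self

        async def __aexit__(self, exc_type, exc, tb) -> bool:
            await self.release()
            return False

        async def release(self) -> None:
            # Idempotent: releasing twice (or after the context manager exits) is a no-op.
            if self._dropped:
                return
            # ... drop the row backing the lock / stop the renewal timer here ...
            self._dropped = True

    async def main() -> None:
        async with ToyLock():
            pass              # released automatically by __aexit__
        lock = ToyLock()
        await lock.release()  # or released explicitly, outside a context manager

    asyncio.run(main())
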
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index c3f551d377..dc0bbc56ac 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -316,11 +316,140 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
 
         return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
 
+    async def count_r30v2_users(self) -> Dict[str, int]:
+        """
+        Counts the number of 30 day retained users, defined as users that:
+         - Appear more than once in the past 60 days
+         - Have more than 30 days between the most and least recent appearances that
+           occurred in the past 60 days.
+
+        (This is the second version of this metric, hence R30'v2')
+
+        Returns:
+             A mapping from client type to the number of 30-day retained users for that client.
+
+             The dict keys are:
+              - "all" (a combined number of users across any and all clients)
+              - "android" (Element Android)
+              - "ios" (Element iOS)
+              - "electron" (Element Desktop)
+              - "web" (any web application -- it's not possible to distinguish Element Web here)
+        """
+
+        def _count_r30v2_users(txn):
+            thirty_days_in_secs = 86400 * 30
+            now = int(self._clock.time())
+            sixty_days_ago_in_secs = now - 2 * thirty_days_in_secs
+            one_day_from_now_in_secs = now + 86400
+
+            # This is the 'per-platform' count.
+            sql = """
+                SELECT
+                    client_type,
+                    count(client_type)
+                FROM
+                    (
+                        SELECT
+                            user_id,
+                            CASE
+                                WHEN
+                                    LOWER(user_agent) LIKE '%%riot%%' OR
+                                    LOWER(user_agent) LIKE '%%element%%'
+                                    THEN CASE
+                                        WHEN
+                                            LOWER(user_agent) LIKE '%%electron%%'
+                                            THEN 'electron'
+                                        WHEN
+                                            LOWER(user_agent) LIKE '%%android%%'
+                                            THEN 'android'
+                                        WHEN
+                                            LOWER(user_agent) LIKE '%%ios%%'
+                                            THEN 'ios'
+                                        ELSE 'unknown'
+                                    END
+                                WHEN
+                                    LOWER(user_agent) LIKE '%%mozilla%%' OR
+                                    LOWER(user_agent) LIKE '%%gecko%%'
+                                    THEN 'web'
+                                ELSE 'unknown'
+                            END as client_type
+                        FROM
+                            user_daily_visits
+                        WHERE
+                            timestamp > ?
+                            AND
+                            timestamp < ?
+                        GROUP BY
+                            user_id,
+                            client_type
+                        HAVING
+                            max(timestamp) - min(timestamp) > ?
+                    ) AS temp
+                GROUP BY
+                    client_type
+                ;
+            """
+
+            # We initialise all the client types to zero, so we get an explicit
+            # zero if they don't appear in the query results
+            results = {"ios": 0, "android": 0, "web": 0, "electron": 0}
+            txn.execute(
+                sql,
+                (
+                    sixty_days_ago_in_secs * 1000,
+                    one_day_from_now_in_secs * 1000,
+                    thirty_days_in_secs * 1000,
+                ),
+            )
+
+            for row in txn:
+                if row[0] == "unknown":
+                    continue
+                results[row[0]] = row[1]
+
+            # This is the 'all users' count.
+            sql = """
+                SELECT COUNT(*) FROM (
+                    SELECT
+                        1
+                    FROM
+                        user_daily_visits
+                    WHERE
+                        timestamp > ?
+                        AND
+                        timestamp < ?
+                    GROUP BY
+                        user_id
+                    HAVING
+                        max(timestamp) - min(timestamp) > ?
+                ) AS r30_users
+            """
+
+            txn.execute(
+                sql,
+                (
+                    sixty_days_ago_in_secs * 1000,
+                    one_day_from_now_in_secs * 1000,
+                    thirty_days_in_secs * 1000,
+                ),
+            )
+            row = txn.fetchone()
+            if row is None:
+                results["all"] = 0
+            else:
+                results["all"] = row[0]
+
+            return results
+
+        return await self.db_pool.runInteraction(
+            "count_r30v2_users", _count_r30v2_users
+        )
+
     def _get_start_of_day(self):
         """
         Returns millisecond unixtime for start of UTC day.
         """
-        now = time.gmtime()
+        now = time.gmtime(self._clock.time())
         today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
         return today_start * 1000
 
@@ -352,7 +481,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
                     ) udv
                     ON u.user_id = udv.user_id AND u.device_id=udv.device_id
                     INNER JOIN users ON users.name=u.user_id
-                    WHERE last_seen > ? AND last_seen <= ?
+                    WHERE ? <= last_seen AND last_seen < ?
                     AND udv.timestamp IS NULL AND users.is_guest=0
                     AND users.appservice_id IS NULL
                     GROUP BY u.user_id, u.device_id
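
For reference, the nested CASE expression in the R30v2 query above buckets user agents into client types; restated in plain Python for illustration only, with the same matching order as the SQL:

    def classify_client(user_agent: str) -> str:
        ua = user_agent.lower()
        if "riot" in ua or "element" in ua:
            if "electron" in ua:
                return "electron"
            if "android" in ua:
                return "android"
            if "ios" in ua:
                return "ios"
            return "unknown"
        if "mozilla" in ua or "gecko" in ua:
            return "web"
        return "unknown"

    assert classify_client("Element/1.2.3 (Android 12)") == "android"
    assert classify_client("Mozilla/5.0 (X11; Linux) Gecko/20100101 Firefox") == "web"
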
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 9b4e95e134..ba7075caa5 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -73,20 +73,20 @@ class ProfileWorkerStore(SQLBaseStore):
     async def set_profile_displayname(
         self, user_localpart: str, new_displayname: Optional[str]
     ) -> None:
-        await self.db_pool.simple_update_one(
+        await self.db_pool.simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
+            values={"displayname": new_displayname},
             desc="set_profile_displayname",
         )
 
     async def set_profile_avatar_url(
         self, user_localpart: str, new_avatar_url: Optional[str]
     ) -> None:
-        await self.db_pool.simple_update_one(
+        await self.db_pool.simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"avatar_url": new_avatar_url},
+            values={"avatar_url": new_avatar_url},
             desc="set_profile_avatar_url",
         )
 
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 7fb7780d0f..664c65dac5 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -115,7 +115,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         logger.info("[purge] looking for events to delete")
 
         should_delete_expr = "state_key IS NULL"
-        should_delete_params = ()  # type: Tuple[Any, ...]
+        should_delete_params: Tuple[Any, ...] = ()
         if not delete_local_events:
             should_delete_expr += " AND event_id NOT LIKE ?"
 
@@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "event_relations",
             "event_search",
             "rejections",
+            "redactions",
         ):
             logger.info("[purge] removing events from %s", table)
 
@@ -392,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "room_memberships",
             "room_stats_state",
             "room_stats_current",
-            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index db52176337..a7fb8cd848 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -79,9 +79,9 @@ class PushRulesWorkerStore(
         super().__init__(database, db_conn, hs)
 
         if hs.config.worker.worker_app is None:
-            self._push_rules_stream_id_gen = StreamIdGenerator(
-                db_conn, "push_rules_stream", "stream_id"
-            )  # type: Union[StreamIdGenerator, SlavedIdTracker]
+            self._push_rules_stream_id_gen: Union[
+                StreamIdGenerator, SlavedIdTracker
+            ] = StreamIdGenerator(db_conn, "push_rules_stream", "stream_id")
         else:
             self._push_rules_stream_id_gen = SlavedIdTracker(
                 db_conn, "push_rules_stream", "stream_id"
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index e31c5864ac..6ad1a0cf7f 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1744,7 +1744,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
 
             items = keyvalues.items()
             where_clause = " AND ".join(k + " = ?" for k, _ in items)
-            values = [v for _, v in items]  # type: List[Union[str, int]]
+            values: List[Union[str, int]] = [v for _, v in items]
             # Conveniently, refresh_tokens and access_tokens both use the user_id and device_id fields. Only caveat
             # is the `except_token_id` param that is tricky to get right, so for now we're just using the same where
             # clause and values before we handle that. This seems to be only used in the "set password" handler.
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 9f0d64a325..6ddafe5434 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchStore
+from synapse.storage.types import Cursor
 from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore):
         )
 
 
-class RoomBackgroundUpdateStore(SQLBaseStore):
+class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
     ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
+    POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
+    REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
+
+
+_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
+    "DROP TRIGGER populate_min_depth2_trigger ON room_depth",
+    "DROP FUNCTION populate_min_depth2()",
+    "ALTER TABLE room_depth DROP COLUMN min_depth",
+    "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth",
+)
+
 
+class RoomBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
@@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+            _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
             self._remove_tombstoned_rooms_from_directory,
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.ADD_ROOMS_ROOM_VERSION_COLUMN,
+            _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
             self._background_add_rooms_room_version_column,
         )
 
+        # BG updates to change the type of room_depth.min_depth
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+            self._background_populate_room_depth_min_depth2,
+        )
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+            self._background_replace_room_depth_min_depth,
+        )
+
     async def _background_insert_retention(self, progress, batch_size):
         """Retrieves a list of all rooms within a range and inserts an entry for each of
         them into the room_retention table.
@@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
                 new_last_room_id = room_id
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
+                txn,
+                _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
+                {"room_id": new_last_room_id},
             )
 
             return False
@@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         if end:
             await self.db_pool.updates._end_background_update(
-                self.ADD_ROOMS_ROOM_VERSION_COLUMN
+                _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN
             )
 
         return batch_size
@@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         if not rooms:
             await self.db_pool.updates._end_background_update(
-                self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+                _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
             )
             return 0
 
@@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             await self.set_room_is_public(room_id, False)
 
         await self.db_pool.updates._background_update_progress(
-            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+            _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
         )
 
         return len(rooms)
@@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         return max_ordering is None
 
+    async def _background_populate_room_depth_min_depth2(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populate room_depth.min_depth2
+
+        This is to deal with the fact that min_depth was initially created as a
+        32-bit integer field.
+        """
+
+        def process(txn: Cursor) -> int:
+            last_room = progress.get("last_room", "")
+            txn.execute(
+                """
+                UPDATE room_depth SET min_depth2=min_depth
+                WHERE room_id IN (
+                   SELECT room_id FROM room_depth WHERE room_id > ?
+                   ORDER BY room_id LIMIT ?
+                )
+                RETURNING room_id;
+                """,
+                (last_room, batch_size),
+            )
+            row_count = txn.rowcount
+            if row_count == 0:
+                return 0
+            last_room = max(row[0] for row in txn)
+            logger.info("populated room_depth up to %s", last_room)
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+                {"last_room": last_room},
+            )
+            return row_count
+
+        result = await self.db_pool.runInteraction(
+            "_background_populate_min_depth2", process
+        )
+
+        if result != 0:
+            return result
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2
+        )
+        return 0
+
+    async def _background_replace_room_depth_min_depth(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop the old 'min_depth' column and rename 'min_depth2' into its place."""
+
+        def process(txn: Cursor) -> None:
+            for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS:
+                logger.info("completing room_depth migration: %s", sql)
+                txn.execute(sql)
+
+        await self.db_pool.runInteraction("_background_replace_room_depth", process)
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+        )
+
+        return 0
+
 
 class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
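
The populate_room_depth_min_depth2 update copies min_depth into the new min_depth2 column (added because min_depth was originally created as a 32-bit integer) one batch at a time, recording the last room processed so the job can resume. A rough standalone sketch of that batching shape, using a SELECT-then-UPDATE in place of the UPDATE ... RETURNING statement in the code above (which needs RETURNING support):

    import sqlite3
    from typing import Tuple

    def backfill_batch(conn: sqlite3.Connection, last_room: str, batch_size: int) -> Tuple[int, str]:
        # Pick the next batch of rooms after the last one we processed.
        rows = conn.execute(
            "SELECT room_id FROM room_depth WHERE room_id > ? ORDER BY room_id LIMIT ?",
            (last_room, batch_size),
        ).fetchall()
        if not rows:
            return 0, last_room  # nothing left: the caller can end the background update
        room_ids = [r[0] for r in rows]
        conn.executemany(
            "UPDATE room_depth SET min_depth2 = min_depth WHERE room_id = ?",
            [(rid,) for rid in room_ids],
        )
        # Return how many rows we touched and where to resume from next time.
        return len(room_ids), room_ids[-1]

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE room_depth (room_id TEXT PRIMARY KEY, min_depth INT, min_depth2 BIGINT)")
    conn.executemany("INSERT INTO room_depth VALUES (?, ?, NULL)", [("!a", 1), ("!b", 2), ("!c", 3)])
    print(backfill_batch(conn, "", 2))   # -> (2, '!b'); call again with '!b' to continue
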
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4eebe5257a..b6cb6836c2 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -649,7 +649,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             event_to_memberships = await self._get_joined_profiles_from_event_ids(
                 missing_member_event_ids
             )
-            users_in_room.update((row for row in event_to_memberships.values() if row))
+            users_in_room.update(row for row in event_to_memberships.values() if row)
 
         if event is not None and event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -703,13 +703,22 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cached(max_entries=10000)
     async def is_host_joined(self, room_id: str, host: str) -> bool:
+        return await self._check_host_room_membership(room_id, host, Membership.JOIN)
+
+    @cached(max_entries=10000)
+    async def is_host_invited(self, room_id: str, host: str) -> bool:
+        return await self._check_host_room_membership(room_id, host, Membership.INVITE)
+
+    async def _check_host_room_membership(
+        self, room_id: str, host: str, membership: str
+    ) -> bool:
         if "%" in host or "_" in host:
             raise Exception("Invalid host name")
 
         sql = """
             SELECT state_key FROM current_state_events AS c
             INNER JOIN room_memberships AS m USING (event_id)
-            WHERE m.membership = 'join'
+            WHERE m.membership = ?
                 AND type = 'm.room.member'
                 AND c.room_id = ?
                 AND state_key LIKE ?
@@ -722,7 +731,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         like_clause = "%:" + host
 
         rows = await self.db_pool.execute(
-            "is_host_joined", None, sql, room_id, like_clause
+            "is_host_joined", None, sql, membership, room_id, like_clause
         )
 
         if not rows:
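
is_host_joined and the new is_host_invited are now thin, separately cached wrappers over a single helper parameterised by membership. A toy sketch of that arrangement, using functools.lru_cache in place of Synapse's @cached decorator and a placeholder body instead of the real query:

    from functools import lru_cache

    def _check_host_room_membership(room_id: str, host: str, membership: str) -> bool:
        # ... query current_state_events / room_memberships for the given membership ...
        return False  # placeholder result for the sketch

    @lru_cache(maxsize=10000)
    def is_host_joined(room_id: str, host: str) -> bool:
        return _check_host_room_membership(room_id, host, "join")

    @lru_cache(maxsize=10000)
    def is_host_invited(room_id: str, host: str) -> bool:
        return _check_host_room_membership(room_id, host, "invite")

    print(is_host_joined("!room:example.org", "example.com"))  # False
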
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 82a1833509..59d67c255b 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import StoreError
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
-from synapse.storage.engines import PostgresEngine
 from synapse.types import JsonDict
 from synapse.util.caches.descriptors import cached
 
@@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = {
     "user": ("joined_rooms",),
 }
 
-# these fields are per-timeslice and so should be reset to 0 upon a new slice
-# You can draw these stats on a histogram.
-# Example: number of events sent locally during a time slice
-PER_SLICE_FIELDS = {
-    "room": ("total_events", "total_event_bytes"),
-    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
-}
-
 TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
 # these are the tables (& ID columns) which contain our actual subjects
@@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore):
         self.server_name = hs.hostname
         self.clock = self.hs.get_clock()
         self.stats_enabled = hs.config.stats_enabled
-        self.stats_bucket_size = hs.config.stats_bucket_size
 
         self.stats_delta_processing_lock = DeferredLock()
 
@@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore):
         self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
         self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
 
-    def quantise_stats_time(self, ts):
-        """
-        Quantises a timestamp to be a multiple of the bucket size.
-
-        Args:
-            ts (int): the timestamp to quantise, in milliseconds since the Unix
-                Epoch
-
-        Returns:
-            int: a timestamp which
-              - is divisible by the bucket size;
-              - is no later than `ts`; and
-              - is the largest such timestamp.
-        """
-        return (ts // self.stats_bucket_size) * self.stats_bucket_size
-
     async def _populate_stats_process_users(self, progress, batch_size):
         """
         This is a background update which regenerates statistics for users.
@@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    async def get_statistics_for_subject(
-        self, stats_type: str, stats_id: str, start: str, size: int = 100
-    ) -> List[dict]:
-        """
-        Get statistics for a given subject.
-
-        Args:
-            stats_type: The type of subject
-            stats_id: The ID of the subject (e.g. room_id or user_id)
-            start: Pagination start. Number of entries, not timestamp.
-            size: How many entries to return.
-
-        Returns:
-            A list of dicts, where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
-        """
-        return await self.db_pool.runInteraction(
-            "get_statistics_for_subject",
-            self._get_statistics_for_subject_txn,
-            stats_type,
-            stats_id,
-            start,
-            size,
-        )
-
-    def _get_statistics_for_subject_txn(
-        self, txn, stats_type, stats_id, start, size=100
-    ):
-        """
-        Transaction-bound version of L{get_statistics_for_subject}.
-        """
-
-        table, id_col = TYPE_TO_TABLE[stats_type]
-        selected_columns = list(
-            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
-        )
-
-        slice_list = self.db_pool.simple_select_list_paginate_txn(
-            txn,
-            table + "_historical",
-            "end_ts",
-            start,
-            size,
-            retcols=selected_columns + ["bucket_size", "end_ts"],
-            keyvalues={id_col: stats_id},
-            order_direction="DESC",
-        )
-
-        return slice_list
-
     @cached()
     async def get_earliest_token_for_stats(
         self, stats_type: str, id: str
@@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore):
 
         table, id_col = TYPE_TO_TABLE[stats_type]
 
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
         # Let's be paranoid and check that all the given field names are known
         abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
-        slice_field_names = PER_SLICE_FIELDS[stats_type]
         for field in chain(fields.keys(), absolute_field_overrides.keys()):
-            if field not in abs_field_names and field not in slice_field_names:
+            if field not in abs_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
@@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore):
             additive_relatives=deltas_of_absolute_fields,
         )
 
-        per_slice_additive_relatives = {
-            key: fields.get(key, 0) for key in slice_field_names
-        }
-        self._upsert_copy_from_table_with_additive_relatives_txn(
-            txn=txn,
-            into_table=table + "_historical",
-            keyvalues={id_col: stats_id},
-            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
-            extra_dst_keyvalues={"end_ts": end_ts},
-            additive_relatives=per_slice_additive_relatives,
-            src_table=table + "_current",
-            copy_columns=abs_field_names,
-        )
-
     def _upsert_with_additive_relatives_txn(
         self, txn, table, keyvalues, absolutes, additive_relatives
     ):
@@ -528,7 +434,7 @@ class StatsStore(StateDeltasStore):
             ]
 
             relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
                 % {"table": table, "field": field}
                 for field in additive_relatives.keys()
             ]
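
The COALESCE added above protects the additive half of the native upsert: in SQL, `NULL + delta` is NULL, so a pre-existing NULL counter would otherwise swallow every subsequent delta. A self-contained sketch of the pattern (plain sqlite3 rather than Synapse's generated SQL; the table and column are illustrative, and Synapse's generated statement table-qualifies the existing column where this sketch leaves it unqualified):

    import sqlite3  # ON CONFLICT ... DO UPDATE needs SQLite 3.24+

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE room_stats_current (room_id TEXT PRIMARY KEY, joined_members INTEGER)"
    )
    # An existing row whose counter is NULL, e.g. left over from a partially populated table.
    conn.execute("INSERT INTO room_stats_current VALUES ('!a:x', NULL)")

    conn.execute(
        """
        INSERT INTO room_stats_current (room_id, joined_members) VALUES (?, ?)
        ON CONFLICT (room_id) DO UPDATE SET
            joined_members = EXCLUDED.joined_members + COALESCE(joined_members, 0)
        """,
        ("!a:x", 5),
    )

    # Without COALESCE the stored value would stay NULL (NULL + 5 IS NULL); with it, 5.
    print(conn.execute("SELECT joined_members FROM room_stats_current").fetchone())  # (5,)
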
@@ -568,205 +474,13 @@ class StatsStore(StateDeltasStore):
                 self.db_pool.simple_insert_txn(txn, table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():
-                    current_row[key] += val
+                    if current_row[key] is None:
+                        current_row[key] = val
+                    else:
+                        current_row[key] += val
                 current_row.update(absolutes)
                 self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
 
-    def _upsert_copy_from_table_with_additive_relatives_txn(
-        self,
-        txn,
-        into_table,
-        keyvalues,
-        extra_dst_keyvalues,
-        extra_dst_insvalues,
-        additive_relatives,
-        src_table,
-        copy_columns,
-    ):
-        """Updates the historic stats table with latest updates.
-
-        This involves copying "absolute" fields from the `_current` table, and
-        adding relative fields to any existing values.
-
-        Args:
-             txn: Transaction
-             into_table (str): The destination table to UPSERT the row into
-             keyvalues (dict[str, any]): Row-identifying key values
-             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
-                for `into_table`.
-             extra_dst_insvalues (dict[str, any]): Additional values to insert
-                on new row creation for `into_table`.
-             additive_relatives (dict[str, any]): Fields that will be added onto
-                if existing row present. (Must be disjoint from copy_columns.)
-             src_table (str): The source table to copy from
-             copy_columns (iterable[str]): The list of columns to copy
-        """
-        if self.database_engine.can_native_upsert:
-            ins_columns = chain(
-                keyvalues,
-                copy_columns,
-                additive_relatives,
-                extra_dst_keyvalues,
-                extra_dst_insvalues,
-            )
-            sel_exprs = chain(
-                keyvalues,
-                copy_columns,
-                (
-                    "?"
-                    for _ in chain(
-                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
-                    )
-                ),
-            )
-            keyvalues_where = ("%s = ?" % f for f in keyvalues)
-
-            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
-            sets_ar = (
-                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
-                for f in additive_relatives
-            )
-
-            sql = """
-                INSERT INTO %(into_table)s (%(ins_columns)s)
-                SELECT %(sel_exprs)s
-                FROM %(src_table)s
-                WHERE %(keyvalues_where)s
-                ON CONFLICT (%(keyvalues)s)
-                DO UPDATE SET %(sets)s
-            """ % {
-                "into_table": into_table,
-                "ins_columns": ", ".join(ins_columns),
-                "sel_exprs": ", ".join(sel_exprs),
-                "keyvalues_where": " AND ".join(keyvalues_where),
-                "src_table": src_table,
-                "keyvalues": ", ".join(
-                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
-                ),
-                "sets": ", ".join(chain(sets_cc, sets_ar)),
-            }
-
-            qargs = list(
-                chain(
-                    additive_relatives.values(),
-                    extra_dst_keyvalues.values(),
-                    extra_dst_insvalues.values(),
-                    keyvalues.values(),
-                )
-            )
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, into_table)
-            src_row = self.db_pool.simple_select_one_txn(
-                txn, src_table, keyvalues, copy_columns
-            )
-            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
-            dest_current_row = self.db_pool.simple_select_one_txn(
-                txn,
-                into_table,
-                keyvalues=all_dest_keyvalues,
-                retcols=list(chain(additive_relatives.keys(), copy_columns)),
-                allow_none=True,
-            )
-
-            if dest_current_row is None:
-                merged_dict = {
-                    **keyvalues,
-                    **extra_dst_keyvalues,
-                    **extra_dst_insvalues,
-                    **src_row,
-                    **additive_relatives,
-                }
-                self.db_pool.simple_insert_txn(txn, into_table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    src_row[key] = dest_current_row[key] + val
-                self.db_pool.simple_update_txn(
-                    txn, into_table, all_dest_keyvalues, src_row
-                )
-
-    async def get_changes_room_total_events_and_bytes(
-        self, min_pos: int, max_pos: int
-    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
-        """Fetches the counts of events in the given range of stream IDs.
-
-        Args:
-            min_pos
-            max_pos
-
-        Returns:
-            Mapping of room ID to field changes.
-        """
-
-        return await self.db_pool.runInteraction(
-            "stats_incremental_total_events_and_bytes",
-            self.get_changes_room_total_events_and_bytes_txn,
-            min_pos,
-            max_pos,
-        )
-
-    def get_changes_room_total_events_and_bytes_txn(
-        self, txn, low_pos: int, high_pos: int
-    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
-        """Gets the total_events and total_event_bytes counts for rooms and
-        senders, in a range of stream_orderings (including backfilled events).
-
-        Args:
-            txn
-            low_pos: Low stream ordering
-            high_pos: High stream ordering
-
-        Returns:
-            The room and user deltas for total_events/total_event_bytes in the
-            format of `stats_id` -> fields
-        """
-
-        if low_pos >= high_pos:
-            # nothing to do here.
-            return {}, {}
-
-        if isinstance(self.database_engine, PostgresEngine):
-            new_bytes_expression = "OCTET_LENGTH(json)"
-        else:
-            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
-
-        sql = """
-            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.room_id
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        room_deltas = {
-            room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for room_id, new_events, new_bytes in txn
-        }
-
-        sql = """
-            SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.sender
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        user_deltas = {
-            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for user_id, new_events, new_bytes in txn
-            if self.hs.is_mine_id(user_id)
-        }
-
-        return room_deltas, user_deltas
-
     async def _calculate_and_set_initial_state_for_room(
         self, room_id: str
     ) -> Tuple[dict, dict, int]:
@@ -893,6 +607,7 @@ class StatsStore(StateDeltasStore):
                 "invited_members": membership_counts.get(Membership.INVITE, 0),
                 "left_members": membership_counts.get(Membership.LEAVE, 0),
                 "banned_members": membership_counts.get(Membership.BAN, 0),
+                "knocked_members": membership_counts.get(Membership.KNOCK, 0),
                 "local_users_in_room": len(local_users_in_room),
             },
         )
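
The new `knocked_members` field sits alongside the other membership tallies when a room's initial stats row is built. A tiny sketch of the kind of tally that feeds these fields, assuming a flat list of (user_id, membership) pairs instead of Synapse's state events:

    from collections import Counter

    # Hypothetical current-state memberships for one room.
    memberships = [
        ("@a:x", "join"),
        ("@b:x", "join"),
        ("@c:x", "invite"),
        ("@d:x", "knock"),  # the state the new column counts
        ("@e:x", "leave"),
    ]

    counts = Counter(m for _, m in memberships)
    row = {
        "joined_members": counts.get("join", 0),
        "invited_members": counts.get("invite", 0),
        "left_members": counts.get("leave", 0),
        "banned_members": counts.get("ban", 0),
        "knocked_members": counts.get("knock", 0),
    }
    print(row)  # joined 2, invited 1, left 1, banned 0, knocked 1
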
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 7581c7d3ff..959f13de47 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1085,9 +1085,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
         # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
         # then filtering the results.
         if from_token.topological is not None:
-            from_bound = (
-                from_token.as_historical_tuple()
-            )  # type: Tuple[Optional[int], int]
+            from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
         elif direction == "b":
             from_bound = (
                 None,
@@ -1099,7 +1097,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
                 from_token.stream,
             )
 
-        to_bound = None  # type: Optional[Tuple[Optional[int], int]]
+        to_bound: Optional[Tuple[Optional[int], int]] = None
         if to_token:
             if to_token.topological is not None:
                 to_bound = to_token.as_historical_tuple()
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 1d62c6140f..f93ff0a545 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -42,7 +42,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
         )
 
-        tags_by_room = {}  # type: Dict[str, Dict[str, JsonDict]]
+        tags_by_room: Dict[str, Dict[str, JsonDict]] = {}
         for row in rows:
             room_tags = tags_by_room.setdefault(row["room_id"], {})
             room_tags[row["tag"]] = db_to_json(row["content"])
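
Like this hunk, most of the remaining changes in the set are the same mechanical modernisation: `# type:` comments become PEP 526 inline annotations. A before/after sketch of the pattern:

    from typing import Dict, Optional

    # Old style: the annotation lives in a comment; mypy reads it, the runtime never
    # sees it, and long right-hand sides force the awkward wrapping visible in several
    # of the removed lines above.
    tags_by_room = {}  # type: Dict[str, Dict[str, str]]
    current_update = None  # type: Optional[str]

    # New style: the annotation is part of the assignment itself.
    tags_by_room2: Dict[str, Dict[str, str]] = {}
    current_update2: Optional[str] = None
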
diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index 22c05cdde7..38bfdf5dad 100644
--- a/synapse/storage/databases/main/ui_auth.py
+++ b/synapse/storage/databases/main/ui_auth.py
@@ -224,12 +224,12 @@ class UIAuthWorkerStore(SQLBaseStore):
         self, txn: LoggingTransaction, session_id: str, key: str, value: Any
     ):
         # Get the current value.
-        result = self.db_pool.simple_select_one_txn(
+        result: Dict[str, Any] = self.db_pool.simple_select_one_txn(  # type: ignore
             txn,
             table="ui_auth_sessions",
             keyvalues={"session_id": session_id},
             retcols=("serverdict",),
-        )  # type: Dict[str, Any]  # type: ignore
+        )
 
         # Update it and add it back to the database.
         serverdict = db_to_json(result["serverdict"])
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 1882bfd9cf..20cd63c330 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
         """
         ...
 
+    @property
+    @abc.abstractmethod
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        ...
+
     @abc.abstractmethod
     def check_database(
         self, db_conn: ConnectionType, allow_outdated_version: bool = False
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 21411c5fea..30f948a0f7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine):
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return True
+
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
             # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 5fe1b205e1..70d17d4f2c 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
 
+    @property
+    def supports_returning(self) -> bool:
+        """Do we support the `RETURNING` clause in insert/update/delete?"""
+        return self.module.sqlite_version_info >= (3, 35, 0)
+
     def check_database(self, db_conn, allow_outdated_version: bool = False):
         if not allow_outdated_version:
             version = self.module.sqlite_version_info
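
With `supports_returning` exposed on both engines, callers can pick between a single `DELETE ... RETURNING` round trip and a portable read-then-delete fallback. A hedged sketch of that kind of branch (the helper, the `my_locks` table and the sqlite-style `?` placeholders are illustrative, not Synapse code):

    def delete_and_return_token(txn, engine, lock_name: str):
        """Delete a row from a hypothetical `my_locks` table and return its token,
        using RETURNING where the engine supports it (Postgres; SQLite >= 3.35)."""
        if engine.supports_returning:
            # One statement: delete and fetch the old value in a single round trip.
            txn.execute(
                "DELETE FROM my_locks WHERE lock_name = ? RETURNING token",
                (lock_name,),
            )
            row = txn.fetchone()
            return row[0] if row else None

        # Fallback for engines without RETURNING: read first, then delete.
        txn.execute("SELECT token FROM my_locks WHERE lock_name = ?", (lock_name,))
        row = txn.fetchone()
        if row is not None:
            txn.execute("DELETE FROM my_locks WHERE lock_name = ?", (lock_name,))
        return row[0] if row else None
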
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 051095fea9..a39877f0d5 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -307,7 +307,7 @@ class EventsPersistenceStorage:
             matched the transaction ID; the existing event is returned in such
             a case.
         """
-        partitioned = {}  # type: Dict[str, List[Tuple[EventBase, EventContext]]]
+        partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
@@ -384,7 +384,7 @@ class EventsPersistenceStorage:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
         """
-        replaced_events = {}  # type: Dict[str, str]
+        replaced_events: Dict[str, str] = {}
         if not events_and_contexts:
             return replaced_events
 
@@ -440,16 +440,14 @@ class EventsPersistenceStorage:
             # Set of remote users which were in rooms the server has left. We
             # should check if we still share any rooms and if not we mark their
             # device lists as stale.
-            potentially_left_users = set()  # type: Set[str]
+            potentially_left_users: Set[str] = set()
 
             if not backfilled:
                 with Measure(self._clock, "_calculate_state_and_extrem"):
                     # Work out the new "current state" for each room.
                     # We do this by working out what the new extremities are and then
                     # calculating the state from that.
-                    events_by_room = (
-                        {}
-                    )  # type: Dict[str, List[Tuple[EventBase, EventContext]]]
+                    events_by_room: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
                     for event, context in chunk:
                         events_by_room.setdefault(event.room_id, []).append(
                             (event, context)
@@ -622,9 +620,9 @@ class EventsPersistenceStorage:
         )
 
         # Remove any events which are prev_events of any existing events.
-        existing_prevs = await self.persist_events_store._get_events_which_are_prevs(
-            result
-        )  # type: Collection[str]
+        existing_prevs: Collection[
+            str
+        ] = await self.persist_events_store._get_events_which_are_prevs(result)
         result.difference_update(existing_prevs)
 
         # Finally handle the case where the new events have soft-failed prev
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 683e5e3b90..61392b9639 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -256,7 +256,7 @@ def _setup_new_database(
         for database in databases
     )
 
-    directory_entries = []  # type: List[_DirectoryListing]
+    directory_entries: List[_DirectoryListing] = []
     for directory in directories:
         directory_entries.extend(
             _DirectoryListing(file_name, os.path.join(directory, file_name))
@@ -424,10 +424,10 @@ def _upgrade_existing_database(
             directories.append(os.path.join(schema_path, database, "delta", str(v)))
 
         # Used to check if we have any duplicate file names
-        file_name_counter = Counter()  # type: CounterType[str]
+        file_name_counter: CounterType[str] = Counter()
 
         # Now find which directories have anything of interest.
-        directory_entries = []  # type: List[_DirectoryListing]
+        directory_entries: List[_DirectoryListing] = []
         for directory in directories:
             logger.debug("Looking for schema deltas in %s", directory)
             try:
@@ -639,7 +639,7 @@ def get_statements(f: Iterable[str]) -> Generator[str, None, None]:
 
 
 def executescript(txn: Cursor, schema_path: str) -> None:
-    with open(schema_path, "r") as f:
+    with open(schema_path) as f:
         execute_statements_from_stream(txn, f)
 
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 0a53b73ccc..36340a652a 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 60
+SCHEMA_VERSION = 61
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -21,6 +21,10 @@ older versions of Synapse).
 
 See `README.md <synapse/storage/schema/README.md>`_  for more information on how this
 works.
+
+Changes in SCHEMA_VERSION = 61:
+    - The `user_stats_historical` and `room_stats_historical` tables are no longer
+      written to, and are not read (previously, they were written but not read).
 """
 
 
diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
index 88c9f8bd0d..0edc9fe7a2 100644
--- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
+++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres
@@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
   (6001, 'populate_stream_ordering2', '{}');
 
--- ... and another to build an index on it
+-- ... and some more to build indexes on it. These aren't really interdependent,
+-- but the background_updates manager can only handle a single dependency per update.
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2');
+  (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
+  (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'),
+  (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'),
+  (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'),
+  (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream');
 
 -- ... and another to do the switcheroo
 INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
-  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
+  (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts');
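
The migration above serialises five otherwise-independent index builds into a linear `depends_on` chain because each background update can name only a single dependency. A small, generic illustration (not the BackgroundUpdater implementation) of how such a chain yields one deterministic order:

    # Each update names at most one dependency; chaining them gives a total order.
    updates = {
        "populate_stream_ordering2": None,
        "index_stream_ordering2": "populate_stream_ordering2",
        "index_stream_ordering2_room_order": "index_stream_ordering2",
        "index_stream_ordering2_contains_url": "index_stream_ordering2_room_order",
        "index_stream_ordering2_room_stream": "index_stream_ordering2_contains_url",
        "index_stream_ordering2_ts": "index_stream_ordering2_room_stream",
        "replace_stream_ordering_column": "index_stream_ordering2_ts",
    }

    pending = set(updates)
    order = []
    while pending:
        # An update is runnable once its (single) dependency is no longer pending.
        ready = [name for name in pending if updates[name] not in pending]
        assert len(ready) == 1  # a linear chain frees exactly one update at a time
        order.append(ready[0])
        pending.remove(ready[0])

    print(order)
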
diff --git a/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
new file mode 100644
index 0000000000..630c24fd9e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/60/02change_stream_ordering_columns.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This migration is closely related to '01recreate_stream_ordering.sql.postgres'.
+--
+-- It updates the other tables which use an INTEGER to refer to a stream ordering.
+-- These tables are all small enough that the resulting table rewrite is tractable.
+ALTER TABLE pushers ALTER COLUMN last_stream_ordering SET DATA TYPE BIGINT;
+ALTER TABLE federation_stream_position ALTER COLUMN stream_id SET DATA TYPE BIGINT;
+
+-- These aren't actually event stream orderings, but they are counters for which
+-- 2 billion is a bit limiting; application_services_state is tiny, and I don't want
+-- to ever have to do this again.
+ALTER TABLE application_services_state ALTER COLUMN last_txn SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN read_receipt_stream_id SET DATA TYPE BIGINT;
+ALTER TABLE application_services_state ALTER COLUMN presence_stream_id SET DATA TYPE BIGINT;
+
+
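
The "2 billion" remark above is the signed 32-bit ceiling of a Postgres INTEGER; BIGINT pushes the limit far past any plausible counter value. For reference:

    # Signed integer ranges behind the INTEGER -> BIGINT changes.
    int4_max = 2**31 - 1  # Postgres INTEGER: 2,147,483,647 (~2.1 billion)
    int8_max = 2**63 - 1  # Postgres BIGINT:  9,223,372,036,854,775,807

    print(f"INTEGER max: {int4_max:,}")
    print(f"BIGINT max:  {int8_max:,}")
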
diff --git a/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres
new file mode 100644
index 0000000000..c8aec78e60
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/01change_appservices_txns.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- we use bigint elsewhere in the database for appservice txn ids (notably,
+-- application_services_state.last_txn), and generally we use bigints everywhere else
+-- we have monotonic counters, so let's bring this one in line.
+--
+-- assuming there aren't thousands of rows for decommissioned/non-functional ASes, this
+-- table should be pretty small, so safe to do a synchronous ALTER TABLE.
+
+ALTER TABLE application_services_txns ALTER COLUMN txn_id SET DATA TYPE BIGINT;
diff --git a/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql
new file mode 100644
index 0000000000..35ca7a40c0
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/02drop_redundant_room_depth_index.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this index is redundant; there is another UNIQUE index on this table.
+DROP INDEX IF EXISTS room_depth_room;
+
diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
new file mode 100644
index 0000000000..f8d7db9f2e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
@@ -0,0 +1,70 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration starts the process of changing the type of `room_depth.min_depth` to
+a BIGINT.
+"""
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.types import Cursor
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    if not isinstance(database_engine, PostgresEngine):
+        # this only applies to postgres - sqlite does not distinguish between big and
+        # little ints.
+        return
+
+    # First add a new column to contain the bigger min_depth
+    cur.execute("ALTER TABLE room_depth ADD COLUMN min_depth2 BIGINT")
+
+    # Create a trigger which will keep it populated.
+    cur.execute(
+        """
+        CREATE OR REPLACE FUNCTION populate_min_depth2() RETURNS trigger AS $BODY$
+            BEGIN
+                new.min_depth2 := new.min_depth;
+                RETURN NEW;
+            END;
+        $BODY$ LANGUAGE plpgsql
+        """
+    )
+
+    cur.execute(
+        """
+        CREATE TRIGGER populate_min_depth2_trigger BEFORE INSERT OR UPDATE ON room_depth
+        FOR EACH ROW
+        EXECUTE PROCEDURE populate_min_depth2()
+        """
+    )
+
+    # Start a bg process to populate it for old rooms
+    cur.execute(
+        """
+       INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+            (6103, 'populate_room_depth_min_depth2', '{}')
+       """
+    )
+
+    # and another to switch them over once it completes.
+    cur.execute(
+        """
+        INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+            (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth_min_depth2')
+        """
+    )
+
+
+def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    pass
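
This migration is the "expand" half of the usual expand/switch approach: add a parallel BIGINT column, keep it in sync with a trigger, and backfill old rows in the background; a later update swaps the columns once the backfill is done. As an illustration only (this is not the actual `replace_room_depth_min_depth` update, just a sketch of what the swap conceptually involves):

    def swap_min_depth_columns(cur):
        # Drop the old 32-bit column, promote the BIGINT copy, and remove the
        # trigger/function that kept the two columns in sync during the backfill.
        cur.execute("ALTER TABLE room_depth DROP COLUMN min_depth")
        cur.execute("ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth")
        cur.execute("DROP TRIGGER populate_min_depth2_trigger ON room_depth")
        cur.execute("DROP FUNCTION populate_min_depth2()")
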
diff --git a/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres b/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres
new file mode 100644
index 0000000000..35a153da7b
--- /dev/null
+++ b/synapse/storage/schema/state/delta/61/02state_groups_state_n_distinct.sql.postgres
@@ -0,0 +1,34 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- By default the postgres statistics collector massively underestimates the
+-- number of distinct state groups in the `state_groups_state` table, which can
+-- cause postgres to use table scans for queries over multiple state groups.
+--
+-- To work around this we can manually tell postgres the number of distinct state
+-- groups there are by setting `n_distinct` (a negative value here is the number
+-- of distinct values divided by the number of rows, so -0.02 means on average
+-- there are 50 rows per distinct value). We don't need a particularly
+-- accurate number here, as a) we just want it to always use index scans and b)
+-- our estimate is going to be better than the one made by the statistics
+-- collector.
+
+ALTER TABLE state_groups_state ALTER COLUMN state_group SET (n_distinct = -0.02);
+
+-- Ideally we'd do an `ANALYZE state_groups_state (state_group)` here so that
+-- the above gets picked up immediately, but that can take a bit of time so we
+-- rely on the autovacuum eventually getting run and doing that in the
+-- background for us.
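
A quick worked example of what `n_distinct = -0.02` tells the planner, under an assumed row count:

    # A negative n_distinct is a fraction of the row count rather than an absolute figure.
    n_distinct = -0.02
    rows = 1_000_000_000  # assumed table size, purely for illustration

    estimated_distinct_groups = abs(n_distinct) * rows  # 20,000,000 state groups
    rows_per_group = rows / estimated_distinct_groups   # 50 rows per state group

    print(f"{estimated_distinct_groups:,.0f} groups, {rows_per_group:.0f} rows each")
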
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c9dce726cb..f8fbba9d38 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -91,7 +91,7 @@ class StateFilter:
         Returns:
             The new state filter.
         """
-        type_dict = {}  # type: Dict[str, Optional[Set[str]]]
+        type_dict: Dict[str, Optional[Set[str]]] = {}
         for typ, s in types:
             if typ in type_dict:
                 if type_dict[typ] is None:
@@ -194,7 +194,7 @@ class StateFilter:
         """
 
         where_clause = ""
-        where_args = []  # type: List[str]
+        where_args: List[str] = []
 
         if self.is_full():
             return where_clause, where_args
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index f1e62f9e85..c768fdea56 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -112,7 +112,7 @@ class StreamIdGenerator:
         # insertion ordering will ensure it's in the correct ordering.
         #
         # The key and values are the same, but we never look at the values.
-        self._unfinished_ids = OrderedDict()  # type: OrderedDict[int, int]
+        self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
 
     def get_next(self):
         """
@@ -236,15 +236,15 @@ class MultiWriterIdGenerator:
         # Note: If we are a negative stream then we still store all the IDs as
         # positive to make life easier for us, and simply negate the IDs when we
         # return them.
-        self._current_positions = {}  # type: Dict[str, int]
+        self._current_positions: Dict[str, int] = {}
 
         # Set of local IDs that we're still processing. The current position
         # should be less than the minimum of this set (if not empty).
-        self._unfinished_ids = set()  # type: Set[int]
+        self._unfinished_ids: Set[int] = set()
 
         # Set of local IDs that we've processed that are larger than the current
         # position, due to there being smaller unpersisted IDs.
-        self._finished_ids = set()  # type: Set[int]
+        self._finished_ids: Set[int] = set()
 
         # We track the max position where we know everything before has been
         # persisted. This is done by a) looking at the min across all instances
@@ -265,7 +265,7 @@ class MultiWriterIdGenerator:
         self._persisted_upto_position = (
             min(self._current_positions.values()) if self._current_positions else 1
         )
-        self._known_persisted_positions = []  # type: List[int]
+        self._known_persisted_positions: List[int] = []
 
         self._sequence_gen = PostgresSequenceGenerator(sequence_name)
 
@@ -465,7 +465,7 @@ class MultiWriterIdGenerator:
             self._unfinished_ids.discard(next_id)
             self._finished_ids.add(next_id)
 
-            new_cur = None  # type: Optional[int]
+            new_cur: Optional[int] = None
 
             if self._unfinished_ids:
                 # If there are unfinished IDs then the new position will be the
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index 30b6b8e0ca..bb33e04fb1 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -208,10 +208,10 @@ class LocalSequenceGenerator(SequenceGenerator):
                 get_next_id_txn; should return the current maximum id
         """
         # the callback. this is cleared after it is called, so that it can be GCed.
-        self._callback = get_first_callback  # type: Optional[GetFirstCallbackType]
+        self._callback: Optional[GetFirstCallbackType] = get_first_callback
 
         # The current max value, or None if we haven't looked in the DB yet.
-        self._current_max_id = None  # type: Optional[int]
+        self._current_max_id: Optional[int] = None
         self._lock = threading.Lock()
 
     def get_next_id_txn(self, txn: Cursor) -> int:
@@ -274,7 +274,7 @@ def build_sequence_generator(
             `check_consistency` details.
     """
     if isinstance(database_engine, PostgresEngine):
-        seq = PostgresSequenceGenerator(sequence_name)  # type: SequenceGenerator
+        seq: SequenceGenerator = PostgresSequenceGenerator(sequence_name)
     else:
         seq = LocalSequenceGenerator(get_first_callback)