summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorWill Hunt <willh@matrix.org>2021-07-20 09:35:03 +0100
committerWill Hunt <willh@matrix.org>2021-07-20 09:35:03 +0100
commitf19795355bf31af3842e607b3f1cc34e5d7727dc (patch)
tree85834d47d9f4610fcf70f176114aa17a7b61b8e4 /synapse/storage/databases/main
parentMerge remote-tracking branch 'origin/develop' into hs/hacked-together-event-c... (diff)
parentFactorise `get_datastore` calls in phone_stats_home. (#10427) (diff)
downloadsynapse-f19795355bf31af3842e607b3f1cc34e5d7727dc.tar.xz
Merge remote-tracking branch 'origin/develop' into hs/hacked-together-event-cache
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/appservice.py4
-rw-r--r--synapse/storage/databases/main/deviceinbox.py4
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py2
-rw-r--r--synapse/storage/databases/main/event_federation.py142
-rw-r--r--synapse/storage/databases/main/event_push_actions.py2
-rw-r--r--synapse/storage/databases/main/events.py44
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py68
-rw-r--r--synapse/storage/databases/main/events_worker.py6
-rw-r--r--synapse/storage/databases/main/group_server.py6
-rw-r--r--synapse/storage/databases/main/lock.py15
-rw-r--r--synapse/storage/databases/main/metrics.py133
-rw-r--r--synapse/storage/databases/main/profile.py8
-rw-r--r--synapse/storage/databases/main/purge_events.py4
-rw-r--r--synapse/storage/databases/main/push_rule.py6
-rw-r--r--synapse/storage/databases/main/registration.py2
-rw-r--r--synapse/storage/databases/main/room.py104
-rw-r--r--synapse/storage/databases/main/roommember.py15
-rw-r--r--synapse/storage/databases/main/stats.py299
-rw-r--r--synapse/storage/databases/main/stream.py6
-rw-r--r--synapse/storage/databases/main/tags.py2
-rw-r--r--synapse/storage/databases/main/ui_auth.py4
21 files changed, 486 insertions, 390 deletions
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 9f182c2a89..e2d1b758bd 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -48,9 +48,7 @@ def _make_exclusive_regex(
     ]
     if exclusive_user_regexes:
         exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
-        exclusive_user_pattern = re.compile(
-            exclusive_user_regex
-        )  # type: Optional[Pattern]
+        exclusive_user_pattern: Optional[Pattern] = re.compile(exclusive_user_regex)
     else:
         # We handle this case specially otherwise the constructed regex
         # will always match
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 50e7ddd735..c55508867d 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -203,9 +203,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_messages_for_device", delete_messages_for_device_txn
         )
 
-        log_kv(
-            {"message": "deleted {} messages for device".format(count), "count": count}
-        )
+        log_kv({"message": f"deleted {count} messages for device", "count": count})
 
         # Update the cache, ensuring that we only ever increase the value
         last_deleted_stream_id = self._last_device_delete_cache.get(
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 0e3dd4e9ca..78ae68ec68 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -247,7 +247,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
 
         txn.execute(sql, query_params)
 
-        result = {}  # type: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]
+        result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
         for (user_id, device_id, display_name, key_json) in txn:
             if include_deleted_devices:
                 deleted_devices.remove((user_id, device_id))
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..d39368c20e 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,8 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
 
+from prometheus_client import Gauge
+
 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion
@@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 
+oldest_pdu_in_federation_staging = Gauge(
+    "synapse_federation_server_oldest_inbound_pdu_in_staging",
+    "The age in seconds since we received the oldest pdu in the federation staging area",
+)
+
+number_pdus_in_federation_queue = Gauge(
+    "synapse_federation_server_number_inbound_pdu_in_staging",
+    "The total number of events in the inbound federation staging",
+)
+
 logger = logging.getLogger(__name__)
 
 
@@ -50,9 +62,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             )
 
         # Cache of event ID to list of auth event IDs and their depths.
-        self._event_auth_cache = LruCache(
+        self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache(
             500000, "_event_auth_cache", size_callback=len
-        )  # type: LruCache[str, List[Tuple[str, int]]]
+        )
+
+        self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
 
     async def get_auth_chain(
         self, room_id: str, event_ids: Collection[str], include_given: bool = False
@@ -123,10 +137,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         initial_events = set(event_ids)
 
         # All the events that we've found that are reachable from the events.
-        seen_events = set()  # type: Set[str]
+        seen_events: Set[str] = set()
 
         # A map from chain ID to max sequence number of the given events.
-        event_chains = {}  # type: Dict[int, int]
+        event_chains: Dict[int, int] = {}
 
         sql = """
             SELECT event_id, chain_id, sequence_number
@@ -168,7 +182,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         """
 
         # A map from chain ID to max sequence number *reachable* from any event ID.
-        chains = {}  # type: Dict[int, int]
+        chains: Dict[int, int] = {}
 
         # Add all linked chains reachable from initial set of chains.
         for batch in batch_iter(event_chains, 1000):
@@ -339,14 +353,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         initial_events = set(state_sets[0]).union(*state_sets[1:])
 
         # Map from event_id -> (chain ID, seq no)
-        chain_info = {}  # type: Dict[str, Tuple[int, int]]
+        chain_info: Dict[str, Tuple[int, int]] = {}
 
         # Map from chain ID -> seq no -> event Id
-        chain_to_event = {}  # type: Dict[int, Dict[int, str]]
+        chain_to_event: Dict[int, Dict[int, str]] = {}
 
         # All the chains that we've found that are reachable from the state
         # sets.
-        seen_chains = set()  # type: Set[int]
+        seen_chains: Set[int] = set()
 
         sql = """
             SELECT event_id, chain_id, sequence_number
@@ -378,9 +392,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Corresponds to `state_sets`, except as a map from chain ID to max
         # sequence number reachable from the state set.
-        set_to_chain = []  # type: List[Dict[int, int]]
+        set_to_chain: List[Dict[int, int]] = []
         for state_set in state_sets:
-            chains = {}  # type: Dict[int, int]
+            chains: Dict[int, int] = {}
             set_to_chain.append(chains)
 
             for event_id in state_set:
@@ -432,7 +446,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         # Mapping from chain ID to the range of sequence numbers that should be
         # pulled from the database.
-        chain_to_gap = {}  # type: Dict[int, Tuple[int, int]]
+        chain_to_gap: Dict[int, Tuple[int, int]] = {}
 
         for chain_id in seen_chains:
             min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
@@ -541,7 +555,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         }
 
         # The sorted list of events whose auth chains we should walk.
-        search = []  # type: List[Tuple[int, str]]
+        search: List[Tuple[int, str]] = []
 
         # We need to get the depth of the initial events for sorting purposes.
         sql = """
@@ -564,7 +578,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         search.sort()
 
         # Map from event to its auth events
-        event_to_auth_events = {}  # type: Dict[str, Set[str]]
+        event_to_auth_events: Dict[str, Set[str]] = {}
 
         base_sql = """
             SELECT a.event_id, auth_id, depth
@@ -1075,16 +1089,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         self,
         origin: str,
         event_id: str,
-    ) -> None:
-        """Remove the given event from the staging area"""
-        await self.db_pool.simple_delete(
-            table="federation_inbound_events_staging",
-            keyvalues={
-                "origin": origin,
-                "event_id": event_id,
-            },
-            desc="remove_received_event_from_staging",
-        )
+    ) -> Optional[int]:
+        """Remove the given event from the staging area.
+
+        Returns:
+            The received_ts of the row that was deleted, if any.
+        """
+        if self.db_pool.engine.supports_returning:
+
+            def _remove_received_event_from_staging_txn(txn):
+                sql = """
+                    DELETE FROM federation_inbound_events_staging
+                    WHERE origin = ? AND event_id = ?
+                    RETURNING received_ts
+                """
+
+                txn.execute(sql, (origin, event_id))
+                return txn.fetchone()
+
+            row = await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+                db_autocommit=True,
+            )
+            if row is None:
+                return None
+
+            return row[0]
+
+        else:
+
+            def _remove_received_event_from_staging_txn(txn):
+                received_ts = self.db_pool.simple_select_one_onecol_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                    retcol="received_ts",
+                    allow_none=True,
+                )
+                self.db_pool.simple_delete_txn(
+                    txn,
+                    table="federation_inbound_events_staging",
+                    keyvalues={
+                        "origin": origin,
+                        "event_id": event_id,
+                    },
+                )
+
+                return received_ts
+
+            return await self.db_pool.runInteraction(
+                "remove_received_event_from_staging",
+                _remove_received_event_from_staging_txn,
+            )
 
     async def get_next_staged_event_id_for_room(
         self,
@@ -1147,6 +1207,42 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return origin, event
 
+    async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
+        """Get the room IDs of all events currently staged."""
+        return await self.db_pool.simple_select_onecol(
+            table="federation_inbound_events_staging",
+            keyvalues={},
+            retcol="DISTINCT room_id",
+            desc="get_all_rooms_with_staged_incoming_events",
+        )
+
+    @wrap_as_background_process("_get_stats_for_federation_staging")
+    async def _get_stats_for_federation_staging(self):
+        """Update the prometheus metrics for the inbound federation staging area."""
+
+        def _get_stats_for_federation_staging_txn(txn):
+            txn.execute(
+                "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
+            )
+            (count,) = txn.fetchone()
+
+            txn.execute(
+                "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
+            )
+
+            (received_ts,) = txn.fetchone()
+
+            age = self._clock.time_msec() - received_ts
+
+            return count, age
+
+        count, age = await self.db_pool.runInteraction(
+            "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
+        )
+
+        number_pdus_in_federation_queue.set(count)
+        oldest_pdu_in_federation_staging.set(age)
+
 
 class EventFederationStore(EventFederationWorkerStore):
     """Responsible for storing and serving up the various graphs associated
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index d1237c65cc..55caa6bbe7 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -759,7 +759,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # object because we might not have the same amount of rows in each of them. To do
         # this, we use a dict indexed on the user ID and room ID to make it easier to
         # populate.
-        summaries = {}  # type: Dict[Tuple[str, str], _EventPushSummary]
+        summaries: Dict[Tuple[str, str], _EventPushSummary] = {}
         for row in txn:
             summaries[(row[0], row[1])] = _EventPushSummary(
                 unread_count=row[2],
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 897fa06639..a396a201d4 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -109,10 +109,8 @@ class PersistEventsStore:
 
         # Ideally we'd move these ID gens here, unfortunately some other ID
         # generators are chained off them so doing so is a bit of a PITA.
-        self._backfill_id_gen = (
-            self.store._backfill_id_gen
-        )  # type: MultiWriterIdGenerator
-        self._stream_id_gen = self.store._stream_id_gen  # type: MultiWriterIdGenerator
+        self._backfill_id_gen: MultiWriterIdGenerator = self.store._backfill_id_gen
+        self._stream_id_gen: MultiWriterIdGenerator = self.store._stream_id_gen
 
         # This should only exist on instances that are configured to write
         assert (
@@ -221,7 +219,7 @@ class PersistEventsStore:
         Returns:
             Filtered event ids
         """
-        results = []  # type: List[str]
+        results: List[str] = []
 
         def _get_events_which_are_prevs_txn(txn, batch):
             sql = """
@@ -508,7 +506,7 @@ class PersistEventsStore:
         """
 
         # Map from event ID to chain ID/sequence number.
-        chain_map = {}  # type: Dict[str, Tuple[int, int]]
+        chain_map: Dict[str, Tuple[int, int]] = {}
 
         # Set of event IDs to calculate chain ID/seq numbers for.
         events_to_calc_chain_id_for = set(event_to_room_id)
@@ -817,8 +815,8 @@ class PersistEventsStore:
         #      new chain if the sequence number has already been allocated.
         #
 
-        existing_chains = set()  # type: Set[int]
-        tree = []  # type: List[Tuple[str, Optional[str]]]
+        existing_chains: Set[int] = set()
+        tree: List[Tuple[str, Optional[str]]] = []
 
         # We need to do this in a topologically sorted order as we want to
         # generate chain IDs/sequence numbers of an event's auth events before
@@ -848,7 +846,7 @@ class PersistEventsStore:
         )
         txn.execute(sql % (clause,), args)
 
-        chain_to_max_seq_no = {row[0]: row[1] for row in txn}  # type: Dict[Any, int]
+        chain_to_max_seq_no: Dict[Any, int] = {row[0]: row[1] for row in txn}
 
         # Allocate the new events chain ID/sequence numbers.
         #
@@ -858,8 +856,8 @@ class PersistEventsStore:
         # number of new chain IDs in one call, replacing all temporary
         # objects with real allocated chain IDs.
 
-        unallocated_chain_ids = set()  # type: Set[object]
-        new_chain_tuples = {}  # type: Dict[str, Tuple[Any, int]]
+        unallocated_chain_ids: Set[object] = set()
+        new_chain_tuples: Dict[str, Tuple[Any, int]] = {}
         for event_id, auth_event_id in tree:
             # If we reference an auth_event_id we fetch the allocated chain ID,
             # either from the existing `chain_map` or the newly generated
@@ -870,7 +868,7 @@ class PersistEventsStore:
                 if not existing_chain_id:
                     existing_chain_id = chain_map[auth_event_id]
 
-            new_chain_tuple = None  # type: Optional[Tuple[Any, int]]
+            new_chain_tuple: Optional[Tuple[Any, int]] = None
             if existing_chain_id:
                 # We found a chain ID/sequence number candidate, check its
                 # not already taken.
@@ -897,9 +895,9 @@ class PersistEventsStore:
         )
 
         # Map from potentially temporary chain ID to real chain ID
-        chain_id_to_allocated_map = dict(
+        chain_id_to_allocated_map: Dict[Any, int] = dict(
             zip(unallocated_chain_ids, newly_allocated_chain_ids)
-        )  # type: Dict[Any, int]
+        )
         chain_id_to_allocated_map.update((c, c) for c in existing_chains)
 
         return {
@@ -1175,9 +1173,9 @@ class PersistEventsStore:
         Returns:
             list[(EventBase, EventContext)]: filtered list
         """
-        new_events_and_contexts = (
-            OrderedDict()
-        )  # type: OrderedDict[str, Tuple[EventBase, EventContext]]
+        new_events_and_contexts: OrderedDict[
+            str, Tuple[EventBase, EventContext]
+        ] = OrderedDict()
         for event, context in events_and_contexts:
             prev_event_context = new_events_and_contexts.get(event.event_id)
             if prev_event_context:
@@ -1205,7 +1203,7 @@ class PersistEventsStore:
                 we are persisting
             backfilled (bool): True if the events were backfilled
         """
-        depth_updates = {}  # type: Dict[str, int]
+        depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
             txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
@@ -1580,11 +1578,11 @@ class PersistEventsStore:
         # invalidate the cache for the redacted event
         txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
 
-        self.db_pool.simple_insert_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="redactions",
+            keyvalues={"event_id": event.event_id},
             values={
-                "event_id": event.event_id,
                 "redacts": event.redacts,
                 "received_ts": self._clock.time_msec(),
             },
@@ -1885,7 +1883,7 @@ class PersistEventsStore:
                 ),
             )
 
-            room_to_event_ids = {}  # type: Dict[str, List[str]]
+            room_to_event_ids: Dict[str, List[str]] = {}
             for e, _ in events_and_contexts:
                 room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
 
@@ -2012,10 +2010,6 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
-        events_by_room = {}  # type: Dict[str, List[EventBase]]
-        for ev in events:
-            events_by_room.setdefault(ev.room_id, []).append(ev)
-
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index da3a7df27b..6fcb2b8353 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,13 +29,18 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
     # there should be no leftover rows without a stream_ordering2, but just in case...
     "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
-    # finally, we can drop the rule and switch the columns
+    # now we can drop the rule and switch the columns
     "DROP RULE populate_stream_ordering2 ON events",
     "ALTER TABLE events DROP COLUMN stream_ordering",
     "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+    # ... and finally, rename the indexes into place for consistency with sqlite
+    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+    "ALTER INDEX events_ts2 RENAME TO events_ts",
 )
 
 
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
     DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
     POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
     INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
 
@@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        ################################################################################
+
         # bg updates for replacing stream_ordering with a BIGINT
         # (these only run on postgres.)
+
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
             self._background_populate_stream_ordering2,
         )
+        # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.INDEX_STREAM_ORDERING2,
             index_name="events_stream_ordering",
@@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             columns=["stream_ordering2"],
             unique=True,
         )
+        # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+            index_name="event_contains_url_index2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+        # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+            index_name="events_order_room2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+        )
+        # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+            index_name="events_room_stream2",
+            table="events",
+            columns=["room_id", "stream_ordering2"],
+        )
+        # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+            index_name="events_ts2",
+            table="events",
+            columns=["origin_server_ts", "stream_ordering2"],
+        )
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
             self._background_replace_stream_ordering_column,
         )
 
+        ################################################################################
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -916,9 +960,9 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         event_to_types = {row[0]: (row[1], row[2]) for row in rows}
 
         # Calculate the new last position we've processed up to.
-        new_last_depth = rows[-1][3] if rows else last_depth  # type: int
-        new_last_stream = rows[-1][4] if rows else last_stream  # type: int
-        new_last_room_id = rows[-1][5] if rows else ""  # type: str
+        new_last_depth: int = rows[-1][3] if rows else last_depth
+        new_last_stream: int = rows[-1][4] if rows else last_stream
+        new_last_room_id: str = rows[-1][5] if rows else ""
 
         # Map from room_id to last depth/stream_ordering processed for the room,
         # excluding the last room (which we're likely still processing). We also
@@ -945,7 +989,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             retcols=("event_id", "auth_id"),
         )
 
-        event_to_auth_chain = {}  # type: Dict[str, List[str]]
+        event_to_auth_chain: Dict[str, List[str]] = {}
         for row in auth_events:
             event_to_auth_chain.setdefault(row["event_id"], []).append(row["auth_id"])
 
@@ -1098,10 +1142,20 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
 
         def process(txn: Cursor) -> None:
-            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
                 logger.info("completing stream_ordering migration: %s", sql)
                 txn.execute(sql)
 
+        # ANALYZE the new column to build stats on it, to encourage PostgreSQL to use the
+        # indexes on it.
+        # We need to pass execute a dummy function to handle the txn's result otherwise
+        # it tries to call fetchall() on it and fails because there's no result to fetch.
+        await self.db_pool.execute(
+            "background_analyze_new_stream_ordering_column",
+            lambda txn: None,
+            "ANALYZE events(stream_ordering2)",
+        )
+
         await self.db_pool.runInteraction(
             "_background_replace_stream_ordering_column", process
         )
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 86fd79f3a6..5ff29530bb 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1468,10 +1468,10 @@ class EventsWorkerStore(SQLBaseStore):
         # we need to make sure that, for every stream id in the results, we get *all*
         # the rows with that stream id.
 
-        rows = await self.db_pool.runInteraction(
+        rows: List[Tuple] = await self.db_pool.runInteraction(
             "get_all_updated_current_state_deltas",
             get_all_updated_current_state_deltas_txn,
-        )  # type: List[Tuple]
+        )
 
         # if we've got fewer rows than the limit, we're good
         if len(rows) < target_row_count:
@@ -1572,7 +1572,7 @@ class EventsWorkerStore(SQLBaseStore):
         """
 
         mapping = {}
-        txn_id_to_event = {}  # type: Dict[Tuple[str, int, str], str]
+        txn_id_to_event: Dict[Tuple[str, int, str], str] = {}
 
         for event in events:
             token_id = getattr(event.internal_metadata, "token_id", None)
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index 66ad363bfb..e70d3649ff 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -27,8 +27,11 @@ from synapse.util import json_encoder
 _DEFAULT_CATEGORY_ID = ""
 _DEFAULT_ROLE_ID = ""
 
+
 # A room in a group.
-_RoomInGroup = TypedDict("_RoomInGroup", {"room_id": str, "is_public": bool})
+class _RoomInGroup(TypedDict):
+    room_id: str
+    is_public: bool
 
 
 class GroupServerWorkerStore(SQLBaseStore):
@@ -92,6 +95,7 @@ class GroupServerWorkerStore(SQLBaseStore):
               "is_public": False                    # Whether this is a public room or not
             }
         """
+
         # TODO: Pagination
 
         def _get_rooms_in_group_txn(txn):
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index e76188328c..774861074c 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -310,14 +310,25 @@ class Lock:
         _excinst: Optional[BaseException],
         _exctb: Optional[TracebackType],
     ) -> bool:
+        await self.release()
+
+        return False
+
+    async def release(self) -> None:
+        """Release the lock.
+
+        This is automatically called when using the lock as a context manager.
+        """
+
+        if self._dropped:
+            return
+
         if self._looping_call.running:
             self._looping_call.stop()
 
         await self._store._drop_lock(self._lock_name, self._lock_key, self._token)
         self._dropped = True
 
-        return False
-
     def __del__(self) -> None:
         if not self._dropped:
             # We should not be dropped without the lock being released (unless
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index c3f551d377..dc0bbc56ac 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -316,11 +316,140 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
 
         return await self.db_pool.runInteraction("count_r30_users", _count_r30_users)
 
+    async def count_r30v2_users(self) -> Dict[str, int]:
+        """
+        Counts the number of 30 day retained users, defined as users that:
+         - Appear more than once in the past 60 days
+         - Have more than 30 days between the most and least recent appearances that
+           occurred in the past 60 days.
+
+        (This is the second version of this metric, hence R30'v2')
+
+        Returns:
+             A mapping from client type to the number of 30-day retained users for that client.
+
+             The dict keys are:
+              - "all" (a combined number of users across any and all clients)
+              - "android" (Element Android)
+              - "ios" (Element iOS)
+              - "electron" (Element Desktop)
+              - "web" (any web application -- it's not possible to distinguish Element Web here)
+        """
+
+        def _count_r30v2_users(txn):
+            thirty_days_in_secs = 86400 * 30
+            now = int(self._clock.time())
+            sixty_days_ago_in_secs = now - 2 * thirty_days_in_secs
+            one_day_from_now_in_secs = now + 86400
+
+            # This is the 'per-platform' count.
+            sql = """
+                SELECT
+                    client_type,
+                    count(client_type)
+                FROM
+                    (
+                        SELECT
+                            user_id,
+                            CASE
+                                WHEN
+                                    LOWER(user_agent) LIKE '%%riot%%' OR
+                                    LOWER(user_agent) LIKE '%%element%%'
+                                    THEN CASE
+                                        WHEN
+                                            LOWER(user_agent) LIKE '%%electron%%'
+                                            THEN 'electron'
+                                        WHEN
+                                            LOWER(user_agent) LIKE '%%android%%'
+                                            THEN 'android'
+                                        WHEN
+                                            LOWER(user_agent) LIKE '%%ios%%'
+                                            THEN 'ios'
+                                        ELSE 'unknown'
+                                    END
+                                WHEN
+                                    LOWER(user_agent) LIKE '%%mozilla%%' OR
+                                    LOWER(user_agent) LIKE '%%gecko%%'
+                                    THEN 'web'
+                                ELSE 'unknown'
+                            END as client_type
+                        FROM
+                            user_daily_visits
+                        WHERE
+                            timestamp > ?
+                            AND
+                            timestamp < ?
+                        GROUP BY
+                            user_id,
+                            client_type
+                        HAVING
+                            max(timestamp) - min(timestamp) > ?
+                    ) AS temp
+                GROUP BY
+                    client_type
+                ;
+            """
+
+            # We initialise all the client types to zero, so we get an explicit
+            # zero if they don't appear in the query results
+            results = {"ios": 0, "android": 0, "web": 0, "electron": 0}
+            txn.execute(
+                sql,
+                (
+                    sixty_days_ago_in_secs * 1000,
+                    one_day_from_now_in_secs * 1000,
+                    thirty_days_in_secs * 1000,
+                ),
+            )
+
+            for row in txn:
+                if row[0] == "unknown":
+                    continue
+                results[row[0]] = row[1]
+
+            # This is the 'all users' count.
+            sql = """
+                SELECT COUNT(*) FROM (
+                    SELECT
+                        1
+                    FROM
+                        user_daily_visits
+                    WHERE
+                        timestamp > ?
+                        AND
+                        timestamp < ?
+                    GROUP BY
+                        user_id
+                    HAVING
+                        max(timestamp) - min(timestamp) > ?
+                ) AS r30_users
+            """
+
+            txn.execute(
+                sql,
+                (
+                    sixty_days_ago_in_secs * 1000,
+                    one_day_from_now_in_secs * 1000,
+                    thirty_days_in_secs * 1000,
+                ),
+            )
+            row = txn.fetchone()
+            if row is None:
+                results["all"] = 0
+            else:
+                results["all"] = row[0]
+
+            return results
+
+        return await self.db_pool.runInteraction(
+            "count_r30v2_users", _count_r30v2_users
+        )
+
     def _get_start_of_day(self):
         """
         Returns millisecond unixtime for start of UTC day.
         """
-        now = time.gmtime()
+        now = time.gmtime(self._clock.time())
         today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
         return today_start * 1000
 
@@ -352,7 +481,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
                     ) udv
                     ON u.user_id = udv.user_id AND u.device_id=udv.device_id
                     INNER JOIN users ON users.name=u.user_id
-                    WHERE last_seen > ? AND last_seen <= ?
+                    WHERE ? <= last_seen AND last_seen < ?
                     AND udv.timestamp IS NULL AND users.is_guest=0
                     AND users.appservice_id IS NULL
                     GROUP BY u.user_id, u.device_id
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 9b4e95e134..ba7075caa5 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -73,20 +73,20 @@ class ProfileWorkerStore(SQLBaseStore):
     async def set_profile_displayname(
         self, user_localpart: str, new_displayname: Optional[str]
     ) -> None:
-        await self.db_pool.simple_update_one(
+        await self.db_pool.simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"displayname": new_displayname},
+            values={"displayname": new_displayname},
             desc="set_profile_displayname",
         )
 
     async def set_profile_avatar_url(
         self, user_localpart: str, new_avatar_url: Optional[str]
     ) -> None:
-        await self.db_pool.simple_update_one(
+        await self.db_pool.simple_upsert(
             table="profiles",
             keyvalues={"user_id": user_localpart},
-            updatevalues={"avatar_url": new_avatar_url},
+            values={"avatar_url": new_avatar_url},
             desc="set_profile_avatar_url",
         )
 
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 7fb7780d0f..664c65dac5 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -115,7 +115,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         logger.info("[purge] looking for events to delete")
 
         should_delete_expr = "state_key IS NULL"
-        should_delete_params = ()  # type: Tuple[Any, ...]
+        should_delete_params: Tuple[Any, ...] = ()
         if not delete_local_events:
             should_delete_expr += " AND event_id NOT LIKE ?"
 
@@ -215,6 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "event_relations",
             "event_search",
             "rejections",
+            "redactions",
         ):
             logger.info("[purge] removing events from %s", table)
 
@@ -392,7 +393,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "room_memberships",
             "room_stats_state",
             "room_stats_current",
-            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index db52176337..a7fb8cd848 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -79,9 +79,9 @@ class PushRulesWorkerStore(
         super().__init__(database, db_conn, hs)
 
         if hs.config.worker.worker_app is None:
-            self._push_rules_stream_id_gen = StreamIdGenerator(
-                db_conn, "push_rules_stream", "stream_id"
-            )  # type: Union[StreamIdGenerator, SlavedIdTracker]
+            self._push_rules_stream_id_gen: Union[
+                StreamIdGenerator, SlavedIdTracker
+            ] = StreamIdGenerator(db_conn, "push_rules_stream", "stream_id")
         else:
             self._push_rules_stream_id_gen = SlavedIdTracker(
                 db_conn, "push_rules_stream", "stream_id"
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index e31c5864ac..6ad1a0cf7f 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1744,7 +1744,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
 
             items = keyvalues.items()
             where_clause = " AND ".join(k + " = ?" for k, _ in items)
-            values = [v for _, v in items]  # type: List[Union[str, int]]
+            values: List[Union[str, int]] = [v for _, v in items]
             # Conveniently, refresh_tokens and access_tokens both use the user_id and device_id fields. Only caveat
             # is the `except_token_id` param that is tricky to get right, so for now we're just using the same where
             # clause and values before we handle that. This seems to be only used in the "set password" handler.
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 9f0d64a325..6ddafe5434 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -25,6 +25,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchStore
+from synapse.storage.types import Cursor
 from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -1022,10 +1023,22 @@ class RoomWorkerStore(SQLBaseStore):
         )
 
 
-class RoomBackgroundUpdateStore(SQLBaseStore):
+class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
     ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
+    POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
+    REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
+
+
+_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
+    "DROP TRIGGER populate_min_depth2_trigger ON room_depth",
+    "DROP FUNCTION populate_min_depth2()",
+    "ALTER TABLE room_depth DROP COLUMN min_depth",
+    "ALTER TABLE room_depth RENAME COLUMN min_depth2 TO min_depth",
+)
+
 
+class RoomBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
@@ -1037,15 +1050,25 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+            _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
             self._remove_tombstoned_rooms_from_directory,
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.ADD_ROOMS_ROOM_VERSION_COLUMN,
+            _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
             self._background_add_rooms_room_version_column,
         )
 
+        # BG updates to change the type of room_depth.min_depth
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+            self._background_populate_room_depth_min_depth2,
+        )
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+            self._background_replace_room_depth_min_depth,
+        )
+
     async def _background_insert_retention(self, progress, batch_size):
         """Retrieves a list of all rooms within a range and inserts an entry for each of
         them into the room_retention table.
@@ -1164,7 +1187,9 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
                 new_last_room_id = room_id
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
+                txn,
+                _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN,
+                {"room_id": new_last_room_id},
             )
 
             return False
@@ -1176,7 +1201,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         if end:
             await self.db_pool.updates._end_background_update(
-                self.ADD_ROOMS_ROOM_VERSION_COLUMN
+                _BackgroundUpdates.ADD_ROOMS_ROOM_VERSION_COLUMN
             )
 
         return batch_size
@@ -1215,7 +1240,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         if not rooms:
             await self.db_pool.updates._end_background_update(
-                self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+                _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
             )
             return 0
 
@@ -1224,7 +1249,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             await self.set_room_is_public(room_id, False)
 
         await self.db_pool.updates._background_update_progress(
-            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+            _BackgroundUpdates.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
         )
 
         return len(rooms)
@@ -1268,6 +1293,71 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         return max_ordering is None
 
+    async def _background_populate_room_depth_min_depth2(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populate room_depth.min_depth2
+
+        This is to deal with the fact that min_depth was initially created as a
+        32-bit integer field.
+        """
+
+        def process(txn: Cursor) -> int:
+            last_room = progress.get("last_room", "")
+            txn.execute(
+                """
+                UPDATE room_depth SET min_depth2=min_depth
+                WHERE room_id IN (
+                   SELECT room_id FROM room_depth WHERE room_id > ?
+                   ORDER BY room_id LIMIT ?
+                )
+                RETURNING room_id;
+                """,
+                (last_room, batch_size),
+            )
+            row_count = txn.rowcount
+            if row_count == 0:
+                return 0
+            last_room = max(row[0] for row in txn)
+            logger.info("populated room_depth up to %s", last_room)
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
+                {"last_room": last_room},
+            )
+            return row_count
+
+        result = await self.db_pool.runInteraction(
+            "_background_populate_min_depth2", process
+        )
+
+        if result != 0:
+            return result
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2
+        )
+        return 0
+
+    async def _background_replace_room_depth_min_depth(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop the old 'min_depth' column and rename 'min_depth2' into its place."""
+
+        def process(txn: Cursor) -> None:
+            for sql in _REPLACE_ROOM_DEPTH_SQL_COMMANDS:
+                logger.info("completing room_depth migration: %s", sql)
+                txn.execute(sql)
+
+        await self.db_pool.runInteraction("_background_replace_room_depth", process)
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.REPLACE_ROOM_DEPTH_MIN_DEPTH,
+        )
+
+        return 0
+
 
 class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4eebe5257a..b6cb6836c2 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -649,7 +649,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             event_to_memberships = await self._get_joined_profiles_from_event_ids(
                 missing_member_event_ids
             )
-            users_in_room.update((row for row in event_to_memberships.values() if row))
+            users_in_room.update(row for row in event_to_memberships.values() if row)
 
         if event is not None and event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
@@ -703,13 +703,22 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cached(max_entries=10000)
     async def is_host_joined(self, room_id: str, host: str) -> bool:
+        return await self._check_host_room_membership(room_id, host, Membership.JOIN)
+
+    @cached(max_entries=10000)
+    async def is_host_invited(self, room_id: str, host: str) -> bool:
+        return await self._check_host_room_membership(room_id, host, Membership.INVITE)
+
+    async def _check_host_room_membership(
+        self, room_id: str, host: str, membership: str
+    ) -> bool:
         if "%" in host or "_" in host:
             raise Exception("Invalid host name")
 
         sql = """
             SELECT state_key FROM current_state_events AS c
             INNER JOIN room_memberships AS m USING (event_id)
-            WHERE m.membership = 'join'
+            WHERE m.membership = ?
                 AND type = 'm.room.member'
                 AND c.room_id = ?
                 AND state_key LIKE ?
@@ -722,7 +731,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         like_clause = "%:" + host
 
         rows = await self.db_pool.execute(
-            "is_host_joined", None, sql, room_id, like_clause
+            "is_host_joined", None, sql, membership, room_id, like_clause
         )
 
         if not rows:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 82a1833509..59d67c255b 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -26,7 +26,6 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import StoreError
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
-from synapse.storage.engines import PostgresEngine
 from synapse.types import JsonDict
 from synapse.util.caches.descriptors import cached
 
@@ -49,14 +48,6 @@ ABSOLUTE_STATS_FIELDS = {
     "user": ("joined_rooms",),
 }
 
-# these fields are per-timeslice and so should be reset to 0 upon a new slice
-# You can draw these stats on a histogram.
-# Example: number of events sent locally during a time slice
-PER_SLICE_FIELDS = {
-    "room": ("total_events", "total_event_bytes"),
-    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
-}
-
 TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
 # these are the tables (& ID columns) which contain our actual subjects
@@ -106,7 +97,6 @@ class StatsStore(StateDeltasStore):
         self.server_name = hs.hostname
         self.clock = self.hs.get_clock()
         self.stats_enabled = hs.config.stats_enabled
-        self.stats_bucket_size = hs.config.stats_bucket_size
 
         self.stats_delta_processing_lock = DeferredLock()
 
@@ -122,22 +112,6 @@ class StatsStore(StateDeltasStore):
         self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
         self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
 
-    def quantise_stats_time(self, ts):
-        """
-        Quantises a timestamp to be a multiple of the bucket size.
-
-        Args:
-            ts (int): the timestamp to quantise, in milliseconds since the Unix
-                Epoch
-
-        Returns:
-            int: a timestamp which
-              - is divisible by the bucket size;
-              - is no later than `ts`; and
-              - is the largest such timestamp.
-        """
-        return (ts // self.stats_bucket_size) * self.stats_bucket_size
-
     async def _populate_stats_process_users(self, progress, batch_size):
         """
         This is a background update which regenerates statistics for users.
@@ -288,56 +262,6 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    async def get_statistics_for_subject(
-        self, stats_type: str, stats_id: str, start: str, size: int = 100
-    ) -> List[dict]:
-        """
-        Get statistics for a given subject.
-
-        Args:
-            stats_type: The type of subject
-            stats_id: The ID of the subject (e.g. room_id or user_id)
-            start: Pagination start. Number of entries, not timestamp.
-            size: How many entries to return.
-
-        Returns:
-            A list of dicts, where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
-        """
-        return await self.db_pool.runInteraction(
-            "get_statistics_for_subject",
-            self._get_statistics_for_subject_txn,
-            stats_type,
-            stats_id,
-            start,
-            size,
-        )
-
-    def _get_statistics_for_subject_txn(
-        self, txn, stats_type, stats_id, start, size=100
-    ):
-        """
-        Transaction-bound version of L{get_statistics_for_subject}.
-        """
-
-        table, id_col = TYPE_TO_TABLE[stats_type]
-        selected_columns = list(
-            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
-        )
-
-        slice_list = self.db_pool.simple_select_list_paginate_txn(
-            txn,
-            table + "_historical",
-            "end_ts",
-            start,
-            size,
-            retcols=selected_columns + ["bucket_size", "end_ts"],
-            keyvalues={id_col: stats_id},
-            order_direction="DESC",
-        )
-
-        return slice_list
-
     @cached()
     async def get_earliest_token_for_stats(
         self, stats_type: str, id: str
@@ -451,14 +375,10 @@ class StatsStore(StateDeltasStore):
 
         table, id_col = TYPE_TO_TABLE[stats_type]
 
-        quantised_ts = self.quantise_stats_time(int(ts))
-        end_ts = quantised_ts + self.stats_bucket_size
-
         # Lets be paranoid and check that all the given field names are known
         abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
-        slice_field_names = PER_SLICE_FIELDS[stats_type]
         for field in chain(fields.keys(), absolute_field_overrides.keys()):
-            if field not in abs_field_names and field not in slice_field_names:
+            if field not in abs_field_names:
                 # guard against potential SQL injection dodginess
                 raise ValueError(
                     "%s is not a recognised field"
@@ -491,20 +411,6 @@ class StatsStore(StateDeltasStore):
             additive_relatives=deltas_of_absolute_fields,
         )
 
-        per_slice_additive_relatives = {
-            key: fields.get(key, 0) for key in slice_field_names
-        }
-        self._upsert_copy_from_table_with_additive_relatives_txn(
-            txn=txn,
-            into_table=table + "_historical",
-            keyvalues={id_col: stats_id},
-            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
-            extra_dst_keyvalues={"end_ts": end_ts},
-            additive_relatives=per_slice_additive_relatives,
-            src_table=table + "_current",
-            copy_columns=abs_field_names,
-        )
-
     def _upsert_with_additive_relatives_txn(
         self, txn, table, keyvalues, absolutes, additive_relatives
     ):
@@ -528,7 +434,7 @@ class StatsStore(StateDeltasStore):
             ]
 
             relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
                 % {"table": table, "field": field}
                 for field in additive_relatives.keys()
             ]
@@ -568,205 +474,13 @@ class StatsStore(StateDeltasStore):
                 self.db_pool.simple_insert_txn(txn, table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():
-                    current_row[key] += val
+                    if current_row[key] is None:
+                        current_row[key] = val
+                    else:
+                        current_row[key] += val
                 current_row.update(absolutes)
                 self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
 
-    def _upsert_copy_from_table_with_additive_relatives_txn(
-        self,
-        txn,
-        into_table,
-        keyvalues,
-        extra_dst_keyvalues,
-        extra_dst_insvalues,
-        additive_relatives,
-        src_table,
-        copy_columns,
-    ):
-        """Updates the historic stats table with latest updates.
-
-        This involves copying "absolute" fields from the `_current` table, and
-        adding relative fields to any existing values.
-
-        Args:
-             txn: Transaction
-             into_table (str): The destination table to UPSERT the row into
-             keyvalues (dict[str, any]): Row-identifying key values
-             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
-                for `into_table`.
-             extra_dst_insvalues (dict[str, any]): Additional values to insert
-                on new row creation for `into_table`.
-             additive_relatives (dict[str, any]): Fields that will be added onto
-                if existing row present. (Must be disjoint from copy_columns.)
-             src_table (str): The source table to copy from
-             copy_columns (iterable[str]): The list of columns to copy
-        """
-        if self.database_engine.can_native_upsert:
-            ins_columns = chain(
-                keyvalues,
-                copy_columns,
-                additive_relatives,
-                extra_dst_keyvalues,
-                extra_dst_insvalues,
-            )
-            sel_exprs = chain(
-                keyvalues,
-                copy_columns,
-                (
-                    "?"
-                    for _ in chain(
-                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
-                    )
-                ),
-            )
-            keyvalues_where = ("%s = ?" % f for f in keyvalues)
-
-            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
-            sets_ar = (
-                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
-                for f in additive_relatives
-            )
-
-            sql = """
-                INSERT INTO %(into_table)s (%(ins_columns)s)
-                SELECT %(sel_exprs)s
-                FROM %(src_table)s
-                WHERE %(keyvalues_where)s
-                ON CONFLICT (%(keyvalues)s)
-                DO UPDATE SET %(sets)s
-            """ % {
-                "into_table": into_table,
-                "ins_columns": ", ".join(ins_columns),
-                "sel_exprs": ", ".join(sel_exprs),
-                "keyvalues_where": " AND ".join(keyvalues_where),
-                "src_table": src_table,
-                "keyvalues": ", ".join(
-                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
-                ),
-                "sets": ", ".join(chain(sets_cc, sets_ar)),
-            }
-
-            qargs = list(
-                chain(
-                    additive_relatives.values(),
-                    extra_dst_keyvalues.values(),
-                    extra_dst_insvalues.values(),
-                    keyvalues.values(),
-                )
-            )
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, into_table)
-            src_row = self.db_pool.simple_select_one_txn(
-                txn, src_table, keyvalues, copy_columns
-            )
-            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
-            dest_current_row = self.db_pool.simple_select_one_txn(
-                txn,
-                into_table,
-                keyvalues=all_dest_keyvalues,
-                retcols=list(chain(additive_relatives.keys(), copy_columns)),
-                allow_none=True,
-            )
-
-            if dest_current_row is None:
-                merged_dict = {
-                    **keyvalues,
-                    **extra_dst_keyvalues,
-                    **extra_dst_insvalues,
-                    **src_row,
-                    **additive_relatives,
-                }
-                self.db_pool.simple_insert_txn(txn, into_table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    src_row[key] = dest_current_row[key] + val
-                self.db_pool.simple_update_txn(
-                    txn, into_table, all_dest_keyvalues, src_row
-                )
-
-    async def get_changes_room_total_events_and_bytes(
-        self, min_pos: int, max_pos: int
-    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
-        """Fetches the counts of events in the given range of stream IDs.
-
-        Args:
-            min_pos
-            max_pos
-
-        Returns:
-            Mapping of room ID to field changes.
-        """
-
-        return await self.db_pool.runInteraction(
-            "stats_incremental_total_events_and_bytes",
-            self.get_changes_room_total_events_and_bytes_txn,
-            min_pos,
-            max_pos,
-        )
-
-    def get_changes_room_total_events_and_bytes_txn(
-        self, txn, low_pos: int, high_pos: int
-    ) -> Tuple[Dict[str, Dict[str, int]], Dict[str, Dict[str, int]]]:
-        """Gets the total_events and total_event_bytes counts for rooms and
-        senders, in a range of stream_orderings (including backfilled events).
-
-        Args:
-            txn
-            low_pos: Low stream ordering
-            high_pos: High stream ordering
-
-        Returns:
-            The room and user deltas for total_events/total_event_bytes in the
-            format of `stats_id` -> fields
-        """
-
-        if low_pos >= high_pos:
-            # nothing to do here.
-            return {}, {}
-
-        if isinstance(self.database_engine, PostgresEngine):
-            new_bytes_expression = "OCTET_LENGTH(json)"
-        else:
-            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
-
-        sql = """
-            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.room_id
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        room_deltas = {
-            room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for room_id, new_events, new_bytes in txn
-        }
-
-        sql = """
-            SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
-            FROM events INNER JOIN event_json USING (event_id)
-            WHERE (? < stream_ordering AND stream_ordering <= ?)
-                OR (? <= stream_ordering AND stream_ordering <= ?)
-            GROUP BY events.sender
-        """ % (
-            new_bytes_expression,
-        )
-
-        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
-
-        user_deltas = {
-            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
-            for user_id, new_events, new_bytes in txn
-            if self.hs.is_mine_id(user_id)
-        }
-
-        return room_deltas, user_deltas
-
     async def _calculate_and_set_initial_state_for_room(
         self, room_id: str
     ) -> Tuple[dict, dict, int]:
@@ -893,6 +607,7 @@ class StatsStore(StateDeltasStore):
                 "invited_members": membership_counts.get(Membership.INVITE, 0),
                 "left_members": membership_counts.get(Membership.LEAVE, 0),
                 "banned_members": membership_counts.get(Membership.BAN, 0),
+                "knocked_members": membership_counts.get(Membership.KNOCK, 0),
                 "local_users_in_room": len(local_users_in_room),
             },
         )
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 7581c7d3ff..959f13de47 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1085,9 +1085,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
         # stream token (as returned by `RoomStreamToken.get_max_stream_pos`) and
         # then filtering the results.
         if from_token.topological is not None:
-            from_bound = (
-                from_token.as_historical_tuple()
-            )  # type: Tuple[Optional[int], int]
+            from_bound: Tuple[Optional[int], int] = from_token.as_historical_tuple()
         elif direction == "b":
             from_bound = (
                 None,
@@ -1099,7 +1097,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
                 from_token.stream,
             )
 
-        to_bound = None  # type: Optional[Tuple[Optional[int], int]]
+        to_bound: Optional[Tuple[Optional[int], int]] = None
         if to_token:
             if to_token.topological is not None:
                 to_bound = to_token.as_historical_tuple()
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index 1d62c6140f..f93ff0a545 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -42,7 +42,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"]
         )
 
-        tags_by_room = {}  # type: Dict[str, Dict[str, JsonDict]]
+        tags_by_room: Dict[str, Dict[str, JsonDict]] = {}
         for row in rows:
             room_tags = tags_by_room.setdefault(row["room_id"], {})
             room_tags[row["tag"]] = db_to_json(row["content"])
diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py
index 22c05cdde7..38bfdf5dad 100644
--- a/synapse/storage/databases/main/ui_auth.py
+++ b/synapse/storage/databases/main/ui_auth.py
@@ -224,12 +224,12 @@ class UIAuthWorkerStore(SQLBaseStore):
         self, txn: LoggingTransaction, session_id: str, key: str, value: Any
     ):
         # Get the current value.
-        result = self.db_pool.simple_select_one_txn(
+        result: Dict[str, Any] = self.db_pool.simple_select_one_txn(  # type: ignore
             txn,
             table="ui_auth_sessions",
             keyvalues={"session_id": session_id},
             retcols=("serverdict",),
-        )  # type: Dict[str, Any]  # type: ignore
+        )
 
         # Update it and add it back to the database.
         serverdict = db_to_json(result["serverdict"])