summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13589.feature1
-rw-r--r--changelog.d/13749.bugfix1
-rw-r--r--changelog.d/13753.misc1
-rw-r--r--changelog.d/13780.misc1
-rw-r--r--changelog.d/13788.misc1
-rw-r--r--changelog.d/13789.bugfix1
-rw-r--r--changelog.d/13795.misc1
-rw-r--r--changelog.d/13798.misc1
-rw-r--r--changelog.d/13802.misc1
-rw-r--r--synapse/api/auth.py9
-rw-r--r--synapse/federation/transport/server/federation.py3
-rw-r--r--synapse/handlers/device.py11
-rw-r--r--synapse/handlers/e2e_keys.py26
-rw-r--r--synapse/handlers/federation_event.py7
-rw-r--r--synapse/handlers/message.py10
-rw-r--r--synapse/handlers/room_member.py10
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py29
-rw-r--r--synapse/storage/background_updates.py6
-rw-r--r--synapse/storage/controllers/persist_events.py20
-rw-r--r--synapse/storage/databases/main/event_federation.py45
-rw-r--r--synapse/storage/databases/main/event_push_actions.py122
-rw-r--r--synapse/storage/databases/main/events.py36
-rw-r--r--synapse/storage/databases/main/receipts.py20
-rw-r--r--synapse/storage/databases/main/roommember.py17
-rw-r--r--synapse/storage/schema/__init__.py8
-rw-r--r--synapse/storage/schema/main/delta/72/06thread_notifications.sql30
-rw-r--r--synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres30
-rw-r--r--synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite70
-rw-r--r--synapse/storage/schema/main/delta/72/08thread_receipts.sql20
-rw-r--r--synapse/storage/schema/main/delta/72/09partial_indices.sql.sqlite56
-rw-r--r--synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql29
-rw-r--r--synapse/storage/schema/main/delta/73/02rename_opentelemtetry_tracing_context.sql (renamed from synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql)0
-rw-r--r--synapse/storage/schema/state/delta/30/state_stream.sql37
-rw-r--r--synapse/util/caches/__init__.py3
-rw-r--r--synapse/util/metrics.py10
-rw-r--r--tests/handlers/test_e2e_keys.py8
-rw-r--r--tests/handlers/test_federation_event.py222
-rw-r--r--tests/replication/slave/storage/test_events.py1
38 files changed, 783 insertions, 121 deletions
diff --git a/changelog.d/13589.feature b/changelog.d/13589.feature
new file mode 100644
index 0000000000..78fa1ddb52
--- /dev/null
+++ b/changelog.d/13589.feature
@@ -0,0 +1 @@
+Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.
diff --git a/changelog.d/13749.bugfix b/changelog.d/13749.bugfix
new file mode 100644
index 0000000000..8ffafec07b
--- /dev/null
+++ b/changelog.d/13749.bugfix
@@ -0,0 +1 @@
+Fix a long standing bug where device lists would remain cached when remote users left and rejoined the last room shared with the local homeserver.
diff --git a/changelog.d/13753.misc b/changelog.d/13753.misc
new file mode 100644
index 0000000000..63de2eb9f9
--- /dev/null
+++ b/changelog.d/13753.misc
@@ -0,0 +1 @@
+Prepatory work for storing thread IDs for notifications and receipts.
diff --git a/changelog.d/13780.misc b/changelog.d/13780.misc
new file mode 100644
index 0000000000..1bcac51cad
--- /dev/null
+++ b/changelog.d/13780.misc
@@ -0,0 +1 @@
+Deduplicate `is_server_notices_room`.
\ No newline at end of file
diff --git a/changelog.d/13788.misc b/changelog.d/13788.misc
new file mode 100644
index 0000000000..7263b1ac52
--- /dev/null
+++ b/changelog.d/13788.misc
@@ -0,0 +1 @@
+Remove an old, incorrect migration file.
diff --git a/changelog.d/13789.bugfix b/changelog.d/13789.bugfix
new file mode 100644
index 0000000000..9e1e3e0fa7
--- /dev/null
+++ b/changelog.d/13789.bugfix
@@ -0,0 +1 @@
+Fix a long-standing spec compliance bug where Synapse would accept a trailing slash on the end of `/get_missing_events` federation requests.
\ No newline at end of file
diff --git a/changelog.d/13795.misc b/changelog.d/13795.misc
new file mode 100644
index 0000000000..20d90cc130
--- /dev/null
+++ b/changelog.d/13795.misc
@@ -0,0 +1 @@
+Remove unused method in `synapse.api.auth.Auth`.
diff --git a/changelog.d/13798.misc b/changelog.d/13798.misc
new file mode 100644
index 0000000000..e4ec2d77d6
--- /dev/null
+++ b/changelog.d/13798.misc
@@ -0,0 +1 @@
+Fix a memory leak when running the unit tests.
\ No newline at end of file
diff --git a/changelog.d/13802.misc b/changelog.d/13802.misc
new file mode 100644
index 0000000000..0d55071326
--- /dev/null
+++ b/changelog.d/13802.misc
@@ -0,0 +1 @@
+Use partial indices on SQLite.
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 26ed41a654..ae5b04759a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -459,15 +459,6 @@ class Auth:
             )
             raise InvalidClientTokenError("Invalid access token passed.")
 
-    def get_appservice_by_req(self, request: SynapseRequest) -> ApplicationService:
-        token = self.get_access_token_from_request(request)
-        service = self.store.get_app_service_by_token(token)
-        if not service:
-            logger.warning("Unrecognised appservice access token.")
-            raise InvalidClientTokenError()
-        request.requester = create_requester(service.sender, app_service=service)
-        return service
-
     async def is_server_admin(self, requester: Requester) -> bool:
         """Check if the given user is a local server admin.
 
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index f7884bfbe0..6bb4659c4c 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -549,8 +549,7 @@ class FederationClientKeysClaimServlet(BaseFederationServerServlet):
 
 
 class FederationGetMissingEventsServlet(BaseFederationServerServlet):
-    # TODO(paul): Why does this path alone end with "/?" optional?
-    PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
+    PATH = "/get_missing_events/(?P<room_id>[^/]*)"
 
     async def on_POST(
         self,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c484546c43..8e9c98db6c 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -45,7 +45,6 @@ from synapse.types import (
     JsonDict,
     StreamKeyType,
     StreamToken,
-    UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
 )
@@ -324,8 +323,6 @@ class DeviceHandler(DeviceWorkerHandler):
             self.device_list_updater.incoming_device_list_update,
         )
 
-        hs.get_distributor().observe("user_left_room", self.user_left_room)
-
         # Whether `_handle_new_device_update_async` is currently processing.
         self._handle_new_device_update_is_processing = False
 
@@ -569,14 +566,6 @@ class DeviceHandler(DeviceWorkerHandler):
             StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
         )
 
-    async def user_left_room(self, user: UserID, room_id: str) -> None:
-        user_id = user.to_string()
-        room_ids = await self.store.get_rooms_for_user(user_id)
-        if not room_ids:
-            # We no longer share rooms with this user, so we'll no longer
-            # receive device updates. Mark this in DB.
-            await self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
-
     async def store_dehydrated_device(
         self,
         user_id: str,
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1b0ec3b58b..a87ab99404 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -175,6 +175,32 @@ class E2eKeysHandler:
                     user_ids_not_in_cache,
                     remote_results,
                 ) = await self.store.get_user_devices_from_cache(query_list)
+
+                # Check that the homeserver still shares a room with all cached users.
+                # Note that this check may be slightly racy when a remote user leaves a
+                # room after we have fetched their cached device list. In the worst case
+                # we will do extra federation queries for devices that we had cached.
+                cached_users = set(remote_results.keys())
+                valid_cached_users = (
+                    await self.store.get_users_server_still_shares_room_with(
+                        remote_results.keys()
+                    )
+                )
+                invalid_cached_users = cached_users - valid_cached_users
+                if invalid_cached_users:
+                    # Fix up results. If we get here, there is either a bug in device
+                    # list tracking, or we hit the race mentioned above.
+                    user_ids_not_in_cache.update(invalid_cached_users)
+                    for invalid_user_id in invalid_cached_users:
+                        remote_results.pop(invalid_user_id)
+                    # This log message may be removed if it turns out it's almost
+                    # entirely triggered by races.
+                    logger.error(
+                        "Devices for %s were cached, but the server no longer shares "
+                        "any rooms with them. The cached device lists are stale.",
+                        invalid_cached_users,
+                    )
+
                 for user_id, devices in remote_results.items():
                     user_devices = results.setdefault(user_id, {})
                     for device_id, device in devices.items():
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index f8b97a1749..47dabaee3f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -862,6 +862,9 @@ class FederationEventHandler:
             self._sanity_check_event(event)
         except SynapseError as err:
             logger.warning("Event %s failed sanity check: %s", event_id, err)
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(err)
+            )
             return
 
         try:
@@ -897,6 +900,10 @@ class FederationEventHandler:
                     backfilled=backfilled,
                 )
         except FederationError as e:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(e)
+            )
+
             if e.code == 403:
                 logger.warning("Pulled event %s failed history check.", event_id)
             else:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5e4f74a98c..10b5dad030 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -752,20 +752,12 @@ class EventCreationHandler:
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
-                return await self._is_server_notices_room(builder.room_id)
+                return await self.store.is_server_notice_room(builder.room_id)
             elif membership == Membership.LEAVE:
                 # the user is always allowed to leave (but not kick people)
                 return builder.state_key == requester.user.to_string()
         return False
 
-    async def _is_server_notices_room(self, room_id: str) -> bool:
-        if self.config.servernotices.server_notices_mxid is None:
-            return False
-        is_server_notices_room = await self.store.check_local_user_in_room(
-            user_id=self.config.servernotices.server_notices_mxid, room_id=room_id
-        )
-        return is_server_notices_room
-
     async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
         """Check if a user has accepted the privacy policy
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index bff7abf509..e0d0a8941c 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -837,7 +837,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 old_membership == Membership.INVITE
                 and effective_membership_state == Membership.LEAVE
             ):
-                is_blocked = await self._is_server_notice_room(room_id)
+                is_blocked = await self.store.is_server_notice_room(room_id)
                 if is_blocked:
                     raise SynapseError(
                         HTTPStatus.FORBIDDEN,
@@ -1617,14 +1617,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         return False
 
-    async def _is_server_notice_room(self, room_id: str) -> bool:
-        if self._server_notices_mxid is None:
-            return False
-        is_server_notices_room = await self.store.check_local_user_in_room(
-            user_id=self._server_notices_mxid, room_id=room_id
-        )
-        return is_server_notices_room
-
 
 class RoomMemberMasterHandler(RoomMemberHandler):
     def __init__(self, hs: "HomeServer"):
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index d1caf8a0f7..3846fbc5f0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -198,7 +198,7 @@ class BulkPushRuleEvaluator:
         return pl_event.content if pl_event else {}, sender_level
 
     async def _get_mutual_relations(
-        self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
+        self, parent_id: str, rules: Iterable[Tuple[PushRule, bool]]
     ) -> Dict[str, Set[Tuple[str, str]]]:
         """
         Fetch event metadata for events which related to the same event as the given event.
@@ -206,7 +206,7 @@ class BulkPushRuleEvaluator:
         If the given event has no relation information, returns an empty dictionary.
 
         Args:
-            event_id: The event ID which is targeted by relations.
+            parent_id: The event ID which is targeted by relations.
             rules: The push rules which will be processed for this event.
 
         Returns:
@@ -220,12 +220,6 @@ class BulkPushRuleEvaluator:
         if not self._relations_match_enabled:
             return {}
 
-        # If the event does not have a relation, then cannot have any mutual
-        # relations.
-        relation = relation_from_event(event)
-        if not relation:
-            return {}
-
         # Pre-filter to figure out which relation types are interesting.
         rel_types = set()
         for rule, enabled in rules:
@@ -246,9 +240,7 @@ class BulkPushRuleEvaluator:
             return {}
 
         # If any valid rules were found, fetch the mutual relations.
-        return await self.store.get_mutual_event_relations(
-            relation.parent_id, rel_types
-        )
+        return await self.store.get_mutual_event_relations(parent_id, rel_types)
 
     @measure_func("action_for_event_by_user")
     async def action_for_event_by_user(
@@ -281,9 +273,17 @@ class BulkPushRuleEvaluator:
             sender_power_level,
         ) = await self._get_power_levels_and_sender_level(event, context)
 
-        relations = await self._get_mutual_relations(
-            event, itertools.chain(*rules_by_user.values())
-        )
+        relation = relation_from_event(event)
+        # If the event does not have a relation, then cannot have any mutual
+        # relations or thread ID.
+        relations = {}
+        thread_id = "main"
+        if relation:
+            relations = await self._get_mutual_relations(
+                relation.parent_id, itertools.chain(*rules_by_user.values())
+            )
+            if relation.rel_type == RelationTypes.THREAD:
+                thread_id = relation.parent_id
 
         evaluator = PushRuleEvaluatorForEvent(
             event,
@@ -352,6 +352,7 @@ class BulkPushRuleEvaluator:
             event.event_id,
             actions_by_user,
             count_as_unread,
+            thread_id,
         )
 
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 555b4e77d2..cf1eabc437 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -581,9 +581,6 @@ class BackgroundUpdater:
         def create_index_sqlite(conn: Connection) -> None:
             # Sqlite doesn't support concurrent creation of indexes.
             #
-            # We don't use partial indices on SQLite as it wasn't introduced
-            # until 3.8, and wheezy and CentOS 7 have 3.7
-            #
             # We assume that sqlite doesn't give us invalid indices; however
             # we may still end up with the index existing but the
             # background_updates not having been recorded if synapse got shut
@@ -591,12 +588,13 @@ class BackgroundUpdater:
             # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
             sql = (
                 "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
-                " (%(columns)s)"
+                " (%(columns)s) %(where_clause)s"
             ) % {
                 "unique": "UNIQUE" if unique else "",
                 "name": index_name,
                 "table": table,
                 "columns": ", ".join(columns),
+                "where_clause": "WHERE " + where_clause if where_clause else "",
             }
 
             c = conn.cursor()
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index bb14729a9d..bde7a6648a 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -602,9 +602,9 @@ class EventsPersistenceStorageController:
             # room
             state_delta_for_room: Dict[str, DeltaState] = {}
 
-            # Set of remote users which were in rooms the server has left. We
-            # should check if we still share any rooms and if not we mark their
-            # device lists as stale.
+            # Set of remote users which were in rooms the server has left or who may
+            # have left rooms the server is in. We should check if we still share any
+            # rooms and if not we mark their device lists as stale.
             potentially_left_users: Set[str] = set()
 
             if not backfilled:
@@ -729,6 +729,20 @@ class EventsPersistenceStorageController:
                                 current_state = {}
                                 delta.no_longer_in_room = True
 
+                            # Add all remote users that might have left rooms.
+                            potentially_left_users.update(
+                                user_id
+                                for event_type, user_id in delta.to_delete
+                                if event_type == EventTypes.Member
+                                and not self.is_mine_id(user_id)
+                            )
+                            potentially_left_users.update(
+                                user_id
+                                for event_type, user_id in delta.to_insert.keys()
+                                if event_type == EventTypes.Member
+                                and not self.is_mine_id(user_id)
+                            )
+
                             state_delta_for_room[room_id] = delta
 
             await self.persist_events_store._persist_events_and_state_updates(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 271e7a71c1..0669d54822 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1294,6 +1294,51 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return event_id_results
 
+    @trace
+    async def record_event_failed_pull_attempt(
+        self, room_id: str, event_id: str, cause: str
+    ) -> None:
+        """
+        Record when we fail to pull an event over federation.
+
+        This information allows us to be more intelligent when we decide to
+        retry (we don't need to fail over and over) and we can process that
+        event in the background so we don't block on it each time.
+
+        Args:
+            room_id: The room where the event failed to pull from
+            event_id: The event that failed to be fetched or processed
+            cause: The error message or reason that we failed to pull the event
+        """
+        await self.db_pool.runInteraction(
+            "record_event_failed_pull_attempt",
+            self._record_event_failed_pull_attempt_upsert_txn,
+            room_id,
+            event_id,
+            cause,
+            db_autocommit=True,  # Safe as it's a single upsert
+        )
+
+    def _record_event_failed_pull_attempt_upsert_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        event_id: str,
+        cause: str,
+    ) -> None:
+        sql = """
+            INSERT INTO event_failed_pull_attempts (
+                room_id, event_id, num_attempts, last_attempt_ts, last_cause
+            )
+                VALUES (?, ?, ?, ?, ?)
+            ON CONFLICT (room_id, event_id) DO UPDATE SET
+                num_attempts=event_failed_pull_attempts.num_attempts + 1,
+                last_attempt_ts=EXCLUDED.last_attempt_ts,
+                last_cause=EXCLUDED.last_cause;
+        """
+
+        txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
+
     async def get_missing_events(
         self,
         room_id: str,
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index f4a07de2a3..6b8668d2dc 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -98,6 +98,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 
@@ -232,6 +233,104 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             replaces_index="event_push_summary_user_rm",
         )
 
+        self.db_pool.updates.register_background_index_update(
+            "event_push_summary_unique_index2",
+            index_name="event_push_summary_unique_index2",
+            table="event_push_summary",
+            columns=["user_id", "room_id", "thread_id"],
+            unique=True,
+        )
+
+        self.db_pool.updates.register_background_update_handler(
+            "event_push_backfill_thread_id",
+            self._background_backfill_thread_id,
+        )
+
+    async def _background_backfill_thread_id(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """
+        Fill in the thread_id field for event_push_actions and event_push_summary.
+
+        This is preparatory so that it can be made non-nullable in the future.
+
+        Because all current (null) data is done in an unthreaded manner this
+        simply assumes it is on the "main" timeline. Since event_push_actions
+        are periodically cleared it is not possible to correctly re-calculate
+        the thread_id.
+        """
+        event_push_actions_done = progress.get("event_push_actions_done", False)
+
+        def add_thread_id_txn(
+            txn: LoggingTransaction, table_name: str, start_stream_ordering: int
+        ) -> int:
+            sql = f"""
+            SELECT stream_ordering
+            FROM {table_name}
+            WHERE
+                thread_id IS NULL
+                AND stream_ordering > ?
+            ORDER BY stream_ordering
+            LIMIT ?
+            """
+            txn.execute(sql, (start_stream_ordering, batch_size))
+
+            # No more rows to process.
+            rows = txn.fetchall()
+            if not rows:
+                progress[f"{table_name}_done"] = True
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "event_push_backfill_thread_id", progress
+                )
+                return 0
+
+            # Update the thread ID for any of those rows.
+            max_stream_ordering = rows[-1][0]
+
+            sql = f"""
+            UPDATE {table_name}
+            SET thread_id = 'main'
+            WHERE stream_ordering <= ? AND thread_id IS NULL
+            """
+            txn.execute(sql, (max_stream_ordering,))
+
+            # Update progress.
+            processed_rows = txn.rowcount
+            progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "event_push_backfill_thread_id", progress
+            )
+
+            return processed_rows
+
+        # First update the event_push_actions table, then the event_push_summary table.
+        #
+        # Note that the event_push_actions_staging table is ignored since it is
+        # assumed that items in that table will only exist for a short period of
+        # time.
+        if not event_push_actions_done:
+            result = await self.db_pool.runInteraction(
+                "event_push_backfill_thread_id",
+                add_thread_id_txn,
+                "event_push_actions",
+                progress.get("max_event_push_actions_stream_ordering", 0),
+            )
+        else:
+            result = await self.db_pool.runInteraction(
+                "event_push_backfill_thread_id",
+                add_thread_id_txn,
+                "event_push_summary",
+                progress.get("max_event_push_summary_stream_ordering", 0),
+            )
+
+            # Only done after the event_push_summary table is done.
+            if not result:
+                await self.db_pool.updates._end_background_update(
+                    "event_push_backfill_thread_id"
+                )
+
+        return result
+
     @cached(tree=True, max_entries=5000)
     async def get_unread_event_push_actions_by_room_for_user(
         self,
@@ -670,6 +769,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         event_id: str,
         user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
         count_as_unread: bool,
+        thread_id: str,
     ) -> None:
         """Add the push actions for the event to the push action staging area.
 
@@ -678,6 +778,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             user_id_actions: A mapping of user_id to list of push actions, where
                 an action can either be a string or dict.
             count_as_unread: Whether this event should increment unread counts.
+            thread_id: The thread this event is parent of, if applicable.
         """
         if not user_id_actions:
             return
@@ -686,7 +787,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         # can be used to insert into the `event_push_actions_staging` table.
         def _gen_entry(
             user_id: str, actions: Collection[Union[Mapping, str]]
-        ) -> Tuple[str, str, str, int, int, int]:
+        ) -> Tuple[str, str, str, int, int, int, str]:
             is_highlight = 1 if _action_has_highlight(actions) else 0
             notif = 1 if "notify" in actions else 0
             return (
@@ -696,11 +797,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 notif,  # notif column
                 is_highlight,  # highlight column
                 int(count_as_unread),  # unread column
+                thread_id,  # thread_id column
             )
 
         await self.db_pool.simple_insert_many(
             "event_push_actions_staging",
-            keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"),
+            keys=(
+                "event_id",
+                "user_id",
+                "actions",
+                "notif",
+                "highlight",
+                "unread",
+                "thread_id",
+            ),
             values=[
                 _gen_entry(user_id, actions)
                 for user_id, actions in user_id_actions.items()
@@ -981,6 +1091,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             )
 
             # Replace the previous summary with the new counts.
+            #
+            # TODO(threads): Upsert per-thread instead of setting them all to main.
             self.db_pool.simple_upsert_txn(
                 txn,
                 table="event_push_summary",
@@ -990,6 +1102,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                     "unread_count": unread_count,
                     "stream_ordering": old_rotate_stream_ordering,
                     "last_receipt_stream_ordering": stream_ordering,
+                    "thread_id": "main",
                 },
             )
 
@@ -1138,17 +1251,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         logger.info("Rotating notifications, handling %d rows", len(summaries))
 
+        # TODO(threads): Update on a per-thread basis.
         self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_push_summary",
             key_names=("user_id", "room_id"),
             key_values=[(user_id, room_id) for user_id, room_id in summaries],
-            value_names=("notif_count", "unread_count", "stream_ordering"),
+            value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
             value_values=[
                 (
                     summary.notif_count,
                     summary.unread_count,
                     summary.stream_ordering,
+                    "main",
                 )
                 for summary in summaries.values()
             ],
@@ -1255,7 +1370,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             table="event_push_actions",
             columns=["highlight", "stream_ordering"],
             where_clause="highlight=0",
-            psql_only=True,
         )
 
     async def get_push_actions_for_user(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1c3b804da0..5932668f2f 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2192,9 +2192,9 @@ class PersistEventsStore:
         sql = """
             INSERT INTO event_push_actions (
                 room_id, event_id, user_id, actions, stream_ordering,
-                topological_ordering, notif, highlight, unread
+                topological_ordering, notif, highlight, unread, thread_id
             )
-            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread
+            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id
             FROM event_push_actions_staging
             WHERE event_id = ?
         """
@@ -2435,17 +2435,31 @@ class PersistEventsStore:
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
         )
+        backward_extremity_tuples_to_remove = [
+            (ev.event_id, ev.room_id)
+            for ev in events
+            if not ev.internal_metadata.is_outlier()
+            # If we encountered an event with no prev_events, then we might
+            # as well remove it now because it won't ever have anything else
+            # to backfill from.
+            or len(ev.prev_event_ids()) == 0
+        ]
         txn.execute_batch(
             query,
-            [
-                (ev.event_id, ev.room_id)
-                for ev in events
-                if not ev.internal_metadata.is_outlier()
-                # If we encountered an event with no prev_events, then we might
-                # as well remove it now because it won't ever have anything else
-                # to backfill from.
-                or len(ev.prev_event_ids()) == 0
-            ],
+            backward_extremity_tuples_to_remove,
+        )
+
+        # Clear out the failed backfill attempts after we successfully pulled
+        # the event. Since we no longer need these events as backward
+        # extremities, it also means that they won't be backfilled from again so
+        # we no longer need to store the backfill attempts around it.
+        query = """
+            DELETE FROM event_failed_pull_attempts
+            WHERE event_id = ? and room_id = ?
+        """
+        txn.execute_batch(
+            query,
+            backward_extremity_tuples_to_remove,
         )
 
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 719a12b0ae..ddb8e80b69 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -113,6 +113,24 @@ class ReceiptsWorkerStore(SQLBaseStore):
             prefilled_cache=receipts_stream_prefill,
         )
 
+        self.db_pool.updates.register_background_index_update(
+            "receipts_linearized_unique_index",
+            index_name="receipts_linearized_unique_index",
+            table="receipts_linearized",
+            columns=["room_id", "receipt_type", "user_id"],
+            where_clause="thread_id IS NULL",
+            unique=True,
+        )
+
+        self.db_pool.updates.register_background_index_update(
+            "receipts_graph_unique_index",
+            index_name="receipts_graph_unique_index",
+            table="receipts_graph",
+            columns=["room_id", "receipt_type", "user_id"],
+            where_clause="thread_id IS NULL",
+            unique=True,
+        )
+
     def get_max_receipt_stream_id(self) -> int:
         """Get the current max stream ID for receipts stream"""
         return self._receipts_id_gen.get_current_token()
@@ -677,6 +695,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 "event_id": event_id,
                 "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
+                "thread_id": None,
             },
             # receipts_linearized has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
@@ -824,6 +843,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             values={
                 "event_ids": json_encoder.encode(event_ids),
                 "data": json_encoder.encode(data),
+                "thread_id": None,
             },
             # receipts_graph has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index fdb4684e12..a8d224602a 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -88,6 +88,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # at a time. Keyed by room_id.
         self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
 
+        self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
+
         if (
             self.hs.config.worker.run_background_tasks
             and self.hs.config.metrics.metrics_flags.known_servers
@@ -504,6 +506,21 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return membership == Membership.JOIN
 
+    async def is_server_notice_room(self, room_id: str) -> bool:
+        """
+        Determines whether the given room is a 'Server Notices' room, used for
+        sending server notices to a user.
+
+        This is determined by seeing whether the server notices user is present
+        in the room.
+        """
+        if self._server_notices_mxid is None:
+            return False
+        is_server_notices_room = await self.check_local_user_in_room(
+            user_id=self._server_notices_mxid, room_id=room_id
+        )
+        return is_server_notices_room
+
     async def get_local_current_membership_for_user_in_room(
         self, user_id: str, room_id: str
     ) -> Tuple[Optional[str], Optional[str]]:
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 2a85db2e60..da8ed6d8cf 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 72  # remember to update the list below when updating
+SCHEMA_VERSION = 73  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -77,6 +77,12 @@ Changes in SCHEMA_VERSION = 72:
     - Tables related to groups are dropped.
     - Unused column application_services_state.last_txn is dropped
     - Cache invalidation stream id sequence now begins at 2 to match code expectation.
+
+Changes in SCHEMA_VERSION = 73;
+    - thread_id column is added to event_push_actions, event_push_actions_staging
+      event_push_summary, receipts_linearized, and receipts_graph.
+    - Add table `event_failed_pull_attempts` to keep track when we fail to pull
+      events over federation.
     - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
       from `opentracing_context` to generalized `tracing_context`.
 """
diff --git a/synapse/storage/schema/main/delta/72/06thread_notifications.sql b/synapse/storage/schema/main/delta/72/06thread_notifications.sql
new file mode 100644
index 0000000000..2f4f5dac7a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/06thread_notifications.sql
@@ -0,0 +1,30 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a nullable column for thread ID to the event push actions tables; this
+-- will be filled in with a default value for any previously existing rows.
+--
+-- After migration this can be made non-nullable.
+
+ALTER TABLE event_push_actions_staging ADD COLUMN thread_id TEXT;
+ALTER TABLE event_push_actions ADD COLUMN thread_id TEXT;
+ALTER TABLE event_push_summary ADD COLUMN thread_id TEXT;
+
+-- Update the unique index for `event_push_summary`.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7006, 'event_push_summary_unique_index2', '{}');
+
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+  (7006, 'event_push_backfill_thread_id', '{}', 'event_push_summary_unique_index2');
diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres
new file mode 100644
index 0000000000..55fff9e278
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a nullable column for thread ID to the receipts table; this allows a
+-- receipt per user, per room, as well as an unthreaded receipt (corresponding
+-- to a null thread ID).
+
+ALTER TABLE receipts_linearized ADD COLUMN thread_id TEXT;
+ALTER TABLE receipts_graph ADD COLUMN thread_id TEXT;
+
+-- Rebuild the unique constraint with the thread_id.
+ALTER TABLE receipts_linearized
+    ADD CONSTRAINT receipts_linearized_uniqueness_thread
+        UNIQUE (room_id, receipt_type, user_id, thread_id);
+
+ALTER TABLE receipts_graph
+    ADD CONSTRAINT receipts_graph_uniqueness_thread
+        UNIQUE (room_id, receipt_type, user_id, thread_id);
diff --git a/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite
new file mode 100644
index 0000000000..232f67deb4
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/07thread_receipts.sql.sqlite
@@ -0,0 +1,70 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Allow multiple receipts per user per room via a nullable thread_id column.
+--
+-- SQLite doesn't support modifying constraints to an existing table, so it must
+-- be recreated.
+
+-- Create the new tables.
+CREATE TABLE receipts_linearized_new (
+    stream_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    receipt_type TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    thread_id TEXT,
+    event_stream_ordering BIGINT,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id),
+    CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+CREATE TABLE receipts_graph_new (
+    room_id TEXT NOT NULL,
+    receipt_type TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    event_ids TEXT NOT NULL,
+    thread_id TEXT,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id),
+    CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+-- Drop the old indexes.
+DROP INDEX IF EXISTS receipts_linearized_id;
+DROP INDEX IF EXISTS receipts_linearized_room_stream;
+DROP INDEX IF EXISTS receipts_linearized_user;
+
+-- Copy the data.
+INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data)
+    SELECT stream_id, room_id, receipt_type, user_id, event_id, event_stream_ordering, data
+    FROM receipts_linearized;
+INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
+    SELECT room_id, receipt_type, user_id, event_ids, data
+    FROM receipts_graph;
+
+-- Drop the old tables.
+DROP TABLE receipts_linearized;
+DROP TABLE receipts_graph;
+
+-- Rename the tables.
+ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
+ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
+
+-- Create the indices.
+CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
+CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
diff --git a/synapse/storage/schema/main/delta/72/08thread_receipts.sql b/synapse/storage/schema/main/delta/72/08thread_receipts.sql
new file mode 100644
index 0000000000..e35b021f31
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/08thread_receipts.sql
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7007, 'receipts_linearized_unique_index', '{}');
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7007, 'receipts_graph_unique_index', '{}');
diff --git a/synapse/storage/schema/main/delta/72/09partial_indices.sql.sqlite b/synapse/storage/schema/main/delta/72/09partial_indices.sql.sqlite
new file mode 100644
index 0000000000..c8dfdf0218
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/09partial_indices.sql.sqlite
@@ -0,0 +1,56 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+-- SQLite needs to rebuild indices which use partial indices on Postgres, but
+-- previously did not use them on SQLite.
+
+-- Drop each index that was added with register_background_index_update AND specified
+-- a where_clause (that existed before this delta).
+
+-- From events_bg_updates.py
+DROP INDEX IF EXISTS event_contains_url_index;
+-- There is also a redactions_censored_redacts index, but that gets dropped.
+DROP INDEX IF EXISTS redactions_have_censored_ts;
+-- There is also a PostgreSQL only index (event_contains_url_index2)
+-- which gets renamed to event_contains_url_index.
+
+-- From roommember.py
+DROP INDEX IF EXISTS room_memberships_user_room_forgotten;
+
+-- From presence.py
+DROP INDEX IF EXISTS presence_stream_state_not_offline_idx;
+
+-- From media_repository.py
+DROP INDEX IF EXISTS local_media_repository_url_idx;
+
+-- From event_push_actions.py
+DROP INDEX IF EXISTS event_push_actions_highlights_index;
+-- There's also a event_push_actions_stream_highlight_index which was previously
+-- PostgreSQL-only.
+
+-- From state.py
+DROP INDEX IF EXISTS current_state_events_member_index;
+
+-- Re-insert the background jobs to re-create the indices.
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+  (7209, 'event_contains_url_index', '{}', NULL),
+  (7209, 'redactions_have_censored_ts_idx', '{}', NULL),
+  (7209, 'room_membership_forgotten_idx', '{}', NULL),
+  (7209, 'presence_stream_not_offline_index', '{}', NULL),
+  (7209, 'local_media_repository_url_idx', '{}', NULL),
+  (7209, 'event_push_actions_highlights_index', '{}', NULL),
+  (7209, 'event_push_actions_stream_highlight_index', '{}', NULL),
+  (7209, 'current_state_members_idx', '{}', NULL)
+ON CONFLICT (update_name) DO NOTHING;
diff --git a/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql b/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql
new file mode 100644
index 0000000000..d397ee1082
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/01event_failed_pull_attempts.sql
@@ -0,0 +1,29 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Add a table that keeps track of when we failed to pull an event over
+-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
+-- us to be more intelligent when we decide to retry (we don't need to fail over
+-- and over) and we can process that event in the background so we don't block
+-- on it each time.
+CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
+    room_id TEXT NOT NULL REFERENCES rooms (room_id),
+    event_id TEXT NOT NULL,
+    num_attempts INT NOT NULL,
+    last_attempt_ts BIGINT NOT NULL,
+    last_cause TEXT NOT NULL,
+    PRIMARY KEY (room_id, event_id)
+);
diff --git a/synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql b/synapse/storage/schema/main/delta/73/02rename_opentelemtetry_tracing_context.sql
index ae904863f8..ae904863f8 100644
--- a/synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql
+++ b/synapse/storage/schema/main/delta/73/02rename_opentelemtetry_tracing_context.sql
diff --git a/synapse/storage/schema/state/delta/30/state_stream.sql b/synapse/storage/schema/state/delta/30/state_stream.sql
deleted file mode 100644
index bdaf8b02d5..0000000000
--- a/synapse/storage/schema/state/delta/30/state_stream.sql
+++ /dev/null
@@ -1,37 +0,0 @@
-/* Copyright 2016 OpenMarket Ltd
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/* We used to create a table called current_state_resets, but this is no
- * longer used and is removed in delta 54.
- */
-
-/* The outlier events that have aquired a state group typically through
- * backfill. This is tracked separately to the events table, as assigning a
- * state group change the position of the existing event in the stream
- * ordering.
- * However since a stream_ordering is assigned in persist_event for the
- * (event, state) pair, we can use that stream_ordering to identify when
- * the new state was assigned for the event.
- */
-
-/* NB: This table belongs to the `main` logical database; it should not be present
- * in `state`.
- */
-CREATE TABLE IF NOT EXISTS ex_outlier_stream(
-    event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
-    event_id TEXT NOT NULL,
-    state_group BIGINT NOT NULL
-);
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 35c0be08b0..f7c3a6794e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -205,8 +205,9 @@ def register_cache(
         add_resizable_cache(cache_name, resize_callback)
 
     metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
+    metric_name = "cache_%s_%s" % (cache_type, cache_name)
     caches_by_name[cache_name] = cache
-    CACHE_METRIC_REGISTRY.register_hook(metric.collect)
+    CACHE_METRIC_REGISTRY.register_hook(metric_name, metric.collect)
     return metric
 
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 9687120ebf..165480bdbe 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -15,7 +15,7 @@
 import logging
 from functools import wraps
 from types import TracebackType
-from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
+from typing import Awaitable, Callable, Dict, Generator, Optional, Type, TypeVar
 
 from prometheus_client import CollectorRegistry, Counter, Metric
 from typing_extensions import Concatenate, ParamSpec, Protocol
@@ -220,21 +220,21 @@ class DynamicCollectorRegistry(CollectorRegistry):
 
     def __init__(self) -> None:
         super().__init__()
-        self._pre_update_hooks: List[Callable[[], None]] = []
+        self._pre_update_hooks: Dict[str, Callable[[], None]] = {}
 
     def collect(self) -> Generator[Metric, None, None]:
         """
         Collects metrics, calling pre-update hooks first.
         """
 
-        for pre_update_hook in self._pre_update_hooks:
+        for pre_update_hook in self._pre_update_hooks.values():
             pre_update_hook()
 
         yield from super().collect()
 
-    def register_hook(self, hook: Callable[[], None]) -> None:
+    def register_hook(self, metric_name: str, hook: Callable[[], None]) -> None:
         """
         Registers a hook that is called before metric collection.
         """
 
-        self._pre_update_hooks.append(hook)
+        self._pre_update_hooks[metric_name] = hook
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 1e6ad4b663..95698bc275 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -891,6 +891,12 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             new_callable=mock.MagicMock,
             return_value=make_awaitable(["some_room_id"]),
         )
+        mock_get_users = mock.patch.object(
+            self.store,
+            "get_users_server_still_shares_room_with",
+            new_callable=mock.MagicMock,
+            return_value=make_awaitable({remote_user_id}),
+        )
         mock_request = mock.patch.object(
             self.hs.get_federation_client(),
             "query_user_devices",
@@ -898,7 +904,7 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             return_value=make_awaitable(response_body),
         )
 
-        with mock_get_rooms, mock_request as mocked_federation_request:
+        with mock_get_rooms, mock_get_users, mock_request as mocked_federation_request:
             # Make the first query and sanity check it succeeds.
             response_1 = self.get_success(
                 e2e_handler.query_devices(
diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index 51c8dd6498..b5b89405a4 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -227,3 +227,225 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
 
         if prev_exists_as_outlier:
             self.mock_federation_transport_client.get_event.assert_not_called()
+
+    def test_process_pulled_event_records_failed_backfill_attempts(
+        self,
+    ) -> None:
+        """
+        Test to make sure that failed backfill attempts for an event are
+        recorded in the `event_failed_pull_attempts` table.
+
+        In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event has a fake `prev_event` which our server has
+        obviously never seen before so it attempts to request the state at that
+        `prev_event` which expectedly fails because it's a fake event. Because
+        the server can't fetch the state at the missing `prev_event`, the
+        "pulled" event fails the history check and is fails to process.
+
+        We check that we correctly record the number of failed pull attempts
+        of the pulled event and as a sanity check, that the "pulled" event isn't
+        persisted.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # We expect an outbound request to /state_ids, so stub that out
+        self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
+            {
+                # Mimic the other server not knowing about the state at all.
+                # We want to cause Synapse to throw an error (`Unable to get
+                # missing prev_event $fake_prev_event`) and fail to backfill
+                # the pulled event.
+                "pdu_ids": [],
+                "auth_chain_ids": [],
+            }
+        )
+        # We also expect an outbound request to /state
+        self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
+            StateRequestResponse(
+                # Mimic the other server not knowing about the state at all.
+                # We want to cause Synapse to throw an error (`Unable to get
+                # missing prev_event $fake_prev_event`) and fail to backfill
+                # the pulled event.
+                auth_events=[],
+                state=[],
+            )
+        )
+
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [
+                        # The fake prev event will make the pulled event fail
+                        # the history check (`Unable to get missing prev_event
+                        # $fake_prev_event`)
+                        "$fake_prev_event"
+                    ],
+                    "auth_events": [],
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled"},
+                }
+            ),
+            room_version,
+        )
+
+        # The function under test: try to process the pulled event
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler()._process_pulled_event(
+                    self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+                )
+            )
+
+        # Make sure our failed pull attempt was recorded
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts, 1)
+
+        # The function under test: try to process the pulled event again
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler()._process_pulled_event(
+                    self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+                )
+            )
+
+        # Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts, 2)
+
+        # And as a sanity check, make sure the event was not persisted through all of this.
+        persisted = self.get_success(
+            main_store.get_event(pulled_event.event_id, allow_none=True)
+        )
+        self.assertIsNone(
+            persisted,
+            "pulled event that fails the history check should not be persisted at all",
+        )
+
+    def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
+        self,
+    ) -> None:
+        """
+        Test to make sure that failed pull attempts
+        (`event_failed_pull_attempts` table) for an event are cleared after the
+        event is successfully persisted.
+
+        In this test, we pretend we are processing a "pulled" event via
+        backfill. The pulled event succesfully processes and the backward
+        extremeties are updated along with clearing out any failed pull attempts
+        for those old extremities.
+
+        We check that we correctly cleared failed pull attempts of the
+        pulled event.
+        """
+        OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+        main_store = self.hs.get_datastores().main
+
+        # Create the room
+        user_id = self.register_user("kermit", "test")
+        tok = self.login("kermit", "test")
+        room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+        room_version = self.get_success(main_store.get_room_version(room_id))
+
+        # allow the remote user to send state events
+        self.helper.send_state(
+            room_id,
+            "m.room.power_levels",
+            {"events_default": 0, "state_default": 0},
+            tok=tok,
+        )
+
+        # add the remote user to the room
+        member_event = self.get_success(
+            event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+        )
+
+        initial_state_map = self.get_success(
+            main_store.get_partial_current_state_ids(room_id)
+        )
+
+        auth_event_ids = [
+            initial_state_map[("m.room.create", "")],
+            initial_state_map[("m.room.power_levels", "")],
+            member_event.event_id,
+        ]
+
+        pulled_event = make_event_from_dict(
+            self.add_hashes_and_signatures_from_other_server(
+                {
+                    "type": "test_regular_type",
+                    "room_id": room_id,
+                    "sender": OTHER_USER,
+                    "prev_events": [member_event.event_id],
+                    "auth_events": auth_event_ids,
+                    "origin_server_ts": 1,
+                    "depth": 12,
+                    "content": {"body": "pulled"},
+                }
+            ),
+            room_version,
+        )
+
+        # Fake the "pulled" event failing to backfill once so we can test
+        # if it's cleared out later on.
+        self.get_success(
+            main_store.record_event_failed_pull_attempt(
+                pulled_event.room_id, pulled_event.event_id, "fake cause"
+            )
+        )
+        # Make sure we have a failed pull attempt recorded for the pulled event
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+            )
+        )
+        self.assertEqual(backfill_num_attempts, 1)
+
+        # The function under test: try to process the pulled event
+        with LoggingContext("test"):
+            self.get_success(
+                self.hs.get_federation_event_handler()._process_pulled_event(
+                    self.OTHER_SERVER_NAME, pulled_event, backfilled=True
+                )
+            )
+
+        # Make sure the failed pull attempts for the pulled event are cleared
+        backfill_num_attempts = self.get_success(
+            main_store.db_pool.simple_select_one_onecol(
+                table="event_failed_pull_attempts",
+                keyvalues={"event_id": pulled_event.event_id},
+                retcol="num_attempts",
+                allow_none=True,
+            )
+        )
+        self.assertIsNone(backfill_num_attempts)
+
+        # And as a sanity check, make sure the "pulled" event was persisted.
+        persisted = self.get_success(
+            main_store.get_event(pulled_event.event_id, allow_none=True)
+        )
+        self.assertIsNotNone(persisted, "pulled event was not persisted at all")
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 531a0db2d0..49a21e2e85 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -404,6 +404,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
                 event.event_id,
                 {user_id: actions for user_id, actions in push_actions},
                 False,
+                "main",
             )
         )
         return event, context