Diffstat (limited to 'synapse')
-rw-r--r--  synapse/config/push.py                                                            |  10
-rw-r--r--  synapse/config/room.py                                                            |   4
-rw-r--r--  synapse/handlers/room_member.py                                                   | 173
-rw-r--r--  synapse/handlers/room_member_worker.py                                            |   3
-rw-r--r--  synapse/push/httppusher.py                                                        |  18
-rw-r--r--  synapse/rest/admin/experimental_features.py                                       |   1
-rw-r--r--  synapse/server.py                                                                 |  11
-rw-r--r--  synapse/storage/background_updates.py                                             |  44
-rw-r--r--  synapse/storage/database.py                                                       |  13
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py                              | 253
-rw-r--r--  synapse/storage/databases/main/roommember.py                                      |  69
-rw-r--r--  synapse/storage/databases/main/user_directory.py                                  | 235
-rw-r--r--  synapse/storage/schema/__init__.py                                                |   3
-rw-r--r--  synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql                    |  24
-rw-r--r--  synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql          |  28
-rw-r--r--  synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres |  37
-rw-r--r--  synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite   | 102
17 files changed, 667 insertions, 361 deletions
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 3b5378e6ea..8177ff52e2 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -42,11 +42,17 @@ class PushConfig(Config):
 
         # Now check for the one in the 'email' section and honour it,
         # with a warning.
-        push_config = config.get("email") or {}
-        redact_content = push_config.get("redact_content")
+        email_push_config = config.get("email") or {}
+        redact_content = email_push_config.get("redact_content")
         if redact_content is not None:
             print(
                 "The 'email.redact_content' option is deprecated: "
                 "please set push.include_content instead"
             )
             self.push_include_content = not redact_content
+
+        # Whether to apply a random delay to outbound push.
+        self.push_jitter_delay_ms = None
+        push_jitter_delay = push_config.get("jitter_delay", None)
+        if push_jitter_delay:
+            self.push_jitter_delay_ms = self.parse_duration(push_jitter_delay)
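
A minimal sketch of the new option in use, assuming Synapse's usual duration
strings; the YAML layout and the "10s" value are illustrative:

    # homeserver.yaml (hypothetical):
    #
    #   push:
    #     jitter_delay: 10s
    #
    # With that setting, read_config above yields
    #   self.push_jitter_delay_ms == 10_000
    # and HttpPusher (further down this diff) sleeps a random 1..10_000 ms
    # before sending each notification.
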
diff --git a/synapse/config/room.py b/synapse/config/room.py
index 4a7ac00540..b6696cd129 100644
--- a/synapse/config/room.py
+++ b/synapse/config/room.py
@@ -75,3 +75,7 @@ class RoomConfig(Config):
                         % preset
                     )
                 # We validate the actual overrides when we try to apply them.
+
+        # When enabled, users will forget rooms when they leave them, whether via
+        # a voluntary leave, a kick or a ban.
+        self.forget_on_leave = config.get("forget_rooms_on_leave", False)
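
A sketch of the corresponding homeserver.yaml setting; the option name comes
from the `config.get` call above, while the value and top-level placement are
illustrative assumptions:

    # homeserver.yaml (hypothetical):
    #
    #   forget_rooms_on_leave: true
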
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed805d6ec8..fbef600acd 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -16,7 +16,7 @@ import abc
 import logging
 import random
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse import types
 from synapse.api.constants import (
@@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
+from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.logging import opentracing
+from synapse.metrics import event_processing_positions
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.types import (
     JsonDict,
@@ -280,9 +283,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """
         raise NotImplementedError()
 
-    @abc.abstractmethod
     async def forget(self, user: UserID, room_id: str) -> None:
-        raise NotImplementedError()
+        user_id = user.to_string()
+
+        member = await self._storage_controllers.state.get_current_state_event(
+            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
+        )
+        membership = member.membership if member else None
+
+        if membership is not None and membership not in [
+            Membership.LEAVE,
+            Membership.BAN,
+        ]:
+            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+
+        # In the normal case this call is only required if `membership` is not
+        # `None`. However, once the last member has left the room, the
+        # background update `_background_remove_left_rooms` deletes the room's
+        # rows from `current_state_events`, at which point
+        # `get_current_state_event` returns `None`, so we forget unconditionally.
+        await self.store.forget(user_id, room_id)
 
     async def ratelimit_multiple_invites(
         self,
@@ -2046,25 +2065,141 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         """Implements RoomMemberHandler._user_left_room"""
         user_left_room(self.distributor, target, room_id)
 
-    async def forget(self, user: UserID, room_id: str) -> None:
-        user_id = user.to_string()
 
-        member = await self._storage_controllers.state.get_current_state_event(
-            room_id=room_id, event_type=EventTypes.Member, state_key=user_id
-        )
-        membership = member.membership if member else None
+class RoomForgetterHandler(StateDeltasHandler):
+    """Forgets rooms when they are left, when enabled in the homeserver config.
 
-        if membership is not None and membership not in [
-            Membership.LEAVE,
-            Membership.BAN,
-        ]:
-            raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
+    For the purposes of this feature, kicks, bans and "leaves" via state resolution
+    weirdness are all considered to be leaves.
 
-        # In normal case this call is only required if `membership` is not `None`.
-        # But: After the last member had left the room, the background update
-        # `_background_remove_left_rooms` is deleting rows related to this room from
-        # the table `current_state_events` and `get_current_state_events` is `None`.
-        await self.store.forget(user_id, room_id)
+    Derived from `StatsHandler` and `UserDirectoryHandler`.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._hs = hs
+        self._store = hs.get_datastores().main
+        self._storage_controllers = hs.get_storage_controllers()
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._room_member_handler = hs.get_room_member_handler()
+
+        # The current position in the current_state_delta stream
+        self.pos: Optional[int] = None
+
+        # Guard to ensure we only process deltas one at a time
+        self._is_processing = False
+
+        if hs.config.worker.run_background_tasks:
+            self._notifier.add_replication_callback(self.notify_new_event)
+
+            # We kick this off to pick up outstanding work from before the last restart.
+            self._clock.call_later(0, self.notify_new_event)
+
+    def notify_new_event(self) -> None:
+        """Called when there may be more deltas to process"""
+        if self._is_processing:
+            return
+
+        self._is_processing = True
+
+        async def process() -> None:
+            try:
+                await self._unsafe_process()
+            finally:
+                self._is_processing = False
+
+        run_as_background_process("room_forgetter.notify_new_event", process)
+
+    async def _unsafe_process(self) -> None:
+        # If self.pos is None, we haven't yet fetched it from the DB.
+        if self.pos is None:
+            self.pos = await self._store.get_room_forgetter_stream_pos()
+            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+            if self.pos > room_max_stream_ordering:
+                # apparently, we've processed more events than exist in the database!
+                # this can happen if events are removed with history purge or similar.
+                logger.warning(
+                    "Event stream ordering appears to have gone backwards (%i -> %i): "
+                    "rewinding room forgetter processor",
+                    self.pos,
+                    room_max_stream_ordering,
+                )
+                self.pos = room_max_stream_ordering
+
+        if not self._hs.config.room.forget_on_leave:
+            # Update the processing position, so that if the server admin turns the
+            # feature on at a later date, we don't decide to forget every room that
+            # has ever been left in the past.
+            self.pos = self._store.get_room_max_stream_ordering()
+            await self._store.update_room_forgetter_stream_pos(self.pos)
+            return
+
+        # Loop round handling deltas until we're up to date
+
+        while True:
+            # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+            # deltas, since there is otherwise a chance that we could miss updates which arrive
+            # after we check the deltas.
+            room_max_stream_ordering = self._store.get_room_max_stream_ordering()
+            if self.pos == room_max_stream_ordering:
+                break
+
+            logger.debug(
+                "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering
+            )
+            (
+                max_pos,
+                deltas,
+            ) = await self._storage_controllers.state.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
+
+            logger.debug("Handling %d state deltas", len(deltas))
+            await self._handle_deltas(deltas)
+
+            self.pos = max_pos
+
+            # Expose current event processing position to prometheus
+            event_processing_positions.labels("room_forgetter").set(max_pos)
+
+            await self._store.update_room_forgetter_stream_pos(max_pos)
+
+    async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
+        """Called with the state deltas to process"""
+        for delta in deltas:
+            typ = delta["type"]
+            state_key = delta["state_key"]
+            room_id = delta["room_id"]
+            event_id = delta["event_id"]
+            prev_event_id = delta["prev_event_id"]
+
+            if typ != EventTypes.Member:
+                continue
+
+            if not self._hs.is_mine_id(state_key):
+                continue
+
+            change = await self._get_key_change(
+                prev_event_id,
+                event_id,
+                key_name="membership",
+                public_value=Membership.JOIN,
+            )
+            is_leave = change is MatchChange.now_false
+
+            if is_leave:
+                try:
+                    await self._room_member_handler.forget(
+                        UserID.from_string(state_key), room_id
+                    )
+                except SynapseError as e:
+                    if e.code == 400:
+                        # The user is back in the room.
+                        pass
+                    else:
+                        raise
 
 
 def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
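
For illustration, a minimal sketch of the delta dicts that `_handle_deltas`
consumes; the field names match the keys read above, while the values are
hypothetical:

    delta = {
        "type": "m.room.member",            # EventTypes.Member
        "state_key": "@alice:example.com",  # the local user whose membership changed
        "room_id": "!room:example.com",
        "event_id": "$current_membership_event",
        "prev_event_id": "$previous_membership_event",  # may be None
    }
    # _get_key_change compares the "membership" key of the two events against
    # Membership.JOIN; MatchChange.now_false (joined -> no longer joined)
    # triggers forget().
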
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 76e36b8a6d..e8ff1ad063 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -137,6 +137,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
         await self._notify_change_client(
             user_id=target.to_string(), room_id=room_id, change="left"
         )
-
-    async def forget(self, target: UserID, room_id: str) -> None:
-        raise RuntimeError("Cannot forget rooms on workers.")
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 4f8fa445d9..e91ee05e99 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import random
 import urllib.parse
 from typing import TYPE_CHECKING, Dict, List, Optional, Union
 
@@ -114,6 +115,8 @@ class HttpPusher(Pusher):
         )
         self._pusherpool = hs.get_pusherpool()
 
+        self.push_jitter_delay_ms = hs.config.push.push_jitter_delay_ms
+
         self.data = pusher_config.data
         if self.data is None:
             raise PusherConfigException("'data' key can not be null for HTTP pusher")
@@ -326,6 +329,21 @@ class HttpPusher(Pusher):
         event = await self.store.get_event(push_action.event_id, allow_none=True)
         if event is None:
             return True  # It's been redacted
+
+        # Check if we should delay sending out the notification by a random
+        # amount.
+        #
+        # Note: we base the delay off of when the event was sent, rather than
+        # now, to handle the case where we need to send out many notifications
+        # at once. If we just slept the random amount each loop then the last
+        # push notification in the set could be delayed by many times the max
+        # delay.
+        if self.push_jitter_delay_ms:
+            delay_ms = random.randint(1, self.push_jitter_delay_ms)
+            diff_ms = event.origin_server_ts + delay_ms - self.clock.time_msec()
+            if diff_ms > 0:
+                await self.clock.sleep(diff_ms / 1000)
+
         rejected = await self.dispatch_push_event(event, tweaks, badge)
         if rejected is False:
             return False
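
A standalone sketch of the jitter arithmetic above (timestamps illustrative).
Anchoring the delay to `origin_server_ts` rather than the current time means a
backlog of notifications is spread over at most `push_jitter_delay_ms` from the
send time, instead of each sleep compounding:

    import random

    push_jitter_delay_ms = 10_000          # e.g. push.jitter_delay: 10s
    origin_server_ts = 1_700_000_000_000   # when the event was sent (ms)
    now_ms = 1_700_000_004_000             # processing it 4s later

    delay_ms = random.randint(1, push_jitter_delay_ms)
    diff_ms = origin_server_ts + delay_ms - now_ms
    if diff_ms > 0:
        sleep_seconds = diff_ms / 1000     # sleep only if the target is ahead
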
diff --git a/synapse/rest/admin/experimental_features.py b/synapse/rest/admin/experimental_features.py
index 1d409ac2b7..abf273af10 100644
--- a/synapse/rest/admin/experimental_features.py
+++ b/synapse/rest/admin/experimental_features.py
@@ -33,7 +33,6 @@ class ExperimentalFeature(str, Enum):
     """
 
     MSC3026 = "msc3026"
-    MSC2654 = "msc2654"
     MSC3881 = "msc3881"
     MSC3967 = "msc3967"
 
diff --git a/synapse/server.py b/synapse/server.py
index 08ad97b952..e597627a6d 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -93,7 +93,11 @@ from synapse.handlers.room import (
 )
 from synapse.handlers.room_batch import RoomBatchHandler
 from synapse.handlers.room_list import RoomListHandler
-from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler
+from synapse.handlers.room_member import (
+    RoomForgetterHandler,
+    RoomMemberHandler,
+    RoomMemberMasterHandler,
+)
 from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.room_summary import RoomSummaryHandler
 from synapse.handlers.search import SearchHandler
@@ -232,6 +236,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         "message",
         "pagination",
         "profile",
+        "room_forgetter",
         "stats",
     ]
 
@@ -827,6 +832,10 @@ class HomeServer(metaclass=abc.ABCMeta):
         return PushRulesHandler(self)
 
     @cache_in_self
+    def get_room_forgetter_handler(self) -> RoomForgetterHandler:
+        return RoomForgetterHandler(self)
+
+    @cache_in_self
     def get_outbound_redis_connection(self) -> "ConnectionHandler":
         """
         The Redis connection used for replication.
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index a99aea8926..ca085ef800 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -561,6 +561,50 @@ class BackgroundUpdater:
             updater, oneshot=True
         )
 
+    def register_background_validate_constraint(
+        self, update_name: str, constraint_name: str, table: str
+    ) -> None:
+        """Helper for store classes to do a background validate constraint.
+
+        This only applies on PostgreSQL.
+
+        To use:
+
+        1. Use a schema delta file to add a background update. Example:
+            INSERT INTO background_updates (update_name, progress_json) VALUES
+                ('validate_my_constraint', '{}');
+
+        2. In the Store constructor, call this method
+
+        Args:
+            update_name: update_name to register for
+            constraint_name: name of constraint to validate
+            table: table the constraint is applied to
+        """
+
+        def runner(conn: Connection) -> None:
+            c = conn.cursor()
+
+            sql = f"""
+            ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name};
+            """
+            logger.debug("[SQL] %s", sql)
+            c.execute(sql)
+
+        async def updater(progress: JsonDict, batch_size: int) -> int:
+            assert isinstance(
+                self.db_pool.engine, engines.PostgresEngine
+            ), "validate constraint background update registered for non-Postres database"
+
+            logger.info("Validating constraint %s on %s", constraint_name, table)
+            await self.db_pool.runWithConnection(runner)
+            await self._end_background_update(update_name)
+            return 1
+
+        self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
+            updater, oneshot=True
+        )
+
     async def create_index_in_background(
         self,
         index_name: str,
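
A sketch of both halves described in the docstring, using hypothetical names
(`my_table`, `my_constraint`, `validate_my_constraint`):

    # 1. Schema delta (PostgreSQL only): add the constraint as NOT VALID so
    #    existing rows are not checked yet, then queue the validation:
    #
    #      ALTER TABLE my_table
    #          ADD CONSTRAINT my_constraint CHECK (col IS NOT NULL) NOT VALID;
    #      INSERT INTO background_updates (update_name, progress_json) VALUES
    #          ('validate_my_constraint', '{}');
    #
    # 2. Store constructor:
    self.db_pool.updates.register_background_validate_constraint(
        "validate_my_constraint",
        constraint_name="my_constraint",
        table="my_table",
    )
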
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 1f5f5eb6f8..313cf1a8d0 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -386,13 +386,20 @@ class LoggingTransaction:
             self.executemany(sql, args)
 
     def execute_values(
-        self, sql: str, values: Iterable[Iterable[Any]], fetch: bool = True
+        self,
+        sql: str,
+        values: Iterable[Iterable[Any]],
+        template: Optional[str] = None,
+        fetch: bool = True,
     ) -> List[Tuple]:
         """Corresponds to psycopg2.extras.execute_values. Only available when
         using postgres.
 
         The `fetch` parameter must be set to False if the query does not return
         rows (e.g. INSERTs).
+
+        The `template` is the snippet merged with every item in `values` to
+        compose the query; see `psycopg2.extras.execute_values`.
         """
         assert isinstance(self.database_engine, PostgresEngine)
         from psycopg2.extras import execute_values
@@ -400,7 +407,9 @@ class LoggingTransaction:
         return self._do_execute(
             # TODO: is it safe for values to be Iterable[Iterable[Any]] here?
             # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
-            lambda the_sql: execute_values(self.txn, the_sql, values, fetch=fetch),
+            lambda the_sql: execute_values(
+                self.txn, the_sql, values, template=template, fetch=fetch
+            ),
             sql,
         )
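
To illustrate the new `template` parameter: following psycopg2 semantics, each
row of `values` fills the template's placeholders, and the joined results
replace the single `?` in the statement (the engine converts `?` to the
psycopg2 placeholder). A hypothetical call mirroring the user-directory usage
further down this diff:

    txn.execute_values(
        "INSERT INTO user_directory_search(user_id, vector) VALUES ?"
        " ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector",
        [
            ("@alice:example.com", "alice"),
            ("@bob:example.com", "bob"),
        ],
        template="(%s, to_tsvector('simple', %s))",
        fetch=False,  # required for statements that return no rows
    )
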
 
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index eeccf5db24..2e98a29fef 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -100,7 +100,6 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 
@@ -289,180 +288,22 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             unique=True,
         )
 
-        self.db_pool.updates.register_background_update_handler(
-            "event_push_backfill_thread_id",
-            self._background_backfill_thread_id,
+        self.db_pool.updates.register_background_validate_constraint(
+            "event_push_actions_staging_thread_id",
+            constraint_name="event_push_actions_staging_thread_id",
+            table="event_push_actions_staging",
         )
-
-        # Indexes which will be used to quickly make the thread_id column non-null.
-        self.db_pool.updates.register_background_index_update(
-            "event_push_actions_thread_id_null",
-            index_name="event_push_actions_thread_id_null",
+        self.db_pool.updates.register_background_validate_constraint(
+            "event_push_actions_thread_id",
+            constraint_name="event_push_actions_thread_id",
             table="event_push_actions",
-            columns=["thread_id"],
-            where_clause="thread_id IS NULL",
         )
-        self.db_pool.updates.register_background_index_update(
-            "event_push_summary_thread_id_null",
-            index_name="event_push_summary_thread_id_null",
+        self.db_pool.updates.register_background_validate_constraint(
+            "event_push_summary_thread_id",
+            constraint_name="event_push_summary_thread_id",
             table="event_push_summary",
-            columns=["thread_id"],
-            where_clause="thread_id IS NULL",
         )
 
-        # Check ASAP (and then later, every 1s) to see if we have finished
-        # background updates the event_push_actions and event_push_summary tables.
-        self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
-        self._event_push_backfill_thread_id_done = False
-
-    @wrap_as_background_process("check_event_push_backfill_thread_id")
-    async def _check_event_push_backfill_thread_id(self) -> None:
-        """
-        Has thread_id finished backfilling?
-
-        If not, we need to just-in-time update it so the queries work.
-        """
-        done = await self.db_pool.updates.has_completed_background_update(
-            "event_push_backfill_thread_id"
-        )
-
-        if done:
-            self._event_push_backfill_thread_id_done = True
-        else:
-            # Reschedule to run.
-            self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
-
-    async def _background_backfill_thread_id(
-        self, progress: JsonDict, batch_size: int
-    ) -> int:
-        """
-        Fill in the thread_id field for event_push_actions and event_push_summary.
-
-        This is preparatory so that it can be made non-nullable in the future.
-
-        Because all current (null) data is done in an unthreaded manner this
-        simply assumes it is on the "main" timeline. Since event_push_actions
-        are periodically cleared it is not possible to correctly re-calculate
-        the thread_id.
-        """
-        event_push_actions_done = progress.get("event_push_actions_done", False)
-
-        def add_thread_id_txn(
-            txn: LoggingTransaction, start_stream_ordering: int
-        ) -> int:
-            sql = """
-            SELECT stream_ordering
-            FROM event_push_actions
-            WHERE
-                thread_id IS NULL
-                AND stream_ordering > ?
-            ORDER BY stream_ordering
-            LIMIT ?
-            """
-            txn.execute(sql, (start_stream_ordering, batch_size))
-
-            # No more rows to process.
-            rows = txn.fetchall()
-            if not rows:
-                progress["event_push_actions_done"] = True
-                self.db_pool.updates._background_update_progress_txn(
-                    txn, "event_push_backfill_thread_id", progress
-                )
-                return 0
-
-            # Update the thread ID for any of those rows.
-            max_stream_ordering = rows[-1][0]
-
-            sql = """
-            UPDATE event_push_actions
-            SET thread_id = 'main'
-            WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
-            """
-            txn.execute(
-                sql,
-                (
-                    start_stream_ordering,
-                    max_stream_ordering,
-                ),
-            )
-
-            # Update progress.
-            processed_rows = txn.rowcount
-            progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
-            self.db_pool.updates._background_update_progress_txn(
-                txn, "event_push_backfill_thread_id", progress
-            )
-
-            return processed_rows
-
-        def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
-            min_user_id = progress.get("max_summary_user_id", "")
-            min_room_id = progress.get("max_summary_room_id", "")
-
-            # Slightly overcomplicated query for getting the Nth user ID / room
-            # ID tuple, or the last if there are less than N remaining.
-            sql = """
-            SELECT user_id, room_id FROM (
-                SELECT user_id, room_id FROM event_push_summary
-                WHERE (user_id, room_id) > (?, ?)
-                    AND thread_id IS NULL
-                ORDER BY user_id, room_id
-                LIMIT ?
-            ) AS e
-            ORDER BY user_id DESC, room_id DESC
-            LIMIT 1
-            """
-
-            txn.execute(sql, (min_user_id, min_room_id, batch_size))
-            row = txn.fetchone()
-            if not row:
-                return 0
-
-            max_user_id, max_room_id = row
-
-            sql = """
-            UPDATE event_push_summary
-            SET thread_id = 'main'
-            WHERE
-                (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
-                AND thread_id IS NULL
-            """
-            txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
-            processed_rows = txn.rowcount
-
-            progress["max_summary_user_id"] = max_user_id
-            progress["max_summary_room_id"] = max_room_id
-            self.db_pool.updates._background_update_progress_txn(
-                txn, "event_push_backfill_thread_id", progress
-            )
-
-            return processed_rows
-
-        # First update the event_push_actions table, then the event_push_summary table.
-        #
-        # Note that the event_push_actions_staging table is ignored since it is
-        # assumed that items in that table will only exist for a short period of
-        # time.
-        if not event_push_actions_done:
-            result = await self.db_pool.runInteraction(
-                "event_push_backfill_thread_id",
-                add_thread_id_txn,
-                progress.get("max_event_push_actions_stream_ordering", 0),
-            )
-        else:
-            result = await self.db_pool.runInteraction(
-                "event_push_backfill_thread_id",
-                add_thread_id_summary_txn,
-            )
-
-            # Only done after the event_push_summary table is done.
-            if not result:
-                await self.db_pool.updates._end_background_update(
-                    "event_push_backfill_thread_id"
-                )
-
-        return result
-
     async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
         """Get the notification count by room for a user. Only considers notifications,
         not highlight or unread counts, and threads are currently aggregated under their room.
@@ -711,25 +552,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
         )
 
-        # First ensure that the existing rows have an updated thread_id field.
-        if not self._event_push_backfill_thread_id_done:
-            txn.execute(
-                """
-                UPDATE event_push_summary
-                SET thread_id = ?
-                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                """,
-                (MAIN_TIMELINE, room_id, user_id),
-            )
-            txn.execute(
-                """
-                UPDATE event_push_actions
-                SET thread_id = ?
-                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                """,
-                (MAIN_TIMELINE, room_id, user_id),
-            )
-
         # First we pull the counts from the summary table.
         #
         # We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1545,25 +1367,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 (room_id, user_id, stream_ordering, *thread_args),
             )
 
-            # First ensure that the existing rows have an updated thread_id field.
-            if not self._event_push_backfill_thread_id_done:
-                txn.execute(
-                    """
-                    UPDATE event_push_summary
-                    SET thread_id = ?
-                    WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                    """,
-                    (MAIN_TIMELINE, room_id, user_id),
-                )
-                txn.execute(
-                    """
-                    UPDATE event_push_actions
-                    SET thread_id = ?
-                    WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                    """,
-                    (MAIN_TIMELINE, room_id, user_id),
-                )
-
             # Fetch the notification counts between the stream ordering of the
             # latest receipt and what was previously summarised.
             unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1698,19 +1501,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
         """
 
-        # Ensure that any new actions have an updated thread_id.
-        if not self._event_push_backfill_thread_id_done:
-            txn.execute(
-                """
-                UPDATE event_push_actions
-                SET thread_id = ?
-                WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
-                """,
-                (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
-            )
-
-        # XXX Do we need to update summaries here too?
-
         # Calculate the new counts that should be upserted into event_push_summary
         sql = """
             SELECT user_id, room_id, thread_id,
@@ -1773,20 +1563,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         logger.info("Rotating notifications, handling %d rows", len(summaries))
 
-        # Ensure that any updated threads have the proper thread_id.
-        if not self._event_push_backfill_thread_id_done:
-            txn.execute_batch(
-                """
-                UPDATE event_push_summary
-                SET thread_id = ?
-                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                """,
-                [
-                    (MAIN_TIMELINE, room_id, user_id)
-                    for user_id, room_id, _ in summaries
-                ],
-            )
-
         self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_push_summary",
@@ -1836,6 +1612,15 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             # deletes.
             batch_size = self._rotate_count
 
+            if isinstance(self.database_engine, PostgresEngine):
+                # Temporarily disable sequential scans in this transaction. We
+                # need to do this as the postgres statistics don't take into
+                # account the `highlight = 0` part when estimating the
+                # distribution of `stream_ordering`. I.e. since we keep old
+                # highlight rows the query planner thinks there are way more old
+                # rows to delete than there actually are.
+                txn.execute("SET LOCAL enable_seqscan=off")
+
             txn.execute(
                 """
                 SELECT stream_ordering FROM event_push_actions
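
As an aside, a condensed, hypothetical sketch of the `SET LOCAL` pattern used
above; the override lasts only until the enclosing transaction commits or rolls
back, so it cannot leak onto the connection:

    def _delete_old_actions_txn(txn, cutoff_stream_ordering: int) -> None:
        # Scoped to this transaction; reverts automatically on COMMIT/ROLLBACK.
        txn.execute("SET LOCAL enable_seqscan=off")
        txn.execute(
            "DELETE FROM event_push_actions"
            " WHERE highlight = 0 AND stream_ordering <= ?",
            (cutoff_stream_ordering,),
        )
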
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index daad58291a..e068f27a10 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -82,7 +82,7 @@ class EventIdMembership:
     membership: str
 
 
-class RoomMemberWorkerStore(EventsWorkerStore):
+class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
     def __init__(
         self,
         database: DatabasePool,
@@ -1372,6 +1372,50 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _is_local_host_in_room_ignoring_users_txn,
         )
 
+    async def forget(self, user_id: str, room_id: str) -> None:
+        """Indicate that user_id wishes to discard history for room_id."""
+
+        def f(txn: LoggingTransaction) -> None:
+            self.db_pool.simple_update_txn(
+                txn,
+                table="room_memberships",
+                keyvalues={"user_id": user_id, "room_id": room_id},
+                updatevalues={"forgotten": 1},
+            )
+
+            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
+            self._invalidate_cache_and_stream(
+                txn, self.get_forgotten_rooms_for_user, (user_id,)
+            )
+
+        await self.db_pool.runInteraction("forget_membership", f)
+
+    async def get_room_forgetter_stream_pos(self) -> int:
+        """Get the stream position of the background process to forget rooms when left
+        by users.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            retcol="stream_id",
+            desc="room_forgetter_stream_pos",
+        )
+
+    async def update_room_forgetter_stream_pos(self, stream_id: int) -> None:
+        """Update the stream position of the background process to forget rooms when
+        left by users.
+
+        Must only be used by the worker running the background process.
+        """
+        assert self.hs.config.worker.run_background_tasks
+
+        await self.db_pool.simple_update_one(
+            table="room_forgetter_stream_pos",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id},
+            desc="room_forgetter_stream_pos",
+        )
+
 
 class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(
@@ -1553,29 +1597,6 @@ class RoomMemberStore(
     ):
         super().__init__(database, db_conn, hs)
 
-    async def forget(self, user_id: str, room_id: str) -> None:
-        """Indicate that user_id wishes to discard history for room_id."""
-
-        def f(txn: LoggingTransaction) -> None:
-            sql = (
-                "UPDATE"
-                "  room_memberships"
-                " SET"
-                "  forgotten = 1"
-                " WHERE"
-                "  user_id = ?"
-                " AND"
-                "  room_id = ?"
-            )
-            txn.execute(sql, (user_id, room_id))
-
-            self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
-            self._invalidate_cache_and_stream(
-                txn, self.get_forgotten_rooms_for_user, (user_id,)
-            )
-
-        await self.db_pool.runInteraction("forget_membership", f)
-
 
 def extract_heroes_from_room_summary(
     details: Mapping[str, MemberSummary], me: str
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 5d65faed16..b7d58978de 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -27,6 +27,8 @@ from typing import (
     cast,
 )
 
+import attr
+
 try:
     # Figure out if ICU support is available for searching users.
     import icu
@@ -66,6 +68,19 @@ logger = logging.getLogger(__name__)
 TEMP_TABLE = "_temp_populate_user_directory"
 
 
+@attr.s(auto_attribs=True, frozen=True)
+class _UserDirProfile:
+    """Helper type for the user directory code for an entry to be inserted into
+    the directory.
+    """
+
+    user_id: str
+
+    # If the display name or avatar URL is of an unexpected type, it is
+    # replaced with None.
+    display_name: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none)
+    avatar_url: Optional[str] = attr.ib(default=None, converter=non_null_str_or_none)
+
+
 class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
     # How many records do we calculate before sending it to
     # add_users_who_share_private_rooms?
@@ -381,25 +396,65 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             % (len(users_to_work_on), progress["remaining"])
         )
 
-        for user_id in users_to_work_on:
-            if await self.should_include_local_user_in_dir(user_id):
-                profile = await self.get_profileinfo(get_localpart_from_id(user_id))  # type: ignore[attr-defined]
-                await self.update_profile_in_user_dir(
-                    user_id, profile.display_name, profile.avatar_url
-                )
-
-            # We've finished processing a user. Delete it from the table.
-            await self.db_pool.simple_delete_one(
-                TEMP_TABLE + "_users", {"user_id": user_id}
-            )
-            # Update the remaining counter.
-            progress["remaining"] -= 1
-            await self.db_pool.runInteraction(
-                "populate_user_directory",
-                self.db_pool.updates._background_update_progress_txn,
-                "populate_user_directory_process_users",
-                progress,
+        # First filter down to users we want to insert into the user directory.
+        users_to_insert = [
+            user_id
+            for user_id in users_to_work_on
+            if await self.should_include_local_user_in_dir(user_id)
+        ]
+
+        # Next fetch their profiles. Note that the `user_id` here is the
+        # *localpart*, and that not all users have profiles.
+        profile_rows = await self.db_pool.simple_select_many_batch(
+            table="profiles",
+            column="user_id",
+            iterable=[get_localpart_from_id(u) for u in users_to_insert],
+            retcols=(
+                "user_id",
+                "displayname",
+                "avatar_url",
+            ),
+            keyvalues={},
+            desc="populate_user_directory_process_users_get_profiles",
+        )
+        profiles = {
+            f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
+                f"@{row['user_id']}:{self.server_name}",
+                row["displayname"],
+                row["avatar_url"],
             )
+            for row in profile_rows
+        }
+
+        profiles_to_insert = [
+            profiles.get(user_id) or _UserDirProfile(user_id)
+            for user_id in users_to_insert
+        ]
+
+        # Actually insert the users with their profiles into the directory.
+        await self.db_pool.runInteraction(
+            "populate_user_directory_process_users_insertion",
+            self._update_profiles_in_user_dir_txn,
+            profiles_to_insert,
+        )
+
+        # We've finished processing these users. Delete them from the table.
+        await self.db_pool.simple_delete_many(
+            table=TEMP_TABLE + "_users",
+            column="user_id",
+            iterable=users_to_work_on,
+            keyvalues={},
+            desc="populate_user_directory_process_users_delete",
+        )
+
+        # Update the remaining counter.
+        progress["remaining"] -= len(users_to_work_on)
+        await self.db_pool.runInteraction(
+            "populate_user_directory",
+            self.db_pool.updates._background_update_progress_txn,
+            "populate_user_directory_process_users",
+            progress,
+        )
 
         return len(users_to_work_on)
 
@@ -584,72 +639,102 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         Update or add a user's profile in the user directory.
         If the user is remote, the profile will be marked as not stale.
         """
-        # If the display name or avatar URL are unexpected types, replace with None.
-        display_name = non_null_str_or_none(display_name)
-        avatar_url = non_null_str_or_none(avatar_url)
+        await self.db_pool.runInteraction(
+            "update_profiles_in_user_dir",
+            self._update_profiles_in_user_dir_txn,
+            [_UserDirProfile(user_id, display_name, avatar_url)],
+        )
+
+    def _update_profiles_in_user_dir_txn(
+        self,
+        txn: LoggingTransaction,
+        profiles: Sequence[_UserDirProfile],
+    ) -> None:
+        self.db_pool.simple_upsert_many_txn(
+            txn,
+            table="user_directory",
+            key_names=("user_id",),
+            key_values=[(p.user_id,) for p in profiles],
+            value_names=("display_name", "avatar_url"),
+            value_values=[
+                (
+                    p.display_name,
+                    p.avatar_url,
+                )
+                for p in profiles
+            ],
+        )
 
-        def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
-            self.db_pool.simple_upsert_txn(
+        # Remote users: Make sure the profile is not marked as stale anymore.
+        remote_users = [
+            p.user_id for p in profiles if not self.hs.is_mine_id(p.user_id)
+        ]
+        if remote_users:
+            self.db_pool.simple_delete_many_txn(
                 txn,
-                table="user_directory",
-                keyvalues={"user_id": user_id},
-                values={"display_name": display_name, "avatar_url": avatar_url},
+                table="user_directory_stale_remote_users",
+                column="user_id",
+                values=remote_users,
+                keyvalues={},
             )
 
-            if not self.hs.is_mine_id(user_id):
-                # Remote users: Make sure the profile is not marked as stale anymore.
-                self.db_pool.simple_delete_txn(
-                    txn,
-                    table="user_directory_stale_remote_users",
-                    keyvalues={"user_id": user_id},
+        if isinstance(self.database_engine, PostgresEngine):
+            # We weight the localpart most highly, then display name and finally
+            # server name
+            template = """
+                (
+                    %s,
+                    setweight(to_tsvector('simple', %s), 'A')
+                    || setweight(to_tsvector('simple', %s), 'D')
+                    || setweight(to_tsvector('simple', COALESCE(%s, '')), 'B')
                 )
+            """
 
-            # The display name that goes into the database index.
-            index_display_name = display_name
-            if index_display_name is not None:
-                index_display_name = _filter_text_for_index(index_display_name)
-
-            if isinstance(self.database_engine, PostgresEngine):
-                # We weight the localpart most highly, then display name and finally
-                # server name
-                sql = """
-                        INSERT INTO user_directory_search(user_id, vector)
-                        VALUES (?,
-                            setweight(to_tsvector('simple', ?), 'A')
-                            || setweight(to_tsvector('simple', ?), 'D')
-                            || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
-                        ) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
-                    """
-                txn.execute(
-                    sql,
+            sql = """
+                    INSERT INTO user_directory_search(user_id, vector)
+                    VALUES ? ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
+                """
+            txn.execute_values(
+                sql,
+                [
                     (
-                        user_id,
-                        get_localpart_from_id(user_id),
-                        get_domain_from_id(user_id),
-                        index_display_name,
-                    ),
-                )
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                value = (
-                    "%s %s" % (user_id, index_display_name)
-                    if index_display_name
-                    else user_id
-                )
-                self.db_pool.simple_upsert_txn(
-                    txn,
-                    table="user_directory_search",
-                    keyvalues={"user_id": user_id},
-                    values={"value": value},
-                )
-            else:
-                # This should be unreachable.
-                raise Exception("Unrecognized database engine")
+                        p.user_id,
+                        get_localpart_from_id(p.user_id),
+                        get_domain_from_id(p.user_id),
+                        _filter_text_for_index(p.display_name)
+                        if p.display_name
+                        else None,
+                    )
+                    for p in profiles
+                ],
+                template=template,
+                fetch=False,
+            )
+        elif isinstance(self.database_engine, Sqlite3Engine):
+            values = []
+            for p in profiles:
+                if p.display_name is not None:
+                    index_display_name = _filter_text_for_index(p.display_name)
+                    value = f"{p.user_id} {index_display_name}"
+                else:
+                    value = p.user_id
 
-            txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
+                values.append((value,))
 
-        await self.db_pool.runInteraction(
-            "update_profile_in_user_dir", _update_profile_in_user_dir_txn
-        )
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="user_directory_search",
+                key_names=("user_id",),
+                key_values=[(p.user_id,) for p in profiles],
+                value_names=("value",),
+                value_values=values,
+            )
+        else:
+            # This should be unreachable.
+            raise Exception("Unrecognized database engine")
+
+        for p in profiles:
+            txn.call_after(self.get_user_in_directory.invalidate, (p.user_id,))
 
     async def add_users_who_share_private_room(
         self, room_id: str, user_id_tuples: Iterable[Tuple[str, str]]
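
To make the weighting concrete, one row of the batched insert above would look
like the following for a hypothetical user (weight 'A' for the localpart, 'B'
for the display name, 'D' for the server name, so localpart matches rank
highest):

    row = (
        "@alice:example.com",  # p.user_id
        "alice",               # get_localpart_from_id(...)  -> weight 'A'
        "example.com",         # get_domain_from_id(...)     -> weight 'D'
        "Alice A",             # filtered display name       -> weight 'B'
    )
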
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 1672976209..741563abc6 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -106,6 +106,9 @@ Changes in SCHEMA_VERSION = 76:
 SCHEMA_COMPAT_VERSION = (
     # Queries against `event_stream_ordering` columns in membership tables must
     # be disambiguated.
+    #
+    # The `thread_id` column must be written with non-null values for the
+    # event_push_actions, event_push_actions_staging, and event_push_summary tables.
     74
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
diff --git a/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
new file mode 100644
index 0000000000..be4b57d86f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04_add_room_forgetter.sql
@@ -0,0 +1,24 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE room_forgetter_stream_pos (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id  BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO room_forgetter_stream_pos (
+    stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) FROM events;
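
The seed row pins the forgetter's start position at the current maximum stream
ordering (or 0 on an empty database), so rooms left before the feature existed
are not retroactively forgotten:

    # After this delta runs:
    #   room_forgetter_stream_pos.stream_id == COALESCE(MAX(events.stream_ordering), 0)
    # RoomForgetterHandler only examines membership deltas after this position,
    # i.e. leaves that happen from now on.
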
diff --git a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql
new file mode 100644
index 0000000000..ce6f9ff937
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql
@@ -0,0 +1,28 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Force the background updates from 06thread_notifications.sql to run in the
+-- foreground, as the code now requires them to have completed.
+
+DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
+
+-- Overwrite any null thread_id values.
+UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
+
+-- Drop the background updates to calculate the indexes used to find null thread_ids.
+DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
+DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null';
diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres
new file mode 100644
index 0000000000..40936def6f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres
@@ -0,0 +1,37 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The thread_id columns can now be made non-nullable, this is done by using a
+-- constraint (and not altering the column) to avoid taking out a full table lock.
+--
+-- We initially add an invalid constraint which guards against new data (this
+-- doesn't lock the table).
+ALTER TABLE event_push_actions_staging
+    ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
+ALTER TABLE event_push_actions
+    ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
+ALTER TABLE event_push_summary
+    ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
+
+-- We then validate the constraint which doesn't need to worry about new data. It
+-- only needs a SHARE UPDATE EXCLUSIVE lock but can still take a while to complete.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7605, 'event_push_actions_staging_thread_id', '{}'),
+  (7605, 'event_push_actions_thread_id', '{}'),
+  (7605, 'event_push_summary_thread_id', '{}');
+
+-- Drop the indexes used to find null thread_ids.
+DROP INDEX IF EXISTS event_push_actions_thread_id_null;
+DROP INDEX IF EXISTS event_push_summary_thread_id_null;
diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite
new file mode 100644
index 0000000000..e9372b6cf9
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite
@@ -0,0 +1,102 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The thread_id columns can now be made non-nullable.
+--
+-- SQLite doesn't support modifying the columns of an existing table, so each
+-- table must be recreated.
+
+-- Create the new tables.
+CREATE TABLE event_push_actions_staging_new (
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    actions TEXT NOT NULL,
+    notif SMALLINT NOT NULL,
+    highlight SMALLINT NOT NULL,
+    unread SMALLINT,
+    thread_id TEXT,
+    inserted_ts BIGINT,
+    CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id is NOT NULL)
+);
+
+CREATE TABLE event_push_actions_new (
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    profile_tag VARCHAR(32),
+    actions TEXT NOT NULL,
+    topological_ordering BIGINT,
+    stream_ordering BIGINT,
+    notif SMALLINT,
+    highlight SMALLINT,
+    unread SMALLINT,
+    thread_id TEXT,
+    CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag),
+    CONSTRAINT event_push_actions_thread_id CHECK (thread_id is NOT NULL)
+);
+
+CREATE TABLE event_push_summary_new (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    notif_count BIGINT NOT NULL,
+    stream_ordering BIGINT NOT NULL,
+    unread_count BIGINT,
+    last_receipt_stream_ordering BIGINT,
+    thread_id TEXT,
+    CONSTRAINT event_push_summary_thread_id CHECK (thread_id is NOT NULL)
+);
+
+-- Copy the data.
+INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
+    SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
+    FROM event_push_actions_staging;
+
+INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
+    SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
+    FROM event_push_actions;
+
+INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
+    SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
+    FROM event_push_summary;
+
+-- Drop the old tables.
+DROP TABLE event_push_actions_staging;
+DROP TABLE event_push_actions;
+DROP TABLE event_push_summary;
+
+-- Rename the tables.
+ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
+ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
+ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
+
+-- Recreate the indexes.
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
+
+CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
+CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering );
+CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id);
+CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );
+CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
+
+CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id);
+
+-- Recreate some indexes in the background, by re-running the background updates
+-- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7403, 'event_push_summary_unique_index2', '{}')
+  ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7403, 'event_push_actions_stream_highlight_index', '{}')
+  ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';