author     Erik Johnston <erik@matrix.org>  2023-11-16 16:27:21 +0000
committer  Erik Johnston <erik@matrix.org>  2023-11-16 16:27:21 +0000
commit     b20bdd3997227dd74403ce39977a37a3b89762ed (patch)
tree       b35f4771a535bbbd7fe8955c2c238b62f0ad5f71 /synapse/storage
parent     Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parent     Speed up deleting device messages (#16643) (diff)
download   synapse-b20bdd3997227dd74403ce39977a37a3b89762ed.tar.xz

Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
 -rw-r--r--  synapse/storage/background_updates.py | 18
 -rw-r--r--  synapse/storage/databases/__init__.py | 2
 -rw-r--r--  synapse/storage/databases/main/account_data.py | 24
 -rw-r--r--  synapse/storage/databases/main/deviceinbox.py | 106
 -rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 84
 -rw-r--r--  synapse/storage/databases/main/events_bg_updates.py | 19
 -rw-r--r--  synapse/storage/databases/main/events_worker.py | 6
 -rw-r--r--  synapse/storage/databases/main/keys.py | 17
 -rw-r--r--  synapse/storage/databases/main/media_repository.py | 90
 -rw-r--r--  synapse/storage/databases/main/monthly_active_users.py | 2
 -rw-r--r--  synapse/storage/databases/main/presence.py | 9
 -rw-r--r--  synapse/storage/databases/main/purge_events.py | 33
 -rw-r--r--  synapse/storage/databases/main/push_rule.py | 43
 -rw-r--r--  synapse/storage/databases/main/registration.py | 20
 -rw-r--r--  synapse/storage/databases/main/search.py | 8
 -rw-r--r--  synapse/storage/databases/state/bg_updates.py | 4
 -rw-r--r--  synapse/storage/engines/postgres.py | 3
 -rw-r--r--  synapse/storage/schema/__init__.py | 3
 -rw-r--r--  synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres | 30
 -rw-r--r--  synapse/storage/schema/main/delta/54/delete_forward_extremities.sql | 2
 -rw-r--r--  synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql | 3
 -rw-r--r--  synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql | 3
 -rw-r--r--  synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres | 88
 -rw-r--r--  synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql | 15
 -rw-r--r--  synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres | 80
 -rw-r--r--  synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres | 30
 26 files changed, 629 insertions(+), 113 deletions(-)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 7426dbcad6..62fbd05534 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -49,7 +49,11 @@ else:
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.storage.database import DatabasePool, LoggingTransaction
+    from synapse.storage.database import (
+        DatabasePool,
+        LoggingDatabaseConnection,
+        LoggingTransaction,
+    )
 
 logger = logging.getLogger(__name__)
 
@@ -746,10 +750,10 @@ class BackgroundUpdater:
                 The named index will be dropped upon completion of the new index.
         """
 
-        def create_index_psql(conn: Connection) -> None:
+        def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
             conn.rollback()
             # postgres insists on autocommit for the index
-            conn.set_session(autocommit=True)  # type: ignore
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
 
             try:
                 c = conn.cursor()
@@ -793,9 +797,9 @@ class BackgroundUpdater:
                 undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
                 conn.cursor().execute(undo_timeout_sql)
 
-                conn.set_session(autocommit=False)  # type: ignore
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
-        def create_index_sqlite(conn: Connection) -> None:
+        def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We assume that sqlite doesn't give us invalid indices; however
@@ -825,7 +829,9 @@ class BackgroundUpdater:
                 c.execute(sql)
 
         if isinstance(self.db_pool.engine, engines.PostgresEngine):
-            runner: Optional[Callable[[Connection], None]] = create_index_psql
+            runner: Optional[
+                Callable[[LoggingDatabaseConnection], None]
+            ] = create_index_psql
         elif psql_only:
             runner = None
         else:
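
The runner now goes through `conn.engine.attempt_to_set_autocommit(conn.conn, ...)` rather than psycopg2's `set_session`, because `CREATE INDEX CONCURRENTLY` refuses to run inside a transaction block. A minimal generic psycopg2 sketch of the underlying pattern (table, index, and DSN are placeholders, not from this commit):

    import psycopg2

    conn = psycopg2.connect("dbname=synapse")   # placeholder DSN
    conn.autocommit = True                      # roughly what attempt_to_set_autocommit does on Postgres
    try:
        with conn.cursor() as cur:
            # CONCURRENTLY cannot run inside a transaction block, hence autocommit.
            cur.execute("CREATE INDEX CONCURRENTLY IF NOT EXISTS tmp_idx ON some_table (some_col)")
    finally:
        conn.autocommit = False
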
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 7aa24ccf21..b57e260fe0 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]):
     """
 
     databases: List[DatabasePool]
-    main: "DataStore"  # FIXME: #11165: actually an instance of `main_store_class`
+    main: "DataStore"  # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
     state: StateGroupDataStore
     persist_events: Optional[PersistEventsStore]
 
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
         )
 
         # Invalidate the cache for any ignored users which were added or removed.
-        for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
-            self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.ignored_by,
+            [
+                (ignored_user_id,)
+                for ignored_user_id in (
+                    previously_ignored_users ^ currently_ignored_users
+                )
+            ],
+        )
         self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
 
     async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
                 )
 
                 # Invalidate the cache for ignored users which were removed.
-                for ignored_user_id in previously_ignored_users:
-                    self._invalidate_cache_and_stream(
-                        txn, self.ignored_by, (ignored_user_id,)
-                    )
+                self._invalidate_cache_and_stream_bulk(
+                    txn,
+                    self.ignored_by,
+                    [
+                        (ignored_user_id,)
+                        for ignored_user_id in previously_ignored_users
+                    ],
+                )
 
                 # Invalidate for this user the cache tracking ignored users.
                 self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
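
Several stores in this commit replace per-key `_invalidate_cache_and_stream` loops with a single `_invalidate_cache_and_stream_bulk` call. The helper itself is defined outside this diff; a hedged sketch of the pattern it batches (using the `.invalidate()` API that Synapse's cached functions expose) might look like:

    # Hypothetical illustration only; not the actual Synapse helper.
    def _invalidate_cache_and_stream_bulk(self, txn, cached_func, key_tuples):
        for key in key_tuples:
            cached_func.invalidate(key)  # drop each local cache entry
        # ...then queue one batched invalidation over the replication stream,
        # instead of writing one cache-invalidation row per key.
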
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 71eefe6b7c..659631ab65 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
-        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
-            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 log_kv({"message": "No changes in cache since last check"})
                 return 0
 
-        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            limit_statement = "" if limit is None else f"LIMIT {limit}"
-            sql = f"""
-                DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
-                  SELECT MAX(stream_id) FROM (
-                    SELECT stream_id FROM device_inbox
-                    WHERE user_id = ? AND device_id = ? AND stream_id <= ?
-                    ORDER BY stream_id
-                    {limit_statement}
-                  ) AS q1
-                )
-                """
-            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = await self.db_pool.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
+        from_stream_id = None
+        count = 0
+        while True:
+            from_stream_id, loop_count = await self.delete_messages_for_device_between(
+                user_id,
+                device_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
+                limit=1000,
+            )
+            count += loop_count
+            if from_stream_id is None:
+                break
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
-        # In this case we don't know if we hit the limit or the delete is complete
-        # so let's not update the cache.
-        if count == limit:
-            return count
-
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -515,6 +503,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         return count
 
     @trace
+    async def delete_messages_for_device_between(
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        from_stream_id: Optional[int],
+        to_stream_id: int,
+        limit: int,
+    ) -> Tuple[Optional[int], int]:
+        """Delete N device messages between the stream IDs, returning the
+        highest stream ID deleted (or None if all messages in the range have
+        been deleted) and the number of messages deleted.
+
+        This is more efficient than `delete_messages_for_device` when calling in
+        a loop to batch delete messages.
+        """
+
+        # Keeping track of a lower bound of stream ID where we've deleted
+        # everything below makes the queries much faster. Otherwise, every time
+        # we scan for rows to delete we'd re-scan across all the rows that have
+        # previously been deleted (until the next table VACUUM).
+
+        if from_stream_id is None:
+            # Minimum device stream ID is 1.
+            from_stream_id = 0
+
+        def delete_messages_for_device_between_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[Optional[int], int]:
+            txn.execute(
+                """
+                SELECT MAX(stream_id) FROM (
+                    SELECT stream_id FROM device_inbox
+                    WHERE user_id = ? AND device_id = ?
+                        AND ? < stream_id AND stream_id <= ?
+                    ORDER BY stream_id
+                    LIMIT ?
+                ) AS d
+                """,
+                (user_id, device_id, from_stream_id, to_stream_id, limit),
+            )
+            row = txn.fetchone()
+            if row is None or row[0] is None:
+                return None, 0
+
+            (max_stream_id,) = row
+
+            txn.execute(
+                """
+                DELETE FROM device_inbox
+                WHERE user_id = ? AND device_id = ?
+                AND ? < stream_id AND stream_id <= ?
+                """,
+                (user_id, device_id, from_stream_id, max_stream_id),
+            )
+
+            num_deleted = txn.rowcount
+            if num_deleted < limit:
+                return None, num_deleted
+
+            return max_stream_id, num_deleted
+
+        return await self.db_pool.runInteraction(
+            "delete_messages_for_device_between",
+            delete_messages_for_device_between_txn,
+            db_autocommit=True,  # We don't need to run in a transaction
+        )
+
+    @trace
     async def get_new_device_msgs_for_remote(
         self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
     ) -> Tuple[List[JsonDict], int]:
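
`delete_messages_for_device` now drives the new `delete_messages_for_device_between` in a loop of 1000-row batches, so each DELETE transaction stays short. A hedged sketch of the same pattern for an external caller (function name and batch size assumed, not from this diff):

    async def prune_device_messages(store, user_id: str, device_id: str, up_to: int) -> int:
        total = 0
        from_id = None
        while True:
            from_id, deleted = await store.delete_messages_for_device_between(
                user_id,
                device_id,
                from_stream_id=from_id,
                to_stream_id=up_to,
                limit=100,
            )
            total += deleted
            if from_id is None:  # everything in the range has been deleted
                return total
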
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return otk_rows
 
+    async def get_master_cross_signing_key_updatable_before(
+        self, user_id: str
+    ) -> Tuple[bool, Optional[int]]:
+        """Get time before which a master cross-signing key may be replaced without UIA.
+
+        (UIA means "User-Interactive Auth".)
+
+        There are three cases to distinguish:
+         (1) No master cross-signing key.
+         (2) The key exists, but there is no replace-without-UIA timestamp in the DB.
+         (3) The key exists, and has such a timestamp recorded.
+
+        Returns: a 2-tuple of:
+          - a boolean: is there a master cross-signing key already?
+          - an optional timestamp, directly taken from the DB.
+
+        In terms of the cases above, these are:
+         (1) (False, None).
+         (2) (True, None).
+         (3) (True, <timestamp in ms>).
+
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+            # We want to distinguish between three cases:
+            txn.execute(
+                """
+                SELECT updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ? AND keytype = 'master'
+                ORDER BY stream_id DESC
+                LIMIT 1
+            """,
+                (user_id,),
+            )
+            row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+            if row is None:
+                return False, None
+            return True, row[0]
+
+        return await self.db_pool.runInteraction(
+            "e2e_cross_signing_keys",
+            impl,
+        )
+
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             ],
             desc="add_e2e_signing_key",
         )
+
+    async def allow_master_cross_signing_key_replacement_without_uia(
+        self, user_id: str, duration_ms: int
+    ) -> Optional[int]:
+        """Mark this user's latest master key as being replaceable without UIA.
+
+        Said replacement will only be permitted for a short time after calling this
+        function. That time period is controlled by the duration argument.
+
+        Returns:
+            None, if there is no such key.
+            Otherwise, the timestamp before which replacement is allowed without UIA.
+        """
+        timestamp = self._clock.time_msec() + duration_ms
+
+        def impl(txn: LoggingTransaction) -> Optional[int]:
+            txn.execute(
+                """
+                UPDATE e2e_cross_signing_keys
+                SET updatable_without_uia_before_ms = ?
+                WHERE stream_id = (
+                    SELECT stream_id
+                    FROM e2e_cross_signing_keys
+                    WHERE user_id = ? AND keytype = 'master'
+                    ORDER BY stream_id DESC
+                    LIMIT 1
+                )
+            """,
+                (timestamp, user_id),
+            )
+            if txn.rowcount == 0:
+                return None
+
+            return timestamp
+
+        return await self.db_pool.runInteraction(
+            "allow_master_cross_signing_key_replacement_without_uia",
+            impl,
+        )
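
A hedged sketch of how the two new methods might fit together (the calling handler code is not part of this diff): `allow_master_cross_signing_key_replacement_without_uia` opens a time-limited window, and `get_master_cross_signing_key_updatable_before` is checked against the current time before skipping UIA.

    async def can_replace_master_key_without_uia(store, clock, user_id: str) -> bool:
        exists, updatable_before_ms = await store.get_master_cross_signing_key_updatable_before(user_id)
        return exists and updatable_before_ms is not None and clock.time_msec() < updatable_before_ms

    # Opening the window, e.g. for ten minutes (duration chosen purely for illustration):
    #     await store.allow_master_cross_signing_key_replacement_without_uia(
    #         user_id, duration_ms=10 * 60 * 1000
    #     )
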
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0061805150..0c91f19c8e 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Background update to clean out extremities that should have been
         deleted previously.
 
-        Mainly used to deal with the aftermath of #5269.
+        Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269.
         """
 
         # This works by first copying all existing forward extremities into the
@@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
             logger.info(
-                "Deleted %d forward extremities of %d checked, to clean up #5269",
+                "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269",
                 deleted,
                 len(original_set),
             )
@@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
                 )
 
                 # Iterate the parent IDs and invalidate caches.
-                for parent_id in {r[1] for r in relations_to_insert}:
-                    cache_tuple = (parent_id,)
-                    self._invalidate_cache_and_stream(  # type: ignore[attr-defined]
-                        txn, self.get_relations_for_event, cache_tuple  # type: ignore[attr-defined]
-                    )
-                    self._invalidate_cache_and_stream(  # type: ignore[attr-defined]
-                        txn, self.get_thread_summary, cache_tuple  # type: ignore[attr-defined]
-                    )
+                cache_tuples = {(r[1],) for r in relations_to_insert}
+                self._invalidate_cache_and_stream_bulk(  # type: ignore[attr-defined]
+                    txn, self.get_relations_for_event, cache_tuples  # type: ignore[attr-defined]
+                )
+                self._invalidate_cache_and_stream_bulk(  # type: ignore[attr-defined]
+                    txn, self.get_thread_summary, cache_tuples  # type: ignore[attr-defined]
+                )
 
             if results:
                 latest_event_id = results[-1][0]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e831146cca..a880788f00 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore):
             room_version: Optional[RoomVersion]
             if not room_version_id:
                 # this should only happen for out-of-band membership events which
-                # arrived before #6983 landed. For all other events, we should have
+                # arrived before https://github.com/matrix-org/synapse/issues/6983
+                # landed. For all other events, we should have
                 # an entry in the 'rooms' table.
                 #
                 # However, the 'out_of_band_membership' flag is unreliable for older
@@ -1323,7 +1324,8 @@ class EventsWorkerStore(SQLBaseStore):
                         "Room %s for event %s is unknown" % (d["room_id"], event_id)
                     )
 
-                # so, assuming this is an out-of-band-invite that arrived before #6983
+                # so, assuming this is an out-of-band-invite that arrived before
+                # https://github.com/matrix-org/synapse/issues/6983
                 # landed, we know that the room version must be v5 or earlier (because
                 # v6 hadn't been invented at that point, so invites from such rooms
                 # would have been rejected.)
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
             # invalidate takes a tuple corresponding to the params of
             # _get_server_keys_json. _get_server_keys_json only takes one
             # param, which is itself the 2-tuple (server_name, key_id).
-            for key_id in verify_keys:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_server_keys_json, ((server_name, key_id),)
-                )
-                self._invalidate_cache_and_stream(
-                    txn, self.get_server_key_json_for_remote, (server_name, key_id)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_server_keys_json,
+                [((server_name, key_id),) for key_id in verify_keys],
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_server_key_json_for_remote,
+                [(server_name, key_id) for key_id in verify_keys],
+            )
 
         await self.db_pool.runInteraction(
             "store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 3f80a64dc5..149135b8b5 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
 class LocalMedia:
     media_id: str
     media_type: str
-    media_length: int
+    media_length: Optional[int]
     upload_name: str
     created_ts: int
     url_cache: Optional[str]
     last_access_ts: int
     quarantined_by: Optional[str]
     safe_from_quarantine: bool
+    user_id: Optional[str]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
             self._drop_media_index_without_method,
         )
 
+        if hs.config.media.can_load_media_repo:
+            self.unused_expiration_time: Optional[
+                int
+            ] = hs.config.media.unused_expiration_time
+        else:
+            self.unused_expiration_time = None
+
     async def _drop_media_index_without_method(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "url_cache",
                 "last_access_ts",
                 "safe_from_quarantine",
+                "user_id",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             url_cache=row[5],
             last_access_ts=row[6],
             safe_from_quarantine=row[7],
+            user_id=row[8],
         )
 
     async def get_local_media_by_user_paginate(
@@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     url_cache,
                     last_access_ts,
                     quarantined_by,
-                    safe_from_quarantine
+                    safe_from_quarantine,
+                    user_id
                 FROM local_media_repository
                 WHERE user_id = ?
                 ORDER BY {order_by_column} {order}, media_id ASC
@@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     last_access_ts=row[6],
                     quarantined_by=row[7],
                     safe_from_quarantine=bool(row[8]),
+                    user_id=row[9],
                 )
                 for row in txn
             ]
@@ -392,6 +404,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         )
 
     @trace
+    async def store_local_media_id(
+        self,
+        media_id: str,
+        time_now_ms: int,
+        user_id: UserID,
+    ) -> None:
+        await self.db_pool.simple_insert(
+            "local_media_repository",
+            {
+                "media_id": media_id,
+                "created_ts": time_now_ms,
+                "user_id": user_id.to_string(),
+            },
+            desc="store_local_media_id",
+        )
+
+    @trace
     async def store_local_media(
         self,
         media_id: str,
@@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_local_media",
         )
 
+    async def update_local_media(
+        self,
+        media_id: str,
+        media_type: str,
+        upload_name: Optional[str],
+        media_length: int,
+        user_id: UserID,
+        url_cache: Optional[str] = None,
+    ) -> None:
+        await self.db_pool.simple_update_one(
+            "local_media_repository",
+            keyvalues={
+                "user_id": user_id.to_string(),
+                "media_id": media_id,
+            },
+            updatevalues={
+                "media_type": media_type,
+                "upload_name": upload_name,
+                "media_length": media_length,
+                "url_cache": url_cache,
+            },
+            desc="update_local_media",
+        )
+
     async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
         """Mark a local media as safe or unsafe from quarantining."""
         await self.db_pool.simple_update_one(
@@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="mark_local_media_as_safe",
         )
 
+    async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]:
+        """Count the number of pending media for a user.
+
+        Returns:
+            A tuple of two integers: the total pending media requests and the earliest
+            expiration timestamp.
+        """
+
+        def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]:
+            sql = """
+            SELECT COUNT(*), MIN(created_ts)
+            FROM local_media_repository
+            WHERE user_id = ?
+                AND created_ts > ?
+                AND media_length IS NULL
+            """
+            assert self.unused_expiration_time is not None
+            txn.execute(
+                sql,
+                (
+                    user_id.to_string(),
+                    self._clock.time_msec() - self.unused_expiration_time,
+                ),
+            )
+            row = txn.fetchone()
+            if not row:
+                return 0, 0
+            return row[0], (row[1] + self.unused_expiration_time if row[1] else 0)
+
+        return await self.db_pool.runInteraction(
+            "get_pending_media", get_pending_media_txn
+        )
+
     async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]:
         """Get the media_id and ts for a cached URL as of the given timestamp
         Returns:
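
The new `user_id` column, the now-nullable `media_length`, `store_local_media_id`, and `update_local_media` together support a two-step upload: reserve a media ID first, then fill in the metadata once the content arrives. A hedged sketch of that flow (caller names assumed, not from this diff):

    async def create_then_finish_upload(store, clock, media_id, requester_user, content_type, name, length):
        # Step 1: reserve the ID; media_length stays NULL, which is what
        # count_pending_media uses to find still-pending uploads.
        await store.store_local_media_id(media_id, clock.time_msec(), requester_user)

        # ... later, once the client has uploaded the bytes ...

        # Step 2: record the real metadata against the reserved row.
        await store.update_local_media(
            media_id=media_id,
            media_type=content_type,
            upload_name=name,
            media_length=length,
            user_id=requester_user,
        )
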
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 4b1061e6d7..2911e53310 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
             if user_id:
                 is_support = self.is_support_user_txn(txn, user_id)
                 if not is_support:
-                    # We do this manually here to avoid hitting #6791
+                    # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
                     self.db_pool.simple_upsert_txn(
                         txn,
                         table="monthly_active_users",
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
                 # for their user ID.
                 value_values=[(presence_stream_id,) for _ in user_ids],
             )
-            for user_id in user_ids:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_full_presence_stream_token_for_user, (user_id,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_full_presence_stream_token_for_user,
+                [(user_id,) for user_id in user_ids],
+            )
 
         return await self.db_pool.runInteraction(
             "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..1a5b5731bb 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
-        for event_id, should_delete in event_rows:
-            self._invalidate_cache_and_stream(
-                txn, self._get_state_group_for_event, (event_id,)
-            )
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self._get_state_group_for_event,
+            [(event_id,) for event_id, _ in event_rows],
+        )
 
-            # XXX: This is racy, since have_seen_events could be called between the
-            #    transaction completing and the invalidation running. On the other hand,
-            #    that's no different to calling `have_seen_events` just before the
-            #    event is deleted from the database.
+        # XXX: This is racy, since have_seen_events could be called between the
+        #    transaction completing and the invalidation running. On the other hand,
+        #    that's no different to calling `have_seen_events` just before the
+        #    event is deleted from the database.
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.have_seen_event,
+            [
+                (room_id, event_id)
+                for event_id, should_delete in event_rows
+                if should_delete
+            ],
+        )
+
+        for event_id, should_delete in event_rows:
             if should_delete:
-                self._invalidate_cache_and_stream(
-                    txn, self.have_seen_event, (room_id, event_id)
-                )
                 self.invalidate_get_event_cache_after_txn(txn, event_id)
 
         logger.info("[purge] done")
@@ -485,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         #  - room_tags_revisions
         #       The problem with these is that they are largeish and there is no room_id
         #       index on them. In any case we should be clearing out 'stream' tables
-        #       periodically anyway (#5888)
+        #       periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
 
         self._invalidate_caches_for_room_and_stream(txn, room_id)
 
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index f72a23c584..cf622e195c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore):
         before: str,
         after: str,
     ) -> None:
-        # Lock the table since otherwise we'll have annoying races between the
-        # SELECT here and the UPSERT below.
-        self.database_engine.lock_table(txn, "push_rules")
-
         relative_to_rule = before or after
 
-        res = self.db_pool.simple_select_one_txn(
-            txn,
-            table="push_rules",
-            keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
-            retcols=["priority_class", "priority"],
-            allow_none=True,
-        )
+        sql = """
+            SELECT priority, priority_class FROM push_rules
+            WHERE user_name = ? AND rule_id = ?
+        """
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql += " FOR UPDATE"
+        else:
+            # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+            self.database_engine.lock_table(txn, "push_rules")
+
+        txn.execute(sql, (user_id, relative_to_rule))
+        row = txn.fetchone()
 
-        if not res:
+        if row is None:
             raise RuleNotFoundException(
                 "before/after rule not found: %s" % (relative_to_rule,)
             )
 
-        base_priority_class, base_rule_priority = res
+        base_rule_priority, base_priority_class = row
 
         if base_priority_class != priority_class:
             raise InconsistentRuleException(
@@ -516,9 +518,18 @@ class PushRuleStore(PushRulesWorkerStore):
         conditions_json: str,
         actions_json: str,
     ) -> None:
-        # Lock the table since otherwise we'll have annoying races between the
-        # SELECT here and the UPSERT below.
-        self.database_engine.lock_table(txn, "push_rules")
+        if isinstance(self.database_engine, PostgresEngine):
+            # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first
+            # then re-select the count/max below.
+            sql = """
+                SELECT * FROM push_rules
+                WHERE user_name = ? and priority_class = ?
+                FOR UPDATE
+            """
+            txn.execute(sql, (user_id, priority_class))
+        else:
+            # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+            self.database_engine.lock_table(txn, "push_rules")
 
         # find the highest priority rule in that class
         sql = (
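
On Postgres the table-wide `lock_table` is replaced by `SELECT ... FOR UPDATE`, which locks only the affected user's rows; SQLite keeps the table lock because it has no row-level locking. A generic psycopg2 illustration of the lock-rows-then-recompute pattern (not Synapse code; table and column use here is for illustration):

    def next_priority_for_class(cur, user_id: str, priority_class: int) -> int:
        # Lock just this user's rows in the class for the rest of the transaction.
        cur.execute(
            "SELECT * FROM push_rules WHERE user_name = %s AND priority_class = %s FOR UPDATE",
            (user_id, priority_class),
        )
        # FOR UPDATE cannot be combined with aggregates, so re-select the max separately.
        cur.execute(
            "SELECT COALESCE(MAX(priority), 0) + 1 FROM push_rules"
            " WHERE user_name = %s AND priority_class = %s",
            (user_id, priority_class),
        )
        (next_priority,) = cur.fetchone()
        return next_priority
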
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 updatevalues={"shadow_banned": shadow_banned},
             )
             # In order for this to apply immediately, clear the cache for this user.
-            tokens = self.db_pool.simple_select_onecol_txn(
+            tokens = self.db_pool.simple_select_list_txn(
                 txn,
                 table="access_tokens",
                 keyvalues={"user_id": user_id},
-                retcol="token",
+                retcols=("token",),
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn, self.get_user_by_access_token, tokens
             )
-            for token in tokens:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
         await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
             )
             tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
 
-            for token, _, _ in tokens_and_devices:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_user_by_access_token,
+                [(token,) for token, _, _ in tokens_and_devices],
+            )
 
             txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
 
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f4bef4c99b..e25d86818b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             # we have to set autocommit, because postgres refuses to
             # CREATE INDEX CONCURRENTLY without it.
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
 
             try:
                 c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 # we should now be able to delete the GIST index.
                 c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
             finally:
-                conn.set_session(autocommit=False)
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
         if isinstance(self.database_engine, PostgresEngine):
             await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             def create_index(conn: LoggingDatabaseConnection) -> None:
                 conn.rollback()
-                conn.set_session(autocommit=True)
+                conn.engine.attempt_to_set_autocommit(conn.conn, True)
                 c = conn.cursor()
 
                 # We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                     ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
                     """
                 )
-                conn.set_session(autocommit=False)
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
             await self.db_pool.runWithConnection(create_index)
 
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 0f9c550b27..2c3151526d 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
             conn.rollback()
             if isinstance(self.database_engine, PostgresEngine):
                 # postgres insists on autocommit for the index
-                conn.set_session(autocommit=True)
+                conn.engine.attempt_to_set_autocommit(conn.conn, True)
                 try:
                     txn = conn.cursor()
                     txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                     )
                     txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
                 finally:
-                    conn.set_session(autocommit=False)
+                    conn.engine.attempt_to_set_autocommit(conn.conn, False)
             else:
                 txn = conn.cursor()
                 txn.execute(
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 6309363217..ec4c4041b7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,7 +38,8 @@ class PostgresEngine(
         super().__init__(psycopg2, database_config)
         psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
 
-        # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
+        # Disables passing `bytes` to txn.execute, c.f.
+        # https://github.com/matrix-org/synapse/issues/6186. If you do
         # actually want to use bytes then wrap it in `bytearray`.
         def _disable_bytes_adapter(_: bytes) -> NoReturn:
             raise Exception("Passing bytes to DB is disabled.")
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 158b528dce..03e5a0f55d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -109,7 +109,8 @@ Changes in SCHEMA_VERSION = 78
 
 Changes in SCHEMA_VERSION = 79
     - Add tables to handle in DB read-write locks.
-    - Add some mitigations for a painful race between foreground and background updates, cf #15677.
+    - Add some mitigations for a painful race between foreground and background updates, cf
+      https://github.com/matrix-org/synapse/issues/15677.
 
 Changes in SCHEMA_VERSION = 80
     - The event_txn_id_device_id is always written to for new events.
diff --git a/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres b/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres
new file mode 100644
index 0000000000..6bdd1f9569
--- /dev/null
+++ b/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Re-use unique indices already defined on tables as a replica identity.
+ALTER TABLE applied_module_schemas REPLICA IDENTITY USING INDEX applied_module_schemas_module_name_file_key;
+ALTER TABLE applied_schema_deltas REPLICA IDENTITY USING INDEX applied_schema_deltas_version_file_key;
+ALTER TABLE background_updates REPLICA IDENTITY USING INDEX background_updates_uniqueness;
+ALTER TABLE schema_compat_version REPLICA IDENTITY USING INDEX schema_compat_version_lock_key;
+ALTER TABLE schema_version REPLICA IDENTITY USING INDEX schema_version_lock_key;
+
+
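
A hedged helper (not part of these deltas) for checking that no tables remain which would break logical replication: those whose replica identity is still the default ('d') but which have no primary key to fall back on.

    def tables_without_usable_replica_identity(conn) -> list:
        """Return names of ordinary tables in `public` that UPDATE/DELETE would
        break under logical replication (default replica identity, no PK)."""
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT c.relname
                FROM pg_class c
                JOIN pg_namespace n ON n.oid = c.relnamespace
                WHERE c.relkind = 'r'
                  AND n.nspname = 'public'
                  AND c.relreplident = 'd'       -- default: relies on the primary key
                  AND NOT EXISTS (
                      SELECT 1 FROM pg_constraint pc
                      WHERE pc.conrelid = c.oid AND pc.contype = 'p'
                  )
                """
            )
            return [row[0] for row in cur]
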
diff --git a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
index b062ec840c..f713e42aa0 100644
--- a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
+++ b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
@@ -14,7 +14,7 @@
  */
 
 -- Start a background job to cleanup extremities that were incorrectly added
--- by bug #5269.
+-- by bug https://github.com/matrix-org/synapse/issues/5269.
 INSERT INTO background_updates (update_name, progress_json) VALUES
   ('delete_soft_failed_extremities', '{}');
 
diff --git a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
index aeb17813d3..246c3359f7 100644
--- a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
+++ b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
@@ -13,6 +13,7 @@
  * limitations under the License.
  */
 
--- Now that #6232 is a thing, we can remove old rooms from the directory.
+-- Now that https://github.com/matrix-org/synapse/pull/6232 is a thing, we can
+-- remove old rooms from the directory.
 INSERT INTO background_updates (update_name, progress_json) VALUES
   ('remove_tombstoned_rooms_from_directory', '{}');
diff --git a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
index aed79635b2..31a61defa7 100644
--- a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
+++ b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
@@ -13,7 +13,8 @@
  * limitations under the License.
  */
 
--- Clean up left over rows from bug #11833, which was fixed in #12770.
+-- Clean up left over rows from bug https://github.com/matrix-org/synapse/issues/11833,
+-- which was fixed in https://github.com/matrix-org/synapse/pull/12770.
 DELETE FROM federation_inbound_events_staging WHERE room_id not in (
     SELECT room_id FROM rooms
 );
diff --git a/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres b/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres
new file mode 100644
index 0000000000..8296d56e56
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres
@@ -0,0 +1,88 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE appservice_room_list REPLICA IDENTITY USING INDEX appservice_room_list_idx;
+ALTER TABLE batch_events REPLICA IDENTITY USING INDEX chunk_events_event_id;
+ALTER TABLE blocked_rooms REPLICA IDENTITY USING INDEX blocked_rooms_idx;
+ALTER TABLE cache_invalidation_stream_by_instance REPLICA IDENTITY USING INDEX cache_invalidation_stream_by_instance_id;
+ALTER TABLE device_lists_changes_in_room REPLICA IDENTITY USING INDEX device_lists_changes_in_stream_id;
+ALTER TABLE device_lists_outbound_last_success REPLICA IDENTITY USING INDEX device_lists_outbound_last_success_unique_idx;
+ALTER TABLE device_lists_remote_cache REPLICA IDENTITY USING INDEX device_lists_remote_cache_unique_id;
+ALTER TABLE device_lists_remote_extremeties REPLICA IDENTITY USING INDEX device_lists_remote_extremeties_unique_idx;
+ALTER TABLE device_lists_remote_resync REPLICA IDENTITY USING INDEX device_lists_remote_resync_idx;
+ALTER TABLE e2e_cross_signing_keys REPLICA IDENTITY USING INDEX e2e_cross_signing_keys_stream_idx;
+ALTER TABLE e2e_room_keys REPLICA IDENTITY USING INDEX e2e_room_keys_with_version_idx;
+ALTER TABLE e2e_room_keys_versions REPLICA IDENTITY USING INDEX e2e_room_keys_versions_idx;
+ALTER TABLE erased_users REPLICA IDENTITY USING INDEX erased_users_user;
+ALTER TABLE event_relations REPLICA IDENTITY USING INDEX event_relations_id;
+ALTER TABLE federation_inbound_events_staging REPLICA IDENTITY USING INDEX federation_inbound_events_staging_instance_event;
+ALTER TABLE federation_stream_position REPLICA IDENTITY USING INDEX federation_stream_position_instance;
+ALTER TABLE ignored_users REPLICA IDENTITY USING INDEX ignored_users_uniqueness;
+ALTER TABLE insertion_events REPLICA IDENTITY USING INDEX insertion_events_event_id;
+ALTER TABLE insertion_event_extremities REPLICA IDENTITY USING INDEX insertion_event_extremities_event_id;
+ALTER TABLE monthly_active_users REPLICA IDENTITY USING INDEX monthly_active_users_users;
+ALTER TABLE ratelimit_override REPLICA IDENTITY USING INDEX ratelimit_override_idx;
+ALTER TABLE room_stats_earliest_token REPLICA IDENTITY USING INDEX room_stats_earliest_token_idx;
+ALTER TABLE room_stats_state REPLICA IDENTITY USING INDEX room_stats_state_room;
+ALTER TABLE stream_positions REPLICA IDENTITY USING INDEX stream_positions_idx;
+ALTER TABLE user_directory REPLICA IDENTITY USING INDEX user_directory_user_idx;
+ALTER TABLE user_directory_search REPLICA IDENTITY USING INDEX user_directory_search_user_idx;
+ALTER TABLE user_ips REPLICA IDENTITY USING INDEX user_ips_user_token_ip_unique_index;
+ALTER TABLE user_signature_stream REPLICA IDENTITY USING INDEX user_signature_stream_idx;
+ALTER TABLE users_in_public_rooms REPLICA IDENTITY USING INDEX users_in_public_rooms_u_idx;
+ALTER TABLE users_who_share_private_rooms REPLICA IDENTITY USING INDEX users_who_share_private_rooms_u_idx;
+ALTER TABLE user_threepid_id_server REPLICA IDENTITY USING INDEX user_threepid_id_server_idx;
+ALTER TABLE worker_locks REPLICA IDENTITY USING INDEX worker_locks_key;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE current_state_delta_stream REPLICA IDENTITY FULL;
+ALTER TABLE deleted_pushers REPLICA IDENTITY FULL;
+ALTER TABLE device_auth_providers REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_outbox REPLICA IDENTITY FULL;
+ALTER TABLE device_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_outbound_pokes REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_stream REPLICA IDENTITY FULL;
+ALTER TABLE e2e_cross_signing_signatures REPLICA IDENTITY FULL;
+ALTER TABLE event_auth_chain_links REPLICA IDENTITY FULL;
+ALTER TABLE event_auth REPLICA IDENTITY FULL;
+ALTER TABLE event_push_actions_staging REPLICA IDENTITY FULL;
+ALTER TABLE insertion_event_edges REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_url_cache REPLICA IDENTITY FULL;
+ALTER TABLE presence_stream REPLICA IDENTITY FULL;
+ALTER TABLE push_rules_stream REPLICA IDENTITY FULL;
+ALTER TABLE room_alias_servers REPLICA IDENTITY FULL;
+ALTER TABLE stream_ordering_to_exterm REPLICA IDENTITY FULL;
+ALTER TABLE timeline_gaps REPLICA IDENTITY FULL;
+ALTER TABLE user_daily_visits REPLICA IDENTITY FULL;
+ALTER TABLE users_pending_deactivation REPLICA IDENTITY FULL;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_summary REPLICA IDENTITY FULL;
+ALTER TABLE event_search REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE threepid_guest_access_tokens REPLICA IDENTITY FULL;
+ALTER TABLE user_filters REPLICA IDENTITY FULL; -- sadly the `CHECK` constraint is not enough here
diff --git a/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
new file mode 100644
index 0000000000..b74bdd71fa
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
@@ -0,0 +1,15 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;
\ No newline at end of file
diff --git a/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres b/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres
new file mode 100644
index 0000000000..a592af814e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres
@@ -0,0 +1,80 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE account_data REPLICA IDENTITY USING INDEX account_data_uniqueness;
+ALTER TABLE application_services_txns REPLICA IDENTITY USING INDEX application_services_txns_as_id_txn_id_key;
+ALTER TABLE appservice_stream_position REPLICA IDENTITY USING INDEX appservice_stream_position_lock_key;
+ALTER TABLE current_state_events REPLICA IDENTITY USING INDEX current_state_events_event_id_key;
+ALTER TABLE device_lists_changes_converted_stream_position REPLICA IDENTITY USING INDEX device_lists_changes_converted_stream_position_lock_key;
+ALTER TABLE devices REPLICA IDENTITY USING INDEX device_uniqueness;
+ALTER TABLE e2e_device_keys_json REPLICA IDENTITY USING INDEX e2e_device_keys_json_uniqueness;
+ALTER TABLE e2e_fallback_keys_json REPLICA IDENTITY USING INDEX e2e_fallback_keys_json_uniqueness;
+ALTER TABLE e2e_one_time_keys_json REPLICA IDENTITY USING INDEX e2e_one_time_keys_json_uniqueness;
+ALTER TABLE event_backward_extremities REPLICA IDENTITY USING INDEX event_backward_extremities_event_id_room_id_key;
+ALTER TABLE event_edges REPLICA IDENTITY USING INDEX event_edges_event_id_prev_event_id_idx;
+ALTER TABLE event_forward_extremities REPLICA IDENTITY USING INDEX event_forward_extremities_event_id_room_id_key;
+ALTER TABLE event_json REPLICA IDENTITY USING INDEX event_json_event_id_key;
+ALTER TABLE event_push_summary_last_receipt_stream_id REPLICA IDENTITY USING INDEX event_push_summary_last_receipt_stream_id_lock_key;
+ALTER TABLE event_push_summary_stream_ordering REPLICA IDENTITY USING INDEX event_push_summary_stream_ordering_lock_key;
+ALTER TABLE events REPLICA IDENTITY USING INDEX events_event_id_key;
+ALTER TABLE event_to_state_groups REPLICA IDENTITY USING INDEX event_to_state_groups_event_id_key;
+ALTER TABLE event_txn_id_device_id REPLICA IDENTITY USING INDEX event_txn_id_device_id_event_id;
+ALTER TABLE event_txn_id REPLICA IDENTITY USING INDEX event_txn_id_event_id;
+ALTER TABLE local_current_membership REPLICA IDENTITY USING INDEX local_current_membership_idx;
+ALTER TABLE partial_state_events REPLICA IDENTITY USING INDEX partial_state_events_event_id_key;
+ALTER TABLE partial_state_rooms_servers REPLICA IDENTITY USING INDEX partial_state_rooms_servers_room_id_server_name_key;
+ALTER TABLE profiles REPLICA IDENTITY USING INDEX profiles_user_id_key;
+ALTER TABLE redactions REPLICA IDENTITY USING INDEX redactions_event_id_key;
+ALTER TABLE registration_tokens REPLICA IDENTITY USING INDEX registration_tokens_token_key;
+ALTER TABLE rejections REPLICA IDENTITY USING INDEX rejections_event_id_key;
+ALTER TABLE room_account_data REPLICA IDENTITY USING INDEX room_account_data_uniqueness;
+ALTER TABLE room_aliases REPLICA IDENTITY USING INDEX room_aliases_room_alias_key;
+ALTER TABLE room_depth REPLICA IDENTITY USING INDEX room_depth_room_id_key;
+ALTER TABLE room_forgetter_stream_pos REPLICA IDENTITY USING INDEX room_forgetter_stream_pos_lock_key;
+ALTER TABLE room_memberships REPLICA IDENTITY USING INDEX room_memberships_event_id_key;
+ALTER TABLE room_tags REPLICA IDENTITY USING INDEX room_tag_uniqueness;
+ALTER TABLE room_tags_revisions REPLICA IDENTITY USING INDEX room_tag_revisions_uniqueness;
+ALTER TABLE server_keys_json REPLICA IDENTITY USING INDEX server_keys_json_uniqueness;
+ALTER TABLE sessions REPLICA IDENTITY USING INDEX sessions_session_type_session_id_key;
+ALTER TABLE state_events REPLICA IDENTITY USING INDEX state_events_event_id_key;
+ALTER TABLE stats_incremental_position REPLICA IDENTITY USING INDEX stats_incremental_position_lock_key;
+ALTER TABLE threads REPLICA IDENTITY USING INDEX threads_uniqueness;
+ALTER TABLE ui_auth_sessions_credentials REPLICA IDENTITY USING INDEX ui_auth_sessions_credentials_session_id_stage_type_key;
+ALTER TABLE ui_auth_sessions_ips REPLICA IDENTITY USING INDEX ui_auth_sessions_ips_session_id_ip_user_agent_key;
+ALTER TABLE ui_auth_sessions REPLICA IDENTITY USING INDEX ui_auth_sessions_session_id_key;
+ALTER TABLE user_directory_stream_pos REPLICA IDENTITY USING INDEX user_directory_stream_pos_lock_key;
+ALTER TABLE user_external_ids REPLICA IDENTITY USING INDEX user_external_ids_auth_provider_external_id_key;
+ALTER TABLE user_threepids REPLICA IDENTITY USING INDEX medium_address;
+ALTER TABLE worker_read_write_locks_mode REPLICA IDENTITY USING INDEX worker_read_write_locks_mode_key;
+ALTER TABLE worker_read_write_locks REPLICA IDENTITY USING INDEX worker_read_write_locks_key;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_actions REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository REPLICA IDENTITY FULL;
+ALTER TABLE receipts_graph REPLICA IDENTITY FULL;
+ALTER TABLE receipts_linearized REPLICA IDENTITY FULL;
+ALTER TABLE received_transactions REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache REPLICA IDENTITY FULL;
+ALTER TABLE server_signature_keys REPLICA IDENTITY FULL;
+ALTER TABLE users REPLICA IDENTITY FULL;
diff --git a/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres b/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres
new file mode 100644
index 0000000000..9b792a39e2
--- /dev/null
+++ b/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+-- See also: 83/04_replica_identities.sql.postgres on the main database
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE state_group_edges REPLICA IDENTITY USING INDEX state_group_edges_unique_idx;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE state_groups_state REPLICA IDENTITY FULL;