Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/controllers/state.py  137
-rw-r--r--  synapse/storage/database.py  2
-rw-r--r--  synapse/storage/databases/main/__init__.py  4
-rw-r--r--  synapse/storage/databases/main/filtering.py  5
-rw-r--r--  synapse/storage/databases/main/profile.py  12
-rw-r--r--  synapse/storage/databases/main/room.py  2
-rw-r--r--  synapse/storage/databases/main/roommember.py  122
-rw-r--r--  synapse/storage/databases/main/stats.py  4
-rw-r--r--  synapse/storage/databases/main/user_directory.py  13
-rw-r--r--  synapse/storage/schema/__init__.py  7
-rw-r--r--  synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py  50
-rw-r--r--  synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py  54
-rw-r--r--  synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres (renamed from synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres)  64
-rw-r--r--  synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite (renamed from synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite)  59
-rw-r--r--  synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py  70
-rw-r--r--  synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres  69
-rw-r--r--  synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite  65
17 files changed, 377 insertions, 362 deletions
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 233df7cce2..278c7832ba 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -19,14 +20,16 @@ from typing import (
     Callable,
     Collection,
     Dict,
+    FrozenSet,
     Iterable,
     List,
     Mapping,
     Optional,
     Tuple,
+    Union,
 )
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.logging.opentracing import tag_args, trace
 from synapse.storage.roommember import ProfileInfo
@@ -34,14 +37,20 @@ from synapse.storage.util.partial_state_events_tracker import (
     PartialCurrentStateTracker,
     PartialStateEventsTracker,
 )
-from synapse.types import MutableStateMap, StateMap
+from synapse.types import MutableStateMap, StateMap, get_domain_from_id
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches import intern_string
+from synapse.util.caches.descriptors import cached
 from synapse.util.cancellation import cancellable
+from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.state import _StateCacheEntry
     from synapse.storage.databases import Databases
 
+
 logger = logging.getLogger(__name__)
 
 
@@ -52,10 +61,15 @@ class StateStorageController:
 
     def __init__(self, hs: "HomeServer", stores: "Databases"):
         self._is_mine_id = hs.is_mine_id
+        self._clock = hs.get_clock()
         self.stores = stores
         self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
         self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
 
+        # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
+        # at a time. Keyed by room_id.
+        self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
+
     def notify_event_un_partial_stated(self, event_id: str) -> None:
         self._partial_state_events_tracker.notify_un_partial_stated(event_id)
 
@@ -627,3 +641,122 @@ class StateStorageController:
         await self._partial_state_room_tracker.await_full_state(room_id)
 
         return await self.stores.main.get_users_in_room_with_profiles(room_id)
+
+    async def get_joined_hosts(
+        self, room_id: str, state_entry: "_StateCacheEntry"
+    ) -> FrozenSet[str]:
+        state_group: Union[object, int] = state_entry.state_group
+        if not state_group:
+            # If state_group is None it means it has yet to be assigned a
+            # state group, i.e. we need to make sure that calls with a state_group
+            # of None don't hit previous cached calls with a None state_group.
+            # To do this we set the state_group to a new object as object() != object()
+            state_group = object()
+
+        assert state_group is not None
+        with Measure(self._clock, "get_joined_hosts"):
+            return await self._get_joined_hosts(
+                room_id, state_group, state_entry=state_entry
+            )
+
+    @cached(num_args=2, max_entries=10000, iterable=True)
+    async def _get_joined_hosts(
+        self,
+        room_id: str,
+        state_group: Union[object, int],
+        state_entry: "_StateCacheEntry",
+    ) -> FrozenSet[str]:
+        # We don't use `state_group`, it's there so that we can cache based on
+        # it. However, it's important that it's never None, since two
+        # current_state's with a state_group of None are likely to be different.
+        #
+        # The `state_group` must match the `state_entry.state_group` (if not None).
+        assert state_group is not None
+        assert state_entry.state_group is None or state_entry.state_group == state_group
+
+        # We use a secondary cache of previous work to allow us to build up the
+        # joined hosts for the given state group based on previous state groups.
+        #
+        # We cache one object per room containing the results of the last state
+        # group we got joined hosts for. The idea is that generally
+        # `get_joined_hosts` is called with the "current" state group for the
+        # room, and so consecutive calls will be for consecutive state groups
+        # which point to the previous state group.
+        cache = await self.stores.main._get_joined_hosts_cache(room_id)
+
+        # If the state group in the cache matches, we already have the data we need.
+        if state_entry.state_group == cache.state_group:
+            return frozenset(cache.hosts_to_joined_users)
+
+        # Since we'll mutate the cache we need to lock.
+        async with self._joined_host_linearizer.queue(room_id):
+            if state_entry.state_group == cache.state_group:
+                # Same state group, so nothing to do. We've already checked for
+                # this above, but the cache may have changed while waiting on
+                # the lock.
+                pass
+            elif state_entry.prev_group == cache.state_group:
+                # The cached work is for the previous state group, so we work out
+                # the delta.
+                assert state_entry.delta_ids is not None
+                for (typ, state_key), event_id in state_entry.delta_ids.items():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    host = intern_string(get_domain_from_id(state_key))
+                    user_id = state_key
+                    known_joins = cache.hosts_to_joined_users.setdefault(host, set())
+
+                    event = await self.stores.main.get_event(event_id)
+                    if event.membership == Membership.JOIN:
+                        known_joins.add(user_id)
+                    else:
+                        known_joins.discard(user_id)
+
+                        if not known_joins:
+                            cache.hosts_to_joined_users.pop(host, None)
+            else:
+                # The cache doesn't match the state group or prev state group,
+                # so we calculate the result from first principles.
+                #
+                # We need to fetch all hosts joined to the room according to `state` by
+                # inspecting all join memberships in `state`. However, if the `state` is
+                # relatively recent then many of its events are likely to be held in
+                # the current state of the room, which is easily available and likely
+                # cached.
+                #
+                # We therefore compute the set of `state` events not in the
+                # current state and only fetch those.
+                current_memberships = (
+                    await self.stores.main._get_approximate_current_memberships_in_room(
+                        room_id
+                    )
+                )
+                unknown_state_events = {}
+                joined_users_in_current_state = []
+
+                state = await state_entry.get_state(
+                    self, StateFilter.from_types([(EventTypes.Member, None)])
+                )
+
+                for (type, state_key), event_id in state.items():
+                    if event_id not in current_memberships:
+                        unknown_state_events[type, state_key] = event_id
+                    elif current_memberships[event_id] == Membership.JOIN:
+                        joined_users_in_current_state.append(state_key)
+
+                joined_user_ids = await self.stores.main.get_joined_user_ids_from_state(
+                    room_id, unknown_state_events
+                )
+
+                cache.hosts_to_joined_users = {}
+                for user_id in chain(joined_user_ids, joined_users_in_current_state):
+                    host = intern_string(get_domain_from_id(user_id))
+                    cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+            if state_entry.state_group:
+                cache.state_group = state_entry.state_group
+            else:
+                cache.state_group = object()
+
+        return frozenset(cache.hosts_to_joined_users)
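The controller now owns the joined-hosts cache that previously lived in RoomMemberWorkerStore. Since `_get_joined_hosts` is `@cached(num_args=2)`, only `room_id` and `state_group` form the cache key, and the `object()` substitution above keeps calls for not-yet-persisted state from ever sharing an entry. A minimal, self-contained sketch of that sentinel trick (illustrative only, not Synapse's cache machinery):

    from typing import Tuple, Union

    def joined_hosts_cache_key(room_id: str, state_group: Union[int, None]) -> Tuple[str, object]:
        # Mirror the guard in get_joined_hosts: never key the cache on None.
        key: Union[int, object] = state_group if state_group is not None else object()
        return (room_id, key)

    a = joined_hosts_cache_key("!room:example.org", None)
    b = joined_hosts_cache_key("!room:example.org", None)
    assert a != b  # un-persisted state never shares a cache entry, object() != object()
    assert joined_hosts_cache_key("!room:example.org", 42) == joined_hosts_cache_key("!room:example.org", 42)
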
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index c9d687fb2f..a1c8fb0f46 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -98,8 +98,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
     "event_push_summary": "event_push_summary_unique_index2",
     "receipts_linearized": "receipts_linearized_unique_index",
     "receipts_graph": "receipts_graph_unique_index",
-    "profiles": "profiles_full_user_id_key_idx",
-    "user_filters": "full_users_filters_unique_idx",
 }
 
 
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index b6028853c9..be67d1ff22 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 
 from synapse.api.constants import Direction
 from synapse.config.homeserver import HomeServerConfig
@@ -196,7 +196,7 @@ class DataStore(
             txn: LoggingTransaction,
         ) -> Tuple[List[JsonDict], int]:
             filters = []
-            args: List[Union[str, int]] = []
+            args: list = []
 
             # Set ordering
             order_by_column = UserSortOrder(order_by).value
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 75f7fe8756..fff417f9e3 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -188,13 +188,14 @@ class FilteringWorkerStore(SQLBaseStore):
                 filter_id = max_id + 1
 
             sql = (
-                "INSERT INTO user_filters (full_user_id, filter_id, filter_json)"
-                "VALUES(?, ?, ?)"
+                "INSERT INTO user_filters (full_user_id, user_id, filter_id, filter_json)"
+                "VALUES(?, ?, ?, ?)"
             )
             txn.execute(
                 sql,
                 (
                     user_id.to_string(),
+                    user_id.localpart,
                     filter_id,
                     bytearray(def_json),
                 ),
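The INSERT now records both forms of the user identifier: the full MXID (`user_id.to_string()`) and the bare localpart (`user_id.localpart`). A rough sketch with a hypothetical helper, just to show what the two extra bind parameters contain:

    def split_mxid(mxid: str) -> tuple:
        # "@alice:example.org" -> ("alice", "example.org"); illustrative only,
        # the real code uses Synapse's UserID type rather than string parsing.
        localpart, domain = mxid[1:].split(":", 1)
        return localpart, domain

    full_user_id = "@alice:example.org"
    localpart, _server = split_mxid(full_user_id)
    insert_args = (full_user_id, localpart, 7, bytearray(b"{}"))  # matches VALUES(?, ?, ?, ?)
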
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 660a5507b7..3ba9cc8853 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -173,9 +173,10 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     async def create_profile(self, user_id: UserID) -> None:
+        user_localpart = user_id.localpart
         await self.db_pool.simple_insert(
             table="profiles",
-            values={"full_user_id": user_id.to_string()},
+            values={"user_id": user_localpart, "full_user_id": user_id.to_string()},
             desc="create_profile",
         )
 
@@ -190,11 +191,13 @@ class ProfileWorkerStore(SQLBaseStore):
             new_displayname: The new display name. If this is None, the user's display
                 name is removed.
         """
+        user_localpart = user_id.localpart
         await self.db_pool.simple_upsert(
             table="profiles",
-            keyvalues={"full_user_id": user_id.to_string()},
+            keyvalues={"user_id": user_localpart},
             values={
                 "displayname": new_displayname,
+                "full_user_id": user_id.to_string(),
             },
             desc="set_profile_displayname",
         )
@@ -210,10 +213,11 @@ class ProfileWorkerStore(SQLBaseStore):
             new_avatar_url: The new avatar URL. If this is None, the user's avatar is
                 removed.
         """
+        user_localpart = user_id.localpart
         await self.db_pool.simple_upsert(
             table="profiles",
-            keyvalues={"full_user_id": user_id.to_string()},
-            values={"avatar_url": new_avatar_url},
+            keyvalues={"user_id": user_localpart},
+            values={"avatar_url": new_avatar_url, "full_user_id": user_id.to_string()},
             desc="set_profile_avatar_url",
         )
 
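The profile upserts are now keyed on the localpart column but still write `full_user_id` on every change, so the two columns cannot drift apart. A standalone sqlite3 sketch of roughly what such an upsert amounts to (schema simplified from the real profiles table; requires SQLite 3.24+ for ON CONFLICT):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE profiles (user_id TEXT UNIQUE, full_user_id TEXT, displayname TEXT, avatar_url TEXT)"
    )
    conn.execute(
        """
        INSERT INTO profiles (user_id, full_user_id, displayname)
        VALUES (?, ?, ?)
        ON CONFLICT (user_id) DO UPDATE SET
            displayname = excluded.displayname,
            full_user_id = excluded.full_user_id
        """,
        ("alice", "@alice:example.org", "Alice"),
    )
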
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index ca8be8c80d..830658f328 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2136,7 +2136,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             raise StoreError(400, "No create event in state")
 
         # Before MSC2175, the room creator was a separate field.
-        if not room_version.msc2175_implicit_room_creator:
+        if not room_version.implicit_room_creator:
             room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
 
             if not isinstance(room_creator, str):
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 582875c91a..fff259f74c 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -57,15 +56,12 @@ from synapse.types import (
     StrCollection,
     get_domain_from_id,
 )
-from synapse.util.async_helpers import Linearizer
-from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.state import _StateCacheEntry
 
 logger = logging.getLogger(__name__)
 
@@ -91,10 +87,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
     ):
         super().__init__(database, db_conn, hs)
 
-        # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
-        # at a time. Keyed by room_id.
-        self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
-
         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
 
         if (
@@ -1057,120 +1049,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             "get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn
         )
 
-    async def get_joined_hosts(
-        self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
-    ) -> FrozenSet[str]:
-        state_group: Union[object, int] = state_entry.state_group
-        if not state_group:
-            # If state_group is None it means it has yet to be assigned a
-            # state group, i.e. we need to make sure that calls with a state_group
-            # of None don't hit previous cached calls with a None state_group.
-            # To do this we set the state_group to a new object as object() != object()
-            state_group = object()
-
-        assert state_group is not None
-        with Measure(self._clock, "get_joined_hosts"):
-            return await self._get_joined_hosts(
-                room_id, state_group, state, state_entry=state_entry
-            )
-
-    @cached(num_args=2, max_entries=10000, iterable=True)
-    async def _get_joined_hosts(
-        self,
-        room_id: str,
-        state_group: Union[object, int],
-        state: StateMap[str],
-        state_entry: "_StateCacheEntry",
-    ) -> FrozenSet[str]:
-        # We don't use `state_group`, it's there so that we can cache based on
-        # it. However, its important that its never None, since two
-        # current_state's with a state_group of None are likely to be different.
-        #
-        # The `state_group` must match the `state_entry.state_group` (if not None).
-        assert state_group is not None
-        assert state_entry.state_group is None or state_entry.state_group == state_group
-
-        # We use a secondary cache of previous work to allow us to build up the
-        # joined hosts for the given state group based on previous state groups.
-        #
-        # We cache one object per room containing the results of the last state
-        # group we got joined hosts for. The idea is that generally
-        # `get_joined_hosts` is called with the "current" state group for the
-        # room, and so consecutive calls will be for consecutive state groups
-        # which point to the previous state group.
-        cache = await self._get_joined_hosts_cache(room_id)
-
-        # If the state group in the cache matches, we already have the data we need.
-        if state_entry.state_group == cache.state_group:
-            return frozenset(cache.hosts_to_joined_users)
-
-        # Since we'll mutate the cache we need to lock.
-        async with self._joined_host_linearizer.queue(room_id):
-            if state_entry.state_group == cache.state_group:
-                # Same state group, so nothing to do. We've already checked for
-                # this above, but the cache may have changed while waiting on
-                # the lock.
-                pass
-            elif state_entry.prev_group == cache.state_group:
-                # The cached work is for the previous state group, so we work out
-                # the delta.
-                assert state_entry.delta_ids is not None
-                for (typ, state_key), event_id in state_entry.delta_ids.items():
-                    if typ != EventTypes.Member:
-                        continue
-
-                    host = intern_string(get_domain_from_id(state_key))
-                    user_id = state_key
-                    known_joins = cache.hosts_to_joined_users.setdefault(host, set())
-
-                    event = await self.get_event(event_id)
-                    if event.membership == Membership.JOIN:
-                        known_joins.add(user_id)
-                    else:
-                        known_joins.discard(user_id)
-
-                        if not known_joins:
-                            cache.hosts_to_joined_users.pop(host, None)
-            else:
-                # The cache doesn't match the state group or prev state group,
-                # so we calculate the result from first principles.
-                #
-                # We need to fetch all hosts joined to the room according to `state` by
-                # inspecting all join memberships in `state`. However, if the `state` is
-                # relatively recent then many of its events are likely to be held in
-                # the current state of the room, which is easily available and likely
-                # cached.
-                #
-                # We therefore compute the set of `state` events not in the
-                # current state and only fetch those.
-                current_memberships = (
-                    await self._get_approximate_current_memberships_in_room(room_id)
-                )
-                unknown_state_events = {}
-                joined_users_in_current_state = []
-
-                for (type, state_key), event_id in state.items():
-                    if event_id not in current_memberships:
-                        unknown_state_events[type, state_key] = event_id
-                    elif current_memberships[event_id] == Membership.JOIN:
-                        joined_users_in_current_state.append(state_key)
-
-                joined_user_ids = await self.get_joined_user_ids_from_state(
-                    room_id, unknown_state_events
-                )
-
-                cache.hosts_to_joined_users = {}
-                for user_id in chain(joined_user_ids, joined_users_in_current_state):
-                    host = intern_string(get_domain_from_id(user_id))
-                    cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
-
-            if state_entry.state_group:
-                cache.state_group = state_entry.state_group
-            else:
-                cache.state_group = object()
-
-        return frozenset(cache.hosts_to_joined_users)
-
     async def _get_approximate_current_memberships_in_room(
         self, room_id: str
     ) -> Mapping[str, Optional[str]]:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 97c4dc2603..f34b7ce8f4 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -697,7 +697,7 @@ class StatsStore(StateDeltasStore):
             txn: LoggingTransaction,
         ) -> Tuple[List[JsonDict], int]:
             filters = []
-            args = [self.hs.config.server.server_name]
+            args: list = []
 
             if search_term:
                 filters.append("(lmr.user_id LIKE ? OR displayname LIKE ?)")
@@ -733,7 +733,7 @@ class StatsStore(StateDeltasStore):
 
             sql_base = """
                 FROM local_media_repository as lmr
-                LEFT JOIN profiles AS p ON lmr.user_id = '@' || p.user_id || ':' || ?
+                LEFT JOIN profiles AS p ON lmr.user_id = p.full_user_id
                 {}
                 GROUP BY lmr.user_id, displayname
             """.format(
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 924022c95c..2a136f2ff6 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -409,23 +409,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                 txn, users_to_work_on
             )
 
-            # Next fetch their profiles. Note that the `user_id` here is the
-            # *localpart*, and that not all users have profiles.
+            # Next fetch their profiles. Note that not all users have profiles.
             profile_rows = self.db_pool.simple_select_many_txn(
                 txn,
                 table="profiles",
-                column="user_id",
-                iterable=[get_localpart_from_id(u) for u in users_to_insert],
+                column="full_user_id",
+                iterable=list(users_to_insert),
                 retcols=(
-                    "user_id",
+                    "full_user_id",
                     "displayname",
                     "avatar_url",
                 ),
                 keyvalues={},
             )
             profiles = {
-                f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
-                    f"@{row['user_id']}:{self.server_name}",
+                row["full_user_id"]: _UserDirProfile(
+                    row["full_user_id"],
                     row["displayname"],
                     row["avatar_url"],
                 )
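Previously the query selected the localpart column and rebuilt each full MXID from it with the local server name; it now selects `full_user_id` and uses it verbatim. A rough before/after sketch with hypothetical row dicts (not the real query results):

    server_name = "example.org"  # stand-in for self.server_name

    old_row = {"user_id": "alice", "displayname": "Alice", "avatar_url": None}
    old_key = f"@{old_row['user_id']}:{server_name}"   # MXID rebuilt from the localpart

    new_row = {"full_user_id": "@alice:example.org", "displayname": "Alice", "avatar_url": None}
    new_key = new_row["full_user_id"]                  # stored MXID used verbatim

    assert old_key == new_key == "@alice:example.org"
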
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 6d14963c0a..d3ec648f6d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -108,7 +108,8 @@ Changes in SCHEMA_VERSION = 78
     - Validate check (full_user_id IS NOT NULL) on tables profiles and user_filters
 
 Changes in SCHEMA_VERSION = 79
-    - We no longer write to column user_id of tables profiles and user_filters
+    - Add tables to handle in-DB read-write locks.
+    - Add some mitigations for a painful race between foreground and background updates, cf #15677.
 """
 
 
@@ -121,9 +122,7 @@ SCHEMA_COMPAT_VERSION = (
     #
     # insertions to the column `full_user_id` of tables profiles and user_filters can no
     # longer be null
-    #
-    # we no longer write to column `full_user_id` of tables profiles and user_filters
-    78
+    76
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py b/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py
deleted file mode 100644
index 3541266f7d..0000000000
--- a/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-
-
-def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    """
-    Update to drop the NOT NULL constraint on column user_id so that we can cease to
-    write to it without inserts to other columns triggering the constraint
-    """
-
-    if isinstance(database_engine, PostgresEngine):
-        drop_sql = """
-        ALTER TABLE profiles ALTER COLUMN user_id DROP NOT NULL
-        """
-        cur.execute(drop_sql)
-    else:
-        # irritatingly in SQLite we need to rewrite the table to drop the constraint.
-        cur.execute("DROP TABLE IF EXISTS temp_profiles")
-
-        create_sql = """
-        CREATE TABLE temp_profiles (
-            full_user_id text NOT NULL,
-            user_id text,
-            displayname text,
-            avatar_url text,
-            UNIQUE (full_user_id),
-            UNIQUE (user_id)
-        )
-        """
-        cur.execute(create_sql)
-
-        copy_sql = """
-        INSERT INTO temp_profiles (
-            user_id,
-            displayname,
-            avatar_url,
-            full_user_id)
-            SELECT user_id, displayname, avatar_url, full_user_id FROM profiles
-        """
-        cur.execute(copy_sql)
-
-        drop_sql = """
-        DROP TABLE profiles
-        """
-        cur.execute(drop_sql)
-
-        rename_sql = """
-        ALTER TABLE temp_profiles RENAME to profiles
-        """
-        cur.execute(rename_sql)
diff --git a/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py b/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py
deleted file mode 100644
index 8e7569c470..0000000000
--- a/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-
-
-def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    """
-    Update to drop the NOT NULL constraint on column user_id so that we can cease to
-    write to it without inserts to other columns triggering the constraint
-    """
-    if isinstance(database_engine, PostgresEngine):
-        drop_sql = """
-        ALTER TABLE user_filters ALTER COLUMN user_id DROP NOT NULL
-        """
-        cur.execute(drop_sql)
-
-    else:
-        # irritatingly in SQLite we need to rewrite the table to drop the constraint.
-        cur.execute("DROP TABLE IF EXISTS temp_user_filters")
-
-        create_sql = """
-        CREATE TABLE temp_user_filters (
-            full_user_id text NOT NULL,
-            user_id text,
-            filter_id bigint NOT NULL,
-            filter_json bytea NOT NULL
-        )
-        """
-        cur.execute(create_sql)
-
-        index_sql = """
-            CREATE UNIQUE INDEX IF NOT EXISTS user_filters_full_user_id_unique ON
-            temp_user_filters (full_user_id, filter_id)
-        """
-        cur.execute(index_sql)
-
-        copy_sql = """
-            INSERT INTO temp_user_filters (
-                user_id,
-                filter_id,
-                filter_json,
-                full_user_id)
-            SELECT user_id, filter_id, filter_json, full_user_id FROM user_filters
-        """
-        cur.execute(copy_sql)
-
-        drop_sql = """
-        DROP TABLE user_filters
-        """
-        cur.execute(drop_sql)
-
-        rename_sql = """
-        ALTER TABLE temp_user_filters RENAME to user_filters
-        """
-        cur.execute(rename_sql)
diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres
index e1a41be9c9..7df07ab0da 100644
--- a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres
+++ b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres
@@ -44,7 +44,7 @@
 
 -- A table to track whether a lock is currently acquired, and if so whether its
 -- in read or write mode.
-CREATE TABLE worker_read_write_locks_mode (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- Whether this lock is in read (false) or write (true) mode
@@ -55,14 +55,14 @@ CREATE TABLE worker_read_write_locks_mode (
 );
 
 -- Ensure that we can only have one row per lock
-CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
 -- We need this (redundant) constraint so that we can have a foreign key
 -- constraint against this table.
-CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
 
 
 -- A table to track who has currently acquired a given lock.
-CREATE TABLE worker_read_write_locks (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- We write the instance name to ease manual debugging, we don't ever read
@@ -84,9 +84,9 @@ CREATE TABLE worker_read_write_locks (
     FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
 );
 
-CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
 -- Ensures that only one instance can acquire a lock in write mode at a time.
-CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
 
 
 -- Add a foreign key constraint to ensure that if a lock is in
@@ -97,56 +97,6 @@ CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lo
 -- We only add to PostgreSQL as SQLite does not support adding constraints
 -- after table creation, and so doesn't support "circular" foreign key
 -- constraints.
+ALTER TABLE worker_read_write_locks_mode DROP CONSTRAINT IF EXISTS worker_read_write_locks_mode_foreign;
 ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign
     FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED;
-
-
--- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
--- and acquire a lock, i.e. insert into `worker_read_write_locks`,
-CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
-BEGIN
-    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
-        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
-        ON CONFLICT (lock_name, lock_key)
-        DO NOTHING;
-    RETURN NEW;
-END
-$$
-LANGUAGE plpgsql;
-
-CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
-    FOR EACH ROW
-    EXECUTE PROCEDURE upsert_read_write_lock_parent();
-
-
--- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
--- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
--- update the `worker_read_write_locks_mode.token` to match another instance
--- that has currently acquired the lock, or we delete the row if nobody has
--- currently acquired a lock.
-CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$
-DECLARE
-    new_token TEXT;
-BEGIN
-    SELECT token INTO new_token FROM worker_read_write_locks
-        WHERE
-            lock_name = OLD.lock_name
-            AND lock_key = OLD.lock_key;
-
-    IF NOT FOUND THEN
-        DELETE FROM worker_read_write_locks_mode
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
-    ELSE
-        UPDATE worker_read_write_locks_mode
-            SET token = new_token
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
-    END IF;
-
-    RETURN NEW;
-END
-$$
-LANGUAGE plpgsql;
-
-CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks
-    FOR EACH ROW
-    EXECUTE PROCEDURE delete_read_write_lock_parent();
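This delta moves from 78/04 into schema version 79 and every remaining statement gains an `IF NOT EXISTS` / `DROP CONSTRAINT IF EXISTS` guard, so it stays harmless on databases that already executed the old 78/04 file. A self-contained sqlite3 sketch (table definition heavily simplified) of why idempotent DDL matters for a re-runnable delta:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    ddl = """
    CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode (
        lock_name TEXT NOT NULL,
        lock_key TEXT NOT NULL,
        write_lock BOOLEAN NOT NULL
    )
    """
    conn.execute(ddl)
    conn.execute(ddl)  # second run: silently skipped instead of "table already exists"
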
diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite
index be2dfbbb8a..95f9dbf120 100644
--- a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite
+++ b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite
@@ -22,7 +22,7 @@
 
 -- A table to track whether a lock is currently acquired, and if so whether its
 -- in read or write mode.
-CREATE TABLE worker_read_write_locks_mode (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- Whether this lock is in read (false) or write (true) mode
@@ -38,14 +38,14 @@ CREATE TABLE worker_read_write_locks_mode (
 );
 
 -- Ensure that we can only have one row per lock
-CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
 -- We need this (redundant) constraint so that we can have a foreign key
 -- constraint against this table.
-CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
 
 
 -- A table to track who has currently acquired a given lock.
-CREATE TABLE worker_read_write_locks (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- We write the instance name to ease manual debugging, we don't ever read
@@ -67,53 +67,6 @@ CREATE TABLE worker_read_write_locks (
     FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
 );
 
-CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
 -- Ensures that only one instance can acquire a lock in write mode at a time.
-CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
-
-
--- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
--- and acquire a lock, i.e. insert into `worker_read_write_locks`,
-CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger
-BEFORE INSERT ON worker_read_write_locks
-FOR EACH ROW
-BEGIN
-    -- First ensure that `worker_read_write_locks_mode` doesn't have stale
-    -- entries in it, as on SQLite we don't have the foreign key constraint to
-    -- enforce this.
-    DELETE FROM worker_read_write_locks_mode
-        WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
-        AND NOT EXISTS (
-            SELECT 1 FROM worker_read_write_locks
-            WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
-        );
-
-    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
-        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
-        ON CONFLICT (lock_name, lock_key)
-        DO NOTHING;
-END;
-
--- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
--- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
--- update the `worker_read_write_locks_mode.token` to match another instance
--- that has currently acquired the lock, or we delete the row if nobody has
--- currently acquired a lock.
-CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger
-AFTER DELETE ON worker_read_write_locks
-FOR EACH ROW
-BEGIN
-    DELETE FROM worker_read_write_locks_mode
-        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
-        AND NOT EXISTS (
-            SELECT 1 FROM worker_read_write_locks
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
-        );
-
-    UPDATE worker_read_write_locks_mode
-        SET token = (
-            SELECT token FROM worker_read_write_locks
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
-        )
-        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
-END;
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
diff --git a/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py b/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py
new file mode 100644
index 0000000000..ae63585847
--- /dev/null
+++ b/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py
@@ -0,0 +1,70 @@
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+
+
+def run_create(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+) -> None:
+    """
+    An attempt to mitigate a painful race between foreground and background updates
+    touching the `stream_ordering` column of the events table. More info can be found
+    at https://github.com/matrix-org/synapse/issues/15677.
+    """
+
+    # technically the bg update we're concerned with below should only have been added in
+    # postgres but it doesn't hurt to be extra careful
+    if isinstance(database_engine, PostgresEngine):
+        select_sql = """
+            SELECT 1 FROM background_updates
+                WHERE update_name = 'replace_stream_ordering_column'
+        """
+        cur.execute(select_sql)
+        res = cur.fetchone()
+
+        # if the background update `replace_stream_ordering_column` is still pending, we need
+        # to drop the indexes added in 7403, and re-add them to the column `stream_ordering2`
+        # with the idea that they will be preserved when the column is renamed `stream_ordering`
+        # after the background update has finished
+        if res:
+            drop_cse_sql = """
+            ALTER TABLE current_state_events DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey
+            """
+            cur.execute(drop_cse_sql)
+
+            drop_lcm_sql = """
+            ALTER TABLE local_current_membership DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey
+            """
+            cur.execute(drop_lcm_sql)
+
+            drop_rm_sql = """
+            ALTER TABLE room_memberships DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey
+            """
+            cur.execute(drop_rm_sql)
+
+            add_cse_sql = """
+            ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey
+            FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID;
+            """
+            cur.execute(add_cse_sql)
+
+            add_lcm_sql = """
+            ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey
+            FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID;
+            """
+            cur.execute(add_lcm_sql)
+
+            add_rm_sql = """
+            ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey
+            FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID;
+            """
+            cur.execute(add_rm_sql)
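The constraints above are re-added with `NOT VALID`, so PostgreSQL enforces them for new writes without scanning the existing rows while `replace_stream_ordering_column` is still pending. A hedged illustration of the follow-up step such constraints allow (hypothetical statement, not part of this delta):

    # On PostgreSQL, a NOT VALID constraint can be checked against existing rows
    # later, e.g. once the background update has finished:
    validate_sql = """
    ALTER TABLE current_state_events
        VALIDATE CONSTRAINT event_stream_ordering_fkey;
    """
    # cur.execute(validate_sql)  # a later migration or maintenance task could run this
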
diff --git a/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres
new file mode 100644
index 0000000000..ea3496ef2d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres
@@ -0,0 +1,69 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
+
+-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
+-- and acquire a lock, i.e. insert into `worker_read_write_locks`,
+CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
+BEGIN
+    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
+        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
+        ON CONFLICT (lock_name, lock_key)
+        DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token;
+    RETURN NEW;
+END
+$$
+LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks;
+CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
+    FOR EACH ROW
+    EXECUTE PROCEDURE upsert_read_write_lock_parent();
+
+
+-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
+-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
+-- update the `worker_read_write_locks_mode.token` to match another instance
+-- that has currently acquired the lock, or we delete the row if nobody has
+-- currently acquired a lock.
+CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$
+DECLARE
+    new_token TEXT;
+BEGIN
+    SELECT token INTO new_token FROM worker_read_write_locks
+        WHERE
+            lock_name = OLD.lock_name
+            AND lock_key = OLD.lock_key
+        LIMIT 1 FOR UPDATE;
+
+    IF NOT FOUND THEN
+        DELETE FROM worker_read_write_locks_mode
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key AND token = OLD.token;
+    ELSE
+        UPDATE worker_read_write_locks_mode
+            SET token = new_token
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
+    END IF;
+
+    RETURN NEW;
+END
+$$
+LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger ON worker_read_write_locks;
+CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks
+    FOR EACH ROW
+    EXECUTE PROCEDURE delete_read_write_lock_parent();
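Compared with the 78/04 triggers these replace, the upsert now does `DO UPDATE` rather than `DO NOTHING`, the delete path locks the surviving row with `FOR UPDATE`, and the mode row is only deleted when its `token` still matches `OLD.token`, which keeps `worker_read_write_locks_mode` from going stale. Semantically the two tables implement a shared/exclusive lock; a toy in-memory model of that behaviour (a deliberate simplification, nothing like the real DB-backed implementation):

    from dataclasses import dataclass, field
    from typing import Set

    @dataclass
    class ToyRWLock:
        write_lock: bool = False                         # worker_read_write_locks_mode.write_lock
        holders: Set[str] = field(default_factory=set)   # tokens in worker_read_write_locks

        def try_acquire(self, token: str, write: bool) -> bool:
            if not self.holders:
                self.write_lock = write                  # the insert trigger records the mode
                self.holders.add(token)
                return True
            if write or self.write_lock:
                return False                             # writers are exclusive
            self.holders.add(token)                      # an additional reader joins
            return True

        def release(self, token: str) -> None:
            self.holders.discard(token)                  # the delete trigger tidies the mode row
            if not self.holders:
                self.write_lock = False

    lock = ToyRWLock()
    assert lock.try_acquire("reader-1", write=False)
    assert lock.try_acquire("reader-2", write=False)     # shared readers
    assert not lock.try_acquire("writer-1", write=True)  # blocked while readers hold it
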
diff --git a/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite
new file mode 100644
index 0000000000..acb1a77c80
--- /dev/null
+++ b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite
@@ -0,0 +1,65 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
+
+-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
+-- and acquire a lock, i.e. insert into `worker_read_write_locks`,
+DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger;
+CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger
+BEFORE INSERT ON worker_read_write_locks
+FOR EACH ROW
+BEGIN
+    -- First ensure that `worker_read_write_locks_mode` doesn't have stale
+    -- entries in it, as on SQLite we don't have the foreign key constraint to
+    -- enforce this.
+    DELETE FROM worker_read_write_locks_mode
+        WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
+        AND NOT EXISTS (
+            SELECT 1 FROM worker_read_write_locks
+            WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
+        );
+
+    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
+        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
+        ON CONFLICT (lock_name, lock_key)
+        DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token;
+END;
+
+-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
+-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
+-- update the `worker_read_write_locks_mode.token` to match another instance
+-- that has currently acquired the lock, or we delete the row if nobody has
+-- currently acquired a lock.
+DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger;
+CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger
+AFTER DELETE ON worker_read_write_locks
+FOR EACH ROW
+BEGIN
+    DELETE FROM worker_read_write_locks_mode
+        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
+            AND token = OLD.token
+        AND NOT EXISTS (
+            SELECT 1 FROM worker_read_write_locks
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
+        );
+
+    UPDATE worker_read_write_locks_mode
+        SET token = (
+            SELECT token FROM worker_read_write_locks
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
+        )
+        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
+END;