Diffstat (limited to 'synapse/storage')
17 files changed, 377 insertions, 362 deletions
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 233df7cce2..278c7832ba 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -19,14 +20,16 @@ from typing import (
     Callable,
     Collection,
     Dict,
+    FrozenSet,
     Iterable,
     List,
     Mapping,
     Optional,
     Tuple,
+    Union,
 )
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.logging.opentracing import tag_args, trace
 from synapse.storage.roommember import ProfileInfo
@@ -34,14 +37,20 @@ from synapse.storage.util.partial_state_events_tracker import (
     PartialCurrentStateTracker,
     PartialStateEventsTracker,
 )
-from synapse.types import MutableStateMap, StateMap
+from synapse.types import MutableStateMap, StateMap, get_domain_from_id
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches import intern_string
+from synapse.util.caches.descriptors import cached
 from synapse.util.cancellation import cancellable
+from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.state import _StateCacheEntry
     from synapse.storage.databases import Databases
+
 
 logger = logging.getLogger(__name__)
 
@@ -52,10 +61,15 @@ class StateStorageController:
 
     def __init__(self, hs: "HomeServer", stores: "Databases"):
         self._is_mine_id = hs.is_mine_id
+        self._clock = hs.get_clock()
         self.stores = stores
         self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
         self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
 
+        # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
+        # at a time. Keyed by room_id.
+        self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
+
     def notify_event_un_partial_stated(self, event_id: str) -> None:
         self._partial_state_events_tracker.notify_un_partial_stated(event_id)
 
@@ -627,3 +641,122 @@ class StateStorageController:
         await self._partial_state_room_tracker.await_full_state(room_id)
 
         return await self.stores.main.get_users_in_room_with_profiles(room_id)
+
+    async def get_joined_hosts(
+        self, room_id: str, state_entry: "_StateCacheEntry"
+    ) -> FrozenSet[str]:
+        state_group: Union[object, int] = state_entry.state_group
+        if not state_group:
+            # If state_group is None it means it has yet to be assigned a
+            # state group, i.e. we need to make sure that calls with a state_group
+            # of None don't hit previous cached calls with a None state_group.
+            # To do this we set the state_group to a new object, as
+            # object() != object().
+            state_group = object()
+
+        assert state_group is not None
+        with Measure(self._clock, "get_joined_hosts"):
+            return await self._get_joined_hosts(
+                room_id, state_group, state_entry=state_entry
+            )
+
+    @cached(num_args=2, max_entries=10000, iterable=True)
+    async def _get_joined_hosts(
+        self,
+        room_id: str,
+        state_group: Union[object, int],
+        state_entry: "_StateCacheEntry",
+    ) -> FrozenSet[str]:
+        # We don't use `state_group`, it's there so that we can cache based on
+        # it. However, it's important that it's never None, since two
+        # current_state's with a state_group of None are likely to be different.
+        #
+        # The `state_group` must match the `state_entry.state_group` (if not None).
+        assert state_group is not None
+        assert state_entry.state_group is None or state_entry.state_group == state_group
+
+        # We use a secondary cache of previous work to allow us to build up the
+        # joined hosts for the given state group based on previous state groups.
+        #
+        # We cache one object per room containing the results of the last state
+        # group we got joined hosts for. The idea is that generally
+        # `get_joined_hosts` is called with the "current" state group for the
+        # room, and so consecutive calls will be for consecutive state groups
+        # which point to the previous state group.
+        cache = await self.stores.main._get_joined_hosts_cache(room_id)
+
+        # If the state group in the cache matches, we already have the data we need.
+        if state_entry.state_group == cache.state_group:
+            return frozenset(cache.hosts_to_joined_users)
+
+        # Since we'll mutate the cache we need to lock.
+        async with self._joined_host_linearizer.queue(room_id):
+            if state_entry.state_group == cache.state_group:
+                # Same state group, so nothing to do. We've already checked for
+                # this above, but the cache may have changed while waiting on
+                # the lock.
+                pass
+            elif state_entry.prev_group == cache.state_group:
+                # The cached work is for the previous state group, so we work out
+                # the delta.
+                assert state_entry.delta_ids is not None
+                for (typ, state_key), event_id in state_entry.delta_ids.items():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    host = intern_string(get_domain_from_id(state_key))
+                    user_id = state_key
+                    known_joins = cache.hosts_to_joined_users.setdefault(host, set())
+
+                    event = await self.stores.main.get_event(event_id)
+                    if event.membership == Membership.JOIN:
+                        known_joins.add(user_id)
+                    else:
+                        known_joins.discard(user_id)
+
+                        if not known_joins:
+                            cache.hosts_to_joined_users.pop(host, None)
+            else:
+                # The cache doesn't match the state group or prev state group,
+                # so we calculate the result from first principles.
+                #
+                # We need to fetch all hosts joined to the room according to `state` by
+                # inspecting all join memberships in `state`. However, if the `state` is
+                # relatively recent then many of its events are likely to be held in
+                # the current state of the room, which is easily available and likely
+                # cached.
+                #
+                # We therefore compute the set of `state` events not in the
+                # current state and only fetch those.
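To make the caching strategy above easier to follow, here is a minimal, self-contained sketch of the same pattern: cache results per state group, use a fresh `object()` sentinel for not-yet-assigned state groups (since `object() != object()`, unassigned groups can never collide with a cached entry), and serialise mutation of the per-room secondary cache behind a lock, as the `Linearizer` does. This collapses Synapse's two cache layers into one per-room cache for brevity; all names are illustrative stand-ins, not Synapse's actual classes.

import asyncio
from typing import Dict, FrozenSet, Optional, Set, Union


class JoinedHostsCache:
    """Secondary cache: the last state group we computed joined hosts for."""

    def __init__(self) -> None:
        self.state_group: Union[object, int, None] = None
        self.hosts_to_joined_users: Dict[str, Set[str]] = {}


class JoinedHostsModel:
    def __init__(self) -> None:
        self._caches: Dict[str, JoinedHostsCache] = {}
        self._locks: Dict[str, asyncio.Lock] = {}

    async def get_joined_hosts(
        self, room_id: str, state_group: Optional[int], memberships: Dict[str, str]
    ) -> FrozenSet[str]:
        # Unassigned state groups must never share a cache entry, so key them
        # on a unique sentinel instead of None.
        key: Union[object, int] = state_group if state_group else object()

        cache = self._caches.setdefault(room_id, JoinedHostsCache())
        if key == cache.state_group:
            return frozenset(cache.hosts_to_joined_users)

        # Serialise writers to the per-room cache, mirroring the Linearizer.
        async with self._locks.setdefault(room_id, asyncio.Lock()):
            if key != cache.state_group:
                cache.hosts_to_joined_users = {}
                for user_id, membership in memberships.items():
                    if membership == "join":
                        host = user_id.split(":", 1)[1]
                        cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
                cache.state_group = key
        return frozenset(cache.hosts_to_joined_users)


async def _demo() -> None:
    model = JoinedHostsModel()
    hosts = await model.get_joined_hosts(
        "!room:a", 42, {"@alice:a.example": "join", "@bob:b.example": "join"}
    )
    print(sorted(hosts))  # ['a.example', 'b.example']


asyncio.run(_demo())

Keying the sentinel into the cache is what lets repeated calls for the same assigned state group hit the cache, while calls for not-yet-persisted state always recompute.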
+ current_memberships = ( + await self.stores.main._get_approximate_current_memberships_in_room( + room_id + ) + ) + unknown_state_events = {} + joined_users_in_current_state = [] + + state = await state_entry.get_state( + self, StateFilter.from_types([(EventTypes.Member, None)]) + ) + + for (type, state_key), event_id in state.items(): + if event_id not in current_memberships: + unknown_state_events[type, state_key] = event_id + elif current_memberships[event_id] == Membership.JOIN: + joined_users_in_current_state.append(state_key) + + joined_user_ids = await self.stores.main.get_joined_user_ids_from_state( + room_id, unknown_state_events + ) + + cache.hosts_to_joined_users = {} + for user_id in chain(joined_user_ids, joined_users_in_current_state): + host = intern_string(get_domain_from_id(user_id)) + cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + if state_entry.state_group: + cache.state_group = state_entry.state_group + else: + cache.state_group = object() + + return frozenset(cache.hosts_to_joined_users) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index c9d687fb2f..a1c8fb0f46 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -98,8 +98,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = { "event_push_summary": "event_push_summary_unique_index2", "receipts_linearized": "receipts_linearized_unique_index", "receipts_graph": "receipts_graph_unique_index", - "profiles": "profiles_full_user_id_key_idx", - "user_filters": "full_users_filters_unique_idx", } diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index b6028853c9..be67d1ff22 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -15,7 +15,7 @@ # limitations under the License. 
import logging -from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, List, Optional, Tuple, cast from synapse.api.constants import Direction from synapse.config.homeserver import HomeServerConfig @@ -196,7 +196,7 @@ class DataStore( txn: LoggingTransaction, ) -> Tuple[List[JsonDict], int]: filters = [] - args: List[Union[str, int]] = [] + args: list = [] # Set ordering order_by_column = UserSortOrder(order_by).value diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index 75f7fe8756..fff417f9e3 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -188,13 +188,14 @@ class FilteringWorkerStore(SQLBaseStore): filter_id = max_id + 1 sql = ( - "INSERT INTO user_filters (full_user_id, filter_id, filter_json)" - "VALUES(?, ?, ?)" + "INSERT INTO user_filters (full_user_id, user_id, filter_id, filter_json)" + "VALUES(?, ?, ?, ?)" ) txn.execute( sql, ( user_id.to_string(), + user_id.localpart, filter_id, bytearray(def_json), ), diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 660a5507b7..3ba9cc8853 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -173,9 +173,10 @@ class ProfileWorkerStore(SQLBaseStore): ) async def create_profile(self, user_id: UserID) -> None: + user_localpart = user_id.localpart await self.db_pool.simple_insert( table="profiles", - values={"full_user_id": user_id.to_string()}, + values={"user_id": user_localpart, "full_user_id": user_id.to_string()}, desc="create_profile", ) @@ -190,11 +191,13 @@ class ProfileWorkerStore(SQLBaseStore): new_displayname: The new display name. If this is None, the user's display name is removed. """ + user_localpart = user_id.localpart await self.db_pool.simple_upsert( table="profiles", - keyvalues={"full_user_id": user_id.to_string()}, + keyvalues={"user_id": user_localpart}, values={ "displayname": new_displayname, + "full_user_id": user_id.to_string(), }, desc="set_profile_displayname", ) @@ -210,10 +213,11 @@ class ProfileWorkerStore(SQLBaseStore): new_avatar_url: The new avatar URL. If this is None, the user's avatar is removed. """ + user_localpart = user_id.localpart await self.db_pool.simple_upsert( table="profiles", - keyvalues={"full_user_id": user_id.to_string()}, - values={"avatar_url": new_avatar_url}, + keyvalues={"user_id": user_localpart}, + values={"avatar_url": new_avatar_url, "full_user_id": user_id.to_string()}, desc="set_profile_avatar_url", ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index ca8be8c80d..830658f328 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -2136,7 +2136,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): raise StoreError(400, "No create event in state") # Before MSC2175, the room creator was a separate field. 
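The `filtering.py` and `profile.py` hunks above restore dual writes: every insert or upsert now populates both the legacy `user_id` (localpart) column and the newer `full_user_id` column. A rough `sqlite3` model of that pattern is below; the table is trimmed to the relevant columns and the helper is a hypothetical stand-in, not Synapse code.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE profiles (full_user_id TEXT NOT NULL UNIQUE,"
    " user_id TEXT UNIQUE, displayname TEXT, avatar_url TEXT)"
)


def set_profile_displayname(full_user_id: str, displayname: str) -> None:
    localpart = full_user_id[1:].split(":", 1)[0]  # "@alice:example.org" -> "alice"
    # Upsert keyed on the legacy localpart column, but keep full_user_id in
    # sync on every write, as the diff's set_profile_displayname does.
    conn.execute(
        "INSERT INTO profiles (user_id, full_user_id, displayname)"
        " VALUES (?, ?, ?)"
        " ON CONFLICT (user_id) DO UPDATE SET"
        " displayname = excluded.displayname,"
        " full_user_id = excluded.full_user_id",
        (localpart, full_user_id, displayname),
    )


set_profile_displayname("@alice:example.org", "Alice")
set_profile_displayname("@alice:example.org", "Alice A.")
print(conn.execute("SELECT user_id, full_user_id, displayname FROM profiles").fetchall())
# [('alice', '@alice:example.org', 'Alice A.')]

Writing `full_user_id` on every code path keeps the two columns consistent whichever one is treated as authoritative, which is what makes this schema change safe to roll back.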
- if not room_version.msc2175_implicit_room_creator: + if not room_version.implicit_room_creator: room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) if not isinstance(room_creator, str): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 582875c91a..fff259f74c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from itertools import chain from typing import ( TYPE_CHECKING, AbstractSet, @@ -57,15 +56,12 @@ from synapse.types import ( StrCollection, get_domain_from_id, ) -from synapse.util.async_helpers import Linearizer -from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.server import HomeServer - from synapse.state import _StateCacheEntry logger = logging.getLogger(__name__) @@ -91,10 +87,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ): super().__init__(database, db_conn, hs) - # Used by `_get_joined_hosts` to ensure only one thing mutates the cache - # at a time. Keyed by room_id. - self._joined_host_linearizer = Linearizer("_JoinedHostsCache") - self._server_notices_mxid = hs.config.servernotices.server_notices_mxid if ( @@ -1057,120 +1049,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): "get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn ) - async def get_joined_hosts( - self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry" - ) -> FrozenSet[str]: - state_group: Union[object, int] = state_entry.state_group - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() - - assert state_group is not None - with Measure(self._clock, "get_joined_hosts"): - return await self._get_joined_hosts( - room_id, state_group, state, state_entry=state_entry - ) - - @cached(num_args=2, max_entries=10000, iterable=True) - async def _get_joined_hosts( - self, - room_id: str, - state_group: Union[object, int], - state: StateMap[str], - state_entry: "_StateCacheEntry", - ) -> FrozenSet[str]: - # We don't use `state_group`, it's there so that we can cache based on - # it. However, its important that its never None, since two - # current_state's with a state_group of None are likely to be different. - # - # The `state_group` must match the `state_entry.state_group` (if not None). - assert state_group is not None - assert state_entry.state_group is None or state_entry.state_group == state_group - - # We use a secondary cache of previous work to allow us to build up the - # joined hosts for the given state group based on previous state groups. - # - # We cache one object per room containing the results of the last state - # group we got joined hosts for. The idea is that generally - # `get_joined_hosts` is called with the "current" state group for the - # room, and so consecutive calls will be for consecutive state groups - # which point to the previous state group. 
- cache = await self._get_joined_hosts_cache(room_id) - - # If the state group in the cache matches, we already have the data we need. - if state_entry.state_group == cache.state_group: - return frozenset(cache.hosts_to_joined_users) - - # Since we'll mutate the cache we need to lock. - async with self._joined_host_linearizer.queue(room_id): - if state_entry.state_group == cache.state_group: - # Same state group, so nothing to do. We've already checked for - # this above, but the cache may have changed while waiting on - # the lock. - pass - elif state_entry.prev_group == cache.state_group: - # The cached work is for the previous state group, so we work out - # the delta. - assert state_entry.delta_ids is not None - for (typ, state_key), event_id in state_entry.delta_ids.items(): - if typ != EventTypes.Member: - continue - - host = intern_string(get_domain_from_id(state_key)) - user_id = state_key - known_joins = cache.hosts_to_joined_users.setdefault(host, set()) - - event = await self.get_event(event_id) - if event.membership == Membership.JOIN: - known_joins.add(user_id) - else: - known_joins.discard(user_id) - - if not known_joins: - cache.hosts_to_joined_users.pop(host, None) - else: - # The cache doesn't match the state group or prev state group, - # so we calculate the result from first principles. - # - # We need to fetch all hosts joined to the room according to `state` by - # inspecting all join memberships in `state`. However, if the `state` is - # relatively recent then many of its events are likely to be held in - # the current state of the room, which is easily available and likely - # cached. - # - # We therefore compute the set of `state` events not in the - # current state and only fetch those. - current_memberships = ( - await self._get_approximate_current_memberships_in_room(room_id) - ) - unknown_state_events = {} - joined_users_in_current_state = [] - - for (type, state_key), event_id in state.items(): - if event_id not in current_memberships: - unknown_state_events[type, state_key] = event_id - elif current_memberships[event_id] == Membership.JOIN: - joined_users_in_current_state.append(state_key) - - joined_user_ids = await self.get_joined_user_ids_from_state( - room_id, unknown_state_events - ) - - cache.hosts_to_joined_users = {} - for user_id in chain(joined_user_ids, joined_users_in_current_state): - host = intern_string(get_domain_from_id(user_id)) - cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) - - if state_entry.state_group: - cache.state_group = state_entry.state_group - else: - cache.state_group = object() - - return frozenset(cache.hosts_to_joined_users) - async def _get_approximate_current_memberships_in_room( self, room_id: str ) -> Mapping[str, Optional[str]]: diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 97c4dc2603..f34b7ce8f4 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -697,7 +697,7 @@ class StatsStore(StateDeltasStore): txn: LoggingTransaction, ) -> Tuple[List[JsonDict], int]: filters = [] - args = [self.hs.config.server.server_name] + args: list = [] if search_term: filters.append("(lmr.user_id LIKE ? OR displayname LIKE ?)") @@ -733,7 +733,7 @@ class StatsStore(StateDeltasStore): sql_base = """ FROM local_media_repository as lmr - LEFT JOIN profiles AS p ON lmr.user_id = '@' || p.user_id || ':' || ? 
+ LEFT JOIN profiles AS p ON lmr.user_id = p.full_user_id {} GROUP BY lmr.user_id, displayname """.format( diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 924022c95c..2a136f2ff6 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -409,23 +409,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): txn, users_to_work_on ) - # Next fetch their profiles. Note that the `user_id` here is the - # *localpart*, and that not all users have profiles. + # Next fetch their profiles. Note that not all users have profiles. profile_rows = self.db_pool.simple_select_many_txn( txn, table="profiles", - column="user_id", - iterable=[get_localpart_from_id(u) for u in users_to_insert], + column="full_user_id", + iterable=list(users_to_insert), retcols=( - "user_id", + "full_user_id", "displayname", "avatar_url", ), keyvalues={}, ) profiles = { - f"@{row['user_id']}:{self.server_name}": _UserDirProfile( - f"@{row['user_id']}:{self.server_name}", + row["full_user_id"]: _UserDirProfile( + row["full_user_id"], row["displayname"], row["avatar_url"], ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 6d14963c0a..d3ec648f6d 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -108,7 +108,8 @@ Changes in SCHEMA_VERSION = 78 - Validate check (full_user_id IS NOT NULL) on tables profiles and user_filters Changes in SCHEMA_VERSION = 79 - - We no longer write to column user_id of tables profiles and user_filters + - Add tables to handle in DB read-write locks. + - Add some mitigations for a painful race between foreground and background updates, cf #15677. """ @@ -121,9 +122,7 @@ SCHEMA_COMPAT_VERSION = ( # # insertions to the column `full_user_id` of tables profiles and user_filters can no # longer be null - # - # we no longer write to column `full_user_id` of tables profiles and user_filters - 78 + 76 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py b/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py deleted file mode 100644 index 3541266f7d..0000000000 --- a/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py +++ /dev/null @@ -1,50 +0,0 @@ -from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine - - -def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - """ - Update to drop the NOT NULL constraint on column user_id so that we can cease to - write to it without inserts to other columns triggering the constraint - """ - - if isinstance(database_engine, PostgresEngine): - drop_sql = """ - ALTER TABLE profiles ALTER COLUMN user_id DROP NOT NULL - """ - cur.execute(drop_sql) - else: - # irritatingly in SQLite we need to rewrite the table to drop the constraint. 
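The "irritatingly" comment above (from the delta being removed) names a real SQLite limitation: there is no `ALTER TABLE ... ALTER COLUMN ... DROP NOT NULL`, so relaxing a constraint means rebuilding the table. A self-contained sketch of the copy-and-swap dance follows, using a toy table; the removed delta below performs the same steps for the real `profiles` schema.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE profiles (user_id TEXT NOT NULL, displayname TEXT);
    INSERT INTO profiles VALUES ('alice', 'Alice');

    -- Rebuild without NOT NULL, copy the rows, then swap names.
    CREATE TABLE temp_profiles (user_id TEXT, displayname TEXT);
    INSERT INTO temp_profiles SELECT user_id, displayname FROM profiles;
    DROP TABLE profiles;
    ALTER TABLE temp_profiles RENAME TO profiles;
    """
)
conn.execute("INSERT INTO profiles (displayname) VALUES ('no localpart')")  # now legal
print(conn.execute("SELECT user_id, displayname FROM profiles").fetchall())
# [('alice', 'Alice'), (None, 'no localpart')]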
- cur.execute("DROP TABLE IF EXISTS temp_profiles") - - create_sql = """ - CREATE TABLE temp_profiles ( - full_user_id text NOT NULL, - user_id text, - displayname text, - avatar_url text, - UNIQUE (full_user_id), - UNIQUE (user_id) - ) - """ - cur.execute(create_sql) - - copy_sql = """ - INSERT INTO temp_profiles ( - user_id, - displayname, - avatar_url, - full_user_id) - SELECT user_id, displayname, avatar_url, full_user_id FROM profiles - """ - cur.execute(copy_sql) - - drop_sql = """ - DROP TABLE profiles - """ - cur.execute(drop_sql) - - rename_sql = """ - ALTER TABLE temp_profiles RENAME to profiles - """ - cur.execute(rename_sql) diff --git a/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py b/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py deleted file mode 100644 index 8e7569c470..0000000000 --- a/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py +++ /dev/null @@ -1,54 +0,0 @@ -from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine - - -def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - """ - Update to drop the NOT NULL constraint on column user_id so that we can cease to - write to it without inserts to other columns triggering the constraint - """ - if isinstance(database_engine, PostgresEngine): - drop_sql = """ - ALTER TABLE user_filters ALTER COLUMN user_id DROP NOT NULL - """ - cur.execute(drop_sql) - - else: - # irritatingly in SQLite we need to rewrite the table to drop the constraint. - cur.execute("DROP TABLE IF EXISTS temp_user_filters") - - create_sql = """ - CREATE TABLE temp_user_filters ( - full_user_id text NOT NULL, - user_id text, - filter_id bigint NOT NULL, - filter_json bytea NOT NULL - ) - """ - cur.execute(create_sql) - - index_sql = """ - CREATE UNIQUE INDEX IF NOT EXISTS user_filters_full_user_id_unique ON - temp_user_filters (full_user_id, filter_id) - """ - cur.execute(index_sql) - - copy_sql = """ - INSERT INTO temp_user_filters ( - user_id, - filter_id, - filter_json, - full_user_id) - SELECT user_id, filter_id, filter_json, full_user_id FROM user_filters - """ - cur.execute(copy_sql) - - drop_sql = """ - DROP TABLE user_filters - """ - cur.execute(drop_sql) - - rename_sql = """ - ALTER TABLE temp_user_filters RENAME to user_filters - """ - cur.execute(rename_sql) diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres index e1a41be9c9..7df07ab0da 100644 --- a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres +++ b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres @@ -44,7 +44,7 @@ -- A table to track whether a lock is currently acquired, and if so whether its -- in read or write mode. 
-CREATE TABLE worker_read_write_locks_mode ( +CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode ( lock_name TEXT NOT NULL, lock_key TEXT NOT NULL, -- Whether this lock is in read (false) or write (true) mode @@ -55,14 +55,14 @@ CREATE TABLE worker_read_write_locks_mode ( ); -- Ensure that we can only have one row per lock -CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key); +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key); -- We need this (redundant) constraint so that we can have a foreign key -- constraint against this table. -CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock); +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock); -- A table to track who has currently acquired a given lock. -CREATE TABLE worker_read_write_locks ( +CREATE TABLE IF NOT EXISTS worker_read_write_locks ( lock_name TEXT NOT NULL, lock_key TEXT NOT NULL, -- We write the instance name to ease manual debugging, we don't ever read @@ -84,9 +84,9 @@ CREATE TABLE worker_read_write_locks ( FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock) ); -CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token); +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token); -- Ensures that only one instance can acquire a lock in write mode at a time. -CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock; +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock; -- Add a foreign key constraint to ensure that if a lock is in @@ -97,56 +97,6 @@ CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lo -- We only add to PostgreSQL as SQLite does not support adding constraints -- after table creation, and so doesn't support "circular" foreign key -- constraints. +ALTER TABLE worker_read_write_locks_mode DROP CONSTRAINT IF EXISTS worker_read_write_locks_mode_foreign; ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED; - - --- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try --- and acquire a lock, i.e. insert into `worker_read_write_locks`, -CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$ -BEGIN - INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) - VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) - ON CONFLICT (lock_name, lock_key) - DO NOTHING; - RETURN NEW; -END -$$ -LANGUAGE plpgsql; - -CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks - FOR EACH ROW - EXECUTE PROCEDURE upsert_read_write_lock_parent(); - - --- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock --- is released (i.e. a row deleted from `worker_read_write_locks`). 
Either we --- update the `worker_read_write_locks_mode.token` to match another instance --- that has currently acquired the lock, or we delete the row if nobody has --- currently acquired a lock. -CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$ -DECLARE - new_token TEXT; -BEGIN - SELECT token INTO new_token FROM worker_read_write_locks - WHERE - lock_name = OLD.lock_name - AND lock_key = OLD.lock_key; - - IF NOT FOUND THEN - DELETE FROM worker_read_write_locks_mode - WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; - ELSE - UPDATE worker_read_write_locks_mode - SET token = new_token - WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; - END IF; - - RETURN NEW; -END -$$ -LANGUAGE plpgsql; - -CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks - FOR EACH ROW - EXECUTE PROCEDURE delete_read_write_lock_parent(); diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite index be2dfbbb8a..95f9dbf120 100644 --- a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite +++ b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite @@ -22,7 +22,7 @@ -- A table to track whether a lock is currently acquired, and if so whether its -- in read or write mode. -CREATE TABLE worker_read_write_locks_mode ( +CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode ( lock_name TEXT NOT NULL, lock_key TEXT NOT NULL, -- Whether this lock is in read (false) or write (true) mode @@ -38,14 +38,14 @@ CREATE TABLE worker_read_write_locks_mode ( ); -- Ensure that we can only have one row per lock -CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key); +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key); -- We need this (redundant) constraint so that we can have a foreign key -- constraint against this table. -CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock); +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock); -- A table to track who has currently acquired a given lock. -CREATE TABLE worker_read_write_locks ( +CREATE TABLE IF NOT EXISTS worker_read_write_locks ( lock_name TEXT NOT NULL, lock_key TEXT NOT NULL, -- We write the instance name to ease manual debugging, we don't ever read @@ -67,53 +67,6 @@ CREATE TABLE worker_read_write_locks ( FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock) ); -CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token); +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token); -- Ensures that only one instance can acquire a lock in write mode at a time. -CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock; - - --- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try --- and acquire a lock, i.e. 
insert into `worker_read_write_locks`, -CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger -BEFORE INSERT ON worker_read_write_locks -FOR EACH ROW -BEGIN - -- First ensure that `worker_read_write_locks_mode` doesn't have stale - -- entries in it, as on SQLite we don't have the foreign key constraint to - -- enforce this. - DELETE FROM worker_read_write_locks_mode - WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key - AND NOT EXISTS ( - SELECT 1 FROM worker_read_write_locks - WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key - ); - - INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) - VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) - ON CONFLICT (lock_name, lock_key) - DO NOTHING; -END; - --- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock --- is released (i.e. a row deleted from `worker_read_write_locks`). Either we --- update the `worker_read_write_locks_mode.token` to match another instance --- that has currently acquired the lock, or we delete the row if nobody has --- currently acquired a lock. -CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger -AFTER DELETE ON worker_read_write_locks -FOR EACH ROW -BEGIN - DELETE FROM worker_read_write_locks_mode - WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key - AND NOT EXISTS ( - SELECT 1 FROM worker_read_write_locks - WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key - ); - - UPDATE worker_read_write_locks_mode - SET token = ( - SELECT token FROM worker_read_write_locks - WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key - ) - WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; -END; +CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock; diff --git a/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py b/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py new file mode 100644 index 0000000000..ae63585847 --- /dev/null +++ b/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py @@ -0,0 +1,70 @@ +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine + + +def run_create( + cur: LoggingTransaction, + database_engine: BaseDatabaseEngine, +) -> None: + """ + An attempt to mitigate a painful race between foreground and background updates + touching the `stream_ordering` column of the events table. More info can be found + at https://github.com/matrix-org/synapse/issues/15677. 
+ """ + + # technically the bg update we're concerned with below should only have been added in + # postgres but it doesn't hurt to be extra careful + if isinstance(database_engine, PostgresEngine): + select_sql = """ + SELECT 1 FROM background_updates + WHERE update_name = 'replace_stream_ordering_column' + """ + cur.execute(select_sql) + res = cur.fetchone() + + # if the background update `replace_stream_ordering_column` is still pending, we need + # to drop the indexes added in 7403, and re-add them to the column `stream_ordering2` + # with the idea that they will be preserved when the column is renamed `stream_ordering` + # after the background update has finished + if res: + drop_cse_sql = """ + ALTER TABLE current_state_events DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey + """ + cur.execute(drop_cse_sql) + + drop_lcm_sql = """ + ALTER TABLE local_current_membership DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey + """ + cur.execute(drop_lcm_sql) + + drop_rm_sql = """ + ALTER TABLE room_memberships DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey + """ + cur.execute(drop_rm_sql) + + add_cse_sql = """ + ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey + FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID; + """ + cur.execute(add_cse_sql) + + add_lcm_sql = """ + ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey + FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID; + """ + cur.execute(add_lcm_sql) + + add_rm_sql = """ + ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey + FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID; + """ + cur.execute(add_rm_sql) diff --git a/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres new file mode 100644 index 0000000000..ea3496ef2d --- /dev/null +++ b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres @@ -0,0 +1,69 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql` + +-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try +-- and acquire a lock, i.e. 
insert into `worker_read_write_locks`, +CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$ +BEGIN + INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) + VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) + ON CONFLICT (lock_name, lock_key) + DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token; + RETURN NEW; +END +$$ +LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks; +CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks + FOR EACH ROW + EXECUTE PROCEDURE upsert_read_write_lock_parent(); + + +-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock +-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we +-- update the `worker_read_write_locks_mode.token` to match another instance +-- that has currently acquired the lock, or we delete the row if nobody has +-- currently acquired a lock. +CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$ +DECLARE + new_token TEXT; +BEGIN + SELECT token INTO new_token FROM worker_read_write_locks + WHERE + lock_name = OLD.lock_name + AND lock_key = OLD.lock_key + LIMIT 1 FOR UPDATE; + + IF NOT FOUND THEN + DELETE FROM worker_read_write_locks_mode + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key AND token = OLD.token; + ELSE + UPDATE worker_read_write_locks_mode + SET token = new_token + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; + END IF; + + RETURN NEW; +END +$$ +LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger ON worker_read_write_locks; +CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks + FOR EACH ROW + EXECUTE PROCEDURE delete_read_write_lock_parent(); diff --git a/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite new file mode 100644 index 0000000000..acb1a77c80 --- /dev/null +++ b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite @@ -0,0 +1,65 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql` + +-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try +-- and acquire a lock, i.e. insert into `worker_read_write_locks`, +DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger; +CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger +BEFORE INSERT ON worker_read_write_locks +FOR EACH ROW +BEGIN + -- First ensure that `worker_read_write_locks_mode` doesn't have stale + -- entries in it, as on SQLite we don't have the foreign key constraint to + -- enforce this. 
+ DELETE FROM worker_read_write_locks_mode + WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key + AND NOT EXISTS ( + SELECT 1 FROM worker_read_write_locks + WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key + ); + + INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token) + VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token) + ON CONFLICT (lock_name, lock_key) + DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token; +END; + +-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock +-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we +-- update the `worker_read_write_locks_mode.token` to match another instance +-- that has currently acquired the lock, or we delete the row if nobody has +-- currently acquired a lock. +DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger; +CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger +AFTER DELETE ON worker_read_write_locks +FOR EACH ROW +BEGIN + DELETE FROM worker_read_write_locks_mode + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key + AND token = OLD.token + AND NOT EXISTS ( + SELECT 1 FROM worker_read_write_locks + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key + ); + + UPDATE worker_read_write_locks_mode + SET token = ( + SELECT token FROM worker_read_write_locks + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key + ) + WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key; +END; |
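As a sanity check on the corrected SQLite triggers, the following self-contained script models both lock tables and both triggers (schema trimmed to the columns the triggers touch, so this is a miniature, not the real delta) and exercises an acquire/release/re-acquire cycle. It assumes a SQLite build that accepts UPSERT inside trigger bodies; the expected output is in the final comment.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE worker_read_write_locks_mode (
        lock_name TEXT NOT NULL, lock_key TEXT NOT NULL,
        write_lock BOOLEAN NOT NULL, token TEXT NOT NULL,
        UNIQUE (lock_name, lock_key)
    );
    CREATE TABLE worker_read_write_locks (
        lock_name TEXT NOT NULL, lock_key TEXT NOT NULL,
        write_lock BOOLEAN NOT NULL, token TEXT NOT NULL,
        UNIQUE (lock_name, lock_key, token)
    );

    CREATE TRIGGER upsert_read_write_lock_parent_trigger
    BEFORE INSERT ON worker_read_write_locks
    FOR EACH ROW
    BEGIN
        -- Clear any stale mode row first (no foreign key enforcement in this
        -- sketch), then upsert, overwriting mode and token on conflict.
        DELETE FROM worker_read_write_locks_mode
        WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
        AND NOT EXISTS (
            SELECT 1 FROM worker_read_write_locks
            WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
        );
        INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
        ON CONFLICT (lock_name, lock_key)
        DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token;
    END;

    CREATE TRIGGER delete_read_write_lock_parent_trigger
    AFTER DELETE ON worker_read_write_locks
    FOR EACH ROW
    BEGIN
        DELETE FROM worker_read_write_locks_mode
        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
        AND token = OLD.token
        AND NOT EXISTS (
            SELECT 1 FROM worker_read_write_locks
            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
        );
        UPDATE worker_read_write_locks_mode
        SET token = (
            SELECT token FROM worker_read_write_locks
            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
        )
        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
    END;

    -- A reader acquires, releases, then a writer acquires: the mode row must
    -- end up in write mode with the writer's token.
    INSERT INTO worker_read_write_locks VALUES ('lock', 'key', 0, 'reader-token');
    DELETE FROM worker_read_write_locks WHERE token = 'reader-token';
    INSERT INTO worker_read_write_locks VALUES ('lock', 'key', 1, 'writer-token');
    """
)
print(conn.execute("SELECT write_lock, token FROM worker_read_write_locks_mode").fetchall())
# [(1, 'writer-token')]

With the previous `DO NOTHING` upsert, a stale mode row that survived a release could keep its old token and mode; `DO UPDATE` guarantees the mode row always reflects the newest acquisition.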