author | Erik Johnston <erik@matrix.org> | 2023-11-16 16:27:21 +0000
committer | Erik Johnston <erik@matrix.org> | 2023-11-16 16:27:21 +0000
commit | b20bdd3997227dd74403ce39977a37a3b89762ed (patch)
tree | b35f4771a535bbbd7fe8955c2c238b62f0ad5f71 /synapse/storage
parent | Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parent | Speed up deleting device messages (#16643) (diff)
download | synapse-b20bdd3997227dd74403ce39977a37a3b89762ed.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
26 files changed, 629 insertions, 113 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 7426dbcad6..62fbd05534 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -49,7 +49,11 @@ else:
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.storage.database import DatabasePool, LoggingTransaction
+    from synapse.storage.database import (
+        DatabasePool,
+        LoggingDatabaseConnection,
+        LoggingTransaction,
+    )
 
 logger = logging.getLogger(__name__)
 
@@ -746,10 +750,10 @@ class BackgroundUpdater:
             The named index will be dropped upon completion of the new index.
         """
 
-        def create_index_psql(conn: Connection) -> None:
+        def create_index_psql(conn: "LoggingDatabaseConnection") -> None:
             conn.rollback()  # postgres insists on autocommit for the index
-            conn.set_session(autocommit=True)  # type: ignore
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
 
             try:
                 c = conn.cursor()
@@ -793,9 +797,9 @@ class BackgroundUpdater:
                 undo_timeout_sql = f"SET statement_timeout = {default_timeout}"
                 conn.cursor().execute(undo_timeout_sql)
 
-                conn.set_session(autocommit=False)  # type: ignore
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
-        def create_index_sqlite(conn: Connection) -> None:
+        def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None:
             # Sqlite doesn't support concurrent creation of indexes.
             #
             # We assume that sqlite doesn't give us invalid indices; however
@@ -825,7 +829,9 @@ class BackgroundUpdater:
             c.execute(sql)
 
         if isinstance(self.db_pool.engine, engines.PostgresEngine):
-            runner: Optional[Callable[[Connection], None]] = create_index_psql
+            runner: Optional[
+                Callable[[LoggingDatabaseConnection], None]
+            ] = create_index_psql
         elif psql_only:
             runner = None
         else:
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 7aa24ccf21..b57e260fe0 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]):
     """
 
     databases: List[DatabasePool]
-    main: "DataStore"  # FIXME: #11165: actually an instance of `main_store_class`
+    main: "DataStore"  # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
     state: StateGroupDataStore
     persist_events: Optional[PersistEventsStore]
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
         )
 
         # Invalidate the cache for any ignored users which were added or removed.
-        for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
-            self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.ignored_by,
+            [
+                (ignored_user_id,)
+                for ignored_user_id in (
+                    previously_ignored_users ^ currently_ignored_users
+                )
+            ],
+        )
         self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
 
     async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             )
 
             # Invalidate the cache for ignored users which were removed.
-            for ignored_user_id in previously_ignored_users:
-                self._invalidate_cache_and_stream(
-                    txn, self.ignored_by, (ignored_user_id,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.ignored_by,
+                [
+                    (ignored_user_id,)
+                    for ignored_user_id in previously_ignored_users
+                ],
+            )
 
             # Invalidate for this user the cache tracking ignored users.
             self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
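Note: the hunks above replace a per-key loop of `_invalidate_cache_and_stream` with a single `_invalidate_cache_and_stream_bulk` call taking a list of key tuples. A rough, self-contained sketch of why batching pays off — one executemany() against the invalidation stream rather than one round trip per key. The table shape and helper are assumptions for illustration, not Synapse's actual implementation:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cache_invalidation_stream (cache_name TEXT, keys TEXT)")

def invalidate_bulk(cache_name, key_tuples):
    # Single batched statement instead of len(key_tuples) separate INSERTs.
    conn.executemany(
        "INSERT INTO cache_invalidation_stream (cache_name, keys) VALUES (?, ?)",
        [(cache_name, repr(keys)) for keys in key_tuples],
    )

invalidate_bulk("ignored_by", [("@a:example.org",), ("@b:example.org",)])
print(conn.execute("SELECT COUNT(*) FROM cache_invalidation_stream").fetchone())  # (2,)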
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 71eefe6b7c..659631ab65 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
-        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
-            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             log_kv({"message": "No changes in cache since last check"})
             return 0
 
-        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            limit_statement = "" if limit is None else f"LIMIT {limit}"
-            sql = f"""
-                DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
-                    SELECT MAX(stream_id) FROM (
-                        SELECT stream_id FROM device_inbox
-                        WHERE user_id = ? AND device_id = ? AND stream_id <= ?
-                        ORDER BY stream_id
-                        {limit_statement}
-                    ) AS q1
-                )
-            """
-            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = await self.db_pool.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
+        from_stream_id = None
+        count = 0
+        while True:
+            from_stream_id, loop_count = await self.delete_messages_for_device_between(
+                user_id,
+                device_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
+                limit=1000,
+            )
+            count += loop_count
+            if from_stream_id is None:
+                break
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
-        # In this case we don't know if we hit the limit or the delete is complete
-        # so let's not update the cache.
-        if count == limit:
-            return count
-
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -515,6 +503,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         return count
 
     @trace
+    async def delete_messages_for_device_between(
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        from_stream_id: Optional[int],
+        to_stream_id: int,
+        limit: int,
+    ) -> Tuple[Optional[int], int]:
+        """Delete N device messages between the stream IDs, returning the
+        highest stream ID deleted (or None if all messages in the range have
+        been deleted) and the number of messages deleted.
+
+        This is more efficient than `delete_messages_for_device` when calling in
+        a loop to batch delete messages.
+        """
+
+        # Keeping track of a lower bound of stream ID where we've deleted
+        # everything below makes the queries much faster. Otherwise, every time
+        # we scan for rows to delete we'd re-scan across all the rows that have
+        # previously deleted (until the next table VACUUM).
+
+        if from_stream_id is None:
+            # Minimum device stream ID is 1.
+            from_stream_id = 0
+
+        def delete_messages_for_device_between_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[Optional[int], int]:
+            txn.execute(
+                """
+                SELECT MAX(stream_id) FROM (
+                    SELECT stream_id FROM device_inbox
+                    WHERE user_id = ? AND device_id = ?
+                        AND ? < stream_id AND stream_id <= ?
+                    ORDER BY stream_id
+                    LIMIT ?
+                ) AS d
+                """,
+                (user_id, device_id, from_stream_id, to_stream_id, limit),
+            )
+            row = txn.fetchone()
+            if row is None or row[0] is None:
+                return None, 0
+
+            (max_stream_id,) = row
+
+            txn.execute(
+                """
+                DELETE FROM device_inbox
+                WHERE user_id = ? AND device_id = ?
+                    AND ? < stream_id AND stream_id <= ?
+                """,
+                (user_id, device_id, from_stream_id, max_stream_id),
+            )
+
+            num_deleted = txn.rowcount
+            if num_deleted < limit:
+                return None, num_deleted
+
+            return max_stream_id, num_deleted
+
+        return await self.db_pool.runInteraction(
+            "delete_messages_for_device_between",
+            delete_messages_for_device_between_txn,
+            db_autocommit=True,  # We don't need to run in a transaction
+        )
+
+    @trace
     async def get_new_device_msgs_for_remote(
         self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
     ) -> Tuple[List[JsonDict], int]:
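Note: the new `delete_messages_for_device_between` plus its calling loop implement keyset-batched deletion: find the highest stream ID within the next `limit` rows of the range, delete up to it, then advance the lower bound so later scans skip already-deleted rows. A self-contained sketch of the same pattern against a toy SQLite table (table and column names are illustrative, not Synapse's schema):

import sqlite3

def delete_between(conn, from_id, to_id, limit):
    # Find the highest stream_id within the next `limit` rows of the range.
    row = conn.execute(
        """
        SELECT MAX(stream_id) FROM (
            SELECT stream_id FROM inbox
            WHERE ? < stream_id AND stream_id <= ?
            ORDER BY stream_id
            LIMIT ?
        )
        """,
        (from_id, to_id, limit),
    ).fetchone()
    if row is None or row[0] is None:
        return None, 0
    max_id = row[0]
    cur = conn.execute(
        "DELETE FROM inbox WHERE ? < stream_id AND stream_id <= ?",
        (from_id, max_id),
    )
    # Deleting fewer than `limit` rows means the range is exhausted.
    return (None, cur.rowcount) if cur.rowcount < limit else (max_id, cur.rowcount)

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inbox (stream_id INTEGER)")
conn.executemany("INSERT INTO inbox VALUES (?)", [(i,) for i in range(1, 2501)])

deleted, from_id = 0, 0
while True:
    from_id, n = delete_between(conn, from_id, 2000, limit=1000)
    deleted += n
    if from_id is None:
        break
print(deleted)  # 2000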
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return otk_rows
 
+    async def get_master_cross_signing_key_updatable_before(
+        self, user_id: str
+    ) -> Tuple[bool, Optional[int]]:
+        """Get time before which a master cross-signing key may be replaced without UIA.
+
+        (UIA means "User-Interactive Auth".)
+
+        There are three cases to distinguish:
+         (1) No master cross-signing key.
+         (2) The key exists, but there is no replace-without-UI timestamp in the DB.
+         (3) The key exists, and has such a timestamp recorded.
+
+        Returns: a 2-tuple of:
+          - a boolean: is there a master cross-signing key already?
+          - an optional timestamp, directly taken from the DB.
+
+        In terms of the cases above, these are:
+         (1) (False, None).
+         (2) (True, None).
+         (3) (True, <timestamp in ms>).
+
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+            # We want to distinguish between three cases:
+            txn.execute(
+                """
+                SELECT updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ? AND keytype = 'master'
+                ORDER BY stream_id DESC
+                LIMIT 1
+                """,
+                (user_id,),
+            )
+            row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+            if row is None:
+                return False, None
+            return True, row[0]
+
+        return await self.db_pool.runInteraction(
+            "e2e_cross_signing_keys",
+            impl,
+        )
+
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             ],
             desc="add_e2e_signing_key",
         )
+
+    async def allow_master_cross_signing_key_replacement_without_uia(
+        self, user_id: str, duration_ms: int
+    ) -> Optional[int]:
+        """Mark this user's latest master key as being replaceable without UIA.
+
+        Said replacement will only be permitted for a short time after calling this
+        function. That time period is controlled by the duration argument.
+
+        Returns:
+            None, if there is no such key.
+            Otherwise, the timestamp before which replacement is allowed without UIA.
+        """
+        timestamp = self._clock.time_msec() + duration_ms
+
+        def impl(txn: LoggingTransaction) -> Optional[int]:
+            txn.execute(
+                """
+                UPDATE e2e_cross_signing_keys
+                SET updatable_without_uia_before_ms = ?
+                WHERE stream_id = (
+                    SELECT stream_id
+                    FROM e2e_cross_signing_keys
+                    WHERE user_id = ? AND keytype = 'master'
+                    ORDER BY stream_id DESC
+                    LIMIT 1
+                )
+                """,
+                (timestamp, user_id),
+            )
+            if txn.rowcount == 0:
+                return None
+
+            return timestamp
+
+        return await self.db_pool.runInteraction(
+            "allow_master_cross_signing_key_replacement_without_uia",
+            impl,
+        )
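Note: a sketch of how a caller might act on the three documented cases of `get_master_cross_signing_key_updatable_before`; the `store`/`now_ms` plumbing here is hypothetical, not code from this commit:

async def may_replace_without_uia(store, user_id: str, now_ms: int) -> bool:
    exists, ts = await store.get_master_cross_signing_key_updatable_before(user_id)
    if not exists:
        return False  # case (1): no master key exists, nothing to replace
    if ts is None:
        return False  # case (2): no grace window recorded, UIA is required
    return now_ms < ts  # case (3): replaceable only before the timestamp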
+ """ + timestamp = self._clock.time_msec() + duration_ms + + def impl(txn: LoggingTransaction) -> Optional[int]: + txn.execute( + """ + UPDATE e2e_cross_signing_keys + SET updatable_without_uia_before_ms = ? + WHERE stream_id = ( + SELECT stream_id + FROM e2e_cross_signing_keys + WHERE user_id = ? AND keytype = 'master' + ORDER BY stream_id DESC + LIMIT 1 + ) + """, + (timestamp, user_id), + ) + if txn.rowcount == 0: + return None + + return timestamp + + return await self.db_pool.runInteraction( + "allow_master_cross_signing_key_replacement_without_uia", + impl, + ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 0061805150..0c91f19c8e 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """Background update to clean out extremities that should have been deleted previously. - Mainly used to deal with the aftermath of #5269. + Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269. """ # This works by first copying all existing forward extremities into the @@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) logger.info( - "Deleted %d forward extremities of %d checked, to clean up #5269", + "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269", deleted, len(original_set), ) @@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) # Iterate the parent IDs and invalidate caches. - for parent_id in {r[1] for r in relations_to_insert}: - cache_tuple = (parent_id,) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined] - ) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined] - ) + cache_tuples = {(r[1],) for r in relations_to_insert} + self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined] + ) + self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined] + ) if results: latest_event_id = results[-1][0] diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index e831146cca..a880788f00 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore): room_version: Optional[RoomVersion] if not room_version_id: # this should only happen for out-of-band membership events which - # arrived before #6983 landed. For all other events, we should have + # arrived before https://github.com/matrix-org/synapse/issues/6983 + # landed. For all other events, we should have # an entry in the 'rooms' table. 
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
             # invalidate takes a tuple corresponding to the params of
             # _get_server_keys_json. _get_server_keys_json only takes one
             # param, which is itself the 2-tuple (server_name, key_id).
-            for key_id in verify_keys:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_server_keys_json, ((server_name, key_id),)
-                )
-                self._invalidate_cache_and_stream(
-                    txn, self.get_server_key_json_for_remote, (server_name, key_id)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_server_keys_json,
+                [((server_name, key_id),) for key_id in verify_keys],
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_server_key_json_for_remote,
+                [(server_name, key_id) for key_id in verify_keys],
+            )
 
         await self.db_pool.runInteraction(
             "store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 3f80a64dc5..149135b8b5 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
 class LocalMedia:
     media_id: str
     media_type: str
-    media_length: int
+    media_length: Optional[int]
     upload_name: str
     created_ts: int
     url_cache: Optional[str]
     last_access_ts: int
     quarantined_by: Optional[str]
     safe_from_quarantine: bool
+    user_id: Optional[str]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
             self._drop_media_index_without_method,
         )
 
+        if hs.config.media.can_load_media_repo:
+            self.unused_expiration_time: Optional[
+                int
+            ] = hs.config.media.unused_expiration_time
+        else:
+            self.unused_expiration_time = None
+
     async def _drop_media_index_without_method(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "url_cache",
                 "last_access_ts",
                 "safe_from_quarantine",
+                "user_id",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             url_cache=row[5],
             last_access_ts=row[6],
             safe_from_quarantine=row[7],
+            user_id=row[8],
         )
 
     async def get_local_media_by_user_paginate(
@@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     url_cache,
                     last_access_ts,
                     quarantined_by,
-                    safe_from_quarantine
+                    safe_from_quarantine,
+                    user_id
                 FROM local_media_repository
                 WHERE user_id = ?
                ORDER BY {order_by_column} {order}, media_id ASC
@@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     last_access_ts=row[6],
                     quarantined_by=row[7],
                     safe_from_quarantine=bool(row[8]),
+                    user_id=row[9],
                 )
                 for row in txn
             ]
@@ -392,6 +404,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         )
 
     @trace
+    async def store_local_media_id(
+        self,
+        media_id: str,
+        time_now_ms: int,
+        user_id: UserID,
+    ) -> None:
+        await self.db_pool.simple_insert(
+            "local_media_repository",
+            {
+                "media_id": media_id,
+                "created_ts": time_now_ms,
+                "user_id": user_id.to_string(),
+            },
+            desc="store_local_media_id",
+        )
+
+    @trace
     async def store_local_media(
         self,
         media_id: str,
@@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_local_media",
         )
 
+    async def update_local_media(
+        self,
+        media_id: str,
+        media_type: str,
+        upload_name: Optional[str],
+        media_length: int,
+        user_id: UserID,
+        url_cache: Optional[str] = None,
+    ) -> None:
+        await self.db_pool.simple_update_one(
+            "local_media_repository",
+            keyvalues={
+                "user_id": user_id.to_string(),
+                "media_id": media_id,
+            },
+            updatevalues={
+                "media_type": media_type,
+                "upload_name": upload_name,
+                "media_length": media_length,
+                "url_cache": url_cache,
+            },
+            desc="update_local_media",
+        )
+
     async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
         """Mark a local media as safe or unsafe from quarantining."""
         await self.db_pool.simple_update_one(
@@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="mark_local_media_as_safe",
         )
 
+    async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]:
+        """Count the number of pending media for a user.
+
+        Returns:
+            A tuple of two integers: the total pending media requests and the earliest
+            expiration timestamp.
+        """
+
+        def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]:
+            sql = """
+            SELECT COUNT(*), MIN(created_ts)
+            FROM local_media_repository
+            WHERE user_id = ?
+                AND created_ts > ?
+                AND media_length IS NULL
+            """
+            assert self.unused_expiration_time is not None
+            txn.execute(
+                sql,
+                (
+                    user_id.to_string(),
+                    self._clock.time_msec() - self.unused_expiration_time,
+                ),
+            )
+            row = txn.fetchone()
+            if not row:
+                return 0, 0
+            return row[0], (row[1] + self.unused_expiration_time if row[1] else 0)
+
+        return await self.db_pool.runInteraction(
+            "get_pending_media", get_pending_media_txn
+        )
+
     async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]:
         """Get the media_id and ts for a cached URL as of the given timestamp
         Returns:
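Note: taken together, `store_local_media_id` and `update_local_media` support a two-phase upload: reserve the media ID up front (leaving `media_length` NULL, which is exactly what `count_pending_media` filters on), then fill in the metadata once the bytes arrive. A hypothetical caller might sequence them like this; the handler and the concrete values are illustrative:

async def create_then_complete_upload(store, clock, media_id, user):
    # Phase 1: reserve the ID; the row counts as "pending" while
    # media_length is still NULL.
    await store.store_local_media_id(
        media_id=media_id,
        time_now_ms=clock.time_msec(),
        user_id=user,
    )
    # ... the client uploads the content in a later request ...
    # Phase 2: record the metadata; a non-NULL media_length marks the
    # upload as complete.
    await store.update_local_media(
        media_id=media_id,
        media_type="image/png",   # illustrative values
        upload_name="avatar.png",
        media_length=12345,
        user_id=user,
    )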
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 4b1061e6d7..2911e53310 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
                 if user_id:
                     is_support = self.is_support_user_txn(txn, user_id)
                     if not is_support:
-                        # We do this manually here to avoid hitting #6791
+                        # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
                         self.db_pool.simple_upsert_txn(
                             txn,
                             table="monthly_active_users",
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
                 # for their user ID.
                 value_values=[(presence_stream_id,) for _ in user_ids],
             )
-            for user_id in user_ids:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_full_presence_stream_token_for_user, (user_id,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_full_presence_stream_token_for_user,
+                [(user_id,) for user_id in user_ids],
+            )
 
         return await self.db_pool.runInteraction(
             "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..1a5b5731bb 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
-        for event_id, should_delete in event_rows:
-            self._invalidate_cache_and_stream(
-                txn, self._get_state_group_for_event, (event_id,)
-            )
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self._get_state_group_for_event,
+            [(event_id,) for event_id, _ in event_rows],
+        )
 
-            # XXX: This is racy, since have_seen_events could be called between the
-            #    transaction completing and the invalidation running. On the other hand,
-            #    that's no different to calling `have_seen_events` just before the
-            #    event is deleted from the database.
+        # XXX: This is racy, since have_seen_events could be called between the
+        #    transaction completing and the invalidation running. On the other hand,
+        #    that's no different to calling `have_seen_events` just before the
+        #    event is deleted from the database.
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.have_seen_event,
+            [
+                (room_id, event_id)
+                for event_id, should_delete in event_rows
+                if should_delete
+            ],
+        )
+
+        for event_id, should_delete in event_rows:
             if should_delete:
-                self._invalidate_cache_and_stream(
-                    txn, self.have_seen_event, (room_id, event_id)
-                )
                 self.invalidate_get_event_cache_after_txn(txn, event_id)
 
         logger.info("[purge] done")
@@ -485,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         #     - room_tags_revisions
         # The problem with these is that they are largeish and there is no room_id
         # index on them. In any case we should be clearing out 'stream' tables
-        # periodically anyway (#5888)
+        # periodically anyway (https://github.com/matrix-org/synapse/issues/5888)
 
         self._invalidate_caches_for_room_and_stream(txn, room_id)
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index f72a23c584..cf622e195c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore):
         before: str,
         after: str,
     ) -> None:
-        # Lock the table since otherwise we'll have annoying races between the
-        # SELECT here and the UPSERT below.
-        self.database_engine.lock_table(txn, "push_rules")
-
         relative_to_rule = before or after
 
-        res = self.db_pool.simple_select_one_txn(
-            txn,
-            table="push_rules",
-            keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
-            retcols=["priority_class", "priority"],
-            allow_none=True,
-        )
+        sql = """
+            SELECT priority, priority_class FROM push_rules
+            WHERE user_name = ? AND rule_id = ?
+        """
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql += " FOR UPDATE"
+        else:
+            # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+            self.database_engine.lock_table(txn, "push_rules")
+
+        txn.execute(sql, (user_id, relative_to_rule))
+        row = txn.fetchone()
 
-        if not res:
+        if row is None:
             raise RuleNotFoundException(
                 "before/after rule not found: %s" % (relative_to_rule,)
             )
 
-        base_priority_class, base_rule_priority = res
+        base_rule_priority, base_priority_class = row
 
         if base_priority_class != priority_class:
             raise InconsistentRuleException(
@@ -516,9 +518,18 @@ class PushRuleStore(PushRulesWorkerStore):
         conditions_json: str,
         actions_json: str,
     ) -> None:
-        # Lock the table since otherwise we'll have annoying races between the
-        # SELECT here and the UPSERT below.
-        self.database_engine.lock_table(txn, "push_rules")
+        if isinstance(self.database_engine, PostgresEngine):
+            # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first
+            # then re-select the count/max below.
+            sql = """
+                SELECT * FROM push_rules
+                WHERE user_name = ? and priority_class = ?
+                FOR UPDATE
+            """
+            txn.execute(sql, (user_id, priority_class))
+        else:
+            # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+            self.database_engine.lock_table(txn, "push_rules")
 
         # find the highest priority rule in that class
         sql = (
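Note: the locking change above narrows the critical section on Postgres: `SELECT ... FOR UPDATE` pins only the affected user's rows until commit, whereas SQLite still falls back to a whole-table lock. A minimal sketch of the row-lock behaviour, assuming psycopg2 (which Synapse uses for Postgres) and a placeholder DSN and values:

import psycopg2

conn = psycopg2.connect("dbname=synapse")  # placeholder DSN
with conn:
    with conn.cursor() as cur:
        # Locks only this user's rows; other users' rules stay writable.
        cur.execute(
            "SELECT priority FROM push_rules WHERE user_name = %s FOR UPDATE",
            ("@alice:example.org",),
        )
        # ... compute and upsert new priorities while the rows stay locked ...
# leaving the `with conn:` block commits and releases the row locks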
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 updatevalues={"shadow_banned": shadow_banned},
             )
             # In order for this to apply immediately, clear the cache for this user.
-            tokens = self.db_pool.simple_select_onecol_txn(
+            tokens = self.db_pool.simple_select_list_txn(
                 txn,
                 table="access_tokens",
                 keyvalues={"user_id": user_id},
-                retcol="token",
+                retcols=("token",),
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn, self.get_user_by_access_token, tokens
             )
-            for token in tokens:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
         await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
             )
 
             tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
 
-            for token, _, _ in tokens_and_devices:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_user_by_access_token,
+                [(token,) for token, _, _ in tokens_and_devices],
+            )
 
             txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f4bef4c99b..e25d86818b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             # we have to set autocommit, because postgres refuses to
             # CREATE INDEX CONCURRENTLY without it.
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
 
             try:
                 c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 # we should now be able to delete the GIST index.
                 c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
             finally:
-                conn.set_session(autocommit=False)
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
         if isinstance(self.database_engine, PostgresEngine):
             await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
         def create_index(conn: LoggingDatabaseConnection) -> None:
             conn.rollback()
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
             c = conn.cursor()
 
             # We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
                 """
             )
-            conn.set_session(autocommit=False)
+            conn.engine.attempt_to_set_autocommit(conn.conn, False)
 
         await self.db_pool.runWithConnection(create_index)
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 0f9c550b27..2c3151526d 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
             conn.rollback()
             if isinstance(self.database_engine, PostgresEngine):
                 # postgres insists on autocommit for the index
-                conn.set_session(autocommit=True)
+                conn.engine.attempt_to_set_autocommit(conn.conn, True)
                 try:
                     txn = conn.cursor()
                     txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                     )
                     txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
                 finally:
-                    conn.set_session(autocommit=False)
+                    conn.engine.attempt_to_set_autocommit(conn.conn, False)
             else:
                 txn = conn.cursor()
                 txn.execute(
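Note: every `attempt_to_set_autocommit` call site above exists because Postgres refuses to run `CREATE INDEX CONCURRENTLY` inside a transaction block. A standalone sketch of the underlying psycopg2 dance — index, table and DSN are illustrative, and toggling `conn.autocommit` here only approximates what Synapse's engine wrapper arranges:

import psycopg2

conn = psycopg2.connect("dbname=synapse")  # placeholder DSN
conn.rollback()          # make sure no transaction is open
conn.autocommit = True   # CONCURRENTLY cannot run inside a transaction
try:
    with conn.cursor() as cur:
        cur.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS my_idx ON my_table (my_col)"
        )
finally:
    conn.autocommit = False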
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 6309363217..ec4c4041b7 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -38,7 +38,8 @@ class PostgresEngine(
         super().__init__(psycopg2, database_config)
         psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
 
-        # Disables passing `bytes` to txn.execute, c.f. #6186. If you do
+        # Disables passing `bytes` to txn.execute, c.f.
+        # https://github.com/matrix-org/synapse/issues/6186. If you do
         # actually want to use bytes than wrap it in `bytearray`.
         def _disable_bytes_adapter(_: bytes) -> NoReturn:
             raise Exception("Passing bytes to DB is disabled.")
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 158b528dce..03e5a0f55d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -109,7 +109,8 @@ Changes in SCHEMA_VERSION = 78
 
 Changes in SCHEMA_VERSION = 79
     - Add tables to handle in DB read-write locks.
-    - Add some mitigations for a painful race between foreground and background updates, cf #15677.
+    - Add some mitigations for a painful race between foreground and background updates, cf
+      https://github.com/matrix-org/synapse/issues/15677.
 
 Changes in SCHEMA_VERSION = 80
     - The event_txn_id_device_id is always written to for new events.
diff --git a/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres b/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres
new file mode 100644
index 0000000000..6bdd1f9569
--- /dev/null
+++ b/synapse/storage/schema/common/delta/83/07_common_replica_identities.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Re-use unique indices already defined on tables as a replica identity.
+ALTER TABLE applied_module_schemas REPLICA IDENTITY USING INDEX applied_module_schemas_module_name_file_key;
+ALTER TABLE applied_schema_deltas REPLICA IDENTITY USING INDEX applied_schema_deltas_version_file_key;
+ALTER TABLE background_updates REPLICA IDENTITY USING INDEX background_updates_uniqueness;
+ALTER TABLE schema_compat_version REPLICA IDENTITY USING INDEX schema_compat_version_lock_key;
+ALTER TABLE schema_version REPLICA IDENTITY USING INDEX schema_version_lock_key;
+
+
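Note: one way to check that deltas like the one above took effect is to read `pg_class.relreplident`, where 'd' means default (primary key), 'i' index, 'f' full and 'n' nothing. A small diagnostic sketch, again assuming psycopg2 and a placeholder DSN:

import psycopg2

conn = psycopg2.connect("dbname=synapse")  # placeholder DSN
with conn.cursor() as cur:
    cur.execute(
        "SELECT relname, relreplident FROM pg_class WHERE relname = ANY(%s)",
        (["background_updates", "schema_version"],),
    )
    for table, ident in cur.fetchall():
        print(table, ident)  # expect 'i' once the index-based identity is applied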
diff --git a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
index b062ec840c..f713e42aa0 100644
--- a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
+++ b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql
@@ -14,7 +14,7 @@
  */
 
 -- Start a background job to cleanup extremities that were incorrectly added
--- by bug #5269.
+-- by bug https://github.com/matrix-org/synapse/issues/5269.
 INSERT INTO background_updates (update_name, progress_json) VALUES
   ('delete_soft_failed_extremities', '{}');
diff --git a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
index aeb17813d3..246c3359f7 100644
--- a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
+++ b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql
@@ -13,6 +13,7 @@
  * limitations under the License.
  */
 
--- Now that #6232 is a thing, we can remove old rooms from the directory.
+-- Now that https://github.com/matrix-org/synapse/pull/6232 is a thing, we can
+-- remove old rooms from the directory.
 INSERT INTO background_updates (update_name, progress_json) VALUES
   ('remove_tombstoned_rooms_from_directory', '{}');
diff --git a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
index aed79635b2..31a61defa7 100644
--- a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
+++ b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql
@@ -13,7 +13,8 @@
  * limitations under the License.
  */
 
--- Clean up left over rows from bug #11833, which was fixed in #12770.
+-- Clean up left over rows from bug https://github.com/matrix-org/synapse/issues/11833,
+-- which was fixed in https://github.com/matrix-org/synapse/pull/12770.
 DELETE FROM federation_inbound_events_staging WHERE room_id not in (
     SELECT room_id FROM rooms
 );
diff --git a/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres b/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres
new file mode 100644
index 0000000000..8296d56e56
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/04_replica_identities.sql.postgres
@@ -0,0 +1,88 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE appservice_room_list REPLICA IDENTITY USING INDEX appservice_room_list_idx;
+ALTER TABLE batch_events REPLICA IDENTITY USING INDEX chunk_events_event_id;
+ALTER TABLE blocked_rooms REPLICA IDENTITY USING INDEX blocked_rooms_idx;
+ALTER TABLE cache_invalidation_stream_by_instance REPLICA IDENTITY USING INDEX cache_invalidation_stream_by_instance_id;
+ALTER TABLE device_lists_changes_in_room REPLICA IDENTITY USING INDEX device_lists_changes_in_stream_id;
+ALTER TABLE device_lists_outbound_last_success REPLICA IDENTITY USING INDEX device_lists_outbound_last_success_unique_idx;
+ALTER TABLE device_lists_remote_cache REPLICA IDENTITY USING INDEX device_lists_remote_cache_unique_id;
+ALTER TABLE device_lists_remote_extremeties REPLICA IDENTITY USING INDEX device_lists_remote_extremeties_unique_idx;
+ALTER TABLE device_lists_remote_resync REPLICA IDENTITY USING INDEX device_lists_remote_resync_idx;
+ALTER TABLE e2e_cross_signing_keys REPLICA IDENTITY USING INDEX e2e_cross_signing_keys_stream_idx;
+ALTER TABLE e2e_room_keys REPLICA IDENTITY USING INDEX e2e_room_keys_with_version_idx;
+ALTER TABLE e2e_room_keys_versions REPLICA IDENTITY USING INDEX e2e_room_keys_versions_idx;
+ALTER TABLE erased_users REPLICA IDENTITY USING INDEX erased_users_user;
+ALTER TABLE event_relations REPLICA IDENTITY USING INDEX event_relations_id;
+ALTER TABLE federation_inbound_events_staging REPLICA IDENTITY USING INDEX federation_inbound_events_staging_instance_event;
+ALTER TABLE federation_stream_position REPLICA IDENTITY USING INDEX federation_stream_position_instance;
+ALTER TABLE ignored_users REPLICA IDENTITY USING INDEX ignored_users_uniqueness;
+ALTER TABLE insertion_events REPLICA IDENTITY USING INDEX insertion_events_event_id;
+ALTER TABLE insertion_event_extremities REPLICA IDENTITY USING INDEX insertion_event_extremities_event_id;
+ALTER TABLE monthly_active_users REPLICA IDENTITY USING INDEX monthly_active_users_users;
+ALTER TABLE ratelimit_override REPLICA IDENTITY USING INDEX ratelimit_override_idx;
+ALTER TABLE room_stats_earliest_token REPLICA IDENTITY USING INDEX room_stats_earliest_token_idx;
+ALTER TABLE room_stats_state REPLICA IDENTITY USING INDEX room_stats_state_room;
+ALTER TABLE stream_positions REPLICA IDENTITY USING INDEX stream_positions_idx;
+ALTER TABLE user_directory REPLICA IDENTITY USING INDEX user_directory_user_idx;
+ALTER TABLE user_directory_search REPLICA IDENTITY USING INDEX user_directory_search_user_idx;
+ALTER TABLE user_ips REPLICA IDENTITY USING INDEX user_ips_user_token_ip_unique_index;
+ALTER TABLE user_signature_stream REPLICA IDENTITY USING INDEX user_signature_stream_idx;
+ALTER TABLE users_in_public_rooms REPLICA IDENTITY USING INDEX users_in_public_rooms_u_idx;
+ALTER TABLE users_who_share_private_rooms REPLICA IDENTITY USING INDEX users_who_share_private_rooms_u_idx;
+ALTER TABLE user_threepid_id_server REPLICA IDENTITY USING INDEX user_threepid_id_server_idx;
+ALTER TABLE worker_locks REPLICA IDENTITY USING INDEX worker_locks_key;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE current_state_delta_stream REPLICA IDENTITY FULL;
+ALTER TABLE deleted_pushers REPLICA IDENTITY FULL;
+ALTER TABLE device_auth_providers REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_federation_outbox REPLICA IDENTITY FULL;
+ALTER TABLE device_inbox REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_outbound_pokes REPLICA IDENTITY FULL;
+ALTER TABLE device_lists_stream REPLICA IDENTITY FULL;
+ALTER TABLE e2e_cross_signing_signatures REPLICA IDENTITY FULL;
+ALTER TABLE event_auth_chain_links REPLICA IDENTITY FULL;
+ALTER TABLE event_auth REPLICA IDENTITY FULL;
+ALTER TABLE event_push_actions_staging REPLICA IDENTITY FULL;
+ALTER TABLE insertion_event_edges REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_url_cache REPLICA IDENTITY FULL;
+ALTER TABLE presence_stream REPLICA IDENTITY FULL;
+ALTER TABLE push_rules_stream REPLICA IDENTITY FULL;
+ALTER TABLE room_alias_servers REPLICA IDENTITY FULL;
+ALTER TABLE stream_ordering_to_exterm REPLICA IDENTITY FULL;
+ALTER TABLE timeline_gaps REPLICA IDENTITY FULL;
+ALTER TABLE user_daily_visits REPLICA IDENTITY FULL;
+ALTER TABLE users_pending_deactivation REPLICA IDENTITY FULL;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_summary REPLICA IDENTITY FULL;
+ALTER TABLE event_search REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache_thumbnails REPLICA IDENTITY FULL;
+ALTER TABLE threepid_guest_access_tokens REPLICA IDENTITY FULL;
+ALTER TABLE user_filters REPLICA IDENTITY FULL; -- sadly the `CHECK` constraint is not enough here
diff --git a/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
new file mode 100644
index 0000000000..b74bdd71fa
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
@@ -0,0 +1,15 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;
\ No newline at end of file
diff --git a/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres b/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres
new file mode 100644
index 0000000000..a592af814e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/06_more_replica_identities.sql.postgres
@@ -0,0 +1,80 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE account_data REPLICA IDENTITY USING INDEX account_data_uniqueness;
+ALTER TABLE application_services_txns REPLICA IDENTITY USING INDEX application_services_txns_as_id_txn_id_key;
+ALTER TABLE appservice_stream_position REPLICA IDENTITY USING INDEX appservice_stream_position_lock_key;
+ALTER TABLE current_state_events REPLICA IDENTITY USING INDEX current_state_events_event_id_key;
+ALTER TABLE device_lists_changes_converted_stream_position REPLICA IDENTITY USING INDEX device_lists_changes_converted_stream_position_lock_key;
+ALTER TABLE devices REPLICA IDENTITY USING INDEX device_uniqueness;
+ALTER TABLE e2e_device_keys_json REPLICA IDENTITY USING INDEX e2e_device_keys_json_uniqueness;
+ALTER TABLE e2e_fallback_keys_json REPLICA IDENTITY USING INDEX e2e_fallback_keys_json_uniqueness;
+ALTER TABLE e2e_one_time_keys_json REPLICA IDENTITY USING INDEX e2e_one_time_keys_json_uniqueness;
+ALTER TABLE event_backward_extremities REPLICA IDENTITY USING INDEX event_backward_extremities_event_id_room_id_key;
+ALTER TABLE event_edges REPLICA IDENTITY USING INDEX event_edges_event_id_prev_event_id_idx;
+ALTER TABLE event_forward_extremities REPLICA IDENTITY USING INDEX event_forward_extremities_event_id_room_id_key;
+ALTER TABLE event_json REPLICA IDENTITY USING INDEX event_json_event_id_key;
+ALTER TABLE event_push_summary_last_receipt_stream_id REPLICA IDENTITY USING INDEX event_push_summary_last_receipt_stream_id_lock_key;
+ALTER TABLE event_push_summary_stream_ordering REPLICA IDENTITY USING INDEX event_push_summary_stream_ordering_lock_key;
+ALTER TABLE events REPLICA IDENTITY USING INDEX events_event_id_key;
+ALTER TABLE event_to_state_groups REPLICA IDENTITY USING INDEX event_to_state_groups_event_id_key;
+ALTER TABLE event_txn_id_device_id REPLICA IDENTITY USING INDEX event_txn_id_device_id_event_id;
+ALTER TABLE event_txn_id REPLICA IDENTITY USING INDEX event_txn_id_event_id;
+ALTER TABLE local_current_membership REPLICA IDENTITY USING INDEX local_current_membership_idx;
+ALTER TABLE partial_state_events REPLICA IDENTITY USING INDEX partial_state_events_event_id_key;
+ALTER TABLE partial_state_rooms_servers REPLICA IDENTITY USING INDEX partial_state_rooms_servers_room_id_server_name_key;
+ALTER TABLE profiles REPLICA IDENTITY USING INDEX profiles_user_id_key;
+ALTER TABLE redactions REPLICA IDENTITY USING INDEX redactions_event_id_key;
+ALTER TABLE registration_tokens REPLICA IDENTITY USING INDEX registration_tokens_token_key;
+ALTER TABLE rejections REPLICA IDENTITY USING INDEX rejections_event_id_key;
+ALTER TABLE room_account_data REPLICA IDENTITY USING INDEX room_account_data_uniqueness;
+ALTER TABLE room_aliases REPLICA IDENTITY USING INDEX room_aliases_room_alias_key;
+ALTER TABLE room_depth REPLICA IDENTITY USING INDEX room_depth_room_id_key;
+ALTER TABLE room_forgetter_stream_pos REPLICA IDENTITY USING INDEX room_forgetter_stream_pos_lock_key;
+ALTER TABLE room_memberships REPLICA IDENTITY USING INDEX room_memberships_event_id_key;
+ALTER TABLE room_tags REPLICA IDENTITY USING INDEX room_tag_uniqueness;
+ALTER TABLE room_tags_revisions REPLICA IDENTITY USING INDEX room_tag_revisions_uniqueness;
+ALTER TABLE server_keys_json REPLICA IDENTITY USING INDEX server_keys_json_uniqueness;
+ALTER TABLE sessions REPLICA IDENTITY USING INDEX sessions_session_type_session_id_key;
+ALTER TABLE state_events REPLICA IDENTITY USING INDEX state_events_event_id_key;
+ALTER TABLE stats_incremental_position REPLICA IDENTITY USING INDEX stats_incremental_position_lock_key;
+ALTER TABLE threads REPLICA IDENTITY USING INDEX threads_uniqueness;
+ALTER TABLE ui_auth_sessions_credentials REPLICA IDENTITY USING INDEX ui_auth_sessions_credentials_session_id_stage_type_key;
+ALTER TABLE ui_auth_sessions_ips REPLICA IDENTITY USING INDEX ui_auth_sessions_ips_session_id_ip_user_agent_key;
+ALTER TABLE ui_auth_sessions REPLICA IDENTITY USING INDEX ui_auth_sessions_session_id_key;
+ALTER TABLE user_directory_stream_pos REPLICA IDENTITY USING INDEX user_directory_stream_pos_lock_key;
+ALTER TABLE user_external_ids REPLICA IDENTITY USING INDEX user_external_ids_auth_provider_external_id_key;
+ALTER TABLE user_threepids REPLICA IDENTITY USING INDEX medium_address;
+ALTER TABLE worker_read_write_locks_mode REPLICA IDENTITY USING INDEX worker_read_write_locks_mode_key;
+ALTER TABLE worker_read_write_locks REPLICA IDENTITY USING INDEX worker_read_write_locks_key;
+
+-- special cases: unique indices on nullable columns can't be used
+ALTER TABLE event_push_actions REPLICA IDENTITY FULL;
+ALTER TABLE local_media_repository REPLICA IDENTITY FULL;
+ALTER TABLE receipts_graph REPLICA IDENTITY FULL;
+ALTER TABLE receipts_linearized REPLICA IDENTITY FULL;
+ALTER TABLE received_transactions REPLICA IDENTITY FULL;
+ALTER TABLE remote_media_cache REPLICA IDENTITY FULL;
+ALTER TABLE server_signature_keys REPLICA IDENTITY FULL;
+ALTER TABLE users REPLICA IDENTITY FULL;
diff --git a/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres b/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres
new file mode 100644
index 0000000000..9b792a39e2
--- /dev/null
+++ b/synapse/storage/schema/state/delta/83/05_replica_identities_in_state_db.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Annotate some tables in Postgres with a REPLICA IDENTITY.
+-- Any table that doesn't have a primary key should be annotated explicitly with
+-- a REPLICA IDENTITY so that logical replication can be used.
+-- If this is not done, then UPDATE and DELETE statements on those tables
+-- will fail if logical replication is in use.
+-- See also: 82/04_replica_identities.sql.postgres on the main database
+
+
+-- Where possible, re-use unique indices already defined on tables as a replica
+-- identity.
+ALTER TABLE state_group_edges REPLICA IDENTITY USING INDEX state_group_edges_unique_idx;
+
+
+-- Where there are no unique indices, use the entire rows as replica identities.
+ALTER TABLE state_groups_state REPLICA IDENTITY FULL;