Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--  synapse/storage/databases/__init__.py                  |   2
-rw-r--r--  synapse/storage/databases/main/account_data.py         |  24
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py          | 106
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py      |  84
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py    |  19
-rw-r--r--  synapse/storage/databases/main/events_worker.py        |   6
-rw-r--r--  synapse/storage/databases/main/keys.py                 |  17
-rw-r--r--  synapse/storage/databases/main/media_repository.py     |  90
-rw-r--r--  synapse/storage/databases/main/monthly_active_users.py |   2
-rw-r--r--  synapse/storage/databases/main/presence.py             |   9
-rw-r--r--  synapse/storage/databases/main/purge_events.py         |  33
-rw-r--r--  synapse/storage/databases/main/push_rule.py            |  43
-rw-r--r--  synapse/storage/databases/main/registration.py         |  20
-rw-r--r--  synapse/storage/databases/main/search.py               |   8
-rw-r--r--  synapse/storage/databases/state/bg_updates.py          |   4
15 files changed, 365 insertions, 102 deletions
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py

index 7aa24ccf21..b57e260fe0 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]):
     """

     databases: List[DatabasePool]
-    main: "DataStore"  # FIXME: #11165: actually an instance of `main_store_class`
+    main: "DataStore"  # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class`
     state: StateGroupDataStore
     persist_events: Optional[PersistEventsStore]
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index d7482a1f4e..07f9b65af3 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
         )

         # Invalidate the cache for any ignored users which were added or removed.
-        for ignored_user_id in previously_ignored_users ^ currently_ignored_users:
-            self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,))
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.ignored_by,
+            [
+                (ignored_user_id,)
+                for ignored_user_id in (
+                    previously_ignored_users ^ currently_ignored_users
+                )
+            ],
+        )
         self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))

     async def remove_account_data_for_user(
@@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             )

             # Invalidate the cache for ignored users which were removed.
-            for ignored_user_id in previously_ignored_users:
-                self._invalidate_cache_and_stream(
-                    txn, self.ignored_by, (ignored_user_id,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.ignored_by,
+                [
+                    (ignored_user_id,)
+                    for ignored_user_id in previously_ignored_users
+                ],
+            )

             # Invalidate for this user the cache tracking ignored users.
             self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,))
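Note: the pattern above replaces a per-key `_invalidate_cache_and_stream` loop with a single `_invalidate_cache_and_stream_bulk` call taking a list of cache-key tuples. A minimal usage sketch (not part of the diff; the store object and its helpers are assumed to match the names used above):

from typing import Iterable, List, Tuple

def invalidate_ignored_by_bulk(store, txn, changed_user_ids: Iterable[str]) -> None:
    # Build the full list of cache keys up front...
    keys: List[Tuple[str]] = [(user_id,) for user_id in changed_user_ids]
    # ...and invalidate them in one bulk call, instead of one
    # _invalidate_cache_and_stream call per changed user.
    store._invalidate_cache_and_stream_bulk(txn, store.ignored_by, keys)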
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 71eefe6b7c..659631ab65 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
-        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
-            limit: maximum number of messages to delete

         Returns:
             The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             log_kv({"message": "No changes in cache since last check"})
             return 0

-        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            limit_statement = "" if limit is None else f"LIMIT {limit}"
-            sql = f"""
-                DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
-                    SELECT MAX(stream_id) FROM (
-                        SELECT stream_id FROM device_inbox
-                        WHERE user_id = ? AND device_id = ? AND stream_id <= ?
-                        ORDER BY stream_id
-                        {limit_statement}
-                    ) AS q1
-                )
-            """
-            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = await self.db_pool.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
+        from_stream_id = None
+        count = 0
+        while True:
+            from_stream_id, loop_count = await self.delete_messages_for_device_between(
+                user_id,
+                device_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
+                limit=1000,
+            )
+            count += loop_count
+            if from_stream_id is None:
+                break

         log_kv({"message": f"deleted {count} messages for device", "count": count})

-        # In this case we don't know if we hit the limit or the delete is complete
-        # so let's not update the cache.
-        if count == limit:
-            return count
-
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -515,6 +503,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         return count

     @trace
+    async def delete_messages_for_device_between(
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        from_stream_id: Optional[int],
+        to_stream_id: int,
+        limit: int,
+    ) -> Tuple[Optional[int], int]:
+        """Delete N device messages between the stream IDs, returning the
+        highest stream ID deleted (or None if all messages in the range have
+        been deleted) and the number of messages deleted.
+
+        This is more efficient than `delete_messages_for_device` when calling in
+        a loop to batch delete messages.
+        """
+
+        # Keeping track of a lower bound of stream ID where we've deleted
+        # everything below makes the queries much faster. Otherwise, every time
+        # we scan for rows to delete we'd re-scan across all the rows that have
+        # previously deleted (until the next table VACUUM).
+
+        if from_stream_id is None:
+            # Minimum device stream ID is 1.
+            from_stream_id = 0
+
+        def delete_messages_for_device_between_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[Optional[int], int]:
+            txn.execute(
+                """
+                SELECT MAX(stream_id) FROM (
+                    SELECT stream_id FROM device_inbox
+                    WHERE user_id = ? AND device_id = ?
+                        AND ? < stream_id AND stream_id <= ?
+                    ORDER BY stream_id
+                    LIMIT ?
+                ) AS d
+                """,
+                (user_id, device_id, from_stream_id, to_stream_id, limit),
+            )
+            row = txn.fetchone()
+            if row is None or row[0] is None:
+                return None, 0
+
+            (max_stream_id,) = row
+
+            txn.execute(
+                """
+                DELETE FROM device_inbox
+                WHERE user_id = ? AND device_id = ?
+                    AND ? < stream_id AND stream_id <= ?
+                """,
+                (user_id, device_id, from_stream_id, max_stream_id),
+            )
+
+            num_deleted = txn.rowcount
+            if num_deleted < limit:
+                return None, num_deleted
+
+            return max_stream_id, num_deleted
+
+        return await self.db_pool.runInteraction(
+            "delete_messages_for_device_between",
+            delete_messages_for_device_between_txn,
+            db_autocommit=True,  # We don't need to run in a transaction
+        )
+
+    @trace
     async def get_new_device_msgs_for_remote(
         self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
     ) -> Tuple[List[JsonDict], int]:
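The new `delete_messages_for_device_between` deletes at most `limit` rows per call and returns the highest stream ID deleted, or `None` once the range is exhausted, so callers can drive it in a resumable loop (as `delete_messages_for_device` now does above). A minimal sketch of such a caller, assuming a store exposing the method with the signature shown in the diff; the batch size of 1000 mirrors the diff:

async def purge_device_inbox(store, user_id: str, device_id: str, up_to_stream_id: int) -> int:
    """Delete every device message up to `up_to_stream_id`, 1000 rows per batch."""
    from_stream_id = None
    total = 0
    while True:
        from_stream_id, deleted = await store.delete_messages_for_device_between(
            user_id,
            device_id,
            from_stream_id=from_stream_id,
            to_stream_id=up_to_stream_id,
            limit=1000,
        )
        total += deleted
        if from_stream_id is None:
            # The last batch deleted fewer rows than the limit: the range is done.
            return total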
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker

         return otk_rows

+    async def get_master_cross_signing_key_updatable_before(
+        self, user_id: str
+    ) -> Tuple[bool, Optional[int]]:
+        """Get time before which a master cross-signing key may be replaced without UIA.
+
+        (UIA means "User-Interactive Auth".)
+
+        There are three cases to distinguish:
+         (1) No master cross-signing key.
+         (2) The key exists, but there is no replace-without-UI timestamp in the DB.
+         (3) The key exists, and has such a timestamp recorded.
+
+        Returns: a 2-tuple of:
+          - a boolean: is there a master cross-signing key already?
+          - an optional timestamp, directly taken from the DB.
+
+        In terms of the cases above, these are:
+         (1) (False, None).
+         (2) (True, None).
+         (3) (True, <timestamp in ms>).
+
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+            # We want to distinguish between three cases:
+            txn.execute(
+                """
+                SELECT updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ? AND keytype = 'master'
+                ORDER BY stream_id DESC
+                LIMIT 1
+                """,
+                (user_id,),
+            )
+            row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+            if row is None:
+                return False, None
+            return True, row[0]
+
+        return await self.db_pool.runInteraction(
+            "e2e_cross_signing_keys",
+            impl,
+        )
+

 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             ],
             desc="add_e2e_signing_key",
         )
+
+    async def allow_master_cross_signing_key_replacement_without_uia(
+        self, user_id: str, duration_ms: int
+    ) -> Optional[int]:
+        """Mark this user's latest master key as being replaceable without UIA.
+
+        Said replacement will only be permitted for a short time after calling this
+        function. That time period is controlled by the duration argument.
+
+        Returns:
+            None, if there is no such key.
+            Otherwise, the timestamp before which replacement is allowed without UIA.
+        """
+        timestamp = self._clock.time_msec() + duration_ms
+
+        def impl(txn: LoggingTransaction) -> Optional[int]:
+            txn.execute(
+                """
+                UPDATE e2e_cross_signing_keys
+                SET updatable_without_uia_before_ms = ?
+                WHERE stream_id = (
+                    SELECT stream_id
+                    FROM e2e_cross_signing_keys
+                    WHERE user_id = ? AND keytype = 'master'
+                    ORDER BY stream_id DESC
+                    LIMIT 1
+                )
+                """,
+                (timestamp, user_id),
+            )
+            if txn.rowcount == 0:
+                return None
+
+            return timestamp
+
+        return await self.db_pool.runInteraction(
+            "allow_master_cross_signing_key_replacement_without_uia",
+            impl,
+        )
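The two new methods are intended to pair up: `allow_master_cross_signing_key_replacement_without_uia` stamps a deadline onto the latest master key, and `get_master_cross_signing_key_updatable_before` reads it back so callers can decide whether to skip User-Interactive Auth. A hedged sketch of that check (the `store` and `clock` wiring is assumed; only the store methods come from the diff):

async def can_replace_master_key_without_uia(store, clock, user_id: str) -> bool:
    """True while the user's master cross-signing key is replaceable without UIA."""
    exists, updatable_before_ms = await store.get_master_cross_signing_key_updatable_before(
        user_id
    )
    if not exists or updatable_before_ms is None:
        # No master key, or no replacement window has been opened for it.
        return False
    return clock.time_msec() < updatable_before_ms

# A caller (e.g. an admin API) might first open a ten-minute window with:
#   await store.allow_master_cross_signing_key_replacement_without_uia(
#       user_id, duration_ms=10 * 60 * 1000
#   )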
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 0061805150..0c91f19c8e 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Background update to clean out extremities that should have been
         deleted previously.

-        Mainly used to deal with the aftermath of #5269.
+        Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269.
         """

         # This works by first copying all existing forward extremities into the
@@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )

             logger.info(
-                "Deleted %d forward extremities of %d checked, to clean up #5269",
+                "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269",
                 deleted,
                 len(original_set),
             )
@@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )

             # Iterate the parent IDs and invalidate caches.
-            for parent_id in {r[1] for r in relations_to_insert}:
-                cache_tuple = (parent_id,)
-                self._invalidate_cache_and_stream(  # type: ignore[attr-defined]
-                    txn, self.get_relations_for_event, cache_tuple  # type: ignore[attr-defined]
-                )
-                self._invalidate_cache_and_stream(  # type: ignore[attr-defined]
-                    txn, self.get_thread_summary, cache_tuple  # type: ignore[attr-defined]
-                )
+            cache_tuples = {(r[1],) for r in relations_to_insert}
+            self._invalidate_cache_and_stream_bulk(  # type: ignore[attr-defined]
+                txn, self.get_relations_for_event, cache_tuples  # type: ignore[attr-defined]
+            )
+            self._invalidate_cache_and_stream_bulk(  # type: ignore[attr-defined]
+                txn, self.get_thread_summary, cache_tuples  # type: ignore[attr-defined]
+            )

         if results:
             latest_event_id = results[-1][0]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e831146cca..a880788f00 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore):
         room_version: Optional[RoomVersion]
         if not room_version_id:
             # this should only happen for out-of-band membership events which
-            # arrived before #6983 landed. For all other events, we should have
+            # arrived before https://github.com/matrix-org/synapse/issues/6983
+            # landed. For all other events, we should have
             # an entry in the 'rooms' table.
             #
             # However, the 'out_of_band_membership' flag is unreliable for older
@@ -1323,7 +1324,8 @@ class EventsWorkerStore(SQLBaseStore):
                     "Room %s for event %s is unknown" % (d["room_id"], event_id)
                 )

-            # so, assuming this is an out-of-band-invite that arrived before #6983
+            # so, assuming this is an out-of-band-invite that arrived before
+            # https://github.com/matrix-org/synapse/issues/6983
             # landed, we know that the room version must be v5 or earlier (because
             # v6 hadn't been invented at that point, so invites from such rooms
             # would have been rejected.)
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ce88772f9e..c700872fdc 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore):
             # invalidate takes a tuple corresponding to the params of
             # _get_server_keys_json. _get_server_keys_json only takes one
             # param, which is itself the 2-tuple (server_name, key_id).
-            for key_id in verify_keys:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_server_keys_json, ((server_name, key_id),)
-                )
-                self._invalidate_cache_and_stream(
-                    txn, self.get_server_key_json_for_remote, (server_name, key_id)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_server_keys_json,
+                [((server_name, key_id),) for key_id in verify_keys],
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_server_key_json_for_remote,
+                [(server_name, key_id) for key_id in verify_keys],
+            )

         await self.db_pool.runInteraction(
             "store_server_keys_response", store_server_keys_response_txn
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 3f80a64dc5..149135b8b5 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
 class LocalMedia:
     media_id: str
     media_type: str
-    media_length: int
+    media_length: Optional[int]
     upload_name: str
     created_ts: int
     url_cache: Optional[str]
     last_access_ts: int
     quarantined_by: Optional[str]
     safe_from_quarantine: bool
+    user_id: Optional[str]


 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
             self._drop_media_index_without_method,
         )

+        if hs.config.media.can_load_media_repo:
+            self.unused_expiration_time: Optional[
+                int
+            ] = hs.config.media.unused_expiration_time
+        else:
+            self.unused_expiration_time = None
+
     async def _drop_media_index_without_method(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "url_cache",
                 "last_access_ts",
                 "safe_from_quarantine",
+                "user_id",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             url_cache=row[5],
             last_access_ts=row[6],
             safe_from_quarantine=row[7],
+            user_id=row[8],
         )

     async def get_local_media_by_user_paginate(
@@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     url_cache,
                     last_access_ts,
                     quarantined_by,
-                    safe_from_quarantine
+                    safe_from_quarantine,
+                    user_id
                 FROM local_media_repository
                 WHERE user_id = ?
                 ORDER BY {order_by_column} {order}, media_id ASC
@@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     last_access_ts=row[6],
                     quarantined_by=row[7],
                     safe_from_quarantine=bool(row[8]),
+                    user_id=row[9],
                 )
                 for row in txn
             ]
@@ -392,6 +404,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         )

     @trace
+    async def store_local_media_id(
+        self,
+        media_id: str,
+        time_now_ms: int,
+        user_id: UserID,
+    ) -> None:
+        await self.db_pool.simple_insert(
+            "local_media_repository",
+            {
+                "media_id": media_id,
+                "created_ts": time_now_ms,
+                "user_id": user_id.to_string(),
+            },
+            desc="store_local_media_id",
+        )
+
+    @trace
     async def store_local_media(
         self,
         media_id: str,
@@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_local_media",
         )

+    async def update_local_media(
+        self,
+        media_id: str,
+        media_type: str,
+        upload_name: Optional[str],
+        media_length: int,
+        user_id: UserID,
+        url_cache: Optional[str] = None,
+    ) -> None:
+        await self.db_pool.simple_update_one(
+            "local_media_repository",
+            keyvalues={
+                "user_id": user_id.to_string(),
+                "media_id": media_id,
+            },
+            updatevalues={
+                "media_type": media_type,
+                "upload_name": upload_name,
+                "media_length": media_length,
+                "url_cache": url_cache,
+            },
+            desc="update_local_media",
+        )
+
     async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
         """Mark a local media as safe or unsafe from quarantining."""
         await self.db_pool.simple_update_one(
@@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="mark_local_media_as_safe",
         )

+    async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]:
+        """Count the number of pending media for a user.
+
+        Returns:
+            A tuple of two integers: the total pending media requests and the earliest
+            expiration timestamp.
+        """
+
+        def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]:
+            sql = """
+            SELECT COUNT(*), MIN(created_ts)
+            FROM local_media_repository
+            WHERE user_id = ?
+                AND created_ts > ?
+                AND media_length IS NULL
+            """
+            assert self.unused_expiration_time is not None
+            txn.execute(
+                sql,
+                (
+                    user_id.to_string(),
+                    self._clock.time_msec() - self.unused_expiration_time,
+                ),
+            )
+            row = txn.fetchone()
+            if not row:
+                return 0, 0
+            return row[0], (row[1] + self.unused_expiration_time if row[1] else 0)
+
+        return await self.db_pool.runInteraction(
+            "get_pending_media", get_pending_media_txn
+        )
+
     async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]:
         """Get the media_id and ts for a cached URL as of the given timestamp
         Returns:
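Taken together, the new column handling and methods support a two-phase upload flow: `store_local_media_id` reserves a row with only `created_ts` and `user_id` set, `update_local_media` fills in the metadata once the bytes arrive, and `count_pending_media` counts reserved-but-unused rows (those whose `media_length` is still NULL) inside the unused-expiration window. A sketch of that flow, assuming an async caller with a `store`, a `clock`, and a `UserID`-typed `user_id`; the ordering and helper name are illustrative, only the store methods come from the diff:

async def reserve_then_finalise_upload(
    store, clock, media_id: str, user_id, upload_name: str, content_type: str, content: bytes
) -> None:
    # Optionally refuse new reservations if too many earlier ones are still unused
    # (rows whose media_length is NULL within the unused-expiration window).
    pending, earliest_expiry_ms = await store.count_pending_media(user_id)

    # Phase 1: reserve the media ID before any bytes arrive; only created_ts
    # and user_id are populated at this point.
    await store.store_local_media_id(
        media_id=media_id,
        time_now_ms=clock.time_msec(),
        user_id=user_id,
    )

    # Phase 2: once the upload completes, fill in the remaining metadata.
    await store.update_local_media(
        media_id=media_id,
        media_type=content_type,
        upload_name=upload_name,
        media_length=len(content),
        user_id=user_id,
    )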
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 4b1061e6d7..2911e53310 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
             if user_id:
                 is_support = self.is_support_user_txn(txn, user_id)
                 if not is_support:
-                    # We do this manually here to avoid hitting #6791
+                    # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
                     self.db_pool.simple_upsert_txn(
                         txn,
                         table="monthly_active_users",
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 3b444d2d07..0198bb09d2 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
                 # for their user ID.
                 value_values=[(presence_stream_id,) for _ in user_ids],
             )
-            for user_id in user_ids:
-                self._invalidate_cache_and_stream(
-                    txn, self._get_full_presence_stream_token_for_user, (user_id,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self._get_full_presence_stream_token_for_user,
+                [(user_id,) for user_id in user_ids],
+            )

         return await self.db_pool.runInteraction(
             "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 1e11bf2706..1a5b5731bb 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")

-        for event_id, should_delete in event_rows:
-            self._invalidate_cache_and_stream(
-                txn, self._get_state_group_for_event, (event_id,)
-            )
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self._get_state_group_for_event,
+            [(event_id,) for event_id, _ in event_rows],
+        )

-            # XXX: This is racy, since have_seen_events could be called between the
-            #    transaction completing and the invalidation running. On the other hand,
-            #    that's no different to calling `have_seen_events` just before the
-            #    event is deleted from the database.
+        # XXX: This is racy, since have_seen_events could be called between the
+        #    transaction completing and the invalidation running. On the other hand,
+        #    that's no different to calling `have_seen_events` just before the
+        #    event is deleted from the database.
+        self._invalidate_cache_and_stream_bulk(
+            txn,
+            self.have_seen_event,
+            [
+                (room_id, event_id)
+                for event_id, should_delete in event_rows
+                if should_delete
+            ],
+        )
+
+        for event_id, should_delete in event_rows:
             if should_delete:
-                self._invalidate_cache_and_stream(
-                    txn, self.have_seen_event, (room_id, event_id)
-                )
                 self.invalidate_get_event_cache_after_txn(txn, event_id)

         logger.info("[purge] done")
@@ -485,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         #     - room_tags_revisions
         #  The problem with these is that they are largeish and there is no room_id
         #  index on them. In any case we should be clearing out 'stream' tables
-        #  periodically anyway (#5888)
+        #  periodically anyway (https://github.com/matrix-org/synapse/issues/5888)

         self._invalidate_caches_for_room_and_stream(txn, room_id)
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index f72a23c584..cf622e195c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore):
         before: str,
         after: str,
     ) -> None:
-        # Lock the table since otherwise we'll have annoying races between the
-        # SELECT here and the UPSERT below.
-        self.database_engine.lock_table(txn, "push_rules")
-
         relative_to_rule = before or after

-        res = self.db_pool.simple_select_one_txn(
-            txn,
-            table="push_rules",
-            keyvalues={"user_name": user_id, "rule_id": relative_to_rule},
-            retcols=["priority_class", "priority"],
-            allow_none=True,
-        )
+        sql = """
+            SELECT priority, priority_class FROM push_rules
+            WHERE user_name = ? AND rule_id = ?
+        """
+
+        if isinstance(self.database_engine, PostgresEngine):
+            sql += " FOR UPDATE"
+        else:
+            # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+            self.database_engine.lock_table(txn, "push_rules")
+
+        txn.execute(sql, (user_id, relative_to_rule))
+        row = txn.fetchone()

-        if not res:
+        if row is None:
             raise RuleNotFoundException(
                 "before/after rule not found: %s" % (relative_to_rule,)
             )

-        base_priority_class, base_rule_priority = res
+        base_rule_priority, base_priority_class = row

         if base_priority_class != priority_class:
             raise InconsistentRuleException(
@@ -516,9 +518,18 @@ class PushRuleStore(PushRulesWorkerStore):
         conditions_json: str,
         actions_json: str,
     ) -> None:
-        # Lock the table since otherwise we'll have annoying races between the
-        # SELECT here and the UPSERT below.
-        self.database_engine.lock_table(txn, "push_rules")
+        if isinstance(self.database_engine, PostgresEngine):
+            # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first
+            # then re-select the count/max below.
+            sql = """
+                SELECT * FROM push_rules
+                WHERE user_name = ? and priority_class = ?
+                FOR UPDATE
+            """
+            txn.execute(sql, (user_id, priority_class))
+        else:
+            # Annoyingly SQLite doesn't support row level locking, so lock the whole table
+            self.database_engine.lock_table(txn, "push_rules")

         # find the highest priority rule in that class
         sql = (
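The change above swaps a whole-table lock for a row-level `SELECT ... FOR UPDATE` on Postgres, keeping the table lock only on SQLite, which has no row-level locking. A sketch of that branching pattern on its own (the function wrapper is assumed; the SQL, import and engine check mirror the diff):

from synapse.storage.engines import PostgresEngine

def lock_relative_rule(txn, database_engine, user_id: str, rule_id: str):
    """Fetch the rule we're positioning relative to, locked against concurrent writers."""
    sql = """
        SELECT priority, priority_class FROM push_rules
        WHERE user_name = ? AND rule_id = ?
    """
    if isinstance(database_engine, PostgresEngine):
        # Postgres: lock just this row for the rest of the transaction.
        sql += " FOR UPDATE"
    else:
        # SQLite has no row-level locks, so fall back to locking the whole table.
        database_engine.lock_table(txn, "push_rules")

    txn.execute(sql, (user_id, rule_id))
    return txn.fetchone()  # None if the before/after rule does not exist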
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dec9858575..2c3f30e2eb 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 updatevalues={"shadow_banned": shadow_banned},
             )
             # In order for this to apply immediately, clear the cache for this user.
-            tokens = self.db_pool.simple_select_onecol_txn(
+            tokens = self.db_pool.simple_select_list_txn(
                 txn,
                 table="access_tokens",
                 keyvalues={"user_id": user_id},
-                retcol="token",
+                retcols=("token",),
+            )
+            self._invalidate_cache_and_stream_bulk(
+                txn, self.get_user_by_access_token, tokens
             )
-            for token in tokens:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))

         await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn)
@@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
             )

             tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]

-            for token, _, _ in tokens_and_devices:
-                self._invalidate_cache_and_stream(
-                    txn, self.get_user_by_access_token, (token,)
-                )
+            self._invalidate_cache_and_stream_bulk(
+                txn,
+                self.get_user_by_access_token,
+                [(token,) for token, _, _ in tokens_and_devices],
+            )

             txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values)
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f4bef4c99b..e25d86818b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):

             # we have to set autocommit, because postgres refuses to
             # CREATE INDEX CONCURRENTLY without it.
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)

             try:
                 c = conn.cursor()
@@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 # we should now be able to delete the GIST index.
                 c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist")
             finally:
-                conn.set_session(autocommit=False)
+                conn.engine.attempt_to_set_autocommit(conn.conn, False)

         if isinstance(self.database_engine, PostgresEngine):
             await self.db_pool.runWithConnection(create_index)
@@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):

         def create_index(conn: LoggingDatabaseConnection) -> None:
             conn.rollback()
-            conn.set_session(autocommit=True)
+            conn.engine.attempt_to_set_autocommit(conn.conn, True)
             c = conn.cursor()

             # We create with NULLS FIRST so that when we search *backwards*
@@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
                 """
             )
-            conn.set_session(autocommit=False)
+            conn.engine.attempt_to_set_autocommit(conn.conn, False)

         await self.db_pool.runWithConnection(create_index)
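`attempt_to_set_autocommit` replaces the psycopg2-specific `conn.set_session(autocommit=...)`: Postgres refuses to run `CREATE INDEX CONCURRENTLY` inside a transaction, so the connection is flipped to autocommit around the statement and restored afterwards. A minimal sketch of the pattern, assuming `conn` is a `LoggingDatabaseConnection` as in the diff:

def create_index_concurrently(conn) -> None:
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction, so end the
    # current one and switch the underlying connection to autocommit first.
    conn.rollback()
    conn.engine.attempt_to_set_autocommit(conn.conn, True)
    try:
        c = conn.cursor()
        c.execute(
            """
            CREATE INDEX CONCURRENTLY event_search_event_order_idx
            ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)
            """
        )
    finally:
        # Restore normal transactional behaviour for later users of the connection.
        conn.engine.attempt_to_set_autocommit(conn.conn, False)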
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 0f9c550b27..2c3151526d 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
             conn.rollback()
             if isinstance(self.database_engine, PostgresEngine):
                 # postgres insists on autocommit for the index
-                conn.set_session(autocommit=True)
+                conn.engine.attempt_to_set_autocommit(conn.conn, True)
                 try:
                     txn = conn.cursor()
                     txn.execute(
@@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                     )
                     txn.execute("DROP INDEX IF EXISTS state_groups_state_id")
                 finally:
-                    conn.set_session(autocommit=False)
+                    conn.engine.attempt_to_set_autocommit(conn.conn, False)
             else:
                 txn = conn.cursor()
                 txn.execute(