summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/account_data.py7
-rw-r--r--synapse/storage/databases/main/cache.py14
-rw-r--r--synapse/storage/databases/main/devices.py2
-rw-r--r--synapse/storage/databases/main/event_federation.py31
-rw-r--r--synapse/storage/databases/main/events_worker.py13
-rw-r--r--synapse/storage/databases/main/filtering.py95
-rw-r--r--synapse/storage/databases/main/metrics.py83
-rw-r--r--synapse/storage/databases/main/profile.py102
-rw-r--r--synapse/storage/databases/main/receipts.py7
-rw-r--r--synapse/storage/databases/main/roommember.py2
-rw-r--r--synapse/storage/databases/main/user_directory.py190
-rw-r--r--synapse/storage/databases/state/bg_updates.py5
-rw-r--r--synapse/storage/databases/state/store.py15
13 files changed, 373 insertions, 193 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py

index a9843f6e17..8f7bdbc61a 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) writers=hs.config.worker.writers.account_data, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index bd07d20171..46fa0a73f9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ cache_func = getattr(self, cache_name, None) @@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ txn.call_after(cache_func.invalidate, keys) @@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_all_cache_and_stream( self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: - """Invalidates the entire cache and adds it to the cache stream so slaves + """Invalidates the entire cache and adds it to the cache stream so other workers will know to invalidate their caches. """ diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5503621ad6..a67fdb3c22 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): is_writer=hs.config.worker.worker_app is None, ) - # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a - # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ac19de183c..2681917d0b 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache @@ -1584,6 +1584,35 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) @trace + async def get_event_ids_with_failed_pull_attempts( + self, event_ids: StrCollection + ) -> Set[str]: + """ + Filter the given list of `event_ids` and return events which have any failed + pull attempts. + + Args: + event_ids: A list of events to filter down. + + Returns: + A filtered down list of `event_ids` that have previous failed pull attempts. + """ + + rows = await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="get_event_ids_with_failed_pull_attempts", + ) + event_ids_with_failed_pull_attempts: Set[str] = { + row["event_id"] for row in rows + } + + return event_ids_with_failed_pull_attempts + + @trace async def get_event_ids_to_not_pull_from_backoff( self, room_id: str, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 0ff3fc7369..a39bc90974 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore): writers=hs.config.worker.writers.events, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), @@ -1976,12 +1973,6 @@ class EventsWorkerStore(SQLBaseStore): return rows, to_token, True - async def is_event_after(self, event_id1: str, event_id2: str) -> bool: - """Returns True if event_id1 is after event_id2 in the stream""" - to_1, so_1 = await self.get_event_ordering(event_id1) - to_2, so_2 = await self.get_event_ordering(event_id2) - return (to_1, so_1) > (to_2, so_2) - @cached(max_entries=5000) async def get_event_ordering(self, event_id: str) -> Tuple[int, int]: res = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 50516402f9..da31eb44dc 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py
@@ -25,6 +25,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, UserID from synapse.util.caches.descriptors import cached @@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "full_users_filters_unique_idx", index_name="full_users_unique_idx", @@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_user_filters", + self.populate_full_user_id_user_filters, + ) + + async def populate_full_user_id_user_filters( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + user_filters from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM user_filters + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_user_filters" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_user_filters", + progress, + ) + + return 50 + @cached(num_args=2) async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 14294a0bb8..595e22982e 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py
@@ -248,89 +248,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): (count,) = cast(Tuple[int], txn.fetchone()) return count - async def count_r30_users(self) -> Dict[str, int]: - """ - Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days ago - * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days apart - - Returns: - A mapping of counts globally as well as broken out by platform. - """ - - def _count_r30_users(txn: LoggingTransaction) -> Dict[str, int]: - thirty_days_in_secs = 86400 * 30 - now = int(self._clock.time()) - thirty_days_ago_in_secs = now - thirty_days_in_secs - - sql = """ - SELECT platform, COUNT(*) FROM ( - SELECT - users.name, platform, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen, - CASE - WHEN user_agent LIKE '%%Android%%' THEN 'android' - WHEN user_agent LIKE '%%iOS%%' THEN 'ios' - WHEN user_agent LIKE '%%Electron%%' THEN 'electron' - WHEN user_agent LIKE '%%Mozilla%%' THEN 'web' - WHEN user_agent LIKE '%%Gecko%%' THEN 'web' - ELSE 'unknown' - END - AS platform - FROM user_ips - ) uip - ON users.name = uip.user_id - AND users.appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? - AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, platform, users.creation_ts - ) u GROUP BY platform - """ - - results = {} - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - for row in txn: - if row[0] == "unknown": - pass - results[row[0]] = row[1] - - sql = """ - SELECT COUNT(*) FROM ( - SELECT users.name, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen - FROM user_ips - ) uip - ON users.name = uip.user_id - AND appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? - AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, users.creation_ts - ) u - """ - - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - (count,) = cast(Tuple[int], txn.fetchone()) - results["all"] = count - - return results - - return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) - async def count_r30v2_users(self) -> Dict[str, int]: """ Counts the number of 30 day retained users, defined as users that: diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index c4022d2427..65c92bef51 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py
@@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.databases.main.roommember import ProfileInfo -from synapse.types import UserID +from synapse.storage.engines import PostgresEngine +from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "profiles_full_user_id_key_idx", index_name="profiles_full_user_id_key", @@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_profiles", self.populate_full_user_id_profiles + ) + + async def populate_full_user_id_profiles( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + profiles from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM profiles + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_profiles" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_profiles", + progress, + ) + + return 50 + async def get_profileinfo(self, user_localpart: str) -> ProfileInfo: try: profile = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 074942b167..5ee5c7ad9f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore): else: self._can_write_to_receipts = True + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._receipts_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e068f27a10..ae9c201b87 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -1099,7 +1099,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): # `get_joined_hosts` is called with the "current" state group for the # room, and so consecutive calls will be for consecutive state groups # which point to the previous state group. - cache = await self._get_joined_hosts_cache(room_id) # type: ignore[misc] + cache = await self._get_joined_hosts_cache(room_id) # If the state group in the cache matches, we already have the data we need. if state_entry.state_group == cache.state_group: diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index b7d58978de..a0319575f0 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -17,6 +17,7 @@ import re import unicodedata from typing import ( TYPE_CHECKING, + Collection, Iterable, List, Mapping, @@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none if TYPE_CHECKING: from synapse.server import HomeServer -from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules +from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): Add all local users to the user directory. """ - def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]: - sql = "SELECT user_id FROM %s LIMIT %s" % ( - TEMP_TABLE + "_users", - str(batch_size), - ) - txn.execute(sql) - user_result = cast(List[Tuple[str]], txn.fetchall()) + def _populate_user_directory_process_users_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + if self.database_engine.supports_returning: + # Note: we use an ORDER BY in the SELECT to force usage of an + # index. Otherwise, postgres does a sequential scan that is + # surprisingly slow (I think due to the fact it will read/skip + # over lots of already deleted rows). + sql = f""" + DELETE FROM {TEMP_TABLE + "_users"} + WHERE user_id IN ( + SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ? + ) + RETURNING user_id + """ + txn.execute(sql, (batch_size,)) + user_result = cast(List[Tuple[str]], txn.fetchall()) + else: + sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % ( + TEMP_TABLE + "_users", + str(batch_size), + ) + txn.execute(sql) + user_result = cast(List[Tuple[str]], txn.fetchall()) if not user_result: return None @@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): assert count_result is not None progress["remaining"] = count_result[0] - return users_to_work_on - - users_to_work_on = await self.db_pool.runInteraction( - "populate_user_directory_temp_read", _get_next_batch - ) + if not users_to_work_on: + return None - # No more users -- complete the transaction. - if not users_to_work_on: - await self.db_pool.updates._end_background_update( - "populate_user_directory_process_users" + logger.debug( + "Processing the next %d users of %d remaining", + len(users_to_work_on), + progress["remaining"], ) - return 1 - - logger.debug( - "Processing the next %d users of %d remaining" - % (len(users_to_work_on), progress["remaining"]) - ) - # First filter down to users we want to insert into the user directory. - users_to_insert = [ - user_id - for user_id in users_to_work_on - if await self.should_include_local_user_in_dir(user_id) - ] + # First filter down to users we want to insert into the user directory. + users_to_insert = self._filter_local_users_for_dir_txn( + txn, users_to_work_on + ) - # Next fetch their profiles. Note that the `user_id` here is the - # *localpart*, and that not all users have profiles. - profile_rows = await self.db_pool.simple_select_many_batch( - table="profiles", - column="user_id", - iterable=[get_localpart_from_id(u) for u in users_to_insert], - retcols=( - "user_id", - "displayname", - "avatar_url", - ), - keyvalues={}, - desc="populate_user_directory_process_users_get_profiles", - ) - profiles = { - f"@{row['user_id']}:{self.server_name}": _UserDirProfile( - f"@{row['user_id']}:{self.server_name}", - row["displayname"], - row["avatar_url"], + # Next fetch their profiles. Note that the `user_id` here is the + # *localpart*, and that not all users have profiles. + profile_rows = self.db_pool.simple_select_many_txn( + txn, + table="profiles", + column="user_id", + iterable=[get_localpart_from_id(u) for u in users_to_insert], + retcols=( + "user_id", + "displayname", + "avatar_url", + ), + keyvalues={}, ) - for row in profile_rows - } + profiles = { + f"@{row['user_id']}:{self.server_name}": _UserDirProfile( + f"@{row['user_id']}:{self.server_name}", + row["displayname"], + row["avatar_url"], + ) + for row in profile_rows + } - profiles_to_insert = [ - profiles.get(user_id) or _UserDirProfile(user_id) - for user_id in users_to_insert - ] + profiles_to_insert = [ + profiles.get(user_id) or _UserDirProfile(user_id) + for user_id in users_to_insert + ] + + # Actually insert the users with their profiles into the directory. + self._update_profiles_in_user_dir_txn(txn, profiles_to_insert) + + # We've finished processing the users. Delete it from the table, if + # we haven't already. + if not self.database_engine.supports_returning: + self.db_pool.simple_delete_many_txn( + txn, + table=TEMP_TABLE + "_users", + column="user_id", + values=users_to_work_on, + keyvalues={}, + ) - # Actually insert the users with their profiles into the directory. - await self.db_pool.runInteraction( - "populate_user_directory_process_users_insertion", - self._update_profiles_in_user_dir_txn, - profiles_to_insert, - ) + # Update the remaining counter. + progress["remaining"] -= len(users_to_work_on) + self.db_pool.updates._background_update_progress_txn( + txn, "populate_user_directory_process_users", progress + ) + return len(users_to_work_on) - # We've finished processing the users. Delete it from the table. - await self.db_pool.simple_delete_many( - table=TEMP_TABLE + "_users", - column="user_id", - iterable=users_to_work_on, - keyvalues={}, - desc="populate_user_directory_process_users_delete", + processed_count = await self.db_pool.runInteraction( + "populate_user_directory_temp", _populate_user_directory_process_users_txn ) - # Update the remaining counter. - progress["remaining"] -= len(users_to_work_on) - await self.db_pool.runInteraction( - "populate_user_directory", - self.db_pool.updates._background_update_progress_txn, - "populate_user_directory_process_users", - progress, - ) + # No more users -- complete the transaction. + if not processed_count: + await self.db_pool.updates._end_background_update( + "populate_user_directory_process_users" + ) + return 1 - return len(users_to_work_on) + return processed_count async def should_include_local_user_in_dir(self, user: str) -> bool: """Certain classes of local user are omitted from the user directory. @@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return True + def _filter_local_users_for_dir_txn( + self, txn: LoggingTransaction, users: Collection[str] + ) -> Collection[str]: + """A batched version of `should_include_local_user_in_dir`""" + users = [ + user + for user in users + if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined] + and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined] + ] + + rows = self.db_pool.simple_select_many_txn( + txn, + table="users", + column="name", + iterable=users, + keyvalues={ + "deactivated": 0, + }, + retcols=("name", "user_type"), + ) + + return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT] + async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool: """Check if the room is either world_readable or publically joinable""" diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 097dea5182..86eb1a8a08 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py
@@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): updates. """ + @trace + @tag_args def _count_state_group_hops_txn( self, txn: LoggingTransaction, state_group: int ) -> int: @@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): return count + @trace + @tag_args def _get_state_groups_from_groups_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 29ff64e876..6984d11352 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -20,6 +20,7 @@ import attr from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "get_state_group_delta", _get_state_group_delta_txn ) + @trace + @tag_args @cancellable async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter @@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return results + @trace + @tag_args def _get_state_for_group_using_cache( self, cache: DictionaryCache[int, StateKey, str], @@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types + @trace + @tag_args @cancellable async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None @@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state + @trace + @tag_args def _get_state_for_groups_using_cache( self, groups: Iterable[int], @@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): fetched_keys=non_member_types, ) + @trace + @tag_args async def store_state_deltas_for_batched( self, events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]], @@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): prev_group, ) + @trace + @tag_args async def store_state_group( self, event_id: str, @@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ((sg,) for sg in state_groups_to_delete), ) + @trace + @tag_args async def get_previous_state_groups( self, state_groups: Iterable[int] ) -> Dict[int, int]: