summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/account_data.py7
-rw-r--r--synapse/storage/databases/main/cache.py14
-rw-r--r--synapse/storage/databases/main/devices.py2
-rw-r--r--synapse/storage/databases/main/events_worker.py13
-rw-r--r--synapse/storage/databases/main/filtering.py95
-rw-r--r--synapse/storage/databases/main/metrics.py83
-rw-r--r--synapse/storage/databases/main/profile.py102
-rw-r--r--synapse/storage/databases/main/receipts.py7
-rw-r--r--synapse/storage/databases/state/bg_updates.py5
-rw-r--r--synapse/storage/databases/state/store.py15
10 files changed, 228 insertions, 115 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py

index a9843f6e17..8f7bdbc61a 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -85,13 +85,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) writers=hs.config.worker.writers.account_data, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._account_data_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index bd07d20171..46fa0a73f9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -274,11 +274,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ cache_func = getattr(self, cache_name, None) @@ -297,11 +297,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: - """Invalidates the cache and adds it to the cache stream so slaves + """Invalidates the cache and adds it to the cache stream so other workers will know to invalidate their caches. - This should only be used to invalidate caches where slaves won't - otherwise know from other replication streams that the cache should + This should only be used to invalidate caches where other workers won't + otherwise have known from other replication streams that the cache should be invalidated. """ txn.call_after(cache_func.invalidate, keys) @@ -310,7 +310,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_all_cache_and_stream( self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: - """Invalidates the entire cache and adds it to the cache stream so slaves + """Invalidates the entire cache and adds it to the cache stream so other workers will know to invalidate their caches. """ diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 5503621ad6..a67fdb3c22 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -105,8 +105,6 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): is_writer=hs.config.worker.worker_app is None, ) - # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a - # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 0ff3fc7369..a39bc90974 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -213,13 +213,10 @@ class EventsWorkerStore(SQLBaseStore): writers=hs.config.worker.writers.events, ) else: + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._stream_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), @@ -1976,12 +1973,6 @@ class EventsWorkerStore(SQLBaseStore): return rows, to_token, True - async def is_event_after(self, event_id1: str, event_id2: str) -> bool: - """Returns True if event_id1 is after event_id2 in the stream""" - to_1, so_1 = await self.get_event_ordering(event_id1) - to_2, so_2 = await self.get_event_ordering(event_id2) - return (to_1, so_1) > (to_2, so_2) - @cached(max_entries=5000) async def get_event_ordering(self, event_id: str) -> Tuple[int, int]: res = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 50516402f9..da31eb44dc 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py
@@ -25,6 +25,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, UserID from synapse.util.caches.descriptors import cached @@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "full_users_filters_unique_idx", index_name="full_users_unique_idx", @@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_user_filters", + self.populate_full_user_id_user_filters, + ) + + async def populate_full_user_id_user_filters( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + user_filters from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM user_filters + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_user_filters" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_user_filters", + progress, + ) + + return 50 + @cached(num_args=2) async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 14294a0bb8..595e22982e 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py
@@ -248,89 +248,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): (count,) = cast(Tuple[int], txn.fetchone()) return count - async def count_r30_users(self) -> Dict[str, int]: - """ - Counts the number of 30 day retained users, defined as:- - * Users who have created their accounts more than 30 days ago - * Where last seen at most 30 days ago - * Where account creation and last_seen are > 30 days apart - - Returns: - A mapping of counts globally as well as broken out by platform. - """ - - def _count_r30_users(txn: LoggingTransaction) -> Dict[str, int]: - thirty_days_in_secs = 86400 * 30 - now = int(self._clock.time()) - thirty_days_ago_in_secs = now - thirty_days_in_secs - - sql = """ - SELECT platform, COUNT(*) FROM ( - SELECT - users.name, platform, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen, - CASE - WHEN user_agent LIKE '%%Android%%' THEN 'android' - WHEN user_agent LIKE '%%iOS%%' THEN 'ios' - WHEN user_agent LIKE '%%Electron%%' THEN 'electron' - WHEN user_agent LIKE '%%Mozilla%%' THEN 'web' - WHEN user_agent LIKE '%%Gecko%%' THEN 'web' - ELSE 'unknown' - END - AS platform - FROM user_ips - ) uip - ON users.name = uip.user_id - AND users.appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? - AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, platform, users.creation_ts - ) u GROUP BY platform - """ - - results = {} - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - for row in txn: - if row[0] == "unknown": - pass - results[row[0]] = row[1] - - sql = """ - SELECT COUNT(*) FROM ( - SELECT users.name, users.creation_ts * 1000, - MAX(uip.last_seen) - FROM users - INNER JOIN ( - SELECT - user_id, - last_seen - FROM user_ips - ) uip - ON users.name = uip.user_id - AND appservice_id is NULL - AND users.creation_ts < ? - AND uip.last_seen/1000 > ? - AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30 - GROUP BY users.name, users.creation_ts - ) u - """ - - txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs)) - - (count,) = cast(Tuple[int], txn.fetchone()) - results["all"] = count - - return results - - return await self.db_pool.runInteraction("count_r30_users", _count_r30_users) - async def count_r30v2_users(self) -> Dict[str, int]: """ Counts the number of 30 day retained users, defined as users that: diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index c4022d2427..65c92bef51 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py
@@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.databases.main.roommember import ProfileInfo -from synapse.types import UserID +from synapse.storage.engines import PostgresEngine +from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "profiles_full_user_id_key_idx", index_name="profiles_full_user_id_key", @@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_profiles", self.populate_full_user_id_profiles + ) + + async def populate_full_user_id_profiles( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + profiles from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM profiles + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_profiles" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_profiles", + progress, + ) + + return 50 + async def get_profileinfo(self, user_localpart: str) -> ProfileInfo: try: profile = await self.db_pool.simple_select_one( diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 074942b167..5ee5c7ad9f 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -85,13 +85,10 @@ class ReceiptsWorkerStore(SQLBaseStore): else: self._can_write_to_receipts = True + # Multiple writers are not supported for SQLite. + # # We shouldn't be running in worker mode with SQLite, but its useful # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). self._receipts_id_gen = StreamIdGenerator( db_conn, hs.get_replication_notifier(), diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 097dea5182..86eb1a8a08 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py
@@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): updates. """ + @trace + @tag_args def _count_state_group_hops_txn( self, txn: LoggingTransaction, state_group: int ) -> int: @@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): return count + @trace + @tag_args def _get_state_groups_from_groups_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 29ff64e876..6984d11352 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -20,6 +20,7 @@ import attr from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "get_state_group_delta", _get_state_group_delta_txn ) + @trace + @tag_args @cancellable async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter @@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return results + @trace + @tag_args def _get_state_for_group_using_cache( self, cache: DictionaryCache[int, StateKey, str], @@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types + @trace + @tag_args @cancellable async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None @@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state + @trace + @tag_args def _get_state_for_groups_using_cache( self, groups: Iterable[int], @@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): fetched_keys=non_member_types, ) + @trace + @tag_args async def store_state_deltas_for_batched( self, events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]], @@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): prev_group, ) + @trace + @tag_args async def store_state_group( self, event_id: str, @@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ((sg,) for sg in state_groups_to_delete), ) + @trace + @tag_args async def get_previous_state_groups( self, state_groups: Iterable[int] ) -> Dict[int, int]: