summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/account_data.py129
-rw-r--r--synapse/storage/databases/main/appservice.py2
-rw-r--r--synapse/storage/databases/main/devices.py49
-rw-r--r--synapse/storage/databases/main/directory.py4
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py27
-rw-r--r--synapse/storage/databases/main/event_federation.py11
-rw-r--r--synapse/storage/databases/main/event_push_actions.py7
-rw-r--r--synapse/storage/databases/main/events.py30
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py6
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py4
-rw-r--r--synapse/storage/databases/main/receipts.py10
-rw-r--r--synapse/storage/databases/main/registration.py4
-rw-r--r--synapse/storage/databases/main/relations.py7
-rw-r--r--synapse/storage/databases/main/roommember.py19
-rw-r--r--synapse/storage/databases/main/signatures.py6
-rw-r--r--synapse/storage/databases/main/tags.py8
-rw-r--r--synapse/storage/databases/main/user_directory.py4
17 files changed, 196 insertions, 131 deletions
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py

index 8a359d7eb8..95567826f2 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -21,6 +21,7 @@ from typing import ( FrozenSet, Iterable, List, + Mapping, Optional, Tuple, cast, @@ -122,25 +123,25 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return self._account_data_id_gen.get_current_token() @cached() - async def get_account_data_for_user( + async def get_global_account_data_for_user( self, user_id: str - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: + ) -> Mapping[str, JsonDict]: """ - Get all the client account_data for a user. + Get all the global client account_data for a user. If experimental MSC3391 support is enabled, any entries with an empty content body are excluded; as this means they have been deleted. Args: user_id: The user to get the account_data for. + Returns: - A 2-tuple of a dict of global account_data and a dict mapping from - room_id string to per room account_data dicts. + The global account_data. """ - def get_account_data_for_user_txn( + def get_global_account_data_for_user( txn: LoggingTransaction, - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: + ) -> Dict[str, JsonDict]: # The 'content != '{}' condition below prevents us from using # `simple_select_list_txn` here, as it doesn't support conditions # other than 'equals'. @@ -158,10 +159,34 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) txn.execute(sql, (user_id,)) rows = self.db_pool.cursor_to_dict(txn) - global_account_data = { + return { row["account_data_type"]: db_to_json(row["content"]) for row in rows } + return await self.db_pool.runInteraction( + "get_global_account_data_for_user", get_global_account_data_for_user + ) + + @cached() + async def get_room_account_data_for_user( + self, user_id: str + ) -> Mapping[str, Mapping[str, JsonDict]]: + """ + Get all of the per-room client account_data for a user. + + If experimental MSC3391 support is enabled, any entries with an empty + content body are excluded; as this means they have been deleted. + + Args: + user_id: The user to get the account_data for. + + Returns: + A dict mapping from room_id string to per-room account_data dicts. + """ + + def get_room_account_data_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, Dict[str, JsonDict]]: # The 'content != '{}' condition below prevents us from using # `simple_select_list_txn` here, as it doesn't support conditions # other than 'equals'. @@ -185,10 +210,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) room_data[row["account_data_type"]] = db_to_json(row["content"]) - return global_account_data, by_room + return by_room return await self.db_pool.runInteraction( - "get_account_data_for_user", get_account_data_for_user_txn + "get_room_account_data_for_user_txn", get_room_account_data_for_user_txn ) @cached(num_args=2, max_entries=5000, tree=True) @@ -215,7 +240,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) @cached(num_args=2, tree=True) async def get_account_data_for_room( self, user_id: str, room_id: str - ) -> Dict[str, JsonDict]: + ) -> Mapping[str, JsonDict]: """Get all the client account_data for a user for a room. Args: @@ -342,36 +367,61 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "get_updated_room_account_data", get_updated_room_account_data_txn ) - async def get_updated_account_data_for_user( + async def get_updated_global_account_data_for_user( self, user_id: str, stream_id: int - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: - """Get all the client account_data for a that's changed for a user + ) -> Dict[str, JsonDict]: + """Get all the global account_data that's changed for a user. Args: user_id: The user to get the account_data for. stream_id: The point in the stream since which to get updates + Returns: - A deferred pair of a dict of global account_data and a dict - mapping from room_id string to per room account_data dicts. + A dict of global account_data. """ - def get_updated_account_data_for_user_txn( + def get_updated_global_account_data_for_user( txn: LoggingTransaction, - ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: - sql = ( - "SELECT account_data_type, content FROM account_data" - " WHERE user_id = ? AND stream_id > ?" - ) - + ) -> Dict[str, JsonDict]: + sql = """ + SELECT account_data_type, content FROM account_data + WHERE user_id = ? AND stream_id > ? + """ txn.execute(sql, (user_id, stream_id)) - global_account_data = {row[0]: db_to_json(row[1]) for row in txn} + return {row[0]: db_to_json(row[1]) for row in txn} - sql = ( - "SELECT room_id, account_data_type, content FROM room_account_data" - " WHERE user_id = ? AND stream_id > ?" - ) + changed = self._account_data_stream_cache.has_entity_changed( + user_id, int(stream_id) + ) + if not changed: + return {} + + return await self.db_pool.runInteraction( + "get_updated_global_account_data_for_user", + get_updated_global_account_data_for_user, + ) + + async def get_updated_room_account_data_for_user( + self, user_id: str, stream_id: int + ) -> Dict[str, Dict[str, JsonDict]]: + """Get all the room account_data that's changed for a user. + Args: + user_id: The user to get the account_data for. + stream_id: The point in the stream since which to get updates + + Returns: + A dict mapping from room_id string to per room account_data dicts. + """ + + def get_updated_room_account_data_for_user_txn( + txn: LoggingTransaction, + ) -> Dict[str, Dict[str, JsonDict]]: + sql = """ + SELECT room_id, account_data_type, content FROM room_account_data + WHERE user_id = ? AND stream_id > ? + """ txn.execute(sql, (user_id, stream_id)) account_data_by_room: Dict[str, Dict[str, JsonDict]] = {} @@ -379,16 +429,17 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) room_account_data = account_data_by_room.setdefault(row[0], {}) room_account_data[row[1]] = db_to_json(row[2]) - return global_account_data, account_data_by_room + return account_data_by_room changed = self._account_data_stream_cache.has_entity_changed( user_id, int(stream_id) ) if not changed: - return {}, {} + return {} return await self.db_pool.runInteraction( - "get_updated_account_data_for_user", get_updated_account_data_for_user_txn + "get_updated_room_account_data_for_user", + get_updated_room_account_data_for_user_txn, ) @cached(max_entries=5000, iterable=True) @@ -444,7 +495,8 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) self.get_global_account_data_by_type_for_user.invalidate( (row.user_id, row.data_type) ) - self.get_account_data_for_user.invalidate((row.user_id,)) + self.get_global_account_data_for_user.invalidate((row.user_id,)) + self.get_room_account_data_for_user.invalidate((row.user_id,)) self.get_account_data_for_room.invalidate((row.user_id, row.room_id)) self.get_account_data_for_room_and_type.invalidate( (row.user_id, row.room_id, row.data_type) @@ -492,7 +544,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_room_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) self.get_account_data_for_room_and_type.prefill( (user_id, room_id, account_data_type), content @@ -558,7 +610,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return None self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_room_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) self.get_account_data_for_room_and_type.prefill( (user_id, room_id, account_data_type), {} @@ -593,7 +645,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.invalidate( (user_id, account_data_type) ) @@ -761,7 +813,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return None self._account_data_stream_cache.entity_has_changed(user_id, next_id) - self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.prefill( (user_id, account_data_type), {} ) @@ -822,7 +874,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) txn, self.get_account_data_for_room_and_type, (user_id,) ) self._invalidate_cache_and_stream( - txn, self.get_account_data_for_user, (user_id,) + txn, self.get_global_account_data_for_user, (user_id,) + ) + self._invalidate_cache_and_stream( + txn, self.get_room_account_data_for_user, (user_id,) ) self._invalidate_cache_and_stream( txn, self.get_global_account_data_by_type_for_user, (user_id,) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 5fb152c4ff..484db175d0 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py
@@ -166,7 +166,7 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore): room_id: str, app_service: "ApplicationService", cache_context: _CacheContext, - ) -> List[str]: + ) -> Sequence[str]: """ Get all users in a room that the appservice controls. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e8b6cc6b80..1ca66d57d4 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -21,6 +21,7 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -100,6 +101,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ("device_lists_outbound_pokes", "stream_id"), ("device_lists_changes_in_room", "stream_id"), ("device_lists_remote_pending", "stream_id"), + ("device_lists_changes_converted_stream_position", "stream_id"), ], is_writer=hs.config.worker.worker_app is None, ) @@ -201,7 +203,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - async def count_devices_by_users(self, user_ids: Optional[List[str]] = None) -> int: + async def count_devices_by_users( + self, user_ids: Optional[Collection[str]] = None + ) -> int: """Retrieve number of all devices of given users. Only returns number of devices that are not marked as hidden. @@ -212,7 +216,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): """ def count_devices_by_users_txn( - txn: LoggingTransaction, user_ids: List[str] + txn: LoggingTransaction, user_ids: Collection[str] ) -> int: sql = """ SELECT count(*) @@ -745,42 +749,47 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): @trace @cancellable async def get_user_devices_from_cache( - self, query_list: List[Tuple[str, Optional[str]]] - ) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]: + self, user_ids: Set[str], user_and_device_ids: List[Tuple[str, str]] + ) -> Tuple[Set[str], Dict[str, Mapping[str, JsonDict]]]: """Get the devices (and keys if any) for remote users from the cache. Args: - query_list: List of (user_id, device_ids), if device_ids is - falsey then return all device ids for that user. + user_ids: users which should have all device IDs returned + user_and_device_ids: List of (user_id, device_ids) Returns: A tuple of (user_ids_not_in_cache, results_map), where user_ids_not_in_cache is a set of user_ids and results_map is a mapping of user_id -> device_id -> device_info. """ - user_ids = {user_id for user_id, _ in query_list} - user_map = await self.get_device_list_last_stream_id_for_remotes(list(user_ids)) + unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids} + user_map = await self.get_device_list_last_stream_id_for_remotes( + list(unique_user_ids) + ) # We go and check if any of the users need to have their device lists # resynced. If they do then we remove them from the cached list. users_needing_resync = await self.get_user_ids_requiring_device_list_resync( - user_ids + unique_user_ids ) user_ids_in_cache = { user_id for user_id, stream_id in user_map.items() if stream_id } - users_needing_resync - user_ids_not_in_cache = user_ids - user_ids_in_cache - - results: Dict[str, Dict[str, JsonDict]] = {} - for user_id, device_id in query_list: - if user_id not in user_ids_in_cache: - continue + user_ids_not_in_cache = unique_user_ids - user_ids_in_cache - if device_id: - device = await self._get_cached_user_device(user_id, device_id) - results.setdefault(user_id, {})[device_id] = device - else: + # First fetch all the users which all devices are to be returned. + results: Dict[str, Mapping[str, JsonDict]] = {} + for user_id in user_ids: + if user_id in user_ids_in_cache: results[user_id] = await self.get_cached_devices_for_user(user_id) + # Then fetch all device-specific requests, but skip users we've already + # fetched all devices for. + device_specific_results: Dict[str, Dict[str, JsonDict]] = {} + for user_id, device_id in user_and_device_ids: + if user_id in user_ids_in_cache and user_id not in user_ids: + device = await self._get_cached_user_device(user_id, device_id) + device_specific_results.setdefault(user_id, {})[device_id] = device + results.update(device_specific_results) set_tag("in_cache", str(results)) set_tag("not_in_cache", str(user_ids_not_in_cache)) @@ -798,7 +807,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return db_to_json(content) @cached() - async def get_cached_devices_for_user(self, user_id: str) -> Dict[str, JsonDict]: + async def get_cached_devices_for_user(self, user_id: str) -> Mapping[str, JsonDict]: devices = await self.db_pool.simple_select_list( table="device_lists_remote_cache", keyvalues={"user_id": user_id}, diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py
index 5903fdaf00..44aa181174 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Iterable, List, Optional, Tuple +from typing import Iterable, List, Optional, Sequence, Tuple import attr @@ -74,7 +74,7 @@ class DirectoryWorkerStore(CacheInvalidationWorkerStore): ) @cached(max_entries=5000) - async def get_aliases_for_room(self, room_id: str) -> List[str]: + async def get_aliases_for_room(self, room_id: str) -> Sequence[str]: return await self.db_pool.simple_select_onecol( "room_aliases", {"room_id": room_id}, diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index c4ac6c33ba..2c2d145666 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -20,7 +20,9 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, + Sequence, Tuple, Union, cast, @@ -260,7 +262,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker for batch in batch_iter(signature_query, 50): cross_sigs_result = await self.db_pool.runInteraction( - "get_e2e_cross_signing_signatures", + "get_e2e_cross_signing_signatures_for_devices", self._get_e2e_cross_signing_signatures_for_devices_txn, batch, ) @@ -691,7 +693,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker @cached(max_entries=10000) async def get_e2e_unused_fallback_key_types( self, user_id: str, device_id: str - ) -> List[str]: + ) -> Sequence[str]: """Returns the fallback key types that have an unused key. Args: @@ -731,7 +733,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker return user_keys.get(key_type) @cached(num_args=1) - def _get_bare_e2e_cross_signing_keys(self, user_id: str) -> Dict[str, JsonDict]: + def _get_bare_e2e_cross_signing_keys(self, user_id: str) -> Mapping[str, JsonDict]: """Dummy function. Only used to make a cache for _get_bare_e2e_cross_signing_keys_bulk. """ @@ -744,7 +746,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ) async def _get_bare_e2e_cross_signing_keys_bulk( self, user_ids: Iterable[str] - ) -> Dict[str, Optional[Dict[str, JsonDict]]]: + ) -> Dict[str, Optional[Mapping[str, JsonDict]]]: """Returns the cross-signing keys for a set of users. The output of this function should be passed to _get_e2e_cross_signing_signatures_txn if the signatures for the calling user need to be fetched. @@ -765,7 +767,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ) # The `Optional` comes from the `@cachedList` decorator. - return cast(Dict[str, Optional[Dict[str, JsonDict]]], result) + return cast(Dict[str, Optional[Mapping[str, JsonDict]]], result) def _get_bare_e2e_cross_signing_keys_bulk_txn( self, @@ -924,7 +926,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker @cancellable async def get_e2e_cross_signing_keys_bulk( self, user_ids: List[str], from_user_id: Optional[str] = None - ) -> Dict[str, Optional[Dict[str, JsonDict]]]: + ) -> Dict[str, Optional[Mapping[str, JsonDict]]]: """Returns the cross-signing keys for a set of users. Args: @@ -940,11 +942,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids) if from_user_id: - result = await self.db_pool.runInteraction( - "get_e2e_cross_signing_signatures", - self._get_e2e_cross_signing_signatures_txn, - result, - from_user_id, + result = cast( + Dict[str, Optional[Mapping[str, JsonDict]]], + await self.db_pool.runInteraction( + "get_e2e_cross_signing_signatures", + self._get_e2e_cross_signing_signatures_txn, + result, + from_user_id, + ), ) return result diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index bbee02ab18..ca780cca36 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -22,6 +22,7 @@ from typing import ( Iterable, List, Optional, + Sequence, Set, Tuple, cast, @@ -1004,7 +1005,9 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas room_id, ) - async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[Optional[str], int]: + async def get_max_depth_of( + self, event_ids: Collection[str] + ) -> Tuple[Optional[str], int]: """Returns the event ID and depth for the event that has the max depth from a set of event IDs Args: @@ -1141,7 +1144,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas ) @cached(max_entries=5000, iterable=True) - async def get_latest_event_ids_in_room(self, room_id: str) -> List[str]: + async def get_latest_event_ids_in_room(self, room_id: str) -> Sequence[str]: return await self.db_pool.simple_select_onecol( table="event_forward_extremities", keyvalues={"room_id": room_id}, @@ -1171,7 +1174,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas @cancellable async def get_forward_extremities_for_room_at_stream_ordering( self, room_id: str, stream_ordering: int - ) -> List[str]: + ) -> Sequence[str]: """For a given room_id and stream_ordering, return the forward extremeties of the room at that point in "time". @@ -1204,7 +1207,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas @cached(max_entries=5000, num_args=2) async def _get_forward_extremeties_for_room( self, room_id: str, stream_ordering: int - ) -> List[str]: + ) -> Sequence[str]: """For a given room_id and stream_ordering, return the forward extremeties of the room at that point in "time". diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 3a0c370fde..eeccf5db24 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -203,11 +203,18 @@ class RoomNotifCounts: # Map of thread ID to the notification counts. threads: Dict[str, NotifCounts] + @staticmethod + def empty() -> "RoomNotifCounts": + return _EMPTY_ROOM_NOTIF_COUNTS + def __len__(self) -> int: # To properly account for the amount of space in any caches. return len(self.threads) + 1 +_EMPTY_ROOM_NOTIF_COUNTS = RoomNotifCounts(NotifCounts(), {}) + + def _serialize_action( actions: Collection[Union[Mapping, str]], is_highlight: bool ) -> str: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1536937b67..7996cbb557 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -16,7 +16,6 @@ import itertools import logging from collections import OrderedDict -from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -26,7 +25,6 @@ from typing import ( Iterable, List, Optional, - Sequence, Set, Tuple, ) @@ -36,7 +34,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EventContentFields, EventTypes, RelationTypes -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import PartialStateConflictError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext @@ -52,7 +50,7 @@ from synapse.storage.databases.main.search import SearchEntry from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.storage.util.sequence import SequenceGenerator -from synapse.types import JsonDict, StateMap, get_domain_from_id +from synapse.types import JsonDict, StateMap, StrCollection, get_domain_from_id from synapse.util import json_encoder from synapse.util.iterutils import batch_iter, sorted_topologically from synapse.util.stringutils import non_null_str_or_none @@ -72,24 +70,6 @@ event_counter = Counter( ) -class PartialStateConflictError(SynapseError): - """An internal error raised when attempting to persist an event with partial state - after the room containing the event has been un-partial stated. - - This error should be handled by recomputing the event context and trying again. - - This error has an HTTP status code so that it can be transported over replication. - It should not be exposed to clients. - """ - - def __init__(self) -> None: - super().__init__( - HTTPStatus.CONFLICT, - msg="Cannot persist partial state event in un-partial stated room", - errcode=Codes.UNKNOWN, - ) - - @attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -306,7 +286,7 @@ class PersistEventsStore: # The set of event_ids to return. This includes all soft-failed events # and their prev events. - existing_prevs = set() + existing_prevs: Set[str] = set() def _get_prevs_before_rejected_txn( txn: LoggingTransaction, batch: Collection[str] @@ -571,7 +551,7 @@ class PersistEventsStore: event_chain_id_gen: SequenceGenerator, event_to_room_id: Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], - event_to_auth_chain: Dict[str, Sequence[str]], + event_to_auth_chain: Dict[str, StrCollection], ) -> None: """Calculate the chain cover index for the given events. @@ -865,7 +845,7 @@ class PersistEventsStore: event_chain_id_gen: SequenceGenerator, event_to_room_id: Dict[str, str], event_to_types: Dict[str, Tuple[str, str]], - event_to_auth_chain: Dict[str, Sequence[str]], + event_to_auth_chain: Dict[str, StrCollection], events_to_calc_chain_id_for: Set[str], chain_map: Dict[str, Tuple[int, int]], ) -> Dict[str, Tuple[int, int]]: diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index b9d3c36d60..584536111d 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Set, Tuple, cast +from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, cast import attr @@ -29,7 +29,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events import PersistEventsStore from synapse.storage.types import Cursor -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection if TYPE_CHECKING: from synapse.server import HomeServer @@ -1061,7 +1061,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self.event_chain_id_gen, # type: ignore[attr-defined] event_to_room_id, event_to_types, - cast(Dict[str, Sequence[str]], event_to_auth_chain), + cast(Dict[str, StrCollection], event_to_auth_chain), ) return _CalculateChainCover( diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index db9a24db5e..4b1061e6d7 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, cast from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.database import ( @@ -95,7 +95,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): return await self.db_pool.runInteraction("count_users", _count_users) @cached(num_args=0) - async def get_monthly_active_count_by_service(self) -> Dict[str, int]: + async def get_monthly_active_count_by_service(self) -> Mapping[str, int]: """Generates current count of monthly active users broken down by service. A service is typically an appservice but also includes native matrix users. Since the `monthly_active_users` table is populated from the `user_ips` table diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 29972d5204..dddf49c2d5 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -21,7 +21,9 @@ from typing import ( Dict, Iterable, List, + Mapping, Optional, + Sequence, Tuple, cast, ) @@ -288,7 +290,7 @@ class ReceiptsWorkerStore(SQLBaseStore): async def get_linearized_receipts_for_room( self, room_id: str, to_key: int, from_key: Optional[int] = None - ) -> List[dict]: + ) -> Sequence[JsonDict]: """Get receipts for a single room for sending to clients. Args: @@ -311,7 +313,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cached(tree=True) async def _get_linearized_receipts_for_room( self, room_id: str, to_key: int, from_key: Optional[int] = None - ) -> List[JsonDict]: + ) -> Sequence[JsonDict]: """See get_linearized_receipts_for_room""" def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: @@ -354,7 +356,7 @@ class ReceiptsWorkerStore(SQLBaseStore): ) async def _get_linearized_receipts_for_rooms( self, room_ids: Collection[str], to_key: int, from_key: Optional[int] = None - ) -> Dict[str, List[JsonDict]]: + ) -> Dict[str, Sequence[JsonDict]]: if not room_ids: return {} @@ -416,7 +418,7 @@ class ReceiptsWorkerStore(SQLBaseStore): ) async def get_linearized_receipts_for_all_rooms( self, to_key: int, from_key: Optional[int] = None - ) -> Dict[str, JsonDict]: + ) -> Mapping[str, JsonDict]: """Get receipts for all rooms between two stream_ids, up to a limit of the latest 100 read receipts. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 31f0f2bd3d..9a55e17624 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -16,7 +16,7 @@ import logging import random import re -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union, cast import attr @@ -192,7 +192,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) @cached() - async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: + async def get_user_by_id(self, user_id: str) -> Optional[Mapping[str, Any]]: """Deprecated: use get_userinfo_by_id instead""" def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 0018d6f7ab..fa3266c081 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py
@@ -22,6 +22,7 @@ from typing import ( List, Mapping, Optional, + Sequence, Set, Tuple, Union, @@ -171,7 +172,7 @@ class RelationsWorkerStore(SQLBaseStore): direction: Direction = Direction.BACKWARDS, from_token: Optional[StreamToken] = None, to_token: Optional[StreamToken] = None, - ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: + ) -> Tuple[Sequence[_RelatedEvent], Optional[StreamToken]]: """Get a list of relations for an event, ordered by topological ordering. Args: @@ -397,7 +398,9 @@ class RelationsWorkerStore(SQLBaseStore): return result is not None @cached() - async def get_aggregation_groups_for_event(self, event_id: str) -> List[JsonDict]: + async def get_aggregation_groups_for_event( + self, event_id: str + ) -> Sequence[JsonDict]: raise NotImplementedError() @cachedList( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index ea6a5e2f34..694a5b802c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -24,6 +24,7 @@ from typing import ( List, Mapping, Optional, + Sequence, Set, Tuple, Union, @@ -153,7 +154,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return self._known_servers_count @cached(max_entries=100000, iterable=True) - async def get_users_in_room(self, room_id: str) -> List[str]: + async def get_users_in_room(self, room_id: str) -> Sequence[str]: """Returns a list of users in the room. Will return inaccurate results for rooms with partial state, since the state for @@ -190,9 +191,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) @cached() - def get_user_in_room_with_profile( - self, room_id: str, user_id: str - ) -> Dict[str, ProfileInfo]: + def get_user_in_room_with_profile(self, room_id: str, user_id: str) -> ProfileInfo: raise NotImplementedError() @cachedList( @@ -246,7 +245,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=100000, iterable=True) async def get_users_in_room_with_profiles( self, room_id: str - ) -> Dict[str, ProfileInfo]: + ) -> Mapping[str, ProfileInfo]: """Get a mapping from user ID to profile information for all users in a given room. The profile information comes directly from this room's `m.room.member` @@ -285,7 +284,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) @cached(max_entries=100000) - async def get_room_summary(self, room_id: str) -> Dict[str, MemberSummary]: + async def get_room_summary(self, room_id: str) -> Mapping[str, MemberSummary]: """Get the details of a room roughly suitable for use by the room summary extension to /sync. Useful when lazy loading room members. Args: @@ -357,7 +356,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached() async def get_invited_rooms_for_local_user( self, user_id: str - ) -> List[RoomsForUser]: + ) -> Sequence[RoomsForUser]: """Get all the rooms the *local* user is invited to. Args: @@ -475,7 +474,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return results @cached(iterable=True) - async def get_local_users_in_room(self, room_id: str) -> List[str]: + async def get_local_users_in_room(self, room_id: str) -> Sequence[str]: """ Retrieves a list of the current roommembers who are local to the server. """ @@ -791,7 +790,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): """Returns the set of users who share a room with `user_id`""" room_ids = await self.get_rooms_for_user(user_id) - user_who_share_room = set() + user_who_share_room: Set[str] = set() for room_id in room_ids: user_ids = await self.get_users_in_room(room_id) user_who_share_room.update(user_ids) @@ -953,7 +952,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return True @cached(iterable=True, max_entries=10000) - async def get_current_hosts_in_room(self, room_id: str) -> Set[str]: + async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]: """Get current hosts in room based on current state.""" # First we check if we already have `get_users_in_room` in the cache, as diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py
index 05da15074a..5dcb1fc0b5 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py
@@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Collection, Dict, List, Tuple +from typing import Collection, Dict, List, Mapping, Tuple from unpaddedbase64 import encode_base64 @@ -26,7 +26,7 @@ from synapse.util.caches.descriptors import cached, cachedList class SignatureWorkerStore(EventsWorkerStore): @cached() - def get_event_reference_hash(self, event_id: str) -> Dict[str, Dict[str, bytes]]: + def get_event_reference_hash(self, event_id: str) -> Mapping[str, bytes]: # This is a dummy function to allow get_event_reference_hashes # to use its cache raise NotImplementedError() @@ -36,7 +36,7 @@ class SignatureWorkerStore(EventsWorkerStore): ) async def get_event_reference_hashes( self, event_ids: Collection[str] - ) -> Dict[str, Dict[str, bytes]]: + ) -> Mapping[str, Mapping[str, bytes]]: """Get all hashes for given events. Args: diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index d5500cdd47..c149a9eacb 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py
@@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import Any, Dict, Iterable, List, Tuple, cast +from typing import Any, Dict, Iterable, List, Mapping, Tuple, cast from synapse.api.constants import AccountDataTypes from synapse.replication.tcp.streams import AccountDataStream @@ -32,7 +32,9 @@ logger = logging.getLogger(__name__) class TagsWorkerStore(AccountDataWorkerStore): @cached() - async def get_tags_for_user(self, user_id: str) -> Dict[str, Dict[str, JsonDict]]: + async def get_tags_for_user( + self, user_id: str + ) -> Mapping[str, Mapping[str, JsonDict]]: """Get all the tags for a user. @@ -107,7 +109,7 @@ class TagsWorkerStore(AccountDataWorkerStore): async def get_updated_tags( self, user_id: str, stream_id: int - ) -> Dict[str, Dict[str, JsonDict]]: + ) -> Mapping[str, Mapping[str, JsonDict]]: """Get all the tags for the rooms where the tags have changed since the given version diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 14ef5b040d..f6a6fd4079 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -16,9 +16,9 @@ import logging import re from typing import ( TYPE_CHECKING, - Dict, Iterable, List, + Mapping, Optional, Sequence, Set, @@ -586,7 +586,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) @cached() - async def get_user_in_directory(self, user_id: str) -> Optional[Dict[str, str]]: + async def get_user_in_directory(self, user_id: str) -> Optional[Mapping[str, str]]: return await self.db_pool.simple_select_one( table="user_directory", keyvalues={"user_id": user_id},