summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/controllers/state.py7
-rw-r--r--synapse/storage/database.py47
-rw-r--r--synapse/storage/databases/main/account_data.py3
-rw-r--r--synapse/storage/databases/main/devices.py4
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py5
-rw-r--r--synapse/storage/databases/main/event_federation.py4
-rw-r--r--synapse/storage/databases/main/event_push_actions.py256
-rw-r--r--synapse/storage/databases/main/events_worker.py22
-rw-r--r--synapse/storage/databases/main/lock.py121
-rw-r--r--synapse/storage/databases/main/push_rule.py6
-rw-r--r--synapse/storage/databases/main/receipts.py12
-rw-r--r--synapse/storage/databases/main/registration.py8
-rw-r--r--synapse/storage/databases/main/room.py17
-rw-r--r--synapse/storage/databases/main/roommember.py301
-rw-r--r--synapse/storage/databases/main/state.py4
-rw-r--r--synapse/storage/databases/main/stats.py86
-rw-r--r--synapse/storage/databases/main/stream.py2
-rw-r--r--synapse/storage/databases/main/transactions.py30
-rw-r--r--synapse/storage/databases/state/store.py3
-rw-r--r--synapse/storage/engines/_base.py8
-rw-r--r--synapse/storage/engines/postgres.py7
-rw-r--r--synapse/storage/engines/sqlite.py13
-rw-r--r--synapse/storage/schema/__init__.py1
-rw-r--r--synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres17
-rw-r--r--synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite40
-rw-r--r--synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql19
-rw-r--r--synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql16
-rw-r--r--synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql (renamed from synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql)0
-rw-r--r--synapse/storage/util/id_generators.py13
-rw-r--r--synapse/storage/util/partial_state_events_tracker.py3
30 files changed, 525 insertions, 550 deletions
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py

index 5964835ea3..19d9d315f6 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py
@@ -23,7 +23,6 @@ from typing import ( List, Mapping, Optional, - Set, Tuple, ) @@ -37,6 +36,7 @@ from synapse.storage.util.partial_state_events_tracker import ( PartialStateEventsTracker, ) from synapse.types import MutableStateMap, StateMap +from synapse.util.cancellation import cancellable if TYPE_CHECKING: from synapse.server import HomeServer @@ -230,6 +230,7 @@ class StateStorageController: @trace @tag_args + @cancellable async def get_state_ids_for_events( self, event_ids: Collection[str], @@ -351,6 +352,7 @@ class StateStorageController: @trace @tag_args + @cancellable async def get_state_group_for_events( self, event_ids: Collection[str], @@ -399,6 +401,7 @@ class StateStorageController: event_id, room_id, prev_group, delta_ids, current_state_ids ) + @cancellable async def get_current_state_ids( self, room_id: str, @@ -520,7 +523,7 @@ class StateStorageController: ) return state_map.get(key) - async def get_current_hosts_in_room(self, room_id: str) -> Set[str]: + async def get_current_hosts_in_room(self, room_id: str) -> List[str]: """Get current hosts in room based on current state.""" await self._partial_state_room_tracker.await_full_state(room_id) diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ca0f606797..15d5da3fa5 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool: if isinstance(self.engine, Sqlite3Engine): self._unsafe_to_upsert_tables.add("user_directory_search") - if self.engine.can_native_upsert: - # Check ASAP (and then later, every 1s) to see if we have finished - # background updates of tables that aren't safe to update. - self._clock.call_later( - 0.0, - run_as_background_process, - "upsert_safety_check", - self._check_safe_to_upsert, - ) + # Check ASAP (and then later, every 1s) to see if we have finished + # background updates of tables that aren't safe to update. + self._clock.call_later( + 0.0, + run_as_background_process, + "upsert_safety_check", + self._check_safe_to_upsert, + ) def name(self) -> str: "Return the name of this database" @@ -1156,11 +1155,8 @@ class DatabasePool: attempts = 0 while True: try: - # We can autocommit if we are going to use native upserts - autocommit = ( - self.engine.can_native_upsert - and table not in self._unsafe_to_upsert_tables - ) + # We can autocommit if it is safe to upsert + autocommit = table not in self._unsafe_to_upsert_tables return await self.runInteraction( desc, @@ -1195,7 +1191,7 @@ class DatabasePool: ) -> bool: """ Pick the UPSERT method which works best on the platform. Either the - native one (Pg9.5+, recent SQLites), or fall back to an emulated method. + native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method. Args: txn: The transaction to use. @@ -1203,14 +1199,15 @@ class DatabasePool: keyvalues: The unique key tables and their new values values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting - lock: True to lock the table when doing the upsert. + lock: True to lock the table when doing the upsert. Unused when performing + a native upsert. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) """ insertion_values = insertion_values or {} - if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: + if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_txn_native_upsert( txn, table, keyvalues, values, insertion_values=insertion_values ) @@ -1361,14 +1358,12 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused if the database engine - supports native upserts. + lock: True to lock the table when doing the upsert. Unused when performing + a native upsert. """ - # We can autocommit if we are going to use native upserts - autocommit = ( - self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables - ) + # We can autocommit if it safe to upsert + autocommit = table not in self._unsafe_to_upsert_tables await self.runInteraction( desc, @@ -1402,10 +1397,10 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused if the database engine - supports native upserts. + lock: True to lock the table when doing the upsert. Unused when performing + a native upsert. """ - if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: + if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( txn, table, key_names, key_values, value_names, value_values ) diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 9af9f4f18e..c38b8a9e5a 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -650,9 +650,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) txn, self.get_account_data_for_room, (user_id,) ) self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,)) - self._invalidate_cache_and_stream( - txn, self.get_push_rules_enabled_for_user, (user_id,) - ) # This user might be contained in the ignored_by cache for other users, # so we have to invalidate it all. self._invalidate_all_cache_and_stream(txn, self.ignored_by) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 7ceb7a202b..dfca34550d 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -53,6 +53,7 @@ from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -668,6 +669,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): ... @trace + @cancellable async def get_user_devices_from_cache( self, query_list: List[Tuple[str, Optional[str]]] ) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]: @@ -743,6 +745,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): return self._device_list_stream_cache.get_all_entities_changed(from_key) + @cancellable async def get_users_whose_devices_changed( self, from_key: int, @@ -1221,6 +1224,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): desc="get_min_device_lists_changes_in_room", ) + @cancellable async def get_device_list_changes_in_rooms( self, room_ids: Collection[str], from_id: int ) -> Optional[Set[str]]: diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 2df8101390..210cfab073 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -50,6 +50,7 @@ from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -135,6 +136,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker return now_stream_id, [] @trace + @cancellable async def get_e2e_device_keys_for_cs_api( self, query_list: List[Tuple[str, Optional[str]]] ) -> Dict[str, Dict[str, JsonDict]]: @@ -197,6 +199,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ... @trace + @cancellable async def get_e2e_device_keys_and_signatures( self, query_list: Collection[Tuple[str, Optional[str]]], @@ -887,6 +890,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker return keys + @cancellable async def get_e2e_cross_signing_keys_bulk( self, user_ids: List[str], from_user_id: Optional[str] = None ) -> Dict[str, Optional[Dict[str, JsonDict]]]: @@ -902,7 +906,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker keys were not found, either their user ID will not be in the dict, or their user ID will map to None. """ - result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids) if from_user_id: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 41b015dba1..271e7a71c1 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -48,6 +48,7 @@ from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -976,6 +977,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return int(min_depth) if min_depth is not None else None + @cancellable async def get_forward_extremities_for_room_at_stream_ordering( self, room_id: str, stream_ordering: int ) -> List[str]: @@ -1606,7 +1608,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas logger.info("Invalid prev_events for %s", event_id) continue - if room_version.event_format == EventFormatVersions.V1: + if room_version.event_format == EventFormatVersions.ROOM_V1_V2: for prev_event_tuple in prev_events: if ( not isinstance(prev_event_tuple, list) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index eabf9c9739..f4a07de2a3 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -274,7 +274,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas receipt_types=( ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, ), ) @@ -459,6 +458,31 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas return await self.db_pool.runInteraction("get_push_action_users_in_range", f) + def _get_receipts_by_room_txn( + self, txn: LoggingTransaction, user_id: str + ) -> List[Tuple[str, int]]: + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + ( + ReceiptTypes.READ, + ReceiptTypes.READ_PRIVATE, + ), + ) + + sql = f""" + SELECT room_id, MAX(stream_ordering) + FROM receipts_linearized + INNER JOIN events USING (room_id, event_id) + WHERE {receipt_types_clause} + AND user_id = ? + GROUP BY room_id + """ + + args.extend((user_id,)) + txn.execute(sql, args) + return cast(List[Tuple[str, int]], txn.fetchall()) + async def get_unread_push_actions_for_user_in_range_for_http( self, user_id: str, @@ -482,106 +506,45 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas The list will have between 0~limit entries. """ - # find rooms that have a read receipt in them and return the next - # push actions - def get_after_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool]]: - # find rooms that have a read receipt in them and return the next - # push actions - - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight - FROM ( - SELECT room_id, - MAX(stream_ordering) as stream_ordering - FROM events - INNER JOIN receipts_linearized USING (room_id, event_id) - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) AS rl, - event_push_actions AS ep - WHERE - ep.room_id = rl.room_id - AND ep.stream_ordering > rl.stream_ordering - AND ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering ASC LIMIT ? - """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) - - after_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt + receipts_by_room = dict( + await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http_receipts", + self._get_receipts_by_room_txn, + user_id=user_id, + ), ) - # There are rooms with push actions in them but you don't have a read receipt in - # them e.g. rooms you've been invited to, so get push actions for rooms which do - # not have read receipts in them too. - def get_no_receipt( + def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool]]: - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight + sql = """ + SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight FROM event_push_actions AS ep - INNER JOIN events AS e USING (room_id, event_id) WHERE - ep.room_id NOT IN ( - SELECT room_id FROM receipts_linearized - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) - AND ep.user_id = ? + ep.user_id = ? AND ep.stream_ordering > ? AND ep.stream_ordering <= ? AND ep.notif = 1 ORDER BY ep.stream_ordering ASC LIMIT ? """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) + txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) - no_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + push_actions = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn ) notifs = [ HttpPushAction( - event_id=row[0], - room_id=row[1], - stream_ordering=row[2], - actions=_deserialize_action(row[3], row[4]), + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + actions=_deserialize_action(actions, highlight), ) - for row in after_read_receipt + no_read_receipt + for event_id, room_id, stream_ordering, actions, highlight in push_actions + # Only include push actions with a stream ordering after any receipt, or without any + # receipt present (invited to but never read rooms). + if stream_ordering > receipts_by_room.get(room_id, 0) ] # Now sort it so it's ordered correctly, since currently it will @@ -617,106 +580,49 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas The list will have between 0~limit entries. """ - # find rooms that have a read receipt in them and return the most recent - # push actions - def get_after_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool, int]]: - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" - SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, - ep.highlight, e.received_ts - FROM ( - SELECT room_id, - MAX(stream_ordering) as stream_ordering - FROM events - INNER JOIN receipts_linearized USING (room_id, event_id) - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) AS rl, - event_push_actions AS ep - INNER JOIN events AS e USING (room_id, event_id) - WHERE - ep.room_id = rl.room_id - AND ep.stream_ordering > rl.stream_ordering - AND ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering DESC LIMIT ? - """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) - - after_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt + receipts_by_room = dict( + await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email_receipts", + self._get_receipts_by_room_txn, + user_id=user_id, + ), ) - # There are rooms with push actions in them but you don't have a read receipt in - # them e.g. rooms you've been invited to, so get push actions for rooms which do - # not have read receipts in them too. - def get_no_receipt( + def get_push_actions_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, int, str, bool, int]]: - receipt_types_clause, args = make_in_list_sql_clause( - self.database_engine, - "receipt_type", - ( - ReceiptTypes.READ, - ReceiptTypes.READ_PRIVATE, - ReceiptTypes.UNSTABLE_READ_PRIVATE, - ), - ) - - sql = f""" + sql = """ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight, e.received_ts FROM event_push_actions AS ep INNER JOIN events AS e USING (room_id, event_id) WHERE - ep.room_id NOT IN ( - SELECT room_id FROM receipts_linearized - WHERE {receipt_types_clause} AND user_id = ? - GROUP BY room_id - ) - AND ep.user_id = ? + ep.user_id = ? AND ep.stream_ordering > ? AND ep.stream_ordering <= ? AND ep.notif = 1 ORDER BY ep.stream_ordering DESC LIMIT ? """ - args.extend( - (user_id, user_id, min_stream_ordering, max_stream_ordering, limit) - ) - txn.execute(sql, args) + txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) - no_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt + push_actions = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn ) # Make a list of dicts from the two sets of results. notifs = [ EmailPushAction( - event_id=row[0], - room_id=row[1], - stream_ordering=row[2], - actions=_deserialize_action(row[3], row[4]), - received_ts=row[5], + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + actions=_deserialize_action(actions, highlight), + received_ts=received_ts, ) - for row in after_read_receipt + no_read_receipt + for event_id, room_id, stream_ordering, actions, highlight, received_ts in push_actions + # Only include push actions with a stream ordering after any receipt, or without any + # receipt present (invited to but never read rooms). + if stream_ordering > receipts_by_room.get(room_id, 0) ] # Now sort it so it's ordered correctly, since currently it will @@ -792,26 +698,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas int(count_as_unread), # unread column ) - def _add_push_actions_to_staging_txn(txn: LoggingTransaction) -> None: - # We don't use simple_insert_many here to avoid the overhead - # of generating lists of dicts. - - sql = """ - INSERT INTO event_push_actions_staging - (event_id, user_id, actions, notif, highlight, unread) - VALUES (?, ?, ?, ?, ?, ?) - """ - - txn.execute_batch( - sql, - ( - _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.items() - ), - ) - - return await self.db_pool.runInteraction( - "add_push_actions_to_staging", _add_push_actions_to_staging_txn + await self.db_pool.simple_insert_many( + "event_push_actions_staging", + keys=("event_id", "user_id", "actions", "notif", "highlight", "unread"), + values=[ + _gen_entry(user_id, actions) + for user_id, actions in user_id_actions.items() + ], + desc="add_push_actions_to_staging", ) async def remove_push_actions_from_staging(self, event_id: str) -> None: diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 90e6d82058..9f6b1fcef1 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -81,6 +81,7 @@ from synapse.util import unwrapFirstError from synapse.util.async_helpers import ObservableDeferred, delay_cancellation from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import AsyncLruCache +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -339,6 +340,7 @@ class EventsWorkerStore(SQLBaseStore): ) -> Optional[EventBase]: ... + @cancellable async def get_event( self, event_id: str, @@ -433,6 +435,7 @@ class EventsWorkerStore(SQLBaseStore): @trace @tag_args + @cancellable async def get_events_as_list( self, event_ids: Collection[str], @@ -584,6 +587,7 @@ class EventsWorkerStore(SQLBaseStore): return events + @cancellable async def _get_events_from_cache_or_db( self, event_ids: Iterable[str], allow_rejected: bool = False ) -> Dict[str, EventCacheEntry]: @@ -1156,7 +1160,7 @@ class EventsWorkerStore(SQLBaseStore): if format_version is None: # This means that we stored the event before we had the concept # of a event format version, so it must be a V1 event. - format_version = EventFormatVersions.V1 + format_version = EventFormatVersions.ROOM_V1_V2 room_version_id = row.room_version_id @@ -1186,10 +1190,10 @@ class EventsWorkerStore(SQLBaseStore): # # So, the following approximations should be adequate. - if format_version == EventFormatVersions.V1: + if format_version == EventFormatVersions.ROOM_V1_V2: # if it's event format v1 then it must be room v1 or v2 room_version = RoomVersions.V1 - elif format_version == EventFormatVersions.V2: + elif format_version == EventFormatVersions.ROOM_V3: # if it's event format v2 then it must be room v3 room_version = RoomVersions.V3 else: @@ -2111,7 +2115,14 @@ class EventsWorkerStore(SQLBaseStore): AND room_id = ? /* Make sure event is not rejected */ AND rejections.event_id IS NULL - ORDER BY origin_server_ts %s + /** + * First sort by the message timestamp. If the message timestamps are the + * same, we want the message that logically comes "next" (before/after + * the given timestamp) based on the DAG and its topological order (`depth`). + * Finally, we can tie-break based on when it was received on the server + * (`stream_ordering`). + */ + ORDER BY origin_server_ts %s, depth %s, stream_ordering %s LIMIT 1; """ @@ -2130,7 +2141,8 @@ class EventsWorkerStore(SQLBaseStore): order = "ASC" txn.execute( - sql_template % (comparison_operator, order), (timestamp, room_id) + sql_template % (comparison_operator, order, order, order), + (timestamp, room_id), ) row = txn.fetchone() if row: diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore): now = self._clock.time_msec() token = random_string(6) - if self.db_pool.engine.can_native_upsert: - - def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: - # We take out the lock if either a) there is no row for the lock - # already, b) the existing row has timed out, or c) the row is - # for this instance (which means the process got killed and - # restarted) - sql = """ - INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (lock_name, lock_key) - DO UPDATE - SET - token = EXCLUDED.token, - instance_name = EXCLUDED.instance_name, - last_renewed_ts = EXCLUDED.last_renewed_ts - WHERE - worker_locks.last_renewed_ts < ? - OR worker_locks.instance_name = EXCLUDED.instance_name - """ - txn.execute( - sql, - ( - lock_name, - lock_key, - self._instance_name, - token, - now, - now - _LOCK_TIMEOUT_MS, - ), - ) - - # We only acquired the lock if we inserted or updated the table. - return bool(txn.rowcount) - - did_lock = await self.db_pool.runInteraction( - "try_acquire_lock", - _try_acquire_lock_txn, - # We can autocommit here as we're executing a single query, this - # will avoid serialization errors. - db_autocommit=True, + def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: + # We take out the lock if either a) there is no row for the lock + # already, b) the existing row has timed out, or c) the row is + # for this instance (which means the process got killed and + # restarted) + sql = """ + INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (lock_name, lock_key) + DO UPDATE + SET + token = EXCLUDED.token, + instance_name = EXCLUDED.instance_name, + last_renewed_ts = EXCLUDED.last_renewed_ts + WHERE + worker_locks.last_renewed_ts < ? + OR worker_locks.instance_name = EXCLUDED.instance_name + """ + txn.execute( + sql, + ( + lock_name, + lock_key, + self._instance_name, + token, + now, + now - _LOCK_TIMEOUT_MS, + ), ) - if not did_lock: - return None - - else: - # If we're on an old SQLite we emulate the above logic by first - # clearing out any existing stale locks and then upserting. - - def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool: - sql = """ - DELETE FROM worker_locks - WHERE - lock_name = ? - AND lock_key = ? - AND (last_renewed_ts < ? OR instance_name = ?) - """ - txn.execute( - sql, - (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name), - ) - - inserted = self.db_pool.simple_upsert_txn_emulated( - txn, - table="worker_locks", - keyvalues={ - "lock_name": lock_name, - "lock_key": lock_key, - }, - values={}, - insertion_values={ - "token": token, - "last_renewed_ts": self._clock.time_msec(), - "instance_name": self._instance_name, - }, - ) - - return inserted - did_lock = await self.db_pool.runInteraction( - "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn - ) + # We only acquired the lock if we inserted or updated the table. + return bool(txn.rowcount) - if not did_lock: - return None + did_lock = await self.db_pool.runInteraction( + "try_acquire_lock", + _try_acquire_lock_txn, + # We can autocommit here as we're executing a single query, this + # will avoid serialization errors. + db_autocommit=True, + ) + if not did_lock: + return None lock = Lock( self._reactor, diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 255620f996..5079edd1e0 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -165,7 +165,6 @@ class PushRulesWorkerStore( return _load_rules(rows, enabled_map, self.hs.config.experimental) - @cached(max_entries=5000) async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]: results = await self.db_pool.simple_select_list( table="push_rules_enable", @@ -229,9 +228,6 @@ class PushRulesWorkerStore( return results - @cachedList( - cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids" - ) async def bulk_get_push_rules_enabled( self, user_ids: Collection[str] ) -> Dict[str, Dict[str, bool]]: @@ -246,6 +242,7 @@ class PushRulesWorkerStore( iterable=user_ids, retcols=("user_name", "rule_id", "enabled"), desc="bulk_get_push_rules_enabled", + batch_size=1000, ) for row in rows: enabled = bool(row["enabled"]) @@ -792,7 +789,6 @@ class PushRuleStore(PushRulesWorkerStore): self.db_pool.simple_insert_txn(txn, "push_rules_stream", values=values) txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,)) - txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,)) txn.call_after( self.push_rules_stream_cache.entity_has_changed, user_id, stream_id ) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 124c70ad37..3838409519 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -812,7 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore): # FIXME: This shouldn't invalidate the whole cache txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) - self.db_pool.simple_delete_txn( + self.db_pool.simple_upsert_txn( txn, table="receipts_graph", keyvalues={ @@ -820,17 +820,13 @@ class ReceiptsWorkerStore(SQLBaseStore): "receipt_type": receipt_type, "user_id": user_id, }, - ) - self.db_pool.simple_insert_txn( - txn, - table="receipts_graph", values={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, "event_ids": json_encoder.encode(event_ids), "data": json_encoder.encode(data), }, + # receipts_graph has a unique constraint on + # (user_id, room_id, receipt_type), so no need to lock + lock=False, ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cb63cd9b7d..ac821878b0 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -69,9 +69,9 @@ class TokenLookupResult: """ user_id: str + token_id: int is_guest: bool = False shadow_banned: bool = False - token_id: Optional[int] = None device_id: Optional[str] = None valid_until_ms: Optional[int] = None token_owner: str = attr.ib() @@ -175,6 +175,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "is_guest", "admin", "consent_version", + "consent_ts", "consent_server_notice_sent", "appservice_id", "creation_ts", @@ -2227,7 +2228,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): txn, table="users", keyvalues={"name": user_id}, - updatevalues={"consent_version": consent_version}, + updatevalues={ + "consent_version": consent_version, + "consent_ts": self._clock.time_msec(), + }, ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index b7d4baa6bb..bef66f1992 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -641,8 +641,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): "version": room[5], "creator": room[6], "encryption": room[7], - "federatable": room[8], - "public": room[9], + # room_stats_state.federatable is an integer on sqlite. + "federatable": bool(room[8]), + # rooms.is_public is an integer on sqlite. + "public": bool(room[9]), "join_rules": room[10], "guest_access": room[11], "history_visibility": room[12], @@ -1183,8 +1185,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): ) return False - @staticmethod - def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None: + def _clear_partial_state_room_txn( + self, txn: LoggingTransaction, room_id: str + ) -> None: DatabasePool.simple_delete_txn( txn, table="partial_state_rooms_servers", @@ -1195,7 +1198,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): table="partial_state_rooms", keyvalues={"room_id": room_id}, ) + self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) + @cached() async def is_partial_state_room(self, room_id: str) -> bool: """Checks if this room has partial state. @@ -1769,9 +1774,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): servers, ) - @staticmethod def _store_partial_state_room_txn( - txn: LoggingTransaction, room_id: str, servers: Collection[str] + self, txn: LoggingTransaction, room_id: str, servers: Collection[str] ) -> None: DatabasePool.simple_insert_txn( txn, @@ -1786,6 +1790,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): keys=("room_id", "server_name"), values=((room_id, s) for s in servers), ) + self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) async def maybe_store_room_on_outlier_membership( self, room_id: str, room_version: RoomVersion diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 827c1f1efd..6e1ff5626b 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -31,7 +31,6 @@ from typing import ( import attr from synapse.api.constants import EventTypes, Membership -from synapse.events import EventBase from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import ( run_as_background_process, @@ -56,6 +55,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -187,27 +187,55 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(max_entries=100000, iterable=True) async def get_users_in_room(self, room_id: str) -> List[str]: + """ + Returns a list of users in the room sorted by longest in the room first + (aka. with the lowest depth). This is done to match the sort in + `get_current_hosts_in_room()` and so we can re-use the cache but it's + not horrible to have here either. + + Uses `m.room.member`s in the room state at the current forward extremities to + determine which users are in the room. + + Will return inaccurate results for rooms with partial state, since the state for + the forward extremities of those rooms will exclude most members. We may also + calculate room state incorrectly for such rooms and believe that a member is or + is not in the room when the opposite is true. + """ return await self.db_pool.runInteraction( "get_users_in_room", self.get_users_in_room_txn, room_id ) def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[str]: + """ + Returns a list of users in the room sorted by longest in the room first + (aka. with the lowest depth). This is done to match the sort in + `get_current_hosts_in_room()` and so we can re-use the cache but it's + not horrible to have here either. + """ # If we can assume current_state_events.membership is up to date # then we can avoid a join, which is a Very Good Thing given how # frequently this function gets called. if self._current_state_events_membership_up_to_date: sql = """ - SELECT state_key FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? AND membership = ? + SELECT c.state_key FROM current_state_events as c + /* Get the depth of the event from the events table */ + INNER JOIN events AS e USING (event_id) + WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ? + /* Sorted by lowest depth first */ + ORDER BY e.depth ASC; """ else: sql = """ - SELECT state_key FROM room_memberships as m + SELECT c.state_key FROM room_memberships as m + /* Get the depth of the event from the events table */ + INNER JOIN events AS e USING (event_id) INNER JOIN current_state_events as c ON m.event_id = c.event_id AND m.room_id = c.room_id AND m.user_id = c.state_key WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? + /* Sorted by lowest depth first */ + ORDER BY e.depth ASC; """ txn.execute(sql, (room_id, Membership.JOIN)) @@ -534,6 +562,32 @@ class RoomMemberWorkerStore(EventsWorkerStore): desc="get_local_users_in_room", ) + async def check_local_user_in_room(self, user_id: str, room_id: str) -> bool: + """ + Check whether a given local user is currently joined to the given room. + + Returns: + A boolean indicating whether the user is currently joined to the room + + Raises: + Exeption when called with a non-local user to this homeserver + """ + if not self.hs.is_mine_id(user_id): + raise Exception( + "Cannot call 'check_local_user_in_room' on " + "non-local user %s" % (user_id,), + ) + + ( + membership, + member_event_id, + ) = await self.get_local_current_membership_for_user_in_room( + user_id=user_id, + room_id=room_id, + ) + + return membership == Membership.JOIN + async def get_local_current_membership_for_user_in_room( self, user_id: str, room_id: str ) -> Tuple[Optional[str], Optional[str]]: @@ -724,6 +778,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): _get_users_server_still_shares_room_with_txn, ) + @cancellable async def get_rooms_for_user( self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None ) -> FrozenSet[str]: @@ -835,144 +890,92 @@ class RoomMemberWorkerStore(EventsWorkerStore): return shared_room_ids or frozenset() - async def get_joined_users_from_state( - self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry" - ) -> Dict[str, ProfileInfo]: - state_group: Union[object, int] = state_entry.state_group - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() - - assert state_group is not None - with Measure(self._clock, "get_joined_users_from_state"): - return await self._get_joined_users_from_context( - room_id, state_group, state, context=state_entry - ) + async def get_joined_user_ids_from_state( + self, room_id: str, state: StateMap[str] + ) -> Set[str]: + """ + For a given set of state IDs, get a set of user IDs in the room. - @cached(num_args=2, iterable=True, max_entries=100000) - async def _get_joined_users_from_context( - self, - room_id: str, - state_group: Union[object, int], - current_state_ids: StateMap[str], - event: Optional[EventBase] = None, - context: Optional["_StateCacheEntry"] = None, - ) -> Dict[str, ProfileInfo]: - # We don't use `state_group`, it's there so that we can cache based - # on it. However, it's important that it's never None, since two current_states - # with a state_group of None are likely to be different. - assert state_group is not None + This method checks the local event cache, before calling + `_get_user_ids_from_membership_event_ids` for any uncached events. + """ - users_in_room = {} - member_event_ids = [ - e_id - for key, e_id in current_state_ids.items() - if key[0] == EventTypes.Member - ] - - if context is not None: - # If we have a context with a delta from a previous state group, - # check if we also have the result from the previous group in cache. - # If we do then we can reuse that result and simply update it with - # any membership changes in `delta_ids` - if context.prev_group and context.delta_ids: - prev_res = self._get_joined_users_from_context.cache.get_immediate( - (room_id, context.prev_group), None - ) - if prev_res and isinstance(prev_res, dict): - users_in_room = dict(prev_res) - member_event_ids = [ - e_id - for key, e_id in context.delta_ids.items() - if key[0] == EventTypes.Member - ] - for etype, state_key in context.delta_ids: - if etype == EventTypes.Member: - users_in_room.pop(state_key, None) - - # We check if we have any of the member event ids in the event cache - # before we ask the DB - - # We don't update the event cache hit ratio as it completely throws off - # the hit ratio counts. After all, we don't populate the cache if we - # miss it here - event_map = self._get_events_from_local_cache( - member_event_ids, update_metrics=False - ) + with Measure(self._clock, "get_joined_user_ids_from_state"): + users_in_room = set() + member_event_ids = [ + e_id for key, e_id in state.items() if key[0] == EventTypes.Member + ] - missing_member_event_ids = [] - for event_id in member_event_ids: - ev_entry = event_map.get(event_id) - if ev_entry and not ev_entry.event.rejected_reason: - if ev_entry.event.membership == Membership.JOIN: - users_in_room[ev_entry.event.state_key] = ProfileInfo( - display_name=ev_entry.event.content.get("displayname", None), - avatar_url=ev_entry.event.content.get("avatar_url", None), - ) - else: - missing_member_event_ids.append(event_id) + # We check if we have any of the member event ids in the event cache + # before we ask the DB - if missing_member_event_ids: - event_to_memberships = await self._get_joined_profiles_from_event_ids( - missing_member_event_ids + # We don't update the event cache hit ratio as it completely throws off + # the hit ratio counts. After all, we don't populate the cache if we + # miss it here + event_map = self._get_events_from_local_cache( + member_event_ids, update_metrics=False ) - users_in_room.update(row for row in event_to_memberships.values() if row) - - if event is not None and event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - if event.event_id in member_event_ids: - users_in_room[event.state_key] = ProfileInfo( - display_name=event.content.get("displayname", None), - avatar_url=event.content.get("avatar_url", None), + + missing_member_event_ids = [] + for event_id in member_event_ids: + ev_entry = event_map.get(event_id) + if ev_entry and not ev_entry.event.rejected_reason: + if ev_entry.event.membership == Membership.JOIN: + users_in_room.add(ev_entry.event.state_key) + else: + missing_member_event_ids.append(event_id) + + if missing_member_event_ids: + event_to_memberships = ( + await self._get_user_ids_from_membership_event_ids( + missing_member_event_ids ) + ) + users_in_room.update( + user_id for user_id in event_to_memberships.values() if user_id + ) - return users_in_room + return users_in_room - @cached(max_entries=10000) - def _get_joined_profile_from_event_id( + @cached( + max_entries=10000, + # This name matches the old function that has been replaced - the cache name + # is kept here to maintain backwards compatibility. + name="_get_joined_profile_from_event_id", + ) + def _get_user_id_from_membership_event_id( self, event_id: str ) -> Optional[Tuple[str, ProfileInfo]]: raise NotImplementedError() @cachedList( - cached_method_name="_get_joined_profile_from_event_id", + cached_method_name="_get_user_id_from_membership_event_id", list_name="event_ids", ) - async def _get_joined_profiles_from_event_ids( + async def _get_user_ids_from_membership_event_ids( self, event_ids: Iterable[str] - ) -> Dict[str, Optional[Tuple[str, ProfileInfo]]]: + ) -> Dict[str, Optional[str]]: """For given set of member event_ids check if they point to a join - event and if so return the associated user and profile info. + event. Args: event_ids: The member event IDs to lookup Returns: - Map from event ID to `user_id` and ProfileInfo (or None if not join event). + Map from event ID to `user_id`, or None if event is not a join. """ rows = await self.db_pool.simple_select_many_batch( table="room_memberships", column="event_id", iterable=event_ids, - retcols=("user_id", "display_name", "avatar_url", "event_id"), + retcols=("user_id", "event_id"), keyvalues={"membership": Membership.JOIN}, batch_size=1000, - desc="_get_joined_profiles_from_event_ids", + desc="_get_user_ids_from_membership_event_ids", ) - return { - row["event_id"]: ( - row["user_id"], - ProfileInfo( - avatar_url=row["avatar_url"], display_name=row["display_name"] - ), - ) - for row in rows - } + return {row["event_id"]: row["user_id"] for row in rows} @cached(max_entries=10000) async def is_host_joined(self, room_id: str, host: str) -> bool: @@ -1018,37 +1021,81 @@ class RoomMemberWorkerStore(EventsWorkerStore): return True @cached(iterable=True, max_entries=10000) - async def get_current_hosts_in_room(self, room_id: str) -> Set[str]: - """Get current hosts in room based on current state.""" + async def get_current_hosts_in_room(self, room_id: str) -> List[str]: + """ + Get current hosts in room based on current state. + + The heuristic of sorting by servers who have been in the room the + longest is good because they're most likely to have anything we ask + about. + + Uses `m.room.member`s in the room state at the current forward extremities to + determine which hosts are in the room. + + Will return inaccurate results for rooms with partial state, since the state for + the forward extremities of those rooms will exclude most members. We may also + calculate room state incorrectly for such rooms and believe that a host is or + is not in the room when the opposite is true. + + Returns: + Returns a list of servers sorted by longest in the room first. (aka. + sorted by join with the lowest depth first). + """ # First we check if we already have `get_users_in_room` in the cache, as # we can just calculate result from that users = self.get_users_in_room.cache.get_immediate( (room_id,), None, update_metrics=False ) - if users is not None: - return {get_domain_from_id(u) for u in users} - - if isinstance(self.database_engine, Sqlite3Engine): + if users is None and isinstance(self.database_engine, Sqlite3Engine): # If we're using SQLite then let's just always use # `get_users_in_room` rather than funky SQL. users = await self.get_users_in_room(room_id) - return {get_domain_from_id(u) for u in users} + + if users is not None: + # Because `users` is sorted from lowest -> highest depth, the list + # of domains will also be sorted that way. + domains: List[str] = [] + # We use a `Set` just for fast lookups + domain_set: Set[str] = set() + for u in users: + if ":" not in u: + continue + domain = get_domain_from_id(u) + if domain not in domain_set: + domain_set.add(domain) + domains.append(domain) + return domains # For PostgreSQL we can use a regex to pull out the domains from the # joined users in `current_state_events` via regex. - def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> Set[str]: + def get_current_hosts_in_room_txn(txn: LoggingTransaction) -> List[str]: + # Returns a list of servers currently joined in the room sorted by + # longest in the room first (aka. with the lowest depth). The + # heuristic of sorting by servers who have been in the room the + # longest is good because they're most likely to have anything we + # ask about. sql = """ - SELECT DISTINCT substring(state_key FROM '@[^:]*:(.*)$') - FROM current_state_events + SELECT + /* Match the domain part of the MXID */ + substring(c.state_key FROM '@[^:]*:(.*)$') as server_domain + FROM current_state_events c + /* Get the depth of the event from the events table */ + INNER JOIN events AS e USING (event_id) WHERE - type = 'm.room.member' - AND membership = 'join' - AND room_id = ? + /* Find any join state events in the room */ + c.type = 'm.room.member' + AND c.membership = 'join' + AND c.room_id = ? + /* Group all state events from the same domain into their own buckets (groups) */ + GROUP BY server_domain + /* Sorted by lowest depth first */ + ORDER BY min(e.depth) ASC; """ txn.execute(sql, (room_id,)) - return {d for d, in txn} + # `server_domain` will be `NULL` for malformed MXIDs with no colons. + return [d for d, in txn if d is not None] return await self.db_pool.runInteraction( "get_current_hosts_in_room", get_current_hosts_in_room_txn @@ -1131,12 +1178,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): else: # The cache doesn't match the state group or prev state group, # so we calculate the result from first principles. - joined_users = await self.get_joined_users_from_state( - room_id, state, state_entry + joined_user_ids = await self.get_joined_user_ids_from_state( + room_id, state ) cache.hosts_to_joined_users = {} - for user_id in joined_users: + for user_id in joined_user_ids: host = intern_string(get_domain_from_id(user_id)) cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 0b10af0e58..32095d7969 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -23,6 +23,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.logging.tracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -36,6 +37,7 @@ from synapse.storage.state import StateFilter from synapse.types import JsonDict, JsonMapping, StateMap from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -142,6 +144,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return room_version + @trace async def get_metadata_for_events( self, event_ids: Collection[str] ) -> Dict[str, EventMetadata]: @@ -281,6 +284,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) # FIXME: how should this be cached? + @cancellable async def get_partial_filtered_current_state_ids( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore): absolutes: Absolute (set) fields additive_relatives: Fields that will be added onto if existing row present. """ - if self.database_engine.can_native_upsert: - absolute_updates = [ - "%(field)s = EXCLUDED.%(field)s" % {"field": field} - for field in absolutes.keys() - ] - - relative_updates = [ - "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" - % {"table": table, "field": field} - for field in additive_relatives.keys() - ] - - insert_cols = [] - qargs = [] - - for (key, val) in chain( - keyvalues.items(), absolutes.items(), additive_relatives.items() - ): - insert_cols.append(key) - qargs.append(val) + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "key_columns": ", ".join(keyvalues), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } - sql = """ - INSERT INTO %(table)s (%(insert_cols_cs)s) - VALUES (%(insert_vals_qs)s) - ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s - """ % { - "table": table, - "insert_cols_cs": ", ".join(insert_cols), - "insert_vals_qs": ", ".join( - ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) - ), - "key_columns": ", ".join(keyvalues), - "updates": ", ".join(chain(absolute_updates, relative_updates)), - } - - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, table) - retcols = list(chain(absolutes.keys(), additive_relatives.keys())) - current_row = self.db_pool.simple_select_one_txn( - txn, table, keyvalues, retcols, allow_none=True - ) - if current_row is None: - merged_dict = {**keyvalues, **absolutes, **additive_relatives} - self.db_pool.simple_insert_txn(txn, table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - if current_row[key] is None: - current_row[key] = val - else: - current_row[key] += val - current_row.update(absolutes) - self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row) + txn.execute(sql, qargs) async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None: """Calculate and insert an entry into room_stats_current. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index f61f290547..f0b179eea5 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -72,6 +72,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import PersistedEventPosition, RoomStreamToken from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.cancellation import cancellable if TYPE_CHECKING: from synapse.server import HomeServer @@ -597,6 +598,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret, key + @cancellable async def get_membership_changes_for_user( self, user_id: str, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): retry_interval: how long until next retry in ms """ - if self.database_engine.can_native_upsert: - await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings_native, - destination, - failure_ts, - retry_last_ts, - retry_interval, - db_autocommit=True, # Safe as its a single upsert - ) - else: - await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings_emulated, - destination, - failure_ts, - retry_last_ts, - retry_interval, - ) + await self.db_pool.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings_native, + destination, + failure_ts, + retry_last_ts, + retry_interval, + db_autocommit=True, # Safe as it's a single upsert + ) def _set_destination_retry_timings_native( self, @@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): retry_last_ts: int, retry_interval: int, ) -> None: - assert self.database_engine.can_native_upsert - # Upsert retry time interval if retry_interval is zero (i.e. we're # resetting it) or greater than the existing retry interval. # diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index bb64543c1f..f8cfcaca83 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -31,6 +31,7 @@ from synapse.storage.util.sequence import build_sequence_generator from synapse.types import MutableStateMap, StateKey, StateMap from synapse.util.caches.descriptors import cached from synapse.util.caches.dictionary_cache import DictionaryCache +from synapse.util.cancellation import cancellable if TYPE_CHECKING: from synapse.server import HomeServer @@ -156,6 +157,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "get_state_group_delta", _get_state_group_delta_txn ) + @cancellable async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter ) -> Dict[int, StateMap[str]]: @@ -235,6 +237,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types + @cancellable async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None ) -> Dict[int, MutableStateMap[str]]: diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): @property @abc.abstractmethod - def can_native_upsert(self) -> bool: - """ - Do we support native UPSERTs? - """ - ... - - @property - @abc.abstractmethod def supports_using_any_list(self) -> bool: """ Do we support using `a = ANY(?)` and passing a list diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]): db_conn.commit() @property - def can_native_upsert(self) -> bool: - """ - Can we use native UPSERTs? - """ - return True - - @property def supports_using_any_list(self) -> bool: """Do we support using `a = ANY(?)` and passing a list""" return True diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]): return True @property - def can_native_upsert(self) -> bool: - """ - Do we support native UPSERTs? This requires SQLite3 3.24+, plus some - more work we haven't done yet to tell what was inserted vs updated. - """ - return sqlite3.sqlite_version_info >= (3, 24, 0) - - @property def supports_using_any_list(self) -> bool: """Do we support using `a = ANY(?)` and passing a list""" return False @@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]): self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False ) -> None: if not allow_outdated_version: - version = sqlite3.sqlite_version_info # Synapse is untested against older SQLite versions, and we don't want # to let users upgrade to a version of Synapse with broken support for their # sqlite version, because it risks leaving them with a half-upgraded db. - if version < (3, 22, 0): - raise RuntimeError("Synapse requires sqlite 3.22 or above.") + if sqlite3.sqlite_version_info < (3, 27, 0): + raise RuntimeError("Synapse requires sqlite 3.27 or above.") def check_new_database(self, txn: Cursor) -> None: """Gets called when setting up a brand new database. This allows us to diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index dd187f7422..b0458643e6 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -75,6 +75,7 @@ Changes in SCHEMA_VERSION = 71: Changes in SCHEMA_VERSION = 72: - event_edges.(room_id, is_state) are no longer written to. - Tables related to groups are dropped. + - Unused column application_services_state.last_txn is dropped - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room` from `opentracing_context` to generalized `tracing_context`. """ diff --git a/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres new file mode 100644
index 0000000000..13d47de9e6 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.postgres
@@ -0,0 +1,17 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop unused column application_services_state.last_txn +ALTER table application_services_state DROP COLUMN last_txn; \ No newline at end of file diff --git a/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite new file mode 100644
index 0000000000..3be1c88d72 --- /dev/null +++ b/synapse/storage/schema/main/delta/72/04drop_column_application_services_state_last_txn.sql.sqlite
@@ -0,0 +1,40 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop unused column application_services_state.last_txn + +CREATE TABLE application_services_state2 ( + as_id TEXT PRIMARY KEY NOT NULL, + state VARCHAR(5), + read_receipt_stream_id BIGINT, + presence_stream_id BIGINT, + to_device_stream_id BIGINT, + device_list_stream_id BIGINT +); + + +INSERT INTO application_services_state2 ( + as_id, + state, + read_receipt_stream_id, + presence_stream_id, + to_device_stream_id, + device_list_stream_id +) +SELECT as_id, state, read_receipt_stream_id, presence_stream_id, to_device_stream_id, device_list_stream_id +FROM application_services_state; + +DROP TABLE application_services_state; +ALTER TABLE application_services_state2 RENAME TO application_services_state; diff --git a/synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql b/synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql new file mode 100644
index 0000000000..36b41049cd --- /dev/null +++ b/synapse/storage/schema/main/delta/72/05remove_unstable_private_read_receipts.sql
@@ -0,0 +1,19 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop previously received private read receipts so they do not accidentally +-- get leaked to other users. +DELETE FROM receipts_linearized WHERE receipt_type = 'org.matrix.msc2285.read.private'; +DELETE FROM receipts_graph WHERE receipt_type = 'org.matrix.msc2285.read.private'; diff --git a/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql new file mode 100644
index 0000000000..609eb1750f --- /dev/null +++ b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
@@ -0,0 +1,16 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE users ADD consent_ts bigint; diff --git a/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql b/synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql
index ae904863f8..ae904863f8 100644 --- a/synapse/storage/schema/main/delta/72/04rename_opentelemtetry_tracing_context.sql +++ b/synapse/storage/schema/main/delta/72/07rename_opentelemtetry_tracing_context.sql
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 3c13859faa..2dfe4c0b66 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py
@@ -460,8 +460,17 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): # Cast safety: this corresponds to the types returned by the query above. rows.extend(cast(Iterable[Tuple[str, int]], cur)) - # Sort so that we handle rows in order for each instance. - rows.sort() + # Sort by stream_id (ascending, lowest -> highest) so that we handle + # rows in order for each instance because we don't want to overwrite + # the current_position of an instance to a lower stream ID than + # we're actually at. + def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int: + (instance, stream_id) = row + # If `stream_id` is ever `None`, we will see a `TypeError: '<' + # not supported between instances of 'NoneType' and 'X'` error. + return stream_id + + rows.sort(key=sort_by_stream_id_key_func) with self._lock: for ( diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index 07af89ee31..9bdb68c0c1 100644 --- a/synapse/storage/util/partial_state_events_tracker.py +++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -24,6 +24,7 @@ from synapse.logging.tracing import trace_with_opname from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore from synapse.util import unwrapFirstError +from synapse.util.cancellation import cancellable logger = logging.getLogger(__name__) @@ -60,6 +61,7 @@ class PartialStateEventsTracker: o.callback(None) @trace_with_opname("PartialStateEventsTracker.await_full_state") + @cancellable async def await_full_state(self, event_ids: Collection[str]) -> None: """Wait for all the given events to have full state. @@ -154,6 +156,7 @@ class PartialCurrentStateTracker: o.callback(None) @trace_with_opname("PartialCurrentStateTracker.await_full_state") + @cancellable async def await_full_state(self, room_id: str) -> None: # We add the deferred immediately so that the DB call to check for # partial state doesn't race when we unpartial the room.