summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/__init__.py57
-rw-r--r--synapse/storage/databases/main/account_data.py33
-rw-r--r--synapse/storage/databases/main/appservice.py17
-rw-r--r--synapse/storage/databases/main/cache.py60
-rw-r--r--synapse/storage/databases/main/devices.py391
-rw-r--r--synapse/storage/databases/main/e2e_room_keys.py8
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py112
-rw-r--r--synapse/storage/databases/main/event_federation.py394
-rw-r--r--synapse/storage/databases/main/event_push_actions.py1245
-rw-r--r--synapse/storage/databases/main/events.py314
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py16
-rw-r--r--synapse/storage/databases/main/events_worker.py385
-rw-r--r--synapse/storage/databases/main/filtering.py4
-rw-r--r--synapse/storage/databases/main/lock.py121
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py8
-rw-r--r--synapse/storage/databases/main/purge_events.py5
-rw-r--r--synapse/storage/databases/main/push_rule.py167
-rw-r--r--synapse/storage/databases/main/pusher.py173
-rw-r--r--synapse/storage/databases/main/receipts.py376
-rw-r--r--synapse/storage/databases/main/registration.py339
-rw-r--r--synapse/storage/databases/main/relations.py586
-rw-r--r--synapse/storage/databases/main/room.py233
-rw-r--r--synapse/storage/databases/main/roommember.py708
-rw-r--r--synapse/storage/databases/main/search.py376
-rw-r--r--synapse/storage/databases/main/state.py9
-rw-r--r--synapse/storage/databases/main/stats.py86
-rw-r--r--synapse/storage/databases/main/stream.py70
-rw-r--r--synapse/storage/databases/main/transactions.py30
-rw-r--r--synapse/storage/databases/main/user_directory.py9
-rw-r--r--synapse/storage/databases/state/bg_updates.py99
-rw-r--r--synapse/storage/databases/state/store.py3
31 files changed, 4615 insertions, 1819 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py

index 4dccbb732a..0e47592be3 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py
@@ -26,9 +26,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.stats import UserSortOrder from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.types import Cursor -from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict, get_domain_from_id -from synapse.util.caches.stream_change_cache import StreamChangeCache from .account_data import AccountDataStore from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore @@ -83,6 +81,7 @@ logger = logging.getLogger(__name__) class DataStore( EventsBackgroundUpdatesStore, + DeviceStore, RoomMemberStore, RoomStore, RoomBatchStore, @@ -114,7 +113,6 @@ class DataStore( StreamWorkerStore, OpenIdStore, ClientIpWorkerStore, - DeviceStore, DeviceInboxStore, UserDirectoryStore, UserErasureStore, @@ -138,41 +136,8 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._device_list_id_gen = StreamIdGenerator( - db_conn, - "device_lists_stream", - "stream_id", - extra_tables=[ - ("user_signature_stream", "stream_id"), - ("device_lists_outbound_pokes", "stream_id"), - ("device_lists_changes_in_room", "stream_id"), - ], - ) - super().__init__(database, db_conn, hs) - events_max = self._stream_id_gen.get_current_token() - curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( - db_conn, - "current_state_delta_stream", - entity_column="room_id", - stream_column="stream_id", - max_value=events_max, # As we share the stream id with events token - limit=1000, - ) - self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", - min_curr_state_delta_id, - prefilled_cache=curr_state_delta_prefill, - ) - - self._stream_order_on_start = self.get_room_max_stream_ordering() - self._min_stream_order_on_start = self.get_room_min_stream_ordering() - - def get_device_stream_token(self) -> int: - # TODO: shouldn't this be moved to `DeviceWorkerStore`? - return self._device_list_id_gen.get_current_token() - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. @@ -201,8 +166,9 @@ class DataStore( name: Optional[str] = None, guests: bool = True, deactivated: bool = False, - order_by: str = UserSortOrder.USER_ID.value, + order_by: str = UserSortOrder.NAME.value, direction: str = "f", + approved: bool = True, ) -> Tuple[List[JsonDict], int]: """Function to retrieve a paginated list of users from users list. This will return a json list of users and the @@ -217,6 +183,7 @@ class DataStore( deactivated: whether to include deactivated users order_by: the sort order of the returned list direction: sort ascending or descending + approved: whether to include approved users Returns: A tuple of a list of mappings from user to information and a count of total users. """ @@ -249,11 +216,17 @@ class DataStore( if not deactivated: filters.append("deactivated = 0") + if not approved: + # We ignore NULL values for the approved flag because these should only + # be already existing users that we consider as already approved. + filters.append("approved IS FALSE") + where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else "" sql_base = f""" FROM users as u LEFT JOIN profiles AS p ON u.name = '@' || p.user_id || ':' || ? + LEFT JOIN erased_users AS eu ON u.name = eu.user_id {where_clause} """ sql = "SELECT COUNT(*) as total_users " + sql_base @@ -262,7 +235,8 @@ class DataStore( sql = f""" SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, - displayname, avatar_url, creation_ts * 1000 as creation_ts + displayname, avatar_url, creation_ts * 1000 as creation_ts, approved, + eu.user_id is not null as erased {sql_base} ORDER BY {order_by_column} {order}, u.name ASC LIMIT ? OFFSET ? @@ -270,6 +244,13 @@ class DataStore( args += [limit, start] txn.execute(sql, args) users = self.db_pool.cursor_to_dict(txn) + + # some of those boolean values are returned as integers when we're on SQLite + columns_to_boolify = ["erased"] + for user in users: + for column in columns_to_boolify: + user[column] = bool(user[column]) + return users, count return await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 9af9f4f18e..282687ebce 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py
@@ -27,7 +27,6 @@ from typing import ( ) from synapse.api.constants import AccountDataTypes -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream from synapse.storage._base import db_to_json from synapse.storage.database import ( @@ -68,12 +67,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) # to write account data. A value of `True` implies that `_account_data_id_gen` # is an `AbstractStreamIdGenerator` and not just a tracker. self._account_data_id_gen: AbstractStreamIdTracker + self._can_write_to_account_data = ( + self._instance_name in hs.config.worker.writers.account_data + ) if isinstance(database.engine, PostgresEngine): - self._can_write_to_account_data = ( - self._instance_name in hs.config.worker.writers.account_data - ) - self._account_data_id_gen = MultiWriterIdGenerator( db_conn=db_conn, db=database, @@ -95,21 +93,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # updated over replication. (Multiple writers are not supported for # SQLite). - if self._instance_name in hs.config.worker.writers.account_data: - self._can_write_to_account_data = True - self._account_data_id_gen = StreamIdGenerator( - db_conn, - "room_account_data", - "stream_id", - extra_tables=[("room_tags_revisions", "stream_id")], - ) - else: - self._account_data_id_gen = SlavedIdTracker( - db_conn, - "room_account_data", - "stream_id", - extra_tables=[("room_tags_revisions", "stream_id")], - ) + self._account_data_id_gen = StreamIdGenerator( + db_conn, + "room_account_data", + "stream_id", + extra_tables=[("room_tags_revisions", "stream_id")], + is_writer=self._instance_name in hs.config.worker.writers.account_data, + ) account_max = self.get_max_account_data_stream_id() self._account_data_stream_cache = StreamChangeCache( @@ -650,9 +640,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) txn, self.get_account_data_for_room, (user_id,) ) self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,)) - self._invalidate_cache_and_stream( - txn, self.get_push_rules_enabled_for_user, (user_id,) - ) # This user might be contained in the ignored_by cache for other users, # so we have to invalidate it all. self._invalidate_all_cache_and_stream(txn, self.ignored_by) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 64b70a7b28..63046c0527 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py
@@ -157,10 +157,23 @@ class ApplicationServiceWorkerStore(RoomMemberWorkerStore): app_service: "ApplicationService", cache_context: _CacheContext, ) -> List[str]: - users_in_room = await self.get_users_in_room( + """ + Get all users in a room that the appservice controls. + + Args: + room_id: The room to check in. + app_service: The application service to check interest/control against + + Returns: + List of user IDs that the appservice controls. + """ + # We can use `get_local_users_in_room(...)` here because an application service + # can only be interested in local users of the server it's on (ignore any remote + # users that might match the user namespace regex). + local_users_in_room = await self.get_local_users_in_room( room_id, on_invalidate=cache_context.invalidate ) - return list(filter(app_service.is_interested_in_user, users_in_room)) + return list(filter(app_service.is_interested_in_user, local_users_in_room)) class ApplicationServiceStore(ApplicationServiceWorkerStore): diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 12e9a42382..a58668a380 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -33,7 +33,7 @@ from synapse.storage.database import ( ) from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.util.caches.descriptors import _CachedFunction +from synapse.util.caches.descriptors import CachedFunction from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -205,6 +205,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self.get_rooms_for_user_with_stream_ordering.invalidate( (data.state_key,) ) + self.get_rooms_for_user.invalidate((data.state_key,)) else: raise Exception("Unknown events stream row type %s" % (row.type,)) @@ -223,15 +224,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore): # process triggering the invalidation is responsible for clearing any external # cached objects. self._invalidate_local_get_event_cache(event_id) - self.have_seen_event.invalidate((room_id, event_id)) - self.get_latest_event_ids_in_room.invalidate((room_id,)) - - self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,)) + self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id)) + self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,)) + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) + ) # The `_get_membership_from_event_id` is immutable, except for the # case where we look up an event *before* persisting it. - self._get_membership_from_event_id.invalidate((event_id,)) + self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,)) if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) @@ -240,19 +242,31 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._invalidate_local_get_event_cache(redacts) # Caches which might leak edits must be invalidated for the event being # redacted. - self.get_relations_for_event.invalidate((redacts,)) - self.get_applicable_edit.invalidate((redacts,)) + self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,)) + self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,)) + self._attempt_to_invalidate_cache("get_thread_id", (redacts,)) + self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,)) if etype == EventTypes.Member: self._membership_stream_cache.entity_has_changed(state_key, stream_ordering) - self.get_invited_rooms_for_local_user.invalidate((state_key,)) + self._attempt_to_invalidate_cache( + "get_invited_rooms_for_local_user", (state_key,) + ) + self._attempt_to_invalidate_cache( + "get_rooms_for_user_with_stream_ordering", (state_key,) + ) + self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,)) if relates_to: - self.get_relations_for_event.invalidate((relates_to,)) - self.get_aggregation_groups_for_event.invalidate((relates_to,)) - self.get_applicable_edit.invalidate((relates_to,)) - self.get_thread_summary.invalidate((relates_to,)) - self.get_thread_participated.invalidate((relates_to,)) + self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,)) + self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,)) + self._attempt_to_invalidate_cache( + "get_aggregation_groups_for_event", (relates_to,) + ) + self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,)) + self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,)) + self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,)) + self._attempt_to_invalidate_cache("get_threads", (room_id,)) async def invalidate_cache_and_stream( self, cache_name: str, keys: Tuple[Any, ...] @@ -269,9 +283,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): return cache_func.invalidate(keys) - await self.db_pool.runInteraction( - "invalidate_cache_and_stream", - self._send_invalidation_to_replication, + await self.send_invalidation_to_replication( cache_func.__name__, keys, ) @@ -279,7 +291,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_cache_and_stream( self, txn: LoggingTransaction, - cache_func: _CachedFunction, + cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: """Invalidates the cache and adds it to the cache stream so slaves @@ -293,7 +305,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._send_invalidation_to_replication(txn, cache_func.__name__, keys) def _invalidate_all_cache_and_stream( - self, txn: LoggingTransaction, cache_func: _CachedFunction + self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: """Invalidates the entire cache and adds it to the cache stream so slaves will know to invalidate their caches. @@ -334,6 +346,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore): txn, CURRENT_STATE_CACHE_NAME, [room_id] ) + async def send_invalidation_to_replication( + self, cache_name: str, keys: Optional[Collection[Any]] + ) -> None: + await self.db_pool.runInteraction( + "send_invalidation_to_replication", + self._send_invalidation_to_replication, + cache_name, + keys, + ) + def _send_invalidation_to_replication( self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]] ) -> None: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index ca0fe8c4be..05a193f889 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py
@@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import abc import logging from typing import ( TYPE_CHECKING, @@ -39,6 +38,7 @@ from synapse.logging.opentracing import ( whitelisted_homeserver, ) from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -47,12 +47,19 @@ from synapse.storage.database import ( make_tuple_comparison_clause, ) from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore +from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.types import Cursor +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + AbstractStreamIdTracker, + StreamIdGenerator, +) from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.stringutils import shortstr @@ -69,7 +76,7 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" -class DeviceWorkerStore(EndToEndKeyWorkerStore): +class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def __init__( self, database: DatabasePool, @@ -78,9 +85,23 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): ): super().__init__(database, db_conn, hs) + # In the worker store this is an ID tracker which we overwrite in the non-worker + # class below that is used on the main process. + self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + db_conn, + "device_lists_stream", + "stream_id", + extra_tables=[ + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ("device_lists_changes_in_room", "stream_id"), + ], + is_writer=hs.config.worker.worker_app is None, + ) + # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). - device_list_max = self._device_list_id_gen.get_current_token() # type: ignore[attr-defined] + device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, "device_lists_stream", @@ -134,6 +155,39 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: + if stream_name == DeviceListsStream.NAME: + self._device_list_id_gen.advance(instance_name, token) + self._invalidate_caches_for_devices(token, rows) + elif stream_name == UserSignatureStream.NAME: + self._device_list_id_gen.advance(instance_name, token) + for row in rows: + self._user_signature_stream_cache.entity_has_changed(row.user_id, token) + return super().process_replication_rows(stream_name, instance_name, token, rows) + + def _invalidate_caches_for_devices( + self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] + ) -> None: + for row in rows: + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. + if row.entity.startswith("@"): + self._device_list_stream_cache.entity_has_changed(row.entity, token) + self.get_cached_devices_for_user.invalidate((row.entity,)) + self._get_cached_user_device.invalidate((row.entity,)) + self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) + + else: + self._device_list_federation_stream_cache.entity_has_changed( + row.entity, token + ) + + def get_device_stream_token(self) -> int: + return self._device_list_id_gen.get_current_token() + async def count_devices_by_users(self, user_ids: Optional[List[str]] = None) -> int: """Retrieve number of all devices of given users. Only returns number of devices that are not marked as hidden. @@ -272,6 +326,13 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): destination, int(from_stream_id) ) if not has_changed: + # debugging for https://github.com/matrix-org/synapse/issues/14251 + issue_8631_logger.debug( + "%s: no change between %i and %i", + destination, + from_stream_id, + now_stream_id, + ) return now_stream_id, [] updates = await self.db_pool.runInteraction( @@ -464,7 +525,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): limit: Maximum number of device updates to return Returns: - List: List of device update tuples: + List of device update tuples: - user_id - device_id - stream_id @@ -537,9 +598,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): "device_id": device_id, "prev_id": [prev_id] if prev_id else [], "stream_id": stream_id, - "org.matrix.opentracing_context": opentracing_context, } + if opentracing_context != "{}": + result["org.matrix.opentracing_context"] = opentracing_context + prev_id = stream_id if device is not None: @@ -547,7 +610,11 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): if keys: result["keys"] = keys - device_display_name = device.display_name + device_display_name = None + if ( + self.hs.config.federation.allow_device_name_lookup_over_federation + ): + device_display_name = device.display_name if device_display_name: result["device_display_name"] = device_display_name else: @@ -662,12 +729,8 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): }, ) - @abc.abstractmethod - def get_device_stream_token(self) -> int: - """Get the current stream id from the _device_list_id_gen""" - ... - @trace + @cancellable async def get_user_devices_from_cache( self, query_list: List[Tuple[str, Optional[str]]] ) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]: @@ -743,6 +806,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): return self._device_list_stream_cache.get_all_entities_changed(from_key) + @cancellable async def get_users_whose_devices_changed( self, from_key: int, @@ -982,24 +1046,59 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): desc="mark_remote_user_device_cache_as_valid", ) + async def handle_potentially_left_users(self, user_ids: Set[str]) -> None: + """Given a set of remote users check if the server still shares a room with + them. If not then mark those users' device cache as stale. + """ + + if not user_ids: + return + + await self.db_pool.runInteraction( + "_handle_potentially_left_users", + self.handle_potentially_left_users_txn, + user_ids, + ) + + def handle_potentially_left_users_txn( + self, + txn: LoggingTransaction, + user_ids: Set[str], + ) -> None: + """Given a set of remote users check if the server still shares a room with + them. If not then mark those users' device cache as stale. + """ + + if not user_ids: + return + + joined_users = self.get_users_server_still_shares_room_with_txn(txn, user_ids) + left_users = user_ids - joined_users + + for user_id in left_users: + self.mark_remote_user_device_list_as_unsubscribed_txn(txn, user_id) + async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None: """Mark that we no longer track device lists for remote user.""" - def _mark_remote_user_device_list_as_unsubscribed_txn( - txn: LoggingTransaction, - ) -> None: - self.db_pool.simple_delete_txn( - txn, - table="device_lists_remote_extremeties", - keyvalues={"user_id": user_id}, - ) - self._invalidate_cache_and_stream( - txn, self.get_device_list_last_stream_id_for_remote, (user_id,) - ) - await self.db_pool.runInteraction( "mark_remote_user_device_list_as_unsubscribed", - _mark_remote_user_device_list_as_unsubscribed_txn, + self.mark_remote_user_device_list_as_unsubscribed_txn, + user_id, + ) + + def mark_remote_user_device_list_as_unsubscribed_txn( + self, + txn: LoggingTransaction, + user_id: str, + ) -> None: + self.db_pool.simple_delete_txn( + txn, + table="device_lists_remote_extremeties", + keyvalues={"user_id": user_id}, + ) + self._invalidate_cache_and_stream( + txn, self.get_device_list_last_stream_id_for_remote, (user_id,) ) async def get_dehydrated_device( @@ -1221,6 +1320,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): desc="get_min_device_lists_changes_in_room", ) + @cancellable async def get_device_list_changes_in_rooms( self, room_ids: Collection[str], from_id: int ) -> Optional[Set[str]]: @@ -1267,6 +1367,33 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore): return changes + async def get_device_list_changes_in_room( + self, room_id: str, min_stream_id: int + ) -> Collection[Tuple[str, str]]: + """Get all device list changes that happened in the room since the given + stream ID. + + Returns: + Collection of user ID/device ID tuples of all devices that have + changed + """ + + sql = """ + SELECT DISTINCT user_id, device_id FROM device_lists_changes_in_room + WHERE room_id = ? AND stream_id > ? + """ + + def get_device_list_changes_in_room_txn( + txn: LoggingTransaction, + ) -> Collection[Tuple[str, str]]: + txn.execute(sql, (room_id, min_stream_id)) + return cast(Collection[Tuple[str, str]], txn.fetchall()) + + return await self.db_pool.runInteraction( + "get_device_list_changes_in_room", + get_device_list_changes_in_room_txn, + ) + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__( @@ -1314,6 +1441,13 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): self._remove_duplicate_outbound_pokes, ) + self.db_pool.updates.register_background_index_update( + "device_lists_changes_in_room_by_room_index", + index_name="device_lists_changes_in_room_by_room_idx", + table="device_lists_changes_in_room", + columns=["room_id", "stream_id"], + ) + async def _drop_device_list_streams_non_unique_indexes( self, progress: JsonDict, batch_size: int ) -> int: @@ -1401,6 +1535,10 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): + # Because we have write access, this will be a StreamIdGenerator + # (see DeviceWorkerStore.__init__) + _device_list_id_gen: AbstractStreamIdGenerator + def __init__( self, database: DatabasePool, @@ -1725,7 +1863,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): context, ) - async with self._device_list_id_gen.get_next_mult( # type: ignore[attr-defined] + async with self._device_list_id_gen.get_next_mult( len(device_ids) ) as stream_ids: await self.db_pool.runInteraction( @@ -1775,7 +1913,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self, txn: LoggingTransaction, user_id: str, - device_ids: Iterable[str], + device_id: str, hosts: Collection[str], stream_ids: List[int], context: Optional[Dict[str, str]], @@ -1791,6 +1929,21 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): stream_id_iterator = iter(stream_ids) encoded_context = json_encoder.encode(context) + mark_sent = not self.hs.is_mine_id(user_id) + + values = [ + ( + destination, + next(stream_id_iterator), + user_id, + device_id, + mark_sent, + now, + encoded_context if whitelisted_homeserver(destination) else "{}", + ) + for destination in hosts + ] + self.db_pool.simple_insert_many_txn( txn, table="device_lists_outbound_pokes", @@ -1803,23 +1956,21 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "ts", "opentracing_context", ), - values=[ - ( - destination, - next(stream_id_iterator), - user_id, - device_id, - not self.hs.is_mine_id( - user_id - ), # We only need to send out update for *our* users - now, - encoded_context if whitelisted_homeserver(destination) else "{}", - ) - for destination in hosts - for device_id in device_ids - ], + values=values, ) + # debugging for https://github.com/matrix-org/synapse/issues/14251 + if issue_8631_logger.isEnabledFor(logging.DEBUG): + issue_8631_logger.debug( + "Recorded outbound pokes for %s:%s with device stream ids %s", + user_id, + device_id, + { + stream_id: destination + for (destination, stream_id, _, _, _, _, _) in values + }, + ) + def _add_device_outbound_room_poke_txn( self, txn: LoggingTransaction, @@ -1864,27 +2015,48 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) async def get_uncoverted_outbound_room_pokes( - self, limit: int = 10 + self, start_stream_id: int, start_room_id: str, limit: int = 10 ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: """Get device list changes by room that have not yet been handled and written to `device_lists_outbound_pokes`. + Args: + start_stream_id: Together with `start_room_id`, indicates the position after + which to return device list changes. + start_room_id: Together with `start_stream_id`, indicates the position after + which to return device list changes. + limit: The maximum number of device list changes to return. + Returns: - A list of user ID, device ID, room ID, stream ID and optional opentracing context. + A list of user ID, device ID, room ID, stream ID and optional opentracing + context, in order of ascending (stream ID, room ID). """ sql = """ SELECT user_id, device_id, room_id, stream_id, opentracing_context FROM device_lists_changes_in_room - WHERE NOT converted_to_destinations - ORDER BY stream_id + WHERE + (stream_id, room_id) > (?, ?) AND + stream_id <= ? AND + NOT converted_to_destinations + ORDER BY stream_id ASC, room_id ASC LIMIT ? """ def get_uncoverted_outbound_room_pokes_txn( txn: LoggingTransaction, ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]: - txn.execute(sql, (limit,)) + txn.execute( + sql, + ( + start_stream_id, + start_room_id, + # Avoid returning rows if there may be uncommitted device list + # changes with smaller stream IDs. + self._device_list_id_gen.get_current_token(), + limit, + ), + ) return [ ( @@ -1906,52 +2078,119 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: str, device_id: str, room_id: str, - stream_id: int, hosts: Collection[str], context: Optional[Dict[str, str]], ) -> None: """Queue the device update to be sent to the given set of hosts, calculated from the room ID. - - Marks the associated row in `device_lists_changes_in_room` as handled. """ + if not hosts: + return def add_device_list_outbound_pokes_txn( txn: LoggingTransaction, stream_ids: List[int] ) -> None: - if hosts: - self._add_device_outbound_poke_to_stream_txn( - txn, - user_id=user_id, - device_ids=[device_id], - hosts=hosts, - stream_ids=stream_ids, - context=context, - ) - - self.db_pool.simple_update_txn( + self._add_device_outbound_poke_to_stream_txn( txn, - table="device_lists_changes_in_room", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - "stream_id": stream_id, - "room_id": room_id, - }, - updatevalues={"converted_to_destinations": True}, + user_id=user_id, + device_id=device_id, + hosts=hosts, + stream_ids=stream_ids, + context=context, ) - if not hosts: - # If there are no hosts then we don't try and generate stream IDs. + async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: return await self.db_pool.runInteraction( "add_device_list_outbound_pokes", add_device_list_outbound_pokes_txn, - [], + stream_ids, ) - async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: # type: ignore[attr-defined] - return await self.db_pool.runInteraction( - "add_device_list_outbound_pokes", - add_device_list_outbound_pokes_txn, - stream_ids, + async def add_remote_device_list_to_pending( + self, user_id: str, device_id: str + ) -> None: + """Add a device list update to the table tracking remote device list + updates during partial joins. + """ + + async with self._device_list_id_gen.get_next() as stream_id: + await self.db_pool.simple_upsert( + table="device_lists_remote_pending", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={"stream_id": stream_id}, + desc="add_remote_device_list_to_pending", ) + + async def get_pending_remote_device_list_updates_for_room( + self, room_id: str + ) -> Collection[Tuple[str, str]]: + """Get the set of remote device list updates from the pending table for + the room. + """ + + min_device_stream_id = await self.db_pool.simple_select_one_onecol( + table="partial_state_rooms", + keyvalues={ + "room_id": room_id, + }, + retcol="device_lists_stream_id", + desc="get_pending_remote_device_list_updates_for_room_device", + ) + + sql = """ + SELECT user_id, device_id FROM device_lists_remote_pending AS d + INNER JOIN current_state_events AS c ON + type = 'm.room.member' + AND state_key = user_id + AND membership = 'join' + WHERE + room_id = ? AND stream_id > ? + """ + + def get_pending_remote_device_list_updates_for_room_txn( + txn: LoggingTransaction, + ) -> Collection[Tuple[str, str]]: + txn.execute(sql, (room_id, min_device_stream_id)) + return cast(Collection[Tuple[str, str]], txn.fetchall()) + + return await self.db_pool.runInteraction( + "get_pending_remote_device_list_updates_for_room", + get_pending_remote_device_list_updates_for_room_txn, + ) + + async def get_device_change_last_converted_pos(self) -> Tuple[int, str]: + """ + Get the position of the last row in `device_list_changes_in_room` that has been + converted to `device_lists_outbound_pokes`. + + Rows with a strictly greater position where `converted_to_destinations` is + `FALSE` have not been converted. + """ + + row = await self.db_pool.simple_select_one( + table="device_lists_changes_converted_stream_position", + keyvalues={}, + retcols=["stream_id", "room_id"], + desc="get_device_change_last_converted_pos", + ) + return row["stream_id"], row["room_id"] + + async def set_device_change_last_converted_pos( + self, + stream_id: int, + room_id: str, + ) -> None: + """ + Set the position of the last row in `device_list_changes_in_room` that has been + converted to `device_lists_outbound_pokes`. + """ + + await self.db_pool.simple_update_one( + table="device_lists_changes_converted_stream_position", + keyvalues={}, + updatevalues={"stream_id": stream_id, "room_id": room_id}, + desc="set_device_change_last_converted_pos", + ) diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index af59be6b48..6240f9a75e 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -391,10 +391,10 @@ class EndToEndRoomKeyStore(SQLBaseStore): Returns: A dict giving the info metadata for this backup version, with fields including: - version(str) - algorithm(str) - auth_data(object): opaque dict supplied by the client - etag(int): tag of the keys in the backup + version (str) + algorithm (str) + auth_data (object): opaque dict supplied by the client + etag (int): tag of the keys in the backup """ def _get_e2e_room_keys_version_info_txn(txn: LoggingTransaction) -> JsonDict: diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 46c0d06157..cf33e73e2b 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -43,6 +43,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, make_in_list_sql_clause, + make_tuple_in_list_sql_clause, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.engines import PostgresEngine @@ -50,6 +51,7 @@ from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -135,12 +137,17 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker return now_stream_id, [] @trace + @cancellable async def get_e2e_device_keys_for_cs_api( - self, query_list: List[Tuple[str, Optional[str]]] + self, + query_list: List[Tuple[str, Optional[str]]], + include_displaynames: bool = True, ) -> Dict[str, Dict[str, JsonDict]]: """Fetch a list of device keys, formatted suitably for the C/S API. Args: - query_list(list): List of pairs of user_ids and device_ids. + query_list: List of pairs of user_ids and device_ids. + include_displaynames: Whether to include the displayname of returned devices + (if one exists). Returns: Dict mapping from user-id to dict mapping from device_id to key data. The key data will be a dict in the same format as the @@ -163,9 +170,12 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker continue r["unsigned"] = {} - display_name = device_info.display_name - if display_name is not None: - r["unsigned"]["device_display_name"] = display_name + if include_displaynames: + # Include the device's display name in the "unsigned" dictionary + display_name = device_info.display_name + if display_name is not None: + r["unsigned"]["device_display_name"] = display_name + rv[user_id][device_id] = r return rv @@ -197,6 +207,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ... @trace + @cancellable async def get_e2e_device_keys_and_signatures( self, query_list: Collection[Tuple[str, Optional[str]]], @@ -275,7 +286,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker def _get_e2e_device_keys_txn( self, txn: LoggingTransaction, - query_list: Collection[Tuple[str, str]], + query_list: Collection[Tuple[str, Optional[str]]], include_all_devices: bool = False, include_deleted_devices: bool = False, ) -> Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]: @@ -285,8 +296,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker cross-signing signatures which have been added subsequently (for which, see get_e2e_device_keys_and_signatures) """ - query_clauses = [] - query_params = [] + query_clauses: List[str] = [] + query_params_list: List[List[object]] = [] if include_all_devices is False: include_deleted_devices = False @@ -294,40 +305,64 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker if include_deleted_devices: deleted_devices = set(query_list) + # Split the query list into queries for users and queries for particular + # devices. + user_list = [] + user_device_list = [] for (user_id, device_id) in query_list: - query_clause = "user_id = ?" - query_params.append(user_id) - - if device_id is not None: - query_clause += " AND device_id = ?" - query_params.append(device_id) - - query_clauses.append(query_clause) - - sql = ( - "SELECT user_id, device_id, " - " d.display_name, " - " k.key_json" - " FROM devices d" - " %s JOIN e2e_device_keys_json k USING (user_id, device_id)" - " WHERE %s AND NOT d.hidden" - ) % ( - "LEFT" if include_all_devices else "INNER", - " OR ".join("(" + q + ")" for q in query_clauses), - ) + if device_id is None: + user_list.append(user_id) + else: + user_device_list.append((user_id, device_id)) - txn.execute(sql, query_params) + if user_list: + user_id_in_list_clause, user_args = make_in_list_sql_clause( + txn.database_engine, "user_id", user_list + ) + query_clauses.append(user_id_in_list_clause) + query_params_list.append(user_args) + + if user_device_list: + # Divide the device queries into batches, to avoid excessively large + # queries. + for user_device_batch in batch_iter(user_device_list, 1024): + ( + user_device_id_in_list_clause, + user_device_args, + ) = make_tuple_in_list_sql_clause( + txn.database_engine, ("user_id", "device_id"), user_device_batch + ) + query_clauses.append(user_device_id_in_list_clause) + query_params_list.append(user_device_args) result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {} - for (user_id, device_id, display_name, key_json) in txn: - if include_deleted_devices: - deleted_devices.remove((user_id, device_id)) - result.setdefault(user_id, {})[device_id] = DeviceKeyLookupResult( - display_name, db_to_json(key_json) if key_json else None + for query_clause, query_params in zip(query_clauses, query_params_list): + sql = ( + "SELECT user_id, device_id, " + " d.display_name, " + " k.key_json" + " FROM devices d" + " %s JOIN e2e_device_keys_json k USING (user_id, device_id)" + " WHERE %s AND NOT d.hidden" + ) % ( + "LEFT" if include_all_devices else "INNER", + query_clause, ) + txn.execute(sql, query_params) + + for (user_id, device_id, display_name, key_json) in txn: + assert device_id is not None + if include_deleted_devices: + deleted_devices.remove((user_id, device_id)) + result.setdefault(user_id, {})[device_id] = DeviceKeyLookupResult( + display_name, db_to_json(key_json) if key_json else None + ) + if include_deleted_devices: for user_id, device_id in deleted_devices: + if device_id is None: + continue result.setdefault(user_id, {})[device_id] = None return result @@ -377,10 +412,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker """Retrieve a number of one-time keys for a user Args: - user_id(str): id of user to get keys for - device_id(str): id of device to get keys for - key_ids(list[str]): list of key ids (excluding algorithm) to - retrieve + user_id: id of user to get keys for + device_id: id of device to get keys for + key_ids: list of key ids (excluding algorithm) to retrieve Returns: A map from (algorithm, key_id) to json string for key @@ -887,6 +921,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker return keys + @cancellable async def get_e2e_cross_signing_keys_bulk( self, user_ids: List[str], from_user_id: Optional[str] = None ) -> Dict[str, Optional[Dict[str, JsonDict]]]: @@ -902,7 +937,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker keys were not found, either their user ID will not be in the dict, or their user ID will map to None. """ - result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids) if from_user_id: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index eec55b6478..309a4ba664 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import datetime import itertools import logging from queue import Empty, PriorityQueue @@ -33,6 +34,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes from synapse.api.errors import StoreError from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict +from synapse.logging.opentracing import tag_args, trace from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( @@ -42,11 +44,12 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -70,6 +73,30 @@ pdus_pruned_from_federation_queue = Counter( logger = logging.getLogger(__name__) +# Parameters controlling exponential backoff between backfill failures. +# After the first failure to backfill, we wait 2 hours before trying again. If the +# second attempt fails, we wait 4 hours before trying again. If the third attempt fails, +# we wait 8 hours before trying again, ... and so on. +# +# Each successive backoff period is twice as long as the last. However we cap this +# period at a maximum of 2^8 = 256 hours: a little over 10 days. (This is the smallest +# power of 2 which yields a maximum backoff period of at least 7 days---which was the +# original maximum backoff period.) Even when we hit this cap, we will continue to +# make backfill attempts once every 10 days. +BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8 +BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = int( + datetime.timedelta(hours=1).total_seconds() * 1000 +) + +# We need a cap on the power of 2 or else the backoff period +# 2^N * (milliseconds per hour) +# will overflow when calcuated within the database. We ensure overflow does not occur +# by checking that the largest backoff period fits in a 32-bit signed integer. +_LONGEST_BACKOFF_PERIOD_MILLISECONDS = ( + 2**BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS +) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS +assert 0 < _LONGEST_BACKOFF_PERIOD_MILLISECONDS <= ((2**31) - 1) + # All the info we need while iterating the DAG while backfilling @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -126,6 +153,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas ) return await self.get_events_as_list(event_ids) + @trace + @tag_args async def get_auth_chain_ids( self, room_id: str, @@ -709,95 +738,264 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # Return all events where not all sets can reach them. return {eid for eid, n in event_to_missing_sets.items() if n} - async def get_oldest_event_ids_with_depth_in_room( - self, room_id: str + @trace + @tag_args + async def get_backfill_points_in_room( + self, + room_id: str, + current_depth: int, + limit: int, ) -> List[Tuple[str, int]]: - """Gets the oldest events(backwards extremities) in the room along with the - aproximate depth. - - We use this function so that we can compare and see if someones current - depth at their current scrollback is within pagination range of the - event extremeties. If the current depth is close to the depth of given - oldest event, we can trigger a backfill. + """ + Get the backward extremities to backfill from in the room along with the + approximate depth. + + Only returns events that are at a depth lower than or + equal to the `current_depth`. Sorted by depth, highest to lowest (descending) + so the closest events to the `current_depth` are first in the list. + + We ignore extremities that are newer than the user's current scroll position + (ie, those with depth greater than `current_depth`) as: + 1. we don't really care about getting events that have happened + after our current position; and + 2. by the nature of paginating and scrolling back, we have likely + previously tried and failed to backfill from that extremity, so + to avoid getting "stuck" requesting the same backfill repeatedly + we drop those extremities. Args: room_id: Room where we want to find the oldest events + current_depth: The depth at the user's current scrollback position + limit: The max number of backfill points to return Returns: - List of (event_id, depth) tuples + List of (event_id, depth) tuples. Sorted by depth, highest to lowest + (descending) so the closest events to the `current_depth` are first + in the list. """ - def get_oldest_event_ids_with_depth_in_room_txn( + def get_backfill_points_in_room_txn( txn: LoggingTransaction, room_id: str ) -> List[Tuple[str, int]]: - # Assemble a dictionary with event_id -> depth for the oldest events + # Assemble a tuple lookup of event_id -> depth for the oldest events # we know of in the room. Backwards extremeties are the oldest # events we know of in the room but we only know of them because - # some other event referenced them by prev_event and aren't peristed - # in our database yet (meaning we don't know their depth - # specifically). So we need to look for the aproximate depth from + # some other event referenced them by prev_event and aren't + # persisted in our database yet (meaning we don't know their depth + # specifically). So we need to look for the approximate depth from # the events connected to the current backwards extremeties. - sql = """ - SELECT b.event_id, MAX(e.depth) FROM events as e + + if isinstance(self.database_engine, PostgresEngine): + least_function = "LEAST" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "MIN" + else: + raise RuntimeError("Unknown database engine") + + sql = f""" + SELECT backward_extrem.event_id, event.depth FROM events AS event /** * Get the edge connections from the event_edges table * so we can see whether this event's prev_events points * to a backward extremity in the next join. */ - INNER JOIN event_edges as g - ON g.event_id = e.event_id + INNER JOIN event_edges AS edge + ON edge.event_id = event.event_id /** * We find the "oldest" events in the room by looking for * events connected to backwards extremeties (oldest events * in the room that we know of so far). */ - INNER JOIN event_backward_extremities as b - ON g.prev_event_id = b.event_id - WHERE b.room_id = ? AND g.is_state is ? - GROUP BY b.event_id + INNER JOIN event_backward_extremities AS backward_extrem + ON edge.prev_event_id = backward_extrem.event_id + /** + * We use this info to make sure we don't retry to use a backfill point + * if we've already attempted to backfill from it recently. + */ + LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info + ON + failed_backfill_attempt_info.room_id = backward_extrem.room_id + AND failed_backfill_attempt_info.event_id = backward_extrem.event_id + WHERE + backward_extrem.room_id = ? + /* We only care about non-state edges because we used to use + * `event_edges` for two different sorts of "edges" (the current + * event DAG, but also a link to the previous state, for state + * events). These legacy state event edges can be distinguished by + * `is_state` and are removed from the codebase and schema but + * because the schema change is in a background update, it's not + * necessarily safe to assume that it will have been completed. + */ + AND edge.is_state is ? /* False */ + /** + * We only want backwards extremities that are older than or at + * the same position of the given `current_depth` (where older + * means less than the given depth) because we're looking backwards + * from the `current_depth` when backfilling. + * + * current_depth (ignore events that come after this, ignore 2-4) + * | + * â–¼ + * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time> + */ + AND event.depth <= ? /* current_depth */ + /** + * Exponential back-off (up to the upper bound) so we don't retry the + * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ( + failed_backfill_attempt_info.event_id IS NULL + OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + ( + (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */)) + * ? /* step */ + ) + ) + /** + * Sort from highest (closest to the `current_depth`) to the lowest depth + * because the closest are most relevant to backfill from first. + * Then tie-break on alphabetical order of the event_ids so we get a + * consistent ordering which is nice when asserting things in tests. + */ + ORDER BY event.depth DESC, backward_extrem.event_id DESC + LIMIT ? """ - txn.execute(sql, (room_id, False)) + txn.execute( + sql, + ( + room_id, + False, + current_depth, + self._clock.time_msec(), + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, + limit, + ), + ) return cast(List[Tuple[str, int]], txn.fetchall()) return await self.db_pool.runInteraction( - "get_oldest_event_ids_with_depth_in_room", - get_oldest_event_ids_with_depth_in_room_txn, + "get_backfill_points_in_room", + get_backfill_points_in_room_txn, room_id, ) + @trace async def get_insertion_event_backward_extremities_in_room( - self, room_id: str + self, + room_id: str, + current_depth: int, + limit: int, ) -> List[Tuple[str, int]]: - """Get the insertion events we know about that we haven't backfilled yet. - - We use this function so that we can compare and see if someones current - depth at their current scrollback is within pagination range of the - insertion event. If the current depth is close to the depth of given - insertion event, we can trigger a backfill. + """ + Get the insertion events we know about that we haven't backfilled yet + along with the approximate depth. Only returns insertion events that are + at a depth lower than or equal to the `current_depth`. Sorted by depth, + highest to lowest (descending) so the closest events to the + `current_depth` are first in the list. + + We ignore insertion events that are newer than the user's current scroll + position (ie, those with depth greater than `current_depth`) as: + 1. we don't really care about getting events that have happened + after our current position; and + 2. by the nature of paginating and scrolling back, we have likely + previously tried and failed to backfill from that insertion event, so + to avoid getting "stuck" requesting the same backfill repeatedly + we drop those insertion event. Args: room_id: Room where we want to find the oldest events + current_depth: The depth at the user's current scrollback position + limit: The max number of insertion event extremities to return Returns: - List of (event_id, depth) tuples + List of (event_id, depth) tuples. Sorted by depth, highest to lowest + (descending) so the closest events to the `current_depth` are first + in the list. """ def get_insertion_event_backward_extremities_in_room_txn( txn: LoggingTransaction, room_id: str ) -> List[Tuple[str, int]]: - sql = """ - SELECT b.event_id, MAX(e.depth) FROM insertion_events as i + if isinstance(self.database_engine, PostgresEngine): + least_function = "LEAST" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "MIN" + else: + raise RuntimeError("Unknown database engine") + + sql = f""" + SELECT + insertion_event_extremity.event_id, event.depth /* We only want insertion events that are also marked as backwards extremities */ - INNER JOIN insertion_event_extremities as b USING (event_id) + FROM insertion_event_extremities AS insertion_event_extremity /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS e USING (event_id) - WHERE b.room_id = ? - GROUP BY b.event_id + INNER JOIN events AS event USING (event_id) + /** + * We use this info to make sure we don't retry to use a backfill point + * if we've already attempted to backfill from it recently. + */ + LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info + ON + failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id + AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id + WHERE + insertion_event_extremity.room_id = ? + /** + * We only want extremities that are older than or at + * the same position of the given `current_depth` (where older + * means less than the given depth) because we're looking backwards + * from the `current_depth` when backfilling. + * + * current_depth (ignore events that come after this, ignore 2-4) + * | + * â–¼ + * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time> + */ + AND event.depth <= ? /* current_depth */ + /** + * Exponential back-off (up to the upper bound) so we don't retry the + * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ( + failed_backfill_attempt_info.event_id IS NULL + OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + ( + (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */)) + * ? /* step */ + ) + ) + /** + * Sort from highest (closest to the `current_depth`) to the lowest depth + * because the closest are most relevant to backfill from first. + * Then tie-break on alphabetical order of the event_ids so we get a + * consistent ordering which is nice when asserting things in tests. + */ + ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC + LIMIT ? """ - txn.execute(sql, (room_id,)) + txn.execute( + sql, + ( + room_id, + current_depth, + self._clock.time_msec(), + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, + limit, + ), + ) return cast(List[Tuple[str, int]], txn.fetchall()) return await self.db_pool.runInteraction( @@ -970,6 +1168,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return int(min_depth) if min_depth is not None else None + @cancellable async def get_forward_extremities_for_room_at_stream_ordering( self, room_id: str, stream_ordering: int ) -> List[str]: @@ -1286,6 +1485,105 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return event_id_results + @trace + async def record_event_failed_pull_attempt( + self, room_id: str, event_id: str, cause: str + ) -> None: + """ + Record when we fail to pull an event over federation. + + This information allows us to be more intelligent when we decide to + retry (we don't need to fail over and over) and we can process that + event in the background so we don't block on it each time. + + Args: + room_id: The room where the event failed to pull from + event_id: The event that failed to be fetched or processed + cause: The error message or reason that we failed to pull the event + """ + logger.debug( + "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s", + room_id, + event_id, + cause, + ) + await self.db_pool.runInteraction( + "record_event_failed_pull_attempt", + self._record_event_failed_pull_attempt_upsert_txn, + room_id, + event_id, + cause, + db_autocommit=True, # Safe as it's a single upsert + ) + + def _record_event_failed_pull_attempt_upsert_txn( + self, + txn: LoggingTransaction, + room_id: str, + event_id: str, + cause: str, + ) -> None: + sql = """ + INSERT INTO event_failed_pull_attempts ( + room_id, event_id, num_attempts, last_attempt_ts, last_cause + ) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (room_id, event_id) DO UPDATE SET + num_attempts=event_failed_pull_attempts.num_attempts + 1, + last_attempt_ts=EXCLUDED.last_attempt_ts, + last_cause=EXCLUDED.last_cause; + """ + + txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) + + @trace + async def get_event_ids_to_not_pull_from_backoff( + self, + room_id: str, + event_ids: Collection[str], + ) -> List[str]: + """ + Filter down the events to ones that we've failed to pull before recently. Uses + exponential backoff. + + Args: + room_id: The room that the events belong to + event_ids: A list of events to filter down + + Returns: + List of event_ids that should not be attempted to be pulled + """ + event_failed_pull_attempts = await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=( + "event_id", + "last_attempt_ts", + "num_attempts", + ), + desc="get_event_ids_to_not_pull_from_backoff", + ) + + current_time = self._clock.time_msec() + return [ + event_failed_pull_attempt["event_id"] + for event_failed_pull_attempt in event_failed_pull_attempts + # Exponential back-off (up to the upper bound) so we don't try to + # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + if current_time + < event_failed_pull_attempt["last_attempt_ts"] + + ( + 2 + ** min( + event_failed_pull_attempt["num_attempts"], + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + ) + ) + * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS + ] + async def get_missing_events( self, room_id: str, @@ -1339,6 +1637,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas event_results.reverse() return event_results + @trace + @tag_args async def get_successor_events(self, event_id: str) -> List[str]: """Fetch all events that have the given event as a prev event @@ -1375,6 +1675,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas _delete_old_forward_extrem_cache_txn, ) + @trace async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None: await self.db_pool.simple_upsert( table="insertion_event_extremities", @@ -1483,7 +1784,12 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas self, room_id: str, ) -> Optional[Tuple[str, str]]: - """Get the next event ID in the staging area for the given room.""" + """ + Get the next event ID in the staging area for the given room. + + Returns: + Tuple of the `origin` and `event_id` + """ def _get_next_staged_event_id_for_room_txn( txn: LoggingTransaction, @@ -1597,7 +1903,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas logger.info("Invalid prev_events for %s", event_id) continue - if room_version.event_format == EventFormatVersions.V1: + if room_version.event_format == EventFormatVersions.ROOM_V1_V2: for prev_event_tuple in prev_events: if ( not isinstance(prev_event_tuple, list) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index dd2627037c..b283ab0f9c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -12,14 +12,85 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +"""Responsible for storing and fetching push actions / notifications. + +There are two main uses for push actions: + 1. Sending out push to a user's device; and + 2. Tracking per-room per-user notification counts (used in sync requests). + +For the former we simply use the `event_push_actions` table, which contains all +the calculated actions for a given user (which were calculated by the +`BulkPushRuleEvaluator`). + +For the latter we could simply count the number of rows in `event_push_actions` +table for a given room/user, but in practice this is *very* heavyweight when +there were a large number of notifications (due to e.g. the user never reading a +room). Plus, keeping all push actions indefinitely uses a lot of disk space. + +To fix these issues, we add a new table `event_push_summary` that tracks +per-user per-room counts of all notifications that happened before a stream +ordering S. Thus, to get the notification count for a user / room we can simply +query a single row in `event_push_summary` and count the number of rows in +`event_push_actions` with a stream ordering larger than S (and as long as S is +"recent", the number of rows needing to be scanned will be small). + +The `event_push_summary` table is updated via a background job that periodically +chooses a new stream ordering S' (usually the latest stream ordering), counts +all notifications in `event_push_actions` between the existing S and S', and +adds them to the existing counts in `event_push_summary`. + +This allows us to delete old rows from `event_push_actions` once those rows have +been counted and added to `event_push_summary` (we call this process +"rotation"). + + +We need to handle when a user sends a read receipt to the room. Again this is +done as a background process. For each receipt we clear the row in +`event_push_summary` and count the number of notifications in +`event_push_actions` that happened after the receipt but before S, and insert +that count into `event_push_summary` (If the receipt happened *after* S then we +simply clear the `event_push_summary`.) + +Note that its possible that if the read receipt is for an old event the relevant +`event_push_actions` rows will have been rotated and we get the wrong count +(it'll be too low). We accept this as a rare edge case that is unlikely to +impact the user much (since the vast majority of read receipts will be for the +latest event). + +The last complication is to handle the race where we request the notifications +counts after a user sends a read receipt into the room, but *before* the +background update handles the receipt (without any special handling the counts +would be outdated). We fix this by including in `event_push_summary` the read +receipt we used when updating `event_push_summary`, and every time we query the +table we check if that matches the most recent read receipt in the room. If yes, +continue as above, if not we simply query the `event_push_actions` table +directly. + +Since read receipts are almost always for recent events, scanning the +`event_push_actions` table in this case is unlikely to be a problem. Even if it +is a problem, it is temporary until the background job handles the new read +receipt. +""" + import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + List, + Mapping, + Optional, + Tuple, + Union, + cast, +) import attr -from synapse.api.constants import ReceiptTypes +from synapse.api.constants import MAIN_TIMELINE, ReceiptTypes from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -27,6 +98,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -47,6 +119,32 @@ DEFAULT_HIGHLIGHT_ACTION: List[Union[dict, str]] = [ ] +@attr.s(slots=True, auto_attribs=True) +class _RoomReceipt: + """ + HttpPushAction instances include the information used to generate HTTP + requests to a push gateway. + """ + + unthreaded_stream_ordering: int = 0 + # threaded_stream_ordering includes the main pseudo-thread. + threaded_stream_ordering: Dict[str, int] = attr.Factory(dict) + + def is_unread(self, thread_id: str, stream_ordering: int) -> bool: + """Returns True if the stream ordering is unread according to the receipt information.""" + + # Only include push actions with a stream ordering after both the unthreaded + # and threaded receipt. Properly handles a user without any receipts present. + return ( + self.unthreaded_stream_ordering < stream_ordering + and self.threaded_stream_ordering.get(thread_id, 0) < stream_ordering + ) + + +# A _RoomReceipt with no receipts in it. +MISSING_ROOM_RECEIPT = _RoomReceipt() + + @attr.s(slots=True, frozen=True, auto_attribs=True) class HttpPushAction: """ @@ -85,7 +183,7 @@ class UserPushAction(EmailPushAction): @attr.s(slots=True, auto_attribs=True) class NotifCounts: """ - The per-user, per-room count of notifications. Used by sync and push. + The per-user, per-room, per-thread count of notifications. Used by sync and push. """ notify_count: int = 0 @@ -93,7 +191,24 @@ class NotifCounts: highlight_count: int = 0 -def _serialize_action(actions: List[Union[dict, str]], is_highlight: bool) -> str: +@attr.s(slots=True, auto_attribs=True) +class RoomNotifCounts: + """ + The per-user, per-room count of notifications. Used by sync and push. + """ + + main_timeline: NotifCounts + # Map of thread ID to the notification counts. + threads: Dict[str, NotifCounts] + + def __len__(self) -> int: + # To properly account for the amount of space in any caches. + return len(self.threads) + 1 + + +def _serialize_action( + actions: Collection[Union[Mapping, str]], is_highlight: bool +) -> str: """Custom serializer for actions. This allows us to "compress" common actions. We use the fact that most users have the same actions for notifs (and for @@ -131,6 +246,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ): super().__init__(database, db_conn, hs) + # Track when the process started. + self._started_ts = self._clock.time_msec() + # These get correctly set by _find_stream_orderings_for_times_txn self.stream_ordering_month_ago: Optional[int] = None self.stream_ordering_day_ago: Optional[int] = None @@ -150,6 +268,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas self._rotate_notifs, 30 * 1000 ) + self._clear_old_staging_loop = self._clock.looping_call( + self._clear_old_push_actions_staging, 30 * 60 * 1000 + ) + self.db_pool.updates.register_background_index_update( "event_push_summary_unique_index", index_name="event_push_summary_unique_index", @@ -159,14 +281,196 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas replaces_index="event_push_summary_user_rm", ) - @cached(tree=True, max_entries=5000) + self.db_pool.updates.register_background_index_update( + "event_push_summary_unique_index2", + index_name="event_push_summary_unique_index2", + table="event_push_summary", + columns=["user_id", "room_id", "thread_id"], + unique=True, + ) + + self.db_pool.updates.register_background_update_handler( + "event_push_backfill_thread_id", + self._background_backfill_thread_id, + ) + + # Indexes which will be used to quickly make the thread_id column non-null. + self.db_pool.updates.register_background_index_update( + "event_push_actions_thread_id_null", + index_name="event_push_actions_thread_id_null", + table="event_push_actions", + columns=["thread_id"], + where_clause="thread_id IS NULL", + ) + self.db_pool.updates.register_background_index_update( + "event_push_summary_thread_id_null", + index_name="event_push_summary_thread_id_null", + table="event_push_summary", + columns=["thread_id"], + where_clause="thread_id IS NULL", + ) + + # Check ASAP (and then later, every 1s) to see if we have finished + # background updates the event_push_actions and event_push_summary tables. + self._clock.call_later(0.0, self._check_event_push_backfill_thread_id) + self._event_push_backfill_thread_id_done = False + + @wrap_as_background_process("check_event_push_backfill_thread_id") + async def _check_event_push_backfill_thread_id(self) -> None: + """ + Has thread_id finished backfilling? + + If not, we need to just-in-time update it so the queries work. + """ + done = await self.db_pool.updates.has_completed_background_update( + "event_push_backfill_thread_id" + ) + + if done: + self._event_push_backfill_thread_id_done = True + else: + # Reschedule to run. + self._clock.call_later(15.0, self._check_event_push_backfill_thread_id) + + async def _background_backfill_thread_id( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Fill in the thread_id field for event_push_actions and event_push_summary. + + This is preparatory so that it can be made non-nullable in the future. + + Because all current (null) data is done in an unthreaded manner this + simply assumes it is on the "main" timeline. Since event_push_actions + are periodically cleared it is not possible to correctly re-calculate + the thread_id. + """ + event_push_actions_done = progress.get("event_push_actions_done", False) + + def add_thread_id_txn( + txn: LoggingTransaction, start_stream_ordering: int + ) -> int: + sql = """ + SELECT stream_ordering + FROM event_push_actions + WHERE + thread_id IS NULL + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT ? + """ + txn.execute(sql, (start_stream_ordering, batch_size)) + + # No more rows to process. + rows = txn.fetchall() + if not rows: + progress["event_push_actions_done"] = True + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + return 0 + + # Update the thread ID for any of those rows. + max_stream_ordering = rows[-1][0] + + sql = """ + UPDATE event_push_actions + SET thread_id = 'main' + WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL + """ + txn.execute( + sql, + ( + start_stream_ordering, + max_stream_ordering, + ), + ) + + # Update progress. + processed_rows = txn.rowcount + progress["max_event_push_actions_stream_ordering"] = max_stream_ordering + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + + return processed_rows + + def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: + min_user_id = progress.get("max_summary_user_id", "") + min_room_id = progress.get("max_summary_room_id", "") + + # Slightly overcomplicated query for getting the Nth user ID / room + # ID tuple, or the last if there are less than N remaining. + sql = """ + SELECT user_id, room_id FROM ( + SELECT user_id, room_id FROM event_push_summary + WHERE (user_id, room_id) > (?, ?) + AND thread_id IS NULL + ORDER BY user_id, room_id + LIMIT ? + ) AS e + ORDER BY user_id DESC, room_id DESC + LIMIT 1 + """ + + txn.execute(sql, (min_user_id, min_room_id, batch_size)) + row = txn.fetchone() + if not row: + return 0 + + max_user_id, max_room_id = row + + sql = """ + UPDATE event_push_summary + SET thread_id = 'main' + WHERE + (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) + AND thread_id IS NULL + """ + txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) + processed_rows = txn.rowcount + + progress["max_summary_user_id"] = max_user_id + progress["max_summary_room_id"] = max_room_id + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + + return processed_rows + + # First update the event_push_actions table, then the event_push_summary table. + # + # Note that the event_push_actions_staging table is ignored since it is + # assumed that items in that table will only exist for a short period of + # time. + if not event_push_actions_done: + result = await self.db_pool.runInteraction( + "event_push_backfill_thread_id", + add_thread_id_txn, + progress.get("max_event_push_actions_stream_ordering", 0), + ) + else: + result = await self.db_pool.runInteraction( + "event_push_backfill_thread_id", + add_thread_id_summary_txn, + ) + + # Only done after the event_push_summary table is done. + if not result: + await self.db_pool.updates._end_background_update( + "event_push_backfill_thread_id" + ) + + return result + + @cached(tree=True, max_entries=5000, iterable=True) async def get_unread_event_push_actions_by_room_for_user( self, room_id: str, user_id: str, - ) -> NotifCounts: + ) -> RoomNotifCounts: """Get the notification count, the highlight count and the unread message count - for a given user in a given room after the given read receipt. + for a given user in a given room after their latest read receipt. Note that this function assumes the user to be a current member of the room, since it's either called by the sync handler to handle joined room entries, or by @@ -177,9 +481,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas user_id: The user to retrieve the counts for. Returns - A dict containing the counts mentioned earlier in this docstring, - respectively under the keys "notify_count", "highlight_count" and - "unread_count". + A RoomNotifCounts object containing the notification count, the + highlight count and the unread message count for both the main timeline + and threads. """ return await self.db_pool.runInteraction( "get_unread_event_push_actions_by_room", @@ -193,21 +497,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas txn: LoggingTransaction, room_id: str, user_id: str, - ) -> NotifCounts: - result = self.get_last_receipt_for_user_txn( + ) -> RoomNotifCounts: + # Get the stream ordering of the user's latest receipt in the room. + result = self.get_last_unthreaded_receipt_for_user_txn( txn, user_id, room_id, receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) - stream_ordering = None if result: _, stream_ordering = result - if stream_ordering is None: - # Either last_read_event_id is None, or it's an event we don't have (e.g. - # because it's been purged), in which case retrieve the stream ordering for + else: + # If the user has no receipts in the room, retrieve the stream ordering for # the latest membership event from this user in this room (which we assume is # a join). event_id = self.db_pool.simple_select_one_onecol_txn( @@ -224,72 +527,236 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ) def _get_unread_counts_by_pos_txn( - self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int - ) -> NotifCounts: + self, + txn: LoggingTransaction, + room_id: str, + user_id: str, + unthreaded_receipt_stream_ordering: int, + ) -> RoomNotifCounts: """Get the number of unread messages for a user/room that have happened since the given stream ordering. + + Args: + txn: The database transaction. + room_id: The room ID to get unread counts for. + user_id: The user ID to get unread counts for. + unthreaded_receipt_stream_ordering: The stream ordering of the user's latest + unthreaded receipt in the room. If there are no unthreaded receipts, + the stream ordering of the user's join event. + + Returns: + A RoomNotifCounts object containing the notification count, the + highlight count and the unread message count for both the main timeline + and threads. """ - counts = NotifCounts() + main_counts = NotifCounts() + thread_counts: Dict[str, NotifCounts] = {} + + def _get_thread(thread_id: str) -> NotifCounts: + if thread_id == MAIN_TIMELINE: + return main_counts + return thread_counts.setdefault(thread_id, NotifCounts()) + + receipt_types_clause, receipts_args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) + + # First ensure that the existing rows have an updated thread_id field. + if not self._event_push_backfill_thread_id_done: + txn.execute( + """ + UPDATE event_push_summary + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + txn.execute( + """ + UPDATE event_push_actions + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) # First we pull the counts from the summary table. # - # We check that `last_receipt_stream_ordering` matches the stream - # ordering given. If it doesn't match then a new read receipt has arrived and - # we haven't yet updated the counts in `event_push_summary` to reflect - # that; in that case we simply ignore `event_push_summary` counts - # and do a manual count of all of the rows in the `event_push_actions` table - # for this user/room. + # We check that `last_receipt_stream_ordering` matches the stream ordering of the + # latest receipt for the thread (which may be either the unthreaded read receipt + # or the threaded read receipt). + # + # If it doesn't match then a new read receipt has arrived and we haven't yet + # updated the counts in `event_push_summary` to reflect that; in that case we + # simply ignore `event_push_summary` counts. + # + # We then do a manual count of all the rows in the `event_push_actions` table + # for any user/room/thread which did not have a valid summary found. # - # If `last_receipt_stream_ordering` is null then that means it's up to - # date (as the row was written by an older version of Synapse that + # If `last_receipt_stream_ordering` is null then that means it's up-to-date + # (as the row was written by an older version of Synapse that # updated `event_push_summary` synchronously when persisting a new read # receipt). txn.execute( - """ - SELECT stream_ordering, notif_count, COALESCE(unread_count, 0) + f""" + SELECT notif_count, COALESCE(unread_count, 0), thread_id FROM event_push_summary + LEFT JOIN ( + SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + user_id = ? + AND room_id = ? + AND stream_ordering > ? + AND {receipt_types_clause} + GROUP BY thread_id + ) AS receipts USING (thread_id) WHERE room_id = ? AND user_id = ? AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > ?) - OR last_receipt_stream_ordering = ? - ) + (last_receipt_stream_ordering IS NULL AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)) + OR last_receipt_stream_ordering = COALESCE(threaded_receipt_stream_ordering, ?) + ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0) """, - (room_id, user_id, stream_ordering, stream_ordering), + ( + user_id, + room_id, + unthreaded_receipt_stream_ordering, + *receipts_args, + room_id, + user_id, + unthreaded_receipt_stream_ordering, + unthreaded_receipt_stream_ordering, + ), ) - row = txn.fetchone() - - summary_stream_ordering = 0 - if row: - summary_stream_ordering = row[0] - counts.notify_count += row[1] - counts.unread_count += row[2] - - # Next we need to count highlights, which aren't summarized - sql = """ - SELECT COUNT(*) FROM event_push_actions + summarised_threads = set() + for notif_count, unread_count, thread_id in txn: + summarised_threads.add(thread_id) + counts = _get_thread(thread_id) + counts.notify_count += notif_count + counts.unread_count += unread_count + + # Next we need to count highlights, which aren't summarised + sql = f""" + SELECT COUNT(*), thread_id FROM event_push_actions + LEFT JOIN ( + SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + user_id = ? + AND room_id = ? + AND stream_ordering > ? + AND {receipt_types_clause} + GROUP BY thread_id + ) AS receipts USING (thread_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > ? + AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?) AND highlight = 1 + GROUP BY thread_id """ - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() - if row: - counts.highlight_count += row[0] + txn.execute( + sql, + ( + user_id, + room_id, + unthreaded_receipt_stream_ordering, + *receipts_args, + user_id, + room_id, + unthreaded_receipt_stream_ordering, + ), + ) + for highlight_count, thread_id in txn: + _get_thread(thread_id).highlight_count += highlight_count - # Finally we need to count push actions that aren't included in the - # summary returned above, e.g. recent events that haven't been - # summarized yet, or the summary is empty due to a recent read receipt. - stream_ordering = max(stream_ordering, summary_stream_ordering) - notify_count, unread_count = self._get_notif_unread_count_for_user_room( - txn, room_id, user_id, stream_ordering + # For threads which were summarised we need to count actions since the last + # rotation. + thread_id_clause, thread_id_args = make_in_list_sql_clause( + self.database_engine, "thread_id", summarised_threads ) - counts.notify_count += notify_count - counts.unread_count += unread_count + # The (inclusive) event stream ordering that was previously summarised. + rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) - return counts + unread_counts = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, rotated_upto_stream_ordering + ) + for notif_count, unread_count, thread_id in unread_counts: + if thread_id not in summarised_threads: + continue + + if thread_id == MAIN_TIMELINE: + counts.notify_count += notif_count + counts.unread_count += unread_count + elif thread_id in thread_counts: + thread_counts[thread_id].notify_count += notif_count + thread_counts[thread_id].unread_count += unread_count + else: + # Previous thread summaries of 0 are discarded above. + # + # TODO If empty summaries are deleted this can be removed. + thread_counts[thread_id] = NotifCounts( + notify_count=notif_count, + unread_count=unread_count, + highlight_count=0, + ) + + # Finally we need to count push actions that aren't included in the + # summary returned above. This might be due to recent events that haven't + # been summarised yet or the summary is out of date due to a recent read + # receipt. + sql = f""" + SELECT + COUNT(CASE WHEN notif = 1 THEN 1 END), + COUNT(CASE WHEN unread = 1 THEN 1 END), + thread_id + FROM event_push_actions + LEFT JOIN ( + SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + user_id = ? + AND room_id = ? + AND stream_ordering > ? + AND {receipt_types_clause} + GROUP BY thread_id + ) AS receipts USING (thread_id) + WHERE user_id = ? + AND room_id = ? + AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?) + AND NOT {thread_id_clause} + GROUP BY thread_id + """ + txn.execute( + sql, + ( + user_id, + room_id, + unthreaded_receipt_stream_ordering, + *receipts_args, + user_id, + room_id, + unthreaded_receipt_stream_ordering, + *thread_id_args, + ), + ) + for notif_count, unread_count, thread_id in txn: + counts = _get_thread(thread_id) + counts.notify_count += notif_count + counts.unread_count += unread_count + + return RoomNotifCounts(main_counts, thread_counts) def _get_notif_unread_count_for_user_room( self, @@ -298,48 +765,70 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas user_id: str, stream_ordering: int, max_stream_ordering: Optional[int] = None, - ) -> Tuple[int, int]: + thread_id: Optional[str] = None, + ) -> List[Tuple[int, int, str]]: """Returns the notify and unread counts from `event_push_actions` for the given user/room in the given range. Does not consult `event_push_summary` table, which may include push actions that have been deleted from `event_push_actions` table. + + Args: + txn: The database transaction. + room_id: The room ID to get unread counts for. + user_id: The user ID to get unread counts for. + stream_ordering: The (exclusive) minimum stream ordering to consider. + max_stream_ordering: The (inclusive) maximum stream ordering to consider. + If this is not given, then no maximum is applied. + thread_id: The thread ID to fetch unread counts for. If this is not provided + then the results for *all* threads is returned. + + Note that if this is provided the resulting list will only have 0 or + 1 tuples in it. + + Return: + A tuple of the notif count and unread count in the given range for + each thread. """ # If there have been no events in the room since the stream ordering, # there can't be any push actions either. if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering): - return 0, 0 + return [] - clause = "" + stream_ordering_clause = "" args = [user_id, room_id, stream_ordering] if max_stream_ordering is not None: - clause = "AND ea.stream_ordering <= ?" + stream_ordering_clause = "AND ea.stream_ordering <= ?" args.append(max_stream_ordering) # If the max stream ordering is less than the min stream ordering, # then obviously there are zero push actions in that range. if max_stream_ordering <= stream_ordering: - return 0, 0 + return [] + + # Either limit the results to a specific thread or fetch all threads. + thread_id_clause = "" + if thread_id is not None: + thread_id_clause = "AND thread_id = ?" + args.append(thread_id) sql = f""" SELECT COUNT(CASE WHEN notif = 1 THEN 1 END), - COUNT(CASE WHEN unread = 1 THEN 1 END) - FROM event_push_actions ea - WHERE user_id = ? + COUNT(CASE WHEN unread = 1 THEN 1 END), + thread_id + FROM event_push_actions ea + WHERE user_id = ? AND room_id = ? AND ea.stream_ordering > ? - {clause} + {stream_ordering_clause} + {thread_id_clause} + GROUP BY thread_id """ txn.execute(sql, args) - row = txn.fetchone() - - if row: - return cast(Tuple[int, int], row) - - return 0, 0 + return cast(List[Tuple[int, int, str]], txn.fetchall()) async def get_push_action_users_in_range( self, min_stream_ordering: int, max_stream_ordering: int @@ -354,6 +843,49 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas return await self.db_pool.runInteraction("get_push_action_users_in_range", f) + def _get_receipts_by_room_txn( + self, txn: LoggingTransaction, user_id: str + ) -> Dict[str, _RoomReceipt]: + """ + Generate a map of room ID to the latest stream ordering that has been + read by the given user. + + Args: + txn: + user_id: The user to fetch receipts for. + + Returns: + A map including all rooms the user is in with a receipt. It maps + room IDs to _RoomReceipt instances + """ + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) + + sql = f""" + SELECT room_id, thread_id, MAX(stream_ordering) + FROM receipts_linearized + INNER JOIN events USING (room_id, event_id) + WHERE {receipt_types_clause} + AND user_id = ? + GROUP BY room_id, thread_id + """ + + args.extend((user_id,)) + txn.execute(sql, args) + + result: Dict[str, _RoomReceipt] = {} + for room_id, thread_id, stream_ordering in txn: + room_receipt = result.setdefault(room_id, _RoomReceipt()) + if thread_id is None: + room_receipt.unthreaded_stream_ordering = stream_ordering + else: + room_receipt.threaded_stream_ordering[thread_id] = stream_ordering + + return result + async def get_unread_push_actions_for_user_in_range_for_http( self, user_id: str, @@ -376,81 +908,45 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas The list will be ordered by ascending stream_ordering. The list will have between 0~limit entries. """ - # find rooms that have a read receipt in them and return the next - # push actions - def get_after_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool]]: - # find rooms that have a read receipt in them and return the next - # push actions - sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " ep.highlight " - " FROM (" - " SELECT room_id," - " MAX(stream_ordering) as stream_ordering" - " FROM events" - " INNER JOIN receipts_linearized USING (room_id, event_id)" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - ") AS rl," - " event_push_actions AS ep" - " WHERE" - " ep.room_id = rl.room_id" - " AND ep.stream_ordering > rl.stream_ordering" - " AND ep.user_id = ?" - " AND ep.stream_ordering > ?" - " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" - " ORDER BY ep.stream_ordering ASC LIMIT ?" - ) - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) - after_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt + receipts_by_room = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http_receipts", + self._get_receipts_by_room_txn, + user_id=user_id, ) - # There are rooms with push actions in them but you don't have a read receipt in - # them e.g. rooms you've been invited to, so get push actions for rooms which do - # not have read receipts in them too. - def get_no_receipt( + def get_push_actions_txn( txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool]]: - sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " ep.highlight " - " FROM event_push_actions AS ep" - " INNER JOIN events AS e USING (room_id, event_id)" - " WHERE" - " ep.room_id NOT IN (" - " SELECT room_id FROM receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - " )" - " AND ep.user_id = ?" - " AND ep.stream_ordering > ?" - " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" - " ORDER BY ep.stream_ordering ASC LIMIT ?" - ) - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall()) + ) -> List[Tuple[str, str, str, int, str, bool]]: + sql = """ + SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering, + ep.actions, ep.highlight + FROM event_push_actions AS ep + WHERE + ep.user_id = ? + AND ep.stream_ordering > ? + AND ep.stream_ordering <= ? + AND ep.notif = 1 + ORDER BY ep.stream_ordering ASC LIMIT ? + """ + txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) + return cast(List[Tuple[str, str, str, int, str, bool]], txn.fetchall()) - no_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + push_actions = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_http", get_push_actions_txn ) notifs = [ HttpPushAction( - event_id=row[0], - room_id=row[1], - stream_ordering=row[2], - actions=_deserialize_action(row[3], row[4]), + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + actions=_deserialize_action(actions, highlight), + ) + for event_id, room_id, thread_id, stream_ordering, actions, highlight in push_actions + if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread( + thread_id, stream_ordering ) - for row in after_read_receipt + no_read_receipt ] # Now sort it so it's ordered correctly, since currently it will @@ -485,82 +981,48 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas The list will be ordered by descending received_ts. The list will have between 0~limit entries. """ - # find rooms that have a read receipt in them and return the most recent - # push actions - def get_after_receipt( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool, int]]: - sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " ep.highlight, e.received_ts" - " FROM (" - " SELECT room_id," - " MAX(stream_ordering) as stream_ordering" - " FROM events" - " INNER JOIN receipts_linearized USING (room_id, event_id)" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - ") AS rl," - " event_push_actions AS ep" - " INNER JOIN events AS e USING (room_id, event_id)" - " WHERE" - " ep.room_id = rl.room_id" - " AND ep.stream_ordering > rl.stream_ordering" - " AND ep.user_id = ?" - " AND ep.stream_ordering > ?" - " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" - " ORDER BY ep.stream_ordering DESC LIMIT ?" - ) - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) - after_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt + receipts_by_room = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email_receipts", + self._get_receipts_by_room_txn, + user_id=user_id, ) - # There are rooms with push actions in them but you don't have a read receipt in - # them e.g. rooms you've been invited to, so get push actions for rooms which do - # not have read receipts in them too. - def get_no_receipt( + def get_push_actions_txn( txn: LoggingTransaction, - ) -> List[Tuple[str, str, int, str, bool, int]]: - sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " ep.highlight, e.received_ts" - " FROM event_push_actions AS ep" - " INNER JOIN events AS e USING (room_id, event_id)" - " WHERE" - " ep.room_id NOT IN (" - " SELECT room_id FROM receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - " )" - " AND ep.user_id = ?" - " AND ep.stream_ordering > ?" - " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" - " ORDER BY ep.stream_ordering DESC LIMIT ?" - ) - args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] - txn.execute(sql, args) - return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall()) + ) -> List[Tuple[str, str, str, int, str, bool, int]]: + sql = """ + SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering, + ep.actions, ep.highlight, e.received_ts + FROM event_push_actions AS ep + INNER JOIN events AS e USING (room_id, event_id) + WHERE + ep.user_id = ? + AND ep.stream_ordering > ? + AND ep.stream_ordering <= ? + AND ep.notif = 1 + ORDER BY ep.stream_ordering DESC LIMIT ? + """ + txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) + return cast(List[Tuple[str, str, str, int, str, bool, int]], txn.fetchall()) - no_read_receipt = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt + push_actions = await self.db_pool.runInteraction( + "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn ) # Make a list of dicts from the two sets of results. notifs = [ EmailPushAction( - event_id=row[0], - room_id=row[1], - stream_ordering=row[2], - actions=_deserialize_action(row[3], row[4]), - received_ts=row[5], + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + actions=_deserialize_action(actions, highlight), + received_ts=received_ts, + ) + for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions + if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread( + thread_id, stream_ordering ) - for row in after_read_receipt + no_read_receipt ] # Now sort it so it's ordered correctly, since currently it will @@ -606,8 +1068,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas async def add_push_actions_to_staging( self, event_id: str, - user_id_actions: Dict[str, List[Union[dict, str]]], + user_id_actions: Dict[str, Collection[Union[Mapping, str]]], count_as_unread: bool, + thread_id: str, ) -> None: """Add the push actions for the event to the push action staging area. @@ -616,6 +1079,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas user_id_actions: A mapping of user_id to list of push actions, where an action can either be a string or dict. count_as_unread: Whether this event should increment unread counts. + thread_id: The thread this event is parent of, if applicable. """ if not user_id_actions: return @@ -623,8 +1087,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # This is a helper function for generating the necessary tuple that # can be used to insert into the `event_push_actions_staging` table. def _gen_entry( - user_id: str, actions: List[Union[dict, str]] - ) -> Tuple[str, str, str, int, int, int]: + user_id: str, actions: Collection[Union[Mapping, str]] + ) -> Tuple[str, str, str, int, int, int, str, int]: is_highlight = 1 if _action_has_highlight(actions) else 0 notif = 1 if "notify" in actions else 0 return ( @@ -634,28 +1098,27 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas notif, # notif column is_highlight, # highlight column int(count_as_unread), # unread column + thread_id, # thread_id column + self._clock.time_msec(), # inserted_ts column ) - def _add_push_actions_to_staging_txn(txn: LoggingTransaction) -> None: - # We don't use simple_insert_many here to avoid the overhead - # of generating lists of dicts. - - sql = """ - INSERT INTO event_push_actions_staging - (event_id, user_id, actions, notif, highlight, unread) - VALUES (?, ?, ?, ?, ?, ?) - """ - - txn.execute_batch( - sql, - ( - _gen_entry(user_id, actions) - for user_id, actions in user_id_actions.items() - ), - ) - - return await self.db_pool.runInteraction( - "add_push_actions_to_staging", _add_push_actions_to_staging_txn + await self.db_pool.simple_insert_many( + "event_push_actions_staging", + keys=( + "event_id", + "user_id", + "actions", + "notif", + "highlight", + "unread", + "thread_id", + "inserted_ts", + ), + values=[ + _gen_entry(user_id, actions) + for user_id, actions in user_id_actions.items() + ], + desc="add_push_actions_to_staging", ) async def remove_push_actions_from_staging(self, event_id: str) -> None: @@ -769,12 +1232,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # [10, <none>, 20], we should treat this as being equivalent to # [10, 10, 20]. # - sql = ( - "SELECT received_ts FROM events" - " WHERE stream_ordering <= ?" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ) + sql = """ + SELECT received_ts FROM events + WHERE stream_ordering <= ? + ORDER BY stream_ordering DESC + LIMIT 1 + """ while range_end - range_start > 0: middle = (range_end + range_start) // 2 @@ -802,14 +1265,14 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas self, stream_ordering: int ) -> Optional[int]: def f(txn: LoggingTransaction) -> Optional[Tuple[int]]: - sql = ( - "SELECT e.received_ts" - " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.stream_ordering > ? AND notif = 1" - " ORDER BY ep.stream_ordering ASC" - " LIMIT 1" - ) + sql = """ + SELECT e.received_ts + FROM event_push_actions AS ep + JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id + WHERE ep.stream_ordering > ? AND notif = 1 + ORDER BY ep.stream_ordering ASC + LIMIT 1 + """ txn.execute(sql, (stream_ordering,)) return cast(Optional[Tuple[int]], txn.fetchone()) @@ -858,10 +1321,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas Any push actions which predate the user's most recent read receipt are now redundant, so we can remove them from `event_push_actions` and update `event_push_summary`. + + Returns true if all new receipts have been processed. """ limit = 100 + # The (inclusive) receipt stream ID that was previously processed.. min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn( txn, table="event_push_summary_last_receipt_stream_id", @@ -871,8 +1337,16 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas max_receipts_stream_id = self._receipts_id_gen.get_current_token() + # The (inclusive) event stream ordering that was previously summarised. + old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + sql = """ - SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering + SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering FROM receipts_linearized AS r INNER JOIN events AS e USING (event_id) WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ? @@ -893,73 +1367,133 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas limit, ), ) - rows = txn.fetchall() - - old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( - txn, - table="event_push_summary_stream_ordering", - keyvalues={}, - retcol="stream_ordering", - ) + rows = cast(List[Tuple[int, str, str, Optional[str], int]], txn.fetchall()) # For each new read receipt we delete push actions from before it and # recalculate the summary. - for _, room_id, user_id, stream_ordering in rows: + # + # Care must be taken of whether it is a threaded or unthreaded receipt. + for _, room_id, user_id, thread_id, stream_ordering in rows: # Only handle our own read receipts. if not self.hs.is_mine_id(user_id): continue + thread_clause = "" + thread_args: Tuple = () + if thread_id is not None: + thread_clause = "AND thread_id = ?" + thread_args = (thread_id,) + + # For each new read receipt we delete push actions from before it and + # recalculate the summary. txn.execute( - """ + f""" DELETE FROM event_push_actions WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? AND highlight = 0 + {thread_clause} """, - (room_id, user_id, stream_ordering), + (room_id, user_id, stream_ordering, *thread_args), ) - notif_count, unread_count = self._get_notif_unread_count_for_user_room( - txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering + # First ensure that the existing rows have an updated thread_id field. + if not self._event_push_backfill_thread_id_done: + txn.execute( + """ + UPDATE event_push_summary + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + txn.execute( + """ + UPDATE event_push_actions + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + + # Fetch the notification counts between the stream ordering of the + # latest receipt and what was previously summarised. + unread_counts = self._get_notif_unread_count_for_user_room( + txn, + room_id, + user_id, + stream_ordering, + old_rotate_stream_ordering, + thread_id, ) - self.db_pool.simple_upsert_txn( + # For an unthreaded receipt, mark the summary for all threads in the room + # as cleared. + if thread_id is None: + self.db_pool.simple_update_txn( + txn, + table="event_push_summary", + keyvalues={"user_id": user_id, "room_id": room_id}, + updatevalues={ + "notif_count": 0, + "unread_count": 0, + "stream_ordering": old_rotate_stream_ordering, + "last_receipt_stream_ordering": stream_ordering, + }, + ) + + # For a threaded receipt, we *always* want to update that receipt, + # event if there are no new notifications in that thread. This ensures + # the stream_ordering & last_receipt_stream_ordering are updated. + elif not unread_counts: + unread_counts = [(0, 0, thread_id)] + + # Then any updated threads get their notification count and unread + # count updated. + self.db_pool.simple_update_many_txn( txn, table="event_push_summary", - keyvalues={"room_id": room_id, "user_id": user_id}, - values={ - "notif_count": notif_count, - "unread_count": unread_count, - "stream_ordering": old_rotate_stream_ordering, - "last_receipt_stream_ordering": stream_ordering, - }, + key_names=("room_id", "user_id", "thread_id"), + key_values=[(room_id, user_id, row[2]) for row in unread_counts], + value_names=( + "notif_count", + "unread_count", + "stream_ordering", + "last_receipt_stream_ordering", + ), + value_values=[ + (row[0], row[1], old_rotate_stream_ordering, stream_ordering) + for row in unread_counts + ], ) # We always update `event_push_summary_last_receipt_stream_id` to # ensure that we don't rescan the same receipts for remote users. - upper_limit = max_receipts_stream_id + receipts_last_processed_stream_id = max_receipts_stream_id if len(rows) >= limit: # If we pulled out a limited number of rows we only update the # position to the last receipt we processed, so we continue # processing the rest next iteration. - upper_limit = rows[-1][0] + receipts_last_processed_stream_id = rows[-1][0] self.db_pool.simple_update_txn( txn, table="event_push_summary_last_receipt_stream_id", keyvalues={}, - updatevalues={"stream_id": upper_limit}, + updatevalues={"stream_id": receipts_last_processed_stream_id}, ) return len(rows) < limit def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool: - """Archives older notifications into event_push_summary. Returns whether - the archiving process has caught up or not. + """Archives older notifications (from event_push_actions) into event_push_summary. + + Returns whether the archiving process has caught up or not. """ + # The (inclusive) event stream ordering that was previously summarised. old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( txn, table="event_push_summary_stream_ordering", @@ -974,7 +1508,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas SELECT stream_ordering FROM event_push_actions WHERE stream_ordering > ? ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? - """, + """, (old_rotate_stream_ordering, self._rotate_count), ) stream_row = txn.fetchone() @@ -993,39 +1527,62 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering) - self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) + self._rotate_notifs_before_txn( + txn, old_rotate_stream_ordering, rotate_to_stream_ordering + ) return caught_up def _rotate_notifs_before_txn( - self, txn: LoggingTransaction, rotate_to_stream_ordering: int + self, + txn: LoggingTransaction, + old_rotate_stream_ordering: int, + rotate_to_stream_ordering: int, ) -> None: - old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( - txn, - table="event_push_summary_stream_ordering", - keyvalues={}, - retcol="stream_ordering", - ) + """Archives older notifications (from event_push_actions) into event_push_summary. + + Any event_push_actions between old_rotate_stream_ordering (exclusive) and + rotate_to_stream_ordering (inclusive) will be added to the event_push_summary + table. + + Args: + txn: The database transaction. + old_rotate_stream_ordering: The previous maximum event stream ordering. + rotate_to_stream_ordering: The new maximum event stream ordering to summarise. + """ + + # Ensure that any new actions have an updated thread_id. + if not self._event_push_backfill_thread_id_done: + txn.execute( + """ + UPDATE event_push_actions + SET thread_id = ? + WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL + """, + (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering), + ) + + # XXX Do we need to update summaries here too? # Calculate the new counts that should be upserted into event_push_summary sql = """ - SELECT user_id, room_id, + SELECT user_id, room_id, thread_id, coalesce(old.%s, 0) + upd.cnt, upd.stream_ordering FROM ( - SELECT user_id, room_id, count(*) as cnt, + SELECT user_id, room_id, thread_id, count(*) as cnt, max(ea.stream_ordering) as stream_ordering FROM event_push_actions AS ea - LEFT JOIN event_push_summary AS old USING (user_id, room_id) + LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id) WHERE ? < ea.stream_ordering AND ea.stream_ordering <= ? AND ( old.last_receipt_stream_ordering IS NULL OR old.last_receipt_stream_ordering < ea.stream_ordering ) AND %s = 1 - GROUP BY user_id, room_id + GROUP BY user_id, room_id, thread_id ) AS upd - LEFT JOIN event_push_summary AS old USING (user_id, room_id) + LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id) """ # First get the count of unread messages. @@ -1039,11 +1596,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # object because we might not have the same amount of rows in each of them. To do # this, we use a dict indexed on the user ID and room ID to make it easier to # populate. - summaries: Dict[Tuple[str, str], _EventPushSummary] = {} + summaries: Dict[Tuple[str, str, str], _EventPushSummary] = {} for row in txn: - summaries[(row[0], row[1])] = _EventPushSummary( - unread_count=row[2], - stream_ordering=row[3], + summaries[(row[0], row[1], row[2])] = _EventPushSummary( + unread_count=row[3], + stream_ordering=row[4], notif_count=0, ) @@ -1054,26 +1611,43 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas ) for row in txn: - if (row[0], row[1]) in summaries: - summaries[(row[0], row[1])].notif_count = row[2] + if (row[0], row[1], row[2]) in summaries: + summaries[(row[0], row[1], row[2])].notif_count = row[3] else: # Because the rules on notifying are different than the rules on marking # a message unread, we might end up with messages that notify but aren't # marked unread, so we might not have a summary for this (user, room) # tuple to complete. - summaries[(row[0], row[1])] = _EventPushSummary( + summaries[(row[0], row[1], row[2])] = _EventPushSummary( unread_count=0, - stream_ordering=row[3], - notif_count=row[2], + stream_ordering=row[4], + notif_count=row[3], ) logger.info("Rotating notifications, handling %d rows", len(summaries)) + # Ensure that any updated threads have the proper thread_id. + if not self._event_push_backfill_thread_id_done: + txn.execute_batch( + """ + UPDATE event_push_summary + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + [ + (MAIN_TIMELINE, room_id, user_id) + for user_id, room_id, _ in summaries + ], + ) + self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", - key_names=("user_id", "room_id"), - key_values=[(user_id, room_id) for user_id, room_id in summaries], + key_names=("user_id", "room_id", "thread_id"), + key_values=[ + (user_id, room_id, thread_id) + for user_id, room_id, thread_id in summaries + ], value_names=("notif_count", "unread_count", "stream_ordering"), value_values=[ ( @@ -1090,12 +1664,13 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (rotate_to_stream_ordering,), ) - async def _remove_old_push_actions_that_have_rotated( - self, - ) -> None: - """Clear out old push actions that have been summarized.""" + async def _remove_old_push_actions_that_have_rotated(self) -> None: + """ + Clear out old push actions that have been summarised (and are older than + 1 day ago). + """ - # We want to clear out anything that older than a day that *has* already + # We want to clear out anything that is older than a day that *has* already # been rotated. rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol( table="event_push_summary_stream_ordering", @@ -1119,7 +1694,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas SELECT stream_ordering FROM event_push_actions WHERE stream_ordering <= ? AND highlight = 0 ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? - """, + """, ( max_stream_ordering_to_delete, batch_size, @@ -1154,6 +1729,53 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas if done: break + @wrap_as_background_process("_clear_old_push_actions_staging") + async def _clear_old_push_actions_staging(self) -> None: + """Clear out any old event push actions from the staging table for + events that we failed to persist. + """ + + # We delete anything more than an hour old, on the assumption that we'll + # never take more than an hour to persist an event. + delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000 + + if self._started_ts > delete_before_ts: + # We need to wait for at least an hour before we started deleting, + # so that we know it's safe to delete rows with NULL `inserted_ts`. + return + + # We don't have an index on `inserted_ts`, instead we assume that the + # number of "live" rows in `event_push_actions_staging` is small enough + # that an infrequent periodic scan won't cause a problem. + # + # Note: we also delete any columns with NULL `inserted_ts`, this is safe + # as we added a default value to new rows and so they must be at least + # an hour old. + limit = 1000 + sql = """ + DELETE FROM event_push_actions_staging WHERE event_id IN ( + SELECT event_id FROM event_push_actions_staging WHERE + inserted_ts < ? OR inserted_ts IS NULL + LIMIT ? + ) + """ + + def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool: + txn.execute(sql, (delete_before_ts, limit)) + return txn.rowcount >= limit + + while True: + # Returns true if we have more stuff to delete from the table. + deleted = await self.db_pool.runInteraction( + "_clear_old_push_actions_staging", _clear_old_push_actions_staging_txn + ) + + if not deleted: + return + + # We sleep to ensure that we don't overwhelm the DB. + await self._clock.sleep(1.0) + class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" @@ -1188,7 +1810,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_actions", columns=["highlight", "stream_ordering"], where_clause="highlight=0", - psql_only=True, ) async def get_push_actions_for_user( @@ -1215,16 +1836,18 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # NB. This assumes event_ids are globally unique since # it makes the query easier to index - sql = ( - "SELECT epa.event_id, epa.room_id," - " epa.stream_ordering, epa.topological_ordering," - " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" - " FROM event_push_actions epa, events e" - " WHERE epa.event_id = e.event_id" - " AND epa.user_id = ? %s" - " AND epa.notif = 1" - " ORDER BY epa.stream_ordering DESC" - " LIMIT ?" % (before_clause,) + sql = """ + SELECT epa.event_id, epa.room_id, + epa.stream_ordering, epa.topological_ordering, + epa.actions, epa.highlight, epa.profile_tag, e.received_ts + FROM event_push_actions epa, events e + WHERE epa.event_id = e.event_id + AND epa.user_id = ? %s + AND epa.notif = 1 + ORDER BY epa.stream_ordering DESC + LIMIT ? + """ % ( + before_clause, ) txn.execute(sql, args) return cast( @@ -1247,7 +1870,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ] -def _action_has_highlight(actions: List[Union[dict, str]]) -> bool: +def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool: for action in actions: if not isinstance(action, dict): continue diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f600f1190..0f097a2927 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext +from synapse.logging.opentracing import trace from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -145,6 +146,7 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen + @trace async def _persist_events_and_state_updates( self, events_and_contexts: List[Tuple[EventBase, EventContext]], @@ -353,9 +355,9 @@ class PersistEventsStore: txn: LoggingTransaction, *, events_and_contexts: List[Tuple[EventBase, EventContext]], - inhibit_local_membership_updates: bool = False, - state_delta_for_room: Optional[Dict[str, DeltaState]] = None, - new_forward_extremities: Optional[Dict[str, Set[str]]] = None, + inhibit_local_membership_updates: bool, + state_delta_for_room: Dict[str, DeltaState], + new_forward_extremities: Dict[str, Set[str]], ) -> None: """Insert some number of room events into the necessary database tables. @@ -382,9 +384,6 @@ class PersistEventsStore: PartialStateConflictError: if attempting to persist a partial state event in a room that has been un-partial stated. """ - state_delta_for_room = state_delta_for_room or {} - new_forward_extremities = new_forward_extremities or {} - all_events_and_contexts = events_and_contexts min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering @@ -408,6 +407,31 @@ class PersistEventsStore: assert min_stream_order assert max_stream_order + # Once the txn completes, invalidate all of the relevant caches. Note that we do this + # up here because it captures all the events_and_contexts before any are removed. + for event, _ in events_and_contexts: + self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) + if event.redacts: + self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) + + relates_to = None + relation = relation_from_event(event) + if relation: + relates_to = relation.parent_id + + assert event.internal_metadata.stream_ordering is not None + txn.call_after( + self.store._invalidate_caches_for_event, + event.internal_metadata.stream_ordering, + event.event_id, + event.room_id, + event.type, + getattr(event, "state_key", None), + event.redacts, + relates_to, + backfilled=False, + ) + self._update_forward_extremities_txn( txn, new_forward_extremities=new_forward_extremities, @@ -457,6 +481,7 @@ class PersistEventsStore: # We call this last as it assumes we've inserted the events into # room_memberships, where applicable. + # NB: This function invalidates all state related caches self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) def _persist_event_auth_chain_txn( @@ -1170,17 +1195,16 @@ class PersistEventsStore: ) # Invalidate the various caches - - for member in members_changed: - txn.call_after( - self.store.get_rooms_for_user_with_stream_ordering.invalidate, - (member,), - ) - self.store._invalidate_state_caches_and_stream( txn, room_id, members_changed ) + # Check if any of the remote membership changes requires us to + # unsubscribe from their device lists. + self.store.handle_potentially_left_users_txn( + txn, {m for m in members_changed if not self.hs.is_mine_id(m)} + ) + def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None: """Update the room version in the database based off current state events. @@ -1220,9 +1244,6 @@ class PersistEventsStore: self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) - txn.call_after( - self.store.get_latest_event_ids_in_room.invalidate, (room_id,) - ) self.db_pool.simple_insert_many_txn( txn, @@ -1258,9 +1279,10 @@ class PersistEventsStore: Pick the earliest non-outlier if there is one, else the earliest one. Args: - events_and_contexts (list[(EventBase, EventContext)]): + events_and_contexts: + Returns: - list[(EventBase, EventContext)]: filtered list + filtered list """ new_events_and_contexts: OrderedDict[ str, Tuple[EventBase, EventContext] @@ -1286,14 +1308,11 @@ class PersistEventsStore: """Update min_depth for each room Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting + txn: db connection + events_and_contexts: events we are persisting """ depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: - # Remove the any existing cache entries for the event_ids - self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) # Then update the `stream_ordering` position to mark the latest # event as the front of the room. This should not be done for # backfilled events because backfilled events have negative @@ -1490,7 +1509,7 @@ class PersistEventsStore: event.sender, "url" in event.content and isinstance(event.content["url"], str), event.get_state_key(), - context.rejected or None, + context.rejected, ) for event, context in events_and_contexts ), @@ -1561,13 +1580,11 @@ class PersistEventsStore: """Update all the miscellaneous tables for new events Args: - txn (twisted.enterprise.adbapi.Connection): db connection - events_and_contexts (list[(EventBase, EventContext)]): events - we are persisting - all_events_and_contexts (list[(EventBase, EventContext)]): all - events that we were going to persist. This includes events - we've already persisted, etc, that wouldn't appear in - events_and_context. + txn: db connection + events_and_contexts: events we are persisting + all_events_and_contexts: all events that we were going to persist. + This includes events we've already persisted, etc, that wouldn't + appear in events_and_context. inhibit_local_membership_updates: Stop the local_current_membership from being updated by these events. This should be set to True for backfilled events because backfilled events in the past do @@ -1594,7 +1611,7 @@ class PersistEventsStore: ) # Remove from relations table. - self._handle_redact_relations(txn, event.redacts) + self._handle_redact_relations(txn, event.room_id, event.redacts) # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. @@ -1695,16 +1712,7 @@ class PersistEventsStore: txn.async_call_after(prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: - """Invalidate the caches for the redacted event. - - Note that these caches are also cleared as part of event replication in - _invalidate_caches_for_event. - """ assert event.redacts is not None - self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) - txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,)) - txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,)) - self.db_pool.simple_upsert_txn( txn, table="redactions", @@ -1805,34 +1813,6 @@ class PersistEventsStore: for event in events: assert event.internal_metadata.stream_ordering is not None - txn.call_after( - self.store._membership_stream_cache.entity_has_changed, - event.state_key, - event.internal_metadata.stream_ordering, - ) - txn.call_after( - self.store.get_invited_rooms_for_local_user.invalidate, - (event.state_key,), - ) - txn.call_after( - self.store.get_local_users_in_room.invalidate, - (event.room_id,), - ) - txn.call_after( - self.store.get_number_joined_users_in_room.invalidate, - (event.room_id,), - ) - txn.call_after( - self.store.get_user_in_room_with_profile.invalidate, - (event.room_id, event.state_key), - ) - - # The `_get_membership_from_event_id` is immutable, except for the - # case where we look up an event *before* persisting it. - txn.call_after( - self.store._get_membership_from_event_id.invalidate, - (event.event_id,), - ) # We update the local_current_membership table only if the event is # "current", i.e., its something that has just happened. @@ -1881,33 +1861,32 @@ class PersistEventsStore: }, ) - txn.call_after( - self.store.get_relations_for_event.invalidate, (relation.parent_id,) - ) - txn.call_after( - self.store.get_aggregation_groups_for_event.invalidate, - (relation.parent_id,), - ) - txn.call_after( - self.store.get_mutual_event_relations_for_rel_type.invalidate, - (relation.parent_id,), - ) - - if relation.rel_type == RelationTypes.REPLACE: - txn.call_after( - self.store.get_applicable_edit.invalidate, (relation.parent_id,) - ) - if relation.rel_type == RelationTypes.THREAD: - txn.call_after( - self.store.get_thread_summary.invalidate, (relation.parent_id,) - ) - # It should be safe to only invalidate the cache if the user has not - # previously participated in the thread, but that's difficult (and - # potentially error-prone) so it is always invalidated. - txn.call_after( - self.store.get_thread_participated.invalidate, - (relation.parent_id, event.sender), + # Upsert into the threads table, but only overwrite the value if the + # new event is of a later topological order OR if the topological + # ordering is equal, but the stream ordering is later. + sql = """ + INSERT INTO threads (room_id, thread_id, latest_event_id, topological_ordering, stream_ordering) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (room_id, thread_id) + DO UPDATE SET + latest_event_id = excluded.latest_event_id, + topological_ordering = excluded.topological_ordering, + stream_ordering = excluded.stream_ordering + WHERE + threads.topological_ordering <= excluded.topological_ordering AND + threads.stream_ordering < excluded.stream_ordering + """ + + txn.execute( + sql, + ( + event.room_id, + relation.parent_id, + event.event_id, + event.depth, + event.internal_metadata.stream_ordering, + ), ) def _handle_insertion_event( @@ -2033,35 +2012,52 @@ class PersistEventsStore: txn.execute(sql, (batch_id,)) def _handle_redact_relations( - self, txn: LoggingTransaction, redacted_event_id: str + self, txn: LoggingTransaction, room_id: str, redacted_event_id: str ) -> None: """Handles receiving a redaction and checking whether the redacted event has any relations which must be removed from the database. Args: txn + room_id: The room ID of the event that was redacted. redacted_event_id: The event that was redacted. """ - # Fetch the current relation of the event being redacted. - redacted_relates_to = self.db_pool.simple_select_one_onecol_txn( + # Fetch the relation of the event being redacted. + row = self.db_pool.simple_select_one_txn( txn, table="event_relations", keyvalues={"event_id": redacted_event_id}, - retcol="relates_to_id", + retcols=("relates_to_id", "relation_type"), allow_none=True, ) + # Nothing to do if no relation is found. + if row is None: + return + + redacted_relates_to = row["relates_to_id"] + rel_type = row["relation_type"] + self.db_pool.simple_delete_txn( + txn, table="event_relations", keyvalues={"event_id": redacted_event_id} + ) + # Any relation information for the related event must be cleared. - if redacted_relates_to is not None: + self.store._invalidate_cache_and_stream( + txn, self.store.get_relations_for_event, (redacted_relates_to,) + ) + if rel_type == RelationTypes.ANNOTATION: self.store._invalidate_cache_and_stream( - txn, self.store.get_relations_for_event, (redacted_relates_to,) + txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,) ) + if rel_type == RelationTypes.REFERENCE: self.store._invalidate_cache_and_stream( - txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,) + txn, self.store.get_references_for_event, (redacted_relates_to,) ) + if rel_type == RelationTypes.REPLACE: self.store._invalidate_cache_and_stream( txn, self.store.get_applicable_edit, (redacted_relates_to,) ) + if rel_type == RelationTypes.THREAD: self.store._invalidate_cache_and_stream( txn, self.store.get_thread_summary, (redacted_relates_to,) ) @@ -2069,14 +2065,41 @@ class PersistEventsStore: txn, self.store.get_thread_participated, (redacted_relates_to,) ) self.store._invalidate_cache_and_stream( - txn, - self.store.get_mutual_event_relations_for_rel_type, - (redacted_relates_to,), + txn, self.store.get_threads, (room_id,) ) - self.db_pool.simple_delete_txn( - txn, table="event_relations", keyvalues={"event_id": redacted_event_id} - ) + # Find the new latest event in the thread. + sql = """ + SELECT event_id, topological_ordering, stream_ordering + FROM event_relations + INNER JOIN events USING (event_id) + WHERE relates_to_id = ? AND relation_type = ? + ORDER BY topological_ordering DESC, stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (redacted_relates_to, RelationTypes.THREAD)) + + # If a latest event is found, update the threads table, this might + # be the same current latest event (if an earlier event in the thread + # was redacted). + latest_event_row = txn.fetchone() + if latest_event_row: + self.db_pool.simple_upsert_txn( + txn, + table="threads", + keyvalues={"room_id": room_id, "thread_id": redacted_relates_to}, + values={ + "latest_event_id": latest_event_row[0], + "topological_ordering": latest_event_row[1], + "stream_ordering": latest_event_row[2], + }, + ) + + # Otherwise, delete the thread: it no longer exists. + else: + self.db_pool.simple_delete_one_txn( + txn, table="threads", keyvalues={"thread_id": redacted_relates_to} + ) def _store_room_topic_txn(self, txn: LoggingTransaction, event: EventBase) -> None: if isinstance(event.content.get("topic"), str): @@ -2178,26 +2201,26 @@ class PersistEventsStore: appear in events_and_context. """ - # Only non outlier events will have push actions associated with them, + # Only notifiable events will have push actions associated with them, # so let's filter them out. (This makes joining large rooms faster, as # these queries took seconds to process all the state events). - non_outlier_events = [ + notifiable_events = [ event for event, _ in events_and_contexts - if not event.internal_metadata.is_outlier() + if event.internal_metadata.is_notifiable() ] sql = """ INSERT INTO event_push_actions ( room_id, event_id, user_id, actions, stream_ordering, - topological_ordering, notif, highlight, unread + topological_ordering, notif, highlight, unread, thread_id ) - SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread + SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight, unread, thread_id FROM event_push_actions_staging WHERE event_id = ? """ - if non_outlier_events: + if notifiable_events: txn.execute_batch( sql, ( @@ -2207,32 +2230,10 @@ class PersistEventsStore: event.depth, event.event_id, ) - for event in non_outlier_events + for event in notifiable_events ), ) - room_to_event_ids: Dict[str, List[str]] = {} - for e in non_outlier_events: - room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) - - for room_id, event_ids in room_to_event_ids.items(): - rows = self.db_pool.simple_select_many_txn( - txn, - table="event_push_actions_staging", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("user_id",), - ) - - user_ids = {row["user_id"] for row in rows} - - for user_id in user_ids: - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id, user_id), - ) - # Now we delete the staging area for *all* events that were being # persisted. txn.execute_batch( @@ -2240,18 +2241,13 @@ class PersistEventsStore: ( (event.event_id,) for event, _ in all_events_and_contexts - if not event.internal_metadata.is_outlier() + if event.internal_metadata.is_notifiable() ), ) def _remove_push_actions_for_event_id_txn( self, txn: LoggingTransaction, room_id: str, event_id: str ) -> None: - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id,), - ) txn.execute( "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", (room_id, event_id), @@ -2433,17 +2429,31 @@ class PersistEventsStore: "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" ) + backward_extremity_tuples_to_remove = [ + (ev.event_id, ev.room_id) + for ev in events + if not ev.internal_metadata.is_outlier() + # If we encountered an event with no prev_events, then we might + # as well remove it now because it won't ever have anything else + # to backfill from. + or len(ev.prev_event_ids()) == 0 + ] txn.execute_batch( query, - [ - (ev.event_id, ev.room_id) - for ev in events - if not ev.internal_metadata.is_outlier() - # If we encountered an event with no prev_events, then we might - # as well remove it now because it won't ever have anything else - # to backfill from. - or len(ev.prev_event_ids()) == 0 - ], + backward_extremity_tuples_to_remove, + ) + + # Clear out the failed backfill attempts after we successfully pulled + # the event. Since we no longer need these events as backward + # extremities, it also means that they won't be backfilled from again so + # we no longer need to store the backfill attempts around it. + query = """ + DELETE FROM event_failed_pull_attempts + WHERE event_id = ? and room_id = ? + """ + txn.execute_batch( + query, + backward_extremity_tuples_to_remove, ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 6e8aeed7b4..9e31798ab1 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1435,16 +1435,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ), ) - endpoint = None row = txn.fetchone() if row: endpoint = row[0] + else: + # if the query didn't return a row, we must be almost done. We just + # need to go up to the recorded max_stream_ordering. + endpoint = max_stream_ordering_inclusive - where_clause = "stream_ordering > ?" - args = [min_stream_ordering_exclusive] - if endpoint: - where_clause += " AND stream_ordering <= ?" - args.append(endpoint) + where_clause = "stream_ordering > ? AND stream_ordering <= ?" + args = [min_stream_ordering_exclusive, endpoint] # now do the updates. txn.execute( @@ -1458,13 +1458,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) logger.info( - "populated new `events` columns up to %s/%i: updated %i rows", + "populated new `events` columns up to %i/%i: updated %i rows", endpoint, max_stream_ordering_inclusive, txn.rowcount, ) - if endpoint is None: + if endpoint >= max_stream_ordering_inclusive: # we're done return True diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 29c99c6357..01e935edef 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -54,11 +54,11 @@ from synapse.logging.context import ( current_context, make_deferred_yieldable, ) +from synapse.logging.opentracing import start_active_span, tag_args, trace from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -80,6 +80,8 @@ from synapse.util import unwrapFirstError from synapse.util.async_helpers import ObservableDeferred, delay_cancellation from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import AsyncLruCache +from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -210,26 +212,35 @@ class EventsWorkerStore(SQLBaseStore): # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # updated over replication. (Multiple writers are not supported for # SQLite). - if hs.get_instance_name() in hs.config.worker.writers.events: - self._stream_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) - else: - self._stream_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering" - ) - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 - ) + self._stream_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + is_writer=hs.get_instance_name() in hs.config.worker.writers.events, + ) + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], + is_writer=hs.get_instance_name() in hs.config.worker.writers.events, + ) + + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( + db_conn, + "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache( + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) if hs.config.worker.run_background_tasks: # We periodically clean out old transaction ID mappings @@ -338,6 +349,7 @@ class EventsWorkerStore(SQLBaseStore): ) -> Optional[EventBase]: ... + @cancellable async def get_event( self, event_id: str, @@ -371,7 +383,7 @@ class EventsWorkerStore(SQLBaseStore): If there is a mismatch, behave as per allow_none. Returns: - The event, or None if the event was not found. + The event, or None if the event was not found and allow_none is `True`. """ if not isinstance(event_id, str): raise TypeError("Invalid event event_id %r" % (event_id,)) @@ -430,6 +442,9 @@ class EventsWorkerStore(SQLBaseStore): return {e.event_id: e for e in events} + @trace + @tag_args + @cancellable async def get_events_as_list( self, event_ids: Collection[str], @@ -468,7 +483,7 @@ class EventsWorkerStore(SQLBaseStore): return [] # there may be duplicates so we cast the list to a set - event_entry_map = await self._get_events_from_cache_or_db( + event_entry_map = await self.get_unredacted_events_from_cache_or_db( set(event_ids), allow_rejected=allow_rejected ) @@ -503,7 +518,9 @@ class EventsWorkerStore(SQLBaseStore): continue redacted_event_id = entry.event.redacts - event_map = await self._get_events_from_cache_or_db([redacted_event_id]) + event_map = await self.get_unredacted_events_from_cache_or_db( + [redacted_event_id] + ) original_event_entry = event_map.get(redacted_event_id) if not original_event_entry: # we don't have the redacted event (or it was rejected). @@ -581,11 +598,17 @@ class EventsWorkerStore(SQLBaseStore): return events - async def _get_events_from_cache_or_db( - self, event_ids: Iterable[str], allow_rejected: bool = False + @cancellable + async def get_unredacted_events_from_cache_or_db( + self, + event_ids: Iterable[str], + allow_rejected: bool = False, ) -> Dict[str, EventCacheEntry]: """Fetch a bunch of events from the cache or the database. + Note that the events pulled by this function will not have any redactions + applied, and no guarantee is made about the ordering of the events returned. + If events are pulled from the database, they will be cached for future lookups. Unknown events are omitted from the response. @@ -600,7 +623,11 @@ class EventsWorkerStore(SQLBaseStore): Returns: map from event id to result """ - event_entry_map = await self._get_events_from_cache( + # Shortcut: check if we have any events in the *in memory* cache - this function + # may be called repeatedly for the same event so at this point we cannot reach + # out to any external cache for performance reasons. The external cache is + # checked later on in the `get_missing_events_from_cache_or_db` function below. + event_entry_map = self._get_events_from_local_cache( event_ids, ) @@ -632,7 +659,9 @@ class EventsWorkerStore(SQLBaseStore): if missing_events_ids: - async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]: + async def get_missing_events_from_cache_or_db() -> Dict[ + str, EventCacheEntry + ]: """Fetches the events in `missing_event_ids` from the database. Also creates entries in `self._current_event_fetches` to allow @@ -657,10 +686,18 @@ class EventsWorkerStore(SQLBaseStore): # the events have been redacted, and if so pulling the redaction event # out of the database to check it. # + missing_events = {} try: - missing_events = await self._get_events_from_db( + # Try to fetch from any external cache. We already checked the + # in-memory cache above. + missing_events = await self._get_events_from_external_cache( missing_events_ids, ) + # Now actually fetch any remaining events from the DB + db_missing_events = await self._get_events_from_db( + missing_events_ids - missing_events.keys(), + ) + missing_events.update(db_missing_events) except Exception as e: with PreserveLoggingContext(): fetching_deferred.errback(e) @@ -679,7 +716,7 @@ class EventsWorkerStore(SQLBaseStore): # cancellations, since multiple `_get_events_from_cache_or_db` calls can # reuse the same fetch. missing_events: Dict[str, EventCacheEntry] = await delay_cancellation( - get_missing_events_from_db() + get_missing_events_from_cache_or_db() ) event_entry_map.update(missing_events) @@ -754,7 +791,54 @@ class EventsWorkerStore(SQLBaseStore): async def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: - """Fetch events from the caches. + """Fetch events from the caches, both in memory and any external. + + May return rejected events. + + Args: + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics + """ + event_map = self._get_events_from_local_cache( + events, update_metrics=update_metrics + ) + + missing_event_ids = (e for e in events if e not in event_map) + event_map.update( + await self._get_events_from_external_cache( + events=missing_event_ids, + update_metrics=update_metrics, + ) + ) + + return event_map + + async def _get_events_from_external_cache( + self, events: Iterable[str], update_metrics: bool = True + ) -> Dict[str, EventCacheEntry]: + """Fetch events from any configured external cache. + + May return rejected events. + + Args: + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics + """ + event_map = {} + + for event_id in events: + ret = await self._get_event_cache.get_external( + (event_id,), None, update_metrics=update_metrics + ) + if ret: + event_map[event_id] = ret + + return event_map + + def _get_events_from_local_cache( + self, events: Iterable[str], update_metrics: bool = True + ) -> Dict[str, EventCacheEntry]: + """Fetch events from the local, in memory, caches. May return rejected events. @@ -766,7 +850,7 @@ class EventsWorkerStore(SQLBaseStore): for event_id in events: # First check if it's in the event cache - ret = await self._get_event_cache.get( + ret = self._get_event_cache.get_local( (event_id,), None, update_metrics=update_metrics ) if ret: @@ -788,7 +872,7 @@ class EventsWorkerStore(SQLBaseStore): # We add the entry back into the cache as we want to keep # recently queried events in the cache. - await self._get_event_cache.set((event_id,), cache_entry) + self._get_event_cache.set_local((event_id,), cache_entry) return event_map @@ -1029,23 +1113,42 @@ class EventsWorkerStore(SQLBaseStore): """ fetched_event_ids: Set[str] = set() fetched_events: Dict[str, _EventRow] = {} - events_to_fetch = event_ids - while events_to_fetch: - row_map = await self._enqueue_events(events_to_fetch) + async def _fetch_event_ids_and_get_outstanding_redactions( + event_ids_to_fetch: Collection[str], + ) -> Collection[str]: + """ + Fetch all of the given event_ids and return any associated redaction event_ids + that we still need to fetch in the next iteration. + """ + row_map = await self._enqueue_events(event_ids_to_fetch) # we need to recursively fetch any redactions of those events redaction_ids: Set[str] = set() - for event_id in events_to_fetch: + for event_id in event_ids_to_fetch: row = row_map.get(event_id) fetched_event_ids.add(event_id) if row: fetched_events[event_id] = row redaction_ids.update(row.redactions) - events_to_fetch = redaction_ids.difference(fetched_event_ids) - if events_to_fetch: - logger.debug("Also fetching redaction events %s", events_to_fetch) + event_ids_to_fetch = redaction_ids.difference(fetched_event_ids) + return event_ids_to_fetch + + # Grab the initial list of events requested + event_ids_to_fetch = await _fetch_event_ids_and_get_outstanding_redactions( + event_ids + ) + # Then go and recursively find all of the associated redactions + with start_active_span("recursively fetching redactions"): + while event_ids_to_fetch: + logger.debug("Also fetching redaction events %s", event_ids_to_fetch) + + event_ids_to_fetch = ( + await _fetch_event_ids_and_get_outstanding_redactions( + event_ids_to_fetch + ) + ) # build a map from event_id to EventBase event_map: Dict[str, EventBase] = {} @@ -1073,7 +1176,7 @@ class EventsWorkerStore(SQLBaseStore): if format_version is None: # This means that we stored the event before we had the concept # of a event format version, so it must be a V1 event. - format_version = EventFormatVersions.V1 + format_version = EventFormatVersions.ROOM_V1_V2 room_version_id = row.room_version_id @@ -1103,10 +1206,10 @@ class EventsWorkerStore(SQLBaseStore): # # So, the following approximations should be adequate. - if format_version == EventFormatVersions.V1: + if format_version == EventFormatVersions.ROOM_V1_V2: # if it's event format v1 then it must be room v1 or v2 room_version = RoomVersions.V1 - elif format_version == EventFormatVersions.V2: + elif format_version == EventFormatVersions.ROOM_V3: # if it's event format v2 then it must be room v3 room_version = RoomVersions.V3 else: @@ -1363,6 +1466,8 @@ class EventsWorkerStore(SQLBaseStore): return {r["event_id"] for r in rows} + @trace + @tag_args async def have_seen_events( self, room_id: str, event_ids: Iterable[str] ) -> Set[str]: @@ -1385,36 +1490,36 @@ class EventsWorkerStore(SQLBaseStore): # the batches as big as possible. results: Set[str] = set() - for chunk in batch_iter(event_ids, 500): - r = await self._have_seen_events_dict( - [(room_id, event_id) for event_id in chunk] + for event_ids_chunk in batch_iter(event_ids, 500): + events_seen_dict = await self._have_seen_events_dict( + room_id, event_ids_chunk + ) + results.update( + eid for (eid, have_event) in events_seen_dict.items() if have_event ) - results.update(eid for ((_rid, eid), have_event) in r.items() if have_event) return results - @cachedList(cached_method_name="have_seen_event", list_name="keys") + @cachedList(cached_method_name="have_seen_event", list_name="event_ids") async def _have_seen_events_dict( - self, keys: Collection[Tuple[str, str]] - ) -> Dict[Tuple[str, str], bool]: + self, + room_id: str, + event_ids: Collection[str], + ) -> Dict[str, bool]: """Helper for have_seen_events Returns: - a dict {(room_id, event_id)-> bool} + a dict {event_id -> bool} """ - # if the event cache contains the event, obviously we've seen it. - - cache_results = { - (rid, eid) - for (rid, eid) in keys - if await self._get_event_cache.contains((eid,)) - } - results = dict.fromkeys(cache_results, True) - remaining = [k for k in keys if k not in cache_results] - if not remaining: - return results - - def have_seen_events_txn(txn: LoggingTransaction) -> None: + # TODO: We used to query the _get_event_cache here as a fast-path before + # hitting the database. For if an event were in the cache, we've presumably + # seen it before. + # + # But this is currently an invalid assumption due to the _get_event_cache + # not being invalidated when purging events from a room. The optimisation can + # be re-added after https://github.com/matrix-org/synapse/issues/13476 + + def have_seen_events_txn(txn: LoggingTransaction) -> Dict[str, bool]: # we deliberately do *not* query the database for room_id, to make the # query an index-only lookup on `events_event_id_key`. # @@ -1422,23 +1527,22 @@ class EventsWorkerStore(SQLBaseStore): sql = "SELECT event_id FROM events AS e WHERE " clause, args = make_in_list_sql_clause( - txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining] + txn.database_engine, "e.event_id", event_ids ) txn.execute(sql + clause, args) found_events = {eid for eid, in txn} # ... and then we can update the results for each key - results.update( - {(rid, eid): (eid in found_events) for (rid, eid) in remaining} - ) + return {eid: (eid in found_events) for eid in event_ids} - await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn) - return results + return await self.db_pool.runInteraction( + "have_seen_events", have_seen_events_txn + ) @cached(max_entries=100000, tree=True) async def have_seen_event(self, room_id: str, event_id: str) -> bool: - res = await self._have_seen_events_dict(((room_id, event_id),)) - return res[(room_id, event_id)] + res = await self._have_seen_events_dict(room_id, [event_id]) + return res[event_id] def _get_current_state_event_counts_txn( self, txn: LoggingTransaction, room_id: str @@ -1478,7 +1582,7 @@ class EventsWorkerStore(SQLBaseStore): room_id: The room ID to query. Returns: - dict[str:float] of complexity version to complexity. + Map of complexity version to complexity. """ state_events = await self.get_current_state_event_counts(room_id) @@ -1876,12 +1980,17 @@ class EventsWorkerStore(SQLBaseStore): Args: room_id: room where the event lives - event_id: event to check + event: event to check (can't be an `outlier`) Returns: Boolean indicating whether it's an extremity """ + assert not event.internal_metadata.is_outlier(), ( + "is_event_next_to_backward_gap(...) can't be used with `outlier` events. " + "This function relies on `event_backward_extremities` which won't be filled in for `outliers`." + ) + def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool: # If the event in question has any of its prev_events listed as a # backward extremity, it's next to a gap. @@ -1931,12 +2040,17 @@ class EventsWorkerStore(SQLBaseStore): Args: room_id: room where the event lives - event_id: event to check + event: event to check (can't be an `outlier`) Returns: Boolean indicating whether it's an extremity """ + assert not event.internal_metadata.is_outlier(), ( + "is_event_next_to_forward_gap(...) can't be used with `outlier` events. " + "This function relies on `event_edges` and `event_forward_extremities` which won't be filled in for `outliers`." + ) + def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool: # If the event in question is a forward extremity, we will just # consider any potential forward gap as not a gap since it's one of @@ -2017,35 +2131,50 @@ class EventsWorkerStore(SQLBaseStore): The closest event_id otherwise None if we can't find any event in the given direction. """ + if direction == "b": + # Find closest event *before* a given timestamp. We use descending + # (which gives values largest to smallest) because we want the + # largest possible timestamp *before* the given timestamp. + comparison_operator = "<=" + order = "DESC" + else: + # Find closest event *after* a given timestamp. We use ascending + # (which gives values smallest to largest) because we want the + # closest possible timestamp *after* the given timestamp. + comparison_operator = ">=" + order = "ASC" - sql_template = """ + sql_template = f""" SELECT event_id FROM events LEFT JOIN rejections USING (event_id) WHERE - origin_server_ts %s ? - AND room_id = ? + room_id = ? + AND origin_server_ts {comparison_operator} ? + /** + * Make sure the event isn't an `outlier` because we have no way + * to later check whether it's next to a gap. `outliers` do not + * have entries in the `event_edges`, `event_forward_extremeties`, + * and `event_backward_extremities` tables to check against + * (used by `is_event_next_to_backward_gap` and `is_event_next_to_forward_gap`). + */ + AND NOT outlier /* Make sure event is not rejected */ AND rejections.event_id IS NULL - ORDER BY origin_server_ts %s + /** + * First sort by the message timestamp. If the message timestamps are the + * same, we want the message that logically comes "next" (before/after + * the given timestamp) based on the DAG and its topological order (`depth`). + * Finally, we can tie-break based on when it was received on the server + * (`stream_ordering`). + */ + ORDER BY origin_server_ts {order}, depth {order}, stream_ordering {order} LIMIT 1; """ def get_event_id_for_timestamp_txn(txn: LoggingTransaction) -> Optional[str]: - if direction == "b": - # Find closest event *before* a given timestamp. We use descending - # (which gives values largest to smallest) because we want the - # largest possible timestamp *before* the given timestamp. - comparison_operator = "<=" - order = "DESC" - else: - # Find closest event *after* a given timestamp. We use ascending - # (which gives values smallest to largest) because we want the - # closest possible timestamp *after* the given timestamp. - comparison_operator = ">=" - order = "ASC" - txn.execute( - sql_template % (comparison_operator, order), (timestamp, room_id) + sql_template, + (room_id, timestamp), ) row = txn.fetchone() if row: @@ -2099,7 +2228,15 @@ class EventsWorkerStore(SQLBaseStore): return result is not None async def get_partial_state_events_batch(self, room_id: str) -> List[str]: - """Get a list of events in the given room that have partial state""" + """ + Get a list of events in the given room that: + - have partial state; and + - are ready to be resynced (because they have no prev_events that are + partial-stated) + + See the docstring on `_get_partial_state_events_batch_txn` for more + information. + """ return await self.db_pool.runInteraction( "get_partial_state_events_batch", self._get_partial_state_events_batch_txn, @@ -2139,3 +2276,63 @@ class EventsWorkerStore(SQLBaseStore): (room_id,), ) return [row[0] for row in txn] + + def mark_event_rejected_txn( + self, + txn: LoggingTransaction, + event_id: str, + rejection_reason: Optional[str], + ) -> None: + """Mark an event that was previously accepted as rejected, or vice versa + + This can happen, for example, when resyncing state during a faster join. + + Args: + txn: + event_id: ID of event to update + rejection_reason: reason it has been rejected, or None if it is now accepted + """ + if rejection_reason is None: + logger.info( + "Marking previously-processed event %s as accepted", + event_id, + ) + self.db_pool.simple_delete_txn( + txn, + "rejections", + keyvalues={"event_id": event_id}, + ) + else: + logger.info( + "Marking previously-processed event %s as rejected(%s)", + event_id, + rejection_reason, + ) + self.db_pool.simple_upsert_txn( + txn, + table="rejections", + keyvalues={"event_id": event_id}, + values={ + "reason": rejection_reason, + "last_check": self._clock.time_msec(), + }, + ) + self.db_pool.simple_update_txn( + txn, + table="events", + keyvalues={"event_id": event_id}, + updatevalues={"rejection_reason": rejection_reason}, + ) + + self.invalidate_get_event_cache_after_txn(txn, event_id) + + # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just + # call '_send_invalidation_to_replication', but we actually need the other + # end to call _invalidate_local_get_event_cache() rather than (just) + # _get_event_cache.invalidate(). + # + # One solution might be to (somehow) get the workers to call + # _invalidate_caches_for_event() (though that will invalidate more than + # strictly necessary). + # + # https://github.com/matrix-org/synapse/issues/12994 diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index cb9ee08fa8..12f3b601f1 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py
@@ -24,7 +24,7 @@ from synapse.types import JsonDict from synapse.util.caches.descriptors import cached -class FilteringStore(SQLBaseStore): +class FilteringWorkerStore(SQLBaseStore): @cached(num_args=2) async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] @@ -46,6 +46,8 @@ class FilteringStore(SQLBaseStore): return db_to_json(def_json) + +class FilteringStore(FilteringWorkerStore): async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int: def_json = encode_canonical_json(user_filter) diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore): now = self._clock.time_msec() token = random_string(6) - if self.db_pool.engine.can_native_upsert: - - def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: - # We take out the lock if either a) there is no row for the lock - # already, b) the existing row has timed out, or c) the row is - # for this instance (which means the process got killed and - # restarted) - sql = """ - INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?) - ON CONFLICT (lock_name, lock_key) - DO UPDATE - SET - token = EXCLUDED.token, - instance_name = EXCLUDED.instance_name, - last_renewed_ts = EXCLUDED.last_renewed_ts - WHERE - worker_locks.last_renewed_ts < ? - OR worker_locks.instance_name = EXCLUDED.instance_name - """ - txn.execute( - sql, - ( - lock_name, - lock_key, - self._instance_name, - token, - now, - now - _LOCK_TIMEOUT_MS, - ), - ) - - # We only acquired the lock if we inserted or updated the table. - return bool(txn.rowcount) - - did_lock = await self.db_pool.runInteraction( - "try_acquire_lock", - _try_acquire_lock_txn, - # We can autocommit here as we're executing a single query, this - # will avoid serialization errors. - db_autocommit=True, + def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: + # We take out the lock if either a) there is no row for the lock + # already, b) the existing row has timed out, or c) the row is + # for this instance (which means the process got killed and + # restarted) + sql = """ + INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (lock_name, lock_key) + DO UPDATE + SET + token = EXCLUDED.token, + instance_name = EXCLUDED.instance_name, + last_renewed_ts = EXCLUDED.last_renewed_ts + WHERE + worker_locks.last_renewed_ts < ? + OR worker_locks.instance_name = EXCLUDED.instance_name + """ + txn.execute( + sql, + ( + lock_name, + lock_key, + self._instance_name, + token, + now, + now - _LOCK_TIMEOUT_MS, + ), ) - if not did_lock: - return None - - else: - # If we're on an old SQLite we emulate the above logic by first - # clearing out any existing stale locks and then upserting. - - def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool: - sql = """ - DELETE FROM worker_locks - WHERE - lock_name = ? - AND lock_key = ? - AND (last_renewed_ts < ? OR instance_name = ?) - """ - txn.execute( - sql, - (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name), - ) - - inserted = self.db_pool.simple_upsert_txn_emulated( - txn, - table="worker_locks", - keyvalues={ - "lock_name": lock_name, - "lock_key": lock_key, - }, - values={}, - insertion_values={ - "token": token, - "last_renewed_ts": self._clock.time_msec(), - "instance_name": self._instance_name, - }, - ) - - return inserted - did_lock = await self.db_pool.runInteraction( - "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn - ) + # We only acquired the lock if we inserted or updated the table. + return bool(txn.rowcount) - if not did_lock: - return None + did_lock = await self.db_pool.runInteraction( + "try_acquire_lock", + _try_acquire_lock_txn, + # We can autocommit here as we're executing a single query, this + # will avoid serialization errors. + db_autocommit=True, + ) + if not did_lock: + return None lock = Lock( self._reactor, diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index efd136a864..db9a24db5e 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -217,7 +217,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): def _reap_users(txn: LoggingTransaction, reserved_users: List[str]) -> None: """ Args: - reserved_users (tuple): reserved users to preserve + reserved_users: reserved users to preserve """ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) @@ -370,8 +370,8 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): should not appear in the MAU stats). Args: - txn (cursor): - user_id (str): user to add/update + txn: + user_id: user to add/update """ assert ( self._update_on_this_worker @@ -401,7 +401,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): add the user to the monthly active tables Args: - user_id(str): the user_id to query + user_id: the user_id to query """ assert ( self._update_on_this_worker diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index f6822707e4..9213ce0b5a 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py
@@ -419,6 +419,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "event_forward_extremities", "event_push_actions", "event_search", + "event_failed_pull_attempts", "partial_state_events", "events", "federation_inbound_events_staging", @@ -441,6 +442,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "e2e_room_keys", "event_push_summary", "pusher_throttle", + "insertion_events", + "insertion_event_extremities", + "insertion_event_edges", + "batch_events", "room_account_data", "room_tags", # "rooms" happens last, to keep the foreign keys in the other tables diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 768f95d16c..12ad44dbb3 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -12,15 +12,26 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import abc import logging -from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + Iterable, + List, + Mapping, + Optional, + Sequence, + Tuple, + Union, + cast, +) from synapse.api.errors import StoreError from synapse.config.homeserver import ExperimentalConfig -from synapse.push.baserules import list_with_base_rules -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.replication.tcp.streams import PushRulesStream +from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -39,6 +50,7 @@ from synapse.storage.util.id_generators import ( IdGenerator, StreamIdGenerator, ) +from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList @@ -50,64 +62,34 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -def _is_experimental_rule_enabled( - rule_id: str, experimental_config: ExperimentalConfig -) -> bool: - """Used by `_load_rules` to filter out experimental rules when they - have not been enabled. - """ - if ( - rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl" - and not experimental_config.msc3786_enabled - ): - return False - if ( - rule_id == "global/underride/.org.matrix.msc3772.thread_reply" - and not experimental_config.msc3772_enabled - ): - return False - return True - - def _load_rules( rawrules: List[JsonDict], enabled_map: Dict[str, bool], experimental_config: ExperimentalConfig, -) -> List[JsonDict]: - ruleslist = [] - for rawrule in rawrules: - rule = dict(rawrule) - rule["conditions"] = db_to_json(rawrule["conditions"]) - rule["actions"] = db_to_json(rawrule["actions"]) - rule["default"] = False - ruleslist.append(rule) - - # We're going to be mutating this a lot, so copy it. We also filter out - # any experimental default push rules that aren't enabled. - rules = [ - rule - for rule in list_with_base_rules(ruleslist) - if _is_experimental_rule_enabled(rule["rule_id"], experimental_config) - ] +) -> FilteredPushRules: + """Take the DB rows returned from the DB and convert them into a full + `FilteredPushRules` object. + """ - for i, rule in enumerate(rules): - rule_id = rule["rule_id"] + ruleslist = [ + PushRule.from_db( + rule_id=rawrule["rule_id"], + priority_class=rawrule["priority_class"], + conditions=rawrule["conditions"], + actions=rawrule["actions"], + ) + for rawrule in rawrules + ] - if rule_id not in enabled_map: - continue - if rule.get("enabled", True) == bool(enabled_map[rule_id]): - continue + push_rules = PushRules(ruleslist) - # Rules are cached across users. - rule = dict(rule) - rule["enabled"] = bool(enabled_map[rule_id]) - rules[i] = rule + filtered_rules = FilteredPushRules( + push_rules, enabled_map, msc3664_enabled=experimental_config.msc3664_enabled + ) - return rules + return filtered_rules -# The ABCMeta metaclass ensures that it cannot be instantiated without -# the abstract methods being implemented. class PushRulesWorkerStore( ApplicationServiceWorkerStore, PusherWorkerStore, @@ -115,7 +97,6 @@ class PushRulesWorkerStore( ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore, - metaclass=abc.ABCMeta, ): """This is an abstract base class where subclasses must implement `get_max_push_rules_stream_id` which can be called in the initializer. @@ -129,14 +110,14 @@ class PushRulesWorkerStore( ): super().__init__(database, db_conn, hs) - if hs.config.worker.worker_app is None: - self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( - db_conn, "push_rules_stream", "stream_id" - ) - else: - self._push_rules_stream_id_gen = SlavedIdTracker( - db_conn, "push_rules_stream", "stream_id" - ) + # In the worker store this is an ID tracker which we overwrite in the non-worker + # class below that is used on the main process. + self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + db_conn, + "push_rules_stream", + "stream_id", + is_writer=hs.config.worker.worker_app is None, + ) push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict( db_conn, @@ -152,17 +133,26 @@ class PushRulesWorkerStore( prefilled_cache=push_rules_prefill, ) - @abc.abstractmethod def get_max_push_rules_stream_id(self) -> int: """Get the position of the push rules stream. Returns: int """ - raise NotImplementedError() + return self._push_rules_stream_id_gen.get_current_token() + + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: + if stream_name == PushRulesStream.NAME: + self._push_rules_stream_id_gen.advance(instance_name, token) + for row in rows: + self.get_push_rules_for_user.invalidate((row.user_id,)) + self.push_rules_stream_cache.entity_has_changed(row.user_id, token) + return super().process_replication_rows(stream_name, instance_name, token, rows) @cached(max_entries=5000) - async def get_push_rules_for_user(self, user_id: str) -> List[JsonDict]: + async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules: rows = await self.db_pool.simple_select_list( table="push_rules", keyvalues={"user_name": user_id}, @@ -183,7 +173,6 @@ class PushRulesWorkerStore( return _load_rules(rows, enabled_map, self.hs.config.experimental) - @cached(max_entries=5000) async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]: results = await self.db_pool.simple_select_list( table="push_rules_enable", @@ -216,11 +205,11 @@ class PushRulesWorkerStore( @cachedList(cached_method_name="get_push_rules_for_user", list_name="user_ids") async def bulk_get_push_rules( self, user_ids: Collection[str] - ) -> Dict[str, List[JsonDict]]: + ) -> Dict[str, FilteredPushRules]: if not user_ids: return {} - results: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} + raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} rows = await self.db_pool.simple_select_many_batch( table="push_rules", @@ -234,20 +223,19 @@ class PushRulesWorkerStore( rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) for row in rows: - results.setdefault(row["user_name"], []).append(row) + raw_rules.setdefault(row["user_name"], []).append(row) enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids) - for user_id, rules in results.items(): + results: Dict[str, FilteredPushRules] = {} + + for user_id, rules in raw_rules.items(): results[user_id] = _load_rules( rules, enabled_map_by_user.get(user_id, {}), self.hs.config.experimental ) return results - @cachedList( - cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids" - ) async def bulk_get_push_rules_enabled( self, user_ids: Collection[str] ) -> Dict[str, Dict[str, bool]]: @@ -262,6 +250,7 @@ class PushRulesWorkerStore( iterable=user_ids, retcols=("user_name", "rule_id", "enabled"), desc="bulk_get_push_rules_enabled", + batch_size=1000, ) for row in rows: enabled = bool(row["enabled"]) @@ -345,8 +334,8 @@ class PushRuleStore(PushRulesWorkerStore): user_id: str, rule_id: str, priority_class: int, - conditions: List[Dict[str, str]], - actions: List[Union[JsonDict, str]], + conditions: Sequence[Mapping[str, str]], + actions: Sequence[Union[Mapping[str, Any], str]], before: Optional[str] = None, after: Optional[str] = None, ) -> None: @@ -808,7 +797,6 @@ class PushRuleStore(PushRulesWorkerStore): self.db_pool.simple_insert_txn(txn, "push_rules_stream", values=values) txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,)) - txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,)) txn.call_after( self.push_rules_stream_cache.entity_has_changed, user_id, stream_id ) @@ -817,7 +805,7 @@ class PushRuleStore(PushRulesWorkerStore): return self._push_rules_stream_id_gen.get_current_token() async def copy_push_rule_from_room_to_room( - self, new_room_id: str, user_id: str, rule: dict + self, new_room_id: str, user_id: str, rule: PushRule ) -> None: """Copy a single push rule from one room to another for a specific user. @@ -827,21 +815,27 @@ class PushRuleStore(PushRulesWorkerStore): rule: A push rule. """ # Create new rule id - rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) + rule_id_scope = "/".join(rule.rule_id.split("/")[:-1]) new_rule_id = rule_id_scope + "/" + new_room_id + new_conditions = [] + # Change room id in each condition - for condition in rule.get("conditions", []): + for condition in rule.conditions: + new_condition = condition if condition.get("key") == "room_id": - condition["pattern"] = new_room_id + new_condition = dict(condition) + new_condition["pattern"] = new_room_id + + new_conditions.append(new_condition) # Add the rule for the new room await self.add_push_rule( user_id=user_id, rule_id=new_rule_id, - priority_class=rule["priority_class"], - conditions=rule["conditions"], - actions=rule["actions"], + priority_class=rule.priority_class, + conditions=new_conditions, + actions=rule.actions, ) async def copy_push_rules_from_room_to_room_for_user( @@ -859,8 +853,11 @@ class PushRuleStore(PushRulesWorkerStore): user_push_rules = await self.get_push_rules_for_user(user_id) # Get rules relating to the old room and copy them to the new room - for rule in user_push_rules: - conditions = rule.get("conditions", []) + for rule, enabled in user_push_rules.rules(): + if not enabled: + continue + + conditions = rule.conditions if any( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index bd0cfa7f32..fee37b9ce4 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py
@@ -27,13 +27,18 @@ from typing import ( ) from synapse.push import PusherConfig, ThrottleParams +from synapse.replication.tcp.streams import PushersStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + AbstractStreamIdTracker, + StreamIdGenerator, +) from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -52,8 +57,15 @@ class PusherWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) - self._pushers_id_gen = StreamIdGenerator( - db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] + + # In the worker store this is an ID tracker which we overwrite in the non-worker + # class below that is used on the main process. + self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + db_conn, + "pushers", + "id", + extra_tables=[("deleted_pushers", "stream_id")], + is_writer=hs.config.worker.worker_app is None, ) self.db_pool.updates.register_background_update_handler( @@ -89,8 +101,23 @@ class PusherWorkerStore(SQLBaseStore): ) continue + # If we're using SQLite, then boolean values are integers. This is + # troublesome since some code using the return value of this method might + # expect it to be a boolean, or will expose it to clients (in responses). + r["enabled"] = bool(r["enabled"]) + yield PusherConfig(**r) + def get_pushers_stream_token(self) -> int: + return self._pushers_id_gen.get_current_token() + + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: + if stream_name == PushersStream.NAME: + self._pushers_id_gen.advance(instance_name, token) + return super().process_replication_rows(stream_name, instance_name, token, rows) + async def get_pushers_by_app_id_and_pushkey( self, app_id: str, pushkey: str ) -> Iterator[PusherConfig]: @@ -100,38 +127,52 @@ class PusherWorkerStore(SQLBaseStore): return await self.get_pushers_by({"user_name": user_id}) async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConfig]: - ret = await self.db_pool.simple_select_list( - "pushers", - keyvalues, - [ - "id", - "user_name", - "access_token", - "profile_tag", - "kind", - "app_id", - "app_display_name", - "device_display_name", - "pushkey", - "ts", - "lang", - "data", - "last_stream_ordering", - "last_success", - "failing_since", - ], + """Retrieve pushers that match the given criteria. + + Args: + keyvalues: A {column: value} dictionary. + + Returns: + The pushers for which the given columns have the given values. + """ + + def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + # We could technically use simple_select_list here, but we need to call + # COALESCE on the 'enabled' column. While it is technically possible to give + # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it + # feels a bit hacky, so it's probably better to just inline the query. + sql = """ + SELECT + id, user_name, access_token, profile_tag, kind, app_id, + app_display_name, device_display_name, pushkey, ts, lang, data, + last_stream_ordering, last_success, failing_since, + COALESCE(enabled, TRUE) AS enabled, device_id + FROM pushers + """ + + sql += "WHERE %s" % (" AND ".join("%s = ?" % (k,) for k in keyvalues),) + + txn.execute(sql, list(keyvalues.values())) + + return self.db_pool.cursor_to_dict(txn) + + ret = await self.db_pool.runInteraction( desc="get_pushers_by", + func=get_pushers_by_txn, ) + return self._decode_pushers_rows(ret) - async def get_all_pushers(self) -> Iterator[PusherConfig]: - def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]: - txn.execute("SELECT * FROM pushers") + async def get_enabled_pushers(self) -> Iterator[PusherConfig]: + def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]: + txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)") rows = self.db_pool.cursor_to_dict(txn) return self._decode_pushers_rows(rows) - return await self.db_pool.runInteraction("get_all_pushers", get_pushers) + return await self.db_pool.runInteraction( + "get_enabled_pushers", get_enabled_pushers_txn + ) async def get_all_updated_pushers_rows( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -458,9 +499,77 @@ class PusherWorkerStore(SQLBaseStore): return number_deleted -class PusherStore(PusherWorkerStore): - def get_pushers_stream_token(self) -> int: - return self._pushers_id_gen.get_current_token() +class PusherBackgroundUpdatesStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_update_handler( + "set_device_id_for_pushers", self._set_device_id_for_pushers + ) + + async def _set_device_id_for_pushers( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to populate the device_id column of the pushers table.""" + last_pusher_id = progress.get("pusher_id", 0) + + def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: + txn.execute( + """ + SELECT p.id, at.device_id + FROM pushers AS p + INNER JOIN access_tokens AS at + ON p.access_token = at.id + WHERE + p.access_token IS NOT NULL + AND at.device_id IS NOT NULL + AND p.id > ? + ORDER BY p.id + LIMIT ? + """, + (last_pusher_id, batch_size), + ) + + rows = self.db_pool.cursor_to_dict(txn) + if len(rows) == 0: + return 0 + + self.db_pool.simple_update_many_txn( + txn=txn, + table="pushers", + key_names=("id",), + key_values=[(row["id"],) for row in rows], + value_names=("device_id",), + value_values=[(row["device_id"],) for row in rows], + ) + + self.db_pool.updates._background_update_progress_txn( + txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["id"]} + ) + + return len(rows) + + nb_processed = await self.db_pool.runInteraction( + "set_device_id_for_pushers", set_device_id_for_pushers_txn + ) + + if nb_processed < batch_size: + await self.db_pool.updates._end_background_update( + "set_device_id_for_pushers" + ) + + return nb_processed + + +class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): + # Because we have write access, this will be a StreamIdGenerator + # (see PusherWorkerStore.__init__) + _pushers_id_gen: AbstractStreamIdGenerator async def add_pusher( self, @@ -476,6 +585,8 @@ class PusherStore(PusherWorkerStore): data: Optional[JsonDict], last_stream_ordering: int, profile_tag: str = "", + enabled: bool = True, + device_id: Optional[str] = None, ) -> None: async with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on @@ -494,6 +605,8 @@ class PusherStore(PusherWorkerStore): "last_stream_ordering": last_stream_ordering, "profile_tag": profile_tag, "id": stream_id, + "enabled": enabled, + "device_id": device_id, }, desc="add_pusher", lock=False, diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0090c9f225..a580e4bdda 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py
@@ -27,7 +27,6 @@ from typing import ( ) from synapse.api.constants import EduTypes -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import ReceiptsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( @@ -61,6 +60,9 @@ class ReceiptsWorkerStore(SQLBaseStore): hs: "HomeServer", ): self._instance_name = hs.get_instance_name() + + # In the worker store this is an ID tracker which we overwrite in the non-worker + # class below that is used on the main process. self._receipts_id_gen: AbstractStreamIdTracker if isinstance(database.engine, PostgresEngine): @@ -87,14 +89,12 @@ class ReceiptsWorkerStore(SQLBaseStore): # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets # updated over replication. (Multiple writers are not supported for # SQLite). - if hs.get_instance_name() in hs.config.worker.writers.receipts: - self._receipts_id_gen = StreamIdGenerator( - db_conn, "receipts_linearized", "stream_id" - ) - else: - self._receipts_id_gen = SlavedIdTracker( - db_conn, "receipts_linearized", "stream_id" - ) + self._receipts_id_gen = StreamIdGenerator( + db_conn, + "receipts_linearized", + "stream_id", + is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts, + ) super().__init__(database, db_conn, hs) @@ -117,34 +117,7 @@ class ReceiptsWorkerStore(SQLBaseStore): """Get the current max stream ID for receipts stream""" return self._receipts_id_gen.get_current_token() - async def get_last_receipt_event_id_for_user( - self, user_id: str, room_id: str, receipt_types: Collection[str] - ) -> Optional[str]: - """ - Fetch the event ID for the latest receipt in a room with one of the given receipt types. - - Args: - user_id: The user to fetch receipts for. - room_id: The room ID to fetch the receipt for. - receipt_type: The receipt types to fetch. - - Returns: - The latest receipt, if one exists. - """ - result = await self.db_pool.runInteraction( - "get_last_receipt_event_id_for_user", - self.get_last_receipt_for_user_txn, - user_id, - room_id, - receipt_types, - ) - if not result: - return None - - event_id, _ = result - return event_id - - def get_last_receipt_for_user_txn( + def get_last_unthreaded_receipt_for_user_txn( self, txn: LoggingTransaction, user_id: str, @@ -152,16 +125,16 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_types: Collection[str], ) -> Optional[Tuple[str, int]]: """ - Fetch the event ID and stream_ordering for the latest receipt in a room - with one of the given receipt types. + Fetch the event ID and stream_ordering for the latest unthreaded receipt + in a room with one of the given receipt types. Args: user_id: The user to fetch receipts for. room_id: The room ID to fetch the receipt for. - receipt_type: The receipt types to fetch. + receipt_types: The receipt types to fetch. Returns: - The latest receipt, if one exists. + The event ID and stream ordering of the latest receipt, if one exists. """ clause, args = make_in_list_sql_clause( @@ -175,6 +148,7 @@ class ReceiptsWorkerStore(SQLBaseStore): WHERE {clause} AND user_id = ? AND room_id = ? + AND thread_id IS NULL ORDER BY stream_ordering DESC LIMIT 1 """ @@ -426,6 +400,8 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type = event_entry.setdefault(row["receipt_type"], {}) receipt_type[row["user_id"]] = db_to_json(row["data"]) + if row["thread_id"]: + receipt_type[row["user_id"]]["thread_id"] = row["thread_id"] results = { room_id: [results[room_id]] if room_id in results else [] @@ -522,7 +498,9 @@ class ReceiptsWorkerStore(SQLBaseStore): async def get_all_updated_receipts( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[ + List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], int, bool + ]: """Get updates for receipts replication stream. Args: @@ -549,9 +527,13 @@ class ReceiptsWorkerStore(SQLBaseStore): def get_all_updated_receipts_txn( txn: LoggingTransaction, - ) -> Tuple[List[Tuple[int, list]], int, bool]: + ) -> Tuple[ + List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], + int, + bool, + ]: sql = """ - SELECT stream_id, room_id, receipt_type, user_id, event_id, data + SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data FROM receipts_linearized WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC @@ -560,8 +542,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, (last_id, current_id, limit)) updates = cast( - List[Tuple[int, list]], - [(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn], + List[Tuple[int, Tuple[str, str, str, str, Optional[str], JsonDict]]], + [(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn], ) limited = False @@ -613,6 +595,7 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type: str, user_id: str, event_id: str, + thread_id: Optional[str], data: JsonDict, stream_id: int, ) -> Optional[int]: @@ -639,12 +622,27 @@ class ReceiptsWorkerStore(SQLBaseStore): # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts if stream_ordering is not None: - sql = ( - "SELECT stream_ordering, event_id FROM events" - " INNER JOIN receipts_linearized AS r USING (event_id, room_id)" - " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + if thread_id is None: + thread_clause = "r.thread_id IS NULL" + thread_args: Tuple[str, ...] = () + else: + thread_clause = "r.thread_id = ?" + thread_args = (thread_id,) + + sql = f""" + SELECT stream_ordering, event_id FROM events + INNER JOIN receipts_linearized AS r USING (event_id, room_id) + WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause} + """ + txn.execute( + sql, + ( + room_id, + receipt_type, + user_id, + ) + + thread_args, ) - txn.execute(sql, (room_id, receipt_type, user_id)) for so, eid in txn: if int(so) >= stream_ordering: @@ -664,22 +662,28 @@ class ReceiptsWorkerStore(SQLBaseStore): self._receipts_stream_cache.entity_has_changed, room_id, stream_id ) + keyvalues = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + where_clause = "" + if thread_id is None: + where_clause = "thread_id IS NULL" + else: + keyvalues["thread_id"] = thread_id + self.db_pool.simple_upsert_txn( txn, table="receipts_linearized", - keyvalues={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - }, + keyvalues=keyvalues, values={ "stream_id": stream_id, "event_id": event_id, + "event_stream_ordering": stream_ordering, "data": json_encoder.encode(data), }, - # receipts_linearized has a unique constraint on - # (user_id, room_id, receipt_type), so no need to lock - lock=False, + where_clause=where_clause, ) return rx_ts @@ -728,6 +732,7 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type: str, user_id: str, event_ids: List[str], + thread_id: Optional[str], data: dict, ) -> Optional[Tuple[int, int]]: """Insert a receipt, either from local client or remote server. @@ -760,6 +765,7 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type, user_id, linearized_event_id, + thread_id, data, stream_id=stream_id, # Read committed is actually beneficial here because we check for a receipt with @@ -774,7 +780,8 @@ class ReceiptsWorkerStore(SQLBaseStore): now = self._clock.time_msec() logger.debug( - "RR for event %s in %s (%i ms old)", + "Receipt %s for event %s in %s (%i ms old)", + receipt_type, linearized_event_id, room_id, now - event_ts, @@ -787,6 +794,7 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type, user_id, event_ids, + thread_id, data, ) @@ -801,6 +809,7 @@ class ReceiptsWorkerStore(SQLBaseStore): receipt_type: str, user_id: str, event_ids: List[str], + thread_id: Optional[str], data: JsonDict, ) -> None: assert self._can_write_to_receipts @@ -812,27 +821,246 @@ class ReceiptsWorkerStore(SQLBaseStore): # FIXME: This shouldn't invalidate the whole cache txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,)) - self.db_pool.simple_delete_txn( - txn, - table="receipts_graph", - keyvalues={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - }, - ) - self.db_pool.simple_insert_txn( + keyvalues = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + where_clause = "" + if thread_id is None: + where_clause = "thread_id IS NULL" + else: + keyvalues["thread_id"] = thread_id + + self.db_pool.simple_upsert_txn( txn, table="receipts_graph", + keyvalues=keyvalues, values={ - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, "event_ids": json_encoder.encode(event_ids), "data": json_encoder.encode(data), }, + where_clause=where_clause, + ) + + +class ReceiptsBackgroundUpdateStore(SQLBaseStore): + POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering" + RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME = "receipts_linearized_unique_index" + RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME = "receipts_graph_unique_index" + + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_update_handler( + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING, + self._populate_receipt_event_stream_ordering, + ) + self.db_pool.updates.register_background_update_handler( + self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME, + self._background_receipts_linearized_unique_index, + ) + self.db_pool.updates.register_background_update_handler( + self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME, + self._background_receipts_graph_unique_index, + ) + + async def _populate_receipt_event_stream_ordering( + self, progress: JsonDict, batch_size: int + ) -> int: + def _populate_receipt_event_stream_ordering_txn( + txn: LoggingTransaction, + ) -> bool: + + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM receipts_linearized") + res = txn.fetchone() + if res is None or res[0] is None: + return True + else: + max_stream_id = res[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size + + sql = """ + UPDATE receipts_linearized + SET event_stream_ordering = ( + SELECT stream_ordering + FROM events + WHERE event_id = receipts_linearized.event_id + ) + WHERE stream_id >= ? AND stream_id < ? + """ + txn.execute(sql, (start, stop)) + + self.db_pool.updates._background_update_progress_txn( + txn, + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, + ) + + return stop > max_stream_id + + finished = await self.db_pool.runInteraction( + "_remove_devices_from_device_inbox_txn", + _populate_receipt_event_stream_ordering_txn, + ) + + if finished: + await self.db_pool.updates._end_background_update( + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING + ) + + return batch_size + + async def _create_receipts_index(self, index_name: str, table: str) -> None: + """Adds a unique index on `(room_id, receipt_type, user_id)` to the given + receipts table, for non-thread receipts.""" + + def _create_index(conn: LoggingDatabaseConnection) -> None: + conn.rollback() + + # we have to set autocommit, because postgres refuses to + # CREATE INDEX CONCURRENTLY without it. + if isinstance(self.database_engine, PostgresEngine): + conn.set_session(autocommit=True) + + try: + c = conn.cursor() + + # Now that the duplicates are gone, we can create the index. + concurrently = ( + "CONCURRENTLY" + if isinstance(self.database_engine, PostgresEngine) + else "" + ) + sql = f""" + CREATE UNIQUE INDEX {concurrently} {index_name} + ON {table}(room_id, receipt_type, user_id) + WHERE thread_id IS NULL + """ + c.execute(sql) + finally: + if isinstance(self.database_engine, PostgresEngine): + conn.set_session(autocommit=False) + + await self.db_pool.runWithConnection(_create_index) + + async def _background_receipts_linearized_unique_index( + self, progress: dict, batch_size: int + ) -> int: + """Removes duplicate receipts and adds a unique index on + `(room_id, receipt_type, user_id)` to `receipts_linearized`, for non-thread + receipts.""" + + def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: + # Identify any duplicate receipts arising from + # https://github.com/matrix-org/synapse/issues/14406. + # We expect the following query to use the per-thread receipt index and take + # less than a minute. + sql = """ + SELECT MAX(stream_id), room_id, receipt_type, user_id + FROM receipts_linearized + WHERE thread_id IS NULL + GROUP BY room_id, receipt_type, user_id + HAVING COUNT(*) > 1 + """ + txn.execute(sql) + duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn)) + + # Then remove duplicate receipts, keeping the one with the highest + # `stream_id`. There should only be a single receipt with any given + # `stream_id`. + for max_stream_id, room_id, receipt_type, user_id in duplicate_keys: + sql = """ + DELETE FROM receipts_linearized + WHERE + room_id = ? AND + receipt_type = ? AND + user_id = ? AND + thread_id IS NULL AND + stream_id < ? + """ + txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id)) + + await self.db_pool.runInteraction( + self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME, + _remote_duplicate_receipts_txn, + ) + + await self._create_receipts_index( + "receipts_linearized_unique_index", + "receipts_linearized", + ) + + await self.db_pool.updates._end_background_update( + self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME + ) + + return 1 + + async def _background_receipts_graph_unique_index( + self, progress: dict, batch_size: int + ) -> int: + """Removes duplicate receipts and adds a unique index on + `(room_id, receipt_type, user_id)` to `receipts_graph`, for non-thread + receipts.""" + + def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: + # Identify any duplicate receipts arising from + # https://github.com/matrix-org/synapse/issues/14406. + # We expect the following query to use the per-thread receipt index and take + # less than a minute. + sql = """ + SELECT room_id, receipt_type, user_id FROM receipts_graph + WHERE thread_id IS NULL + GROUP BY room_id, receipt_type, user_id + HAVING COUNT(*) > 1 + """ + txn.execute(sql) + duplicate_keys = cast(List[Tuple[str, str, str]], list(txn)) + + # Then remove all duplicate receipts. + # We could be clever and try to keep the latest receipt out of every set of + # duplicates, but it's far simpler to remove them all. + for room_id, receipt_type, user_id in duplicate_keys: + sql = """ + DELETE FROM receipts_graph + WHERE + room_id = ? AND + receipt_type = ? AND + user_id = ? AND + thread_id IS NULL + """ + txn.execute(sql, (room_id, receipt_type, user_id)) + + await self.db_pool.runInteraction( + self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME, + _remote_duplicate_receipts_txn, + ) + + await self._create_receipts_index( + "receipts_graph_unique_index", + "receipts_graph", ) + await self.db_pool.updates._end_background_update( + self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME + ) + + return 1 + -class ReceiptsStore(ReceiptsWorkerStore): +class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore): pass diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cb63cd9b7d..31f0f2bd3d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -21,7 +21,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast import attr from synapse.api.constants import UserTypes -from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError +from synapse.api.errors import ( + Codes, + NotFoundError, + StoreError, + SynapseError, + ThreepidValidationError, +) from synapse.config.homeserver import HomeServerConfig from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.database import ( @@ -50,6 +56,14 @@ class ExternalIDReuseException(Exception): because this external id is given to an other user.""" +class LoginTokenExpired(Exception): + """Exception if the login token sent expired""" + + +class LoginTokenReused(Exception): + """Exception if the login token sent was already used""" + + @attr.s(frozen=True, slots=True, auto_attribs=True) class TokenLookupResult: """Result of looking up an access token. @@ -69,9 +83,9 @@ class TokenLookupResult: """ user_id: str + token_id: int is_guest: bool = False shadow_banned: bool = False - token_id: Optional[int] = None device_id: Optional[str] = None valid_until_ms: Optional[int] = None token_owner: str = attr.ib() @@ -115,6 +129,20 @@ class RefreshTokenLookupResult: If None, the session can be refreshed indefinitely.""" +@attr.s(auto_attribs=True, frozen=True, slots=True) +class LoginTokenLookupResult: + """Result of looking up a login token.""" + + user_id: str + """The user this token belongs to.""" + + auth_provider_id: Optional[str] + """The SSO Identity Provider that the user authenticated with, to get this token.""" + + auth_provider_session_id: Optional[str] + """The session ID advertised by the SSO Identity Provider.""" + + class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -166,26 +194,49 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): @cached() async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: """Deprecated: use get_userinfo_by_id instead""" - return await self.db_pool.simple_select_one( - table="users", - keyvalues={"name": user_id}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin", - "consent_version", - "consent_server_notice_sent", - "appservice_id", - "creation_ts", - "user_type", - "deactivated", - "shadow_banned", - ], - allow_none=True, + + def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: + # We could technically use simple_select_one here, but it would not perform + # the COALESCEs (unless hacked into the column names), which could yield + # confusing results. + txn.execute( + """ + SELECT + name, password_hash, is_guest, admin, consent_version, consent_ts, + consent_server_notice_sent, appservice_id, creation_ts, user_type, + deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned, + COALESCE(approved, TRUE) AS approved + FROM users + WHERE name = ? + """, + (user_id,), + ) + + rows = self.db_pool.cursor_to_dict(txn) + + if len(rows) == 0: + return None + + return rows[0] + + row = await self.db_pool.runInteraction( desc="get_user_by_id", + func=get_user_by_id_txn, ) + if row is not None: + # If we're using SQLite our boolean values will be integers. Because we + # present some of this data as is to e.g. server admins via REST APIs, we + # want to make sure we're returning the right type of data. + # Note: when adding a column name to this list, be wary of NULLable columns, + # since NULL values will be turned into False. + boolean_columns = ["admin", "deactivated", "shadow_banned", "approved"] + for column in boolean_columns: + if not isinstance(row[column], bool): + row[column] = bool(row[column]) + + return row + async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]: """Get a UserInfo object for a user by user ID. @@ -902,7 +953,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Returns user id from threepid Args: - txn (cursor): + txn: medium: threepid medium e.g. email address: threepid address e.g. me@example.com @@ -1232,8 +1283,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Sets an expiration date to the account with the given user ID. Args: - user_id (str): User ID to set an expiration date for. - use_delta (bool): If set to False, the expiration date for the user will be + user_id: User ID to set an expiration date for. + use_delta: If set to False, the expiration date for the user will be now + validity period. If set to True, this expiration date will be a random value in the [now + period - d ; now + period] range, d being a delta equal to 10% of the validity period. @@ -1766,6 +1817,130 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "replace_refresh_token", _replace_refresh_token_txn ) + async def add_login_token_to_user( + self, + user_id: str, + token: str, + expiry_ts: int, + auth_provider_id: Optional[str], + auth_provider_session_id: Optional[str], + ) -> None: + """Adds a short-term login token for the given user. + + Args: + user_id: The user ID. + token: The new login token to add. + expiry_ts (milliseconds since the epoch): Time after which the login token + cannot be used. + auth_provider_id: The SSO Identity Provider that the user authenticated with + to get this token, if any + auth_provider_session_id: The session ID advertised by the SSO Identity + Provider, if any. + """ + await self.db_pool.simple_insert( + "login_tokens", + { + "token": token, + "user_id": user_id, + "expiry_ts": expiry_ts, + "auth_provider_id": auth_provider_id, + "auth_provider_session_id": auth_provider_session_id, + }, + desc="add_login_token_to_user", + ) + + def _consume_login_token( + self, + txn: LoggingTransaction, + token: str, + ts: int, + ) -> LoginTokenLookupResult: + values = self.db_pool.simple_select_one_txn( + txn, + "login_tokens", + keyvalues={"token": token}, + retcols=( + "user_id", + "expiry_ts", + "used_ts", + "auth_provider_id", + "auth_provider_session_id", + ), + allow_none=True, + ) + + if values is None: + raise NotFoundError() + + self.db_pool.simple_update_one_txn( + txn, + "login_tokens", + keyvalues={"token": token}, + updatevalues={"used_ts": ts}, + ) + user_id = values["user_id"] + expiry_ts = values["expiry_ts"] + used_ts = values["used_ts"] + auth_provider_id = values["auth_provider_id"] + auth_provider_session_id = values["auth_provider_session_id"] + + # Token was already used + if used_ts is not None: + raise LoginTokenReused() + + # Token expired + if ts > int(expiry_ts): + raise LoginTokenExpired() + + return LoginTokenLookupResult( + user_id=user_id, + auth_provider_id=auth_provider_id, + auth_provider_session_id=auth_provider_session_id, + ) + + async def consume_login_token(self, token: str) -> LoginTokenLookupResult: + """Lookup a login token and consume it. + + Args: + token: The login token. + + Returns: + The data stored with that token, including the `user_id`. Returns `None` if + the token does not exist or if it expired. + + Raises: + NotFound if the login token was not found in database + LoginTokenExpired if the login token expired + LoginTokenReused if the login token was already used + """ + return await self.db_pool.runInteraction( + "consume_login_token", + self._consume_login_token, + token, + self._clock.time_msec(), + ) + + async def invalidate_login_tokens_by_session_id( + self, auth_provider_id: str, auth_provider_session_id: str + ) -> None: + """Invalidate login tokens with the given IdP session ID. + + Args: + auth_provider_id: The SSO Identity Provider that the user authenticated with + to get this token + auth_provider_session_id: The session ID advertised by the SSO Identity + Provider + """ + await self.db_pool.simple_update( + table="login_tokens", + keyvalues={ + "auth_provider_id": auth_provider_id, + "auth_provider_session_id": auth_provider_session_id, + }, + updatevalues={"used_ts": self._clock.time_msec()}, + desc="invalidate_login_tokens_by_session_id", + ) + @cached() async def is_guest(self, user_id: str) -> bool: res = await self.db_pool.simple_select_one_onecol( @@ -1778,6 +1953,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return res if res else False + @cached() + async def is_user_approved(self, user_id: str) -> bool: + """Checks if a user is approved and therefore can be allowed to log in. + + If the user's 'approved' column is NULL, we consider it as true given it means + the user was registered when support for an approval flow was either disabled + or nonexistent. + + Args: + user_id: the user to check the approval status of. + + Returns: + A boolean that is True if the user is approved, False otherwise. + """ + + def is_user_approved_txn(txn: LoggingTransaction) -> bool: + txn.execute( + """ + SELECT COALESCE(approved, TRUE) AS approved FROM users WHERE name = ? + """, + (user_id,), + ) + + rows = self.db_pool.cursor_to_dict(txn) + + # We cast to bool because the value returned by the database engine might + # be an integer if we're using SQLite. + return bool(rows[0]["approved"]) + + return await self.db_pool.runInteraction( + desc="is_user_pending_approval", + func=is_user_approved_txn, + ) + class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): def __init__( @@ -1915,6 +2124,29 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) + def update_user_approval_status_txn( + self, txn: LoggingTransaction, user_id: str, approved: bool + ) -> None: + """Set the user's 'approved' flag to the given value. + + The boolean is turned into an int because the column is a smallint. + + Args: + txn: the current database transaction. + user_id: the user to update the flag for. + approved: the value to set the flag to. + """ + self.db_pool.simple_update_one_txn( + txn=txn, + table="users", + keyvalues={"name": user_id}, + updatevalues={"approved": approved}, + ) + + # Invalidate the caches of methods that read the value of the 'approved' flag. + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) + self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,)) + class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): def __init__( @@ -1932,6 +2164,19 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") + # If support for MSC3866 is enabled and configured to require approval for new + # account, we will create new users with an 'approved' flag set to false. + self._require_approval = ( + hs.config.experimental.msc3866.enabled + and hs.config.experimental.msc3866.require_approval_for_new_accounts + ) + + # Create a background job for removing expired login tokens + if hs.config.worker.run_background_tasks: + self._clock.looping_call( + self._delete_expired_login_tokens, THIRTY_MINUTES_IN_MS + ) + async def add_access_token_to_user( self, user_id: str, @@ -2064,6 +2309,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): admin: bool = False, user_type: Optional[str] = None, shadow_banned: bool = False, + approved: bool = False, ) -> None: """Attempts to register an account. @@ -2082,6 +2328,8 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): or None for a normal user. shadow_banned: Whether the user is shadow-banned, i.e. they may be told their requests succeeded but we ignore them. + approved: Whether to consider the user has already been approved by an + administrator. Raises: StoreError if the user_id could not be registered. @@ -2098,6 +2346,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): admin, user_type, shadow_banned, + approved, ) def _register_user( @@ -2112,11 +2361,14 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): admin: bool, user_type: Optional[str], shadow_banned: bool, + approved: bool, ) -> None: user_id_obj = UserID.from_string(user_id) now = int(self._clock.time()) + user_approved = approved or not self._require_approval + try: if was_guest: # Ensure that the guest user actually exists @@ -2142,6 +2394,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): "admin": 1 if admin else 0, "user_type": user_type, "shadow_banned": shadow_banned, + "approved": user_approved, }, ) else: @@ -2157,6 +2410,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): "admin": 1 if admin else 0, "user_type": user_type, "shadow_banned": shadow_banned, + "approved": user_approved, }, ) @@ -2227,7 +2481,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): txn, table="users", keyvalues={"name": user_id}, - updatevalues={"consent_version": consent_version}, + updatevalues={ + "consent_version": consent_version, + "consent_ts": self._clock.time_msec(), + }, ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -2499,6 +2756,42 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): start_or_continue_validation_session_txn, ) + async def update_user_approval_status( + self, user_id: UserID, approved: bool + ) -> None: + """Set the user's 'approved' flag to the given value. + + The boolean will be turned into an int (in update_user_approval_status_txn) + because the column is a smallint. + + Args: + user_id: the user to update the flag for. + approved: the value to set the flag to. + """ + await self.db_pool.runInteraction( + "update_user_approval_status", + self.update_user_approval_status_txn, + user_id.to_string(), + approved, + ) + + @wrap_as_background_process("delete_expired_login_tokens") + async def _delete_expired_login_tokens(self) -> None: + """Remove login tokens with expiry dates that have passed.""" + + def _delete_expired_login_tokens_txn(txn: LoggingTransaction, ts: int) -> None: + sql = "DELETE FROM login_tokens WHERE expiry_ts <= ?" + txn.execute(sql, (ts,)) + + # We keep the expired tokens for an extra 5 minutes so we can measure how many + # times a token is being used after its expiry + now = self._clock.time_msec() + await self.db_pool.runInteraction( + "delete_expired_login_tokens", + _delete_expired_login_tokens_txn, + now - (5 * 60 * 1000), + ) + def find_max_generated_user_id_localpart(cur: Cursor) -> int: """ diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 7bd27790eb..aea96e9d24 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py
@@ -14,11 +14,13 @@ import logging from typing import ( + TYPE_CHECKING, Collection, Dict, FrozenSet, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -28,19 +30,48 @@ from typing import ( import attr -from synapse.api.constants import RelationTypes +from synapse.api.constants import MAIN_TIMELINE, RelationTypes +from synapse.api.errors import SynapseError from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken from synapse.util.caches.descriptors import cached, cachedList +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @attr.s(slots=True, frozen=True, auto_attribs=True) +class ThreadsNextBatch: + topological_ordering: int + stream_ordering: int + + def __str__(self) -> str: + return f"{self.topological_ordering}_{self.stream_ordering}" + + @classmethod + def from_string(cls, string: str) -> "ThreadsNextBatch": + """ + Creates a ThreadsNextBatch from its textual representation. + """ + try: + keys = (int(s) for s in string.split("_")) + return cls(*keys) + except Exception: + raise SynapseError(400, "Invalid threads token") + + +@attr.s(slots=True, frozen=True, auto_attribs=True) class _RelatedEvent: """ Contains enough information about a related event in order to properly filter @@ -54,6 +85,76 @@ class _RelatedEvent: class RelationsWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_update_handler( + "threads_backfill", self._backfill_threads + ) + + async def _backfill_threads(self, progress: JsonDict, batch_size: int) -> int: + """Backfill the threads table.""" + + def threads_backfill_txn(txn: LoggingTransaction) -> int: + last_thread_id = progress.get("last_thread_id", "") + + # Get the latest event in each thread by topo ordering / stream ordering. + # + # Note that the MAX(event_id) is needed to abide by the rules of group by, + # but doesn't actually do anything since there should only be a single event + # ID per topo/stream ordering pair. + sql = f""" + SELECT room_id, relates_to_id, MAX(topological_ordering), MAX(stream_ordering), MAX(event_id) + FROM event_relations + INNER JOIN events USING (event_id) + WHERE + relates_to_id > ? AND + relation_type = '{RelationTypes.THREAD}' + GROUP BY room_id, relates_to_id + ORDER BY relates_to_id + LIMIT ? + """ + txn.execute(sql, (last_thread_id, batch_size)) + + # No more rows to process. + rows = txn.fetchall() + if not rows: + return 0 + + # Insert the rows into the threads table. If a matching thread already exists, + # assume it is from a newer event. + sql = """ + INSERT INTO threads (room_id, thread_id, topological_ordering, stream_ordering, latest_event_id) + VALUES %s + ON CONFLICT (room_id, thread_id) + DO NOTHING + """ + if isinstance(txn.database_engine, PostgresEngine): + txn.execute_values(sql % ("?",), rows, fetch=False) + else: + txn.execute_batch(sql % ("(?, ?, ?, ?, ?)",), rows) + + # Mark the progress. + self.db_pool.updates._background_update_progress_txn( + txn, "threads_backfill", {"last_thread_id": rows[-1][1]} + ) + + return txn.rowcount + + result = await self.db_pool.runInteraction( + "threads_backfill", threads_backfill_txn + ) + + if not result: + await self.db_pool.updates._end_background_update("threads_backfill") + + return result + @cached(uncached_args=("event",), tree=True) async def get_relations_for_event( self, @@ -91,6 +192,9 @@ class RelationsWorkerStore(SQLBaseStore): # it. The `event_id` must match the `event.event_id`. assert event.event_id == event_id + # Ensure bad limits aren't being passed in. + assert limit >= 0 + where_clause = ["relates_to_id = ?", "room_id = ?"] where_args: List[Union[str, int]] = [event.event_id, room_id] is_redacted = event.internal_metadata.is_redacted() @@ -139,21 +243,40 @@ class RelationsWorkerStore(SQLBaseStore): ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]: txn.execute(sql, where_args + [limit + 1]) - last_topo_id = None - last_stream_id = None events = [] - for row in txn: + topo_orderings: List[int] = [] + stream_orderings: List[int] = [] + for event_id, relation_type, sender, topo_ordering, stream_ordering in cast( + List[Tuple[str, str, str, int, int]], txn + ): # Do not include edits for redacted events as they leak event # content. - if not is_redacted or row[1] != RelationTypes.REPLACE: - events.append(_RelatedEvent(row[0], row[2])) - last_topo_id = row[3] - last_stream_id = row[4] + if not is_redacted or relation_type != RelationTypes.REPLACE: + events.append(_RelatedEvent(event_id, sender)) + topo_orderings.append(topo_ordering) + stream_orderings.append(stream_ordering) - # If there are more events, generate the next pagination key. + # If there are more events, generate the next pagination key from the + # last event returned. next_token = None - if len(events) > limit and last_topo_id and last_stream_id: - next_key = RoomStreamToken(last_topo_id, last_stream_id) + if len(events) > limit: + # Instead of using the last row (which tells us there is more + # data), use the last row to be returned. + events = events[:limit] + topo_orderings = topo_orderings[:limit] + stream_orderings = stream_orderings[:limit] + + topo = topo_orderings[-1] + token = stream_orderings[-1] + if direction == "b": + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + token -= 1 + next_key = RoomStreamToken(topo, token) + if from_token: next_token = from_token.copy_and_replace( StreamKeyType.ROOM, next_key @@ -177,6 +300,42 @@ class RelationsWorkerStore(SQLBaseStore): "get_recent_references_for_event", _get_recent_references_for_event_txn ) + async def get_all_relations_for_event_with_types( + self, + event_id: str, + relation_types: List[str], + ) -> List[str]: + """Get the event IDs of all events that have a relation to the given event with + one of the given relation types. + + Args: + event_id: The event for which to look for related events. + relation_types: The types of relations to look for. + + Returns: + A list of the IDs of the events that relate to the given event with one of + the given relation types. + """ + + def get_all_relation_ids_for_event_with_types_txn( + txn: LoggingTransaction, + ) -> List[str]: + rows = self.db_pool.simple_select_many_txn( + txn=txn, + table="event_relations", + column="relation_type", + iterable=relation_types, + keyvalues={"relates_to_id": event_id}, + retcols=["event_id"], + ) + + return [row["event_id"] for row in rows] + + return await self.db_pool.runInteraction( + desc="get_all_relation_ids_for_event_with_types", + func=get_all_relation_ids_for_event_with_types_txn, + ) + async def event_includes_relation(self, event_id: str) -> bool: """Check if the given event relates to another event. @@ -240,112 +399,196 @@ class RelationsWorkerStore(SQLBaseStore): ) return result is not None - @cached(tree=True) - async def get_aggregation_groups_for_event( - self, event_id: str, room_id: str, limit: int = 5 - ) -> List[JsonDict]: - """Get a list of annotations on the event, grouped by event type and + @cached() + async def get_aggregation_groups_for_event(self, event_id: str) -> List[JsonDict]: + raise NotImplementedError() + + @cachedList( + cached_method_name="get_aggregation_groups_for_event", list_name="event_ids" + ) + async def get_aggregation_groups_for_events( + self, event_ids: Collection[str] + ) -> Mapping[str, Optional[List[JsonDict]]]: + """Get a list of annotations on the given events, grouped by event type and aggregation key, sorted by count. This is used e.g. to get the what and how many reactions have happend on an event. Args: - event_id: Fetch events that relate to this event ID. - room_id: The room the event belongs to. - limit: Only fetch the `limit` groups. + event_ids: Fetch events that relate to these event IDs. Returns: - List of groups of annotations that match. Each row is a dict with - `type`, `key` and `count` fields. + A map of event IDs to a list of groups of annotations that match. + Each entry is a dict with `type`, `key` and `count` fields. """ + # The number of entries to return per event ID. + limit = 5 - args = [ - event_id, - room_id, - RelationTypes.ANNOTATION, - limit, - ] + clause, args = make_in_list_sql_clause( + self.database_engine, "relates_to_id", event_ids + ) + args.append(RelationTypes.ANNOTATION) - sql = """ - SELECT type, aggregation_key, COUNT(DISTINCT sender) - FROM event_relations - INNER JOIN events USING (event_id) - WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? - GROUP BY relation_type, type, aggregation_key - ORDER BY COUNT(*) DESC - LIMIT ? + sql = f""" + SELECT + relates_to_id, + annotation.type, + aggregation_key, + COUNT(DISTINCT annotation.sender) + FROM events AS annotation + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS parent ON + parent.event_id = relates_to_id + AND parent.room_id = annotation.room_id + WHERE + {clause} + AND relation_type = ? + GROUP BY relates_to_id, annotation.type, aggregation_key + ORDER BY relates_to_id, COUNT(*) DESC """ - def _get_aggregation_groups_for_event_txn( + def _get_aggregation_groups_for_events_txn( txn: LoggingTransaction, - ) -> List[JsonDict]: + ) -> Mapping[str, List[JsonDict]]: txn.execute(sql, args) - return [{"type": row[0], "key": row[1], "count": row[2]} for row in txn] + result: Dict[str, List[JsonDict]] = {} + for event_id, type, key, count in cast( + List[Tuple[str, str, str, int]], txn + ): + event_results = result.setdefault(event_id, []) + + # Limit the number of results per event ID. + if len(event_results) == limit: + continue + + event_results.append({"type": type, "key": key, "count": count}) + + return result return await self.db_pool.runInteraction( - "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn + "get_aggregation_groups_for_events", _get_aggregation_groups_for_events_txn ) async def get_aggregation_groups_for_users( - self, - event_id: str, - room_id: str, - limit: int, - users: FrozenSet[str] = frozenset(), - ) -> Dict[Tuple[str, str], int]: + self, event_ids: Collection[str], users: FrozenSet[str] + ) -> Dict[str, Dict[Tuple[str, str], int]]: """Fetch the partial aggregations for an event for specific users. This is used, in conjunction with get_aggregation_groups_for_event, to remove information from the results for ignored users. Args: - event_id: Fetch events that relate to this event ID. - room_id: The room the event belongs to. - limit: Only fetch the `limit` groups. + event_ids: Fetch events that relate to these event IDs. users: The users to fetch information for. Returns: - A map of (event type, aggregation key) to a count of users. + A map of event ID to a map of (event type, aggregation key) to a + count of users. """ if not users: return {} - args: List[Union[str, int]] = [ - event_id, - room_id, - RelationTypes.ANNOTATION, - ] + events_sql, args = make_in_list_sql_clause( + self.database_engine, "relates_to_id", event_ids + ) users_sql, users_args = make_in_list_sql_clause( - self.database_engine, "sender", users + self.database_engine, "annotation.sender", users ) args.extend(users_args) + args.append(RelationTypes.ANNOTATION) sql = f""" - SELECT type, aggregation_key, COUNT(DISTINCT sender) - FROM event_relations - INNER JOIN events USING (event_id) - WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? AND {users_sql} - GROUP BY relation_type, type, aggregation_key - ORDER BY COUNT(*) DESC - LIMIT ? + SELECT + relates_to_id, + annotation.type, + aggregation_key, + COUNT(DISTINCT annotation.sender) + FROM events AS annotation + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS parent ON + parent.event_id = relates_to_id + AND parent.room_id = annotation.room_id + WHERE {events_sql} AND {users_sql} AND relation_type = ? + GROUP BY relates_to_id, annotation.type, aggregation_key + ORDER BY relates_to_id, COUNT(*) DESC """ def _get_aggregation_groups_for_users_txn( txn: LoggingTransaction, - ) -> Dict[Tuple[str, str], int]: - txn.execute(sql, args + [limit]) + ) -> Dict[str, Dict[Tuple[str, str], int]]: + txn.execute(sql, args) - return {(row[0], row[1]): row[2] for row in txn} + result: Dict[str, Dict[Tuple[str, str], int]] = {} + for event_id, type, key, count in cast( + List[Tuple[str, str, str, int]], txn + ): + result.setdefault(event_id, {})[(type, key)] = count + + return result return await self.db_pool.runInteraction( "get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn ) @cached() + async def get_references_for_event(self, event_id: str) -> List[JsonDict]: + raise NotImplementedError() + + @cachedList(cached_method_name="get_references_for_event", list_name="event_ids") + async def get_references_for_events( + self, event_ids: Collection[str] + ) -> Mapping[str, Optional[List[_RelatedEvent]]]: + """Get a list of references to the given events. + + Args: + event_ids: Fetch events that relate to these event IDs. + + Returns: + A map of event IDs to a list of related event IDs (and their senders). + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "relates_to_id", event_ids + ) + args.append(RelationTypes.REFERENCE) + + sql = f""" + SELECT relates_to_id, ref.event_id, ref.sender + FROM events AS ref + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS parent ON + parent.event_id = relates_to_id + AND parent.room_id = ref.room_id + WHERE + {clause} + AND relation_type = ? + ORDER BY ref.topological_ordering, ref.stream_ordering + """ + + def _get_references_for_events_txn( + txn: LoggingTransaction, + ) -> Mapping[str, List[_RelatedEvent]]: + txn.execute(sql, args) + + result: Dict[str, List[_RelatedEvent]] = {} + for relates_to_id, event_id, sender in cast( + List[Tuple[str, str, str]], txn + ): + result.setdefault(relates_to_id, []).append( + _RelatedEvent(event_id, sender) + ) + + return result + + return await self.db_pool.runInteraction( + "_get_references_for_events_txn", _get_references_for_events_txn + ) + + @cached() def get_applicable_edit(self, event_id: str) -> Optional[EventBase]: raise NotImplementedError() @@ -761,57 +1004,192 @@ class RelationsWorkerStore(SQLBaseStore): "get_if_user_has_annotated_event", _get_if_user_has_annotated_event ) - @cached(iterable=True) - async def get_mutual_event_relations_for_rel_type( - self, event_id: str, relation_type: str - ) -> Set[Tuple[str, str]]: - raise NotImplementedError() + @cached(tree=True) + async def get_threads( + self, + room_id: str, + limit: int = 5, + from_token: Optional[ThreadsNextBatch] = None, + ) -> Tuple[List[str], Optional[ThreadsNextBatch]]: + """Get a list of thread IDs, ordered by topological ordering of their + latest reply. - @cachedList( - cached_method_name="get_mutual_event_relations_for_rel_type", - list_name="relation_types", - ) - async def get_mutual_event_relations( - self, event_id: str, relation_types: Collection[str] - ) -> Dict[str, Set[Tuple[str, str]]]: + Args: + room_id: The room the event belongs to. + limit: Only fetch the most recent `limit` threads. + from_token: Fetch rows from a previous next_batch, or from the start if None. + + Returns: + A tuple of: + A list of thread root event IDs. + + The next_batch, if one exists. + """ + # Generate the pagination clause, if necessary. + # + # Find any threads where the latest reply is equal / before the last + # thread's topo ordering and earlier in stream ordering. + pagination_clause = "" + pagination_args: tuple = () + if from_token: + pagination_clause = "AND topological_ordering <= ? AND stream_ordering < ?" + pagination_args = ( + from_token.topological_ordering, + from_token.stream_ordering, + ) + + sql = f""" + SELECT thread_id, topological_ordering, stream_ordering + FROM threads + WHERE + room_id = ? + {pagination_clause} + ORDER BY topological_ordering DESC, stream_ordering DESC + LIMIT ? + """ + + def _get_threads_txn( + txn: LoggingTransaction, + ) -> Tuple[List[str], Optional[ThreadsNextBatch]]: + txn.execute(sql, (room_id, *pagination_args, limit + 1)) + + rows = cast(List[Tuple[str, int, int]], txn.fetchall()) + thread_ids = [r[0] for r in rows] + + # If there are more events, generate the next pagination key from the + # last thread which will be returned. + next_token = None + if len(thread_ids) > limit: + last_topo_id = rows[-2][1] + last_stream_id = rows[-2][2] + next_token = ThreadsNextBatch(last_topo_id, last_stream_id) + + return thread_ids[:limit], next_token + + return await self.db_pool.runInteraction("get_threads", _get_threads_txn) + + @cached() + async def get_thread_id(self, event_id: str) -> str: """ - Fetch event metadata for events which related to the same event as the given event. + Get the thread ID for an event. This considers multi-level relations, + e.g. an annotation to an event which is part of a thread. - If the given event has no relation information, returns an empty dictionary. + It only searches up the relations tree, i.e. it only searches for events + which the given event is related to (and which those events are related + to, etc.) + + Given the following DAG: + + A <---[m.thread]-- B <--[m.annotation]-- C + ^ + |--[m.reference]-- D <--[m.annotation]-- E + + get_thread_id(X) considers events B and C as part of thread A. + + See also get_thread_id_for_receipts. Args: - event_id: The event ID which is targeted by relations. - relation_types: The relation types to check for mutual relations. + event_id: The event ID to fetch the thread ID for. Returns: - A dictionary of relation type to: - A set of tuples of: - The sender - The event type + The event ID of the root event in the thread, if this event is part + of a thread. "main", otherwise. """ - rel_type_sql, rel_type_args = make_in_list_sql_clause( - self.database_engine, "relation_type", relation_types - ) - sql = f""" - SELECT DISTINCT relation_type, sender, type FROM event_relations - INNER JOIN events USING (event_id) - WHERE relates_to_id = ? AND {rel_type_sql} + # Recurse event relations up to the *root* event, then search that chain + # of relations for a thread relation. If one is found, the root event is + # returned. + # + # Note that this should only ever find 0 or 1 entries since it is invalid + # for an event to have a thread relation to an event which also has a + # relation. + sql = """ + WITH RECURSIVE related_events AS ( + SELECT event_id, relates_to_id, relation_type, 0 depth + FROM event_relations + WHERE event_id = ? + UNION SELECT e.event_id, e.relates_to_id, e.relation_type, depth + 1 + FROM event_relations e + INNER JOIN related_events r ON r.relates_to_id = e.event_id + WHERE depth <= 3 + ) + SELECT relates_to_id FROM related_events + WHERE relation_type = 'm.thread' + ORDER BY depth DESC + LIMIT 1; """ - def _get_event_relations( - txn: LoggingTransaction, - ) -> Dict[str, Set[Tuple[str, str]]]: - txn.execute(sql, [event_id] + rel_type_args) - result: Dict[str, Set[Tuple[str, str]]] = { - rel_type: set() for rel_type in relation_types - } - for rel_type, sender, type in txn.fetchall(): - result[rel_type].add((sender, type)) - return result + def _get_thread_id(txn: LoggingTransaction) -> str: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + return row[0] + + # If no thread was found, it is part of the main timeline. + return MAIN_TIMELINE + + return await self.db_pool.runInteraction("get_thread_id", _get_thread_id) + + @cached() + async def get_thread_id_for_receipts(self, event_id: str) -> str: + """ + Get the thread ID for an event by traversing to the top-most related event + and confirming any children events form a thread. + + Given the following DAG: + + A <---[m.thread]-- B <--[m.annotation]-- C + ^ + |--[m.reference]-- D <--[m.annotation]-- E + + get_thread_id_for_receipts(X) considers events A, B, C, D, and E as part + of thread A. + + See also get_thread_id. + + Args: + event_id: The event ID to fetch the thread ID for. + + Returns: + The event ID of the root event in the thread, if this event is part + of a thread. "main", otherwise. + """ + + # Recurse event relations up to the *root* event, then search for any events + # related to that root node for a thread relation. If one is found, the + # root event is returned. + # + # Note that there cannot be thread relations in the middle of the chain since + # it is invalid for an event to have a thread relation to an event which also + # has a relation. + sql = """ + SELECT relates_to_id FROM event_relations WHERE relates_to_id = COALESCE(( + WITH RECURSIVE related_events AS ( + SELECT event_id, relates_to_id, relation_type, 0 depth + FROM event_relations + WHERE event_id = ? + UNION SELECT e.event_id, e.relates_to_id, e.relation_type, depth + 1 + FROM event_relations e + INNER JOIN related_events r ON r.relates_to_id = e.event_id + WHERE depth <= 3 + ) + SELECT relates_to_id FROM related_events + ORDER BY depth DESC + LIMIT 1 + ), ?) AND relation_type = 'm.thread' LIMIT 1; + """ + + def _get_related_thread_id(txn: LoggingTransaction) -> str: + txn.execute(sql, (event_id, event_id)) + row = txn.fetchone() + if row: + return row[0] + + # If no thread was found, it is part of the main timeline. + return MAIN_TIMELINE return await self.db_pool.runInteraction( - "get_event_relations", _get_event_relations + "get_related_thread_id", _get_related_thread_id ) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 0f1f0d11ea..52ad947c6c 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py
@@ -25,6 +25,7 @@ from typing import ( List, Mapping, Optional, + Sequence, Tuple, Union, cast, @@ -96,6 +97,12 @@ class RoomSortOrder(Enum): STATE_EVENTS = "state_events" +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PartialStateResyncInfo: + joined_via: Optional[str] + servers_in_room: List[str] = attr.ib(factory=list) + + class RoomWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -206,21 +213,30 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): def _construct_room_type_where_clause( self, room_types: Union[List[Union[str, None]], None] - ) -> Tuple[Union[str, None], List[str]]: + ) -> Tuple[Union[str, None], list]: if not room_types: return None, [] - else: - # We use None when we want get rooms without a type - is_null_clause = "" - if None in room_types: - is_null_clause = "OR room_type IS NULL" - room_types = [value for value in room_types if value is not None] + # Since None is used to represent a room without a type, care needs to + # be taken into account when constructing the where clause. + clauses = [] + args: list = [] + + room_types_set = set(room_types) + + # We use None to represent a room without a type. + if None in room_types_set: + clauses.append("room_type IS NULL") + room_types_set.remove(None) + + # If there are other room types, generate the proper clause. + if room_types: list_clause, args = make_in_list_sql_clause( - self.database_engine, "room_type", room_types + self.database_engine, "room_type", room_types_set ) + clauses.append(list_clause) - return f"({list_clause} {is_null_clause})", args + return f"({' OR '.join(clauses)})", args async def count_public_rooms( self, @@ -240,14 +256,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): def _count_public_rooms_txn(txn: LoggingTransaction) -> int: query_args = [] - room_type_clause, args = self._construct_room_type_where_clause( - search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None) - if search_filter - else None - ) - room_type_clause = f" AND {room_type_clause}" if room_type_clause else "" - query_args += args - if network_tuple: if network_tuple.appservice_id: published_sql = """ @@ -267,6 +275,14 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): UNION SELECT room_id from appservice_room_list """ + room_type_clause, args = self._construct_room_type_where_clause( + search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None) + if search_filter + else None + ) + room_type_clause = f" AND {room_type_clause}" if room_type_clause else "" + query_args += args + sql = f""" SELECT COUNT(*) @@ -641,8 +657,10 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): "version": room[5], "creator": room[6], "encryption": room[7], - "federatable": room[8], - "public": room[9], + # room_stats_state.federatable is an integer on sqlite. + "federatable": bool(room[8]), + # rooms.is_public is an integer on sqlite. + "public": bool(room[9]), "join_rules": room[10], "guest_access": room[11], "history_visibility": room[12], @@ -894,7 +912,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): event_json = db_to_json(content_json) content = event_json["content"] content_url = content.get("url") - thumbnail_url = content.get("info", {}).get("thumbnail_url") + info = content.get("info") + if isinstance(info, dict): + thumbnail_url = info.get("thumbnail_url") + else: + thumbnail_url = None for url in (content_url, thumbnail_url): if not url: @@ -1131,17 +1153,46 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): get_rooms_for_retention_period_in_range_txn, ) - async def get_partial_state_rooms_and_servers( + @cached(iterable=True) + async def get_partial_state_servers_at_join(self, room_id: str) -> Sequence[str]: + """Gets the list of servers in a partial state room at the time we joined it. + + Returns: + The `servers_in_room` list from the `/send_join` response for partial state + rooms. May not be accurate or complete, as it comes from a remote + homeserver. + An empty list for full state rooms. + """ + return await self.db_pool.simple_select_onecol( + "partial_state_rooms_servers", + keyvalues={"room_id": room_id}, + retcol="server_name", + desc="get_partial_state_servers_at_join", + ) + + async def get_partial_state_room_resync_info( self, - ) -> Mapping[str, Collection[str]]: - """Get all rooms containing events with partial state, and the servers known - to be in the room. + ) -> Mapping[str, PartialStateResyncInfo]: + """Get all rooms containing events with partial state, and the information + needed to restart a "resync" of those rooms. Returns: A dictionary of rooms with partial state, with room IDs as keys and lists of servers in rooms as values. """ - room_servers: Dict[str, List[str]] = {} + room_servers: Dict[str, PartialStateResyncInfo] = {} + + rows = await self.db_pool.simple_select_list( + table="partial_state_rooms", + keyvalues={}, + retcols=("room_id", "joined_via"), + desc="get_server_which_served_partial_join", + ) + + for row in rows: + room_id = row["room_id"] + joined_via = row["joined_via"] + room_servers[room_id] = PartialStateResyncInfo(joined_via=joined_via) rows = await self.db_pool.simple_select_list( "partial_state_rooms_servers", @@ -1153,7 +1204,15 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): for row in rows: room_id = row["room_id"] server_name = row["server_name"] - room_servers.setdefault(room_id, []).append(server_name) + entry = room_servers.get(room_id) + if entry is None: + # There is a foreign key constraint which enforces that every room_id in + # partial_state_rooms_servers appears in partial_state_rooms. So we + # expect `entry` to be non-null. (This reasoning fails if we've + # partial-joined between the two SELECTs, but this is unlikely to happen + # in practice.) + continue + entry.servers_in_room.append(server_name) return room_servers @@ -1183,8 +1242,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): ) return False - @staticmethod - def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None: + def _clear_partial_state_room_txn( + self, txn: LoggingTransaction, room_id: str + ) -> None: DatabasePool.simple_delete_txn( txn, table="partial_state_rooms_servers", @@ -1195,7 +1255,32 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): table="partial_state_rooms", keyvalues={"room_id": room_id}, ) + self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_partial_state_servers_at_join, (room_id,) + ) + # We now delete anything from `device_lists_remote_pending` with a + # stream ID less than the minimum + # `partial_state_rooms.device_lists_stream_id`, as we no longer need them. + device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn( + txn, + table="partial_state_rooms", + keyvalues={}, + retcol="MIN(device_lists_stream_id)", + allow_none=True, + ) + if device_lists_stream_id is None: + # There are no rooms being currently partially joined, so we delete everything. + txn.execute("DELETE FROM device_lists_remote_pending") + else: + sql = """ + DELETE FROM device_lists_remote_pending + WHERE stream_id <= ? + """ + txn.execute(sql, (device_lists_stream_id,)) + + @cached() async def is_partial_state_room(self, room_id: str) -> bool: """Checks if this room has partial state. @@ -1214,6 +1299,22 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return entry is not None + async def get_join_event_id_and_device_lists_stream_id_for_partial_state( + self, room_id: str + ) -> Tuple[str, int]: + """Get the event ID of the initial join that started the partial + join, and the device list stream ID at the point we started the partial + join. + """ + + result = await self.db_pool.simple_select_one( + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + retcols=("join_event_id", "device_lists_stream_id"), + desc="get_join_event_id_for_partial_state", + ) + return result["join_event_id"], result["device_lists_stream_id"] + class _BackgroundUpdates: REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -1755,29 +1856,51 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): self, room_id: str, servers: Collection[str], + device_lists_stream_id: int, + joined_via: str, ) -> None: - """Mark the given room as containing events with partial state + """Mark the given room as containing events with partial state. + + We also store additional data that describes _when_ we first partial-joined this + room, which helps us to keep other homeservers in sync when we finally fully + join this room. + + We do not include a `join_event_id` here---we need to wait for the join event + to be persisted first. Args: room_id: the ID of the room servers: other servers known to be in the room + device_lists_stream_id: the device_lists stream ID at the time when we first + joined the room. + joined_via: the server name we requested a partial join from. """ await self.db_pool.runInteraction( "store_partial_state_room", self._store_partial_state_room_txn, room_id, servers, + device_lists_stream_id, + joined_via, ) - @staticmethod def _store_partial_state_room_txn( - txn: LoggingTransaction, room_id: str, servers: Collection[str] + self, + txn: LoggingTransaction, + room_id: str, + servers: Collection[str], + device_lists_stream_id: int, + joined_via: str, ) -> None: DatabasePool.simple_insert_txn( txn, table="partial_state_rooms", values={ "room_id": room_id, + "device_lists_stream_id": device_lists_stream_id, + # To be updated later once the join event is persisted. + "join_event_id": None, + "joined_via": joined_via, }, ) DatabasePool.simple_insert_many_txn( @@ -1786,6 +1909,40 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): keys=("room_id", "server_name"), values=((room_id, s) for s in servers), ) + self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_partial_state_servers_at_join, (room_id,) + ) + + async def write_partial_state_rooms_join_event_id( + self, + room_id: str, + join_event_id: str, + ) -> None: + """Record the join event which resulted from a partial join. + + We do this separately to `store_partial_state_room` because we need to wait for + the join event to be persisted. Otherwise we violate a foreign key constraint. + """ + await self.db_pool.runInteraction( + "write_partial_state_rooms_join_event_id", + self._write_partial_state_rooms_join_event_id, + room_id, + join_event_id, + ) + + def _write_partial_state_rooms_join_event_id( + self, + txn: LoggingTransaction, + room_id: str, + join_event_id: str, + ) -> None: + DatabasePool.simple_update_txn( + txn, + table="partial_state_rooms", + keyvalues={"room_id": room_id}, + updatevalues={"join_event_id": join_event_id}, + ) async def maybe_store_room_on_outlier_membership( self, room_id: str, room_version: RoomVersion @@ -1904,7 +2061,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): Args: report_id: ID of reported event in database Returns: - event_report: json list of information from event report + JSON dict of information from an event report or None if the + report does not exist. """ def _get_event_report_txn( @@ -1977,8 +2135,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): user_id: search for user_id. Ignored if user_id is None room_id: search for room_id. Ignored if room_id is None Returns: - event_reports: json list of event reports - count: total number of event reports matching the filter criteria + Tuple of: + json list of event reports + total number of event reports matching the filter criteria """ def _get_event_reports_paginate_txn( @@ -2001,9 +2160,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else "" + # We join on room_stats_state despite not using any columns from it + # because the join can influence the number of rows returned; + # e.g. a room that doesn't have state, maybe because it was deleted. + # The query returning the total count should be consistent with + # the query returning the results. sql = """ SELECT COUNT(*) as total_event_reports FROM event_reports AS er + JOIN room_stats_state ON room_stats_state.room_id = er.room_id {} """.format( where_clause diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e2cccc688c..f02c1d7ea7 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -15,7 +15,6 @@ import logging from typing import ( TYPE_CHECKING, - Callable, Collection, Dict, FrozenSet, @@ -31,12 +30,8 @@ from typing import ( import attr from synapse.api.constants import EventTypes, Membership -from synapse.events import EventBase from synapse.metrics import LaterGauge -from synapse.metrics.background_process_metrics import ( - run_as_background_process, - wrap_as_background_process, -) +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -91,15 +86,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # at a time. Keyed by room_id. self._joined_host_linearizer = Linearizer("_JoinedHostsCache") - # Is the current_state_events.membership up to date? Or is the - # background update still running? - self._current_state_events_membership_up_to_date = False - - txn = db_conn.cursor( - txn_name="_check_safe_current_state_events_membership_updated" - ) - self._check_safe_current_state_events_membership_updated_txn(txn) - txn.close() + self._server_notices_mxid = hs.config.servernotices.server_notices_mxid if ( self.hs.config.worker.run_background_tasks @@ -157,61 +144,42 @@ class RoomMemberWorkerStore(EventsWorkerStore): self._known_servers_count = max([count, 1]) return self._known_servers_count - def _check_safe_current_state_events_membership_updated_txn( - self, txn: LoggingTransaction - ) -> None: - """Checks if it is safe to assume the new current_state_events - membership column is up to date - """ - - pending_update = self.db_pool.simple_select_one_txn( - txn, - table="background_updates", - keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME}, - retcols=["update_name"], - allow_none=True, - ) - - self._current_state_events_membership_up_to_date = not pending_update - - # If the update is still running, reschedule to run. - if pending_update: - self._clock.call_later( - 15.0, - run_as_background_process, - "_check_safe_current_state_events_membership_updated", - self.db_pool.runInteraction, - "_check_safe_current_state_events_membership_updated", - self._check_safe_current_state_events_membership_updated_txn, - ) - @cached(max_entries=100000, iterable=True) async def get_users_in_room(self, room_id: str) -> List[str]: - return await self.db_pool.runInteraction( - "get_users_in_room", self.get_users_in_room_txn, room_id + """Returns a list of users in the room. + + Will return inaccurate results for rooms with partial state, since the state for + the forward extremities of those rooms will exclude most members. We may also + calculate room state incorrectly for such rooms and believe that a member is or + is not in the room when the opposite is true. + + Note: If you only care about users in the room local to the homeserver, use + `get_local_users_in_room(...)` instead which will be more performant. + """ + return await self.db_pool.simple_select_onecol( + table="current_state_events", + keyvalues={ + "type": EventTypes.Member, + "room_id": room_id, + "membership": Membership.JOIN, + }, + retcol="state_key", + desc="get_users_in_room", ) def get_users_in_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[str]: - # If we can assume current_state_events.membership is up to date - # then we can avoid a join, which is a Very Good Thing given how - # frequently this function gets called. - if self._current_state_events_membership_up_to_date: - sql = """ - SELECT state_key FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? AND membership = ? - """ - else: - sql = """ - SELECT state_key FROM room_memberships as m - INNER JOIN current_state_events as c - ON m.event_id = c.event_id - AND m.room_id = c.room_id - AND m.user_id = c.state_key - WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? - """ + """Returns a list of users in the room.""" - txn.execute(sql, (room_id, Membership.JOIN)) - return [r[0] for r in txn] + return self.db_pool.simple_select_onecol_txn( + txn, + table="current_state_events", + keyvalues={ + "type": EventTypes.Member, + "room_id": room_id, + "membership": Membership.JOIN, + }, + retcol="state_key", + ) @cached() def get_user_in_room_with_profile( @@ -283,6 +251,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): Returns: A mapping from user ID to ProfileInfo. + + Preconditions: + - There is full state available for the room (it is not partial-stated). """ def _get_users_in_room_with_profiles( @@ -322,28 +293,14 @@ class RoomMemberWorkerStore(EventsWorkerStore): # We do this all in one transaction to keep the cache small. # FIXME: get rid of this when we have room_stats - # If we can assume current_state_events.membership is up to date - # then we can avoid a join, which is a Very Good Thing given how - # frequently this function gets called. - if self._current_state_events_membership_up_to_date: - # Note, rejected events will have a null membership field, so - # we we manually filter them out. - sql = """ - SELECT count(*), membership FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? - AND membership IS NOT NULL - GROUP BY membership - """ - else: - sql = """ - SELECT count(*), m.membership FROM room_memberships as m - INNER JOIN current_state_events as c - ON m.event_id = c.event_id - AND m.room_id = c.room_id - AND m.user_id = c.state_key - WHERE c.type = 'm.room.member' AND c.room_id = ? - GROUP BY m.membership - """ + # Note, rejected events will have a null membership field, so + # we we manually filter them out. + sql = """ + SELECT count(*), membership FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? + AND membership IS NOT NULL + GROUP BY membership + """ txn.execute(sql, (room_id,)) res: Dict[str, MemberSummary] = {} @@ -352,30 +309,18 @@ class RoomMemberWorkerStore(EventsWorkerStore): # we order by membership and then fairly arbitrarily by event_id so # heroes are consistent - if self._current_state_events_membership_up_to_date: - # Note, rejected events will have a null membership field, so - # we we manually filter them out. - sql = """ - SELECT state_key, membership, event_id - FROM current_state_events - WHERE type = 'm.room.member' AND room_id = ? - AND membership IS NOT NULL - ORDER BY - CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC, - event_id ASC - LIMIT ? - """ - else: - sql = """ - SELECT c.state_key, m.membership, c.event_id - FROM room_memberships as m - INNER JOIN current_state_events as c USING (room_id, event_id) - WHERE c.type = 'm.room.member' AND c.room_id = ? - ORDER BY - CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC, - c.event_id ASC - LIMIT ? - """ + # Note, rejected events will have a null membership field, so + # we we manually filter them out. + sql = """ + SELECT state_key, membership, event_id + FROM current_state_events + WHERE type = 'm.room.member' AND room_id = ? + AND membership IS NOT NULL + ORDER BY + CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC, + event_id ASC + LIMIT ? + """ # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user. txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6)) @@ -531,6 +476,47 @@ class RoomMemberWorkerStore(EventsWorkerStore): desc="get_local_users_in_room", ) + async def check_local_user_in_room(self, user_id: str, room_id: str) -> bool: + """ + Check whether a given local user is currently joined to the given room. + + Returns: + A boolean indicating whether the user is currently joined to the room + + Raises: + Exeption when called with a non-local user to this homeserver + """ + if not self.hs.is_mine_id(user_id): + raise Exception( + "Cannot call 'check_local_user_in_room' on " + "non-local user %s" % (user_id,), + ) + + ( + membership, + member_event_id, + ) = await self.get_local_current_membership_for_user_in_room( + user_id=user_id, + room_id=room_id, + ) + + return membership == Membership.JOIN + + async def is_server_notice_room(self, room_id: str) -> bool: + """ + Determines whether the given room is a 'Server Notices' room, used for + sending server notices to a user. + + This is determined by seeing whether the server notices user is present + in the room. + """ + if self._server_notices_mxid is None: + return False + is_server_notices_room = await self.check_local_user_in_room( + user_id=self._server_notices_mxid, room_id=room_id + ) + return is_server_notices_room + async def get_local_current_membership_for_user_in_room( self, user_id: str, room_id: str ) -> Tuple[Optional[str], Optional[str]]: @@ -592,27 +578,15 @@ class RoomMemberWorkerStore(EventsWorkerStore): # We use `current_state_events` here and not `local_current_membership` # as a) this gets called with remote users and b) this only gets called # for rooms the server is participating in. - if self._current_state_events_membership_up_to_date: - sql = """ - SELECT room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.state_key = ? - AND c.membership = ? - """ - else: - sql = """ - SELECT room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN room_memberships AS m USING (room_id, event_id) - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.state_key = ? - AND m.membership = ? - """ + sql = """ + SELECT room_id, e.instance_name, e.stream_ordering + FROM current_state_events AS c + INNER JOIN events AS e USING (room_id, event_id) + WHERE + c.type = 'm.room.member' + AND c.state_key = ? + AND c.membership = ? + """ txn.execute(sql, (user_id, Membership.JOIN)) return frozenset( @@ -622,117 +596,124 @@ class RoomMemberWorkerStore(EventsWorkerStore): for room_id, instance, stream_id in txn ) - @cachedList( - cached_method_name="get_rooms_for_user_with_stream_ordering", - list_name="user_ids", - ) - async def get_rooms_for_users_with_stream_ordering( + async def get_users_server_still_shares_room_with( self, user_ids: Collection[str] - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: - """A batched version of `get_rooms_for_user_with_stream_ordering`. - - Returns: - Map from user_id to set of rooms that is currently in. + ) -> Set[str]: + """Given a list of users return the set that the server still share a + room with. """ + + if not user_ids: + return set() + return await self.db_pool.runInteraction( - "get_rooms_for_users_with_stream_ordering", - self._get_rooms_for_users_with_stream_ordering_txn, + "get_users_server_still_shares_room_with", + self.get_users_server_still_shares_room_with_txn, user_ids, ) - def _get_rooms_for_users_with_stream_ordering_txn( - self, txn: LoggingTransaction, user_ids: Collection[str] - ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: + def get_users_server_still_shares_room_with_txn( + self, + txn: LoggingTransaction, + user_ids: Collection[str], + ) -> Set[str]: + if not user_ids: + return set() + + sql = """ + SELECT state_key FROM current_state_events + WHERE + type = 'm.room.member' + AND membership = 'join' + AND %s + GROUP BY state_key + """ clause, args = make_in_list_sql_clause( - self.database_engine, - "c.state_key", - user_ids, + self.database_engine, "state_key", user_ids ) - if self._current_state_events_membership_up_to_date: - sql = f""" - SELECT c.state_key, room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND c.membership = ? - AND {clause} - """ - else: - sql = f""" - SELECT c.state_key, room_id, e.instance_name, e.stream_ordering - FROM current_state_events AS c - INNER JOIN room_memberships AS m USING (room_id, event_id) - INNER JOIN events AS e USING (room_id, event_id) - WHERE - c.type = 'm.room.member' - AND m.membership = ? - AND {clause} - """ + txn.execute(sql % (clause,), args) - txn.execute(sql, [Membership.JOIN] + args) + return {row[0] for row in txn} - result: Dict[str, Set[GetRoomsForUserWithStreamOrdering]] = { - user_id: set() for user_id in user_ids - } - for user_id, room_id, instance, stream_id in txn: - result[user_id].add( - GetRoomsForUserWithStreamOrdering( - room_id, PersistedEventPosition(instance, stream_id) - ) - ) - - return {user_id: frozenset(v) for user_id, v in result.items()} + @cached(max_entries=500000, iterable=True) + async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]: + """Returns a set of room_ids the user is currently joined to. - async def get_users_server_still_shares_room_with( - self, user_ids: Collection[str] - ) -> Set[str]: - """Given a list of users return the set that the server still share a - room with. + If a remote user only returns rooms this server is currently + participating in. """ + rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate( + (user_id,), + None, + update_metrics=False, + ) + if rooms: + return frozenset(r.room_id for r in rooms) - if not user_ids: - return set() - - def _get_users_server_still_shares_room_with_txn( - txn: LoggingTransaction, - ) -> Set[str]: - sql = """ - SELECT state_key FROM current_state_events - WHERE - type = 'm.room.member' - AND membership = 'join' - AND %s - GROUP BY state_key - """ + room_ids = await self.db_pool.simple_select_onecol( + table="current_state_events", + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + "state_key": user_id, + }, + retcol="room_id", + desc="get_rooms_for_user", + ) - clause, args = make_in_list_sql_clause( - self.database_engine, "state_key", user_ids - ) + return frozenset(room_ids) - txn.execute(sql % (clause,), args) + @cachedList( + cached_method_name="get_rooms_for_user", + list_name="user_ids", + ) + async def _get_rooms_for_users( + self, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[str]]: + """A batched version of `get_rooms_for_user`. - return {row[0] for row in txn} + Returns: + Map from user_id to set of rooms that is currently in. + """ - return await self.db_pool.runInteraction( - "get_users_server_still_shares_room_with", - _get_users_server_still_shares_room_with_txn, + rows = await self.db_pool.simple_select_many_batch( + table="current_state_events", + column="state_key", + iterable=user_ids, + retcols=( + "state_key", + "room_id", + ), + keyvalues={ + "type": EventTypes.Member, + "membership": Membership.JOIN, + }, + desc="get_rooms_for_users", ) - async def get_rooms_for_user( - self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None - ) -> FrozenSet[str]: - """Returns a set of room_ids the user is currently joined to. + user_rooms: Dict[str, Set[str]] = {user_id: set() for user_id in user_ids} - If a remote user only returns rooms this server is currently - participating in. + for row in rows: + user_rooms[row["state_key"]].add(row["room_id"]) + + return {key: frozenset(rooms) for key, rooms in user_rooms.items()} + + async def get_rooms_for_users( + self, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[str]]: + """A batched wrapper around `_get_rooms_for_users`, to prevent locking + other calls to `get_rooms_for_user` for large user lists. """ - rooms = await self.get_rooms_for_user_with_stream_ordering( - user_id, on_invalidate=on_invalidate - ) - return frozenset(r.room_id for r in rooms) + all_user_rooms: Dict[str, FrozenSet[str]] = {} + + # 250 users is pretty arbitrary but the data can be quite large if users + # are in many rooms. + for batch_user_ids in batch_iter(user_ids, 250): + all_user_rooms.update(await self._get_rooms_for_users(batch_user_ids)) + + return all_user_rooms @cached(max_entries=10000) async def does_pair_of_users_share_a_room( @@ -764,7 +745,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # user and the set of other users, and then checking if there is any # overlap. sql = f""" - SELECT b.state_key + SELECT DISTINCT b.state_key FROM ( SELECT room_id FROM current_state_events WHERE type = 'm.room.member' AND membership = 'join' AND state_key = ? @@ -773,7 +754,6 @@ class RoomMemberWorkerStore(EventsWorkerStore): SELECT room_id, state_key FROM current_state_events WHERE type = 'm.room.member' AND membership = 'join' AND {clause} ) AS b using (room_id) - LIMIT 1 """ txn.execute(sql, (user_id, *args)) @@ -832,144 +812,92 @@ class RoomMemberWorkerStore(EventsWorkerStore): return shared_room_ids or frozenset() - async def get_joined_users_from_state( - self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry" - ) -> Dict[str, ProfileInfo]: - state_group: Union[object, int] = state_entry.state_group - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() - - assert state_group is not None - with Measure(self._clock, "get_joined_users_from_state"): - return await self._get_joined_users_from_context( - room_id, state_group, state, context=state_entry - ) + async def get_joined_user_ids_from_state( + self, room_id: str, state: StateMap[str] + ) -> Set[str]: + """ + For a given set of state IDs, get a set of user IDs in the room. - @cached(num_args=2, iterable=True, max_entries=100000) - async def _get_joined_users_from_context( - self, - room_id: str, - state_group: Union[object, int], - current_state_ids: StateMap[str], - event: Optional[EventBase] = None, - context: Optional["_StateCacheEntry"] = None, - ) -> Dict[str, ProfileInfo]: - # We don't use `state_group`, it's there so that we can cache based - # on it. However, it's important that it's never None, since two current_states - # with a state_group of None are likely to be different. - assert state_group is not None + This method checks the local event cache, before calling + `_get_user_ids_from_membership_event_ids` for any uncached events. + """ - users_in_room = {} - member_event_ids = [ - e_id - for key, e_id in current_state_ids.items() - if key[0] == EventTypes.Member - ] - - if context is not None: - # If we have a context with a delta from a previous state group, - # check if we also have the result from the previous group in cache. - # If we do then we can reuse that result and simply update it with - # any membership changes in `delta_ids` - if context.prev_group and context.delta_ids: - prev_res = self._get_joined_users_from_context.cache.get_immediate( - (room_id, context.prev_group), None - ) - if prev_res and isinstance(prev_res, dict): - users_in_room = dict(prev_res) - member_event_ids = [ - e_id - for key, e_id in context.delta_ids.items() - if key[0] == EventTypes.Member - ] - for etype, state_key in context.delta_ids: - if etype == EventTypes.Member: - users_in_room.pop(state_key, None) - - # We check if we have any of the member event ids in the event cache - # before we ask the DB - - # We don't update the event cache hit ratio as it completely throws off - # the hit ratio counts. After all, we don't populate the cache if we - # miss it here - event_map = await self._get_events_from_cache( - member_event_ids, update_metrics=False - ) + with Measure(self._clock, "get_joined_user_ids_from_state"): + users_in_room = set() + member_event_ids = [ + e_id for key, e_id in state.items() if key[0] == EventTypes.Member + ] - missing_member_event_ids = [] - for event_id in member_event_ids: - ev_entry = event_map.get(event_id) - if ev_entry and not ev_entry.event.rejected_reason: - if ev_entry.event.membership == Membership.JOIN: - users_in_room[ev_entry.event.state_key] = ProfileInfo( - display_name=ev_entry.event.content.get("displayname", None), - avatar_url=ev_entry.event.content.get("avatar_url", None), - ) - else: - missing_member_event_ids.append(event_id) + # We check if we have any of the member event ids in the event cache + # before we ask the DB - if missing_member_event_ids: - event_to_memberships = await self._get_joined_profiles_from_event_ids( - missing_member_event_ids + # We don't update the event cache hit ratio as it completely throws off + # the hit ratio counts. After all, we don't populate the cache if we + # miss it here + event_map = self._get_events_from_local_cache( + member_event_ids, update_metrics=False ) - users_in_room.update(row for row in event_to_memberships.values() if row) - - if event is not None and event.type == EventTypes.Member: - if event.membership == Membership.JOIN: - if event.event_id in member_event_ids: - users_in_room[event.state_key] = ProfileInfo( - display_name=event.content.get("displayname", None), - avatar_url=event.content.get("avatar_url", None), + + missing_member_event_ids = [] + for event_id in member_event_ids: + ev_entry = event_map.get(event_id) + if ev_entry and not ev_entry.event.rejected_reason: + if ev_entry.event.membership == Membership.JOIN: + users_in_room.add(ev_entry.event.state_key) + else: + missing_member_event_ids.append(event_id) + + if missing_member_event_ids: + event_to_memberships = ( + await self._get_user_ids_from_membership_event_ids( + missing_member_event_ids ) + ) + users_in_room.update( + user_id for user_id in event_to_memberships.values() if user_id + ) - return users_in_room + return users_in_room - @cached(max_entries=10000) - def _get_joined_profile_from_event_id( + @cached( + max_entries=10000, + # This name matches the old function that has been replaced - the cache name + # is kept here to maintain backwards compatibility. + name="_get_joined_profile_from_event_id", + ) + def _get_user_id_from_membership_event_id( self, event_id: str ) -> Optional[Tuple[str, ProfileInfo]]: raise NotImplementedError() @cachedList( - cached_method_name="_get_joined_profile_from_event_id", + cached_method_name="_get_user_id_from_membership_event_id", list_name="event_ids", ) - async def _get_joined_profiles_from_event_ids( + async def _get_user_ids_from_membership_event_ids( self, event_ids: Iterable[str] - ) -> Dict[str, Optional[Tuple[str, ProfileInfo]]]: + ) -> Dict[str, Optional[str]]: """For given set of member event_ids check if they point to a join - event and if so return the associated user and profile info. + event. Args: event_ids: The member event IDs to lookup Returns: - Map from event ID to `user_id` and ProfileInfo (or None if not join event). + Map from event ID to `user_id`, or None if event is not a join. """ rows = await self.db_pool.simple_select_many_batch( table="room_memberships", column="event_id", iterable=event_ids, - retcols=("user_id", "display_name", "avatar_url", "event_id"), + retcols=("user_id", "event_id"), keyvalues={"membership": Membership.JOIN}, batch_size=1000, - desc="_get_joined_profiles_from_event_ids", + desc="_get_user_ids_from_membership_event_ids", ) - return { - row["event_id"]: ( - row["user_id"], - ProfileInfo( - avatar_url=row["avatar_url"], display_name=row["display_name"] - ), - ) - for row in rows - } + return {row["event_id"]: row["user_id"] for row in rows} @cached(max_entries=10000) async def is_host_joined(self, room_id: str, host: str) -> bool: @@ -1051,6 +979,72 @@ class RoomMemberWorkerStore(EventsWorkerStore): "get_current_hosts_in_room", get_current_hosts_in_room_txn ) + @cached(iterable=True, max_entries=10000) + async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]: + """ + Get current hosts in room based on current state. + + The heuristic of sorting by servers who have been in the room the + longest is good because they're most likely to have anything we ask + about. + + For SQLite the returned list is not ordered, as SQLite doesn't support + the appropriate SQL. + + Uses `m.room.member`s in the room state at the current forward + extremities to determine which hosts are in the room. + + Will return inaccurate results for rooms with partial state, since the + state for the forward extremities of those rooms will exclude most + members. We may also calculate room state incorrectly for such rooms and + believe that a host is or is not in the room when the opposite is true. + + Returns: + Returns a list of servers sorted by longest in the room first. (aka. + sorted by join with the lowest depth first). + """ + + if isinstance(self.database_engine, Sqlite3Engine): + # If we're using SQLite then let's just always use + # `get_users_in_room` rather than funky SQL. + + domains = await self.get_current_hosts_in_room(room_id) + return list(domains) + + # For PostgreSQL we can use a regex to pull out the domains from the + # joined users in `current_state_events` via regex. + + def get_current_hosts_in_room_ordered_txn(txn: LoggingTransaction) -> List[str]: + # Returns a list of servers currently joined in the room sorted by + # longest in the room first (aka. with the lowest depth). The + # heuristic of sorting by servers who have been in the room the + # longest is good because they're most likely to have anything we + # ask about. + sql = """ + SELECT + /* Match the domain part of the MXID */ + substring(c.state_key FROM '@[^:]*:(.*)$') as server_domain + FROM current_state_events c + /* Get the depth of the event from the events table */ + INNER JOIN events AS e USING (event_id) + WHERE + /* Find any join state events in the room */ + c.type = 'm.room.member' + AND c.membership = 'join' + AND c.room_id = ? + /* Group all state events from the same domain into their own buckets (groups) */ + GROUP BY server_domain + /* Sorted by lowest depth first */ + ORDER BY min(e.depth) ASC; + """ + txn.execute(sql, (room_id,)) + # `server_domain` will be `NULL` for malformed MXIDs with no colons. + return [d for d, in txn if d is not None] + + return await self.db_pool.runInteraction( + "get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn + ) + async def get_joined_hosts( self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry" ) -> FrozenSet[str]: @@ -1128,12 +1122,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): else: # The cache doesn't match the state group or prev state group, # so we calculate the result from first principles. - joined_users = await self.get_joined_users_from_state( - room_id, state, state_entry + joined_user_ids = await self.get_joined_user_ids_from_state( + room_id, state ) cache.hosts_to_joined_users = {} - for user_id in joined_users: + for user_id in joined_user_ids: host = intern_string(get_domain_from_id(user_id)) cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) @@ -1212,6 +1206,30 @@ class RoomMemberWorkerStore(EventsWorkerStore): "get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn ) + async def is_locally_forgotten_room(self, room_id: str) -> bool: + """Returns whether all local users have forgotten this room_id. + + Args: + room_id: The room ID to query. + + Returns: + Whether the room is forgotten. + """ + + sql = """ + SELECT count(*) > 0 FROM local_current_membership + INNER JOIN room_memberships USING (room_id, event_id) + WHERE + room_id = ? + AND forgotten = 0; + """ + + rows = await self.db_pool.execute("is_forgotten_room", None, sql, room_id) + + # `count(*)` returns always an integer + # If any rows still exist it means someone has not forgotten this room yet + return not rows[0][0] + async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]: """Get all rooms that the user has ever been in. @@ -1499,6 +1517,36 @@ class RoomMemberStore( await self.db_pool.runInteraction("forget_membership", f) +def extract_heroes_from_room_summary( + details: Mapping[str, MemberSummary], me: str +) -> List[str]: + """Determine the users that represent a room, from the perspective of the `me` user. + + The rules which say which users we select are specified in the "Room Summary" + section of + https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3sync + + Returns a list (possibly empty) of heroes' mxids. + """ + empty_ms = MemberSummary([], 0) + + joined_user_ids = [ + r[0] for r in details.get(Membership.JOIN, empty_ms).members if r[0] != me + ] + invited_user_ids = [ + r[0] for r in details.get(Membership.INVITE, empty_ms).members if r[0] != me + ] + gone_user_ids = [ + r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me + ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me] + + # FIXME: order by stream ordering rather than as returned by SQL + if joined_user_ids or invited_user_ids: + return sorted(joined_user_ids + invited_user_ids)[0:5] + else: + return sorted(gone_user_ids)[0:5] + + @attr.s(slots=True, auto_attribs=True) class _JoinedHostsCache: """The cached data used by the `_get_joined_hosts_cache`.""" diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f6e24b68d2..3fe433f66c 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py
@@ -11,10 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import enum import logging import re -from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Set, Tuple +from collections import deque +from dataclasses import dataclass +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) import attr @@ -27,7 +39,7 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict if TYPE_CHECKING: @@ -68,11 +80,11 @@ class SearchWorkerStore(SQLBaseStore): if not self.hs.config.server.enable_search: return if isinstance(self.database_engine, PostgresEngine): - sql = ( - "INSERT INTO event_search" - " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" - " VALUES (?,?,?,to_tsvector('english', ?),?,?)" - ) + sql = """ + INSERT INTO event_search + (event_id, room_id, key, vector, stream_ordering, origin_server_ts) + VALUES (?,?,?,to_tsvector('english', ?),?,?) + """ args1 = ( ( @@ -89,20 +101,20 @@ class SearchWorkerStore(SQLBaseStore): txn.execute_batch(sql, args1) elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, value)" - " VALUES (?,?,?,?)" - ) - args2 = ( - ( - entry.event_id, - entry.room_id, - entry.key, - _clean_value_for_search(entry.value), - ) - for entry in entries + self.db_pool.simple_insert_many_txn( + txn, + table="event_search", + keys=("event_id", "room_id", "key", "value"), + values=( + ( + entry.event_id, + entry.room_id, + entry.key, + _clean_value_for_search(entry.value), + ) + for entry in entries + ), ) - txn.execute_batch(sql, args2) else: # This should be unreachable. @@ -150,15 +162,17 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): TYPES = ["m.room.name", "m.room.message", "m.room.topic"] def reindex_search_txn(txn: LoggingTransaction) -> int: - sql = ( - "SELECT stream_ordering, event_id, room_id, type, json, " - " origin_server_ts FROM events" - " JOIN event_json USING (room_id, event_id)" - " WHERE ? <= stream_ordering AND stream_ordering < ?" - " AND (%s)" - " ORDER BY stream_ordering DESC" - " LIMIT ?" - ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),) + sql = """ + SELECT stream_ordering, event_id, room_id, type, json, origin_server_ts + FROM events + JOIN event_json USING (room_id, event_id) + WHERE ? <= stream_ordering AND stream_ordering < ? + AND (%s) + ORDER BY stream_ordering DESC + LIMIT ? + """ % ( + " OR ".join("type = '%s'" % (t,) for t in TYPES), + ) txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) @@ -272,8 +286,10 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): try: c.execute( - "CREATE INDEX CONCURRENTLY event_search_fts_idx" - " ON event_search USING GIN (vector)" + """ + CREATE INDEX CONCURRENTLY event_search_fts_idx + ON event_search USING GIN (vector) + """ ) except psycopg2.ProgrammingError as e: logger.warning( @@ -311,12 +327,16 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # We create with NULLS FIRST so that when we search *backwards* # we get the ones with non null origin_server_ts *first* c.execute( - "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search(" - "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)" + """ + CREATE INDEX CONCURRENTLY event_search_room_order + ON event_search(room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST) + """ ) c.execute( - "CREATE INDEX CONCURRENTLY event_search_order ON event_search(" - "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)" + """ + CREATE INDEX CONCURRENTLY event_search_order + ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST) + """ ) conn.set_session(autocommit=False) @@ -333,14 +353,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): ) def reindex_search_txn(txn: LoggingTransaction) -> Tuple[int, bool]: - sql = ( - "UPDATE event_search AS es SET stream_ordering = e.stream_ordering," - " origin_server_ts = e.origin_server_ts" - " FROM events AS e" - " WHERE e.event_id = es.event_id" - " AND ? <= e.stream_ordering AND e.stream_ordering < ?" - " RETURNING es.stream_ordering" - ) + sql = """ + UPDATE event_search AS es + SET stream_ordering = e.stream_ordering, origin_server_ts = e.origin_server_ts + FROM events AS e + WHERE e.event_id = es.event_id + AND ? <= e.stream_ordering AND e.stream_ordering < ? + RETURNING es.stream_ordering + """ min_stream_id = max_stream_id - batch_size txn.execute(sql, (min_stream_id, max_stream_id)) @@ -421,8 +441,6 @@ class SearchStore(SearchBackgroundUpdateStore): """ clauses = [] - search_query = _parse_query(self.database_engine, search_term) - args: List[Any] = [] # Make sure we don't explode because the person is in too many rooms. @@ -444,32 +462,35 @@ class SearchStore(SearchBackgroundUpdateStore): count_clauses = clauses if isinstance(self.database_engine, PostgresEngine): - sql = ( - "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank," - " room_id, event_id" - " FROM event_search" - " WHERE vector @@ to_tsquery('english', ?)" - ) + search_query = search_term + sql = """ + SELECT ts_rank_cd(vector, websearch_to_tsquery('english', ?)) AS rank, + room_id, event_id + FROM event_search + WHERE vector @@ websearch_to_tsquery('english', ?) + """ args = [search_query, search_query] + args - count_sql = ( - "SELECT room_id, count(*) as count FROM event_search" - " WHERE vector @@ to_tsquery('english', ?)" - ) + count_sql = """ + SELECT room_id, count(*) as count FROM event_search + WHERE vector @@ websearch_to_tsquery('english', ?) + """ count_args = [search_query] + count_args elif isinstance(self.database_engine, Sqlite3Engine): - sql = ( - "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" - " FROM event_search" - " WHERE value MATCH ?" - ) + search_query = _parse_query_for_sqlite(search_term) + + sql = """ + SELECT rank(matchinfo(event_search)) as rank, room_id, event_id + FROM event_search + WHERE value MATCH ? + """ args = [search_query] + args - count_sql = ( - "SELECT room_id, count(*) as count FROM event_search" - " WHERE value MATCH ?" - ) - count_args = [search_term] + count_args + count_sql = """ + SELECT room_id, count(*) as count FROM event_search + WHERE value MATCH ? + """ + count_args = [search_query] + count_args else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -510,7 +531,6 @@ class SearchStore(SearchBackgroundUpdateStore): ) count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) - return { "results": [ {"event": event_map[r["event_id"]], "rank": r["rank"]} @@ -542,9 +562,6 @@ class SearchStore(SearchBackgroundUpdateStore): Each match as a dictionary. """ clauses = [] - - search_query = _parse_query(self.database_engine, search_term) - args: List[Any] = [] # Make sure we don't explode because the person is in too many rooms. @@ -576,26 +593,29 @@ class SearchStore(SearchBackgroundUpdateStore): raise SynapseError(400, "Invalid pagination token") clauses.append( - "(origin_server_ts < ?" - " OR (origin_server_ts = ? AND stream_ordering < ?))" + """ + (origin_server_ts < ? OR (origin_server_ts = ? AND stream_ordering < ?)) + """ ) args.extend([origin_server_ts, origin_server_ts, stream]) if isinstance(self.database_engine, PostgresEngine): - sql = ( - "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank," - " origin_server_ts, stream_ordering, room_id, event_id" - " FROM event_search" - " WHERE vector @@ to_tsquery('english', ?) AND " - ) + search_query = search_term + sql = """ + SELECT ts_rank_cd(vector, websearch_to_tsquery('english', ?)) as rank, + origin_server_ts, stream_ordering, room_id, event_id + FROM event_search + WHERE vector @@ websearch_to_tsquery('english', ?) AND + """ args = [search_query, search_query] + args - count_sql = ( - "SELECT room_id, count(*) as count FROM event_search" - " WHERE vector @@ to_tsquery('english', ?) AND " - ) + count_sql = """ + SELECT room_id, count(*) as count FROM event_search + WHERE vector @@ websearch_to_tsquery('english', ?) AND + """ count_args = [search_query] + count_args elif isinstance(self.database_engine, Sqlite3Engine): + # We use CROSS JOIN here to ensure we use the right indexes. # https://sqlite.org/optoverview.html#crossjoin # @@ -604,23 +624,25 @@ class SearchStore(SearchBackgroundUpdateStore): # in the events table to get the topological ordering. We need # to use the indexes in this order because sqlite refuses to # MATCH unless it uses the full text search index - sql = ( - "SELECT rank(matchinfo) as rank, room_id, event_id," - " origin_server_ts, stream_ordering" - " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" - " FROM event_search" - " WHERE value MATCH ?" - " )" - " CROSS JOIN events USING (event_id)" - " WHERE " + sql = """ + SELECT + rank(matchinfo) as rank, room_id, event_id, origin_server_ts, stream_ordering + FROM ( + SELECT key, event_id, matchinfo(event_search) as matchinfo + FROM event_search + WHERE value MATCH ? ) + CROSS JOIN events USING (event_id) + WHERE + """ + search_query = _parse_query_for_sqlite(search_term) args = [search_query] + args - count_sql = ( - "SELECT room_id, count(*) as count FROM event_search" - " WHERE value MATCH ? AND " - ) - count_args = [search_term] + count_args + count_sql = """ + SELECT room_id, count(*) as count FROM event_search + WHERE value MATCH ? AND + """ + count_args = [search_query] + count_args else: # This should be unreachable. raise Exception("Unrecognized database engine") @@ -631,17 +653,17 @@ class SearchStore(SearchBackgroundUpdateStore): # We add an arbitrary limit here to ensure we don't try to pull the # entire table from the database. if isinstance(self.database_engine, PostgresEngine): - sql += ( - " ORDER BY origin_server_ts DESC NULLS LAST," - " stream_ordering DESC NULLS LAST LIMIT ?" - ) + sql += """ + ORDER BY origin_server_ts DESC NULLS LAST, stream_ordering DESC NULLS LAST + LIMIT ? + """ elif isinstance(self.database_engine, Sqlite3Engine): sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?" else: raise Exception("Unrecognized database engine") # mypy expects to append only a `str`, not an `int` - args.append(limit) # type: ignore[arg-type] + args.append(limit) results = await self.db_pool.execute( "search_rooms", self.db_pool.cursor_to_dict, sql, *args @@ -729,13 +751,16 @@ class SearchStore(SearchBackgroundUpdateStore): while stop_sel in value: stop_sel += ">" - query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % ( - _to_postgres_options( - { - "StartSel": start_sel, - "StopSel": stop_sel, - "MaxFragments": "50", - } + query = ( + "SELECT ts_headline(?, websearch_to_tsquery('english', ?), %s)" + % ( + _to_postgres_options( + { + "StartSel": start_sel, + "StopSel": stop_sel, + "MaxFragments": "50", + } + ) ) ) txn.execute(query, (value, search_query)) @@ -760,20 +785,127 @@ def _to_postgres_options(options_dict: JsonDict) -> str: return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),) -def _parse_query(database_engine: BaseDatabaseEngine, search_term: str) -> str: - """Takes a plain unicode string from the user and converts it into a form - that can be passed to database. - We use this so that we can add prefix matching, which isn't something - that is supported by default. +@dataclass +class Phrase: + phrase: List[str] + + +class SearchToken(enum.Enum): + Not = enum.auto() + Or = enum.auto() + And = enum.auto() + + +Token = Union[str, Phrase, SearchToken] +TokenList = List[Token] + + +def _is_stop_word(word: str) -> bool: + # TODO Pull these out of the dictionary: + # https://github.com/postgres/postgres/blob/master/src/backend/snowball/stopwords/english.stop + return word in {"the", "a", "you", "me", "and", "but"} + + +def _tokenize_query(query: str) -> TokenList: """ + Convert the user-supplied `query` into a TokenList, which can be translated into + some DB-specific syntax. + + The following constructs are supported: + + - phrase queries using "double quotes" + - case-insensitive `or` and `and` operators + - negation of a keyword via unary `-` + - unary hyphen to denote NOT e.g. 'include -exclude' - # Pull out the individual words, discarding any non-word characters. - results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + The following differs from websearch_to_tsquery: - if isinstance(database_engine, PostgresEngine): - return " & ".join(result + ":*" for result in results) - elif isinstance(database_engine, Sqlite3Engine): - return " & ".join(result + "*" for result in results) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") + - Stop words are not removed. + - Unclosed phrases are treated differently. + + """ + tokens: TokenList = [] + + # Find phrases. + in_phrase = False + parts = deque(query.split('"')) + for i, part in enumerate(parts): + # The contents inside double quotes is treated as a phrase. + in_phrase = bool(i % 2) + + # Pull out the individual words, discarding any non-word characters. + words = deque(re.findall(r"([\w\-]+)", part, re.UNICODE)) + + # Phrases have simplified handling of words. + if in_phrase: + # Skip stop words. + phrase = [word for word in words if not _is_stop_word(word)] + + # Consecutive words are implicitly ANDed together. + if tokens and tokens[-1] not in (SearchToken.Not, SearchToken.Or): + tokens.append(SearchToken.And) + + # Add the phrase. + tokens.append(Phrase(phrase)) + continue + + # Otherwise, not in a phrase. + while words: + word = words.popleft() + + if word.startswith("-"): + tokens.append(SearchToken.Not) + + # If there's more word, put it back to be processed again. + word = word[1:] + if word: + words.appendleft(word) + elif word.lower() == "or": + tokens.append(SearchToken.Or) + else: + # Skip stop words. + if _is_stop_word(word): + continue + + # Consecutive words are implicitly ANDed together. + if tokens and tokens[-1] not in (SearchToken.Not, SearchToken.Or): + tokens.append(SearchToken.And) + + # Add the search term. + tokens.append(word) + + return tokens + + +def _tokens_to_sqlite_match_query(tokens: TokenList) -> str: + """ + Convert the list of tokens to a string suitable for passing to sqlite's MATCH. + Assume sqlite was compiled with enhanced query syntax. + + Ref: https://www.sqlite.org/fts3.html#full_text_index_queries + """ + match_query = [] + for token in tokens: + if isinstance(token, str): + match_query.append(token) + elif isinstance(token, Phrase): + match_query.append('"' + " ".join(token.phrase) + '"') + elif token == SearchToken.Not: + # TODO: SQLite treats NOT as a *binary* operator. Hopefully a search + # term has already been added before this. + match_query.append(" NOT ") + elif token == SearchToken.Or: + match_query.append(" OR ") + elif token == SearchToken.And: + match_query.append(" AND ") + else: + raise ValueError(f"unknown token {token}") + + return "".join(match_query) + + +def _parse_query_for_sqlite(search_term: str) -> str: + """Takes a plain unicode string from the user and converts it into a form + that can be passed to sqllite's matchinfo(). + """ + return _tokens_to_sqlite_match_query(_tokenize_query(search_term)) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index f70705a0af..af7bebee80 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py
@@ -23,6 +23,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.logging.opentracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -36,6 +37,7 @@ from synapse.storage.state import StateFilter from synapse.types import JsonDict, JsonMapping, StateMap from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -142,6 +144,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return room_version + @trace async def get_metadata_for_events( self, event_ids: Collection[str] ) -> Dict[str, EventMetadata]: @@ -281,6 +284,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) # FIXME: how should this be cached? + @cancellable async def get_partial_filtered_current_state_ids( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: @@ -430,6 +434,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): updatevalues={"state_group": state_group}, ) + # the event may now be rejected where it was not before, or vice versa, + # in which case we need to update the rejected flags. + if bool(context.rejected) != (event.rejected_reason is not None): + self.mark_event_rejected_txn(txn, event.event_id, context.rejected) + self.db_pool.simple_delete_one_txn( txn, table="partial_state_events", diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore): absolutes: Absolute (set) fields additive_relatives: Fields that will be added onto if existing row present. """ - if self.database_engine.can_native_upsert: - absolute_updates = [ - "%(field)s = EXCLUDED.%(field)s" % {"field": field} - for field in absolutes.keys() - ] - - relative_updates = [ - "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" - % {"table": table, "field": field} - for field in additive_relatives.keys() - ] - - insert_cols = [] - qargs = [] - - for (key, val) in chain( - keyvalues.items(), absolutes.items(), additive_relatives.items() - ): - insert_cols.append(key) - qargs.append(val) + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "key_columns": ", ".join(keyvalues), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } - sql = """ - INSERT INTO %(table)s (%(insert_cols_cs)s) - VALUES (%(insert_vals_qs)s) - ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s - """ % { - "table": table, - "insert_cols_cs": ", ".join(insert_cols), - "insert_vals_qs": ", ".join( - ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) - ), - "key_columns": ", ".join(keyvalues), - "updates": ", ".join(chain(absolute_updates, relative_updates)), - } - - txn.execute(sql, qargs) - else: - self.database_engine.lock_table(txn, table) - retcols = list(chain(absolutes.keys(), additive_relatives.keys())) - current_row = self.db_pool.simple_select_one_txn( - txn, table, keyvalues, retcols, allow_none=True - ) - if current_row is None: - merged_dict = {**keyvalues, **absolutes, **additive_relatives} - self.db_pool.simple_insert_txn(txn, table, merged_dict) - else: - for (key, val) in additive_relatives.items(): - if current_row[key] is None: - current_row[key] = val - else: - current_row[key] += val - current_row.update(absolutes) - self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row) + txn.execute(sql, qargs) async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None: """Calculate and insert an entry into room_stats_current. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 2590b52f73..cc27ec3804 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -58,6 +58,7 @@ from twisted.internet import defer from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.opentracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -71,6 +72,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import PersistedEventPosition, RoomStreamToken from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.cancellation import cancellable if TYPE_CHECKING: from synapse.server import HomeServer @@ -355,6 +357,24 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: ) args.extend(event_filter.related_by_rel_types) + if event_filter.rel_types: + clauses.append( + "(%s)" + % " OR ".join( + "event_relation.relation_type = ?" for _ in event_filter.rel_types + ) + ) + args.extend(event_filter.rel_types) + + if event_filter.not_rel_types: + clauses.append( + "((%s) OR event_relation.relation_type IS NULL)" + % " AND ".join( + "event_relation.relation_type != ?" for _ in event_filter.not_rel_types + ) + ) + args.extend(event_filter.not_rel_types) + return " AND ".join(clauses), args @@ -395,6 +415,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) self._stream_order_on_start = self.get_room_max_stream_ordering() + self._min_stream_order_on_start = self.get_room_min_stream_ordering() def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to @@ -596,6 +617,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret, key + @cancellable async def get_membership_changes_for_user( self, user_id: str, @@ -1021,28 +1043,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "after": {"event_ids": events_after, "token": end_token}, } - async def get_all_new_events_stream( - self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False - ) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]: + async def get_all_new_event_ids_stream( + self, + from_id: int, + current_id: int, + limit: int, + ) -> Tuple[int, Dict[str, Optional[int]]]: """Get all new events - Returns all events with from_id < stream_ordering <= current_id. + Returns all event ids with from_id < stream_ordering <= current_id. Args: from_id: the stream_ordering of the last event we processed current_id: the stream_ordering of the most recently processed event limit: the maximum number of events to return - get_prev_content: whether to fetch previous event content Returns: - A tuple of (next_id, events, event_to_received_ts), where `next_id` + A tuple of (next_id, event_to_received_ts), where `next_id` is the next value to pass as `from_id` (it will either be the stream_ordering of the last returned event, or, if fewer than `limit` events were found, the `current_id`). The `event_to_received_ts` is - a dictionary mapping event ID to the event `received_ts`. + a dictionary mapping event ID to the event `received_ts`, sorted by ascending + stream_ordering. """ - def get_all_new_events_stream_txn( + def get_all_new_event_ids_stream_txn( txn: LoggingTransaction, ) -> Tuple[int, Dict[str, Optional[int]]]: sql = ( @@ -1067,15 +1092,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return upper_bound, event_to_received_ts upper_bound, event_to_received_ts = await self.db_pool.runInteraction( - "get_all_new_events_stream", get_all_new_events_stream_txn - ) - - events = await self.get_events_as_list( - event_to_received_ts.keys(), - get_prev_content=get_prev_content, + "get_all_new_event_ids_stream", get_all_new_event_ids_stream_txn ) - return upper_bound, events, event_to_received_ts + return upper_bound, event_to_received_ts async def get_federation_out_pos(self, typ: str) -> int: if self._need_to_reset_federation_stream_positions: @@ -1199,8 +1219,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): `to_token`), or `limit` is zero. """ - assert int(limit) >= 0 - # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -1279,8 +1297,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Multiple labels could cause the same event to appear multiple times. needs_distinct = True - # If there is a filter on relation_senders and relation_types join to the - # relations table. + # If there is a relation_senders and relation_types filter join to the + # relations table to get events related to the current event. if event_filter and ( event_filter.related_by_senders or event_filter.related_by_rel_types ): @@ -1295,6 +1313,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id) """ + # If there is a not_rel_types filter join to the relations table to get + # the event's relation information. + if event_filter and (event_filter.rel_types or event_filter.not_rel_types): + join_clause += """ + LEFT JOIN event_relations AS event_relation USING (event_id) + """ + if needs_distinct: select_keywords += " DISTINCT" @@ -1331,21 +1356,22 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if rows: topo = rows[-1].topological_ordering - toke = rows[-1].stream_ordering + token = rows[-1].stream_ordering if direction == "b": # Tokens are positions between events. # This token points *after* the last event in the chunk. # We need it to point to the event before it in the chunk # when we are going backwards so we subtract one from the # stream part. - toke -= 1 - next_token = RoomStreamToken(topo, toke) + token -= 1 + next_token = RoomStreamToken(topo, token) else: # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token return rows, next_token + @trace async def paginate_room_events( self, room_id: str, diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): retry_interval: how long until next retry in ms """ - if self.database_engine.can_native_upsert: - await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings_native, - destination, - failure_ts, - retry_last_ts, - retry_interval, - db_autocommit=True, # Safe as its a single upsert - ) - else: - await self.db_pool.runInteraction( - "set_destination_retry_timings", - self._set_destination_retry_timings_emulated, - destination, - failure_ts, - retry_last_ts, - retry_interval, - ) + await self.db_pool.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings_native, + destination, + failure_ts, + retry_last_ts, + retry_interval, + db_autocommit=True, # Safe as it's a single upsert + ) def _set_destination_retry_timings_native( self, @@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): retry_last_ts: int, retry_interval: int, ) -> None: - assert self.database_engine.can_native_upsert - # Upsert retry time interval if retry_interval is zero (i.e. we're # resetting it) or greater than the existing retry interval. # diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index ddb25b5cea..698d6f7515 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -185,9 +185,8 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): - who should be in the user_directory. Args: - progress (dict) - batch_size (int): Maximum number of state events to process - per cycle. + progress + batch_size: Maximum number of state events to process per cycle. Returns: number of events processed. @@ -708,10 +707,10 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): Returns the rooms that a user is in. Args: - user_id(str): Must be a local user + user_id: Must be a local user Returns: - list: user_id + List of room IDs """ rows = await self.db_pool.simple_select_onecol( table="users_who_share_private_rooms", diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index a7fcc564a9..4a4ad0f492 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py
@@ -93,13 +93,6 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): results: Dict[int, MutableStateMap[str]] = {group: {} for group in groups} - where_clause, where_args = state_filter.make_sql_filter_clause() - - # Unless the filter clause is empty, we're going to append it after an - # existing where clause - if where_clause: - where_clause = " AND (%s)" % (where_clause,) - if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is # a temporary hack until we can add the right indices in @@ -110,31 +103,91 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): # against `state_groups_state` to fetch the latest state. # It assumes that previous state groups are always numerically # lesser. - # The PARTITION is used to get the event_id in the greatest state - # group for the given type, state_key. # This may return multiple rows per (type, state_key), but last_value # should be the same. sql = """ - WITH RECURSIVE state(state_group) AS ( + WITH RECURSIVE sgs(state_group) AS ( VALUES(?::bigint) UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s + SELECT prev_state_group FROM state_group_edges e, sgs s WHERE s.state_group = e.state_group ) - SELECT DISTINCT ON (type, state_key) - type, state_key, event_id - FROM state_groups_state - WHERE state_group IN ( - SELECT state_group FROM state - ) %s - ORDER BY type, state_key, state_group DESC + %s """ + overall_select_query_args: List[Union[int, str]] = [] + + # This is an optimization to create a select clause per-condition. This + # makes the query planner a lot smarter on what rows should pull out in the + # first place and we end up with something that takes 10x less time to get a + # result. + use_condition_optimization = ( + not state_filter.include_others and not state_filter.is_full() + ) + state_filter_condition_combos: List[Tuple[str, Optional[str]]] = [] + # We don't need to caclculate this list if we're not using the condition + # optimization + if use_condition_optimization: + for etype, state_keys in state_filter.types.items(): + if state_keys is None: + state_filter_condition_combos.append((etype, None)) + else: + for state_key in state_keys: + state_filter_condition_combos.append((etype, state_key)) + # And here is the optimization itself. We don't want to do the optimization + # if there are too many individual conditions. 10 is an arbitrary number + # with no testing behind it but we do know that we specifically made this + # optimization for when we grab the necessary state out for + # `filter_events_for_client` which just uses 2 conditions + # (`EventTypes.RoomHistoryVisibility` and `EventTypes.Member`). + if use_condition_optimization and len(state_filter_condition_combos) < 10: + select_clause_list: List[str] = [] + for etype, skey in state_filter_condition_combos: + if skey is None: + where_clause = "(type = ?)" + overall_select_query_args.extend([etype]) + else: + where_clause = "(type = ? AND state_key = ?)" + overall_select_query_args.extend([etype, skey]) + + select_clause_list.append( + f""" + ( + SELECT DISTINCT ON (type, state_key) + type, state_key, event_id + FROM state_groups_state + INNER JOIN sgs USING (state_group) + WHERE {where_clause} + ORDER BY type, state_key, state_group DESC + ) + """ + ) + + overall_select_clause = " UNION ".join(select_clause_list) + else: + where_clause, where_args = state_filter.make_sql_filter_clause() + # Unless the filter clause is empty, we're going to append it after an + # existing where clause + if where_clause: + where_clause = " AND (%s)" % (where_clause,) + + overall_select_query_args.extend(where_args) + + overall_select_clause = f""" + SELECT DISTINCT ON (type, state_key) + type, state_key, event_id + FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM sgs + ) {where_clause} + ORDER BY type, state_key, state_group DESC + """ + for group in groups: args: List[Union[int, str]] = [group] - args.extend(where_args) + args.extend(overall_select_query_args) - txn.execute(sql % (where_clause,), args) + txn.execute(sql % (overall_select_clause,), args) for row in txn: typ, state_key, event_id = row key = (intern_string(typ), intern_string(state_key)) @@ -142,6 +195,12 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): else: max_entries_returned = state_filter.max_entries_returned() + where_clause, where_args = state_filter.make_sql_filter_clause() + # Unless the filter clause is empty, we're going to append it after an + # existing where clause + if where_clause: + where_clause = " AND (%s)" % (where_clause,) + # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index bb64543c1f..f8cfcaca83 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -31,6 +31,7 @@ from synapse.storage.util.sequence import build_sequence_generator from synapse.types import MutableStateMap, StateKey, StateMap from synapse.util.caches.descriptors import cached from synapse.util.caches.dictionary_cache import DictionaryCache +from synapse.util.cancellation import cancellable if TYPE_CHECKING: from synapse.server import HomeServer @@ -156,6 +157,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "get_state_group_delta", _get_state_group_delta_txn ) + @cancellable async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter ) -> Dict[int, StateMap[str]]: @@ -235,6 +237,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types + @cancellable async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None ) -> Dict[int, MutableStateMap[str]]: