From bb4b11846f3bdd539a1671eb8f1db8ee1a0bf57a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 29 Apr 2021 07:17:28 -0400 Subject: Add missing type hints to handlers and fix a Spam Checker type hint. (#9896) The user_may_create_room_alias method on spam checkers declared the room_alias parameter as a str when in reality it is passed a RoomAlias object. --- synapse/handlers/message.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) (limited to 'synapse/handlers/message.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ec8eb21674..49f8aa25ea 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,7 +15,7 @@ # limitations under the License. import logging import random -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json @@ -66,7 +66,7 @@ logger = logging.getLogger(__name__) class MessageHandler: """Contains some read only APIs to get state about a room""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.clock = hs.get_clock() self.state = hs.get_state_handler() @@ -91,7 +91,7 @@ class MessageHandler: room_id: str, event_type: str, state_key: str, - ) -> dict: + ) -> Optional[EventBase]: """Get data from a room. Args: @@ -115,6 +115,10 @@ class MessageHandler: data = await self.state.get_current_state(room_id, event_type, state_key) elif membership == Membership.LEAVE: key = (event_type, state_key) + # If the membership is not JOIN, then the event ID should exist. + assert ( + membership_event_id is not None + ), "check_user_in_room_or_world_readable returned invalid data" room_state = await self.state_store.get_state_for_events( [membership_event_id], StateFilter.from_types([key]) ) @@ -186,10 +190,12 @@ class MessageHandler: event = last_events[0] if visible_events: - room_state = await self.state_store.get_state_for_events( + room_state_events = await self.state_store.get_state_for_events( [event.event_id], state_filter=state_filter ) - room_state = room_state[event.event_id] + room_state = room_state_events[ + event.event_id + ] # type: Mapping[Any, EventBase] else: raise AuthError( 403, @@ -210,10 +216,14 @@ class MessageHandler: ) room_state = await self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: - room_state = await self.state_store.get_state_for_events( + # If the membership is not JOIN, then the event ID should exist. + assert ( + membership_event_id is not None + ), "check_user_in_room_or_world_readable returned invalid data" + room_state_events = await self.state_store.get_state_for_events( [membership_event_id], state_filter=state_filter ) - room_state = room_state[membership_event_id] + room_state = room_state_events[membership_event_id] now = self.clock.time_msec() events = await self._event_serializer.serialize_events( -- cgit 1.5.1 From d0aee697ac0587c005bc1048f5036979331f1101 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 16:49:34 +0100 Subject: Use get_current_users_in_room from store and not StateHandler (#9910) --- changelog.d/9910.bugfix | 1 + changelog.d/9910.feature | 1 + synapse/handlers/directory.py | 4 ++-- synapse/handlers/events.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/sync.py | 6 +++--- synapse/state/__init__.py | 10 +++++++--- synapse/storage/_base.py | 1 + synapse/storage/databases/main/roommember.py | 8 ++++++-- synapse/storage/databases/main/user_directory.py | 4 +--- 12 files changed, 26 insertions(+), 17 deletions(-) create mode 100644 changelog.d/9910.bugfix create mode 100644 changelog.d/9910.feature (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/9910.bugfix b/changelog.d/9910.bugfix new file mode 100644 index 0000000000..06d523fd46 --- /dev/null +++ b/changelog.d/9910.bugfix @@ -0,0 +1 @@ +Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession. diff --git a/changelog.d/9910.feature b/changelog.d/9910.feature new file mode 100644 index 0000000000..54165cce18 --- /dev/null +++ b/changelog.d/9910.feature @@ -0,0 +1 @@ +Improve performance after joining a large room when presence is enabled. diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index de1b14cde3..4064a2b859 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -78,7 +78,7 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Add transactions. # TODO(erikj): Check if there is a current association. if not servers: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) servers = {get_domain_from_id(u) for u in users} if not servers: @@ -270,7 +270,7 @@ class DirectoryHandler(BaseHandler): Codes.NOT_FOUND, ) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d82144d7fa..f134f1e234 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler): # Send down presence. if event.state_key == auth_user_id: # Send down presence for everyone in the room. - users = await self.state.get_current_users_in_room( + users = await self.store.get_users_in_room( event.room_id ) # type: Iterable[str] else: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 49f8aa25ea..393f17c3a3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -258,7 +258,7 @@ class MessageHandler: "Getting joined members after leaving is not implemented" ) - users_with_profile = await self.state.get_current_users_in_room(room_id) + users_with_profile = await self.store.get_users_in_room_with_profiles(room_id) # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ebbc234334..8e085dfbec 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1293,7 +1293,7 @@ class PresenceHandler(BasePresenceHandler): remote_host = get_domain_from_id(user_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, users)) states_d = await self.current_state_for_users(user_ids) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5a888b7941..fb4823a5cc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1327,7 +1327,7 @@ class RoomShutdownHandler: new_room_id = None logger.info("Shutting down room %r", room_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) kicked_users = [] failed_to_kick_users = [] for user_id in users: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a9a3ee05c3..0fcc1532da 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1190,7 +1190,7 @@ class SyncHandler: # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: - joined_users = await self.state.get_current_users_in_room(room_id) + joined_users = await self.store.get_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they @@ -1206,7 +1206,7 @@ class SyncHandler: # Now find users that we no longer track for room_id in newly_left_rooms: - left_users = await self.state.get_current_users_in_room(room_id) + left_users = await self.store.get_users_in_room(room_id) newly_left_users.update(left_users) # Remove any users that we still share a room with. @@ -1361,7 +1361,7 @@ class SyncHandler: extra_users_ids = set(newly_joined_or_invited_users) for room_id in newly_joined_rooms: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index b3bd92d37c..a1770f620e 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -213,19 +213,23 @@ class StateHandler: return ret.state async def get_current_users_in_room( - self, room_id: str, latest_event_ids: Optional[List[str]] = None + self, room_id: str, latest_event_ids: List[str] ) -> Dict[str, ProfileInfo]: """ Get the users who are currently in a room. + Note: This is much slower than using the equivalent method + `DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`, + so this should only be used when wanting the users at a particular point + in the room. + Args: room_id: The ID of the room. latest_event_ids: Precomputed list of latest event IDs. Will be computed if None. Returns: Dictionary of user IDs to their profileinfo. """ - if not latest_event_ids: - latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id) + assert latest_event_ids is not None logger.debug("calling resolve_state_groups from get_current_users_in_room") diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 6b68d8720c..3d98d3f5f8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -69,6 +69,7 @@ class SQLBaseStore(metaclass=ABCMeta): self._attempt_to_invalidate_cache("is_host_joined", (room_id, host)) self._attempt_to_invalidate_cache("get_users_in_room", (room_id,)) + self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,)) self._attempt_to_invalidate_cache("get_room_summary", (room_id,)) self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,)) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2a8532f8c1..5fc3bb5a7d 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -205,8 +205,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: sql = """ - SELECT user_id, display_name, avatar_url FROM room_memberships - WHERE room_id = ? AND membership = ? + SELECT state_key, display_name, avatar_url FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? """ txn.execute(sql, (room_id, Membership.JOIN)) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 7a082fdd21..a6bfb4902a 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -142,8 +142,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): batch_size (int): Maximum number of state events to process per cycle. """ - state = self.hs.get_state_handler() - # If we don't have progress filed, delete everything. if not progress: await self.delete_all_from_user_dir() @@ -197,7 +195,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): room_id ) - users_with_profile = await state.get_current_users_in_room(room_id) + users_with_profile = await self.get_users_in_room_with_profiles(room_id) user_ids = set(users_with_profile) # Update each user in the user directory. -- cgit 1.5.1 From de8f0a03a3cc3a2327dfd3058c99e48067965079 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 May 2021 16:53:22 +0100 Subject: Don't set the external cache if its been done recently (#9905) --- changelog.d/9905.feature | 1 + synapse/handlers/federation.py | 4 +++- synapse/handlers/message.py | 34 ++++++++++++++++++++++++++++++---- 3 files changed, 34 insertions(+), 5 deletions(-) create mode 100644 changelog.d/9905.feature (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/9905.feature b/changelog.d/9905.feature new file mode 100644 index 0000000000..96a0e7f09f --- /dev/null +++ b/changelog.d/9905.feature @@ -0,0 +1 @@ +Improve performance of sending events for worker-based deployments using Redis. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d867aaf4d..e8330a2b50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2446,7 +2446,9 @@ class FederationHandler(BaseHandler): # If we are going to send this event over federation we precaclculate # the joined hosts. if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event(event) + await self.event_creation_handler.cache_joined_hosts_for_event( + event, context + ) return context diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 393f17c3a3..8729332d4b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder from synapse.util.async_helpers import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -457,6 +458,19 @@ class EventCreationHandler: self._external_cache = hs.get_external_cache() + # Stores the state groups we've recently added to the joined hosts + # external cache. Note that the timeout must be significantly less than + # the TTL on the external cache. + self._external_cache_joined_hosts_updates = ( + None + ) # type: Optional[ExpiringCache] + if self._external_cache.is_enabled(): + self._external_cache_joined_hosts_updates = ExpiringCache( + "_external_cache_joined_hosts_updates", + self.clock, + expiry_ms=30 * 60 * 1000, + ) + async def create_event( self, requester: Requester, @@ -967,7 +981,7 @@ class EventCreationHandler: await self.action_generator.handle_push_actions_for_event(event, context) - await self.cache_joined_hosts_for_event(event) + await self.cache_joined_hosts_for_event(event, context) try: # If we're a worker we need to hit out to the master. @@ -1008,7 +1022,9 @@ class EventCreationHandler: await self.store.remove_push_actions_from_staging(event.event_id) raise - async def cache_joined_hosts_for_event(self, event: EventBase) -> None: + async def cache_joined_hosts_for_event( + self, event: EventBase, context: EventContext + ) -> None: """Precalculate the joined hosts at the event, when using Redis, so that external federation senders don't have to recalculate it themselves. """ @@ -1016,6 +1032,9 @@ class EventCreationHandler: if not self._external_cache.is_enabled(): return + # If external cache is enabled we should always have this. + assert self._external_cache_joined_hosts_updates is not None + # We actually store two mappings, event ID -> prev state group, # state group -> joined hosts, which is much more space efficient # than event ID -> joined hosts. @@ -1023,16 +1042,21 @@ class EventCreationHandler: # Note: We have to cache event ID -> prev state group, as we don't # store that in the DB. # - # Note: We always set the state group -> joined hosts cache, even if - # we already set it, so that the expiry time is reset. + # Note: We set the state group -> joined hosts cache if it hasn't been + # set for a while, so that the expiry time is reset. state_entry = await self.state.resolve_state_groups_for_events( event.room_id, event_ids=event.prev_event_ids() ) if state_entry.state_group: + if state_entry.state_group in self._external_cache_joined_hosts_updates: + return + joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry) + # Note that the expiry times must be larger than the expiry time in + # _external_cache_joined_hosts_updates. await self._external_cache.set( "event_to_prev_state_group", event.event_id, @@ -1046,6 +1070,8 @@ class EventCreationHandler: expiry_ms=60 * 60 * 1000, ) + self._external_cache_joined_hosts_updates[state_entry.state_group] = None + async def _validate_canonical_alias( self, directory_handler, room_alias_str: str, expected_room_id: str ) -> None: -- cgit 1.5.1