summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/account_validity.py99
-rw-r--r--synapse/handlers/event_auth.py13
-rw-r--r--synapse/handlers/federation.py29
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/profile.py4
-rw-r--r--synapse/handlers/register.py10
-rw-r--r--synapse/handlers/room.py112
-rw-r--r--synapse/handlers/sync.py9
-rw-r--r--synapse/handlers/user_directory.py323
9 files changed, 411 insertions, 190 deletions
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py

index 33e45e3a11..4aa4ebf7e4 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py
@@ -15,9 +15,7 @@ import email.mime.multipart import email.utils import logging -from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Tuple - -from twisted.web.http import Request +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -30,25 +28,17 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -# Types for callbacks to be registered via the module api -IS_USER_EXPIRED_CALLBACK = Callable[[str], Awaitable[Optional[bool]]] -ON_USER_REGISTRATION_CALLBACK = Callable[[str], Awaitable] -# Temporary hooks to allow for a transition from `/_matrix/client` endpoints -# to `/_synapse/client/account_validity`. See `register_account_validity_callbacks`. -ON_LEGACY_SEND_MAIL_CALLBACK = Callable[[str], Awaitable] -ON_LEGACY_RENEW_CALLBACK = Callable[[str], Awaitable[Tuple[bool, bool, int]]] -ON_LEGACY_ADMIN_REQUEST = Callable[[Request], Awaitable] - class AccountValidityHandler: def __init__(self, hs: "HomeServer"): self.hs = hs self.config = hs.config - self.store = self.hs.get_datastores().main - self.send_email_handler = self.hs.get_send_email_handler() - self.clock = self.hs.get_clock() + self.store = hs.get_datastores().main + self.send_email_handler = hs.get_send_email_handler() + self.clock = hs.get_clock() - self._app_name = self.hs.config.email.email_app_name + self._app_name = hs.config.email.email_app_name + self._module_api_callbacks = hs.get_module_api_callbacks().account_validity self._account_validity_enabled = ( hs.config.account_validity.account_validity_enabled @@ -78,69 +68,6 @@ class AccountValidityHandler: if hs.config.worker.run_background_tasks: self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000) - self._is_user_expired_callbacks: List[IS_USER_EXPIRED_CALLBACK] = [] - self._on_user_registration_callbacks: List[ON_USER_REGISTRATION_CALLBACK] = [] - self._on_legacy_send_mail_callback: Optional[ - ON_LEGACY_SEND_MAIL_CALLBACK - ] = None - self._on_legacy_renew_callback: Optional[ON_LEGACY_RENEW_CALLBACK] = None - - # The legacy admin requests callback isn't a protected attribute because we need - # to access it from the admin servlet, which is outside of this handler. - self.on_legacy_admin_request_callback: Optional[ON_LEGACY_ADMIN_REQUEST] = None - - def register_account_validity_callbacks( - self, - is_user_expired: Optional[IS_USER_EXPIRED_CALLBACK] = None, - on_user_registration: Optional[ON_USER_REGISTRATION_CALLBACK] = None, - on_legacy_send_mail: Optional[ON_LEGACY_SEND_MAIL_CALLBACK] = None, - on_legacy_renew: Optional[ON_LEGACY_RENEW_CALLBACK] = None, - on_legacy_admin_request: Optional[ON_LEGACY_ADMIN_REQUEST] = None, - ) -> None: - """Register callbacks from module for each hook.""" - if is_user_expired is not None: - self._is_user_expired_callbacks.append(is_user_expired) - - if on_user_registration is not None: - self._on_user_registration_callbacks.append(on_user_registration) - - # The builtin account validity feature exposes 3 endpoints (send_mail, renew, and - # an admin one). As part of moving the feature into a module, we need to change - # the path from /_matrix/client/unstable/account_validity/... to - # /_synapse/client/account_validity, because: - # - # * the feature isn't part of the Matrix spec thus shouldn't live under /_matrix - # * the way we register servlets means that modules can't register resources - # under /_matrix/client - # - # We need to allow for a transition period between the old and new endpoints - # in order to allow for clients to update (and for emails to be processed). - # - # Once the email-account-validity module is loaded, it will take control of account - # validity by moving the rows from our `account_validity` table into its own table. - # - # Therefore, we need to allow modules (in practice just the one implementing the - # email-based account validity) to temporarily hook into the legacy endpoints so we - # can route the traffic coming into the old endpoints into the module, which is - # why we have the following three temporary hooks. - if on_legacy_send_mail is not None: - if self._on_legacy_send_mail_callback is not None: - raise RuntimeError("Tried to register on_legacy_send_mail twice") - - self._on_legacy_send_mail_callback = on_legacy_send_mail - - if on_legacy_renew is not None: - if self._on_legacy_renew_callback is not None: - raise RuntimeError("Tried to register on_legacy_renew twice") - - self._on_legacy_renew_callback = on_legacy_renew - - if on_legacy_admin_request is not None: - if self.on_legacy_admin_request_callback is not None: - raise RuntimeError("Tried to register on_legacy_admin_request twice") - - self.on_legacy_admin_request_callback = on_legacy_admin_request - async def is_user_expired(self, user_id: str) -> bool: """Checks if a user has expired against third-party modules. @@ -150,7 +77,7 @@ class AccountValidityHandler: Returns: Whether the user has expired. """ - for callback in self._is_user_expired_callbacks: + for callback in self._module_api_callbacks.is_user_expired_callbacks: expired = await delay_cancellation(callback(user_id)) if expired is not None: return expired @@ -168,7 +95,7 @@ class AccountValidityHandler: Args: user_id: The ID of the newly registered user. """ - for callback in self._on_user_registration_callbacks: + for callback in self._module_api_callbacks.on_user_registration_callbacks: await callback(user_id) @wrap_as_background_process("send_renewals") @@ -198,8 +125,8 @@ class AccountValidityHandler: """ # If a module supports sending a renewal email from here, do that, otherwise do # the legacy dance. - if self._on_legacy_send_mail_callback is not None: - await self._on_legacy_send_mail_callback(user_id) + if self._module_api_callbacks.on_legacy_send_mail_callback is not None: + await self._module_api_callbacks.on_legacy_send_mail_callback(user_id) return if not self._account_validity_renew_by_email_enabled: @@ -336,8 +263,10 @@ class AccountValidityHandler: """ # If a module supports triggering a renew from here, do that, otherwise do the # legacy dance. - if self._on_legacy_renew_callback is not None: - return await self._on_legacy_renew_callback(renewal_token) + if self._module_api_callbacks.on_legacy_renew_callback is not None: + return await self._module_api_callbacks.on_legacy_renew_callback( + renewal_token + ) try: ( diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index c508861b6a..0db0bd7304 100644 --- a/synapse/handlers/event_auth.py +++ b/synapse/handlers/event_auth.py
@@ -63,9 +63,18 @@ class EventAuthHandler: self._store, event, batched_auth_events ) auth_event_ids = event.auth_event_ids() - auth_events_by_id = await self._store.get_events(auth_event_ids) + if batched_auth_events: - auth_events_by_id.update(batched_auth_events) + # Copy the batched auth events to avoid mutating them. + auth_events_by_id = dict(batched_auth_events) + needed_auth_event_ids = set(auth_event_ids) - set(batched_auth_events) + if needed_auth_event_ids: + auth_events_by_id.update( + await self._store.get_events(needed_auth_event_ids) + ) + else: + auth_events_by_id = await self._store.get_events(auth_event_ids) + check_state_dependent_auth_rules(event, auth_events_by_id.values()) def compute_auth_events( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 50f8041f17..dedcc620ac 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -392,7 +392,7 @@ class FederationHandler: get_prev_content=False, ) - # We set `check_history_visibility_only` as we might otherwise get false + # We unset `filter_out_erased_senders` as we might otherwise get false # positives from users having been erased. filtered_extremities = await filter_events_for_server( self._storage_controllers, @@ -400,7 +400,8 @@ class FederationHandler: self.server_name, events_to_check, redact=False, - check_history_visibility_only=True, + filter_out_erased_senders=False, + filter_out_remote_partial_state_events=False, ) if filtered_extremities: extremities_to_request.append(bp.event_id) @@ -1333,7 +1334,13 @@ class FederationHandler: ) events = await filter_events_for_server( - self._storage_controllers, origin, self.server_name, events + self._storage_controllers, + origin, + self.server_name, + events, + redact=True, + filter_out_erased_senders=True, + filter_out_remote_partial_state_events=True, ) return events @@ -1364,7 +1371,13 @@ class FederationHandler: await self._event_auth_handler.assert_host_in_room(event.room_id, origin) events = await filter_events_for_server( - self._storage_controllers, origin, self.server_name, [event] + self._storage_controllers, + origin, + self.server_name, + [event], + redact=True, + filter_out_erased_senders=True, + filter_out_remote_partial_state_events=True, ) event = events[0] return event @@ -1392,7 +1405,13 @@ class FederationHandler: ) missing_events = await filter_events_for_server( - self._storage_controllers, origin, self.server_name, missing_events + self._storage_controllers, + origin, + self.server_name, + missing_events, + redact=True, + filter_out_erased_senders=True, + filter_out_remote_partial_state_events=True, ) return missing_events diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 8c79c055ba..63b35c8d62 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py
@@ -683,7 +683,7 @@ class PaginationHandler: await self._storage_controllers.purge_events.purge_room(room_id) - logger.info("complete") + logger.info("purge complete for room_id %s", room_id) self._delete_by_id[delete_id].status = DeleteStatus.STATUS_COMPLETE except Exception: f = Failure() diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4bf9a047a3..9a81a77cbd 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py
@@ -63,7 +63,7 @@ class ProfileHandler: self._third_party_rules = hs.get_third_party_event_rules() - async def get_profile(self, user_id: str) -> JsonDict: + async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict: target_user = UserID.from_string(user_id) if self.hs.is_mine(target_user): @@ -81,7 +81,7 @@ class ProfileHandler: destination=target_user.domain, query_type="profile", args={"user_id": user_id}, - ignore_backoff=True, + ignore_backoff=ignore_backoff, ) return result except RequestSendFailed as e: diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index e4e506e62c..6b110dcb6e 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py
@@ -596,14 +596,20 @@ class RegistrationHandler: Args: user_id: The user to join """ + # If there are no rooms to auto-join, just bail. + if not self.hs.config.registration.auto_join_rooms: + return + # auto-join the user to any rooms we're supposed to dump them into # try to create the room if we're the first real user on the server. Note # that an auto-generated support or bot user is not a real user and will never be # the user to create the room should_auto_create_rooms = False - is_real_user = await self.store.is_real_user(user_id) - if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user: + if ( + self.hs.config.registration.autocreate_auto_join_rooms + and await self.store.is_real_user(user_id) + ): count = await self.store.count_real_users() should_auto_create_rooms = count == 1 diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c70afa3176..f28a602741 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -570,7 +570,7 @@ class RoomCreationHandler: new_room_id, # we expect to override all the presets with initial_state, so this is # somewhat arbitrary. - preset_config=RoomCreationPreset.PRIVATE_CHAT, + room_config={"preset": RoomCreationPreset.PRIVATE_CHAT}, invite_list=[], initial_state=initial_state, creation_content=creation_content, @@ -905,13 +905,6 @@ class RoomCreationHandler: check_membership=False, ) - preset_config = config.get( - "preset", - RoomCreationPreset.PRIVATE_CHAT - if visibility == "private" - else RoomCreationPreset.PUBLIC_CHAT, - ) - raw_initial_state = config.get("initial_state", []) initial_state = OrderedDict() @@ -930,7 +923,7 @@ class RoomCreationHandler: ) = await self._send_events_for_new_room( requester, room_id, - preset_config=preset_config, + room_config=config, invite_list=invite_list, initial_state=initial_state, creation_content=creation_content, @@ -939,48 +932,6 @@ class RoomCreationHandler: creator_join_profile=creator_join_profile, ) - if "name" in config: - name = config["name"] - ( - name_event, - last_stream_id, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.Name, - "room_id": room_id, - "sender": user_id, - "state_key": "", - "content": {"name": name}, - }, - ratelimit=False, - prev_event_ids=[last_sent_event_id], - depth=depth, - ) - last_sent_event_id = name_event.event_id - depth += 1 - - if "topic" in config: - topic = config["topic"] - ( - topic_event, - last_stream_id, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.Topic, - "room_id": room_id, - "sender": user_id, - "state_key": "", - "content": {"topic": topic}, - }, - ratelimit=False, - prev_event_ids=[last_sent_event_id], - depth=depth, - ) - last_sent_event_id = topic_event.event_id - depth += 1 - # we avoid dropping the lock between invites, as otherwise joins can # start coming in and making the createRoom slow. # @@ -1048,7 +999,7 @@ class RoomCreationHandler: self, creator: Requester, room_id: str, - preset_config: str, + room_config: JsonDict, invite_list: List[str], initial_state: MutableStateMap, creation_content: JsonDict, @@ -1065,11 +1016,33 @@ class RoomCreationHandler: Rate limiting should already have been applied by this point. + Args: + creator: + the user requesting the room creation + room_id: + room id for the room being created + room_config: + A dict of configuration options. This will be the body of + a /createRoom request; see + https://spec.matrix.org/latest/client-server-api/#post_matrixclientv3createroom + invite_list: + a list of user ids to invite to the room + initial_state: + A list of state events to set in the new room. + creation_content: + Extra keys, such as m.federate, to be added to the content of the m.room.create event. + room_alias: + alias for the room + power_level_content_override: + The power level content to override in the default power level event. + creator_join_profile: + Set to override the displayname and avatar for the creating + user in this room. + Returns: A tuple containing the stream ID, event ID and depth of the last event sent to the room. """ - creator_id = creator.user.to_string() event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""} depth = 1 @@ -1080,9 +1053,6 @@ class RoomCreationHandler: # created (but not persisted to the db) to determine state for future created events # (as this info can't be pulled from the db) state_map: MutableStateMap[str] = {} - # current_state_group of last event created. Used for computing event context of - # events to be batched - current_state_group: Optional[int] = None def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict: e = {"type": etype, "content": content} @@ -1129,7 +1099,9 @@ class RoomCreationHandler: event_dict, prev_event_ids=prev_event, depth=depth, - state_map=state_map, + # Take a copy to ensure each event gets a unique copy of + # state_map since it is modified below. + state_map=dict(state_map), for_batch=for_batch, ) @@ -1139,6 +1111,14 @@ class RoomCreationHandler: return new_event, new_unpersisted_context, third_party_event + visibility = room_config.get("visibility", "private") + preset_config = room_config.get( + "preset", + RoomCreationPreset.PRIVATE_CHAT + if visibility == "private" + else RoomCreationPreset.PUBLIC_CHAT, + ) + try: config = self._presets_dict[preset_config] except KeyError: @@ -1327,6 +1307,24 @@ class RoomCreationHandler: context = await unpersisted_context.persist(event) events_to_send.append((event, context)) + if "name" in room_config: + name = room_config["name"] + name_event, name_context = await create_event( + EventTypes.Name, + {"name": name}, + True, + ) + events_to_send.append((name_event, name_context)) + + if "topic" in room_config: + topic = room_config["topic"] + topic_event, topic_context = await create_event( + EventTypes.Topic, + {"topic": topic}, + True, + ) + events_to_send.append((topic_event, topic_context)) + datastore = self.hs.get_datastores().state events_and_context = ( await UnpersistedEventContext.batch_persist_unpersisted_contexts( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index fd6d946c37..9f5b83ed54 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -1226,6 +1226,10 @@ class SyncHandler: continue event_with_membership_auth = events_with_membership_auth[member] + is_create = ( + event_with_membership_auth.is_state() + and event_with_membership_auth.type == EventTypes.Create + ) is_join = ( event_with_membership_auth.is_state() and event_with_membership_auth.type == EventTypes.Member @@ -1233,9 +1237,10 @@ class SyncHandler: and event_with_membership_auth.content.get("membership") == Membership.JOIN ) - if not is_join: + if not is_create and not is_join: # The event must include the desired membership as an auth event, unless - # it's the first join event for a given user. + # it's the `m.room.create` event for a room or the first join event for + # a given user. missing_members.add(member) auth_event_ids.update(event_with_membership_auth.auth_event_ids()) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 3610b6bf78..28a92d41d6 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py
@@ -13,21 +13,52 @@ # limitations under the License. import logging +from http import HTTPStatus from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +from twisted.internet.interfaces import IDelayedCall + import synapse.metrics from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership +from synapse.api.errors import Codes, SynapseError from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases.main.user_directory import SearchResult from synapse.storage.roommember import ProfileInfo +from synapse.types import UserID from synapse.util.metrics import Measure +from synapse.util.retryutils import NotRetryingDestination +from synapse.util.stringutils import non_null_str_or_none if TYPE_CHECKING: from synapse.server import HomeServer logger = logging.getLogger(__name__) +# Don't refresh a stale user directory entry, using a Federation /profile request, +# for 60 seconds. This gives time for other state events to arrive (which will +# then be coalesced such that only one /profile request is made). +USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000 + +# Maximum number of remote servers that we will attempt to refresh profiles for +# in one go. +MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5 + +# As long as we have servers to refresh (without backoff), keep adding more +# every 15 seconds. +INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15 + + +def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int: + """ + Calculates the time of a next retry given `now_ts` in ms and the number + of failures encountered thus far. + + Currently the sequence goes: + 1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week + """ + return now_ts + 60_000 * (5 ** min(retry_count, 7)) + class UserDirectoryHandler(StateDeltasHandler): """Handles queries and updates for the user_directory. @@ -64,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler): self.update_user_directory = hs.config.worker.should_update_user_directory self.search_all_users = hs.config.userdirectory.user_directory_search_all_users self.spam_checker = hs.get_spam_checker() + self._hs = hs + # The current position in the current_state_delta stream self.pos: Optional[int] = None # Guard to ensure we only process deltas one at a time self._is_processing = False + # Guard to ensure we only have one process for refreshing remote profiles + self._is_refreshing_remote_profiles = False + # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process` + self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None + + # Guard to ensure we only have one process for refreshing remote profiles + # for the given servers. + # Set of server names. + self._is_refreshing_remote_profiles_for_servers: Set[str] = set() + if self.update_user_directory: self.notifier.add_replication_callback(self.notify_new_event) @@ -77,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) + # Kick off the profile refresh process on startup + self._refresh_remote_profiles_call_later = self.clock.call_later( + 10, self.kick_off_remote_profile_refresh_process + ) + async def search_users( self, user_id: str, search_term: str, limit: int ) -> SearchResult: @@ -200,8 +248,8 @@ class UserDirectoryHandler(StateDeltasHandler): typ = delta["type"] state_key = delta["state_key"] room_id = delta["room_id"] - event_id = delta["event_id"] - prev_event_id = delta["prev_event_id"] + event_id: Optional[str] = delta["event_id"] + prev_event_id: Optional[str] = delta["prev_event_id"] logger.debug("Handling: %r %r, %s", typ, state_key, event_id) @@ -297,8 +345,8 @@ class UserDirectoryHandler(StateDeltasHandler): async def _handle_room_membership_event( self, room_id: str, - prev_event_id: str, - event_id: str, + prev_event_id: Optional[str], + event_id: Optional[str], state_key: str, ) -> None: """Process a single room membershp event. @@ -348,7 +396,8 @@ class UserDirectoryHandler(StateDeltasHandler): # Handle any profile changes for remote users. # (For local users the rest of the application calls # `handle_local_profile_change`.) - if is_remote: + # Only process if there is an event_id. + if is_remote and event_id is not None: await self._handle_possible_remote_profile_change( state_key, room_id, prev_event_id, event_id ) @@ -356,29 +405,13 @@ class UserDirectoryHandler(StateDeltasHandler): # This may be the first time we've seen a remote user. If # so, ensure we have a directory entry for them. (For local users, # the rest of the application calls `handle_local_profile_change`.) - if is_remote: - await self._upsert_directory_entry_for_remote_user(state_key, event_id) + # Only process if there is an event_id. + if is_remote and event_id is not None: + await self._handle_possible_remote_profile_change( + state_key, room_id, None, event_id + ) await self._track_user_joined_room(room_id, state_key) - async def _upsert_directory_entry_for_remote_user( - self, user_id: str, event_id: str - ) -> None: - """A remote user has just joined a room. Ensure they have an entry in - the user directory. The caller is responsible for making sure they're - remote. - """ - event = await self.store.get_event(event_id, allow_none=True) - # It isn't expected for this event to not exist, but we - # don't want the entire background process to break. - if event is None: - return - - logger.debug("Adding new user to dir, %r", user_id) - - await self.store.update_profile_in_user_dir( - user_id, event.content.get("displayname"), event.content.get("avatar_url") - ) - async def _track_user_joined_room(self, room_id: str, joining_user_id: str) -> None: """Someone's just joined a room. Update `users_in_public_rooms` or `users_who_share_private_rooms` as appropriate. @@ -460,14 +493,17 @@ class UserDirectoryHandler(StateDeltasHandler): user_id: str, room_id: str, prev_event_id: Optional[str], - event_id: Optional[str], + event_id: str, ) -> None: """Check member event changes for any profile changes and update the database if there are. This is intended for remote users only. The caller is responsible for checking that the given user is remote. """ - if not prev_event_id or not event_id: - return + + if not prev_event_id: + # If we don't have an older event to fall back on, just fetch the same + # event itself. + prev_event_id = event_id prev_event = await self.store.get_event(prev_event_id, allow_none=True) event = await self.store.get_event(event_id, allow_none=True) @@ -478,17 +514,236 @@ class UserDirectoryHandler(StateDeltasHandler): if event.membership != Membership.JOIN: return + is_public = await self.store.is_room_world_readable_or_publicly_joinable( + room_id + ) + if not is_public: + # Don't collect user profiles from private rooms as they are not guaranteed + # to be the same as the user's global profile. + now_ts = self.clock.time_msec() + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS, + retry_counter=0, + ) + # Schedule a wake-up to refresh the user directory for this server. + # We intentionally wake up this server directly because we don't want + # other servers ahead of it in the queue to get in the way of updating + # the profile if the server only just sent us an event. + self.clock.call_later( + USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + self.kick_off_remote_profile_refresh_process_for_remote_server, + UserID.from_string(user_id).domain, + ) + # Schedule a wake-up to handle any backoffs that may occur in the future. + self.clock.call_later( + 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1, + self.kick_off_remote_profile_refresh_process, + ) + return + prev_name = prev_event.content.get("displayname") new_name = event.content.get("displayname") - # If the new name is an unexpected form, do not update the directory. + # If the new name is an unexpected form, replace with None. if not isinstance(new_name, str): - new_name = prev_name + new_name = None prev_avatar = prev_event.content.get("avatar_url") new_avatar = event.content.get("avatar_url") - # If the new avatar is an unexpected form, do not update the directory. + # If the new avatar is an unexpected form, replace with None. if not isinstance(new_avatar, str): - new_avatar = prev_avatar + new_avatar = None - if prev_name != new_name or prev_avatar != new_avatar: + if ( + prev_name != new_name + or prev_avatar != new_avatar + or prev_event_id == event_id + ): + # Only update if something has changed, or we didn't have a previous event + # in the first place. await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar) + + def kick_off_remote_profile_refresh_process(self) -> None: + """Called when there may be remote users with stale profiles to be refreshed""" + if not self.update_user_directory: + return + + if self._is_refreshing_remote_profiles: + return + + if self._refresh_remote_profiles_call_later: + if self._refresh_remote_profiles_call_later.active(): + self._refresh_remote_profiles_call_later.cancel() + self._refresh_remote_profiles_call_later = None + + async def process() -> None: + try: + await self._unsafe_refresh_remote_profiles() + finally: + self._is_refreshing_remote_profiles = False + + self._is_refreshing_remote_profiles = True + run_as_background_process("user_directory.refresh_remote_profiles", process) + + async def _unsafe_refresh_remote_profiles(self) -> None: + limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len( + self._is_refreshing_remote_profiles_for_servers + ) + if limit <= 0: + # nothing to do: already refreshing the maximum number of servers + # at once. + # Come back later. + self._refresh_remote_profiles_call_later = self.clock.call_later( + INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, + self.kick_off_remote_profile_refresh_process, + ) + return + + servers_to_refresh = ( + await self.store.get_remote_servers_with_profiles_to_refresh( + now_ts=self.clock.time_msec(), limit=limit + ) + ) + + if not servers_to_refresh: + # Do we have any backing-off servers that we should try again + # for eventually? + # By setting `now` is a point in the far future, we can ask for + # which server/user is next to be refreshed, even though it is + # not actually refreshable *now*. + end_of_time = 1 << 62 + backing_off_servers = ( + await self.store.get_remote_servers_with_profiles_to_refresh( + now_ts=end_of_time, limit=1 + ) + ) + if backing_off_servers: + # Find out when the next user is refreshable and schedule a + # refresh then. + backing_off_server_name = backing_off_servers[0] + users = await self.store.get_remote_users_to_refresh_on_server( + backing_off_server_name, now_ts=end_of_time, limit=1 + ) + if not users: + return + _, _, next_try_at_ts = users[0] + self._refresh_remote_profiles_call_later = self.clock.call_later( + ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2, + self.kick_off_remote_profile_refresh_process, + ) + + return + + for server_to_refresh in servers_to_refresh: + self.kick_off_remote_profile_refresh_process_for_remote_server( + server_to_refresh + ) + + self._refresh_remote_profiles_call_later = self.clock.call_later( + INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES, + self.kick_off_remote_profile_refresh_process, + ) + + def kick_off_remote_profile_refresh_process_for_remote_server( + self, server_name: str + ) -> None: + """Called when there may be remote users with stale profiles to be refreshed + on the given server.""" + if not self.update_user_directory: + return + + if server_name in self._is_refreshing_remote_profiles_for_servers: + return + + async def process() -> None: + try: + await self._unsafe_refresh_remote_profiles_for_remote_server( + server_name + ) + finally: + self._is_refreshing_remote_profiles_for_servers.remove(server_name) + + self._is_refreshing_remote_profiles_for_servers.add(server_name) + run_as_background_process( + "user_directory.refresh_remote_profiles_for_remote_server", process + ) + + async def _unsafe_refresh_remote_profiles_for_remote_server( + self, server_name: str + ) -> None: + logger.info("Refreshing profiles in user directory for %s", server_name) + + while True: + # Get a handful of users to process. + next_batch = await self.store.get_remote_users_to_refresh_on_server( + server_name, now_ts=self.clock.time_msec(), limit=10 + ) + if not next_batch: + # Finished for now + return + + for user_id, retry_counter, _ in next_batch: + # Request the profile of the user. + try: + profile = await self._hs.get_profile_handler().get_profile( + user_id, ignore_backoff=False + ) + except NotRetryingDestination as e: + logger.info( + "Failed to refresh profile for %r because the destination is undergoing backoff", + user_id, + ) + # As a special-case, we back off until the destination is no longer + # backed off from. + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + e.retry_last_ts + e.retry_interval, + retry_counter=retry_counter + 1, + ) + continue + except SynapseError as e: + if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND: + # The profile doesn't exist. + # TODO Does this mean we should clear it from our user + # directory? + await self.store.clear_remote_user_profile_in_user_dir_stale( + user_id + ) + logger.warning( + "Refresh of remote profile %r: not found (%r)", + user_id, + e.msg, + ) + continue + + logger.warning( + "Failed to refresh profile for %r because %r", user_id, e + ) + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + calculate_time_of_next_retry( + self.clock.time_msec(), retry_counter + 1 + ), + retry_counter=retry_counter + 1, + ) + continue + except Exception: + logger.error( + "Failed to refresh profile for %r due to unhandled exception", + user_id, + exc_info=True, + ) + await self.store.set_remote_user_profile_in_user_dir_stale( + user_id, + calculate_time_of_next_retry( + self.clock.time_msec(), retry_counter + 1 + ), + retry_counter=retry_counter + 1, + ) + continue + + await self.store.update_profile_in_user_dir( + user_id, + display_name=non_null_str_or_none(profile.get("displayname")), + avatar_url=non_null_str_or_none(profile.get("avatar_url")), + )