summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-05-06 10:25:53 +0100
committerErik Johnston <erik@matrix.org>2021-05-06 10:25:53 +0100
commita8f48246b64a2019e5167a9427517b05afe718a0 (patch)
tree68325cf8b1f12b42d6252d1314890e78708b9200 /synapse/handlers
parentMerge branch 'release-v1.33.0' of github.com:matrix-org/synapse into matrix-o... (diff)
parentFollow-up to #9915 to correct the identifier for room types. (diff)
downloadsynapse-a8f48246b64a2019e5167a9427517b05afe718a0.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/directory.py63
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/identity.py9
-rw-r--r--synapse/handlers/message.py60
-rw-r--r--synapse/handlers/presence.py311
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/room_member.py2
-rw-r--r--synapse/handlers/space_summary.py9
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/handlers/ui_auth/checkers.py35
11 files changed, 294 insertions, 209 deletions
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py

index 90932316f3..4064a2b859 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py
@@ -14,7 +14,7 @@ import logging import string -from typing import Iterable, List, Optional +from typing import TYPE_CHECKING, Iterable, List, Optional from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes from synapse.api.errors import ( @@ -27,15 +27,19 @@ from synapse.api.errors import ( SynapseError, ) from synapse.appservice import ApplicationService -from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id +from synapse.storage.databases.main.directory import RoomAliasMapping +from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class DirectoryHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.state = hs.get_state_handler() @@ -60,7 +64,7 @@ class DirectoryHandler(BaseHandler): room_id: str, servers: Optional[Iterable[str]] = None, creator: Optional[str] = None, - ): + ) -> None: # general association creation for both human users and app services for wchar in string.whitespace: @@ -74,7 +78,7 @@ class DirectoryHandler(BaseHandler): # TODO(erikj): Add transactions. # TODO(erikj): Check if there is a current association. if not servers: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) servers = {get_domain_from_id(u) for u in users} if not servers: @@ -104,8 +108,9 @@ class DirectoryHandler(BaseHandler): """ user_id = requester.user.to_string() + room_alias_str = room_alias.to_string() - if len(room_alias.to_string()) > MAX_ALIAS_LENGTH: + if len(room_alias_str) > MAX_ALIAS_LENGTH: raise SynapseError( 400, "Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH, @@ -114,7 +119,7 @@ class DirectoryHandler(BaseHandler): service = requester.app_service if service: - if not service.is_interested_in_alias(room_alias.to_string()): + if not service.is_interested_in_alias(room_alias_str): raise SynapseError( 400, "This application service has not reserved this kind of alias.", @@ -138,7 +143,7 @@ class DirectoryHandler(BaseHandler): raise AuthError(403, "This user is not permitted to create this alias") if not self.config.is_alias_creation_allowed( - user_id, room_id, room_alias.to_string() + user_id, room_id, room_alias_str ): # Lets just return a generic message, as there may be all sorts of # reasons why we said no. TODO: Allow configurable error messages @@ -211,7 +216,7 @@ class DirectoryHandler(BaseHandler): async def delete_appservice_association( self, service: ApplicationService, room_alias: RoomAlias - ): + ) -> None: if not service.is_interested_in_alias(room_alias.to_string()): raise SynapseError( 400, @@ -220,7 +225,7 @@ class DirectoryHandler(BaseHandler): ) await self._delete_association(room_alias) - async def _delete_association(self, room_alias: RoomAlias): + async def _delete_association(self, room_alias: RoomAlias) -> str: if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") @@ -228,17 +233,19 @@ class DirectoryHandler(BaseHandler): return room_id - async def get_association(self, room_alias: RoomAlias): + async def get_association(self, room_alias: RoomAlias) -> JsonDict: room_id = None if self.hs.is_mine(room_alias): - result = await self.get_association_from_room_alias(room_alias) + result = await self.get_association_from_room_alias( + room_alias + ) # type: Optional[RoomAliasMapping] if result: room_id = result.room_id servers = result.servers else: try: - result = await self.federation.make_query( + fed_result = await self.federation.make_query( destination=room_alias.domain, query_type="directory", args={"room_alias": room_alias.to_string()}, @@ -248,13 +255,13 @@ class DirectoryHandler(BaseHandler): except CodeMessageException as e: logging.warning("Error retrieving alias") if e.code == 404: - result = None + fed_result = None else: raise - if result and "room_id" in result and "servers" in result: - room_id = result["room_id"] - servers = result["servers"] + if fed_result and "room_id" in fed_result and "servers" in fed_result: + room_id = fed_result["room_id"] + servers = fed_result["servers"] if not room_id: raise SynapseError( @@ -263,7 +270,7 @@ class DirectoryHandler(BaseHandler): Codes.NOT_FOUND, ) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_servers = {get_domain_from_id(u) for u in users} servers = set(extra_servers) | set(servers) @@ -275,7 +282,7 @@ class DirectoryHandler(BaseHandler): return {"room_id": room_id, "servers": servers} - async def on_directory_query(self, args): + async def on_directory_query(self, args: JsonDict) -> JsonDict: room_alias = RoomAlias.from_string(args["room_alias"]) if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room Alias is not hosted on this homeserver") @@ -293,7 +300,7 @@ class DirectoryHandler(BaseHandler): async def _update_canonical_alias( self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias - ): + ) -> None: """ Send an updated canonical alias event if the removed alias was set as the canonical alias or listed in the alt_aliases field. @@ -344,7 +351,9 @@ class DirectoryHandler(BaseHandler): ratelimit=False, ) - async def get_association_from_room_alias(self, room_alias: RoomAlias): + async def get_association_from_room_alias( + self, room_alias: RoomAlias + ) -> Optional[RoomAliasMapping]: result = await self.store.get_association_from_room_alias(room_alias) if not result: # Query AS to see if it exists @@ -372,7 +381,7 @@ class DirectoryHandler(BaseHandler): # either no interested services, or no service with an exclusive lock return True - async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str): + async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool: """Determine whether a user can delete an alias. One of the following must be true: @@ -394,14 +403,13 @@ class DirectoryHandler(BaseHandler): if not room_id: return False - res = await self.auth.check_can_change_room_list( + return await self.auth.check_can_change_room_list( room_id, UserID.from_string(user_id) ) - return res async def edit_published_room_list( self, requester: Requester, room_id: str, visibility: str - ): + ) -> None: """Edit the entry of the room in the published room list. requester @@ -469,7 +477,7 @@ class DirectoryHandler(BaseHandler): async def edit_published_appservice_room_list( self, appservice_id: str, network_id: str, room_id: str, visibility: str - ): + ) -> None: """Add or remove a room from the appservice/network specific public room list. @@ -499,5 +507,4 @@ class DirectoryHandler(BaseHandler): room_id, requester.user.to_string() ) - aliases = await self.store.get_aliases_for_room(room_id) - return aliases + return await self.store.get_aliases_for_room(room_id) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index d82144d7fa..f134f1e234 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py
@@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler): # Send down presence. if event.state_key == auth_user_id: # Send down presence for everyone in the room. - users = await self.state.get_current_users_in_room( + users = await self.store.get_users_in_room( event.room_id ) # type: Iterable[str] else: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 9d867aaf4d..e8330a2b50 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -2446,7 +2446,9 @@ class FederationHandler(BaseHandler): # If we are going to send this event over federation we precaclculate # the joined hosts. if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event(event) + await self.event_creation_handler.cache_joined_hosts_for_event( + event, context + ) return context diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 0b3b1fadb5..33d16fbf9c 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py
@@ -17,7 +17,7 @@ """Utilities for interacting with Identity Servers""" import logging import urllib.parse -from typing import Awaitable, Callable, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple from synapse.api.errors import ( CodeMessageException, @@ -41,13 +41,16 @@ from synapse.util.stringutils import ( from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) id_server_scheme = "https://" class IdentityHandler(BaseHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) # An HTTP client for contacting trusted URLs. @@ -80,7 +83,7 @@ class IdentityHandler(BaseHandler): request: SynapseRequest, medium: str, address: str, - ): + ) -> None: """Used to ratelimit requests to `/requestToken` by IP and address. Args: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index afd9811fbb..db065ce061 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@ # limitations under the License. import logging import random -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json @@ -51,6 +51,7 @@ from synapse.storage.state import StateFilter from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester from synapse.util import json_decoder, json_encoder from synapse.util.async_helpers import Linearizer +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client @@ -66,7 +67,7 @@ logger = logging.getLogger(__name__) class MessageHandler: """Contains some read only APIs to get state about a room""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.auth = hs.get_auth() self.clock = hs.get_clock() self.state = hs.get_state_handler() @@ -91,7 +92,7 @@ class MessageHandler: room_id: str, event_type: str, state_key: str, - ) -> dict: + ) -> Optional[EventBase]: """Get data from a room. Args: @@ -115,6 +116,10 @@ class MessageHandler: data = await self.state.get_current_state(room_id, event_type, state_key) elif membership == Membership.LEAVE: key = (event_type, state_key) + # If the membership is not JOIN, then the event ID should exist. + assert ( + membership_event_id is not None + ), "check_user_in_room_or_world_readable returned invalid data" room_state = await self.state_store.get_state_for_events( [membership_event_id], StateFilter.from_types([key]) ) @@ -186,10 +191,12 @@ class MessageHandler: event = last_events[0] if visible_events: - room_state = await self.state_store.get_state_for_events( + room_state_events = await self.state_store.get_state_for_events( [event.event_id], state_filter=state_filter ) - room_state = room_state[event.event_id] + room_state = room_state_events[ + event.event_id + ] # type: Mapping[Any, EventBase] else: raise AuthError( 403, @@ -210,10 +217,14 @@ class MessageHandler: ) room_state = await self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: - room_state = await self.state_store.get_state_for_events( + # If the membership is not JOIN, then the event ID should exist. + assert ( + membership_event_id is not None + ), "check_user_in_room_or_world_readable returned invalid data" + room_state_events = await self.state_store.get_state_for_events( [membership_event_id], state_filter=state_filter ) - room_state = room_state[membership_event_id] + room_state = room_state_events[membership_event_id] now = self.clock.time_msec() events = await self._event_serializer.serialize_events( @@ -248,7 +259,7 @@ class MessageHandler: "Getting joined members after leaving is not implemented" ) - users_with_profile = await self.state.get_current_users_in_room(room_id) + users_with_profile = await self.store.get_users_in_room_with_profiles(room_id) # If this is an AS, double check that they are allowed to see the members. # This can either be because the AS user is in the room or because there @@ -447,6 +458,19 @@ class EventCreationHandler: self._external_cache = hs.get_external_cache() + # Stores the state groups we've recently added to the joined hosts + # external cache. Note that the timeout must be significantly less than + # the TTL on the external cache. + self._external_cache_joined_hosts_updates = ( + None + ) # type: Optional[ExpiringCache] + if self._external_cache.is_enabled(): + self._external_cache_joined_hosts_updates = ExpiringCache( + "_external_cache_joined_hosts_updates", + self.clock, + expiry_ms=30 * 60 * 1000, + ) + async def create_event( self, requester: Requester, @@ -957,7 +981,7 @@ class EventCreationHandler: await self.action_generator.handle_push_actions_for_event(event, context) - await self.cache_joined_hosts_for_event(event) + await self.cache_joined_hosts_for_event(event, context) try: # If we're a worker we need to hit out to the master. @@ -998,7 +1022,9 @@ class EventCreationHandler: await self.store.remove_push_actions_from_staging(event.event_id) raise - async def cache_joined_hosts_for_event(self, event: EventBase) -> None: + async def cache_joined_hosts_for_event( + self, event: EventBase, context: EventContext + ) -> None: """Precalculate the joined hosts at the event, when using Redis, so that external federation senders don't have to recalculate it themselves. """ @@ -1006,6 +1032,9 @@ class EventCreationHandler: if not self._external_cache.is_enabled(): return + # If external cache is enabled we should always have this. + assert self._external_cache_joined_hosts_updates is not None + # We actually store two mappings, event ID -> prev state group, # state group -> joined hosts, which is much more space efficient # than event ID -> joined hosts. @@ -1013,16 +1042,21 @@ class EventCreationHandler: # Note: We have to cache event ID -> prev state group, as we don't # store that in the DB. # - # Note: We always set the state group -> joined hosts cache, even if - # we already set it, so that the expiry time is reset. + # Note: We set the state group -> joined hosts cache if it hasn't been + # set for a while, so that the expiry time is reset. state_entry = await self.state.resolve_state_groups_for_events( event.room_id, event_ids=event.prev_event_ids() ) if state_entry.state_group: + if state_entry.state_group in self._external_cache_joined_hosts_updates: + return + joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry) + # Note that the expiry times must be larger than the expiry time in + # _external_cache_joined_hosts_updates. await self._external_cache.set( "event_to_prev_state_group", event.event_id, @@ -1036,6 +1070,8 @@ class EventCreationHandler: expiry_ms=60 * 60 * 1000, ) + self._external_cache_joined_hosts_updates[state_entry.state_group] = None + async def _validate_canonical_alias( self, directory_handler, room_alias_str: str, expected_room_id: str ) -> None: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 12df35f26e..6fd1f34289 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py
@@ -28,6 +28,7 @@ from bisect import bisect from contextlib import contextmanager from typing import ( TYPE_CHECKING, + Callable, Collection, Dict, FrozenSet, @@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC): """ async def update_external_syncs_row( - self, process_id, user_id, is_syncing, sync_time_msec - ): + self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + ) -> None: """Update the syncing users for an external process as a delta. This is a no-op when presence is handled by a different worker. Args: - process_id (str): An identifier for the process the users are + process_id: An identifier for the process the users are syncing against. This allows synapse to process updates as user start and stop syncing against a given process. - user_id (str): The user who has started or stopped syncing - is_syncing (bool): Whether or not the user is now syncing - sync_time_msec(int): Time in ms when the user was last syncing + user_id: The user who has started or stopped syncing + is_syncing: Whether or not the user is now syncing + sync_time_msec: Time in ms when the user was last syncing """ pass - async def update_external_syncs_clear(self, process_id): + async def update_external_syncs_clear(self, process_id: str) -> None: """Marks all users that had been marked as syncing by a given process as offline. @@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]): class WorkerPresenceHandler(BasePresenceHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs @@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler): # user_id -> last_sync_ms. Lists the users that have stopped syncing but # we haven't notified the presence writer of that yet - self.users_going_offline = {} + self.users_going_offline = {} # type: Dict[str, int] self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) self._set_state_client = ReplicationPresenceSetState.make_client(hs) @@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler): self._on_shutdown, ) - def _on_shutdown(self): + def _on_shutdown(self) -> None: if self._presence_enabled: self.hs.get_tcp_replication().send_command( ClearUserSyncsCommand(self.instance_id) ) - def send_user_sync(self, user_id, is_syncing, last_sync_ms): + def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: if self._presence_enabled: self.hs.get_tcp_replication().send_user_sync( self.instance_id, user_id, is_syncing, last_sync_ms ) - def mark_as_coming_online(self, user_id): + def mark_as_coming_online(self, user_id: str) -> None: """A user has started syncing. Send a UserSync to the presence writer, unless they had recently stopped syncing. - - Args: - user_id (str) """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: @@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler): # were offline self.send_user_sync(user_id, True, self.clock.time_msec()) - def mark_as_going_offline(self, user_id): + def mark_as_going_offline(self, user_id: str) -> None: """A user has stopped syncing. We wait before notifying the presence writer as its likely they'll come back soon. This allows us to avoid sending a stopped syncing immediately followed by a started syncing notification to the presence writer - - Args: - user_id (str) """ self.users_going_offline[user_id] = self.clock.time_msec() - def send_stop_syncing(self): + def send_stop_syncing(self) -> None: """Check if there are any users who have stopped syncing a while ago and haven't come back yet. If there are poke the presence writer about them. """ @@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler): return _user_syncing() - async def notify_from_replication(self, states, stream_id): + async def notify_from_replication( + self, states: List[UserPresenceState], stream_id: int + ) -> None: parties = await get_interested_parties(self.store, self.presence_router, states) room_ids_to_states, users_to_states = parties @@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler): if count > 0 ] - async def set_state(self, target_user, state, ignore_status_msg=False): + async def set_state( + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + ) -> None: """Set the presence state of the user.""" presence = state["presence"] @@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler): ignore_status_msg=ignore_status_msg, ) - async def bump_presence_active_time(self, user): + async def bump_presence_active_time(self, user: UserID) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler): # we assume that all the sync requests on that process have stopped. # Stored as a dict from process_id to set of user_id, and a dict of # process_id to millisecond timestamp last updated. - self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]] - self.external_process_last_updated_ms = {} # type: Dict[int, int] + self.external_process_to_current_syncs = {} # type: Dict[str, Set[str]] + self.external_process_last_updated_ms = {} # type: Dict[str, int] self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") @@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler): self._event_pos = self.store.get_current_events_token() self._event_processing = False - async def _on_shutdown(self): + async def _on_shutdown(self) -> None: """Gets called when shutting down. This lets us persist any updates that we haven't yet persisted, e.g. updates that only changes some internal timers. This allows changes to persist across startup without having to @@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler): ) logger.info("Finished _on_shutdown") - async def _persist_unpersisted_changes(self): + async def _persist_unpersisted_changes(self) -> None: """We periodically persist the unpersisted changes, as otherwise they may stack up and slow down shutdown times. """ @@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler): states, destinations ) - async def _handle_timeouts(self): + async def _handle_timeouts(self) -> None: """Checks the presence of users that have timed out and updates as appropriate. """ @@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler): return await self._update_states(changes) - async def bump_presence_active_time(self, user): + async def bump_presence_active_time(self, user: UserID) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler): return [] async def update_external_syncs_row( - self, process_id, user_id, is_syncing, sync_time_msec - ): + self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + ) -> None: """Update the syncing users for an external process as a delta. Args: - process_id (str): An identifier for the process the users are + process_id: An identifier for the process the users are syncing against. This allows synapse to process updates as user start and stop syncing against a given process. - user_id (str): The user who has started or stopped syncing - is_syncing (bool): Whether or not the user is now syncing - sync_time_msec(int): Time in ms when the user was last syncing + user_id: The user who has started or stopped syncing + is_syncing: Whether or not the user is now syncing + sync_time_msec: Time in ms when the user was last syncing """ with (await self.external_sync_linearizer.queue(process_id)): prev_state = await self.current_state_for_user(user_id) @@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler): self.external_process_last_updated_ms[process_id] = self.clock.time_msec() - async def update_external_syncs_clear(self, process_id): + async def update_external_syncs_clear(self, process_id: str) -> None: """Marks all users that had been marked as syncing by a given process as offline. @@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler): ) self.external_process_last_updated_ms.pop(process_id, None) - async def current_state_for_user(self, user_id): + async def current_state_for_user(self, user_id: str) -> UserPresenceState: """Get the current presence state for a user.""" res = await self.current_state_for_users([user_id]) return res[user_id] - async def _persist_and_notify(self, states): + async def _persist_and_notify(self, states: List[UserPresenceState]) -> None: """Persist states in the database, poke the notifier and send to interested remote servers """ @@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler): # stream (which is updated by `store.update_presence`). await self.maybe_send_presence_to_interested_destinations(states) - async def incoming_presence(self, origin, content): + async def incoming_presence(self, origin: str, content: JsonDict) -> None: """Called when we receive a `m.presence` EDU from a remote server.""" if not self._presence_enabled: return @@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler): federation_presence_counter.inc(len(updates)) await self._update_states(updates) - async def set_state(self, target_user, state, ignore_status_msg=False): + async def set_state( + self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + ) -> None: """Set the presence state of the user.""" status_msg = state.get("status_msg", None) presence = state["presence"] @@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler): await self._update_states([prev_state.copy_and_replace(**new_fields)]) - async def is_visible(self, observed_user, observer_user): + async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: """Returns whether a user can see another user's presence.""" observer_room_ids = await self.store.get_rooms_for_user( observer_user.to_string() @@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler): ) return rows - def notify_new_event(self): + def notify_new_event(self) -> None: """Called when new events have happened. Handles users and servers joining rooms and require being sent presence. """ @@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler): run_as_background_process("presence.notify_new_event", _process_presence) - async def _unsafe_process(self): + async def _unsafe_process(self) -> None: # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "presence_delta"): @@ -1179,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler): max_pos, deltas = await self.store.get_current_state_deltas( self._event_pos, room_max_stream_ordering ) - await self._handle_state_delta(deltas) + + # We may get multiple deltas for different rooms, but we want to + # handle them on a room by room basis, so we batch them up by + # room. + deltas_by_room: Dict[str, List[JsonDict]] = {} + for delta in deltas: + deltas_by_room.setdefault(delta["room_id"], []).append(delta) + + for room_id, deltas_for_room in deltas_by_room.items(): + await self._handle_state_delta(room_id, deltas_for_room) self._event_pos = max_pos @@ -1188,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler): max_pos ) - async def _handle_state_delta(self, deltas): - """Process current state deltas to find new joins that need to be - handled. + async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None: + """Process current state deltas for the room to find new joins that need + to be handled. """ - # A map of destination to a set of user state that they should receive - presence_destinations = {} # type: Dict[str, Set[UserPresenceState]] + + # Sets of newly joined users. Note that if the local server is + # joining a remote room for the first time we'll see both the joining + # user and all remote users as newly joined. + newly_joined_users = set() for delta in deltas: + assert room_id == delta["room_id"] + typ = delta["type"] state_key = delta["state_key"] - room_id = delta["room_id"] event_id = delta["event_id"] prev_event_id = delta["prev_event_id"] @@ -1227,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler): # Ignore changes to join events. continue - # Retrieve any user presence state updates that need to be sent as a result, - # and the destinations that need to receive it - destinations, user_presence_states = await self._on_user_joined_room( - room_id, state_key - ) - - # Insert the destinations and respective updates into our destinations dict - for destination in destinations: - presence_destinations.setdefault(destination, set()).update( - user_presence_states - ) - - # Send out user presence updates for each destination - for destination, user_state_set in presence_destinations.items(): - self._federation_queue.send_presence_to_destinations( - destinations=[destination], states=user_state_set - ) + newly_joined_users.add(state_key) - async def _on_user_joined_room( - self, room_id: str, user_id: str - ) -> Tuple[List[str], List[UserPresenceState]]: - """Called when we detect a user joining the room via the current state - delta stream. Returns the destinations that need to be updated and the - presence updates to send to them. - - Args: - room_id: The ID of the room that the user has joined. - user_id: The ID of the user that has joined the room. - - Returns: - A tuple of destinations and presence updates to send to them. - """ - if self.is_mine_id(user_id): - # If this is a local user then we need to send their presence - # out to hosts in the room (who don't already have it) - - # TODO: We should be able to filter the hosts down to those that - # haven't previously seen the user - - remote_hosts = await self.state.get_current_hosts_in_room(room_id) - - # Filter out ourselves. - filtered_remote_hosts = [ - host for host in remote_hosts if host != self.server_name - ] - - state = await self.current_state_for_user(user_id) - return filtered_remote_hosts, [state] - else: - # A remote user has joined the room, so we need to: - # 1. Check if this is a new server in the room - # 2. If so send any presence they don't already have for - # local users in the room. - - # TODO: We should be able to filter the users down to those that - # the server hasn't previously seen - - # TODO: Check that this is actually a new server joining the - # room. - - remote_host = get_domain_from_id(user_id) + if not newly_joined_users: + # If nobody has joined then there's nothing to do. + return - users = await self.state.get_current_users_in_room(room_id) - user_ids = list(filter(self.is_mine_id, users)) + # We want to send: + # 1. presence states of all local users in the room to newly joined + # remote servers + # 2. presence states of newly joined users to all remote servers in + # the room. + # + # TODO: Only send presence states to remote hosts that don't already + # have them (because they already share rooms). + + # Get all the users who were already in the room, by fetching the + # current users in the room and removing the newly joined users. + users = await self.store.get_users_in_room(room_id) + prev_users = set(users) - newly_joined_users + + # Construct sets for all the local users and remote hosts that were + # already in the room + prev_local_users = [] + prev_remote_hosts = set() + for user_id in prev_users: + if self.is_mine_id(user_id): + prev_local_users.append(user_id) + else: + prev_remote_hosts.add(get_domain_from_id(user_id)) + + # Similarly, construct sets for all the local users and remote hosts + # that were *not* already in the room. Care needs to be taken with the + # calculating the remote hosts, as a host may have already been in the + # room even if there is a newly joined user from that host. + newly_joined_local_users = [] + newly_joined_remote_hosts = set() + for user_id in newly_joined_users: + if self.is_mine_id(user_id): + newly_joined_local_users.append(user_id) + else: + host = get_domain_from_id(user_id) + if host not in prev_remote_hosts: + newly_joined_remote_hosts.add(host) - states_d = await self.current_state_for_users(user_ids) + # Send presence states of all local users in the room to newly joined + # remote servers. (We actually only send states for local users already + # in the room, as we'll send states for newly joined local users below.) + if prev_local_users and newly_joined_remote_hosts: + local_states = await self.current_state_for_users(prev_local_users) # Filter out old presence, i.e. offline presence states where # the user hasn't been active for a week. We can change this @@ -1302,16 +1302,30 @@ class PresenceHandler(BasePresenceHandler): now = self.clock.time_msec() states = [ state - for state in states_d.values() + for state in local_states.values() if state.state != PresenceState.OFFLINE or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000 or state.status_msg is not None ] - return [remote_host], states + self._federation_queue.send_presence_to_destinations( + destinations=newly_joined_remote_hosts, + states=states, + ) + # Send presence states of newly joined users to all remote servers in + # the room + if newly_joined_local_users and ( + prev_remote_hosts or newly_joined_remote_hosts + ): + local_states = await self.current_state_for_users(newly_joined_local_users) + self._federation_queue.send_presence_to_destinations( + destinations=prev_remote_hosts | newly_joined_remote_hosts, + states=list(local_states.values()), + ) -def should_notify(old_state, new_state): + +def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool: """Decides if a presence state change should be sent to interested parties.""" if old_state == new_state: return False @@ -1347,7 +1361,9 @@ def should_notify(old_state, new_state): return False -def format_user_presence_state(state, now, include_user_id=True): +def format_user_presence_state( + state: UserPresenceState, now: int, include_user_id: bool = True +) -> JsonDict: """Convert UserPresenceState to a format that can be sent down to clients and to other servers. @@ -1385,11 +1401,11 @@ class PresenceEventSource: @log_function async def get_new_events( self, - user, - from_key, - room_ids=None, - include_offline=True, - explicit_room_id=None, + user: UserID, + from_key: Optional[int], + room_ids: Optional[List[str]] = None, + include_offline: bool = True, + explicit_room_id: Optional[str] = None, **kwargs, ) -> Tuple[List[UserPresenceState], int]: # The process for getting presence events are: @@ -1594,7 +1610,7 @@ class PresenceEventSource: if update.state != PresenceState.OFFLINE ] - def get_current_key(self): + def get_current_key(self) -> int: return self.store.get_current_presence_token() @cached(num_args=2, cache_context=True) @@ -1654,15 +1670,20 @@ class PresenceEventSource: return users_interested_in -def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): +def handle_timeouts( + user_states: List[UserPresenceState], + is_mine_fn: Callable[[str], bool], + syncing_user_ids: Set[str], + now: int, +) -> List[UserPresenceState]: """Checks the presence of users that have timed out and updates as appropriate. Args: - user_states(list): List of UserPresenceState's to check. - is_mine_fn (fn): Function that returns if a user_id is ours - syncing_user_ids (set): Set of user_ids with active syncs. - now (int): Current time in ms. + user_states: List of UserPresenceState's to check. + is_mine_fn: Function that returns if a user_id is ours + syncing_user_ids: Set of user_ids with active syncs. + now: Current time in ms. Returns: List of UserPresenceState updates @@ -1679,14 +1700,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now): return list(changes.values()) -def handle_timeout(state, is_mine, syncing_user_ids, now): +def handle_timeout( + state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int +) -> Optional[UserPresenceState]: """Checks the presence of the user to see if any of the timers have elapsed Args: - state (UserPresenceState) - is_mine (bool): Whether the user is ours - syncing_user_ids (set): Set of user_ids with active syncs. - now (int): Current time in ms. + state + is_mine: Whether the user is ours + syncing_user_ids: Set of user_ids with active syncs. + now: Current time in ms. Returns: A UserPresenceState update or None if no update. @@ -1738,23 +1761,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now): return state if changed else None -def handle_update(prev_state, new_state, is_mine, wheel_timer, now): +def handle_update( + prev_state: UserPresenceState, + new_state: UserPresenceState, + is_mine: bool, + wheel_timer: WheelTimer, + now: int, +) -> Tuple[UserPresenceState, bool, bool]: """Given a presence update: 1. Add any appropriate timers. 2. Check if we should notify anyone. Args: - prev_state (UserPresenceState) - new_state (UserPresenceState) - is_mine (bool): Whether the user is ours - wheel_timer (WheelTimer) - now (int): Time now in ms + prev_state + new_state + is_mine: Whether the user is ours + wheel_timer + now: Time now in ms Returns: 3-tuple: `(new_state, persist_and_notify, federation_ping)` where: - new_state: is the state to actually persist - - persist_and_notify (bool): whether to persist and notify people - - federation_ping (bool): whether we should send a ping over federation + - persist_and_notify: whether to persist and notify people + - federation_ping: whether we should send a ping over federation """ user_id = new_state.user_id diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5a888b7941..fb4823a5cc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py
@@ -1327,7 +1327,7 @@ class RoomShutdownHandler: new_room_id = None logger.info("Shutting down room %r", room_id) - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) kicked_users = [] failed_to_kick_users = [] for user_id in users: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 54c25e3557..242abc77c0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py
@@ -1064,7 +1064,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): class RoomMemberMasterHandler(RoomMemberHandler): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.distributor = hs.get_distributor() diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 01e3e050f9..2e997841f1 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py
@@ -288,6 +288,7 @@ class SpaceSummaryHandler: ev.data for ev in res.events if ev.event_type == EventTypes.MSC1772_SPACE_CHILD + or ev.event_type == EventTypes.SpaceChild ) async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool: @@ -331,7 +332,9 @@ class SpaceSummaryHandler: ) # TODO: update once MSC1772 lands - room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE) + room_type = create_event.content.get(EventContentFields.ROOM_TYPE) + if not room_type: + room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE) entry = { "room_id": stats["room_id"], @@ -344,6 +347,7 @@ class SpaceSummaryHandler: stats["history_visibility"] == HistoryVisibility.WORLD_READABLE ), "guest_can_join": stats["guest_access"] == "can_join", + "creation_ts": create_event.origin_server_ts, "room_type": room_type, } @@ -360,8 +364,9 @@ class SpaceSummaryHandler: [ event_id for key, event_id in current_state_ids.items() - # TODO: update once MSC1772 lands + # TODO: update once MSC1772 has been FCP for a period of time. if key[0] == EventTypes.MSC1772_SPACE_CHILD + or key[0] == EventTypes.SpaceChild ] ) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3ffc4628cb..02ae36d2de 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -1191,7 +1191,7 @@ class SyncHandler: # Step 1b, check for newly joined rooms for room_id in newly_joined_rooms: - joined_users = await self.state.get_current_users_in_room(room_id) + joined_users = await self.store.get_users_in_room(room_id) newly_joined_or_invited_users.update(joined_users) # TODO: Check that these users are actually new, i.e. either they @@ -1207,7 +1207,7 @@ class SyncHandler: # Now find users that we no longer track for room_id in newly_left_rooms: - left_users = await self.state.get_current_users_in_room(room_id) + left_users = await self.store.get_users_in_room(room_id) newly_left_users.update(left_users) # Remove any users that we still share a room with. @@ -1362,7 +1362,7 @@ class SyncHandler: extra_users_ids = set(newly_joined_or_invited_users) for room_id in newly_joined_rooms: - users = await self.state.get_current_users_in_room(room_id) + users = await self.store.get_users_in_room(room_id) extra_users_ids.update(users) extra_users_ids.discard(user.to_string()) diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 0eeb7c03f2..5414ce77d8 100644 --- a/synapse/handlers/ui_auth/checkers.py +++ b/synapse/handlers/ui_auth/checkers.py
@@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Any +from typing import TYPE_CHECKING, Any from twisted.web.client import PartialDownloadError @@ -22,13 +22,16 @@ from synapse.api.errors import Codes, LoginError, SynapseError from synapse.config.emailconfig import ThreepidBehaviour from synapse.util import json_decoder +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class UserInteractiveAuthChecker: """Abstract base class for an interactive auth checker""" - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): pass def is_enabled(self) -> bool: @@ -57,10 +60,10 @@ class UserInteractiveAuthChecker: class DummyAuthChecker(UserInteractiveAuthChecker): AUTH_TYPE = LoginType.DUMMY - def is_enabled(self): + def is_enabled(self) -> bool: return True - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return True @@ -70,24 +73,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker): def is_enabled(self): return True - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return True class RecaptchaAuthChecker(UserInteractiveAuthChecker): AUTH_TYPE = LoginType.RECAPTCHA - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self._enabled = bool(hs.config.recaptcha_private_key) self._http_client = hs.get_proxied_http_client() self._url = hs.config.recaptcha_siteverify_api self._secret = hs.config.recaptcha_private_key - def is_enabled(self): + def is_enabled(self) -> bool: return self._enabled - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: try: user_response = authdict["response"] except KeyError: @@ -132,11 +135,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker): class _BaseThreepidAuthChecker: - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastore() - async def _check_threepid(self, medium, authdict): + async def _check_threepid(self, medium: str, authdict: dict) -> dict: if "threepid_creds" not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) @@ -206,31 +209,31 @@ class _BaseThreepidAuthChecker: class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker): AUTH_TYPE = LoginType.EMAIL_IDENTITY - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): UserInteractiveAuthChecker.__init__(self, hs) _BaseThreepidAuthChecker.__init__(self, hs) - def is_enabled(self): + def is_enabled(self) -> bool: return self.hs.config.threepid_behaviour_email in ( ThreepidBehaviour.REMOTE, ThreepidBehaviour.LOCAL, ) - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return await self._check_threepid("email", authdict) class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker): AUTH_TYPE = LoginType.MSISDN - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): UserInteractiveAuthChecker.__init__(self, hs) _BaseThreepidAuthChecker.__init__(self, hs) - def is_enabled(self): + def is_enabled(self) -> bool: return bool(self.hs.config.account_threepid_delegate_msisdn) - async def check_auth(self, authdict, clientip): + async def check_auth(self, authdict: dict, clientip: str) -> Any: return await self._check_threepid("msisdn", authdict)