diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 7 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 237 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 6 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 11 | ||||
-rw-r--r-- | synapse/handlers/receipts.py | 3 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 46 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 13 | ||||
-rw-r--r-- | synapse/handlers/send_email.py | 94 | ||||
-rw-r--r-- | synapse/handlers/space_summary.py | 182 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 14 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 3 |
11 files changed, 438 insertions, 178 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 21a17cd2e8..4ab4046650 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -392,9 +392,6 @@ class ApplicationServicesHandler: protocols[p].append(info) def _merge_instances(infos: List[JsonDict]) -> JsonDict: - if not infos: - return {} - # Merge the 'instances' lists of multiple results, but just take # the other fields from the first as they ought to be identical # copy the result so as not to corrupt the cached one @@ -406,7 +403,9 @@ class ApplicationServicesHandler: return combined - return {p: _merge_instances(protocols[p]) for p in protocols.keys()} + return { + p: _merge_instances(protocols[p]) for p in protocols.keys() if protocols[p] + } async def _get_services_for_event( self, event: EventBase diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8197b60b76..9a5e726533 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -42,6 +42,7 @@ from twisted.internet import defer from synapse import event_auth from synapse.api.constants import ( + EventContentFields, EventTypes, Membership, RejectedReason, @@ -108,21 +109,33 @@ soft_failed_event_counter = Counter( ) -@attr.s(slots=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _NewEventInfo: """Holds information about a received event, ready for passing to _auth_and_persist_events Attributes: event: the received event - state: the state at that event + state: the state at that event, according to /state_ids from a remote + homeserver. Only populated for backfilled events which are going to be a + new backwards extremity. + + claimed_auth_event_map: a map of (type, state_key) => event for the event's + claimed auth_events. + + This can include events which have not yet been persisted, in the case that + we are backfilling a batch of events. + + Note: May be incomplete: if we were unable to find all of the claimed auth + events. Also, treat the contents with caution: the events might also have + been rejected, might not yet have been authorized themselves, or they might + be in the wrong room. - auth_events: the auth_event map for that event """ - event = attr.ib(type=EventBase) - state = attr.ib(type=Optional[Sequence[EventBase]], default=None) - auth_events = attr.ib(type=Optional[MutableStateMap[EventBase]], default=None) + event: EventBase + state: Optional[Sequence[EventBase]] + claimed_auth_event_map: StateMap[EventBase] class FederationHandler(BaseHandler): @@ -262,7 +275,12 @@ class FederationHandler(BaseHandler): state = None - # Get missing pdus if necessary. + # Check that the event passes auth based on the state at the event. This is + # done for events that are to be added to the timeline (non-outliers). + # + # Get missing pdus if necessary: + # - Fetching any missing prev events to fill in gaps in the graph + # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. min_depth = await self.get_min_depth_for_context(pdu.room_id) @@ -432,6 +450,13 @@ class FederationHandler(BaseHandler): affected=event_id, ) + # A second round of checks for all events. Check that the event passes auth + # based on `auth_events`, this allows us to assert that the event would + # have been allowed at some point. If an event passes this check its OK + # for it to be used as part of a returned `/state` request, as either + # a) we received the event as part of the original join and so trust it, or + # b) we'll do a state resolution with existing state before it becomes + # part of the "current state", which adds more protection. await self._process_received_pdu(origin, pdu, state=state) async def _get_missing_events_for_pdu( @@ -889,6 +914,79 @@ class FederationHandler(BaseHandler): "resync_device_due_to_pdu", self._resync_device, event.sender ) + await self._handle_marker_event(origin, event) + + async def _handle_marker_event(self, origin: str, marker_event: EventBase): + """Handles backfilling the insertion event when we receive a marker + event that points to one. + + Args: + origin: Origin of the event. Will be called to get the insertion event + marker_event: The event to process + """ + + if marker_event.type != EventTypes.MSC2716_MARKER: + # Not a marker event + return + + if marker_event.rejected_reason is not None: + # Rejected event + return + + # Skip processing a marker event if the room version doesn't + # support it. + room_version = await self.store.get_room_version(marker_event.room_id) + if not room_version.msc2716_historical: + return + + logger.debug("_handle_marker_event: received %s", marker_event) + + insertion_event_id = marker_event.content.get( + EventContentFields.MSC2716_MARKER_INSERTION + ) + + if insertion_event_id is None: + # Nothing to retrieve then (invalid marker) + return + + logger.debug( + "_handle_marker_event: backfilling insertion event %s", insertion_event_id + ) + + await self._get_events_and_persist( + origin, + marker_event.room_id, + [insertion_event_id], + ) + + insertion_event = await self.store.get_event( + insertion_event_id, allow_none=True + ) + if insertion_event is None: + logger.warning( + "_handle_marker_event: server %s didn't return insertion event %s for marker %s", + origin, + insertion_event_id, + marker_event.event_id, + ) + return + + logger.debug( + "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s", + insertion_event, + marker_event, + ) + + await self.store.insert_insertion_extremity( + insertion_event_id, marker_event.room_id + ) + + logger.debug( + "_handle_marker_event: insertion extremity added for %s from marker event %s", + insertion_event, + marker_event, + ) + async def _resync_device(self, sender: str) -> None: """We have detected that the device list for the given user may be out of sync, so we try and resync them. @@ -1000,7 +1098,7 @@ class FederationHandler(BaseHandler): _NewEventInfo( event=ev, state=events_to_state[e_id], - auth_events={ + claimed_auth_event_map={ ( auth_events[a_id].type, auth_events[a_id].state_key, @@ -1057,9 +1155,19 @@ class FederationHandler(BaseHandler): async def _maybe_backfill_inner( self, room_id: str, current_depth: int, limit: int ) -> bool: - extremities = await self.store.get_oldest_events_with_depth_in_room(room_id) + oldest_events_with_depth = ( + await self.store.get_oldest_event_ids_with_depth_in_room(room_id) + ) + insertion_events_to_be_backfilled = ( + await self.store.get_insertion_event_backwards_extremities_in_room(room_id) + ) + logger.debug( + "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s", + oldest_events_with_depth, + insertion_events_to_be_backfilled, + ) - if not extremities: + if not oldest_events_with_depth and not insertion_events_to_be_backfilled: logger.debug("Not backfilling as no extremeties found.") return False @@ -1089,10 +1197,12 @@ class FederationHandler(BaseHandler): # state *before* the event, ignoring the special casing certain event # types have. - forward_events = await self.store.get_successor_events(list(extremities)) + forward_event_ids = await self.store.get_successor_events( + list(oldest_events_with_depth) + ) extremities_events = await self.store.get_events( - forward_events, + forward_event_ids, redact_behaviour=EventRedactBehaviour.AS_IS, get_prev_content=False, ) @@ -1106,10 +1216,19 @@ class FederationHandler(BaseHandler): redact=False, check_history_visibility_only=True, ) + logger.debug( + "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities + ) - if not filtered_extremities: + if not filtered_extremities and not insertion_events_to_be_backfilled: return False + extremities = { + **oldest_events_with_depth, + # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks + **insertion_events_to_be_backfilled, + } + # Check if we reached a point where we should start backfilling. sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1])) max_depth = sorted_extremeties_tuple[0][1] @@ -2208,7 +2327,7 @@ class FederationHandler(BaseHandler): event: EventBase, context: EventContext, state: Optional[Iterable[EventBase]] = None, - auth_events: Optional[MutableStateMap[EventBase]] = None, + claimed_auth_event_map: Optional[StateMap[EventBase]] = None, backfilled: bool = False, ) -> None: """ @@ -2220,17 +2339,18 @@ class FederationHandler(BaseHandler): context: The event context. - NB that this function potentially modifies it. state: The state events used to check the event for soft-fail. If this is not provided the current state events will be used. - auth_events: - Map from (event_type, state_key) to event - Normally, our calculated auth_events based on the state of the room - at the event's position in the DAG, though occasionally (eg if the - event is an outlier), may be the auth events claimed by the remote - server. + claimed_auth_event_map: + A map of (type, state_key) => event for the event's claimed auth_events. + Possibly incomplete, and possibly including events that are not yet + persisted, or authed, or in the right room. + + Only populated where we may not already have persisted these events - + for example, when populating outliers. + backfilled: True if the event was backfilled. """ context = await self._check_event_auth( @@ -2238,7 +2358,7 @@ class FederationHandler(BaseHandler): event, context, state=state, - auth_events=auth_events, + claimed_auth_event_map=claimed_auth_event_map, backfilled=backfilled, ) @@ -2302,7 +2422,7 @@ class FederationHandler(BaseHandler): event, res, state=ev_info.state, - auth_events=ev_info.auth_events, + claimed_auth_event_map=ev_info.claimed_auth_event_map, backfilled=backfilled, ) return res @@ -2568,7 +2688,7 @@ class FederationHandler(BaseHandler): event: EventBase, context: EventContext, state: Optional[Iterable[EventBase]] = None, - auth_events: Optional[MutableStateMap[EventBase]] = None, + claimed_auth_event_map: Optional[StateMap[EventBase]] = None, backfilled: bool = False, ) -> EventContext: """ @@ -2580,21 +2700,19 @@ class FederationHandler(BaseHandler): context: The event context. - NB that this function potentially modifies it. state: The state events used to check the event for soft-fail. If this is not provided the current state events will be used. - auth_events: - Map from (event_type, state_key) to event - Normally, our calculated auth_events based on the state of the room - at the event's position in the DAG, though occasionally (eg if the - event is an outlier), may be the auth events claimed by the remote - server. + claimed_auth_event_map: + A map of (type, state_key) => event for the event's claimed auth_events. + Possibly incomplete, and possibly including events that are not yet + persisted, or authed, or in the right room. - Also NB that this function adds entries to it. + Only populated where we may not already have persisted these events - + for example, when populating outliers, or the state for a backwards + extremity. - If this is not provided, it is calculated from the previous state IDs. backfilled: True if the event was backfilled. Returns: @@ -2603,7 +2721,12 @@ class FederationHandler(BaseHandler): room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - if not auth_events: + if claimed_auth_event_map: + # if we have a copy of the auth events from the event, use that as the + # basis for auth. + auth_events = claimed_auth_event_map + else: + # otherwise, we calculate what the auth events *should* be, and use that prev_state_ids = await context.get_prev_state_ids() auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=True @@ -2611,18 +2734,11 @@ class FederationHandler(BaseHandler): auth_events_x = await self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} - # This is a hack to fix some old rooms where the initial join event - # didn't reference the create event in its auth events. - if event.type == EventTypes.Member and not event.auth_event_ids(): - if len(event.prev_event_ids()) == 1 and event.depth < 5: - c = await self.store.get_event( - event.prev_event_ids()[0], allow_none=True - ) - if c and c.type == EventTypes.Create: - auth_events[(c.type, c.state_key)] = c - try: - context = await self._update_auth_events_and_context_for_auth( + ( + context, + auth_events_for_auth, + ) = await self._update_auth_events_and_context_for_auth( origin, event, context, auth_events ) except Exception: @@ -2635,9 +2751,10 @@ class FederationHandler(BaseHandler): "Ignoring failure and continuing processing of event.", event.event_id, ) + auth_events_for_auth = auth_events try: - event_auth.check(room_version_obj, event, auth_events=auth_events) + event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth) except AuthError as e: logger.warning("Failed auth resolution for %r because %s", event, e) context.rejected = RejectedReason.AUTH_ERROR @@ -2662,8 +2779,8 @@ class FederationHandler(BaseHandler): origin: str, event: EventBase, context: EventContext, - auth_events: MutableStateMap[EventBase], - ) -> EventContext: + input_auth_events: StateMap[EventBase], + ) -> Tuple[EventContext, StateMap[EventBase]]: """Helper for _check_event_auth. See there for docs. Checks whether a given event has the expected auth events. If it @@ -2680,7 +2797,7 @@ class FederationHandler(BaseHandler): event: context: - auth_events: + input_auth_events: Map from (event_type, state_key) to event Normally, our calculated auth_events based on the state of the room @@ -2688,11 +2805,12 @@ class FederationHandler(BaseHandler): event is an outlier), may be the auth events claimed by the remote server. - Also NB that this function adds entries to it. - Returns: - updated context + updated context, updated auth event map """ + # take a copy of input_auth_events before we modify it. + auth_events: MutableStateMap[EventBase] = dict(input_auth_events) + event_auth_events = set(event.auth_event_ids()) # missing_auth is the set of the event's auth_events which we don't yet have @@ -2721,7 +2839,7 @@ class FederationHandler(BaseHandler): # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. logger.info("Failed to get event auth from remote: %s", e1) - return context + return context, auth_events seen_remotes = await self.store.have_seen_events( event.room_id, [e.event_id for e in remote_auth_chain] @@ -2752,7 +2870,10 @@ class FederationHandler(BaseHandler): await self.state_handler.compute_event_context(e) ) await self._auth_and_persist_event( - origin, e, missing_auth_event_context, auth_events=auth + origin, + e, + missing_auth_event_context, + claimed_auth_event_map=auth, ) if e.event_id in event_auth_events: @@ -2770,14 +2891,14 @@ class FederationHandler(BaseHandler): # obviously be empty # (b) alternatively, why don't we do it earlier? logger.info("Skipping auth_event fetch for outlier") - return context + return context, auth_events different_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) if not different_auth: - return context + return context, auth_events logger.info( "auth_events refers to events which are not in our calculated auth " @@ -2803,7 +2924,7 @@ class FederationHandler(BaseHandler): # XXX: should we reject the event in this case? It feels like we should, # but then shouldn't we also do so if we've failed to fetch any of the # auth events? - return context + return context, auth_events # now we state-resolve between our own idea of the auth events, and the remote's # idea of them. @@ -2833,7 +2954,7 @@ class FederationHandler(BaseHandler): event, context, auth_events ) - return context + return context, auth_events async def _update_context_for_auth_events( self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 0961dec5ab..8ffeabacf9 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -824,6 +824,7 @@ class IdentityHandler(BaseHandler): room_avatar_url: str, room_join_rules: str, room_name: str, + room_type: Optional[str], inviter_display_name: str, inviter_avatar_url: str, id_access_token: Optional[str] = None, @@ -843,6 +844,7 @@ class IdentityHandler(BaseHandler): notifications. room_join_rules: The join rules of the email (e.g. "public"). room_name: The m.room.name of the room. + room_type: The type of the room from its m.room.create event (e.g "m.space"). inviter_display_name: The current display name of the inviter. inviter_avatar_url: The URL of the inviter's avatar. @@ -869,6 +871,10 @@ class IdentityHandler(BaseHandler): "sender_display_name": inviter_display_name, "sender_avatar_url": inviter_avatar_url, } + + if room_type is not None: + invite_config["org.matrix.msc3288.room_type"] = room_type + # If a custom web client location is available, include it in the request. if self._web_client_location: invite_config["org.matrix.web_client_location"] = self._web_client_location diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 016c5df2ca..7ca14e1d84 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1184,8 +1184,7 @@ class PresenceHandler(BasePresenceHandler): new_fields = {"state": presence} if not ignore_status_msg: - msg = status_msg if presence != PresenceState.OFFLINE else None - new_fields["status_msg"] = msg + new_fields["status_msg"] = status_msg if presence == PresenceState.ONLINE or ( presence == PresenceState.BUSY and self._busy_presence_enabled @@ -1478,7 +1477,7 @@ def format_user_presence_state( content["user_id"] = state.user_id if state.last_active_ts: content["last_active_ago"] = now - state.last_active_ts - if state.status_msg and state.state != PresenceState.OFFLINE: + if state.status_msg: content["status_msg"] = state.status_msg if state.state == PresenceState.ONLINE: content["currently_active"] = state.currently_active @@ -1840,9 +1839,7 @@ def handle_timeout( # don't set them as offline. sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) if now - sync_or_active > SYNC_ONLINE_TIMEOUT: - state = state.copy_and_replace( - state=PresenceState.OFFLINE, status_msg=None - ) + state = state.copy_and_replace(state=PresenceState.OFFLINE) changed = True else: # We expect to be poked occasionally by the other side. @@ -1850,7 +1847,7 @@ def handle_timeout( # no one gets stuck online forever. if now - state.last_federation_update_ts > FEDERATION_TIMEOUT: # The other side seems to have disappeared. - state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None) + state = state.copy_and_replace(state=PresenceState.OFFLINE) changed = True return state if changed else None diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index b9085bbccb..5fd4525700 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -70,7 +70,8 @@ class ReceiptsHandler(BaseHandler): ) if not is_in_room: logger.info( - "Ignoring receipt from %s as we're not in the room", + "Ignoring receipt for room %r from server %s as we're not in the room", + room_id, origin, ) continue diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index fae2c098e3..6d433fad41 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -356,6 +356,12 @@ class RoomListHandler(BaseHandler): include_all_networks: bool = False, third_party_instance_id: Optional[str] = None, ) -> JsonDict: + """Get the public room list from remote server + + Raises: + SynapseError + """ + if not self.enable_room_list_search: return {"chunk": [], "total_room_count_estimate": 0} @@ -395,13 +401,16 @@ class RoomListHandler(BaseHandler): limit = None since_token = None - res = await self._get_remote_list_cached( - server_name, - limit=limit, - since_token=since_token, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) + try: + res = await self._get_remote_list_cached( + server_name, + limit=limit, + since_token=since_token, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) + except (RequestSendFailed, HttpResponseException): + raise SynapseError(502, "Failed to fetch room list") if search_filter: res = { @@ -423,20 +432,21 @@ class RoomListHandler(BaseHandler): include_all_networks: bool = False, third_party_instance_id: Optional[str] = None, ) -> JsonDict: + """Wrapper around FederationClient.get_public_rooms that caches the + result. + """ + repl_layer = self.hs.get_federation_client() if search_filter: # We can't cache when asking for search - try: - return await repl_layer.get_public_rooms( - server_name, - limit=limit, - since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - except (RequestSendFailed, HttpResponseException): - raise SynapseError(502, "Failed to fetch room list") + return await repl_layer.get_public_rooms( + server_name, + limit=limit, + since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) key = ( server_name, diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 65ad3efa6a..ba13196218 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -19,7 +19,12 @@ from http import HTTPStatus from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple from synapse import types -from synapse.api.constants import AccountDataTypes, EventTypes, Membership +from synapse.api.constants import ( + AccountDataTypes, + EventContentFields, + EventTypes, + Membership, +) from synapse.api.errors import ( AuthError, Codes, @@ -1237,6 +1242,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if room_name_event: room_name = room_name_event.content.get("name", "") + room_type = None + room_create_event = room_state.get((EventTypes.Create, "")) + if room_create_event: + room_type = room_create_event.content.get(EventContentFields.ROOM_TYPE) + room_join_rules = "" join_rules_event = room_state.get((EventTypes.JoinRules, "")) if join_rules_event: @@ -1263,6 +1273,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): room_avatar_url=room_avatar_url, room_join_rules=room_join_rules, room_name=room_name, + room_type=room_type, inviter_display_name=inviter_display_name, inviter_avatar_url=inviter_avatar_url, id_access_token=id_access_token, diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py index e9f6aef06f..dda9659c11 100644 --- a/synapse/handlers/send_email.py +++ b/synapse/handlers/send_email.py @@ -16,7 +16,12 @@ import email.utils import logging from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from typing import TYPE_CHECKING +from io import BytesIO +from typing import TYPE_CHECKING, Optional + +from twisted.internet.defer import Deferred +from twisted.internet.interfaces import IReactorTCP +from twisted.mail.smtp import ESMTPSenderFactory from synapse.logging.context import make_deferred_yieldable @@ -26,19 +31,75 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +async def _sendmail( + reactor: IReactorTCP, + smtphost: str, + smtpport: int, + from_addr: str, + to_addr: str, + msg_bytes: bytes, + username: Optional[bytes] = None, + password: Optional[bytes] = None, + require_auth: bool = False, + require_tls: bool = False, + tls_hostname: Optional[str] = None, +) -> None: + """A simple wrapper around ESMTPSenderFactory, to allow substitution in tests + + Params: + reactor: reactor to use to make the outbound connection + smtphost: hostname to connect to + smtpport: port to connect to + from_addr: "From" address for email + to_addr: "To" address for email + msg_bytes: Message content + username: username to authenticate with, if auth is enabled + password: password to give when authenticating + require_auth: if auth is not offered, fail the request + require_tls: if TLS is not offered, fail the reqest + tls_hostname: TLS hostname to check for. None to disable TLS. + """ + msg = BytesIO(msg_bytes) + + d: "Deferred[object]" = Deferred() + + factory = ESMTPSenderFactory( + username, + password, + from_addr, + to_addr, + msg, + d, + heloFallback=True, + requireAuthentication=require_auth, + requireTransportSecurity=require_tls, + hostname=tls_hostname, + ) + + # the IReactorTCP interface claims host has to be a bytes, which seems to be wrong + reactor.connectTCP(smtphost, smtpport, factory, timeout=30, bindAddress=None) # type: ignore[arg-type] + + await make_deferred_yieldable(d) + + class SendEmailHandler: def __init__(self, hs: "HomeServer"): self.hs = hs - self._sendmail = hs.get_sendmail() self._reactor = hs.get_reactor() self._from = hs.config.email.email_notif_from self._smtp_host = hs.config.email.email_smtp_host self._smtp_port = hs.config.email.email_smtp_port - self._smtp_user = hs.config.email.email_smtp_user - self._smtp_pass = hs.config.email.email_smtp_pass + + user = hs.config.email.email_smtp_user + self._smtp_user = user.encode("utf-8") if user is not None else None + passwd = hs.config.email.email_smtp_pass + self._smtp_pass = passwd.encode("utf-8") if passwd is not None else None self._require_transport_security = hs.config.email.require_transport_security + self._enable_tls = hs.config.email.enable_smtp_tls + + self._sendmail = _sendmail async def send_email( self, @@ -82,17 +143,16 @@ class SendEmailHandler: logger.info("Sending email to %s" % email_address) - await make_deferred_yieldable( - self._sendmail( - self._smtp_host, - raw_from, - raw_to, - multipart_msg.as_string().encode("utf8"), - reactor=self._reactor, - port=self._smtp_port, - requireAuthentication=self._smtp_user is not None, - username=self._smtp_user, - password=self._smtp_pass, - requireTransportSecurity=self._require_transport_security, - ) + await self._sendmail( + self._reactor, + self._smtp_host, + self._smtp_port, + raw_from, + raw_to, + multipart_msg.as_string().encode("utf8"), + username=self._smtp_user, + password=self._smtp_pass, + require_auth=self._smtp_user is not None, + require_tls=self._require_transport_security, + tls_hostname=self._smtp_host if self._enable_tls else None, ) diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py index 5f7d4602bd..2517f278b6 100644 --- a/synapse/handlers/space_summary.py +++ b/synapse/handlers/space_summary.py @@ -16,7 +16,17 @@ import itertools import logging import re from collections import deque -from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Sequence, + Set, + Tuple, +) import attr @@ -116,20 +126,22 @@ class SpaceSummaryHandler: max_children = max_rooms_per_space if processed_rooms else None if is_in_room: - room, events = await self._summarize_local_room( + room_entry = await self._summarize_local_room( requester, None, room_id, suggested_only, max_children ) + events: Collection[JsonDict] = [] + if room_entry: + rooms_result.append(room_entry.room) + events = room_entry.children + logger.debug( "Query of local room %s returned events %s", room_id, ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events], ) - - if room: - rooms_result.append(room) else: - fed_rooms, fed_events = await self._summarize_remote_room( + fed_rooms = await self._summarize_remote_room( queue_entry, suggested_only, max_children, @@ -141,12 +153,10 @@ class SpaceSummaryHandler: # user is not permitted see. # # Filter the returned results to only what is accessible to the user. - room_ids = set() events = [] - for room in fed_rooms: - fed_room_id = room.get("room_id") - if not fed_room_id or not isinstance(fed_room_id, str): - continue + for room_entry in fed_rooms: + room = room_entry.room + fed_room_id = room_entry.room_id # The room should only be included in the summary if: # a. the user is in the room; @@ -169,7 +179,9 @@ class SpaceSummaryHandler: # Check if the user is a member of any of the allowed spaces # from the response. - allowed_rooms = room.get("allowed_spaces") + allowed_rooms = room.get("allowed_room_ids") or room.get( + "allowed_spaces" + ) if ( not include_room and allowed_rooms @@ -188,22 +200,23 @@ class SpaceSummaryHandler: # The user can see the room, include it! if include_room: + # Before returning to the client, remove the allowed_room_ids + # and allowed_spaces keys. + room.pop("allowed_room_ids", None) + room.pop("allowed_spaces", None) + rooms_result.append(room) - room_ids.add(fed_room_id) + events.extend(room_entry.children) # All rooms returned don't need visiting again (even if the user # didn't have access to them). processed_rooms.add(fed_room_id) - for event in fed_events: - if event.get("room_id") in room_ids: - events.append(event) - logger.debug( "Query of %s returned rooms %s, events %s", room_id, - [room.get("room_id") for room in fed_rooms], - ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in fed_events], + [room_entry.room.get("room_id") for room_entry in fed_rooms], + ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events], ) # the room we queried may or may not have been returned, but don't process @@ -230,11 +243,6 @@ class SpaceSummaryHandler: ) processed_events.add(ev_key) - # Before returning to the client, remove the allowed_spaces key for any - # rooms. - for room in rooms_result: - room.pop("allowed_spaces", None) - return {"rooms": rooms_result, "events": events_result} async def federation_space_summary( @@ -283,20 +291,20 @@ class SpaceSummaryHandler: # already done this room continue - logger.debug("Processing room %s", room_id) - - room, events = await self._summarize_local_room( + room_entry = await self._summarize_local_room( None, origin, room_id, suggested_only, max_rooms_per_space ) processed_rooms.add(room_id) - if room: - rooms_result.append(room) - events_result.extend(events) + if room_entry: + rooms_result.append(room_entry.room) + events_result.extend(room_entry.children) - # add any children to the queue - room_queue.extend(edge_event["state_key"] for edge_event in events) + # add any children to the queue + room_queue.extend( + edge_event["state_key"] for edge_event in room_entry.children + ) return {"rooms": rooms_result, "events": events_result} @@ -307,7 +315,7 @@ class SpaceSummaryHandler: room_id: str, suggested_only: bool, max_children: Optional[int], - ) -> Tuple[Optional[JsonDict], Sequence[JsonDict]]: + ) -> Optional["_RoomEntry"]: """ Generate a room entry and a list of event entries for a given room. @@ -326,21 +334,16 @@ class SpaceSummaryHandler: to a server-set limit. Returns: - A tuple of: - The room information, if the room should be returned to the - user. None, otherwise. - - An iterable of the sorted children events. This may be limited - to a maximum size or may include all children. + A room entry if the room should be returned. None, otherwise. """ if not await self._is_room_accessible(room_id, requester, origin): - return None, () + return None - room_entry = await self._build_room_entry(room_id) + room_entry = await self._build_room_entry(room_id, for_federation=bool(origin)) # If the room is not a space, return just the room information. if room_entry.get("room_type") != RoomTypes.SPACE: - return room_entry, () + return _RoomEntry(room_id, room_entry) # Otherwise, look for child rooms/spaces. child_events = await self._get_child_events(room_id) @@ -363,7 +366,7 @@ class SpaceSummaryHandler: ) ) - return room_entry, events_result + return _RoomEntry(room_id, room_entry, events_result) async def _summarize_remote_room( self, @@ -371,7 +374,7 @@ class SpaceSummaryHandler: suggested_only: bool, max_children: Optional[int], exclude_rooms: Iterable[str], - ) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]: + ) -> Iterable["_RoomEntry"]: """ Request room entries and a list of event entries for a given room by querying a remote server. @@ -386,11 +389,7 @@ class SpaceSummaryHandler: Rooms IDs which do not need to be summarized. Returns: - A tuple of: - An iterable of rooms. - - An iterable of the sorted children events. This may be limited - to a maximum size or may include all children. + An iterable of room entries. """ room_id = room.room_id logger.info("Requesting summary for %s via %s", room_id, room.via) @@ -414,11 +413,30 @@ class SpaceSummaryHandler: e, exc_info=logger.isEnabledFor(logging.DEBUG), ) - return (), () + return () + + # Group the events by their room. + children_by_room: Dict[str, List[JsonDict]] = {} + for ev in res.events: + if ev.event_type == EventTypes.SpaceChild: + children_by_room.setdefault(ev.room_id, []).append(ev.data) + + # Generate the final results. + results = [] + for fed_room in res.rooms: + fed_room_id = fed_room.get("room_id") + if not fed_room_id or not isinstance(fed_room_id, str): + continue - return res.rooms, tuple( - ev.data for ev in res.events if ev.event_type == EventTypes.SpaceChild - ) + results.append( + _RoomEntry( + fed_room_id, + fed_room, + children_by_room.get(fed_room_id, []), + ) + ) + + return results async def _is_room_accessible( self, room_id: str, requester: Optional[str], origin: Optional[str] @@ -532,8 +550,18 @@ class SpaceSummaryHandler: ) return False - async def _build_room_entry(self, room_id: str) -> JsonDict: - """Generate en entry suitable for the 'rooms' list in the summary response""" + async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDict: + """ + Generate en entry suitable for the 'rooms' list in the summary response. + + Args: + room_id: The room ID to summarize. + for_federation: True if this is a summary requested over federation + (which includes additional fields). + + Returns: + The JSON dictionary for the room. + """ stats = await self._store.get_room_with_stats(room_id) # currently this should be impossible because we call @@ -546,15 +574,6 @@ class SpaceSummaryHandler: current_state_ids[(EventTypes.Create, "")] ) - room_version = await self._store.get_room_version(room_id) - allowed_rooms = None - if await self._event_auth_handler.has_restricted_join_rules( - current_state_ids, room_version - ): - allowed_rooms = await self._event_auth_handler.get_rooms_that_allow_join( - current_state_ids - ) - entry = { "room_id": stats["room_id"], "name": stats["name"], @@ -569,9 +588,25 @@ class SpaceSummaryHandler: "guest_can_join": stats["guest_access"] == "can_join", "creation_ts": create_event.origin_server_ts, "room_type": create_event.content.get(EventContentFields.ROOM_TYPE), - "allowed_spaces": allowed_rooms, } + # Federation requests need to provide additional information so the + # requested server is able to filter the response appropriately. + if for_federation: + room_version = await self._store.get_room_version(room_id) + if await self._event_auth_handler.has_restricted_join_rules( + current_state_ids, room_version + ): + allowed_rooms = ( + await self._event_auth_handler.get_rooms_that_allow_join( + current_state_ids + ) + ) + if allowed_rooms: + entry["allowed_room_ids"] = allowed_rooms + # TODO Remove this key once the API is stable. + entry["allowed_spaces"] = allowed_rooms + # Filter out Nones – rather omit the field altogether room_entry = {k: v for k, v in entry.items() if v is not None} @@ -606,10 +641,21 @@ class SpaceSummaryHandler: return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class _RoomQueueEntry: - room_id = attr.ib(type=str) - via = attr.ib(type=Sequence[str]) + room_id: str + via: Sequence[str] + + +@attr.s(frozen=True, slots=True, auto_attribs=True) +class _RoomEntry: + room_id: str + # The room summary for this room. + room: JsonDict + # An iterable of the sorted, stripped children events for children of this room. + # + # This may not include all children. + children: Collection[JsonDict] = () def _has_valid_via(e: EventBase) -> bool: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f30bfcc93c..590642f510 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -269,14 +269,22 @@ class SyncHandler: self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache( - hs.get_clock(), "sync" - ) self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() self.state_store = self.storage.state + # TODO: flush cache entries on subsequent sync request. + # Once we get the next /sync request (ie, one with the same access token + # that sets 'since' to 'next_batch'), we know that device won't need a + # cached result any more, and we could flush the entry from the cache to save + # memory. + self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache( + hs.get_clock(), + "sync", + timeout_ms=hs.config.caches.sync_response_cache_duration, + ) + # ExpiringCache((User, Device)) -> LruCache(user_id => event_id) self.lazy_loaded_members_cache: ExpiringCache[ Tuple[str, Optional[str]], LruCache[str, str] diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0cb651a400..a97c448595 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -335,7 +335,8 @@ class TypingWriterHandler(FollowerTypingHandler): ) if not is_in_room: logger.info( - "Ignoring typing update from %s as we're not in the room", + "Ignoring typing update for room %r from server %s as we're not in the room", + room_id, origin, ) return |