diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 21a17cd2e8..4ab4046650 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -392,9 +392,6 @@ class ApplicationServicesHandler:
protocols[p].append(info)
def _merge_instances(infos: List[JsonDict]) -> JsonDict:
- if not infos:
- return {}
-
# Merge the 'instances' lists of multiple results, but just take
# the other fields from the first as they ought to be identical
# copy the result so as not to corrupt the cached one
@@ -406,7 +403,9 @@ class ApplicationServicesHandler:
return combined
- return {p: _merge_instances(protocols[p]) for p in protocols.keys()}
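+ # Only include protocols for which at least one result was returned.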
+ return {
+ p: _merge_instances(protocols[p]) for p in protocols.keys() if protocols[p]
+ }
async def _get_services_for_event(
self, event: EventBase
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8197b60b76..9a5e726533 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -42,6 +42,7 @@ from twisted.internet import defer
from synapse import event_auth
from synapse.api.constants import (
+ EventContentFields,
EventTypes,
Membership,
RejectedReason,
@@ -108,21 +109,33 @@ soft_failed_event_counter = Counter(
)
-@attr.s(slots=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class _NewEventInfo:
"""Holds information about a received event, ready for passing to _auth_and_persist_events
Attributes:
event: the received event
- state: the state at that event
+ state: the state at that event, according to /state_ids from a remote
+ homeserver. Only populated for backfilled events which are going to
+ become new backwards extremities.
+
+ claimed_auth_event_map: a map of (type, state_key) => event for the event's
+ claimed auth_events.
+
+ This can include events which have not yet been persisted, in the case that
+ we are backfilling a batch of events.
+
+ Note: May be incomplete if we were unable to find all of the claimed auth
+ events. Also, treat the contents with caution: the events might have been
+ rejected, might not yet have been authorized themselves, or might be in
+ the wrong room.
- auth_events: the auth_event map for that event
"""
- event = attr.ib(type=EventBase)
- state = attr.ib(type=Optional[Sequence[EventBase]], default=None)
- auth_events = attr.ib(type=Optional[MutableStateMap[EventBase]], default=None)
+ event: EventBase
+ state: Optional[Sequence[EventBase]]
+ claimed_auth_event_map: StateMap[EventBase]
class FederationHandler(BaseHandler):
@@ -262,7 +275,12 @@ class FederationHandler(BaseHandler):
state = None
- # Get missing pdus if necessary.
+ # Check that the event passes auth based on the state at the event. This is
+ # done for events that are to be added to the timeline (non-outliers).
+ #
+ # Get missing pdus if necessary:
+ # - Fetching any missing prev events to fill in gaps in the graph
+ # - Fetching state if we have a hole in the graph
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +450,13 @@ class FederationHandler(BaseHandler):
affected=event_id,
)
+ # A second round of checks for all events. Check that the event passes auth
+ # based on `auth_events`, which allows us to assert that the event would
+ # have been allowed at some point. If an event passes this check, it's OK
+ # for it to be used as part of a returned `/state` request, as either
+ # a) we received the event as part of the original join and so trust it, or
+ # b) we'll do a state resolution with existing state before it becomes
+ # part of the "current state", which adds more protection.
await self._process_received_pdu(origin, pdu, state=state)
async def _get_missing_events_for_pdu(
@@ -889,6 +914,79 @@ class FederationHandler(BaseHandler):
"resync_device_due_to_pdu", self._resync_device, event.sender
)
+ await self._handle_marker_event(origin, event)
+
+ async def _handle_marker_event(self, origin: str, marker_event: EventBase):
+ """Handles backfilling the insertion event when we receive a marker
+ event that points to one.
+
+ Args:
+ origin: Origin of the marker event; the insertion event will be fetched from this server
+ marker_event: The event to process
+ """
+
+ if marker_event.type != EventTypes.MSC2716_MARKER:
+ # Not a marker event
+ return
+
+ if marker_event.rejected_reason is not None:
+ # Rejected event
+ return
+
+ # Skip processing a marker event if the room version doesn't
+ # support it.
+ room_version = await self.store.get_room_version(marker_event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ logger.debug("_handle_marker_event: received %s", marker_event)
+
+ insertion_event_id = marker_event.content.get(
+ EventContentFields.MSC2716_MARKER_INSERTION
+ )
+
+ if insertion_event_id is None:
+ # Nothing to retrieve then (invalid marker)
+ return
+
+ logger.debug(
+ "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+ )
+
+ await self._get_events_and_persist(
+ origin,
+ marker_event.room_id,
+ [insertion_event_id],
+ )
+
+ insertion_event = await self.store.get_event(
+ insertion_event_id, allow_none=True
+ )
+ if insertion_event is None:
+ logger.warning(
+ "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+ origin,
+ insertion_event_id,
+ marker_event.event_id,
+ )
+ return
+
+ logger.debug(
+ "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+ insertion_event,
+ marker_event,
+ )
+
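+ # Record the insertion event as an "insertion extremity" so that
+ # `_maybe_backfill_inner` can later backfill the historical events
+ # which hang off it.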
+ await self.store.insert_insertion_extremity(
+ insertion_event_id, marker_event.room_id
+ )
+
+ logger.debug(
+ "_handle_marker_event: insertion extremity added for %s from marker event %s",
+ insertion_event,
+ marker_event,
+ )
+
async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1000,7 +1098,7 @@ class FederationHandler(BaseHandler):
_NewEventInfo(
event=ev,
state=events_to_state[e_id],
- auth_events={
+ claimed_auth_event_map={
(
auth_events[a_id].type,
auth_events[a_id].state_key,
@@ -1057,9 +1155,19 @@ class FederationHandler(BaseHandler):
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
- extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+ oldest_events_with_depth = (
+ await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
+ )
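+ # Insertion events (pointed to by MSC2716 marker events) which we have not
+ # yet backfilled past are also treated as backwards extremities.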
+ insertion_events_to_be_backfilled = (
+ await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+ )
+ logger.debug(
+ "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
+ oldest_events_with_depth,
+ insertion_events_to_be_backfilled,
+ )
- if not extremities:
+ if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
return False
@@ -1089,10 +1197,12 @@ class FederationHandler(BaseHandler):
# state *before* the event, ignoring the special casing certain event
# types have.
- forward_events = await self.store.get_successor_events(list(extremities))
+ forward_event_ids = await self.store.get_successor_events(
+ list(oldest_events_with_depth)
+ )
extremities_events = await self.store.get_events(
- forward_events,
+ forward_event_ids,
redact_behaviour=EventRedactBehaviour.AS_IS,
get_prev_content=False,
)
@@ -1106,10 +1216,19 @@ class FederationHandler(BaseHandler):
redact=False,
check_history_visibility_only=True,
)
+ logger.debug(
+ "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+ )
- if not filtered_extremities:
+ if not filtered_extremities and not insertion_events_to_be_backfilled:
return False
+ extremities = {
+ **oldest_events_with_depth,
+ # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
+ **insertion_events_to_be_backfilled,
+ }
+
# Check if we reached a point where we should start backfilling.
sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
max_depth = sorted_extremeties_tuple[0][1]
@@ -2208,7 +2327,7 @@ class FederationHandler(BaseHandler):
event: EventBase,
context: EventContext,
state: Optional[Iterable[EventBase]] = None,
- auth_events: Optional[MutableStateMap[EventBase]] = None,
+ claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
backfilled: bool = False,
) -> None:
"""
@@ -2220,17 +2339,18 @@ class FederationHandler(BaseHandler):
context:
The event context.
- NB that this function potentially modifies it.
state:
The state events used to check the event for soft-fail. If this is
not provided the current state events will be used.
- auth_events:
- Map from (event_type, state_key) to event
- Normally, our calculated auth_events based on the state of the room
- at the event's position in the DAG, though occasionally (eg if the
- event is an outlier), may be the auth events claimed by the remote
- server.
+ claimed_auth_event_map:
+ A map of (type, state_key) => event for the event's claimed auth_events.
+ Possibly incomplete, and possibly including events that are not yet
+ persisted, or authed, or in the right room.
+
+ Only populated where we may not already have persisted these events -
+ for example, when populating outliers.
+
backfilled: True if the event was backfilled.
"""
context = await self._check_event_auth(
@@ -2238,7 +2358,7 @@ class FederationHandler(BaseHandler):
event,
context,
state=state,
- auth_events=auth_events,
+ claimed_auth_event_map=claimed_auth_event_map,
backfilled=backfilled,
)
@@ -2302,7 +2422,7 @@ class FederationHandler(BaseHandler):
event,
res,
state=ev_info.state,
- auth_events=ev_info.auth_events,
+ claimed_auth_event_map=ev_info.claimed_auth_event_map,
backfilled=backfilled,
)
return res
@@ -2568,7 +2688,7 @@ class FederationHandler(BaseHandler):
event: EventBase,
context: EventContext,
state: Optional[Iterable[EventBase]] = None,
- auth_events: Optional[MutableStateMap[EventBase]] = None,
+ claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
backfilled: bool = False,
) -> EventContext:
"""
@@ -2580,21 +2700,19 @@ class FederationHandler(BaseHandler):
context:
The event context.
- NB that this function potentially modifies it.
state:
The state events used to check the event for soft-fail. If this is
not provided the current state events will be used.
- auth_events:
- Map from (event_type, state_key) to event
- Normally, our calculated auth_events based on the state of the room
- at the event's position in the DAG, though occasionally (eg if the
- event is an outlier), may be the auth events claimed by the remote
- server.
+ claimed_auth_event_map:
+ A map of (type, state_key) => event for the event's claimed auth_events.
+ Possibly incomplete, and possibly including events that are not yet
+ persisted, or authed, or in the right room.
- Also NB that this function adds entries to it.
+ Only populated where we may not already have persisted these events -
+ for example, when populating outliers, or the state for a backwards
+ extremity.
- If this is not provided, it is calculated from the previous state IDs.
backfilled: True if the event was backfilled.
Returns:
@@ -2603,7 +2721,12 @@ class FederationHandler(BaseHandler):
room_version = await self.store.get_room_version_id(event.room_id)
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- if not auth_events:
+ if claimed_auth_event_map:
+ # if we have a copy of the auth events from the event, use that as the
+ # basis for auth.
+ auth_events = claimed_auth_event_map
+ else:
+ # otherwise, we calculate what the auth events *should* be, and use that
prev_state_ids = await context.get_prev_state_ids()
auth_events_ids = self._event_auth_handler.compute_auth_events(
event, prev_state_ids, for_verification=True
@@ -2611,18 +2734,11 @@ class FederationHandler(BaseHandler):
auth_events_x = await self.store.get_events(auth_events_ids)
auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
- # This is a hack to fix some old rooms where the initial join event
- # didn't reference the create event in its auth events.
- if event.type == EventTypes.Member and not event.auth_event_ids():
- if len(event.prev_event_ids()) == 1 and event.depth < 5:
- c = await self.store.get_event(
- event.prev_event_ids()[0], allow_none=True
- )
- if c and c.type == EventTypes.Create:
- auth_events[(c.type, c.state_key)] = c
-
try:
- context = await self._update_auth_events_and_context_for_auth(
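+ # `_update_auth_events_and_context_for_auth` also returns the auth event
+ # map it ended up using, which may include auth events fetched from the
+ # remote server.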
+ (
+ context,
+ auth_events_for_auth,
+ ) = await self._update_auth_events_and_context_for_auth(
origin, event, context, auth_events
)
except Exception:
@@ -2635,9 +2751,10 @@ class FederationHandler(BaseHandler):
"Ignoring failure and continuing processing of event.",
event.event_id,
)
+ auth_events_for_auth = auth_events
try:
- event_auth.check(room_version_obj, event, auth_events=auth_events)
+ event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
except AuthError as e:
logger.warning("Failed auth resolution for %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
@@ -2662,8 +2779,8 @@ class FederationHandler(BaseHandler):
origin: str,
event: EventBase,
context: EventContext,
- auth_events: MutableStateMap[EventBase],
- ) -> EventContext:
+ input_auth_events: StateMap[EventBase],
+ ) -> Tuple[EventContext, StateMap[EventBase]]:
"""Helper for _check_event_auth. See there for docs.
Checks whether a given event has the expected auth events. If it
@@ -2680,7 +2797,7 @@ class FederationHandler(BaseHandler):
event:
context:
- auth_events:
+ input_auth_events:
Map from (event_type, state_key) to event
Normally, our calculated auth_events based on the state of the room
@@ -2688,11 +2805,12 @@ class FederationHandler(BaseHandler):
event is an outlier), may be the auth events claimed by the remote
server.
- Also NB that this function adds entries to it.
-
Returns:
- updated context
+ updated context, updated auth event map
"""
+ # take a copy of input_auth_events before we modify it.
+ auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
+
event_auth_events = set(event.auth_event_ids())
# missing_auth is the set of the event's auth_events which we don't yet have
@@ -2721,7 +2839,7 @@ class FederationHandler(BaseHandler):
# The other side isn't around or doesn't implement the
# endpoint, so lets just bail out.
logger.info("Failed to get event auth from remote: %s", e1)
- return context
+ return context, auth_events
seen_remotes = await self.store.have_seen_events(
event.room_id, [e.event_id for e in remote_auth_chain]
@@ -2752,7 +2870,10 @@ class FederationHandler(BaseHandler):
await self.state_handler.compute_event_context(e)
)
await self._auth_and_persist_event(
- origin, e, missing_auth_event_context, auth_events=auth
+ origin,
+ e,
+ missing_auth_event_context,
+ claimed_auth_event_map=auth,
)
if e.event_id in event_auth_events:
@@ -2770,14 +2891,14 @@ class FederationHandler(BaseHandler):
# obviously be empty
# (b) alternatively, why don't we do it earlier?
logger.info("Skipping auth_event fetch for outlier")
- return context
+ return context, auth_events
different_auth = event_auth_events.difference(
e.event_id for e in auth_events.values()
)
if not different_auth:
- return context
+ return context, auth_events
logger.info(
"auth_events refers to events which are not in our calculated auth "
@@ -2803,7 +2924,7 @@ class FederationHandler(BaseHandler):
# XXX: should we reject the event in this case? It feels like we should,
# but then shouldn't we also do so if we've failed to fetch any of the
# auth events?
- return context
+ return context, auth_events
# now we state-resolve between our own idea of the auth events, and the remote's
# idea of them.
@@ -2833,7 +2954,7 @@ class FederationHandler(BaseHandler):
event, context, auth_events
)
- return context
+ return context, auth_events
async def _update_context_for_auth_events(
self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 0961dec5ab..8ffeabacf9 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -824,6 +824,7 @@ class IdentityHandler(BaseHandler):
room_avatar_url: str,
room_join_rules: str,
room_name: str,
+ room_type: Optional[str],
inviter_display_name: str,
inviter_avatar_url: str,
id_access_token: Optional[str] = None,
@@ -843,6 +844,7 @@ class IdentityHandler(BaseHandler):
notifications.
room_join_rules: The join rules of the email (e.g. "public").
room_name: The m.room.name of the room.
+ room_type: The type of the room from its m.room.create event (e.g. "m.space").
inviter_display_name: The current display name of the
inviter.
inviter_avatar_url: The URL of the inviter's avatar.
@@ -869,6 +871,10 @@ class IdentityHandler(BaseHandler):
"sender_display_name": inviter_display_name,
"sender_avatar_url": inviter_avatar_url,
}
+
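+ # MSC3288: include the room type (e.g. "m.space") in the invite, under its
+ # unstable field name.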
+ if room_type is not None:
+ invite_config["org.matrix.msc3288.room_type"] = room_type
+
# If a custom web client location is available, include it in the request.
if self._web_client_location:
invite_config["org.matrix.web_client_location"] = self._web_client_location
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 016c5df2ca..7ca14e1d84 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1184,8 +1184,7 @@ class PresenceHandler(BasePresenceHandler):
new_fields = {"state": presence}
if not ignore_status_msg:
- msg = status_msg if presence != PresenceState.OFFLINE else None
- new_fields["status_msg"] = msg
+ new_fields["status_msg"] = status_msg
if presence == PresenceState.ONLINE or (
presence == PresenceState.BUSY and self._busy_presence_enabled
@@ -1478,7 +1477,7 @@ def format_user_presence_state(
content["user_id"] = state.user_id
if state.last_active_ts:
content["last_active_ago"] = now - state.last_active_ts
- if state.status_msg and state.state != PresenceState.OFFLINE:
+ if state.status_msg:
content["status_msg"] = state.status_msg
if state.state == PresenceState.ONLINE:
content["currently_active"] = state.currently_active
@@ -1840,9 +1839,7 @@ def handle_timeout(
# don't set them as offline.
sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
- state = state.copy_and_replace(
- state=PresenceState.OFFLINE, status_msg=None
- )
+ state = state.copy_and_replace(state=PresenceState.OFFLINE)
changed = True
else:
# We expect to be poked occasionally by the other side.
@@ -1850,7 +1847,7 @@ def handle_timeout(
# no one gets stuck online forever.
if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
# The other side seems to have disappeared.
- state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
+ state = state.copy_and_replace(state=PresenceState.OFFLINE)
changed = True
return state if changed else None
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index b9085bbccb..5fd4525700 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -70,7 +70,8 @@ class ReceiptsHandler(BaseHandler):
)
if not is_in_room:
logger.info(
- "Ignoring receipt from %s as we're not in the room",
+ "Ignoring receipt for room %r from server %s as we're not in the room",
+ room_id,
origin,
)
continue
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index fae2c098e3..6d433fad41 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -356,6 +356,12 @@ class RoomListHandler(BaseHandler):
include_all_networks: bool = False,
third_party_instance_id: Optional[str] = None,
) -> JsonDict:
+ """Get the public room list from remote server
+
+ Raises:
+ SynapseError
+ """
+
if not self.enable_room_list_search:
return {"chunk": [], "total_room_count_estimate": 0}
@@ -395,13 +401,16 @@ class RoomListHandler(BaseHandler):
limit = None
since_token = None
- res = await self._get_remote_list_cached(
- server_name,
- limit=limit,
- since_token=since_token,
- include_all_networks=include_all_networks,
- third_party_instance_id=third_party_instance_id,
- )
+ try:
+ res = await self._get_remote_list_cached(
+ server_name,
+ limit=limit,
+ since_token=since_token,
+ include_all_networks=include_all_networks,
+ third_party_instance_id=third_party_instance_id,
+ )
+ except (RequestSendFailed, HttpResponseException):
+ raise SynapseError(502, "Failed to fetch room list")
if search_filter:
res = {
@@ -423,20 +432,21 @@ class RoomListHandler(BaseHandler):
include_all_networks: bool = False,
third_party_instance_id: Optional[str] = None,
) -> JsonDict:
+ """Wrapper around FederationClient.get_public_rooms that caches the
+ result.
+ """
+
repl_layer = self.hs.get_federation_client()
if search_filter:
# We can't cache when asking for search
- try:
- return await repl_layer.get_public_rooms(
- server_name,
- limit=limit,
- since_token=since_token,
- search_filter=search_filter,
- include_all_networks=include_all_networks,
- third_party_instance_id=third_party_instance_id,
- )
- except (RequestSendFailed, HttpResponseException):
- raise SynapseError(502, "Failed to fetch room list")
+ return await repl_layer.get_public_rooms(
+ server_name,
+ limit=limit,
+ since_token=since_token,
+ search_filter=search_filter,
+ include_all_networks=include_all_networks,
+ third_party_instance_id=third_party_instance_id,
+ )
key = (
server_name,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 65ad3efa6a..ba13196218 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -19,7 +19,12 @@ from http import HTTPStatus
from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
from synapse import types
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import (
+ AccountDataTypes,
+ EventContentFields,
+ EventTypes,
+ Membership,
+)
from synapse.api.errors import (
AuthError,
Codes,
@@ -1237,6 +1242,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if room_name_event:
room_name = room_name_event.content.get("name", "")
+ room_type = None
+ room_create_event = room_state.get((EventTypes.Create, ""))
+ if room_create_event:
+ room_type = room_create_event.content.get(EventContentFields.ROOM_TYPE)
+
room_join_rules = ""
join_rules_event = room_state.get((EventTypes.JoinRules, ""))
if join_rules_event:
@@ -1263,6 +1273,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
room_avatar_url=room_avatar_url,
room_join_rules=room_join_rules,
room_name=room_name,
+ room_type=room_type,
inviter_display_name=inviter_display_name,
inviter_avatar_url=inviter_avatar_url,
id_access_token=id_access_token,
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py
index e9f6aef06f..dda9659c11 100644
--- a/synapse/handlers/send_email.py
+++ b/synapse/handlers/send_email.py
@@ -16,7 +16,12 @@ import email.utils
import logging
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
-from typing import TYPE_CHECKING
+from io import BytesIO
+from typing import TYPE_CHECKING, Optional
+
+from twisted.internet.defer import Deferred
+from twisted.internet.interfaces import IReactorTCP
+from twisted.mail.smtp import ESMTPSenderFactory
from synapse.logging.context import make_deferred_yieldable
@@ -26,19 +31,75 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+async def _sendmail(
+ reactor: IReactorTCP,
+ smtphost: str,
+ smtpport: int,
+ from_addr: str,
+ to_addr: str,
+ msg_bytes: bytes,
+ username: Optional[bytes] = None,
+ password: Optional[bytes] = None,
+ require_auth: bool = False,
+ require_tls: bool = False,
+ tls_hostname: Optional[str] = None,
+) -> None:
+ """A simple wrapper around ESMTPSenderFactory, to allow substitution in tests
+
+ Args:
+ reactor: reactor to use to make the outbound connection
+ smtphost: hostname to connect to
+ smtpport: port to connect to
+ from_addr: "From" address for email
+ to_addr: "To" address for email
+ msg_bytes: Message content
+ username: username to authenticate with, if auth is enabled
+ password: password to give when authenticating
+ require_auth: if auth is not offered, fail the request
+ require_tls: if TLS is not offered, fail the request
+ tls_hostname: TLS hostname to check for. None to disable TLS.
+ """
+ msg = BytesIO(msg_bytes)
+
+ d: "Deferred[object]" = Deferred()
+
+ factory = ESMTPSenderFactory(
+ username,
+ password,
+ from_addr,
+ to_addr,
+ msg,
+ d,
+ heloFallback=True,
+ requireAuthentication=require_auth,
+ requireTransportSecurity=require_tls,
+ hostname=tls_hostname,
+ )
+
+ # the IReactorTCP interface claims host has to be a bytes, which seems to be wrong
+ reactor.connectTCP(smtphost, smtpport, factory, timeout=30, bindAddress=None) # type: ignore[arg-type]
+
+ await make_deferred_yieldable(d)
+
+
class SendEmailHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
- self._sendmail = hs.get_sendmail()
self._reactor = hs.get_reactor()
self._from = hs.config.email.email_notif_from
self._smtp_host = hs.config.email.email_smtp_host
self._smtp_port = hs.config.email.email_smtp_port
- self._smtp_user = hs.config.email.email_smtp_user
- self._smtp_pass = hs.config.email.email_smtp_pass
+
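+ # Encode the SMTP credentials up front: `_sendmail` (above) expects them
+ # as bytes.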
+ user = hs.config.email.email_smtp_user
+ self._smtp_user = user.encode("utf-8") if user is not None else None
+ passwd = hs.config.email.email_smtp_pass
+ self._smtp_pass = passwd.encode("utf-8") if passwd is not None else None
self._require_transport_security = hs.config.email.require_transport_security
+ self._enable_tls = hs.config.email.enable_smtp_tls
+
+ self._sendmail = _sendmail
async def send_email(
self,
@@ -82,17 +143,16 @@ class SendEmailHandler:
logger.info("Sending email to %s" % email_address)
- await make_deferred_yieldable(
- self._sendmail(
- self._smtp_host,
- raw_from,
- raw_to,
- multipart_msg.as_string().encode("utf8"),
- reactor=self._reactor,
- port=self._smtp_port,
- requireAuthentication=self._smtp_user is not None,
- username=self._smtp_user,
- password=self._smtp_pass,
- requireTransportSecurity=self._require_transport_security,
- )
+ await self._sendmail(
+ self._reactor,
+ self._smtp_host,
+ self._smtp_port,
+ raw_from,
+ raw_to,
+ multipart_msg.as_string().encode("utf8"),
+ username=self._smtp_user,
+ password=self._smtp_pass,
+ require_auth=self._smtp_user is not None,
+ require_tls=self._require_transport_security,
+ tls_hostname=self._smtp_host if self._enable_tls else None,
)
diff --git a/synapse/handlers/space_summary.py b/synapse/handlers/space_summary.py
index 5f7d4602bd..2517f278b6 100644
--- a/synapse/handlers/space_summary.py
+++ b/synapse/handlers/space_summary.py
@@ -16,7 +16,17 @@ import itertools
import logging
import re
from collections import deque
-from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Dict,
+ Iterable,
+ List,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+)
import attr
@@ -116,20 +126,22 @@ class SpaceSummaryHandler:
max_children = max_rooms_per_space if processed_rooms else None
if is_in_room:
- room, events = await self._summarize_local_room(
+ room_entry = await self._summarize_local_room(
requester, None, room_id, suggested_only, max_children
)
+ events: Collection[JsonDict] = []
+ if room_entry:
+ rooms_result.append(room_entry.room)
+ events = room_entry.children
+
logger.debug(
"Query of local room %s returned events %s",
room_id,
["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
-
- if room:
- rooms_result.append(room)
else:
- fed_rooms, fed_events = await self._summarize_remote_room(
+ fed_rooms = await self._summarize_remote_room(
queue_entry,
suggested_only,
max_children,
@@ -141,12 +153,10 @@ class SpaceSummaryHandler:
# user is not permitted see.
#
# Filter the returned results to only what is accessible to the user.
- room_ids = set()
events = []
- for room in fed_rooms:
- fed_room_id = room.get("room_id")
- if not fed_room_id or not isinstance(fed_room_id, str):
- continue
+ for room_entry in fed_rooms:
+ room = room_entry.room
+ fed_room_id = room_entry.room_id
# The room should only be included in the summary if:
# a. the user is in the room;
@@ -169,7 +179,9 @@ class SpaceSummaryHandler:
# Check if the user is a member of any of the allowed spaces
# from the response.
- allowed_rooms = room.get("allowed_spaces")
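+ # Prefer the "allowed_room_ids" key, falling back to the older
+ # "allowed_spaces" key which other servers may still send instead.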
+ allowed_rooms = room.get("allowed_room_ids") or room.get(
+ "allowed_spaces"
+ )
if (
not include_room
and allowed_rooms
@@ -188,22 +200,23 @@ class SpaceSummaryHandler:
# The user can see the room, include it!
if include_room:
+ # Before returning to the client, remove the allowed_room_ids
+ # and allowed_spaces keys.
+ room.pop("allowed_room_ids", None)
+ room.pop("allowed_spaces", None)
+
rooms_result.append(room)
- room_ids.add(fed_room_id)
+ events.extend(room_entry.children)
# All rooms returned don't need visiting again (even if the user
# didn't have access to them).
processed_rooms.add(fed_room_id)
- for event in fed_events:
- if event.get("room_id") in room_ids:
- events.append(event)
-
logger.debug(
"Query of %s returned rooms %s, events %s",
room_id,
- [room.get("room_id") for room in fed_rooms],
- ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in fed_events],
+ [room_entry.room.get("room_id") for room_entry in fed_rooms],
+ ["%s->%s" % (ev["room_id"], ev["state_key"]) for ev in events],
)
# the room we queried may or may not have been returned, but don't process
@@ -230,11 +243,6 @@ class SpaceSummaryHandler:
)
processed_events.add(ev_key)
- # Before returning to the client, remove the allowed_spaces key for any
- # rooms.
- for room in rooms_result:
- room.pop("allowed_spaces", None)
-
return {"rooms": rooms_result, "events": events_result}
async def federation_space_summary(
@@ -283,20 +291,20 @@ class SpaceSummaryHandler:
# already done this room
continue
- logger.debug("Processing room %s", room_id)
-
- room, events = await self._summarize_local_room(
+ room_entry = await self._summarize_local_room(
None, origin, room_id, suggested_only, max_rooms_per_space
)
processed_rooms.add(room_id)
- if room:
- rooms_result.append(room)
- events_result.extend(events)
+ if room_entry:
+ rooms_result.append(room_entry.room)
+ events_result.extend(room_entry.children)
- # add any children to the queue
- room_queue.extend(edge_event["state_key"] for edge_event in events)
+ # add any children to the queue
+ room_queue.extend(
+ edge_event["state_key"] for edge_event in room_entry.children
+ )
return {"rooms": rooms_result, "events": events_result}
@@ -307,7 +315,7 @@ class SpaceSummaryHandler:
room_id: str,
suggested_only: bool,
max_children: Optional[int],
- ) -> Tuple[Optional[JsonDict], Sequence[JsonDict]]:
+ ) -> Optional["_RoomEntry"]:
"""
Generate a room entry and a list of event entries for a given room.
@@ -326,21 +334,16 @@ class SpaceSummaryHandler:
to a server-set limit.
Returns:
- A tuple of:
- The room information, if the room should be returned to the
- user. None, otherwise.
-
- An iterable of the sorted children events. This may be limited
- to a maximum size or may include all children.
+ A room entry if the room should be returned. None, otherwise.
"""
if not await self._is_room_accessible(room_id, requester, origin):
- return None, ()
+ return None
- room_entry = await self._build_room_entry(room_id)
+ room_entry = await self._build_room_entry(room_id, for_federation=bool(origin))
# If the room is not a space, return just the room information.
if room_entry.get("room_type") != RoomTypes.SPACE:
- return room_entry, ()
+ return _RoomEntry(room_id, room_entry)
# Otherwise, look for child rooms/spaces.
child_events = await self._get_child_events(room_id)
@@ -363,7 +366,7 @@ class SpaceSummaryHandler:
)
)
- return room_entry, events_result
+ return _RoomEntry(room_id, room_entry, events_result)
async def _summarize_remote_room(
self,
@@ -371,7 +374,7 @@ class SpaceSummaryHandler:
suggested_only: bool,
max_children: Optional[int],
exclude_rooms: Iterable[str],
- ) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
+ ) -> Iterable["_RoomEntry"]:
"""
Request room entries and a list of event entries for a given room by querying a remote server.
@@ -386,11 +389,7 @@ class SpaceSummaryHandler:
Rooms IDs which do not need to be summarized.
Returns:
- A tuple of:
- An iterable of rooms.
-
- An iterable of the sorted children events. This may be limited
- to a maximum size or may include all children.
+ An iterable of room entries.
"""
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)
@@ -414,11 +413,30 @@ class SpaceSummaryHandler:
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
- return (), ()
+ return ()
+
+ # Group the events by their room.
+ children_by_room: Dict[str, List[JsonDict]] = {}
+ for ev in res.events:
+ if ev.event_type == EventTypes.SpaceChild:
+ children_by_room.setdefault(ev.room_id, []).append(ev.data)
+
+ # Generate the final results.
+ results = []
+ for fed_room in res.rooms:
+ fed_room_id = fed_room.get("room_id")
+ if not fed_room_id or not isinstance(fed_room_id, str):
+ continue
- return res.rooms, tuple(
- ev.data for ev in res.events if ev.event_type == EventTypes.SpaceChild
- )
+ results.append(
+ _RoomEntry(
+ fed_room_id,
+ fed_room,
+ children_by_room.get(fed_room_id, []),
+ )
+ )
+
+ return results
async def _is_room_accessible(
self, room_id: str, requester: Optional[str], origin: Optional[str]
@@ -532,8 +550,18 @@ class SpaceSummaryHandler:
)
return False
- async def _build_room_entry(self, room_id: str) -> JsonDict:
- """Generate en entry suitable for the 'rooms' list in the summary response"""
+ async def _build_room_entry(self, room_id: str, for_federation: bool) -> JsonDict:
+ """
+ Generate an entry suitable for the 'rooms' list in the summary response.
+
+ Args:
+ room_id: The room ID to summarize.
+ for_federation: True if this is a summary requested over federation
+ (which includes additional fields).
+
+ Returns:
+ The JSON dictionary for the room.
+ """
stats = await self._store.get_room_with_stats(room_id)
# currently this should be impossible because we call
@@ -546,15 +574,6 @@ class SpaceSummaryHandler:
current_state_ids[(EventTypes.Create, "")]
)
- room_version = await self._store.get_room_version(room_id)
- allowed_rooms = None
- if await self._event_auth_handler.has_restricted_join_rules(
- current_state_ids, room_version
- ):
- allowed_rooms = await self._event_auth_handler.get_rooms_that_allow_join(
- current_state_ids
- )
-
entry = {
"room_id": stats["room_id"],
"name": stats["name"],
@@ -569,9 +588,25 @@ class SpaceSummaryHandler:
"guest_can_join": stats["guest_access"] == "can_join",
"creation_ts": create_event.origin_server_ts,
"room_type": create_event.content.get(EventContentFields.ROOM_TYPE),
- "allowed_spaces": allowed_rooms,
}
+ # Federation requests need to provide additional information so the
+ # requesting server is able to filter the response appropriately.
+ if for_federation:
+ room_version = await self._store.get_room_version(room_id)
+ if await self._event_auth_handler.has_restricted_join_rules(
+ current_state_ids, room_version
+ ):
+ allowed_rooms = (
+ await self._event_auth_handler.get_rooms_that_allow_join(
+ current_state_ids
+ )
+ )
+ if allowed_rooms:
+ entry["allowed_room_ids"] = allowed_rooms
+ # TODO Remove this key once the API is stable.
+ entry["allowed_spaces"] = allowed_rooms
+
# Filter out Nones – rather omit the field altogether
room_entry = {k: v for k, v in entry.items() if v is not None}
@@ -606,10 +641,21 @@ class SpaceSummaryHandler:
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
-@attr.s(frozen=True, slots=True)
+@attr.s(frozen=True, slots=True, auto_attribs=True)
class _RoomQueueEntry:
- room_id = attr.ib(type=str)
- via = attr.ib(type=Sequence[str])
+ room_id: str
+ via: Sequence[str]
+
+
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class _RoomEntry:
+ room_id: str
+ # The room summary for this room.
+ room: JsonDict
+ # An iterable of the sorted, stripped events for the children of this room.
+ #
+ # This may not include all children.
+ children: Collection[JsonDict] = ()
def _has_valid_via(e: EventBase) -> bool:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f30bfcc93c..590642f510 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -269,14 +269,22 @@ class SyncHandler:
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
- hs.get_clock(), "sync"
- )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
self.state_store = self.storage.state
+ # TODO: flush cache entries on subsequent sync request.
+ # Once we get the next /sync request (ie, one with the same access token
+ # that sets 'since' to 'next_batch'), we know that device won't need a
+ # cached result any more, and we could flush the entry from the cache to save
+ # memory.
+ self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
+ hs.get_clock(),
+ "sync",
+ timeout_ms=hs.config.caches.sync_response_cache_duration,
+ )
+
# ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
self.lazy_loaded_members_cache: ExpiringCache[
Tuple[str, Optional[str]], LruCache[str, str]
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0cb651a400..a97c448595 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -335,7 +335,8 @@ class TypingWriterHandler(FollowerTypingHandler):
)
if not is_in_room:
logger.info(
- "Ignoring typing update from %s as we're not in the room",
+ "Ignoring typing update for room %r from server %s as we're not in the room",
+ room_id,
origin,
)
return
|