diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eddc7582d0..51bdf97920 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,7 +1,8 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017-2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+# Copyright 2020 Sorunome
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -111,13 +112,13 @@ class _NewEventInfo:
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
- Responsible for:
- a) handling received Pdus before handing them on as Events to the rest
- of the homeserver (including auth and state conflict resolutions)
- b) converting events that were produced by local clients that may need
- to be sent to remote homeservers.
- c) doing the necessary dances to invite remote users and join remote
- rooms.
+ Responsible for:
+ a) handling received Pdus before handing them on as Events to the rest
+ of the homeserver (including auth and state conflict resolutions)
+ b) converting events that were produced by local clients that may need
+ to be sent to remote homeservers.
+ c) doing the necessary dances to invite remote users and join remote
+ rooms.
"""
def __init__(self, hs: "HomeServer"):
@@ -150,11 +151,11 @@ class FederationHandler(BaseHandler):
)
if hs.config.worker_app:
- self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client(
- hs
+ self._user_device_resync = (
+ ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
- self._maybe_store_room_on_outlier_membership = ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(
- hs
+ self._maybe_store_room_on_outlier_membership = (
+ ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
)
else:
self._device_list_updater = hs.get_device_handler().device_list_updater
@@ -172,7 +173,7 @@ class FederationHandler(BaseHandler):
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:
- """ Process a PDU received via a federation /send/ transaction, or
+ """Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
Args:
@@ -186,7 +187,7 @@ class FederationHandler(BaseHandler):
room_id = pdu.room_id
event_id = pdu.event_id
- logger.info("handling received PDU: %s", pdu)
+ logger.info("[%s %s] handling received PDU: %s", room_id, event_id, pdu)
# We reprocess pdus when we have seen them only as outliers
existing = await self.store.get_event(
@@ -301,6 +302,14 @@ class FederationHandler(BaseHandler):
room_id,
event_id,
)
+ elif missing_prevs:
+ logger.info(
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id,
+ event_id,
+ len(missing_prevs),
+ shortstr(missing_prevs),
+ )
if prevs - seen:
# We've still not been able to get all of the prev_events for this event.
@@ -345,12 +354,6 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- logger.info(
- "Event %s is missing prev_events: calculating state for a "
- "backwards extremity",
- event_id,
- )
-
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
event_map = {event_id: pdu}
@@ -368,7 +371,8 @@ class FederationHandler(BaseHandler):
# know about
for p in prevs - seen:
logger.info(
- "Requesting state at missing prev_event %s", event_id,
+ "Requesting state at missing prev_event %s",
+ event_id,
)
with nested_logging_context(p):
@@ -388,12 +392,14 @@ class FederationHandler(BaseHandler):
event_map[x.event_id] = x
room_version = await self.store.get_room_version_id(room_id)
- state_map = await self._state_resolution_handler.resolve_events_with_store(
- room_id,
- room_version,
- state_maps,
- event_map,
- state_res_store=StateResolutionStore(self.store),
+ state_map = (
+ await self._state_resolution_handler.resolve_events_with_store(
+ room_id,
+ room_version,
+ state_maps,
+ event_map,
+ state_res_store=StateResolutionStore(self.store),
+ )
)
# We need to give _process_received_pdu the actual state events
@@ -402,9 +408,7 @@ class FederationHandler(BaseHandler):
# First though we need to fetch all the events that are in
# state_map, so we can build up the state below.
evs = await self.store.get_events(
- list(state_map.values()),
- get_prev_content=False,
- redact_behaviour=EventRedactBehaviour.AS_IS,
+ list(state_map.values()), get_prev_content=False,
)
event_map.update(evs)
@@ -687,9 +691,12 @@ class FederationHandler(BaseHandler):
return fetched_events
async def _process_received_pdu(
- self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]],
+ self,
+ origin: str,
+ event: EventBase,
+ state: Optional[Iterable[EventBase]],
):
- """ Called when we have a new pdu. We need to do auth checks and put it
+ """Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
Args:
@@ -801,7 +808,7 @@ class FederationHandler(BaseHandler):
@log_function
async def backfill(self, dest, room_id, limit, extremities):
- """ Trigger a backfill request to `dest` for the given `room_id`
+ """Trigger a backfill request to `dest` for the given `room_id`
This will attempt to get more events from the remote. If the other side
has no new events to offer, this will return an empty list.
@@ -1204,11 +1211,16 @@ class FederationHandler(BaseHandler):
with nested_logging_context(event_id):
try:
event = await self.federation_client.get_pdu(
- [destination], event_id, room_version, outlier=True,
+ [destination],
+ event_id,
+ room_version,
+ outlier=True,
)
if event is None:
logger.warning(
- "Server %s didn't return event %s", destination, event_id,
+ "Server %s didn't return event %s",
+ destination,
+ event_id,
)
return
@@ -1235,7 +1247,8 @@ class FederationHandler(BaseHandler):
if aid not in event_map
]
persisted_events = await self.store.get_events(
- auth_events, allow_rejected=True,
+ auth_events,
+ allow_rejected=True,
)
event_infos = []
@@ -1251,7 +1264,9 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth))
await self._handle_new_events(
- destination, room_id, event_infos,
+ destination,
+ room_id,
+ event_infos,
)
def _sanity_check_event(self, ev):
@@ -1287,7 +1302,7 @@ class FederationHandler(BaseHandler):
raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
async def send_invite(self, target_host, event):
- """ Sends the invite to the remote server for signing.
+ """Sends the invite to the remote server for signing.
Invites must be signed by the invitee's server before distribution.
"""
@@ -1310,7 +1325,7 @@ class FederationHandler(BaseHandler):
async def do_invite_join(
self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict
) -> Tuple[str, int]:
- """ Attempts to join the `joinee` to the room `room_id` via the
+ """Attempts to join the `joinee` to the room `room_id` via the
servers contained in `target_hosts`.
This first triggers a /make_join/ request that returns a partial
@@ -1354,8 +1369,6 @@ class FederationHandler(BaseHandler):
await self._clean_room_for_join(room_id)
- handled_events = set()
-
try:
# Try the host we successfully got a response to /make_join/
# request first.
@@ -1375,10 +1388,6 @@ class FederationHandler(BaseHandler):
auth_chain = ret["auth_chain"]
auth_chain.sort(key=lambda e: e.depth)
- handled_events.update([s.event_id for s in state])
- handled_events.update([a.event_id for a in auth_chain])
- handled_events.add(event.event_id)
-
logger.debug("do_invite_join auth_chain: %s", auth_chain)
logger.debug("do_invite_join state: %s", state)
@@ -1394,7 +1403,8 @@ class FederationHandler(BaseHandler):
# so we can rely on it now.
#
await self.store.upsert_room_on_join(
- room_id=room_id, room_version=room_version_obj,
+ room_id=room_id,
+ room_version=room_version_obj,
)
max_stream_id = await self._persist_auth_tree(
@@ -1439,6 +1449,73 @@ class FederationHandler(BaseHandler):
run_in_background(self._handle_queued_pdus, room_queue)
+ @log_function
+ async def do_knock(
+ self, target_hosts: List[str], room_id: str, knockee: str, content: JsonDict,
+ ) -> Tuple[str, int]:
+ """Sends the knock to the remote server.
+
+ This first triggers a make_knock request that returns a partial
+ event that we can fill out and sign. This is then sent to the
+ remote server via send_knock.
+
+ Knock events must be signed by the knockee's server before distributing.
+
+ Args:
+ target_hosts: A list of hosts that we want to try knocking through.
+ room_id: The ID of the room to knock on.
+ knockee: The ID of the user who is knocking.
+ content: The content of the knock event.
+
+ Returns:
+ A tuple of (event ID, stream ID).
+
+ Raises:
+ SynapseError: If the chosen remote server returns a 3xx/4xx code.
+ RuntimeError: If no servers were reachable.
+ """
+ logger.debug("Knocking on room %s on behalf of user %s", room_id, knockee)
+
+ # Inform the remote server of the room versions we support
+ supported_room_versions = list(KNOWN_ROOM_VERSIONS.keys())
+
+ # Ask the remote server to create a valid knock event for us. Once received,
+ # we sign the event
+ params = {"ver": supported_room_versions} # type: Dict[str, Iterable[str]]
+ origin, event, event_format_version = await self._make_and_verify_event(
+ target_hosts, room_id, knockee, Membership.KNOCK, content, params=params
+ )
+
+ # Record the room ID and its version so that we have a record of the room
+ await self._maybe_store_room_on_outlier_membership(
+ room_id=event.room_id, room_version=event_format_version
+ )
+
+ # Initially try the host that we successfully called /make_knock on
+ try:
+ target_hosts.remove(origin)
+ target_hosts.insert(0, origin)
+ except ValueError:
+ pass
+
+ # Send the signed event back to the room, and potentially receive some
+ # further information about the room in the form of partial state events
+ stripped_room_state = await self.federation_client.send_knock(
+ target_hosts, event
+ )
+
+ # Store any stripped room state events in the "unsigned" key of the event.
+ # This is a bit of a hack and is cribbing off of invites. Basically we
+ # store the room state here and retrieve it again when this event appears
+ # in the invitee's sync stream. It is stripped out for all other local users.
+ event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
+
+ context = await self.state_handler.compute_event_context(event)
+ stream_id = await self.persist_events_and_notify(
+ event.room_id, [(event, context)]
+ )
+ return event.event_id, stream_id
+
async def _handle_queued_pdus(self, room_queue):
"""Process PDUs which got queued up while we were busy send_joining.
@@ -1464,7 +1541,7 @@ class FederationHandler(BaseHandler):
async def on_make_join_request(
self, origin: str, room_id: str, user_id: str
) -> EventBase:
- """ We've received a /make_join/ request, so we create a partial
+ """We've received a /make_join/ request, so we create a partial
join event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
@@ -1489,7 +1566,8 @@ class FederationHandler(BaseHandler):
is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
if not is_in_room:
logger.info(
- "Got /make_join request for room %s we are no longer in", room_id,
+ "Got /make_join request for room %s we are no longer in",
+ room_id,
)
raise NotFoundError("Not an active room on this server")
@@ -1523,7 +1601,7 @@ class FederationHandler(BaseHandler):
return event
async def on_send_join_request(self, origin, pdu):
- """ We have received a join event for a room. Fully process it and
+ """We have received a join event for a room. Fully process it and
respond with the current state and auth chains.
"""
event = pdu
@@ -1579,7 +1657,7 @@ class FederationHandler(BaseHandler):
async def on_invite_request(
self, origin: str, event: EventBase, room_version: RoomVersion
):
- """ We've got an invite event. Process and persist it. Sign it.
+ """We've got an invite event. Process and persist it. Sign it.
Respond with the now signed event.
"""
@@ -1593,8 +1671,15 @@ class FederationHandler(BaseHandler):
if self.hs.config.block_non_admin_invites:
raise SynapseError(403, "This server does not accept room invites")
+ is_published = await self.store.is_room_published(event.room_id)
+
if not await self.spam_checker.user_may_invite(
- event.sender, event.state_key, event.room_id
+ event.sender,
+ event.state_key,
+ None,
+ room_id=event.room_id,
+ new_room=False,
+ published_room=is_published,
):
raise SynapseError(
403, "This user is not permitted to send invites to this server/user"
@@ -1706,7 +1791,7 @@ class FederationHandler(BaseHandler):
async def on_make_leave_request(
self, origin: str, room_id: str, user_id: str
) -> EventBase:
- """ We've received a /make_leave/ request, so we create a partial
+ """We've received a /make_leave/ request, so we create a partial
leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
@@ -1781,9 +1866,122 @@ class FederationHandler(BaseHandler):
return None
- async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
- """Returns the state at the event. i.e. not including said event.
+ @log_function
+ async def on_make_knock_request(
+ self, origin: str, room_id: str, user_id: str
+ ) -> EventBase:
+ """We've received a make_knock request, so we create a partial
+ knock event for the room and return that. We do *not* persist or
+ process it until the other server has signed it and sent it back.
+
+ Args:
+ origin: The (verified) server name of the requesting server.
+ room_id: The room to create the knock event in.
+ user_id: The user to create the knock for.
+
+ Returns:
+ The partial knock event.
"""
+ if get_domain_from_id(user_id) != origin:
+ logger.info(
+ "Get /xyz.amorgan.knock/make_knock request for user %r"
+ "from different origin %s, ignoring",
+ user_id,
+ origin,
+ )
+ raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
+ room_version = await self.store.get_room_version_id(room_id)
+
+ builder = self.event_builder_factory.new(
+ room_version,
+ {
+ "type": EventTypes.Member,
+ "content": {"membership": Membership.KNOCK},
+ "room_id": room_id,
+ "sender": user_id,
+ "state_key": user_id,
+ },
+ )
+
+ event, context = await self.event_creation_handler.create_new_client_event(
+ builder=builder
+ )
+
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
+ event, context
+ )
+ if not event_allowed:
+ logger.warning("Creation of knock %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN
+ )
+
+ try:
+ # The remote hasn't signed it yet, obviously. We'll do the full checks
+ # when we get the event back in `on_send_knock_request`
+ await self.auth.check_from_context(
+ room_version, event, context, do_sig_check=False
+ )
+ except AuthError as e:
+ logger.warning("Failed to create new knock %r because %s", event, e)
+ raise e
+
+ return event
+
+ @log_function
+ async def on_send_knock_request(
+ self, origin: str, event: EventBase
+ ) -> EventContext:
+ """
+ We have received a knock event for a room. Verify that event and send it into the room
+ on the knocking homeserver's behalf.
+
+ Args:
+ origin: The remote homeserver of the knocking user.
+ event: The knocking member event that has been signed by the remote homeserver.
+
+ Returns:
+ The context of the event after inserting it into the room graph.
+ """
+ logger.debug(
+ "on_send_knock_request: Got event: %s, signatures: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ if get_domain_from_id(event.sender) != origin:
+ logger.info(
+ "Got /xyz.amorgan.knock/send_knock request for user %r "
+ "from different origin %s",
+ event.sender,
+ origin,
+ )
+ raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
+ event.internal_metadata.outlier = False
+
+ context = await self._handle_new_event(origin, event)
+
+ event_allowed = await self.third_party_event_rules.check_event_allowed(
+ event, context
+ )
+ if not event_allowed:
+ logger.info("Sending of knock %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN
+ )
+
+ logger.debug(
+ "on_send_knock_request: After _handle_new_event: %s, sigs: %s",
+ event.event_id,
+ event.signatures,
+ )
+
+ return context
+
+ async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
+ """Returns the state at the event. i.e. not including said event."""
event = await self.store.get_event(event_id, check_room_id=room_id)
@@ -1809,8 +2007,7 @@ class FederationHandler(BaseHandler):
return []
async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
- """Returns the state at the event. i.e. not including said event.
- """
+ """Returns the state at the event. i.e. not including said event."""
event = await self.store.get_event(event_id, check_room_id=room_id)
state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
@@ -2016,7 +2213,11 @@ class FederationHandler(BaseHandler):
for e_id in missing_auth_events:
m_ev = await self.federation_client.get_pdu(
- [origin], e_id, room_version=room_version, outlier=True, timeout=10000,
+ [origin],
+ e_id,
+ room_version=room_version,
+ outlier=True,
+ timeout=10000,
)
if m_ev and m_ev.event_id == e_id:
event_map[e_id] = m_ev
@@ -2166,7 +2367,9 @@ class FederationHandler(BaseHandler):
)
logger.debug(
- "Doing soft-fail check for %s: state %s", event.event_id, current_state_ids,
+ "Doing soft-fail check for %s: state %s",
+ event.event_id,
+ current_state_ids,
)
# Now check if event pass auth against said current state
@@ -2519,7 +2722,7 @@ class FederationHandler(BaseHandler):
async def construct_auth_difference(
self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase]
) -> Dict:
- """ Given a local and remote auth chain, find the differences. This
+ """Given a local and remote auth chain, find the differences. This
assumes that we have already processed all events in remote_auth
Params:
|