diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 102 |
1 files changed, 61 insertions, 41 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5581e06bb4..2ead626a4d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -111,13 +111,13 @@ class _NewEventInfo: class FederationHandler(BaseHandler): """Handles events that originated from federation. - Responsible for: - a) handling received Pdus before handing them on as Events to the rest - of the homeserver (including auth and state conflict resolutions) - b) converting events that were produced by local clients that may need - to be sent to remote homeservers. - c) doing the necessary dances to invite remote users and join remote - rooms. + Responsible for: + a) handling received Pdus before handing them on as Events to the rest + of the homeserver (including auth and state conflict resolutions) + b) converting events that were produced by local clients that may need + to be sent to remote homeservers. + c) doing the necessary dances to invite remote users and join remote + rooms. """ def __init__(self, hs: "HomeServer"): @@ -150,11 +150,11 @@ class FederationHandler(BaseHandler): ) if hs.config.worker_app: - self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client( - hs + self._user_device_resync = ( + ReplicationUserDevicesResyncRestServlet.make_client(hs) ) - self._maybe_store_room_on_outlier_membership = ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client( - hs + self._maybe_store_room_on_outlier_membership = ( + ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs) ) else: self._device_list_updater = hs.get_device_handler().device_list_updater @@ -172,7 +172,7 @@ class FederationHandler(BaseHandler): self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None: - """ Process a PDU received via a federation /send/ transaction, or + """Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events Args: @@ -368,7 +368,8 @@ class FederationHandler(BaseHandler): # know about for p in prevs - seen: logger.info( - "Requesting state at missing prev_event %s", event_id, + "Requesting state at missing prev_event %s", + event_id, ) with nested_logging_context(p): @@ -388,12 +389,14 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x room_version = await self.store.get_room_version_id(room_id) - state_map = await self._state_resolution_handler.resolve_events_with_store( - room_id, - room_version, - state_maps, - event_map, - state_res_store=StateResolutionStore(self.store), + state_map = ( + await self._state_resolution_handler.resolve_events_with_store( + room_id, + room_version, + state_maps, + event_map, + state_res_store=StateResolutionStore(self.store), + ) ) # We need to give _process_received_pdu the actual state events @@ -687,9 +690,12 @@ class FederationHandler(BaseHandler): return fetched_events async def _process_received_pdu( - self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]], + self, + origin: str, + event: EventBase, + state: Optional[Iterable[EventBase]], ): - """ Called when we have a new pdu. We need to do auth checks and put it + """Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. Args: @@ -801,7 +807,7 @@ class FederationHandler(BaseHandler): @log_function async def backfill(self, dest, room_id, limit, extremities): - """ Trigger a backfill request to `dest` for the given `room_id` + """Trigger a backfill request to `dest` for the given `room_id` This will attempt to get more events from the remote. If the other side has no new events to offer, this will return an empty list. @@ -1204,11 +1210,16 @@ class FederationHandler(BaseHandler): with nested_logging_context(event_id): try: event = await self.federation_client.get_pdu( - [destination], event_id, room_version, outlier=True, + [destination], + event_id, + room_version, + outlier=True, ) if event is None: logger.warning( - "Server %s didn't return event %s", destination, event_id, + "Server %s didn't return event %s", + destination, + event_id, ) return @@ -1235,7 +1246,8 @@ class FederationHandler(BaseHandler): if aid not in event_map ] persisted_events = await self.store.get_events( - auth_events, allow_rejected=True, + auth_events, + allow_rejected=True, ) event_infos = [] @@ -1251,7 +1263,9 @@ class FederationHandler(BaseHandler): event_infos.append(_NewEventInfo(event, None, auth)) await self._handle_new_events( - destination, room_id, event_infos, + destination, + room_id, + event_infos, ) def _sanity_check_event(self, ev): @@ -1287,7 +1301,7 @@ class FederationHandler(BaseHandler): raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") async def send_invite(self, target_host, event): - """ Sends the invite to the remote server for signing. + """Sends the invite to the remote server for signing. Invites must be signed by the invitee's server before distribution. """ @@ -1310,7 +1324,7 @@ class FederationHandler(BaseHandler): async def do_invite_join( self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict ) -> Tuple[str, int]: - """ Attempts to join the `joinee` to the room `room_id` via the + """Attempts to join the `joinee` to the room `room_id` via the servers contained in `target_hosts`. This first triggers a /make_join/ request that returns a partial @@ -1388,7 +1402,8 @@ class FederationHandler(BaseHandler): # so we can rely on it now. # await self.store.upsert_room_on_join( - room_id=room_id, room_version=room_version_obj, + room_id=room_id, + room_version=room_version_obj, ) max_stream_id = await self._persist_auth_tree( @@ -1458,7 +1473,7 @@ class FederationHandler(BaseHandler): async def on_make_join_request( self, origin: str, room_id: str, user_id: str ) -> EventBase: - """ We've received a /make_join/ request, so we create a partial + """We've received a /make_join/ request, so we create a partial join event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. @@ -1483,7 +1498,8 @@ class FederationHandler(BaseHandler): is_in_room = await self.auth.check_host_in_room(room_id, self.server_name) if not is_in_room: logger.info( - "Got /make_join request for room %s we are no longer in", room_id, + "Got /make_join request for room %s we are no longer in", + room_id, ) raise NotFoundError("Not an active room on this server") @@ -1517,7 +1533,7 @@ class FederationHandler(BaseHandler): return event async def on_send_join_request(self, origin, pdu): - """ We have received a join event for a room. Fully process it and + """We have received a join event for a room. Fully process it and respond with the current state and auth chains. """ event = pdu @@ -1573,7 +1589,7 @@ class FederationHandler(BaseHandler): async def on_invite_request( self, origin: str, event: EventBase, room_version: RoomVersion ): - """ We've got an invite event. Process and persist it. Sign it. + """We've got an invite event. Process and persist it. Sign it. Respond with the now signed event. """ @@ -1700,7 +1716,7 @@ class FederationHandler(BaseHandler): async def on_make_leave_request( self, origin: str, room_id: str, user_id: str ) -> EventBase: - """ We've received a /make_leave/ request, so we create a partial + """We've received a /make_leave/ request, so we create a partial leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. @@ -1776,8 +1792,7 @@ class FederationHandler(BaseHandler): return None async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]: - """Returns the state at the event. i.e. not including said event. - """ + """Returns the state at the event. i.e. not including said event.""" event = await self.store.get_event(event_id, check_room_id=room_id) @@ -1803,8 +1818,7 @@ class FederationHandler(BaseHandler): return [] async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]: - """Returns the state at the event. i.e. not including said event. - """ + """Returns the state at the event. i.e. not including said event.""" event = await self.store.get_event(event_id, check_room_id=room_id) state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id]) @@ -2010,7 +2024,11 @@ class FederationHandler(BaseHandler): for e_id in missing_auth_events: m_ev = await self.federation_client.get_pdu( - [origin], e_id, room_version=room_version, outlier=True, timeout=10000, + [origin], + e_id, + room_version=room_version, + outlier=True, + timeout=10000, ) if m_ev and m_ev.event_id == e_id: event_map[e_id] = m_ev @@ -2160,7 +2178,9 @@ class FederationHandler(BaseHandler): ) logger.debug( - "Doing soft-fail check for %s: state %s", event.event_id, current_state_ids, + "Doing soft-fail check for %s: state %s", + event.event_id, + current_state_ids, ) # Now check if event pass auth against said current state @@ -2513,7 +2533,7 @@ class FederationHandler(BaseHandler): async def construct_auth_difference( self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase] ) -> Dict: - """ Given a local and remote auth chain, find the differences. This + """Given a local and remote auth chain, find the differences. This assumes that we have already processed all events in remote_auth Params: |