summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py68
1 files changed, 33 insertions, 35 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py

index 623a1f0801..c6346c680b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -550,29 +550,30 @@ class FederationHandler(BaseHandler): else: raise - @defer.inlineCallbacks - @log_function - def _get_state_for_room( - self, destination, room_id, event_id, include_event_in_state - ): + async def _get_state_for_room( + self, + destination: str, + room_id: str, + event_id: str, + include_event_in_state: bool = False, + ) -> Tuple[List[EventBase], List[EventBase]]: """Requests all of the room state at a given event from a remote homeserver. Args: - destination (str): The remote homeserver to query for the state. - room_id (str): The id of the room we're interested in. - event_id (str): The id of the event we want the state at. + destination: The remote homeserver to query for the state. + room_id: The id of the room we're interested in. + event_id: The id of the event we want the state at. include_event_in_state: if true, the event itself will be included in the returned state event list. Returns: - Deferred[Tuple[List[EventBase], List[EventBase]]]: - A list of events in the state, and a list of events in the auth chain - for the given event. + A list of events in the state, possibly including the event itself, and + a list of events in the auth chain for the given event. """ ( state_event_ids, auth_event_ids, - ) = yield self.federation_client.get_room_state_ids( + ) = await self.federation_client.get_room_state_ids( destination, room_id, event_id=event_id ) @@ -581,15 +582,15 @@ class FederationHandler(BaseHandler): if include_event_in_state: desired_events.add(event_id) - event_map = yield self._get_events_from_store_or_dest( + event_map = await self._get_events_from_store_or_dest( destination, room_id, desired_events ) failed_to_fetch = desired_events - event_map.keys() if failed_to_fetch: logger.warning( - "Failed to fetch missing state/auth events for %s: %s", - room_id, + "Failed to fetch missing state/auth events for %s %s", + event_id, failed_to_fetch, ) @@ -609,15 +610,11 @@ class FederationHandler(BaseHandler): return remote_state, auth_chain - @defer.inlineCallbacks - def _get_events_from_store_or_dest(self, destination, room_id, event_ids): + async def _get_events_from_store_or_dest( + self, destination: str, room_id: str, event_ids: Iterable[str] + ) -> Dict[str, EventBase]: """Fetch events from a remote destination, checking if we already have them. - Args: - destination (str) - room_id (str) - event_ids (Iterable[str]) - Persists any events we don't already have as outliers. If we fail to fetch any of the events, a warning will be logged, and the event @@ -625,10 +622,9 @@ class FederationHandler(BaseHandler): be in the given room. Returns: - Deferred[dict[str, EventBase]]: A deferred resolving to a map - from event_id to event + map from event_id to event """ - fetched_events = yield self.store.get_events(event_ids, allow_rejected=True) + fetched_events = await self.store.get_events(event_ids, allow_rejected=True) missing_events = set(event_ids) - fetched_events.keys() @@ -639,14 +635,14 @@ class FederationHandler(BaseHandler): room_id, ) - yield self._get_events_and_persist( + await self._get_events_and_persist( destination=destination, room_id=room_id, events=missing_events ) # we need to make sure we re-load from the database to get the rejected # state correct. fetched_events.update( - (yield self.store.get_events(missing_events, allow_rejected=True)) + (await self.store.get_events(missing_events, allow_rejected=True)) ) # check for events which were in the wrong room. @@ -672,12 +668,14 @@ class FederationHandler(BaseHandler): bad_room_id, room_id, ) + del fetched_events[bad_event_id] return fetched_events - @defer.inlineCallbacks - def _process_received_pdu(self, origin, event, state): + async def _process_received_pdu( + self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]], + ): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. @@ -696,15 +694,15 @@ class FederationHandler(BaseHandler): logger.debug("[%s %s] Processing event: %s", room_id, event_id, event) try: - context = yield self._handle_new_event(origin, event, state=state) + context = await self._handle_new_event(origin, event, state=state) except AuthError as e: raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) - room = yield self.store.get_room(room_id) + room = await self.store.get_room(room_id) if not room: try: - yield self.store.store_room( + await self.store.store_room( room_id=room_id, room_creator_user_id="", is_public=False ) except StoreError: @@ -717,11 +715,11 @@ class FederationHandler(BaseHandler): # changing their profile info. newly_joined = True - prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = await context.get_prev_state_ids(self.store) prev_state_id = prev_state_ids.get((event.type, event.state_key)) if prev_state_id: - prev_state = yield self.store.get_event( + prev_state = await self.store.get_event( prev_state_id, allow_none=True ) if prev_state and prev_state.membership == Membership.JOIN: @@ -729,7 +727,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, room_id) + await self.user_joined_room(user, room_id) @log_function async def backfill(self, dest, room_id, limit, extremities):