diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 623a1f0801..c6346c680b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -550,29 +550,30 @@ class FederationHandler(BaseHandler):
else:
raise
- @defer.inlineCallbacks
- @log_function
- def _get_state_for_room(
- self, destination, room_id, event_id, include_event_in_state
- ):
+ async def _get_state_for_room(
+ self,
+ destination: str,
+ room_id: str,
+ event_id: str,
+ include_event_in_state: bool = False,
+ ) -> Tuple[List[EventBase], List[EventBase]]:
"""Requests all of the room state at a given event from a remote homeserver.
Args:
- destination (str): The remote homeserver to query for the state.
- room_id (str): The id of the room we're interested in.
- event_id (str): The id of the event we want the state at.
+ destination: The remote homeserver to query for the state.
+ room_id: The id of the room we're interested in.
+ event_id: The id of the event we want the state at.
include_event_in_state: if true, the event itself will be included in the
returned state event list.
Returns:
- Deferred[Tuple[List[EventBase], List[EventBase]]]:
- A list of events in the state, and a list of events in the auth chain
- for the given event.
+ A list of events in the state, possibly including the event itself, and
+ a list of events in the auth chain for the given event.
"""
(
state_event_ids,
auth_event_ids,
- ) = yield self.federation_client.get_room_state_ids(
+ ) = await self.federation_client.get_room_state_ids(
destination, room_id, event_id=event_id
)
@@ -581,15 +582,15 @@ class FederationHandler(BaseHandler):
if include_event_in_state:
desired_events.add(event_id)
- event_map = yield self._get_events_from_store_or_dest(
+ event_map = await self._get_events_from_store_or_dest(
destination, room_id, desired_events
)
failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
- "Failed to fetch missing state/auth events for %s: %s",
- room_id,
+ "Failed to fetch missing state/auth events for %s %s",
+ event_id,
failed_to_fetch,
)
@@ -609,15 +610,11 @@ class FederationHandler(BaseHandler):
return remote_state, auth_chain
- @defer.inlineCallbacks
- def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
+ async def _get_events_from_store_or_dest(
+ self, destination: str, room_id: str, event_ids: Iterable[str]
+ ) -> Dict[str, EventBase]:
"""Fetch events from a remote destination, checking if we already have them.
- Args:
- destination (str)
- room_id (str)
- event_ids (Iterable[str])
-
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
@@ -625,10 +622,9 @@ class FederationHandler(BaseHandler):
be in the given room.
Returns:
- Deferred[dict[str, EventBase]]: A deferred resolving to a map
- from event_id to event
+ map from event_id to event
"""
- fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
+ fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
missing_events = set(event_ids) - fetched_events.keys()
@@ -639,14 +635,14 @@ class FederationHandler(BaseHandler):
room_id,
)
- yield self._get_events_and_persist(
+ await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
- (yield self.store.get_events(missing_events, allow_rejected=True))
+ (await self.store.get_events(missing_events, allow_rejected=True))
)
# check for events which were in the wrong room.
@@ -672,12 +668,14 @@ class FederationHandler(BaseHandler):
bad_room_id,
room_id,
)
+
del fetched_events[bad_event_id]
return fetched_events
- @defer.inlineCallbacks
- def _process_received_pdu(self, origin, event, state):
+ async def _process_received_pdu(
+ self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]],
+ ):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
@@ -696,15 +694,15 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
try:
- context = yield self._handle_new_event(origin, event, state=state)
+ context = await self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- room = yield self.store.get_room(room_id)
+ room = await self.store.get_room(room_id)
if not room:
try:
- yield self.store.store_room(
+ await self.store.store_room(
room_id=room_id, room_creator_user_id="", is_public=False
)
except StoreError:
@@ -717,11 +715,11 @@ class FederationHandler(BaseHandler):
# changing their profile info.
newly_joined = True
- prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = await context.get_prev_state_ids(self.store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id:
- prev_state = yield self.store.get_event(
+ prev_state = await self.store.get_event(
prev_state_id, allow_none=True
)
if prev_state and prev_state.membership == Membership.JOIN:
@@ -729,7 +727,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, room_id)
+ await self.user_joined_room(user, room_id)
@log_function
async def backfill(self, dest, room_id, limit, extremities):
|