diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3fccccfecd..623a1f0801 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -338,12 +338,6 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- logger.info(
- "Event %s is missing prev_events: calculating state for a "
- "backwards extremity",
- event_id,
- )
-
# Calculate the state after each of the previous events, and
# resolve them to find the correct state at the current event.
event_map = {event_id: pdu}
@@ -363,7 +357,10 @@ class FederationHandler(BaseHandler):
# know about
for p in prevs - seen:
logger.info(
- "Requesting state at missing prev_event %s", event_id,
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id,
+ event_id,
+ p,
)
with nested_logging_context(p):
@@ -399,7 +396,7 @@ class FederationHandler(BaseHandler):
evs = await self.store.get_events(
list(state_map.values()),
get_prev_content=False,
- redact_behaviour=EventRedactBehaviour.AS_IS,
+ check_redacted=False,
)
event_map.update(evs)
@@ -553,30 +550,29 @@ class FederationHandler(BaseHandler):
else:
raise
- async def _get_state_for_room(
- self,
- destination: str,
- room_id: str,
- event_id: str,
- include_event_in_state: bool = False,
- ) -> Tuple[List[EventBase], List[EventBase]]:
+ @defer.inlineCallbacks
+ @log_function
+ def _get_state_for_room(
+ self, destination, room_id, event_id, include_event_in_state
+ ):
"""Requests all of the room state at a given event from a remote homeserver.
Args:
- destination: The remote homeserver to query for the state.
- room_id: The id of the room we're interested in.
- event_id: The id of the event we want the state at.
+ destination (str): The remote homeserver to query for the state.
+ room_id (str): The id of the room we're interested in.
+ event_id (str): The id of the event we want the state at.
include_event_in_state: if true, the event itself will be included in the
returned state event list.
Returns:
- A list of events in the state, possibly including the event itself, and
- a list of events in the auth chain for the given event.
+ Deferred[Tuple[List[EventBase], List[EventBase]]]:
+ A list of events in the state, and a list of events in the auth chain
+ for the given event.
"""
(
state_event_ids,
auth_event_ids,
- ) = await self.federation_client.get_room_state_ids(
+ ) = yield self.federation_client.get_room_state_ids(
destination, room_id, event_id=event_id
)
@@ -585,15 +581,15 @@ class FederationHandler(BaseHandler):
if include_event_in_state:
desired_events.add(event_id)
- event_map = await self._get_events_from_store_or_dest(
+ event_map = yield self._get_events_from_store_or_dest(
destination, room_id, desired_events
)
failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
- "Failed to fetch missing state/auth events for %s %s",
- event_id,
+ "Failed to fetch missing state/auth events for %s: %s",
+ room_id,
failed_to_fetch,
)
@@ -613,11 +609,15 @@ class FederationHandler(BaseHandler):
return remote_state, auth_chain
- async def _get_events_from_store_or_dest(
- self, destination: str, room_id: str, event_ids: Iterable[str]
- ) -> Dict[str, EventBase]:
+ @defer.inlineCallbacks
+ def _get_events_from_store_or_dest(self, destination, room_id, event_ids):
"""Fetch events from a remote destination, checking if we already have them.
+ Args:
+ destination (str)
+ room_id (str)
+ event_ids (Iterable[str])
+
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
@@ -625,9 +625,10 @@ class FederationHandler(BaseHandler):
be in the given room.
Returns:
- map from event_id to event
+ Deferred[dict[str, EventBase]]: A deferred resolving to a map
+ from event_id to event
"""
- fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
+ fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
missing_events = set(event_ids) - fetched_events.keys()
@@ -638,14 +639,14 @@ class FederationHandler(BaseHandler):
room_id,
)
- await self._get_events_and_persist(
+ yield self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
- (await self.store.get_events(missing_events, allow_rejected=True))
+ (yield self.store.get_events(missing_events, allow_rejected=True))
)
# check for events which were in the wrong room.
@@ -675,9 +676,8 @@ class FederationHandler(BaseHandler):
return fetched_events
- async def _process_received_pdu(
- self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]],
- ):
+ @defer.inlineCallbacks
+ def _process_received_pdu(self, origin, event, state):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
@@ -696,15 +696,15 @@ class FederationHandler(BaseHandler):
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
try:
- context = await self._handle_new_event(origin, event, state=state)
+ context = yield self._handle_new_event(origin, event, state=state)
except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
- room = await self.store.get_room(room_id)
+ room = yield self.store.get_room(room_id)
if not room:
try:
- await self.store.store_room(
+ yield self.store.store_room(
room_id=room_id, room_creator_user_id="", is_public=False
)
except StoreError:
@@ -717,11 +717,11 @@ class FederationHandler(BaseHandler):
# changing their profile info.
newly_joined = True
- prev_state_ids = await context.get_prev_state_ids(self.store)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id:
- prev_state = await self.store.get_event(
+ prev_state = yield self.store.get_event(
prev_state_id, allow_none=True
)
if prev_state and prev_state.membership == Membership.JOIN:
@@ -729,7 +729,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- await self.user_joined_room(user, room_id)
+ yield self.user_joined_room(user, room_id)
@log_function
async def backfill(self, dest, room_id, limit, extremities):
@@ -1444,8 +1444,15 @@ class FederationHandler(BaseHandler):
if self.hs.config.block_non_admin_invites:
raise SynapseError(403, "This server does not accept room invites")
+ is_published = yield self.store.is_room_published(event.room_id)
+
if not self.spam_checker.user_may_invite(
- event.sender, event.state_key, event.room_id
+ event.sender,
+ event.state_key,
+ None,
+ room_id=event.room_id,
+ new_room=False,
+ published_room=is_published,
):
raise SynapseError(
403, "This user is not permitted to send invites to this server/user"
|