diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ca7da42a3f..e43bccd721 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -61,6 +61,7 @@ from synapse.logging.context import (
run_in_background,
)
from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
@@ -618,6 +619,11 @@ class FederationHandler(BaseHandler):
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
+ This function *does not* automatically get missing auth events of the
+ newly fetched events. Callers must include the full auth chain of
+ of the missing events in the `event_ids` argument, to ensure that any
+ missing auth events are correctly fetched.
+
Returns:
map from event_id to event
"""
@@ -784,15 +790,25 @@ class FederationHandler(BaseHandler):
resync = True
if resync:
- await self.store.mark_remote_user_device_cache_as_stale(event.sender)
+ run_as_background_process(
+ "resync_device_due_to_pdu", self._resync_device, event.sender
+ )
- # Immediately attempt a resync in the background
- if self.config.worker_app:
- return run_in_background(self._user_device_resync, event.sender)
- else:
- return run_in_background(
- self._device_list_updater.user_device_resync, event.sender
- )
+ async def _resync_device(self, sender: str) -> None:
+ """We have detected that the device list for the given user may be out
+ of sync, so we try and resync them.
+ """
+
+ try:
+ await self.store.mark_remote_user_device_cache_as_stale(sender)
+
+ # Immediately attempt a resync in the background
+ if self.config.worker_app:
+ await self._user_device_resync(user_id=sender)
+ else:
+ await self._device_list_updater.user_device_resync(sender)
+ except Exception:
+ logger.exception("Failed to resync device for %s", sender)
@log_function
async def backfill(self, dest, room_id, limit, extremities):
@@ -1131,12 +1147,16 @@ class FederationHandler(BaseHandler):
):
"""Fetch the given events from a server, and persist them as outliers.
+ This function *does not* recursively get missing auth events of the
+ newly fetched events. Callers must include in the `events` argument
+ any missing events from the auth chain.
+
Logs a warning if we can't find the given event.
"""
room_version = await self.store.get_room_version(room_id)
- event_infos = []
+ event_map = {} # type: Dict[str, EventBase]
async def get_event(event_id: str):
with nested_logging_context(event_id):
@@ -1150,17 +1170,7 @@ class FederationHandler(BaseHandler):
)
return
- # recursively fetch the auth events for this event
- auth_events = await self._get_events_from_store_or_dest(
- destination, room_id, event.auth_event_ids()
- )
- auth = {}
- for auth_event_id in event.auth_event_ids():
- ae = auth_events.get(auth_event_id)
- if ae:
- auth[(ae.type, ae.state_key)] = ae
-
- event_infos.append(_NewEventInfo(event, None, auth))
+ event_map[event.event_id] = event
except Exception as e:
logger.warning(
@@ -1172,6 +1182,32 @@ class FederationHandler(BaseHandler):
await concurrently_execute(get_event, events, 5)
+ # Make a map of auth events for each event. We do this after fetching
+ # all the events as some of the events' auth events will be in the list
+ # of requested events.
+
+ auth_events = [
+ aid
+ for event in event_map.values()
+ for aid in event.auth_event_ids()
+ if aid not in event_map
+ ]
+ persisted_events = await self.store.get_events(
+ auth_events, allow_rejected=True,
+ )
+
+ event_infos = []
+ for event in event_map.values():
+ auth = {}
+ for auth_event_id in event.auth_event_ids():
+ ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+ if ae:
+ auth[(ae.type, ae.state_key)] = ae
+ else:
+ logger.info("Missing auth event %s", auth_event_id)
+
+ event_infos.append(_NewEventInfo(event, None, auth))
+
await self._handle_new_events(
destination, event_infos,
)
|