diff options
author | Richard van der Hoff <richard@matrix.org> | 2019-12-10 16:54:34 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2019-12-11 14:39:25 +0000 |
commit | 7712e751b87b83086a8cb0a1cda1eef40e177d07 (patch) | |
tree | 697d3ca8b471fb29b6e660f231a3e6fd19bc0595 /synapse/handlers/federation.py | |
parent | Clean up some logging (#6515) (diff) | |
download | synapse-7712e751b87b83086a8cb0a1cda1eef40e177d07.tar.xz |
Convert federation backfill to async
PaginationHandler.get_messages is only called by RoomMessageListRestServlet, which is async. Chase the code path down from there: - FederationHandler.maybe_backfill (and nested try_backfill) - FederationHandler.backfill
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 47 |
1 files changed, 22 insertions, 25 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cf9c46d027..e54d509b62 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -756,8 +756,7 @@ class FederationHandler(BaseHandler): yield self.user_joined_room(user, room_id) @log_function - @defer.inlineCallbacks - def backfill(self, dest, room_id, limit, extremities): + async def backfill(self, dest, room_id, limit, extremities): """ Trigger a backfill request to `dest` for the given `room_id` This will attempt to get more events from the remote. If the other side @@ -774,9 +773,9 @@ class FederationHandler(BaseHandler): if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") - room_version = yield self.store.get_room_version(room_id) + room_version = await self.store.get_room_version(room_id) - events = yield self.federation_client.backfill( + events = await self.federation_client.backfill( dest, room_id, limit=limit, extremities=extremities ) @@ -791,7 +790,7 @@ class FederationHandler(BaseHandler): # self._sanity_check_event(ev) # Don't bother processing events we already have. - seen_events = yield self.store.have_events_in_timeline( + seen_events = await self.store.have_events_in_timeline( set(e.event_id for e in events) ) @@ -814,7 +813,7 @@ class FederationHandler(BaseHandler): state_events = {} events_to_state = {} for e_id in edges: - state, auth = yield self._get_state_for_room( + state, auth = await self._get_state_for_room( destination=dest, room_id=room_id, event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) @@ -839,7 +838,7 @@ class FederationHandler(BaseHandler): # We repeatedly do this until we stop finding new auth events. while missing_auth - failed_to_fetch: logger.info("Missing auth for backfill: %r", missing_auth) - ret_events = yield self.store.get_events(missing_auth - failed_to_fetch) + ret_events = await self.store.get_events(missing_auth - failed_to_fetch) auth_events.update(ret_events) required_auth.update( @@ -853,7 +852,7 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch, ) - results = yield make_deferred_yieldable( + results = await make_deferred_yieldable( defer.gatherResults( [ run_in_background( @@ -880,7 +879,7 @@ class FederationHandler(BaseHandler): failed_to_fetch = missing_auth - set(auth_events) - seen_events = yield self.store.have_seen_events( + seen_events = await self.store.have_seen_events( set(auth_events.keys()) | set(state_events.keys()) ) @@ -942,7 +941,7 @@ class FederationHandler(BaseHandler): ) ) - yield self._handle_new_events(dest, ev_infos, backfilled=True) + await self._handle_new_events(dest, ev_infos, backfilled=True) # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) @@ -958,16 +957,15 @@ class FederationHandler(BaseHandler): # We store these one at a time since each event depends on the # previous to work out the state. # TODO: We can probably do something more clever here. - yield self._handle_new_event(dest, event, backfilled=True) + await self._handle_new_event(dest, event, backfilled=True) return events - @defer.inlineCallbacks - def maybe_backfill(self, room_id, current_depth): + async def maybe_backfill(self, room_id, current_depth): """Checks the database to see if we should backfill before paginating, and if so do. """ - extremities = yield self.store.get_oldest_events_with_depth_in_room(room_id) + extremities = await self.store.get_oldest_events_with_depth_in_room(room_id) if not extremities: logger.debug("Not backfilling as no extremeties found.") @@ -999,9 +997,9 @@ class FederationHandler(BaseHandler): # state *before* the event, ignoring the special casing certain event # types have. - forward_events = yield self.store.get_successor_events(list(extremities)) + forward_events = await self.store.get_successor_events(list(extremities)) - extremities_events = yield self.store.get_events( + extremities_events = await self.store.get_events( forward_events, redact_behaviour=EventRedactBehaviour.AS_IS, get_prev_content=False, @@ -1009,7 +1007,7 @@ class FederationHandler(BaseHandler): # We set `check_history_visibility_only` as we might otherwise get false # positives from users having been erased. - filtered_extremities = yield filter_events_for_server( + filtered_extremities = await filter_events_for_server( self.storage, self.server_name, list(extremities_events.values()), @@ -1039,7 +1037,7 @@ class FederationHandler(BaseHandler): # First we try hosts that are already in the room # TODO: HEURISTIC ALERT. - curr_state = yield self.state_handler.get_current_state(room_id) + curr_state = await self.state_handler.get_current_state(room_id) def get_domains_from_state(state): """Get joined domains from state @@ -1078,12 +1076,11 @@ class FederationHandler(BaseHandler): domain for domain, depth in curr_domains if domain != self.server_name ] - @defer.inlineCallbacks - def try_backfill(domains): + async def try_backfill(domains): # TODO: Should we try multiple of these at a time? for dom in domains: try: - yield self.backfill( + await self.backfill( dom, room_id, limit=100, extremities=extremities ) # If this succeeded then we probably already have the @@ -1114,7 +1111,7 @@ class FederationHandler(BaseHandler): return False - success = yield try_backfill(likely_domains) + success = await try_backfill(likely_domains) if success: return True @@ -1128,7 +1125,7 @@ class FederationHandler(BaseHandler): logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events) - states = yield make_deferred_yieldable( + states = await make_deferred_yieldable( defer.gatherResults( [resolve(room_id, [e]) for e in event_ids], consumeErrors=True ) @@ -1138,7 +1135,7 @@ class FederationHandler(BaseHandler): # event_ids. states = dict(zip(event_ids, [s.state for s in states])) - state_map = yield self.store.get_events( + state_map = await self.store.get_events( [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False, ) @@ -1154,7 +1151,7 @@ class FederationHandler(BaseHandler): for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id]) - success = yield try_backfill( + success = await try_backfill( [dom for dom, _ in likely_domains if dom not in tried_domains] ) if success: |