diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 83 |
1 files changed, 48 insertions, 35 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4aec3563ac..195f7c618a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014 OpenMarket Ltd +# Copyright 2014, 2015 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -91,11 +91,12 @@ class FederationHandler(BaseHandler): yield run_on_reactor() - yield self.replication_layer.send_pdu(event, destinations) + self.replication_layer.send_pdu(event, destinations) @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, backfilled, state=None): + def on_receive_pdu(self, origin, pdu, backfilled, state=None, + auth_chain=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -150,40 +151,41 @@ class FederationHandler(BaseHandler): if not is_in_room and not event.internal_metadata.outlier: logger.debug("Got event for room we're not in.") - replication_layer = self.replication_layer - auth_chain = yield replication_layer.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) + replication = self.replication_layer + + if not state: + state, auth_chain = yield replication.get_state_for_context( + origin, context=event.room_id, event_id=event.event_id, + ) + + if not auth_chain: + auth_chain = yield replication.get_event_auth( + origin, + context=event.room_id, + event_id=event.event_id, + ) for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_missing=False) + yield self._handle_new_event(e, fetch_auth_from=origin) except: logger.exception( - "Failed to parse auth event %s", + "Failed to handle auth event %s", e.event_id, ) - if not state: - state = yield replication_layer.get_state_for_context( - origin, - context=event.room_id, - event_id=event.event_id, - ) - current_state = state if state: for e in state: + logging.info("A :) %r", e) e.internal_metadata.outlier = True try: yield self._handle_new_event(e) except: logger.exception( - "Failed to parse state event %s", + "Failed to handle state event %s", e.event_id, ) @@ -288,7 +290,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_event_auth(self, event_id): - auth = yield self.store.get_auth_chain(event_id) + auth = yield self.store.get_auth_chain([event_id]) for event in auth: event.signatures.update( @@ -391,10 +393,10 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_missing=False) + yield self._handle_new_event(e) except: logger.exception( - "Failed to parse auth event %s", + "Failed to handle auth event %s", e.event_id, ) @@ -403,12 +405,11 @@ class FederationHandler(BaseHandler): e.internal_metadata.outlier = True try: yield self._handle_new_event( - e, - fetch_missing=True + e, fetch_auth_from=target_host ) except: logger.exception( - "Failed to parse state event %s", + "Failed to handle state event %s", e.event_id, ) @@ -526,9 +527,12 @@ class FederationHandler(BaseHandler): event.signatures, ) - yield self.replication_layer.send_pdu(new_pdu, destinations) + self.replication_layer.send_pdu(new_pdu, destinations) - auth_chain = yield self.store.get_auth_chain(event.event_id) + state_ids = [e.event_id for e in context.current_state.values()] + auth_chain = yield self.store.get_auth_chain(set( + [event.event_id] + state_ids + )) defer.returnValue({ "state": context.current_state.values(), @@ -613,13 +617,13 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_backfill_request(self, origin, context, pdu_list, limit): - in_room = yield self.auth.check_host_in_room(context, origin) + def on_backfill_request(self, origin, room_id, pdu_list, limit): + in_room = yield self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") events = yield self.store.get_backfill_events( - context, + room_id, pdu_list, limit ) @@ -678,7 +682,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_missing=True): + current_state=None, fetch_auth_from=None): logger.debug( "_handle_new_event: Before annotate: %s, sigs: %s", @@ -699,11 +703,20 @@ class FederationHandler(BaseHandler): known_ids = set( [s.event_id for s in context.auth_events.values()] ) + for e_id, _ in event.auth_events: if e_id not in known_ids: - e = yield self.store.get_event( - e_id, allow_none=True, - ) + e = yield self.store.get_event(e_id, allow_none=True) + + if not e and fetch_auth_from is not None: + # Grab the auth_chain over federation if we are missing + # auth events. + auth_chain = yield self.replication_layer.get_event_auth( + fetch_auth_from, event.event_id, event.room_id + ) + for auth_event in auth_chain: + yield self._handle_new_event(auth_event) + e = yield self.store.get_event(e_id, allow_none=True) if not e: # TODO: Do some conflict res to make sure that we're @@ -713,7 +726,7 @@ class FederationHandler(BaseHandler): event.event_id, e_id, known_ids, ) # FIXME: How does raising AuthError work with federation? - raise AuthError(403, "Auth events are stale") + raise AuthError(403, "Cannot find auth event") context.auth_events[(e.type, e.state_key)] = e |