diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 141 |
1 files changed, 112 insertions, 29 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcef602055..925eb5376e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -101,7 +101,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled, state=None): + def on_receive_pdu(self, origin, pdu, backfilled, state=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -112,7 +112,7 @@ class FederationHandler(BaseHandler): # If we are currently in the process of joining this room, then we # queue up events for later processing. if event.room_id in self.room_queues: - self.room_queues[event.room_id].append(pdu) + self.room_queues[event.room_id].append((pdu, origin)) return logger.debug("Processing event: %s", event.event_id) @@ -149,14 +149,49 @@ class FederationHandler(BaseHandler): # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None - if state: - is_in_room = yield self.auth.check_host_in_room( - event.room_id, - self.server_name + is_in_room = yield self.auth.check_host_in_room( + event.room_id, + self.server_name + ) + if not is_in_room and not event.outlier: + logger.debug("Got event for room we're not in.") + + replication_layer = self.replication_layer + auth_chain = yield replication_layer.get_event_auth( + origin, + context=event.room_id, + event_id=event.event_id, ) - if not is_in_room: - logger.debug("Got event for room we're not in.") - current_state = state + + for e in auth_chain: + e.outlier = True + try: + yield self._handle_new_event(e, fetch_missing=False) + except: + logger.exception( + "Failed to parse auth event %s", + e.event_id, + ) + + if not state: + state = yield replication_layer.get_state_for_context( + origin, + context=event.room_id, + event_id=event.event_id, + ) + + current_state = state + + if state: + for e in state: + e.outlier = True + try: + yield self._handle_new_event(e) + except: + logger.exception( + "Failed to parse state event %s", + e.event_id, + ) try: yield self._handle_new_event( @@ -251,6 +286,16 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_event_auth(self, event_id): auth = yield self.store.get_auth_chain(event_id) + + for event in auth: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + defer.returnValue([e for e in auth]) @log_function @@ -310,6 +355,7 @@ class FederationHandler(BaseHandler): state = ret["state"] auth_chain = ret["auth_chain"] + auth_chain.sort(key=lambda e: e.depth) logger.debug("do_invite_join auth_chain: %s", auth_chain) logger.debug("do_invite_join state: %s", state) @@ -328,23 +374,32 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.outlier = True - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event( - e, extra_users=[joinee] - ) + try: + yield self._handle_new_event(e, fetch_missing=False) + except: + logger.exception( + "Failed to parse auth event %s", + e.event_id, + ) for e in state: # FIXME: Auth these. e.outlier = True - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event( - e, extra_users=[joinee] - ) + try: + yield self._handle_new_event( + e, + fetch_missing=True + ) + except: + logger.exception( + "Failed to parse state event %s", + e.event_id, + ) yield self._handle_new_event( event, state=state, - current_state=state + current_state=state, ) yield self.notifier.on_new_room_event( @@ -356,9 +411,9 @@ class FederationHandler(BaseHandler): room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p in room_queue: + for p, origin in room_queue: try: - self.on_receive_pdu(p, backfilled=False) + self.on_receive_pdu(origin, p, backfilled=False) except: logger.exception("Couldn't handle pdu") @@ -507,7 +562,17 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - defer.returnValue(results.values()) + res = results.values() + for event in res: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + defer.returnValue(res) else: defer.returnValue([]) @@ -540,6 +605,17 @@ class FederationHandler(BaseHandler): ) if event: + # FIXME: This is a temporary work around where we occasionally + # return events slightly differently than when they were + # originally signed + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + if do_auth: in_room = yield self.auth.check_host_in_room( event.room_id, @@ -567,11 +643,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None): - if state: - for s in state: - yield self._handle_new_event(s) - + current_state=None, fetch_missing=True): is_new_state = yield self.state_handler.annotate_event_with_state( event, old_state=state @@ -611,11 +683,22 @@ class FederationHandler(BaseHandler): ) if not e: - raise AuthError( - 403, - "Can't find auth event %s." % (e_id, ) + e = yield self.replication_layer.get_pdu( + event.origin, e_id, outlier=True ) + if e and fetch_missing: + try: + yield self.on_receive_pdu(event.origin, e, False) + except: + logger.exception( + "Failed to parse auth event %s", + e_id, + ) + + if not e: + logger.warn("Can't find auth event %s.", e_id) + auth_events[(e.type, e.state_key)] = e if event.type == RoomMemberEvent.TYPE and not event.auth_events: |