diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
commit | 951690e54d5673431abefafc30c94af6e11341e3 (patch) | |
tree | 779471113f16dfbf7e67504731dd412101eee180 /synapse/handlers/federation.py | |
parent | Fix user query checks. HS>AS pushing now works. (diff) | |
parent | Merge branch 'federation_client_retries' of github.com:matrix-org/synapse int... (diff) | |
download | synapse-951690e54d5673431abefafc30c94af6e11341e3.tar.xz |
Merge branch 'develop' into application-services
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 193 |
1 files changed, 116 insertions, 77 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8bf5a4cc11..aba266c2bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,6 +30,7 @@ from synapse.types import UserID from twisted.internet import defer +import itertools import logging @@ -123,8 +124,21 @@ class FederationHandler(BaseHandler): logger.debug("Got event for room we're not in.") current_state = state + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + if state and auth_chain is not None: - for e in state: + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue + e.internal_metadata.outlier = True try: auth_ids = [e_id for e_id, _ in e.auth_events] @@ -132,7 +146,10 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in auth_chain if e.event_id in auth_ids } - yield self._handle_new_event(origin, e, auth_events=auth) + yield self._handle_new_event( + origin, e, auth_events=auth + ) + seen_ids.add(e.event_id) except: logger.exception( "Failed to handle state event %s", @@ -256,7 +273,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -270,8 +287,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - pdu = yield self.replication_layer.make_join( - target_host, + origin, pdu = yield self.replication_layer.make_join( + target_hosts, room_id, joinee ) @@ -313,11 +330,17 @@ class FederationHandler(BaseHandler): new_event = builder.build() + # Try the host we successfully got a response to /make_join/ + # request first. + target_hosts.remove(origin) + target_hosts.insert(0, origin) + ret = yield self.replication_layer.send_join( - target_host, + target_hosts, new_event ) + origin = ret["origin"] state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) @@ -354,7 +377,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -374,7 +397,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -389,7 +412,7 @@ class FederationHandler(BaseHandler): } yield self._handle_new_event( - target_host, + origin, new_event, state=state, current_state=state, @@ -498,6 +521,8 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) + destinations.remove(origin) + logger.debug( "on_send_join_request: Sending event: %s, signatures: %s", event.event_id, @@ -618,6 +643,7 @@ class FederationHandler(BaseHandler): event = yield self.store.get_event( event_id, allow_none=True, + allow_rejected=True, ) if event: @@ -701,6 +727,8 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + # FIXME: Don't store as rejected with AUTH_ERROR if we haven't + # seen all the auth events. yield self.store.persist_event( event, context=context, @@ -750,7 +778,7 @@ class FederationHandler(BaseHandler): ) ) - logger.debug("on_query_auth reutrning: %s", ret) + logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @@ -770,41 +798,45 @@ class FederationHandler(BaseHandler): if missing_auth: logger.debug("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. - remote_auth_chain = yield self.replication_layer.get_event_auth( - origin, event.room_id, event.event_id - ) + try: + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in remote_auth_chain] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) - for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): - continue + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue - if e.event_id == event.event_id: - continue + if e.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in remote_auth_chain - if e.event_id in auth_ids - } - e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + e.internal_metadata.outlier = True - logger.debug( - "do_auth %s missing_auth: %s", - event.event_id, e.event_id - ) - yield self._handle_new_event( - origin, e, auth_events=auth - ) + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( + origin, e, auth_events=auth + ) - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e - except AuthError: - pass + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + except: + # FIXME: + logger.exception("Failed to get auth chain") # FIXME: Assumes we have and stored all the state for all the # prev_events @@ -816,50 +848,57 @@ class FederationHandler(BaseHandler): logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. - auth_ids = self.auth.compute_auth_events(event, context) - local_auth_chain = yield self.store.get_auth_chain(auth_ids) - - # 2. Get remote difference. - result = yield self.replication_layer.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) - - seen_remotes = yield self.store.have_events( - [e.event_id for e in result["auth_chain"]] + auth_ids = self.auth.compute_auth_events( + event, context.current_state ) + local_auth_chain = yield self.store.get_auth_chain(auth_ids) - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): - continue + try: + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - if ev.event_id == event.event_id: - continue + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) - try: - auth_ids = [e_id for e_id, _ in ev.auth_events] - auth = { - (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - } - ev.internal_metadata.outlier = True + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue + + if ev.event_id == event.event_id: + continue + + try: + auth_ids = [e_id for e_id, _ in ev.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + ev.internal_metadata.outlier = True + + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + yield self._handle_new_event( + origin, ev, auth_events=auth + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + except: + # FIXME: + logger.exception("Failed to query auth chain") # 4. Look at rejects and their proofs. # TODO. @@ -983,7 +1022,7 @@ class FederationHandler(BaseHandler): if reason is None: # FIXME: ERRR?! logger.warn("Could not find reason for %s", e.event_id) - raise RuntimeError("") + raise RuntimeError("Could not find reason for %s" % e.event_id) reason_map[e.event_id] = reason |