diff options
author | Erik Johnston <erik@matrix.org> | 2014-11-26 12:06:36 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-11-26 12:06:36 +0000 |
commit | 48ee9ddb22b36f1adc23290e76287a922488596d (patch) | |
tree | d9eccedd90f9c573634c5456cfbbb65684214c6e /synapse/handlers/federation.py | |
parent | Use tagged version of matrix-angular-sdk (diff) | |
parent | Bump version numbers and change log (diff) | |
download | synapse-48ee9ddb22b36f1adc23290e76287a922488596d.tar.xz |
Merge branch 'release-v0.5.1' of github.com:matrix-org/synapse v0.5.1
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 225 |
1 files changed, 142 insertions, 83 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 492005a170..252c1f1684 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -24,7 +24,8 @@ from synapse.api.constants import Membership from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( - compute_event_signature, check_event_content_hash + compute_event_signature, check_event_content_hash, + add_hashes_and_signatures, ) from syutil.jsonutil import encode_canonical_json @@ -122,7 +123,8 @@ class FederationHandler(BaseHandler): event.origin, redacted_pdu_json ) except SynapseError as e: - logger.warn("Signature check failed for %s redacted to %s", + logger.warn( + "Signature check failed for %s redacted to %s", encode_canonical_json(pdu.get_pdu_json()), encode_canonical_json(redacted_pdu_json), ) @@ -140,15 +142,27 @@ class FederationHandler(BaseHandler): ) event = redacted_event - is_new_state = yield self.state_handler.annotate_event_with_state( - event, - old_state=state - ) - logger.debug("Event: %s", event) + # FIXME (erikj): Awful hack to make the case where we are not currently + # in the room work + current_state = None + if state: + is_in_room = yield self.auth.check_host_in_room( + event.room_id, + self.server_name + ) + if not is_in_room: + logger.debug("Got event for room we're not in.") + current_state = state + try: - self.auth.check(event, raises=True) + yield self._handle_new_event( + event, + state=state, + backfilled=backfilled, + current_state=current_state, + ) except AuthError as e: raise FederationError( "ERROR", @@ -157,43 +171,14 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - is_new_state = is_new_state and not backfilled - - # TODO: Implement something in federation that allows us to - # respond to PDU. - - yield self.store.persist_event( - event, - backfilled, - is_new_state=is_new_state - ) - room = yield self.store.get_room(event.room_id) if not room: - # Huh, let's try and get the current state - try: - yield self.replication_layer.get_state_for_context( - event.origin, event.room_id, event.event_id, - ) - - hosts = yield self.store.get_joined_hosts_for_room( - event.room_id - ) - if self.hs.hostname in hosts: - try: - yield self.store.store_room( - room_id=event.room_id, - room_creator_user_id="", - is_public=False, - ) - except: - pass - except: - logger.exception( - "Failed to get current state for room %s", - event.room_id - ) + yield self.store.store_room( + room_id=event.room_id, + room_creator_user_id="", + is_public=False, + ) if not backfilled: extra_users = [] @@ -209,7 +194,7 @@ class FederationHandler(BaseHandler): if event.type == RoomMemberEvent.TYPE: if event.membership == Membership.JOIN: user = self.hs.parse_userid(event.state_key) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) @@ -254,6 +239,8 @@ class FederationHandler(BaseHandler): pdu=event ) + + defer.returnValue(pdu) @defer.inlineCallbacks @@ -275,6 +262,8 @@ class FederationHandler(BaseHandler): We suspend processing of any received events from this room until we have finished processing the join. """ + logger.debug("Joining %s to %s", joinee, room_id) + pdu = yield self.replication_layer.make_join( target_host, room_id, @@ -297,19 +286,28 @@ class FederationHandler(BaseHandler): try: event.event_id = self.event_factory.create_event_id() + event.origin = self.hs.hostname event.content = content - state = yield self.replication_layer.send_join( + if not hasattr(event, "signatures"): + event.signatures = {} + + add_hashes_and_signatures( + event, + self.hs.hostname, + self.hs.config.signing_key[0], + ) + + ret = yield self.replication_layer.send_join( target_host, event ) - logger.debug("do_invite_join state: %s", state) + state = ret["state"] + auth_chain = ret["auth_chain"] - yield self.state_handler.annotate_event_with_state( - event, - old_state=state - ) + logger.debug("do_invite_join auth_chain: %s", auth_chain) + logger.debug("do_invite_join state: %s", state) logger.debug("do_invite_join event: %s", event) @@ -323,34 +321,41 @@ class FederationHandler(BaseHandler): # FIXME pass - for e in state: - # FIXME: Auth these. + for e in auth_chain: e.outlier = True - - yield self.state_handler.annotate_event_with_state( - e, + yield self._handle_new_event(e) + yield self.notifier.on_new_room_event( + e, extra_users=[joinee] ) - yield self.store.persist_event( - e, - backfilled=False, - is_new_state=True + for e in state: + # FIXME: Auth these. + e.outlier = True + yield self._handle_new_event(e) + yield self.notifier.on_new_room_event( + e, extra_users=[joinee] ) - yield self.store.persist_event( + yield self._handle_new_event( event, - backfilled=False, - is_new_state=True + state=state, + current_state=state + ) + + yield self.notifier.on_new_room_event( + event, extra_users=[joinee] ) + + logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] del self.room_queues[room_id] for p in room_queue: try: - yield self.on_receive_pdu(p, backfilled=False) + self.on_receive_pdu(p, backfilled=False) except: - pass + logger.exception("Couldn't handle pdu") defer.returnValue(True) @@ -374,7 +379,7 @@ class FederationHandler(BaseHandler): yield self.state_handler.annotate_event_with_state(event) yield self.auth.add_auth_events(event) - self.auth.check(event, raises=True) + self.auth.check(event, auth_events=event.old_state_events) pdu = event @@ -390,16 +395,7 @@ class FederationHandler(BaseHandler): event.outlier = False - is_new_state = yield self.state_handler.annotate_event_with_state(event) - self.auth.check(event, raises=True) - - # FIXME (erikj): All this is duplicated above :( - - yield self.store.persist_event( - event, - backfilled=False, - is_new_state=is_new_state - ) + yield self._handle_new_event(event) extra_users = [] if event.type == RoomMemberEvent.TYPE: @@ -412,9 +408,9 @@ class FederationHandler(BaseHandler): ) if event.type == RoomMemberEvent.TYPE: - if event.membership == Membership.JOIN: + if event.content["membership"] == Membership.JOIN: user = self.hs.parse_userid(event.state_key) - self.distributor.fire( + yield self.distributor.fire( "user_joined_room", user=user, room_id=event.room_id ) @@ -527,7 +523,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, origin, event_id): + def get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. Returns: @@ -539,12 +535,13 @@ class FederationHandler(BaseHandler): ) if event: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin - ) - if not in_room: - raise AuthError(403, "Host not in room.") + if do_auth: + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") defer.returnValue(event) else: @@ -562,3 +559,65 @@ class FederationHandler(BaseHandler): ) while waiters: waiters.pop().callback(None) + + @defer.inlineCallbacks + def _handle_new_event(self, event, state=None, backfilled=False, + current_state=None): + if state: + for s in state: + yield self._handle_new_event(s) + + is_new_state = yield self.state_handler.annotate_event_with_state( + event, + old_state=state + ) + + if event.old_state_events: + known_ids = set( + [s.event_id for s in event.old_state_events.values()] + ) + for e_id, _ in event.auth_events: + if e_id not in known_ids: + e = yield self.store.get_event( + e_id, + allow_none=True, + ) + + if not e: + # TODO: Do some conflict res to make sure that we're + # not the ones who are wrong. + logger.info( + "Rejecting %s as %s not in %s", + event.event_id, e_id, known_ids, + ) + raise AuthError(403, "Auth events are stale") + + auth_events = event.old_state_events + else: + # We need to get the auth events from somewhere. + + # TODO: Don't just hit the DBs? + + auth_events = {} + for e_id, _ in event.auth_events: + e = yield self.store.get_event( + e_id, + allow_none=True, + ) + + if not e: + raise AuthError( + 403, + "Can't find auth event %s." % (e_id, ) + ) + + auth_events[(e.type, e.state_key)] = e + + self.auth.check(event, auth_events=auth_events) + + yield self.store.persist_event( + event, + backfilled=backfilled, + is_new_state=(is_new_state and not backfilled), + current_state=current_state, + ) |