summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py225
1 files changed, 142 insertions, 83 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 492005a170..252c1f1684 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -24,7 +24,8 @@ from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import (
-    compute_event_signature, check_event_content_hash
+    compute_event_signature, check_event_content_hash,
+    add_hashes_and_signatures,
 )
 from syutil.jsonutil import encode_canonical_json
 
@@ -122,7 +123,8 @@ class FederationHandler(BaseHandler):
                 event.origin, redacted_pdu_json
             )
         except SynapseError as e:
-            logger.warn("Signature check failed for %s redacted to %s",
+            logger.warn(
+                "Signature check failed for %s redacted to %s",
                 encode_canonical_json(pdu.get_pdu_json()),
                 encode_canonical_json(redacted_pdu_json),
             )
@@ -140,15 +142,27 @@ class FederationHandler(BaseHandler):
             )
             event = redacted_event
 
-        is_new_state = yield self.state_handler.annotate_event_with_state(
-            event,
-            old_state=state
-        )
-
         logger.debug("Event: %s", event)
 
+        # FIXME (erikj): Awful hack to make the case where we are not currently
+        # in the room work
+        current_state = None
+        if state:
+            is_in_room = yield self.auth.check_host_in_room(
+                event.room_id,
+                self.server_name
+            )
+            if not is_in_room:
+                logger.debug("Got event for room we're not in.")
+                current_state = state
+
         try:
-            self.auth.check(event, raises=True)
+            yield self._handle_new_event(
+                event,
+                state=state,
+                backfilled=backfilled,
+                current_state=current_state,
+            )
         except AuthError as e:
             raise FederationError(
                 "ERROR",
@@ -157,43 +171,14 @@ class FederationHandler(BaseHandler):
                 affected=event.event_id,
             )
 
-        is_new_state = is_new_state and not backfilled
-
-        # TODO: Implement something in federation that allows us to
-        # respond to PDU.
-
-        yield self.store.persist_event(
-            event,
-            backfilled,
-            is_new_state=is_new_state
-        )
-
         room = yield self.store.get_room(event.room_id)
 
         if not room:
-            # Huh, let's try and get the current state
-            try:
-                yield self.replication_layer.get_state_for_context(
-                    event.origin, event.room_id, event.event_id,
-                )
-
-                hosts = yield self.store.get_joined_hosts_for_room(
-                    event.room_id
-                )
-                if self.hs.hostname in hosts:
-                    try:
-                        yield self.store.store_room(
-                            room_id=event.room_id,
-                            room_creator_user_id="",
-                            is_public=False,
-                        )
-                    except:
-                        pass
-            except:
-                logger.exception(
-                    "Failed to get current state for room %s",
-                    event.room_id
-                )
+            yield self.store.store_room(
+                room_id=event.room_id,
+                room_creator_user_id="",
+                is_public=False,
+            )
 
         if not backfilled:
             extra_users = []
@@ -209,7 +194,7 @@ class FederationHandler(BaseHandler):
         if event.type == RoomMemberEvent.TYPE:
             if event.membership == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
-                self.distributor.fire(
+                yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
 
@@ -254,6 +239,8 @@ class FederationHandler(BaseHandler):
             pdu=event
         )
 
+
+
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
@@ -275,6 +262,8 @@ class FederationHandler(BaseHandler):
         We suspend processing of any received events from this room until we
         have finished processing the join.
         """
+        logger.debug("Joining %s to %s", joinee, room_id)
+
         pdu = yield self.replication_layer.make_join(
             target_host,
             room_id,
@@ -297,19 +286,28 @@ class FederationHandler(BaseHandler):
 
         try:
             event.event_id = self.event_factory.create_event_id()
+            event.origin = self.hs.hostname
             event.content = content
 
-            state = yield self.replication_layer.send_join(
+            if not hasattr(event, "signatures"):
+                event.signatures = {}
+
+            add_hashes_and_signatures(
+                event,
+                self.hs.hostname,
+                self.hs.config.signing_key[0],
+            )
+
+            ret = yield self.replication_layer.send_join(
                 target_host,
                 event
             )
 
-            logger.debug("do_invite_join state: %s", state)
+            state = ret["state"]
+            auth_chain = ret["auth_chain"]
 
-            yield self.state_handler.annotate_event_with_state(
-                event,
-                old_state=state
-            )
+            logger.debug("do_invite_join auth_chain: %s", auth_chain)
+            logger.debug("do_invite_join state: %s", state)
 
             logger.debug("do_invite_join event: %s", event)
 
@@ -323,34 +321,41 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            for e in state:
-                # FIXME: Auth these.
+            for e in auth_chain:
                 e.outlier = True
-
-                yield self.state_handler.annotate_event_with_state(
-                    e,
+                yield self._handle_new_event(e)
+                yield self.notifier.on_new_room_event(
+                    e, extra_users=[joinee]
                 )
 
-                yield self.store.persist_event(
-                    e,
-                    backfilled=False,
-                    is_new_state=True
+            for e in state:
+                # FIXME: Auth these.
+                e.outlier = True
+                yield self._handle_new_event(e)
+                yield self.notifier.on_new_room_event(
+                    e, extra_users=[joinee]
                 )
 
-            yield self.store.persist_event(
+            yield self._handle_new_event(
                 event,
-                backfilled=False,
-                is_new_state=True
+                state=state,
+                current_state=state
+            )
+
+            yield self.notifier.on_new_room_event(
+                event, extra_users=[joinee]
             )
+
+            logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
             for p in room_queue:
                 try:
-                    yield self.on_receive_pdu(p, backfilled=False)
+                    self.on_receive_pdu(p, backfilled=False)
                 except:
-                    pass
+                    logger.exception("Couldn't handle pdu")
 
         defer.returnValue(True)
 
@@ -374,7 +379,7 @@ class FederationHandler(BaseHandler):
 
         yield self.state_handler.annotate_event_with_state(event)
         yield self.auth.add_auth_events(event)
-        self.auth.check(event, raises=True)
+        self.auth.check(event, auth_events=event.old_state_events)
 
         pdu = event
 
@@ -390,16 +395,7 @@ class FederationHandler(BaseHandler):
 
         event.outlier = False
 
-        is_new_state = yield self.state_handler.annotate_event_with_state(event)
-        self.auth.check(event, raises=True)
-
-        # FIXME (erikj):  All this is duplicated above :(
-
-        yield self.store.persist_event(
-            event,
-            backfilled=False,
-            is_new_state=is_new_state
-        )
+        yield self._handle_new_event(event)
 
         extra_users = []
         if event.type == RoomMemberEvent.TYPE:
@@ -412,9 +408,9 @@ class FederationHandler(BaseHandler):
         )
 
         if event.type == RoomMemberEvent.TYPE:
-            if event.membership == Membership.JOIN:
+            if event.content["membership"] == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
-                self.distributor.fire(
+                yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
 
@@ -527,7 +523,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def get_persisted_pdu(self, origin, event_id):
+    def get_persisted_pdu(self, origin, event_id, do_auth=True):
         """ Get a PDU from the database with given origin and id.
 
         Returns:
@@ -539,12 +535,13 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            in_room = yield self.auth.check_host_in_room(
-                event.room_id,
-                origin
-            )
-            if not in_room:
-                raise AuthError(403, "Host not in room.")
+            if do_auth:
+                in_room = yield self.auth.check_host_in_room(
+                    event.room_id,
+                    origin
+                )
+                if not in_room:
+                    raise AuthError(403, "Host not in room.")
 
             defer.returnValue(event)
         else:
@@ -562,3 +559,65 @@ class FederationHandler(BaseHandler):
         )
         while waiters:
             waiters.pop().callback(None)
+
+    @defer.inlineCallbacks
+    def _handle_new_event(self, event, state=None, backfilled=False,
+                          current_state=None):
+        if state:
+            for s in state:
+                yield self._handle_new_event(s)
+
+        is_new_state = yield self.state_handler.annotate_event_with_state(
+            event,
+            old_state=state
+        )
+
+        if event.old_state_events:
+            known_ids = set(
+                [s.event_id for s in event.old_state_events.values()]
+            )
+            for e_id, _ in event.auth_events:
+                if e_id not in known_ids:
+                    e = yield self.store.get_event(
+                        e_id,
+                        allow_none=True,
+                    )
+
+                    if not e:
+                        # TODO: Do some conflict res to make sure that we're
+                        # not the ones who are wrong.
+                        logger.info(
+                            "Rejecting %s as %s not in %s",
+                            event.event_id, e_id, known_ids,
+                        )
+                        raise AuthError(403, "Auth events are stale")
+
+            auth_events = event.old_state_events
+        else:
+            # We need to get the auth events from somewhere.
+
+            # TODO: Don't just hit the DBs?
+
+            auth_events = {}
+            for e_id, _ in event.auth_events:
+                e = yield self.store.get_event(
+                    e_id,
+                    allow_none=True,
+                )
+
+                if not e:
+                    raise AuthError(
+                        403,
+                        "Can't find auth event %s." % (e_id, )
+                    )
+
+                auth_events[(e.type, e.state_key)] = e
+
+        self.auth.check(event, auth_events=auth_events)
+
+        yield self.store.persist_event(
+            event,
+            backfilled=backfilled,
+            is_new_state=(is_new_state and not backfilled),
+            current_state=current_state,
+        )