summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/federation.py144
1 files changed, 92 insertions, 52 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f601de4488..14066ac4f3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -24,7 +24,8 @@ from synapse.api.constants import Membership
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import (
-    compute_event_signature, check_event_content_hash
+    compute_event_signature, check_event_content_hash,
+    add_hashes_and_signatures,
 )
 from syutil.jsonutil import encode_canonical_json
 
@@ -141,15 +142,14 @@ class FederationHandler(BaseHandler):
             )
             event = redacted_event
 
-        is_new_state = yield self.state_handler.annotate_event_with_state(
-            event,
-            old_state=state
-        )
-
         logger.debug("Event: %s", event)
 
         try:
-            self.auth.check(event, raises=True)
+            yield self._handle_new_event(
+                event,
+                state=state,
+                backfilled=backfilled
+            )
         except AuthError as e:
             raise FederationError(
                 "ERROR",
@@ -158,17 +158,6 @@ class FederationHandler(BaseHandler):
                 affected=event.event_id,
             )
 
-        is_new_state = is_new_state and not backfilled
-
-        # TODO: Implement something in federation that allows us to
-        # respond to PDU.
-
-        yield self.store.persist_event(
-            event,
-            backfilled,
-            is_new_state=is_new_state
-        )
-
         room = yield self.store.get_room(event.room_id)
 
         if not room:
@@ -276,6 +265,8 @@ class FederationHandler(BaseHandler):
         We suspend processing of any received events from this room until we
         have finished processing the join.
         """
+        logger.debug("Joining %s to %s", joinee, room_id)
+
         pdu = yield self.replication_layer.make_join(
             target_host,
             room_id,
@@ -298,19 +289,28 @@ class FederationHandler(BaseHandler):
 
         try:
             event.event_id = self.event_factory.create_event_id()
+            event.origin = self.hs.hostname
             event.content = content
 
-            state = yield self.replication_layer.send_join(
+            if not hasattr(event, "signatures"):
+                event.signatures = {}
+
+            add_hashes_and_signatures(
+                event,
+                self.hs.hostname,
+                self.hs.config.signing_key[0],
+            )
+
+            ret = yield self.replication_layer.send_join(
                 target_host,
                 event
             )
 
-            logger.debug("do_invite_join state: %s", state)
+            state = ret["state"]
+            auth_chain = ret["auth_chain"]
 
-            yield self.state_handler.annotate_event_with_state(
-                event,
-                old_state=state
-            )
+            logger.debug("do_invite_join auth_chain: %s", auth_chain)
+            logger.debug("do_invite_join state: %s", state)
 
             logger.debug("do_invite_join event: %s", event)
 
@@ -324,34 +324,31 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
+            for e in auth_chain:
+                e.outlier = True
+                yield self._handle_new_event(e)
+
             for e in state:
                 # FIXME: Auth these.
                 e.outlier = True
+                yield self._handle_new_event(e)
 
-                yield self.state_handler.annotate_event_with_state(
-                    e,
-                )
-
-                yield self.store.persist_event(
-                    e,
-                    backfilled=False,
-                    is_new_state=True
-                )
+            yield self._handle_new_event(event, state=state)
 
-            yield self.store.persist_event(
-                event,
-                backfilled=False,
-                is_new_state=True
+            yield self.notifier.on_new_room_event(
+                event, extra_users=[joinee]
             )
+
+            logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
             for p in room_queue:
                 try:
-                    yield self.on_receive_pdu(p, backfilled=False)
+                    self.on_receive_pdu(p, backfilled=False)
                 except:
-                    pass
+                    logger.exception("Couldn't handle pdu")
 
         defer.returnValue(True)
 
@@ -375,7 +372,7 @@ class FederationHandler(BaseHandler):
 
         yield self.state_handler.annotate_event_with_state(event)
         yield self.auth.add_auth_events(event)
-        self.auth.check(event, raises=True)
+        self.auth.check(event, auth_events=event.old_state_events)
 
         pdu = event
 
@@ -391,17 +388,7 @@ class FederationHandler(BaseHandler):
 
         event.outlier = False
 
-        state_handler = self.state_handler
-        is_new_state = yield state_handler.annotate_event_with_state(event)
-        self.auth.check(event, raises=True)
-
-        # FIXME (erikj):  All this is duplicated above :(
-
-        yield self.store.persist_event(
-            event,
-            backfilled=False,
-            is_new_state=is_new_state
-        )
+        yield self._handle_new_event(event)
 
         extra_users = []
         if event.type == RoomMemberEvent.TYPE:
@@ -414,7 +401,7 @@ class FederationHandler(BaseHandler):
         )
 
         if event.type == RoomMemberEvent.TYPE:
-            if event.membership == Membership.JOIN:
+            if event.content["membership"] == Membership.JOIN:
                 user = self.hs.parse_userid(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
@@ -565,3 +552,56 @@ class FederationHandler(BaseHandler):
         )
         while waiters:
             waiters.pop().callback(None)
+
+    @defer.inlineCallbacks
+    def _handle_new_event(self, event, state=None, backfilled=False):
+        is_new_state = yield self.state_handler.annotate_event_with_state(
+            event,
+            old_state=state
+        )
+
+        if event.old_state_events:
+            known_ids = set(
+                [s.event_id for s in event.old_state_events.values()]
+            )
+            for e_id, _ in event.auth_events:
+                if e_id not in known_ids:
+                    e = yield self.store.get_event(
+                        e_id,
+                        allow_none=True,
+                    )
+
+                    if not e:
+                        # TODO: Do some conflict res to make sure that we're
+                        # not the ones who are wrong.
+                        logger.info(
+                            "Rejecting %s as %s not in %s",
+                            event.event_id, e_id, known_ids,
+                        )
+                        raise AuthError(403, "Auth events are stale")
+
+            auth_events = event.old_state_events
+        else:
+            # We need to get the auth events from somewhere.
+
+            # TODO: Don't just hit the DBs?
+
+            auth_events = {}
+            for e_id, _ in event.auth_events:
+                e = yield self.store.get_event(
+                    e_id,
+                    allow_none=True,
+                )
+
+                if not e:
+                    raise AuthError(403, "Can't find auth event.")
+
+                auth_events[(e.type, e.state_key)] = e
+
+        self.auth.check(event, auth_events=auth_events)
+
+        yield self.store.persist_event(
+            event,
+            backfilled=backfilled,
+            is_new_state=(is_new_state and not backfilled)
+        )