summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py546
1 files changed, 377 insertions, 169 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4ff20599d6..c1bce07e31 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,6 +21,7 @@ from synapse.api.errors import (
     AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
 )
 from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
@@ -40,7 +41,6 @@ from twisted.internet import defer
 import itertools
 import logging
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -58,6 +58,8 @@ class FederationHandler(BaseHandler):
     def __init__(self, hs):
         super(FederationHandler, self).__init__(hs)
 
+        self.hs = hs
+
         self.distributor.observe(
             "user_joined_room",
             self._on_user_joined
@@ -68,12 +70,9 @@ class FederationHandler(BaseHandler):
         self.store = hs.get_datastore()
         self.replication_layer = hs.get_replication_layer()
         self.state_handler = hs.get_state_handler()
-        # self.auth_handler = gs.get_auth_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
 
-        self.lock_manager = hs.get_room_lock_manager()
-
         self.replication_layer.set_handler(self)
 
         # When joining a room we need to queue any events for that room up
@@ -125,60 +124,72 @@ class FederationHandler(BaseHandler):
         )
         if not is_in_room and not event.internal_metadata.is_outlier():
             logger.debug("Got event for room we're not in.")
-            current_state = state
 
-        event_ids = set()
-        if state:
-            event_ids |= {e.event_id for e in state}
-        if auth_chain:
-            event_ids |= {e.event_id for e in auth_chain}
+            try:
+                event_stream_id, max_stream_id = yield self._persist_auth_tree(
+                    auth_chain, state, event
+                )
+            except AuthError as e:
+                raise FederationError(
+                    "ERROR",
+                    e.code,
+                    e.msg,
+                    affected=event.event_id,
+                )
 
-        seen_ids = set(
-            (yield self.store.have_events(event_ids)).keys()
-        )
+        else:
+            event_ids = set()
+            if state:
+                event_ids |= {e.event_id for e in state}
+            if auth_chain:
+                event_ids |= {e.event_id for e in auth_chain}
+
+            seen_ids = set(
+                (yield self.store.have_events(event_ids)).keys()
+            )
 
-        if state and auth_chain is not None:
-            # If we have any state or auth_chain given to us by the replication
-            # layer, then we should handle them (if we haven't before.)
+            if state and auth_chain is not None:
+                # If we have any state or auth_chain given to us by the replication
+                # layer, then we should handle them (if we haven't before.)
 
-            event_infos = []
+                event_infos = []
 
-            for e in itertools.chain(auth_chain, state):
-                if e.event_id in seen_ids:
-                    continue
-                e.internal_metadata.outlier = True
-                auth_ids = [e_id for e_id, _ in e.auth_events]
-                auth = {
-                    (e.type, e.state_key): e for e in auth_chain
-                    if e.event_id in auth_ids
-                }
-                event_infos.append({
-                    "event": e,
-                    "auth_events": auth,
-                })
-                seen_ids.add(e.event_id)
+                for e in itertools.chain(auth_chain, state):
+                    if e.event_id in seen_ids:
+                        continue
+                    e.internal_metadata.outlier = True
+                    auth_ids = [e_id for e_id, _ in e.auth_events]
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_chain
+                        if e.event_id in auth_ids or e.type == EventTypes.Create
+                    }
+                    event_infos.append({
+                        "event": e,
+                        "auth_events": auth,
+                    })
+                    seen_ids.add(e.event_id)
 
-            yield self._handle_new_events(
-                origin,
-                event_infos,
-                outliers=True
-            )
+                yield self._handle_new_events(
+                    origin,
+                    event_infos,
+                    outliers=True
+                )
 
-        try:
-            _, event_stream_id, max_stream_id = yield self._handle_new_event(
-                origin,
-                event,
-                state=state,
-                backfilled=backfilled,
-                current_state=current_state,
-            )
-        except AuthError as e:
-            raise FederationError(
-                "ERROR",
-                e.code,
-                e.msg,
-                affected=event.event_id,
-            )
+            try:
+                _, event_stream_id, max_stream_id = yield self._handle_new_event(
+                    origin,
+                    event,
+                    state=state,
+                    backfilled=backfilled,
+                    current_state=current_state,
+                )
+            except AuthError as e:
+                raise FederationError(
+                    "ERROR",
+                    e.code,
+                    e.msg,
+                    affected=event.event_id,
+                )
 
         # if we're receiving valid events from an origin,
         # it's probably a good idea to mark it as not in retry-state
@@ -230,7 +241,7 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
-            room_id, frozenset(e.event_id for e in events),
+            frozenset(e.event_id for e in events),
             types=(
                 (EventTypes.RoomHistoryVisibility, ""),
                 (EventTypes.Member, None),
@@ -553,7 +564,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
+    def do_invite_join(self, target_hosts, room_id, joinee, content):
         """ Attempts to join the `joinee` to the room `room_id` via the
         server `target_host`.
 
@@ -569,49 +580,19 @@ class FederationHandler(BaseHandler):
 
         yield self.store.clean_room_for_join(room_id)
 
-        origin, pdu = yield self.replication_layer.make_join(
+        origin, event = yield self._make_and_verify_event(
             target_hosts,
             room_id,
-            joinee
+            joinee,
+            "join",
+            content,
         )
 
-        logger.debug("Got response to make_join: %s", pdu)
-
-        event = pdu
-
-        # We should assert some things.
-        # FIXME: Do this in a nicer way
-        assert(event.type == EventTypes.Member)
-        assert(event.user_id == joinee)
-        assert(event.state_key == joinee)
-        assert(event.room_id == room_id)
-
-        event.internal_metadata.outlier = False
-
         self.room_queues[room_id] = []
-
-        builder = self.event_builder_factory.new(
-            unfreeze(event.get_pdu_json())
-        )
-
         handled_events = set()
 
         try:
-            builder.event_id = self.event_builder_factory.create_event_id()
-            builder.origin = self.hs.hostname
-            builder.content = content
-
-            if not hasattr(event, "signatures"):
-                builder.signatures = {}
-
-            add_hashes_and_signatures(
-                builder,
-                self.hs.hostname,
-                self.hs.config.signing_key[0],
-            )
-
-            new_event = builder.build()
-
+            new_event = self._sign_event(event)
             # Try the host we successfully got a response to /make_join/
             # request first.
             try:
@@ -619,11 +600,7 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
-
-            ret = yield self.replication_layer.send_join(
-                target_hosts,
-                new_event
-            )
+            ret = yield self.replication_layer.send_join(target_hosts, new_event)
 
             origin = ret["origin"]
             state = ret["state"]
@@ -649,35 +626,8 @@ class FederationHandler(BaseHandler):
                 # FIXME
                 pass
 
-            ev_infos = []
-            for e in itertools.chain(state, auth_chain):
-                if e.event_id == event.event_id:
-                    continue
-
-                e.internal_metadata.outlier = True
-                auth_ids = [e_id for e_id, _ in e.auth_events]
-                ev_infos.append({
-                    "event": e,
-                    "auth_events": {
-                        (e.type, e.state_key): e for e in auth_chain
-                        if e.event_id in auth_ids
-                    }
-                })
-
-            yield self._handle_new_events(origin, ev_infos, outliers=True)
-
-            auth_ids = [e_id for e_id, _ in event.auth_events]
-            auth_events = {
-                (e.type, e.state_key): e for e in auth_chain
-                if e.event_id in auth_ids
-            }
-
-            _, event_stream_id, max_stream_id = yield self._handle_new_event(
-                origin,
-                new_event,
-                state=state,
-                current_state=state,
-                auth_events=auth_events,
+            event_stream_id, max_stream_id = yield self._persist_auth_tree(
+                auth_chain, state, event
             )
 
             with PreserveLoggingContext():
@@ -714,12 +664,14 @@ class FederationHandler(BaseHandler):
     @log_function
     def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
-        join event for the room and return that. We don *not* persist or
+        join event for the room and return that. We do *not* persist or
         process it until the other server has signed it and sent it back.
         """
+        event_content = {"membership": Membership.JOIN}
+
         builder = self.event_builder_factory.new({
             "type": EventTypes.Member,
-            "content": {"membership": Membership.JOIN},
+            "content": event_content,
             "room_id": room_id,
             "sender": user_id,
             "state_key": user_id,
@@ -865,6 +817,168 @@ class FederationHandler(BaseHandler):
         defer.returnValue(event)
 
     @defer.inlineCallbacks
+    def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
+        origin, event = yield self._make_and_verify_event(
+            target_hosts,
+            room_id,
+            user_id,
+            "leave"
+        )
+        signed_event = self._sign_event(event)
+
+        # Try the host we successfully got a response to /make_join/
+        # request first.
+        try:
+            target_hosts.remove(origin)
+            target_hosts.insert(0, origin)
+        except ValueError:
+            pass
+
+        yield self.replication_layer.send_leave(
+            target_hosts,
+            signed_event
+        )
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
+                               content={},):
+        origin, pdu = yield self.replication_layer.make_membership_event(
+            target_hosts,
+            room_id,
+            user_id,
+            membership,
+            content,
+        )
+
+        logger.debug("Got response to make_%s: %s", membership, pdu)
+
+        event = pdu
+
+        # We should assert some things.
+        # FIXME: Do this in a nicer way
+        assert(event.type == EventTypes.Member)
+        assert(event.user_id == user_id)
+        assert(event.state_key == user_id)
+        assert(event.room_id == room_id)
+        defer.returnValue((origin, event))
+
+    def _sign_event(self, event):
+        event.internal_metadata.outlier = False
+
+        builder = self.event_builder_factory.new(
+            unfreeze(event.get_pdu_json())
+        )
+
+        builder.event_id = self.event_builder_factory.create_event_id()
+        builder.origin = self.hs.hostname
+
+        if not hasattr(event, "signatures"):
+            builder.signatures = {}
+
+        add_hashes_and_signatures(
+            builder,
+            self.hs.hostname,
+            self.hs.config.signing_key[0],
+        )
+
+        return builder.build()
+
+    @defer.inlineCallbacks
+    @log_function
+    def on_make_leave_request(self, room_id, user_id):
+        """ We've received a /make_leave/ request, so we create a partial
+        join event for the room and return that. We do *not* persist or
+        process it until the other server has signed it and sent it back.
+        """
+        builder = self.event_builder_factory.new({
+            "type": EventTypes.Member,
+            "content": {"membership": Membership.LEAVE},
+            "room_id": room_id,
+            "sender": user_id,
+            "state_key": user_id,
+        })
+
+        event, context = yield self._create_new_client_event(
+            builder=builder,
+        )
+
+        self.auth.check(event, auth_events=context.current_state)
+
+        defer.returnValue(event)
+
+    @defer.inlineCallbacks
+    @log_function
+    def on_send_leave_request(self, origin, pdu):
+        """ We have received a leave event for a room. Fully process it."""
+        event = pdu
+
+        logger.debug(
+            "on_send_leave_request: Got event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        event.internal_metadata.outlier = False
+
+        context, event_stream_id, max_stream_id = yield self._handle_new_event(
+            origin, event
+        )
+
+        logger.debug(
+            "on_send_leave_request: After _handle_new_event: %s, sigs: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
+
+        with PreserveLoggingContext():
+            d = self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id, extra_users=extra_users
+            )
+
+        def log_failure(f):
+            logger.warn(
+                "Failed to notify about %s: %s",
+                event.event_id, f.value
+            )
+
+        d.addErrback(log_failure)
+
+        new_pdu = event
+
+        destinations = set()
+
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.LEAVE:
+                        destinations.add(
+                            UserID.from_string(s.state_key).domain
+                        )
+            except:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        destinations.discard(origin)
+
+        logger.debug(
+            "on_send_leave_request: Sending event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        self.replication_layer.send_pdu(new_pdu, destinations)
+
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
     def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
         yield run_on_reactor()
 
@@ -986,8 +1100,6 @@ class FederationHandler(BaseHandler):
         context = yield self._prep_event(
             origin, event,
             state=state,
-            backfilled=backfilled,
-            current_state=current_state,
             auth_events=auth_events,
         )
 
@@ -1010,7 +1122,6 @@ class FederationHandler(BaseHandler):
                     origin,
                     ev_info["event"],
                     state=ev_info.get("state"),
-                    backfilled=backfilled,
                     auth_events=ev_info.get("auth_events"),
                 )
                 for ev_info in event_infos
@@ -1027,8 +1138,77 @@ class FederationHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _prep_event(self, origin, event, state=None, backfilled=False,
-                    current_state=None, auth_events=None):
+    def _persist_auth_tree(self, auth_events, state, event):
+        """Checks the auth chain is valid (and passes auth checks) for the
+        state and event. Then persists the auth chain and state atomically.
+        Persists the event seperately.
+
+        Returns:
+            2-tuple of (event_stream_id, max_stream_id) from the persist_event
+            call for `event`
+        """
+        events_to_context = {}
+        for e in itertools.chain(auth_events, state):
+            ctx = yield self.state_handler.compute_event_context(
+                e, outlier=True,
+            )
+            events_to_context[e.event_id] = ctx
+            e.internal_metadata.outlier = True
+
+        event_map = {
+            e.event_id: e
+            for e in auth_events
+        }
+
+        create_event = None
+        for e in auth_events:
+            if (e.type, e.state_key) == (EventTypes.Create, ""):
+                create_event = e
+                break
+
+        for e in itertools.chain(auth_events, state, [event]):
+            auth_for_e = {
+                (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id]
+                for e_id, _ in e.auth_events
+            }
+            if create_event:
+                auth_for_e[(EventTypes.Create, "")] = create_event
+
+            try:
+                self.auth.check(e, auth_events=auth_for_e)
+            except AuthError as err:
+                logger.warn(
+                    "Rejecting %s because %s",
+                    e.event_id, err.msg
+                )
+
+                if e == event:
+                    raise
+                events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
+
+        yield self.store.persist_events(
+            [
+                (e, events_to_context[e.event_id])
+                for e in itertools.chain(auth_events, state)
+            ],
+            is_new_state=False,
+        )
+
+        new_event_context = yield self.state_handler.compute_event_context(
+            event, old_state=state, outlier=False,
+        )
+
+        event_stream_id, max_stream_id = yield self.store.persist_event(
+            event, new_event_context,
+            backfilled=False,
+            is_new_state=True,
+            current_state=state,
+        )
+
+        defer.returnValue((event_stream_id, max_stream_id))
+
+    @defer.inlineCallbacks
+    def _prep_event(self, origin, event, state=None, auth_events=None):
         outlier = event.internal_metadata.is_outlier()
 
         context = yield self.state_handler.compute_event_context(
@@ -1061,6 +1241,10 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
+        if event.type == EventTypes.GuestAccess:
+            full_context = yield self.store.get_current_state(room_id=event.room_id)
+            yield self.maybe_kick_guest_users(event, full_context)
+
         defer.returnValue(context)
 
     @defer.inlineCallbacks
@@ -1166,7 +1350,7 @@ class FederationHandler(BaseHandler):
                         auth_ids = [e_id for e_id, _ in e.auth_events]
                         auth = {
                             (e.type, e.state_key): e for e in remote_auth_chain
-                            if e.event_id in auth_ids
+                            if e.event_id in auth_ids or e.type == EventTypes.Create
                         }
                         e.internal_metadata.outlier = True
 
@@ -1284,6 +1468,7 @@ class FederationHandler(BaseHandler):
                                 (e.type, e.state_key): e
                                 for e in result["auth_chain"]
                                 if e.event_id in auth_ids
+                                or event.type == EventTypes.Create
                             }
                             ev.internal_metadata.outlier = True
 
@@ -1458,50 +1643,73 @@ class FederationHandler(BaseHandler):
         })
 
     @defer.inlineCallbacks
-    def _handle_auth_events(self, origin, auth_events):
-        auth_ids_to_deferred = {}
-
-        def process_auth_ev(ev):
-            auth_ids = [e_id for e_id, _ in ev.auth_events]
-
-            prev_ds = [
-                auth_ids_to_deferred[i]
-                for i in auth_ids
-                if i in auth_ids_to_deferred
-            ]
-
-            d = defer.Deferred()
+    @log_function
+    def exchange_third_party_invite(self, invite):
+        sender = invite["sender"]
+        room_id = invite["room_id"]
 
-            auth_ids_to_deferred[ev.event_id] = d
+        event_dict = {
+            "type": EventTypes.Member,
+            "content": {
+                "membership": Membership.INVITE,
+                "third_party_invite": invite,
+            },
+            "room_id": room_id,
+            "sender": sender,
+            "state_key": invite["mxid"],
+        }
+
+        if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
+            builder = self.event_builder_factory.new(event_dict)
+            EventValidator().validate_new(builder)
+            event, context = yield self._create_new_client_event(builder=builder)
+            self.auth.check(event, context.current_state)
+            yield self._validate_keyserver(event, auth_events=context.current_state)
+            member_handler = self.hs.get_handlers().room_member_handler
+            yield member_handler.change_membership(event, context)
+        else:
+            destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
+            yield self.replication_layer.forward_third_party_invite(
+                destinations,
+                room_id,
+                event_dict,
+            )
 
-            @defer.inlineCallbacks
-            def f(*_):
-                ev.internal_metadata.outlier = True
+    @defer.inlineCallbacks
+    @log_function
+    def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+        builder = self.event_builder_factory.new(event_dict)
 
-                try:
-                    auth = {
-                        (e.type, e.state_key): e for e in auth_events
-                        if e.event_id in auth_ids
-                    }
+        event, context = yield self._create_new_client_event(
+            builder=builder,
+        )
 
-                    yield self._handle_new_event(
-                        origin, ev, auth_events=auth
-                    )
-                except:
-                    logger.exception(
-                        "Failed to handle auth event %s",
-                        ev.event_id,
-                    )
+        self.auth.check(event, auth_events=context.current_state)
+        yield self._validate_keyserver(event, auth_events=context.current_state)
 
-                d.callback(None)
+        returned_invite = yield self.send_invite(origin, event)
+        # TODO: Make sure the signatures actually are correct.
+        event.signatures.update(returned_invite.signatures)
+        member_handler = self.hs.get_handlers().room_member_handler
+        yield member_handler.change_membership(event, context)
 
-            if prev_ds:
-                dx = defer.DeferredList(prev_ds)
-                dx.addBoth(f)
-            else:
-                f()
+    @defer.inlineCallbacks
+    def _validate_keyserver(self, event, auth_events):
+        token = event.content["third_party_invite"]["signed"]["token"]
 
-        for e in auth_events:
-            process_auth_ev(e)
+        invite_event = auth_events.get(
+            (EventTypes.ThirdPartyInvite, token,)
+        )
 
-        yield defer.DeferredList(auth_ids_to_deferred.values())
+        try:
+            response = yield self.hs.get_simple_http_client().get_json(
+                invite_event.content["key_validity_url"],
+                {"public_key": invite_event.content["public_key"]}
+            )
+        except Exception:
+            raise SynapseError(
+                502,
+                "Third party certificate could not be checked"
+            )
+        if "valid" not in response or not response["valid"]:
+            raise AuthError(403, "Third party certificate was invalid")