summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py186
1 files changed, 109 insertions, 77 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 6c19d6ae8c..f599e817aa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,9 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
 
@@ -221,19 +224,11 @@ class FederationHandler(BaseHandler):
                 extra_users.append(target_user)
 
             with PreserveLoggingContext():
-                d = self.notifier.on_new_room_event(
+                self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
                     extra_users=extra_users
                 )
 
-            def log_failure(f):
-                logger.warn(
-                    "Failed to notify about %s: %s",
-                    event.event_id, f.value
-                )
-
-            d.addErrback(log_failure)
-
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 prev_state = context.current_state.get((event.type, event.state_key))
@@ -244,12 +239,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-        if not backfilled and not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
-                event, self
-            )
-
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
@@ -483,7 +472,7 @@ class FederationHandler(BaseHandler):
                         limit=100,
                         extremities=[e for e in extremities.keys()]
                     )
-                except SynapseError:
+                except SynapseError as e:
                     logger.info(
                         "Failed to backfill from %s because %s",
                         dom, e,
@@ -643,19 +632,11 @@ class FederationHandler(BaseHandler):
             )
 
             with PreserveLoggingContext():
-                d = self.notifier.on_new_room_event(
+                self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
                     extra_users=[joinee]
                 )
 
-            def log_failure(f):
-                logger.warn(
-                    "Failed to notify about %s: %s",
-                    event.event_id, f.value
-                )
-
-            d.addErrback(log_failure)
-
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -730,18 +711,10 @@ class FederationHandler(BaseHandler):
             extra_users.append(target_user)
 
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -811,19 +784,11 @@ class FederationHandler(BaseHandler):
 
         target_user = UserID.from_string(event.state_key)
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=[target_user],
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         defer.returnValue(event)
 
     @defer.inlineCallbacks
@@ -848,7 +813,22 @@ class FederationHandler(BaseHandler):
             target_hosts,
             signed_event
         )
-        defer.returnValue(None)
+
+        context = yield self.state_handler.compute_event_context(event)
+
+        event_stream_id, max_stream_id = yield self.store.persist_event(
+            event,
+            context=context,
+            backfilled=False,
+        )
+
+        target_user = UserID.from_string(event.state_key)
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=[target_user],
+        )
+
+        defer.returnValue(event)
 
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
@@ -948,18 +928,10 @@ class FederationHandler(BaseHandler):
             extra_users.append(target_user)
 
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         new_pdu = event
 
         destinations = set()
@@ -1113,6 +1085,12 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        if not backfilled and not event.internal_metadata.is_outlier():
+            action_generator = ActionGenerator(self.hs)
+            yield action_generator.handle_push_actions_for_event(
+                event, context, self
+            )
+
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
@@ -1186,7 +1164,13 @@ class FederationHandler(BaseHandler):
 
             try:
                 self.auth.check(e, auth_events=auth_for_e)
-            except AuthError as err:
+            except SynapseError as err:
+                # we may get SynapseErrors here as well as AuthErrors. For
+                # instance, there are a couple of (ancient) events in some
+                # rooms whose senders do not have the correct sigil; these
+                # cause SynapseErrors in auth.check. We don't want to give up
+                # the attempt to federate altogether in such cases.
+
                 logger.warn(
                     "Rejecting %s because %s",
                     e.event_id, err.msg
@@ -1654,19 +1638,15 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def exchange_third_party_invite(self, invite):
-        sender = invite["sender"]
-        room_id = invite["room_id"]
-
-        if "signed" not in invite or "token" not in invite["signed"]:
-            logger.info(
-                "Discarding received notification of third party invite "
-                "without signed: %s" % (invite,)
-            )
-            return
-
+    def exchange_third_party_invite(
+            self,
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+    ):
         third_party_invite = {
-            "signed": invite["signed"],
+            "signed": signed,
         }
 
         event_dict = {
@@ -1676,8 +1656,8 @@ class FederationHandler(BaseHandler):
                 "third_party_invite": third_party_invite,
             },
             "room_id": room_id,
-            "sender": sender,
-            "state_key": invite["mxid"],
+            "sender": sender_user_id,
+            "state_key": target_user_id,
         }
 
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
@@ -1690,11 +1670,11 @@ class FederationHandler(BaseHandler):
             )
 
             self.auth.check(event, context.current_state)
-            yield self._validate_keyserver(event, auth_events=context.current_state)
+            yield self._check_signature(event, auth_events=context.current_state)
             member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.send_membership_event(event, context)
+            yield member_handler.send_membership_event(None, event, context)
         else:
-            destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
+            destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
             yield self.replication_layer.forward_third_party_invite(
                 destinations,
                 room_id,
@@ -1715,13 +1695,13 @@ class FederationHandler(BaseHandler):
         )
 
         self.auth.check(event, auth_events=context.current_state)
-        yield self._validate_keyserver(event, auth_events=context.current_state)
+        yield self._check_signature(event, auth_events=context.current_state)
 
         returned_invite = yield self.send_invite(origin, event)
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
         member_handler = self.hs.get_handlers().room_member_handler
-        yield member_handler.send_membership_event(event, context)
+        yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
     def add_display_name_to_third_party_invite(self, event_dict, event, context):
@@ -1745,17 +1725,69 @@ class FederationHandler(BaseHandler):
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
-    def _validate_keyserver(self, event, auth_events):
-        token = event.content["third_party_invite"]["signed"]["token"]
+    def _check_signature(self, event, auth_events):
+        """
+        Checks that the signature in the event is consistent with its invite.
+        :param event (Event): The m.room.member event to check
+        :param auth_events (dict<(event type, state_key), event>)
+
+        :raises
+            AuthError if signature didn't match any keys, or key has been
+                revoked,
+            SynapseError if a transient error meant a key couldn't be checked
+                for revocation.
+        """
+        signed = event.content["third_party_invite"]["signed"]
+        token = signed["token"]
 
         invite_event = auth_events.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
+        if not invite_event:
+            raise AuthError(403, "Could not find invite")
+
+        last_exception = None
+        for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
+            try:
+                for server, signature_block in signed["signatures"].items():
+                    for key_name, encoded_signature in signature_block.items():
+                        if not key_name.startswith("ed25519:"):
+                            continue
+
+                        public_key = public_key_object["public_key"]
+                        verify_key = decode_verify_key_bytes(
+                            key_name,
+                            decode_base64(public_key)
+                        )
+                        verify_signed_json(signed, server, verify_key)
+                        if "key_validity_url" in public_key_object:
+                            yield self._check_key_revocation(
+                                public_key,
+                                public_key_object["key_validity_url"]
+                            )
+                        return
+            except Exception as e:
+                last_exception = e
+        raise last_exception
+
+    @defer.inlineCallbacks
+    def _check_key_revocation(self, public_key, url):
+        """
+        Checks whether public_key has been revoked.
+
+        :param public_key (str): base-64 encoded public key.
+        :param url (str): Key revocation URL.
+
+        :raises
+            AuthError if they key has been revoked.
+            SynapseError if a transient error meant a key couldn't be checked
+                for revocation.
+        """
         try:
             response = yield self.hs.get_simple_http_client().get_json(
-                invite_event.content["key_validity_url"],
-                {"public_key": invite_event.content["public_key"]}
+                url,
+                {"public_key": public_key}
             )
         except Exception:
             raise SynapseError(