diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 186 |
1 files changed, 109 insertions, 77 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6c19d6ae8c..f599e817aa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -14,6 +14,9 @@ # limitations under the License. """Contains handlers for federation events.""" +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json +from unpaddedbase64 import decode_base64 from ._base import BaseHandler @@ -221,19 +224,11 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.membership == Membership.JOIN: prev_state = context.current_state.get((event.type, event.state_key)) @@ -244,12 +239,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, self - ) - @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -483,7 +472,7 @@ class FederationHandler(BaseHandler): limit=100, extremities=[e for e in extremities.keys()] ) - except SynapseError: + except SynapseError as e: logger.info( "Failed to backfill from %s because %s", dom, e, @@ -643,19 +632,11 @@ class FederationHandler(BaseHandler): ) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[joinee] ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - logger.debug("Finished joining %s to %s", joinee, room_id) finally: room_queue = self.room_queues[room_id] @@ -730,18 +711,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) @@ -811,19 +784,11 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(event.state_key) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=[target_user], ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - defer.returnValue(event) @defer.inlineCallbacks @@ -848,7 +813,22 @@ class FederationHandler(BaseHandler): target_hosts, signed_event ) - defer.returnValue(None) + + context = yield self.state_handler.compute_event_context(event) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=False, + ) + + target_user = UserID.from_string(event.state_key) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) + + defer.returnValue(event) @defer.inlineCallbacks def _make_and_verify_event(self, target_hosts, room_id, user_id, membership, @@ -948,18 +928,10 @@ class FederationHandler(BaseHandler): extra_users.append(target_user) with PreserveLoggingContext(): - d = self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) - def log_failure(f): - logger.warn( - "Failed to notify about %s: %s", - event.event_id, f.value - ) - - d.addErrback(log_failure) - new_pdu = event destinations = set() @@ -1113,6 +1085,12 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context, self + ) + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, @@ -1186,7 +1164,13 @@ class FederationHandler(BaseHandler): try: self.auth.check(e, auth_events=auth_for_e) - except AuthError as err: + except SynapseError as err: + # we may get SynapseErrors here as well as AuthErrors. For + # instance, there are a couple of (ancient) events in some + # rooms whose senders do not have the correct sigil; these + # cause SynapseErrors in auth.check. We don't want to give up + # the attempt to federate altogether in such cases. + logger.warn( "Rejecting %s because %s", e.event_id, err.msg @@ -1654,19 +1638,15 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def exchange_third_party_invite(self, invite): - sender = invite["sender"] - room_id = invite["room_id"] - - if "signed" not in invite or "token" not in invite["signed"]: - logger.info( - "Discarding received notification of third party invite " - "without signed: %s" % (invite,) - ) - return - + def exchange_third_party_invite( + self, + sender_user_id, + target_user_id, + room_id, + signed, + ): third_party_invite = { - "signed": invite["signed"], + "signed": signed, } event_dict = { @@ -1676,8 +1656,8 @@ class FederationHandler(BaseHandler): "third_party_invite": third_party_invite, }, "room_id": room_id, - "sender": sender, - "state_key": invite["mxid"], + "sender": sender_user_id, + "state_key": target_user_id, } if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): @@ -1690,11 +1670,11 @@ class FederationHandler(BaseHandler): ) self.auth.check(event, context.current_state) - yield self._validate_keyserver(event, auth_events=context.current_state) + yield self._check_signature(event, auth_events=context.current_state) member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event(event, context) + yield member_handler.send_membership_event(None, event, context) else: - destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)]) + destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) yield self.replication_layer.forward_third_party_invite( destinations, room_id, @@ -1715,13 +1695,13 @@ class FederationHandler(BaseHandler): ) self.auth.check(event, auth_events=context.current_state) - yield self._validate_keyserver(event, auth_events=context.current_state) + yield self._check_signature(event, auth_events=context.current_state) returned_invite = yield self.send_invite(origin, event) # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) member_handler = self.hs.get_handlers().room_member_handler - yield member_handler.send_membership_event(event, context) + yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks def add_display_name_to_third_party_invite(self, event_dict, event, context): @@ -1745,17 +1725,69 @@ class FederationHandler(BaseHandler): defer.returnValue((event, context)) @defer.inlineCallbacks - def _validate_keyserver(self, event, auth_events): - token = event.content["third_party_invite"]["signed"]["token"] + def _check_signature(self, event, auth_events): + """ + Checks that the signature in the event is consistent with its invite. + :param event (Event): The m.room.member event to check + :param auth_events (dict<(event type, state_key), event>) + + :raises + AuthError if signature didn't match any keys, or key has been + revoked, + SynapseError if a transient error meant a key couldn't be checked + for revocation. + """ + signed = event.content["third_party_invite"]["signed"] + token = signed["token"] invite_event = auth_events.get( (EventTypes.ThirdPartyInvite, token,) ) + if not invite_event: + raise AuthError(403, "Could not find invite") + + last_exception = None + for public_key_object in self.hs.get_auth().get_public_keys(invite_event): + try: + for server, signature_block in signed["signatures"].items(): + for key_name, encoded_signature in signature_block.items(): + if not key_name.startswith("ed25519:"): + continue + + public_key = public_key_object["public_key"] + verify_key = decode_verify_key_bytes( + key_name, + decode_base64(public_key) + ) + verify_signed_json(signed, server, verify_key) + if "key_validity_url" in public_key_object: + yield self._check_key_revocation( + public_key, + public_key_object["key_validity_url"] + ) + return + except Exception as e: + last_exception = e + raise last_exception + + @defer.inlineCallbacks + def _check_key_revocation(self, public_key, url): + """ + Checks whether public_key has been revoked. + + :param public_key (str): base-64 encoded public key. + :param url (str): Key revocation URL. + + :raises + AuthError if they key has been revoked. + SynapseError if a transient error meant a key couldn't be checked + for revocation. + """ try: response = yield self.hs.get_simple_http_client().get_json( - invite_event.content["key_validity_url"], - {"public_key": invite_event.content["public_key"]} + url, + {"public_key": public_key} ) except Exception: raise SynapseError( |