diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 6c19d6ae8c..f599e817aa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,9 @@
# limitations under the License.
"""Contains handlers for federation events."""
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@@ -221,19 +224,11 @@ class FederationHandler(BaseHandler):
extra_users.append(target_user)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
prev_state = context.current_state.get((event.type, event.state_key))
@@ -244,12 +239,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- if not backfilled and not event.internal_metadata.is_outlier():
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, self
- )
-
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(
@@ -483,7 +472,7 @@ class FederationHandler(BaseHandler):
limit=100,
extremities=[e for e in extremities.keys()]
)
- except SynapseError:
+ except SynapseError as e:
logger.info(
"Failed to backfill from %s because %s",
dom, e,
@@ -643,19 +632,11 @@ class FederationHandler(BaseHandler):
)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[joinee]
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
@@ -730,18 +711,10 @@ class FederationHandler(BaseHandler):
extra_users.append(target_user)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
user = UserID.from_string(event.state_key)
@@ -811,19 +784,11 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(event.state_key)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=[target_user],
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
defer.returnValue(event)
@defer.inlineCallbacks
@@ -848,7 +813,22 @@ class FederationHandler(BaseHandler):
target_hosts,
signed_event
)
- defer.returnValue(None)
+
+ context = yield self.state_handler.compute_event_context(event)
+
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event,
+ context=context,
+ backfilled=False,
+ )
+
+ target_user = UserID.from_string(event.state_key)
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[target_user],
+ )
+
+ defer.returnValue(event)
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
@@ -948,18 +928,10 @@ class FederationHandler(BaseHandler):
extra_users.append(target_user)
with PreserveLoggingContext():
- d = self.notifier.on_new_room_event(
+ self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id, extra_users=extra_users
)
- def log_failure(f):
- logger.warn(
- "Failed to notify about %s: %s",
- event.event_id, f.value
- )
-
- d.addErrback(log_failure)
-
new_pdu = event
destinations = set()
@@ -1113,6 +1085,12 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
+ if not backfilled and not event.internal_metadata.is_outlier():
+ action_generator = ActionGenerator(self.hs)
+ yield action_generator.handle_push_actions_for_event(
+ event, context, self
+ )
+
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
@@ -1186,7 +1164,13 @@ class FederationHandler(BaseHandler):
try:
self.auth.check(e, auth_events=auth_for_e)
- except AuthError as err:
+ except SynapseError as err:
+ # we may get SynapseErrors here as well as AuthErrors. For
+ # instance, there are a couple of (ancient) events in some
+ # rooms whose senders do not have the correct sigil; these
+ # cause SynapseErrors in auth.check. We don't want to give up
+ # the attempt to federate altogether in such cases.
+
logger.warn(
"Rejecting %s because %s",
e.event_id, err.msg
@@ -1654,19 +1638,15 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def exchange_third_party_invite(self, invite):
- sender = invite["sender"]
- room_id = invite["room_id"]
-
- if "signed" not in invite or "token" not in invite["signed"]:
- logger.info(
- "Discarding received notification of third party invite "
- "without signed: %s" % (invite,)
- )
- return
-
+ def exchange_third_party_invite(
+ self,
+ sender_user_id,
+ target_user_id,
+ room_id,
+ signed,
+ ):
third_party_invite = {
- "signed": invite["signed"],
+ "signed": signed,
}
event_dict = {
@@ -1676,8 +1656,8 @@ class FederationHandler(BaseHandler):
"third_party_invite": third_party_invite,
},
"room_id": room_id,
- "sender": sender,
- "state_key": invite["mxid"],
+ "sender": sender_user_id,
+ "state_key": target_user_id,
}
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
@@ -1690,11 +1670,11 @@ class FederationHandler(BaseHandler):
)
self.auth.check(event, context.current_state)
- yield self._validate_keyserver(event, auth_events=context.current_state)
+ yield self._check_signature(event, auth_events=context.current_state)
member_handler = self.hs.get_handlers().room_member_handler
- yield member_handler.send_membership_event(event, context)
+ yield member_handler.send_membership_event(None, event, context)
else:
- destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
+ destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
yield self.replication_layer.forward_third_party_invite(
destinations,
room_id,
@@ -1715,13 +1695,13 @@ class FederationHandler(BaseHandler):
)
self.auth.check(event, auth_events=context.current_state)
- yield self._validate_keyserver(event, auth_events=context.current_state)
+ yield self._check_signature(event, auth_events=context.current_state)
returned_invite = yield self.send_invite(origin, event)
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
member_handler = self.hs.get_handlers().room_member_handler
- yield member_handler.send_membership_event(event, context)
+ yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
def add_display_name_to_third_party_invite(self, event_dict, event, context):
@@ -1745,17 +1725,69 @@ class FederationHandler(BaseHandler):
defer.returnValue((event, context))
@defer.inlineCallbacks
- def _validate_keyserver(self, event, auth_events):
- token = event.content["third_party_invite"]["signed"]["token"]
+ def _check_signature(self, event, auth_events):
+ """
+ Checks that the signature in the event is consistent with its invite.
+ :param event (Event): The m.room.member event to check
+ :param auth_events (dict<(event type, state_key), event>)
+
+ :raises
+ AuthError if signature didn't match any keys, or key has been
+ revoked,
+ SynapseError if a transient error meant a key couldn't be checked
+ for revocation.
+ """
+ signed = event.content["third_party_invite"]["signed"]
+ token = signed["token"]
invite_event = auth_events.get(
(EventTypes.ThirdPartyInvite, token,)
)
+ if not invite_event:
+ raise AuthError(403, "Could not find invite")
+
+ last_exception = None
+ for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
+ try:
+ for server, signature_block in signed["signatures"].items():
+ for key_name, encoded_signature in signature_block.items():
+ if not key_name.startswith("ed25519:"):
+ continue
+
+ public_key = public_key_object["public_key"]
+ verify_key = decode_verify_key_bytes(
+ key_name,
+ decode_base64(public_key)
+ )
+ verify_signed_json(signed, server, verify_key)
+ if "key_validity_url" in public_key_object:
+ yield self._check_key_revocation(
+ public_key,
+ public_key_object["key_validity_url"]
+ )
+ return
+ except Exception as e:
+ last_exception = e
+ raise last_exception
+
+ @defer.inlineCallbacks
+ def _check_key_revocation(self, public_key, url):
+ """
+ Checks whether public_key has been revoked.
+
+ :param public_key (str): base-64 encoded public key.
+ :param url (str): Key revocation URL.
+
+ :raises
+ AuthError if they key has been revoked.
+ SynapseError if a transient error meant a key couldn't be checked
+ for revocation.
+ """
try:
response = yield self.hs.get_simple_http_client().get_json(
- invite_event.content["key_validity_url"],
- {"public_key": invite_event.content["public_key"]}
+ url,
+ {"public_key": public_key}
)
except Exception:
raise SynapseError(
|