diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 741 |
1 files changed, 433 insertions, 308 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 483cb8eac6..145c1a21d4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,41 +15,46 @@ # limitations under the License. """Contains handlers for federation events.""" -import synapse.util.logcontext + +import itertools +import logging +import sys + +import six +from six import iteritems, itervalues +from six.moves import http_client, zip + from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 -from ._base import BaseHandler +from twisted.internet import defer -from synapse.api.errors import ( - AuthError, FederationError, StoreError, CodeMessageException, SynapseError, -) from synapse.api.constants import EventTypes, Membership, RejectedReason -from synapse.events.validator import EventValidator -from synapse.util import unwrapFirstError -from synapse.util.logcontext import ( - preserve_fn, preserve_context_over_deferred +from synapse.api.errors import ( + AuthError, + CodeMessageException, + FederationDeniedError, + FederationError, + StoreError, + SynapseError, ) -from synapse.util.metrics import measure_func -from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor, Linearizer -from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( - compute_event_signature, add_hashes_and_signatures, + add_hashes_and_signatures, + compute_event_signature, ) +from synapse.events.validator import EventValidator +from synapse.state import resolve_events_with_factory from synapse.types import UserID, get_domain_from_id - -from synapse.events.utils import prune_event - -from synapse.util.retryutils import NotRetryingDestination - +from synapse.util import logcontext, unwrapFirstError +from synapse.util.async import Linearizer from synapse.util.distributor import user_joined_room +from synapse.util.frozenutils import unfreeze +from synapse.util.logutils import log_function +from synapse.util.retryutils import NotRetryingDestination +from synapse.visibility import filter_events_for_server -from twisted.internet import defer - -import itertools -import logging +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -70,14 +76,16 @@ class FederationHandler(BaseHandler): self.hs = hs self.store = hs.get_datastore() - self.replication_layer = hs.get_replication_layer() + self.replication_layer = hs.get_federation_client() self.state_handler = hs.get_state_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() self.action_generator = hs.get_action_generator() self.is_mine_id = hs.is_mine_id - - self.replication_layer.set_handler(self) + self.pusher_pool = hs.get_pusherpool() + self.spam_checker = hs.get_spam_checker() + self.event_creation_handler = hs.get_event_creation_handler() + self._server_notices_mxid = hs.config.server_notices_mxid # When joining a room we need to queue any events for that room up self.room_queues = {} @@ -85,7 +93,9 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function - def on_receive_pdu(self, origin, pdu, get_missing=True): + def on_receive_pdu( + self, origin, pdu, get_missing=True, sent_to_us_directly=False, + ): """ Process a PDU received via a federation /send/ transaction, or via backfill of missing prev_events @@ -99,8 +109,10 @@ class FederationHandler(BaseHandler): """ # We reprocess pdus when we have seen them only as outliers - existing = yield self.get_persisted_pdu( - origin, pdu.event_id, do_auth=False + existing = yield self.store.get_event( + pdu.event_id, + allow_none=True, + allow_rejected=True, ) # FIXME: Currently we fetch an event again when we already have it @@ -116,6 +128,19 @@ class FederationHandler(BaseHandler): logger.debug("Already seen pdu %s", pdu.event_id) return + # do some initial sanity-checking of the event. In particular, make + # sure it doesn't have hundreds of prev_events or auth_events, which + # could cause a huge state resolution or cascade of event fetches. + try: + self._sanity_check_event(pdu) + except SynapseError as err: + raise FederationError( + "ERROR", + err.code, + err.msg, + affected=pdu.event_id, + ) + # If we are currently in the process of joining this room, then we # queue up events for later processing. if pdu.room_id in self.room_queues: @@ -124,15 +149,30 @@ class FederationHandler(BaseHandler): self.room_queues[pdu.room_id].append((pdu, origin)) return - state = None - - auth_chain = [] - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] + # If we're no longer in the room just ditch the event entirely. This + # is probably an old server that has come back and thinks we're still + # in the room (or we've been rejoined to the room by a state reset). + # + # If we were never in the room then maybe our database got vaped and + # we should check if we *are* in fact in the room. If we are then we + # can magically rejoin the room. + is_in_room = yield self.auth.check_host_in_room( + pdu.room_id, + self.server_name ) + if not is_in_room: + was_in_room = yield self.store.was_host_joined( + pdu.room_id, self.server_name, + ) + if was_in_room: + logger.info( + "Ignoring PDU %s for room %s from %s as we've left the room!", + pdu.event_id, pdu.room_id, origin, + ) + defer.returnValue(None) - fetch_state = False + state = None + auth_chain = [] # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): @@ -147,7 +187,7 @@ class FederationHandler(BaseHandler): ) prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this @@ -175,8 +215,7 @@ class FederationHandler(BaseHandler): # Update the set of things we've seen after trying to # fetch the missing stuff - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.iterkeys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: logger.info( @@ -189,26 +228,60 @@ class FederationHandler(BaseHandler): list(prevs - seen)[:5], ) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + if sent_to_us_directly and prevs - seen: + # If they have sent it to us directly, and the server + # isn't telling us about the auth events that it's + # made a message referencing, we explode + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, ) - fetch_state = True + elif prevs - seen: + # Calculate the state of the previous events, and + # de-conflict them to find the current state. + state_groups = [] + auth_chains = set() + try: + # Get the state of the events we know about + ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + state_groups.append(ours) + + # Ask the remote server for the states we don't + # know about + for p in prevs - seen: + state, got_auth_chain = ( + yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, p + ) + ) + auth_chains.update(got_auth_chain) + state_group = {(x.type, x.state_key): x.event_id for x in state} + state_groups.append(state_group) + + # Resolve any conflicting state + def fetch(ev_ids): + return self.store.get_events( + ev_ids, get_prev_content=False, check_redacted=False + ) - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.replication_layer.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except: - logger.exception("Failed to get state for event: %s", pdu.event_id) + state_map = yield resolve_events_with_factory( + state_groups, {pdu.event_id: pdu}, fetch + ) + + state = (yield self.store.get_events(state_map.values())).values() + auth_chain = list(auth_chains) + except Exception: + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=pdu.event_id, + ) yield self._process_received_pdu( origin, @@ -227,8 +300,7 @@ class FederationHandler(BaseHandler): min_depth (int): Minimum depth of events to return. """ # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return @@ -287,11 +359,17 @@ class FederationHandler(BaseHandler): for e in missing_events: logger.info("Handling found event %s", e.event_id) - yield self.on_receive_pdu( - origin, - e, - get_missing=False - ) + try: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + except FederationError as e: + if e.code == 403: + logger.warn("Event %s failed history check.") + else: + raise @log_function @defer.inlineCallbacks @@ -340,9 +418,7 @@ class FederationHandler(BaseHandler): if auth_chain: event_ids |= {e.event_id for e in auth_chain} - seen_ids = set( - (yield self.store.have_events(event_ids)).keys() - ) + seen_ids = yield self.store.have_seen_events(event_ids) if state and auth_chain is not None: # If we have any state or auth_chain given to us by the replication @@ -410,7 +486,10 @@ class FederationHandler(BaseHandler): # joined the room. Don't bother if the user is just # changing their profile info. newly_joined = True - prev_state_id = context.prev_state_ids.get( + + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_state_id = prev_state_ids.get( (event.type, event.state_key) ) if prev_state_id: @@ -424,91 +503,21 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - @measure_func("_filter_events_for_server") - @defer.inlineCallbacks - def _filter_events_for_server(self, server_name, room_id, events): - event_to_state_ids = yield self.store.get_state_ids_for_events( - frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), - ) - ) - - # We only want to pull out member events that correspond to the - # server's domain. - - def check_match(id): - try: - return server_name == get_domain_from_id(id) - except: - return False - - # Parses mapping `event_id -> (type, state_key) -> state event_id` - # to get all state ids that we're interested in. - event_map = yield self.store.get_events([ - e_id - for key_to_eid in event_to_state_ids.values() - for key, e_id in key_to_eid.items() - if key[0] != EventTypes.Member or check_match(key[1]) - ]) - - event_to_state = { - e_id: { - key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.items() - if inner_e_id in event_map - } - for e_id, key_to_eid in event_to_state_ids.items() - } - - def redact_disallowed(event, state): - if not state: - return event - - history = state.get((EventTypes.RoomHistoryVisibility, ''), None) - if history: - visibility = history.content.get("history_visibility", "shared") - if visibility in ["invited", "joined"]: - # We now loop through all state events looking for - # membership states for the requesting server to determine - # if the server is either in the room or has been invited - # into the room. - for ev in state.values(): - if ev.type != EventTypes.Member: - continue - try: - domain = get_domain_from_id(ev.state_key) - except: - continue - - if domain != server_name: - continue - - memtype = ev.membership - if memtype == Membership.JOIN: - return event - elif memtype == Membership.INVITE: - if visibility == "invited": - return event - else: - return prune_event(event) - - return event - - defer.returnValue([ - redact_disallowed(e, event_to_state[e.event_id]) - for e in events - ]) - @log_function @defer.inlineCallbacks def backfill(self, dest, room_id, limit, extremities): """ Trigger a backfill request to `dest` for the given `room_id` - This will attempt to get more events from the remote. This may return - be successfull and still return no events if the other side has no new - events to offer. + This will attempt to get more events from the remote. If the other side + has no new events to offer, this will return an empty list. + + As the events are received, we check their signatures, and also do some + sanity-checking on them. If any of the backfilled events are invalid, + this method throws a SynapseError. + + TODO: make this more useful to distinguish failures of the remote + server from invalid events (there is probably no point in trying to + re-fetch invalid events from every other HS in the room.) """ if dest == self.server_name: raise SynapseError(400, "Can't backfill from self.") @@ -520,6 +529,16 @@ class FederationHandler(BaseHandler): extremities=extremities, ) + # ideally we'd sanity check the events here for excess prev_events etc, + # but it's hard to reject events at this point without completely + # breaking backfill in the same way that it is currently broken by + # events whose signature we cannot verify (#3121). + # + # So for now we accept the events anyway. #3124 tracks this. + # + # for ev in events: + # self._sanity_check_event(ev) + # Don't bother processing events we already have. seen_events = yield self.store.have_events_in_timeline( set(e.event_id for e in events) @@ -590,9 +609,10 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch ) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self.replication_layer.get_pdu)( + logcontext.run_in_background( + self.replication_layer.get_pdu, [dest], event_id, outlier=True, @@ -612,7 +632,7 @@ class FederationHandler(BaseHandler): failed_to_fetch = missing_auth - set(auth_events) - seen_events = yield self.store.have_events( + seen_events = yield self.store.have_seen_events( set(auth_events.keys()) | set(state_events.keys()) ) @@ -702,9 +722,19 @@ class FederationHandler(BaseHandler): curr_state = yield self.state_handler.get_current_state(room_id) def get_domains_from_state(state): + """Get joined domains from state + + Args: + state (dict[tuple, FrozenEvent]): State map from type/state + key to event. + + Returns: + list[tuple[str, int]]: Returns a list of servers with the + lowest depth of their joins. Sorted by lowest depth first. + """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -718,7 +748,7 @@ class FederationHandler(BaseHandler): joined_domains[dom] = min(d, old_d) else: joined_domains[dom] = d - except: + except Exception: pass return sorted(joined_domains.items(), key=lambda d: d[1]) @@ -738,7 +768,7 @@ class FederationHandler(BaseHandler): yield self.backfill( dom, room_id, limit=100, - extremities=[e for e in extremities.keys()] + extremities=extremities, ) # If this succeeded then we probably already have the # appropriate stuff. @@ -762,6 +792,9 @@ class FederationHandler(BaseHandler): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e) + continue except Exception as e: logger.exception( "Failed to backfill from %s because %s", @@ -784,38 +817,76 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") - states = yield preserve_context_over_deferred(defer.gatherResults([ - preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) - for e in event_ids - ])) + resolve = logcontext.preserve_fn( + self.state_handler.resolve_state_groups_for_events + ) + states = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [resolve(room_id, [e]) for e in event_ids], + consumeErrors=True, + )) + + # dict[str, dict[tuple, str]], a map from event_id to state map of + # event_ids. states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.values() for e_id in ids], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.items() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.items() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: likely_domains = get_domains_from_state(states[e_id]) success = yield try_backfill([ - dom for dom in likely_domains + dom for dom, _ in likely_domains if dom not in tried_domains ]) if success: defer.returnValue(True) - tried_domains.update(likely_domains) + tried_domains.update(dom for dom, _ in likely_domains) defer.returnValue(False) + def _sanity_check_event(self, ev): + """ + Do some early sanity checks of a received event + + In particular, checks it doesn't have an excessive number of + prev_events or auth_events, which could cause a huge state resolution + or cascade of event fetches. + + Args: + ev (synapse.events.EventBase): event to be checked + + Returns: None + + Raises: + SynapseError if the event does not pass muster + """ + if len(ev.prev_events) > 20: + logger.warn("Rejecting event %s which has %i prev_events", + ev.event_id, len(ev.prev_events)) + raise SynapseError( + http_client.BAD_REQUEST, + "Too many prev_events", + ) + + if len(ev.auth_events) > 10: + logger.warn("Rejecting event %s which has %i auth_events", + ev.event_id, len(ev.auth_events)) + raise SynapseError( + http_client.BAD_REQUEST, + "Too many auth_events", + ) + @defer.inlineCallbacks def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -838,16 +909,6 @@ class FederationHandler(BaseHandler): [auth_id for auth_id, _ in event.auth_events], include_given=True ) - - for event in auth: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - defer.returnValue([e for e in auth]) @log_function @@ -916,7 +977,7 @@ class FederationHandler(BaseHandler): room_creator_user_id="", is_public=False ) - except: + except Exception: # FIXME pass @@ -940,9 +1001,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)( - room_queue - ) + logcontext.run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -982,8 +1041,7 @@ class FederationHandler(BaseHandler): }) try: - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) except AuthError as e: @@ -1051,13 +1109,15 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - state_ids = context.prev_state_ids.values() + prev_state_ids = yield context.get_prev_state_ids(self.store) + + state_ids = list(prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(context.prev_state_ids.values()) + state = yield self.store.get_events(list(prev_state_ids.values())) defer.returnValue({ - "state": state.values(), + "state": list(state.values()), "auth_chain": auth_chain, }) @@ -1069,10 +1129,23 @@ class FederationHandler(BaseHandler): """ event = pdu + if event.state_key is None: + raise SynapseError(400, "The invite event did not have a state key") + is_blocked = yield self.store.is_room_blocked(event.room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") + if self.hs.config.block_non_admin_invites: + raise SynapseError(403, "This server does not accept room invites") + + if not self.spam_checker.user_may_invite( + event.sender, event.state_key, event.room_id, + ): + raise SynapseError( + 403, "This user is not permitted to send invites to this server/user" + ) + membership = event.content.get("membership") if event.type != EventTypes.Member or membership != Membership.INVITE: raise SynapseError(400, "The event was not an m.room.member invite event") @@ -1081,12 +1154,16 @@ class FederationHandler(BaseHandler): if sender_domain != origin: raise SynapseError(400, "The invite event was not from the server sending it") - if event.state_key is None: - raise SynapseError(400, "The invite event did not have a state key") - if not self.is_mine_id(event.state_key): raise SynapseError(400, "The invite event must be for this server") + # block any attempts to invite the server notices mxid + if event.state_key == self._server_notices_mxid: + raise SynapseError( + http_client.FORBIDDEN, + "Cannot invite this user", + ) + event.internal_metadata.outlier = True event.internal_metadata.invite_from_remote = True @@ -1213,8 +1290,7 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -1268,14 +1344,12 @@ class FederationHandler(BaseHandler): def get_state_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) if state_groups: - _, state = state_groups.items().pop() + _, state = list(iteritems(state_groups)).pop() results = { (e.type, e.state_key): e for e in state } @@ -1291,19 +1365,7 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - res = results.values() - for event in res: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - + res = list(results.values()) defer.returnValue(res) else: defer.returnValue([]) @@ -1312,8 +1374,6 @@ class FederationHandler(BaseHandler): def get_state_ids_for_pdu(self, room_id, event_id): """Returns the state at the event. i.e. not including said event. """ - yield run_on_reactor() - state_groups = yield self.store.get_state_groups_ids( room_id, [event_id] ) @@ -1332,7 +1392,7 @@ class FederationHandler(BaseHandler): else: results.pop((event.type, event.state_key), None) - defer.returnValue(results.values()) + defer.returnValue(list(results.values())) else: defer.returnValue([]) @@ -1349,17 +1409,26 @@ class FederationHandler(BaseHandler): limit ) - events = yield self._filter_events_for_server(origin, room_id, events) + events = yield filter_events_for_server(self.store, origin, events) defer.returnValue(events) @defer.inlineCallbacks @log_function - def get_persisted_pdu(self, origin, event_id, do_auth=True): - """ Get a PDU from the database with given origin and id. + def get_persisted_pdu(self, origin, event_id): + """Get an event from the database for the given server. + + Args: + origin [str]: hostname of server which is requesting the event; we + will check that the server is allowed to see it. + event_id [str]: id of the event being requested Returns: - Deferred: Results in a `Pdu`. + Deferred[EventBase|None]: None if we know nothing about the event; + otherwise the (possibly-redacted) event. + + Raises: + AuthError if the server is not currently in the room """ event = yield self.store.get_event( event_id, @@ -1368,32 +1437,17 @@ class FederationHandler(BaseHandler): ) if event: - if self.is_mine_id(event.event_id): - # FIXME: This is a temporary work around where we occasionally - # return events slightly differently than when they were - # originally signed - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - - if do_auth: - in_room = yield self.auth.check_host_in_room( - event.room_id, - origin - ) - if not in_room: - raise AuthError(403, "Host not in room.") - - events = yield self._filter_events_for_server( - origin, event.room_id, [event] - ) - - event = events[0] + in_room = yield self.auth.check_host_in_room( + event.room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + events = yield filter_events_for_server( + self.store, origin, [event], + ) + event = events[0] defer.returnValue(event) else: defer.returnValue(None) @@ -1412,22 +1466,33 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - if not event.internal_metadata.is_outlier(): - yield self.action_generator.handle_push_actions_for_event( - event, context + try: + if not event.internal_metadata.is_outlier() and not backfilled: + yield self.action_generator.handle_push_actions_for_event( + event, context + ) + + event_stream_id, max_stream_id = yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, ) + except: # noqa: E722, as we reraise the exception this is fine. + tp, value, tb = sys.exc_info() - event_stream_id, max_stream_id = yield self.store.persist_event( - event, - context=context, - backfilled=backfilled, - ) + logcontext.run_in_background( + self.store.remove_push_actions_from_staging, + event.event_id, + ) + + six.reraise(tp, value, tb) if not backfilled: # this intentionally does not yield: we don't care about the result # and don't need to wait for it. - preserve_fn(self.hs.get_pusherpool().on_new_notifications)( - event_stream_id, max_stream_id + logcontext.run_in_background( + self.pusher_pool.on_new_notifications, + event_stream_id, max_stream_id, ) defer.returnValue((context, event_stream_id, max_stream_id)) @@ -1439,22 +1504,23 @@ class FederationHandler(BaseHandler): a bunch of outliers, but not a chunk of individual events that depend on each other for state calculations. """ - contexts = yield preserve_context_over_deferred(defer.gatherResults( + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ - preserve_fn(self._prep_event)( + logcontext.run_in_background( + self._prep_event, origin, ev_info["event"], state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) for ev_info in event_infos - ] + ], consumeErrors=True, )) yield self.store.persist_events( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) @@ -1574,8 +1640,9 @@ class FederationHandler(BaseHandler): ) if not auth_events: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -1605,7 +1672,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - if event.type == EventTypes.GuestAccess: + if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) defer.returnValue(context) @@ -1635,15 +1702,6 @@ class FederationHandler(BaseHandler): local_auth_chain, remote_auth_chain ) - for event in ret["auth_chain"]: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) @@ -1669,11 +1727,26 @@ class FederationHandler(BaseHandler): min_depth=min_depth, ) + missing_events = yield filter_events_for_server( + self.store, origin, missing_events, + ) + defer.returnValue(missing_events) @defer.inlineCallbacks @log_function def do_auth(self, origin, event, context, auth_events): + """ + + Args: + origin (str): + event (synapse.events.FrozenEvent): + context (synapse.events.snapshot.EventContext): + auth_events (dict[(str, str)->str]): + + Returns: + defer.Deferred[None] + """ # Check if we have all the auth events. current_state = set(e.event_id for e in auth_events.values()) event_auth_events = set(e_id for e_id, _ in event.auth_events) @@ -1684,7 +1757,8 @@ class FederationHandler(BaseHandler): event_key = None if event_auth_events - current_state: - have_events = yield self.store.have_events( + # TODO: can we use store.have_seen_events here instead? + have_events = yield self.store.get_seen_events_with_rejections( event_auth_events - current_state ) else: @@ -1707,12 +1781,12 @@ class FederationHandler(BaseHandler): origin, event.room_id, event.event_id ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): + if e.event_id in seen_remotes: continue if e.event_id == event.event_id: @@ -1739,11 +1813,11 @@ class FederationHandler(BaseHandler): except AuthError: pass - have_events = yield self.store.have_events( + have_events = yield self.store.get_seen_events_with_rejections( [e_id for e_id, _ in event.auth_events] ) seen_events = set(have_events.keys()) - except: + except Exception: # FIXME: logger.exception("Failed to get auth chain") @@ -1756,18 +1830,18 @@ class FederationHandler(BaseHandler): # Do auth conflict res. logger.info("Different auth: %s", different_auth) - different_events = yield preserve_context_over_deferred(defer.gatherResults( - [ - preserve_fn(self.store.get_event)( + different_events = yield logcontext.make_deferred_yieldable( + defer.gatherResults([ + logcontext.run_in_background( + self.store.get_event, d, allow_none=True, allow_rejected=False, ) for d in different_auth if d in have_events and not have_events[d] - ], - consumeErrors=True - )).addErrback(unwrapFirstError) + ], consumeErrors=True) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) @@ -1777,7 +1851,7 @@ class FederationHandler(BaseHandler): }) new_state = self.state_handler.resolve_events( - [local_view.values(), remote_view.values()], + [list(local_view.values()), list(remote_view.values())], event ) @@ -1786,16 +1860,9 @@ class FederationHandler(BaseHandler): current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update({ - k: a.event_id for k, a in auth_events.items() - if k != event_key - }) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ - k: a.event_id for k, a in auth_events.items() - }) - context.state_group = self.store.get_next_state_group() + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, + ) if different_auth and not event.internal_metadata.is_outlier(): logger.info("Different auth after resolution: %s", different_auth) @@ -1815,9 +1882,10 @@ class FederationHandler(BaseHandler): break if do_resolution: + prev_state_ids = yield context.get_prev_state_ids(self.store) # 1. Get what we think is the auth chain. auth_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids + event, prev_state_ids ) local_auth_chain = yield self.store.get_auth_chain( auth_ids, include_given=True @@ -1832,13 +1900,13 @@ class FederationHandler(BaseHandler): local_auth_chain, ) - seen_remotes = yield self.store.have_events( + seen_remotes = yield self.store.have_seen_events( [e.event_id for e in result["auth_chain"]] ) # 3. Process any remote auth chain events we haven't seen. for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): + if ev.event_id in seen_remotes: continue if ev.event_id == event.event_id: @@ -1868,23 +1936,16 @@ class FederationHandler(BaseHandler): except AuthError: pass - except: + except Exception: # FIXME: logger.exception("Failed to query auth chain") # 4. Look at rejects and their proofs. # TODO. - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update({ - k: a.event_id for k, a in auth_events.items() - if k != event_key - }) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ - k: a.event_id for k, a in auth_events.items() - }) - context.state_group = self.store.get_next_state_group() + yield self._update_context_for_auth_events( + event, context, auth_events, event_key, + ) try: self.auth.check(event, auth_events=auth_events) @@ -1893,6 +1954,58 @@ class FederationHandler(BaseHandler): raise e @defer.inlineCallbacks + def _update_context_for_auth_events(self, event, context, auth_events, + event_key): + """Update the state_ids in an event context after auth event resolution, + storing the changes as a new state group. + + Args: + event (Event): The event we're handling the context for + + context (synapse.events.snapshot.EventContext): event context + to be updated + + auth_events (dict[(str, str)->str]): Events to update in the event + context. + + event_key ((str, str)): (type, state_key) for the current event. + this will not be included in the current_state in the context. + """ + state_updates = { + k: a.event_id for k, a in iteritems(auth_events) + if k != event_key + } + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ + k: a.event_id for k, a in iteritems(auth_events) + }) + + # create a new state group as a delta from the existing one. + prev_group = context.state_group + state_group = yield self.store.store_state_group( + event.event_id, + event.room_id, + prev_group=prev_group, + delta_ids=state_updates, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=state_updates, + ) + + @defer.inlineCallbacks def construct_auth_difference(self, local_auth, remote_auth): """ Given a local and remote auth chain, find the differences. This assumes that we have already processed all events in remote_auth @@ -1934,8 +2047,8 @@ class FederationHandler(BaseHandler): def get_next(it, opt=None): try: - return it.next() - except: + return next(it) + except Exception: return opt current_local = get_next(local_iter) @@ -2060,8 +2173,7 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder ) @@ -2076,7 +2188,7 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) - member_handler = self.hs.get_handlers().room_member_handler + member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) else: destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id)) @@ -2089,10 +2201,17 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks @log_function def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): + """Handle an exchange_third_party_invite request from a remote server + + The remote server will call this when it wants to turn a 3pid invite + into a normal m.room.member invite. + + Returns: + Deferred: resolves (to None) + """ builder = self.event_builder_factory.new(event_dict) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event( + event, context = yield self.event_creation_handler.create_new_client_event( builder=builder, ) @@ -2107,10 +2226,13 @@ class FederationHandler(BaseHandler): raise e yield self._check_signature(event, context) + # XXX we send the invite here, but send_membership_event also sends it, + # so we end up making two requests. I think this is redundant. returned_invite = yield self.send_invite(origin, event) # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) - member_handler = self.hs.get_handlers().room_member_handler + + member_handler = self.hs.get_room_member_handler() yield member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks @@ -2120,7 +2242,8 @@ class FederationHandler(BaseHandler): event.content["third_party_invite"]["signed"]["token"] ) original_invite = None - original_invite_id = context.prev_state_ids.get(key) + prev_state_ids = yield context.get_prev_state_ids(self.store) + original_invite_id = prev_state_ids.get(key) if original_invite_id: original_invite = yield self.store.get_event( original_invite_id, allow_none=True @@ -2139,8 +2262,9 @@ class FederationHandler(BaseHandler): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - message_handler = self.hs.get_handlers().message_handler - event, context = yield message_handler._create_new_client_event(builder=builder) + event, context = yield self.event_creation_handler.create_new_client_event( + builder=builder, + ) defer.returnValue((event, context)) @defer.inlineCallbacks @@ -2161,7 +2285,8 @@ class FederationHandler(BaseHandler): signed = event.content["third_party_invite"]["signed"] token = signed["token"] - invite_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + invite_event_id = prev_state_ids.get( (EventTypes.ThirdPartyInvite, token,) ) |