diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 996bfd0e23..f39233d846 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,24 +15,30 @@
# limitations under the License.
"""Contains handlers for federation events."""
+
+import itertools
+import logging
+import sys
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
+import six
+from six.moves import http_client
+from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
from synapse.api.errors import (
AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ FederationDeniedError,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import (
- PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
-)
+from synapse.util import unwrapFirstError, logcontext
from synapse.util.metrics import measure_func
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
compute_event_signature, add_hashes_and_signatures,
@@ -42,13 +49,8 @@ from synapse.events.utils import prune_event
from synapse.util.retryutils import NotRetryingDestination
-from synapse.push.action_generator import ActionGenerator
from synapse.util.distributor import user_joined_room
-from twisted.internet import defer
-
-import itertools
-import logging
logger = logging.getLogger(__name__)
@@ -70,38 +72,268 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_replication_layer()
+ self.replication_layer = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
-
- self.replication_layer.set_handler(self)
+ self.action_generator = hs.get_action_generator()
+ self.is_mine_id = hs.is_mine_id
+ self.pusher_pool = hs.get_pusherpool()
+ self.spam_checker = hs.get_spam_checker()
+ self.event_creation_handler = hs.get_event_creation_handler()
# When joining a room we need to queue any events for that room up
self.room_queues = {}
+ self._room_pdu_linearizer = Linearizer("fed_room_pdu")
- @log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
- """ Called by the ReplicationLayer when we have a new pdu. We need to
- do auth checks and put it through the StateHandler.
+ @log_function
+ def on_receive_pdu(self, origin, pdu, get_missing=True):
+ """ Process a PDU received via a federation /send/ transaction, or
+ via backfill of missing prev_events
+
+ Args:
+ origin (str): server which initiated the /send/ transaction. Will
+ be used to fetch missing events or state.
+ pdu (FrozenEvent): received PDU
+ get_missing (bool): True if we should fetch missing prev_events
- auth_chain and state are None if we already have the necessary state
- and prev_events in the db
+ Returns (Deferred): completes with None
"""
- event = pdu
- logger.debug("Got event: %s", event.event_id)
+ # We reprocess pdus when we have seen them only as outliers
+ existing = yield self.get_persisted_pdu(
+ origin, pdu.event_id, do_auth=False
+ )
+
+ # FIXME: Currently we fetch an event again when we already have it
+ # if it has been marked as an outlier.
+
+ already_seen = (
+ existing and (
+ not existing.internal_metadata.is_outlier()
+ or pdu.internal_metadata.is_outlier()
+ )
+ )
+ if already_seen:
+ logger.debug("Already seen pdu %s", pdu.event_id)
+ return
+
+ # do some initial sanity-checking of the event. In particular, make
+ # sure it doesn't have hundreds of prev_events or auth_events, which
+ # could cause a huge state resolution or cascade of event fetches.
+ try:
+ self._sanity_check_event(pdu)
+ except SynapseError as err:
+ raise FederationError(
+ "ERROR",
+ err.code,
+ err.msg,
+ affected=pdu.event_id,
+ )
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if event.room_id in self.room_queues:
- self.room_queues[event.room_id].append((pdu, origin))
+ if pdu.room_id in self.room_queues:
+ logger.info("Ignoring PDU %s for room %s from %s for now; join "
+ "in progress", pdu.event_id, pdu.room_id, origin)
+ self.room_queues[pdu.room_id].append((pdu, origin))
return
- logger.debug("Processing event: %s", event.event_id)
+ # If we're no longer in the room just ditch the event entirely. This
+ # is probably an old server that has come back and thinks we're still
+ # in the room (or we've been rejoined to the room by a state reset).
+ #
+ # If we were never in the room then maybe our database got vaped and
+ # we should check if we *are* in fact in the room. If we are then we
+ # can magically rejoin the room.
+ is_in_room = yield self.auth.check_host_in_room(
+ pdu.room_id,
+ self.server_name
+ )
+ if not is_in_room:
+ was_in_room = yield self.store.was_host_joined(
+ pdu.room_id, self.server_name,
+ )
+ if was_in_room:
+ logger.info(
+ "Ignoring PDU %s for room %s from %s as we've left the room!",
+ pdu.event_id, pdu.room_id, origin,
+ )
+ return
+
+ state = None
+
+ auth_chain = []
+
+ fetch_state = False
+
+ # Get missing pdus if necessary.
+ if not pdu.internal_metadata.is_outlier():
+ # We only backfill backwards to the min depth.
+ min_depth = yield self.get_min_depth_for_context(
+ pdu.room_id
+ )
+
+ logger.debug(
+ "_handle_new_pdu min_depth for %s: %d",
+ pdu.room_id, min_depth
+ )
+
+ prevs = {e_id for e_id, _ in pdu.prev_events}
+ seen = yield self.store.have_seen_events(prevs)
+
+ if min_depth and pdu.depth < min_depth:
+ # This is so that we don't notify the user about this
+ # message, to work around the fact that some events will
+ # reference really really old events we really don't want to
+ # send to the clients.
+ pdu.internal_metadata.outlier = True
+ elif min_depth and pdu.depth > min_depth:
+ if get_missing and prevs - seen:
+ # If we're missing stuff, ensure we only fetch stuff one
+ # at a time.
+ logger.info(
+ "Acquiring lock for room %r to fetch %d missing events: %r...",
+ pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ )
+ with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+ logger.info(
+ "Acquired lock for room %r to fetch %d missing events",
+ pdu.room_id, len(prevs - seen),
+ )
+
+ yield self._get_missing_events_for_pdu(
+ origin, pdu, prevs, min_depth
+ )
+
+ # Update the set of things we've seen after trying to
+ # fetch the missing stuff
+ seen = yield self.store.have_seen_events(prevs)
+
+ if not prevs - seen:
+ logger.info(
+ "Found all missing prev events for %s", pdu.event_id
+ )
+ elif prevs - seen:
+ logger.info(
+ "Not fetching %d missing events for room %r,event %s: %r...",
+ len(prevs - seen), pdu.room_id, pdu.event_id,
+ list(prevs - seen)[:5],
+ )
+
+ if prevs - seen:
+ logger.info(
+ "Still missing %d events for room %r: %r...",
+ len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ )
+ fetch_state = True
- logger.debug("Event: %s", event)
+ if fetch_state:
+ # We need to get the state at this event, since we haven't
+ # processed all the prev events.
+ logger.debug(
+ "_handle_new_pdu getting state for %s",
+ pdu.room_id
+ )
+ try:
+ state, auth_chain = yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, pdu.event_id,
+ )
+ except Exception:
+ logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+ yield self._process_received_pdu(
+ origin,
+ pdu,
+ state=state,
+ auth_chain=auth_chain,
+ )
+
+ @defer.inlineCallbacks
+ def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+ """
+ Args:
+ origin (str): Origin of the pdu. Will be called to get the missing events
+ pdu: received pdu
+ prevs (set(str)): List of event ids which we are missing
+ min_depth (int): Minimum depth of events to return.
+ """
+ # We recalculate seen, since it may have changed.
+ seen = yield self.store.have_seen_events(prevs)
+
+ if not prevs - seen:
+ return
+
+ latest = yield self.store.get_latest_event_ids_in_room(
+ pdu.room_id
+ )
+
+ # We add the prev events that we have seen to the latest
+ # list to ensure the remote server doesn't give them to us
+ latest = set(latest)
+ latest |= seen
+
+ logger.info(
+ "Missing %d events for room %r pdu %s: %r...",
+ len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+ )
+
+ # XXX: we set timeout to 10s to help workaround
+ # https://github.com/matrix-org/synapse/issues/1733.
+ # The reason is to avoid holding the linearizer lock
+ # whilst processing inbound /send transactions, causing
+ # FDs to stack up and block other inbound transactions
+ # which empirically can currently take up to 30 minutes.
+ #
+ # N.B. this explicitly disables retry attempts.
+ #
+ # N.B. this also increases our chances of falling back to
+ # fetching fresh state for the room if the missing event
+ # can't be found, which slightly reduces our security.
+ # it may also increase our DAG extremity count for the room,
+ # causing additional state resolution? See #1760.
+ # However, fetching state doesn't hold the linearizer lock
+ # apparently.
+ #
+ # see https://github.com/matrix-org/synapse/pull/1744
+
+ missing_events = yield self.replication_layer.get_missing_events(
+ origin,
+ pdu.room_id,
+ earliest_events_ids=list(latest),
+ latest_events=[pdu],
+ limit=10,
+ min_depth=min_depth,
+ timeout=10000,
+ )
+
+ logger.info(
+ "Got %d events: %r...",
+ len(missing_events), [e.event_id for e in missing_events[:5]]
+ )
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ missing_events.sort(key=lambda x: x.depth)
+
+ for e in missing_events:
+ logger.info("Handling found event %s", e.event_id)
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+
+ @log_function
+ @defer.inlineCallbacks
+ def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ """ Called when we have a new pdu. We need to do auth checks and put it
+ through the StateHandler.
+ """
+ event = pdu
+
+ logger.debug("Processing event: %s", event)
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -140,9 +372,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -181,13 +411,6 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- # if we're receiving valid events from an origin,
- # it's probably a good idea to mark it as not in retry-state
- # for sending (although this is a bit of a leap)
- retry_timings = yield self.store.get_destination_retry_timings(origin)
- if retry_timings and retry_timings["retry_last_ts"]:
- self.store.set_destination_retry_timings(origin, 0, 0)
-
room = yield self.store.get_room(event.room_id)
if not room:
@@ -206,11 +429,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -249,7 +471,7 @@ class FederationHandler(BaseHandler):
def check_match(id):
try:
return server_name == get_domain_from_id(id)
- except:
+ except Exception:
return False
# Parses mapping `event_id -> (type, state_key) -> state event_id`
@@ -287,7 +509,7 @@ class FederationHandler(BaseHandler):
continue
try:
domain = get_domain_from_id(ev.state_key)
- except:
+ except Exception:
continue
if domain != server_name:
@@ -314,9 +536,16 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
- This will attempt to get more events from the remote. This may return
- be successfull and still return no events if the other side has no new
- events to offer.
+ This will attempt to get more events from the remote. If the other side
+ has no new events to offer, this will return an empty list.
+
+ As the events are received, we check their signatures, and also do some
+ sanity-checking on them. If any of the backfilled events are invalid,
+ this method throws a SynapseError.
+
+ TODO: make this more useful to distinguish failures of the remote
+ server from invalid events (there is probably no point in trying to
+ re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -328,6 +557,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # ideally we'd sanity check the events here for excess prev_events etc,
+ # but it's hard to reject events at this point without completely
+ # breaking backfill in the same way that it is currently broken by
+ # events whose signature we cannot verify (#3121).
+ #
+ # So for now we accept the events anyway. #3124 tracks this.
+ #
+ # for ev in events:
+ # self._sanity_check_event(ev)
+
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -398,9 +637,10 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
- results = yield preserve_context_over_deferred(defer.gatherResults(
+ results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self.replication_layer.get_pdu)(
+ logcontext.run_in_background(
+ self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
@@ -420,7 +660,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
- seen_events = yield self.store.have_events(
+ seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -526,7 +766,7 @@ class FederationHandler(BaseHandler):
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
- except:
+ except Exception:
pass
return sorted(joined_domains.items(), key=lambda d: d[1])
@@ -570,6 +810,9 @@ class FederationHandler(BaseHandler):
except NotRetryingDestination as e:
logger.info(e.message)
continue
+ except FederationDeniedError as e:
+ logger.info(e)
+ continue
except Exception as e:
logger.exception(
"Failed to backfill from %s because %s",
@@ -592,10 +835,13 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
- states = yield preserve_context_over_deferred(defer.gatherResults([
- preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
- for e in event_ids
- ]))
+ resolve = logcontext.preserve_fn(
+ self.state_handler.resolve_state_groups_for_events
+ )
+ states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [resolve(room_id, [e]) for e in event_ids],
+ consumeErrors=True,
+ ))
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
@@ -624,6 +870,38 @@ class FederationHandler(BaseHandler):
defer.returnValue(False)
+ def _sanity_check_event(self, ev):
+ """
+ Do some early sanity checks of a received event
+
+ In particular, checks it doesn't have an excessive number of
+ prev_events or auth_events, which could cause a huge state resolution
+ or cascade of event fetches.
+
+ Args:
+ ev (synapse.events.EventBase): event to be checked
+
+ Returns: None
+
+ Raises:
+ SynapseError if the event does not pass muster
+ """
+ if len(ev.prev_events) > 20:
+ logger.warn("Rejecting event %s which has %i prev_events",
+ ev.event_id, len(ev.prev_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many prev_events",
+ )
+
+ if len(ev.auth_events) > 10:
+ logger.warn("Rejecting event %s which has %i auth_events",
+ ev.event_id, len(ev.auth_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many auth_events",
+ )
+
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -641,7 +919,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def on_event_auth(self, event_id):
- auth = yield self.store.get_auth_chain([event_id])
+ event = yield self.store.get_event(event_id)
+ auth = yield self.store.get_auth_chain(
+ [auth_id for auth_id, _ in event.auth_events],
+ include_given=True
+ )
for event in auth:
event.signatures.update(
@@ -670,8 +952,6 @@ class FederationHandler(BaseHandler):
"""
logger.debug("Joining %s to %s", joinee, room_id)
- yield self.store.clean_room_for_join(room_id)
-
origin, event = yield self._make_and_verify_event(
target_hosts,
room_id,
@@ -680,7 +960,15 @@ class FederationHandler(BaseHandler):
content,
)
+ # This shouldn't happen, because the RoomMemberHandler has a
+ # linearizer lock which only allows one operation per user per room
+ # at a time - so this is just paranoia.
+ assert (room_id not in self.room_queues)
+
self.room_queues[room_id] = []
+
+ yield self.store.clean_room_for_join(room_id)
+
handled_events = set()
try:
@@ -714,7 +1002,7 @@ class FederationHandler(BaseHandler):
room_creator_user_id="",
is_public=False
)
- except:
+ except Exception:
# FIXME
pass
@@ -722,29 +1010,45 @@ class FederationHandler(BaseHandler):
origin, auth_chain, state, event
)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[joinee]
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[joinee]
+ )
logger.debug("Finished joining %s to %s", joinee, room_id)
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
- for p, origin in room_queue:
- if p.event_id in handled_events:
- continue
+ # we don't need to wait for the queued events to be processed -
+ # it's just a best-effort thing at this point. We do want to do
+ # them roughly in order, though, otherwise we'll end up making
+ # lots of requests for missing prev_events which we do actually
+ # have. Hence we fire off the deferred, but don't wait for it.
- try:
- self.on_receive_pdu(origin, p)
- except:
- logger.exception("Couldn't handle pdu")
+ logcontext.run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True)
@defer.inlineCallbacks
+ def _handle_queued_pdus(self, room_queue):
+ """Process PDUs which got queued up while we were busy send_joining.
+
+ Args:
+ room_queue (list[FrozenEvent, str]): list of PDUs to be processed
+ and the servers that sent them
+ """
+ for p, origin in room_queue:
+ try:
+ logger.info("Processing queued PDU %s which was received "
+ "while we were joining %s", p.event_id, p.room_id)
+ yield self.on_receive_pdu(origin, p)
+ except Exception as e:
+ logger.warn(
+ "Error handling queued PDU %s from %s: %s",
+ p.event_id, origin, e)
+
+ @defer.inlineCallbacks
@log_function
def on_make_join_request(self, room_id, user_id):
""" We've received a /make_join/ request, so we create a partial
@@ -762,8 +1066,7 @@ class FederationHandler(BaseHandler):
})
try:
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
except AuthError as e:
@@ -791,9 +1094,19 @@ class FederationHandler(BaseHandler):
)
event.internal_metadata.outlier = False
- # Send this event on behalf of the origin server since they may not
- # have an up to data view of the state of the room at this event so
- # will not know which servers to send the event to.
+ # Send this event on behalf of the origin server.
+ #
+ # The reasons we have the destination server rather than the origin
+ # server send it are slightly mysterious: the origin server should have
+ # all the neccessary state once it gets the response to the send_join,
+ # so it could send the event itself if it wanted to. It may be that
+ # doing it this way reduces failure modes, or avoids certain attacks
+ # where a new server selectively tells a subset of the federation that
+ # it has joined.
+ #
+ # The fact is that, as of the current writing, Synapse doesn't send out
+ # the join event over federation after joining, and changing it now
+ # would introduce the danger of backwards-compatibility problems.
event.internal_metadata.send_on_behalf_of = origin
context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -812,10 +1125,9 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
@@ -823,9 +1135,7 @@ class FederationHandler(BaseHandler):
yield user_joined_room(self.distributor, user, event.room_id)
state_ids = context.prev_state_ids.values()
- auth_chain = yield self.store.get_auth_chain(set(
- [event.event_id] + state_ids
- ))
+ auth_chain = yield self.store.get_auth_chain(state_ids)
state = yield self.store.get_events(context.prev_state_ids.values())
@@ -842,6 +1152,34 @@ class FederationHandler(BaseHandler):
"""
event = pdu
+ if event.state_key is None:
+ raise SynapseError(400, "The invite event did not have a state key")
+
+ is_blocked = yield self.store.is_room_blocked(event.room_id)
+ if is_blocked:
+ raise SynapseError(403, "This room has been blocked on this server")
+
+ if self.hs.config.block_non_admin_invites:
+ raise SynapseError(403, "This server does not accept room invites")
+
+ if not self.spam_checker.user_may_invite(
+ event.sender, event.state_key, event.room_id,
+ ):
+ raise SynapseError(
+ 403, "This user is not permitted to send invites to this server/user"
+ )
+
+ membership = event.content.get("membership")
+ if event.type != EventTypes.Member or membership != Membership.INVITE:
+ raise SynapseError(400, "The event was not an m.room.member invite event")
+
+ sender_domain = get_domain_from_id(event.sender)
+ if sender_domain != origin:
+ raise SynapseError(400, "The invite event was not from the server sending it")
+
+ if not self.is_mine_id(event.state_key):
+ raise SynapseError(400, "The invite event must be for this server")
+
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True
@@ -861,48 +1199,38 @@ class FederationHandler(BaseHandler):
)
target_user = UserID.from_string(event.state_key)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=[target_user],
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=[target_user],
+ )
defer.returnValue(event)
@defer.inlineCallbacks
def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
- try:
- origin, event = yield self._make_and_verify_event(
- target_hosts,
- room_id,
- user_id,
- "leave"
- )
- signed_event = self._sign_event(event)
- except SynapseError:
- raise
- except CodeMessageException as e:
- logger.warn("Failed to reject invite: %s", e)
- raise SynapseError(500, "Failed to reject invite")
-
- # Try the host we successfully got a response to /make_join/
- # request first.
+ origin, event = yield self._make_and_verify_event(
+ target_hosts,
+ room_id,
+ user_id,
+ "leave"
+ )
+ # Mark as outlier as we don't have any state for this event; we're not
+ # even in the room.
+ event.internal_metadata.outlier = True
+ event = self._sign_event(event)
+
+ # Try the host that we succesfully called /make_leave/ on first for
+ # the /send_leave/ request.
try:
target_hosts.remove(origin)
target_hosts.insert(0, origin)
except ValueError:
pass
- try:
- yield self.replication_layer.send_leave(
- target_hosts,
- signed_event
- )
- except SynapseError:
- raise
- except CodeMessageException as e:
- logger.warn("Failed to reject invite: %s", e)
- raise SynapseError(500, "Failed to reject invite")
+ yield self.replication_layer.send_leave(
+ target_hosts,
+ event
+ )
context = yield self.state_handler.compute_event_context(event)
@@ -978,8 +1306,7 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -1023,10 +1350,9 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id, extra_users=extra_users
- )
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id, extra_users=extra_users
+ )
defer.returnValue(None)
@@ -1061,7 +1387,7 @@ class FederationHandler(BaseHandler):
for event in res:
# We sign these again because there was a bug where we
# incorrectly signed things the first time round
- if self.hs.is_mine_id(event.event_id):
+ if self.is_mine_id(event.event_id):
event.signatures.update(
compute_event_signature(
event,
@@ -1096,7 +1422,7 @@ class FederationHandler(BaseHandler):
if prev_id != event.event_id:
results[(event.type, event.state_key)] = prev_id
else:
- del results[(event.type, event.state_key)]
+ results.pop((event.type, event.state_key), None)
defer.returnValue(results.values())
else:
@@ -1134,7 +1460,7 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.hs.is_mine_id(event.event_id):
+ if self.is_mine_id(event.event_id):
# FIXME: This is a temporary work around where we occasionally
# return events slightly differently than when they were
# originally signed
@@ -1178,23 +1504,33 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not event.internal_metadata.is_outlier():
- action_generator = ActionGenerator(self.hs)
- yield action_generator.handle_push_actions_for_event(
- event, context
+ try:
+ if not event.internal_metadata.is_outlier() and not backfilled:
+ yield self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event,
+ context=context,
+ backfilled=backfilled,
)
+ except: # noqa: E722, as we reraise the exception this is fine.
+ tp, value, tb = sys.exc_info()
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- backfilled=backfilled,
- )
+ logcontext.run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
+
+ six.reraise(tp, value, tb)
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
- event_stream_id, max_stream_id
+ logcontext.run_in_background(
+ self.pusher_pool.on_new_notifications,
+ event_stream_id, max_stream_id,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1206,16 +1542,17 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
- contexts = yield preserve_context_over_deferred(defer.gatherResults(
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self._prep_event)(
+ logcontext.run_in_background(
+ self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
- ]
+ ], consumeErrors=True,
))
yield self.store.persist_events(
@@ -1325,7 +1662,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _prep_event(self, origin, event, state=None, auth_events=None):
+ """
+
+ Args:
+ origin:
+ event:
+ state:
+ auth_events:
+ Returns:
+ Deferred, which resolves to synapse.events.snapshot.EventContext
+ """
context = yield self.state_handler.compute_event_context(
event, old_state=state,
)
@@ -1362,7 +1709,7 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
- if event.type == EventTypes.GuestAccess:
+ if event.type == EventTypes.GuestAccess and not context.rejected:
yield self.maybe_kick_guest_users(event)
defer.returnValue(context)
@@ -1379,7 +1726,11 @@ class FederationHandler(BaseHandler):
pass
# Now get the current auth_chain for the event.
- local_auth_chain = yield self.store.get_auth_chain([event_id])
+ event = yield self.store.get_event(event_id)
+ local_auth_chain = yield self.store.get_auth_chain(
+ [auth_id for auth_id, _ in event.auth_events],
+ include_given=True
+ )
# TODO: Check if we would now reject event_id. If so we need to tell
# everyone.
@@ -1427,6 +1778,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def do_auth(self, origin, event, context, auth_events):
+ """
+
+ Args:
+ origin (str):
+ event (synapse.events.FrozenEvent):
+ context (synapse.events.snapshot.EventContext):
+ auth_events (dict[(str, str)->str]):
+
+ Returns:
+ defer.Deferred[None]
+ """
# Check if we have all the auth events.
current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(e_id for e_id, _ in event.auth_events)
@@ -1437,7 +1799,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
- have_events = yield self.store.have_events(
+ # TODO: can we use store.have_seen_events here instead?
+ have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@@ -1460,12 +1823,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
+ if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@@ -1492,11 +1855,11 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- have_events = yield self.store.have_events(
+ have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
- except:
+ except Exception:
# FIXME:
logger.exception("Failed to get auth chain")
@@ -1509,18 +1872,18 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
- different_events = yield preserve_context_over_deferred(defer.gatherResults(
- [
- preserve_fn(self.store.get_event)(
+ different_events = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults([
+ logcontext.run_in_background(
+ self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
)
for d in different_auth
if d in have_events and not have_events[d]
- ],
- consumeErrors=True
- )).addErrback(unwrapFirstError)
+ ], consumeErrors=True)
+ ).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)
@@ -1539,16 +1902,9 @@ class FederationHandler(BaseHandler):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- if k != event_key
- })
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- })
- context.state_group = self.store.get_next_state_group()
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
if different_auth and not event.internal_metadata.is_outlier():
logger.info("Different auth after resolution: %s", different_auth)
@@ -1572,7 +1928,9 @@ class FederationHandler(BaseHandler):
auth_ids = yield self.auth.compute_auth_events(
event, context.prev_state_ids
)
- local_auth_chain = yield self.store.get_auth_chain(auth_ids)
+ local_auth_chain = yield self.store.get_auth_chain(
+ auth_ids, include_given=True
+ )
try:
# 2. Get remote difference.
@@ -1583,13 +1941,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
+ if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:
@@ -1619,23 +1977,16 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- except:
+ except Exception:
# FIXME:
logger.exception("Failed to query auth chain")
# 4. Look at rejects and their proofs.
# TODO.
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- if k != event_key
- })
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- })
- context.state_group = self.store.get_next_state_group()
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
try:
self.auth.check(event, auth_events=auth_events)
@@ -1644,6 +1995,45 @@ class FederationHandler(BaseHandler):
raise e
@defer.inlineCallbacks
+ def _update_context_for_auth_events(self, event, context, auth_events,
+ event_key):
+ """Update the state_ids in an event context after auth event resolution,
+ storing the changes as a new state group.
+
+ Args:
+ event (Event): The event we're handling the context for
+
+ context (synapse.events.snapshot.EventContext): event context
+ to be updated
+
+ auth_events (dict[(str, str)->str]): Events to update in the event
+ context.
+
+ event_key ((str, str)): (type, state_key) for the current event.
+ this will not be included in the current_state in the context.
+ """
+ state_updates = {
+ k: a.event_id for k, a in auth_events.iteritems()
+ if k != event_key
+ }
+ context.current_state_ids = dict(context.current_state_ids)
+ context.current_state_ids.update(state_updates)
+ if context.delta_ids is not None:
+ context.delta_ids = dict(context.delta_ids)
+ context.delta_ids.update(state_updates)
+ context.prev_state_ids = dict(context.prev_state_ids)
+ context.prev_state_ids.update({
+ k: a.event_id for k, a in auth_events.iteritems()
+ })
+ context.state_group = yield self.store.store_state_group(
+ event.event_id,
+ event.room_id,
+ prev_group=context.prev_group,
+ delta_ids=context.delta_ids,
+ current_state_ids=context.current_state_ids,
+ )
+
+ @defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
""" Given a local and remote auth chain, find the differences. This
assumes that we have already processed all events in remote_auth
@@ -1686,7 +2076,7 @@ class FederationHandler(BaseHandler):
def get_next(it, opt=None):
try:
return it.next()
- except:
+ except Exception:
return opt
current_local = get_next(local_iter)
@@ -1811,8 +2201,7 @@ class FederationHandler(BaseHandler):
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
@@ -1827,7 +2216,7 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -1840,10 +2229,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ """Handle an exchange_third_party_invite request from a remote server
+
+ The remote server will call this when it wants to turn a 3pid invite
+ into a normal m.room.member invite.
+
+ Returns:
+ Deferred: resolves (to None)
+ """
builder = self.event_builder_factory.new(event_dict)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -1858,10 +2254,13 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
+ # XXX we send the invite here, but send_membership_event also sends it,
+ # so we end up making two requests. I think this is redundant.
returned_invite = yield self.send_invite(origin, event)
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
- member_handler = self.hs.get_handlers().room_member_handler
+
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
@@ -1890,8 +2289,9 @@ class FederationHandler(BaseHandler):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(builder=builder)
+ event, context = yield self.event_creation_handler.create_new_client_event(
+ builder=builder,
+ )
defer.returnValue((event, context))
@defer.inlineCallbacks
|