diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 080aca3d71..87c0615820 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -15,8 +15,17 @@
# limitations under the License.
"""Contains handlers for federation events."""
+
+import itertools
+import logging
+import sys
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
+import six
+from six.moves import http_client
+from six import iteritems
+from twisted.internet import defer
from unpaddedbase64 import decode_base64
from ._base import BaseHandler
@@ -43,10 +52,6 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.distributor import user_joined_room
-from twisted.internet import defer
-
-import itertools
-import logging
logger = logging.getLogger(__name__)
@@ -77,6 +82,7 @@ class FederationHandler(BaseHandler):
self.pusher_pool = hs.get_pusherpool()
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
+ self._server_notices_mxid = hs.config.server_notices_mxid
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -115,6 +121,19 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
+ # do some initial sanity-checking of the event. In particular, make
+ # sure it doesn't have hundreds of prev_events or auth_events, which
+ # could cause a huge state resolution or cascade of event fetches.
+ try:
+ self._sanity_check_event(pdu)
+ except SynapseError as err:
+ raise FederationError(
+ "ERROR",
+ err.code,
+ err.msg,
+ affected=pdu.event_id,
+ )
+
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@@ -149,10 +168,6 @@ class FederationHandler(BaseHandler):
auth_chain = []
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
fetch_state = False
# Get missing pdus if necessary.
@@ -168,7 +183,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -196,8 +211,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.iterkeys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@@ -248,8 +262,7 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
@@ -361,9 +374,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -469,18 +480,18 @@ class FederationHandler(BaseHandler):
# to get all state ids that we're interested in.
event_map = yield self.store.get_events([
e_id
- for key_to_eid in event_to_state_ids.values()
- for key, e_id in key_to_eid.items()
+ for key_to_eid in event_to_state_ids.itervalues()
+ for key, e_id in key_to_eid.iteritems()
if key[0] != EventTypes.Member or check_match(key[1])
])
event_to_state = {
e_id: {
key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.items()
+ for key, inner_e_id in key_to_eid.iteritems()
if inner_e_id in event_map
}
- for e_id, key_to_eid in event_to_state_ids.items()
+ for e_id, key_to_eid in event_to_state_ids.iteritems()
}
def redact_disallowed(event, state):
@@ -495,7 +506,7 @@ class FederationHandler(BaseHandler):
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
- for ev in state.values():
+ for ev in state.itervalues():
if ev.type != EventTypes.Member:
continue
try:
@@ -527,9 +538,16 @@ class FederationHandler(BaseHandler):
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
- This will attempt to get more events from the remote. This may return
- be successfull and still return no events if the other side has no new
- events to offer.
+ This will attempt to get more events from the remote. If the other side
+ has no new events to offer, this will return an empty list.
+
+ As the events are received, we check their signatures, and also do some
+ sanity-checking on them. If any of the backfilled events are invalid,
+ this method throws a SynapseError.
+
+ TODO: make this more useful to distinguish failures of the remote
+ server from invalid events (there is probably no point in trying to
+ re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -541,6 +559,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # ideally we'd sanity check the events here for excess prev_events etc,
+ # but it's hard to reject events at this point without completely
+ # breaking backfill in the same way that it is currently broken by
+ # events whose signature we cannot verify (#3121).
+ #
+ # So for now we accept the events anyway. #3124 tracks this.
+ #
+ # for ev in events:
+ # self._sanity_check_event(ev)
+
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -613,7 +641,8 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- logcontext.preserve_fn(self.replication_layer.get_pdu)(
+ logcontext.run_in_background(
+ self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
@@ -633,7 +662,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
- seen_events = yield self.store.have_events(
+ seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -723,9 +752,19 @@ class FederationHandler(BaseHandler):
curr_state = yield self.state_handler.get_current_state(room_id)
def get_domains_from_state(state):
+ """Get joined domains from state
+
+ Args:
+ state (dict[tuple, FrozenEvent]): State map from type/state
+ key to event.
+
+ Returns:
+ list[tuple[str, int]]: Returns a list of servers with the
+ lowest depth of their joins. Sorted by lowest depth first.
+ """
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
+ for (e_type, state_key), event in state.iteritems()
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -742,7 +781,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.items(), key=lambda d: d[1])
+ return sorted(joined_domains.iteritems(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -759,7 +798,7 @@ class FederationHandler(BaseHandler):
yield self.backfill(
dom, room_id,
limit=100,
- extremities=[e for e in extremities.keys()]
+ extremities=extremities,
)
# If this succeeded then we probably already have the
# appropriate stuff.
@@ -805,7 +844,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.keys())
+ event_ids = list(extremities.iterkeys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -815,34 +854,69 @@ class FederationHandler(BaseHandler):
[resolve(room_id, [e]) for e in event_ids],
consumeErrors=True,
))
+
+ # dict[str, dict[tuple, str]], a map from event_id to state map of
+ # event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.values() for e_id in ids],
+ [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.items()
+ for k, e_id in state_dict.iteritems()
if e_id in state_map
- } for key, state_dict in states.items()
+ } for key, state_dict in states.iteritems()
}
for e_id, _ in sorted_extremeties_tuple:
likely_domains = get_domains_from_state(states[e_id])
success = yield try_backfill([
- dom for dom in likely_domains
+ dom for dom, _ in likely_domains
if dom not in tried_domains
])
if success:
defer.returnValue(True)
- tried_domains.update(likely_domains)
+ tried_domains.update(dom for dom, _ in likely_domains)
defer.returnValue(False)
+ def _sanity_check_event(self, ev):
+ """
+ Do some early sanity checks of a received event
+
+ In particular, checks it doesn't have an excessive number of
+ prev_events or auth_events, which could cause a huge state resolution
+ or cascade of event fetches.
+
+ Args:
+ ev (synapse.events.EventBase): event to be checked
+
+ Returns: None
+
+ Raises:
+ SynapseError if the event does not pass muster
+ """
+ if len(ev.prev_events) > 20:
+ logger.warn("Rejecting event %s which has %i prev_events",
+ ev.event_id, len(ev.prev_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many prev_events",
+ )
+
+ if len(ev.auth_events) > 10:
+ logger.warn("Rejecting event %s which has %i auth_events",
+ ev.event_id, len(ev.auth_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many auth_events",
+ )
+
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -967,7 +1041,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
- logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
+ logcontext.run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True)
@@ -1121,6 +1195,13 @@ class FederationHandler(BaseHandler):
if not self.is_mine_id(event.state_key):
raise SynapseError(400, "The invite event must be for this server")
+ # block any attempts to invite the server notices mxid
+ if event.state_key == self._server_notices_mxid:
+ raise SynapseError(
+ http_client.FORBIDDEN,
+ "Cannot invite this user",
+ )
+
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True
@@ -1308,7 +1389,7 @@ class FederationHandler(BaseHandler):
)
if state_groups:
- _, state = state_groups.items().pop()
+ _, state = list(iteritems(state_groups)).pop()
results = {
(e.type, e.state_key): e for e in state
}
@@ -1457,18 +1538,21 @@ class FederationHandler(BaseHandler):
backfilled=backfilled,
)
except: # noqa: E722, as we reraise the exception this is fine.
- # Ensure that we actually remove the entries in the push actions
- # staging area
- logcontext.preserve_fn(
- self.store.remove_push_actions_from_staging
- )(event.event_id)
- raise
+ tp, value, tb = sys.exc_info()
+
+ logcontext.run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
+
+ six.reraise(tp, value, tb)
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
- event_stream_id, max_stream_id
+ logcontext.run_in_background(
+ self.pusher_pool.on_new_notifications,
+ event_stream_id, max_stream_id,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1482,7 +1566,8 @@ class FederationHandler(BaseHandler):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- logcontext.preserve_fn(self._prep_event)(
+ logcontext.run_in_background(
+ self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
@@ -1736,7 +1821,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
- have_events = yield self.store.have_events(
+ # TODO: can we use store.have_seen_events here instead?
+ have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@@ -1759,12 +1845,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
+ if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@@ -1791,7 +1877,7 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- have_events = yield self.store.have_events(
+ have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
@@ -1810,7 +1896,8 @@ class FederationHandler(BaseHandler):
different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
- logcontext.preserve_fn(self.store.get_event)(
+ logcontext.run_in_background(
+ self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
@@ -1876,13 +1963,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
+ if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:
@@ -1948,7 +2035,7 @@ class FederationHandler(BaseHandler):
this will not be included in the current_state in the context.
"""
state_updates = {
- k: a.event_id for k, a in auth_events.iteritems()
+ k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
context.current_state_ids = dict(context.current_state_ids)
@@ -1958,7 +2045,7 @@ class FederationHandler(BaseHandler):
context.delta_ids.update(state_updates)
context.prev_state_ids = dict(context.prev_state_ids)
context.prev_state_ids.update({
- k: a.event_id for k, a in auth_events.iteritems()
+ k: a.event_id for k, a in iteritems(auth_events)
})
context.state_group = yield self.store.store_state_group(
event.event_id,
@@ -2010,7 +2097,7 @@ class FederationHandler(BaseHandler):
def get_next(it, opt=None):
try:
- return it.next()
+ return next(it)
except Exception:
return opt
|