diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 483cb8eac6..145c1a21d4 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,41 +15,46 @@
# limitations under the License.
"""Contains handlers for federation events."""
-import synapse.util.logcontext
+
+import itertools
+import logging
+import sys
+
+import six
+from six import iteritems, itervalues
+from six.moves import http_client, zip
+
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
-from ._base import BaseHandler
+from twisted.internet import defer
-from synapse.api.errors import (
- AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
-)
from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import (
- preserve_fn, preserve_context_over_deferred
+from synapse.api.errors import (
+ AuthError,
+ CodeMessageException,
+ FederationDeniedError,
+ FederationError,
+ StoreError,
+ SynapseError,
)
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
- compute_event_signature, add_hashes_and_signatures,
+ add_hashes_and_signatures,
+ compute_event_signature,
)
+from synapse.events.validator import EventValidator
+from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
-from synapse.util.retryutils import NotRetryingDestination
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async import Linearizer
from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
-from twisted.internet import defer
-
-import itertools
-import logging
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -70,14 +76,16 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_replication_layer()
+ self.replication_layer = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
self.is_mine_id = hs.is_mine_id
-
- self.replication_layer.set_handler(self)
+ self.pusher_pool = hs.get_pusherpool()
+ self.spam_checker = hs.get_spam_checker()
+ self.event_creation_handler = hs.get_event_creation_handler()
+ self._server_notices_mxid = hs.config.server_notices_mxid
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@@ -85,7 +93,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_receive_pdu(self, origin, pdu, get_missing=True):
+ def on_receive_pdu(
+ self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ ):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -99,8 +109,10 @@ class FederationHandler(BaseHandler):
"""
# We reprocess pdus when we have seen them only as outliers
- existing = yield self.get_persisted_pdu(
- origin, pdu.event_id, do_auth=False
+ existing = yield self.store.get_event(
+ pdu.event_id,
+ allow_none=True,
+ allow_rejected=True,
)
# FIXME: Currently we fetch an event again when we already have it
@@ -116,6 +128,19 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id)
return
+ # do some initial sanity-checking of the event. In particular, make
+ # sure it doesn't have hundreds of prev_events or auth_events, which
+ # could cause a huge state resolution or cascade of event fetches.
+ try:
+ self._sanity_check_event(pdu)
+ except SynapseError as err:
+ raise FederationError(
+ "ERROR",
+ err.code,
+ err.msg,
+ affected=pdu.event_id,
+ )
+
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
@@ -124,15 +149,30 @@ class FederationHandler(BaseHandler):
self.room_queues[pdu.room_id].append((pdu, origin))
return
- state = None
-
- auth_chain = []
-
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
+ # If we're no longer in the room just ditch the event entirely. This
+ # is probably an old server that has come back and thinks we're still
+ # in the room (or we've been rejoined to the room by a state reset).
+ #
+ # If we were never in the room then maybe our database got vaped and
+ # we should check if we *are* in fact in the room. If we are then we
+ # can magically rejoin the room.
+ is_in_room = yield self.auth.check_host_in_room(
+ pdu.room_id,
+ self.server_name
)
+ if not is_in_room:
+ was_in_room = yield self.store.was_host_joined(
+ pdu.room_id, self.server_name,
+ )
+ if was_in_room:
+ logger.info(
+ "Ignoring PDU %s for room %s from %s as we've left the room!",
+ pdu.event_id, pdu.room_id, origin,
+ )
+ defer.returnValue(None)
- fetch_state = False
+ state = None
+ auth_chain = []
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
@@ -147,7 +187,7 @@ class FederationHandler(BaseHandler):
)
prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if min_depth and pdu.depth < min_depth:
# This is so that we don't notify the user about this
@@ -175,8 +215,7 @@ class FederationHandler(BaseHandler):
# Update the set of things we've seen after trying to
# fetch the missing stuff
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.iterkeys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
logger.info(
@@ -189,26 +228,60 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ if sent_to_us_directly and prevs - seen:
+ # If they have sent it to us directly, and the server
+                # isn't telling us about the prev_events that this event
+                # references, we explode
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- fetch_state = True
+ elif prevs - seen:
+ # Calculate the state of the previous events, and
+ # de-conflict them to find the current state.
+ state_groups = []
+ auth_chains = set()
+ try:
+ # Get the state of the events we know about
+ ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ state_groups.append(ours)
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in prevs - seen:
+ state, got_auth_chain = (
+ yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, p
+ )
+ )
+ auth_chains.update(got_auth_chain)
+ state_group = {(x.type, x.state_key): x.event_id for x in state}
+ state_groups.append(state_group)
+
+ # Resolve any conflicting state
+ def fetch(ev_ids):
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False
+ )
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.replication_layer.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
+ state_map = yield resolve_events_with_factory(
+ state_groups, {pdu.event_id: pdu}, fetch
+ )
+
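+                    # state_map maps (type, state_key) to event_id; fetch
+                    # the full events for the resolved state.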
+ state = (yield self.store.get_events(state_map.values())).values()
+ auth_chain = list(auth_chains)
+ except Exception:
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=pdu.event_id,
+ )
yield self._process_received_pdu(
origin,
@@ -227,8 +300,7 @@ class FederationHandler(BaseHandler):
min_depth (int): Minimum depth of events to return.
"""
# We recalculate seen, since it may have changed.
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.keys())
+ seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
@@ -287,11 +359,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+            except FederationError as err:
+                if err.code == 403:
+                    logger.warn(
+                        "Event %s failed history check.", e.event_id,
+                    )
+                else:
+                    raise
@log_function
@defer.inlineCallbacks
@@ -340,9 +418,7 @@ class FederationHandler(BaseHandler):
if auth_chain:
event_ids |= {e.event_id for e in auth_chain}
- seen_ids = set(
- (yield self.store.have_events(event_ids)).keys()
- )
+ seen_ids = yield self.store.have_seen_events(event_ids)
if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication
@@ -410,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -424,91 +503,21 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- @measure_func("_filter_events_for_server")
- @defer.inlineCallbacks
- def _filter_events_for_server(self, server_name, room_id, events):
- event_to_state_ids = yield self.store.get_state_ids_for_events(
- frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, None),
- )
- )
-
- # We only want to pull out member events that correspond to the
- # server's domain.
-
- def check_match(id):
- try:
- return server_name == get_domain_from_id(id)
- except:
- return False
-
- # Parses mapping `event_id -> (type, state_key) -> state event_id`
- # to get all state ids that we're interested in.
- event_map = yield self.store.get_events([
- e_id
- for key_to_eid in event_to_state_ids.values()
- for key, e_id in key_to_eid.items()
- if key[0] != EventTypes.Member or check_match(key[1])
- ])
-
- event_to_state = {
- e_id: {
- key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.items()
- if inner_e_id in event_map
- }
- for e_id, key_to_eid in event_to_state_ids.items()
- }
-
- def redact_disallowed(event, state):
- if not state:
- return event
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- if visibility in ["invited", "joined"]:
- # We now loop through all state events looking for
- # membership states for the requesting server to determine
- # if the server is either in the room or has been invited
- # into the room.
- for ev in state.values():
- if ev.type != EventTypes.Member:
- continue
- try:
- domain = get_domain_from_id(ev.state_key)
- except:
- continue
-
- if domain != server_name:
- continue
-
- memtype = ev.membership
- if memtype == Membership.JOIN:
- return event
- elif memtype == Membership.INVITE:
- if visibility == "invited":
- return event
- else:
- return prune_event(event)
-
- return event
-
- defer.returnValue([
- redact_disallowed(e, event_to_state[e.event_id])
- for e in events
- ])
-
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`
- This will attempt to get more events from the remote. This may return
- be successfull and still return no events if the other side has no new
- events to offer.
+ This will attempt to get more events from the remote. If the other side
+ has no new events to offer, this will return an empty list.
+
+ As the events are received, we check their signatures, and also do some
+ sanity-checking on them. If any of the backfilled events are invalid,
+ this method throws a SynapseError.
+
+ TODO: make this more useful to distinguish failures of the remote
+ server from invalid events (there is probably no point in trying to
+ re-fetch invalid events from every other HS in the room.)
"""
if dest == self.server_name:
raise SynapseError(400, "Can't backfill from self.")
@@ -520,6 +529,16 @@ class FederationHandler(BaseHandler):
extremities=extremities,
)
+ # ideally we'd sanity check the events here for excess prev_events etc,
+ # but it's hard to reject events at this point without completely
+ # breaking backfill in the same way that it is currently broken by
+ # events whose signature we cannot verify (#3121).
+ #
+ # So for now we accept the events anyway. #3124 tracks this.
+ #
+ # for ev in events:
+ # self._sanity_check_event(ev)
+
# Don't bother processing events we already have.
seen_events = yield self.store.have_events_in_timeline(
set(e.event_id for e in events)
@@ -590,9 +609,10 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
- results = yield preserve_context_over_deferred(defer.gatherResults(
+ results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self.replication_layer.get_pdu)(
+ logcontext.run_in_background(
+ self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
@@ -612,7 +632,7 @@ class FederationHandler(BaseHandler):
failed_to_fetch = missing_auth - set(auth_events)
- seen_events = yield self.store.have_events(
+ seen_events = yield self.store.have_seen_events(
set(auth_events.keys()) | set(state_events.keys())
)
@@ -702,9 +722,19 @@ class FederationHandler(BaseHandler):
curr_state = yield self.state_handler.get_current_state(room_id)
def get_domains_from_state(state):
+ """Get joined domains from state
+
+ Args:
+ state (dict[tuple, FrozenEvent]): State map from type/state
+ key to event.
+
+ Returns:
+                list[tuple[str, int]]: A list of (domain, depth) tuples,
+                    where depth is the lowest depth of any of that domain's
+                    join events. Sorted by lowest depth first.
+ """
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
+ for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -718,7 +748,7 @@ class FederationHandler(BaseHandler):
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
- except:
+ except Exception:
pass
return sorted(joined_domains.items(), key=lambda d: d[1])
@@ -738,7 +768,7 @@ class FederationHandler(BaseHandler):
yield self.backfill(
dom, room_id,
limit=100,
- extremities=[e for e in extremities.keys()]
+ extremities=extremities,
)
# If this succeeded then we probably already have the
# appropriate stuff.
@@ -762,6 +792,9 @@ class FederationHandler(BaseHandler):
except NotRetryingDestination as e:
logger.info(e.message)
continue
+ except FederationDeniedError as e:
+ logger.info(e)
+ continue
except Exception as e:
logger.exception(
"Failed to backfill from %s because %s",
@@ -784,38 +817,76 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
- states = yield preserve_context_over_deferred(defer.gatherResults([
- preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
- for e in event_ids
- ]))
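+        # wrap the resolution in preserve_fn so that each call runs in the
+        # background with the logcontext handled correctly;
+        # make_deferred_yieldable then lets us safely yield on the results.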
+ resolve = logcontext.preserve_fn(
+ self.state_handler.resolve_state_groups_for_events
+ )
+ states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [resolve(room_id, [e]) for e in event_ids],
+ consumeErrors=True,
+ ))
+
+ # dict[str, dict[tuple, str]], a map from event_id to state map of
+ # event_ids.
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.values() for e_id in ids],
+ [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.items()
+ for k, e_id in iteritems(state_dict)
if e_id in state_map
- } for key, state_dict in states.items()
+ } for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
likely_domains = get_domains_from_state(states[e_id])
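+            # likely_domains is a list of (domain, depth) tuples; we only
+            # need the domain names here.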
success = yield try_backfill([
- dom for dom in likely_domains
+ dom for dom, _ in likely_domains
if dom not in tried_domains
])
if success:
defer.returnValue(True)
- tried_domains.update(likely_domains)
+ tried_domains.update(dom for dom, _ in likely_domains)
defer.returnValue(False)
+ def _sanity_check_event(self, ev):
+ """
+ Do some early sanity checks of a received event
+
+ In particular, checks it doesn't have an excessive number of
+ prev_events or auth_events, which could cause a huge state resolution
+ or cascade of event fetches.
+
+ Args:
+ ev (synapse.events.EventBase): event to be checked
+
+ Returns: None
+
+ Raises:
+ SynapseError if the event does not pass muster
+ """
+ if len(ev.prev_events) > 20:
+ logger.warn("Rejecting event %s which has %i prev_events",
+ ev.event_id, len(ev.prev_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many prev_events",
+ )
+
+ if len(ev.auth_events) > 10:
+ logger.warn("Rejecting event %s which has %i auth_events",
+ ev.event_id, len(ev.auth_events))
+ raise SynapseError(
+ http_client.BAD_REQUEST,
+ "Too many auth_events",
+ )
+
@defer.inlineCallbacks
def send_invite(self, target_host, event):
""" Sends the invite to the remote server for signing.
@@ -838,16 +909,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -916,7 +977,7 @@ class FederationHandler(BaseHandler):
room_creator_user_id="",
is_public=False
)
- except:
+ except Exception:
# FIXME
pass
@@ -940,9 +1001,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
- synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
- room_queue
- )
+ logcontext.run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True)
@@ -982,8 +1041,7 @@ class FederationHandler(BaseHandler):
})
try:
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
except AuthError as e:
@@ -1051,13 +1109,15 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- state_ids = context.prev_state_ids.values()
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(context.prev_state_ids.values())
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
- "state": state.values(),
+ "state": list(state.values()),
"auth_chain": auth_chain,
})
@@ -1069,10 +1129,23 @@ class FederationHandler(BaseHandler):
"""
event = pdu
+ if event.state_key is None:
+ raise SynapseError(400, "The invite event did not have a state key")
+
is_blocked = yield self.store.is_room_blocked(event.room_id)
if is_blocked:
raise SynapseError(403, "This room has been blocked on this server")
+ if self.hs.config.block_non_admin_invites:
+ raise SynapseError(403, "This server does not accept room invites")
+
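+        # let the spam checker veto the invite (event.state_key is the
+        # invitee's mxid)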
+ if not self.spam_checker.user_may_invite(
+ event.sender, event.state_key, event.room_id,
+ ):
+ raise SynapseError(
+ 403, "This user is not permitted to send invites to this server/user"
+ )
+
membership = event.content.get("membership")
if event.type != EventTypes.Member or membership != Membership.INVITE:
raise SynapseError(400, "The event was not an m.room.member invite event")
@@ -1081,12 +1154,16 @@ class FederationHandler(BaseHandler):
if sender_domain != origin:
raise SynapseError(400, "The invite event was not from the server sending it")
- if event.state_key is None:
- raise SynapseError(400, "The invite event did not have a state key")
-
if not self.is_mine_id(event.state_key):
raise SynapseError(400, "The invite event must be for this server")
+ # block any attempts to invite the server notices mxid
+ if event.state_key == self._server_notices_mxid:
+ raise SynapseError(
+ http_client.FORBIDDEN,
+ "Cannot invite this user",
+ )
+
event.internal_metadata.outlier = True
event.internal_metadata.invite_from_remote = True
@@ -1213,8 +1290,7 @@ class FederationHandler(BaseHandler):
"state_key": user_id,
})
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -1268,14 +1344,12 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
-
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
if state_groups:
- _, state = state_groups.items().pop()
+ _, state = list(iteritems(state_groups)).pop()
results = {
(e.type, e.state_key): e for e in state
}
@@ -1291,19 +1365,7 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
- res = results.values()
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
+ res = list(results.values())
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1312,8 +1374,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
-
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)
@@ -1332,7 +1392,7 @@ class FederationHandler(BaseHandler):
else:
results.pop((event.type, event.state_key), None)
- defer.returnValue(results.values())
+ defer.returnValue(list(results.values()))
else:
defer.returnValue([])
@@ -1349,17 +1409,26 @@ class FederationHandler(BaseHandler):
limit
)
- events = yield self._filter_events_for_server(origin, room_id, events)
+ events = yield filter_events_for_server(self.store, origin, events)
defer.returnValue(events)
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id, do_auth=True):
- """ Get a PDU from the database with given origin and id.
+ def get_persisted_pdu(self, origin, event_id):
+ """Get an event from the database for the given server.
+
+ Args:
+            origin (str): hostname of server which is requesting the event; we
+                will check that the server is allowed to see it.
+            event_id (str): id of the event being requested
Returns:
- Deferred: Results in a `Pdu`.
+ Deferred[EventBase|None]: None if we know nothing about the event;
+ otherwise the (possibly-redacted) event.
+
+ Raises:
+ AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event_id,
@@ -1368,32 +1437,17 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
- if do_auth:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
- )
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
- events = yield self._filter_events_for_server(
- origin, event.room_id, [event]
- )
-
- event = events[0]
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+ events = yield filter_events_for_server(
+ self.store, origin, [event],
+ )
+ event = events[0]
defer.returnValue(event)
else:
defer.returnValue(None)
@@ -1412,22 +1466,33 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not event.internal_metadata.is_outlier():
- yield self.action_generator.handle_push_actions_for_event(
- event, context
+ try:
+ if not event.internal_metadata.is_outlier() and not backfilled:
+ yield self.action_generator.handle_push_actions_for_event(
+ event, context
+ )
+
+ event_stream_id, max_stream_id = yield self.store.persist_event(
+ event,
+ context=context,
+ backfilled=backfilled,
)
+ except: # noqa: E722, as we reraise the exception this is fine.
+ tp, value, tb = sys.exc_info()
- event_stream_id, max_stream_id = yield self.store.persist_event(
- event,
- context=context,
- backfilled=backfilled,
- )
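+            # Ensure that we actually remove the entries in the push actions
+            # staging area, as we failed to persist the event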
+ logcontext.run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
+
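+            # re-raise the original exception, with its original traceback,
+            # in a way that works on both Python 2 and Python 3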
+ six.reraise(tp, value, tb)
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
- preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
- event_stream_id, max_stream_id
+ logcontext.run_in_background(
+ self.pusher_pool.on_new_notifications,
+ event_stream_id, max_stream_id,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1439,22 +1504,23 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
- contexts = yield preserve_context_over_deferred(defer.gatherResults(
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self._prep_event)(
+ logcontext.run_in_background(
+ self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
for ev_info in event_infos
- ]
+ ], consumeErrors=True,
))
yield self.store.persist_events(
[
(ev_info["event"], context)
- for ev_info, context in itertools.izip(event_infos, contexts)
+ for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@@ -1574,8 +1640,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1605,7 +1672,7 @@ class FederationHandler(BaseHandler):
context.rejected = RejectedReason.AUTH_ERROR
- if event.type == EventTypes.GuestAccess:
+ if event.type == EventTypes.GuestAccess and not context.rejected:
yield self.maybe_kick_guest_users(event)
defer.returnValue(context)
@@ -1635,15 +1702,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -1669,11 +1727,26 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
+ missing_events = yield filter_events_for_server(
+ self.store, origin, missing_events,
+ )
+
defer.returnValue(missing_events)
@defer.inlineCallbacks
@log_function
def do_auth(self, origin, event, context, auth_events):
+ """
+
+ Args:
+ origin (str):
+ event (synapse.events.FrozenEvent):
+ context (synapse.events.snapshot.EventContext):
+            auth_events (dict[(str, str)->synapse.events.EventBase]):
+
+ Returns:
+ defer.Deferred[None]
+ """
# Check if we have all the auth events.
current_state = set(e.event_id for e in auth_events.values())
event_auth_events = set(e_id for e_id, _ in event.auth_events)
@@ -1684,7 +1757,8 @@ class FederationHandler(BaseHandler):
event_key = None
if event_auth_events - current_state:
- have_events = yield self.store.have_events(
+ # TODO: can we use store.have_seen_events here instead?
+ have_events = yield self.store.get_seen_events_with_rejections(
event_auth_events - current_state
)
else:
@@ -1707,12 +1781,12 @@ class FederationHandler(BaseHandler):
origin, event.room_id, event.event_id
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
- if e.event_id in seen_remotes.keys():
+ if e.event_id in seen_remotes:
continue
if e.event_id == event.event_id:
@@ -1739,11 +1813,11 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- have_events = yield self.store.have_events(
+ have_events = yield self.store.get_seen_events_with_rejections(
[e_id for e_id, _ in event.auth_events]
)
seen_events = set(have_events.keys())
- except:
+ except Exception:
# FIXME:
logger.exception("Failed to get auth chain")
@@ -1756,18 +1830,18 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
- different_events = yield preserve_context_over_deferred(defer.gatherResults(
- [
- preserve_fn(self.store.get_event)(
+ different_events = yield logcontext.make_deferred_yieldable(
+ defer.gatherResults([
+ logcontext.run_in_background(
+ self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
)
for d in different_auth
if d in have_events and not have_events[d]
- ],
- consumeErrors=True
- )).addErrback(unwrapFirstError)
+ ], consumeErrors=True)
+ ).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)
@@ -1777,7 +1851,7 @@ class FederationHandler(BaseHandler):
})
new_state = self.state_handler.resolve_events(
- [local_view.values(), remote_view.values()],
+ [list(local_view.values()), list(remote_view.values())],
event
)
@@ -1786,16 +1860,9 @@ class FederationHandler(BaseHandler):
current_state = set(e.event_id for e in auth_events.values())
different_auth = event_auth_events - current_state
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- if k != event_key
- })
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- })
- context.state_group = self.store.get_next_state_group()
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
if different_auth and not event.internal_metadata.is_outlier():
logger.info("Different auth after resolution: %s", different_auth)
@@ -1815,9 +1882,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -1832,13 +1900,13 @@ class FederationHandler(BaseHandler):
local_auth_chain,
)
- seen_remotes = yield self.store.have_events(
+ seen_remotes = yield self.store.have_seen_events(
[e.event_id for e in result["auth_chain"]]
)
# 3. Process any remote auth chain events we haven't seen.
for ev in result["auth_chain"]:
- if ev.event_id in seen_remotes.keys():
+ if ev.event_id in seen_remotes:
continue
if ev.event_id == event.event_id:
@@ -1868,23 +1936,16 @@ class FederationHandler(BaseHandler):
except AuthError:
pass
- except:
+ except Exception:
# FIXME:
logger.exception("Failed to query auth chain")
# 4. Look at rejects and their proofs.
# TODO.
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- if k != event_key
- })
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
- k: a.event_id for k, a in auth_events.items()
- })
- context.state_group = self.store.get_next_state_group()
+ yield self._update_context_for_auth_events(
+ event, context, auth_events, event_key,
+ )
try:
self.auth.check(event, auth_events=auth_events)
@@ -1893,6 +1954,58 @@ class FederationHandler(BaseHandler):
raise e
@defer.inlineCallbacks
+ def _update_context_for_auth_events(self, event, context, auth_events,
+ event_key):
+ """Update the state_ids in an event context after auth event resolution,
+ storing the changes as a new state group.
+
+ Args:
+ event (Event): The event we're handling the context for
+
+ context (synapse.events.snapshot.EventContext): event context
+ to be updated
+
+        auth_events (dict[(str, str)->synapse.events.EventBase]):
+            Events to update in the event context.
+
+ event_key ((str, str)): (type, state_key) for the current event.
+            This will not be included in the current_state in the context.
+ """
+ state_updates = {
+ k: a.event_id for k, a in iteritems(auth_events)
+ if k != event_key
+ }
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
+ k: a.event_id for k, a in iteritems(auth_events)
+ })
+
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
+ state_group = yield self.store.store_state_group(
+ event.event_id,
+ event.room_id,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ )
+
+ @defer.inlineCallbacks
def construct_auth_difference(self, local_auth, remote_auth):
""" Given a local and remote auth chain, find the differences. This
assumes that we have already processed all events in remote_auth
@@ -1934,8 +2047,8 @@ class FederationHandler(BaseHandler):
def get_next(it, opt=None):
try:
- return it.next()
- except:
+ return next(it)
+ except Exception:
return opt
current_local = get_next(local_iter)
@@ -2060,8 +2173,7 @@ class FederationHandler(BaseHandler):
if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder
)
@@ -2076,7 +2188,7 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
- member_handler = self.hs.get_handlers().room_member_handler
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
else:
destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
@@ -2089,10 +2201,17 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+ """Handle an exchange_third_party_invite request from a remote server
+
+ The remote server will call this when it wants to turn a 3pid invite
+ into a normal m.room.member invite.
+
+ Returns:
+ Deferred: resolves (to None)
+ """
builder = self.event_builder_factory.new(event_dict)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(
+ event, context = yield self.event_creation_handler.create_new_client_event(
builder=builder,
)
@@ -2107,10 +2226,13 @@ class FederationHandler(BaseHandler):
raise e
yield self._check_signature(event, context)
+ # XXX we send the invite here, but send_membership_event also sends it,
+ # so we end up making two requests. I think this is redundant.
returned_invite = yield self.send_invite(origin, event)
# TODO: Make sure the signatures actually are correct.
event.signatures.update(returned_invite.signatures)
- member_handler = self.hs.get_handlers().room_member_handler
+
+ member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
@defer.inlineCallbacks
@@ -2120,7 +2242,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2139,8 +2262,9 @@ class FederationHandler(BaseHandler):
builder = self.event_builder_factory.new(event_dict)
EventValidator().validate_new(builder)
- message_handler = self.hs.get_handlers().message_handler
- event, context = yield message_handler._create_new_client_event(builder=builder)
+ event, context = yield self.event_creation_handler.create_new_client_event(
+ builder=builder,
+ )
defer.returnValue((event, context))
@defer.inlineCallbacks
@@ -2161,7 +2285,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
|