diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 495ac4c648..65f6041b10 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -20,37 +20,41 @@ import itertools
import logging
import sys
-from signedjson.key import decode_verify_key_bytes
-from signedjson.sign import verify_signed_json
import six
-from six.moves import http_client
from six import iteritems
-from twisted.internet import defer
+from six.moves import http_client
+
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
-from ._base import BaseHandler
+from twisted.internet import defer
+from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
- AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ AuthError,
+ CodeMessageException,
FederationDeniedError,
+ FederationError,
+ StoreError,
+ SynapseError,
)
-from synapse.api.constants import EventTypes, Membership, RejectedReason
-from synapse.events.validator import EventValidator
-from synapse.util import unwrapFirstError, logcontext
-from synapse.util.metrics import measure_func
-from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, Linearizer
-from synapse.util.frozenutils import unfreeze
from synapse.crypto.event_signing import (
- compute_event_signature, add_hashes_and_signatures,
+ add_hashes_and_signatures,
+ compute_event_signature,
)
+from synapse.events.validator import EventValidator
+from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
-
-from synapse.events.utils import prune_event
-
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async import Linearizer
+from synapse.util.distributor import user_joined_room
+from synapse.util.frozenutils import unfreeze
+from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
-from synapse.util.distributor import user_joined_room
+from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -89,7 +93,9 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def on_receive_pdu(self, origin, pdu, get_missing=True):
+ def on_receive_pdu(
+ self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ ):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -103,8 +109,10 @@ class FederationHandler(BaseHandler):
"""
# We reprocess pdus when we have seen them only as outliers
- existing = yield self.get_persisted_pdu(
- origin, pdu.event_id, do_auth=False
+ existing = yield self.store.get_event(
+ pdu.event_id,
+ allow_none=True,
+ allow_rejected=True,
)
# FIXME: Currently we fetch an event again when we already have it
@@ -161,14 +169,11 @@ class FederationHandler(BaseHandler):
"Ignoring PDU %s for room %s from %s as we've left the room!",
pdu.event_id, pdu.room_id, origin,
)
- return
+ defer.returnValue(None)
state = None
-
auth_chain = []
- fetch_state = False
-
# Get missing pdus if necessary.
if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
@@ -223,26 +228,60 @@ class FederationHandler(BaseHandler):
list(prevs - seen)[:5],
)
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+ if sent_to_us_directly and prevs - seen:
+ # If they have sent it to us directly, and the server
+ # isn't telling us about the auth events that it's
+ # made a message referencing, we explode
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- fetch_state = True
+ elif prevs - seen:
+ # Calculate the state of the previous events, and
+ # de-conflict them to find the current state.
+ state_groups = []
+ auth_chains = set()
+ try:
+ # Get the state of the events we know about
+ ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ state_groups.append(ours)
+
+ # Ask the remote server for the states we don't
+ # know about
+ for p in prevs - seen:
+ state, got_auth_chain = (
+ yield self.replication_layer.get_state_for_room(
+ origin, pdu.room_id, p
+ )
+ )
+ auth_chains.update(got_auth_chain)
+ state_group = {(x.type, x.state_key): x.event_id for x in state}
+ state_groups.append(state_group)
+
+ # Resolve any conflicting state
+ def fetch(ev_ids):
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False
+ )
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.replication_layer.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except Exception:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
+ state_map = yield resolve_events_with_factory(
+ state_groups, {pdu.event_id: pdu}, fetch
+ )
+
+ state = (yield self.store.get_events(state_map.values())).values()
+ auth_chain = list(auth_chains)
+ except Exception:
+ raise FederationError(
+ "ERROR",
+ 403,
+ "We can't get valid state history.",
+ affected=pdu.event_id,
+ )
yield self._process_received_pdu(
origin,
@@ -320,11 +359,17 @@ class FederationHandler(BaseHandler):
for e in missing_events:
logger.info("Handling found event %s", e.event_id)
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ e,
+ get_missing=False
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn("Event %s failed history check.")
+ else:
+ raise
@log_function
@defer.inlineCallbacks
@@ -455,83 +500,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- @measure_func("_filter_events_for_server")
- @defer.inlineCallbacks
- def _filter_events_for_server(self, server_name, room_id, events):
- event_to_state_ids = yield self.store.get_state_ids_for_events(
- frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, None),
- )
- )
-
- # We only want to pull out member events that correspond to the
- # server's domain.
-
- def check_match(id):
- try:
- return server_name == get_domain_from_id(id)
- except Exception:
- return False
-
- # Parses mapping `event_id -> (type, state_key) -> state event_id`
- # to get all state ids that we're interested in.
- event_map = yield self.store.get_events([
- e_id
- for key_to_eid in list(event_to_state_ids.values())
- for key, e_id in key_to_eid.items()
- if key[0] != EventTypes.Member or check_match(key[1])
- ])
-
- event_to_state = {
- e_id: {
- key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.iteritems()
- if inner_e_id in event_map
- }
- for e_id, key_to_eid in event_to_state_ids.iteritems()
- }
-
- def redact_disallowed(event, state):
- if not state:
- return event
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- if visibility in ["invited", "joined"]:
- # We now loop through all state events looking for
- # membership states for the requesting server to determine
- # if the server is either in the room or has been invited
- # into the room.
- for ev in state.itervalues():
- if ev.type != EventTypes.Member:
- continue
- try:
- domain = get_domain_from_id(ev.state_key)
- except Exception:
- continue
-
- if domain != server_name:
- continue
-
- memtype = ev.membership
- if memtype == Membership.JOIN:
- return event
- elif memtype == Membership.INVITE:
- if visibility == "invited":
- return event
- else:
- return prune_event(event)
-
- return event
-
- defer.returnValue([
- redact_disallowed(e, event_to_state[e.event_id])
- for e in events
- ])
-
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
@@ -938,16 +906,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -1381,8 +1339,6 @@ class FederationHandler(BaseHandler):
def get_state_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
-
state_groups = yield self.store.get_state_groups(
room_id, [event_id]
)
@@ -1405,18 +1361,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1425,8 +1369,6 @@ class FederationHandler(BaseHandler):
def get_state_ids_for_pdu(self, room_id, event_id):
"""Returns the state at the event. i.e. not including said event.
"""
- yield run_on_reactor()
-
state_groups = yield self.store.get_state_groups_ids(
room_id, [event_id]
)
@@ -1462,17 +1404,26 @@ class FederationHandler(BaseHandler):
limit
)
- events = yield self._filter_events_for_server(origin, room_id, events)
+ events = yield filter_events_for_server(self.store, origin, events)
defer.returnValue(events)
@defer.inlineCallbacks
@log_function
- def get_persisted_pdu(self, origin, event_id, do_auth=True):
- """ Get a PDU from the database with given origin and id.
+ def get_persisted_pdu(self, origin, event_id):
+ """Get an event from the database for the given server.
+
+ Args:
+ origin [str]: hostname of server which is requesting the event; we
+ will check that the server is allowed to see it.
+ event_id [str]: id of the event being requested
Returns:
- Deferred: Results in a `Pdu`.
+ Deferred[EventBase|None]: None if we know nothing about the event;
+ otherwise the (possibly-redacted) event.
+
+ Raises:
+ AuthError if the server is not currently in the room
"""
event = yield self.store.get_event(
event_id,
@@ -1481,32 +1432,17 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
- if do_auth:
- in_room = yield self.auth.check_host_in_room(
- event.room_id,
- origin
- )
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
- events = yield self._filter_events_for_server(
- origin, event.room_id, [event]
- )
-
- event = events[0]
+ in_room = yield self.auth.check_host_in_room(
+ event.room_id,
+ origin
+ )
+ if not in_room:
+ raise AuthError(403, "Host not in room.")
+ events = yield filter_events_for_server(
+ self.store, origin, [event],
+ )
+ event = events[0]
defer.returnValue(event)
else:
defer.returnValue(None)
@@ -1760,15 +1696,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -1794,8 +1721,8 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
- missing_events = yield self._filter_events_for_server(
- origin, room_id, missing_events,
+ missing_events = yield filter_events_for_server(
+ self.store, origin, missing_events,
)
defer.returnValue(missing_events)
|