diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f10b46414b..38bebbf598 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+def shortstr(iterable, maxitems=5):
+ """If iterable has maxitems or fewer, return the stringification of a list
+ containing those items.
+
+ Otherwise, return the stringification of a a list with the first maxitems items,
+ followed by "...".
+
+ Args:
+ iterable (Iterable): iterable to truncate
+ maxitems (int): number of items to return before truncating
+
+ Returns:
+ unicode
+ """
+
+ items = list(itertools.islice(iterable, maxitems + 1))
+ if len(items) <= maxitems:
+ return str(items)
+ return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -114,9 +135,8 @@ class FederationHandler(BaseHandler):
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
- @log_function
def on_receive_pdu(
- self, origin, pdu, get_missing=True, sent_to_us_directly=False,
+ self, origin, pdu, sent_to_us_directly=False,
):
""" Process a PDU received via a federation /send/ transaction, or
via backfill of missing prev_events
@@ -125,14 +145,23 @@ class FederationHandler(BaseHandler):
origin (str): server which initiated the /send/ transaction. Will
be used to fetch missing events or state.
pdu (FrozenEvent): received PDU
- get_missing (bool): True if we should fetch missing prev_events
+ sent_to_us_directly (bool): True if this event was pushed to us; False if
+ we pulled it as the result of a missing prev_event.
Returns (Deferred): completes with None
"""
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ logger.info(
+ "[%s %s] handling received PDU: %s",
+ room_id, event_id, pdu,
+ )
+
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
- pdu.event_id,
+ event_id,
allow_none=True,
allow_rejected=True,
)
@@ -147,7 +176,7 @@ class FederationHandler(BaseHandler):
)
)
if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
+ logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
# do some initial sanity-checking of the event. In particular, make
@@ -156,6 +185,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
+ logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
raise FederationError(
"ERROR",
err.code,
@@ -165,10 +195,12 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if pdu.room_id in self.room_queues:
- logger.info("Ignoring PDU %s for room %s from %s for now; join "
- "in progress", pdu.event_id, pdu.room_id, origin)
- self.room_queues[pdu.room_id].append((pdu, origin))
+ if room_id in self.room_queues:
+ logger.info(
+ "[%s %s] Queuing PDU from %s for now: join in progress",
+ room_id, event_id, origin,
+ )
+ self.room_queues[room_id].append((pdu, origin))
return
# If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +211,7 @@ class FederationHandler(BaseHandler):
# we should check if we *are* in fact in the room. If we are then we
# can magically rejoin the room.
is_in_room = yield self.auth.check_host_in_room(
- pdu.room_id,
+ room_id,
self.server_name
)
if not is_in_room:
@@ -188,8 +220,8 @@ class FederationHandler(BaseHandler):
)
if was_in_room:
logger.info(
- "Ignoring PDU %s for room %s from %s as we've left the room!",
- pdu.event_id, pdu.room_id, origin,
+ "[%s %s] Ignoring PDU from %s as we've left the room",
+ room_id, event_id, origin,
)
defer.returnValue(None)
@@ -204,8 +236,8 @@ class FederationHandler(BaseHandler):
)
logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
+ "[%s %s] min_depth: %d",
+ room_id, event_id, min_depth,
)
prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +250,18 @@ class FederationHandler(BaseHandler):
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
+ missing_prevs = prevs - seen
+ if sent_to_us_directly and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
+ "[%s %s] Acquired room lock to fetch %d missing prev_events",
+ room_id, event_id, len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
@@ -241,49 +274,91 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
logger.info(
- "Found all missing prev events for %s", pdu.event_id
+ "[%s %s] Found all missing prev_events",
+ room_id, event_id,
)
- elif prevs - seen:
+ elif missing_prevs:
logger.info(
- "Not fetching %d missing events for room %r,event %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id,
- list(prevs - seen)[:5],
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
+ )
+
+ if prevs - seen:
+ # We've still not been able to get all of the prev_events for this event.
+ #
+ # In this case, we need to fall back to asking another server in the
+ # federation for the state at this event. That's ok provided we then
+ # resolve the state against other bits of the DAG before using it (which
+ # will ensure that you can't just take over a room by sending an event,
+ # withholding its prev_events, and declaring yourself to be an admin in
+ # the subsequent state request).
+ #
+ # Now, if we're pulling this event as a missing prev_event, then clearly
+ # this event is not going to become the only forward-extremity and we are
+ # guaranteed to resolve its state against our existing forward
+ # extremities, so that should be fine.
+ #
+ # On the other hand, if this event was pushed to us, it is possible for
+ # it to become the only forward-extremity in the room, and we would then
+ # trust its state to be the state for the whole room. This is very bad.
+ # Further, if the event was pushed to us, there is no excuse for us not to
+ # have all the prev_events. We therefore reject any such events.
+ #
+ # XXX this really feels like it could/should be merged with the above,
+ # but there is an interaction with min_depth that I'm not really
+ # following.
+
+ if sent_to_us_directly:
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
+ raise FederationError(
+ "ERROR",
+ 403,
+ (
+ "Your server isn't divulging details about prev_events "
+ "referenced in this event."
+ ),
+ affected=pdu.event_id,
)
- if sent_to_us_directly and prevs - seen:
- # If they have sent it to us directly, and the server
- # isn't telling us about the auth events that it's
- # made a message referencing, we explode
- raise FederationError(
- "ERROR",
- 403,
- (
- "Your server isn't divulging details about prev_events "
- "referenced in this event."
- ),
- affected=pdu.event_id,
- )
- elif prevs - seen:
# Calculate the state of the previous events, and
# de-conflict them to find the current state.
state_groups = []
auth_chains = set()
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ ours = yield self.store.get_state_groups(room_id, list(seen))
state_groups.append(ours)
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
- state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, pdu.room_id, p
- )
+ logger.info(
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id, event_id, p,
)
- auth_chains.update(got_auth_chain)
- state_group = {(x.type, x.state_key): x.event_id for x in state}
- state_groups.append(state_group)
+
+ with logcontext.nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, room_id, p,
+ )
+ )
+
+ # XXX hrm I'm not convinced that duplicate events will compare
+ # for equality, so I'm not sure this does what the author
+ # hoped.
+ auth_chains.update(got_auth_chain)
+
+ state_group = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_groups.append(state_group)
# Resolve any conflicting state
def fetch(ev_ids):
@@ -291,19 +366,24 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
- room_version = yield self.store.get_room_version(pdu.room_id)
+ room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {pdu.event_id: pdu}, fetch
+ room_version, state_groups, {event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
+ logger.warn(
+ "[%s %s] Error attempting to resolve state at missing "
+ "prev_events",
+ room_id, event_id, exc_info=True,
+ )
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
- affected=pdu.event_id,
+ affected=event_id,
)
yield self._process_received_pdu(
@@ -322,15 +402,16 @@ class FederationHandler(BaseHandler):
prevs (set(str)): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
"""
- # We recalculate seen, since it may have changed.
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
+ latest = yield self.store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -338,8 +419,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "Missing %d events for room %r pdu %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+ "[%s %s]: Requesting %d prev_events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
# XXX: we set timeout to 10s to help workaround
@@ -392,7 +473,7 @@ class FederationHandler(BaseHandler):
missing_events = yield self.federation_client.get_missing_events(
origin,
- pdu.room_id,
+ room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
@@ -401,37 +482,47 @@ class FederationHandler(BaseHandler):
)
logger.info(
- "Got %d events: %r...",
- len(missing_events), [e.event_id for e in missing_events[:5]]
+ "[%s %s]: Got %d prev_events: %s",
+ room_id, event_id, len(missing_events), shortstr(missing_events),
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
- for e in missing_events:
- logger.info("Handling found event %s", e.event_id)
- try:
- yield self.on_receive_pdu(
- origin,
- e,
- get_missing=False
- )
- except FederationError as e:
- if e.code == 403:
- logger.warn("Event %s failed history check.")
- else:
- raise
+ for ev in missing_events:
+ logger.info(
+ "[%s %s] Handling received prev_event %s",
+ room_id, event_id, ev.event_id,
+ )
+ with logcontext.nested_logging_context(ev.event_id):
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ ev,
+ sent_to_us_directly=False,
+ )
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
+ else:
+ raise
- @log_function
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
- event = pdu
+ room_id = event.room_id
+ event_id = event.event_id
- logger.debug("Processing event: %s", event)
+ logger.debug(
+ "[%s %s] Processing event: %s",
+ room_id, event_id, event,
+ )
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -440,15 +531,16 @@ class FederationHandler(BaseHandler):
# event.
if state and auth_chain and not event.internal_metadata.is_outlier():
is_in_room = yield self.auth.check_host_in_room(
- event.room_id,
+ room_id,
self.server_name
)
else:
is_in_room = True
+
if not is_in_room:
logger.info(
- "Got event for room we're not in: %r %r",
- event.room_id, event.event_id
+ "[%s %s] Got event for room we're not in",
+ room_id, event_id,
)
try:
@@ -460,7 +552,7 @@ class FederationHandler(BaseHandler):
"ERROR",
e.code,
e.msg,
- affected=event.event_id,
+ affected=event_id,
)
else:
@@ -493,6 +585,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
yield self._handle_new_events(origin, event_infos)
try:
@@ -509,12 +605,12 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- room = yield self.store.get_room(event.room_id)
+ room = yield self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
- room_id=event.room_id,
+ room_id=room_id,
room_creator_user_id="",
is_public=False,
)
@@ -542,7 +638,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, event.room_id)
+ yield self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
@@ -1056,7 +1152,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p)
+ with logcontext.nested_logging_context(p.event_id):
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1459,12 +1556,10 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue(None)
- @log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@defer.inlineCallbacks
- @log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event(
@@ -1504,15 +1599,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(
- self._prep_event,
+
+ @defer.inlineCallbacks
+ def prep(ev_info):
+ event = ev_info["event"]
+ with logcontext.nested_logging_context(suffix=event.event_id):
+ res = yield self._prep_event(
origin,
- ev_info["event"],
+ event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
+ defer.returnValue(res)
+
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
@@ -1664,8 +1766,8 @@ class FederationHandler(BaseHandler):
)
except AuthError as e:
logger.warn(
- "Rejecting %s because %s",
- event.event_id, e.msg
+ "[%s %s] Rejecting: %s",
+ event.room_id, event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
|