diff --git a/changelog.d/3904.misc b/changelog.d/3904.misc
new file mode 100644
index 0000000000..1e3c8e1706
--- /dev/null
+++ b/changelog.d/3904.misc
@@ -0,0 +1 @@
+Improve the logging when handling a federation transaction
\ No newline at end of file
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f10b46414b..8d6bd7976d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -69,6 +69,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+def shortstr(iterable, maxitems=5):
+ """If iterable has maxitems or fewer, return the stringification of a list
+ containing those items.
+
+ Otherwise, return the stringification of a a list with the first maxitems items,
+ followed by "...".
+
+ Args:
+ iterable (Iterable): iterable to truncate
+ maxitems (int): number of items to return before truncating
+
+ Returns:
+ unicode
+ """
+
+ items = list(itertools.islice(iterable, maxitems + 1))
+ if len(items) <= maxitems:
+ return str(items)
+ return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
class FederationHandler(BaseHandler):
"""Handles events that originated from federation.
Responsible for:
@@ -114,7 +135,6 @@ class FederationHandler(BaseHandler):
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@defer.inlineCallbacks
- @log_function
def on_receive_pdu(
self, origin, pdu, get_missing=True, sent_to_us_directly=False,
):
@@ -130,9 +150,17 @@ class FederationHandler(BaseHandler):
Returns (Deferred): completes with None
"""
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
+ logger.info(
+ "[%s %s] handling received PDU: %s",
+ room_id, event_id, pdu,
+ )
+
# We reprocess pdus when we have seen them only as outliers
existing = yield self.store.get_event(
- pdu.event_id,
+ event_id,
allow_none=True,
allow_rejected=True,
)
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
)
)
if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
+ logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
return
# do some initial sanity-checking of the event. In particular, make
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
try:
self._sanity_check_event(pdu)
except SynapseError as err:
+ logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
raise FederationError(
"ERROR",
err.code,
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
- if pdu.room_id in self.room_queues:
- logger.info("Ignoring PDU %s for room %s from %s for now; join "
- "in progress", pdu.event_id, pdu.room_id, origin)
- self.room_queues[pdu.room_id].append((pdu, origin))
+ if room_id in self.room_queues:
+ logger.info(
+ "[%s %s] Queuing PDU from %s for now: join in progress",
+ room_id, event_id, origin,
+ )
+ self.room_queues[room_id].append((pdu, origin))
return
# If we're no longer in the room just ditch the event entirely. This
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
# we should check if we *are* in fact in the room. If we are then we
# can magically rejoin the room.
is_in_room = yield self.auth.check_host_in_room(
- pdu.room_id,
+ room_id,
self.server_name
)
if not is_in_room:
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
)
if was_in_room:
logger.info(
- "Ignoring PDU %s for room %s from %s as we've left the room!",
- pdu.event_id, pdu.room_id, origin,
+ "[%s %s] Ignoring PDU from %s as we've left the room",
+ room_id, event_id, origin,
)
defer.returnValue(None)
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
)
logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
+ "[%s %s] min_depth: %d",
+ room_id, event_id, min_depth,
)
prevs = {e_id for e_id, _ in pdu.prev_events}
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
# send to the clients.
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
+ missing_prevs = prevs - seen
+ if get_missing and missing_prevs:
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+ "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
+ "[%s %s] Acquired room lock to fetch %d missing prev_events",
+ room_id, event_id, len(missing_prevs),
)
yield self._get_missing_events_for_pdu(
@@ -241,19 +273,23 @@ class FederationHandler(BaseHandler):
if not prevs - seen:
logger.info(
- "Found all missing prev events for %s", pdu.event_id
+ "[%s %s] Found all missing prev_events",
+ room_id, event_id,
)
- elif prevs - seen:
+ elif missing_prevs:
logger.info(
- "Not fetching %d missing events for room %r,event %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id,
- list(prevs - seen)[:5],
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
)
if sent_to_us_directly and prevs - seen:
# If they have sent it to us directly, and the server
# isn't telling us about the auth events that it's
# made a message referencing, we explode
+ logger.warn(
+ "[%s %s] Failed to fetch %d prev events: rejecting",
+ room_id, event_id, len(prevs - seen),
+ )
raise FederationError(
"ERROR",
403,
@@ -270,15 +306,19 @@ class FederationHandler(BaseHandler):
auth_chains = set()
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
+ ours = yield self.store.get_state_groups(room_id, list(seen))
state_groups.append(ours)
# Ask the remote server for the states we don't
# know about
for p in prevs - seen:
+ logger.info(
+ "[%s %s] Requesting state at missing prev_event %s",
+ room_id, event_id, p,
+ )
state, got_auth_chain = (
yield self.federation_client.get_state_for_room(
- origin, pdu.room_id, p
+ origin, room_id, p,
)
)
auth_chains.update(got_auth_chain)
@@ -291,19 +331,24 @@ class FederationHandler(BaseHandler):
ev_ids, get_prev_content=False, check_redacted=False
)
- room_version = yield self.store.get_room_version(pdu.room_id)
+ room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {pdu.event_id: pdu}, fetch
+ room_version, state_groups, {event_id: pdu}, fetch
)
state = (yield self.store.get_events(state_map.values())).values()
auth_chain = list(auth_chains)
except Exception:
+ logger.warn(
+ "[%s %s] Error attempting to resolve state at missing "
+ "prev_events",
+ room_id, event_id, exc_info=True,
+ )
raise FederationError(
"ERROR",
403,
"We can't get valid state history.",
- affected=pdu.event_id,
+ affected=event_id,
)
yield self._process_received_pdu(
@@ -322,15 +367,16 @@ class FederationHandler(BaseHandler):
prevs (set(str)): List of event ids which we are missing
min_depth (int): Minimum depth of events to return.
"""
- # We recalculate seen, since it may have changed.
+
+ room_id = pdu.room_id
+ event_id = pdu.event_id
+
seen = yield self.store.have_seen_events(prevs)
if not prevs - seen:
return
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
+ latest = yield self.store.get_latest_event_ids_in_room(room_id)
# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
@@ -338,8 +384,8 @@ class FederationHandler(BaseHandler):
latest |= seen
logger.info(
- "Missing %d events for room %r pdu %s: %r...",
- len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
+ "[%s %s]: Requesting %d prev_events: %s",
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
)
# XXX: we set timeout to 10s to help workaround
@@ -392,7 +438,7 @@ class FederationHandler(BaseHandler):
missing_events = yield self.federation_client.get_missing_events(
origin,
- pdu.room_id,
+ room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
@@ -401,37 +447,46 @@ class FederationHandler(BaseHandler):
)
logger.info(
- "Got %d events: %r...",
- len(missing_events), [e.event_id for e in missing_events[:5]]
+ "[%s %s]: Got %d prev_events: %s",
+ room_id, event_id, len(missing_events), shortstr(missing_events),
)
# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)
- for e in missing_events:
- logger.info("Handling found event %s", e.event_id)
+ for ev in missing_events:
+ logger.info(
+ "[%s %s] Handling received prev_event %s",
+ room_id, event_id, ev.event_id,
+ )
try:
yield self.on_receive_pdu(
origin,
- e,
+ ev,
get_missing=False
)
except FederationError as e:
if e.code == 403:
- logger.warn("Event %s failed history check.")
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
else:
raise
- @log_function
@defer.inlineCallbacks
- def _process_received_pdu(self, origin, pdu, state, auth_chain):
+ def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler.
"""
- event = pdu
+ room_id = event.room_id
+ event_id = event.event_id
- logger.debug("Processing event: %s", event)
+ logger.debug(
+ "[%s %s] Processing event: %s",
+ room_id, event_id, event,
+ )
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
@@ -440,15 +495,16 @@ class FederationHandler(BaseHandler):
# event.
if state and auth_chain and not event.internal_metadata.is_outlier():
is_in_room = yield self.auth.check_host_in_room(
- event.room_id,
+ room_id,
self.server_name
)
else:
is_in_room = True
+
if not is_in_room:
logger.info(
- "Got event for room we're not in: %r %r",
- event.room_id, event.event_id
+ "[%s %s] Got event for room we're not in",
+ room_id, event_id,
)
try:
@@ -460,7 +516,7 @@ class FederationHandler(BaseHandler):
"ERROR",
e.code,
e.msg,
- affected=event.event_id,
+ affected=event_id,
)
else:
@@ -509,12 +565,12 @@ class FederationHandler(BaseHandler):
affected=event.event_id,
)
- room = yield self.store.get_room(event.room_id)
+ room = yield self.store.get_room(room_id)
if not room:
try:
yield self.store.store_room(
- room_id=event.room_id,
+ room_id=room_id,
room_creator_user_id="",
is_public=False,
)
@@ -542,7 +598,7 @@ class FederationHandler(BaseHandler):
if newly_joined:
user = UserID.from_string(event.state_key)
- yield self.user_joined_room(user, event.room_id)
+ yield self.user_joined_room(user, room_id)
@log_function
@defer.inlineCallbacks
@@ -1459,12 +1515,10 @@ class FederationHandler(BaseHandler):
else:
defer.returnValue(None)
- @log_function
def get_min_depth_for_context(self, context):
return self.store.get_min_depth(context)
@defer.inlineCallbacks
- @log_function
def _handle_new_event(self, origin, event, state=None, auth_events=None,
backfilled=False):
context = yield self._prep_event(
@@ -1664,8 +1718,8 @@ class FederationHandler(BaseHandler):
)
except AuthError as e:
logger.warn(
- "Rejecting %s because %s",
- event.event_id, e.msg
+ "[%s %s] Rejecting: %s",
+ event.room_id, event.event_id, e.msg
)
context.rejected = RejectedReason.AUTH_ERROR
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
else:
self.retry_interval = self.min_retry_interval
- logger.debug(
+ logger.info(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
|