summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2021-03-12 15:08:03 +0000
committerGitHub <noreply@github.com>2021-03-12 15:08:03 +0000
commit2b328d7e0233bc331ce567e593b4c2f94a7d6e2e (patch)
treea1abdbbf5590c80eb80fe62dfc138f27dab52ee4
parentAdd logging for redis connection setup (#9590) (diff)
downloadsynapse-2b328d7e0233bc331ce567e593b4c2f94a7d6e2e.tar.xz
Improve logging when processing incoming transactions (#9596)
Put the room id in the logcontext, to make it easier to understand what's going on.
-rw-r--r--changelog.d/9596.misc1
-rw-r--r--synapse/federation/federation_server.py61
-rw-r--r--synapse/handlers/federation.py62
3 files changed, 51 insertions, 73 deletions
diff --git a/changelog.d/9596.misc b/changelog.d/9596.misc
new file mode 100644
index 0000000000..fc19a95f75
--- /dev/null
+++ b/changelog.d/9596.misc
@@ -0,0 +1 @@
+Improve logging when processing incoming transactions.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 06c5e7a9e0..89fe6def4b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -335,34 +335,41 @@ class FederationServer(FederationBase):
         # impose a limit to avoid going too crazy with ram/cpu.
 
         async def process_pdus_for_room(room_id: str):
-            logger.debug("Processing PDUs for %s", room_id)
-            try:
-                await self.check_server_matches_acl(origin_host, room_id)
-            except AuthError as e:
-                logger.warning("Ignoring PDUs for room %s from banned server", room_id)
-                for pdu in pdus_by_room[room_id]:
-                    event_id = pdu.event_id
-                    pdu_results[event_id] = e.error_dict()
-                return
+            with nested_logging_context(room_id):
+                logger.debug("Processing PDUs for %s", room_id)
 
-            for pdu in pdus_by_room[room_id]:
-                event_id = pdu.event_id
-                with pdu_process_time.time():
-                    with nested_logging_context(event_id):
-                        try:
-                            await self._handle_received_pdu(origin, pdu)
-                            pdu_results[event_id] = {}
-                        except FederationError as e:
-                            logger.warning("Error handling PDU %s: %s", event_id, e)
-                            pdu_results[event_id] = {"error": str(e)}
-                        except Exception as e:
-                            f = failure.Failure()
-                            pdu_results[event_id] = {"error": str(e)}
-                            logger.error(
-                                "Failed to handle PDU %s",
-                                event_id,
-                                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
-                            )
+                try:
+                    await self.check_server_matches_acl(origin_host, room_id)
+                except AuthError as e:
+                    logger.warning(
+                        "Ignoring PDUs for room %s from banned server", room_id
+                    )
+                    for pdu in pdus_by_room[room_id]:
+                        event_id = pdu.event_id
+                        pdu_results[event_id] = e.error_dict()
+                    return
+
+                for pdu in pdus_by_room[room_id]:
+                    pdu_results[pdu.event_id] = await process_pdu(pdu)
+
+        async def process_pdu(pdu: EventBase) -> JsonDict:
+            event_id = pdu.event_id
+            with pdu_process_time.time():
+                with nested_logging_context(event_id):
+                    try:
+                        await self._handle_received_pdu(origin, pdu)
+                        return {}
+                    except FederationError as e:
+                        logger.warning("Error handling PDU %s: %s", event_id, e)
+                        return {"error": str(e)}
+                    except Exception as e:
+                        f = failure.Failure()
+                        logger.error(
+                            "Failed to handle PDU %s",
+                            event_id,
+                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                        )
+                        return {"error": str(e)}
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3fe02b7195..1d20c441f3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -201,7 +201,7 @@ class FederationHandler(BaseHandler):
             or pdu.internal_metadata.is_outlier()
         )
         if already_seen:
-            logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
+            logger.debug("Already seen pdu")
             return
 
         # do some initial sanity-checking of the event. In particular, make
@@ -210,18 +210,14 @@ class FederationHandler(BaseHandler):
         try:
             self._sanity_check_event(pdu)
         except SynapseError as err:
-            logger.warning(
-                "[%s %s] Received event failed sanity checks", room_id, event_id
-            )
+            logger.warning("Received event failed sanity checks")
             raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
         if room_id in self.room_queues:
             logger.info(
-                "[%s %s] Queuing PDU from %s for now: join in progress",
-                room_id,
-                event_id,
+                "Queuing PDU from %s for now: join in progress",
                 origin,
             )
             self.room_queues[room_id].append((pdu, origin))
@@ -236,9 +232,7 @@ class FederationHandler(BaseHandler):
         is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
         if not is_in_room:
             logger.info(
-                "[%s %s] Ignoring PDU from %s as we're not in the room",
-                room_id,
-                event_id,
+                "Ignoring PDU from %s as we're not in the room",
                 origin,
             )
             return None
@@ -250,7 +244,7 @@ class FederationHandler(BaseHandler):
             # We only backfill backwards to the min depth.
             min_depth = await self.get_min_depth_for_context(pdu.room_id)
 
-            logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
+            logger.debug("min_depth: %d", min_depth)
 
             prevs = set(pdu.prev_event_ids())
             seen = await self.store.have_events_in_timeline(prevs)
@@ -267,17 +261,13 @@ class FederationHandler(BaseHandler):
                     # If we're missing stuff, ensure we only fetch stuff one
                     # at a time.
                     logger.info(
-                        "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
-                        room_id,
-                        event_id,
+                        "Acquiring room lock to fetch %d missing prev_events: %s",
                         len(missing_prevs),
                         shortstr(missing_prevs),
                     )
                     with (await self._room_pdu_linearizer.queue(pdu.room_id)):
                         logger.info(
-                            "[%s %s] Acquired room lock to fetch %d missing prev_events",
-                            room_id,
-                            event_id,
+                            "Acquired room lock to fetch %d missing prev_events",
                             len(missing_prevs),
                         )
 
@@ -297,9 +287,7 @@ class FederationHandler(BaseHandler):
 
                         if not prevs - seen:
                             logger.info(
-                                "[%s %s] Found all missing prev_events",
-                                room_id,
-                                event_id,
+                                "Found all missing prev_events",
                             )
 
             if prevs - seen:
@@ -329,9 +317,7 @@ class FederationHandler(BaseHandler):
 
                 if sent_to_us_directly:
                     logger.warning(
-                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
-                        room_id,
-                        event_id,
+                        "Rejecting: failed to fetch %d prev events: %s",
                         len(prevs - seen),
                         shortstr(prevs - seen),
                     )
@@ -414,10 +400,7 @@ class FederationHandler(BaseHandler):
                     state = [event_map[e] for e in state_map.values()]
                 except Exception:
                     logger.warning(
-                        "[%s %s] Error attempting to resolve state at missing "
-                        "prev_events",
-                        room_id,
-                        event_id,
+                        "Error attempting to resolve state at missing " "prev_events",
                         exc_info=True,
                     )
                     raise FederationError(
@@ -454,9 +437,7 @@ class FederationHandler(BaseHandler):
         latest |= seen
 
         logger.info(
-            "[%s %s]: Requesting missing events between %s and %s",
-            room_id,
-            event_id,
+            "Requesting missing events between %s and %s",
             shortstr(latest),
             event_id,
         )
@@ -523,15 +504,11 @@ class FederationHandler(BaseHandler):
             # We failed to get the missing events, but since we need to handle
             # the case of `get_missing_events` not returning the necessary
             # events anyway, it is safe to simply log the error and continue.
-            logger.warning(
-                "[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
-            )
+            logger.warning("Failed to get prev_events: %s", e)
             return
 
         logger.info(
-            "[%s %s]: Got %d prev_events: %s",
-            room_id,
-            event_id,
+            "Got %d prev_events: %s",
             len(missing_events),
             shortstr(missing_events),
         )
@@ -542,9 +519,7 @@ class FederationHandler(BaseHandler):
 
         for ev in missing_events:
             logger.info(
-                "[%s %s] Handling received prev_event %s",
-                room_id,
-                event_id,
+                "Handling received prev_event %s",
                 ev.event_id,
             )
             with nested_logging_context(ev.event_id):
@@ -553,9 +528,7 @@ class FederationHandler(BaseHandler):
                 except FederationError as e:
                     if e.code == 403:
                         logger.warning(
-                            "[%s %s] Received prev_event %s failed history check.",
-                            room_id,
-                            event_id,
+                            "Received prev_event %s failed history check.",
                             ev.event_id,
                         )
                     else:
@@ -707,10 +680,7 @@ class FederationHandler(BaseHandler):
                 (ie, we are missing one or more prev_events), the resolved state at the
                 event
         """
-        room_id = event.room_id
-        event_id = event.event_id
-
-        logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
+        logger.debug("Processing event: %s", event)
 
         try:
             await self._handle_new_event(origin, event, state=state)