summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_server.py61
1 files changed, 34 insertions, 27 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 06c5e7a9e0..89fe6def4b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -335,34 +335,41 @@ class FederationServer(FederationBase):
         # impose a limit to avoid going too crazy with ram/cpu.
 
         async def process_pdus_for_room(room_id: str):
-            logger.debug("Processing PDUs for %s", room_id)
-            try:
-                await self.check_server_matches_acl(origin_host, room_id)
-            except AuthError as e:
-                logger.warning("Ignoring PDUs for room %s from banned server", room_id)
-                for pdu in pdus_by_room[room_id]:
-                    event_id = pdu.event_id
-                    pdu_results[event_id] = e.error_dict()
-                return
+            with nested_logging_context(room_id):
+                logger.debug("Processing PDUs for %s", room_id)
 
-            for pdu in pdus_by_room[room_id]:
-                event_id = pdu.event_id
-                with pdu_process_time.time():
-                    with nested_logging_context(event_id):
-                        try:
-                            await self._handle_received_pdu(origin, pdu)
-                            pdu_results[event_id] = {}
-                        except FederationError as e:
-                            logger.warning("Error handling PDU %s: %s", event_id, e)
-                            pdu_results[event_id] = {"error": str(e)}
-                        except Exception as e:
-                            f = failure.Failure()
-                            pdu_results[event_id] = {"error": str(e)}
-                            logger.error(
-                                "Failed to handle PDU %s",
-                                event_id,
-                                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
-                            )
+                try:
+                    await self.check_server_matches_acl(origin_host, room_id)
+                except AuthError as e:
+                    logger.warning(
+                        "Ignoring PDUs for room %s from banned server", room_id
+                    )
+                    for pdu in pdus_by_room[room_id]:
+                        event_id = pdu.event_id
+                        pdu_results[event_id] = e.error_dict()
+                    return
+
+                for pdu in pdus_by_room[room_id]:
+                    pdu_results[pdu.event_id] = await process_pdu(pdu)
+
+        async def process_pdu(pdu: EventBase) -> JsonDict:
+            event_id = pdu.event_id
+            with pdu_process_time.time():
+                with nested_logging_context(event_id):
+                    try:
+                        await self._handle_received_pdu(origin, pdu)
+                        return {}
+                    except FederationError as e:
+                        logger.warning("Error handling PDU %s: %s", event_id, e)
+                        return {"error": str(e)}
+                    except Exception as e:
+                        f = failure.Failure()
+                        logger.error(
+                            "Failed to handle PDU %s",
+                            event_id,
+                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                        )
+                        return {"error": str(e)}
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT