summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py37
1 files changed, 20 insertions, 17 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 742d29291e..e93b7577fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
 
         async def process_pdu(pdu: EventBase) -> JsonDict:
             event_id = pdu.event_id
-            with pdu_process_time.time():
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        return {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        return {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
-                        )
-                        return {"error": str(e)}
+            with nested_logging_context(event_id):
+                try:
+                    await self._handle_received_pdu(origin, pdu)
+                    return {}
+                except FederationError as e:
+                    logger.warning("Error handling PDU %s: %s", event_id, e)
+                    return {"error": str(e)}
+                except Exception as e:
+                    f = failure.Failure()
+                    logger.error(
+                        "Failed to handle PDU %s",
+                        event_id,
+                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                    )
+                    return {"error": str(e)}
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
                         exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
                     )
 
-                await self.store.remove_received_event_from_staging(
+                received_ts = await self.store.remove_received_event_from_staging(
                     origin, event.event_id
                 )
+                if received_ts is not None:
+                    pdu_process_time.observe(
+                        (self._clock.time_msec() - received_ts) / 1000
+                    )
 
             # We need to do this check outside the lock to avoid a race between
             # a new event being inserted by another instance and it attempting