summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-06-30 12:07:16 +0100
committerGitHub <noreply@github.com>2021-06-30 12:07:16 +0100
commit329ef5c715d81b538e8b071de046c698a82eae10 (patch)
tree69b4853a47342f7a42c3ef493cc99be7ccbd044d /synapse/federation/federation_server.py
parentMerge branch 'release-v1.37' into develop (diff)
downloadsynapse-329ef5c715d81b538e8b071de046c698a82eae10.tar.xz
Fix the inbound PDU metric (#10279)
This broke in #10272
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py37
1 files changed, 20 insertions, 17 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 742d29291e..e93b7577fe 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -369,22 +369,21 @@ class FederationServer(FederationBase):
 
         async def process_pdu(pdu: EventBase) -> JsonDict:
             event_id = pdu.event_id
-            with pdu_process_time.time():
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        return {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        return {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
-                        )
-                        return {"error": str(e)}
+            with nested_logging_context(event_id):
+                try:
+                    await self._handle_received_pdu(origin, pdu)
+                    return {}
+                except FederationError as e:
+                    logger.warning("Error handling PDU %s: %s", event_id, e)
+                    return {"error": str(e)}
+                except Exception as e:
+                    f = failure.Failure()
+                    logger.error(
+                        "Failed to handle PDU %s",
+                        event_id,
+                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                    )
+                    return {"error": str(e)}
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -932,9 +931,13 @@ class FederationServer(FederationBase):
                         exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
                     )
 
-                await self.store.remove_received_event_from_staging(
+                received_ts = await self.store.remove_received_event_from_staging(
                     origin, event.event_id
                 )
+                if received_ts is not None:
+                    pdu_process_time.observe(
+                        (self._clock.time_msec() - received_ts) / 1000
+                    )
 
             # We need to do this check outside the lock to avoid a race between
             # a new event being inserted by another instance and it attempting