diff options
author | Erik Johnston <erik@matrix.org> | 2021-06-30 12:07:16 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-30 12:07:16 +0100 |
commit | 329ef5c715d81b538e8b071de046c698a82eae10 (patch) | |
tree | 69b4853a47342f7a42c3ef493cc99be7ccbd044d /synapse/federation | |
parent | Merge branch 'release-v1.37' into develop (diff) | |
download | synapse-329ef5c715d81b538e8b071de046c698a82eae10.tar.xz |
Fix the inbound PDU metric (#10279)
This broke in #10272
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_server.py | 37 |
1 files changed, 20 insertions, 17 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 742d29291e..e93b7577fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -369,22 +369,21 @@ class FederationServer(FederationBase): async def process_pdu(pdu: EventBase) -> JsonDict: event_id = pdu.event_id - with pdu_process_time.time(): - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - return {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - return {"error": str(e)} - except Exception as e: - f = failure.Failure() - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore - ) - return {"error": str(e)} + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + return {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + return {"error": str(e)} + except Exception as e: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + return {"error": str(e)} await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -932,9 +931,13 @@ class FederationServer(FederationBase): exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) - await self.store.remove_received_event_from_staging( + received_ts = await self.store.remove_received_event_from_staging( origin, event.event_id ) + if received_ts is not None: + pdu_process_time.observe( + (self._clock.time_msec() - received_ts) / 1000 + ) # We need to do this check outside the lock to avoid a race between # a new event being inserted by another instance and it attempting |