diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a8a2ee46..e704cf2f44 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -17,11 +17,8 @@
import logging
from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
-import six
-from six import iteritems
-
from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -73,6 +70,10 @@ received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
+pdu_process_time = Histogram(
+ "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
class FederationServer(FederationBase):
def __init__(self, hs):
@@ -274,21 +275,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- with nested_logging_context(event_id):
- try:
- await self._handle_received_pdu(origin, pdu)
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warning("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s",
- event_id,
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
+ with pdu_process_time.time():
+ with nested_logging_context(event_id):
+ try:
+ await self._handle_received_pdu(origin, pdu)
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warning("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s",
+ event_id,
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
await concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -534,9 +536,9 @@ class FederationServer(FederationBase):
",".join(
(
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in iteritems(json_result)
- for device_id, device_keys in iteritems(user_keys)
- for key_id, _ in iteritems(device_keys)
+ for user_id, user_keys in json_result.items()
+ for device_id, device_keys in user_keys.items()
+ for key_id, _ in device_keys.items()
)
),
)
@@ -752,7 +754,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
- if not isinstance(acl_entry, six.string_types):
+ if not isinstance(acl_entry, str):
logger.warning(
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
)
|