diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 662325bab1..6035d2f664 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -22,13 +22,12 @@ from typing import (
Callable,
Dict,
List,
- Match,
Optional,
Tuple,
Union,
)
-from prometheus_client import Counter, Histogram
+from prometheus_client import Counter, Gauge, Histogram
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -88,19 +87,32 @@ pdu_process_time = Histogram(
)
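+# Only updated for origins in the configured `federation_metrics_domains`.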
+last_pdu_age_metric = Gauge(
+ "synapse_federation_last_received_pdu_age",
+ "The age (in seconds) of the last PDU successfully received from the given domain",
+ labelnames=("server_name",),
+)
+
+
class FederationServer(FederationBase):
def __init__(self, hs):
- super(FederationServer, self).__init__(hs)
+ super().__init__(hs)
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
self.device_handler = hs.get_device_handler()
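+        # Rate limiter for incoming transactions, applied per origin server.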
+ self._federation_ratelimiter = hs.get_federation_ratelimiter()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
+        # We cache results for transactions with the same ID
+ self._transaction_resp_cache = ResponseCache(
+ hs, "fed_txn_handler", timeout_ms=30000
+ )
+
self.transaction_actions = TransactionActions(self.store)
self.registry = hs.get_federation_registry()
@@ -112,6 +124,10 @@ class FederationServer(FederationBase):
hs, "state_ids_resp", timeout_ms=30000
)
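+        # Origins for which we report extra federation metrics, such as the
+        # last-received-PDU age gauge, per the `federation_metrics_domains` config.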
+ self._federation_metrics_domains = (
+ hs.get_config().federation.federation_metrics_domains
+ )
+
async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
@@ -135,22 +151,44 @@ class FederationServer(FederationBase):
request_time = self._clock.time_msec()
transaction = Transaction(**transaction_data)
+ transaction_id = transaction.transaction_id # type: ignore
- if not transaction.transaction_id: # type: ignore
+ if not transaction_id:
raise Exception("Transaction missing transaction_id")
- logger.debug("[%s] Got transaction", transaction.transaction_id) # type: ignore
+ logger.debug("[%s] Got transaction", transaction_id)
- # use a linearizer to ensure that we don't process the same transaction
- # multiple times in parallel.
- with (
- await self._transaction_linearizer.queue(
- (origin, transaction.transaction_id) # type: ignore
- )
- ):
- result = await self._handle_incoming_transaction(
- origin, transaction, request_time
- )
+ # We wrap in a ResponseCache so that we de-duplicate retried
+ # transactions.
+ return await self._transaction_resp_cache.wrap(
+ (origin, transaction_id),
+ self._on_incoming_transaction_inner,
+ origin,
+ transaction,
+ request_time,
+ )
+
+ async def _on_incoming_transaction_inner(
+ self, origin: str, transaction: Transaction, request_time: int
+ ) -> Tuple[int, Dict[str, Any]]:
+ # Use a linearizer to ensure that transactions from a remote are
+ # processed in order.
+ with await self._transaction_linearizer.queue(origin):
+ # We rate limit here *after* we've queued up the incoming requests,
+ # so that we don't fill up the ratelimiter with blocked requests.
+ #
+            # This is important as the ratelimiter allows N concurrent requests
+            # at a time, and only starts ratelimiting once more than N are
+            # already in flight. If we queued up requests in
+ # the linearizer/response cache *after* the ratelimiting then those
+ # queued up requests would count as part of the allowed limit of N
+ # concurrent requests.
+ with self._federation_ratelimiter.ratelimit(origin) as d:
+ await d
+
+ result = await self._handle_incoming_transaction(
+ origin, transaction, request_time
+ )
return result
@@ -234,7 +272,11 @@ class FederationServer(FederationBase):
pdus_by_room = {} # type: Dict[str, List[EventBase]]
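+        # Track the `origin_server_ts` of the newest PDU in this transaction,
+        # so that we can report how far behind we are for this origin.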
+ newest_pdu_ts = 0
+
for p in transaction.pdus: # type: ignore
+ # FIXME (richardv): I don't think this works:
+ # https://github.com/matrix-org/synapse/issues/8429
if "unsigned" in p:
unsigned = p["unsigned"]
if "age" in unsigned:
@@ -272,6 +314,9 @@ class FederationServer(FederationBase):
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
+ if event.origin_server_ts > newest_pdu_ts:
+ newest_pdu_ts = event.origin_server_ts
+
pdu_results = {}
# we can process different rooms in parallel (which is useful if they
@@ -312,6 +357,10 @@ class FederationServer(FederationBase):
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
)
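+        # For origins we track metrics for, record how old the newest PDU in
+        # this transaction was by the time we finished processing it.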
+ if newest_pdu_ts and origin in self._federation_metrics_domains:
+ newest_pdu_age = self._clock.time_msec() - newest_pdu_ts
+ last_pdu_age_metric.labels(server_name=origin).set(newest_pdu_age / 1000)
+
return pdu_results
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
@@ -801,14 +850,14 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
return False
-def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
+def _acl_entry_matches(server_name: str, acl_entry: Any) -> bool:
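+    """Whether `server_name` matches the given ACL entry (a glob)."""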
if not isinstance(acl_entry, str):
logger.warning(
"Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
)
return False
regex = glob_to_regex(acl_entry)
- return regex.match(server_name)
+ return bool(regex.match(server_name))
class FederationHandlerRegistry: