diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2020-10-08 17:05:01 +0100 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2020-10-08 17:05:01 +0100 |
commit | 23b50d6fb881db23324cac9f64cba33a1d3747b3 (patch) | |
tree | 160ac2f45363782b0bbc77dc845f410b18585199 /synapse/federation/federation_server.py | |
parent | Add `xyz.amorgan.knock` /versions string (diff) | |
parent | Merge tag 'v1.21.0rc3' into develop (diff) | |
download | synapse-23b50d6fb881db23324cac9f64cba33a1d3747b3.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into soru/knock
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 83 |
1 files changed, 66 insertions, 17 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 662325bab1..6035d2f664 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -22,13 +22,12 @@ from typing import ( Callable, Dict, List, - Match, Optional, Tuple, Union, ) -from prometheus_client import Counter, Histogram +from prometheus_client import Counter, Gauge, Histogram from twisted.internet import defer from twisted.internet.abstract import isIPAddress @@ -88,19 +87,32 @@ pdu_process_time = Histogram( ) +last_pdu_age_metric = Gauge( + "synapse_federation_last_received_pdu_age", + "The age (in seconds) of the last PDU successfully received from the given domain", + labelnames=("server_name",), +) + + class FederationServer(FederationBase): def __init__(self, hs): - super(FederationServer, self).__init__(hs) + super().__init__(hs) self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler self.state = hs.get_state_handler() self.device_handler = hs.get_device_handler() + self._federation_ratelimiter = hs.get_federation_ratelimiter() self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") + # We cache results for transaction with the same ID + self._transaction_resp_cache = ResponseCache( + hs, "fed_txn_handler", timeout_ms=30000 + ) + self.transaction_actions = TransactionActions(self.store) self.registry = hs.get_federation_registry() @@ -112,6 +124,10 @@ class FederationServer(FederationBase): hs, "state_ids_resp", timeout_ms=30000 ) + self._federation_metrics_domains = ( + hs.get_config().federation.federation_metrics_domains + ) + async def on_backfill_request( self, origin: str, room_id: str, versions: List[str], limit: int ) -> Tuple[int, Dict[str, Any]]: @@ -135,22 +151,44 @@ class FederationServer(FederationBase): request_time = self._clock.time_msec() transaction = Transaction(**transaction_data) + transaction_id = transaction.transaction_id # type: ignore - if not transaction.transaction_id: # type: ignore + if not transaction_id: raise Exception("Transaction missing transaction_id") - logger.debug("[%s] Got transaction", transaction.transaction_id) # type: ignore + logger.debug("[%s] Got transaction", transaction_id) - # use a linearizer to ensure that we don't process the same transaction - # multiple times in parallel. - with ( - await self._transaction_linearizer.queue( - (origin, transaction.transaction_id) # type: ignore - ) - ): - result = await self._handle_incoming_transaction( - origin, transaction, request_time - ) + # We wrap in a ResponseCache so that we de-duplicate retried + # transactions. + return await self._transaction_resp_cache.wrap( + (origin, transaction_id), + self._on_incoming_transaction_inner, + origin, + transaction, + request_time, + ) + + async def _on_incoming_transaction_inner( + self, origin: str, transaction: Transaction, request_time: int + ) -> Tuple[int, Dict[str, Any]]: + # Use a linearizer to ensure that transactions from a remote are + # processed in order. + with await self._transaction_linearizer.queue(origin): + # We rate limit here *after* we've queued up the incoming requests, + # so that we don't fill up the ratelimiter with blocked requests. + # + # This is important as the ratelimiter allows N concurrent requests + # at a time, and only starts ratelimiting if there are more requests + # than that being processed at a time. If we queued up requests in + # the linearizer/response cache *after* the ratelimiting then those + # queued up requests would count as part of the allowed limit of N + # concurrent requests. + with self._federation_ratelimiter.ratelimit(origin) as d: + await d + + result = await self._handle_incoming_transaction( + origin, transaction, request_time + ) return result @@ -234,7 +272,11 @@ class FederationServer(FederationBase): pdus_by_room = {} # type: Dict[str, List[EventBase]] + newest_pdu_ts = 0 + for p in transaction.pdus: # type: ignore + # FIXME (richardv): I don't think this works: + # https://github.com/matrix-org/synapse/issues/8429 if "unsigned" in p: unsigned = p["unsigned"] if "age" in unsigned: @@ -272,6 +314,9 @@ class FederationServer(FederationBase): event = event_from_pdu_json(p, room_version) pdus_by_room.setdefault(room_id, []).append(event) + if event.origin_server_ts > newest_pdu_ts: + newest_pdu_ts = event.origin_server_ts + pdu_results = {} # we can process different rooms in parallel (which is useful if they @@ -312,6 +357,10 @@ class FederationServer(FederationBase): process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT ) + if newest_pdu_ts and origin in self._federation_metrics_domains: + newest_pdu_age = self._clock.time_msec() - newest_pdu_ts + last_pdu_age_metric.labels(server_name=origin).set(newest_pdu_age / 1000) + return pdu_results async def _handle_edus_in_txn(self, origin: str, transaction: Transaction): @@ -801,14 +850,14 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: return False -def _acl_entry_matches(server_name: str, acl_entry: str) -> Match: +def _acl_entry_matches(server_name: str, acl_entry: Any) -> bool: if not isinstance(acl_entry, str): logger.warning( "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry) ) return False regex = glob_to_regex(acl_entry) - return regex.match(server_name) + return bool(regex.match(server_name)) class FederationHandlerRegistry: |