diff options
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r-- | synapse/federation/federation_client.py | 97 |
1 files changed, 46 insertions, 51 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 861441708b..62d7ed13cf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -14,36 +14,35 @@ # limitations under the License. +import copy +import itertools +import logging +import random + +from six.moves import range + +from prometheus_client import Counter + from twisted.internet import defer -from .federation_base import FederationBase from synapse.api.constants import Membership - from synapse.api.errors import ( - CodeMessageException, HttpResponseException, SynapseError, + CodeMessageException, + FederationDeniedError, + HttpResponseException, + SynapseError, ) -from synapse.util import unwrapFirstError +from synapse.events import builder +from synapse.federation.federation_base import FederationBase, event_from_pdu_json +from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred -from synapse.events import FrozenEvent, builder -import synapse.metrics - from synapse.util.retryutils import NotRetryingDestination -import copy -import itertools -import logging -import random - - logger = logging.getLogger(__name__) - -# synapse.federation.federation_client is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.client") - -sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) PDU_RETRY_TIME_MS = 1 * 60 * 1000 @@ -58,6 +57,7 @@ class FederationClient(FederationBase): self._clear_tried_cache, 60 * 1000, ) self.state = hs.get_state_handler() + self.transport_layer = hs.get_federation_transport_client() def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" @@ -105,7 +105,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc(query_type) + sent_queries_counter.labels(query_type).inc() return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, @@ -124,7 +124,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_device_keys") + sent_queries_counter.labels("client_device_keys").inc() return self.transport_layer.query_client_keys( destination, content, timeout ) @@ -134,7 +134,7 @@ class FederationClient(FederationBase): """Query the device keys for a list of user ids hosted on a remote server. """ - sent_queries_counter.inc("user_devices") + sent_queries_counter.labels("user_devices").inc() return self.transport_layer.query_user_devices( destination, user_id, timeout ) @@ -151,7 +151,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_one_time_keys") + sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys( destination, content, timeout ) @@ -184,15 +184,15 @@ class FederationClient(FederationBase): logger.debug("backfill transaction_data=%s", repr(transaction_data)) pdus = [ - self.event_from_pdu_json(p, outlier=False) + event_from_pdu_json(p, outlier=False) for p in transaction_data["pdus"] ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield preserve_context_over_deferred(defer.gatherResults( + pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults( self._check_sigs_and_hashes(pdus), consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(pdus) @@ -244,7 +244,7 @@ class FederationClient(FederationBase): logger.debug("transaction_data %r", transaction_data) pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) + event_from_pdu_json(p, outlier=outlier) for p in transaction_data["pdus"] ] @@ -252,7 +252,7 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] + signed_pdu = yield self._check_sigs_and_hash(pdu) break @@ -266,6 +266,9 @@ class FederationClient(FederationBase): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e.message) + continue except Exception as e: pdu_attempts[destination] = now @@ -336,11 +339,11 @@ class FederationClient(FederationBase): ) pdus = [ - self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] + event_from_pdu_json(p, outlier=True) for p in result["pdus"] ] auth_chain = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in result.get("auth_chain", []) ] @@ -388,9 +391,9 @@ class FederationClient(FederationBase): """ if return_local: seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = seen_events.values() + signed_events = list(seen_events.values()) else: - seen_events = yield self.store.have_events(event_ids) + seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] failed_to_fetch = set() @@ -409,18 +412,19 @@ class FederationClient(FederationBase): batch_size = 20 missing_events = list(missing_events) - for i in xrange(0, len(missing_events), batch_size): + for i in range(0, len(missing_events), batch_size): batch = set(missing_events[i:i + batch_size]) deferreds = [ - preserve_fn(self.get_pdu)( + run_in_background( + self.get_pdu, destinations=random_server_list(), event_id=e_id, ) for e_id in batch ] - res = yield preserve_context_over_deferred( + res = yield make_deferred_yieldable( defer.DeferredList(deferreds, consumeErrors=True) ) for success, result in res: @@ -441,7 +445,7 @@ class FederationClient(FederationBase): ) auth_chain = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in res["auth_chain"] ] @@ -570,12 +574,12 @@ class FederationClient(FederationBase): logger.debug("Got content: %s", content) state = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in content.get("state", []) ] auth_chain = [ - self.event_from_pdu_json(p, outlier=True) + event_from_pdu_json(p, outlier=True) for p in content.get("auth_chain", []) ] @@ -585,7 +589,7 @@ class FederationClient(FederationBase): } valid_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus.values(), + destination, list(pdus.values()), outlier=True, ) @@ -650,7 +654,7 @@ class FederationClient(FederationBase): logger.debug("Got response to send_invite: %s", pdu_dict) - pdu = self.event_from_pdu_json(pdu_dict) + pdu = event_from_pdu_json(pdu_dict) # Check signatures are correct. pdu = yield self._check_sigs_and_hash(pdu) @@ -740,7 +744,7 @@ class FederationClient(FederationBase): ) auth_chain = [ - self.event_from_pdu_json(e) + event_from_pdu_json(e) for e in content["auth_chain"] ] @@ -788,7 +792,7 @@ class FederationClient(FederationBase): ) events = [ - self.event_from_pdu_json(e) + event_from_pdu_json(e) for e in content.get("events", []) ] @@ -805,15 +809,6 @@ class FederationClient(FederationBase): defer.returnValue(signed_events) - def event_from_pdu_json(self, pdu_json, outlier=False): - event = FrozenEvent( - pdu_json - ) - - event.internal_metadata.outlier = outlier - - return event - @defer.inlineCallbacks def forward_third_party_invite(self, destinations, room_id, event_dict): for destination in destinations: |