diff options
author | Erik Johnston <erik@matrix.org> | 2015-02-19 10:38:48 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-02-19 10:38:48 +0000 |
commit | 8321e8a2e0381f52f3f434223db58f6ea280d89e (patch) | |
tree | 13c5600cbeb56c7c2837dd2df329f10a239f91ac /synapse/federation/federation_client.py | |
parent | Merge pull request #73 from matrix-org/hotfixes-v0.7.0f (diff) | |
parent | Update release date (diff) | |
download | synapse-8321e8a2e0381f52f3f434223db58f6ea280d89e.tar.xz |
Merge branch 'release-v0.7.1' of github.com:matrix-org/synapse
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r-- | synapse/federation/federation_client.py | 79 |
1 files changed, 64 insertions, 15 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..cd3c962d50 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,10 +19,13 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException +from synapse.api.errors import CodeMessageException, SynapseError +from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination + import logging @@ -30,6 +33,20 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): + def __init__(self): + self._get_pdu_cache = None + + def start_get_pdu_cache(self): + self._get_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120*1000, + reset_expiry_on_get=False, + ) + + self._get_pdu_cache.start() + @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -160,29 +177,58 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. + if self._get_pdu_cache: + e = self._get_pdu_cache.get(event_id) + if e: + defer.returnValue(e) + pdu = None for destination in destinations: try: - transaction_data = yield self.transport_layer.get_event( - destination, event_id + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - logger.debug("transaction_data %r", transaction_data) + with limiter: + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + logger.debug("transaction_data %r", transaction_data) - if pdu_list: - pdu = pdu_list[0] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(pdu) + if pdu_list: + pdu = pdu_list[0] - break - except CodeMessageException: - raise + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + + except SynapseError: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except NotRetryingDestination as e: + logger.info(e.message) + continue except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -190,6 +236,9 @@ class FederationClient(FederationBase): ) continue + if self._get_pdu_cache is not None: + self._get_pdu_cache[event_id] = pdu + defer.returnValue(pdu) @defer.inlineCallbacks |