From 61385846510ceed1bff571dde1233a70e02b4748 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 14:08:02 +0000 Subject: Don't return anything from _handle_new_pdu, since we ignore the return value anyway --- synapse/federation/federation_server.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9f5c98694c..7ca8d49c54 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -331,7 +331,6 @@ class FederationServer(FederationBase): ) if already_seen: logger.debug("Already seen pdu %s", pdu.event_id) - defer.returnValue({}) return # Check signature. @@ -418,7 +417,7 @@ class FederationServer(FederationBase): except: logger.warn("Failed to get state for event: %s", pdu.event_id) - ret = yield self.handler.on_receive_pdu( + yield self.handler.on_receive_pdu( origin, pdu, backfilled=False, @@ -426,8 +425,6 @@ class FederationServer(FederationBase): auth_chain=auth_chain, ) - defer.returnValue(ret) - def __str__(self): return "" % self.server_name -- cgit 1.5.1 From 91fc5eef1deb2f756d16f3d5ea20ede8d967f6df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 14:27:40 +0000 Subject: Mark old events as outliers. This is to fix the issue where if a remote server sends an event that references a really "old" event, then the local server will pull that in and send to all clients. We decide if an event is old if its depth is less than the minimum depth of the room. --- synapse/federation/federation_server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ca8d49c54..4391a60c02 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -366,7 +366,13 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) - if min_depth and pdu.depth > min_depth and max_recursion > 0: + if min_depth and pdu.depth < min_depth: + # This is so that we don't notify the user about this + # message, to work around the fact that some events will + # reference really really old events we really don't want to + # send to the clients. + pdu.internal_metadata.outlier = True + elif min_depth and pdu.depth > min_depth and max_recursion > 0: for event_id, hashes in pdu.prev_events: if event_id not in have_seen: logger.debug( -- cgit 1.5.1 From baa5b9a97582d4b3c825be1225aba7863230c047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 18:02:39 +0000 Subject: Cache results of get_pdu. --- synapse/federation/federation_client.py | 42 ++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..83b4947b99 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,7 +19,8 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException +from synapse.api.errors import CodeMessageException, SynapseError +from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -30,6 +31,20 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): + def __init__(self): + self._fail_fetch_pdu_cache = None + + def start_pdu_fail_cache(self): + self._fail_fetch_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120*1000, + reset_expiry_on_get=False, + ) + + self._fail_fetch_pdu_cache.start() + @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -160,6 +175,11 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. + if self._fail_fetch_pdu_cache: + e = self._fail_fetch_pdu_cache.get(event_id) + if e: + defer.returnValue(e) + pdu = None for destination in destinations: try: @@ -181,8 +201,21 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - except CodeMessageException: - raise + except SynapseError: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -190,6 +223,9 @@ class FederationClient(FederationBase): ) continue + if self._fail_fetch_pdu_cache is not None: + self._fail_fetch_pdu_cache[event_id] = pdu + defer.returnValue(pdu) @defer.inlineCallbacks -- cgit 1.5.1 From 72a4de2ce627528a13bda480403344ffde6275d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 10:03:23 +0000 Subject: Use consumeErrors=True on all DeferredLists. This is so that the DeferredLists actually consume the error instead of propogating down the non-existent errback chain. This should reduce the number of unhandled errors we are seeing. --- synapse/federation/federation_server.py | 2 +- synapse/federation/transaction_queue.py | 2 +- synapse/handlers/presence.py | 8 ++++---- synapse/notifier.py | 6 ++++-- synapse/storage/roommember.py | 2 +- 5 files changed, 11 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9f5c98694c..e94d0411b4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -124,7 +124,7 @@ class FederationServer(FederationBase): edu.content ) - results = yield defer.DeferredList(dl) + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] for r in results: diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 731019ad9f..bb20f2ebab 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -98,7 +98,7 @@ class TransactionQueue(object): deferreds.append(deferred) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) # NO inlineCallbacks def enqueue_edu(self, edu): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 59287010ed..8ef248ecf2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -492,7 +492,7 @@ class PresenceHandler(BaseHandler): user, domain, remoteusers )) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): target_localpart = target_user.localpart @@ -548,7 +548,7 @@ class PresenceHandler(BaseHandler): self._stop_polling_remote(user, domain, remoteusers) ) - return defer.DeferredList(deferreds) + return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): for localpart in self._local_pushmap.keys(): @@ -729,7 +729,7 @@ class PresenceHandler(BaseHandler): del self._remote_sendmap[user] with PreserveLoggingContext(): - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, @@ -768,7 +768,7 @@ class PresenceHandler(BaseHandler): ) ) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) defer.returnValue((localusers, remote_domains)) diff --git a/synapse/notifier.py b/synapse/notifier.py index e3b6ead620..f5a394596d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -135,7 +135,8 @@ class Notifier(object): with PreserveLoggingContext(): yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) @defer.inlineCallbacks @@ -203,7 +204,8 @@ class Notifier(object): with PreserveLoggingContext(): yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) @defer.inlineCallbacks diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 779f9ce544..9bf608bc90 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -288,7 +288,7 @@ class RoomMemberStore(SQLBaseStore): deferreds = [self.get_rooms_for_user(u) for u in user_id_list] - results = yield defer.DeferredList(deferreds) + results = yield defer.DeferredList(deferreds, consumeErrors=True) # A list of sets of strings giving room IDs for each user room_id_lists = [set([r.room_id for r in result[1]]) for result in results] -- cgit 1.5.1 From 02bfa889de1990eb8dc47770834bef777252dc82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:08:27 +0000 Subject: Handle recieving failures in transactions --- synapse/federation/federation_server.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e94d0411b4..34f5b17446 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -114,7 +114,15 @@ class FederationServer(FederationBase): with PreserveLoggingContext(): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + d = self._handle_new_pdu(transaction.origin, pdu) + + def handle_failure(failure): + failure.trap(FederationError) + self.enqueue_failure(failure.value, transaction.origin) + + d.addErrback(handle_failure) + + dl.append(d) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -124,6 +132,9 @@ class FederationServer(FederationBase): edu.content ) + for failure in getattr(transaction, "failures", []): + logger.info("Got failure %r", failure) + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] -- cgit 1.5.1 From c82e26ad4ba80742adc68a7e8c52441d760e4746 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:24:13 +0000 Subject: Actually respond with JSON to incoming transaction --- synapse/federation/federation_server.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34f5b17446..c48a41b5d5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -147,6 +147,8 @@ class FederationServer(FederationBase): logger.debug("Returning: %s", str(ret)) + response = ret + yield self.transaction_actions.set_response( transaction, 200, response -- cgit 1.5.1 From 659ead082ff11842fea803e44d75667e0ca38d71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:58:52 +0000 Subject: Format the response of transaction request in a nicer way --- synapse/federation/federation_server.py | 19 +++++++++++++++---- synapse/federation/transaction_queue.py | 22 ++++++++++++++++++++-- 2 files changed, 35 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c48a41b5d5..d1ec0b9eac 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -118,7 +118,7 @@ class FederationServer(FederationBase): def handle_failure(failure): failure.trap(FederationError) - self.enqueue_failure(failure.value, transaction.origin) + self.send_failure(failure.value, transaction.origin) d.addErrback(handle_failure) @@ -132,7 +132,7 @@ class FederationServer(FederationBase): edu.content ) - for failure in getattr(transaction, "failures", []): + for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) results = yield defer.DeferredList(dl, consumeErrors=True) @@ -143,11 +143,15 @@ class FederationServer(FederationBase): ret.append({}) else: logger.exception(r[1]) - ret.append({"error": str(r[1])}) + ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) - response = ret + response = { + "pdus": dict(zip( + (p.event_id for p in pdu_list), ret + )), + } yield self.transaction_actions.set_response( transaction, @@ -358,6 +362,13 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) + raise FederationError( + "ERROR", + 403, + "Forbidden", + affected=pdu.event_id, + ) + state = None auth_chain = [] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb20f2ebab..6faaa066fb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -91,7 +91,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send pdu", failure) + logger.warn("Failed to send pdu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -116,7 +116,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send edu", failure) + logger.warn("Failed to send edu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -133,6 +133,15 @@ class TransactionQueue(object): (failure, deferred) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send failure", failure.value) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + yield deferred @defer.inlineCallbacks @@ -249,6 +258,15 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 + + if response: + for e_id, r in getattr(response, "pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: code = e.code response = e.response -- cgit 1.5.1 From 676e8ee78ae3f51cbc0113c8d810d4bc1e81cdab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:22:45 +0000 Subject: Remove debug raise --- synapse/federation/federation_server.py | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 679fb141e7..22b9663831 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -361,13 +361,6 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - raise FederationError( - "ERROR", - 403, - "Forbidden", - affected=pdu.event_id, - ) - state = None auth_chain = [] -- cgit 1.5.1 From 2b8f1a956c6a1d767a3b60e84e7d0afe5857fb0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 17:20:56 +0000 Subject: Add per server retry limiting. Factor out the pre destination retry logic from TransactionQueue so it can be reused in both get_pdu and crypto.keyring --- synapse/crypto/keyring.py | 22 ++--- synapse/federation/federation_client.py | 36 ++++--- synapse/federation/transaction_queue.py | 161 +++++++++++++------------------- synapse/util/retryutils.py | 108 +++++++++++++++++++++ 4 files changed, 205 insertions(+), 122 deletions(-) create mode 100644 synapse/util/retryutils.py (limited to 'synapse/federation') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 3250cff595..ea00c830c0 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -22,6 +22,8 @@ from syutil.crypto.signing_key import ( from syutil.base64util import decode_base64, encode_base64 from synapse.api.errors import SynapseError, Codes +from synapse.util.retryutils import get_retry_limiter + from OpenSSL import crypto import logging @@ -88,19 +90,13 @@ class Keyring(object): # Try to fetch the key from the remote server. - retry_last_ts, retry_interval = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - server_name + limiter = yield get_retry_limiter( + server_name, + self.clock, + self.store, ) - if retry_timings: - retry_last_ts, retry_interval = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self.clock.time_msec()): - logger.info("%s not ready for retry", server_name) - raise ValueError("No verification key found for given key ids") - try: + with limiter: (response, tls_certificate) = yield fetch_server_key( server_name, self.hs.tls_context_factory ) @@ -165,7 +161,3 @@ class Keyring(object): return raise ValueError("No verification key found for given key ids") - - except: - self.set_retrying(server_name, retry_interval) - raise diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..c5b0274c2f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,6 +23,8 @@ from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination + import logging @@ -163,24 +165,34 @@ class FederationClient(FederationBase): pdu = None for destination in destinations: try: - transaction_data = yield self.transport_layer.get_event( - destination, event_id + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - logger.debug("transaction_data %r", transaction_data) + with limiter: + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + logger.debug("transaction_data %r", transaction_data) - if pdu_list: - pdu = pdu_list[0] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(pdu) + if pdu_list: + pdu = pdu_list[0] - break + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + except NotRetryingDestination as e: + logger.info(e.message) + continue except CodeMessageException: raise except Exception as e: diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb20f2ebab..7d02afe163 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -22,6 +22,9 @@ from .units import Transaction from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.retryutils import ( + get_retry_limiter, NotRetryingDestination, +) import logging @@ -138,25 +141,6 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - destination - ) - if retry_timings: - (retry_last_ts, retry_interval) = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - return - else: - logger.info("TX [%s] is ready for retry", destination) - if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -204,77 +188,79 @@ class TransactionQueue(object): ] try: - self.pending_transactions[destination] = 1 - - logger.debug("TX [%s] Persisting transaction...", destination) - - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - self._next_txn_id += 1 + with limiter: + self.pending_transactions[destination] = 1 - yield self.transaction_actions.prepare_to_send(transaction) + logger.debug("TX [%s] Persisting transaction...", destination) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) + + self._next_txn_id += 1 - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self._clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb + yield self.transaction_actions.prepare_to_send(transaction) + + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, ) - code = 200 - except HttpResponseException as e: - code = e.code - response = e.response - logger.info("TX [%s] got %d response", destination, code) + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self._clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) + logger.info("TX [%s] got %d response", destination, code) - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) + + yield self.transaction_actions.delivered( + transaction, code, response + ) + + logger.debug("TX [%s] Marked as delivered", destination) - logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) for deferred in deferreds: if code == 200: - if retry_last_ts: - # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings( - destination, 0, 0 - ) deferred.callback(None) else: - self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -285,6 +271,12 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. @@ -302,8 +294,6 @@ class TransactionQueue(object): e, ) - self.set_retrying(destination, retry_interval) - for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -314,22 +304,3 @@ class TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) - - @defer.inlineCallbacks - def set_retrying(self, destination, retry_interval): - # track that this destination is having problems and we should - # give it a chance to recover before trying it again - - if retry_interval: - retry_interval *= 2 - # plateau at hourly retries for now - if retry_interval >= 60 * 60 * 1000: - retry_interval = 60 * 60 * 1000 - else: - retry_interval = 2000 # try again at first after 2 seconds - - yield self.store.set_destination_retry_timings( - destination, - int(self._clock.time_msec()), - retry_interval - ) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py new file mode 100644 index 0000000000..bba524ebd8 --- /dev/null +++ b/synapse/util/retryutils.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import logging + + +logger = logging.getLogger(__name__) + + +class NotRetryingDestination(Exception): + def __init__(self, retry_last_ts, retry_interval, destination): + msg = "Not retrying server %s." % (destination,) + super(NotRetryingDestination, self).__init__(msg) + + self.retry_last_ts = retry_last_ts + self.retry_interval = retry_interval + self.destination = destination + + +@defer.inlineCallbacks +def get_retry_limiter(destination, clock, store, **kwargs): + retry_last_ts, retry_interval = (0, 0) + + retry_timings = yield store.get_destination_retry_timings( + destination + ) + + if retry_timings: + retry_last_ts, retry_interval = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + + now = int(clock.time_msec()) + + if retry_last_ts + retry_interval > now: + raise NotRetryingDestination( + retry_last_ts=retry_last_ts, + retry_interval=retry_interval, + destination=destination, + ) + + defer.returnValue( + RetryDestinationLimiter( + destination, + clock, + store, + retry_interval, + **kwargs + ) + ) + + +class RetryDestinationLimiter(object): + def __init__(self, destination, clock, store, retry_interval, + min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, + multiplier_retry_interval=2): + self.clock = clock + self.store = store + self.destination = destination + + self.retry_interval = retry_interval + self.min_retry_interval = min_retry_interval + self.max_retry_interval = max_retry_interval + self.multiplier_retry_interval = multiplier_retry_interval + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + def err(self, failure): + logger.exception( + "Failed to store set_destination_retry_timings", + failure.value + ) + + if exc_type is None and exc_val is None and exc_tb is None: + # We connected successfully. + retry_last_ts = 0 + self.retry_interval = 0 + else: + # We couldn't connect. + if self.retry_interval: + self.retry_interval *= self.multiplier_retry_interval + + if self.retry_interval >= self.max_retry_interval: + self.retry_interval = self.max_retry_interval + else: + self.retry_interval = self.min_retry_interval + + retry_last_ts = int(self._clock.time_msec()), + + self.store.set_destination_retry_timings( + self.destination, retry_last_ts, self.retry_interval + ).addErrback(err) -- cgit 1.5.1 From 9371019133bf16cec163d58fe69aa701c8ca5305 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 18:13:34 +0000 Subject: Try to only back off if we think we failed to connect to the remote --- synapse/crypto/keyring.py | 108 ++++++++++++++++---------------- synapse/federation/transaction_queue.py | 66 +++++++++---------- synapse/util/retryutils.py | 10 ++- 3 files changed, 95 insertions(+), 89 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index ea00c830c0..828aced44a 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -101,63 +101,63 @@ class Keyring(object): server_name, self.hs.tls_context_factory ) - # Check the response. + # Check the response. - x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, tls_certificate - ) + x509_certificate_bytes = crypto.dump_certificate( + crypto.FILETYPE_ASN1, tls_certificate + ) - if ("signatures" not in response - or server_name not in response["signatures"]): - raise ValueError("Key response not signed by remote server") - - if "tls_certificate" not in response: - raise ValueError("Key response missing TLS certificate") - - tls_certificate_b64 = response["tls_certificate"] - - if encode_base64(x509_certificate_bytes) != tls_certificate_b64: - raise ValueError("TLS certificate doesn't match") - - verify_keys = {} - for key_id, key_base64 in response["verify_keys"].items(): - if is_signing_algorithm_supported(key_id): - key_bytes = decode_base64(key_base64) - verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_keys[key_id] = verify_key - - for key_id in response["signatures"][server_name]: - if key_id not in response["verify_keys"]: - raise ValueError( - "Key response must include verification keys for all" - " signatures" - ) - if key_id in verify_keys: - verify_signed_json( - response, - server_name, - verify_keys[key_id] - ) - - # Cache the result in the datastore. - - time_now_ms = self.clock.time_msec() - - yield self.store.store_server_certificate( - server_name, - server_name, - time_now_ms, - tls_certificate, - ) + if ("signatures" not in response + or server_name not in response["signatures"]): + raise ValueError("Key response not signed by remote server") + + if "tls_certificate" not in response: + raise ValueError("Key response missing TLS certificate") - for key_id, key in verify_keys.items(): - yield self.store.store_server_verify_key( - server_name, server_name, time_now_ms, key + tls_certificate_b64 = response["tls_certificate"] + + if encode_base64(x509_certificate_bytes) != tls_certificate_b64: + raise ValueError("TLS certificate doesn't match") + + verify_keys = {} + for key_id, key_base64 in response["verify_keys"].items(): + if is_signing_algorithm_supported(key_id): + key_bytes = decode_base64(key_base64) + verify_key = decode_verify_key_bytes(key_id, key_bytes) + verify_keys[key_id] = verify_key + + for key_id in response["signatures"][server_name]: + if key_id not in response["verify_keys"]: + raise ValueError( + "Key response must include verification keys for all" + " signatures" + ) + if key_id in verify_keys: + verify_signed_json( + response, + server_name, + verify_keys[key_id] ) - for key_id in key_ids: - if key_id in verify_keys: - defer.returnValue(verify_keys[key_id]) - return + # Cache the result in the datastore. + + time_now_ms = self.clock.time_msec() + + yield self.store.store_server_certificate( + server_name, + server_name, + time_now_ms, + tls_certificate, + ) + + for key_id, key in verify_keys.items(): + yield self.store.store_server_verify_key( + server_name, server_name, time_now_ms, key + ) + + for key_id in key_ids: + if key_id in verify_keys: + defer.returnValue(verify_keys[key_id]) + return - raise ValueError("No verification key found for given key ids") + raise ValueError("No verification key found for given key ids") diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d02afe163..dc0f30cb64 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -167,15 +167,6 @@ class TransactionQueue(object): logger.info("TX [%s] Nothing to send", destination) return - logger.debug( - "TX [%s] Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, - len(pending_pdus), - len(pending_edus), - len(pending_failures) - ) - # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -194,32 +185,41 @@ class TransactionQueue(object): self.store, ) - with limiter: - self.pending_transactions[destination] = 1 + logger.debug( + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) + + self.pending_transactions[destination] = 1 - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug("TX [%s] Persisting transaction...", destination) - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - self._next_txn_id += 1 + self._next_txn_id += 1 - yield self.transaction_actions.prepare_to_send(transaction) + yield self.transaction_actions.prepare_to_send(transaction) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) + with limiter: # Actually send the transaction # FIXME (erikj): This is a bit of a hack to make the Pdu age @@ -249,11 +249,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( - transaction, code, response - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 888b7ef2e9..d190100c8c 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.api.errors import CodeMessageException + import logging @@ -67,7 +69,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): class RetryDestinationLimiter(object): def __init__(self, destination, clock, store, retry_interval, min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, - multiplier_retry_interval=2): + multiplier_retry_interval=2,): self.clock = clock self.store = store self.destination = destination @@ -87,7 +89,11 @@ class RetryDestinationLimiter(object): failure.value ) - if exc_type is None and exc_val is None and exc_tb is None: + valid_err_code = False + if exc_type is CodeMessageException: + valid_err_code = 0 <= exc_val.code < 500 + + if exc_type is None or valid_err_code: # We connected successfully. if not self.retry_interval: return -- cgit 1.5.1 From ec847059f3e9b9b5de62aa2f7ad2366c4e883fac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:14:10 +0000 Subject: Rename _fail_fetch_pdu_cache to _get_pdu_cache --- synapse/app/homeserver.py | 2 +- synapse/federation/federation_client.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7565d94449..7be82d0576 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -275,7 +275,7 @@ def setup(): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() - hs.get_replication_layer().start_pdu_fail_cache() + hs.get_replication_layer().start_get_pdu_cache() if config.daemonize: print config.pid_file diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83b4947b99..6042e366bd 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,10 +32,10 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): def __init__(self): - self._fail_fetch_pdu_cache = None + self._get_pdu_cache = None - def start_pdu_fail_cache(self): - self._fail_fetch_pdu_cache = ExpiringCache( + def start_get_pdu_cache(self): + self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, max_len=1000, @@ -43,7 +43,7 @@ class FederationClient(FederationBase): reset_expiry_on_get=False, ) - self._fail_fetch_pdu_cache.start() + self._get_pdu_cache.start() @log_function def send_pdu(self, pdu, destinations): @@ -175,8 +175,8 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. - if self._fail_fetch_pdu_cache: - e = self._fail_fetch_pdu_cache.get(event_id) + if self._get_pdu_cache: + e = self._get_pdu_cache.get(event_id) if e: defer.returnValue(e) @@ -223,8 +223,8 @@ class FederationClient(FederationBase): ) continue - if self._fail_fetch_pdu_cache is not None: - self._fail_fetch_pdu_cache[event_id] = pdu + if self._get_pdu_cache is not None: + self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu) -- cgit 1.5.1 From e482541e1d77cb179dafa44037ee1e04003501ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:44:22 +0000 Subject: Fix pyflakes --- synapse/federation/transaction_queue.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a77201ad49..ae04774c76 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -266,7 +266,6 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( transaction, code, response ) -- cgit 1.5.1 From 47d3ff4cf8ebceacc680c9e5b414150c6edbf85b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 11:30:37 +0000 Subject: Don't send failure to self --- synapse/federation/transaction_queue.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ae04774c76..f0041d4d12 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -128,6 +128,9 @@ class TransactionQueue(object): @defer.inlineCallbacks def enqueue_failure(self, failure, destination): + if destination == self.server_name: + return + deferred = defer.Deferred() self.pending_failures_by_dest.setdefault( -- cgit 1.5.1 From b68e4a729ff91372a3a7269acc08c1f77584b0f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 11:32:39 +0000 Subject: Discard destination 'localhost' --- synapse/federation/transaction_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f0041d4d12..06944eefb9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -107,7 +107,7 @@ class TransactionQueue(object): def enqueue_edu(self, edu): destination = edu.destination - if destination == self.server_name: + if destination == self.server_name or destination == "localhost": return deferred = defer.Deferred() @@ -128,7 +128,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def enqueue_failure(self, failure, destination): - if destination == self.server_name: + if destination == self.server_name or destination == "localhost": return deferred = defer.Deferred() -- cgit 1.5.1 From 2462aacd77963431507bb97769acee3ed9d65ceb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 18 Feb 2015 11:51:00 +0000 Subject: Restrict the destinations that synapse can talk to --- synapse/federation/transaction_queue.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ae04774c76..4b5460c797 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -66,6 +66,26 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) + def can_send_to(self, destination): + """Can we send messages to the given server? + + We can't send messages to ourselves. If we are running on localhost + then we can only federation with other servers running on localhost. + Otherwise we only federate with servers on a public domain. + + Args: + destination(str): The server we are possibly trying to send to. + Returns: + bool: True if we can send to the server. + """ + + if destination == self.server_name: + return False + if self.server_name.startswith("localhost"): + return destination.startswith("localhost") + else: + return not destination.startswith("localhost") + @defer.inlineCallbacks @log_function def enqueue_pdu(self, pdu, destinations, order): @@ -74,8 +94,9 @@ class TransactionQueue(object): # table and we'll get back to it later. destinations = set(destinations) - destinations.discard(self.server_name) - destinations.discard("localhost") + destinations = set( + dest for dest in destinations if self.can_send_to(dest) + ) logger.debug("Sending to: %s", str(destinations)) @@ -107,7 +128,7 @@ class TransactionQueue(object): def enqueue_edu(self, edu): destination = edu.destination - if destination == self.server_name: + if not self.can_send_to(destination): return deferred = defer.Deferred() @@ -130,6 +151,9 @@ class TransactionQueue(object): def enqueue_failure(self, failure, destination): deferred = defer.Deferred() + if not self.can_send_to(destination): + return + self.pending_failures_by_dest.setdefault( destination, [] ).append( -- cgit 1.5.1 From 446ef589920a9c3c4738873023494bed79d1f6c3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 12:03:12 +0000 Subject: Add errback to all deferreds in transaction_queue --- synapse/federation/transaction_queue.py | 37 ++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 06944eefb9..1e5505a4bf 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -90,14 +90,17 @@ class TransactionQueue(object): (pdu, deferred, order) ) - def eb(failure): + def chain(failure): if not deferred.called: deferred.errback(failure) - else: - logger.warn("Failed to send pdu", failure.value) + + def log_failure(failure): + logger.warn("Failed to send pdu", failure.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) deferreds.append(deferred) @@ -115,14 +118,17 @@ class TransactionQueue(object): (edu, deferred) ) - def eb(failure): + def chain(failure): if not deferred.called: deferred.errback(failure) - else: - logger.warn("Failed to send edu", failure.value) + + def log_failure(failure): + logger.warn("Failed to send pdu", failure.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) return deferred @@ -139,14 +145,17 @@ class TransactionQueue(object): (failure, deferred) ) - def eb(failure): + def chain(f): if not deferred.called: - deferred.errback(failure) - else: - logger.warn("Failed to send failure", failure.value) + deferred.errback(f) + + def log_failure(f): + logger.warn("Failed to send pdu", f.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) yield deferred @@ -308,7 +317,7 @@ class TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception( + logger.warn( "TX [%s] Problem in _attempt_transaction: %s", destination, e, -- cgit 1.5.1