diff options
author | Erik Johnston <erik@matrix.org> | 2014-12-10 13:18:40 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-12-10 13:18:40 +0000 |
commit | aae8a37e63662681f6e75cca5b32cfba78b0d74f (patch) | |
tree | 10aa25caddf01da913d270186fc1169deff17320 /synapse | |
parent | Fix AttributeError (diff) | |
parent | update codestyle based on debate on #matrix-dev (diff) | |
download | synapse-aae8a37e63662681f6e75cca5b32cfba78b0d74f.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into events_refactor
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/federation/replication.py | 93 | ||||
-rw-r--r-- | synapse/federation/transport.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 7 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 35 | ||||
-rw-r--r-- | synapse/rest/transactions.py | 2 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v9.sql | 23 | ||||
-rw-r--r-- | synapse/storage/schema/transactions.sql | 6 | ||||
-rw-r--r-- | synapse/storage/transactions.py | 106 |
10 files changed, 255 insertions, 23 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index a4600b0b40..6388bb98e2 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -336,7 +336,7 @@ class ReplicationLayer(object): defer.returnValue(response) return - logger.debug("[%s] Transacition is new", transaction.transaction_id) + logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): dl = [] @@ -690,6 +690,7 @@ class _TransactionQueue(object): self.transport_layer = transport_layer self._clock = hs.get_clock() + self.store = hs.get_datastore() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -731,8 +732,14 @@ class _TransactionQueue(object): (pdu, deferred, order) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send pdu", failure) + with PreserveLoggingContext(): - self._attempt_new_transaction(destination) + self._attempt_new_transaction(destination).addErrback(eb) deferreds.append(deferred) @@ -742,6 +749,9 @@ class _TransactionQueue(object): def enqueue_edu(self, edu): destination = edu.destination + if destination == self.server_name: + return + deferred = defer.Deferred() self.pending_edus_by_dest.setdefault(destination, []).append( (edu, deferred) @@ -751,7 +761,7 @@ class _TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.exception("Failed to send edu", failure) + logger.warn("Failed to send edu", failure) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -773,10 +783,33 @@ class _TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + + (retry_last_ts, retry_interval) = (0, 0) + retry_timings = yield self.store.get_destination_retry_timings( + destination + ) + if retry_timings: + (retry_last_ts, retry_interval) = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + return + else: + logger.info("TX [%s] is ready for retry", destination) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests return - # list of (pending_pdu, deferred, order) + # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) @@ -784,7 +817,14 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction", destination) + logger.debug( + "TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -817,7 +857,11 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.debug("TX [%s] Sending transaction...", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) # Actually send the transaction @@ -838,6 +882,8 @@ class _TransactionQueue(object): transaction, json_data_cb ) + logger.info("TX [%s] got %d response", destination, code) + logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) @@ -850,8 +896,14 @@ class _TransactionQueue(object): for deferred in deferreds: if code == 200: + if retry_last_ts: + # this host is alive! reset retry schedule + yield self.store.set_destination_retry_timings( + destination, 0, 0 + ) deferred.callback(None) else: + self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -864,11 +916,15 @@ class _TransactionQueue(object): logger.debug("TX [%s] Yielded to callbacks", destination) except Exception as e: - logger.error("TX Problem in _attempt_transaction") - # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception(e) + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + self.set_retrying(destination, retry_interval) for deferred in deferreds: if not deferred.called: @@ -880,3 +936,22 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + + @defer.inlineCallbacks + def set_retrying(self, destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + + if retry_interval: + retry_interval *= 2 + # plateau at hourly retries for now + if retry_interval >= 60 * 60 * 1000: + retry_interval = 60 * 60 * 1000 + else: + retry_interval = 2000 # try again at first after 2 seconds + + yield self.store.set_destination_retry_timings( + destination, + int(self._clock.time_msec()), + retry_interval + ) diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 8d86152085..0f11c6d491 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -155,7 +155,7 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to it's destination + """ Sends the given Transaction to its destination Args: transaction (Transaction) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f38d9a48c8..e5deb8a9ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -203,6 +203,13 @@ class FederationHandler(BaseHandler): e.msg, affected=event.event_id, ) + + # if we're receiving valid events from an origin, + # it's probably a good idea to mark it as not in retry-state + # for sending (although this is a bit of a leap) + retry_timings = yield self.store.get_destination_retry_timings(origin) + if (retry_timings and retry_timings.retry_last_ts): + self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 510f07dd7b..fc5b5ab809 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -89,8 +89,8 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.debug("Sending request to %s: %s %s", - destination, method, url_bytes) + logger.info("Sending request to %s: %s %s", + destination, method, url_bytes) logger.debug( "Types: %s", @@ -101,6 +101,8 @@ class MatrixFederationHttpClient(object): ] ) + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) retries_left = 5 endpoint = self._getEndpoint(reactor, destination) @@ -127,11 +129,20 @@ class MatrixFederationHttpClient(object): break except Exception as e: if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn("DNS Lookup failed to %s with %s", destination, - e) + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) raise SynapseError(400, "Domain specified not found.") - logger.exception("Got error in _create_request") + logger.warn( + "Sending request failed to %s: %s %s : %s", + destination, + method, + url_bytes, + e + ) _print_ex(e) if retries_left: @@ -140,15 +151,21 @@ class MatrixFederationHttpClient(object): else: raise + logger.info( + "Received response %d %s for %s: %s %s", + response.code, + response.phrase, + destination, + method, + url_bytes + ) + if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - logger.error( - "Got response %d %s", response.code, response.phrase - ) raise CodeMessageException( response.code, response.phrase ) @@ -284,7 +301,7 @@ def _print_ex(e): for ex in e.reasons: _print_ex(ex) else: - logger.exception(e) + logger.warn(e) class _JsonProducer(object): diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py index 93c0122f30..8c41ab4edb 100644 --- a/synapse/rest/transactions.py +++ b/synapse/rest/transactions.py @@ -19,7 +19,7 @@ import logging logger = logging.getLogger(__name__) - +# FIXME: elsewhere we use FooStore to indicate something in the storage layer... class HttpTransactionStore(object): def __init__(self): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 07b1665bf6..7068424441 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -68,7 +68,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 9 class _RollbackButIsFineException(Exception): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 72f88cb2aa..935a167f6f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -608,7 +608,7 @@ class JoinHelper(object): to dump the results into. Attributes: - taples (list): List of `Table` classes + tables (list): List of `Table` classes EntryType (type) """ diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql new file mode 100644 index 0000000000..ad680c64da --- /dev/null +++ b/synapse/storage/schema/delta/v9.sql @@ -0,0 +1,23 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); + +PRAGMA user_version = 9; \ No newline at end of file diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql index 88e3e4e04d..de461bfa15 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/transactions.sql @@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 00d0f48082..423cc3f02a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table from collections import namedtuple +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + # a write-through cache of DestinationsTable.EntryType indexed by + # destination string + destination_retry_cache = {} + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): - # First we find out what the prev_txs should be. + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. query = "%s ORDER BY id DESC LIMIT 1" % ( @@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + def get_destination_retry_timings(self, destination): + """Gets the current retry timings (if any) for a given destination. + + Args: + destination (str) + + Returns: + None if not retrying + Otherwise a DestinationsTable.EntryType for the retry scheme + """ + if destination in self.destination_retry_cache: + return defer.succeed(self.destination_retry_cache[destination]) + + return self.runInteraction( + "get_destination_retry_timings", + self._get_destination_retry_timings, destination) + + def _get_destination_retry_timings(cls, txn, destination): + query = DestinationsTable.select_statement("destination = ?") + txn.execute(query, (destination,)) + result = txn.fetchall() + if result: + result = DestinationsTable.decode_single_result(result) + if result.retry_last_ts > 0: + return result + else: + return None + + def set_destination_retry_timings(self, destination, + retry_last_ts, retry_interval): + """Sets the current retry timings for a given destination. + Both timings should be zero if retrying is no longer occuring. + + Args: + destination (str) + retry_last_ts (int) - time of last retry attempt in unix epoch ms + retry_interval (int) - how long until next retry in ms + """ + + self.destination_retry_cache[destination] = ( + DestinationsTable.EntryType( + destination, + retry_last_ts, + retry_interval + ) + ) + + # XXX: we could chose to not bother persisting this if our cache thinks + # this is a NOOP + return self.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings, + destination, + retry_last_ts, + retry_interval, + ) + + def _set_destination_retry_timings(cls, txn, destination, + retry_last_ts, retry_interval): + + query = ( + "INSERT OR REPLACE INTO %s " + "(destination, retry_last_ts, retry_interval) " + "VALUES (?, ?, ?) " + ) % DestinationsTable.table_name + + txn.execute(query, (destination, retry_last_ts, retry_interval)) + + def get_destinations_needing_retry(self): + """Get all destinations which are due a retry for sending a transaction. + + Returns: + list: A list of `DestinationsTable.EntryType` + """ + + return self.runInteraction( + "get_destinations_needing_retry", + self._get_destinations_needing_retry + ) + + def _get_destinations_needing_retry(cls, txn): + where = "retry_last_ts > 0 and retry_next_ts < now()" + query = DestinationsTable.select_statement(where) + txn.execute(query) + return DestinationsTable.decode_results(txn.fetchall()) + class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -247,3 +339,15 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) + + +class DestinationsTable(Table): + table_name = "destinations" + + fields = [ + "destination", + "retry_last_ts", + "retry_interval", + ] + + EntryType = namedtuple("DestinationsEntry", fields) |