From aed62a35832a3ec1c7425ecc99cab06a781263ba Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 7 Dec 2014 02:26:07 +0000 Subject: track replication destination health, and perform exponential back-off when sending transactions. does *not* yet retry transactions, but drops them on the floor if waiting for a server to recover. --- synapse/federation/replication.py | 43 +++++++++++++++--- synapse/federation/transport.py | 2 +- synapse/http/matrixfederationclient.py | 16 ++++--- synapse/rest/transactions.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/schema/delta/v9.sql | 23 ++++++++++ synapse/storage/schema/transactions.sql | 6 +++ synapse/storage/transactions.py | 78 ++++++++++++++++++++++++++++++++- 9 files changed, 156 insertions(+), 18 deletions(-) create mode 100644 synapse/storage/schema/delta/v9.sql diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 01f87fe423..f9c05b5ea3 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -723,6 +723,8 @@ class _TransactionQueue(object): deferreds = [] for destination in destinations: + # XXX: why don't we specify an errback for this deferred + # like we do for EDUs? --matthew deferred = defer.Deferred() self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) @@ -738,6 +740,9 @@ class _TransactionQueue(object): # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination + + if destination == self.server_name: + return deferred = defer.Deferred() self.pending_edus_by_dest.setdefault(destination, []).append( @@ -766,14 +771,23 @@ class _TransactionQueue(object): ) yield deferred - + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + + (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info("TX [%s] not ready for retry yet - dropping transaction for now") + return + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending request + # at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these requests return - # list of (pending_pdu, deferred, order) + # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) @@ -781,7 +795,8 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction", destination) + logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)", + destination, len(pending_pdus), len(pending_edus), len(pending_failures)) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -814,7 +829,7 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.debug("TX [%s] Sending transaction...", destination) + logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id) # Actually send the transaction @@ -835,6 +850,8 @@ class _TransactionQueue(object): transaction, json_data_cb ) + logger.info("TX [%s] got %d response", destination, code) + logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) @@ -849,6 +866,7 @@ class _TransactionQueue(object): if code == 200: deferred.callback(None) else: + start_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -861,12 +879,12 @@ class _TransactionQueue(object): logger.debug("TX [%s] Yielded to callbacks", destination) except Exception as e: - logger.error("TX Problem in _attempt_transaction") - # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception(e) + logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e) + start_retrying(destination, retry_interval) + for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -877,3 +895,14 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + +def start_retrying(destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + if retry_interval: + retry_interval *= 2 + else: + retry_interval = 2 # try again at first after 2 seconds + self.store.set_destination_retry_timings(destination, + int(self._clock.time_msec()), retry_interval) + \ No newline at end of file diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 8d86152085..0f11c6d491 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -155,7 +155,7 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to it's destination + """ Sends the given Transaction to its destination Args: transaction (Transaction) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 510f07dd7b..3edc59dbab 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -89,7 +89,7 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.debug("Sending request to %s: %s %s", + logger.info("Sending request to %s: %s %s", destination, method, url_bytes) logger.debug( @@ -101,7 +101,10 @@ class MatrixFederationHttpClient(object): ] ) - retries_left = 5 + # was 5; for now, let's only try once at the HTTP layer and then + # rely on transaction-layer retries for exponential backoff and + # getting the message through. + retries_left = 0 endpoint = self._getEndpoint(reactor, destination) @@ -131,7 +134,8 @@ class MatrixFederationHttpClient(object): e) raise SynapseError(400, "Domain specified not found.") - logger.exception("Got error in _create_request") + logger.exception("Sending request failed to %s: %s %s : %s", + destination, method, url_bytes, e) _print_ex(e) if retries_left: @@ -140,15 +144,15 @@ class MatrixFederationHttpClient(object): else: raise + logger.info("Received response %d %s for %s: %s %s", + response.code, response.phrase, destination, method, url_bytes) + if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - logger.error( - "Got response %d %s", response.code, response.phrase - ) raise CodeMessageException( response.code, response.phrase ) diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py index 93c0122f30..8c41ab4edb 100644 --- a/synapse/rest/transactions.py +++ b/synapse/rest/transactions.py @@ -19,7 +19,7 @@ import logging logger = logging.getLogger(__name__) - +# FIXME: elsewhere we use FooStore to indicate something in the storage layer... class HttpTransactionStore(object): def __init__(self): diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f15e3dfe62..04ab39341d 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -67,7 +67,7 @@ SCHEMAS = [ # Remember to update this number every time an incompatible change is made to # database schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 8 +SCHEMA_VERSION = 9 class _RollbackButIsFineException(Exception): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 4881f03368..e72200e2f7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -650,7 +650,7 @@ class JoinHelper(object): to dump the results into. Attributes: - taples (list): List of `Table` classes + tables (list): List of `Table` classes EntryType (type) """ diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql new file mode 100644 index 0000000000..ad680c64da --- /dev/null +++ b/synapse/storage/schema/delta/v9.sql @@ -0,0 +1,23 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); + +PRAGMA user_version = 9; \ No newline at end of file diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql index 88e3e4e04d..de461bfa15 100644 --- a/synapse/storage/schema/transactions.sql +++ b/synapse/storage/schema/transactions.sql @@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination); CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination); +-- To track destination health +CREATE TABLE IF NOT EXISTS destinations( + destination TEXT PRIMARY KEY, + retry_last_ts INTEGER, + retry_interval INTEGER +); diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 00d0f48082..47b73f7458 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -114,7 +114,7 @@ class TransactionStore(SQLBaseStore): def _prep_send_transaction(self, txn, transaction_id, destination, origin_server_ts): - # First we find out what the prev_txs should be. + # First we find out what the prev_txns should be. # Since we know that we are only sending one transaction at a time, # we can simply take the last one. query = "%s ORDER BY id DESC LIMIT 1" % ( @@ -205,6 +205,71 @@ class TransactionStore(SQLBaseStore): return ReceivedTransactionsTable.decode_results(txn.fetchall()) + def get_destination_retry_timings(self, destination): + """Gets the current retry timings (if any) for a given destination. + + Args: + destination (str) + + Returns: + None if not retrying + tuple: (retry_last_ts, retry_interval) + retry_ts: time of last retry attempt in unix epoch ms + retry_interval: how long until next retry in ms + """ + return self.runInteraction( + "get_destination_retry_timings", + self._get_destination_retry_timings, destination) + + def _get_destination_retry_timings(cls, txn, destination): + query = DestinationsTable.select_statement("destination = ?") + txn.execute(query, (destination,)) + result = DestinationsTable.decode_single_result(txn.fetchone()) + if result and result[0] > 0: + return result + else: + return None + + def set_destination_retry_timings(self, destination): + """Sets the current retry timings for a given destination. + Both timings should be zero if retrying is no longer occuring. + + Args: + destination (str) + retry_last_ts (int) - time of last retry attempt in unix epoch ms + retry_interval (int) - how long until next retry in ms + """ + return self.runInteraction( + "set_destination_retry_timings", + self._set_destination_retry_timings, destination, retry_last_ts, retry_interval) + + def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): + + query = ( + "INSERT OR REPLACE INTO %s " + "(retry_last_ts, retry_interval) " + "VALUES (?, ?) " + "WHERE destination = ?" + ) % DestinationsTable.table_name + + txn.execute(query, (retry_last_ts, retry_interval, destination)) + + def get_destinations_needing_retry(self): + """Get all destinations which are due a retry for sending a transaction. + + Returns: + list: A list of `DestinationsTable.EntryType` + """ + return self.runInteraction( + "get_destinations_needing_retry", + self._get_destinations_needing_retry + ) + + def _get_destinations_needing_retry(cls, txn): + where = "retry_last_ts > 0 and retry_next_ts < now()" + query = DestinationsTable.select_statement(where) + txn.execute(query) + return DestinationsTable.decode_results(txn.fetchall()) class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -247,3 +312,14 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) + +class DestinationsTable(Table): + table_name = "destinations" + + fields = [ + "destination", + "retry_last_ts", + "retry_interval", + ] + + EntryType = namedtuple("DestinationsEntry", fields) \ No newline at end of file -- cgit 1.4.1 From 5cd43d4b9f3c41b21ced0ab44cf24c2cf7dab817 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Sun, 7 Dec 2014 23:44:16 +0000 Subject: fix stupid syntax thinkos --- synapse/federation/replication.py | 23 +++++++++++------------ synapse/storage/transactions.py | 2 +- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index f9c05b5ea3..1b9e3ece09 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -866,7 +866,7 @@ class _TransactionQueue(object): if code == 200: deferred.callback(None) else: - start_retrying(destination, retry_interval) + self.start_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -883,7 +883,7 @@ class _TransactionQueue(object): # for this finishing functions deferred. logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e) - start_retrying(destination, retry_interval) + self.start_retrying(destination, retry_interval) for deferred in deferreds: if not deferred.called: @@ -896,13 +896,12 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) -def start_retrying(destination, retry_interval): - # track that this destination is having problems and we should - # give it a chance to recover before trying it again - if retry_interval: - retry_interval *= 2 - else: - retry_interval = 2 # try again at first after 2 seconds - self.store.set_destination_retry_timings(destination, - int(self._clock.time_msec()), retry_interval) - \ No newline at end of file + def start_retrying(self, destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + if retry_interval: + retry_interval *= 2 + else: + retry_interval = 2 # try again at first after 2 seconds + self.store.set_destination_retry_timings(destination, + int(self._clock.time_msec()), retry_interval) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 47b73f7458..cacd948302 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -230,7 +230,7 @@ class TransactionStore(SQLBaseStore): else: return None - def set_destination_retry_timings(self, destination): + def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. -- cgit 1.4.1 From 9c43b258ecc493b126ef2858b9bb8fda0f01478a Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 8 Dec 2014 00:17:12 +0000 Subject: actually reset retry schedule if we can successfuly talk to it --- synapse/federation/replication.py | 3 +++ synapse/handlers/federation.py | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 1b9e3ece09..88184caecd 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -864,6 +864,9 @@ class _TransactionQueue(object): for deferred in deferreds: if code == 200: + if retry_last_ts: + # this host is alive! reset retry schedule + self.store.set_destination_retry_timings(destination, 0, 0) deferred.callback(None) else: self.start_retrying(destination, retry_interval) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 925eb5376e..7a79e2d117 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -207,6 +207,12 @@ class FederationHandler(BaseHandler): e.msg, affected=event.event_id, ) + + # if we're receiving valid events from an origin, + # it's probably a good idea to mark it as not in retry-state + # for sending (although this is a bit of a leap) + if ((self.store.get_destination_retry_timings(origin))[0]): + self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) -- cgit 1.4.1 From 0d3fa1ac6e5257218a0c0dbda8cc015e77fe0a30 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 8 Dec 2014 17:48:57 +0000 Subject: add a write-through cache on the retry schedule --- synapse/storage/transactions.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index cacd948302..fa51766e05 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -25,6 +25,9 @@ logger = logging.getLogger(__name__) class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ + + # a write-through cache of DestinationsTable.EntryType indexed by destination string + destination_retry_cache = {} def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have @@ -213,10 +216,11 @@ class TransactionStore(SQLBaseStore): Returns: None if not retrying - tuple: (retry_last_ts, retry_interval) - retry_ts: time of last retry attempt in unix epoch ms - retry_interval: how long until next retry in ms + Otherwise a DestinationsTable.EntryType for the retry scheme """ + if self.destination_retry_cache[destination]: + return self.destination_retry_cache[destination] + return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) @@ -225,7 +229,7 @@ class TransactionStore(SQLBaseStore): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) result = DestinationsTable.decode_single_result(txn.fetchone()) - if result and result[0] > 0: + if result and result.retry_last_ts > 0: return result else: return None @@ -239,6 +243,12 @@ class TransactionStore(SQLBaseStore): retry_last_ts (int) - time of last retry attempt in unix epoch ms retry_interval (int) - how long until next retry in ms """ + + self.destination_retry_cache[destination] = ( + DestinationsTable.EntryType(destination, retry_last_ts, retry_interval) + ) + + # xxx: we could chose to not bother persisting this if our cache things this is a NOOP return self.runInteraction( "set_destination_retry_timings", self._set_destination_retry_timings, destination, retry_last_ts, retry_interval) @@ -260,6 +270,7 @@ class TransactionStore(SQLBaseStore): Returns: list: A list of `DestinationsTable.EntryType` """ + return self.runInteraction( "get_destinations_needing_retry", self._get_destinations_needing_retry -- cgit 1.4.1 From 8529fba02d93ed1d0d08873f0cbbd58a3194e4af Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 8 Dec 2014 19:34:51 +0000 Subject: fix a million stupid bugs and make it actually work --- synapse/federation/replication.py | 25 +++++++++++++++++-------- synapse/handlers/federation.py | 3 ++- synapse/storage/transactions.py | 25 ++++++++++++++----------- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 88184caecd..c4c6667b62 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -685,6 +685,7 @@ class _TransactionQueue(object): self.transport_layer = transport_layer self._clock = hs.get_clock() + self.store = hs.get_datastore() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -775,11 +776,18 @@ class _TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - dropping transaction for now") - return + + (retry_last_ts, retry_interval) = (0, 0) + retry_timings = yield self.store.get_destination_retry_timings(destination) + if retry_timings: + (retry_last_ts, retry_interval) = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination) + return + else: + logger.info("TX [%s] is ready for retry", destination) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending request @@ -866,7 +874,7 @@ class _TransactionQueue(object): if code == 200: if retry_last_ts: # this host is alive! reset retry schedule - self.store.set_destination_retry_timings(destination, 0, 0) + yield self.store.set_destination_retry_timings(destination, 0, 0) deferred.callback(None) else: self.start_retrying(destination, retry_interval) @@ -899,12 +907,13 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + @defer.inlineCallbacks def start_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again if retry_interval: retry_interval *= 2 else: - retry_interval = 2 # try again at first after 2 seconds - self.store.set_destination_retry_timings(destination, + retry_interval = 2000 # try again at first after 2 seconds + yield self.store.set_destination_retry_timings(destination, int(self._clock.time_msec()), retry_interval) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7a79e2d117..cfb5029774 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -211,7 +211,8 @@ class FederationHandler(BaseHandler): # if we're receiving valid events from an origin, # it's probably a good idea to mark it as not in retry-state # for sending (although this is a bit of a leap) - if ((self.store.get_destination_retry_timings(origin))[0]): + retry_timings = yield self.store.get_destination_retry_timings(origin) + if (retry_timings and retry_timings.retry_last_ts): self.store.set_destination_retry_timings(origin, 0, 0) room = yield self.store.get_room(event.room_id) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index fa51766e05..237b024451 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table from collections import namedtuple +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -218,8 +220,8 @@ class TransactionStore(SQLBaseStore): None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ - if self.destination_retry_cache[destination]: - return self.destination_retry_cache[destination] + if destination in self.destination_retry_cache: + return defer.succeed(self.destination_retry_cache[destination]) return self.runInteraction( "get_destination_retry_timings", @@ -228,11 +230,13 @@ class TransactionStore(SQLBaseStore): def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) - result = DestinationsTable.decode_single_result(txn.fetchone()) - if result and result.retry_last_ts > 0: - return result - else: - return None + result = txn.fetchall() + if result: + result = DestinationsTable.decode_single_result(result) + if result.retry_last_ts > 0: + return result + else: + return None def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. @@ -257,12 +261,11 @@ class TransactionStore(SQLBaseStore): query = ( "INSERT OR REPLACE INTO %s " - "(retry_last_ts, retry_interval) " - "VALUES (?, ?) " - "WHERE destination = ?" + "(destination, retry_last_ts, retry_interval) " + "VALUES (?, ?, ?) " ) % DestinationsTable.table_name - txn.execute(query, (retry_last_ts, retry_interval, destination)) + txn.execute(query, (destination, retry_last_ts, retry_interval)) def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. -- cgit 1.4.1 From 8ada2d20183da9aac8182fbe80bdb897c5a4d224 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 9 Dec 2014 23:53:07 +0000 Subject: fix UTs by telling all the mock stores about the new methods for tracking retries --- tests/federation/test_federation.py | 5 +++++ tests/handlers/test_federation.py | 2 ++ tests/handlers/test_presence.py | 10 +++++++++- tests/handlers/test_typing.py | 9 +++++++-- 4 files changed, 23 insertions(+), 3 deletions(-) diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py index 73dd289276..f6b41e2c4c 100644 --- a/tests/federation/test_federation.py +++ b/tests/federation/test_federation.py @@ -25,6 +25,7 @@ from synapse.server import HomeServer from synapse.federation import initialize_http_replication from synapse.api.events import SynapseEvent +from synapse.storage.transactions import DestinationsTable def make_pdu(prev_pdus=[], **kwargs): """Provide some default fields for making a PduTuple.""" @@ -55,10 +56,14 @@ class FederationTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]) self.mock_persistence.get_received_txn_response.return_value = ( defer.succeed(None) ) + self.mock_persistence.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) self.mock_config = Mock() self.mock_config.signing_key = [MockKey()] self.clock = MockClock() diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 33016c16ef..fae33716a3 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -53,6 +53,8 @@ class FederationTestCase(unittest.TestCase): "persist_event", "store_room", "get_room", + "get_destination_retry_timings", + "set_destination_retry_timings", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index fe69ce47eb..b85a89052a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -30,7 +30,7 @@ from synapse.api.constants import PresenceState from synapse.api.errors import SynapseError from synapse.handlers.presence import PresenceHandler, UserPresenceCache from synapse.streams.config import SourcePaginationConfig - +from synapse.storage.transactions import DestinationsTable OFFLINE = PresenceState.OFFLINE UNAVAILABLE = PresenceState.UNAVAILABLE @@ -528,6 +528,7 @@ class PresencePushTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]), handlers=None, resource_for_client=Mock(), @@ -539,6 +540,9 @@ class PresencePushTestCase(unittest.TestCase): hs.handlers = JustPresenceHandlers(hs) self.datastore = hs.get_datastore() + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) def get_received_txn_response(*args): return defer.succeed(None) @@ -1037,6 +1041,7 @@ class PresencePollingTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]), handlers=None, resource_for_client=Mock(), @@ -1048,6 +1053,9 @@ class PresencePollingTestCase(unittest.TestCase): hs.handlers = JustPresenceHandlers(hs) self.datastore = hs.get_datastore() + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) def get_received_txn_response(*args): return defer.succeed(None) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index adb5148351..2f170ac3bf 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -25,6 +25,8 @@ from ..utils import MockHttpResource, MockClock, DeferredMockCallable, MockKey from synapse.server import HomeServer from synapse.handlers.typing import TypingNotificationHandler +from synapse.storage.transactions import DestinationsTable + def _expect_edu(destination, edu_type, content, origin="test"): return { @@ -49,7 +51,6 @@ class JustTypingNotificationHandlers(object): def __init__(self, hs): self.typing_notification_handler = TypingNotificationHandler(hs) - class TypingNotificationsTestCase(unittest.TestCase): """Tests typing notifications to rooms.""" def setUp(self): @@ -72,6 +73,7 @@ class TypingNotificationsTestCase(unittest.TestCase): "delivered_txn", "get_received_txn_response", "set_received_txn_response", + "get_destination_retry_timings", ]), handlers=None, resource_for_client=Mock(), @@ -89,6 +91,9 @@ class TypingNotificationsTestCase(unittest.TestCase): self.handler.push_update_to_clients = self.mock_update_client self.datastore = hs.get_datastore() + self.datastore.get_destination_retry_timings.return_value = ( + defer.succeed(DestinationsTable.EntryType("", 0, 0)) + ) def get_received_txn_response(*args): return defer.succeed(None) @@ -162,7 +167,7 @@ class TypingNotificationsTestCase(unittest.TestCase): @defer.inlineCallbacks def test_started_typing_remote_send(self): self.room_members = [self.u_apple, self.u_onion] - + put_json = self.mock_http_client.put_json put_json.expect_call_and_return( call("farm", -- cgit 1.4.1 From 2b1acb7671e33baeb01be2f0facd20cd6ea7e3b5 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 00:03:55 +0000 Subject: squidge to 79 columns as per pep8 --- synapse/federation/replication.py | 30 ++++++++++++++++++++---------- synapse/http/matrixfederationclient.py | 7 ++++--- synapse/storage/transactions.py | 18 ++++++++++++------ 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c4c6667b62..c242488483 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -778,21 +778,25 @@ class _TransactionQueue(object): def _attempt_new_transaction(self, destination): (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings(destination) + retry_timings = yield self.store.get_destination_retry_timings( + destination + ) if retry_timings: (retry_last_ts, retry_interval) = ( retry_timings.retry_last_ts, retry_timings.retry_interval ) if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination) + logger.info("TX [%s] not ready for retry yet - " + "dropping transaction for now", destination) return else: logger.info("TX [%s] is ready for retry", destination) if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending request - # at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these requests + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests return # list of (pending_pdu, deferred, order) @@ -803,8 +807,10 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)", - destination, len(pending_pdus), len(pending_edus), len(pending_failures)) + logger.debug("TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), len(pending_edus), len(pending_failures)) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -837,7 +843,8 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id) + logger.info("TX [%s] Sending transaction [%s]", destination, + transaction.transaction_id) # Actually send the transaction @@ -874,7 +881,9 @@ class _TransactionQueue(object): if code == 200: if retry_last_ts: # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings(destination, 0, 0) + yield self.store.set_destination_retry_timings( + destination, 0, 0 + ) deferred.callback(None) else: self.start_retrying(destination, retry_interval) @@ -892,7 +901,8 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e) + logger.exception("TX [%s] Problem in _attempt_transaction: %s", + destination, e) self.start_retrying(destination, retry_interval) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 3edc59dbab..c76990904d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -90,7 +90,7 @@ class MatrixFederationHttpClient(object): ) logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + destination, method, url_bytes) logger.debug( "Types: %s", @@ -135,7 +135,7 @@ class MatrixFederationHttpClient(object): raise SynapseError(400, "Domain specified not found.") logger.exception("Sending request failed to %s: %s %s : %s", - destination, method, url_bytes, e) + destination, method, url_bytes, e) _print_ex(e) if retries_left: @@ -145,7 +145,8 @@ class MatrixFederationHttpClient(object): raise logger.info("Received response %d %s for %s: %s %s", - response.code, response.phrase, destination, method, url_bytes) + response.code, response.phrase, + destination, method, url_bytes) if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 237b024451..2b16787695 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -28,7 +28,8 @@ class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - # a write-through cache of DestinationsTable.EntryType indexed by destination string + # a write-through cache of DestinationsTable.EntryType indexed by + # destination string destination_retry_cache = {} def get_received_txn_response(self, transaction_id, origin): @@ -238,7 +239,8 @@ class TransactionStore(SQLBaseStore): else: return None - def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): + def set_destination_retry_timings(self, destination, + retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. @@ -249,15 +251,19 @@ class TransactionStore(SQLBaseStore): """ self.destination_retry_cache[destination] = ( - DestinationsTable.EntryType(destination, retry_last_ts, retry_interval) + DestinationsTable.EntryType(destination, + retry_last_ts, retry_interval) ) - # xxx: we could chose to not bother persisting this if our cache things this is a NOOP + # XXX: we could chose to not bother persisting this if our cache thinks + # this is a NOOP return self.runInteraction( "set_destination_retry_timings", - self._set_destination_retry_timings, destination, retry_last_ts, retry_interval) + self._set_destination_retry_timings, destination, + retry_last_ts, retry_interval) - def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): + def _set_destination_retry_timings(cls, txn, destination, + retry_last_ts, retry_interval): query = ( "INSERT OR REPLACE INTO %s " -- cgit 1.4.1 From faf12b64f81627d92cb1ac49b6eb58f9d3f4837d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 00:12:51 +0000 Subject: add errbacks to enqueue_pdu deferreds; change logging for failed federation sends to warn rather than exception --- synapse/federation/replication.py | 16 ++++++++++------ synapse/http/matrixfederationclient.py | 4 ++-- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index c242488483..346b5f04c0 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -334,7 +334,7 @@ class ReplicationLayer(object): defer.returnValue(response) return - logger.debug("[%s] Transacition is new", transaction.transaction_id) + logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): dl = [] @@ -724,15 +724,19 @@ class _TransactionQueue(object): deferreds = [] for destination in destinations: - # XXX: why don't we specify an errback for this deferred - # like we do for EDUs? --matthew deferred = defer.Deferred() self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) ) + + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send pdu", failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination) + self._attempt_new_transaction(destination).addErrback(eb) deferreds.append(deferred) @@ -754,7 +758,7 @@ class _TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.exception("Failed to send edu", failure) + logger.warn("Failed to send edu", failure) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -901,7 +905,7 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception("TX [%s] Problem in _attempt_transaction: %s", + logger.warn("TX [%s] Problem in _attempt_transaction: %s", destination, e) self.start_retrying(destination, retry_interval) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c76990904d..8fc6bf8f97 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -134,7 +134,7 @@ class MatrixFederationHttpClient(object): e) raise SynapseError(400, "Domain specified not found.") - logger.exception("Sending request failed to %s: %s %s : %s", + logger.warn("Sending request failed to %s: %s %s : %s", destination, method, url_bytes, e) _print_ex(e) @@ -289,7 +289,7 @@ def _print_ex(e): for ex in e.reasons: _print_ex(ex) else: - logger.exception(e) + logger.warn(e) class _JsonProducer(object): -- cgit 1.4.1 From 71da2bed5585cf66a7707cfc5a000eb28a56ff34 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 00:18:44 +0000 Subject: plateau retries after 1h --- synapse/federation/replication.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 346b5f04c0..589a3f581b 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -890,7 +890,7 @@ class _TransactionQueue(object): ) deferred.callback(None) else: - self.start_retrying(destination, retry_interval) + self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -908,7 +908,7 @@ class _TransactionQueue(object): logger.warn("TX [%s] Problem in _attempt_transaction: %s", destination, e) - self.start_retrying(destination, retry_interval) + self.set_retrying(destination, retry_interval) for deferred in deferreds: if not deferred.called: @@ -922,11 +922,14 @@ class _TransactionQueue(object): self._attempt_new_transaction(destination) @defer.inlineCallbacks - def start_retrying(self, destination, retry_interval): + def set_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again if retry_interval: retry_interval *= 2 + # plateau at hourly retries for now + if retry_interval >= 60 * 60 * 1000: + retry_interval = 60 * 60 * 1000 else: retry_interval = 2000 # try again at first after 2 seconds yield self.store.set_destination_retry_timings(destination, -- cgit 1.4.1 From b8d30899b1296347a75d5a59e32d73a5236e6ea2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 10:16:09 +0000 Subject: Code style. --- synapse/federation/replication.py | 52 +++++++++++++++++++++++----------- synapse/http/matrixfederationclient.py | 29 +++++++++++++------ synapse/storage/transactions.py | 50 ++++++++++++++++++-------------- 3 files changed, 85 insertions(+), 46 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 589a3f581b..0cb632fb08 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -728,7 +728,7 @@ class _TransactionQueue(object): self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) ) - + def eb(failure): if not deferred.called: deferred.errback(failure) @@ -745,7 +745,7 @@ class _TransactionQueue(object): # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination - + if destination == self.server_name: return @@ -776,7 +776,7 @@ class _TransactionQueue(object): ) yield deferred - + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): @@ -790,12 +790,15 @@ class _TransactionQueue(object): retry_timings.retry_last_ts, retry_timings.retry_interval ) if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - " - "dropping transaction for now", destination) + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) return else: logger.info("TX [%s] is ready for retry", destination) - + if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -811,10 +814,14 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction " - "(pdus: %d, edus: %d, failures: %d)", + logger.debug( + "TX [%s] Attempting new transaction " + "(pdus: %d, edus: %d, failures: %d)", destination, - len(pending_pdus), len(pending_edus), len(pending_failures)) + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -847,8 +854,11 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.info("TX [%s] Sending transaction [%s]", destination, - transaction.transaction_id) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) # Actually send the transaction @@ -905,11 +915,14 @@ class _TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.warn("TX [%s] Problem in _attempt_transaction: %s", - destination, e) + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) - + for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -925,12 +938,17 @@ class _TransactionQueue(object): def set_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again + if retry_interval: retry_interval *= 2 # plateau at hourly retries for now if retry_interval >= 60 * 60 * 1000: retry_interval = 60 * 60 * 1000 else: - retry_interval = 2000 # try again at first after 2 seconds - yield self.store.set_destination_retry_timings(destination, - int(self._clock.time_msec()), retry_interval) + retry_interval = 2000 # try again at first after 2 seconds + + yield self.store.set_destination_retry_timings( + destination, + int(self._clock.time_msec()), + retry_interval + ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 8fc6bf8f97..16fb2adab5 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -130,12 +130,20 @@ class MatrixFederationHttpClient(object): break except Exception as e: if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn("DNS Lookup failed to %s with %s", destination, - e) + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) raise SynapseError(400, "Domain specified not found.") - logger.warn("Sending request failed to %s: %s %s : %s", - destination, method, url_bytes, e) + logger.warn( + "Sending request failed to %s: %s %s : %s", + destination, + method, + url_bytes, + e + ) _print_ex(e) if retries_left: @@ -144,10 +152,15 @@ class MatrixFederationHttpClient(object): else: raise - logger.info("Received response %d %s for %s: %s %s", - response.code, response.phrase, - destination, method, url_bytes) - + logger.info( + "Received response %d %s for %s: %s %s", + response.code, + response.phrase, + destination, + method, + url_bytes + ) + if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? pass diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 2b16787695..423cc3f02a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. """ - + # a write-through cache of DestinationsTable.EntryType indexed by # destination string destination_retry_cache = {} @@ -213,21 +213,21 @@ class TransactionStore(SQLBaseStore): def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. - + Args: destination (str) - + Returns: None if not retrying Otherwise a DestinationsTable.EntryType for the retry scheme """ if destination in self.destination_retry_cache: return defer.succeed(self.destination_retry_cache[destination]) - + return self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) - + def _get_destination_retry_timings(cls, txn, destination): query = DestinationsTable.select_statement("destination = ?") txn.execute(query, (destination,)) @@ -238,30 +238,36 @@ class TransactionStore(SQLBaseStore): return result else: return None - + def set_destination_retry_timings(self, destination, retry_last_ts, retry_interval): """Sets the current retry timings for a given destination. Both timings should be zero if retrying is no longer occuring. - + Args: destination (str) retry_last_ts (int) - time of last retry attempt in unix epoch ms retry_interval (int) - how long until next retry in ms """ - + self.destination_retry_cache[destination] = ( - DestinationsTable.EntryType(destination, - retry_last_ts, retry_interval) + DestinationsTable.EntryType( + destination, + retry_last_ts, + retry_interval + ) ) - + # XXX: we could chose to not bother persisting this if our cache thinks # this is a NOOP return self.runInteraction( "set_destination_retry_timings", - self._set_destination_retry_timings, destination, - retry_last_ts, retry_interval) - + self._set_destination_retry_timings, + destination, + retry_last_ts, + retry_interval, + ) + def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval): @@ -275,21 +281,22 @@ class TransactionStore(SQLBaseStore): def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. - + Returns: list: A list of `DestinationsTable.EntryType` """ - + return self.runInteraction( "get_destinations_needing_retry", self._get_destinations_needing_retry ) - + def _get_destinations_needing_retry(cls, txn): where = "retry_last_ts > 0 and retry_next_ts < now()" query = DestinationsTable.select_statement(where) txn.execute(query) - return DestinationsTable.decode_results(txn.fetchall()) + return DestinationsTable.decode_results(txn.fetchall()) + class ReceivedTransactionsTable(Table): table_name = "received_transactions" @@ -332,14 +339,15 @@ class TransactionsToPduTable(Table): ] EntryType = namedtuple("TransactionsToPduEntry", fields) - + + class DestinationsTable(Table): table_name = "destinations" - + fields = [ "destination", "retry_last_ts", "retry_interval", ] - EntryType = namedtuple("DestinationsEntry", fields) \ No newline at end of file + EntryType = namedtuple("DestinationsEntry", fields) -- cgit 1.4.1 From f26ec14b2167a2a52e30f3e6d52f50cbb5cdbf52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 10:25:21 +0000 Subject: Remove whitespace --- tests/handlers/test_typing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 2f170ac3bf..7b390e4346 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -167,7 +167,7 @@ class TypingNotificationsTestCase(unittest.TestCase): @defer.inlineCallbacks def test_started_typing_remote_send(self): self.room_members = [self.u_apple, self.u_onion] - + put_json = self.mock_http_client.put_json put_json.expect_call_and_return( call("farm", -- cgit 1.4.1 From 08aceea82e834119a0152198edef738d8350cba1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Dec 2014 10:26:12 +0000 Subject: Add newline back in --- tests/handlers/test_typing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 7b390e4346..7e6ed9a42f 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -51,6 +51,7 @@ class JustTypingNotificationHandlers(object): def __init__(self, hs): self.typing_notification_handler = TypingNotificationHandler(hs) + class TypingNotificationsTestCase(unittest.TestCase): """Tests typing notifications to rooms.""" def setUp(self): -- cgit 1.4.1 From 0f4dcab238b029407080cb02cc2cf14e22d8fe89 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Wed, 10 Dec 2014 10:28:27 +0000 Subject: turn back on per-request transaction retries, so that every time we try to hit a dead server we actually end up hammering 5 times :| --- synapse/http/matrixfederationclient.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 16fb2adab5..fc5b5ab809 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -101,10 +101,9 @@ class MatrixFederationHttpClient(object): ] ) - # was 5; for now, let's only try once at the HTTP layer and then - # rely on transaction-layer retries for exponential backoff and - # getting the message through. - retries_left = 0 + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) + retries_left = 5 endpoint = self._getEndpoint(reactor, destination) -- cgit 1.4.1