diff --git a/.gitignore b/.gitignore
index 339a99e0d6..af90668c89 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,3 +38,5 @@ graph/*.dot
**/webclient/test/environment-protractor.js
uploads
+
+.idea/
diff --git a/docs/code_style.rst b/docs/code_style.rst
index d7e2d5e69e..dc40a7ab7b 100644
--- a/docs/code_style.rst
+++ b/docs/code_style.rst
@@ -1,10 +1,14 @@
Basically, PEP8
-- Max line width: 80 chars.
+- NEVER tabs. 4 spaces to indent.
+- Max line width: 79 chars (with flexibility to overflow by a "few chars" if
+ the overflowing content is not semantically significant and avoids an
+ explosion of vertical whitespace).
- Use camel case for class and type names
- Use underscores for functions and variables.
- Use double quotes.
-- Use parentheses instead of '\' for line continuation where ever possible (which is pretty much everywhere)
+- Use parentheses instead of '\\' for line continuation wherever possible
+ (which is pretty much everywhere)
- There should be max a single new line between:
- statements
- functions in a class
@@ -14,5 +18,32 @@ Basically, PEP8
- a single space after a comma
- a single space before and after for '=' when used as assignment
- no spaces before and after for '=' for default values and keyword arguments.
+- Indenting must follow PEP8; either hanging indent or multiline-visual indent
+ depending on the size and shape of the arguments and what makes more sense to
+ the author. In other words, both this::
-Comments should follow the google code style. This is so that we can generate documentation with sphinx (http://sphinxcontrib-napoleon.readthedocs.org/en/latest/)
+ print("I am a fish %s" % "moo")
+
+ and this::
+
+ print("I am a fish %s" %
+ "moo")
+
+ and this::
+
+ print(
+ "I am a fish %s" %
+ "moo"
+ )
+
+ ...are valid, although given each one takes up 2x more vertical space than
+ the previous, it's up to the author's discretion as to which layout makes most
+ sense for their function invocation. (e.g. if they want to add comments
+ per-argument, or put expressions in the arguments, or group related arguments
+ together, or want to deliberately extend or preserve vertical/horizontal
+ space)
+
+Comments should follow the Google code style. This is so that we can generate
+documentation with Sphinx (http://sphinxcontrib-napoleon.readthedocs.org/en/latest/)
+
+Code should pass pep8 --max-line-length=100 without any warnings.
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 01f87fe423..0cb632fb08 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -334,7 +334,7 @@ class ReplicationLayer(object):
defer.returnValue(response)
return
- logger.debug("[%s] Transacition is new", transaction.transaction_id)
+ logger.debug("[%s] Transaction is new", transaction.transaction_id)
with PreserveLoggingContext():
dl = []
@@ -685,6 +685,7 @@ class _TransactionQueue(object):
self.transport_layer = transport_layer
self._clock = hs.get_clock()
+ self.store = hs.get_datastore()
# Is a mapping from destinations -> deferreds. Used to keep track
# of which destinations have transactions in flight and when they are
@@ -728,8 +729,14 @@ class _TransactionQueue(object):
(pdu, deferred, order)
)
+ def eb(failure):  # errback for this PDU's send attempt
+     if not deferred.called:
+         deferred.errback(failure)
+     else:  # deferred already fired, so the failure can only be logged
+         logger.warn("Failed to send pdu: %s", failure)
+
with PreserveLoggingContext():
- self._attempt_new_transaction(destination)
+ self._attempt_new_transaction(destination).addErrback(eb)
deferreds.append(deferred)
@@ -739,6 +746,9 @@ class _TransactionQueue(object):
def enqueue_edu(self, edu):
destination = edu.destination
+ if destination == self.server_name:
+ return
+
deferred = defer.Deferred()
self.pending_edus_by_dest.setdefault(destination, []).append(
(edu, deferred)
@@ -748,7 +758,7 @@ class _TransactionQueue(object):
if not deferred.called:
deferred.errback(failure)
else:
- logger.exception("Failed to send edu", failure)
+ logger.warn("Failed to send edu", failure)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(eb)
@@ -770,10 +780,33 @@ class _TransactionQueue(object):
@defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
+
+ (retry_last_ts, retry_interval) = (0, 0)
+ retry_timings = yield self.store.get_destination_retry_timings(
+ destination
+ )
+ if retry_timings:
+ (retry_last_ts, retry_interval) = (
+ retry_timings.retry_last_ts, retry_timings.retry_interval
+ )
+ if retry_last_ts + retry_interval > int(self._clock.time_msec()):
+ logger.info(
+ "TX [%s] not ready for retry yet - "
+ "dropping transaction for now",
+ destination,
+ )
+ return
+ else:
+ logger.info("TX [%s] is ready for retry", destination)
+
if destination in self.pending_transactions:
+ # XXX: an entry in pending_transactions can get stuck if its request
+ # never completes, at which point pending_pdus_by_dest just keeps
+ # growing. We need application-layer timeouts of some flavour for
+ # these requests.
return
- # list of (pending_pdu, deferred, order)
+ # list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_failures = self.pending_failures_by_dest.pop(destination, [])
@@ -781,7 +814,14 @@ class _TransactionQueue(object):
if not pending_pdus and not pending_edus and not pending_failures:
return
- logger.debug("TX [%s] Attempting new transaction", destination)
+ logger.debug(
+ "TX [%s] Attempting new transaction "
+ "(pdus: %d, edus: %d, failures: %d)",
+ destination,
+ len(pending_pdus),
+ len(pending_edus),
+ len(pending_failures)
+ )
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[2])
@@ -814,7 +854,11 @@ class _TransactionQueue(object):
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
- logger.debug("TX [%s] Sending transaction...", destination)
+ logger.info(
+ "TX [%s] Sending transaction [%s]",
+ destination,
+ transaction.transaction_id,
+ )
# Actually send the transaction
@@ -835,6 +879,8 @@ class _TransactionQueue(object):
transaction, json_data_cb
)
+ logger.info("TX [%s] got %d response", destination, code)
+
logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)
@@ -847,8 +893,14 @@ class _TransactionQueue(object):
for deferred in deferreds:
if code == 200:
+ if retry_last_ts:
+ # this host is alive! reset retry schedule
+ yield self.store.set_destination_retry_timings(
+ destination, 0, 0
+ )
deferred.callback(None)
else:
+ self.set_retrying(destination, retry_interval)
deferred.errback(RuntimeError("Got status %d" % code))
# Ensures we don't continue until all callbacks on that
@@ -861,11 +913,15 @@ class _TransactionQueue(object):
logger.debug("TX [%s] Yielded to callbacks", destination)
except Exception as e:
- logger.error("TX Problem in _attempt_transaction")
-
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
- logger.exception(e)
+ logger.warn(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
+
+ self.set_retrying(destination, retry_interval)
for deferred in deferreds:
if not deferred.called:
@@ -877,3 +933,22 @@ class _TransactionQueue(object):
# Check to see if there is anything else to send.
self._attempt_new_transaction(destination)
+
+ @defer.inlineCallbacks
+ def set_retrying(self, destination, retry_interval):
+     """Record a failed send to `destination` and schedule its next retry.
+
+     Persists the current time plus a new interval (ms) through the store:
+     2000 if `retry_interval` is falsy, else double it, capped at one hour.
+     """
+     hourly_ms = 60 * 60 * 1000
+     if not retry_interval:
+         backoff_ms = 2000  # try again at first after 2 seconds
+     else:
+         # exponential backoff, plateauing at hourly retries for now
+         backoff_ms = min(retry_interval * 2, hourly_ms)
+
+     failed_at_ms = int(self._clock.time_msec())
+     yield self.store.set_destination_retry_timings(
+         destination, failed_at_ms, backoff_ms
+     )
diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py
index 8d86152085..0f11c6d491 100644
--- a/synapse/federation/transport.py
+++ b/synapse/federation/transport.py
@@ -155,7 +155,7 @@ class TransportLayer(object):
@defer.inlineCallbacks
@log_function
def send_transaction(self, transaction, json_data_callback=None):
- """ Sends the given Transaction to it's destination
+ """ Sends the given Transaction to its destination
Args:
transaction (Transaction)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 925eb5376e..cfb5029774 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -207,6 +207,13 @@ class FederationHandler(BaseHandler):
e.msg,
affected=event.event_id,
)
+
+ # if we're receiving valid events from an origin,
+ # it's probably a good idea to mark it as not in retry-state
+ # for sending (although this is a bit of a leap)
+ retry_timings = yield self.store.get_destination_retry_timings(origin)
+ if (retry_timings and retry_timings.retry_last_ts):
+ self.store.set_destination_retry_timings(origin, 0, 0)
room = yield self.store.get_room(event.room_id)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c7082b83a7..f05269cdfb 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,8 +90,8 @@ class MatrixFederationHttpClient(object):
("", "", path_bytes, param_bytes, query_bytes, "",)
)
- logger.debug("Sending request to %s: %s %s",
- destination, method, url_bytes)
+ logger.info("Sending request to %s: %s %s",
+ destination, method, url_bytes)
logger.debug(
"Types: %s",
@@ -102,6 +102,8 @@ class MatrixFederationHttpClient(object):
]
)
+ # XXX: Would be much nicer to retry only at the transaction-layer
+ # (once we have reliable transactions in place)
retries_left = 5
endpoint = self._getEndpoint(reactor, destination)
@@ -128,11 +130,20 @@ class MatrixFederationHttpClient(object):
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn("DNS Lookup failed to %s with %s", destination,
- e)
+ logger.warn(
+ "DNS Lookup failed to %s with %s",
+ destination,
+ e
+ )
raise SynapseError(400, "Domain specified not found.")
- logger.exception("Got error in _create_request")
+ logger.warn(
+ "Sending request failed to %s: %s %s : %s",
+ destination,
+ method,
+ url_bytes,
+ e
+ )
_print_ex(e)
if retries_left:
@@ -141,15 +152,21 @@ class MatrixFederationHttpClient(object):
else:
raise
+ logger.info(
+ "Received response %d %s for %s: %s %s",
+ response.code,
+ response.phrase,
+ destination,
+ method,
+ url_bytes
+ )
+
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
pass
else:
# :'(
# Update transactions table?
- logger.error(
- "Got response %d %s", response.code, response.phrase
- )
raise CodeMessageException(
response.code, response.phrase
)
@@ -347,7 +364,7 @@ def _print_ex(e):
for ex in e.reasons:
_print_ex(ex)
else:
- logger.exception(e)
+ logger.warn(e)
class _JsonProducer(object):
diff --git a/synapse/rest/transactions.py b/synapse/rest/transactions.py
index 93c0122f30..8c41ab4edb 100644
--- a/synapse/rest/transactions.py
+++ b/synapse/rest/transactions.py
@@ -19,7 +19,7 @@ import logging
logger = logging.getLogger(__name__)
-
+# FIXME: elsewhere we use FooStore to indicate something in the storage layer...
class HttpTransactionStore(object):
def __init__(self):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index e4bdad98bb..c9ab434b4e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -69,7 +69,7 @@ SCHEMAS = [
# Remember to update this number every time an incompatible change is made to
# database schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 8
+SCHEMA_VERSION = 9
class _RollbackButIsFineException(Exception):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4881f03368..e72200e2f7 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -650,7 +650,7 @@ class JoinHelper(object):
to dump the results into.
Attributes:
- taples (list): List of `Table` classes
+ tables (list): List of `Table` classes
EntryType (type)
"""
diff --git a/synapse/storage/schema/delta/v9.sql b/synapse/storage/schema/delta/v9.sql
new file mode 100644
index 0000000000..ad680c64da
--- /dev/null
+++ b/synapse/storage/schema/delta/v9.sql
@@ -0,0 +1,23 @@
+/* Copyright 2014 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
+
+PRAGMA user_version = 9;
\ No newline at end of file
diff --git a/synapse/storage/schema/transactions.sql b/synapse/storage/schema/transactions.sql
index 88e3e4e04d..de461bfa15 100644
--- a/synapse/storage/schema/transactions.sql
+++ b/synapse/storage/schema/transactions.sql
@@ -59,3 +59,9 @@ CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_tx ON transaction_id_to_pdu(tra
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_dest ON transaction_id_to_pdu(destination);
CREATE INDEX IF NOT EXISTS transaction_id_to_pdu_index ON transaction_id_to_pdu(transaction_id, destination);
+-- To track destination health
+CREATE TABLE IF NOT EXISTS destinations(
+ destination TEXT PRIMARY KEY,
+ retry_last_ts INTEGER,
+ retry_interval INTEGER
+);
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 00d0f48082..423cc3f02a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,8 @@ from ._base import SQLBaseStore, Table
from collections import namedtuple
+from twisted.internet import defer
+
import logging
logger = logging.getLogger(__name__)
@@ -26,6 +28,10 @@ class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
"""
+ # a write-through cache of DestinationsTable.EntryType indexed by
+ # destination string
+ destination_retry_cache = {}
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -114,7 +120,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
- # First we find out what the prev_txs should be.
+ # First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % (
@@ -205,6 +211,92 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
+ def get_destination_retry_timings(self, destination):
+     """Look up the retry timings recorded for a given destination.
+
+     Args:
+         destination (str)
+
+     Returns:
+         The cached DestinationsTable.EntryType (wrapped in a fired Deferred)
+         if present, otherwise the result of the database interaction.
+     """
+     cache = self.destination_retry_cache
+     if destination not in cache:
+         return self.runInteraction(
+             "get_destination_retry_timings",
+             self._get_destination_retry_timings, destination)
+     return defer.succeed(cache[destination])
+
+ def _get_destination_retry_timings(cls, txn, destination):
+     # Transaction body: return the destination's row only while it is retrying.
+     txn.execute(
+         DestinationsTable.select_statement("destination = ?"), (destination,)
+     )
+     rows = txn.fetchall()
+     if not rows:
+         return None
+     entry = DestinationsTable.decode_single_result(rows)
+     return entry if entry.retry_last_ts > 0 else None
+
+ def set_destination_retry_timings(self, destination,
+                                   retry_last_ts, retry_interval):
+     """Store new retry timings for a destination, in cache and database.
+
+     Pass zero for both timings once the destination is no longer retried.
+
+     Args:
+         destination (str)
+         retry_last_ts (int) - time of last retry attempt in unix epoch ms
+         retry_interval (int) - how long until next retry in ms
+
+     Returns:
+         Whatever runInteraction returns for the database write.
+     """
+     timings = (destination, retry_last_ts, retry_interval)
+
+     # write-through: update the in-memory entry before persisting
+     self.destination_retry_cache[destination] = (
+         DestinationsTable.EntryType(*timings)
+     )
+
+     # XXX: we could choose not to bother persisting this if our cache thinks
+     # this is a NOOP
+     return self.runInteraction(
+         "set_destination_retry_timings",
+         self._set_destination_retry_timings,
+         *timings
+     )
+
+ def _set_destination_retry_timings(cls, txn, destination,
+                                    retry_last_ts, retry_interval):
+     # Transaction body: insert or replace the row keyed by destination.
+     upsert_sql = (
+         "INSERT OR REPLACE INTO %s "
+         "(destination, retry_last_ts, retry_interval) "
+         "VALUES (?, ?, ?) "
+     ) % (DestinationsTable.table_name,)
+     row = (destination, retry_last_ts, retry_interval)
+     txn.execute(upsert_sql, row)
+
+ def get_destinations_needing_retry(self):
+     """Fetch every destination which is due a retry for sending.
+
+     Returns:
+         The result of running the `_get_destinations_needing_retry`
+         interaction: a list of `DestinationsTable.EntryType`.
+     """
+     interaction = self._get_destinations_needing_retry
+     return self.runInteraction(
+         "get_destinations_needing_retry", interaction
+     )
+
+ def _get_destinations_needing_retry(cls, txn):  # due = last attempt + interval has passed
+     now_ms = "(julianday('now') - 2440587.5) * 86400000"  # SQLite has no now()
+     where = "retry_last_ts > 0 AND retry_last_ts + retry_interval <= " + now_ms
+     txn.execute(DestinationsTable.select_statement(where))
+     return DestinationsTable.decode_results(txn.fetchall())
+
class ReceivedTransactionsTable(Table):
table_name = "received_transactions"
@@ -247,3 +339,15 @@ class TransactionsToPduTable(Table):
]
EntryType = namedtuple("TransactionsToPduEntry", fields)
+
+
+class DestinationsTable(Table):  # row mapping for the "destinations" table
+ table_name = "destinations"
+
+ fields = [  # same columns as the destinations CREATE TABLE in the schema
+ "destination",
+ "retry_last_ts",  # time of last retry attempt in unix epoch ms
+ "retry_interval",  # how long until next retry in ms
+ ]
+
+ EntryType = namedtuple("DestinationsEntry", fields)
diff --git a/tests/federation/test_federation.py b/tests/federation/test_federation.py
index 73dd289276..f6b41e2c4c 100644
--- a/tests/federation/test_federation.py
+++ b/tests/federation/test_federation.py
@@ -25,6 +25,7 @@ from synapse.server import HomeServer
from synapse.federation import initialize_http_replication
from synapse.api.events import SynapseEvent
+from synapse.storage.transactions import DestinationsTable
def make_pdu(prev_pdus=[], **kwargs):
"""Provide some default fields for making a PduTuple."""
@@ -55,10 +56,14 @@ class FederationTestCase(unittest.TestCase):
"delivered_txn",
"get_received_txn_response",
"set_received_txn_response",
+ "get_destination_retry_timings",
])
self.mock_persistence.get_received_txn_response.return_value = (
defer.succeed(None)
)
+ self.mock_persistence.get_destination_retry_timings.return_value = (
+ defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ )
self.mock_config = Mock()
self.mock_config.signing_key = [MockKey()]
self.clock = MockClock()
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 33016c16ef..fae33716a3 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -53,6 +53,8 @@ class FederationTestCase(unittest.TestCase):
"persist_event",
"store_room",
"get_room",
+ "get_destination_retry_timings",
+ "set_destination_retry_timings",
]),
resource_for_federation=NonCallableMock(),
http_client=NonCallableMock(spec_set=[]),
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index fe69ce47eb..b85a89052a 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -30,7 +30,7 @@ from synapse.api.constants import PresenceState
from synapse.api.errors import SynapseError
from synapse.handlers.presence import PresenceHandler, UserPresenceCache
from synapse.streams.config import SourcePaginationConfig
-
+from synapse.storage.transactions import DestinationsTable
OFFLINE = PresenceState.OFFLINE
UNAVAILABLE = PresenceState.UNAVAILABLE
@@ -528,6 +528,7 @@ class PresencePushTestCase(unittest.TestCase):
"delivered_txn",
"get_received_txn_response",
"set_received_txn_response",
+ "get_destination_retry_timings",
]),
handlers=None,
resource_for_client=Mock(),
@@ -539,6 +540,9 @@ class PresencePushTestCase(unittest.TestCase):
hs.handlers = JustPresenceHandlers(hs)
self.datastore = hs.get_datastore()
+ self.datastore.get_destination_retry_timings.return_value = (
+ defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ )
def get_received_txn_response(*args):
return defer.succeed(None)
@@ -1037,6 +1041,7 @@ class PresencePollingTestCase(unittest.TestCase):
"delivered_txn",
"get_received_txn_response",
"set_received_txn_response",
+ "get_destination_retry_timings",
]),
handlers=None,
resource_for_client=Mock(),
@@ -1048,6 +1053,9 @@ class PresencePollingTestCase(unittest.TestCase):
hs.handlers = JustPresenceHandlers(hs)
self.datastore = hs.get_datastore()
+ self.datastore.get_destination_retry_timings.return_value = (
+ defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ )
def get_received_txn_response(*args):
return defer.succeed(None)
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index adb5148351..7e6ed9a42f 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -25,6 +25,8 @@ from ..utils import MockHttpResource, MockClock, DeferredMockCallable, MockKey
from synapse.server import HomeServer
from synapse.handlers.typing import TypingNotificationHandler
+from synapse.storage.transactions import DestinationsTable
+
def _expect_edu(destination, edu_type, content, origin="test"):
return {
@@ -72,6 +74,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
"delivered_txn",
"get_received_txn_response",
"set_received_txn_response",
+ "get_destination_retry_timings",
]),
handlers=None,
resource_for_client=Mock(),
@@ -89,6 +92,9 @@ class TypingNotificationsTestCase(unittest.TestCase):
self.handler.push_update_to_clients = self.mock_update_client
self.datastore = hs.get_datastore()
+ self.datastore.get_destination_retry_timings.return_value = (
+ defer.succeed(DestinationsTable.EntryType("", 0, 0))
+ )
def get_received_txn_response(*args):
return defer.succeed(None)
|