diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5dcd4eecce..deee0f4904 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.events import FrozenEvent, builder
import synapse.metrics
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
import copy
import itertools
@@ -88,7 +88,7 @@ class FederationClient(FederationBase):
@log_function
def make_query(self, destination, query_type, args,
- retry_on_dns_fail=False):
+ retry_on_dns_fail=False, ignore_backoff=False):
"""Sends a federation Query to a remote homeserver of the given type
and arguments.
@@ -98,6 +98,8 @@ class FederationClient(FederationBase):
handler name used in register_query_handler().
args (dict): Mapping of strings to strings containing the details
of the query request.
+ ignore_backoff (bool): true to ignore the historical backoff data
+ and try the request anyway.
Returns:
a Deferred which will eventually yield a JSON object from the
@@ -106,7 +108,8 @@ class FederationClient(FederationBase):
sent_queries_counter.inc(query_type)
return self.transport_layer.make_query(
- destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
+ destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
+ ignore_backoff=ignore_backoff,
)
@log_function
@@ -234,31 +237,24 @@ class FederationClient(FederationBase):
continue
try:
- limiter = yield get_retry_limiter(
- destination,
- self._clock,
- self.store,
+ transaction_data = yield self.transport_layer.get_event(
+ destination, event_id, timeout=timeout,
)
- with limiter:
- transaction_data = yield self.transport_layer.get_event(
- destination, event_id, timeout=timeout,
- )
-
- logger.debug("transaction_data %r", transaction_data)
+ logger.debug("transaction_data %r", transaction_data)
- pdu_list = [
- self.event_from_pdu_json(p, outlier=outlier)
- for p in transaction_data["pdus"]
- ]
+ pdu_list = [
+ self.event_from_pdu_json(p, outlier=outlier)
+ for p in transaction_data["pdus"]
+ ]
- if pdu_list and pdu_list[0]:
- pdu = pdu_list[0]
+ if pdu_list and pdu_list[0]:
+ pdu = pdu_list[0]
- # Check signatures are correct.
- signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
+ # Check signatures are correct.
+ signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
- break
+ break
pdu_attempts[destination] = now
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e922b7ff4a..bc20b9c201 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -52,7 +52,6 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
- self._room_pdu_linearizer = Linearizer("fed_room_pdu")
self._server_linearizer = Linearizer("fed_server")
# We cache responses to state queries, as they take a while and often
@@ -147,11 +146,15 @@ class FederationServer(FederationBase):
# check that it's actually being sent from a valid destination to
# workaround bug #1753 in 0.18.5 and 0.18.6
if transaction.origin != get_domain_from_id(pdu.event_id):
+ # We continue to accept join events from any server; this is
+ # necessary for the federation join dance to work correctly.
+ # (When we join over federation, the "helper" server is
+ # responsible for sending out the join event, rather than the
+ # origin. See bug #1893).
if not (
pdu.type == 'm.room.member' and
pdu.content and
- pdu.content.get("membership", None) == 'join' and
- self.hs.is_mine_id(pdu.state_key)
+ pdu.content.get("membership", None) == 'join'
):
logger.info(
"Discarding PDU %s from invalid origin %s",
@@ -165,7 +168,7 @@ class FederationServer(FederationBase):
)
try:
- yield self._handle_new_pdu(transaction.origin, pdu)
+ yield self._handle_received_pdu(transaction.origin, pdu)
results.append({})
except FederationError as e:
self.send_failure(e, transaction.origin)
@@ -497,27 +500,16 @@ class FederationServer(FederationBase):
)
@defer.inlineCallbacks
- @log_function
- def _handle_new_pdu(self, origin, pdu, get_missing=True):
-
- # We reprocess pdus when we have seen them only as outliers
- existing = yield self._get_persisted_pdu(
- origin, pdu.event_id, do_auth=False
- )
-
- # FIXME: Currently we fetch an event again when we already have it
- # if it has been marked as an outlier.
+ def _handle_received_pdu(self, origin, pdu):
+ """ Process a PDU received in a federation /send/ transaction.
- already_seen = (
- existing and (
- not existing.internal_metadata.is_outlier()
- or pdu.internal_metadata.is_outlier()
- )
- )
- if already_seen:
- logger.debug("Already seen pdu %s", pdu.event_id)
- return
+ Args:
+ origin (str): server which sent the pdu
+ pdu (FrozenEvent): received pdu
+ Returns (Deferred): completes with None
+ Raises: FederationError if the signatures / hash do not match
+ """
# Check signature.
try:
pdu = yield self._check_sigs_and_hash(pdu)
@@ -529,143 +521,7 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
- state = None
-
- auth_chain = []
-
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
- fetch_state = False
-
- # Get missing pdus if necessary.
- if not pdu.internal_metadata.is_outlier():
- # We only backfill backwards to the min depth.
- min_depth = yield self.handler.get_min_depth_for_context(
- pdu.room_id
- )
-
- logger.debug(
- "_handle_new_pdu min_depth for %s: %d",
- pdu.room_id, min_depth
- )
-
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
-
- if min_depth and pdu.depth < min_depth:
- # This is so that we don't notify the user about this
- # message, to work around the fact that some events will
- # reference really really old events we really don't want to
- # send to the clients.
- pdu.internal_metadata.outlier = True
- elif min_depth and pdu.depth > min_depth:
- if get_missing and prevs - seen:
- # If we're missing stuff, ensure we only fetch stuff one
- # at a time.
- logger.info(
- "Acquiring lock for room %r to fetch %d missing events: %r...",
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
- )
- with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
- logger.info(
- "Acquired lock for room %r to fetch %d missing events",
- pdu.room_id, len(prevs - seen),
- )
-
- # We recalculate seen, since it may have changed.
- have_seen = yield self.store.have_events(prevs)
- seen = set(have_seen.keys())
-
- if prevs - seen:
- latest = yield self.store.get_latest_event_ids_in_room(
- pdu.room_id
- )
-
- # We add the prev events that we have seen to the latest
- # list to ensure the remote server doesn't give them to us
- latest = set(latest)
- latest |= seen
-
- logger.info(
- "Missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
- )
-
- # XXX: we set timeout to 10s to help workaround
- # https://github.com/matrix-org/synapse/issues/1733.
- # The reason is to avoid holding the linearizer lock
- # whilst processing inbound /send transactions, causing
- # FDs to stack up and block other inbound transactions
- # which empirically can currently take up to 30 minutes.
- #
- # N.B. this explicitly disables retry attempts.
- #
- # N.B. this also increases our chances of falling back to
- # fetching fresh state for the room if the missing event
- # can't be found, which slightly reduces our security.
- # it may also increase our DAG extremity count for the room,
- # causing additional state resolution? See #1760.
- # However, fetching state doesn't hold the linearizer lock
- # apparently.
- #
- # see https://github.com/matrix-org/synapse/pull/1744
-
- missing_events = yield self.get_missing_events(
- origin,
- pdu.room_id,
- earliest_events_ids=list(latest),
- latest_events=[pdu],
- limit=10,
- min_depth=min_depth,
- timeout=10000,
- )
-
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- missing_events.sort(key=lambda x: x.depth)
-
- for e in missing_events:
- yield self._handle_new_pdu(
- origin,
- e,
- get_missing=False
- )
-
- have_seen = yield self.store.have_events(
- [ev for ev, _ in pdu.prev_events]
- )
-
- prevs = {e_id for e_id, _ in pdu.prev_events}
- seen = set(have_seen.keys())
- if prevs - seen:
- logger.info(
- "Still missing %d events for room %r: %r...",
- len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
- )
- fetch_state = True
-
- if fetch_state:
- # We need to get the state at this event, since we haven't
- # processed all the prev events.
- logger.debug(
- "_handle_new_pdu getting state for %s",
- pdu.room_id
- )
- try:
- state, auth_chain = yield self.get_state_for_room(
- origin, pdu.room_id, pdu.event_id,
- )
- except:
- logger.exception("Failed to get state for event: %s", pdu.event_id)
-
- yield self.handler.on_receive_pdu(
- origin,
- pdu,
- state=state,
- auth_chain=auth_chain,
- )
+ yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5c9f7a86f0..78c852ed69 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -35,7 +35,6 @@ from synapse.util.metrics import Measure
import synapse.metrics
from blist import sorteddict
-import ujson
metrics = synapse.metrics.get_metrics_for(__name__)
@@ -54,6 +53,7 @@ class FederationRemoteSendQueue(object):
def __init__(self, hs):
self.server_name = hs.hostname
self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
self.presence_map = {}
self.presence_changed = sorteddict()
@@ -186,6 +186,8 @@ class FederationRemoteSendQueue(object):
else:
self.edus[pos] = edu
+ self.notifier.on_new_replication_data()
+
def send_presence(self, destination, states):
"""As per TransactionQueue"""
pos = self._next_pos()
@@ -199,24 +201,33 @@ class FederationRemoteSendQueue(object):
(destination, state.user_id) for state in states
]
+ self.notifier.on_new_replication_data()
+
def send_failure(self, failure, destination):
"""As per TransactionQueue"""
pos = self._next_pos()
self.failures[pos] = (destination, str(failure))
+ self.notifier.on_new_replication_data()
def send_device_messages(self, destination):
"""As per TransactionQueue"""
pos = self._next_pos()
self.device_messages[pos] = destination
+ self.notifier.on_new_replication_data()
def get_current_token(self):
return self.pos - 1
- def get_replication_rows(self, token, limit, federation_ack=None):
- """
+ def federation_ack(self, token):
+ self._clear_queue_before_pos(token)
+
+ def get_replication_rows(self, from_token, to_token, limit, federation_ack=None):
+ """Get rows to be sent over federation between the two tokens
+
Args:
- token (int)
+ from_token (int)
+ to_token(int)
limit (int)
federation_ack (int): Optional. The position where the worker is
explicitly acknowledged it has handled. Allows us to drop
@@ -225,8 +236,8 @@ class FederationRemoteSendQueue(object):
# TODO: Handle limit.
# To handle restarts where we wrap around
- if token > self.pos:
- token = -1
+ if from_token > self.pos:
+ from_token = -1
rows = []
@@ -237,60 +248,65 @@ class FederationRemoteSendQueue(object):
# Fetch changed presence
keys = self.presence_changed.keys()
- i = keys.bisect_right(token)
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
dest_user_ids = set(
(pos, dest_user_id)
- for pos in keys[i:]
+ for pos in keys[i:j]
for dest_user_id in self.presence_changed[pos]
)
for (key, (dest, user_id)) in dest_user_ids:
- rows.append((key, PRESENCE_TYPE, ujson.dumps({
+ rows.append((key, PRESENCE_TYPE, {
"destination": dest,
"state": self.presence_map[user_id].as_dict(),
- })))
+ }))
# Fetch changes keyed edus
keys = self.keyed_edu_changed.keys()
- i = keys.bisect_right(token)
- keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j])
for (pos, (destination, edu_key)) in keyed_edus:
rows.append(
- (pos, KEYED_EDU_TYPE, ujson.dumps({
+ (pos, KEYED_EDU_TYPE, {
"key": edu_key,
"edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
- }))
+ })
)
# Fetch changed edus
keys = self.edus.keys()
- i = keys.bisect_right(token)
- edus = set((k, self.edus[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ edus = set((k, self.edus[k]) for k in keys[i:j])
for (pos, edu) in edus:
- rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
+ rows.append((pos, EDU_TYPE, edu.get_internal_dict()))
# Fetch changed failures
keys = self.failures.keys()
- i = keys.bisect_right(token)
- failures = set((k, self.failures[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ failures = set((k, self.failures[k]) for k in keys[i:j])
for (pos, (destination, failure)) in failures:
- rows.append((pos, FAILURE_TYPE, ujson.dumps({
+ rows.append((pos, FAILURE_TYPE, {
"destination": destination,
"failure": failure,
- })))
+ }))
# Fetch changed device messages
keys = self.device_messages.keys()
- i = keys.bisect_right(token)
- device_messages = set((k, self.device_messages[k]) for k in keys[i:])
+ i = keys.bisect_right(from_token)
+ j = keys.bisect_right(to_token) + 1
+ device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
for (pos, destination) in device_messages:
- rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
+ rows.append((pos, DEVICE_MESSAGE_TYPE, {
"destination": destination,
- })))
+ }))
# Sort rows based on pos
rows.sort()
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 90235ff098..c27ce7c5f3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import datetime
from twisted.internet import defer
@@ -22,9 +22,7 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
-from synapse.util.retryutils import (
- get_retry_limiter, NotRetryingDestination,
-)
+from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
@@ -99,7 +97,12 @@ class TransactionQueue(object):
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}
+ # destination -> stream_id of last successfully sent to-device message.
+ # NB: may be a long or an int.
self.last_device_stream_id_by_dest = {}
+
+ # destination -> stream_id of last successfully sent device list
+ # update.
self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
@@ -300,20 +303,20 @@ class TransactionQueue(object):
)
return
+ pending_pdus = []
try:
self.pending_transactions[destination] = 1
+ # This will throw if we wouldn't retry. We do this here so we fail
+ # quickly, but we will later check this again in the http client,
+ # hence why we throw the result away.
+ yield get_retry_limiter(destination, self.clock, self.store)
+
# XXX: what's this for?
yield run_on_reactor()
+ pending_pdus = []
while True:
- limiter = yield get_retry_limiter(
- destination,
- self.clock,
- self.store,
- backoff_on_404=True, # If we get a 404 the other side has gone
- )
-
device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination)
)
@@ -369,7 +372,6 @@ class TransactionQueue(object):
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
- limiter=limiter,
)
if success:
# Remove the acknowledged device messages from the database
@@ -387,12 +389,24 @@ class TransactionQueue(object):
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
- except NotRetryingDestination:
+ except NotRetryingDestination as e:
logger.debug(
- "TX [%s] not ready for retry yet - "
+ "TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now",
destination,
+ datetime.datetime.fromtimestamp(
+ (e.retry_last_ts + e.retry_interval) / 1000.0
+ ),
)
+ except Exception as e:
+ logger.warn(
+ "TX [%s] Failed to send transaction: %s",
+ destination,
+ e,
+ )
+ for p, _ in pending_pdus:
+ logger.info("Failed to send event %s to %s", p.event_id,
+ destination)
finally:
# We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None)
@@ -432,7 +446,7 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
- pending_failures, limiter):
+ pending_failures):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@@ -442,132 +456,104 @@ class TransactionQueue(object):
success = True
- try:
- logger.debug("TX [%s] _attempt_new_transaction", destination)
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
- txn_id = str(self._next_txn_id)
+ txn_id = str(self._next_txn_id)
- logger.debug(
- "TX [%s] {%s} Attempting new transaction"
- " (pdus: %d, edus: %d, failures: %d)",
- destination, txn_id,
- len(pdus),
- len(edus),
- len(failures)
- )
+ logger.debug(
+ "TX [%s] {%s} Attempting new transaction"
+ " (pdus: %d, edus: %d, failures: %d)",
+ destination, txn_id,
+ len(pdus),
+ len(edus),
+ len(failures)
+ )
- logger.debug("TX [%s] Persisting transaction...", destination)
+ logger.debug("TX [%s] Persisting transaction...", destination)
- transaction = Transaction.create_new(
- origin_server_ts=int(self.clock.time_msec()),
- transaction_id=txn_id,
- origin=self.server_name,
- destination=destination,
- pdus=pdus,
- edus=edus,
- pdu_failures=failures,
- )
+ transaction = Transaction.create_new(
+ origin_server_ts=int(self.clock.time_msec()),
+ transaction_id=txn_id,
+ origin=self.server_name,
+ destination=destination,
+ pdus=pdus,
+ edus=edus,
+ pdu_failures=failures,
+ )
- self._next_txn_id += 1
+ self._next_txn_id += 1
- yield self.transaction_actions.prepare_to_send(transaction)
+ yield self.transaction_actions.prepare_to_send(transaction)
- logger.debug("TX [%s] Persisted transaction", destination)
- logger.info(
- "TX [%s] {%s} Sending transaction [%s],"
- " (PDUs: %d, EDUs: %d, failures: %d)",
- destination, txn_id,
- transaction.transaction_id,
- len(pdus),
- len(edus),
- len(failures),
- )
+ logger.debug("TX [%s] Persisted transaction", destination)
+ logger.info(
+ "TX [%s] {%s} Sending transaction [%s],"
+ " (PDUs: %d, EDUs: %d, failures: %d)",
+ destination, txn_id,
+ transaction.transaction_id,
+ len(pdus),
+ len(edus),
+ len(failures),
+ )
- with limiter:
- # Actually send the transaction
-
- # FIXME (erikj): This is a bit of a hack to make the Pdu age
- # keys work
- def json_data_cb():
- data = transaction.get_dict()
- now = int(self.clock.time_msec())
- if "pdus" in data:
- for p in data["pdus"]:
- if "age_ts" in p:
- unsigned = p.setdefault("unsigned", {})
- unsigned["age"] = now - int(p["age_ts"])
- del p["age_ts"]
- return data
-
- try:
- response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
- code = 200
-
- if response:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "Transaction returned error for %s: %s",
- e_id, r,
- )
- except HttpResponseException as e:
- code = e.code
- response = e.response
-
- if e.code in (401, 404, 429) or 500 <= e.code:
- logger.info(
- "TX [%s] {%s} got %d response",
- destination, txn_id, code
+ # Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def json_data_cb():
+ data = transaction.get_dict()
+ now = int(self.clock.time_msec())
+ if "pdus" in data:
+ for p in data["pdus"]:
+ if "age_ts" in p:
+ unsigned = p.setdefault("unsigned", {})
+ unsigned["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
+ return data
+
+ try:
+ response = yield self.transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+
+ if response:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "Transaction returned error for %s: %s",
+ e_id, r,
)
- raise e
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
+ if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
)
+ raise e
- logger.debug("TX [%s] Sent transaction", destination)
- logger.debug("TX [%s] Marking as delivered...", destination)
+ logger.info(
+ "TX [%s] {%s} got %d response",
+ destination, txn_id, code
+ )
- yield self.transaction_actions.delivered(
- transaction, code, response
- )
+ logger.debug("TX [%s] Sent transaction", destination)
+ logger.debug("TX [%s] Marking as delivered...", destination)
- logger.debug("TX [%s] Marked as delivered", destination)
+ yield self.transaction_actions.delivered(
+ transaction, code, response
+ )
- if code != 200:
- for p in pdus:
- logger.info(
- "Failed to send event %s to %s", p.event_id, destination
- )
- success = False
- except RuntimeError as e:
- # We capture this here as there as nothing actually listens
- # for this finishing functions deferred.
- logger.warn(
- "TX [%s] Problem in _attempt_transaction: %s",
- destination,
- e,
- )
-
- success = False
+ logger.debug("TX [%s] Marked as delivered", destination)
+ if code != 200:
for p in pdus:
- logger.info("Failed to send event %s to %s", p.event_id, destination)
- except Exception as e:
- # We capture this here as there as nothing actually listens
- # for this finishing functions deferred.
- logger.warn(
- "TX [%s] Problem in _attempt_transaction: %s",
- destination,
- e,
- )
-
+ logger.info(
+ "Failed to send event %s to %s", p.event_id, destination
+ )
success = False
- for p in pdus:
- logger.info("Failed to send event %s to %s", p.event_id, destination)
-
defer.returnValue(success)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index f49e8a2cc4..15a03378f5 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -163,6 +163,7 @@ class TransportLayerClient(object):
data=json_data,
json_data_callback=json_data_callback,
long_retries=True,
+ backoff_on_404=True, # If we get a 404 the other side has gone
)
logger.debug(
@@ -174,7 +175,8 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def make_query(self, destination, query_type, args, retry_on_dns_fail):
+ def make_query(self, destination, query_type, args, retry_on_dns_fail,
+ ignore_backoff=False):
path = PREFIX + "/query/%s" % query_type
content = yield self.client.get_json(
@@ -183,6 +185,7 @@ class TransportLayerClient(object):
args=args,
retry_on_dns_fail=retry_on_dns_fail,
timeout=10000,
+ ignore_backoff=ignore_backoff,
)
defer.returnValue(content)
@@ -242,6 +245,7 @@ class TransportLayerClient(object):
destination=destination,
path=path,
data=content,
+ ignore_backoff=True,
)
defer.returnValue(response)
@@ -269,6 +273,7 @@ class TransportLayerClient(object):
destination=remote_server,
path=path,
args=args,
+ ignore_backoff=True,
)
defer.returnValue(response)
|