summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-31 10:11:07 +0100
committerErik Johnston <erik@matrix.org>2017-03-31 10:11:07 +0100
commitd7dbc56c716e805c61f768e35acba590ec86667b (patch)
treedaeebe671813605cd33e13b72374f611c0a2b549 /synapse/federation
parentMerge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes (diff)
parentMerge pull request #2075 from matrix-org/erikj/cache_speed (diff)
downloadsynapse-d7dbc56c716e805c61f768e35acba590ec86667b.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/send_queue.py7
-rw-r--r--synapse/federation/transaction_queue.py11
2 files changed, 16 insertions, 2 deletions
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py

index 5c9f7a86f0..bbb0195228 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -54,6 +54,7 @@ class FederationRemoteSendQueue(object): def __init__(self, hs): self.server_name = hs.hostname self.clock = hs.get_clock() + self.notifier = hs.get_notifier() self.presence_map = {} self.presence_changed = sorteddict() @@ -186,6 +187,8 @@ class FederationRemoteSendQueue(object): else: self.edus[pos] = edu + self.notifier.on_new_replication_data() + def send_presence(self, destination, states): """As per TransactionQueue""" pos = self._next_pos() @@ -199,16 +202,20 @@ class FederationRemoteSendQueue(object): (destination, state.user_id) for state in states ] + self.notifier.on_new_replication_data() + def send_failure(self, failure, destination): """As per TransactionQueue""" pos = self._next_pos() self.failures[pos] = (destination, str(failure)) + self.notifier.on_new_replication_data() def send_device_messages(self, destination): """As per TransactionQueue""" pos = self._next_pos() self.device_messages[pos] = destination + self.notifier.on_new_replication_data() def get_current_token(self): return self.pos - 1 diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index d7ecefcc64..c27ce7c5f3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py
@@ -22,7 +22,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn -from synapse.util.retryutils import NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state @@ -303,12 +303,19 @@ class TransactionQueue(object): ) return + pending_pdus = [] try: self.pending_transactions[destination] = 1 + # This will throw if we wouldn't retry. We do this here so we fail + # quickly, but we will later check this again in the http client, + # hence why we throw the result away. + yield get_retry_limiter(destination, self.clock, self.store) + # XXX: what's this for? yield run_on_reactor() + pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( yield self._get_new_device_messages(destination) @@ -397,7 +404,7 @@ class TransactionQueue(object): destination, e, ) - for p in pending_pdus: + for p, _ in pending_pdus: logger.info("Failed to send event %s to %s", p.event_id, destination) finally: