From d4a35ada28302e096efd42e1a2a28542ed7ebd6f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 18:16:20 +0100 Subject: Send device messages over federation --- synapse/federation/transaction_queue.py | 43 +++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 7 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index cb2ef0210c..5e86141f86 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .persistence import TransactionActions -from .units import Transaction +from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor @@ -187,6 +187,24 @@ class TransactionQueue(object): destination, pending_pdus, pending_edus, pending_failures ) + @defer.inlineCallbacks + def _get_new_device_messages(self, destination): + last_device_stream_id = 0 + to_device_stream_id = self.store.get_to_device_stream_token() + contents, stream_id = yield self.store.get_new_device_msgs_for_remote( + destination, last_device_stream_id, to_device_stream_id + ) + edus = [ + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.direct_to_device", + content=content, + ) + for content in contents + ] + defer.returnValue((edus, stream_id)) + @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, @@ -211,13 +229,19 @@ class TransactionQueue(object): self.store, ) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + edus.extend(device_message_edus) + logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", destination, txn_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures) + len(pdus), + len(edus), + len(failures) ) logger.debug("TX [%s] Persisting transaction...", destination) @@ -242,9 +266,9 @@ class TransactionQueue(object): " (PDUs: %d, EDUs: %d, failures: %d)", destination, txn_id, transaction.transaction_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures), + len(pdus), + len(edus), + len(failures), ) with limiter: @@ -299,6 +323,11 @@ class TransactionQueue(object): logger.info( "Failed to send event %s to %s", p.event_id, destination ) + else: + # Remove the acknowledged device messages from the database + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " -- cgit 1.4.1 From 31a07d2335dd628afb32f71167849ad88685525a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:27:07 +0100 Subject: Add stream change caches for device messages --- synapse/federation/transaction_queue.py | 5 ++++- synapse/storage/__init__.py | 24 ++++++++++++++++++++++++ synapse/storage/deviceinbox.py | 25 +++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5e86141f86..233c6606a9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -81,6 +81,8 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + self.last_device_stream_id_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) @@ -189,7 +191,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): - last_device_stream_id = 0 + last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( destination, last_device_stream_id, to_device_stream_id @@ -328,6 +330,7 @@ class TransactionQueue(object): yield self.store.delete_device_msgs_for_remote( destination, device_stream_id ) + self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..6965daddc5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 61da0e89e6..0d37bb961b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -70,6 +70,14 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) @@ -115,6 +123,10 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): @@ -161,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -261,6 +279,13 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" -- cgit 1.4.1 From cb98ac261bbda859574ec33cab934a3269e11e17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:39:13 +0100 Subject: Move the check for federated device_messages. Move the check into _attempt_new_transaction. Only delete messages if there were messages to delete. --- synapse/federation/transaction_queue.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 233c6606a9..c0ee946ac0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -177,6 +177,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + pending_edus.extend(device_message_edus) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) @@ -186,7 +192,9 @@ class TransactionQueue(object): return yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) ) @defer.inlineCallbacks @@ -210,7 +218,8 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures): + pending_failures, device_stream_id, + should_delete_from_device_stream): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -231,12 +240,6 @@ class TransactionQueue(object): self.store, ) - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) - - edus.extend(device_message_edus) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -327,9 +330,10 @@ class TransactionQueue(object): ) else: # Remove the acknowledged device messages from the database - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( -- cgit 1.4.1 From 43954d000e19a622576063de0b48cf9235dec395 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 16:10:51 +0100 Subject: Add a new method to enqueue the device messages rather than sending a dummy EDU --- synapse/federation/federation_client.py | 6 ++++++ synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/devicemessage.py | 10 +++------- 3 files changed, 20 insertions(+), 7 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 627acc6a4f..78719eed25 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -137,6 +137,12 @@ class FederationClient(FederationBase): self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) + @log_function + def send_device_messages(self, destination): + """Sends the device messages in the local database to the remote + destination""" + self._transaction_queue.enqueue_device_messages(destination) + @log_function def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c0ee946ac0..633c79c352 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,6 +157,17 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) + def enqueue_device_messages(self, destination): + if destination == self.server_name or destination == "localhost": + return + + if not self.can_send_to(destination): + return + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + @defer.inlineCallbacks def _attempt_new_transaction(self, destination): yield run_on_reactor() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 7e59c0d487..c5368e5df2 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -112,10 +112,6 @@ class DeviceMessageHandler(object): ) for destination in remote_messages.keys(): - # Hack to send make synapse send a federation transaction - # to the remote servers. - self.federation.send_edu( - destination=destination, - edu_type="m.ping", - content={}, - ) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation.send_device_messages(destination) -- cgit 1.4.1 From b3907561506a98d7e8bbe66efe2037df7ceb70fa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 11:00:15 +0100 Subject: Update last_device_stream_id_by_dest if there is nothing to send --- synapse/federation/transaction_queue.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 633c79c352..5c7245d383 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -200,6 +200,7 @@ class TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = device_stream_id return yield self._send_new_transaction( -- cgit 1.4.1 From d2688d7f03b006a1a4d340dce04550214ae86185 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 11:44:36 +0100 Subject: Correctly guard against multiple concurrent transactions --- synapse/federation/transaction_queue.py | 79 +++++++++++++++++---------------- 1 file changed, 41 insertions(+), 38 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5c7245d383..6900b0121b 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -170,44 +170,53 @@ class TransactionQueue(object): @defer.inlineCallbacks def _attempt_new_transaction(self, destination): - yield run_on_reactor() - while True: - # list of (pending_pdu, deferred, order) - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - logger.debug( - "TX [%s] Transaction already in progress", - destination - ) - return + # list of (pending_pdu, deferred, order) + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + logger.debug( + "TX [%s] Transaction already in progress", + destination + ) + return - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_failures = self.pending_failures_by_dest.pop(destination, []) + try: + self.pending_transactions[destination] = 1 - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) + yield run_on_reactor() - pending_edus.extend(device_message_edus) + while True: + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest.pop(destination, []) - if pending_pdus: - logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) - if not pending_pdus and not pending_edus and not pending_failures: - logger.debug("TX [%s] Nothing to send", destination) - self.last_device_stream_id_by_dest[destination] = device_stream_id - return + pending_edus.extend(device_message_edus) - yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures, - device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) - ) + if pending_pdus: + logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) + + if not pending_pdus and not pending_edus and not pending_failures: + logger.debug("TX [%s] Nothing to send", destination) + self.last_device_stream_id_by_dest[destination] = ( + device_stream_id + ) + return + + yield self._send_new_transaction( + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) + ) + finally: + # We want to be *very* sure we delete this after we stop processing + self.pending_transactions.pop(destination, None) @defer.inlineCallbacks def _get_new_device_messages(self, destination): @@ -240,8 +249,6 @@ class TransactionQueue(object): failures = [x.get_dict() for x in pending_failures] try: - self.pending_transactions[destination] = 1 - logger.debug("TX [%s] _attempt_new_transaction", destination) txn_id = str(self._next_txn_id) @@ -375,7 +382,3 @@ class TransactionQueue(object): for p in pdus: logger.info("Failed to send event %s to %s", p.event_id, destination) - - finally: - # We want to be *very* sure we delete this after we stop processing - self.pending_transactions.pop(destination, None) -- cgit 1.4.1 From 4598682b43dbe55339cfc869042456b74813159f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 13:12:53 +0100 Subject: Fix tightloop on sending transaction --- synapse/federation/transaction_queue.py | 256 +++++++++++++++++--------------- 1 file changed, 134 insertions(+), 122 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 6900b0121b..f8d3fffe95 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -209,11 +209,13 @@ class TransactionQueue(object): ) return - yield self._send_new_transaction( + success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, should_delete_from_device_stream=bool(device_message_edus) ) + if not success: + break finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -242,143 +244,153 @@ class TransactionQueue(object): pending_failures, device_stream_id, should_delete_from_device_stream): - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus - failures = [x.get_dict() for x in pending_failures] + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[1]) + pdus = [x[0] for x in pending_pdus] + edus = pending_edus + failures = [x.get_dict() for x in pending_failures] - try: - logger.debug("TX [%s] _attempt_new_transaction", destination) + success = True - txn_id = str(self._next_txn_id) + try: + logger.debug("TX [%s] _attempt_new_transaction", destination) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) + txn_id = str(self._next_txn_id) - logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, txn_id, - len(pdus), - len(edus), - len(failures) - ) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug( + "TX [%s] {%s} Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, txn_id, + len(pdus), + len(edus), + len(failures) + ) - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + logger.debug("TX [%s] Persisting transaction...", destination) - self._next_txn_id += 1 + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - yield self.transaction_actions.prepare_to_send(transaction) + self._next_txn_id += 1 - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", - destination, txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - len(failures), - ) + yield self.transaction_actions.prepare_to_send(transaction) - with limiter: - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) - except HttpResponseException as e: - code = e.code - response = e.response + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", + destination, txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + len(failures), + ) - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code + with limiter: + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb ) + code = 200 + + if response: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: + code = e.code + response = e.response - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - - yield self.transaction_actions.delivered( - transaction, code, response + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) - if code != 200: - for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination - ) - else: - # Remove the acknowledged device messages from the database - if should_delete_from_device_stream: - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) - self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - except RuntimeError as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + logger.debug("TX [%s] Marked as delivered", destination) + if code != 200: for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) + logger.info( + "Failed to send event %s to %s", p.event_id, destination + ) + success = False + else: + # Remove the acknowledged device messages from the database + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) + self.last_device_stream_id_by_dest[destination] = device_stream_id + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + success = False + except RuntimeError as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + success = False + + for p in pdus: + logger.info("Failed to send event %s to %s", p.event_id, destination) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + success = False + + for p in pdus: + logger.info("Failed to send event %s to %s", p.event_id, destination) + + defer.returnValue(success) -- cgit 1.4.1 From a6c67501666c0fefeae8edec2c5f7755a9d24fb8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 13:46:05 +0100 Subject: Check if destination is ready for retry earlier --- synapse/federation/transaction_queue.py | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f8d3fffe95..d9b8b3fc1d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -192,6 +192,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + limiter = yield get_retry_limiter( + destination, + self.clock, + self.store, + ) + device_message_edus, device_stream_id = ( yield self._get_new_device_messages(destination) ) @@ -212,10 +218,18 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream=bool(device_message_edus) + should_delete_from_device_stream=bool(device_message_edus), + limiter=limiter, ) if not success: break + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + success = False finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -242,7 +256,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, pending_failures, device_stream_id, - should_delete_from_device_stream): + should_delete_from_device_stream, limiter): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -257,12 +271,6 @@ class TransactionQueue(object): txn_id = str(self._next_txn_id) - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - ) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -359,13 +367,6 @@ class TransactionQueue(object): destination, device_stream_id ) self.last_device_stream_id_by_dest[destination] = device_stream_id - except NotRetryingDestination: - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - success = False except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. -- cgit 1.4.1 From ab80d5e0a968beb48140534b9ceab62b285b35c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Sep 2016 14:05:01 +0100 Subject: Drop replication log levels --- synapse/federation/transaction_queue.py | 1 - synapse/replication/resource.py | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) (limited to 'synapse/federation/transaction_queue.py') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d9b8b3fc1d..1ac569b305 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -229,7 +229,6 @@ class TransactionQueue(object): "dropping transaction for now", destination, ) - success = False finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 1ed9034bcb..857bc9795c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -181,7 +181,7 @@ class ReplicationResource(Resource): def replicate(self, request_streams, limit): writer = _Writer() current_token = yield self.current_replication_token() - logger.info("Replicating up to %r", current_token) + logger.debug("Replicating up to %r", current_token) yield self.account_data(writer, current_token, limit, request_streams) yield self.events(writer, current_token, limit, request_streams) @@ -195,7 +195,7 @@ class ReplicationResource(Resource): yield self.to_device(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) - logger.info("Replicated %d rows", writer.total) + logger.debug("Replicated %d rows", writer.total) defer.returnValue(writer.finish()) def streams(self, writer, current_token, request_streams): -- cgit 1.4.1