From d4a35ada28302e096efd42e1a2a28542ed7ebd6f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 6 Sep 2016 18:16:20 +0100 Subject: Send device messages over federation --- synapse/federation/federation_server.py | 2 +- synapse/federation/transaction_queue.py | 43 +++++++++++++++++++++++++++------ 2 files changed, 37 insertions(+), 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5621655098..3fa7b2315c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -188,7 +188,7 @@ class FederationServer(FederationBase): except SynapseError as e: logger.info("Failed to handle edu %r: %r", edu_type, e) except Exception as e: - logger.exception("Failed to handle edu %r", edu_type, e) + logger.exception("Failed to handle edu %r", edu_type) else: logger.warn("Received EDU of type %s with no handler", edu_type) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index cb2ef0210c..5e86141f86 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -17,7 +17,7 @@ from twisted.internet import defer from .persistence import TransactionActions -from .units import Transaction +from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor @@ -187,6 +187,24 @@ class TransactionQueue(object): destination, pending_pdus, pending_edus, pending_failures ) + @defer.inlineCallbacks + def _get_new_device_messages(self, destination): + last_device_stream_id = 0 + to_device_stream_id = self.store.get_to_device_stream_token() + contents, stream_id = yield self.store.get_new_device_msgs_for_remote( + destination, last_device_stream_id, to_device_stream_id + ) + edus = [ + Edu( + origin=self.server_name, + destination=destination, + edu_type="m.direct_to_device", + content=content, + ) + for content in contents + ] + defer.returnValue((edus, stream_id)) + @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, @@ -211,13 +229,19 @@ class TransactionQueue(object): self.store, ) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + edus.extend(device_message_edus) + logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", destination, txn_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures) + len(pdus), + len(edus), + len(failures) ) logger.debug("TX [%s] Persisting transaction...", destination) @@ -242,9 +266,9 @@ class TransactionQueue(object): " (PDUs: %d, EDUs: %d, failures: %d)", destination, txn_id, transaction.transaction_id, - len(pending_pdus), - len(pending_edus), - len(pending_failures), + len(pdus), + len(edus), + len(failures), ) with limiter: @@ -299,6 +323,11 @@ class TransactionQueue(object): logger.info( "Failed to send event %s to %s", p.event_id, destination ) + else: + # Remove the acknowledged device messages from the database + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " -- cgit 1.5.1 From 31a07d2335dd628afb32f71167849ad88685525a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:27:07 +0100 Subject: Add stream change caches for device messages --- synapse/federation/transaction_queue.py | 5 ++++- synapse/storage/__init__.py | 24 ++++++++++++++++++++++++ synapse/storage/deviceinbox.py | 25 +++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5e86141f86..233c6606a9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -81,6 +81,8 @@ class TransactionQueue(object): # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} + self.last_device_stream_id_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) @@ -189,7 +191,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def _get_new_device_messages(self, destination): - last_device_stream_id = 0 + last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0) to_device_stream_id = self.store.get_to_device_stream_token() contents, stream_id = yield self.store.get_new_device_msgs_for_remote( destination, last_device_stream_id, to_device_stream_id @@ -328,6 +330,7 @@ class TransactionQueue(object): yield self.store.delete_device_msgs_for_remote( destination, device_stream_id ) + self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( "TX [%s] not ready for retry yet - " diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6c32773f25..6965daddc5 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -182,6 +182,30 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + max_device_inbox_id = self._device_inbox_id_gen.get_current_token() + device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + db_conn, "device_inbox", + entity_column="user_id", + stream_column="stream_id", + max_value=max_device_inbox_id + ) + self._device_inbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_inbox_id, + prefilled_cache=device_inbox_prefill, + ) + # The federation outbox and the local device inbox uses the same + # stream_id generator. + device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + db_conn, "device_federation_outbox", + entity_column="destination", + stream_column="stream_id", + max_value=max_device_inbox_id, + ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceInboxStreamChangeCache", min_device_outbox_id, + prefilled_cache=device_outbox_prefill, + ) + cur = LoggingTransaction( db_conn.cursor(), name="_find_stream_orderings_for_times_txn", diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 61da0e89e6..0d37bb961b 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -70,6 +70,14 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) + for destination in remote_messages_by_destination.keys(): + self._device_federation_outbox_stream_cache.entity_has_changed( + destination, stream_id + ) defer.returnValue(self._device_inbox_id_gen.get_current_token()) @@ -115,6 +123,10 @@ class DeviceInboxStore(SQLBaseStore): now_ms, stream_id, ) + for user_id in local_messages_by_user_then_device.keys(): + self._device_inbox_stream_cache.entity_has_changed( + user_id, stream_id + ) def _add_messages_to_local_device_inbox_txn(self, txn, stream_id, messages_by_user_then_device): @@ -161,6 +173,12 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + has_changed = self._device_inbox_stream_cache.has_entity_changed( + user_id, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_device_txn(txn): sql = ( "SELECT stream_id, message_json FROM device_inbox" @@ -261,6 +279,13 @@ class DeviceInboxStore(SQLBaseStore): Deferred ([dict], int): List of messages for the device and where in the stream the messages got to. """ + + has_changed = self._device_federation_outbox_stream_cache.has_entity_changed( + destination, last_stream_id + ) + if not has_changed: + return defer.succeed(([], current_stream_id)) + def get_new_messages_for_remote_destination_txn(txn): sql = ( "SELECT stream_id, messages_json FROM device_federation_outbox" -- cgit 1.5.1 From cb98ac261bbda859574ec33cab934a3269e11e17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 15:39:13 +0100 Subject: Move the check for federated device_messages. Move the check into _attempt_new_transaction. Only delete messages if there were messages to delete. --- synapse/federation/transaction_queue.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 233c6606a9..c0ee946ac0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -177,6 +177,12 @@ class TransactionQueue(object): pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) + device_message_edus, device_stream_id = ( + yield self._get_new_device_messages(destination) + ) + + pending_edus.extend(device_message_edus) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", destination, len(pending_pdus)) @@ -186,7 +192,9 @@ class TransactionQueue(object): return yield self._send_new_transaction( - destination, pending_pdus, pending_edus, pending_failures + destination, pending_pdus, pending_edus, pending_failures, + device_stream_id, + should_delete_from_device_stream=bool(device_message_edus) ) @defer.inlineCallbacks @@ -210,7 +218,8 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures): + pending_failures, device_stream_id, + should_delete_from_device_stream): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -231,12 +240,6 @@ class TransactionQueue(object): self.store, ) - device_message_edus, device_stream_id = ( - yield self._get_new_device_messages(destination) - ) - - edus.extend(device_message_edus) - logger.debug( "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d, failures: %d)", @@ -327,9 +330,10 @@ class TransactionQueue(object): ) else: # Remove the acknowledged device messages from the database - yield self.store.delete_device_msgs_for_remote( - destination, device_stream_id - ) + if should_delete_from_device_stream: + yield self.store.delete_device_msgs_for_remote( + destination, device_stream_id + ) self.last_device_stream_id_by_dest[destination] = device_stream_id except NotRetryingDestination: logger.info( -- cgit 1.5.1 From 43954d000e19a622576063de0b48cf9235dec395 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 16:10:51 +0100 Subject: Add a new method to enqueue the device messages rather than sending a dummy EDU --- synapse/federation/federation_client.py | 6 ++++++ synapse/federation/transaction_queue.py | 11 +++++++++++ synapse/handlers/devicemessage.py | 10 +++------- 3 files changed, 20 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 627acc6a4f..78719eed25 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -137,6 +137,12 @@ class FederationClient(FederationBase): self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) + @log_function + def send_device_messages(self, destination): + """Sends the device messages in the local database to the remote + destination""" + self._transaction_queue.enqueue_device_messages(destination) + @log_function def send_failure(self, failure, destination): self._transaction_queue.enqueue_failure(failure, destination) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c0ee946ac0..633c79c352 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,6 +157,17 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) + def enqueue_device_messages(self, destination): + if destination == self.server_name or destination == "localhost": + return + + if not self.can_send_to(destination): + return + + preserve_context_over_fn( + self._attempt_new_transaction, destination + ) + @defer.inlineCallbacks def _attempt_new_transaction(self, destination): yield run_on_reactor() diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 7e59c0d487..c5368e5df2 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -112,10 +112,6 @@ class DeviceMessageHandler(object): ) for destination in remote_messages.keys(): - # Hack to send make synapse send a federation transaction - # to the remote servers. - self.federation.send_edu( - destination=destination, - edu_type="m.ping", - content={}, - ) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + self.federation.send_device_messages(destination) -- cgit 1.5.1