summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py133
1 files changed, 76 insertions, 57 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6b3a7abb9e..d18f6b6cfd 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -100,6 +100,7 @@ class TransactionQueue(object):
         self.pending_failures_by_dest = {}
 
         self.last_device_stream_id_by_dest = {}
+        self.last_device_list_stream_id_by_dest = {}
 
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
@@ -305,62 +306,74 @@ class TransactionQueue(object):
             yield run_on_reactor()
 
             while True:
-                    pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
-                    pending_edus = self.pending_edus_by_dest.pop(destination, [])
-                    pending_presence = self.pending_presence_by_dest.pop(destination, {})
-                    pending_failures = self.pending_failures_by_dest.pop(destination, [])
+                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
+                pending_edus = self.pending_edus_by_dest.pop(destination, [])
+                pending_presence = self.pending_presence_by_dest.pop(destination, {})
+                pending_failures = self.pending_failures_by_dest.pop(destination, [])
 
-                    pending_edus.extend(
-                        self.pending_edus_keyed_by_dest.pop(destination, {}).values()
-                    )
+                pending_edus.extend(
+                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
+                )
 
-                    limiter = yield get_retry_limiter(
-                        destination,
-                        self.clock,
-                        self.store,
-                    )
+                limiter = yield get_retry_limiter(
+                    destination,
+                    self.clock,
+                    self.store,
+                )
 
-                    device_message_edus, device_stream_id = (
-                        yield self._get_new_device_messages(destination)
-                    )
+                device_message_edus, device_stream_id, dev_list_id = (
+                    yield self._get_new_device_messages(destination)
+                )
 
-                    pending_edus.extend(device_message_edus)
-                    if pending_presence:
-                        pending_edus.append(
-                            Edu(
-                                origin=self.server_name,
-                                destination=destination,
-                                edu_type="m.presence",
-                                content={
-                                    "push": [
-                                        format_user_presence_state(
-                                            presence, self.clock.time_msec()
-                                        )
-                                        for presence in pending_presence.values()
-                                    ]
-                                },
-                            )
+                pending_edus.extend(device_message_edus)
+                if pending_presence:
+                    pending_edus.append(
+                        Edu(
+                            origin=self.server_name,
+                            destination=destination,
+                            edu_type="m.presence",
+                            content={
+                                "push": [
+                                    format_user_presence_state(
+                                        presence, self.clock.time_msec()
+                                    )
+                                    for presence in pending_presence.values()
+                                ]
+                            },
                         )
+                    )
+
+                if pending_pdus:
+                    logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                                 destination, len(pending_pdus))
 
-                    if pending_pdus:
-                        logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                                     destination, len(pending_pdus))
+                if not pending_pdus and not pending_edus and not pending_failures:
+                    logger.debug("TX [%s] Nothing to send", destination)
+                    self.last_device_stream_id_by_dest[destination] = (
+                        device_stream_id
+                    )
+                    return
 
-                    if not pending_pdus and not pending_edus and not pending_failures:
-                        logger.debug("TX [%s] Nothing to send", destination)
-                        self.last_device_stream_id_by_dest[destination] = (
-                            device_stream_id
+                success = yield self._send_new_transaction(
+                    destination, pending_pdus, pending_edus, pending_failures,
+                    limiter=limiter,
+                )
+                if success:
+                    # Remove the acknowledged device messages from the database
+                    # Only bother if we actually sent some device messages
+                    if device_message_edus:
+                        yield self.store.delete_device_msgs_for_remote(
+                            destination, device_stream_id
+                        )
+                        logger.info("Marking as sent %r %r", destination, dev_list_id)
+                        yield self.store.mark_as_sent_devices_by_remote(
+                            destination, dev_list_id
                         )
-                        return
 
-                    success = yield self._send_new_transaction(
-                        destination, pending_pdus, pending_edus, pending_failures,
-                        device_stream_id,
-                        should_delete_from_device_stream=bool(device_message_edus),
-                        limiter=limiter,
-                    )
-                    if not success:
-                        break
+                    self.last_device_stream_id_by_dest[destination] = device_stream_id
+                    self.last_device_list_stream_id_by_dest[destination] = dev_list_id
+                else:
+                    break
         except NotRetryingDestination:
             logger.debug(
                 "TX [%s] not ready for retry yet - "
@@ -387,13 +400,26 @@ class TransactionQueue(object):
             )
             for content in contents
         ]
-        defer.returnValue((edus, stream_id))
+
+        last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
+        now_stream_id, results = yield self.store.get_devices_by_remote(
+            destination, last_device_list
+        )
+        edus.extend(
+            Edu(
+                origin=self.server_name,
+                destination=destination,
+                edu_type="m.device_list_update",
+                content=content,
+            )
+            for content in results
+        )
+        defer.returnValue((edus, stream_id, now_stream_id))
 
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
-                              pending_failures, device_stream_id,
-                              should_delete_from_device_stream, limiter):
+                              pending_failures, limiter):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
@@ -504,13 +530,6 @@ class TransactionQueue(object):
                         "Failed to send event %s to %s", p.event_id, destination
                     )
                 success = False
-            else:
-                # Remove the acknowledged device messages from the database
-                if should_delete_from_device_stream:
-                    yield self.store.delete_device_msgs_for_remote(
-                        destination, device_stream_id
-                    )
-                self.last_device_stream_id_by_dest[destination] = device_stream_id
         except RuntimeError as e:
             # We capture this here as there as nothing actually listens
             # for this finishing functions deferred.