summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-01-25 14:27:27 +0000
committerErik Johnston <erik@matrix.org>2017-01-25 14:27:27 +0000
commit2367c5568c01bc65aacc955b76ba707918b37f1e (patch)
tree0f07606420d5f64ba3e186ce69876fc6301082a3 /synapse/federation/transaction_queue.py
parentMerge branch 'erikj/current_state_fix' into develop (diff)
downloadsynapse-2367c5568c01bc65aacc955b76ba707918b37f1e.tar.xz
Add basic implementation of local device list changes
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py24
1 files changed, 21 insertions, 3 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6b3a7abb9e..65c6673a87 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -100,6 +100,7 @@ class TransactionQueue(object):
         self.pending_failures_by_dest = {}
 
         self.last_device_stream_id_by_dest = {}
+        self.last_device_list_stream_id_by_dest = {}
 
         # HACK to get unique tx id
         self._next_txn_id = int(self.clock.time_msec())
@@ -356,7 +357,7 @@ class TransactionQueue(object):
                     success = yield self._send_new_transaction(
                         destination, pending_pdus, pending_edus, pending_failures,
                         device_stream_id,
-                        should_delete_from_device_stream=bool(device_message_edus),
+                        includes_device_messages=bool(device_message_edus),
                         limiter=limiter,
                     )
                     if not success:
@@ -373,6 +374,8 @@ class TransactionQueue(object):
 
     @defer.inlineCallbacks
     def _get_new_device_messages(self, destination):
+        # TODO: Send appropriate device list messages
+
         last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
         to_device_stream_id = self.store.get_to_device_stream_token()
         contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
@@ -387,13 +390,27 @@ class TransactionQueue(object):
             )
             for content in contents
         ]
+
+        last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
+        now_stream_id, results = yield self.store.get_devices_by_remote(
+            destination, last_device_list
+        )
+        edus.extend(
+            Edu(
+                origin=self.server_name,
+                destination=destination,
+                edu_type="m.device_list_update",
+                content=content,
+            )
+            for content in results
+        )
         defer.returnValue((edus, stream_id))
 
     @measure_func("_send_new_transaction")
     @defer.inlineCallbacks
     def _send_new_transaction(self, destination, pending_pdus, pending_edus,
                               pending_failures, device_stream_id,
-                              should_delete_from_device_stream, limiter):
+                              includes_device_messages, limiter):
 
         # Sort based on the order field
         pending_pdus.sort(key=lambda t: t[1])
@@ -506,7 +523,8 @@ class TransactionQueue(object):
                 success = False
             else:
                 # Remove the acknowledged device messages from the database
-                if should_delete_from_device_stream:
+                # Only bother if we actually sent some device messages
+                if includes_device_messages:
                     yield self.store.delete_device_msgs_for_remote(
                         destination, device_stream_id
                     )