diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6b3a7abb9e..65c6673a87 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -100,6 +100,7 @@ class TransactionQueue(object):
self.pending_failures_by_dest = {}
self.last_device_stream_id_by_dest = {}
+ self.last_device_list_stream_id_by_dest = {}
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
@@ -356,7 +357,7 @@ class TransactionQueue(object):
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures,
device_stream_id,
- should_delete_from_device_stream=bool(device_message_edus),
+ includes_device_messages=bool(device_message_edus),
limiter=limiter,
)
if not success:
@@ -373,6 +374,8 @@ class TransactionQueue(object):
@defer.inlineCallbacks
def _get_new_device_messages(self, destination):
+ # TODO: Send appropriate device list messages
+
last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
to_device_stream_id = self.store.get_to_device_stream_token()
contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
@@ -387,13 +390,27 @@ class TransactionQueue(object):
)
for content in contents
]
+
+ last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
+ now_stream_id, results = yield self.store.get_devices_by_remote(
+ destination, last_device_list
+ )
+ edus.extend(
+ Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type="m.device_list_update",
+ content=content,
+ )
+ for content in results
+ )
defer.returnValue((edus, stream_id))
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures, device_stream_id,
- should_delete_from_device_stream, limiter):
+ includes_device_messages, limiter):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@@ -506,7 +523,8 @@ class TransactionQueue(object):
success = False
else:
# Remove the acknowledged device messages from the database
- if should_delete_from_device_stream:
+ # Only bother if we actually sent some device messages
+ if includes_device_messages:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
|