summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/sender/per_destination_queue.py56
1 files changed, 30 insertions, 26 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index be99211003..4d96f026c6 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -33,6 +33,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage import UserPresenceState
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
+# This is defined in the Matrix spec and enforced by the receiver.
+MAX_EDUS_PER_TRANSACTION = 100
+
 logger = logging.getLogger(__name__)
 
 
@@ -197,7 +200,8 @@ class PerDestinationQueue(object):
             pending_pdus = []
             while True:
                 device_message_edus, device_stream_id, dev_list_id = (
-                    yield self._get_new_device_messages()
+                    # We have to keep 2 free slots for presence and rr_edus
+                    yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
                 )
 
                 # BEGIN CRITICAL SECTION
@@ -216,19 +220,9 @@ class PerDestinationQueue(object):
 
                 pending_edus = []
 
-                pending_edus.extend(self._get_rr_edus(force_flush=False))
-
                 # We can only include at most 100 EDUs per transactions
-                pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
-
-                pending_edus.extend(
-                    self._pending_edus_keyed.values()
-                )
-
-                self._pending_edus_keyed = {}
-
-                pending_edus.extend(device_message_edus)
-
+                # rr_edus and pending_presence take at most one slot each
+                pending_edus.extend(self._get_rr_edus(force_flush=False))
                 pending_presence = self._pending_presence
                 self._pending_presence = {}
                 if pending_presence:
@@ -248,6 +242,12 @@ class PerDestinationQueue(object):
                         )
                     )
 
+                pending_edus.extend(device_message_edus)
+                pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)))
+                while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed:
+                    _, val = self._pending_edus_keyed.popitem()
+                    pending_edus.append(val)
+
                 if pending_pdus:
                     logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                                  self._destination, len(pending_pdus))
@@ -259,7 +259,7 @@ class PerDestinationQueue(object):
 
                 # if we've decided to send a transaction anyway, and we have room, we
                 # may as well send any pending RRs
-                if len(pending_edus) < 100:
+                if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
                     pending_edus.extend(self._get_rr_edus(force_flush=True))
 
                 # END CRITICAL SECTION
@@ -346,33 +346,37 @@ class PerDestinationQueue(object):
         return pending_edus
 
     @defer.inlineCallbacks
-    def _get_new_device_messages(self):
-        last_device_stream_id = self._last_device_stream_id
-        to_device_stream_id = self._store.get_to_device_stream_token()
-        contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
-            self._destination, last_device_stream_id, to_device_stream_id
+    def _get_new_device_messages(self, limit):
+        last_device_list = self._last_device_list_stream_id
+        # Will return at most 20 entries
+        now_stream_id, results = yield self._store.get_devices_by_remote(
+            self._destination, last_device_list
         )
         edus = [
             Edu(
                 origin=self._server_name,
                 destination=self._destination,
-                edu_type="m.direct_to_device",
+                edu_type="m.device_list_update",
                 content=content,
             )
-            for content in contents
+            for content in results
         ]
 
-        last_device_list = self._last_device_list_stream_id
-        now_stream_id, results = yield self._store.get_devices_by_remote(
-            self._destination, last_device_list
+        assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
+
+        last_device_stream_id = self._last_device_stream_id
+        to_device_stream_id = self._store.get_to_device_stream_token()
+        contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+            self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus)
         )
         edus.extend(
             Edu(
                 origin=self._server_name,
                 destination=self._destination,
-                edu_type="m.device_list_update",
+                edu_type="m.direct_to_device",
                 content=content,
             )
-            for content in results
+            for content in contents
         )
+
         defer.returnValue((edus, stream_id, now_stream_id))