diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index be99211003..4d96f026c6 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -33,6 +33,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import UserPresenceState
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
+# This is defined in the Matrix spec and enforced by the receiver.
+MAX_EDUS_PER_TRANSACTION = 100
+
logger = logging.getLogger(__name__)
@@ -197,7 +200,8 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
device_message_edus, device_stream_id, dev_list_id = (
- yield self._get_new_device_messages()
+ # We have to keep 2 free slots for presence and rr_edus
+ yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
)
# BEGIN CRITICAL SECTION
@@ -216,19 +220,9 @@ class PerDestinationQueue(object):
pending_edus = []
- pending_edus.extend(self._get_rr_edus(force_flush=False))
-
# We can only include at most 100 EDUs per transactions
- pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
-
- pending_edus.extend(
- self._pending_edus_keyed.values()
- )
-
- self._pending_edus_keyed = {}
-
- pending_edus.extend(device_message_edus)
-
+ # rr_edus and pending_presence take at most one slot each
+ pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
if pending_presence:
@@ -248,6 +242,12 @@ class PerDestinationQueue(object):
)
)
+ pending_edus.extend(device_message_edus)
+ pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)))
+ while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed:
+ _, val = self._pending_edus_keyed.popitem()
+ pending_edus.append(val)
+
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
self._destination, len(pending_pdus))
@@ -259,7 +259,7 @@ class PerDestinationQueue(object):
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
- if len(pending_edus) < 100:
+ if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
@@ -346,33 +346,37 @@ class PerDestinationQueue(object):
return pending_edus
@defer.inlineCallbacks
- def _get_new_device_messages(self):
- last_device_stream_id = self._last_device_stream_id
- to_device_stream_id = self._store.get_to_device_stream_token()
- contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
- self._destination, last_device_stream_id, to_device_stream_id
+ def _get_new_device_messages(self, limit):
+ last_device_list = self._last_device_list_stream_id
+ # Will return at most 20 entries
+ now_stream_id, results = yield self._store.get_devices_by_remote(
+ self._destination, last_device_list
)
edus = [
Edu(
origin=self._server_name,
destination=self._destination,
- edu_type="m.direct_to_device",
+ edu_type="m.device_list_update",
content=content,
)
- for content in contents
+ for content in results
]
- last_device_list = self._last_device_list_stream_id
- now_stream_id, results = yield self._store.get_devices_by_remote(
- self._destination, last_device_list
+ assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
+
+ last_device_stream_id = self._last_device_stream_id
+ to_device_stream_id = self._store.get_to_device_stream_token()
+ contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
+ self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus)
)
edus.extend(
Edu(
origin=self._server_name,
destination=self._destination,
- edu_type="m.device_list_update",
+ edu_type="m.direct_to_device",
content=content,
)
- for content in results
+ for content in contents
)
+
defer.returnValue((edus, stream_id, now_stream_id))
|