From 11ea16777f52264f0a1d6f4db5b7db5c6c147523 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 9 May 2019 12:01:41 +0200 Subject: Limit the number of EDUs in transactions to 100 as expected by receiver (#5138) Fixes #3951. --- synapse/federation/sender/per_destination_queue.py | 56 ++++++++++++---------- 1 file changed, 30 insertions(+), 26 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index be99211003..4d96f026c6 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -33,6 +33,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import UserPresenceState from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter +# This is defined in the Matrix spec and enforced by the receiver. +MAX_EDUS_PER_TRANSACTION = 100 + logger = logging.getLogger(__name__) @@ -197,7 +200,8 @@ class PerDestinationQueue(object): pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( - yield self._get_new_device_messages() + # We have to keep 2 free slots for presence and rr_edus + yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2) ) # BEGIN CRITICAL SECTION @@ -216,19 +220,9 @@ class PerDestinationQueue(object): pending_edus = [] - pending_edus.extend(self._get_rr_edus(force_flush=False)) - # We can only include at most 100 EDUs per transactions - pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus))) - - pending_edus.extend( - self._pending_edus_keyed.values() - ) - - self._pending_edus_keyed = {} - - pending_edus.extend(device_message_edus) - + # rr_edus and pending_presence take at most one slot each + pending_edus.extend(self._get_rr_edus(force_flush=False)) pending_presence = self._pending_presence self._pending_presence = {} if pending_presence: @@ -248,6 +242,12 @@ class PerDestinationQueue(object): ) ) + pending_edus.extend(device_message_edus) + pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))) + while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed: + _, val = self._pending_edus_keyed.popitem() + pending_edus.append(val) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", self._destination, len(pending_pdus)) @@ -259,7 +259,7 @@ class PerDestinationQueue(object): # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < 100: + if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: pending_edus.extend(self._get_rr_edus(force_flush=True)) # END CRITICAL SECTION @@ -346,33 +346,37 @@ class PerDestinationQueue(object): return pending_edus @defer.inlineCallbacks - def _get_new_device_messages(self): - last_device_stream_id = self._last_device_stream_id - to_device_stream_id = self._store.get_to_device_stream_token() - contents, stream_id = yield self._store.get_new_device_msgs_for_remote( - self._destination, last_device_stream_id, to_device_stream_id + def _get_new_device_messages(self, limit): + last_device_list = self._last_device_list_stream_id + # Will return at most 20 entries + now_stream_id, results = yield self._store.get_devices_by_remote( + self._destination, last_device_list ) edus = [ Edu( origin=self._server_name, destination=self._destination, - edu_type="m.direct_to_device", + edu_type="m.device_list_update", content=content, ) - for content in contents + for content in results ] - last_device_list = self._last_device_list_stream_id - now_stream_id, results = yield self._store.get_devices_by_remote( - self._destination, last_device_list + assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" + + last_device_stream_id = self._last_device_stream_id + to_device_stream_id = self._store.get_to_device_stream_token() + contents, stream_id = yield self._store.get_new_device_msgs_for_remote( + self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus) ) edus.extend( Edu( origin=self._server_name, destination=self._destination, - edu_type="m.device_list_update", + edu_type="m.direct_to_device", content=content, ) - for content in results + for content in contents ) + defer.returnValue((edus, stream_id, now_stream_id)) -- cgit 1.4.1