diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 183 |
1 files changed, 120 insertions, 63 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3ae5e8634c..5af2784f1e 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger from synapse.logging.opentracing import SynapseTags, set_tag from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import ReadReceipt +from synapse.types import JsonDict, ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.visibility import filter_events_for_server @@ -136,8 +136,11 @@ class PerDestinationQueue: # destination self._pending_presence: Dict[str, UserPresenceState] = {} - # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {} + # List of room_id -> receipt_type -> user_id -> receipt_dict, + # + # Each receipt can only have a single receipt per + # (room ID, receipt type, user ID, thread ID) tuple. + self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -202,17 +205,53 @@ class PerDestinationQueue: Args: receipt: receipt to be queued """ - self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( - receipt.receipt_type, {} - )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} + serialized_receipt: JsonDict = { + "event_ids": receipt.event_ids, + "data": receipt.data, + } + if receipt.thread_id is not None: + serialized_receipt["data"]["thread_id"] = receipt.thread_id + + # Find which EDU to add this receipt to. There's three situations depending + # on the (room ID, receipt type, user, thread ID) tuple: + # + # 1. If it fully matches, clobber the information. + # 2. If it is missing, add the information. + # 3. If the subset tuple of (room ID, receipt type, user) matches, check + # the next EDU (or add a new EDU). + for edu in self._pending_receipt_edus: + receipt_content = edu.setdefault(receipt.room_id, {}).setdefault( + receipt.receipt_type, {} + ) + # If this room ID, receipt type, user ID is not in this EDU, OR if + # the full tuple matches, use the current EDU. + if ( + receipt.user_id not in receipt_content + or receipt_content[receipt.user_id].get("thread_id") + == receipt.thread_id + ): + receipt_content[receipt.user_id] = serialized_receipt + break + + # If no matching EDU was found, create a new one. + else: + self._pending_receipt_edus.append( + { + receipt.room_id: { + receipt.receipt_type: {receipt.user_id: serialized_receipt} + } + } + ) def flush_read_receipts_for_room(self, room_id: str) -> None: - # if we don't have any read-receipts for this room, it may be that we've already - # sent them out, so we don't need to flush. - if room_id not in self._pending_rrs: - return - self._rrs_pending_flush = True - self.attempt_new_transaction() + # If there are any pending receipts for this room then force-flush them + # in a new transaction. + for edu in self._pending_receipt_edus: + if room_id in edu: + self._rrs_pending_flush = True + self.attempt_new_transaction() + # No use in checking remaining EDUs if the room was found. + break def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu @@ -351,7 +390,7 @@ class PerDestinationQueue: self._pending_edus = [] self._pending_edus_keyed = {} self._pending_presence = {} - self._pending_rrs = {} + self._pending_receipt_edus = [] self._start_catching_up() except FederationDeniedError as e: @@ -543,22 +582,27 @@ class PerDestinationQueue: self._destination, last_successful_stream_ordering ) - def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: - if not self._pending_rrs: + def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: + if not self._pending_receipt_edus: return if not force_flush and not self._rrs_pending_flush: # not yet time for this lot return - edu = Edu( - origin=self._server_name, - destination=self._destination, - edu_type=EduTypes.RECEIPT, - content=self._pending_rrs, - ) - self._pending_rrs = {} - self._rrs_pending_flush = False - yield edu + # Send at most limit EDUs for receipts. + for content in self._pending_receipt_edus[:limit]: + yield Edu( + origin=self._server_name, + destination=self._destination, + edu_type=EduTypes.RECEIPT, + content=content, + ) + self._pending_receipt_edus = self._pending_receipt_edus[limit:] + + # If there are still pending read-receipts, don't reset the pending flush + # flag. + if not self._pending_receipt_edus: + self._rrs_pending_flush = False def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus @@ -645,27 +689,61 @@ class _TransactionQueueManager: async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. - # We start by fetching device related EDUs, i.e device updates and to - # device messages. We have to keep 2 free slots for presence and rr_edus. - device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2 + # There's a maximum number of EDUs that can be sent with a transaction, + # generally device updates and to-device messages get priority, but we + # want to ensure that there's room for some other EDUs as well. + # + # This is done by: + # + # * Add a presence EDU, if one exists. + # * Add up-to a small limit of read receipt EDUs. + # * Add to-device EDUs, but leave some space for device list updates. + # * Add device list updates EDUs. + # * If there's any remaining room, add other EDUs. + pending_edus = [] + + # Add presence EDU. + if self.queue._pending_presence: + pending_edus.append( + Edu( + origin=self.queue._server_name, + destination=self.queue._destination, + edu_type=EduTypes.PRESENCE, + content={ + "push": [ + format_user_presence_state( + presence, self.queue._clock.time_msec() + ) + for presence in self.queue._pending_presence.values() + ] + }, + ) + ) + self.queue._pending_presence = {} - # We prioritize to-device messages so that existing encryption channels + # Add read receipt EDUs. + pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) + edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) + + # Next, prioritize to-device messages so that existing encryption channels # work. We also keep a few slots spare (by reducing the limit) so that # we can still trickle out some device list updates. ( to_device_edus, device_stream_id, - ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10) + ) = await self.queue._get_to_device_message_edus(edu_limit - 10) if to_device_edus: self._device_stream_id = device_stream_id else: self.queue._last_device_stream_id = device_stream_id - device_edu_limit -= len(to_device_edus) + pending_edus.extend(to_device_edus) + edu_limit -= len(to_device_edus) + # Add device list update EDUs. device_update_edus, dev_list_id = await self.queue._get_device_update_edus( - device_edu_limit + edu_limit ) if device_update_edus: @@ -673,40 +751,17 @@ class _TransactionQueueManager: else: self.queue._last_device_list_stream_id = dev_list_id - pending_edus = device_update_edus + to_device_edus - - # Now add the read receipt EDU. - pending_edus.extend(self.queue._get_rr_edus(force_flush=False)) - - # And presence EDU. - if self.queue._pending_presence: - pending_edus.append( - Edu( - origin=self.queue._server_name, - destination=self.queue._destination, - edu_type=EduTypes.PRESENCE, - content={ - "push": [ - format_user_presence_state( - presence, self.queue._clock.time_msec() - ) - for presence in self.queue._pending_presence.values() - ] - }, - ) - ) - self.queue._pending_presence = {} + pending_edus.extend(device_update_edus) + edu_limit -= len(device_update_edus) # Finally add any other types of EDUs if there is room. - pending_edus.extend( - self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus)) - ) - while ( - len(pending_edus) < MAX_EDUS_PER_TRANSACTION - and self.queue._pending_edus_keyed - ): + other_edus = self.queue._pop_pending_edus(edu_limit) + pending_edus.extend(other_edus) + edu_limit -= len(other_edus) + while edu_limit > 0 and self.queue._pending_edus_keyed: _, val = self.queue._pending_edus_keyed.popitem() pending_edus.append(val) + edu_limit -= 1 # Now we look for any PDUs to send, by getting up to 50 PDUs from the # queue @@ -717,8 +772,10 @@ class _TransactionQueueManager: # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: - pending_edus.extend(self.queue._get_rr_edus(force_flush=True)) + if edu_limit: + pending_edus.extend( + self.queue._get_receipt_edus(force_flush=True, limit=edu_limit) + ) if self._pdus: self._last_stream_ordering = self._pdus[ |