summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/sender/per_destination_queue.py183
1 files changed, 120 insertions, 63 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3ae5e8634c..5af2784f1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger
 from synapse.logging.opentracing import SynapseTags, set_tag
 from synapse.metrics import sent_transactions_counter
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import JsonDict, ReadReceipt
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.visibility import filter_events_for_server
 
@@ -136,8 +136,11 @@ class PerDestinationQueue:
         # destination
         self._pending_presence: Dict[str, UserPresenceState] = {}
 
-        # room_id -> receipt_type -> user_id -> receipt_dict
-        self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
+        # List of room_id -> receipt_type -> user_id -> receipt_dict,
+        #
+        # Each receipt can only have a single receipt per
+        # (room ID, receipt type, user ID, thread ID) tuple.
+        self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
         self._rrs_pending_flush = False
 
         # stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ class PerDestinationQueue:
         Args:
             receipt: receipt to be queued
         """
-        self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
-            receipt.receipt_type, {}
-        )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
+        serialized_receipt: JsonDict = {
+            "event_ids": receipt.event_ids,
+            "data": receipt.data,
+        }
+        if receipt.thread_id is not None:
+            serialized_receipt["data"]["thread_id"] = receipt.thread_id
+
+        # Find which EDU to add this receipt to. There's three situations depending
+        # on the (room ID, receipt type, user, thread ID) tuple:
+        #
+        # 1. If it fully matches, clobber the information.
+        # 2. If it is missing, add the information.
+        # 3. If the subset tuple of (room ID, receipt type, user) matches, check
+        #    the next EDU (or add a new EDU).
+        for edu in self._pending_receipt_edus:
+            receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
+                receipt.receipt_type, {}
+            )
+            # If this room ID, receipt type, user ID is not in this EDU, OR if
+            # the full tuple matches, use the current EDU.
+            if (
+                receipt.user_id not in receipt_content
+                or receipt_content[receipt.user_id].get("thread_id")
+                == receipt.thread_id
+            ):
+                receipt_content[receipt.user_id] = serialized_receipt
+                break
+
+        # If no matching EDU was found, create a new one.
+        else:
+            self._pending_receipt_edus.append(
+                {
+                    receipt.room_id: {
+                        receipt.receipt_type: {receipt.user_id: serialized_receipt}
+                    }
+                }
+            )
 
     def flush_read_receipts_for_room(self, room_id: str) -> None:
-        # if we don't have any read-receipts for this room, it may be that we've already
-        # sent them out, so we don't need to flush.
-        if room_id not in self._pending_rrs:
-            return
-        self._rrs_pending_flush = True
-        self.attempt_new_transaction()
+        # If there are any pending receipts for this room then force-flush them
+        # in a new transaction.
+        for edu in self._pending_receipt_edus:
+            if room_id in edu:
+                self._rrs_pending_flush = True
+                self.attempt_new_transaction()
+                # No use in checking remaining EDUs if the room was found.
+                break
 
     def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
         self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ class PerDestinationQueue:
                 self._pending_edus = []
                 self._pending_edus_keyed = {}
                 self._pending_presence = {}
-                self._pending_rrs = {}
+                self._pending_receipt_edus = []
 
                 self._start_catching_up()
         except FederationDeniedError as e:
@@ -543,22 +582,27 @@ class PerDestinationQueue:
                     self._destination, last_successful_stream_ordering
                 )
 
-    def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
-        if not self._pending_rrs:
+    def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+        if not self._pending_receipt_edus:
             return
         if not force_flush and not self._rrs_pending_flush:
             # not yet time for this lot
             return
 
-        edu = Edu(
-            origin=self._server_name,
-            destination=self._destination,
-            edu_type=EduTypes.RECEIPT,
-            content=self._pending_rrs,
-        )
-        self._pending_rrs = {}
-        self._rrs_pending_flush = False
-        yield edu
+        # Send at most limit EDUs for receipts.
+        for content in self._pending_receipt_edus[:limit]:
+            yield Edu(
+                origin=self._server_name,
+                destination=self._destination,
+                edu_type=EduTypes.RECEIPT,
+                content=content,
+            )
+        self._pending_receipt_edus = self._pending_receipt_edus[limit:]
+
+        # If there are still pending read-receipts, don't reset the pending flush
+        # flag.
+        if not self._pending_receipt_edus:
+            self._rrs_pending_flush = False
 
     def _pop_pending_edus(self, limit: int) -> List[Edu]:
         pending_edus = self._pending_edus
@@ -645,27 +689,61 @@ class _TransactionQueueManager:
     async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
         # First we calculate the EDUs we want to send, if any.
 
-        # We start by fetching device related EDUs, i.e device updates and to
-        # device messages. We have to keep 2 free slots for presence and rr_edus.
-        device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+        # There's a maximum number of EDUs that can be sent with a transaction,
+        # generally device updates and to-device messages get priority, but we
+        # want to ensure that there's room for some other EDUs as well.
+        #
+        # This is done by:
+        #
+        # * Add a presence EDU, if one exists.
+        # * Add up-to a small limit of read receipt EDUs.
+        # * Add to-device EDUs, but leave some space for device list updates.
+        # * Add device list updates EDUs.
+        # * If there's any remaining room, add other EDUs.
+        pending_edus = []
+
+        # Add presence EDU.
+        if self.queue._pending_presence:
+            pending_edus.append(
+                Edu(
+                    origin=self.queue._server_name,
+                    destination=self.queue._destination,
+                    edu_type=EduTypes.PRESENCE,
+                    content={
+                        "push": [
+                            format_user_presence_state(
+                                presence, self.queue._clock.time_msec()
+                            )
+                            for presence in self.queue._pending_presence.values()
+                        ]
+                    },
+                )
+            )
+            self.queue._pending_presence = {}
 
-        # We prioritize to-device messages so that existing encryption channels
+        # Add read receipt EDUs.
+        pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+        edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+
+        # Next, prioritize to-device messages so that existing encryption channels
         # work. We also keep a few slots spare (by reducing the limit) so that
         # we can still trickle out some device list updates.
         (
             to_device_edus,
             device_stream_id,
-        ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
+        ) = await self.queue._get_to_device_message_edus(edu_limit - 10)
 
         if to_device_edus:
             self._device_stream_id = device_stream_id
         else:
             self.queue._last_device_stream_id = device_stream_id
 
-        device_edu_limit -= len(to_device_edus)
+        pending_edus.extend(to_device_edus)
+        edu_limit -= len(to_device_edus)
 
+        # Add device list update EDUs.
         device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
-            device_edu_limit
+            edu_limit
         )
 
         if device_update_edus:
@@ -673,40 +751,17 @@ class _TransactionQueueManager:
         else:
             self.queue._last_device_list_stream_id = dev_list_id
 
-        pending_edus = device_update_edus + to_device_edus
-
-        # Now add the read receipt EDU.
-        pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
-
-        # And presence EDU.
-        if self.queue._pending_presence:
-            pending_edus.append(
-                Edu(
-                    origin=self.queue._server_name,
-                    destination=self.queue._destination,
-                    edu_type=EduTypes.PRESENCE,
-                    content={
-                        "push": [
-                            format_user_presence_state(
-                                presence, self.queue._clock.time_msec()
-                            )
-                            for presence in self.queue._pending_presence.values()
-                        ]
-                    },
-                )
-            )
-            self.queue._pending_presence = {}
+        pending_edus.extend(device_update_edus)
+        edu_limit -= len(device_update_edus)
 
         # Finally add any other types of EDUs if there is room.
-        pending_edus.extend(
-            self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
-        )
-        while (
-            len(pending_edus) < MAX_EDUS_PER_TRANSACTION
-            and self.queue._pending_edus_keyed
-        ):
+        other_edus = self.queue._pop_pending_edus(edu_limit)
+        pending_edus.extend(other_edus)
+        edu_limit -= len(other_edus)
+        while edu_limit > 0 and self.queue._pending_edus_keyed:
             _, val = self.queue._pending_edus_keyed.popitem()
             pending_edus.append(val)
+            edu_limit -= 1
 
         # Now we look for any PDUs to send, by getting up to 50 PDUs from the
         # queue
@@ -717,8 +772,10 @@ class _TransactionQueueManager:
 
         # if we've decided to send a transaction anyway, and we have room, we
         # may as well send any pending RRs
-        if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
-            pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
+        if edu_limit:
+            pending_edus.extend(
+                self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
+            )
 
         if self._pdus:
             self._last_stream_ordering = self._pdus[