summary refs log tree commit diff
path: root/synapse/federation/sender/per_destination_queue.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-03-20 16:02:25 +0000
committerGitHub <noreply@github.com>2019-03-20 16:02:25 +0000
commita902d131804890ee6cc4137a669be92ceb2253c4 (patch)
tree439196e5d19779f54a5018c3c6a38024aa5fd7c6 /synapse/federation/sender/per_destination_queue.py
parentMerge pull request #4894 from matrix-org/erikj/postgres_tuning (diff)
downloadsynapse-a902d131804890ee6cc4137a669be92ceb2253c4.tar.xz
Batch up outgoing read-receipts to reduce federation traffic. (#4890)
Rate-limit outgoing read-receipts as per #4730.
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r--synapse/federation/sender/per_destination_queue.py64
1 files changed, 62 insertions, 2 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 385039add4..be99211003 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -80,6 +80,10 @@ class PerDestinationQueue(object):
         # destination
         self._pending_presence = {}   # type: dict[str, UserPresenceState]
 
+        # room_id -> receipt_type -> user_id -> receipt_dict
+        self._pending_rrs = {}
+        self._rrs_pending_flush = False
+
         # stream_id of last successfully sent to-device message.
         # NB: may be a long or an int.
         self._last_device_stream_id = 0
@@ -87,6 +91,9 @@ class PerDestinationQueue(object):
         # stream_id of last successfully sent device list update.
         self._last_device_list_stream_id = 0
 
+    def __str__(self):
+        return "PerDestinationQueue[%s]" % self._destination
+
     def pending_pdu_count(self):
         return len(self._pending_pdus)
 
@@ -118,6 +125,30 @@ class PerDestinationQueue(object):
         })
         self.attempt_new_transaction()
 
+    def queue_read_receipt(self, receipt):
+        """Add a RR to the list to be sent. Doesn't start the transmission loop yet
+        (see flush_read_receipts_for_room)
+
+        Args:
+            receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
+        """
+        self._pending_rrs.setdefault(
+            receipt.room_id, {},
+        ).setdefault(
+            receipt.receipt_type, {}
+        )[receipt.user_id] = {
+            "event_ids": receipt.event_ids,
+            "data": receipt.data,
+        }
+
+    def flush_read_receipts_for_room(self, room_id):
+        # if we don't have any read-receipts for this room, it may be that we've already
+        # sent them out, so we don't need to flush.
+        if room_id not in self._pending_rrs:
+            return
+        self._rrs_pending_flush = True
+        self.attempt_new_transaction()
+
     def send_keyed_edu(self, edu, key):
         self._pending_edus_keyed[(edu.edu_type, key)] = edu
         self.attempt_new_transaction()
@@ -183,10 +214,12 @@ class PerDestinationQueue(object):
                 # We can only include at most 50 PDUs per transactions
                 pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
 
-                pending_edus = self._pending_edus
+                pending_edus = []
+
+                pending_edus.extend(self._get_rr_edus(force_flush=False))
 
                 # We can only include at most 100 EDUs per transactions
-                pending_edus, self._pending_edus = pending_edus[:100], pending_edus[100:]
+                pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
 
                 pending_edus.extend(
                     self._pending_edus_keyed.values()
@@ -224,6 +257,11 @@ class PerDestinationQueue(object):
                     self._last_device_stream_id = device_stream_id
                     return
 
+                # if we've decided to send a transaction anyway, and we have room, we
+                # may as well send any pending RRs
+                if len(pending_edus) < 100:
+                    pending_edus.extend(self._get_rr_edus(force_flush=True))
+
                 # END CRITICAL SECTION
 
                 success = yield self._transaction_manager.send_new_transaction(
@@ -285,6 +323,28 @@ class PerDestinationQueue(object):
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
 
+    def _get_rr_edus(self, force_flush):
+        if not self._pending_rrs:
+            return
+        if not force_flush and not self._rrs_pending_flush:
+            # not yet time for this lot
+            return
+
+        edu = Edu(
+            origin=self._server_name,
+            destination=self._destination,
+            edu_type="m.receipt",
+            content=self._pending_rrs,
+        )
+        self._pending_rrs = {}
+        self._rrs_pending_flush = False
+        yield edu
+
+    def _pop_pending_edus(self, limit):
+        pending_edus = self._pending_edus
+        pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
+        return pending_edus
+
     @defer.inlineCallbacks
     def _get_new_device_messages(self):
         last_device_stream_id = self._last_device_stream_id