diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 385039add4..be99211003 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -80,6 +80,10 @@ class PerDestinationQueue(object):
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
+ # room_id -> receipt_type -> user_id -> receipt_dict
+ self._pending_rrs = {}
+ self._rrs_pending_flush = False
+
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self._last_device_stream_id = 0
@@ -87,6 +91,9 @@ class PerDestinationQueue(object):
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
+ def __str__(self):
+ return "PerDestinationQueue[%s]" % self._destination
+
def pending_pdu_count(self):
return len(self._pending_pdus)
@@ -118,6 +125,30 @@ class PerDestinationQueue(object):
})
self.attempt_new_transaction()
+ def queue_read_receipt(self, receipt):
+ """Add a RR to the list to be sent. Doesn't start the transmission loop yet
+ (see flush_read_receipts_for_room)
+
+ Args:
+ receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
+ """
+ self._pending_rrs.setdefault(
+ receipt.room_id, {},
+ ).setdefault(
+ receipt.receipt_type, {}
+ )[receipt.user_id] = {
+ "event_ids": receipt.event_ids,
+ "data": receipt.data,
+ }
+
+ def flush_read_receipts_for_room(self, room_id):
+ # if we don't have any read-receipts for this room, it may be that we've already
+ # sent them out, so we don't need to flush.
+ if room_id not in self._pending_rrs:
+ return
+ self._rrs_pending_flush = True
+ self.attempt_new_transaction()
+
def send_keyed_edu(self, edu, key):
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
@@ -183,10 +214,12 @@ class PerDestinationQueue(object):
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
- pending_edus = self._pending_edus
+ pending_edus = []
+
+ pending_edus.extend(self._get_rr_edus(force_flush=False))
# We can only include at most 100 EDUs per transactions
- pending_edus, self._pending_edus = pending_edus[:100], pending_edus[100:]
+ pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
pending_edus.extend(
self._pending_edus_keyed.values()
@@ -224,6 +257,11 @@ class PerDestinationQueue(object):
self._last_device_stream_id = device_stream_id
return
+ # if we've decided to send a transaction anyway, and we have room, we
+ # may as well send any pending RRs
+ if len(pending_edus) < 100:
+ pending_edus.extend(self._get_rr_edus(force_flush=True))
+
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
@@ -285,6 +323,28 @@ class PerDestinationQueue(object):
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
+ def _get_rr_edus(self, force_flush):
+ if not self._pending_rrs:
+ return
+ if not force_flush and not self._rrs_pending_flush:
+ # not yet time for this lot
+ return
+
+ edu = Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type="m.receipt",
+ content=self._pending_rrs,
+ )
+ self._pending_rrs = {}
+ self._rrs_pending_flush = False
+ yield edu
+
+ def _pop_pending_edus(self, limit):
+ pending_edus = self._pending_edus
+ pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
+ return pending_edus
+
@defer.inlineCallbacks
def _get_new_device_messages(self):
last_device_stream_id = self._last_device_stream_id
|