diff --git a/changelog.d/14466.bugfix b/changelog.d/14466.bugfix
new file mode 100644
index 0000000000..82f6e6b68e
--- /dev/null
+++ b/changelog.d/14466.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.70.0 where a receipt's thread ID was not sent over federation.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3ae5e8634c..5af2784f1e 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -35,7 +35,7 @@ from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import JsonDict, ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.visibility import filter_events_for_server
@@ -136,8 +136,11 @@ class PerDestinationQueue:
# destination
self._pending_presence: Dict[str, UserPresenceState] = {}
- # room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
+ # List of room_id -> receipt_type -> user_id -> receipt_dict,
+ #
+ # Each receipt can only have a single receipt per
+ # (room ID, receipt type, user ID, thread ID) tuple.
+ self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -202,17 +205,53 @@ class PerDestinationQueue:
Args:
receipt: receipt to be queued
"""
- self._pending_rrs.setdefault(receipt.room_id, {}).setdefault(
- receipt.receipt_type, {}
- )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data}
+ serialized_receipt: JsonDict = {
+ "event_ids": receipt.event_ids,
+ "data": receipt.data,
+ }
+ if receipt.thread_id is not None:
+ serialized_receipt["data"]["thread_id"] = receipt.thread_id
+
+ # Find which EDU to add this receipt to. There's three situations depending
+ # on the (room ID, receipt type, user, thread ID) tuple:
+ #
+ # 1. If it fully matches, clobber the information.
+ # 2. If it is missing, add the information.
+ # 3. If the subset tuple of (room ID, receipt type, user) matches, check
+ # the next EDU (or add a new EDU).
+ for edu in self._pending_receipt_edus:
+ receipt_content = edu.setdefault(receipt.room_id, {}).setdefault(
+ receipt.receipt_type, {}
+ )
+ # If this room ID, receipt type, user ID is not in this EDU, OR if
+ # the full tuple matches, use the current EDU.
+ if (
+ receipt.user_id not in receipt_content
+ or receipt_content[receipt.user_id].get("thread_id")
+ == receipt.thread_id
+ ):
+ receipt_content[receipt.user_id] = serialized_receipt
+ break
+
+ # If no matching EDU was found, create a new one.
+ else:
+ self._pending_receipt_edus.append(
+ {
+ receipt.room_id: {
+ receipt.receipt_type: {receipt.user_id: serialized_receipt}
+ }
+ }
+ )
def flush_read_receipts_for_room(self, room_id: str) -> None:
- # if we don't have any read-receipts for this room, it may be that we've already
- # sent them out, so we don't need to flush.
- if room_id not in self._pending_rrs:
- return
- self._rrs_pending_flush = True
- self.attempt_new_transaction()
+ # If there are any pending receipts for this room then force-flush them
+ # in a new transaction.
+ for edu in self._pending_receipt_edus:
+ if room_id in edu:
+ self._rrs_pending_flush = True
+ self.attempt_new_transaction()
+ # No use in checking remaining EDUs if the room was found.
+ break
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -351,7 +390,7 @@ class PerDestinationQueue:
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
- self._pending_rrs = {}
+ self._pending_receipt_edus = []
self._start_catching_up()
except FederationDeniedError as e:
@@ -543,22 +582,27 @@ class PerDestinationQueue:
self._destination, last_successful_stream_ordering
)
- def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
- if not self._pending_rrs:
+ def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+ if not self._pending_receipt_edus:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
- edu = Edu(
- origin=self._server_name,
- destination=self._destination,
- edu_type=EduTypes.RECEIPT,
- content=self._pending_rrs,
- )
- self._pending_rrs = {}
- self._rrs_pending_flush = False
- yield edu
+ # Send at most limit EDUs for receipts.
+ for content in self._pending_receipt_edus[:limit]:
+ yield Edu(
+ origin=self._server_name,
+ destination=self._destination,
+ edu_type=EduTypes.RECEIPT,
+ content=content,
+ )
+ self._pending_receipt_edus = self._pending_receipt_edus[limit:]
+
+ # If there are still pending read-receipts, don't reset the pending flush
+ # flag.
+ if not self._pending_receipt_edus:
+ self._rrs_pending_flush = False
def _pop_pending_edus(self, limit: int) -> List[Edu]:
pending_edus = self._pending_edus
@@ -645,27 +689,61 @@ class _TransactionQueueManager:
async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]:
# First we calculate the EDUs we want to send, if any.
- # We start by fetching device related EDUs, i.e device updates and to
- # device messages. We have to keep 2 free slots for presence and rr_edus.
- device_edu_limit = MAX_EDUS_PER_TRANSACTION - 2
+ # There's a maximum number of EDUs that can be sent with a transaction,
+ # generally device updates and to-device messages get priority, but we
+ # want to ensure that there's room for some other EDUs as well.
+ #
+ # This is done by:
+ #
+ # * Add a presence EDU, if one exists.
+ # * Add up-to a small limit of read receipt EDUs.
+ # * Add to-device EDUs, but leave some space for device list updates.
+ # * Add device list updates EDUs.
+ # * If there's any remaining room, add other EDUs.
+ pending_edus = []
+
+ # Add presence EDU.
+ if self.queue._pending_presence:
+ pending_edus.append(
+ Edu(
+ origin=self.queue._server_name,
+ destination=self.queue._destination,
+ edu_type=EduTypes.PRESENCE,
+ content={
+ "push": [
+ format_user_presence_state(
+ presence, self.queue._clock.time_msec()
+ )
+ for presence in self.queue._pending_presence.values()
+ ]
+ },
+ )
+ )
+ self.queue._pending_presence = {}
- # We prioritize to-device messages so that existing encryption channels
+ # Add read receipt EDUs.
+ pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+ edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
+
+ # Next, prioritize to-device messages so that existing encryption channels
# work. We also keep a few slots spare (by reducing the limit) so that
# we can still trickle out some device list updates.
(
to_device_edus,
device_stream_id,
- ) = await self.queue._get_to_device_message_edus(device_edu_limit - 10)
+ ) = await self.queue._get_to_device_message_edus(edu_limit - 10)
if to_device_edus:
self._device_stream_id = device_stream_id
else:
self.queue._last_device_stream_id = device_stream_id
- device_edu_limit -= len(to_device_edus)
+ pending_edus.extend(to_device_edus)
+ edu_limit -= len(to_device_edus)
+ # Add device list update EDUs.
device_update_edus, dev_list_id = await self.queue._get_device_update_edus(
- device_edu_limit
+ edu_limit
)
if device_update_edus:
@@ -673,40 +751,17 @@ class _TransactionQueueManager:
else:
self.queue._last_device_list_stream_id = dev_list_id
- pending_edus = device_update_edus + to_device_edus
-
- # Now add the read receipt EDU.
- pending_edus.extend(self.queue._get_rr_edus(force_flush=False))
-
- # And presence EDU.
- if self.queue._pending_presence:
- pending_edus.append(
- Edu(
- origin=self.queue._server_name,
- destination=self.queue._destination,
- edu_type=EduTypes.PRESENCE,
- content={
- "push": [
- format_user_presence_state(
- presence, self.queue._clock.time_msec()
- )
- for presence in self.queue._pending_presence.values()
- ]
- },
- )
- )
- self.queue._pending_presence = {}
+ pending_edus.extend(device_update_edus)
+ edu_limit -= len(device_update_edus)
# Finally add any other types of EDUs if there is room.
- pending_edus.extend(
- self.queue._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
- )
- while (
- len(pending_edus) < MAX_EDUS_PER_TRANSACTION
- and self.queue._pending_edus_keyed
- ):
+ other_edus = self.queue._pop_pending_edus(edu_limit)
+ pending_edus.extend(other_edus)
+ edu_limit -= len(other_edus)
+ while edu_limit > 0 and self.queue._pending_edus_keyed:
_, val = self.queue._pending_edus_keyed.popitem()
pending_edus.append(val)
+ edu_limit -= 1
# Now we look for any PDUs to send, by getting up to 50 PDUs from the
# queue
@@ -717,8 +772,10 @@ class _TransactionQueueManager:
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
- if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
- pending_edus.extend(self.queue._get_rr_edus(force_flush=True))
+ if edu_limit:
+ pending_edus.extend(
+ self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
+ )
if self._pdus:
self._last_stream_ordering = self._pdus[
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index ac01582442..6a4fed1156 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -92,7 +92,6 @@ class ReceiptsHandler:
continue
# Check if these receipts apply to a thread.
- thread_id = None
data = user_values.get("data", {})
thread_id = data.get("thread_id")
# If the thread ID is invalid, consider it missing.
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index f1e357764f..01f147418b 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -84,6 +84,83 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
)
@override_config({"send_federation": True})
+ def test_send_receipts_thread(self):
+ mock_send_transaction = (
+ self.hs.get_federation_transport_client().send_transaction
+ )
+ mock_send_transaction.return_value = make_awaitable({})
+
+ # Create receipts for:
+ #
+ # * The same room / user on multiple threads.
+ # * A different user in the same room.
+ sender = self.hs.get_federation_sender()
+ for user, thread in (
+ ("alice", None),
+ ("alice", "thread"),
+ ("bob", None),
+ ("bob", "diff-thread"),
+ ):
+ receipt = ReadReceipt(
+ "room_id",
+ "m.read",
+ user,
+ ["event_id"],
+ thread_id=thread,
+ data={"ts": 1234},
+ )
+ self.successResultOf(
+ defer.ensureDeferred(sender.send_read_receipt(receipt))
+ )
+
+ self.pump()
+
+ # expect a call to send_transaction with two EDUs to separate threads.
+ mock_send_transaction.assert_called_once()
+ json_cb = mock_send_transaction.call_args[0][1]
+ data = json_cb()
+ # Note that the ordering of the EDUs doesn't matter.
+ self.assertCountEqual(
+ data["edus"],
+ [
+ {
+ "edu_type": EduTypes.RECEIPT,
+ "content": {
+ "room_id": {
+ "m.read": {
+ "alice": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234, "thread_id": "thread"},
+ },
+ "bob": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234, "thread_id": "diff-thread"},
+ },
+ }
+ }
+ },
+ },
+ {
+ "edu_type": EduTypes.RECEIPT,
+ "content": {
+ "room_id": {
+ "m.read": {
+ "alice": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234},
+ },
+ "bob": {
+ "event_ids": ["event_id"],
+ "data": {"ts": 1234},
+ },
+ }
+ }
+ },
+ },
+ ],
+ )
+
+ @override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""
|