diff --git a/changelog.d/17592.misc b/changelog.d/17592.misc
new file mode 100644
index 0000000000..1b4a53ee17
--- /dev/null
+++ b/changelog.d/17592.misc
@@ -0,0 +1 @@
+Correctly track read receipts that should be sent down in experimental sliding sync.
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 14d0ecbe16..af8d7ab96c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -3088,38 +3088,17 @@ class SlidingSyncHandler:
# from that room but we only want to include receipts for events
# in the timeline to avoid bloating and blowing up the sync response
# as the number of users in the room increases. (this behavior is part of the spec)
- initial_rooms = {
- room_id
+ initial_rooms_and_event_ids = [
+ (room_id, event.event_id)
for room_id in initial_rooms
if room_id in actual_room_response_map
- }
- if initial_rooms:
- initial_receipts = await self.store.get_linearized_receipts_for_rooms(
- room_ids=initial_rooms,
- to_key=to_token.receipt_key,
+ for event in actual_room_response_map[room_id].timeline_events
+ ]
+ if initial_rooms_and_event_ids:
+ initial_receipts = await self.store.get_linearized_receipts_for_events(
+ room_and_event_ids=initial_rooms_and_event_ids,
)
-
- for receipt in initial_receipts:
- relevant_event_ids = {
- event.event_id
- for event in actual_room_response_map[
- receipt["room_id"]
- ].timeline_events
- }
-
- content = {
- event_id: content_value
- for event_id, content_value in receipt["content"].items()
- if event_id in relevant_event_ids
- }
- if content:
- fetched_receipts.append(
- {
- "type": receipt["type"],
- "room_id": receipt["room_id"],
- "content": content,
- }
- )
+ fetched_receipts.extend(initial_receipts)
fetched_receipts = ReceiptEventSource.filter_out_private_receipts(
fetched_receipts, sync_config.user.to_string()
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index e266cc2a20..0a20f5db4c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -43,6 +43,7 @@ from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
+ make_tuple_in_list_sql_clause,
)
from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import MultiWriterIdGenerator
@@ -481,6 +482,83 @@ class ReceiptsWorkerStore(SQLBaseStore):
}
return results
+ async def get_linearized_receipts_for_events(
+ self,
+ room_and_event_ids: Collection[Tuple[str, str]],
+ ) -> Sequence[JsonMapping]:
+ """Get all receipts for the given set of events.
+
+ Arguments:
+            room_and_event_ids: A collection of (room ID, event ID) tuples
+                to fetch receipts for
+
+ Returns:
+            A list of receipt EDUs, one per room that has receipts.
+ """
+
+ def get_linearized_receipts_for_events_txn(
+ txn: LoggingTransaction,
+ room_id_event_id_tuples: Collection[Tuple[str, str]],
+ ) -> List[Tuple[str, str, str, str, Optional[str], str]]:
+ clause, args = make_tuple_in_list_sql_clause(
+ self.database_engine, ("room_id", "event_id"), room_id_event_id_tuples
+ )
+
+ sql = f"""
+ SELECT room_id, receipt_type, user_id, event_id, thread_id, data
+ FROM receipts_linearized
+ WHERE {clause}
+ """
+
+ txn.execute(sql, args)
+
+ return txn.fetchall()
+
+ # room_id -> event_id -> receipt_type -> user_id -> receipt data
+ room_to_content: Dict[str, Dict[str, Dict[str, Dict[str, JsonMapping]]]] = {}
+ for batch in batch_iter(room_and_event_ids, 1000):
+ batch_results = await self.db_pool.runInteraction(
+ "get_linearized_receipts_for_events",
+ get_linearized_receipts_for_events_txn,
+ batch,
+ )
+
+ for (
+ room_id,
+ receipt_type,
+ user_id,
+ event_id,
+ thread_id,
+ data,
+ ) in batch_results:
+ content = room_to_content.setdefault(room_id, {})
+ user_receipts = content.setdefault(event_id, {}).setdefault(
+ receipt_type, {}
+ )
+
+ receipt_data = db_to_json(data)
+ if thread_id is not None:
+ receipt_data["thread_id"] = thread_id
+
+ # MSC4102: always replace threaded receipts with unthreaded ones
+ # if there is a clash. Specifically:
+ # - if there is no existing receipt, great, set the data.
+ # - if there is an existing receipt, is it threaded (thread_id
+ # present)? YES: replace if this receipt has no thread id.
+ # NO: do not replace. This means we will drop some receipts, but
+ # MSC4102 is designed to drop semantically meaningless receipts,
+ # so this is okay. Previously, we would drop meaningful data!
+ if user_id in user_receipts:
+ if "thread_id" in user_receipts[user_id] and not thread_id:
+ user_receipts[user_id] = receipt_data
+ else:
+ user_receipts[user_id] = receipt_data
+
+ return [
+ {"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}
+ for room_id, content in room_to_content.items()
+ ]
+
@cached(
num_args=2,
)
@@ -996,6 +1074,12 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
self._background_receipts_graph_unique_index,
)
+ self.db_pool.updates.register_background_index_update(
+ update_name="receipts_room_id_event_id_index",
+ index_name="receipts_linearized_event_id",
+ table="receipts_linearized",
+ columns=("room_id", "event_id"),
+ )
async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int
diff --git a/synapse/storage/schema/main/delta/86/02_receipts_event_id_index.sql b/synapse/storage/schema/main/delta/86/02_receipts_event_id_index.sql
new file mode 100644
index 0000000000..e6db91e5b5
--- /dev/null
+++ b/synapse/storage/schema/main/delta/86/02_receipts_event_id_index.sql
@@ -0,0 +1,15 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (8602, 'receipts_room_id_event_id_index', '{}');