diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3bde0ae0d4..e266cc2a20 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -51,10 +51,12 @@ from synapse.types import (
JsonMapping,
MultiWriterStreamToken,
PersistedPosition,
+ StrCollection,
)
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -550,6 +552,46 @@ class ReceiptsWorkerStore(SQLBaseStore):
return results
+ async def get_rooms_with_receipts_between(
+ self,
+ room_ids: StrCollection,
+ from_key: MultiWriterStreamToken,
+ to_key: MultiWriterStreamToken,
+ ) -> StrCollection:
+ """Given a set of room_ids, find out which ones (may) have receipts
+ between the two tokens (> `from_token` and <= `to_token`)."""
+
+ room_ids = self._receipts_stream_cache.get_entities_changed(
+ room_ids, from_key.stream
+ )
+ if not room_ids:
+ return []
+
+ def f(txn: LoggingTransaction, room_ids: StrCollection) -> StrCollection:
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", room_ids
+ )
+
+ sql = f"""
+ SELECT DISTINCT room_id FROM receipts_linearized
+ WHERE {clause} AND ? < stream_id AND stream_id <= ?
+ """
+ args.append(from_key.stream)
+ args.append(to_key.get_max_stream_pos())
+
+ txn.execute(sql, args)
+
+ return [room_id for room_id, in txn]
+
+ results: List[str] = []
+ for batch in batch_iter(room_ids, 1000):
+ batch_result = await self.db_pool.runInteraction(
+ "get_rooms_with_receipts_between", f, batch
+ )
+ results.extend(batch_result)
+
+ return results
+
async def get_users_sent_receipts_between(
self, last_id: int, current_id: int
) -> List[str]:
|