diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 7ab12b850f..ed12342f40 100644
--- a/synapse/replication/slave/storage/receipts.py
+++ b/synapse/replication/slave/storage/receipts.py
@@ -49,7 +49,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore):
def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
- self.get_linearized_receipts_for_room.invalidate_many((room_id,))
+ self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 3738901ea4..0ac665e967 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -140,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
"""
room_ids = set(room_ids)
- if from_key:
+ if from_key is not None:
+ # Only ask the database about rooms where there have been new
+ # receipts added since `from_key`
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
@@ -151,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
defer.returnValue([ev for res in results.values() for ev in res])
- @cachedInlineCallbacks(num_args=3, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.
@@ -162,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
from the start.
Returns:
- list: A list of receipts.
+ Deferred[list]: A list of receipts.
+ """
+ if from_key is not None:
+ # Check the cache first to see if any new receipts have been added
+ # since`from_key`. If not we can no-op.
+ if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
+ defer.succeed([])
+
+ return self._get_linearized_receipts_for_room(room_id, to_key, from_key)
+
+ @cachedInlineCallbacks(num_args=3, tree=True)
+ def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
+ """See get_linearized_receipts_for_room
"""
def f(txn):
if from_key:
@@ -211,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
"content": content,
}])
- @cachedList(cached_method_name="get_linearized_receipts_for_room",
+ @cachedList(cached_method_name="_get_linearized_receipts_for_room",
list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
@@ -373,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(
self._receipts_stream_cache.entity_has_changed,
@@ -493,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
+ txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))
self._simple_delete_txn(
txn,
|