summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-07-12 09:59:58 +0100
committerErik Johnston <erik@matrix.org>2018-07-12 09:59:58 +0100
commit7c79f2cb7234b259ed2654da3c19fba7015d45ba (patch)
treee3c87fdcf0d31c6ac24316ca8fd54a5abe0472a4
parentMerge branch 'develop' into matrix-org-hotfixes (diff)
parentMerge pull request #3505 from matrix-org/erikj/receipts_cahce (diff)
downloadsynapse-7c79f2cb7234b259ed2654da3c19fba7015d45ba.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
-rw-r--r--changelog.d/3498.misc0
-rw-r--r--changelog.d/3505.feature1
-rw-r--r--synapse/replication/slave/storage/receipts.py2
-rw-r--r--synapse/storage/receipts.py25
-rw-r--r--tests/server.py2
5 files changed, 22 insertions, 8 deletions
diff --git a/changelog.d/3498.misc b/changelog.d/3498.misc
new file mode 100644

index 0000000000..e69de29bb2 --- /dev/null +++ b/changelog.d/3498.misc
diff --git a/changelog.d/3505.feature b/changelog.d/3505.feature new file mode 100644
index 0000000000..ca1867f529 --- /dev/null +++ b/changelog.d/3505.feature
@@ -0,0 +1 @@ +Reduce database consumption when processing large numbers of receipts diff --git a/synapse/replication/slave/storage/receipts.py b/synapse/replication/slave/storage/receipts.py
index 7ab12b850f..ed12342f40 100644 --- a/synapse/replication/slave/storage/receipts.py +++ b/synapse/replication/slave/storage/receipts.py
@@ -49,7 +49,7 @@ class SlavedReceiptsStore(ReceiptsWorkerStore, BaseSlavedStore): def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id): self.get_receipts_for_user.invalidate((user_id, receipt_type)) - self.get_linearized_receipts_for_room.invalidate_many((room_id,)) + self._get_linearized_receipts_for_room.invalidate_many((room_id,)) self.get_last_receipt_event_id_for_user.invalidate( (user_id, room_id, receipt_type) ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 3738901ea4..0ac665e967 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py
@@ -140,7 +140,9 @@ class ReceiptsWorkerStore(SQLBaseStore): """ room_ids = set(room_ids) - if from_key: + if from_key is not None: + # Only ask the database about rooms where there have been new + # receipts added since `from_key` room_ids = yield self._receipts_stream_cache.get_entities_changed( room_ids, from_key ) @@ -151,7 +153,6 @@ class ReceiptsWorkerStore(SQLBaseStore): defer.returnValue([ev for res in results.values() for ev in res]) - @cachedInlineCallbacks(num_args=3, tree=True) def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. @@ -162,7 +163,19 @@ class ReceiptsWorkerStore(SQLBaseStore): from the start. Returns: - list: A list of receipts. + Deferred[list]: A list of receipts. + """ + if from_key is not None: + # Check the cache first to see if any new receipts have been added + # since`from_key`. If not we can no-op. + if not self._receipts_stream_cache.has_entity_changed(room_id, from_key): + defer.succeed([]) + + return self._get_linearized_receipts_for_room(room_id, to_key, from_key) + + @cachedInlineCallbacks(num_args=3, tree=True) + def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): + """See get_linearized_receipts_for_room """ def f(txn): if from_key: @@ -211,7 +224,7 @@ class ReceiptsWorkerStore(SQLBaseStore): "content": content, }]) - @cachedList(cached_method_name="get_linearized_receipts_for_room", + @cachedList(cached_method_name="_get_linearized_receipts_for_room", list_name="room_ids", num_args=3, inlineCallbacks=True) def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): if not room_ids: @@ -373,7 +386,7 @@ class ReceiptsStore(ReceiptsWorkerStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) + txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,)) txn.call_after( self._receipts_stream_cache.entity_has_changed, @@ -493,7 +506,7 @@ class ReceiptsStore(ReceiptsWorkerStore): self.get_receipts_for_user.invalidate, (user_id, receipt_type) ) # FIXME: This shouldn't invalidate the whole cache - txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,)) + txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,)) self._simple_delete_txn( txn, diff --git a/tests/server.py b/tests/server.py
index 46223ccf05..e93f5a7f94 100644 --- a/tests/server.py +++ b/tests/server.py
@@ -22,7 +22,7 @@ class FakeChannel(object): wire). """ - result = attr.ib(factory=dict) + result = attr.ib(default=attr.Factory(dict)) @property def json_body(self):