summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-06-01 10:31:09 +0100
committerErik Johnston <erik@matrix.org>2016-06-01 10:33:39 +0100
commit7b4f8c527c8a04444dae4dc76fee4a96023113c0 (patch)
tree87c1868a76f1be28f501c02bb229476803a78f4d
parentMerge branch 'erikj/push_rules_cache' of github.com:matrix-org/synapse into e... (diff)
downloadsynapse-7b4f8c527c8a04444dae4dc76fee4a96023113c0.tar.xz
Add get_users_with_read_receipts_in_room cache
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py9
-rw-r--r--synapse/storage/receipts.py28
-rw-r--r--synapse/util/caches/descriptors.py9
3 files changed, 41 insertions, 5 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bdffc3c90f..23fde7508e 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -91,18 +91,17 @@ def evaluator_for_event(event, hs, store):
 
     with log_duration("get_users_in_room"):
         all_in_room = yield store.get_users_in_room(room_id)
-    with log_duration("all_in_room"):
         all_in_room = set(all_in_room)
 
     with log_duration("get_receipts_for_room"):
-        receipts = yield store.get_receipts_for_room(room_id, "m.read")
+        users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
 
     # any users with pushers must be ours: they have pushers
     with log_duration("get_mine_pushers"):
         user_ids = set(users_with_pushers)
-        for r in receipts:
-            if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room:
-                user_ids.add(r['user_id'])
+        for uid in users_with_receipts:
+            if hs.is_mine_id(uid) and uid in all_in_room:
+                user_ids.add(uid)
 
     # if this event is an invite event, we may need to run rules for the user
     # who's been invited, otherwise they won't get told they've been invited
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..ba806f1b03 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -34,6 +34,26 @@ class ReceiptsStore(SQLBaseStore):
             "ReceiptsRoomChangeCache", self._receipts_id_gen.get_current_token()
         )
 
+    @cachedInlineCallbacks()
+    def get_users_with_read_receipts_in_room(self, room_id):
+        receipts = yield self.get_receipts_for_room(room_id, "m.read")
+        defer.returnValue(set(r['user_id'] for r in receipts))
+
+    def _invalidate_get_users_with_receipts_in_room(self, room_id, receipt_type,
+                                                    user_id):
+        if receipt_type != "m.read":
+            return
+
+        # XXX: ObservableDeferred?!
+        res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
+
+        if res and res.called and user_id in res.result:
+            # We'd only be adding to the set, so no point invalidating if the
+            # user is already there
+            return
+
+        self.get_users_with_read_receipts_in_room.invalidate((room_id,))
+
     @cached(num_args=2)
     def get_receipts_for_room(self, room_id, receipt_type):
         return self._simple_select_list(
@@ -229,6 +249,10 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
         txn.call_after(
+            self._invalidate_get_users_with_receipts_in_room,
+            room_id, receipt_type, user_id,
+        )
+        txn.call_after(
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
@@ -374,6 +398,10 @@ class ReceiptsStore(SQLBaseStore):
             self.get_receipts_for_room.invalidate, (room_id, receipt_type)
         )
         txn.call_after(
+            self._invalidate_get_users_with_receipts_in_room,
+            room_id, receipt_type, user_id,
+        )
+        txn.call_after(
             self.get_receipts_for_user.invalidate, (user_id, receipt_type)
         )
         # FIXME: This shouldn't invalidate the whole cache
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 758f5982b0..47201d327f 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -84,6 +84,9 @@ class Cache(object):
         else:
             return default
 
+    def has(self, key):
+        return key in self.cache
+
     def update(self, sequence, key, value):
         self.check_thread()
         if self.sequence == sequence:
@@ -124,6 +127,12 @@ class Cache(object):
         self.sequence += 1
         self.cache.clear()
 
+    def __contains__(self, key):
+        return self.has(key)
+
+    def __getitem__(self, key):
+        return self.get(key)
+
 
 class CacheDescriptor(object):
     """ A method decorator that applies a memoizing cache around the function.