summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py23
-rw-r--r--synapse/storage/event_push_actions.py19
-rw-r--r--synapse/storage/push_rule.py29
-rw-r--r--synapse/util/caches/descriptors.py30
-rw-r--r--synapse/util/caches/lrucache.py53
-rw-r--r--synapse/util/caches/treecache.py60
-rw-r--r--tests/util/test_lrucache.py20
-rw-r--r--tests/util/test_treecache.py66
8 files changed, 280 insertions, 20 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index b91c165e2b..20c60422bf 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -36,6 +36,7 @@ def decode_rule_json(rule):
 @defer.inlineCallbacks
 def _get_rules(room_id, user_ids, store):
     rules_by_user = yield store.bulk_get_push_rules(user_ids)
+    rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
 
     rules_by_user = {
         uid: baserules.list_with_base_rules([
@@ -44,6 +45,26 @@ def _get_rules(room_id, user_ids, store):
         ])
         for uid in user_ids
     }
+
+    # We apply the rules-enabled map here: bulk_get_push_rules doesn't
+    # fetch disabled rules, but this won't account for any server default
+    # rules the user has disabled, so we need to do this too.
+    for uid in user_ids:
+        if uid not in rules_enabled_by_user:
+            continue
+
+        user_enabled_map = rules_enabled_by_user[uid]
+
+        for i, rule in enumerate(rules_by_user[uid]):
+            rule_id = rule['rule_id']
+
+            if rule_id in user_enabled_map:
+                if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
+                    # Rules are cached across users.
+                    rule = dict(rule)
+                    rule['enabled'] = bool(user_enabled_map[rule_id])
+                    rules_by_user[uid][i] = rule
+
     defer.returnValue(rules_by_user)
 
 
@@ -119,7 +140,7 @@ class BulkPushRuleEvaluator:
                 )
                 if matches:
                     actions = [x for x in rule['actions'] if x != 'dont_notify']
-                    if actions:
+                    if actions and 'notify' in actions:
                         actions_by_user[uid] = actions
                     break
         defer.returnValue(actions_by_user)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index aa61cf5569..a05c4f84cf 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -40,14 +40,20 @@ class EventPushActionsStore(SQLBaseStore):
                 'actions': json.dumps(actions)
             })
 
+        def f(txn):
+            for uid, _, __ in tuples:
+                txn.call_after(
+                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                    (event.room_id, uid)
+                )
+            return self._simple_insert_many_txn(txn, "event_push_actions", values)
+
         yield self.runInteraction(
             "set_actions_for_event_and_users",
-            self._simple_insert_many_txn,
-            "event_push_actions",
-            values
+            f,
         )
 
-    @cachedInlineCallbacks(num_args=3)
+    @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
     def get_unread_event_push_actions_by_room_for_user(
             self, room_id, user_id, last_read_event_id
     ):
@@ -98,6 +104,11 @@ class EventPushActionsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def remove_push_actions_for_event_id(self, room_id, event_id):
         def f(txn):
+            # Sad that we have to blow away the cache for the whole room here
+            txn.call_after(
+                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                (room_id,)
+            )
             txn.execute(
                 "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
                 (room_id, event_id)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 2adfefd994..35ec7e8cef 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -94,6 +94,35 @@ class PushRuleStore(SQLBaseStore):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
+    def bulk_get_push_rules_enabled(self, user_ids):
+        if not user_ids:
+            defer.returnValue({})
+
+        batch_size = 100
+
+        def f(txn, user_ids_to_fetch):
+            sql = (
+                "SELECT user_name, rule_id, enabled"
+                " FROM push_rules_enable"
+                " WHERE user_name"
+                " IN (" + ",".join("?" for _ in user_ids_to_fetch) + ")"
+            )
+            txn.execute(sql, user_ids_to_fetch)
+            return self.cursor_to_dict(txn)
+
+        results = {}
+
+        chunks = [user_ids[i:i+batch_size] for i in xrange(0, len(user_ids), batch_size)]
+        for batch_user_ids in chunks:
+            rows = yield self.runInteraction(
+                "bulk_get_push_rules_enabled", f, batch_user_ids
+            )
+
+            for row in rows:
+                results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
+        defer.returnValue(results)
+
+    @defer.inlineCallbacks
     def add_push_rule(self, before, after, **kwargs):
         vals = kwargs
         if 'conditions' in vals:
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 0033051849..88e56e3302 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,6 +17,7 @@ import logging
 from synapse.util.async import ObservableDeferred
 from synapse.util import unwrapFirstError
 from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.treecache import TreeCache
 
 from . import caches_by_name, DEBUG_CACHES, cache_counter
 
@@ -36,9 +37,12 @@ _CacheSentinel = object()
 
 class Cache(object):
 
-    def __init__(self, name, max_entries=1000, keylen=1, lru=True):
+    def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
         if lru:
-            self.cache = LruCache(max_size=max_entries)
+            cache_type = TreeCache if tree else dict
+            self.cache = LruCache(
+                max_size=max_entries, keylen=keylen, cache_type=cache_type
+            )
             self.max_entries = None
         else:
             self.cache = OrderedDict()
@@ -99,6 +103,15 @@ class Cache(object):
         self.sequence += 1
         self.cache.pop(key, None)
 
+    def invalidate_many(self, key):
+        self.check_thread()
+        if not isinstance(key, tuple):
+            raise TypeError(
+                "The cache key must be a tuple not %r" % (type(key),)
+            )
+        self.sequence += 1
+        self.cache.del_multi(key)
+
     def invalidate_all(self):
         self.check_thread()
         self.sequence += 1
@@ -122,7 +135,7 @@ class CacheDescriptor(object):
     which can be used to insert values into the cache specifically, without
     calling the calculation function.
     """
-    def __init__(self, orig, max_entries=1000, num_args=1, lru=True,
+    def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
                  inlineCallbacks=False):
         self.orig = orig
 
@@ -134,6 +147,7 @@ class CacheDescriptor(object):
         self.max_entries = max_entries
         self.num_args = num_args
         self.lru = lru
+        self.tree = tree
 
         self.arg_names = inspect.getargspec(orig).args[1:num_args+1]
 
@@ -149,6 +163,7 @@ class CacheDescriptor(object):
             max_entries=self.max_entries,
             keylen=self.num_args,
             lru=self.lru,
+            tree=self.tree,
         )
 
     def __get__(self, obj, objtype=None):
@@ -200,6 +215,7 @@ class CacheDescriptor(object):
 
         wrapped.invalidate = self.cache.invalidate
         wrapped.invalidate_all = self.cache.invalidate_all
+        wrapped.invalidate_many = self.cache.invalidate_many
         wrapped.prefill = self.cache.prefill
 
         obj.__dict__[self.orig.__name__] = wrapped
@@ -321,21 +337,23 @@ class CacheListDescriptor(object):
         return wrapped
 
 
-def cached(max_entries=1000, num_args=1, lru=True):
+def cached(max_entries=1000, num_args=1, lru=True, tree=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
-        lru=lru
+        lru=lru,
+        tree=tree,
     )
 
 
-def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False):
+def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
         lru=lru,
+        tree=tree,
         inlineCallbacks=True,
     )
 
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 0122b0bb3f..e6a66dc041 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -17,11 +17,27 @@
 from functools import wraps
 import threading
 
+from synapse.util.caches.treecache import TreeCache
+
+
+def enumerate_leaves(node, depth):
+    if depth == 0:
+        yield node
+    else:
+        for n in node.values():
+            for m in enumerate_leaves(n, depth - 1):
+                yield m
+
 
 class LruCache(object):
-    """Least-recently-used cache."""
-    def __init__(self, max_size):
-        cache = {}
+    """
+    Least-recently-used cache.
+    Supports del_multi only if cache_type=TreeCache
+    If cache_type=TreeCache, all keys must be tuples.
+    """
+    def __init__(self, max_size, keylen=1, cache_type=dict):
+        cache = cache_type()
+        self.size = 0
         list_root = []
         list_root[:] = [list_root, list_root, None, None]
 
@@ -44,6 +60,7 @@ class LruCache(object):
             prev_node[NEXT] = node
             next_node[PREV] = node
             cache[key] = node
+            self.size += 1
 
         def move_node_to_front(node):
             prev_node = node[PREV]
@@ -62,7 +79,7 @@ class LruCache(object):
             next_node = node[NEXT]
             prev_node[NEXT] = next_node
             next_node[PREV] = prev_node
-            cache.pop(node[KEY], None)
+            self.size -= 1
 
         @synchronized
         def cache_get(key, default=None):
@@ -81,8 +98,10 @@ class LruCache(object):
                 node[VALUE] = value
             else:
                 add_node(key, value)
-                if len(cache) > max_size:
-                    delete_node(list_root[PREV])
+                if self.size > max_size:
+                    todelete = list_root[PREV]
+                    delete_node(todelete)
+                    cache.pop(todelete[KEY], None)
 
         @synchronized
         def cache_set_default(key, value):
@@ -91,8 +110,10 @@ class LruCache(object):
                 return node[VALUE]
             else:
                 add_node(key, value)
-                if len(cache) > max_size:
-                    delete_node(list_root[PREV])
+                if self.size > max_size:
+                    todelete = list_root[PREV]
+                    delete_node(todelete)
+                    cache.pop(todelete[KEY], None)
                 return value
 
         @synchronized
@@ -100,11 +121,23 @@ class LruCache(object):
             node = cache.get(key, None)
             if node:
                 delete_node(node)
+                cache.pop(node[KEY], None)
                 return node[VALUE]
             else:
                 return default
 
         @synchronized
+        def cache_del_multi(key):
+            """
+            This will only work if constructed with cache_type=TreeCache
+            """
+            popped = cache.pop(key)
+            if popped is None:
+                return
+            for leaf in enumerate_leaves(popped, keylen - len(key)):
+                delete_node(leaf)
+
+        @synchronized
         def cache_clear():
             list_root[NEXT] = list_root
             list_root[PREV] = list_root
@@ -112,7 +145,7 @@ class LruCache(object):
 
         @synchronized
         def cache_len():
-            return len(cache)
+            return self.size
 
         @synchronized
         def cache_contains(key):
@@ -123,6 +156,8 @@ class LruCache(object):
         self.set = cache_set
         self.setdefault = cache_set_default
         self.pop = cache_pop
+        if cache_type is TreeCache:
+            self.del_multi = cache_del_multi
         self.len = cache_len
         self.contains = cache_contains
         self.clear = cache_clear
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
new file mode 100644
index 0000000000..3b58860910
--- /dev/null
+++ b/synapse/util/caches/treecache.py
@@ -0,0 +1,60 @@
+SENTINEL = object()
+
+
+class TreeCache(object):
+    """
+    Tree-based backing store for LruCache. Allows subtrees of data to be deleted
+    efficiently.
+    Keys must be tuples.
+    """
+    def __init__(self):
+        self.root = {}
+
+    def __setitem__(self, key, value):
+        return self.set(key, value)
+
+    def __contains__(self, key):
+        return self.get(key, SENTINEL) is not SENTINEL
+
+    def set(self, key, value):
+        node = self.root
+        for k in key[:-1]:
+            node = node.setdefault(k, {})
+        node[key[-1]] = value
+
+    def get(self, key, default=None):
+        node = self.root
+        for k in key[:-1]:
+            node = node.get(k, None)
+            if node is None:
+                return default
+        return node.get(key[-1], default)
+
+    def clear(self):
+        self.root = {}
+
+    def pop(self, key, default=None):
+        nodes = []
+
+        node = self.root
+        for k in key[:-1]:
+            node = node.get(k, None)
+            nodes.append(node)  # don't add the root node
+            if node is None:
+                return default
+        popped = node.pop(key[-1], SENTINEL)
+        if popped is SENTINEL:
+            return default
+
+        node_and_keys = zip(nodes, key)
+        node_and_keys.reverse()
+        node_and_keys.append((self.root, None))
+
+        for i in range(len(node_and_keys) - 1):
+            n, k = node_and_keys[i]
+
+            if n:
+                break
+            node_and_keys[i+1][0].pop(k)
+
+        return popped
diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py
index fbbc5eed15..2cd3d26454 100644
--- a/tests/util/test_lrucache.py
+++ b/tests/util/test_lrucache.py
@@ -17,6 +17,7 @@
 from .. import unittest
 
 from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.treecache import TreeCache
 
 class LruCacheTestCase(unittest.TestCase):
 
@@ -52,3 +53,22 @@ class LruCacheTestCase(unittest.TestCase):
         cache["key"] = 1
         self.assertEquals(cache.pop("key"), 1)
         self.assertEquals(cache.pop("key"), None)
+
+    def test_del_multi(self):
+        cache = LruCache(4, 2, cache_type=TreeCache)
+        cache[("animal", "cat")] = "mew"
+        cache[("animal", "dog")] = "woof"
+        cache[("vehicles", "car")] = "vroom"
+        cache[("vehicles", "train")] = "chuff"
+
+        self.assertEquals(len(cache), 4)
+
+        self.assertEquals(cache.get(("animal", "cat")), "mew")
+        self.assertEquals(cache.get(("vehicles", "car")), "vroom")
+        cache.del_multi(("animal",))
+        self.assertEquals(len(cache), 2)
+        self.assertEquals(cache.get(("animal", "cat")), None)
+        self.assertEquals(cache.get(("animal", "dog")), None)
+        self.assertEquals(cache.get(("vehicles", "car")), "vroom")
+        self.assertEquals(cache.get(("vehicles", "train")), "chuff")
+        # Man from del_multi say "Yes".
diff --git a/tests/util/test_treecache.py b/tests/util/test_treecache.py
new file mode 100644
index 0000000000..9946ceb3f1
--- /dev/null
+++ b/tests/util/test_treecache.py
@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015, 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from .. import unittest
+
+from synapse.util.caches.treecache import TreeCache
+
+class TreeCacheTestCase(unittest.TestCase):
+    def test_get_set_onelevel(self):
+        cache = TreeCache()
+        cache[("a",)] = "A"
+        cache[("b",)] = "B"
+        self.assertEquals(cache.get(("a",)), "A")
+        self.assertEquals(cache.get(("b",)), "B")
+
+    def test_pop_onelevel(self):
+        cache = TreeCache()
+        cache[("a",)] = "A"
+        cache[("b",)] = "B"
+        self.assertEquals(cache.pop(("a",)), "A")
+        self.assertEquals(cache.pop(("a",)), None)
+        self.assertEquals(cache.get(("b",)), "B")
+
+    def test_get_set_twolevel(self):
+        cache = TreeCache()
+        cache[("a", "a")] = "AA"
+        cache[("a", "b")] = "AB"
+        cache[("b", "a")] = "BA"
+        self.assertEquals(cache.get(("a", "a")), "AA")
+        self.assertEquals(cache.get(("a", "b")), "AB")
+        self.assertEquals(cache.get(("b", "a")), "BA")
+
+    def test_pop_twolevel(self):
+        cache = TreeCache()
+        cache[("a", "a")] = "AA"
+        cache[("a", "b")] = "AB"
+        cache[("b", "a")] = "BA"
+        self.assertEquals(cache.pop(("a", "a")), "AA")
+        self.assertEquals(cache.get(("a", "a")), None)
+        self.assertEquals(cache.get(("a", "b")), "AB")
+        self.assertEquals(cache.pop(("b", "a")), "BA")
+        self.assertEquals(cache.pop(("b", "a")), None)
+
+    def test_pop_mixedlevel(self):
+        cache = TreeCache()
+        cache[("a", "a")] = "AA"
+        cache[("a", "b")] = "AB"
+        cache[("b", "a")] = "BA"
+        self.assertEquals(cache.get(("a", "a")), "AA")
+        cache.pop(("a",))
+        self.assertEquals(cache.get(("a", "a")), None)
+        self.assertEquals(cache.get(("a", "b")), None)
+        self.assertEquals(cache.get(("b", "a")), "BA")