summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.rst6
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/push/__init__.py27
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/events.py25
-rw-r--r--synapse/storage/push_rule.py21
-rw-r--r--synapse/storage/room.py3
-rw-r--r--synapse/storage/roommember.py2
-rw-r--r--synapse/storage/state.py25
-rw-r--r--synapse/util/lrucache.py8
11 files changed, 93 insertions, 36 deletions
diff --git a/CHANGES.rst b/CHANGES.rst
index 65970a89c7..e1420d7a35 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,9 @@
+Changes in synapse v0.9.0-r5 (2015-05-21)
+=========================================
+
+* Add more database caches to reduce amount of work done for each pusher. This
+  radically reduces CPU usage when multiple pushers are set up in the same room.
+
 Changes in synapse v0.9.0 (2015-05-07)
 ======================================
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 041e2151b0..68f86138a4 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.9.0-r4"
+__version__ = "0.9.0-r5"
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 401cc677d1..4bd027d9bb 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -536,9 +536,7 @@ class RoomListHandler(BaseHandler):
         chunk = yield self.store.get_rooms(is_public=True)
         results = yield defer.gatherResults(
             [
-                self.store.get_users_in_room(
-                    room_id=room["room_id"],
-                )
+                self.store.get_users_in_room(room["room_id"])
                 for room in chunk
             ],
             consumeErrors=True,
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 5575c847f9..e3dd4ce76d 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -84,25 +84,20 @@ class Pusher(object):
 
         rules = baserules.list_with_base_rules(rawrules, user)
 
+        room_id = ev['room_id']
+
         # get *our* member event for display name matching
-        member_events_for_room = yield self.store.get_current_state(
-            room_id=ev['room_id'],
+        my_display_name = None
+        our_member_event = yield self.store.get_current_state(
+            room_id=room_id,
             event_type='m.room.member',
-            state_key=None
+            state_key=self.user_name,
         )
-        my_display_name = None
-        room_member_count = 0
-        for mev in member_events_for_room:
-            if mev.content['membership'] != 'join':
-                continue
-
-            # This loop does two things:
-            # 1) Find our current display name
-            if mev.state_key == self.user_name and 'displayname' in mev.content:
-                my_display_name = mev.content['displayname']
+        if our_member_event:
+            my_display_name = our_member_event[0].content.get("displayname")
 
-            # and 2) Get the number of people in that room
-            room_member_count += 1
+        room_members = yield self.store.get_users_in_room(room_id)
+        room_member_count = len(room_members)
 
         for r in rules:
             if r['rule_id'] in enabled_map:
@@ -287,9 +282,11 @@ class Pusher(object):
             if len(actions) == 0:
                 logger.warn("Empty actions! Using default action.")
                 actions = Pusher.DEFAULT_ACTIONS
+
             if 'notify' not in actions and 'dont_notify' not in actions:
                 logger.warn("Neither notify nor dont_notify in actions: adding default")
                 actions.extend(Pusher.DEFAULT_ACTIONS)
+
             if 'dont_notify' in actions:
                 logger.debug(
                     "%s for %s: dont_notify",
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0f146998d9..39884c2afe 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -121,6 +121,11 @@ class Cache(object):
         self.sequence += 1
         self.cache.pop(keyargs, None)
 
+    def invalidate_all(self):
+        self.check_thread()
+        self.sequence += 1
+        self.cache.clear()
+
 
 def cached(max_entries=1000, num_args=1, lru=False):
     """ A method decorator that applies a memoizing cache around the function.
@@ -172,6 +177,7 @@ def cached(max_entries=1000, num_args=1, lru=False):
                 defer.returnValue(ret)
 
         wrapped.invalidate = cache.invalidate
+        wrapped.invalidate_all = cache.invalidate_all
         wrapped.prefill = cache.prefill
         return wrapped
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index b262035fac..d2a010bd88 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -125,6 +125,12 @@ class EventsStore(SQLBaseStore):
         # We purposefully do this first since if we include a `current_state`
         # key, we *want* to update the `current_state_events` table
         if current_state:
+            txn.call_after(self.get_current_state_for_key.invalidate_all)
+            txn.call_after(self.get_rooms_for_user.invalidate_all)
+            txn.call_after(self.get_users_in_room.invalidate, event.room_id)
+            txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
+            txn.call_after(self.get_room_name_and_aliases, event.room_id)
+
             self._simple_delete_txn(
                 txn,
                 table="current_state_events",
@@ -132,13 +138,6 @@ class EventsStore(SQLBaseStore):
             )
 
             for s in current_state:
-                if s.type == EventTypes.Member:
-                    txn.call_after(
-                        self.get_rooms_for_user.invalidate, s.state_key
-                    )
-                    txn.call_after(
-                        self.get_joined_hosts_for_room.invalidate, s.room_id
-                    )
                 self._simple_insert_txn(
                     txn,
                     "current_state_events",
@@ -356,6 +355,18 @@ class EventsStore(SQLBaseStore):
             )
 
             if is_new_state and not context.rejected:
+                txn.call_after(
+                    self.get_current_state_for_key.invalidate,
+                    event.room_id, event.type, event.state_key
+                )
+
+                if (event.type == EventTypes.Name
+                        or event.type == EventTypes.Aliases):
+                    txn.call_after(
+                        self.get_room_name_and_aliases.invalidate,
+                        event.room_id
+                    )
+
                 self._simple_upsert_txn(
                     txn,
                     "current_state_events",
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index e7988676ce..80d0ac4ea3 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -13,9 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import collections
-
-from ._base import SQLBaseStore, Table
+from ._base import SQLBaseStore, cached
 from twisted.internet import defer
 
 import logging
@@ -41,6 +39,7 @@ class PushRuleStore(SQLBaseStore):
 
         defer.returnValue(rows)
 
+    @cached()
     @defer.inlineCallbacks
     def get_push_rules_enabled_for_user(self, user_name):
         results = yield self._simple_select_list(
@@ -151,6 +150,10 @@ class PushRuleStore(SQLBaseStore):
 
             txn.execute(sql, (user_name, priority_class, new_rule_priority))
 
+        txn.call_after(
+            self.get_push_rules_enabled_for_user.invalidate, user_name
+        )
+
         self._simple_insert_txn(
             txn,
             table=PushRuleTable.table_name,
@@ -179,6 +182,10 @@ class PushRuleStore(SQLBaseStore):
         new_rule['priority_class'] = priority_class
         new_rule['priority'] = new_prio
 
+        txn.call_after(
+            self.get_push_rules_enabled_for_user.invalidate, user_name
+        )
+
         self._simple_insert_txn(
             txn,
             table=PushRuleTable.table_name,
@@ -201,6 +208,7 @@ class PushRuleStore(SQLBaseStore):
             {'user_name': user_name, 'rule_id': rule_id},
             desc="delete_push_rule",
         )
+        self.get_push_rules_enabled_for_user.invalidate(user_name)
 
     @defer.inlineCallbacks
     def set_push_rule_enabled(self, user_name, rule_id, enabled):
@@ -220,6 +228,7 @@ class PushRuleStore(SQLBaseStore):
             {'enabled': 1 if enabled else 0},
             {'id': new_id},
         )
+        self.get_push_rules_enabled_for_user.invalidate(user_name)
 
 
 class RuleNotFoundException(Exception):
@@ -230,7 +239,7 @@ class InconsistentRuleException(Exception):
     pass
 
 
-class PushRuleTable(Table):
+class PushRuleTable(object):
     table_name = "push_rules"
 
     fields = [
@@ -243,10 +252,8 @@ class PushRuleTable(Table):
         "actions",
     ]
 
-    EntryType = collections.namedtuple("PushRuleEntry", fields)
-
 
-class PushRuleEnableTable(Table):
+class PushRuleEnableTable(object):
     table_name = "push_rules_enable"
 
     fields = [
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index f956377632..4612a8aa83 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import StoreError
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
 
 import collections
 import logging
@@ -186,6 +186,7 @@ class RoomStore(SQLBaseStore):
                 }
             )
 
+    @cached()
     @defer.inlineCallbacks
     def get_room_name_and_aliases(self, room_id):
         def f(txn):
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 80717f6cde..d36a6c18a8 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -66,6 +66,7 @@ class RoomMemberStore(SQLBaseStore):
 
         txn.call_after(self.get_rooms_for_user.invalidate, target_user_id)
         txn.call_after(self.get_joined_hosts_for_room.invalidate, event.room_id)
+        txn.call_after(self.get_users_in_room.invalidate, event.room_id)
 
     def get_room_member(self, user_id, room_id):
         """Retrieve the current state of a room member.
@@ -87,6 +88,7 @@ class RoomMemberStore(SQLBaseStore):
             lambda events: events[0] if events else None
         )
 
+    @cached()
     def get_users_in_room(self, room_id):
         def f(txn):
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 56f0572f7e..b24de34f23 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
 
 from twisted.internet import defer
 
@@ -143,6 +143,12 @@ class StateStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def get_current_state(self, room_id, event_type=None, state_key=""):
+        if event_type and state_key is not None:
+            result = yield self.get_current_state_for_key(
+                room_id, event_type, state_key
+            )
+            defer.returnValue(result)
+
         def f(txn):
             sql = (
                 "SELECT event_id FROM current_state_events"
@@ -167,6 +173,23 @@ class StateStore(SQLBaseStore):
         events = yield self._get_events(event_ids, get_prev_content=False)
         defer.returnValue(events)
 
+    @cached(num_args=3)
+    @defer.inlineCallbacks
+    def get_current_state_for_key(self, room_id, event_type, state_key):
+        def f(txn):
+            sql = (
+                "SELECT event_id FROM current_state_events"
+                " WHERE room_id = ? AND type = ? AND state_key = ?"
+            )
+
+            args = (room_id, event_type, state_key)
+            txn.execute(sql, args)
+            results = txn.fetchall()
+            return [r[0] for r in results]
+        event_ids = yield self.runInteraction("get_current_state_for_key", f)
+        events = yield self._get_events(event_ids, get_prev_content=False)
+        defer.returnValue(events)
+
 
 def _make_group_id(clock):
     return str(int(clock.time_msec())) + random_string(5)
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index 96163c90f1..cacd7e45fa 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -20,7 +20,6 @@ import threading
 
 class LruCache(object):
     """Least-recently-used cache."""
-    # TODO(mjark) Add mutex for linked list for thread safety.
     def __init__(self, max_size):
         cache = {}
         list_root = []
@@ -106,6 +105,12 @@ class LruCache(object):
                 return default
 
         @synchronized
+        def cache_clear():
+            list_root[NEXT] = list_root
+            list_root[PREV] = list_root
+            cache.clear()
+
+        @synchronized
         def cache_len():
             return len(cache)
 
@@ -120,6 +125,7 @@ class LruCache(object):
         self.pop = cache_pop
         self.len = cache_len
         self.contains = cache_contains
+        self.clear = cache_clear
 
     def __getitem__(self, key):
         result = self.get(key, self.sentinel)