summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-08-04 09:32:23 +0100
committerErik Johnston <erik@matrix.org>2015-08-04 09:32:23 +0100
commit4d6cb8814e134eba644afeed7bd49df0c7951342 (patch)
treef6d14b56fd5ec6691eb95f0c160b632b64dd192c
parentMerge pull request #210 from matrix-org/reg-v2a-password-skip (diff)
downloadsynapse-4d6cb8814e134eba644afeed7bd49df0c7951342.tar.xz
Speed up event filtering (for ACL) logic
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/storage/_base.py10
-rw-r--r--synapse/storage/state.py117
5 files changed, 102 insertions, 43 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f7155fd8d3..22f534e49a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -230,7 +230,11 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         states = yield self.store.get_state_for_events(
-            room_id, [e.event_id for e in events],
+            room_id, frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+                (EventTypes.Member, None),
+            )
         )
 
         events_and_states = zip(events, states)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9d6d4f0978..765b14d994 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -138,7 +138,11 @@ class MessageHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, room_id, events):
         states = yield self.store.get_state_for_events(
-            room_id, [e.event_id for e in events],
+            room_id, frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+                (EventTypes.Member, user_id),
+            )
         )
 
         events_and_states = zip(events, states)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6cff6230c1..8f58774b31 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -295,7 +295,11 @@ class SyncHandler(BaseHandler):
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, room_id, events):
         states = yield self.store.get_state_for_events(
-            room_id, [e.event_id for e in events],
+            room_id, frozenset(e.event_id for e in events),
+            types=(
+                (EventTypes.RoomHistoryVisibility, ""),
+                (EventTypes.Member, user_id),
+            )
         )
 
         events_and_states = zip(events, states)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 8f812f0fd7..7b76ee3b73 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -71,6 +71,11 @@ class Cache(object):
         self.thread = None
         caches_by_name[name] = self.cache
 
+        class Sentinel(object):
+            __slots__ = []
+
+        self.sentinel = Sentinel()
+
     def check_thread(self):
         expected_thread = self.thread
         if expected_thread is None:
@@ -85,9 +90,10 @@ class Cache(object):
         if len(keyargs) != self.keylen:
             raise ValueError("Expected a key to have %d items", self.keylen)
 
-        if keyargs in self.cache:
+        val = self.cache.get(keyargs, self.sentinel)
+        if val is not self.sentinel:
             cache_counter.inc_hits(self.name)
-            return self.cache[keyargs]
+            return val
 
         cache_counter.inc_misses(self.name)
         raise KeyError()
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 47bec65497..7e9bd232cf 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
 
 from twisted.internet import defer
 
+from synapse.util import unwrapFirstError
 from synapse.util.stringutils import random_string
 
 import logging
@@ -206,62 +207,102 @@ class StateStore(SQLBaseStore):
         events = yield self._get_events(event_ids, get_prev_content=False)
         defer.returnValue(events)
 
-    @defer.inlineCallbacks
-    def get_state_for_events(self, room_id, event_ids):
+    @cached(num_args=3, lru=True)
+    def _get_state_groups_from_group(self, room_id, group, types):
         def f(txn):
-            groups = set()
-            event_to_group = {}
-            for event_id in event_ids:
-                # TODO: Remove this loop.
-                group = self._simple_select_one_onecol_txn(
-                    txn,
-                    table="event_to_state_groups",
-                    keyvalues={"event_id": event_id},
-                    retcol="state_group",
-                    allow_none=True,
-                )
-                if group:
-                    event_to_group[event_id] = group
-                    groups.add(group)
-
-            group_to_state_ids = {}
-            for group in groups:
-                state_ids = self._simple_select_onecol_txn(
-                    txn,
-                    table="state_groups_state",
-                    keyvalues={"state_group": group},
-                    retcol="event_id",
-                )
+            sql = (
+                "SELECT event_id FROM state_groups_state WHERE"
+                " room_id = ? AND state_group = ? AND (%s)"
+            ) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),)
 
-                group_to_state_ids[group] = state_ids
+            args = [room_id, group]
+            args.extend([i for typ in types for i in typ])
+            txn.execute(sql, args)
 
-            return event_to_group, group_to_state_ids
+            return group, [
+                r[0]
+                for r in txn.fetchall()
+            ]
 
-        res = yield self.runInteraction(
-            "annotate_events_with_state_groups",
+        return self.runInteraction(
+            "_get_state_groups_from_group",
             f,
         )
 
-        event_to_group, group_to_state_ids = res
+    @cached(num_args=3, lru=True, max_entries=100000)
+    def _get_state_for_event_id(self, room_id, event_id, types):
+        def f(txn):
+            type_and_state_sql = " OR ".join([
+                "(type = ? AND state_key = ?)"
+                if typ[1] is not None
+                else "type = ?"
+                for typ in types
+            ])
 
-        state_list = yield defer.gatherResults(
+            sql = (
+                "SELECT sg.event_id FROM state_groups_state as sg"
+                " INNER JOIN event_to_state_groups as e"
+                " ON e.state_group = sg.state_group"
+                " WHERE e.event_id = ? AND (%s)"
+            ) % (type_and_state_sql,)
+
+            args = [event_id]
+            for typ, state_key in types:
+                args.extend(
+                    [typ, state_key] if state_key is not None else [typ]
+                )
+            txn.execute(sql, args)
+
+            return event_id, [
+                r[0]
+                for r in txn.fetchall()
+            ]
+
+        return self.runInteraction(
+            "_get_state_for_event_id",
+            f,
+        )
+
+    @defer.inlineCallbacks
+    def get_state_for_events(self, room_id, event_ids, types):
+        set_types = frozenset(types)
+        res = yield defer.gatherResults(
             [
-                self._fetch_events_for_group(group, vals)
-                for group, vals in group_to_state_ids.items()
+                self._get_state_for_event_id(
+                    room_id, event_id, set_types,
+                )
+                for event_id in event_ids
             ],
             consumeErrors=True,
+        ).addErrback(unwrapFirstError)
+
+        event_to_state_ids = dict(res)
+
+        event_dict = yield self._get_events(
+            [
+                item
+                for lst in event_to_state_ids.values()
+                for item in lst
+            ],
+            get_prev_content=False
+        ).addCallback(
+            lambda evs: {ev.event_id: ev for ev in evs}
         )
 
-        state_dict = {
-            group: {
+        event_to_state = {
+            event_id: {
                 (ev.type, ev.state_key): ev
-                for ev in state
+                for ev in [
+                    event_dict[state_id]
+                    for state_id in state_ids
+                    if state_id in event_dict
+                ]
             }
-            for group, state in state_list
+            for event_id, state_ids in event_to_state_ids.items()
         }
 
         defer.returnValue([
-            state_dict.get(event_to_group.get(event, None), None)
+            event_to_state[event]
             for event in event_ids
         ])