diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f7155fd8d3..22f534e49a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -230,7 +230,11 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
states = yield self.store.get_state_for_events(
- room_id, [e.event_id for e in events],
+ room_id, frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, None),
+ )
)
events_and_states = zip(events, states)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 9d6d4f0978..765b14d994 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -138,7 +138,11 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events):
states = yield self.store.get_state_for_events(
- room_id, [e.event_id for e in events],
+ room_id, frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, user_id),
+ )
)
events_and_states = zip(events, states)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6cff6230c1..8f58774b31 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -295,7 +295,11 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def _filter_events_for_client(self, user_id, room_id, events):
states = yield self.store.get_state_for_events(
- room_id, [e.event_id for e in events],
+ room_id, frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, user_id),
+ )
)
events_and_states = zip(events, states)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 8f812f0fd7..7b76ee3b73 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -71,6 +71,11 @@ class Cache(object):
self.thread = None
caches_by_name[name] = self.cache
+ class Sentinel(object):
+ __slots__ = []
+
+ self.sentinel = Sentinel()
+
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
@@ -85,9 +90,10 @@ class Cache(object):
if len(keyargs) != self.keylen:
raise ValueError("Expected a key to have %d items", self.keylen)
- if keyargs in self.cache:
+ val = self.cache.get(keyargs, self.sentinel)
+ if val is not self.sentinel:
cache_counter.inc_hits(self.name)
- return self.cache[keyargs]
+ return val
cache_counter.inc_misses(self.name)
raise KeyError()
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 47bec65497..7e9bd232cf 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore, cached
from twisted.internet import defer
+from synapse.util import unwrapFirstError
from synapse.util.stringutils import random_string
import logging
@@ -206,62 +207,102 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
- @defer.inlineCallbacks
- def get_state_for_events(self, room_id, event_ids):
+ @cached(num_args=3, lru=True)
+ def _get_state_groups_from_group(self, room_id, group, types):
def f(txn):
- groups = set()
- event_to_group = {}
- for event_id in event_ids:
- # TODO: Remove this loop.
- group = self._simple_select_one_onecol_txn(
- txn,
- table="event_to_state_groups",
- keyvalues={"event_id": event_id},
- retcol="state_group",
- allow_none=True,
- )
- if group:
- event_to_group[event_id] = group
- groups.add(group)
-
- group_to_state_ids = {}
- for group in groups:
- state_ids = self._simple_select_onecol_txn(
- txn,
- table="state_groups_state",
- keyvalues={"state_group": group},
- retcol="event_id",
- )
+ sql = (
+ "SELECT event_id FROM state_groups_state WHERE"
+ " room_id = ? AND state_group = ? AND (%s)"
+ ) % (" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),)
- group_to_state_ids[group] = state_ids
+ args = [room_id, group]
+ args.extend([i for typ in types for i in typ])
+ txn.execute(sql, args)
- return event_to_group, group_to_state_ids
+ return group, [
+ r[0]
+ for r in txn.fetchall()
+ ]
- res = yield self.runInteraction(
- "annotate_events_with_state_groups",
+ return self.runInteraction(
+ "_get_state_groups_from_group",
f,
)
- event_to_group, group_to_state_ids = res
+ @cached(num_args=3, lru=True, max_entries=100000)
+ def _get_state_for_event_id(self, room_id, event_id, types):
+ def f(txn):
+ type_and_state_sql = " OR ".join([
+ "(type = ? AND state_key = ?)"
+ if typ[1] is not None
+ else "type = ?"
+ for typ in types
+ ])
- state_list = yield defer.gatherResults(
+ sql = (
+ "SELECT sg.event_id FROM state_groups_state as sg"
+ " INNER JOIN event_to_state_groups as e"
+ " ON e.state_group = sg.state_group"
+ " WHERE e.event_id = ? AND (%s)"
+ ) % (type_and_state_sql,)
+
+ args = [event_id]
+ for typ, state_key in types:
+ args.extend(
+ [typ, state_key] if state_key is not None else [typ]
+ )
+ txn.execute(sql, args)
+
+ return event_id, [
+ r[0]
+ for r in txn.fetchall()
+ ]
+
+ return self.runInteraction(
+ "_get_state_for_event_id",
+ f,
+ )
+
+ @defer.inlineCallbacks
+ def get_state_for_events(self, room_id, event_ids, types):
+ set_types = frozenset(types)
+ res = yield defer.gatherResults(
[
- self._fetch_events_for_group(group, vals)
- for group, vals in group_to_state_ids.items()
+ self._get_state_for_event_id(
+ room_id, event_id, set_types,
+ )
+ for event_id in event_ids
],
consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ event_to_state_ids = dict(res)
+
+ event_dict = yield self._get_events(
+ [
+ item
+ for lst in event_to_state_ids.values()
+ for item in lst
+ ],
+ get_prev_content=False
+ ).addCallback(
+ lambda evs: {ev.event_id: ev for ev in evs}
)
- state_dict = {
- group: {
+ event_to_state = {
+ event_id: {
(ev.type, ev.state_key): ev
- for ev in state
+ for ev in [
+ event_dict[state_id]
+ for state_id in state_ids
+ if state_id in event_dict
+ ]
}
- for group, state in state_list
+ for event_id, state_ids in event_to_state_ids.items()
}
defer.returnValue([
- state_dict.get(event_to_group.get(event, None), None)
+ event_to_state[event]
for event in event_ids
])
|