diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 0f86311ed4..59a50a5df9 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -19,6 +19,8 @@ from collections import namedtuple
from six import iteritems, itervalues
from six.moves import range
+import attr
+
from twisted.internet import defer
from synapse.api.constants import EventTypes
@@ -48,6 +50,318 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
return len(self.delta_ids) if self.delta_ids else 0
+@attr.s(slots=True)
+class StateFilter(object):
+ """A filter used when querying for state.
+
+ Attributes:
+ types (dict[str, set[str]|None]): Map from type to set of state keys (or
+ None). This specifies which state_keys for the given type to fetch
+ from the DB. If None then all events with that type are fetched. If
+ the set is empty then no events with that type are fetched.
+ include_others (bool): Whether to fetch events with types that do not
+ appear in `types`.
+ """
+
+ types = attr.ib()
+ include_others = attr.ib(default=False)
+
+ def __attrs_post_init__(self):
+ # If `include_others` is set we canonicalise the filter by removing
+ # wildcards from the types dictionary
+ if self.include_others:
+ self.types = {
+ k: v for k, v in iteritems(self.types)
+ if v is not None
+ }
+
+ @staticmethod
+ def all():
+ """Creates a filter that fetches everything.
+
+ Returns:
+ StateFilter
+ """
+ return StateFilter(types={}, include_others=True)
+
+ @staticmethod
+ def none():
+ """Creates a filter that fetches nothing.
+
+ Returns:
+ StateFilter
+ """
+ return StateFilter(types={}, include_others=False)
+
+ @staticmethod
+ def from_types(types):
+ """Creates a filter that only fetches the given types
+
+ Args:
+ types (Iterable[tuple[str, str|None]]): A list of type and state
+ keys to fetch. A state_key of None fetches everything for
+ that type
+
+ Returns:
+ StateFilter
+ """
+ type_dict = {}
+ for typ, s in types:
+ if typ in type_dict:
+ if type_dict[typ] is None:
+ continue
+
+ if s is None:
+ type_dict[typ] = None
+ continue
+
+ type_dict.setdefault(typ, set()).add(s)
+
+ return StateFilter(types=type_dict)
+
+ @staticmethod
+ def from_lazy_load_member_list(members):
+ """Creates a filter that returns all non-member events, plus the member
+ events for the given users
+
+ Args:
+ members (iterable[str]): Set of user IDs
+
+ Returns:
+ StateFilter
+ """
+ return StateFilter(
+ types={EventTypes.Member: set(members)},
+ include_others=True,
+ )
+
+ def return_expanded(self):
+ """Creates a new StateFilter where type wild cards have been removed
+ (except for memberships). The returned filter is a superset of the
+ current one, i.e. anything that passes the current filter will pass
+ the returned filter.
+
+ This helps the caching as the DictionaryCache knows if it has *all* the
+ state, but does not know if it has all of the keys of a particular type,
+ which makes wildcard lookups expensive unless we have a complete cache.
+ Hence, if we are doing a wildcard lookup, populate the cache fully so
+ that we can do an efficient lookup next time.
+
+ Note that since we have two caches, one for membership events and one for
+ other events, we can be a bit more clever than simply returning
+ `StateFilter.all()` if `has_wildcards()` is True.
+
+ We return a StateFilter where:
+ 1. the list of membership events to return is the same
+ 2. if there is a wildcard that matches non-member events we
+ return all non-member events
+
+ Returns:
+ StateFilter
+ """
+
+ if self.is_full():
+ # If we're going to return everything then there's nothing to do
+ return self
+
+ if not self.has_wildcards():
+ # If there are no wild cards, there's nothing to do
+ return self
+
+ if EventTypes.Member in self.types:
+ get_all_members = self.types[EventTypes.Member] is None
+ else:
+ get_all_members = self.include_others
+
+ has_non_member_wildcard = self.include_others or any(
+ state_keys is None
+ for t, state_keys in iteritems(self.types)
+ if t != EventTypes.Member
+ )
+
+ if not has_non_member_wildcard:
+ # If there are no non-member wild cards we can just return ourselves
+ return self
+
+ if get_all_members:
+ # We want to return everything.
+ return StateFilter.all()
+ else:
+ # We want to return all non-members, but only particular
+ # memberships
+ return StateFilter(
+ types={EventTypes.Member: self.types[EventTypes.Member]},
+ include_others=True,
+ )
+
+ def make_sql_filter_clause(self):
+ """Converts the filter to an SQL clause.
+
+ For example:
+
+ f = StateFilter.from_types([("m.room.create", "")])
+ clause, args = f.make_sql_filter_clause()
+ clause == "(type = ? AND state_key = ?)"
+ args == ['m.room.create', '']
+
+
+ Returns:
+ tuple[str, list]: The SQL string (may be empty) and arguments. An
+ empty SQL string is returned when the filter matches everything
+ (i.e. is "full").
+ """
+
+ where_clause = ""
+ where_args = []
+
+ if self.is_full():
+ return where_clause, where_args
+
+ if not self.include_others and not self.types:
+ # i.e. this is an empty filter, so we need to return a clause that
+ # will match nothing
+ return "1 = 2", []
+
+ # First we build up a lost of clauses for each type/state_key combo
+ clauses = []
+ for etype, state_keys in iteritems(self.types):
+ if state_keys is None:
+ clauses.append("(type = ?)")
+ where_args.append(etype)
+ continue
+
+ for state_key in state_keys:
+ clauses.append("(type = ? AND state_key = ?)")
+ where_args.extend((etype, state_key))
+
+ # This will match anything that appears in `self.types`
+ where_clause = " OR ".join(clauses)
+
+ # If we want to include stuff that's not in the types dict then we add
+ # a `OR type NOT IN (...)` clause to the end.
+ if self.include_others:
+ if where_clause:
+ where_clause += " OR "
+
+ where_clause += "type NOT IN (%s)" % (
+ ",".join(["?"] * len(self.types)),
+ )
+ where_args.extend(self.types)
+
+ return where_clause, where_args
+
+ def max_entries_returned(self):
+ """Returns the maximum number of entries this filter will return if
+ known, otherwise returns None.
+
+ For example a simple state filter asking for `("m.room.create", "")`
+ will return 1, whereas the default state filter will return None.
+
+ This is used to bail out early if the right number of entries have been
+ fetched.
+ """
+ if self.has_wildcards():
+ return None
+
+ return len(self.concrete_types())
+
+ def filter_state(self, state_dict):
+ """Returns the state filtered with by this StateFilter
+
+ Args:
+ state (dict[tuple[str, str], Any]): The state map to filter
+
+ Returns:
+ dict[tuple[str, str], Any]: The filtered state map
+ """
+ if self.is_full():
+ return dict(state_dict)
+
+ filtered_state = {}
+ for k, v in iteritems(state_dict):
+ typ, state_key = k
+ if typ in self.types:
+ state_keys = self.types[typ]
+ if state_keys is None or state_key in state_keys:
+ filtered_state[k] = v
+ elif self.include_others:
+ filtered_state[k] = v
+
+ return filtered_state
+
+ def is_full(self):
+ """Whether this filter fetches everything or not
+
+ Returns:
+ bool
+ """
+ return self.include_others and not self.types
+
+ def has_wildcards(self):
+ """Whether the filter includes wildcards or is attempting to fetch
+ specific state.
+
+ Returns:
+ bool
+ """
+
+ return (
+ self.include_others
+ or any(
+ state_keys is None
+ for state_keys in itervalues(self.types)
+ )
+ )
+
+ def concrete_types(self):
+ """Returns a list of concrete type/state_keys (i.e. not None) that
+ will be fetched. This will be a complete list if `has_wildcards`
+ returns False, but otherwise will be a subset (or even empty).
+
+ Returns:
+ list[tuple[str,str]]
+ """
+ return [
+ (t, s)
+ for t, state_keys in iteritems(self.types)
+ if state_keys is not None
+ for s in state_keys
+ ]
+
+ def get_member_split(self):
+ """Return the filter split into two: one which assumes it's exclusively
+ matching against member state, and one which assumes it's matching
+ against non member state.
+
+ This is useful due to the returned filters giving correct results for
+ `is_full()`, `has_wildcards()`, etc, when operating against maps that
+ either exclusively contain member events or only contain non-member
+ events. (Which is the case when dealing with the member vs non-member
+ state caches).
+
+ Returns:
+ tuple[StateFilter, StateFilter]: The member and non member filters
+ """
+
+ if EventTypes.Member in self.types:
+ state_keys = self.types[EventTypes.Member]
+ if state_keys is None:
+ member_filter = StateFilter.all()
+ else:
+ member_filter = StateFilter({EventTypes.Member: state_keys})
+ elif self.include_others:
+ member_filter = StateFilter.all()
+ else:
+ member_filter = StateFilter.none()
+
+ non_member_filter = StateFilter(
+ types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member},
+ include_others=self.include_others,
+ )
+
+ return member_filter, non_member_filter
+
+
# this inherits from EventsWorkerStore because it calls self.get_events
class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"""The parts of StateGroupStore that can be called from workers.
@@ -152,61 +466,41 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
# FIXME: how should this be cached?
- def get_filtered_current_state_ids(self, room_id, types, filtered_types=None):
+ def get_filtered_current_state_ids(self, room_id, state_filter=StateFilter.all()):
"""Get the current state event of a given type for a room based on the
current_state_events table. This may not be as up-to-date as the result
of doing a fresh state resolution as per state_handler.get_current_state
+
Args:
room_id (str)
- types (list[(Str, (Str|None))]): List of (type, state_key) tuples
- which are used to filter the state fetched. `state_key` may be
- None, which matches any `state_key`
- filtered_types (list[Str]|None): List of types to apply the above filter to.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+
Returns:
- deferred: dict of (type, state_key) -> event
+ Deferred[dict[tuple[str, str], str]]: Map from type/state_key to
+ event ID.
"""
- include_other_types = False if filtered_types is None else True
-
def _get_filtered_current_state_ids_txn(txn):
results = {}
- sql = """SELECT type, state_key, event_id FROM current_state_events
- WHERE room_id = ? %s"""
- # Turns out that postgres doesn't like doing a list of OR's and
- # is about 1000x slower, so we just issue a query for each specific
- # type seperately.
- if types:
- clause_to_args = [
- (
- "AND type = ? AND state_key = ?",
- (etype, state_key)
- ) if state_key is not None else (
- "AND type = ?",
- (etype,)
- )
- for etype, state_key in types
- ]
-
- if include_other_types:
- unique_types = set(filtered_types)
- clause_to_args.append(
- (
- "AND type <> ? " * len(unique_types),
- list(unique_types)
- )
- )
- else:
- # If types is None we fetch all the state, and so just use an
- # empty where clause with no extra args.
- clause_to_args = [("", [])]
- for where_clause, where_args in clause_to_args:
- args = [room_id]
- args.extend(where_args)
- txn.execute(sql % (where_clause,), args)
- for row in txn:
- typ, state_key, event_id = row
- key = (intern_string(typ), intern_string(state_key))
- results[key] = event_id
+ sql = """
+ SELECT type, state_key, event_id FROM current_state_events
+ WHERE room_id = ?
+ """
+
+ where_clause, where_args = state_filter.make_sql_filter_clause()
+
+ if where_clause:
+ sql += " AND (%s)" % (where_clause,)
+
+ args = [room_id]
+ args.extend(where_args)
+ txn.execute(sql, args)
+ for row in txn:
+ typ, state_key, event_id = row
+ key = (intern_string(typ), intern_string(state_key))
+ results[key] = event_id
+
return results
return self.runInteraction(
@@ -322,20 +616,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
})
@defer.inlineCallbacks
- def _get_state_groups_from_groups(self, groups, types, members=None):
+ def _get_state_groups_from_groups(self, groups, state_filter):
"""Returns the state groups for a given set of groups, filtering on
types of state events.
Args:
groups(list[int]): list of state group IDs to query
- types (Iterable[str, str|None]|None): list of 2-tuples of the form
- (`type`, `state_key`), where a `state_key` of `None` matches all
- state_keys for the `type`. If None, all types are returned.
- members (bool|None): If not None, then, in addition to any filtering
- implied by types, the results are also filtered to only include
- member events (if True), or to exclude member events (if False)
-
- Returns:
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
@@ -346,19 +634,23 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
for chunk in chunks:
res = yield self.runInteraction(
"_get_state_groups_from_groups",
- self._get_state_groups_from_groups_txn, chunk, types, members,
+ self._get_state_groups_from_groups_txn, chunk, state_filter,
)
results.update(res)
defer.returnValue(results)
def _get_state_groups_from_groups_txn(
- self, txn, groups, types=None, members=None,
+ self, txn, groups, state_filter=StateFilter.all(),
):
results = {group: {} for group in groups}
- if types is not None:
- types = list(set(types)) # deduplicate types list
+ where_clause, where_args = state_filter.make_sql_filter_clause()
+
+ # Unless the filter clause is empty, we're going to append it after an
+ # existing where clause
+ if where_clause:
+ where_clause = " AND (%s)" % (where_clause,)
if isinstance(self.database_engine, PostgresEngine):
# Temporarily disable sequential scans in this transaction. This is
@@ -374,79 +666,33 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# group for the given type, state_key.
# This may return multiple rows per (type, state_key), but last_value
# should be the same.
- sql = ("""
+ sql = """
WITH RECURSIVE state(state_group) AS (
VALUES(?::bigint)
UNION ALL
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
- SELECT type, state_key, last_value(event_id) OVER (
+ SELECT DISTINCT type, state_key, last_value(event_id) OVER (
PARTITION BY type, state_key ORDER BY state_group ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS event_id FROM state_groups_state
WHERE state_group IN (
SELECT state_group FROM state
)
- %s
- """)
-
- if members is True:
- sql += " AND type = '%s'" % (EventTypes.Member,)
- elif members is False:
- sql += " AND type <> '%s'" % (EventTypes.Member,)
-
- # Turns out that postgres doesn't like doing a list of OR's and
- # is about 1000x slower, so we just issue a query for each specific
- # type seperately.
- if types is not None:
- clause_to_args = [
- (
- "AND type = ? AND state_key = ?",
- (etype, state_key)
- ) if state_key is not None else (
- "AND type = ?",
- (etype,)
- )
- for etype, state_key in types
- ]
- else:
- # If types is None we fetch all the state, and so just use an
- # empty where clause with no extra args.
- clause_to_args = [("", [])]
+ """
- for where_clause, where_args in clause_to_args:
- for group in groups:
- args = [group]
- args.extend(where_args)
+ for group in groups:
+ args = [group]
+ args.extend(where_args)
- txn.execute(sql % (where_clause,), args)
- for row in txn:
- typ, state_key, event_id = row
- key = (typ, state_key)
- results[group][key] = event_id
+ txn.execute(sql + where_clause, args)
+ for row in txn:
+ typ, state_key, event_id = row
+ key = (typ, state_key)
+ results[group][key] = event_id
else:
- where_args = []
- where_clauses = []
- wildcard_types = False
- if types is not None:
- for typ in types:
- if typ[1] is None:
- where_clauses.append("(type = ?)")
- where_args.append(typ[0])
- wildcard_types = True
- else:
- where_clauses.append("(type = ? AND state_key = ?)")
- where_args.extend([typ[0], typ[1]])
-
- where_clause = "AND (%s)" % (" OR ".join(where_clauses))
- else:
- where_clause = ""
-
- if members is True:
- where_clause += " AND type = '%s'" % EventTypes.Member
- elif members is False:
- where_clause += " AND type <> '%s'" % EventTypes.Member
+ max_entries_returned = state_filter.max_entries_returned()
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
@@ -460,12 +706,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# without the right indices (which we can't add until
# after we finish deduping state, which requires this func)
args = [next_group]
- if types:
- args.extend(where_args)
+ args.extend(where_args)
txn.execute(
"SELECT type, state_key, event_id FROM state_groups_state"
- " WHERE state_group = ? %s" % (where_clause,),
+ " WHERE state_group = ? " + where_clause,
args
)
results[group].update(
@@ -481,9 +726,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# wildcards (i.e. Nones) in which case we have to do an exhaustive
# search
if (
- types is not None and
- not wildcard_types and
- len(results[group]) == len(types)
+ max_entries_returned is not None and
+ len(results[group]) == max_entries_returned
):
break
@@ -498,20 +742,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return results
@defer.inlineCallbacks
- def get_state_for_events(self, event_ids, types, filtered_types=None):
+ def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
"""Given a list of event_ids and type tuples, return a list of state
- dicts for each event. The state dicts will only have the type/state_keys
- that are in the `types` list.
+ dicts for each event.
Args:
event_ids (list[string])
- types (list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
deferred: A dict of (event_id) -> (type, state_key) -> [state_events]
@@ -521,7 +759,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
groups = set(itervalues(event_to_groups))
- group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
+ group_to_state = yield self._get_state_for_groups(groups, state_filter)
state_event_map = yield self.get_events(
[ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
@@ -540,20 +778,15 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
+ def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()):
"""
Get the state dicts corresponding to a list of events, containing the event_ids
of the state events (as opposed to the events themselves)
Args:
event_ids(list(str)): events whose state should be returned
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A deferred dict from event_id -> (type, state_key) -> event_id
@@ -563,7 +796,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
groups = set(itervalues(event_to_groups))
- group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
+ group_to_state = yield self._get_state_for_groups(groups, state_filter)
event_to_state = {
event_id: group_to_state[group]
@@ -573,45 +806,35 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_for_event(self, event_id, types=None, filtered_types=None):
+ def get_state_for_event(self, event_id, state_filter=StateFilter.all()):
"""
Get the state dict corresponding to a particular event
Args:
event_id(str): event whose state should be returned
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A deferred dict from (type, state_key) -> state_event
"""
- state_map = yield self.get_state_for_events([event_id], types, filtered_types)
+ state_map = yield self.get_state_for_events([event_id], state_filter)
defer.returnValue(state_map[event_id])
@defer.inlineCallbacks
- def get_state_ids_for_event(self, event_id, types=None, filtered_types=None):
+ def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()):
"""
Get the state dict corresponding to a particular event
Args:
event_id(str): event whose state should be returned
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
A deferred dict from (type, state_key) -> state_event
"""
- state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types)
+ state_map = yield self.get_state_ids_for_events([event_id], state_filter)
defer.returnValue(state_map[event_id])
@cached(max_entries=50000)
@@ -642,18 +865,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
- def _get_some_state_from_cache(self, cache, group, types, filtered_types=None):
+ def _get_state_for_group_using_cache(self, cache, group, state_filter):
"""Checks if group is in cache. See `_get_state_for_groups`
Args:
cache(DictionaryCache): the state group cache to use
group(int): The state group to lookup
- types(list[str, str|None]): List of 2-tuples of the form
- (`type`, `state_key`), where a `state_key` of `None` matches all
- state_keys for the `type`.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns 2-tuple (`state_dict`, `got_all`).
`got_all` is a bool indicating if we successfully retrieved all
@@ -662,124 +881,102 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
is_all, known_absent, state_dict_ids = cache.get(group)
- type_to_key = {}
+ if is_all or state_filter.is_full():
+ # Either we have everything or want everything, either way
+ # `is_all` tells us whether we've gotten everything.
+ return state_filter.filter_state(state_dict_ids), is_all
# tracks whether any of our requested types are missing from the cache
missing_types = False
- for typ, state_key in types:
- key = (typ, state_key)
-
- if (
- state_key is None or
- (filtered_types is not None and typ not in filtered_types)
- ):
- type_to_key[typ] = None
- # we mark the type as missing from the cache because
- # when the cache was populated it might have been done with a
- # restricted set of state_keys, so the wildcard will not work
- # and the cache may be incomplete.
- missing_types = True
- else:
- if type_to_key.get(typ, object()) is not None:
- type_to_key.setdefault(typ, set()).add(state_key)
-
+ if state_filter.has_wildcards():
+ # We don't know if we fetched all the state keys for the types in
+ # the filter that are wildcards, so we have to assume that we may
+ # have missed some.
+ missing_types = True
+ else:
+ # There aren't any wild cards, so `concrete_types()` returns the
+ # complete list of event types we're wanting.
+ for key in state_filter.concrete_types():
if key not in state_dict_ids and key not in known_absent:
missing_types = True
+ break
- sentinel = object()
-
- def include(typ, state_key):
- valid_state_keys = type_to_key.get(typ, sentinel)
- if valid_state_keys is sentinel:
- return filtered_types is not None and typ not in filtered_types
- if valid_state_keys is None:
- return True
- if state_key in valid_state_keys:
- return True
- return False
-
- got_all = is_all
- if not got_all:
- # the cache is incomplete. We may still have got all the results we need, if
- # we don't have any wildcards in the match list.
- if not missing_types and filtered_types is None:
- got_all = True
-
- return {
- k: v for k, v in iteritems(state_dict_ids)
- if include(k[0], k[1])
- }, got_all
-
- def _get_all_state_from_cache(self, cache, group):
- """Checks if group is in cache. See `_get_state_for_groups`
-
- Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool
- indicating if we successfully retrieved all requests state from the
- cache, if False we need to query the DB for the missing state.
-
- Args:
- cache(DictionaryCache): the state group cache to use
- group: The state group to lookup
- """
- is_all, _, state_dict_ids = cache.get(group)
-
- return state_dict_ids, is_all
+ return state_filter.filter_state(state_dict_ids), not missing_types
@defer.inlineCallbacks
- def _get_state_for_groups(self, groups, types=None, filtered_types=None):
+ def _get_state_for_groups(self, groups, state_filter=StateFilter.all()):
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
Args:
groups (iterable[int]): list of state groups for which we want
to get the state.
- types (None|iterable[(str, None|str)]):
- indicates the state type/keys required. If None, the whole
- state is fetched and returned.
-
- Otherwise, each entry should be a `(type, state_key)` tuple to
- include in the response. A `state_key` of None is a wildcard
- meaning that we require all state with that type.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
-
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
Deferred[dict[int, dict[tuple[str, str], str]]]:
dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
- if types is not None:
- non_member_types = [t for t in types if t[0] != EventTypes.Member]
-
- if filtered_types is not None and EventTypes.Member not in filtered_types:
- # we want all of the membership events
- member_types = None
- else:
- member_types = [t for t in types if t[0] == EventTypes.Member]
- else:
- non_member_types = None
- member_types = None
+ member_filter, non_member_filter = state_filter.get_member_split()
- non_member_state = yield self._get_state_for_groups_using_cache(
- groups, self._state_group_cache, non_member_types, filtered_types,
+ # Now we look them up in the member and non-member caches
+ non_member_state, incomplete_groups_nm, = (
+ yield self._get_state_for_groups_using_cache(
+ groups, self._state_group_cache,
+ state_filter=non_member_filter,
+ )
)
- # XXX: we could skip this entirely if member_types is []
- member_state = yield self._get_state_for_groups_using_cache(
- # we set filtered_types=None as member_state only ever contain members.
- groups, self._state_group_members_cache, member_types, None,
+
+ member_state, incomplete_groups_m, = (
+ yield self._get_state_for_groups_using_cache(
+ groups, self._state_group_members_cache,
+ state_filter=member_filter,
+ )
)
- state = non_member_state
+ state = dict(non_member_state)
for group in groups:
state[group].update(member_state[group])
+ # Now fetch any missing groups from the database
+
+ incomplete_groups = incomplete_groups_m | incomplete_groups_nm
+
+ if not incomplete_groups:
+ defer.returnValue(state)
+
+ cache_sequence_nm = self._state_group_cache.sequence
+ cache_sequence_m = self._state_group_members_cache.sequence
+
+ # Help the cache hit ratio by expanding the filter a bit
+ db_state_filter = state_filter.return_expanded()
+
+ group_to_state_dict = yield self._get_state_groups_from_groups(
+ list(incomplete_groups),
+ state_filter=db_state_filter,
+ )
+
+ # Now lets update the caches
+ self._insert_into_cache(
+ group_to_state_dict,
+ db_state_filter,
+ cache_seq_num_members=cache_sequence_m,
+ cache_seq_num_non_members=cache_sequence_nm,
+ )
+
+ # And finally update the result dict, by filtering out any extra
+ # stuff we pulled out of the database.
+ for group, group_state_dict in iteritems(group_to_state_dict):
+ # We just replace any existing entries, as we will have loaded
+ # everything we need from the database anyway.
+ state[group] = state_filter.filter_state(group_state_dict)
+
defer.returnValue(state)
- @defer.inlineCallbacks
def _get_state_for_groups_using_cache(
- self, groups, cache, types=None, filtered_types=None
+ self, groups, cache, state_filter,
):
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key, querying from a specific cache.
@@ -790,89 +987,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
cache (DictionaryCache): the cache of group ids to state dicts which
we will pass through - either the normal state cache or the specific
members state cache.
- types (None|iterable[(str, None|str)]):
- indicates the state type/keys required. If None, the whole
- state is fetched and returned.
-
- Otherwise, each entry should be a `(type, state_key)` tuple to
- include in the response. A `state_key` of None is a wildcard
- meaning that we require all state with that type.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
Returns:
- Deferred[dict[int, dict[tuple[str, str], str]]]:
- dict of state_group_id -> (dict of (type, state_key) -> event id)
+ tuple[dict[int, dict[tuple[str, str], str]], set[int]]: Tuple of
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ of entries in the cache, and the state group ids either missing
+ from the cache or incomplete.
"""
- if types:
- types = frozenset(types)
results = {}
- missing_groups = []
- if types is not None:
- for group in set(groups):
- state_dict_ids, got_all = self._get_some_state_from_cache(
- cache, group, types, filtered_types
- )
- results[group] = state_dict_ids
+ incomplete_groups = set()
+ for group in set(groups):
+ state_dict_ids, got_all = self._get_state_for_group_using_cache(
+ cache, group, state_filter
+ )
+ results[group] = state_dict_ids
- if not got_all:
- missing_groups.append(group)
- else:
- for group in set(groups):
- state_dict_ids, got_all = self._get_all_state_from_cache(
- cache, group
- )
+ if not got_all:
+ incomplete_groups.add(group)
- results[group] = state_dict_ids
+ return results, incomplete_groups
- if not got_all:
- missing_groups.append(group)
+ def _insert_into_cache(self, group_to_state_dict, state_filter,
+ cache_seq_num_members, cache_seq_num_non_members):
+ """Inserts results from querying the database into the relevant cache.
- if missing_groups:
- # Okay, so we have some missing_types, let's fetch them.
- cache_seq_num = cache.sequence
+ Args:
+ group_to_state_dict (dict): The new entries pulled from database.
+ Map from state group to state dict
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
+ cache_seq_num_members (int): Sequence number of member cache since
+ last lookup in cache
+ cache_seq_num_non_members (int): Sequence number of member cache since
+ last lookup in cache
+ """
- # the DictionaryCache knows if it has *all* the state, but
- # does not know if it has all of the keys of a particular type,
- # which makes wildcard lookups expensive unless we have a complete
- # cache. Hence, if we are doing a wildcard lookup, populate the
- # cache fully so that we can do an efficient lookup next time.
+ # We need to work out which types we've fetched from the DB for the
+ # member vs non-member caches. This should be as accurate as possible,
+ # but can be an underestimate (e.g. when we have wild cards)
- if filtered_types or (types and any(k is None for (t, k) in types)):
- types_to_fetch = None
- else:
- types_to_fetch = types
+ member_filter, non_member_filter = state_filter.get_member_split()
+ if member_filter.is_full():
+ # We fetched all member events
+ member_types = None
+ else:
+ # `concrete_types()` will only return a subset when there are wild
+ # cards in the filter, but that's fine.
+ member_types = member_filter.concrete_types()
- group_to_state_dict = yield self._get_state_groups_from_groups(
- missing_groups, types_to_fetch, cache == self._state_group_members_cache,
- )
+ if non_member_filter.is_full():
+ # We fetched all non member events
+ non_member_types = None
+ else:
+ non_member_types = non_member_filter.concrete_types()
+
+ for group, group_state_dict in iteritems(group_to_state_dict):
+ state_dict_members = {}
+ state_dict_non_members = {}
- for group, group_state_dict in iteritems(group_to_state_dict):
- state_dict = results[group]
-
- # update the result, filtering by `types`.
- if types:
- for k, v in iteritems(group_state_dict):
- (typ, _) = k
- if (
- (k in types or (typ, None) in types) or
- (filtered_types and typ not in filtered_types)
- ):
- state_dict[k] = v
+ for k, v in iteritems(group_state_dict):
+ if k[0] == EventTypes.Member:
+ state_dict_members[k] = v
else:
- state_dict.update(group_state_dict)
-
- # update the cache with all the things we fetched from the
- # database.
- cache.update(
- cache_seq_num,
- key=group,
- value=group_state_dict,
- fetched_keys=types_to_fetch,
- )
+ state_dict_non_members[k] = v
- defer.returnValue(results)
+ self._state_group_members_cache.update(
+ cache_seq_num_members,
+ key=group,
+ value=state_dict_members,
+ fetched_keys=member_types,
+ )
+
+ self._state_group_cache.update(
+ cache_seq_num_non_members,
+ key=group,
+ value=state_dict_non_members,
+ fetched_keys=non_member_types,
+ )
def store_state_group(self, event_id, room_id, prev_group, delta_ids,
current_state_ids):
@@ -1268,12 +1461,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
continue
prev_state = self._get_state_groups_from_groups_txn(
- txn, [prev_group], types=None
+ txn, [prev_group],
)
prev_state = prev_state[prev_group]
curr_state = self._get_state_groups_from_groups_txn(
- txn, [state_group], types=None
+ txn, [state_group],
)
curr_state = curr_state[state_group]
|