diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..b27b3ae144 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -186,7 +186,17 @@ class StateGroupWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_state_groups_from_groups(self, groups, types):
- """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
+ """Returns the state groups for a given set of groups, filtering on
+ types of state events.
+
+ Args:
+ groups(list[int]): list of state group IDs to query
+ types (Iterable[str, str|None]|None): list of 2-tuples of the form
+ (`type`, `state_key`), where a `state_key` of `None` matches all
+ state_keys for the `type`. If None, all types are returned.
+
+ Returns:
+ dictionary state_group -> (dict of (type, state_key) -> event id)
"""
results = {}
@@ -200,8 +210,11 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue(results)
- def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+ def _get_state_groups_from_groups_txn(
+ self, txn, groups, types=None,
+ ):
results = {group: {} for group in groups}
+
if types is not None:
types = list(set(types)) # deduplicate types list
@@ -239,7 +252,7 @@ class StateGroupWorkerStore(SQLBaseStore):
# Turns out that postgres doesn't like doing a list of OR's and
# is about 1000x slower, so we just issue a query for each specific
# type seperately.
- if types:
+ if types is not None:
clause_to_args = [
(
"AND type = ? AND state_key = ?",
@@ -278,6 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
else:
where_clauses.append("(type = ? AND state_key = ?)")
where_args.extend([typ[0], typ[1]])
+
where_clause = "AND (%s)" % (" OR ".join(where_clauses))
else:
where_clause = ""
@@ -332,16 +346,20 @@ class StateGroupWorkerStore(SQLBaseStore):
return results
@defer.inlineCallbacks
- def get_state_for_events(self, event_ids, types):
+ def get_state_for_events(self, event_ids, types, filtered_types=None):
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event. The state dicts will only have the type/state_keys
that are in the `types` list.
Args:
- event_ids (list)
- types (list): List of (type, state_key) tuples which are used to
- filter the state fetched. `state_key` may be None, which matches
- any `state_key`
+ event_ids (list[string])
+ types (list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
deferred: A list of dicts corresponding to the event_ids given.
@@ -352,7 +370,7 @@ class StateGroupWorkerStore(SQLBaseStore):
)
groups = set(itervalues(event_to_groups))
- group_to_state = yield self._get_state_for_groups(groups, types)
+ group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
state_event_map = yield self.get_events(
[ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
@@ -371,15 +389,19 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_ids_for_events(self, event_ids, types=None):
+ def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
"""
Get the state dicts corresponding to a list of events
Args:
event_ids(list(str)): events whose state should be returned
- types(list[(str, str)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A deferred dict from event_id -> (type, state_key) -> state_event
@@ -389,7 +411,7 @@ class StateGroupWorkerStore(SQLBaseStore):
)
groups = set(itervalues(event_to_groups))
- group_to_state = yield self._get_state_for_groups(groups, types)
+ group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
event_to_state = {
event_id: group_to_state[group]
@@ -399,37 +421,45 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_for_event(self, event_id, types=None):
+ def get_state_for_event(self, event_id, types=None, filtered_types=None):
"""
Get the state dict corresponding to a particular event
Args:
event_id(str): event whose state should be returned
- types(list[(str, str)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A deferred dict from (type, state_key) -> state_event
"""
- state_map = yield self.get_state_for_events([event_id], types)
+ state_map = yield self.get_state_for_events([event_id], types, filtered_types)
defer.returnValue(state_map[event_id])
@defer.inlineCallbacks
- def get_state_ids_for_event(self, event_id, types=None):
+ def get_state_ids_for_event(self, event_id, types=None, filtered_types=None):
"""
Get the state dict corresponding to a particular event
Args:
event_id(str): event whose state should be returned
- types(list[(str, str)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A deferred dict from (type, state_key) -> state_event
"""
- state_map = yield self.get_state_ids_for_events([event_id], types)
+ state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types)
defer.returnValue(state_map[event_id])
@cached(max_entries=50000)
@@ -460,56 +490,73 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
- def _get_some_state_from_cache(self, group, types):
+ def _get_some_state_from_cache(self, group, types, filtered_types=None):
"""Checks if group is in cache. See `_get_state_for_groups`
- Returns 3-tuple (`state_dict`, `missing_types`, `got_all`).
- `missing_types` is the list of types that aren't in the cache for that
- group. `got_all` is a bool indicating if we successfully retrieved all
+ Args:
+ group(int): The state group to lookup
+ types(list[str, str|None]): List of 2-tuples of the form
+ (`type`, `state_key`), where a `state_key` of `None` matches all
+ state_keys for the `type`.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
+
+ Returns 2-tuple (`state_dict`, `got_all`).
+ `got_all` is a bool indicating if we successfully retrieved all
requests state from the cache, if False we need to query the DB for the
missing state.
-
- Args:
- group: The state group to lookup
- types (list): List of 2-tuples of the form (`type`, `state_key`),
- where a `state_key` of `None` matches all state_keys for the
- `type`.
"""
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
- missing_types = set()
+
+ # tracks whether any of ourrequested types are missing from the cache
+ missing_types = False
for typ, state_key in types:
key = (typ, state_key)
- if state_key is None:
+
+ if (
+ state_key is None or
+ (filtered_types is not None and typ not in filtered_types)
+ ):
type_to_key[typ] = None
- missing_types.add(key)
+ # we mark the type as missing from the cache because
+ # when the cache was populated it might have been done with a
+ # restricted set of state_keys, so the wildcard will not work
+ # and the cache may be incomplete.
+ missing_types = True
else:
if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key)
if key not in state_dict_ids and key not in known_absent:
- missing_types.add(key)
+ missing_types = True
sentinel = object()
def include(typ, state_key):
valid_state_keys = type_to_key.get(typ, sentinel)
if valid_state_keys is sentinel:
- return False
+ return filtered_types is not None and typ not in filtered_types
if valid_state_keys is None:
return True
if state_key in valid_state_keys:
return True
return False
- got_all = is_all or not missing_types
+ got_all = is_all
+ if not got_all:
+ # the cache is incomplete. We may still have got all the results we need, if
+ # we don't have any wildcards in the match list.
+ if not missing_types and filtered_types is None:
+ got_all = True
return {
k: v for k, v in iteritems(state_dict_ids)
if include(k[0], k[1])
- }, missing_types, got_all
+ }, got_all
def _get_all_state_from_cache(self, group):
"""Checks if group is in cache. See `_get_state_for_groups`
@@ -526,7 +573,7 @@ class StateGroupWorkerStore(SQLBaseStore):
return state_dict_ids, is_all
@defer.inlineCallbacks
- def _get_state_for_groups(self, groups, types=None):
+ def _get_state_for_groups(self, groups, types=None, filtered_types=None):
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
@@ -540,6 +587,9 @@ class StateGroupWorkerStore(SQLBaseStore):
Otherwise, each entry should be a `(type, state_key)` tuple to
include in the response. A `state_key` of None is a wildcard
meaning that we require all state with that type.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
@@ -551,8 +601,8 @@ class StateGroupWorkerStore(SQLBaseStore):
missing_groups = []
if types is not None:
for group in set(groups):
- state_dict_ids, _, got_all = self._get_some_state_from_cache(
- group, types,
+ state_dict_ids, got_all = self._get_some_state_from_cache(
+ group, types, filtered_types
)
results[group] = state_dict_ids
@@ -579,13 +629,13 @@ class StateGroupWorkerStore(SQLBaseStore):
# cache. Hence, if we are doing a wildcard lookup, populate the
# cache fully so that we can do an efficient lookup next time.
- if types and any(k is None for (t, k) in types):
+ if filtered_types or (types and any(k is None for (t, k) in types)):
types_to_fetch = None
else:
types_to_fetch = types
group_to_state_dict = yield self._get_state_groups_from_groups(
- missing_groups, types_to_fetch,
+ missing_groups, types_to_fetch
)
for group, group_state_dict in iteritems(group_to_state_dict):
@@ -595,7 +645,10 @@ class StateGroupWorkerStore(SQLBaseStore):
if types:
for k, v in iteritems(group_state_dict):
(typ, _) = k
- if k in types or (typ, None) in types:
+ if (
+ (k in types or (typ, None) in types) or
+ (filtered_types and typ not in filtered_types)
+ ):
state_dict[k] = v
else:
state_dict.update(group_state_dict)
|