diff options
Diffstat (limited to 'synapse/storage/state.py')
-rw-r--r-- | synapse/storage/state.py | 251 |
1 files changed, 197 insertions, 54 deletions
diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 89a05c4618..dd03c4168b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -21,15 +21,17 @@ from six.moves import range from twisted.internet import defer +from synapse.api.constants import EventTypes +from synapse.api.errors import NotFoundError +from synapse.storage._base import SQLBaseStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine +from synapse.storage.events_worker import EventsWorkerStore from synapse.util.caches import get_cache_factor_for, intern_string from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.dictionary_cache import DictionaryCache from synapse.util.stringutils import to_ascii -from ._base import SQLBaseStore - logger = logging.getLogger(__name__) @@ -46,7 +48,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt return len(self.delta_ids) if self.delta_ids else 0 -class StateGroupWorkerStore(SQLBaseStore): +# this inherits from EventsWorkerStore because it calls self.get_events +class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): """The parts of StateGroupStore that can be called from workers. """ @@ -61,6 +64,30 @@ class StateGroupWorkerStore(SQLBaseStore): "*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache") ) + @defer.inlineCallbacks + def get_room_version(self, room_id): + """Get the room_version of a given room + + Args: + room_id (str) + + Returns: + Deferred[str] + + Raises: + NotFoundError if the room is unknown + """ + # for now we do this by looking at the create event. We may want to cache this + # more intelligently in future. + state_ids = yield self.get_current_state_ids(room_id) + create_id = state_ids.get((EventTypes.Create, "")) + + if not create_id: + raise NotFoundError("Unknown room") + + create_event = yield self.get_event(create_id) + defer.returnValue(create_event.content.get("room_version", "1")) + @cached(max_entries=100000, iterable=True) def get_current_state_ids(self, room_id): """Get the current state event ids for a room based on the @@ -89,6 +116,69 @@ class StateGroupWorkerStore(SQLBaseStore): _get_current_state_ids_txn, ) + # FIXME: how should this be cached? + def get_filtered_current_state_ids(self, room_id, types, filtered_types=None): + """Get the current state event of a given type for a room based on the + current_state_events table. This may not be as up-to-date as the result + of doing a fresh state resolution as per state_handler.get_current_state + Args: + room_id (str) + types (list[(Str, (Str|None))]): List of (type, state_key) tuples + which are used to filter the state fetched. `state_key` may be + None, which matches any `state_key` + filtered_types (list[Str]|None): List of types to apply the above filter to. + Returns: + deferred: dict of (type, state_key) -> event + """ + + include_other_types = False if filtered_types is None else True + + def _get_filtered_current_state_ids_txn(txn): + results = {} + sql = """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? %s""" + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) if state_key is not None else ( + "AND type = ?", + (etype,) + ) + for etype, state_key in types + ] + + if include_other_types: + unique_types = set(filtered_types) + clause_to_args.append( + ( + "AND type <> ? " * len(unique_types), + list(unique_types) + ) + ) + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] + for where_clause, where_args in clause_to_args: + args = [room_id] + args.extend(where_args) + txn.execute(sql % (where_clause,), args) + for row in txn: + typ, state_key, event_id = row + key = (intern_string(typ), intern_string(state_key)) + results[key] = event_id + return results + + return self.runInteraction( + "get_filtered_current_state_ids", + _get_filtered_current_state_ids_txn, + ) + @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): """Given a state group try to return a previous group and a delta between @@ -186,7 +276,17 @@ class StateGroupWorkerStore(SQLBaseStore): @defer.inlineCallbacks def _get_state_groups_from_groups(self, groups, types): - """Returns dictionary state_group -> (dict of (type, state_key) -> event id) + """Returns the state groups for a given set of groups, filtering on + types of state events. + + Args: + groups(list[int]): list of state group IDs to query + types (Iterable[str, str|None]|None): list of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. If None, all types are returned. + + Returns: + dictionary state_group -> (dict of (type, state_key) -> event id) """ results = {} @@ -200,8 +300,11 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue(results) - def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + def _get_state_groups_from_groups_txn( + self, txn, groups, types=None, + ): results = {group: {} for group in groups} + if types is not None: types = list(set(types)) # deduplicate types list @@ -239,7 +342,7 @@ class StateGroupWorkerStore(SQLBaseStore): # Turns out that postgres doesn't like doing a list of OR's and # is about 1000x slower, so we just issue a query for each specific # type seperately. - if types: + if types is not None: clause_to_args = [ ( "AND type = ? AND state_key = ?", @@ -278,6 +381,7 @@ class StateGroupWorkerStore(SQLBaseStore): else: where_clauses.append("(type = ? AND state_key = ?)") where_args.extend([typ[0], typ[1]]) + where_clause = "AND (%s)" % (" OR ".join(where_clauses)) else: where_clause = "" @@ -332,27 +436,30 @@ class StateGroupWorkerStore(SQLBaseStore): return results @defer.inlineCallbacks - def get_state_for_events(self, event_ids, types): + def get_state_for_events(self, event_ids, types, filtered_types=None): """Given a list of event_ids and type tuples, return a list of state dicts for each event. The state dicts will only have the type/state_keys that are in the `types` list. Args: - event_ids (list) - types (list): List of (type, state_key) tuples which are used to - filter the state fetched. `state_key` may be None, which matches - any `state_key` + event_ids (list[string]) + types (list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: - deferred: A list of dicts corresponding to the event_ids given. - The dicts are mappings from (type, state_key) -> state_events + deferred: A dict of (event_id) -> (type, state_key) -> [state_events] """ event_to_groups = yield self._get_state_group_for_events( event_ids, ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) state_event_map = yield self.get_events( [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], @@ -371,25 +478,30 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_ids_for_events(self, event_ids, types=None): + def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): """ - Get the state dicts corresponding to a list of events + Get the state dicts corresponding to a list of events, containing the event_ids + of the state events (as opposed to the events themselves) Args: event_ids(list(str)): events whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: - A deferred dict from event_id -> (type, state_key) -> state_event + A deferred dict from event_id -> (type, state_key) -> event_id """ event_to_groups = yield self._get_state_group_for_events( event_ids, ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types) + group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) event_to_state = { event_id: group_to_state[group] @@ -399,37 +511,45 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_for_event(self, event_id, types=None): + def get_state_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_for_events([event_id], types) + state_map = yield self.get_state_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @defer.inlineCallbacks - def get_state_ids_for_event(self, event_id, types=None): + def get_state_ids_for_event(self, event_id, types=None, filtered_types=None): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str, str)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. May be None, which - matches any key + types(list[(str, str|None)]|None): List of (type, state_key) tuples + which are used to filter the state fetched. If `state_key` is None, + all events are returned of the given type. + May be None, which matches any key. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_ids_for_events([event_id], types) + state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types) defer.returnValue(state_map[event_id]) @cached(max_entries=50000) @@ -460,56 +580,73 @@ class StateGroupWorkerStore(SQLBaseStore): defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) - def _get_some_state_from_cache(self, group, types): + def _get_some_state_from_cache(self, group, types, filtered_types=None): """Checks if group is in cache. See `_get_state_for_groups` - Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). - `missing_types` is the list of types that aren't in the cache for that - group. `got_all` is a bool indicating if we successfully retrieved all + Args: + group(int): The state group to lookup + types(list[str, str|None]): List of 2-tuples of the form + (`type`, `state_key`), where a `state_key` of `None` matches all + state_keys for the `type`. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. + + Returns 2-tuple (`state_dict`, `got_all`). + `got_all` is a bool indicating if we successfully retrieved all requests state from the cache, if False we need to query the DB for the missing state. - - Args: - group: The state group to lookup - types (list): List of 2-tuples of the form (`type`, `state_key`), - where a `state_key` of `None` matches all state_keys for the - `type`. """ is_all, known_absent, state_dict_ids = self._state_group_cache.get(group) type_to_key = {} - missing_types = set() + + # tracks whether any of ourrequested types are missing from the cache + missing_types = False for typ, state_key in types: key = (typ, state_key) - if state_key is None: + + if ( + state_key is None or + (filtered_types is not None and typ not in filtered_types) + ): type_to_key[typ] = None - missing_types.add(key) + # we mark the type as missing from the cache because + # when the cache was populated it might have been done with a + # restricted set of state_keys, so the wildcard will not work + # and the cache may be incomplete. + missing_types = True else: if type_to_key.get(typ, object()) is not None: type_to_key.setdefault(typ, set()).add(state_key) if key not in state_dict_ids and key not in known_absent: - missing_types.add(key) + missing_types = True sentinel = object() def include(typ, state_key): valid_state_keys = type_to_key.get(typ, sentinel) if valid_state_keys is sentinel: - return False + return filtered_types is not None and typ not in filtered_types if valid_state_keys is None: return True if state_key in valid_state_keys: return True return False - got_all = is_all or not missing_types + got_all = is_all + if not got_all: + # the cache is incomplete. We may still have got all the results we need, if + # we don't have any wildcards in the match list. + if not missing_types and filtered_types is None: + got_all = True return { k: v for k, v in iteritems(state_dict_ids) if include(k[0], k[1]) - }, missing_types, got_all + }, got_all def _get_all_state_from_cache(self, group): """Checks if group is in cache. See `_get_state_for_groups` @@ -526,7 +663,7 @@ class StateGroupWorkerStore(SQLBaseStore): return state_dict_ids, is_all @defer.inlineCallbacks - def _get_state_for_groups(self, groups, types=None): + def _get_state_for_groups(self, groups, types=None, filtered_types=None): """Gets the state at each of a list of state groups, optionally filtering by type/state_key @@ -540,6 +677,9 @@ class StateGroupWorkerStore(SQLBaseStore): Otherwise, each entry should be a `(type, state_key)` tuple to include in the response. A `state_key` of None is a wildcard meaning that we require all state with that type. + filtered_types(list[str]|None): Only apply filtering via `types` to this + list of event types. Other types of events are returned unfiltered. + If None, `types` filtering is applied to all events. Returns: Deferred[dict[int, dict[(type, state_key), EventBase]]] @@ -551,8 +691,8 @@ class StateGroupWorkerStore(SQLBaseStore): missing_groups = [] if types is not None: for group in set(groups): - state_dict_ids, _, got_all = self._get_some_state_from_cache( - group, types, + state_dict_ids, got_all = self._get_some_state_from_cache( + group, types, filtered_types ) results[group] = state_dict_ids @@ -579,13 +719,13 @@ class StateGroupWorkerStore(SQLBaseStore): # cache. Hence, if we are doing a wildcard lookup, populate the # cache fully so that we can do an efficient lookup next time. - if types and any(k is None for (t, k) in types): + if filtered_types or (types and any(k is None for (t, k) in types)): types_to_fetch = None else: types_to_fetch = types group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, + missing_groups, types_to_fetch ) for group, group_state_dict in iteritems(group_to_state_dict): @@ -595,7 +735,10 @@ class StateGroupWorkerStore(SQLBaseStore): if types: for k, v in iteritems(group_state_dict): (typ, _) = k - if k in types or (typ, None) in types: + if ( + (k in types or (typ, None) in types) or + (filtered_types and typ not in filtered_types) + ): state_dict[k] = v else: state_dict.update(group_state_dict) |