From ae61ade8919085ad0c90e0b54c4e8338998ee64a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 2 Oct 2018 23:33:29 +0100 Subject: Fix bug in forward_extremity update logic An event does not stop being a forward_extremity just because an outlier or rejected event refers to it. --- synapse/storage/events.py | 111 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 33 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e7487311ce..046174edb3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -38,6 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id +from synapse.util import batch_iter from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder @@ -386,12 +387,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ) for room_id, ev_ctx_rm in iteritems(events_by_room): - # Work out new extremities by recursively adding and removing - # the new events. latest_event_ids = yield self.get_latest_event_ids_in_room( room_id ) - new_latest_event_ids = yield self._calculate_new_extremeties( + new_latest_event_ids = yield self._calculate_new_extremities( room_id, ev_ctx_rm, latest_event_ids ) @@ -400,6 +399,12 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore # No change in extremities, so no change in state continue + # there should always be at least one forward extremity. + # (except during the initial persistence of the send_join + # results, in which case there will be no existing + # extremities, so we'll `continue` above and skip this bit.) + assert new_latest_event_ids, "No forward extremities left!" + new_forward_extremeties[room_id] = new_latest_event_ids len_1 = ( @@ -517,44 +522,88 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore ) @defer.inlineCallbacks - def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): - """Calculates the new forward extremeties for a room given events to + def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids): + """Calculates the new forward extremities for a room given events to persist. Assumes that we are only persisting events for one room at a time. """ - new_latest_event_ids = set(latest_event_ids) - # First, add all the new events to the list - new_latest_event_ids.update( - event.event_id for event, ctx in event_contexts + + # we're only interested in new events which aren't outliers and which aren't + # being rejected. + new_events = [ + event for event, ctx in event_contexts if not event.internal_metadata.is_outlier() and not ctx.rejected + ] + + # start with the existing forward extremities + result = set(latest_event_ids) + + # add all the new events to the list + result.update( + event.event_id for event in new_events ) - # Now remove all events that are referenced by the to-be-added events - new_latest_event_ids.difference_update( + + # Now remove all events which are prev_events of any of the new events + result.difference_update( e_id - for event, ctx in event_contexts + for event in new_events for e_id, _ in event.prev_events - if not event.internal_metadata.is_outlier() and not ctx.rejected ) - # And finally remove any events that are referenced by previously added - # events. - rows = yield self._simple_select_many_batch( - table="event_edges", - column="prev_event_id", - iterable=list(new_latest_event_ids), - retcols=["prev_event_id"], - keyvalues={ - "is_state": False, - }, - desc="_calculate_new_extremeties", - ) + # Finally, remove any events which are prev_events of any existing events. + existing_prevs = yield self._get_events_which_are_prevs(result) + result.difference_update(existing_prevs) - new_latest_event_ids.difference_update( - row["prev_event_id"] for row in rows - ) + if not result: + logger.warn( + "Forward extremity list A+B-C-D is now empty in %s. " + "Old extremities (A): %s, new events (B): %s, " + "existing events which are reffed by new events (C): %s, " + "new events which are reffed by existing events (D): %s", + room_id, latest_event_ids, new_events, + [e_id for event in new_events for e_id, _ in event.prev_events], + existing_prevs, + ) + defer.returnValue(result) - defer.returnValue(new_latest_event_ids) + @defer.inlineCallbacks + def _get_events_which_are_prevs(self, event_ids): + """Filter the supplied list of event_ids to get those which are prev_events of + existing (non-outlier) events. + + Args: + event_ids (Iterable[str]): event ids to filter + + Returns: + Deferred[List[str]]: filtered event ids + """ + results = [] + + def _get_events(txn, batch): + sql = """ + SELECT prev_event_id + FROM event_edges + INNER JOIN events USING (event_id) + LEFT JOIN rejections USING (event_id) + WHERE + prev_event_id IN (%s) + AND rejections.event_id IS NULL + """ % ( + ",".join("?" for _ in batch), + ) + + txn.execute(sql, batch) + results.extend(r[0] for r in txn) + + for chunk in batch_iter(event_ids, 100): + yield self.runInteraction( + "_get_events_which_are_prevs", + _get_events, + chunk, + ) + + defer.returnValue(results) @defer.inlineCallbacks def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids, @@ -586,10 +635,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore the new current state is only returned if we've already calculated it. """ - - if not new_latest_event_ids: - return - # map from state_group to ((type, key) -> event_id) state map state_groups_map = {} -- cgit 1.5.1 From 3e39783d5d7ca5da7b3619d0328f6aeec48854de Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 2 Oct 2018 23:44:14 +0100 Subject: remove debugging --- synapse/storage/events.py | 10 ---------- 1 file changed, 10 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 046174edb3..8822dc7bcb 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -555,16 +555,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore existing_prevs = yield self._get_events_which_are_prevs(result) result.difference_update(existing_prevs) - if not result: - logger.warn( - "Forward extremity list A+B-C-D is now empty in %s. " - "Old extremities (A): %s, new events (B): %s, " - "existing events which are reffed by new events (C): %s, " - "new events which are reffed by existing events (D): %s", - room_id, latest_event_ids, new_events, - [e_id for event in new_events for e_id, _ in event.prev_events], - existing_prevs, - ) defer.returnValue(result) @defer.inlineCallbacks -- cgit 1.5.1 From 9693625e556df1af66ba376d49411064c2d0f47e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 3 Oct 2018 10:19:41 +0100 Subject: actually exclude outliers --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 8822dc7bcb..03cedf3a75 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -560,7 +560,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore @defer.inlineCallbacks def _get_events_which_are_prevs(self, event_ids): """Filter the supplied list of event_ids to get those which are prev_events of - existing (non-outlier) events. + existing (non-outlier/rejected) events. Args: event_ids (Iterable[str]): event ids to filter @@ -578,6 +578,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore LEFT JOIN rejections USING (event_id) WHERE prev_event_id IN (%s) + AND NOT events.outlier AND rejections.event_id IS NULL """ % ( ",".join("?" for _ in batch), -- cgit 1.5.1 From 17d585753f1df2b2c2b13ddb8171e174cef97aac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 4 Oct 2018 15:18:52 +0100 Subject: Delete unreferened state groups during purge --- synapse/storage/events.py | 33 +++++++++++++++++++++++++------ synapse/storage/state.py | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 6 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e7487311ce..0fb190530a 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2025,6 +2025,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore logger.info("[purge] finding state groups which depend on redundant" " state groups") remaining_state_groups = [] + unreferenced_state_groups = 0 for i in range(0, len(state_rows), 100): chunk = [sg for sg, in state_rows[i:i + 100]] # look for state groups whose prev_state_group is one we are about @@ -2037,13 +2038,33 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore retcols=["state_group"], keyvalues={}, ) - remaining_state_groups.extend( - row["state_group"] for row in rows - # exclude state groups we are about to delete: no point in - # updating them - if row["state_group"] not in state_groups_to_delete - ) + for row in rows: + sg = row["state_group"] + + if sg in state_groups_to_delete: + # exclude state groups we are about to delete: no point in + # updating them + continue + + if not self._is_state_group_referenced(txn, sg): + # Let's also delete unreferenced state groups while we're + # here, since otherwise we'd need to de-delta them + state_groups_to_delete.add(sg) + unreferenced_state_groups += 1 + continue + + remaining_state_groups.append(sg) + + logger.info( + "[purge] found %i extra unreferenced state groups to delete", + unreferenced_state_groups, + ) + + logger.info( + "[purge] de-delta-ing %i remaining state groups", + len(remaining_state_groups), + ) # Now we turn the state groups that reference to-be-deleted state # groups to non delta versions. diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 3f4cbd61c4..b88c7dc091 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1041,6 +1041,56 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return count + def _is_state_group_referenced(self, txn, state_group): + """Checks if a given state group is referenced, or is safe to delete. + + A state groups is referenced if it or any of its descendants are + pointed at by an event. (A descendant is a group which has the given + state_group as a prev group) + """ + + # We check this by doing a depth first search to look for any + # descendant referenced by `event_to_state_groups`. + + # State groups we need to check, contains state groups that are + # descendants of `state_group` + state_groups_to_search = [state_group] + + # Set of state groups we've already checked + state_groups_searched = set() + + while state_groups_to_search: + state_group = state_groups_to_search.pop() # Next state group to check + + is_referenced = self._simple_select_one_onecol_txn( + txn, + table="event_to_state_groups", + keyvalues={"state_group": state_group}, + retcol="event_id", + allow_none=True, + ) + if is_referenced: + # A descendant is referenced by event_to_state_groups, so + # original state group is referenced. + return True + + state_groups_searched.add(state_group) + + # Find all children of current state group and add to search + references = self._simple_select_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"prev_state_group": state_group}, + retcol="state_group", + ) + state_groups_to_search.extend(references) + + # Lets be paranoid and check for cycles + if state_groups_searched.intersection(references): + raise Exception("State group %s has cyclic dependency", state_group) + + return False + class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): """ Keeps track of the state at a given event. -- cgit 1.5.1 From e238013c44714174f1bf9a2b9b1f576728f40784 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Sep 2018 15:13:17 +0100 Subject: Add v2 state res algorithm. We hook this up to the vdh test room version. --- synapse/state/__init__.py | 87 ++++++-- synapse/state/v2.py | 544 ++++++++++++++++++++++++++++++++++++++++++++++ synapse/storage/events.py | 9 +- 3 files changed, 616 insertions(+), 24 deletions(-) create mode 100644 synapse/state/v2.py (limited to 'synapse/storage/events.py') diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index b22495c1f9..836e137296 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -19,13 +19,14 @@ from collections import namedtuple from six import iteritems, itervalues +import attr from frozendict import frozendict from twisted.internet import defer from synapse.api.constants import EventTypes, RoomVersions from synapse.events.snapshot import EventContext -from synapse.state import v1 +from synapse.state import v1, v2 from synapse.util.async_helpers import Linearizer from synapse.util.caches import get_cache_factor_for from synapse.util.caches.expiringcache import ExpiringCache @@ -372,15 +373,10 @@ class StateHandler(object): result = yield self._state_resolution_handler.resolve_state_groups( room_id, room_version, state_groups_ids, None, - self._state_map_factory, + state_res_store=StateResolutionStore(self.store), ) defer.returnValue(result) - def _state_map_factory(self, ev_ids): - return self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - @defer.inlineCallbacks def resolve_events(self, room_version, state_sets, event): logger.info( @@ -401,7 +397,7 @@ class StateHandler(object): new_state = yield resolve_events_with_factory( room_version, state_set_ids, event_map=state_map, - state_map_factory=self._state_map_factory + state_res_store=StateResolutionStore(self.store), ) new_state = { @@ -436,7 +432,7 @@ class StateResolutionHandler(object): @defer.inlineCallbacks @log_function def resolve_state_groups( - self, room_id, room_version, state_groups_ids, event_map, state_map_factory, + self, room_id, room_version, state_groups_ids, event_map, state_res_store, ): """Resolves conflicts between a set of state groups @@ -454,9 +450,11 @@ class StateResolutionHandler(object): a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be used as a starting point fof finding the state we need; any missing - events will be requested via state_map_factory. + events will be requested via state_res_store. + + If None, all events will be fetched via state_res_store. - If None, all events will be fetched via state_map_factory. + state_res_store (StateResolutionStore) Returns: Deferred[_StateCacheEntry]: resolved state @@ -502,7 +500,7 @@ class StateResolutionHandler(object): room_version, list(itervalues(state_groups_ids)), event_map=event_map, - state_map_factory=state_map_factory, + state_res_store=state_res_store, ) # if the new state matches any of the input state groups, we can @@ -583,7 +581,7 @@ def _make_state_cache_entry( ) -def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory): +def resolve_events_with_factory(room_version, state_sets, event_map, state_res_store): """ Args: room_version(str): Version of the room @@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f If None, all events will be fetched via state_map_factory. - state_map_factory(func): will be called - with a list of event_ids that are needed, and should return with - a Deferred of dict of event_id to event. + state_res_store (StateResolutionStore) Returns Deferred[dict[(str, str), str]]: a map from (type, state_key) to event_id. """ - if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,): + if room_version == RoomVersions.V1: return v1.resolve_events_with_factory( - state_sets, event_map, state_map_factory, + state_sets, event_map, state_res_store.get_events, + ) + elif room_version == RoomVersions.VDH_TEST: + return v2.resolve_events_with_factory( + state_sets, event_map, state_res_store, ) else: # This should only happen if we added a version but forgot to add it to @@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f raise Exception( "No state resolution algorithm defined for version %r" % (room_version,) ) + + +@attr.s +class StateResolutionStore(object): + """Interface that allows state resolution algorithms to access the database + in well defined way. + + Args: + store (DataStore) + """ + + store = attr.ib() + + def get_events(self, event_ids, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred[dict[str, FrozenEvent]]: Dict from event_id to event. + """ + + return self.store.get_events( + event_ids, + check_redacted=False, + get_prev_content=False, + allow_rejected=allow_rejected, + ) + + def get_auth_chain(self, event_ids): + """Gets the full auth chain for a set of events (including rejected + events). + + Includes the given event IDs in the result. + + Note that: + 1. All events must be state events. + 2. For v1 rooms this may not have the full auth chain in the + presence of rejected events + + Args: + event_ids (list): The event IDs of the events to fetch the auth + chain for. Must be state events. + + Returns: + Deferred[list[str]]: List of event IDs of the auth chain. + """ + + return self.store.get_auth_chain_ids(event_ids, include_given=True) diff --git a/synapse/state/v2.py b/synapse/state/v2.py new file mode 100644 index 0000000000..d034b4f38e --- /dev/null +++ b/synapse/state/v2.py @@ -0,0 +1,544 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import heapq +import itertools +import logging + +from six import iteritems, itervalues + +from twisted.internet import defer + +from synapse import event_auth +from synapse.api.constants import EventTypes +from synapse.api.errors import AuthError + +logger = logging.getLogger(__name__) + + +@defer.inlineCallbacks +def resolve_events_with_factory(state_sets, event_map, state_res_store): + """Resolves the state using the v2 state resolution algorithm + + Args: + state_sets(list): List of dicts of (type, state_key) -> event_id, + which are the different state groups to resolve. + + event_map(dict[str,FrozenEvent]|None): + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_res_store. + + If None, all events will be fetched via state_res_store. + + state_res_store (StateResolutionStore) + + Returns + Deferred[dict[(str, str), str]]: + a map from (type, state_key) to event_id. + """ + + logger.debug("Computing conflicted state") + + # First split up the un/conflicted state + unconflicted_state, conflicted_state = _seperate(state_sets) + + if not conflicted_state: + defer.returnValue(unconflicted_state) + + logger.debug("%d conflicted state entries", len(conflicted_state)) + logger.debug("Calculating auth chain difference") + + # Also fetch all auth events that appear in only some of the state sets' + # auth chains. + auth_diff = yield _get_auth_chain_difference( + state_sets, event_map, state_res_store, + ) + + full_conflicted_set = set(itertools.chain( + itertools.chain.from_iterable(itervalues(conflicted_state)), + auth_diff, + )) + + events = yield state_res_store.get_events([ + eid for eid in full_conflicted_set + if eid not in event_map + ], allow_rejected=True) + event_map.update(events) + + full_conflicted_set = set(eid for eid in full_conflicted_set if eid in event_map) + + logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) + + # Get and sort all the power events (kicks/bans/etc) + power_events = ( + eid for eid in full_conflicted_set + if _is_power_event(event_map[eid]) + ) + + sorted_power_events = yield _reverse_topological_power_sort( + power_events, + event_map, + state_res_store, + full_conflicted_set, + ) + + logger.debug("sorted %d power events", len(sorted_power_events)) + + # Now sequentially auth each one + resolved_state = yield _iterative_auth_checks( + sorted_power_events, unconflicted_state, event_map, + state_res_store, + ) + + logger.debug("resolved power events") + + # OK, so we've now resolved the power events. Now sort the remaining + # events using the mainline of the resolved power level. + + leftover_events = [ + ev_id + for ev_id in full_conflicted_set + if ev_id not in sorted_power_events + ] + + logger.debug("sorting %d remaining events", len(leftover_events)) + + pl = resolved_state.get((EventTypes.PowerLevels, ""), None) + leftover_events = yield _mainline_sort( + leftover_events, pl, event_map, state_res_store, + ) + + logger.debug("resolving remaining events") + + resolved_state = yield _iterative_auth_checks( + leftover_events, resolved_state, event_map, + state_res_store, + ) + + logger.debug("resolved") + + # We make sure that unconflicted state always still applies. + resolved_state.update(unconflicted_state) + + logger.debug("done") + + defer.returnValue(resolved_state) + + +@defer.inlineCallbacks +def _get_power_level_for_sender(event_id, event_map, state_res_store): + """Return the power level of the sender of the given event according to + their auth events. + + Args: + event_id (str) + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + + Returns: + Deferred[int] + """ + event = yield _get_event(event_id, event_map, state_res_store) + + pl = None + for aid, _ in event.auth_events: + aev = yield _get_event(aid, event_map, state_res_store) + if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): + pl = aev + break + + if pl is None: + # Couldn't find power level. Check if they're the creator of the room + for aid, _ in event.auth_events: + aev = yield _get_event(aid, event_map, state_res_store) + if (aev.type, aev.state_key) == (EventTypes.Create, ""): + if aev.content.get("creator") == event.sender: + defer.returnValue(100) + break + defer.returnValue(0) + + level = pl.content.get("users", {}).get(event.sender) + if level is None: + level = pl.content.get("users_default", 0) + + if level is None: + defer.returnValue(0) + else: + defer.returnValue(int(level)) + + +@defer.inlineCallbacks +def _get_auth_chain_difference(state_sets, event_map, state_res_store): + """Compare the auth chains of each state set and return the set of events + that only appear in some but not all of the auth chains. + + Args: + state_sets (list) + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + + Returns: + Deferred[set[str]]: Set of event IDs + """ + common = set(itervalues(state_sets[0])).intersection( + *(itervalues(s) for s in state_sets[1:]) + ) + + auth_sets = [] + for state_set in state_sets: + auth_ids = set( + eid + for key, eid in iteritems(state_set) + if (key[0] in ( + EventTypes.Member, + EventTypes.ThirdPartyInvite, + ) or key in ( + (EventTypes.PowerLevels, ''), + (EventTypes.Create, ''), + (EventTypes.JoinRules, ''), + )) and eid not in common + ) + + auth_chain = yield state_res_store.get_auth_chain(auth_ids) + auth_ids.update(auth_chain) + + auth_sets.append(auth_ids) + + intersection = set(auth_sets[0]).intersection(*auth_sets[1:]) + union = set().union(*auth_sets) + + defer.returnValue(union - intersection) + + +def _seperate(state_sets): + """Return the unconflicted and conflicted state. This is different than in + the original algorithm, as this defines a key to be conflicted if one of + the state sets doesn't have that key. + + Args: + state_sets (list) + + Returns: + tuple[dict, dict]: A tuple of unconflicted and conflicted state. The + conflicted state dict is a map from type/state_key to set of event IDs + """ + unconflicted_state = {} + conflicted_state = {} + + for key in set(itertools.chain.from_iterable(state_sets)): + event_ids = set(state_set.get(key) for state_set in state_sets) + if len(event_ids) == 1: + unconflicted_state[key] = event_ids.pop() + else: + event_ids.discard(None) + conflicted_state[key] = event_ids + + return unconflicted_state, conflicted_state + + +def _is_power_event(event): + """Return whether or not the event is a "power event", as defined by the + v2 state resolution algorithm + + Args: + event (FrozenEvent) + + Returns: + boolean + """ + if (event.type, event.state_key) in ( + (EventTypes.PowerLevels, ""), + (EventTypes.JoinRules, ""), + (EventTypes.Create, ""), + ): + return True + + if event.type == EventTypes.Member: + if event.membership in ('leave', 'ban'): + return event.sender != event.state_key + + return False + + +@defer.inlineCallbacks +def _add_event_and_auth_chain_to_graph(graph, event_id, event_map, + state_res_store, auth_diff): + """Helper function for _reverse_topological_power_sort that add the event + and its auth chain (that is in the auth diff) to the graph + + Args: + graph (dict[str, set[str]]): A map from event ID to the events auth + event IDs + event_id (str): Event to add to the graph + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + auth_diff (set[str]): Set of event IDs that are in the auth difference. + """ + + state = [event_id] + while state: + eid = state.pop() + graph.setdefault(eid, set()) + + event = yield _get_event(eid, event_map, state_res_store) + for aid, _ in event.auth_events: + if aid in auth_diff: + if aid not in graph: + state.append(aid) + + graph.setdefault(eid, set()).add(aid) + + +@defer.inlineCallbacks +def _reverse_topological_power_sort(event_ids, event_map, state_res_store, auth_diff): + """Returns a list of the event_ids sorted by reverse topological ordering, + and then by power level and origin_server_ts + + Args: + event_ids (list[str]): The events to sort + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + auth_diff (set[str]): Set of event IDs that are in the auth difference. + + Returns: + Deferred[list[str]]: The sorted list + """ + + graph = {} + for event_id in event_ids: + yield _add_event_and_auth_chain_to_graph( + graph, event_id, event_map, state_res_store, auth_diff, + ) + + event_to_pl = {} + for event_id in graph: + pl = yield _get_power_level_for_sender(event_id, event_map, state_res_store) + event_to_pl[event_id] = pl + + def _get_power_order(event_id): + ev = event_map[event_id] + pl = event_to_pl[event_id] + + return -pl, ev.origin_server_ts, event_id + + # Note: graph is modified during the sort + it = lexicographical_topological_sort( + graph, + key=_get_power_order, + ) + sorted_events = list(it) + + defer.returnValue(sorted_events) + + +@defer.inlineCallbacks +def _iterative_auth_checks(event_ids, base_state, event_map, state_res_store): + """Sequentially apply auth checks to each event in given list, updating the + state as it goes along. + + Args: + event_ids (list[str]): Ordered list of events to apply auth checks to + base_state (dict[tuple[str, str], str]): The set of state to start with + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + + Returns: + Deferred[dict[tuple[str, str], str]]: Returns the final updated state + """ + resolved_state = base_state.copy() + + for event_id in event_ids: + event = event_map[event_id] + + auth_events = {} + for aid, _ in event.auth_events: + ev = yield _get_event(aid, event_map, state_res_store) + + if ev.rejected_reason is None: + auth_events[(ev.type, ev.state_key)] = ev + + for key in event_auth.auth_types_for_event(event): + if key in resolved_state: + ev_id = resolved_state[key] + ev = yield _get_event(ev_id, event_map, state_res_store) + + if ev.rejected_reason is None: + auth_events[key] = event_map[ev_id] + + try: + event_auth.check( + event, auth_events, + do_sig_check=False, + do_size_check=False + ) + + resolved_state[(event.type, event.state_key)] = event_id + except AuthError: + pass + + defer.returnValue(resolved_state) + + +@defer.inlineCallbacks +def _mainline_sort(event_ids, resolved_power_event_id, event_map, + state_res_store): + """Returns a sorted list of event_ids sorted by mainline ordering based on + the given event resolved_power_event_id + + Args: + event_ids (list[str]): Events to sort + resolved_power_event_id (str): The final resolved power level event ID + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + + Returns: + Deferred[list[str]]: The sorted list + """ + mainline = [] + pl = resolved_power_event_id + while pl: + mainline.append(pl) + pl_ev = yield _get_event(pl, event_map, state_res_store) + auth_events = pl_ev.auth_events + pl = None + for aid, _ in auth_events: + ev = yield _get_event(aid, event_map, state_res_store) + if (ev.type, ev.state_key) == (EventTypes.PowerLevels, ""): + pl = aid + break + + mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))} + + event_ids = list(event_ids) + + order_map = {} + for ev_id in event_ids: + depth = yield _get_mainline_depth_for_event( + event_map[ev_id], mainline_map, + event_map, state_res_store, + ) + order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id) + + event_ids.sort(key=lambda ev_id: order_map[ev_id]) + + defer.returnValue(event_ids) + + +@defer.inlineCallbacks +def _get_mainline_depth_for_event(event, mainline_map, event_map, state_res_store): + """Get the mainline depths for the given event based on the mainline map + + Args: + event (FrozenEvent) + mainline_map (dict[str, int]): Map from event_id to mainline depth for + events in the mainline. + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + + Returns: + Deferred[int] + """ + + # We do an iterative search, replacing `event with the power level in its + # auth events (if any) + while event: + depth = mainline_map.get(event.event_id) + if depth is not None: + defer.returnValue(depth) + + auth_events = event.auth_events + event = None + + for aid, _ in auth_events: + aev = yield _get_event(aid, event_map, state_res_store) + if (aev.type, aev.state_key) == (EventTypes.PowerLevels, ""): + event = aev + break + + # Didn't find a power level auth event, so we just return 0 + defer.returnValue(0) + + +@defer.inlineCallbacks +def _get_event(event_id, event_map, state_res_store): + """Helper function to look up event in event_map, falling back to looking + it up in the store + + Args: + event_id (str) + event_map (dict[str,FrozenEvent]) + state_res_store (StateResolutionStore) + + Returns: + Deferred[FrozenEvent] + """ + if event_id not in event_map: + events = yield state_res_store.get_events([event_id], allow_rejected=True) + event_map.update(events) + defer.returnValue(event_map[event_id]) + + +def lexicographical_topological_sort(graph, key): + """Performs a lexicographic reverse topological sort on the graph. + + This returns a reverse topological sort (i.e. if node A references B then B + appears before A in the sort), with ties broken lexicographically based on + return value of the `key` function. + + NOTE: `graph` is modified during the sort. + + Args: + graph (dict[str, set[str]]): A representation of the graph where each + node is a key in the dict and its value are the nodes edges. + key (func): A function that takes a node and returns a value that is + comparable and used to order nodes + + Yields: + str: The next node in the topological sort + """ + + # Note, this is basically Kahn's algorithm except we look at nodes with no + # outgoing edges, c.f. + # https://en.wikipedia.org/wiki/Topological_sorting#Kahn's_algorithm + outdegree_map = graph + reverse_graph = {} + + # Lists of nodes with zero out degree. Is actually a tuple of + # `(key(node), node)` so that sorting does the right thing + zero_outdegree = [] + + for node, edges in iteritems(graph): + if len(edges) == 0: + zero_outdegree.append((key(node), node)) + + reverse_graph.setdefault(node, set()) + for edge in edges: + reverse_graph.setdefault(edge, set()).add(node) + + # heapq is a built in implementation of a sorted queue. + heapq.heapify(zero_outdegree) + + while zero_outdegree: + _, node = heapq.heappop(zero_outdegree) + + for parent in reverse_graph[node]: + out = outdegree_map[parent] + out.discard(node) + if len(out) == 0: + heapq.heappush(zero_outdegree, (key(parent), parent)) + + yield node diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 03cedf3a75..fc88edcb39 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.state import StateResolutionStore from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore @@ -731,11 +732,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore # Ok, we need to defer to the state handler to resolve our state sets. - def get_events(ev_ids): - return self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - state_groups = { sg: state_groups_map[sg] for sg in new_state_groups } @@ -745,7 +741,8 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore logger.debug("calling resolve_state_groups from preserve_events") res = yield self._state_resolution_handler.resolve_state_groups( - room_id, room_version, state_groups, events_map, get_events + room_id, room_version, state_groups, events_map, + state_res_store=StateResolutionStore(self) ) defer.returnValue((res.state, None)) -- cgit 1.5.1 From 4a28d3d36fa21446e60a2a5142626a385e1f27af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 Oct 2018 14:01:53 +0100 Subject: Update event_auth table for rejected events --- synapse/storage/events.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index fc88edcb39..c780f55277 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -851,6 +851,27 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) + # We want to store event_auth mappings for rejected events, as they're + # used in state res v2. + # This is only necessary if the rejected event appears in an accepted + # event's auth chain, but its easier for now just to store them (and + # it doesn't take much storage compared to storing the entire event + # anyway). + self._simple_insert_many_txn( + txn, + table="event_auth", + values=[ + { + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + } + for event, _ in events_and_contexts + for auth_id, _ in event.auth_events + if event.is_state() + ], + ) + # _store_rejected_events_txn filters out any events which were # rejected, and returns the filtered list. events_and_contexts = self._store_rejected_events_txn( @@ -1326,21 +1347,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore txn, event.room_id, event.redacts ) - self._simple_insert_many_txn( - txn, - table="event_auth", - values=[ - { - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - } - for event, _ in events_and_contexts - for auth_id, _ in event.auth_events - if event.is_state() - ], - ) - # Update the event_forward_extremities, event_backward_extremities and # event_edges tables. self._handle_mult_prev_events( -- cgit 1.5.1 From 47a9da28caa6a4f27d2df31f043971a2c9c7b555 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 12 Oct 2018 20:43:18 +0100 Subject: Batch process handling state groups --- synapse/storage/events.py | 81 +++++++++------------------------ synapse/storage/state.py | 112 +++++++++++++++++++++++++++++----------------- 2 files changed, 92 insertions(+), 101 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0fb190530a..e4d0f8b1a9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -37,6 +37,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore +from synapse.storage.state import StateGroupWorkerStore from synapse.types import RoomStreamToken, get_domain_from_id from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks @@ -203,7 +204,8 @@ def _retry_on_integrity_error(func): # inherits from EventFederationStore so that we can call _update_backward_extremities # and _handle_mult_prev_events (though arguably those could both be moved in here) -class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore): +class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore, + BackgroundUpdateStore): EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" @@ -1995,70 +1997,29 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore logger.info("[purge] finding redundant state groups") - # Get all state groups that are only referenced by events that are - # to be deleted. - # This works by first getting state groups that we may want to delete, - # joining against event_to_state_groups to get events that use that - # state group, then left joining against events_to_purge again. Any - # state group where the left join produce *no nulls* are referenced - # only by events that are going to be purged. + # Get all state groups that are referenced by events that are to be + # deleted. We then go and check if they are referenced by other events + # or state groups, and if not we delete them. txn.execute(""" - SELECT state_group FROM - ( - SELECT DISTINCT state_group FROM events_to_purge - INNER JOIN event_to_state_groups USING (event_id) - ) AS sp - INNER JOIN event_to_state_groups USING (state_group) - LEFT JOIN events_to_purge AS ep USING (event_id) - GROUP BY state_group - HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0 + SELECT DISTINCT state_group FROM events_to_purge + INNER JOIN event_to_state_groups USING (event_id) """) - state_rows = txn.fetchall() - logger.info("[purge] found %i redundant state groups", len(state_rows)) - - # make a set of the redundant state groups, so that we can look them up - # efficiently - state_groups_to_delete = set([sg for sg, in state_rows]) - - # Now we get all the state groups that rely on these state groups - logger.info("[purge] finding state groups which depend on redundant" - " state groups") - remaining_state_groups = [] - unreferenced_state_groups = 0 - for i in range(0, len(state_rows), 100): - chunk = [sg for sg, in state_rows[i:i + 100]] - # look for state groups whose prev_state_group is one we are about - # to delete - rows = self._simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=chunk, - retcols=["state_group"], - keyvalues={}, - ) - - for row in rows: - sg = row["state_group"] - - if sg in state_groups_to_delete: - # exclude state groups we are about to delete: no point in - # updating them - continue + referenced_state_groups = set(sg for sg, in txn) + logger.info( + "[purge] found %i referenced state groups", + len(referenced_state_groups), + ) - if not self._is_state_group_referenced(txn, sg): - # Let's also delete unreferenced state groups while we're - # here, since otherwise we'd need to de-delta them - state_groups_to_delete.add(sg) - unreferenced_state_groups += 1 - continue + logger.info("[purge] finding state groups that can be deleted") - remaining_state_groups.append(sg) + state_groups_to_delete, remaining_state_groups = self._find_unreferenced_groups( + txn, referenced_state_groups, + ) logger.info( - "[purge] found %i extra unreferenced state groups to delete", - unreferenced_state_groups, + "[purge] found %i state groups to delete", + len(state_groups_to_delete), ) logger.info( @@ -2109,11 +2070,11 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore logger.info("[purge] removing redundant state groups") txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", - state_rows + ((sg,) for sg in state_groups_to_delete), ) txn.executemany( "DELETE FROM state_groups WHERE id = ?", - state_rows + ((sg,) for sg in state_groups_to_delete), ) logger.info("[purge] removing events from event_to_state_groups") diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f7cf5c86c9..0f86311ed4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1041,55 +1041,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return count - def _is_state_group_referenced(self, txn, state_group): - """Checks if a given state group is referenced, or is safe to delete. + def _find_unreferenced_groups(self, txn, state_groups): + """Used when purging history to figure out which state groups can be + deleted and which need to be de-delta'ed (due to one of its prev groups + being scheduled for deletion). - A state group is referenced if it or any of its descendants are - pointed at by an event. (A descendant is a state_group whose chain of - prev_groups includes the given state_group.) - """ - - # We check this by doing a depth first search to look for any - # descendant referenced by `event_to_state_groups`. - - # State groups we need to check, contains state groups that are - # descendants of `state_group` - state_groups_to_search = [state_group] - - # Set of state groups we've already checked - state_groups_searched = set() - - while state_groups_to_search: - state_group = state_groups_to_search.pop() # Next state group to check + Args: + txn + state_groups (set[int]): Set of state groups referenced by events + that are going to be deleted. - is_referenced = self._simple_select_one_onecol_txn( + Returns: + tuple[set[int], set[int]]: The set of state groups that can be + deleted and the set of state groups that need to be de-delta'ed + """ + # Graph of state group -> previous group + graph = {} + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + lst = list(next_to_search) + current_search = set(lst[:100]) + next_to_search = set(lst[100:]) + + # Check if state groups are referenced + sql = """ + SELECT state_group, count(*) FROM event_to_state_groups + LEFT JOIN events_to_purge AS ep USING (event_id) + WHERE state_group IN (%s) AND ep.event_id IS NULL + GROUP BY state_group + """ % (",".join("?" for _ in current_search),) + txn.execute(sql, list(current_search)) + + referenced = set(sg for sg, cnt in txn if cnt > 0) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + rows = self._simple_select_many_txn( txn, - table="event_to_state_groups", - keyvalues={"state_group": state_group}, - retcol="event_id", - allow_none=True, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("prev_state_group", "state_group",), ) - if is_referenced: - # A descendant is referenced by event_to_state_groups, so - # original state group is referenced. - return True - state_groups_searched.add(state_group) + next_to_search.update(row["state_group"] for row in rows) + # We don't bother re-handling groups we've already seen + next_to_search -= state_groups_seen + state_groups_seen |= next_to_search - # Find all children of current state group and add to search - references = self._simple_select_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"prev_state_group": state_group}, - retcol="state_group", - ) - state_groups_to_search.extend(references) + for row in rows: + # Note: Each state group can have at most one prev group + graph[row["state_group"]] = row["prev_state_group"] + + to_delete = state_groups_seen - referenced_groups - # Lets be paranoid and check for cycles - if state_groups_searched.intersection(references): - raise Exception("State group %s has cyclic dependency", state_group) + to_dedelta = set() + for sg in referenced_groups: + prev_sg = graph.get(sg) + if prev_sg and prev_sg in to_delete: + to_dedelta.add(sg) - return False + return to_delete, to_dedelta class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): -- cgit 1.5.1 From 67f7b9cb50c246226b94027e3c1d4b958ff9f840 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Oct 2018 16:06:59 +0100 Subject: pep8 --- synapse/storage/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index af822fb69d..379b5c514f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2041,7 +2041,7 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore INNER JOIN event_to_state_groups USING (event_id) """) - referenced_state_groups = set(sg for sg, in txn) + referenced_state_groups = set(sg for sg, in txn) logger.info( "[purge] found %i referenced state groups", len(referenced_state_groups), -- cgit 1.5.1 From cb53ce9d6429252d5ee012f5a476cc834251c27d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 Oct 2018 17:49:55 +0100 Subject: Refactor state group lookup to reduce DB hits (#4011) Currently when fetching state groups from the data store we make two hits two the database: once for members and once for non-members (unless request is filtered to one or the other). This adds needless load to the datbase, so this PR refactors the lookup to make only a single database hit. --- changelog.d/4011.misc | 1 + synapse/handlers/initial_sync.py | 4 +- synapse/handlers/message.py | 20 +- synapse/handlers/pagination.py | 15 +- synapse/handlers/room.py | 22 +- synapse/handlers/sync.py | 97 ++--- synapse/rest/client/v1/room.py | 3 +- synapse/storage/events.py | 2 +- synapse/storage/state.py | 845 ++++++++++++++++++++++++--------------- synapse/visibility.py | 15 +- tests/storage/test_state.py | 175 +++++--- 11 files changed, 714 insertions(+), 485 deletions(-) create mode 100644 changelog.d/4011.misc (limited to 'synapse/storage/events.py') diff --git a/changelog.d/4011.misc b/changelog.d/4011.misc new file mode 100644 index 0000000000..ad7768c4cd --- /dev/null +++ b/changelog.d/4011.misc @@ -0,0 +1 @@ +Reduce database load when fetching state groups diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index e009395207..563bb3cea3 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -156,7 +156,7 @@ class InitialSyncHandler(BaseHandler): room_end_token = "s%d" % (event.stream_ordering,) deferred_room_state = run_in_background( self.store.get_state_for_events, - [event.event_id], None, + [event.event_id], ) deferred_room_state.addCallback( lambda states: states[event.event_id] @@ -301,7 +301,7 @@ class InitialSyncHandler(BaseHandler): def _room_initial_sync_parted(self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking): room_state = yield self.store.get_state_for_events( - [member_event_id], None + [member_event_id], ) room_state = room_state[member_event_id] diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6c4fcfb10a..969e588e73 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -35,6 +35,7 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.storage.state import StateFilter from synapse.types import RoomAlias, UserID from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder @@ -80,7 +81,7 @@ class MessageHandler(object): elif membership == Membership.LEAVE: key = (event_type, state_key) room_state = yield self.store.get_state_for_events( - [membership_event_id], [key] + [membership_event_id], StateFilter.from_types([key]) ) data = room_state[membership_event_id].get(key) @@ -88,7 +89,7 @@ class MessageHandler(object): @defer.inlineCallbacks def get_state_events( - self, user_id, room_id, types=None, filtered_types=None, + self, user_id, room_id, state_filter=StateFilter.all(), at_token=None, is_guest=False, ): """Retrieve all state events for a given room. If the user is @@ -100,13 +101,8 @@ class MessageHandler(object): Args: user_id(str): The user requesting state events. room_id(str): The room ID to get all state events from. - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. at_token(StreamToken|None): the stream token of the at which we are requesting the stats. If the user is not allowed to view the state as of that stream token, we raise a 403 SynapseError. If None, returns the current @@ -139,7 +135,7 @@ class MessageHandler(object): event = last_events[0] if visible_events: room_state = yield self.store.get_state_for_events( - [event.event_id], types, filtered_types=filtered_types, + [event.event_id], state_filter=state_filter, ) room_state = room_state[event.event_id] else: @@ -158,12 +154,12 @@ class MessageHandler(object): if membership == Membership.JOIN: state_ids = yield self.store.get_filtered_current_state_ids( - room_id, types, filtered_types=filtered_types, + room_id, state_filter=state_filter, ) room_state = yield self.store.get_events(state_ids.values()) elif membership == Membership.LEAVE: room_state = yield self.store.get_state_for_events( - [membership_event_id], types, filtered_types=filtered_types, + [membership_event_id], state_filter=state_filter, ) room_state = room_state[membership_event_id] diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index a155b6e938..43f81bd607 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -21,6 +21,7 @@ from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError from synapse.events.utils import serialize_event +from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import ReadWriteLock from synapse.util.logcontext import run_in_background @@ -255,16 +256,14 @@ class PaginationHandler(object): if event_filter and event_filter.lazy_load_members(): # TODO: remove redundant members - types = [ - (EventTypes.Member, state_key) - for state_key in set( - event.sender # FIXME: we also care about invite targets etc. - for event in events - ) - ] + # FIXME: we also care about invite targets etc. + state_filter = StateFilter.from_types( + (EventTypes.Member, event.sender) + for event in events + ) state_ids = yield self.store.get_state_ids_for_event( - events[0].event_id, types=types, + events[0].event_id, state_filter=state_filter, ) if state_ids: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ab1571b27b..3ba92bdb4c 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -33,6 +33,7 @@ from synapse.api.constants import ( RoomCreationPreset, ) from synapse.api.errors import AuthError, Codes, StoreError, SynapseError +from synapse.storage.state import StateFilter from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils from synapse.visibility import filter_events_for_client @@ -489,23 +490,24 @@ class RoomContextHandler(object): else: last_event_id = event_id - types = None - filtered_types = None if event_filter and event_filter.lazy_load_members(): - members = set(ev.sender for ev in itertools.chain( - results["events_before"], - (results["event"],), - results["events_after"], - )) - filtered_types = [EventTypes.Member] - types = [(EventTypes.Member, member) for member in members] + state_filter = StateFilter.from_lazy_load_member_list( + ev.sender + for ev in itertools.chain( + results["events_before"], + (results["event"],), + results["events_after"], + ) + ) + else: + state_filter = StateFilter.all() # XXX: why do we return the state as of the last event rather than the # first? Shouldn't we be consistent with /sync? # https://github.com/matrix-org/matrix-doc/issues/687 state = yield self.store.get_state_for_events( - [last_event_id], types, filtered_types=filtered_types, + [last_event_id], state_filter=state_filter, ) results["state"] = list(state[last_event_id].values()) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 351892a94f..09739f2862 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -27,6 +27,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary +from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -469,25 +470,20 @@ class SyncHandler(object): )) @defer.inlineCallbacks - def get_state_after_event(self, event, types=None, filtered_types=None): + def get_state_after_event(self, event, state_filter=StateFilter.all()): """ Get the room state after the given event Args: event(synapse.events.EventBase): event of interest - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A Deferred map from ((type, state_key)->Event) """ state_ids = yield self.store.get_state_ids_for_event( - event.event_id, types, filtered_types=filtered_types, + event.event_id, state_filter=state_filter, ) if event.is_state(): state_ids = state_ids.copy() @@ -495,18 +491,14 @@ class SyncHandler(object): defer.returnValue(state_ids) @defer.inlineCallbacks - def get_state_at(self, room_id, stream_position, types=None, filtered_types=None): + def get_state_at(self, room_id, stream_position, state_filter=StateFilter.all()): """ Get the room state at a particular stream position Args: room_id(str): room for which to get state stream_position(StreamToken): point at which to get state - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A Deferred map from ((type, state_key)->Event) @@ -522,7 +514,7 @@ class SyncHandler(object): if last_events: last_event = last_events[-1] state = yield self.get_state_after_event( - last_event, types, filtered_types=filtered_types, + last_event, state_filter=state_filter, ) else: @@ -563,10 +555,11 @@ class SyncHandler(object): last_event = last_events[-1] state_ids = yield self.store.get_state_ids_for_event( - last_event.event_id, [ + last_event.event_id, + state_filter=StateFilter.from_types([ (EventTypes.Name, ''), (EventTypes.CanonicalAlias, ''), - ] + ]), ) # this is heavily cached, thus: fast. @@ -717,8 +710,7 @@ class SyncHandler(object): with Measure(self.clock, "compute_state_delta"): - types = None - filtered_types = None + members_to_fetch = None lazy_load_members = sync_config.filter_collection.lazy_load_members() include_redundant_members = ( @@ -729,16 +721,21 @@ class SyncHandler(object): # We only request state for the members needed to display the # timeline: - types = [ - (EventTypes.Member, state_key) - for state_key in set( - event.sender # FIXME: we also care about invite targets etc. - for event in batch.events - ) - ] + members_to_fetch = set( + event.sender # FIXME: we also care about invite targets etc. + for event in batch.events + ) + + if full_state: + # always make sure we LL ourselves so we know we're in the room + # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 + # We only need apply this on full state syncs given we disabled + # LL for incr syncs in #3840. + members_to_fetch.add(sync_config.user.to_string()) - # only apply the filtering to room members - filtered_types = [EventTypes.Member] + state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) + else: + state_filter = StateFilter.all() timeline_state = { (event.type, event.state_key): event.event_id @@ -746,28 +743,19 @@ class SyncHandler(object): } if full_state: - if lazy_load_members: - # always make sure we LL ourselves so we know we're in the room - # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 - # We only need apply this on full state syncs given we disabled - # LL for incr syncs in #3840. - types.append((EventTypes.Member, sync_config.user.to_string())) - if batch: current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types, - filtered_types=filtered_types, + batch.events[-1].event_id, state_filter=state_filter, ) state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types, - filtered_types=filtered_types, + batch.events[0].event_id, state_filter=state_filter, ) else: current_state_ids = yield self.get_state_at( - room_id, stream_position=now_token, types=types, - filtered_types=filtered_types, + room_id, stream_position=now_token, + state_filter=state_filter, ) state_ids = current_state_ids @@ -781,8 +769,7 @@ class SyncHandler(object): ) elif batch.limited: state_at_timeline_start = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types, - filtered_types=filtered_types, + batch.events[0].event_id, state_filter=state_filter, ) # for now, we disable LL for gappy syncs - see @@ -797,17 +784,15 @@ class SyncHandler(object): # members to just be ones which were timeline senders, which then ensures # all of the rest get included in the state block (if we need to know # about them). - types = None - filtered_types = None + state_filter = StateFilter.all() state_at_previous_sync = yield self.get_state_at( - room_id, stream_position=since_token, types=types, - filtered_types=filtered_types, + room_id, stream_position=since_token, + state_filter=state_filter, ) current_state_ids = yield self.store.get_state_ids_for_event( - batch.events[-1].event_id, types=types, - filtered_types=filtered_types, + batch.events[-1].event_id, state_filter=state_filter, ) state_ids = _calculate_state( @@ -821,7 +806,7 @@ class SyncHandler(object): else: state_ids = {} if lazy_load_members: - if types and batch.events: + if members_to_fetch and batch.events: # We're returning an incremental sync, with no # "gap" since the previous sync, so normally there would be # no state to return. @@ -831,8 +816,12 @@ class SyncHandler(object): # timeline here, and then dedupe any redundant ones below. state_ids = yield self.store.get_state_ids_for_event( - batch.events[0].event_id, types=types, - filtered_types=None, # we only want members! + batch.events[0].event_id, + # we only want members! + state_filter=StateFilter.from_types( + (EventTypes.Member, member) + for member in members_to_fetch + ), ) if lazy_load_members and not include_redundant_members: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 663934efd0..fcfe7857f6 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -33,6 +33,7 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string, ) +from synapse.storage.state import StateFilter from synapse.streams.config import PaginationConfig from synapse.types import RoomAlias, RoomID, StreamToken, ThirdPartyInstanceID, UserID @@ -409,7 +410,7 @@ class RoomMemberListRestServlet(ClientV1RestServlet): room_id=room_id, user_id=requester.user.to_string(), at_token=at_token, - types=[(EventTypes.Member, None)], + state_filter=StateFilter.from_types([(EventTypes.Member, None)]), ) chunk = [] diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c780f55277..8881b009df 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2089,7 +2089,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore for sg in remaining_state_groups: logger.info("[purge] de-delta-ing remaining state group %s", sg) curr_state = self._get_state_groups_from_groups_txn( - txn, [sg], types=None + txn, [sg], ) curr_state = curr_state[sg] diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 3f4cbd61c4..ef65929bb2 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -19,6 +19,8 @@ from collections import namedtuple from six import iteritems, itervalues from six.moves import range +import attr + from twisted.internet import defer from synapse.api.constants import EventTypes @@ -48,6 +50,318 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt return len(self.delta_ids) if self.delta_ids else 0 +@attr.s(slots=True) +class StateFilter(object): + """A filter used when querying for state. + + Attributes: + types (dict[str, set[str]|None]): Map from type to set of state keys (or + None). This specifies which state_keys for the given type to fetch + from the DB. If None then all events with that type are fetched. If + the set is empty then no events with that type are fetched. + include_others (bool): Whether to fetch events with types that do not + appear in `types`. + """ + + types = attr.ib() + include_others = attr.ib(default=False) + + def __attrs_post_init__(self): + # If `include_others` is set we canonicalise the filter by removing + # wildcards from the types dictionary + if self.include_others: + self.types = { + k: v for k, v in iteritems(self.types) + if v is not None + } + + @staticmethod + def all(): + """Creates a filter that fetches everything. + + Returns: + StateFilter + """ + return StateFilter(types={}, include_others=True) + + @staticmethod + def none(): + """Creates a filter that fetches nothing. + + Returns: + StateFilter + """ + return StateFilter(types={}, include_others=False) + + @staticmethod + def from_types(types): + """Creates a filter that only fetches the given types + + Args: + types (Iterable[tuple[str, str|None]]): A list of type and state + keys to fetch. A state_key of None fetches everything for + that type + + Returns: + StateFilter + """ + type_dict = {} + for typ, s in types: + if typ in type_dict: + if type_dict[typ] is None: + continue + + if s is None: + type_dict[typ] = None + continue + + type_dict.setdefault(typ, set()).add(s) + + return StateFilter(types=type_dict) + + @staticmethod + def from_lazy_load_member_list(members): + """Creates a filter that returns all non-member events, plus the member + events for the given users + + Args: + members (iterable[str]): Set of user IDs + + Returns: + StateFilter + """ + return StateFilter( + types={EventTypes.Member: set(members)}, + include_others=True, + ) + + def return_expanded(self): + """Creates a new StateFilter where type wild cards have been removed + (except for memberships). The returned filter is a superset of the + current one, i.e. anything that passes the current filter will pass + the returned filter. + + This helps the caching as the DictionaryCache knows if it has *all* the + state, but does not know if it has all of the keys of a particular type, + which makes wildcard lookups expensive unless we have a complete cache. + Hence, if we are doing a wildcard lookup, populate the cache fully so + that we can do an efficient lookup next time. + + Note that since we have two caches, one for membership events and one for + other events, we can be a bit more clever than simply returning + `StateFilter.all()` if `has_wildcards()` is True. + + We return a StateFilter where: + 1. the list of membership events to return is the same + 2. if there is a wildcard that matches non-member events we + return all non-member events + + Returns: + StateFilter + """ + + if self.is_full(): + # If we're going to return everything then there's nothing to do + return self + + if not self.has_wildcards(): + # If there are no wild cards, there's nothing to do + return self + + if EventTypes.Member in self.types: + get_all_members = self.types[EventTypes.Member] is None + else: + get_all_members = self.include_others + + has_non_member_wildcard = self.include_others or any( + state_keys is None + for t, state_keys in iteritems(self.types) + if t != EventTypes.Member + ) + + if not has_non_member_wildcard: + # If there are no non-member wild cards we can just return ourselves + return self + + if get_all_members: + # We want to return everything. + return StateFilter.all() + else: + # We want to return all non-members, but only particular + # memberships + return StateFilter( + types={EventTypes.Member: self.types[EventTypes.Member]}, + include_others=True, + ) + + def make_sql_filter_clause(self): + """Converts the filter to an SQL clause. + + For example: + + f = StateFilter.from_types([("m.room.create", "")]) + clause, args = f.make_sql_filter_clause() + clause == "(type = ? AND state_key = ?)" + args == ['m.room.create', ''] + + + Returns: + tuple[str, list]: The SQL string (may be empty) and arguments. An + empty SQL string is returned when the filter matches everything + (i.e. is "full"). + """ + + where_clause = "" + where_args = [] + + if self.is_full(): + return where_clause, where_args + + if not self.include_others and not self.types: + # i.e. this is an empty filter, so we need to return a clause that + # will match nothing + return "1 = 2", [] + + # First we build up a lost of clauses for each type/state_key combo + clauses = [] + for etype, state_keys in iteritems(self.types): + if state_keys is None: + clauses.append("(type = ?)") + where_args.append(etype) + continue + + for state_key in state_keys: + clauses.append("(type = ? AND state_key = ?)") + where_args.extend((etype, state_key)) + + # This will match anything that appears in `self.types` + where_clause = " OR ".join(clauses) + + # If we want to include stuff that's not in the types dict then we add + # a `OR type NOT IN (...)` clause to the end. + if self.include_others: + if where_clause: + where_clause += " OR " + + where_clause += "type NOT IN (%s)" % ( + ",".join(["?"] * len(self.types)), + ) + where_args.extend(self.types) + + return where_clause, where_args + + def max_entries_returned(self): + """Returns the maximum number of entries this filter will return if + known, otherwise returns None. + + For example a simple state filter asking for `("m.room.create", "")` + will return 1, whereas the default state filter will return None. + + This is used to bail out early if the right number of entries have been + fetched. + """ + if self.has_wildcards(): + return None + + return len(self.concrete_types()) + + def filter_state(self, state_dict): + """Returns the state filtered with by this StateFilter + + Args: + state (dict[tuple[str, str], Any]): The state map to filter + + Returns: + dict[tuple[str, str], Any]: The filtered state map + """ + if self.is_full(): + return dict(state_dict) + + filtered_state = {} + for k, v in iteritems(state_dict): + typ, state_key = k + if typ in self.types: + state_keys = self.types[typ] + if state_keys is None or state_key in state_keys: + filtered_state[k] = v + elif self.include_others: + filtered_state[k] = v + + return filtered_state + + def is_full(self): + """Whether this filter fetches everything or not + + Returns: + bool + """ + return self.include_others and not self.types + + def has_wildcards(self): + """Whether the filter includes wildcards or is attempting to fetch + specific state. + + Returns: + bool + """ + + return ( + self.include_others + or any( + state_keys is None + for state_keys in itervalues(self.types) + ) + ) + + def concrete_types(self): + """Returns a list of concrete type/state_keys (i.e. not None) that + will be fetched. This will be a complete list if `has_wildcards` + returns False, but otherwise will be a subset (or even empty). + + Returns: + list[tuple[str,str]] + """ + return [ + (t, s) + for t, state_keys in iteritems(self.types) + if state_keys is not None + for s in state_keys + ] + + def get_member_split(self): + """Return the filter split into two: one which assumes it's exclusively + matching against member state, and one which assumes it's matching + against non member state. + + This is useful due to the returned filters giving correct results for + `is_full()`, `has_wildcards()`, etc, when operating against maps that + either exclusively contain member events or only contain non-member + events. (Which is the case when dealing with the member vs non-member + state caches). + + Returns: + tuple[StateFilter, StateFilter]: The member and non member filters + """ + + if EventTypes.Member in self.types: + state_keys = self.types[EventTypes.Member] + if state_keys is None: + member_filter = StateFilter.all() + else: + member_filter = StateFilter({EventTypes.Member: state_keys}) + elif self.include_others: + member_filter = StateFilter.all() + else: + member_filter = StateFilter.none() + + non_member_filter = StateFilter( + types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member}, + include_others=self.include_others, + ) + + return member_filter, non_member_filter + + # this inherits from EventsWorkerStore because it calls self.get_events class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): """The parts of StateGroupStore that can be called from workers. @@ -152,61 +466,41 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) # FIXME: how should this be cached? - def get_filtered_current_state_ids(self, room_id, types, filtered_types=None): + def get_filtered_current_state_ids(self, room_id, state_filter=StateFilter.all()): """Get the current state event of a given type for a room based on the current_state_events table. This may not be as up-to-date as the result of doing a fresh state resolution as per state_handler.get_current_state + Args: room_id (str) - types (list[(Str, (Str|None))]): List of (type, state_key) tuples - which are used to filter the state fetched. `state_key` may be - None, which matches any `state_key` - filtered_types (list[Str]|None): List of types to apply the above filter to. + state_filter (StateFilter): The state filter used to fetch state + from the database. + Returns: - deferred: dict of (type, state_key) -> event + Deferred[dict[tuple[str, str], str]]: Map from type/state_key to + event ID. """ - include_other_types = False if filtered_types is None else True - def _get_filtered_current_state_ids_txn(txn): results = {} - sql = """SELECT type, state_key, event_id FROM current_state_events - WHERE room_id = ? %s""" - # Turns out that postgres doesn't like doing a list of OR's and - # is about 1000x slower, so we just issue a query for each specific - # type seperately. - if types: - clause_to_args = [ - ( - "AND type = ? AND state_key = ?", - (etype, state_key) - ) if state_key is not None else ( - "AND type = ?", - (etype,) - ) - for etype, state_key in types - ] - - if include_other_types: - unique_types = set(filtered_types) - clause_to_args.append( - ( - "AND type <> ? " * len(unique_types), - list(unique_types) - ) - ) - else: - # If types is None we fetch all the state, and so just use an - # empty where clause with no extra args. - clause_to_args = [("", [])] - for where_clause, where_args in clause_to_args: - args = [room_id] - args.extend(where_args) - txn.execute(sql % (where_clause,), args) - for row in txn: - typ, state_key, event_id = row - key = (intern_string(typ), intern_string(state_key)) - results[key] = event_id + sql = """ + SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? + """ + + where_clause, where_args = state_filter.make_sql_filter_clause() + + if where_clause: + sql += " AND (%s)" % (where_clause,) + + args = [room_id] + args.extend(where_args) + txn.execute(sql, args) + for row in txn: + typ, state_key, event_id = row + key = (intern_string(typ), intern_string(state_key)) + results[key] = event_id + return results return self.runInteraction( @@ -322,20 +616,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): }) @defer.inlineCallbacks - def _get_state_groups_from_groups(self, groups, types, members=None): + def _get_state_groups_from_groups(self, groups, state_filter): """Returns the state groups for a given set of groups, filtering on types of state events. Args: groups(list[int]): list of state group IDs to query - types (Iterable[str, str|None]|None): list of 2-tuples of the form - (`type`, `state_key`), where a `state_key` of `None` matches all - state_keys for the `type`. If None, all types are returned. - members (bool|None): If not None, then, in addition to any filtering - implied by types, the results are also filtered to only include - member events (if True), or to exclude member events (if False) - - Returns: + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: Deferred[dict[int, dict[tuple[str, str], str]]]: dict of state_group_id -> (dict of (type, state_key) -> event id) @@ -346,19 +634,23 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - self._get_state_groups_from_groups_txn, chunk, types, members, + self._get_state_groups_from_groups_txn, chunk, state_filter, ) results.update(res) defer.returnValue(results) def _get_state_groups_from_groups_txn( - self, txn, groups, types=None, members=None, + self, txn, groups, state_filter=StateFilter.all(), ): results = {group: {} for group in groups} - if types is not None: - types = list(set(types)) # deduplicate types list + where_clause, where_args = state_filter.make_sql_filter_clause() + + # Unless the filter clause is empty, we're going to append it after an + # existing where clause + if where_clause: + where_clause = " AND (%s)" % (where_clause,) if isinstance(self.database_engine, PostgresEngine): # Temporarily disable sequential scans in this transaction. This is @@ -374,79 +666,33 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # group for the given type, state_key. # This may return multiple rows per (type, state_key), but last_value # should be the same. - sql = (""" + sql = """ WITH RECURSIVE state(state_group) AS ( VALUES(?::bigint) UNION ALL SELECT prev_state_group FROM state_group_edges e, state s WHERE s.state_group = e.state_group ) - SELECT type, state_key, last_value(event_id) OVER ( + SELECT DISTINCT type, state_key, last_value(event_id) OVER ( PARTITION BY type, state_key ORDER BY state_group ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS event_id FROM state_groups_state WHERE state_group IN ( SELECT state_group FROM state ) - %s - """) + """ - if members is True: - sql += " AND type = '%s'" % (EventTypes.Member,) - elif members is False: - sql += " AND type <> '%s'" % (EventTypes.Member,) - - # Turns out that postgres doesn't like doing a list of OR's and - # is about 1000x slower, so we just issue a query for each specific - # type seperately. - if types is not None: - clause_to_args = [ - ( - "AND type = ? AND state_key = ?", - (etype, state_key) - ) if state_key is not None else ( - "AND type = ?", - (etype,) - ) - for etype, state_key in types - ] - else: - # If types is None we fetch all the state, and so just use an - # empty where clause with no extra args. - clause_to_args = [("", [])] - - for where_clause, where_args in clause_to_args: - for group in groups: - args = [group] - args.extend(where_args) + for group in groups: + args = [group] + args.extend(where_args) - txn.execute(sql % (where_clause,), args) - for row in txn: - typ, state_key, event_id = row - key = (typ, state_key) - results[group][key] = event_id + txn.execute(sql + where_clause, args) + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[group][key] = event_id else: - where_args = [] - where_clauses = [] - wildcard_types = False - if types is not None: - for typ in types: - if typ[1] is None: - where_clauses.append("(type = ?)") - where_args.append(typ[0]) - wildcard_types = True - else: - where_clauses.append("(type = ? AND state_key = ?)") - where_args.extend([typ[0], typ[1]]) - - where_clause = "AND (%s)" % (" OR ".join(where_clauses)) - else: - where_clause = "" - - if members is True: - where_clause += " AND type = '%s'" % EventTypes.Member - elif members is False: - where_clause += " AND type <> '%s'" % EventTypes.Member + max_entries_returned = state_filter.max_entries_returned() # We don't use WITH RECURSIVE on sqlite3 as there are distributions # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) @@ -460,12 +706,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # without the right indices (which we can't add until # after we finish deduping state, which requires this func) args = [next_group] - if types: - args.extend(where_args) + args.extend(where_args) txn.execute( "SELECT type, state_key, event_id FROM state_groups_state" - " WHERE state_group = ? %s" % (where_clause,), + " WHERE state_group = ? " + where_clause, args ) results[group].update( @@ -481,9 +726,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # wildcards (i.e. Nones) in which case we have to do an exhaustive # search if ( - types is not None and - not wildcard_types and - len(results[group]) == len(types) + max_entries_returned is not None and + len(results[group]) == max_entries_returned ): break @@ -498,20 +742,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return results @defer.inlineCallbacks - def get_state_for_events(self, event_ids, types, filtered_types=None): + def get_state_for_events(self, event_ids, state_filter=StateFilter.all()): """Given a list of event_ids and type tuples, return a list of state - dicts for each event. The state dicts will only have the type/state_keys - that are in the `types` list. + dicts for each event. Args: event_ids (list[string]) - types (list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: deferred: A dict of (event_id) -> (type, state_key) -> [state_events] @@ -521,7 +759,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) + group_to_state = yield self._get_state_for_groups(groups, state_filter) state_event_map = yield self.get_events( [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)], @@ -540,20 +778,15 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None): + def get_state_ids_for_events(self, event_ids, state_filter=StateFilter.all()): """ Get the state dicts corresponding to a list of events, containing the event_ids of the state events (as opposed to the events themselves) Args: event_ids(list(str)): events whose state should be returned - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A deferred dict from event_id -> (type, state_key) -> event_id @@ -563,7 +796,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) groups = set(itervalues(event_to_groups)) - group_to_state = yield self._get_state_for_groups(groups, types, filtered_types) + group_to_state = yield self._get_state_for_groups(groups, state_filter) event_to_state = { event_id: group_to_state[group] @@ -573,45 +806,35 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue({event: event_to_state[event] for event in event_ids}) @defer.inlineCallbacks - def get_state_for_event(self, event_id, types=None, filtered_types=None): + def get_state_for_event(self, event_id, state_filter=StateFilter.all()): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_for_events([event_id], types, filtered_types) + state_map = yield self.get_state_for_events([event_id], state_filter) defer.returnValue(state_map[event_id]) @defer.inlineCallbacks - def get_state_ids_for_event(self, event_id, types=None, filtered_types=None): + def get_state_ids_for_event(self, event_id, state_filter=StateFilter.all()): """ Get the state dict corresponding to a particular event Args: event_id(str): event whose state should be returned - types(list[(str, str|None)]|None): List of (type, state_key) tuples - which are used to filter the state fetched. If `state_key` is None, - all events are returned of the given type. - May be None, which matches any key. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: A deferred dict from (type, state_key) -> state_event """ - state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types) + state_map = yield self.get_state_ids_for_events([event_id], state_filter) defer.returnValue(state_map[event_id]) @cached(max_entries=50000) @@ -642,18 +865,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): defer.returnValue({row["event_id"]: row["state_group"] for row in rows}) - def _get_some_state_from_cache(self, cache, group, types, filtered_types=None): + def _get_state_for_group_using_cache(self, cache, group, state_filter): """Checks if group is in cache. See `_get_state_for_groups` Args: cache(DictionaryCache): the state group cache to use group(int): The state group to lookup - types(list[str, str|None]): List of 2-tuples of the form - (`type`, `state_key`), where a `state_key` of `None` matches all - state_keys for the `type`. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool indicating if we successfully retrieved all @@ -662,124 +881,102 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): """ is_all, known_absent, state_dict_ids = cache.get(group) - type_to_key = {} + if is_all or state_filter.is_full(): + # Either we have everything or want everything, either way + # `is_all` tells us whether we've gotten everything. + return state_filter.filter_state(state_dict_ids), is_all # tracks whether any of our requested types are missing from the cache missing_types = False - for typ, state_key in types: - key = (typ, state_key) - - if ( - state_key is None or - (filtered_types is not None and typ not in filtered_types) - ): - type_to_key[typ] = None - # we mark the type as missing from the cache because - # when the cache was populated it might have been done with a - # restricted set of state_keys, so the wildcard will not work - # and the cache may be incomplete. - missing_types = True - else: - if type_to_key.get(typ, object()) is not None: - type_to_key.setdefault(typ, set()).add(state_key) - + if state_filter.has_wildcards(): + # We don't know if we fetched all the state keys for the types in + # the filter that are wildcards, so we have to assume that we may + # have missed some. + missing_types = True + else: + # There aren't any wild cards, so `concrete_types()` returns the + # complete list of event types we're wanting. + for key in state_filter.concrete_types(): if key not in state_dict_ids and key not in known_absent: missing_types = True + break - sentinel = object() - - def include(typ, state_key): - valid_state_keys = type_to_key.get(typ, sentinel) - if valid_state_keys is sentinel: - return filtered_types is not None and typ not in filtered_types - if valid_state_keys is None: - return True - if state_key in valid_state_keys: - return True - return False - - got_all = is_all - if not got_all: - # the cache is incomplete. We may still have got all the results we need, if - # we don't have any wildcards in the match list. - if not missing_types and filtered_types is None: - got_all = True - - return { - k: v for k, v in iteritems(state_dict_ids) - if include(k[0], k[1]) - }, got_all - - def _get_all_state_from_cache(self, cache, group): - """Checks if group is in cache. See `_get_state_for_groups` - - Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool - indicating if we successfully retrieved all requests state from the - cache, if False we need to query the DB for the missing state. - - Args: - cache(DictionaryCache): the state group cache to use - group: The state group to lookup - """ - is_all, _, state_dict_ids = cache.get(group) - - return state_dict_ids, is_all + return state_filter.filter_state(state_dict_ids), not missing_types @defer.inlineCallbacks - def _get_state_for_groups(self, groups, types=None, filtered_types=None): + def _get_state_for_groups(self, groups, state_filter=StateFilter.all()): """Gets the state at each of a list of state groups, optionally filtering by type/state_key Args: groups (iterable[int]): list of state groups for which we want to get the state. - types (None|iterable[(str, None|str)]): - indicates the state type/keys required. If None, the whole - state is fetched and returned. - - Otherwise, each entry should be a `(type, state_key)` tuple to - include in the response. A `state_key` of None is a wildcard - meaning that we require all state with that type. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. - + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: Deferred[dict[int, dict[tuple[str, str], str]]]: dict of state_group_id -> (dict of (type, state_key) -> event id) """ - if types is not None: - non_member_types = [t for t in types if t[0] != EventTypes.Member] - if filtered_types is not None and EventTypes.Member not in filtered_types: - # we want all of the membership events - member_types = None - else: - member_types = [t for t in types if t[0] == EventTypes.Member] - - else: - non_member_types = None - member_types = None + member_filter, non_member_filter = state_filter.get_member_split() - non_member_state = yield self._get_state_for_groups_using_cache( - groups, self._state_group_cache, non_member_types, filtered_types, + # Now we look them up in the member and non-member caches + non_member_state, incomplete_groups_nm, = ( + yield self._get_state_for_groups_using_cache( + groups, self._state_group_cache, + state_filter=non_member_filter, + ) ) - # XXX: we could skip this entirely if member_types is [] - member_state = yield self._get_state_for_groups_using_cache( - # we set filtered_types=None as member_state only ever contain members. - groups, self._state_group_members_cache, member_types, None, + + member_state, incomplete_groups_m, = ( + yield self._get_state_for_groups_using_cache( + groups, self._state_group_members_cache, + state_filter=member_filter, + ) ) - state = non_member_state + state = dict(non_member_state) for group in groups: state[group].update(member_state[group]) + # Now fetch any missing groups from the database + + incomplete_groups = incomplete_groups_m | incomplete_groups_nm + + if not incomplete_groups: + defer.returnValue(state) + + cache_sequence_nm = self._state_group_cache.sequence + cache_sequence_m = self._state_group_members_cache.sequence + + # Help the cache hit ratio by expanding the filter a bit + db_state_filter = state_filter.return_expanded() + + group_to_state_dict = yield self._get_state_groups_from_groups( + list(incomplete_groups), + state_filter=db_state_filter, + ) + + # Now lets update the caches + self._insert_into_cache( + group_to_state_dict, + db_state_filter, + cache_seq_num_members=cache_sequence_m, + cache_seq_num_non_members=cache_sequence_nm, + ) + + # And finally update the result dict, by filtering out any extra + # stuff we pulled out of the database. + for group, group_state_dict in iteritems(group_to_state_dict): + # We just replace any existing entries, as we will have loaded + # everything we need from the database anyway. + state[group] = state_filter.filter_state(group_state_dict) + defer.returnValue(state) - @defer.inlineCallbacks def _get_state_for_groups_using_cache( - self, groups, cache, types=None, filtered_types=None + self, groups, cache, state_filter, ): """Gets the state at each of a list of state groups, optionally filtering by type/state_key, querying from a specific cache. @@ -790,89 +987,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): cache (DictionaryCache): the cache of group ids to state dicts which we will pass through - either the normal state cache or the specific members state cache. - types (None|iterable[(str, None|str)]): - indicates the state type/keys required. If None, the whole - state is fetched and returned. - - Otherwise, each entry should be a `(type, state_key)` tuple to - include in the response. A `state_key` of None is a wildcard - meaning that we require all state with that type. - filtered_types(list[str]|None): Only apply filtering via `types` to this - list of event types. Other types of events are returned unfiltered. - If None, `types` filtering is applied to all events. + state_filter (StateFilter): The state filter used to fetch state + from the database. Returns: - Deferred[dict[int, dict[tuple[str, str], str]]]: - dict of state_group_id -> (dict of (type, state_key) -> event id) + tuple[dict[int, dict[tuple[str, str], str]], set[int]]: Tuple of + dict of state_group_id -> (dict of (type, state_key) -> event id) + of entries in the cache, and the state group ids either missing + from the cache or incomplete. """ - if types: - types = frozenset(types) results = {} - missing_groups = [] - if types is not None: - for group in set(groups): - state_dict_ids, got_all = self._get_some_state_from_cache( - cache, group, types, filtered_types - ) - results[group] = state_dict_ids + incomplete_groups = set() + for group in set(groups): + state_dict_ids, got_all = self._get_state_for_group_using_cache( + cache, group, state_filter + ) + results[group] = state_dict_ids - if not got_all: - missing_groups.append(group) - else: - for group in set(groups): - state_dict_ids, got_all = self._get_all_state_from_cache( - cache, group - ) + if not got_all: + incomplete_groups.add(group) - results[group] = state_dict_ids + return results, incomplete_groups - if not got_all: - missing_groups.append(group) + def _insert_into_cache(self, group_to_state_dict, state_filter, + cache_seq_num_members, cache_seq_num_non_members): + """Inserts results from querying the database into the relevant cache. - if missing_groups: - # Okay, so we have some missing_types, let's fetch them. - cache_seq_num = cache.sequence + Args: + group_to_state_dict (dict): The new entries pulled from database. + Map from state group to state dict + state_filter (StateFilter): The state filter used to fetch state + from the database. + cache_seq_num_members (int): Sequence number of member cache since + last lookup in cache + cache_seq_num_non_members (int): Sequence number of member cache since + last lookup in cache + """ - # the DictionaryCache knows if it has *all* the state, but - # does not know if it has all of the keys of a particular type, - # which makes wildcard lookups expensive unless we have a complete - # cache. Hence, if we are doing a wildcard lookup, populate the - # cache fully so that we can do an efficient lookup next time. + # We need to work out which types we've fetched from the DB for the + # member vs non-member caches. This should be as accurate as possible, + # but can be an underestimate (e.g. when we have wild cards) - if filtered_types or (types and any(k is None for (t, k) in types)): - types_to_fetch = None - else: - types_to_fetch = types + member_filter, non_member_filter = state_filter.get_member_split() + if member_filter.is_full(): + # We fetched all member events + member_types = None + else: + # `concrete_types()` will only return a subset when there are wild + # cards in the filter, but that's fine. + member_types = member_filter.concrete_types() - group_to_state_dict = yield self._get_state_groups_from_groups( - missing_groups, types_to_fetch, cache == self._state_group_members_cache, - ) + if non_member_filter.is_full(): + # We fetched all non member events + non_member_types = None + else: + non_member_types = non_member_filter.concrete_types() + + for group, group_state_dict in iteritems(group_to_state_dict): + state_dict_members = {} + state_dict_non_members = {} - for group, group_state_dict in iteritems(group_to_state_dict): - state_dict = results[group] - - # update the result, filtering by `types`. - if types: - for k, v in iteritems(group_state_dict): - (typ, _) = k - if ( - (k in types or (typ, None) in types) or - (filtered_types and typ not in filtered_types) - ): - state_dict[k] = v + for k, v in iteritems(group_state_dict): + if k[0] == EventTypes.Member: + state_dict_members[k] = v else: - state_dict.update(group_state_dict) - - # update the cache with all the things we fetched from the - # database. - cache.update( - cache_seq_num, - key=group, - value=group_state_dict, - fetched_keys=types_to_fetch, - ) + state_dict_non_members[k] = v - defer.returnValue(results) + self._state_group_members_cache.update( + cache_seq_num_members, + key=group, + value=state_dict_members, + fetched_keys=member_types, + ) + + self._state_group_cache.update( + cache_seq_num_non_members, + key=group, + value=state_dict_non_members, + fetched_keys=non_member_types, + ) def store_state_group(self, event_id, room_id, prev_group, delta_ids, current_state_ids): @@ -1181,12 +1374,12 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): continue prev_state = self._get_state_groups_from_groups_txn( - txn, [prev_group], types=None + txn, [prev_group], ) prev_state = prev_state[prev_group] curr_state = self._get_state_groups_from_groups_txn( - txn, [state_group], types=None + txn, [state_group], ) curr_state = curr_state[state_group] diff --git a/synapse/visibility.py b/synapse/visibility.py index 43f48196be..0281a7c919 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -23,6 +23,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events.utils import prune_event +from synapse.storage.state import StateFilter from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) @@ -72,7 +73,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, ) event_id_to_state = yield store.get_state_for_events( frozenset(e.event_id for e in events), - types=types, + state_filter=StateFilter.from_types(types), ) ignore_dict_content = yield store.get_global_account_data_by_type_for_user( @@ -273,8 +274,8 @@ def filter_events_for_server(store, server_name, events): # need to check membership (as we know the server is in the room). event_to_state_ids = yield store.get_state_ids_for_events( frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), + state_filter=StateFilter.from_types( + types=((EventTypes.RoomHistoryVisibility, ""),), ) ) @@ -314,9 +315,11 @@ def filter_events_for_server(store, server_name, events): # of the history vis and membership state at those events. event_to_state_ids = yield store.get_state_ids_for_events( frozenset(e.event_id for e in events), - types=( - (EventTypes.RoomHistoryVisibility, ""), - (EventTypes.Member, None), + state_filter=StateFilter.from_types( + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ), ) ) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index b9c5b39d59..086a39d834 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -18,6 +18,7 @@ import logging from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.storage.state import StateFilter from synapse.types import RoomID, UserID import tests.unittest @@ -148,7 +149,7 @@ class StateStoreTestCase(tests.unittest.TestCase): # check we get the full state as of the final event state = yield self.store.get_state_for_event( - e5.event_id, None, filtered_types=None + e5.event_id, ) self.assertIsNotNone(e4) @@ -166,33 +167,35 @@ class StateStoreTestCase(tests.unittest.TestCase): # check we can filter to the m.room.name event (with a '' state key) state = yield self.store.get_state_for_event( - e5.event_id, [(EventTypes.Name, '')], filtered_types=None + e5.event_id, StateFilter.from_types([(EventTypes.Name, '')]) ) self.assertStateMapEqual({(e2.type, e2.state_key): e2}, state) # check we can filter to the m.room.name event (with a wildcard None state key) state = yield self.store.get_state_for_event( - e5.event_id, [(EventTypes.Name, None)], filtered_types=None + e5.event_id, StateFilter.from_types([(EventTypes.Name, None)]) ) self.assertStateMapEqual({(e2.type, e2.state_key): e2}, state) # check we can grab the m.room.member events (with a wildcard None state key) state = yield self.store.get_state_for_event( - e5.event_id, [(EventTypes.Member, None)], filtered_types=None + e5.event_id, StateFilter.from_types([(EventTypes.Member, None)]) ) self.assertStateMapEqual( {(e3.type, e3.state_key): e3, (e5.type, e5.state_key): e5}, state ) - # check we can use filtered_types to grab a specific room member - # without filtering out the other event types + # check we can grab a specific room member without filtering out the + # other event types state = yield self.store.get_state_for_event( e5.event_id, - [(EventTypes.Member, self.u_alice.to_string())], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: {self.u_alice.to_string()}}, + include_others=True, + ) ) self.assertStateMapEqual( @@ -204,10 +207,12 @@ class StateStoreTestCase(tests.unittest.TestCase): state, ) - # check that types=[], filtered_types=[EventTypes.Member] - # doesn't return all members + # check that we can grab everything except members state = yield self.store.get_state_for_event( - e5.event_id, [], filtered_types=[EventTypes.Member] + e5.event_id, state_filter=StateFilter( + types={EventTypes.Member: set()}, + include_others=True, + ), ) self.assertStateMapEqual( @@ -215,16 +220,21 @@ class StateStoreTestCase(tests.unittest.TestCase): ) ####################################################### - # _get_some_state_from_cache tests against a full cache + # _get_state_for_group_using_cache tests against a full cache ####################################################### room_id = self.room.to_string() group_ids = yield self.store.get_state_groups_ids(room_id, [e5.event_id]) group = list(group_ids.keys())[0] - # test _get_some_state_from_cache correctly filters out members with types=[] - (state_dict, is_all) = yield self.store._get_some_state_from_cache( - self.store._state_group_cache, group, [], filtered_types=[EventTypes.Member] + # test _get_state_for_group_using_cache correctly filters out members + # with types=[] + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( + self.store._state_group_cache, group, + state_filter=StateFilter( + types={EventTypes.Member: set()}, + include_others=True, + ), ) self.assertEqual(is_all, True) @@ -236,22 +246,27 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict, ) - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: set()}, + include_others=True, + ), ) self.assertEqual(is_all, True) self.assertDictEqual({}, state_dict) - # test _get_some_state_from_cache correctly filters in members with wildcard types - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + # test _get_state_for_group_using_cache correctly filters in members + # with wildcard types + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_cache, group, - [(EventTypes.Member, None)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: None}, + include_others=True, + ), ) self.assertEqual(is_all, True) @@ -263,11 +278,13 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict, ) - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [(EventTypes.Member, None)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: None}, + include_others=True, + ), ) self.assertEqual(is_all, True) @@ -280,12 +297,15 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict, ) - # test _get_some_state_from_cache correctly filters in members with specific types - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + # test _get_state_for_group_using_cache correctly filters in members + # with specific types + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=True, + ), ) self.assertEqual(is_all, True) @@ -297,23 +317,27 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict, ) - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=True, + ), ) self.assertEqual(is_all, True) self.assertDictEqual({(e5.type, e5.state_key): e5.event_id}, state_dict) - # test _get_some_state_from_cache correctly filters in members with specific types - # and no filtered_types - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + # test _get_state_for_group_using_cache correctly filters in members + # with specific types + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=None, + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=False, + ), ) self.assertEqual(is_all, True) @@ -357,42 +381,54 @@ class StateStoreTestCase(tests.unittest.TestCase): ############################################ # test that things work with a partial cache - # test _get_some_state_from_cache correctly filters out members with types=[] + # test _get_state_for_group_using_cache correctly filters out members + # with types=[] room_id = self.room.to_string() - (state_dict, is_all) = yield self.store._get_some_state_from_cache( - self.store._state_group_cache, group, [], filtered_types=[EventTypes.Member] + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( + self.store._state_group_cache, group, + state_filter=StateFilter( + types={EventTypes.Member: set()}, + include_others=True, + ), ) self.assertEqual(is_all, False) self.assertDictEqual({(e1.type, e1.state_key): e1.event_id}, state_dict) room_id = self.room.to_string() - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: set()}, + include_others=True, + ), ) self.assertEqual(is_all, True) self.assertDictEqual({}, state_dict) - # test _get_some_state_from_cache correctly filters in members wildcard types - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + # test _get_state_for_group_using_cache correctly filters in members + # wildcard types + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_cache, group, - [(EventTypes.Member, None)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: None}, + include_others=True, + ), ) self.assertEqual(is_all, False) self.assertDictEqual({(e1.type, e1.state_key): e1.event_id}, state_dict) - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [(EventTypes.Member, None)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: None}, + include_others=True, + ), ) self.assertEqual(is_all, True) @@ -404,44 +440,53 @@ class StateStoreTestCase(tests.unittest.TestCase): state_dict, ) - # test _get_some_state_from_cache correctly filters in members with specific types - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + # test _get_state_for_group_using_cache correctly filters in members + # with specific types + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=True, + ), ) self.assertEqual(is_all, False) self.assertDictEqual({(e1.type, e1.state_key): e1.event_id}, state_dict) - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=[EventTypes.Member], + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=True, + ), ) self.assertEqual(is_all, True) self.assertDictEqual({(e5.type, e5.state_key): e5.event_id}, state_dict) - # test _get_some_state_from_cache correctly filters in members with specific types - # and no filtered_types - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + # test _get_state_for_group_using_cache correctly filters in members + # with specific types + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=None, + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=False, + ), ) self.assertEqual(is_all, False) self.assertDictEqual({}, state_dict) - (state_dict, is_all) = yield self.store._get_some_state_from_cache( + (state_dict, is_all) = yield self.store._get_state_for_group_using_cache( self.store._state_group_members_cache, group, - [(EventTypes.Member, e5.state_key)], - filtered_types=None, + state_filter=StateFilter( + types={EventTypes.Member: {e5.state_key}}, + include_others=False, + ), ) self.assertEqual(is_all, True) -- cgit 1.5.1 From ad88460e0d2e0a7c2cf39ec5539d5c4ff030bbbd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 29 Oct 2018 14:23:34 +0000 Subject: Move _find_unreferenced_groups --- synapse/storage/events.py | 85 +++++++++++++++++++++++++++++++++++++++++++++-- synapse/storage/state.py | 79 ------------------------------------------- 2 files changed, 83 insertions(+), 81 deletions(-) (limited to 'synapse/storage/events.py') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index e791922733..919e855f3b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2052,8 +2052,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.info("[purge] finding state groups that can be deleted") - state_groups_to_delete, remaining_state_groups = self._find_unreferenced_groups( - txn, referenced_state_groups, + state_groups_to_delete, remaining_state_groups = ( + self._find_unreferenced_groups_during_purge( + txn, referenced_state_groups, + ) ) logger.info( @@ -2209,6 +2211,85 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore logger.info("[purge] done") + def _find_unreferenced_groups_during_purge(self, txn, state_groups): + """Used when purging history to figure out which state groups can be + deleted and which need to be de-delta'ed (due to one of its prev groups + being scheduled for deletion). + + Args: + txn + state_groups (set[int]): Set of state groups referenced by events + that are going to be deleted. + + Returns: + tuple[set[int], set[int]]: The set of state groups that can be + deleted and the set of state groups that need to be de-delta'ed + """ + # Graph of state group -> previous group + graph = {} + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + # Check if state groups are referenced + sql = """ + SELECT DISTINCT state_group FROM event_to_state_groups + LEFT JOIN events_to_purge AS ep USING (event_id) + WHERE state_group IN (%s) AND ep.event_id IS NULL + """ % (",".join("?" for _ in current_search),) + txn.execute(sql, list(current_search)) + + referenced = set(sg for sg, in txn) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("prev_state_group", "state_group",), + ) + + prevs = set(row["state_group"] for row in rows) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + for row in rows: + # Note: Each state group can have at most one prev group + graph[row["state_group"]] = row["prev_state_group"] + + to_delete = state_groups_seen - referenced_groups + + to_dedelta = set() + for sg in referenced_groups: + prev_sg = graph.get(sg) + if prev_sg and prev_sg in to_delete: + to_dedelta.add(sg) + + return to_delete, to_dedelta + @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream diff --git a/synapse/storage/state.py b/synapse/storage/state.py index dfec57c045..d737bd6778 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1234,85 +1234,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return count - def _find_unreferenced_groups(self, txn, state_groups): - """Used when purging history to figure out which state groups can be - deleted and which need to be de-delta'ed (due to one of its prev groups - being scheduled for deletion). - - Args: - txn - state_groups (set[int]): Set of state groups referenced by events - that are going to be deleted. - - Returns: - tuple[set[int], set[int]]: The set of state groups that can be - deleted and the set of state groups that need to be de-delta'ed - """ - # Graph of state group -> previous group - graph = {} - - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. - next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(islice(next_to_search, 100)) - next_to_search -= current_search - - # Check if state groups are referenced - sql = """ - SELECT DISTINCT state_group FROM event_to_state_groups - LEFT JOIN events_to_purge AS ep USING (event_id) - WHERE state_group IN (%s) AND ep.event_id IS NULL - """ % (",".join("?" for _ in current_search),) - txn.execute(sql, list(current_search)) - - referenced = set(sg for sg, in txn) - referenced_groups |= referenced - - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced - - rows = self._simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("prev_state_group", "state_group",), - ) - - prevs = set(row["state_group"] for row in rows) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - for row in rows: - # Note: Each state group can have at most one prev group - graph[row["state_group"]] = row["prev_state_group"] - - to_delete = state_groups_seen - referenced_groups - - to_dedelta = set() - for sg in referenced_groups: - prev_sg = graph.get(sg) - if prev_sg and prev_sg in to_delete: - to_dedelta.add(sg) - - return to_delete, to_dedelta - class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): """ Keeps track of the state at a given event. -- cgit 1.5.1