From e238013c44714174f1bf9a2b9b1f576728f40784 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Sep 2018 15:13:17 +0100 Subject: Add v2 state res algorithm. We hook this up to the vdh test room version. --- synapse/state/__init__.py | 87 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 18 deletions(-) (limited to 'synapse/state/__init__.py') diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index b22495c1f9..836e137296 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -19,13 +19,14 @@ from collections import namedtuple from six import iteritems, itervalues +import attr from frozendict import frozendict from twisted.internet import defer from synapse.api.constants import EventTypes, RoomVersions from synapse.events.snapshot import EventContext -from synapse.state import v1 +from synapse.state import v1, v2 from synapse.util.async_helpers import Linearizer from synapse.util.caches import get_cache_factor_for from synapse.util.caches.expiringcache import ExpiringCache @@ -372,15 +373,10 @@ class StateHandler(object): result = yield self._state_resolution_handler.resolve_state_groups( room_id, room_version, state_groups_ids, None, - self._state_map_factory, + state_res_store=StateResolutionStore(self.store), ) defer.returnValue(result) - def _state_map_factory(self, ev_ids): - return self.store.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ) - @defer.inlineCallbacks def resolve_events(self, room_version, state_sets, event): logger.info( @@ -401,7 +397,7 @@ class StateHandler(object): new_state = yield resolve_events_with_factory( room_version, state_set_ids, event_map=state_map, - state_map_factory=self._state_map_factory + state_res_store=StateResolutionStore(self.store), ) new_state = { @@ -436,7 +432,7 @@ class StateResolutionHandler(object): @defer.inlineCallbacks @log_function def resolve_state_groups( - self, room_id, room_version, state_groups_ids, event_map, state_map_factory, + self, room_id, room_version, state_groups_ids, event_map, state_res_store, ): """Resolves conflicts between a set of state groups @@ -454,9 +450,11 @@ class StateResolutionHandler(object): a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be used as a starting point fof finding the state we need; any missing - events will be requested via state_map_factory. + events will be requested via state_res_store. + + If None, all events will be fetched via state_res_store. - If None, all events will be fetched via state_map_factory. + state_res_store (StateResolutionStore) Returns: Deferred[_StateCacheEntry]: resolved state @@ -502,7 +500,7 @@ class StateResolutionHandler(object): room_version, list(itervalues(state_groups_ids)), event_map=event_map, - state_map_factory=state_map_factory, + state_res_store=state_res_store, ) # if the new state matches any of the input state groups, we can @@ -583,7 +581,7 @@ def _make_state_cache_entry( ) -def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory): +def resolve_events_with_factory(room_version, state_sets, event_map, state_res_store): """ Args: room_version(str): Version of the room @@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f If None, all events will be fetched via state_map_factory. - state_map_factory(func): will be called - with a list of event_ids that are needed, and should return with - a Deferred of dict of event_id to event. + state_res_store (StateResolutionStore) Returns Deferred[dict[(str, str), str]]: a map from (type, state_key) to event_id. """ - if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,): + if room_version == RoomVersions.V1: return v1.resolve_events_with_factory( - state_sets, event_map, state_map_factory, + state_sets, event_map, state_res_store.get_events, + ) + elif room_version == RoomVersions.VDH_TEST: + return v2.resolve_events_with_factory( + state_sets, event_map, state_res_store, ) else: # This should only happen if we added a version but forgot to add it to @@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f raise Exception( "No state resolution algorithm defined for version %r" % (room_version,) ) + + +@attr.s +class StateResolutionStore(object): + """Interface that allows state resolution algorithms to access the database + in well defined way. + + Args: + store (DataStore) + """ + + store = attr.ib() + + def get_events(self, event_ids, allow_rejected=False): + """Get events from the database + + Args: + event_ids (list): The event_ids of the events to fetch + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred[dict[str, FrozenEvent]]: Dict from event_id to event. + """ + + return self.store.get_events( + event_ids, + check_redacted=False, + get_prev_content=False, + allow_rejected=allow_rejected, + ) + + def get_auth_chain(self, event_ids): + """Gets the full auth chain for a set of events (including rejected + events). + + Includes the given event IDs in the result. + + Note that: + 1. All events must be state events. + 2. For v1 rooms this may not have the full auth chain in the + presence of rejected events + + Args: + event_ids (list): The event IDs of the events to fetch the auth + chain for. Must be state events. + + Returns: + Deferred[list[str]]: List of event IDs of the auth chain. + """ + + return self.store.get_auth_chain_ids(event_ids, include_given=True) -- cgit 1.4.1 From 810715f79aafc586e70e9dcd3da4a2cc6823f704 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 24 Oct 2018 09:44:22 +0100 Subject: Rename resolve_events_with_factory --- synapse/handlers/federation.py | 4 ++-- synapse/state/__init__.py | 14 +++++++------- synapse/state/v1.py | 2 +- synapse/state/v2.py | 2 +- tests/state/test_v2.py | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) (limited to 'synapse/state/__init__.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b028d58ae4..91a18a552c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -53,7 +53,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet -from synapse.state import StateResolutionStore, resolve_events_with_factory +from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id from synapse.util import logcontext, unwrapFirstError from synapse.util.async_helpers import Linearizer @@ -385,7 +385,7 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x room_version = yield self.store.get_room_version(room_id) - state_map = yield resolve_events_with_factory( + state_map = yield resolve_events_with_store( room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 836e137296..9b40b18d5b 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -394,7 +394,7 @@ class StateHandler(object): } with Measure(self.clock, "state._resolve_events"): - new_state = yield resolve_events_with_factory( + new_state = yield resolve_events_with_store( room_version, state_set_ids, event_map=state_map, state_res_store=StateResolutionStore(self.store), @@ -478,10 +478,10 @@ class StateResolutionHandler(object): # start by assuming we won't have any conflicted state, and build up the new # state map by iterating through the state groups. If we discover a conflict, - # we give up and instead use `resolve_events_with_factory`. + # we give up and instead use `resolve_events_with_store`. # # XXX: is this actually worthwhile, or should we just let - # resolve_events_with_factory do it? + # resolve_events_with_store do it? new_state = {} conflicted_state = False for st in itervalues(state_groups_ids): @@ -496,7 +496,7 @@ class StateResolutionHandler(object): if conflicted_state: logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): - new_state = yield resolve_events_with_factory( + new_state = yield resolve_events_with_store( room_version, list(itervalues(state_groups_ids)), event_map=event_map, @@ -581,7 +581,7 @@ def _make_state_cache_entry( ) -def resolve_events_with_factory(room_version, state_sets, event_map, state_res_store): +def resolve_events_with_store(room_version, state_sets, event_map, state_res_store): """ Args: room_version(str): Version of the room @@ -604,11 +604,11 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_res_s a map from (type, state_key) to event_id. """ if room_version == RoomVersions.V1: - return v1.resolve_events_with_factory( + return v1.resolve_events_with_store( state_sets, event_map, state_res_store.get_events, ) elif room_version == RoomVersions.VDH_TEST: - return v2.resolve_events_with_factory( + return v2.resolve_events_with_store( state_sets, event_map, state_res_store, ) else: diff --git a/synapse/state/v1.py b/synapse/state/v1.py index 7a7157b352..70a981f4a2 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py @@ -31,7 +31,7 @@ POWER_KEY = (EventTypes.PowerLevels, "") @defer.inlineCallbacks -def resolve_events_with_factory(state_sets, event_map, state_map_factory): +def resolve_events_with_store(state_sets, event_map, state_map_factory): """ Args: state_sets(list): List of dicts of (type, state_key) -> event_id, diff --git a/synapse/state/v2.py b/synapse/state/v2.py index d034b4f38e..5d06f7e928 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def resolve_events_with_factory(state_sets, event_map, state_res_store): +def resolve_events_with_store(state_sets, event_map, state_res_store): """Resolves the state using the v2 state resolution algorithm Args: diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index b96bb6b25c..73f2ef7557 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -24,7 +24,7 @@ from synapse.event_auth import auth_types_for_event from synapse.events import FrozenEvent from synapse.state.v2 import ( lexicographical_topological_sort, - resolve_events_with_factory, + resolve_events_with_store, ) from synapse.types import EventID @@ -541,7 +541,7 @@ class StateTestCase(unittest.TestCase): elif len(prev_events) == 1: state_before = dict(state_at_event[prev_events[0]]) else: - state_d = resolve_events_with_factory( + state_d = resolve_events_with_store( [state_at_event[n] for n in prev_events], event_map=event_map, state_res_store=TestStateResolutionStore(event_map), -- cgit 1.4.1