summary refs log tree commit diff
path: root/synapse/state/__init__.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-09-03 15:13:17 +0100
committerErik Johnston <erik@matrix.org>2018-10-16 16:16:13 +0100
commite238013c44714174f1bf9a2b9b1f576728f40784 (patch)
treee704d7baf57930de8c2cd5d9a48c3e06093a0ce6 /synapse/state/__init__.py
parentVarious cleanups in the federation client code (#4031) (diff)
downloadsynapse-e238013c44714174f1bf9a2b9b1f576728f40784.tar.xz
Add v2 state res algorithm.
We hook this up to the vdh test room version.
Diffstat (limited to 'synapse/state/__init__.py')
-rw-r--r--synapse/state/__init__.py87
1 files changed, 69 insertions, 18 deletions
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index b22495c1f9..836e137296 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -19,13 +19,14 @@ from collections import namedtuple
 
 from six import iteritems, itervalues
 
+import attr
 from frozendict import frozendict
 
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, RoomVersions
 from synapse.events.snapshot import EventContext
-from synapse.state import v1
+from synapse.state import v1, v2
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import get_cache_factor_for
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -372,15 +373,10 @@ class StateHandler(object):
 
         result = yield self._state_resolution_handler.resolve_state_groups(
             room_id, room_version, state_groups_ids, None,
-            self._state_map_factory,
+            state_res_store=StateResolutionStore(self.store),
         )
         defer.returnValue(result)
 
-    def _state_map_factory(self, ev_ids):
-        return self.store.get_events(
-            ev_ids, get_prev_content=False, check_redacted=False,
-        )
-
     @defer.inlineCallbacks
     def resolve_events(self, room_version, state_sets, event):
         logger.info(
@@ -401,7 +397,7 @@ class StateHandler(object):
             new_state = yield resolve_events_with_factory(
                 room_version, state_set_ids,
                 event_map=state_map,
-                state_map_factory=self._state_map_factory
+                state_res_store=StateResolutionStore(self.store),
             )
 
         new_state = {
@@ -436,7 +432,7 @@ class StateResolutionHandler(object):
     @defer.inlineCallbacks
     @log_function
     def resolve_state_groups(
-        self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
+        self, room_id, room_version, state_groups_ids, event_map, state_res_store,
     ):
         """Resolves conflicts between a set of state groups
 
@@ -454,9 +450,11 @@ class StateResolutionHandler(object):
                 a dict from event_id to event, for any events that we happen to
                 have in flight (eg, those currently being persisted). This will be
                 used as a starting point fof finding the state we need; any missing
-                events will be requested via state_map_factory.
+                events will be requested via state_res_store.
+
+                If None, all events will be fetched via state_res_store.
 
-                If None, all events will be fetched via state_map_factory.
+            state_res_store (StateResolutionStore)
 
         Returns:
             Deferred[_StateCacheEntry]: resolved state
@@ -502,7 +500,7 @@ class StateResolutionHandler(object):
                         room_version,
                         list(itervalues(state_groups_ids)),
                         event_map=event_map,
-                        state_map_factory=state_map_factory,
+                        state_res_store=state_res_store,
                     )
 
             # if the new state matches any of the input state groups, we can
@@ -583,7 +581,7 @@ def _make_state_cache_entry(
     )
 
 
-def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
+def resolve_events_with_factory(room_version, state_sets, event_map, state_res_store):
     """
     Args:
         room_version(str): Version of the room
@@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
 
             If None, all events will be fetched via state_map_factory.
 
-        state_map_factory(func): will be called
-            with a list of event_ids that are needed, and should return with
-            a Deferred of dict of event_id to event.
+        state_res_store (StateResolutionStore)
 
     Returns
         Deferred[dict[(str, str), str]]:
             a map from (type, state_key) to event_id.
     """
-    if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
+    if room_version == RoomVersions.V1:
         return v1.resolve_events_with_factory(
-            state_sets, event_map, state_map_factory,
+            state_sets, event_map, state_res_store.get_events,
+        )
+    elif room_version == RoomVersions.VDH_TEST:
+        return v2.resolve_events_with_factory(
+            state_sets, event_map, state_res_store,
         )
     else:
         # This should only happen if we added a version but forgot to add it to
@@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
         raise Exception(
             "No state resolution algorithm defined for version %r" % (room_version,)
         )
+
+
+@attr.s
+class StateResolutionStore(object):
+    """Interface that allows state resolution algorithms to access the database
+    in well defined way.
+
+    Args:
+        store (DataStore)
+    """
+
+    store = attr.ib()
+
+    def get_events(self, event_ids, allow_rejected=False):
+        """Get events from the database
+
+        Args:
+            event_ids (list): The event_ids of the events to fetch
+            allow_rejected (bool): If True return rejected events.
+
+        Returns:
+            Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
+        """
+
+        return self.store.get_events(
+            event_ids,
+            check_redacted=False,
+            get_prev_content=False,
+            allow_rejected=allow_rejected,
+        )
+
+    def get_auth_chain(self, event_ids):
+        """Gets the full auth chain for a set of events (including rejected
+        events).
+
+        Includes the given event IDs in the result.
+
+        Note that:
+            1. All events must be state events.
+            2. For v1 rooms this may not have the full auth chain in the
+               presence of rejected events
+
+        Args:
+            event_ids (list): The event IDs of the events to fetch the auth
+                chain for. Must be state events.
+
+        Returns:
+            Deferred[list[str]]: List of event IDs of the auth chain.
+        """
+
+        return self.store.get_auth_chain_ids(event_ids, include_given=True)