diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index b22495c1f9..9b40b18d5b 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -19,13 +19,14 @@ from collections import namedtuple
from six import iteritems, itervalues
+import attr
from frozendict import frozendict
from twisted.internet import defer
from synapse.api.constants import EventTypes, RoomVersions
from synapse.events.snapshot import EventContext
-from synapse.state import v1
+from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
@@ -372,15 +373,10 @@ class StateHandler(object):
result = yield self._state_resolution_handler.resolve_state_groups(
room_id, room_version, state_groups_ids, None,
- self._state_map_factory,
+ state_res_store=StateResolutionStore(self.store),
)
defer.returnValue(result)
- def _state_map_factory(self, ev_ids):
- return self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- )
-
@defer.inlineCallbacks
def resolve_events(self, room_version, state_sets, event):
logger.info(
@@ -398,10 +394,10 @@ class StateHandler(object):
}
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events_with_factory(
+ new_state = yield resolve_events_with_store(
room_version, state_set_ids,
event_map=state_map,
- state_map_factory=self._state_map_factory
+ state_res_store=StateResolutionStore(self.store),
)
new_state = {
@@ -436,7 +432,7 @@ class StateResolutionHandler(object):
@defer.inlineCallbacks
@log_function
def resolve_state_groups(
- self, room_id, room_version, state_groups_ids, event_map, state_map_factory,
+ self, room_id, room_version, state_groups_ids, event_map, state_res_store,
):
"""Resolves conflicts between a set of state groups
@@ -454,9 +450,11 @@ class StateResolutionHandler(object):
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
used as a starting point fof finding the state we need; any missing
- events will be requested via state_map_factory.
+ events will be requested via state_res_store.
+
+ If None, all events will be fetched via state_res_store.
- If None, all events will be fetched via state_map_factory.
+ state_res_store (StateResolutionStore)
Returns:
Deferred[_StateCacheEntry]: resolved state
@@ -480,10 +478,10 @@ class StateResolutionHandler(object):
# start by assuming we won't have any conflicted state, and build up the new
# state map by iterating through the state groups. If we discover a conflict,
- # we give up and instead use `resolve_events_with_factory`.
+ # we give up and instead use `resolve_events_with_store`.
#
# XXX: is this actually worthwhile, or should we just let
- # resolve_events_with_factory do it?
+ # resolve_events_with_store do it?
new_state = {}
conflicted_state = False
for st in itervalues(state_groups_ids):
@@ -498,11 +496,11 @@ class StateResolutionHandler(object):
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events_with_factory(
+ new_state = yield resolve_events_with_store(
room_version,
list(itervalues(state_groups_ids)),
event_map=event_map,
- state_map_factory=state_map_factory,
+ state_res_store=state_res_store,
)
# if the new state matches any of the input state groups, we can
@@ -583,7 +581,7 @@ def _make_state_cache_entry(
)
-def resolve_events_with_factory(room_version, state_sets, event_map, state_map_factory):
+def resolve_events_with_store(room_version, state_sets, event_map, state_res_store):
"""
Args:
room_version(str): Version of the room
@@ -599,17 +597,19 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
If None, all events will be fetched via state_map_factory.
- state_map_factory(func): will be called
- with a list of event_ids that are needed, and should return with
- a Deferred of dict of event_id to event.
+ state_res_store (StateResolutionStore)
Returns
Deferred[dict[(str, str), str]]:
a map from (type, state_key) to event_id.
"""
- if room_version in (RoomVersions.V1, RoomVersions.VDH_TEST,):
- return v1.resolve_events_with_factory(
- state_sets, event_map, state_map_factory,
+ if room_version == RoomVersions.V1:
+ return v1.resolve_events_with_store(
+ state_sets, event_map, state_res_store.get_events,
+ )
+ elif room_version == RoomVersions.VDH_TEST:
+ return v2.resolve_events_with_store(
+ state_sets, event_map, state_res_store,
)
else:
# This should only happen if we added a version but forgot to add it to
@@ -617,3 +617,54 @@ def resolve_events_with_factory(room_version, state_sets, event_map, state_map_f
raise Exception(
"No state resolution algorithm defined for version %r" % (room_version,)
)
+
+
+@attr.s
+class StateResolutionStore(object):
+ """Interface that allows state resolution algorithms to access the database
+ in well defined way.
+
+ Args:
+ store (DataStore)
+ """
+
+ store = attr.ib()
+
+ def get_events(self, event_ids, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred[dict[str, FrozenEvent]]: Dict from event_id to event.
+ """
+
+ return self.store.get_events(
+ event_ids,
+ check_redacted=False,
+ get_prev_content=False,
+ allow_rejected=allow_rejected,
+ )
+
+ def get_auth_chain(self, event_ids):
+ """Gets the full auth chain for a set of events (including rejected
+ events).
+
+ Includes the given event IDs in the result.
+
+ Note that:
+ 1. All events must be state events.
+ 2. For v1 rooms this may not have the full auth chain in the
+ presence of rejected events
+
+ Args:
+ event_ids (list): The event IDs of the events to fetch the auth
+ chain for. Must be state events.
+
+ Returns:
+ Deferred[list[str]]: List of event IDs of the auth chain.
+ """
+
+ return self.store.get_auth_chain_ids(event_ids, include_given=True)
|