diff --git a/changelog.d/3579.misc b/changelog.d/3579.misc
new file mode 100644
index 0000000000..2374dc0c44
--- /dev/null
+++ b/changelog.d/3579.misc
@@ -0,0 +1 @@
+Lazily load state on master process when using workers to reduce DB consumption
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bc629832d9..535bdb449d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -65,8 +65,9 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -544,7 +545,8 @@ class Auth(object):
@defer.inlineCallbacks
def add_auth_events(self, builder, context):
- auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 5e02ef1a5c..a6d7bf5700 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -19,18 +19,12 @@ from frozendict import frozendict
from twisted.internet import defer
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
class EventContext(object):
"""
Attributes:
- current_state_ids (dict[(str, str), str]):
- The current state map including the current event.
- (type, state_key) -> event_id
-
- prev_state_ids (dict[(str, str), str]):
- The current state map excluding the current event.
- (type, state_key) -> event_id
-
state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is
an outlier.
@@ -47,36 +41,74 @@ class EventContext(object):
prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
+
+ _current_state_ids (dict[(str, str), str]|None):
+ The current state map including the current event. None if outlier
+ or we haven't fetched the state from DB yet.
+ (type, state_key) -> event_id
+
+ _prev_state_ids (dict[(str, str), str]|None):
+ The current state map excluding the current event. None if outlier
+ or we haven't fetched the state from DB yet.
+ (type, state_key) -> event_id
+
+ _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
+ been calculated. None if we haven't started calculating yet
+
+ _event_type (str): The type of the event the context is associated with.
+ Only set when state has not been fetched yet.
+
+ _event_state_key (str|None): The state_key of the event the context is
+ associated with. Only set when state has not been fetched yet.
+
+ _prev_state_id (str|None): If the event associated with the context is
+ a state event, then `_prev_state_id` is the event_id of the state
+ that was replaced.
+ Only set when state has not been fetched yet.
"""
__slots__ = [
- "current_state_ids",
- "prev_state_ids",
"state_group",
"rejected",
"prev_group",
"delta_ids",
"prev_state_events",
"app_service",
+ "_current_state_ids",
+ "_prev_state_ids",
+ "_prev_state_id",
+ "_event_type",
+ "_event_state_key",
+ "_fetching_state_deferred",
]
- def __init__(self, state_group, current_state_ids, prev_state_ids,
- prev_group=None, delta_ids=None):
+ @staticmethod
+ def with_state(state_group, current_state_ids, prev_state_ids,
+ prev_group=None, delta_ids=None):
+ context = EventContext()
+
# The current state including the current event
- self.current_state_ids = current_state_ids
+ context._current_state_ids = current_state_ids
# The current state excluding the current event
- self.prev_state_ids = prev_state_ids
- self.state_group = state_group
+ context._prev_state_ids = prev_state_ids
+ context.state_group = state_group
+
+ context._prev_state_id = None
+ context._event_type = None
+ context._event_state_key = None
+ context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that
# and this state.
- self.prev_group = prev_group
- self.delta_ids = delta_ids
+ context.prev_group = prev_group
+ context.delta_ids = delta_ids
- self.prev_state_events = []
+ context.prev_state_events = []
- self.rejected = False
- self.app_service = None
+ context.rejected = False
+ context.app_service = None
+
+ return context
def serialize(self, event):
"""Converts self to a type that can be serialized as JSON, and then
@@ -123,30 +155,17 @@ class EventContext(object):
Returns:
EventContext
"""
+ context = EventContext()
+
# We use the state_group and prev_state_id stuff to pull the
# current_state_ids out of the DB and construct prev_state_ids.
- prev_state_id = input["prev_state_id"]
- event_type = input["event_type"]
- event_state_key = input["event_state_key"]
-
- state_group = input["state_group"]
+ context._prev_state_id = input["prev_state_id"]
+ context._event_type = input["event_type"]
+ context._event_state_key = input["event_state_key"]
- current_state_ids = yield store.get_state_ids_for_group(
- state_group,
- )
- if prev_state_id and event_state_key:
- prev_state_ids = dict(current_state_ids)
- prev_state_ids[(event_type, event_state_key)] = prev_state_id
- else:
- prev_state_ids = current_state_ids
-
- context = EventContext(
- state_group=state_group,
- current_state_ids=current_state_ids,
- prev_state_ids=prev_state_ids,
- prev_group=input["prev_group"],
- delta_ids=_decode_state_dict(input["delta_ids"]),
- )
+ context.state_group = input["state_group"]
+ context.prev_group = input["prev_group"]
+ context.delta_ids = _decode_state_dict(input["delta_ids"])
context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
@@ -157,6 +176,80 @@ class EventContext(object):
defer.returnValue(context)
+ @defer.inlineCallbacks
+ def get_current_state_ids(self, store):
+ """Gets the current state IDs
+
+ Returns:
+ Deferred[dict[(str, str), str]|None]: Returns None if state_group
+ is None, which happens when the associated event is an outlier.
+ """
+
+ if not self._fetching_state_deferred:
+ self._fetching_state_deferred = run_in_background(
+ self._fill_out_state, store,
+ )
+
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ defer.returnValue(self._current_state_ids)
+
+ @defer.inlineCallbacks
+ def get_prev_state_ids(self, store):
+ """Gets the prev state IDs
+
+ Returns:
+ Deferred[dict[(str, str), str]|None]: Returns None if state_group
+ is None, which happens when the associated event is an outlier.
+ """
+
+ if not self._fetching_state_deferred:
+ self._fetching_state_deferred = run_in_background(
+ self._fill_out_state, store,
+ )
+
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ defer.returnValue(self._prev_state_ids)
+
+ @defer.inlineCallbacks
+ def _fill_out_state(self, store):
+ """Called to populate the _current_state_ids and _prev_state_ids
+ attributes by loading from the database.
+ """
+ if self.state_group is None:
+ return
+
+ self._current_state_ids = yield store.get_state_ids_for_group(
+ self.state_group,
+ )
+ if self._prev_state_id and self._event_state_key is not None:
+ self._prev_state_ids = dict(self._current_state_ids)
+
+ key = (self._event_type, self._event_state_key)
+ self._prev_state_ids[key] = self._prev_state_id
+ else:
+ self._prev_state_ids = self._current_state_ids
+
+ @defer.inlineCallbacks
+ def update_state(self, state_group, prev_state_ids, current_state_ids,
+ delta_ids):
+ """Replace the state in the context
+ """
+
+ # We need to make sure we wait for any ongoing fetching of state
+ # to complete so that the updated state doesn't get clobbered
+ if self._fetching_state_deferred:
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ self.state_group = state_group
+ self._prev_state_ids = prev_state_ids
+ self._current_state_ids = current_state_ids
+ self.delta_ids = delta_ids
+
+ # We need to ensure that that we've marked as having fetched the state
+ self._fetching_state_deferred = defer.succeed(None)
+
def _encode_state_dict(state_dict):
"""Since dicts of (type, state_key) -> event_id cannot be serialized in
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
+ current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
- list(context.current_state_ids.values())
+ list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a6d391c4e8..14654d59f1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -486,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- state_ids = list(context.prev_state_ids.values())
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(context.prev_state_ids.values()))
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -1968,21 +1975,35 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update(state_updates)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
if context.delta_ids is not None:
- context.delta_ids = dict(context.delta_ids)
- context.delta_ids.update(state_updates)
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
+ delta_ids = dict(context.delta_ids)
+ delta_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
- context.state_group = yield self.store.store_state_group(
+
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ delta_ids=delta_ids,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ delta_ids=delta_ids,
)
@defer.inlineCallbacks
@@ -2222,7 +2243,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2264,7 +2286,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index abc07ea87c..c4bcd9018b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -630,7 +630,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -752,8 +753,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -884,9 +885,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in iteritems(context.current_state_ids)
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -922,8 +925,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -943,11 +947,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..a832d91809 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -201,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@@ -496,9 +498,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
- guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+ guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -517,7 +520,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bb181d94ee..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context):
- pl_event_id = context.prev_state_ids.get(POWER_KEY)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
auth_events = {POWER_KEY: pl_event}
else:
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=False,
+ event, prev_state_ids, for_verification=False,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -304,7 +305,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits()
else:
- current_state_ids = context.current_state_ids
+ current_state_ids = yield context.get_current_state_ids(self.store)
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids))
diff --git a/synapse/state.py b/synapse/state.py
index a708695006..32125c95df 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -219,7 +219,7 @@ class StateHandler(object):
# We don't store state for outliers, so we don't generate a state
# group for it.
- context = EventContext(
+ context = EventContext.with_state(
state_group=None,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -255,7 +255,7 @@ class StateHandler(object):
current_state_ids=current_state_ids,
)
- context = EventContext(
+ context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
@@ -322,7 +322,7 @@ class StateHandler(object):
state_group = entry.state_group
- context = EventContext(
+ context = EventContext.with_state(
state_group=state_group,
current_state_ids=current_state_ids,
prev_state_ids=prev_state_ids,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 4ff0fdc4ab..bf4f3ee92a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -549,7 +549,7 @@ class EventsStore(EventsWorkerStore):
if ctx.state_group in state_groups_map:
continue
- state_groups_map[ctx.state_group] = ctx.current_state_ids
+ state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self)
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index be655d287b..af564b1b4e 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -186,6 +186,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
defer.returnValue(results)
+ @defer.inlineCallbacks
def bulk_get_push_rules_for_room(self, event, context):
state_group = context.state_group
if not state_group:
@@ -195,9 +196,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._bulk_get_push_rules_for_room(
- event.room_id, state_group, context.current_state_ids, event=event
+ current_state_ids = yield context.get_current_state_ids(self)
+ result = yield self._bulk_get_push_rules_for_room(
+ event.room_id, state_group, current_state_ids, event=event
)
+ defer.returnValue(result)
@cachedInlineCallbacks(num_args=2, cache_context=True)
def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 02a802bed9..a27702a7a0 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(user_who_share_room)
+ @defer.inlineCallbacks
def get_joined_users_from_context(self, event, context):
state_group = context.state_group
if not state_group:
@@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_users_from_context(
- event.room_id, state_group, context.current_state_ids,
+ current_state_ids = yield context.get_current_state_ids(self)
+ result = yield self._get_joined_users_from_context(
+ event.room_id, state_group, current_state_ids,
event=event,
context=context,
)
+ defer.returnValue(result)
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index cea01d93eb..f5b47f5ec0 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -222,9 +222,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_ids = {
key: e.event_id for key, e in state.items()
}
- context = EventContext()
- context.current_state_ids = state_ids
- context.prev_state_ids = state_ids
+ context = EventContext.with_state(
+ state_group=None,
+ current_state_ids=state_ids,
+ prev_state_ids=state_ids
+ )
else:
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)
diff --git a/tests/test_state.py b/tests/test_state.py
index c0f2d1152d..429a18cbf7 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -204,7 +204,8 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
- self.assertEqual(2, len(context_store["D"].prev_state_ids))
+ prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
+ self.assertEqual(2, len(prev_state_ids))
@defer.inlineCallbacks
def test_branch_basic_conflict(self):
@@ -255,9 +256,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
+ prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
+
self.assertSetEqual(
{"START", "A", "C"},
- {e_id for e_id in context_store["D"].prev_state_ids.values()}
+ {e_id for e_id in prev_state_ids.values()}
)
@defer.inlineCallbacks
@@ -318,9 +321,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
+ prev_state_ids = yield context_store["E"].get_prev_state_ids(self.store)
+
self.assertSetEqual(
{"START", "A", "B", "C"},
- {e for e in context_store["E"].prev_state_ids.values()}
+ {e for e in prev_state_ids.values()}
)
@defer.inlineCallbacks
@@ -398,9 +403,11 @@ class StateTestCase(unittest.TestCase):
self.store.register_event_context(event, context)
context_store[event.event_id] = context
+ prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store)
+
self.assertSetEqual(
{"A1", "A2", "A3", "A5", "B"},
- {e for e in context_store["D"].prev_state_ids.values()}
+ {e for e in prev_state_ids.values()}
)
def _add_depths(self, nodes, edges):
@@ -429,8 +436,10 @@ class StateTestCase(unittest.TestCase):
event, old_state=old_state
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
self.assertEqual(
- set(e.event_id for e in old_state), set(context.current_state_ids.values())
+ set(e.event_id for e in old_state), set(current_state_ids.values())
)
self.assertIsNotNone(context.state_group)
@@ -449,8 +458,10 @@ class StateTestCase(unittest.TestCase):
event, old_state=old_state
)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
self.assertEqual(
- set(e.event_id for e in old_state), set(context.prev_state_ids.values())
+ set(e.event_id for e in old_state), set(prev_state_ids.values())
)
@defer.inlineCallbacks
@@ -475,9 +486,11 @@ class StateTestCase(unittest.TestCase):
context = yield self.state.compute_event_context(event)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
self.assertEqual(
set([e.event_id for e in old_state]),
- set(context.current_state_ids.values())
+ set(current_state_ids.values())
)
self.assertEqual(group_name, context.state_group)
@@ -504,9 +517,11 @@ class StateTestCase(unittest.TestCase):
context = yield self.state.compute_event_context(event)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
self.assertEqual(
set([e.event_id for e in old_state]),
- set(context.prev_state_ids.values())
+ set(prev_state_ids.values())
)
self.assertIsNotNone(context.state_group)
@@ -545,7 +560,9 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
- self.assertEqual(len(context.current_state_ids), 6)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
+ self.assertEqual(len(current_state_ids), 6)
self.assertIsNotNone(context.state_group)
@@ -585,7 +602,9 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
- self.assertEqual(len(context.current_state_ids), 6)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
+ self.assertEqual(len(current_state_ids), 6)
self.assertIsNotNone(context.state_group)
@@ -642,8 +661,10 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
self.assertEqual(
- old_state_2[3].event_id, context.current_state_ids[("test1", "1")]
+ old_state_2[3].event_id, current_state_ids[("test1", "1")]
)
# Reverse the depth to make sure we are actually using the depths
@@ -670,8 +691,10 @@ class StateTestCase(unittest.TestCase):
event, prev_event_id1, old_state_1, prev_event_id2, old_state_2,
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
self.assertEqual(
- old_state_1[3].event_id, context.current_state_ids[("test1", "1")]
+ old_state_1[3].event_id, current_state_ids[("test1", "1")]
)
def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2,
|