diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 33fccfa7a8..dd28c2efe3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -386,11 +386,18 @@ class EventsStore(SQLBaseStore):
if all_single_prev_not_state:
continue
- state = yield self._calculate_state_delta(
- room_id, ev_ctx_rm, new_latest_event_ids
+ logger.info(
+ "Calculating state delta for room %s", room_id,
)
- if state:
- current_state_for_room[room_id] = state
+ current_state = yield self._get_new_state_after_events(
+ ev_ctx_rm, new_latest_event_ids,
+ )
+ if current_state is not None:
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ if delta is not None:
+ current_state_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -467,20 +474,22 @@ class EventsStore(SQLBaseStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
- """Calculate the new state deltas for a room.
+ def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+ """Calculate the current state dict after adding some new events to
+ a room
- Assumes that we are only persisting events for one room at a time.
+ Args:
+ events_context (list[(EventBase, EventContext)]):
+ events and contexts which are being added to the room
+
+ new_latest_event_ids (iterable[str]):
+ the new forward extremities for the room.
Returns:
- 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert. `new_state` is the full set of state.
- May return None if there are no changes to be applied.
+ Deferred[dict[(str,str), str]|None]:
+ None if there are no changes to the room state, or
+ a dict of (type, state_key) -> event_id].
"""
- # Now we need to work out the different state sets for
- # each state extremities
state_sets = []
state_groups = set()
missing_event_ids = []
@@ -523,12 +532,12 @@ class EventsStore(SQLBaseStore):
state_sets.extend(group_to_state.itervalues())
if not new_latest_event_ids:
- current_state = {}
+ defer.returnValue({})
elif was_updated:
if len(state_sets) == 1:
# If there is only one state set, then we know what the current
# state is.
- current_state = state_sets[0]
+ defer.returnValue(state_sets[0])
else:
# We work out the current state by passing the state sets to the
# state resolution algorithm. It may ask for some events, including
@@ -537,8 +546,7 @@ class EventsStore(SQLBaseStore):
# up in the db.
logger.info(
- "Resolving state for %s with %i state sets",
- room_id, len(state_sets),
+ "Resolving state with %i state sets", len(state_sets),
)
events_map = {ev.event_id: ev for ev, _ in events_context}
@@ -567,9 +575,22 @@ class EventsStore(SQLBaseStore):
state_sets,
state_map_factory=get_events,
)
+ defer.returnValue(current_state)
else:
return
+ @defer.inlineCallbacks
+ def _calculate_state_delta(self, room_id, current_state):
+ """Calculate the new state deltas for a room.
+
+ Assumes that we are only persisting events for one room at a time.
+
+ Returns:
+ 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+ i.e. (type, state_key) -> event_id. `to_delete` are the entries to
+ first be deleted from current_state_events, `to_insert` are entries
+ to insert. `new_state` is the full set of state.
+ """
existing_state = yield self.get_current_state_ids(room_id)
existing_events = set(existing_state.itervalues())
|