summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-02-16 11:16:29 +0000
committerErik Johnston <erik@matrix.org>2018-03-14 16:41:20 +0000
commit2ad7c767eacd9067593cc6e7a4816cf298203401 (patch)
treef47c138b66ba7832fae6a5b88b6ba9e32a64f797
parentMerge pull request #2995 from matrix-org/erikj/enable_membership_worker (diff)
downloadsynapse-2ad7c767eacd9067593cc6e7a4816cf298203401.tar.xz
Refactor event storage to not require state
This is in preparation for using contexts that may or may not have the
current_state_ids set. This will allow us to avoid unnecessarily pulling
out state for an event on the master process when using workers.
-rw-r--r--synapse/storage/events.py73
1 files changed, 40 insertions, 33 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 3890878170..3259a58c12 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,6 +19,7 @@ from synapse.storage.events_worker import EventsWorkerStore
 from twisted.internet import defer
 
 from synapse.events import USE_FROZEN_DICTS
+from synapse.events.snapshot import EventContext
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import (
@@ -42,7 +43,6 @@ import ujson as json
 
 # these are only included to make the type annotations work
 from synapse.events import EventBase    # noqa: F401
-from synapse.events.snapshot import EventContext   # noqa: F401
 
 logger = logging.getLogger(__name__)
 
@@ -504,61 +504,68 @@ class EventsStore(EventsWorkerStore):
             defer.returnValue({})
 
         # map from state_group to ((type, key) -> event_id) state map
-        state_groups = {}
-        missing_event_ids = []
-        was_updated = False
+        state_groups_map = {}
+        for ev, ctx in events_context:
+            if ctx.state_group is None:
+                # I don't think this can happen, but let's double-check
+                raise Exception(
+                    "Context for new extremity event %s has no state "
+                    "group" % (ev.event_id, ),
+                )
+
+            if ctx.state_group in state_groups_map:
+                continue
+
+            if isinstance(ctx, EventContext) and ctx.current_state_ids:
+                state_groups_map[ctx.state_group] = ctx.current_state_ids
+
+        # We need to map new_latest_event_ids to their state groups. First, lets
+        # check if the event is one we're persisting and then we can pull the
+        # state group from its context.
+        # Otherwise we need to pull the state group from the database.
+        missing_event_ids = []  # List of events we need to fetch groups for
+        state_groups_to_resolve = set()  # State groups of new_latest_event_ids
         for event_id in new_latest_event_ids:
-            # First search in the list of new events we're adding,
-            # and then use the current state from that
+            # First search in the list of new events we're adding.
             for ev, ctx in events_context:
                 if event_id == ev.event_id:
-                    if ctx.current_state_ids is None:
-                        raise Exception("Unknown current state")
-
-                    if ctx.state_group is None:
-                        # I don't think this can happen, but let's double-check
-                        raise Exception(
-                            "Context for new extremity event %s has no state "
-                            "group" % (event_id, ),
-                        )
-
-                    # If we've already seen the state group don't bother adding
-                    # it to the state sets again
-                    if ctx.state_group not in state_groups:
-                        state_groups[ctx.state_group] = ctx.current_state_ids
-                        if ctx.delta_ids or hasattr(ev, "state_key"):
-                            was_updated = True
+                    state_groups_to_resolve.add(ctx.state_group)
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
                 # the state from the database
-                was_updated = True
                 missing_event_ids.append(event_id)
 
-        if not was_updated:
-            return
-
         if missing_event_ids:
             # Now pull out the state for any missing events from DB
             event_to_groups = yield self._get_state_group_for_events(
                 missing_event_ids,
             )
+            state_groups_to_resolve.update(event_to_groups.itervalues())
 
-            groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
-
-            if groups:
-                group_to_state = yield self._get_state_for_groups(groups)
-                state_groups.update(group_to_state)
+        # Now that we have calculated state_groups_to_resolve we need to get
+        # their state so we can resolve to a single state set.
+        missing_state = state_groups_to_resolve - set(state_groups_map)
+        if missing_state:
+            group_to_state = yield self._get_state_for_groups(missing_state)
+            state_groups_map.update(group_to_state)
 
-        if len(state_groups) == 1:
+        if len(state_groups_to_resolve) == 1:
             # If there is only one state group, then we know what the current
             # state is.
-            defer.returnValue(state_groups.values()[0])
+            defer.returnValue(state_groups_map[state_groups_to_resolve.pop()])
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
 
         def get_events(ev_ids):
             return self.get_events(
                 ev_ids, get_prev_content=False, check_redacted=False,
             )
+
+        state_groups = {
+            sg: state_groups_map[sg] for sg in state_groups_to_resolve
+        }
+
         events_map = {ev.event_id: ev for ev, _ in events_context}
         logger.debug("calling resolve_state_groups from preserve_events")
         res = yield self._state_resolution_handler.resolve_state_groups(