summary refs log tree commit diff
path: root/synapse/state/v2.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/state/v2.py')
-rw-r--r--synapse/state/v2.py77
1 files changed, 60 insertions, 17 deletions
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 18484e2fa6..bf6caa0946 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -18,8 +18,6 @@ import itertools
 import logging
 from typing import Dict, List, Optional
 
-from six import iteritems, itervalues
-
 from twisted.internet import defer
 
 import synapse.state
@@ -29,12 +27,20 @@ from synapse.api.errors import AuthError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.types import StateMap
+from synapse.util import Clock
 
 logger = logging.getLogger(__name__)
 
 
+# We want to yield to the reactor occasionally during state res when dealing
+# with large data sets, so that we don't exhaust the reactor. This is done by
+# yielding to reactor during loops every N iterations.
+_YIELD_AFTER_ITERATIONS = 100
+
+
 @defer.inlineCallbacks
 def resolve_events_with_store(
+    clock: Clock,
     room_id: str,
     room_version: str,
     state_sets: List[StateMap[str]],
@@ -44,13 +50,11 @@ def resolve_events_with_store(
     """Resolves the state using the v2 state resolution algorithm
 
     Args:
+        clock
         room_id: the room we are working in
-
         room_version: The room version
-
         state_sets: List of dicts of (type, state_key) -> event_id,
             which are the different state groups to resolve.
-
         event_map:
             a dict from event_id to event, for any events that we happen to
             have in flight (eg, those currently being persisted). This will be
@@ -87,7 +91,7 @@ def resolve_events_with_store(
 
     full_conflicted_set = set(
         itertools.chain(
-            itertools.chain.from_iterable(itervalues(conflicted_state)), auth_diff
+            itertools.chain.from_iterable(conflicted_state.values()), auth_diff
         )
     )
 
@@ -115,13 +119,14 @@ def resolve_events_with_store(
     )
 
     sorted_power_events = yield _reverse_topological_power_sort(
-        room_id, power_events, event_map, state_res_store, full_conflicted_set
+        clock, room_id, power_events, event_map, state_res_store, full_conflicted_set
     )
 
     logger.debug("sorted %d power events", len(sorted_power_events))
 
     # Now sequentially auth each one
     resolved_state = yield _iterative_auth_checks(
+        clock,
         room_id,
         room_version,
         sorted_power_events,
@@ -135,20 +140,22 @@ def resolve_events_with_store(
     # OK, so we've now resolved the power events. Now sort the remaining
     # events using the mainline of the resolved power level.
 
+    set_power_events = set(sorted_power_events)
     leftover_events = [
-        ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events
+        ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events
     ]
 
     logger.debug("sorting %d remaining events", len(leftover_events))
 
     pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
     leftover_events = yield _mainline_sort(
-        room_id, leftover_events, pl, event_map, state_res_store
+        clock, room_id, leftover_events, pl, event_map, state_res_store
     )
 
     logger.debug("resolving remaining events")
 
     resolved_state = yield _iterative_auth_checks(
+        clock,
         room_id,
         room_version,
         leftover_events,
@@ -318,12 +325,13 @@ def _add_event_and_auth_chain_to_graph(
 
 @defer.inlineCallbacks
 def _reverse_topological_power_sort(
-    room_id, event_ids, event_map, state_res_store, auth_diff
+    clock, room_id, event_ids, event_map, state_res_store, auth_diff
 ):
     """Returns a list of the event_ids sorted by reverse topological ordering,
     and then by power level and origin_server_ts
 
     Args:
+        clock (Clock)
         room_id (str): the room we are working in
         event_ids (list[str]): The events to sort
         event_map (dict[str,FrozenEvent])
@@ -335,18 +343,28 @@ def _reverse_topological_power_sort(
     """
 
     graph = {}
-    for event_id in event_ids:
+    for idx, event_id in enumerate(event_ids, start=1):
         yield _add_event_and_auth_chain_to_graph(
             graph, room_id, event_id, event_map, state_res_store, auth_diff
         )
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     event_to_pl = {}
-    for event_id in graph:
+    for idx, event_id in enumerate(graph, start=1):
         pl = yield _get_power_level_for_sender(
             room_id, event_id, event_map, state_res_store
         )
         event_to_pl[event_id] = pl
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     def _get_power_order(event_id):
         ev = event_map[event_id]
         pl = event_to_pl[event_id]
@@ -362,12 +380,13 @@ def _reverse_topological_power_sort(
 
 @defer.inlineCallbacks
 def _iterative_auth_checks(
-    room_id, room_version, event_ids, base_state, event_map, state_res_store
+    clock, room_id, room_version, event_ids, base_state, event_map, state_res_store
 ):
     """Sequentially apply auth checks to each event in given list, updating the
     state as it goes along.
 
     Args:
+        clock (Clock)
         room_id (str)
         room_version (str)
         event_ids (list[str]): Ordered list of events to apply auth checks to
@@ -381,7 +400,7 @@ def _iterative_auth_checks(
     resolved_state = base_state.copy()
     room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
-    for event_id in event_ids:
+    for idx, event_id in enumerate(event_ids, start=1):
         event = event_map[event_id]
 
         auth_events = {}
@@ -419,17 +438,23 @@ def _iterative_auth_checks(
         except AuthError:
             pass
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     return resolved_state
 
 
 @defer.inlineCallbacks
 def _mainline_sort(
-    room_id, event_ids, resolved_power_event_id, event_map, state_res_store
+    clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store
 ):
     """Returns a sorted list of event_ids sorted by mainline ordering based on
     the given event resolved_power_event_id
 
     Args:
+        clock (Clock)
         room_id (str): room we're working in
         event_ids (list[str]): Events to sort
         resolved_power_event_id (str): The final resolved power level event ID
@@ -439,8 +464,14 @@ def _mainline_sort(
     Returns:
         Deferred[list[str]]: The sorted list
     """
+    if not event_ids:
+        # It's possible for there to be no event IDs here to sort, so we can
+        # skip calculating the mainline in that case.
+        return []
+
     mainline = []
     pl = resolved_power_event_id
+    idx = 0
     while pl:
         mainline.append(pl)
         pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
@@ -454,17 +485,29 @@ def _mainline_sort(
                 pl = aid
                 break
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
+        idx += 1
+
     mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
 
     event_ids = list(event_ids)
 
     order_map = {}
-    for ev_id in event_ids:
+    for idx, ev_id in enumerate(event_ids, start=1):
         depth = yield _get_mainline_depth_for_event(
             event_map[ev_id], mainline_map, event_map, state_res_store
         )
         order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     event_ids.sort(key=lambda ev_id: order_map[ev_id])
 
     return event_ids
@@ -572,7 +615,7 @@ def lexicographical_topological_sort(graph, key):
     # `(key(node), node)` so that sorting does the right thing
     zero_outdegree = []
 
-    for node, edges in iteritems(graph):
+    for node, edges in graph.items():
         if len(edges) == 0:
             zero_outdegree.append((key(node), node))