diff options
Diffstat (limited to 'synapse/state')
-rw-r--r-- | synapse/state/__init__.py | 6 | ||||
-rw-r--r-- | synapse/state/v2.py | 56 |
2 files changed, 51 insertions, 11 deletions
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 50fd843f66..495d9f04c8 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -32,6 +32,7 @@ from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour from synapse.types import StateMap +from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func @@ -414,6 +415,7 @@ class StateHandler(object): with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + self.clock, event.room_id, room_version, state_set_ids, @@ -516,6 +518,7 @@ class StateResolutionHandler(object): logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + self.clock, room_id, room_version, list(state_groups_ids.values()), @@ -589,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids): def resolve_events_with_store( + clock: Clock, room_id: str, room_version: str, state_sets: List[StateMap[str]], @@ -625,7 +629,7 @@ def resolve_events_with_store( ) else: return v2.resolve_events_with_store( - room_id, room_version, state_sets, event_map, state_res_store + clock, room_id, room_version, state_sets, event_map, state_res_store ) diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 57eadce4e6..7181ecda9a 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -27,12 +27,20 @@ from synapse.api.errors import AuthError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.types import StateMap +from synapse.util import Clock logger = logging.getLogger(__name__) +# We want to yield to the reactor occasionally during state res when dealing +# with large data sets, so that we don't exhaust the reactor. This is done by +# yielding to reactor during loops every N iterations. +_YIELD_AFTER_ITERATIONS = 100 + + @defer.inlineCallbacks def resolve_events_with_store( + clock: Clock, room_id: str, room_version: str, state_sets: List[StateMap[str]], @@ -42,13 +50,11 @@ def resolve_events_with_store( """Resolves the state using the v2 state resolution algorithm Args: + clock room_id: the room we are working in - room_version: The room version - state_sets: List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be @@ -113,7 +119,7 @@ def resolve_events_with_store( ) sorted_power_events = yield _reverse_topological_power_sort( - room_id, power_events, event_map, state_res_store, full_conflicted_set + clock, room_id, power_events, event_map, state_res_store, full_conflicted_set ) logger.debug("sorted %d power events", len(sorted_power_events)) @@ -142,7 +148,7 @@ def resolve_events_with_store( pl = resolved_state.get((EventTypes.PowerLevels, ""), None) leftover_events = yield _mainline_sort( - room_id, leftover_events, pl, event_map, state_res_store + clock, room_id, leftover_events, pl, event_map, state_res_store ) logger.debug("resolving remaining events") @@ -317,12 +323,13 @@ def _add_event_and_auth_chain_to_graph( @defer.inlineCallbacks def _reverse_topological_power_sort( - room_id, event_ids, event_map, state_res_store, auth_diff + clock, room_id, event_ids, event_map, state_res_store, auth_diff ): """Returns a list of the event_ids sorted by reverse topological ordering, and then by power level and origin_server_ts Args: + clock (Clock) room_id (str): the room we are working in event_ids (list[str]): The events to sort event_map (dict[str,FrozenEvent]) @@ -334,18 +341,28 @@ def _reverse_topological_power_sort( """ graph = {} - for event_id in event_ids: + for idx, event_id in enumerate(event_ids, start=1): yield _add_event_and_auth_chain_to_graph( graph, room_id, event_id, event_map, state_res_store, auth_diff ) + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + event_to_pl = {} - for event_id in graph: + for idx, event_id in enumerate(graph, start=1): pl = yield _get_power_level_for_sender( room_id, event_id, event_map, state_res_store ) event_to_pl[event_id] = pl + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + def _get_power_order(event_id): ev = event_map[event_id] pl = event_to_pl[event_id] @@ -423,12 +440,13 @@ def _iterative_auth_checks( @defer.inlineCallbacks def _mainline_sort( - room_id, event_ids, resolved_power_event_id, event_map, state_res_store + clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store ): """Returns a sorted list of event_ids sorted by mainline ordering based on the given event resolved_power_event_id Args: + clock (Clock) room_id (str): room we're working in event_ids (list[str]): Events to sort resolved_power_event_id (str): The final resolved power level event ID @@ -438,8 +456,14 @@ def _mainline_sort( Returns: Deferred[list[str]]: The sorted list """ + if not event_ids: + # It's possible for there to be no event IDs here to sort, so we can + # skip calculating the mainline in that case. + return [] + mainline = [] pl = resolved_power_event_id + idx = 0 while pl: mainline.append(pl) pl_ev = yield _get_event(room_id, pl, event_map, state_res_store) @@ -453,17 +477,29 @@ def _mainline_sort( pl = aid break + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + + idx += 1 + mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))} event_ids = list(event_ids) order_map = {} - for ev_id in event_ids: + for idx, ev_id in enumerate(event_ids, start=1): depth = yield _get_mainline_depth_for_event( event_map[ev_id], mainline_map, event_map, state_res_store ) order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id) + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + event_ids.sort(key=lambda ev_id: order_map[ev_id]) return event_ids |