summary refs log tree commit diff
path: root/synapse/state
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/state')
-rw-r--r--synapse/state/__init__.py28
-rw-r--r--synapse/state/v1.py26
-rw-r--r--synapse/state/v2.py77
3 files changed, 87 insertions, 44 deletions
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py

index 2fa529fcd0..495d9f04c8 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py
@@ -18,8 +18,6 @@ import logging from collections import namedtuple from typing import Dict, Iterable, List, Optional, Set -from six import iteritems, itervalues - import attr from frozendict import frozendict from prometheus_client import Histogram @@ -34,6 +32,7 @@ from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour from synapse.types import StateMap +from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func @@ -144,7 +143,7 @@ class StateHandler(object): list(state.values()), get_prev_content=False ) state = { - key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map + key: state_map[e_id] for key, e_id in state.items() if e_id in state_map } return state @@ -416,6 +415,7 @@ class StateHandler(object): with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + self.clock, event.room_id, room_version, state_set_ids, @@ -423,7 +423,7 @@ class StateHandler(object): state_res_store=StateResolutionStore(self.store), ) - new_state = {key: state_map[ev_id] for key, ev_id in iteritems(new_state)} + new_state = {key: state_map[ev_id] for key, ev_id in new_state.items()} return new_state @@ -505,8 +505,8 @@ class StateResolutionHandler(object): # resolve_events_with_store do it? new_state = {} conflicted_state = False - for st in itervalues(state_groups_ids): - for key, e_id in iteritems(st): + for st in state_groups_ids.values(): + for key, e_id in st.items(): if key in new_state: conflicted_state = True break @@ -518,9 +518,10 @@ class StateResolutionHandler(object): logger.info("Resolving conflicted state for %r", room_id) with Measure(self.clock, "state._resolve_events"): new_state = yield resolve_events_with_store( + self.clock, room_id, room_version, - list(itervalues(state_groups_ids)), + list(state_groups_ids.values()), event_map=event_map, state_res_store=state_res_store, ) @@ -561,12 +562,12 @@ def _make_state_cache_entry(new_state, state_groups_ids): # not get persisted. # first look for exact matches - new_state_event_ids = set(itervalues(new_state)) - for sg, state in iteritems(state_groups_ids): + new_state_event_ids = set(new_state.values()) + for sg, state in state_groups_ids.items(): if len(new_state_event_ids) != len(state): continue - old_state_event_ids = set(itervalues(state)) + old_state_event_ids = set(state.values()) if new_state_event_ids == old_state_event_ids: # got an exact match. return _StateCacheEntry(state=new_state, state_group=sg) @@ -579,8 +580,8 @@ def _make_state_cache_entry(new_state, state_groups_ids): prev_group = None delta_ids = None - for old_group, old_state in iteritems(state_groups_ids): - n_delta_ids = {k: v for k, v in iteritems(new_state) if old_state.get(k) != v} + for old_group, old_state in state_groups_ids.items(): + n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v} if not delta_ids or len(n_delta_ids) < len(delta_ids): prev_group = old_group delta_ids = n_delta_ids @@ -591,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids): def resolve_events_with_store( + clock: Clock, room_id: str, room_version: str, state_sets: List[StateMap[str]], @@ -627,7 +629,7 @@ def resolve_events_with_store( ) else: return v2.resolve_events_with_store( - room_id, room_version, state_sets, event_map, state_res_store + clock, room_id, room_version, state_sets, event_map, state_res_store ) diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 9bf98d06f2..7b531a8337 100644 --- a/synapse/state/v1.py +++ b/synapse/state/v1.py
@@ -17,8 +17,6 @@ import hashlib import logging from typing import Callable, Dict, List, Optional -from six import iteritems, iterkeys, itervalues - from twisted.internet import defer from synapse import event_auth @@ -70,11 +68,11 @@ def resolve_events_with_store( unconflicted_state, conflicted_state = _seperate(state_sets) needed_events = { - event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids + event_id for event_ids in conflicted_state.values() for event_id in event_ids } needed_event_count = len(needed_events) if event_map is not None: - needed_events -= set(iterkeys(event_map)) + needed_events -= set(event_map.keys()) logger.info( "Asking for %d/%d conflicted events", len(needed_events), needed_event_count @@ -102,11 +100,11 @@ def resolve_events_with_store( unconflicted_state, conflicted_state, state_map ) - new_needed_events = set(itervalues(auth_events)) + new_needed_events = set(auth_events.values()) new_needed_event_count = len(new_needed_events) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(iterkeys(event_map)) + new_needed_events -= set(event_map.keys()) logger.info( "Asking for %d/%d auth events", len(new_needed_events), new_needed_event_count @@ -152,7 +150,7 @@ def _seperate(state_sets): conflicted_state = {} for state_set in state_set_iterator: - for key, value in iteritems(state_set): + for key, value in state_set.items(): # Check if there is an unconflicted entry for the state key. unconflicted_value = unconflicted_state.get(key) if unconflicted_value is None: @@ -178,7 +176,7 @@ def _seperate(state_sets): def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map): auth_events = {} - for event_ids in itervalues(conflicted_state): + for event_ids in conflicted_state.values(): for event_id in event_ids: if event_id in state_map: keys = event_auth.auth_types_for_event(state_map[event_id]) @@ -194,7 +192,7 @@ def _resolve_with_state( unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map ): conflicted_state = {} - for key, event_ids in iteritems(conflicted_state_ids): + for key, event_ids in conflicted_state_ids.items(): events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map] if len(events) > 1: conflicted_state[key] = events @@ -203,7 +201,7 @@ def _resolve_with_state( auth_events = { key: state_map[ev_id] - for key, ev_id in iteritems(auth_event_ids) + for key, ev_id in auth_event_ids.items() if ev_id in state_map } @@ -214,7 +212,7 @@ def _resolve_with_state( raise new_state = unconflicted_state_ids - for key, event in iteritems(resolved_state): + for key, event in resolved_state.items(): new_state[key] = event.event_id return new_state @@ -238,21 +236,21 @@ def _resolve_state_events(conflicted_state, auth_events): auth_events.update(resolved_state) - for key, events in iteritems(conflicted_state): + for key, events in conflicted_state.items(): if key[0] == EventTypes.JoinRules: logger.debug("Resolving conflicted join rules %r", events) resolved_state[key] = _resolve_auth_events(events, auth_events) auth_events.update(resolved_state) - for key, events in iteritems(conflicted_state): + for key, events in conflicted_state.items(): if key[0] == EventTypes.Member: logger.debug("Resolving conflicted member lists %r", events) resolved_state[key] = _resolve_auth_events(events, auth_events) auth_events.update(resolved_state) - for key, events in iteritems(conflicted_state): + for key, events in conflicted_state.items(): if key not in resolved_state: logger.debug("Resolving conflicted state %r:%r", key, events) resolved_state[key] = _resolve_normal_events(events, auth_events) diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 18484e2fa6..bf6caa0946 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py
@@ -18,8 +18,6 @@ import itertools import logging from typing import Dict, List, Optional -from six import iteritems, itervalues - from twisted.internet import defer import synapse.state @@ -29,12 +27,20 @@ from synapse.api.errors import AuthError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase from synapse.types import StateMap +from synapse.util import Clock logger = logging.getLogger(__name__) +# We want to yield to the reactor occasionally during state res when dealing +# with large data sets, so that we don't exhaust the reactor. This is done by +# yielding to reactor during loops every N iterations. +_YIELD_AFTER_ITERATIONS = 100 + + @defer.inlineCallbacks def resolve_events_with_store( + clock: Clock, room_id: str, room_version: str, state_sets: List[StateMap[str]], @@ -44,13 +50,11 @@ def resolve_events_with_store( """Resolves the state using the v2 state resolution algorithm Args: + clock room_id: the room we are working in - room_version: The room version - state_sets: List of dicts of (type, state_key) -> event_id, which are the different state groups to resolve. - event_map: a dict from event_id to event, for any events that we happen to have in flight (eg, those currently being persisted). This will be @@ -87,7 +91,7 @@ def resolve_events_with_store( full_conflicted_set = set( itertools.chain( - itertools.chain.from_iterable(itervalues(conflicted_state)), auth_diff + itertools.chain.from_iterable(conflicted_state.values()), auth_diff ) ) @@ -115,13 +119,14 @@ def resolve_events_with_store( ) sorted_power_events = yield _reverse_topological_power_sort( - room_id, power_events, event_map, state_res_store, full_conflicted_set + clock, room_id, power_events, event_map, state_res_store, full_conflicted_set ) logger.debug("sorted %d power events", len(sorted_power_events)) # Now sequentially auth each one resolved_state = yield _iterative_auth_checks( + clock, room_id, room_version, sorted_power_events, @@ -135,20 +140,22 @@ def resolve_events_with_store( # OK, so we've now resolved the power events. Now sort the remaining # events using the mainline of the resolved power level. + set_power_events = set(sorted_power_events) leftover_events = [ - ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events + ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events ] logger.debug("sorting %d remaining events", len(leftover_events)) pl = resolved_state.get((EventTypes.PowerLevels, ""), None) leftover_events = yield _mainline_sort( - room_id, leftover_events, pl, event_map, state_res_store + clock, room_id, leftover_events, pl, event_map, state_res_store ) logger.debug("resolving remaining events") resolved_state = yield _iterative_auth_checks( + clock, room_id, room_version, leftover_events, @@ -318,12 +325,13 @@ def _add_event_and_auth_chain_to_graph( @defer.inlineCallbacks def _reverse_topological_power_sort( - room_id, event_ids, event_map, state_res_store, auth_diff + clock, room_id, event_ids, event_map, state_res_store, auth_diff ): """Returns a list of the event_ids sorted by reverse topological ordering, and then by power level and origin_server_ts Args: + clock (Clock) room_id (str): the room we are working in event_ids (list[str]): The events to sort event_map (dict[str,FrozenEvent]) @@ -335,18 +343,28 @@ def _reverse_topological_power_sort( """ graph = {} - for event_id in event_ids: + for idx, event_id in enumerate(event_ids, start=1): yield _add_event_and_auth_chain_to_graph( graph, room_id, event_id, event_map, state_res_store, auth_diff ) + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + event_to_pl = {} - for event_id in graph: + for idx, event_id in enumerate(graph, start=1): pl = yield _get_power_level_for_sender( room_id, event_id, event_map, state_res_store ) event_to_pl[event_id] = pl + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + def _get_power_order(event_id): ev = event_map[event_id] pl = event_to_pl[event_id] @@ -362,12 +380,13 @@ def _reverse_topological_power_sort( @defer.inlineCallbacks def _iterative_auth_checks( - room_id, room_version, event_ids, base_state, event_map, state_res_store + clock, room_id, room_version, event_ids, base_state, event_map, state_res_store ): """Sequentially apply auth checks to each event in given list, updating the state as it goes along. Args: + clock (Clock) room_id (str) room_version (str) event_ids (list[str]): Ordered list of events to apply auth checks to @@ -381,7 +400,7 @@ def _iterative_auth_checks( resolved_state = base_state.copy() room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - for event_id in event_ids: + for idx, event_id in enumerate(event_ids, start=1): event = event_map[event_id] auth_events = {} @@ -419,17 +438,23 @@ def _iterative_auth_checks( except AuthError: pass + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + return resolved_state @defer.inlineCallbacks def _mainline_sort( - room_id, event_ids, resolved_power_event_id, event_map, state_res_store + clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store ): """Returns a sorted list of event_ids sorted by mainline ordering based on the given event resolved_power_event_id Args: + clock (Clock) room_id (str): room we're working in event_ids (list[str]): Events to sort resolved_power_event_id (str): The final resolved power level event ID @@ -439,8 +464,14 @@ def _mainline_sort( Returns: Deferred[list[str]]: The sorted list """ + if not event_ids: + # It's possible for there to be no event IDs here to sort, so we can + # skip calculating the mainline in that case. + return [] + mainline = [] pl = resolved_power_event_id + idx = 0 while pl: mainline.append(pl) pl_ev = yield _get_event(room_id, pl, event_map, state_res_store) @@ -454,17 +485,29 @@ def _mainline_sort( pl = aid break + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + + idx += 1 + mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))} event_ids = list(event_ids) order_map = {} - for ev_id in event_ids: + for idx, ev_id in enumerate(event_ids, start=1): depth = yield _get_mainline_depth_for_event( event_map[ev_id], mainline_map, event_map, state_res_store ) order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id) + # We yield occasionally when we're working with large data sets to + # ensure that we don't block the reactor loop for too long. + if idx % _YIELD_AFTER_ITERATIONS == 0: + yield clock.sleep(0) + event_ids.sort(key=lambda ev_id: order_map[ev_id]) return event_ids @@ -572,7 +615,7 @@ def lexicographical_topological_sort(graph, key): # `(key(node), node)` so that sorting does the right thing zero_outdegree = [] - for node, edges in iteritems(graph): + for node, edges in graph.items(): if len(edges) == 0: zero_outdegree.append((key(node), node))