diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 2fa529fcd0..495d9f04c8 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -18,8 +18,6 @@ import logging
from collections import namedtuple
from typing import Dict, Iterable, List, Optional, Set
-from six import iteritems, itervalues
-
import attr
from frozendict import frozendict
from prometheus_client import Histogram
@@ -34,6 +32,7 @@ from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import StateMap
+from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
@@ -144,7 +143,7 @@ class StateHandler(object):
list(state.values()), get_prev_content=False
)
state = {
- key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
+ key: state_map[e_id] for key, e_id in state.items() if e_id in state_map
}
return state
@@ -416,6 +415,7 @@ class StateHandler(object):
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
+ self.clock,
event.room_id,
room_version,
state_set_ids,
@@ -423,7 +423,7 @@ class StateHandler(object):
state_res_store=StateResolutionStore(self.store),
)
- new_state = {key: state_map[ev_id] for key, ev_id in iteritems(new_state)}
+ new_state = {key: state_map[ev_id] for key, ev_id in new_state.items()}
return new_state
@@ -505,8 +505,8 @@ class StateResolutionHandler(object):
# resolve_events_with_store do it?
new_state = {}
conflicted_state = False
- for st in itervalues(state_groups_ids):
- for key, e_id in iteritems(st):
+ for st in state_groups_ids.values():
+ for key, e_id in st.items():
if key in new_state:
conflicted_state = True
break
@@ -518,9 +518,10 @@ class StateResolutionHandler(object):
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
+ self.clock,
room_id,
room_version,
- list(itervalues(state_groups_ids)),
+ list(state_groups_ids.values()),
event_map=event_map,
state_res_store=state_res_store,
)
@@ -561,12 +562,12 @@ def _make_state_cache_entry(new_state, state_groups_ids):
# not get persisted.
# first look for exact matches
- new_state_event_ids = set(itervalues(new_state))
- for sg, state in iteritems(state_groups_ids):
+ new_state_event_ids = set(new_state.values())
+ for sg, state in state_groups_ids.items():
if len(new_state_event_ids) != len(state):
continue
- old_state_event_ids = set(itervalues(state))
+ old_state_event_ids = set(state.values())
if new_state_event_ids == old_state_event_ids:
# got an exact match.
return _StateCacheEntry(state=new_state, state_group=sg)
@@ -579,8 +580,8 @@ def _make_state_cache_entry(new_state, state_groups_ids):
prev_group = None
delta_ids = None
- for old_group, old_state in iteritems(state_groups_ids):
- n_delta_ids = {k: v for k, v in iteritems(new_state) if old_state.get(k) != v}
+ for old_group, old_state in state_groups_ids.items():
+ n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v}
if not delta_ids or len(n_delta_ids) < len(delta_ids):
prev_group = old_group
delta_ids = n_delta_ids
@@ -591,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids):
def resolve_events_with_store(
+ clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
@@ -627,7 +629,7 @@ def resolve_events_with_store(
)
else:
return v2.resolve_events_with_store(
- room_id, room_version, state_sets, event_map, state_res_store
+ clock, room_id, room_version, state_sets, event_map, state_res_store
)
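Note: the `clock` argument threaded through these call sites lets the v2
resolver periodically hand control back to Twisted's reactor. A minimal
sketch of the underlying pattern (illustrative only, not Synapse's code;
Synapse's `synapse.util.Clock.sleep(0)` is built on the same reactor
primitive):

    from twisted.internet import defer, reactor, task

    _YIELD_AFTER_ITERATIONS = 100

    @defer.inlineCallbacks
    def process_all(items, handle):
        for idx, item in enumerate(items, start=1):
            handle(item)
            if idx % _YIELD_AFTER_ITERATIONS == 0:
                # A zero-delay deferred reschedules this generator on the
                # next reactor tick, letting queued timers and I/O run.
                yield task.deferLater(reactor, 0, lambda: None)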
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 9bf98d06f2..7b531a8337 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -17,8 +17,6 @@ import hashlib
import logging
from typing import Callable, Dict, List, Optional
-from six import iteritems, iterkeys, itervalues
-
from twisted.internet import defer
from synapse import event_auth
@@ -70,11 +68,11 @@ def resolve_events_with_store(
unconflicted_state, conflicted_state = _seperate(state_sets)
needed_events = {
- event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids
+ event_id for event_ids in conflicted_state.values() for event_id in event_ids
}
needed_event_count = len(needed_events)
if event_map is not None:
- needed_events -= set(iterkeys(event_map))
+ needed_events -= set(event_map.keys())
logger.info(
"Asking for %d/%d conflicted events", len(needed_events), needed_event_count
@@ -102,11 +100,11 @@ def resolve_events_with_store(
unconflicted_state, conflicted_state, state_map
)
- new_needed_events = set(itervalues(auth_events))
+ new_needed_events = set(auth_events.values())
new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
- new_needed_events -= set(iterkeys(event_map))
+ new_needed_events -= set(event_map.keys())
logger.info(
"Asking for %d/%d auth events", len(new_needed_events), new_needed_event_count
@@ -152,7 +150,7 @@ def _seperate(state_sets):
conflicted_state = {}
for state_set in state_set_iterator:
- for key, value in iteritems(state_set):
+ for key, value in state_set.items():
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
if unconflicted_value is None:
@@ -178,7 +176,7 @@ def _seperate(state_sets):
def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map):
auth_events = {}
- for event_ids in itervalues(conflicted_state):
+ for event_ids in conflicted_state.values():
for event_id in event_ids:
if event_id in state_map:
keys = event_auth.auth_types_for_event(state_map[event_id])
@@ -194,7 +192,7 @@ def _resolve_with_state(
unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map
):
conflicted_state = {}
- for key, event_ids in iteritems(conflicted_state_ids):
+ for key, event_ids in conflicted_state_ids.items():
events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
if len(events) > 1:
conflicted_state[key] = events
@@ -203,7 +201,7 @@ def _resolve_with_state(
auth_events = {
key: state_map[ev_id]
- for key, ev_id in iteritems(auth_event_ids)
+ for key, ev_id in auth_event_ids.items()
if ev_id in state_map
}
@@ -214,7 +212,7 @@ def _resolve_with_state(
raise
new_state = unconflicted_state_ids
- for key, event in iteritems(resolved_state):
+ for key, event in resolved_state.items():
new_state[key] = event.event_id
return new_state
@@ -238,21 +236,21 @@ def _resolve_state_events(conflicted_state, auth_events):
auth_events.update(resolved_state)
- for key, events in iteritems(conflicted_state):
+ for key, events in conflicted_state.items():
if key[0] == EventTypes.JoinRules:
logger.debug("Resolving conflicted join rules %r", events)
resolved_state[key] = _resolve_auth_events(events, auth_events)
auth_events.update(resolved_state)
- for key, events in iteritems(conflicted_state):
+ for key, events in conflicted_state.items():
if key[0] == EventTypes.Member:
logger.debug("Resolving conflicted member lists %r", events)
resolved_state[key] = _resolve_auth_events(events, auth_events)
auth_events.update(resolved_state)
- for key, events in iteritems(conflicted_state):
+ for key, events in conflicted_state.items():
if key not in resolved_state:
logger.debug("Resolving conflicted state %r:%r", key, events)
resolved_state[key] = _resolve_normal_events(events, auth_events)
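Note: the v1 changes are purely mechanical Python 2 cleanup. On Python 3,
dict views replace six's iterator helpers directly; a small illustration
(the dict here is hypothetical):

    d = {"a": 1, "b": 2}

    items = d.items()    # was: six.iteritems(d)
    values = d.values()  # was: six.itervalues(d)
    keys = d.keys()      # was: six.iterkeys(d)

    # Keys views also support set algebra, so an expression like
    # `needed_events -= set(event_map.keys())` above would work even
    # without the explicit set() call:
    assert keys & {"a"} == {"a"}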
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 18484e2fa6..bf6caa0946 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -18,8 +18,6 @@ import itertools
import logging
from typing import Dict, List, Optional
-from six import iteritems, itervalues
-
from twisted.internet import defer
import synapse.state
@@ -29,12 +27,20 @@ from synapse.api.errors import AuthError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.types import StateMap
+from synapse.util import Clock
logger = logging.getLogger(__name__)
+# We want to yield to the reactor occasionally during state res when dealing
+# with large data sets, so that we don't block the reactor for too long.
+# This is done by yielding to the reactor every N loop iterations.
+_YIELD_AFTER_ITERATIONS = 100
+
+
@defer.inlineCallbacks
def resolve_events_with_store(
+ clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
@@ -44,13 +50,11 @@ def resolve_events_with_store(
"""Resolves the state using the v2 state resolution algorithm
Args:
+        clock: a Clock, for yielding control back to the reactor periodically
room_id: the room we are working in
-
room_version: The room version
-
state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
-
event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
@@ -87,7 +91,7 @@ def resolve_events_with_store(
full_conflicted_set = set(
itertools.chain(
- itertools.chain.from_iterable(itervalues(conflicted_state)), auth_diff
+ itertools.chain.from_iterable(conflicted_state.values()), auth_diff
)
)
@@ -115,13 +119,14 @@ def resolve_events_with_store(
)
sorted_power_events = yield _reverse_topological_power_sort(
- room_id, power_events, event_map, state_res_store, full_conflicted_set
+ clock, room_id, power_events, event_map, state_res_store, full_conflicted_set
)
logger.debug("sorted %d power events", len(sorted_power_events))
# Now sequentially auth each one
resolved_state = yield _iterative_auth_checks(
+ clock,
room_id,
room_version,
sorted_power_events,
@@ -135,20 +140,22 @@ def resolve_events_with_store(
# OK, so we've now resolved the power events. Now sort the remaining
# events using the mainline of the resolved power level.
+ set_power_events = set(sorted_power_events)
leftover_events = [
- ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events
+ ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events
]
logger.debug("sorting %d remaining events", len(leftover_events))
pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
leftover_events = yield _mainline_sort(
- room_id, leftover_events, pl, event_map, state_res_store
+ clock, room_id, leftover_events, pl, event_map, state_res_store
)
logger.debug("resolving remaining events")
resolved_state = yield _iterative_auth_checks(
+ clock,
room_id,
room_version,
leftover_events,
@@ -318,12 +325,13 @@ def _add_event_and_auth_chain_to_graph(
@defer.inlineCallbacks
def _reverse_topological_power_sort(
- room_id, event_ids, event_map, state_res_store, auth_diff
+ clock, room_id, event_ids, event_map, state_res_store, auth_diff
):
"""Returns a list of the event_ids sorted by reverse topological ordering,
and then by power level and origin_server_ts
Args:
+        clock (Clock): used to yield control back to the reactor periodically
room_id (str): the room we are working in
event_ids (list[str]): The events to sort
event_map (dict[str,FrozenEvent])
@@ -335,18 +343,28 @@ def _reverse_topological_power_sort(
"""
graph = {}
- for event_id in event_ids:
+ for idx, event_id in enumerate(event_ids, start=1):
yield _add_event_and_auth_chain_to_graph(
graph, room_id, event_id, event_map, state_res_store, auth_diff
)
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
event_to_pl = {}
- for event_id in graph:
+ for idx, event_id in enumerate(graph, start=1):
pl = yield _get_power_level_for_sender(
room_id, event_id, event_map, state_res_store
)
event_to_pl[event_id] = pl
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
def _get_power_order(event_id):
ev = event_map[event_id]
pl = event_to_pl[event_id]
@@ -362,12 +380,13 @@ def _reverse_topological_power_sort(
@defer.inlineCallbacks
def _iterative_auth_checks(
- room_id, room_version, event_ids, base_state, event_map, state_res_store
+ clock, room_id, room_version, event_ids, base_state, event_map, state_res_store
):
"""Sequentially apply auth checks to each event in given list, updating the
state as it goes along.
Args:
+        clock (Clock): used to yield control back to the reactor periodically
room_id (str)
room_version (str)
event_ids (list[str]): Ordered list of events to apply auth checks to
@@ -381,7 +400,7 @@ def _iterative_auth_checks(
resolved_state = base_state.copy()
room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
- for event_id in event_ids:
+ for idx, event_id in enumerate(event_ids, start=1):
event = event_map[event_id]
auth_events = {}
@@ -419,17 +438,23 @@ def _iterative_auth_checks(
except AuthError:
pass
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
return resolved_state
@defer.inlineCallbacks
def _mainline_sort(
- room_id, event_ids, resolved_power_event_id, event_map, state_res_store
+ clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store
):
"""Returns a sorted list of event_ids sorted by mainline ordering based on
the given event resolved_power_event_id
Args:
+        clock (Clock): used to yield control back to the reactor periodically
room_id (str): room we're working in
event_ids (list[str]): Events to sort
resolved_power_event_id (str): The final resolved power level event ID
@@ -439,8 +464,14 @@ def _mainline_sort(
Returns:
Deferred[list[str]]: The sorted list
"""
+ if not event_ids:
+        # There may be no event IDs to sort at all, in which case we can
+        # skip calculating the mainline entirely.
+ return []
+
mainline = []
pl = resolved_power_event_id
+ idx = 0
while pl:
mainline.append(pl)
pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
@@ -454,17 +485,29 @@ def _mainline_sort(
pl = aid
break
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
+ idx += 1
+
mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
event_ids = list(event_ids)
order_map = {}
- for ev_id in event_ids:
+ for idx, ev_id in enumerate(event_ids, start=1):
depth = yield _get_mainline_depth_for_event(
event_map[ev_id], mainline_map, event_map, state_res_store
)
order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
+ # We yield occasionally when we're working with large data sets to
+ # ensure that we don't block the reactor loop for too long.
+ if idx % _YIELD_AFTER_ITERATIONS == 0:
+ yield clock.sleep(0)
+
event_ids.sort(key=lambda ev_id: order_map[ev_id])
return event_ids
@@ -572,7 +615,7 @@ def lexicographical_topological_sort(graph, key):
# `(key(node), node)` so that sorting does the right thing
zero_outdegree = []
- for node, edges in iteritems(graph):
+ for node, edges in graph.items():
if len(edges) == 0:
zero_outdegree.append((key(node), node))
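Note: the `set_power_events` change in `resolve_events_with_store` swaps a
list membership test for a set lookup inside the `leftover_events`
comprehension. A self-contained way to see why that matters (illustrative
timing only; absolute numbers will vary by machine):

    import timeit

    events = ["$event%d" % i for i in range(10000)]
    as_list, as_set = events, set(events)

    # Linear scan per lookup (O(n*m) overall) vs average O(1) hash lookup:
    print(timeit.timeit(lambda: [e for e in events if e not in as_list], number=1))
    print(timeit.timeit(lambda: [e for e in events if e not in as_set], number=1))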
|