From 49af4020190eae6b0c65897d96cd2be286364d2b Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 9 Jul 2018 16:09:20 +1000 Subject: run isort --- synapse/events/snapshot.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 8e684d91b5..bcd9bb5946 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - from frozendict import frozendict +from twisted.internet import defer + class EventContext(object): """ -- cgit 1.4.1 From 3132b89f12f0386558045683ad198f090b0e2c90 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 21 Jul 2018 15:47:18 +1000 Subject: Make the rest of the .iterwhatever go away (#3562) --- changelog.d/3562.misc | 0 synapse/app/homeserver.py | 6 ++++-- synapse/app/synctl.py | 4 +++- synapse/events/snapshot.py | 4 +++- synapse/handlers/federation.py | 18 +++++++++--------- synapse/state.py | 6 +++--- synapse/visibility.py | 19 ++++++++++--------- tests/test_federation.py | 3 +-- 8 files changed, 33 insertions(+), 27 deletions(-) create mode 100644 changelog.d/3562.misc (limited to 'synapse/events/snapshot.py') diff --git a/changelog.d/3562.misc b/changelog.d/3562.misc new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 14e6dca522..2ad1beb8d8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ import logging import os import sys +from six import iteritems + from twisted.application import service from twisted.internet import defer, reactor from twisted.web.resource import EncodingResourceWrapper, NoResource @@ -442,7 +444,7 @@ def run(hs): stats["total_nonbridged_users"] = total_nonbridged_users daily_user_type_results = yield hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.iteritems(): + for name, count in iteritems(daily_user_type_results): stats["daily_user_type_" + name] = count room_count = yield hs.get_datastore().get_room_count() @@ -453,7 +455,7 @@ def run(hs): stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() - for name, count in r30_results.iteritems(): + for name, count in iteritems(r30_results): stats["r30_users_" + name] = count daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 68acc15a9a..d658f967ba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -25,6 +25,8 @@ import subprocess import sys import time +from six import iteritems + import yaml SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] @@ -173,7 +175,7 @@ def main(): os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) cache_factors = config.get("synctl_cache_factors", {}) - for cache_name, factor in cache_factors.iteritems(): + for cache_name, factor in iteritems(cache_factors): os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) worker_configfiles = [] diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb5946..f83a1581a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + from frozendict import frozendict from twisted.internet import defer @@ -159,7 +161,7 @@ def _encode_state_dict(state_dict): return [ (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() + for (etype, state_key), v in iteritems(state_dict) ] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 65f6041b10..a6d391c4e8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,8 +21,8 @@ import logging import sys import six -from six import iteritems -from six.moves import http_client +from six import iteritems, itervalues +from six.moves import http_client, zip from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json @@ -731,7 +731,7 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.iteritems() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -748,7 +748,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.iteritems(), key=lambda d: d[1]) + return sorted(joined_domains.items(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -811,7 +811,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.iterkeys()) + event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -827,15 +827,15 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.itervalues() for e_id in ids.itervalues()], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.iteritems() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.iteritems() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: @@ -1515,7 +1515,7 @@ class FederationHandler(BaseHandler): yield self.store.persist_events( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) diff --git a/synapse/state.py b/synapse/state.py index 15a593d41c..504caae2f7 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,7 +18,7 @@ import hashlib import logging from collections import namedtuple -from six import iteritems, itervalues +from six import iteritems, iterkeys, itervalues from frozendict import frozendict @@ -647,7 +647,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): for event_id in event_ids ) if event_map is not None: - needed_events -= set(event_map.iterkeys()) + needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d conflicted events", len(needed_events)) @@ -668,7 +668,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(event_map.iterkeys()) + new_needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d auth events", len(new_needed_events)) diff --git a/synapse/visibility.py b/synapse/visibility.py index 9b97ea2b83..ba0499a022 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,11 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging import operator -import six +from six import iteritems, itervalues +from six.moves import map from twisted.internet import defer @@ -221,7 +222,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, return event # check each event: gives an iterable[None|EventBase] - filtered_events = itertools.imap(allowed, events) + filtered_events = map(allowed, events) # remove the None entries filtered_events = filter(operator.truth, filtered_events) @@ -261,7 +262,7 @@ def filter_events_for_server(store, server_name, events): # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in state.itervalues(): + for ev in itervalues(state): if ev.type != EventTypes.Member: continue try: @@ -295,7 +296,7 @@ def filter_events_for_server(store, server_name, events): ) visibility_ids = set() - for sids in event_to_state_ids.itervalues(): + for sids in itervalues(event_to_state_ids): hist = sids.get((EventTypes.RoomHistoryVisibility, "")) if hist: visibility_ids.add(hist) @@ -308,7 +309,7 @@ def filter_events_for_server(store, server_name, events): event_map = yield store.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in event_map.itervalues() + for e in itervalues(event_map) ) if all_open: @@ -346,7 +347,7 @@ def filter_events_for_server(store, server_name, events): # state_key_to_event_id_set = { e - for key_to_eid in six.itervalues(event_to_state_ids) + for key_to_eid in itervalues(event_to_state_ids) for e in key_to_eid.items() } @@ -369,10 +370,10 @@ def filter_events_for_server(store, server_name, events): event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() + for key, inner_e_id in iteritems(key_to_eid) if inner_e_id in event_map } - for e_id, key_to_eid in event_to_state_ids.iteritems() + for e_id, key_to_eid in iteritems(event_to_state_ids) } defer.returnValue([ diff --git a/tests/test_federation.py b/tests/test_federation.py index 159a136971..f40ff29b52 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -137,7 +137,6 @@ class MessageAcceptTests(unittest.TestCase): ) self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") - @unittest.DEBUG def test_cant_hide_past_history(self): """ If you send a message, you must be able to provide the direct @@ -178,7 +177,7 @@ class MessageAcceptTests(unittest.TestCase): for x, y in d.items() if x == ("m.room.member", "@us:test") ], - "auth_chain_ids": d.values(), + "auth_chain_ids": list(d.values()), } ) -- cgit 1.4.1 From acbfdc3442f706150c6312e534ca4ecec8548582 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:17:16 +0100 Subject: Refcator EventContext to accept state during init --- synapse/events/snapshot.py | 48 +++++++++++++---------- synapse/state.py | 97 ++++++++++++++++++++++++++-------------------- 2 files changed, 82 insertions(+), 63 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f83a1581a6..fbbe8dd490 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -60,22 +60,22 @@ class EventContext(object): "app_service", ] - def __init__(self): + def __init__(self, state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): # The current state including the current event - self.current_state_ids = None + self.current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = None - self.state_group = None - - self.rejected = False + self.prev_state_ids = prev_state_ids + self.state_group = state_group # A previously persisted state group and a delta between that # and this state. - self.prev_group = None - self.delta_ids = None + self.prev_group = prev_group + self.delta_ids = delta_ids - self.prev_state_events = None + self.prev_state_events = [] + self.rejected = False self.app_service = None def serialize(self, event): @@ -123,27 +123,33 @@ class EventContext(object): Returns: EventContext """ - context = EventContext() - context.state_group = input["state_group"] - context.rejected = input["rejected"] - context.prev_group = input["prev_group"] - context.delta_ids = _decode_state_dict(input["delta_ids"]) - context.prev_state_events = input["prev_state_events"] - # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. prev_state_id = input["prev_state_id"] event_type = input["event_type"] event_state_key = input["event_state_key"] - context.current_state_ids = yield store.get_state_ids_for_group( - context.state_group, + state_group = input["state_group"] + + current_state_ids = yield store.get_state_ids_for_group( + state_group, ) if prev_state_id and event_state_key: - context.prev_state_ids = dict(context.current_state_ids) - context.prev_state_ids[(event_type, event_state_key)] = prev_state_id + prev_state_ids = dict(current_state_ids) + prev_state_ids[(event_type, event_state_key)] = prev_state_id else: - context.prev_state_ids = context.current_state_ids + prev_state_ids = current_state_ids + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=input["prev_group"], + delta_ids = _decode_state_dict(input["delta_ids"]), + ) + + context.rejected = input["rejected"] + context.prev_state_events = input["prev_state_events"] app_service_id = input["app_service_id"] if app_service_id: diff --git a/synapse/state.py b/synapse/state.py index 504caae2f7..a708695006 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -203,25 +203,27 @@ class StateHandler(object): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. - context = EventContext() if old_state: - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): - context.current_state_ids = dict(context.prev_state_ids) + current_state_ids = dict(prev_state_ids) key = (event.type, event.state_key) - context.current_state_ids[key] = event.event_id + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids else: - context.current_state_ids = {} - context.prev_state_ids = {} - context.prev_state_events = [] + current_state_ids = {} + prev_state_ids = {} # We don't store state for outliers, so we don't generate a state - # froup for it. - context.state_group = None + # group for it. + context = EventContext( + state_group=None, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + ) defer.returnValue(context) @@ -230,31 +232,35 @@ class StateHandler(object): # Let's just correctly fill out the context and create a # new state group for it. - context = EventContext() - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] if replaces != event.event_id: # Paranoia check event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=None, delta_ids=None, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, + ) + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, ) - context.prev_state_events = [] defer.returnValue(context) logger.debug("calling resolve_state_groups from compute_event_context") @@ -262,47 +268,47 @@ class StateHandler(object): event.room_id, [e for e, _ in event.prev_events], ) - curr_state = entry.state + prev_state_ids = entry.state + prev_group = None + delta_ids = None - context = EventContext() - context.prev_state_ids = curr_state if event.is_state(): # If this is a state event then we need to create a new state # group for the state after this event. key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id if entry.state_group: # If the state at the event has a state group assigned then # we can use that as the prev group - context.prev_group = entry.state_group - context.delta_ids = { + prev_group = entry.state_group + delta_ids = { key: event.event_id } elif entry.prev_group: # If the state at the event only has a prev group, then we can # use that as a prev group too. - context.prev_group = entry.prev_group - context.delta_ids = dict(entry.delta_ids) - context.delta_ids[key] = event.event_id + prev_group = entry.prev_group + delta_ids = dict(entry.delta_ids) + delta_ids[key] = event.event_id - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + current_state_ids=current_state_ids, ) else: - context.current_state_ids = context.prev_state_ids - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids + current_state_ids = prev_state_ids + prev_group = entry.prev_group + delta_ids = entry.delta_ids if entry.state_group is None: entry.state_group = yield self.store.store_state_group( @@ -310,13 +316,20 @@ class StateHandler(object): event.room_id, prev_group=entry.prev_group, delta_ids=entry.delta_ids, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, ) entry.state_id = entry.state_group - context.state_group = entry.state_group + state_group = entry.state_group + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + ) - context.prev_state_events = [] defer.returnValue(context) @defer.inlineCallbacks -- cgit 1.4.1 From 842cdece42e59a4181a496761447f0cb00053c05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:31:35 +0100 Subject: pep8 --- synapse/events/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index fbbe8dd490..5e02ef1a5c 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -145,7 +145,7 @@ class EventContext(object): current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, prev_group=input["prev_group"], - delta_ids = _decode_state_dict(input["delta_ids"]), + delta_ids=_decode_state_dict(input["delta_ids"]), ) context.rejected = input["rejected"] -- cgit 1.4.1 From 440b8845b531db05e7c4f646e48dee7635cf1f0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:38:46 +0100 Subject: Make EventContext lazy load state --- synapse/events/snapshot.py | 153 +++++++++++++++++++++++++++++++++------------ synapse/state.py | 6 +- 2 files changed, 115 insertions(+), 44 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 5e02ef1a5c..f9568638a1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,18 +19,12 @@ from frozendict import frozendict from twisted.internet import defer +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + class EventContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - state_group (int|None): state group id, if the state has been stored as a state group. This is usually only None if e.g. the event is an outlier. @@ -47,36 +41,71 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + + _current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _prev_state_id (str|None): If set then the event associated with the + context overrode the _prev_state_id + + _event_type (str): The type of the event the context is associated with + + _event_state_key (str|None): The state_key of the event the context is + associated with """ __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", "prev_group", "delta_ids", "prev_state_events", "app_service", + "_current_state_ids", + "_prev_state_ids", + "_prev_state_id", + "_event_type", + "_event_state_key", + "_fetching_state_deferred", ] - def __init__(self, state_group, current_state_ids, prev_state_ids, - prev_group=None, delta_ids=None): + @staticmethod + def with_state(state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): + context = EventContext() + # The current state including the current event - self.current_state_ids = current_state_ids + context._current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = prev_state_ids - self.state_group = state_group + context._prev_state_ids = prev_state_ids + context.state_group = state_group + + context._prev_state_id = None + context._event_type = None + context._event_state_key = None + context._fetching_state_deferred = defer.succeed(None) # A previously persisted state group and a delta between that # and this state. - self.prev_group = prev_group - self.delta_ids = delta_ids + context.prev_group = prev_group + context.delta_ids = delta_ids + + context.prev_state_events = [] - self.prev_state_events = [] + context.rejected = False + context.app_service = None - self.rejected = False - self.app_service = None + return context def serialize(self, event): """Converts self to a type that can be serialized as JSON, and then @@ -123,30 +152,17 @@ class EventContext(object): Returns: EventContext """ + context = EventContext() + # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. - prev_state_id = input["prev_state_id"] - event_type = input["event_type"] - event_state_key = input["event_state_key"] + context._prev_state_id = input["prev_state_id"] + context._event_type = input["event_type"] + context._event_state_key = input["event_state_key"] - state_group = input["state_group"] - - current_state_ids = yield store.get_state_ids_for_group( - state_group, - ) - if prev_state_id and event_state_key: - prev_state_ids = dict(current_state_ids) - prev_state_ids[(event_type, event_state_key)] = prev_state_id - else: - prev_state_ids = current_state_ids - - context = EventContext( - state_group=state_group, - current_state_ids=current_state_ids, - prev_state_ids=prev_state_ids, - prev_group=input["prev_group"], - delta_ids=_decode_state_dict(input["delta_ids"]), - ) + context.state_group = input["state_group"] + context.prev_group = input["prev_group"] + context.delta_ids = _decode_state_dict(input["delta_ids"]) context.rejected = input["rejected"] context.prev_state_events = input["prev_state_events"] @@ -157,6 +173,61 @@ class EventContext(object): defer.returnValue(context) + @defer.inlineCallbacks + def get_current_state_ids(self, store): + """Gets the current state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._current_state_ids) + + @defer.inlineCallbacks + def get_prev_state_ids(self, store): + """Gets the prev state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._prev_state_ids) + + @defer.inlineCallbacks + def _fill_out_state(self, store): + """Called to populate the _current_state_ids and _prev_state_ids + attributes by loading from the database. + """ + if self.state_group is None: + return + + self._current_state_ids = yield store.get_state_ids_for_group( + self.state_group, + ) + if self._prev_state_id and self._event_state_key is not None: + self._prev_state_ids = dict(self._current_state_ids) + + key = (self._event_type, self._event_state_key) + self._prev_state_ids[key] = self._prev_state_id + else: + self._prev_state_ids = self._current_state_ids + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/state.py b/synapse/state.py index a708695006..32125c95df 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -219,7 +219,7 @@ class StateHandler(object): # We don't store state for outliers, so we don't generate a state # group for it. - context = EventContext( + context = EventContext.with_state( state_group=None, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, @@ -255,7 +255,7 @@ class StateHandler(object): current_state_ids=current_state_ids, ) - context = EventContext( + context = EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, @@ -322,7 +322,7 @@ class StateHandler(object): state_group = entry.state_group - context = EventContext( + context = EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, -- cgit 1.4.1 From 027bc01a1bc254fe08140c6e91a9fb945b08486f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:02:09 +0100 Subject: Add support for updating state --- synapse/events/snapshot.py | 19 +++++++++++++++++++ synapse/handlers/federation.py | 32 +++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 9 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f9568638a1..b090751bf1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -228,6 +228,25 @@ class EventContext(object): else: self._prev_state_ids = self._current_state_ids + @defer.inlineCallbacks + def update_state(self, state_group, prev_state_ids, current_state_ids, + delta_ids): + """Replace the state in the context + """ + + # We need to make sure we wait for any ongoing fetching of state + # to complete so that the updated state doesn't get clobbered + if self._fetching_state_deferred: + yield make_deferred_yieldable(self._fetching_state_deferred) + + self.state_group = state_group + self._prev_state_ids = prev_state_ids + self._current_state_ids = current_state_ids + self.delta_ids = delta_ids + + # We need to ensure that that we've marked as having fetched the state + self._fetching_state_deferred = defer.succeed(None) + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 98dd4a7fd1..14654d59f1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1975,21 +1975,35 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update(state_updates) + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids.update(state_updates) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ + delta_ids = dict(context.delta_ids) + delta_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ k: a.event_id for k, a in iteritems(auth_events) }) - context.state_group = yield self.store.store_state_group( + + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + delta_ids=delta_ids, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + delta_ids=delta_ids, ) @defer.inlineCallbacks -- cgit 1.4.1 From 4797ed000e612b68c418ce8c342bdd7ecc16b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:05:40 +0100 Subject: Update docstrings to make sense --- synapse/events/snapshot.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index b090751bf1..a6d7bf5700 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -55,13 +55,16 @@ class EventContext(object): _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have been calculated. None if we haven't started calculating yet - _prev_state_id (str|None): If set then the event associated with the - context overrode the _prev_state_id - - _event_type (str): The type of the event the context is associated with + _event_type (str): The type of the event the context is associated with. + Only set when state has not been fetched yet. _event_state_key (str|None): The state_key of the event the context is - associated with + associated with. Only set when state has not been fetched yet. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + Only set when state has not been fetched yet. """ __slots__ = [ -- cgit 1.4.1 From 999bcf9d016fb7fd9ad5a9daf4f0ec6d25a10717 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:24:21 +0100 Subject: Fix EventContext when using workers We were: 1. Not correctly setting all attributes 2. Using defer.inlineCallbacks in a non-generator --- synapse/events/snapshot.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a6d7bf5700..e31eceb921 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -82,6 +82,11 @@ class EventContext(object): "_fetching_state_deferred", ] + def __init__(self): + self.prev_state_events = [] + self.rejected = False + self.app_service = None + @staticmethod def with_state(state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None): @@ -103,11 +108,6 @@ class EventContext(object): context.prev_group = prev_group context.delta_ids = delta_ids - context.prev_state_events = [] - - context.rejected = False - context.app_service = None - return context def serialize(self, event): @@ -143,7 +143,6 @@ class EventContext(object): } @staticmethod - @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a EventContext. @@ -162,6 +161,7 @@ class EventContext(object): context._prev_state_id = input["prev_state_id"] context._event_type = input["event_type"] context._event_state_key = input["event_state_key"] + context._fetching_state_deferred = None context.state_group = input["state_group"] context.prev_group = input["prev_group"] @@ -174,7 +174,7 @@ class EventContext(object): if app_service_id: context.app_service = store.get_app_service_by_id(app_service_id) - defer.returnValue(context) + return context @defer.inlineCallbacks def get_current_state_ids(self, store): -- cgit 1.4.1 From 0faa3223cdf996aa18376a7420a43061a6691638 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 16:28:00 +0100 Subject: Fix missing attributes on workers. This was missed during the transition from attribute to getter for getting state from context. --- synapse/events/snapshot.py | 10 ++++++---- synapse/handlers/message.py | 5 +++-- synapse/replication/http/send_event.py | 7 +++++-- 3 files changed, 14 insertions(+), 8 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e31eceb921..a59064b416 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -110,7 +110,8 @@ class EventContext(object): return context - def serialize(self, event): + @defer.inlineCallbacks + def serialize(self, event, store): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -126,11 +127,12 @@ class EventContext(object): # the prev_state_ids, so if we're a state event we include the event # id that we replaced in the state. if event.is_state(): - prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) else: prev_state_id = None - return { + defer.returnValue({ "prev_state_id": prev_state_id, "event_type": event.type, "event_state_key": event.state_key if event.is_state() else None, @@ -140,7 +142,7 @@ class EventContext(object): "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None - } + }) @staticmethod def deserialize(store, input): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c4bcd9018b..7571975c22 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -807,8 +807,9 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( - self.hs.get_clock(), - self.http_client, + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, requester=requester, diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 2eede54792..5227bc333d 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -34,12 +34,13 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(clock, client, host, port, requester, event, context, +def send_event_to_master(clock, store, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: clock (synapse.util.Clock) + store (DataStore) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context, host, port, event.event_id, ) + serialized_context = yield context.serialize(event, store) + payload = { "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(event), + "context": serialized_context, "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], -- cgit 1.4.1 From 50c60e5fadbefff6785c17dda9eecf88286dba30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 17:21:40 +0100 Subject: Only get cached state from context in persist_event We don't want to bother pulling out the current state from the DB since until we know we have to. Checking the context for state is just an optimisation. --- synapse/events/snapshot.py | 13 +++++++++++++ synapse/storage/events.py | 4 +++- 2 files changed, 16 insertions(+), 1 deletion(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a59064b416..c439b53801 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -163,6 +163,9 @@ class EventContext(object): context._prev_state_id = input["prev_state_id"] context._event_type = input["event_type"] context._event_state_key = input["event_state_key"] + + context._current_state_ids = None + context._prev_state_ids = None context._fetching_state_deferred = None context.state_group = input["state_group"] @@ -214,6 +217,16 @@ class EventContext(object): defer.returnValue(self._prev_state_ids) + def get_cached_current_state_ids(self): + """Gets the current state IDs if we have them already cached. + + Returns: + dict[(str, str), str]|None: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + return self._current_state_ids + @defer.inlineCallbacks def _fill_out_state(self, store): """Called to populate the _current_state_ids and _prev_state_ids diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bf4f3ee92a..dc0b3c2eba 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -549,7 +549,9 @@ class EventsStore(EventsWorkerStore): if ctx.state_group in state_groups_map: continue - state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self) + current_state_ids = ctx.get_cached_current_state_ids() + if current_state_ids is not None: + state_groups_map[ctx.state_group] = current_state_ids # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can -- cgit 1.4.1 From 8b9f164fff6cf821ff5bc702f3660c0f0eb320e7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 17:43:01 +0100 Subject: Comments --- synapse/events/snapshot.py | 5 +++-- synapse/storage/events.py | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index c439b53801..189212b0fa 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -221,8 +221,9 @@ class EventContext(object): """Gets the current state IDs if we have them already cached. Returns: - dict[(str, str), str]|None: Returns None if state_group - is None, which happens when the associated event is an outlier. + dict[(str, str), str]|None: Returns None if we haven't cached the + state or if state_group is None, which happens when the associated + event is an outlier. """ return self._current_state_ids diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dc0b3c2eba..c2910094d0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -549,6 +549,9 @@ class EventsStore(EventsWorkerStore): if ctx.state_group in state_groups_map: continue + # We're only interested in pulling out state that has already + # been cached in the context. We'll pull stuff out of the DB later + # if necessary. current_state_ids = ctx.get_cached_current_state_ids() if current_state_ids is not None: state_groups_map[ctx.state_group] = current_state_ids -- cgit 1.4.1 From c1f80effbe17b1572161cc50838e60b495fb45a4 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 23 Jul 2018 22:06:50 +0100 Subject: Handle delta_ids being None in _update_context_for_auth_events it's easier to create the new state group as a delta from the existing one. (There's an outside chance this will help with https://github.com/matrix-org/synapse/issues/3364) --- synapse/events/snapshot.py | 3 ++- synapse/handlers/federation.py | 13 ++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/events/snapshot.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 189212b0fa..368b5f6ae4 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -249,7 +249,7 @@ class EventContext(object): @defer.inlineCallbacks def update_state(self, state_group, prev_state_ids, current_state_ids, - delta_ids): + prev_group, delta_ids): """Replace the state in the context """ @@ -260,6 +260,7 @@ class EventContext(object): self.state_group = state_group self._prev_state_ids = prev_state_ids + self.prev_group = prev_group self._current_state_ids = current_state_ids self.delta_ids = delta_ids diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14654d59f1..145c1a21d4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1980,10 +1980,6 @@ class FederationHandler(BaseHandler): current_state_ids.update(state_updates) - if context.delta_ids is not None: - delta_ids = dict(context.delta_ids) - delta_ids.update(state_updates) - prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = dict(prev_state_ids) @@ -1991,11 +1987,13 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) }) + # create a new state group as a delta from the existing one. + prev_group = context.state_group state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, current_state_ids=current_state_ids, ) @@ -2003,7 +2001,8 @@ class FederationHandler(BaseHandler): state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, - delta_ids=delta_ids, + prev_group=prev_group, + delta_ids=state_updates, ) @defer.inlineCallbacks -- cgit 1.4.1