From 3132b89f12f0386558045683ad198f090b0e2c90 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 21 Jul 2018 15:47:18 +1000 Subject: Make the rest of the .iterwhatever go away (#3562) --- synapse/events/snapshot.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb5946..f83a1581a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + from frozendict import frozendict from twisted.internet import defer @@ -159,7 +161,7 @@ def _encode_state_dict(state_dict): return [ (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() + for (etype, state_key), v in iteritems(state_dict) ] -- cgit 1.4.1 From acbfdc3442f706150c6312e534ca4ecec8548582 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:17:16 +0100 Subject: Refcator EventContext to accept state during init --- synapse/events/snapshot.py | 48 +++++++++++++---------- synapse/state.py | 97 ++++++++++++++++++++++++++-------------------- 2 files changed, 82 insertions(+), 63 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f83a1581a6..fbbe8dd490 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -60,22 +60,22 @@ class EventContext(object): "app_service", ] - def __init__(self): + def __init__(self, state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): # The current state including the current event - self.current_state_ids = None + self.current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = None - self.state_group = None - - self.rejected = False + self.prev_state_ids = prev_state_ids + self.state_group = state_group # A previously persisted state group and a delta between that # and this state. - self.prev_group = None - self.delta_ids = None + self.prev_group = prev_group + self.delta_ids = delta_ids - self.prev_state_events = None + self.prev_state_events = [] + self.rejected = False self.app_service = None def serialize(self, event): @@ -123,27 +123,33 @@ class EventContext(object): Returns: EventContext """ - context = EventContext() - context.state_group = input["state_group"] - context.rejected = input["rejected"] - context.prev_group = input["prev_group"] - context.delta_ids = _decode_state_dict(input["delta_ids"]) - context.prev_state_events = input["prev_state_events"] - # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. prev_state_id = input["prev_state_id"] event_type = input["event_type"] event_state_key = input["event_state_key"] - context.current_state_ids = yield store.get_state_ids_for_group( - context.state_group, + state_group = input["state_group"] + + current_state_ids = yield store.get_state_ids_for_group( + state_group, ) if prev_state_id and event_state_key: - context.prev_state_ids = dict(context.current_state_ids) - context.prev_state_ids[(event_type, event_state_key)] = prev_state_id + prev_state_ids = dict(current_state_ids) + prev_state_ids[(event_type, event_state_key)] = prev_state_id else: - context.prev_state_ids = context.current_state_ids + prev_state_ids = current_state_ids + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=input["prev_group"], + delta_ids = _decode_state_dict(input["delta_ids"]), + ) + + context.rejected = input["rejected"] + context.prev_state_events = input["prev_state_events"] app_service_id = input["app_service_id"] if app_service_id: diff --git a/synapse/state.py b/synapse/state.py index 504caae2f7..a708695006 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -203,25 +203,27 @@ class StateHandler(object): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. - context = EventContext() if old_state: - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): - context.current_state_ids = dict(context.prev_state_ids) + current_state_ids = dict(prev_state_ids) key = (event.type, event.state_key) - context.current_state_ids[key] = event.event_id + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids else: - context.current_state_ids = {} - context.prev_state_ids = {} - context.prev_state_events = [] + current_state_ids = {} + prev_state_ids = {} # We don't store state for outliers, so we don't generate a state - # froup for it. - context.state_group = None + # group for it. + context = EventContext( + state_group=None, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + ) defer.returnValue(context) @@ -230,31 +232,35 @@ class StateHandler(object): # Let's just correctly fill out the context and create a # new state group for it. - context = EventContext() - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] if replaces != event.event_id: # Paranoia check event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=None, delta_ids=None, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, + ) + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, ) - context.prev_state_events = [] defer.returnValue(context) logger.debug("calling resolve_state_groups from compute_event_context") @@ -262,47 +268,47 @@ class StateHandler(object): event.room_id, [e for e, _ in event.prev_events], ) - curr_state = entry.state + prev_state_ids = entry.state + prev_group = None + delta_ids = None - context = EventContext() - context.prev_state_ids = curr_state if event.is_state(): # If this is a state event then we need to create a new state # group for the state after this event. key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id if entry.state_group: # If the state at the event has a state group assigned then # we can use that as the prev group - context.prev_group = entry.state_group - context.delta_ids = { + prev_group = entry.state_group + delta_ids = { key: event.event_id } elif entry.prev_group: # If the state at the event only has a prev group, then we can # use that as a prev group too. - context.prev_group = entry.prev_group - context.delta_ids = dict(entry.delta_ids) - context.delta_ids[key] = event.event_id + prev_group = entry.prev_group + delta_ids = dict(entry.delta_ids) + delta_ids[key] = event.event_id - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + current_state_ids=current_state_ids, ) else: - context.current_state_ids = context.prev_state_ids - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids + current_state_ids = prev_state_ids + prev_group = entry.prev_group + delta_ids = entry.delta_ids if entry.state_group is None: entry.state_group = yield self.store.store_state_group( @@ -310,13 +316,20 @@ class StateHandler(object): event.room_id, prev_group=entry.prev_group, delta_ids=entry.delta_ids, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, ) entry.state_id = entry.state_group - context.state_group = entry.state_group + state_group = entry.state_group + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + ) - context.prev_state_events = [] defer.returnValue(context) @defer.inlineCallbacks -- cgit 1.4.1 From 842cdece42e59a4181a496761447f0cb00053c05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:31:35 +0100 Subject: pep8 --- synapse/events/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index fbbe8dd490..5e02ef1a5c 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -145,7 +145,7 @@ class EventContext(object): current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, prev_group=input["prev_group"], - delta_ids = _decode_state_dict(input["delta_ids"]), + delta_ids=_decode_state_dict(input["delta_ids"]), ) context.rejected = input["rejected"] -- cgit 1.4.1 From 440b8845b531db05e7c4f646e48dee7635cf1f0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:38:46 +0100 Subject: Make EventContext lazy load state --- synapse/events/snapshot.py | 153 +++++++++++++++++++++++++++++++++------------ synapse/state.py | 6 +- 2 files changed, 115 insertions(+), 44 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 5e02ef1a5c..f9568638a1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,18 +19,12 @@ from frozendict import frozendict from twisted.internet import defer +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + class EventContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - state_group (int|None): state group id, if the state has been stored as a state group. This is usually only None if e.g. the event is an outlier. @@ -47,36 +41,71 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + + _current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _prev_state_id (str|None): If set then the event associated with the + context overrode the _prev_state_id + + _event_type (str): The type of the event the context is associated with + + _event_state_key (str|None): The state_key of the event the context is + associated with """ __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", "prev_group", "delta_ids", "prev_state_events", "app_service", + "_current_state_ids", + "_prev_state_ids", + "_prev_state_id", + "_event_type", + "_event_state_key", + "_fetching_state_deferred", ] - def __init__(self, state_group, current_state_ids, prev_state_ids, - prev_group=None, delta_ids=None): + @staticmethod + def with_state(state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): + context = EventContext() + # The current state including the current event - self.current_state_ids = current_state_ids + context._current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = prev_state_ids - self.state_group = state_group + context._prev_state_ids = prev_state_ids + context.state_group = state_group + + context._prev_state_id = None + context._event_type = None + context._event_state_key = None + context._fetching_state_deferred = defer.succeed(None) # A previously persisted state group and a delta between that # and this state. - self.prev_group = prev_group - self.delta_ids = delta_ids + context.prev_group = prev_group + context.delta_ids = delta_ids + + context.prev_state_events = [] - self.prev_state_events = [] + context.rejected = False + context.app_service = None - self.rejected = False - self.app_service = None + return context def serialize(self, event): """Converts self to a type that can be serialized as JSON, and then @@ -123,30 +152,17 @@ class EventContext(object): Returns: EventContext """ + context = EventContext() + # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. - prev_state_id = input["prev_state_id"] - event_type = input["event_type"] - event_state_key = input["event_state_key"] + context._prev_state_id = input["prev_state_id"] + context._event_type = input["event_type"] + context._event_state_key = input["event_state_key"] - state_group = input["state_group"] - - current_state_ids = yield store.get_state_ids_for_group( - state_group, - ) - if prev_state_id and event_state_key: - prev_state_ids = dict(current_state_ids) - prev_state_ids[(event_type, event_state_key)] = prev_state_id - else: - prev_state_ids = current_state_ids - - context = EventContext( - state_group=state_group, - current_state_ids=current_state_ids, - prev_state_ids=prev_state_ids, - prev_group=input["prev_group"], - delta_ids=_decode_state_dict(input["delta_ids"]), - ) + context.state_group = input["state_group"] + context.prev_group = input["prev_group"] + context.delta_ids = _decode_state_dict(input["delta_ids"]) context.rejected = input["rejected"] context.prev_state_events = input["prev_state_events"] @@ -157,6 +173,61 @@ class EventContext(object): defer.returnValue(context) + @defer.inlineCallbacks + def get_current_state_ids(self, store): + """Gets the current state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._current_state_ids) + + @defer.inlineCallbacks + def get_prev_state_ids(self, store): + """Gets the prev state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._prev_state_ids) + + @defer.inlineCallbacks + def _fill_out_state(self, store): + """Called to populate the _current_state_ids and _prev_state_ids + attributes by loading from the database. + """ + if self.state_group is None: + return + + self._current_state_ids = yield store.get_state_ids_for_group( + self.state_group, + ) + if self._prev_state_id and self._event_state_key is not None: + self._prev_state_ids = dict(self._current_state_ids) + + key = (self._event_type, self._event_state_key) + self._prev_state_ids[key] = self._prev_state_id + else: + self._prev_state_ids = self._current_state_ids + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/state.py b/synapse/state.py index a708695006..32125c95df 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -219,7 +219,7 @@ class StateHandler(object): # We don't store state for outliers, so we don't generate a state # group for it. - context = EventContext( + context = EventContext.with_state( state_group=None, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, @@ -255,7 +255,7 @@ class StateHandler(object): current_state_ids=current_state_ids, ) - context = EventContext( + context = EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, @@ -322,7 +322,7 @@ class StateHandler(object): state_group = entry.state_group - context = EventContext( + context = EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, -- cgit 1.4.1 From 027bc01a1bc254fe08140c6e91a9fb945b08486f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:02:09 +0100 Subject: Add support for updating state --- synapse/events/snapshot.py | 19 +++++++++++++++++++ synapse/handlers/federation.py | 32 +++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 9 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f9568638a1..b090751bf1 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -228,6 +228,25 @@ class EventContext(object): else: self._prev_state_ids = self._current_state_ids + @defer.inlineCallbacks + def update_state(self, state_group, prev_state_ids, current_state_ids, + delta_ids): + """Replace the state in the context + """ + + # We need to make sure we wait for any ongoing fetching of state + # to complete so that the updated state doesn't get clobbered + if self._fetching_state_deferred: + yield make_deferred_yieldable(self._fetching_state_deferred) + + self.state_group = state_group + self._prev_state_ids = prev_state_ids + self._current_state_ids = current_state_ids + self.delta_ids = delta_ids + + # We need to ensure that that we've marked as having fetched the state + self._fetching_state_deferred = defer.succeed(None) + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 98dd4a7fd1..14654d59f1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1975,21 +1975,35 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update(state_updates) + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids.update(state_updates) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ + delta_ids = dict(context.delta_ids) + delta_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ k: a.event_id for k, a in iteritems(auth_events) }) - context.state_group = yield self.store.store_state_group( + + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + delta_ids=delta_ids, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + delta_ids=delta_ids, ) @defer.inlineCallbacks -- cgit 1.4.1 From 4797ed000e612b68c418ce8c342bdd7ecc16b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:05:40 +0100 Subject: Update docstrings to make sense --- synapse/events/snapshot.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index b090751bf1..a6d7bf5700 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -55,13 +55,16 @@ class EventContext(object): _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have been calculated. None if we haven't started calculating yet - _prev_state_id (str|None): If set then the event associated with the - context overrode the _prev_state_id - - _event_type (str): The type of the event the context is associated with + _event_type (str): The type of the event the context is associated with. + Only set when state has not been fetched yet. _event_state_key (str|None): The state_key of the event the context is - associated with + associated with. Only set when state has not been fetched yet. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + Only set when state has not been fetched yet. """ __slots__ = [ -- cgit 1.4.1 From 999bcf9d016fb7fd9ad5a9daf4f0ec6d25a10717 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:24:21 +0100 Subject: Fix EventContext when using workers We were: 1. Not correctly setting all attributes 2. Using defer.inlineCallbacks in a non-generator --- synapse/events/snapshot.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a6d7bf5700..e31eceb921 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -82,6 +82,11 @@ class EventContext(object): "_fetching_state_deferred", ] + def __init__(self): + self.prev_state_events = [] + self.rejected = False + self.app_service = None + @staticmethod def with_state(state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None): @@ -103,11 +108,6 @@ class EventContext(object): context.prev_group = prev_group context.delta_ids = delta_ids - context.prev_state_events = [] - - context.rejected = False - context.app_service = None - return context def serialize(self, event): @@ -143,7 +143,6 @@ class EventContext(object): } @staticmethod - @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a EventContext. @@ -162,6 +161,7 @@ class EventContext(object): context._prev_state_id = input["prev_state_id"] context._event_type = input["event_type"] context._event_state_key = input["event_state_key"] + context._fetching_state_deferred = None context.state_group = input["state_group"] context.prev_group = input["prev_group"] @@ -174,7 +174,7 @@ class EventContext(object): if app_service_id: context.app_service = store.get_app_service_by_id(app_service_id) - defer.returnValue(context) + return context @defer.inlineCallbacks def get_current_state_ids(self, store): -- cgit 1.4.1 From 0faa3223cdf996aa18376a7420a43061a6691638 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 16:28:00 +0100 Subject: Fix missing attributes on workers. This was missed during the transition from attribute to getter for getting state from context. --- synapse/events/snapshot.py | 10 ++++++---- synapse/handlers/message.py | 5 +++-- synapse/replication/http/send_event.py | 7 +++++-- 3 files changed, 14 insertions(+), 8 deletions(-) (limited to 'synapse/events') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e31eceb921..a59064b416 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -110,7 +110,8 @@ class EventContext(object): return context - def serialize(self, event): + @defer.inlineCallbacks + def serialize(self, event, store): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -126,11 +127,12 @@ class EventContext(object): # the prev_state_ids, so if we're a state event we include the event # id that we replaced in the state. if event.is_state(): - prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) else: prev_state_id = None - return { + defer.returnValue({ "prev_state_id": prev_state_id, "event_type": event.type, "event_state_key": event.state_key if event.is_state() else None, @@ -140,7 +142,7 @@ class EventContext(object): "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None - } + }) @staticmethod def deserialize(store, input): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c4bcd9018b..7571975c22 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -807,8 +807,9 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( - self.hs.get_clock(), - self.http_client, + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, requester=requester, diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 2eede54792..5227bc333d 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -34,12 +34,13 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(clock, client, host, port, requester, event, context, +def send_event_to_master(clock, store, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: clock (synapse.util.Clock) + store (DataStore) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context, host, port, event.event_id, ) + serialized_context = yield context.serialize(event, store) + payload = { "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(event), + "context": serialized_context, "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], -- cgit 1.4.1