diff options
Diffstat (limited to 'synapse/events/snapshot.py')
-rw-r--r-- | synapse/events/snapshot.py | 259 |
1 files changed, 238 insertions, 21 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e9a732ff03..368b5f6ae4 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,19 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + +from frozendict import frozendict + +from twisted.internet import defer + +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + class EventContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - - state_group (int): state group id + state_group (int|None): state group id, if the state has been stored + as a state group. This is usually only None if e.g. the event is + an outlier. rejected (bool|str): A rejection reason if the event was rejected, else False @@ -39,35 +41,250 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + + _current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _event_type (str): The type of the event the context is associated with. + Only set when state has not been fetched yet. + + _event_state_key (str|None): The state_key of the event the context is + associated with. Only set when state has not been fetched yet. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + Only set when state has not been fetched yet. """ __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", - "push_actions", "prev_group", "delta_ids", "prev_state_events", "app_service", + "_current_state_ids", + "_prev_state_ids", + "_prev_state_id", + "_event_type", + "_event_state_key", + "_fetching_state_deferred", ] def __init__(self): + self.prev_state_events = [] + self.rejected = False + self.app_service = None + + @staticmethod + def with_state(state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): + context = EventContext() + # The current state including the current event - self.current_state_ids = None + context._current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = None - self.state_group = None + context._prev_state_ids = prev_state_ids + context.state_group = state_group - self.rejected = False - self.push_actions = [] + context._prev_state_id = None + context._event_type = None + context._event_state_key = None + context._fetching_state_deferred = defer.succeed(None) # A previously persisted state group and a delta between that # and this state. - self.prev_group = None - self.delta_ids = None + context.prev_group = prev_group + context.delta_ids = delta_ids - self.prev_state_events = None + return context - self.app_service = None + @defer.inlineCallbacks + def serialize(self, event, store): + """Converts self to a type that can be serialized as JSON, and then + deserialized by `deserialize` + + Args: + event (FrozenEvent): The event that this context relates to + + Returns: + dict + """ + + # We don't serialize the full state dicts, instead they get pulled out + # of the DB on the other side. However, the other side can't figure out + # the prev_state_ids, so if we're a state event we include the event + # id that we replaced in the state. + if event.is_state(): + prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) + else: + prev_state_id = None + + defer.returnValue({ + "prev_state_id": prev_state_id, + "event_type": event.type, + "event_state_key": event.state_key if event.is_state() else None, + "state_group": self.state_group, + "rejected": self.rejected, + "prev_group": self.prev_group, + "delta_ids": _encode_state_dict(self.delta_ids), + "prev_state_events": self.prev_state_events, + "app_service_id": self.app_service.id if self.app_service else None + }) + + @staticmethod + def deserialize(store, input): + """Converts a dict that was produced by `serialize` back into a + EventContext. + + Args: + store (DataStore): Used to convert AS ID to AS object + input (dict): A dict produced by `serialize` + + Returns: + EventContext + """ + context = EventContext() + + # We use the state_group and prev_state_id stuff to pull the + # current_state_ids out of the DB and construct prev_state_ids. + context._prev_state_id = input["prev_state_id"] + context._event_type = input["event_type"] + context._event_state_key = input["event_state_key"] + + context._current_state_ids = None + context._prev_state_ids = None + context._fetching_state_deferred = None + + context.state_group = input["state_group"] + context.prev_group = input["prev_group"] + context.delta_ids = _decode_state_dict(input["delta_ids"]) + + context.rejected = input["rejected"] + context.prev_state_events = input["prev_state_events"] + + app_service_id = input["app_service_id"] + if app_service_id: + context.app_service = store.get_app_service_by_id(app_service_id) + + return context + + @defer.inlineCallbacks + def get_current_state_ids(self, store): + """Gets the current state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._current_state_ids) + + @defer.inlineCallbacks + def get_prev_state_ids(self, store): + """Gets the prev state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._prev_state_ids) + + def get_cached_current_state_ids(self): + """Gets the current state IDs if we have them already cached. + + Returns: + dict[(str, str), str]|None: Returns None if we haven't cached the + state or if state_group is None, which happens when the associated + event is an outlier. + """ + + return self._current_state_ids + + @defer.inlineCallbacks + def _fill_out_state(self, store): + """Called to populate the _current_state_ids and _prev_state_ids + attributes by loading from the database. + """ + if self.state_group is None: + return + + self._current_state_ids = yield store.get_state_ids_for_group( + self.state_group, + ) + if self._prev_state_id and self._event_state_key is not None: + self._prev_state_ids = dict(self._current_state_ids) + + key = (self._event_type, self._event_state_key) + self._prev_state_ids[key] = self._prev_state_id + else: + self._prev_state_ids = self._current_state_ids + + @defer.inlineCallbacks + def update_state(self, state_group, prev_state_ids, current_state_ids, + prev_group, delta_ids): + """Replace the state in the context + """ + + # We need to make sure we wait for any ongoing fetching of state + # to complete so that the updated state doesn't get clobbered + if self._fetching_state_deferred: + yield make_deferred_yieldable(self._fetching_state_deferred) + + self.state_group = state_group + self._prev_state_ids = prev_state_ids + self.prev_group = prev_group + self._current_state_ids = current_state_ids + self.delta_ids = delta_ids + + # We need to ensure that that we've marked as having fetched the state + self._fetching_state_deferred = defer.succeed(None) + + +def _encode_state_dict(state_dict): + """Since dicts of (type, state_key) -> event_id cannot be serialized in + JSON we need to convert them to a form that can. + """ + if state_dict is None: + return None + + return [ + (etype, state_key, v) + for (etype, state_key), v in iteritems(state_dict) + ] + + +def _decode_state_dict(input): + """Decodes a state dict encoded using `_encode_state_dict` above + """ + if input is None: + return None + + return frozendict({(etype, state_key,): v for etype, state_key, v in input}) |