diff options
Diffstat (limited to 'synapse/events/snapshot.py')
-rw-r--r-- | synapse/events/snapshot.py | 344 |
1 files changed, 203 insertions, 141 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index acbcbeeced..7c5f620d09 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -12,104 +12,124 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional, Union from six import iteritems +import attr from frozendict import frozendict from twisted.internet import defer +from synapse.appservice import ApplicationService from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.types import StateMap -class EventContext(object): +@attr.s(slots=True) +class EventContext: """ + Holds information relevant to persisting an event + Attributes: - state_group (int|None): state group id, if the state has been stored - as a state group. This is usually only None if e.g. the event is - an outlier. - rejected (bool|str): A rejection reason if the event was rejected, else - False - - push_actions (list[(str, list[object])]): list of (user_id, actions) - tuples - - prev_group (int): Previously persisted state group. ``None`` for an - outlier. - delta_ids (dict[(str, str), str]): Delta from ``prev_group``. - (type, state_key) -> event_id. ``None`` for an outlier. - - prev_state_events (?): XXX: is this ever set to anything other than - the empty list? - - _current_state_ids (dict[(str, str), str]|None): - The current state map including the current event. None if outlier - or we haven't fetched the state from DB yet. - (type, state_key) -> event_id + rejected: A rejection reason if the event was rejected, else False + + _state_group: The ID of the state group for this event. Note that state events + are persisted with a state group which includes the new event, so this is + effectively the state *after* the event in question. + + For a *rejected* state event, where the state of the rejected event is + ignored, this state_group should never make it into the + event_to_state_groups table. Indeed, inspecting this value for a rejected + state event is almost certainly incorrect. + + For an outlier, where we don't have the state at the event, this will be + None. + + Note that this is a private attribute: it should be accessed via + the ``state_group`` property. + + state_group_before_event: The ID of the state group representing the state + of the room before this event. + + If this is a non-state event, this will be the same as ``state_group``. If + it's a state event, it will be the same as ``prev_group``. + + If ``state_group`` is None (ie, the event is an outlier), + ``state_group_before_event`` will always also be ``None``. + + prev_group: If it is known, ``state_group``'s prev_group. Note that this being + None does not necessarily mean that ``state_group`` does not have + a prev_group! + + If the event is a state event, this is normally the same as ``prev_group``. + + If ``state_group`` is None (ie, the event is an outlier), ``prev_group`` + will always also be ``None``. + + Note that this *not* (necessarily) the state group associated with + ``_prev_state_ids``. + + delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group`` + and ``state_group``. + + app_service: If this event is being sent by a (local) application service, that + app service. + + _current_state_ids: The room state map, including this event - ie, the state + in ``state_group``. - _prev_state_ids (dict[(str, str), str]|None): - The current state map excluding the current event. None if outlier - or we haven't fetched the state from DB yet. (type, state_key) -> event_id - _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have - been calculated. None if we haven't started calculating yet + FIXME: what is this for an outlier? it seems ill-defined. It seems like + it could be either {}, or the state we were given by the remote + server, depending on $THINGS - _event_type (str): The type of the event the context is associated with. - Only set when state has not been fetched yet. + Note that this is a private attribute: it should be accessed via + ``get_current_state_ids``. _AsyncEventContext impl calculates this + on-demand: it will be None until that happens. - _event_state_key (str|None): The state_key of the event the context is - associated with. Only set when state has not been fetched yet. + _prev_state_ids: The room state map, excluding this event - ie, the state + in ``state_group_before_event``. For a non-state + event, this will be the same as _current_state_events. - _prev_state_id (str|None): If the event associated with the context is - a state event, then `_prev_state_id` is the event_id of the state - that was replaced. - Only set when state has not been fetched yet. - """ + Note that it is a completely different thing to prev_group! - __slots__ = [ - "state_group", - "rejected", - "prev_group", - "delta_ids", - "prev_state_events", - "app_service", - "_current_state_ids", - "_prev_state_ids", - "_prev_state_id", - "_event_type", - "_event_state_key", - "_fetching_state_deferred", - ] - - def __init__(self): - self.prev_state_events = [] - self.rejected = False - self.app_service = None + (type, state_key) -> event_id - @staticmethod - def with_state( - state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None - ): - context = EventContext() + FIXME: again, what is this for an outlier? - # The current state including the current event - context._current_state_ids = current_state_ids - # The current state excluding the current event - context._prev_state_ids = prev_state_ids - context.state_group = state_group + As with _current_state_ids, this is a private attribute. It should be + accessed via get_prev_state_ids. + """ - context._prev_state_id = None - context._event_type = None - context._event_state_key = None - context._fetching_state_deferred = defer.succeed(None) + rejected = attr.ib(default=False, type=Union[bool, str]) + _state_group = attr.ib(default=None, type=Optional[int]) + state_group_before_event = attr.ib(default=None, type=Optional[int]) + prev_group = attr.ib(default=None, type=Optional[int]) + delta_ids = attr.ib(default=None, type=Optional[StateMap[str]]) + app_service = attr.ib(default=None, type=Optional[ApplicationService]) - # A previously persisted state group and a delta between that - # and this state. - context.prev_group = prev_group - context.delta_ids = delta_ids + _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) + _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) - return context + @staticmethod + def with_state( + state_group, + state_group_before_event, + current_state_ids, + prev_state_ids, + prev_group=None, + delta_ids=None, + ): + return EventContext( + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + state_group=state_group, + state_group_before_event=state_group_before_event, + prev_group=prev_group, + delta_ids=delta_ids, + ) @defer.inlineCallbacks def serialize(self, event, store): @@ -128,7 +148,7 @@ class EventContext(object): # the prev_state_ids, so if we're a state event we include the event # id that we replaced in the state. if event.is_state(): - prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_ids = yield self.get_prev_state_ids() prev_state_id = prev_state_ids.get((event.type, event.state_key)) else: prev_state_id = None @@ -137,74 +157,93 @@ class EventContext(object): "prev_state_id": prev_state_id, "event_type": event.type, "event_state_key": event.state_key if event.is_state() else None, - "state_group": self.state_group, + "state_group": self._state_group, + "state_group_before_event": self.state_group_before_event, "rejected": self.rejected, "prev_group": self.prev_group, "delta_ids": _encode_state_dict(self.delta_ids), - "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None, } @staticmethod - def deserialize(store, input): + def deserialize(storage, input): """Converts a dict that was produced by `serialize` back into a EventContext. Args: - store (DataStore): Used to convert AS ID to AS object + storage (Storage): Used to convert AS ID to AS object and fetch + state. input (dict): A dict produced by `serialize` Returns: EventContext """ - context = EventContext() + context = _AsyncEventContextImpl( + # We use the state_group and prev_state_id stuff to pull the + # current_state_ids out of the DB and construct prev_state_ids. + storage=storage, + prev_state_id=input["prev_state_id"], + event_type=input["event_type"], + event_state_key=input["event_state_key"], + state_group=input["state_group"], + state_group_before_event=input["state_group_before_event"], + prev_group=input["prev_group"], + delta_ids=_decode_state_dict(input["delta_ids"]), + rejected=input["rejected"], + ) - # We use the state_group and prev_state_id stuff to pull the - # current_state_ids out of the DB and construct prev_state_ids. - context._prev_state_id = input["prev_state_id"] - context._event_type = input["event_type"] - context._event_state_key = input["event_state_key"] + app_service_id = input["app_service_id"] + if app_service_id: + context.app_service = storage.main.get_app_service_by_id(app_service_id) - context._current_state_ids = None - context._prev_state_ids = None - context._fetching_state_deferred = None + return context - context.state_group = input["state_group"] - context.prev_group = input["prev_group"] - context.delta_ids = _decode_state_dict(input["delta_ids"]) + @property + def state_group(self) -> Optional[int]: + """The ID of the state group for this event. - context.rejected = input["rejected"] - context.prev_state_events = input["prev_state_events"] + Note that state events are persisted with a state group which includes the new + event, so this is effectively the state *after* the event in question. - app_service_id = input["app_service_id"] - if app_service_id: - context.app_service = store.get_app_service_by_id(app_service_id) + For an outlier, where we don't have the state at the event, this will be None. - return context + It is an error to access this for a rejected event, since rejected state should + not make it into the room state. Accessing this property will raise an exception + if ``rejected`` is set. + """ + if self.rejected: + raise RuntimeError("Attempt to access state_group of rejected event") + + return self._state_group @defer.inlineCallbacks - def get_current_state_ids(self, store): - """Gets the current state IDs + def get_current_state_ids(self): + """ + Gets the room state map, including this event - ie, the state in ``state_group`` + + It is an error to access this for a rejected event, since rejected state should + not make it into the room state. This method will raise an exception if + ``rejected`` is set. Returns: Deferred[dict[(str, str), str]|None]: Returns None if state_group is None, which happens when the associated event is an outlier. + Maps a (type, state_key) to the event ID of the state event matching this tuple. """ + if self.rejected: + raise RuntimeError("Attempt to access state_ids of rejected event") - if not self._fetching_state_deferred: - self._fetching_state_deferred = run_in_background( - self._fill_out_state, store - ) - - yield make_deferred_yieldable(self._fetching_state_deferred) - + yield self._ensure_fetched() return self._current_state_ids @defer.inlineCallbacks - def get_prev_state_ids(self, store): - """Gets the prev state IDs + def get_prev_state_ids(self): + """ + Gets the room state map, excluding this event. + + For a non-state event, this will be the same as get_current_state_ids(). Returns: Deferred[dict[(str, str), str]|None]: Returns None if state_group @@ -212,65 +251,88 @@ class EventContext(object): Maps a (type, state_key) to the event ID of the state event matching this tuple. """ - - if not self._fetching_state_deferred: - self._fetching_state_deferred = run_in_background( - self._fill_out_state, store - ) - - yield make_deferred_yieldable(self._fetching_state_deferred) - + yield self._ensure_fetched() return self._prev_state_ids def get_cached_current_state_ids(self): """Gets the current state IDs if we have them already cached. + It is an error to access this for a rejected event, since rejected state should + not make it into the room state. This method will raise an exception if + ``rejected`` is set. + Returns: dict[(str, str), str]|None: Returns None if we haven't cached the state or if state_group is None, which happens when the associated event is an outlier. """ + if self.rejected: + raise RuntimeError("Attempt to access state_ids of rejected event") return self._current_state_ids + def _ensure_fetched(self): + return defer.succeed(None) + + +@attr.s(slots=True) +class _AsyncEventContextImpl(EventContext): + """ + An implementation of EventContext which fetches _current_state_ids and + _prev_state_ids from the database on demand. + + Attributes: + + _storage (Storage) + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _event_type (str): The type of the event the context is associated with. + + _event_state_key (str): The state_key of the event the context is + associated with. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + """ + + # This needs to have a default as we're inheriting + _storage = attr.ib(default=None) + _prev_state_id = attr.ib(default=None) + _event_type = attr.ib(default=None) + _event_state_key = attr.ib(default=None) + _fetching_state_deferred = attr.ib(default=None) + + def _ensure_fetched(self): + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background(self._fill_out_state) + + return make_deferred_yieldable(self._fetching_state_deferred) + @defer.inlineCallbacks - def _fill_out_state(self, store): + def _fill_out_state(self): """Called to populate the _current_state_ids and _prev_state_ids attributes by loading from the database. """ if self.state_group is None: return - self._current_state_ids = yield store.get_state_ids_for_group(self.state_group) - if self._prev_state_id and self._event_state_key is not None: + self._current_state_ids = yield self._storage.state.get_state_ids_for_group( + self.state_group + ) + if self._event_state_key is not None: self._prev_state_ids = dict(self._current_state_ids) key = (self._event_type, self._event_state_key) - self._prev_state_ids[key] = self._prev_state_id + if self._prev_state_id: + self._prev_state_ids[key] = self._prev_state_id + else: + self._prev_state_ids.pop(key, None) else: self._prev_state_ids = self._current_state_ids - @defer.inlineCallbacks - def update_state( - self, state_group, prev_state_ids, current_state_ids, prev_group, delta_ids - ): - """Replace the state in the context - """ - - # We need to make sure we wait for any ongoing fetching of state - # to complete so that the updated state doesn't get clobbered - if self._fetching_state_deferred: - yield make_deferred_yieldable(self._fetching_state_deferred) - - self.state_group = state_group - self._prev_state_ids = prev_state_ids - self.prev_group = prev_group - self._current_state_ids = current_state_ids - self.delta_ids = delta_ids - - # We need to ensure that that we've marked as having fetched the state - self._fetching_state_deferred = defer.succeed(None) - def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in |