diff options
author | Erik Johnston <erik@matrix.org> | 2018-02-16 11:18:19 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-03-14 16:51:39 +0000 |
commit | 220a6265b887867cec92242d35cc9b8c605826e8 (patch) | |
tree | a577d136169d2b7f29449d338b04d9563e77104c | |
parent | Refactor event storage to not require state (diff) | |
download | synapse-220a6265b887867cec92242d35cc9b8c605826e8.tar.xz |
Add concept of StatelessEventContext
The master process (usually) doesn't need the state at an event when it has been created by a worker process, so let's not automatically load the state in that case.
-rw-r--r-- | synapse/events/snapshot.py | 103 | ||||
-rw-r--r-- | synapse/replication/http/send_event.py | 10 |
2 files changed, 35 insertions, 78 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 8e684d91b5..5da09dcce7 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,22 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer -from frozendict import frozendict - - -class EventContext(object): +class StatelessEventContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - state_group (int|None): state group id, if the state has been stored as a state group. This is usually only None if e.g. the event is an outlier. @@ -40,29 +28,20 @@ class EventContext(object): prev_group (int): Previously persisted state group. ``None`` for an outlier. - delta_ids (dict[(str, str), str]): Delta from ``prev_group``. - (type, state_key) -> event_id. ``None`` for an outlier. prev_state_events (?): XXX: is this ever set to anything other than the empty list? """ __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", "prev_group", - "delta_ids", "prev_state_events", "app_service", ] def __init__(self): - # The current state including the current event - self.current_state_ids = None - # The current state excluding the current event - self.prev_state_ids = None self.state_group = None self.rejected = False @@ -70,46 +49,27 @@ class EventContext(object): # A previously persisted state group and a delta between that # and this state. self.prev_group = None - self.delta_ids = None self.prev_state_events = None self.app_service = None - def serialize(self, event): + def serialize(self): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` - Args: - event (FrozenEvent): The event that this context relates to - Returns: dict """ - - # We don't serialize the full state dicts, instead they get pulled out - # of the DB on the other side. However, the other side can't figure out - # the prev_state_ids, so if we're a state event we include the event - # id that we replaced in the state. - if event.is_state(): - prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) - else: - prev_state_id = None - return { - "prev_state_id": prev_state_id, - "event_type": event.type, - "event_state_key": event.state_key if event.is_state() else None, "state_group": self.state_group, "rejected": self.rejected, "prev_group": self.prev_group, - "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None } @staticmethod - @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a EventContext. @@ -121,52 +81,47 @@ class EventContext(object): Returns: EventContext """ - context = EventContext() + context = StatelessEventContext() context.state_group = input["state_group"] context.rejected = input["rejected"] context.prev_group = input["prev_group"] - context.delta_ids = _decode_state_dict(input["delta_ids"]) context.prev_state_events = input["prev_state_events"] - # We use the state_group and prev_state_id stuff to pull the - # current_state_ids out of the DB and construct prev_state_ids. - prev_state_id = input["prev_state_id"] - event_type = input["event_type"] - event_state_key = input["event_state_key"] - - context.current_state_ids = yield store.get_state_ids_for_group( - context.state_group, - ) - if prev_state_id and event_state_key: - context.prev_state_ids = dict(context.current_state_ids) - context.prev_state_ids[(event_type, event_state_key)] = prev_state_id - else: - context.prev_state_ids = context.current_state_ids - app_service_id = input["app_service_id"] if app_service_id: context.app_service = store.get_app_service_by_id(app_service_id) - defer.returnValue(context) + return context -def _encode_state_dict(state_dict): - """Since dicts of (type, state_key) -> event_id cannot be serialized in - JSON we need to convert them to a form that can. +class EventContext(StatelessEventContext): """ - if state_dict is None: - return None + Attributes: + current_state_ids (dict[(str, str), str]): + The current state map including the current event. + (type, state_key) -> event_id - return [ - (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() - ] + prev_state_ids (dict[(str, str), str]): + The current state map excluding the current event. + (type, state_key) -> event_id + delta_ids (dict[(str, str), str]): Delta from ``prev_group``. + (type, state_key) -> event_id. ``None`` for an outlier. -def _decode_state_dict(input): - """Decodes a state dict encoded using `_encode_state_dict` above """ - if input is None: - return None - return frozendict({(etype, state_key,): v for etype, state_key, v in input}) + __slots__ = [ + "current_state_ids", + "prev_state_ids", + "delta_ids", + ] + + def __init__(self): + # The current state including the current event + self.current_state_ids = None + # The current state excluding the current event + self.prev_state_ids = None + + self.delta_ids = None + + super(EventContext, self).__init__() diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index bbe2f967b7..f00716db95 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -19,7 +19,7 @@ from synapse.api.errors import ( SynapseError, MatrixCodeMessageException, CodeMessageException, ) from synapse.events import FrozenEvent -from synapse.events.snapshot import EventContext +from synapse.events.snapshot import StatelessEventContext from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.util.async import sleep from synapse.util.caches.response_cache import ResponseCache @@ -44,7 +44,7 @@ def send_event_to_master(client, host, port, requester, event, context, port (int): port on master listening for HTTP replication requester (Requester) event (FrozenEvent) - context (EventContext) + context (StatelessEventContext) ratelimit (bool) extra_users (list(UserID)): Any extra users to notify about event """ @@ -56,7 +56,7 @@ def send_event_to_master(client, host, port, requester, event, context, "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(event), + "context": context.serialize(), "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], @@ -140,7 +140,9 @@ class ReplicationSendEventRestServlet(RestServlet): event = FrozenEvent(event_dict, internal_metadata, rejected_reason) requester = Requester.deserialize(self.store, content["requester"]) - context = yield EventContext.deserialize(self.store, content["context"]) + context = yield StatelessEventContext.deserialize( + self.store, content["context"], + ) ratelimit = content["ratelimit"] extra_users = [UserID.from_string(u) for u in content["extra_users"]] |