summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-02-16 11:18:19 +0000
committerErik Johnston <erik@matrix.org>2018-03-14 16:51:39 +0000
commit220a6265b887867cec92242d35cc9b8c605826e8 (patch)
treea577d136169d2b7f29449d338b04d9563e77104c
parentRefactor event storage to not require state (diff)
downloadsynapse-220a6265b887867cec92242d35cc9b8c605826e8.tar.xz
Add concept of StatelessEventContext
The master process (usually) doesn't need the state at an event when it
has been created by a worker process, so let's not automatically load
the state in that case.
-rw-r--r--synapse/events/snapshot.py103
-rw-r--r--synapse/replication/http/send_event.py10
2 files changed, 35 insertions, 78 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 8e684d91b5..5da09dcce7 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -13,22 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
 
-from frozendict import frozendict
-
-
-class EventContext(object):
+class StatelessEventContext(object):
     """
     Attributes:
-        current_state_ids (dict[(str, str), str]):
-            The current state map including the current event.
-            (type, state_key) -> event_id
-
-        prev_state_ids (dict[(str, str), str]):
-            The current state map excluding the current event.
-            (type, state_key) -> event_id
-
         state_group (int|None): state group id, if the state has been stored
             as a state group. This is usually only None if e.g. the event is
             an outlier.
@@ -40,29 +28,20 @@ class EventContext(object):
 
         prev_group (int): Previously persisted state group. ``None`` for an
             outlier.
-        delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
-            (type, state_key) -> event_id. ``None`` for an outlier.
 
         prev_state_events (?): XXX: is this ever set to anything other than
             the empty list?
     """
 
     __slots__ = [
-        "current_state_ids",
-        "prev_state_ids",
         "state_group",
         "rejected",
         "prev_group",
-        "delta_ids",
         "prev_state_events",
         "app_service",
     ]
 
     def __init__(self):
-        # The current state including the current event
-        self.current_state_ids = None
-        # The current state excluding the current event
-        self.prev_state_ids = None
         self.state_group = None
 
         self.rejected = False
@@ -70,46 +49,27 @@ class EventContext(object):
         # A previously persisted state group and a delta between that
         # and this state.
         self.prev_group = None
-        self.delta_ids = None
 
         self.prev_state_events = None
 
         self.app_service = None
 
-    def serialize(self, event):
+    def serialize(self):
         """Converts self to a type that can be serialized as JSON, and then
         deserialized by `deserialize`
 
-        Args:
-            event (FrozenEvent): The event that this context relates to
-
         Returns:
             dict
         """
-
-        # We don't serialize the full state dicts, instead they get pulled out
-        # of the DB on the other side. However, the other side can't figure out
-        # the prev_state_ids, so if we're a state event we include the event
-        # id that we replaced in the state.
-        if event.is_state():
-            prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
-        else:
-            prev_state_id = None
-
         return {
-            "prev_state_id": prev_state_id,
-            "event_type": event.type,
-            "event_state_key": event.state_key if event.is_state() else None,
             "state_group": self.state_group,
             "rejected": self.rejected,
             "prev_group": self.prev_group,
-            "delta_ids": _encode_state_dict(self.delta_ids),
             "prev_state_events": self.prev_state_events,
             "app_service_id": self.app_service.id if self.app_service else None
         }
 
     @staticmethod
-    @defer.inlineCallbacks
     def deserialize(store, input):
         """Converts a dict that was produced by `serialize` back into a
         EventContext.
@@ -121,52 +81,47 @@ class EventContext(object):
         Returns:
             EventContext
         """
-        context = EventContext()
+        context = StatelessEventContext()
         context.state_group = input["state_group"]
         context.rejected = input["rejected"]
         context.prev_group = input["prev_group"]
-        context.delta_ids = _decode_state_dict(input["delta_ids"])
         context.prev_state_events = input["prev_state_events"]
 
-        # We use the state_group and prev_state_id stuff to pull the
-        # current_state_ids out of the DB and construct prev_state_ids.
-        prev_state_id = input["prev_state_id"]
-        event_type = input["event_type"]
-        event_state_key = input["event_state_key"]
-
-        context.current_state_ids = yield store.get_state_ids_for_group(
-            context.state_group,
-        )
-        if prev_state_id and event_state_key:
-            context.prev_state_ids = dict(context.current_state_ids)
-            context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
-        else:
-            context.prev_state_ids = context.current_state_ids
-
         app_service_id = input["app_service_id"]
         if app_service_id:
             context.app_service = store.get_app_service_by_id(app_service_id)
 
-        defer.returnValue(context)
+        return context
 
 
-def _encode_state_dict(state_dict):
-    """Since dicts of (type, state_key) -> event_id cannot be serialized in
-    JSON we need to convert them to a form that can.
+class EventContext(StatelessEventContext):
     """
-    if state_dict is None:
-        return None
+    Attributes:
+        current_state_ids (dict[(str, str), str]):
+            The current state map including the current event.
+            (type, state_key) -> event_id
 
-    return [
-        (etype, state_key, v)
-        for (etype, state_key), v in state_dict.iteritems()
-    ]
+        prev_state_ids (dict[(str, str), str]):
+            The current state map excluding the current event.
+            (type, state_key) -> event_id
 
+        delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
+            (type, state_key) -> event_id. ``None`` for an outlier.
 
-def _decode_state_dict(input):
-    """Decodes a state dict encoded using `_encode_state_dict` above
     """
-    if input is None:
-        return None
 
-    return frozendict({(etype, state_key,): v for etype, state_key, v in input})
+    __slots__ = [
+        "current_state_ids",
+        "prev_state_ids",
+        "delta_ids",
+    ]
+
+    def __init__(self):
+        # The current state including the current event
+        self.current_state_ids = None
+        # The current state excluding the current event
+        self.prev_state_ids = None
+
+        self.delta_ids = None
+
+        super(EventContext, self).__init__()
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index bbe2f967b7..f00716db95 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -19,7 +19,7 @@ from synapse.api.errors import (
     SynapseError, MatrixCodeMessageException, CodeMessageException,
 )
 from synapse.events import FrozenEvent
-from synapse.events.snapshot import EventContext
+from synapse.events.snapshot import StatelessEventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
@@ -44,7 +44,7 @@ def send_event_to_master(client, host, port, requester, event, context,
         port (int): port on master listening for HTTP replication
         requester (Requester)
         event (FrozenEvent)
-        context (EventContext)
+        context (StatelessEventContext)
         ratelimit (bool)
         extra_users (list(UserID)): Any extra users to notify about event
     """
@@ -56,7 +56,7 @@ def send_event_to_master(client, host, port, requester, event, context,
         "event": event.get_pdu_json(),
         "internal_metadata": event.internal_metadata.get_dict(),
         "rejected_reason": event.rejected_reason,
-        "context": context.serialize(event),
+        "context": context.serialize(),
         "requester": requester.serialize(),
         "ratelimit": ratelimit,
         "extra_users": [u.to_string() for u in extra_users],
@@ -140,7 +140,9 @@ class ReplicationSendEventRestServlet(RestServlet):
             event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
 
             requester = Requester.deserialize(self.store, content["requester"])
-            context = yield EventContext.deserialize(self.store, content["context"])
+            context = yield StatelessEventContext.deserialize(
+                self.store, content["context"],
+            )
 
             ratelimit = content["ratelimit"]
             extra_users = [UserID.from_string(u) for u in content["extra_users"]]