summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-02-15 13:53:18 +0000
committerErik Johnston <erik@matrix.org>2018-02-15 13:53:18 +0000
commit106906a65e647d94a9d2faf1b3a626bc1f608a25 (patch)
tree4a96b7e56243b2f695a84a44e27a7f146c119468
parentMerge pull request #2872 from matrix-org/erikj/event_worker_dont_log (diff)
downloadsynapse-106906a65e647d94a9d2faf1b3a626bc1f608a25.tar.xz
Don't serialize current state over replication
Diffstat (limited to '')
-rw-r--r--synapse/events/snapshot.py41
-rw-r--r--synapse/replication/http/send_event.py4
-rw-r--r--synapse/storage/state.py14
3 files changed, 51 insertions, 8 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 7b80444f73..f9445bef13 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from frozendict import frozendict
 
@@ -77,16 +78,30 @@ class EventContext(object):
 
         self.app_service = None
 
-    def serialize(self):
+    def serialize(self, event):
         """Converts self to a type that can be serialized as JSON, and then
         deserialized by `deserialize`
 
+        Args:
+            event (FrozenEvent): The event that this context relates to
+
         Returns:
             dict
         """
+
+        # We don't serialize the full state dicts, instead they get pulled out
+        # of the DB on the other side. However, the other side can't figure out
+        # the prev_state_ids, so if we're a state event we include the event
+        # id that we replaced in the state.
+        if event.is_state():
+            prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
+        else:
+            prev_state_id = None
+
         return {
-            "current_state_ids": _encode_state_dict(self.current_state_ids),
-            "prev_state_ids": _encode_state_dict(self.prev_state_ids),
+            "prev_state_id": prev_state_id,
+            "event_type": event.type,
+            "event_state_key": event.state_key if event.is_state() else None,
             "state_group": self.state_group,
             "rejected": self.rejected,
             "push_actions": self.push_actions,
@@ -97,6 +112,7 @@ class EventContext(object):
         }
 
     @staticmethod
+    @defer.inlineCallbacks
     def deserialize(store, input):
         """Converts a dict that was produced by `serialize` back into a
         EventContext.
@@ -109,8 +125,6 @@ class EventContext(object):
             EventContext
         """
         context = EventContext()
-        context.current_state_ids = _decode_state_dict(input["current_state_ids"])
-        context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
         context.state_group = input["state_group"]
         context.rejected = input["rejected"]
         context.push_actions = input["push_actions"]
@@ -118,11 +132,26 @@ class EventContext(object):
         context.delta_ids = _decode_state_dict(input["delta_ids"])
         context.prev_state_events = input["prev_state_events"]
 
+        # We use the state_group and prev_state_id stuff to pull the
+        # current_state_ids out of the DB and construct prev_state_ids.
+        prev_state_id = input["prev_state_id"]
+        event_type = input["event_type"]
+        event_state_key = input["event_state_key"]
+
+        context.current_state_ids = yield store.get_state_ids_for_group(
+            context.state_group,
+        )
+        if prev_state_id and event_state_key:
+            context.prev_state_ids = dict(context.current_state_ids)
+            context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
+        else:
+            context.prev_state_ids = context.current_state_ids
+
         app_service_id = input["app_service_id"]
         if app_service_id:
             context.app_service = store.get_app_service_by_id(app_service_id)
 
-        return context
+        defer.returnValue(context)
 
 
 def _encode_state_dict(state_dict):
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 7b21a2213c..468f4b68f4 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -46,7 +46,7 @@ def send_event_to_master(client, host, port, requester, event, context):
         "event": event.get_pdu_json(),
         "internal_metadata": event.internal_metadata.get_dict(),
         "rejected_reason": event.rejected_reason,
-        "context": context.serialize(),
+        "context": context.serialize(event),
         "requester": requester.serialize(),
     }
 
@@ -96,7 +96,7 @@ class ReplicationSendEventRestServlet(RestServlet):
             event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
 
             requester = Requester.deserialize(self.store, content["requester"])
-            context = EventContext.deserialize(self.store, content["context"])
+            context = yield EventContext.deserialize(self.store, content["context"])
 
         if requester.user:
             request.authenticated_entity = requester.user.to_string()
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d0a840456a..2b325e1c1f 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -140,6 +140,20 @@ class StateGroupWorkerStore(SQLBaseStore):
         defer.returnValue(group_to_state)
 
     @defer.inlineCallbacks
+    def get_state_ids_for_group(self, state_group):
+        """Get the state IDs for the given state group
+
+        Args:
+            state_group (int)
+
+        Returns:
+            Deferred[dict]: Resolves to a map of (type, state_key) -> event_id
+        """
+        group_to_state = yield self._get_state_for_groups((state_group,))
+
+        defer.returnValue(group_to_state[state_group])
+
+    @defer.inlineCallbacks
     def get_state_groups(self, room_id, event_ids):
         """ Get the state groups for the given list of event_ids