summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/3582.misc1
-rw-r--r--synapse/events/snapshot.py10
-rw-r--r--synapse/handlers/message.py5
-rw-r--r--synapse/replication/http/send_event.py7
4 files changed, 15 insertions, 8 deletions
diff --git a/changelog.d/3582.misc b/changelog.d/3582.misc
new file mode 100644
index 0000000000..2374dc0c44
--- /dev/null
+++ b/changelog.d/3582.misc
@@ -0,0 +1 @@
+Lazily load state on master process when using workers to reduce DB consumption
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e31eceb921..a59064b416 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -110,7 +110,8 @@ class EventContext(object):
 
         return context
 
-    def serialize(self, event):
+    @defer.inlineCallbacks
+    def serialize(self, event, store):
         """Converts self to a type that can be serialized as JSON, and then
         deserialized by `deserialize`
 
@@ -126,11 +127,12 @@ class EventContext(object):
         # the prev_state_ids, so if we're a state event we include the event
         # id that we replaced in the state.
         if event.is_state():
-            prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
+            prev_state_ids = yield self.get_prev_state_ids(store)
+            prev_state_id = prev_state_ids.get((event.type, event.state_key))
         else:
             prev_state_id = None
 
-        return {
+        defer.returnValue({
             "prev_state_id": prev_state_id,
             "event_type": event.type,
             "event_state_key": event.state_key if event.is_state() else None,
@@ -140,7 +142,7 @@ class EventContext(object):
             "delta_ids": _encode_state_dict(self.delta_ids),
             "prev_state_events": self.prev_state_events,
             "app_service_id": self.app_service.id if self.app_service else None
-        }
+        })
 
     @staticmethod
     def deserialize(store, input):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c4bcd9018b..7571975c22 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -807,8 +807,9 @@ class EventCreationHandler(object):
             # If we're a worker we need to hit out to the master.
             if self.config.worker_app:
                 yield send_event_to_master(
-                    self.hs.get_clock(),
-                    self.http_client,
+                    clock=self.hs.get_clock(),
+                    store=self.store,
+                    client=self.http_client,
                     host=self.config.worker_replication_host,
                     port=self.config.worker_replication_http_port,
                     requester=requester,
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 2eede54792..5227bc333d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(clock, client, host, port, requester, event, context,
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
         clock (synapse.util.Clock)
+        store (DataStore)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
         host, port, event.event_id,
     )
 
+    serialized_context = yield context.serialize(event, store)
+
     payload = {
         "event": event.get_pdu_json(),
         "internal_metadata": event.internal_metadata.get_dict(),
         "rejected_reason": event.rejected_reason,
-        "context": context.serialize(event),
+        "context": serialized_context,
         "requester": requester.serialize(),
         "ratelimit": ratelimit,
         "extra_users": [u.to_string() for u in extra_users],