summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/send_event.py7
-rw-r--r--synapse/replication/tcp/client.py2
-rw-r--r--synapse/replication/tcp/resource.py14
3 files changed, 14 insertions, 9 deletions
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 2eede54792..5227bc333d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(clock, client, host, port, requester, event, context,
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
         clock (synapse.util.Clock)
+        store (DataStore)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
         host, port, event.event_id,
     )
 
+    serialized_context = yield context.serialize(event, store)
+
     payload = {
         "event": event.get_pdu_json(),
         "internal_metadata": event.internal_metadata.get_dict(),
         "rejected_reason": event.rejected_reason,
-        "context": context.serialize(event),
+        "context": serialized_context,
         "requester": requester.serialize(),
         "ratelimit": ratelimit,
         "extra_users": [u.to_string() for u in extra_users],
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e592ab57bf..970e94313e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -192,7 +192,7 @@ class ReplicationClientHandler(object):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
 
-        Used by tests.
+        [Not currently] used by tests.
         """
         return self.awaiting_syncs.setdefault(data, defer.Deferred())
 
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 611fb66e1d..fd59f1595f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -25,6 +25,7 @@ from twisted.internet import defer
 from twisted.internet.protocol import Factory
 
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.metrics import Measure, measure_func
 
 from .protocol import ServerReplicationStreamProtocol
@@ -117,7 +118,6 @@ class ReplicationStreamer(object):
         for conn in self.connections:
             conn.send_error("server shutting down")
 
-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -132,14 +132,16 @@ class ReplicationStreamer(object):
                 stream.discard_updates_and_advance()
             return
 
-        # If we're in the process of checking for new updates, mark that fact
-        # and return
+        self.pending_updates = True
+
         if self.is_looping:
-            logger.debug("Noitifier poke loop already running")
-            self.pending_updates = True
+            logger.debug("Notifier poke loop already running")
             return
 
-        self.pending_updates = True
+        run_as_background_process("replication_notifier", self._run_notifier_loop)
+
+    @defer.inlineCallbacks
+    def _run_notifier_loop(self):
         self.is_looping = True
 
         try: