summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py57
1 files changed, 56 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..1d4ceac0f1 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -23,7 +23,9 @@ from prometheus_client import Counter
 from twisted.internet.protocol import Factory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.commands import PositionCommand
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import EventsStream
 from synapse.util.metrics import Measure
 
 stream_updates_counter = Counter(
@@ -84,6 +86,23 @@ class ReplicationStreamer:
         # Set of streams to replicate.
         self.streams = self.command_handler.get_streams_to_replicate()
 
+        # If we have streams then we must have redis enabled or on master
+        assert (
+            not self.streams
+            or hs.config.redis.redis_enabled
+            or not hs.config.worker.worker_app
+        )
+
+        # If we are replicating an event stream we want to periodically check if
+        # we should send updated POSITIONs. We do this as a looping call rather
+        # explicitly poking when the position advances (without new data to
+        # replicate) to reduce replication traffic (otherwise each writer would
+        # likely send a POSITION for each new event received over replication).
+        #
+        # Note that if the position hasn't advanced then we won't send anything.
+        if any(EventsStream.NAME == s.NAME for s in self.streams):
+            self.clock.looping_call(self.on_notifier_poke, 1000)
+
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -91,13 +110,23 @@ class ReplicationStreamer:
         This should get called each time new data is available, even if it
         is currently being executed, so that nothing gets missed
         """
-        if not self.command_handler.connected():
+        if not self.command_handler.connected() or not self.streams:
             # Don't bother if nothing is listening. We still need to advance
             # the stream tokens otherwise they'll fall behind forever
             for stream in self.streams:
                 stream.discard_updates_and_advance()
             return
 
+        # We check up front to see if anything has actually changed, as we get
+        # poked because of changes that happened on other instances.
+        if all(
+            stream.last_token == stream.current_token(self._instance_name)
+            for stream in self.streams
+        ):
+            return
+
+        # If there are updates then we need to set this even if we're already
+        # looping, as the loop needs to know that he might need to loop again.
         self.pending_updates = True
 
         if self.is_looping:
@@ -136,6 +165,8 @@ class ReplicationStreamer:
                                 self._replication_torture_level / 1000.0
                             )
 
+                        last_token = stream.last_token
+
                         logger.debug(
                             "Getting stream: %s: %s -> %s",
                             stream.NAME,
@@ -159,6 +190,30 @@ class ReplicationStreamer:
                             )
                             stream_updates_counter.labels(stream.NAME).inc(len(updates))
 
+                        else:
+                            # The token has advanced but there is no data to
+                            # send, so we send a `POSITION` to inform other
+                            # workers of the updated position.
+                            if stream.NAME == EventsStream.NAME:
+                                # XXX: We only do this for the EventStream as it
+                                # turns out that e.g. account data streams share
+                                # their "current token" with each other, meaning
+                                # that it is *not* safe to send a POSITION.
+                                logger.info(
+                                    "Sending position: %s -> %s",
+                                    stream.NAME,
+                                    current_token,
+                                )
+                                self.command_handler.send_command(
+                                    PositionCommand(
+                                        stream.NAME,
+                                        self._instance_name,
+                                        last_token,
+                                        current_token,
+                                    )
+                                )
+                            continue
+
                         # Some streams return multiple rows with the same stream IDs,
                         # we need to make sure they get sent out in batches. We do
                         # this by setting the current token to all but the last of