summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/replication/tcp/client.py4
-rw-r--r--synapse/replication/tcp/commands.py36
-rw-r--r--synapse/replication/tcp/handler.py24
-rw-r--r--synapse/replication/tcp/resource.py47
-rw-r--r--synapse/storage/util/id_generators.py10
-rw-r--r--synapse/storage/util/sequence.py2
6 files changed, 102 insertions, 21 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e165429cad..e27ee216f0 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -191,6 +191,10 @@ class ReplicationDataHandler:
     async def on_position(self, stream_name: str, instance_name: str, token: int):
         self.store.process_replication_rows(stream_name, instance_name, token, [])
 
+        # We poke the generic "replication" notifier to wake anything up that
+        # may be streaming.
+        self.notifier.notify_replication()
+
     def on_remote_server_up(self, server: str):
         """Called when get a new REMOTE_SERVER_UP command."""
 
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 8cd47770c1..ac532ed588 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -141,15 +141,23 @@ class RdataCommand(Command):
 
 
 class PositionCommand(Command):
-    """Sent by the server to tell the client the stream position without
-    needing to send an RDATA.
+    """Sent by an instance to tell others the stream position without needing to
+    send an RDATA.
+
+    Two tokens are sent, the new position and the last position sent by the
+    instance (in an RDATA or other POSITION). The tokens are chosen so that *no*
+    rows were written by the instance between the `prev_token` and `new_token`.
+    (If an instance hasn't sent a position before then the new position can be
+    used for both.)
 
     Format::
 
-        POSITION <stream_name> <instance_name> <token>
+        POSITION <stream_name> <instance_name> <prev_token> <new_token>
 
-    On receipt of a POSITION command clients should check if they have missed
-    any updates, and if so then fetch them out of band.
+    On receipt of a POSITION command instances should check if they have missed
+    any updates, and if so then fetch them out of band. Instances can check this
+    by comparing their view of the current token for the sending instance with
+    the included `prev_token`.
 
     The `<instance_name>` is the process that sent the command and is the source
     of the stream.
@@ -157,18 +165,26 @@ class PositionCommand(Command):
 
     NAME = "POSITION"
 
-    def __init__(self, stream_name, instance_name, token):
+    def __init__(self, stream_name, instance_name, prev_token, new_token):
         self.stream_name = stream_name
         self.instance_name = instance_name
-        self.token = token
+        self.prev_token = prev_token
+        self.new_token = new_token
 
     @classmethod
     def from_line(cls, line):
-        stream_name, instance_name, token = line.split(" ", 2)
-        return cls(stream_name, instance_name, int(token))
+        stream_name, instance_name, prev_token, new_token = line.split(" ", 3)
+        return cls(stream_name, instance_name, int(prev_token), int(new_token))
 
     def to_line(self):
-        return " ".join((self.stream_name, self.instance_name, str(self.token)))
+        return " ".join(
+            (
+                self.stream_name,
+                self.instance_name,
+                str(self.prev_token),
+                str(self.new_token),
+            )
+        )
 
 
 class ErrorCommand(_SimpleCommand):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e92da7b263..95e5502bf2 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -101,8 +101,9 @@ class ReplicationCommandHandler:
         self._streams_to_replicate = []  # type: List[Stream]
 
         for stream in self._streams.values():
-            if stream.NAME == CachesStream.NAME:
-                # All workers can write to the cache invalidation stream.
+            if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
+                # All workers can write to the cache invalidation stream when
+                # using redis.
                 self._streams_to_replicate.append(stream)
                 continue
 
@@ -313,11 +314,14 @@ class ReplicationCommandHandler:
         # We respond with current position of all streams this instance
         # replicates.
         for stream in self.get_streams_to_replicate():
+            # Note that we use the current token as the prev token here (rather
+            # than stream.last_token), as we can't be sure that there have been
+            # no rows written between last token and the current token (since we
+            # might be racing with the replication sending bg process).
+            current_token = stream.current_token(self._instance_name)
             self.send_command(
                 PositionCommand(
-                    stream.NAME,
-                    self._instance_name,
-                    stream.current_token(self._instance_name),
+                    stream.NAME, self._instance_name, current_token, current_token,
                 )
             )
 
@@ -511,16 +515,16 @@ class ReplicationCommandHandler:
         # If the position token matches our current token then we're up to
         # date and there's nothing to do. Otherwise, fetch all updates
         # between then and now.
-        missing_updates = cmd.token != current_token
+        missing_updates = cmd.prev_token != current_token
         while missing_updates:
             logger.info(
                 "Fetching replication rows for '%s' between %i and %i",
                 stream_name,
                 current_token,
-                cmd.token,
+                cmd.new_token,
             )
             (updates, current_token, missing_updates) = await stream.get_updates_since(
-                cmd.instance_name, current_token, cmd.token
+                cmd.instance_name, current_token, cmd.new_token
             )
 
             # TODO: add some tests for this
@@ -536,11 +540,11 @@ class ReplicationCommandHandler:
                     [stream.parse_row(row) for row in rows],
                 )
 
-        logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)
+        logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
 
         # We've now caught up to position sent to us, notify handler.
         await self._replication_data_handler.on_position(
-            cmd.stream_name, cmd.instance_name, cmd.token
+            cmd.stream_name, cmd.instance_name, cmd.new_token
         )
 
         self._streams_by_connection.setdefault(conn, set()).add(stream_name)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..666c13fdb7 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -23,7 +23,9 @@ from prometheus_client import Counter
 from twisted.internet.protocol import Factory
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.commands import PositionCommand
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import EventsStream
 from synapse.util.metrics import Measure
 
 stream_updates_counter = Counter(
@@ -84,6 +86,23 @@ class ReplicationStreamer:
         # Set of streams to replicate.
         self.streams = self.command_handler.get_streams_to_replicate()
 
+        # If we have streams then we must have redis enabled or on master
+        assert (
+            not self.streams
+            or hs.config.redis.redis_enabled
+            or not hs.config.worker.worker_app
+        )
+
+        # If we are replicating an event stream we want to periodically check if
+        # we should send updated POSITIONs. We do this as a looping call rather
+        # explicitly poking when the position advances (without new data to
+        # replicate) to reduce replication traffic (otherwise each writer would
+        # likely send a POSITION for each new event received over replication).
+        #
+        # Note that if the position hasn't advanced then we won't send anything.
+        if any(EventsStream.NAME == s.NAME for s in self.streams):
+            self.clock.looping_call(self.on_notifier_poke, 1000)
+
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.
@@ -91,7 +110,7 @@ class ReplicationStreamer:
         This should get called each time new data is available, even if it
         is currently being executed, so that nothing gets missed
         """
-        if not self.command_handler.connected():
+        if not self.command_handler.connected() or not self.streams:
             # Don't bother if nothing is listening. We still need to advance
             # the stream tokens otherwise they'll fall behind forever
             for stream in self.streams:
@@ -136,6 +155,8 @@ class ReplicationStreamer:
                                 self._replication_torture_level / 1000.0
                             )
 
+                        last_token = stream.last_token
+
                         logger.debug(
                             "Getting stream: %s: %s -> %s",
                             stream.NAME,
@@ -159,6 +180,30 @@ class ReplicationStreamer:
                             )
                             stream_updates_counter.labels(stream.NAME).inc(len(updates))
 
+                        else:
+                            # The token has advanced but there is no data to
+                            # send, so we send a `POSITION` to inform other
+                            # workers of the updated position.
+                            if stream.NAME == EventsStream.NAME:
+                                # XXX: We only do this for the EventStream as it
+                                # turns out that e.g. account data streams share
+                                # their "current token" with each other, meaning
+                                # that it is *not* safe to send a POSITION.
+                                logger.info(
+                                    "Sending position: %s -> %s",
+                                    stream.NAME,
+                                    current_token,
+                                )
+                                self.command_handler.send_command(
+                                    PositionCommand(
+                                        stream.NAME,
+                                        self._instance_name,
+                                        last_token,
+                                        current_token,
+                                    )
+                                )
+                            continue
+
                         # Some streams return multiple rows with the same stream IDs,
                         # we need to make sure they get sent out in batches. We do
                         # this by setting the current token to all but the last of
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index d7e40aaa8b..3d8da48f2d 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -524,6 +524,16 @@ class MultiWriterIdGenerator:
 
         heapq.heappush(self._known_persisted_positions, new_id)
 
+        # If we're a writer and we don't have any active writes we update our
+        # current position to the latest position seen. This allows the instance
+        # to report a recent position when asked, rather than a potentially old
+        # one (if this instance hasn't written anything for a while).
+        our_current_position = self._current_positions.get(self._instance_name)
+        if our_current_position and not self._unfinished_ids:
+            self._current_positions[self._instance_name] = max(
+                our_current_position, new_id
+            )
+
         # We move the current min position up if the minimum current positions
         # of all instances is higher (since by definition all positions less
         # that that have been persisted).
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index ff2d038ad2..4386b6101e 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -126,6 +126,8 @@ class PostgresSequenceGenerator(SequenceGenerator):
         if max_stream_id > last_value:
             logger.warning(
                 "Postgres sequence %s is behind table %s: %d < %d",
+                self._sequence_name,
+                table,
                 last_value,
                 max_stream_id,
             )