diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e165429cad..e27ee216f0 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -191,6 +191,10 @@ class ReplicationDataHandler:
async def on_position(self, stream_name: str, instance_name: str, token: int):
self.store.process_replication_rows(stream_name, instance_name, token, [])
+ # We poke the generic "replication" notifier to wake anything up that
+ # may be streaming.
+ self.notifier.notify_replication()
+
def on_remote_server_up(self, server: str):
"""Called when get a new REMOTE_SERVER_UP command."""
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 8cd47770c1..ac532ed588 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -141,15 +141,23 @@ class RdataCommand(Command):
class PositionCommand(Command):
- """Sent by the server to tell the client the stream position without
- needing to send an RDATA.
+ """Sent by an instance to tell others the stream position without needing to
+ send an RDATA.
+
+ Two tokens are sent, the new position and the last position sent by the
+ instance (in an RDATA or other POSITION). The tokens are chosen so that *no*
+ rows were written by the instance between the `prev_token` and `new_token`.
+ (If an instance hasn't sent a position before then the new position can be
+ used for both.)
Format::
- POSITION <stream_name> <instance_name> <token>
+ POSITION <stream_name> <instance_name> <prev_token> <new_token>
- On receipt of a POSITION command clients should check if they have missed
- any updates, and if so then fetch them out of band.
+ On receipt of a POSITION command instances should check if they have missed
+ any updates, and if so then fetch them out of band. Instances can check this
+ by comparing their view of the current token for the sending instance with
+ the included `prev_token`.
The `<instance_name>` is the process that sent the command and is the source
of the stream.
@@ -157,18 +165,26 @@ class PositionCommand(Command):
NAME = "POSITION"
- def __init__(self, stream_name, instance_name, token):
+ def __init__(self, stream_name, instance_name, prev_token, new_token):
self.stream_name = stream_name
self.instance_name = instance_name
- self.token = token
+ self.prev_token = prev_token
+ self.new_token = new_token
@classmethod
def from_line(cls, line):
- stream_name, instance_name, token = line.split(" ", 2)
- return cls(stream_name, instance_name, int(token))
+ stream_name, instance_name, prev_token, new_token = line.split(" ", 3)
+ return cls(stream_name, instance_name, int(prev_token), int(new_token))
def to_line(self):
- return " ".join((self.stream_name, self.instance_name, str(self.token)))
+ return " ".join(
+ (
+ self.stream_name,
+ self.instance_name,
+ str(self.prev_token),
+ str(self.new_token),
+ )
+ )
class ErrorCommand(_SimpleCommand):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e92da7b263..95e5502bf2 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -101,8 +101,9 @@ class ReplicationCommandHandler:
self._streams_to_replicate = [] # type: List[Stream]
for stream in self._streams.values():
- if stream.NAME == CachesStream.NAME:
- # All workers can write to the cache invalidation stream.
+ if hs.config.redis.redis_enabled and stream.NAME == CachesStream.NAME:
+ # All workers can write to the cache invalidation stream when
+ # using redis.
self._streams_to_replicate.append(stream)
continue
@@ -313,11 +314,14 @@ class ReplicationCommandHandler:
# We respond with current position of all streams this instance
# replicates.
for stream in self.get_streams_to_replicate():
+ # Note that we use the current token as the prev token here (rather
+ # than stream.last_token), as we can't be sure that there have been
+ # no rows written between last token and the current token (since we
+ # might be racing with the replication sending bg process).
+ current_token = stream.current_token(self._instance_name)
self.send_command(
PositionCommand(
- stream.NAME,
- self._instance_name,
- stream.current_token(self._instance_name),
+ stream.NAME, self._instance_name, current_token, current_token,
)
)
@@ -511,16 +515,16 @@ class ReplicationCommandHandler:
# If the position token matches our current token then we're up to
# date and there's nothing to do. Otherwise, fetch all updates
# between then and now.
- missing_updates = cmd.token != current_token
+ missing_updates = cmd.prev_token != current_token
while missing_updates:
logger.info(
"Fetching replication rows for '%s' between %i and %i",
stream_name,
current_token,
- cmd.token,
+ cmd.new_token,
)
(updates, current_token, missing_updates) = await stream.get_updates_since(
- cmd.instance_name, current_token, cmd.token
+ cmd.instance_name, current_token, cmd.new_token
)
# TODO: add some tests for this
@@ -536,11 +540,11 @@ class ReplicationCommandHandler:
[stream.parse_row(row) for row in rows],
)
- logger.info("Caught up with stream '%s' to %i", stream_name, cmd.token)
+ logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(
- cmd.stream_name, cmd.instance_name, cmd.token
+ cmd.stream_name, cmd.instance_name, cmd.new_token
)
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..666c13fdb7 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -23,7 +23,9 @@ from prometheus_client import Counter
from twisted.internet.protocol import Factory
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import EventsStream
from synapse.util.metrics import Measure
stream_updates_counter = Counter(
@@ -84,6 +86,23 @@ class ReplicationStreamer:
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
+ # If we have streams then we must have redis enabled or be the master
+ assert (
+ not self.streams
+ or hs.config.redis.redis_enabled
+ or not hs.config.worker.worker_app
+ )
+
+ # If we are replicating an event stream we want to periodically check if
+ # we should send updated POSITIONs. We do this as a looping call rather
+ # explicitly poking when the position advances (without new data to
+ # replicate) to reduce replication traffic (otherwise each writer would
+ # likely send a POSITION for each new event received over replication).
+ #
+ # Note that if the position hasn't advanced then we won't send anything.
+ if any(EventsStream.NAME == s.NAME for s in self.streams):
+ self.clock.looping_call(self.on_notifier_poke, 1000)
+
def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
@@ -91,7 +110,7 @@ class ReplicationStreamer:
This should get called each time new data is available, even if it
is currently being executed, so that nothing gets missed
"""
- if not self.command_handler.connected():
+ if not self.command_handler.connected() or not self.streams:
# Don't bother if nothing is listening. We still need to advance
# the stream tokens otherwise they'll fall behind forever
for stream in self.streams:
@@ -136,6 +155,8 @@ class ReplicationStreamer:
self._replication_torture_level / 1000.0
)
+ last_token = stream.last_token
+
logger.debug(
"Getting stream: %s: %s -> %s",
stream.NAME,
@@ -159,6 +180,30 @@ class ReplicationStreamer:
)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
+ else:
+ # The token has advanced but there is no data to
+ # send, so we send a `POSITION` to inform other
+ # workers of the updated position.
+ if stream.NAME == EventsStream.NAME:
+ # XXX: We only do this for the EventsStream as it
+ # turns out that e.g. account data streams share
+ # their "current token" with each other, meaning
+ # that it is *not* safe to send a POSITION.
+ logger.info(
+ "Sending position: %s -> %s",
+ stream.NAME,
+ current_token,
+ )
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ last_token,
+ current_token,
+ )
+ )
+ continue
+
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
# this by setting the current token to all but the last of
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index d7e40aaa8b..3d8da48f2d 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -524,6 +524,16 @@ class MultiWriterIdGenerator:
heapq.heappush(self._known_persisted_positions, new_id)
+ # If we're a writer and we don't have any active writes we update our
+ # current position to the latest position seen. This allows the instance
+ # to report a recent position when asked, rather than a potentially old
+ # one (if this instance hasn't written anything for a while).
+ our_current_position = self._current_positions.get(self._instance_name)
+ if our_current_position and not self._unfinished_ids:
+ self._current_positions[self._instance_name] = max(
+ our_current_position, new_id
+ )
+
# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# that that have been persisted).
diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py
index ff2d038ad2..4386b6101e 100644
--- a/synapse/storage/util/sequence.py
+++ b/synapse/storage/util/sequence.py
@@ -126,6 +126,8 @@ class PostgresSequenceGenerator(SequenceGenerator):
if max_stream_id > last_value:
logger.warning(
"Postgres sequence %s is behind table %s: %d < %d",
+ self._sequence_name,
+ table,
last_value,
max_stream_id,
)
|