diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 8cd47770c1..3bc06d59d5 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -171,6 +171,37 @@ class PositionCommand(Command):
return " ".join((self.stream_name, self.instance_name, str(self.token)))
+class PersistedToCommand(Command):
+ """Sent by writers to inform others that it has persisted up to the included
+ token.
+
+ The included `token` will *not* have been persisted by the instance.
+
+ Format::
+
+ PERSISTED_TO <stream_name> <instance_name> <token>
+
+ On receipt the client should mark that the given instances has persisted
+ everything up to the given token. Note: this does *not* mean that other
+ instances have also persisted all their rows up to that point.
+ """
+
+ NAME = "PERSISTED_TO"
+
+ def __init__(self, stream_name, instance_name, token):
+ self.stream_name = stream_name
+ self.instance_name = instance_name
+ self.token = token
+
+ @classmethod
+ def from_line(cls, line):
+ stream_name, instance_name, token = line.split(" ", 2)
+ return cls(stream_name, instance_name, int(token))
+
+ def to_line(self):
+ return " ".join((self.stream_name, self.instance_name, str(self.token)))
+
+
class ErrorCommand(_SimpleCommand):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
@@ -405,6 +436,7 @@ _COMMANDS = (
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
+ PersistedToCommand,
) # type: Tuple[Type[Command], ...]
# Map of command name to command type.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b323841f73..08049fe2e0 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -47,6 +47,7 @@ from synapse.replication.tcp.commands import (
ReplicateCommand,
UserIpCommand,
UserSyncCommand,
+ PersistedToCommand,
)
from synapse.replication.tcp.protocol import AbstractConnection
from synapse.replication.tcp.streams import (
@@ -387,6 +388,9 @@ class ReplicationCommandHandler:
assert self._server_notices_sender is not None
await self._server_notices_sender.on_user_ip(cmd.user_id)
+ def on_PERSISTED_TO(self, conn: AbstractConnection, cmd: PersistedToCommand):
+ pass
+
def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..623d7fff3f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import Factory
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
+from synapse.replication.tcp.streams import EventsStream
from synapse.util.metrics import Measure
stream_updates_counter = Counter(
@@ -84,6 +85,9 @@ class ReplicationStreamer:
# Set of streams to replicate.
self.streams = self.command_handler.get_streams_to_replicate()
+ if self.streams:
+ self.clock.looping_call(self.on_notifier_poke, 1000.0)
+
def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
@@ -126,9 +130,7 @@ class ReplicationStreamer:
random.shuffle(all_streams)
for stream in all_streams:
- if stream.last_token == stream.current_token(
- self._instance_name
- ):
+ if not stream.has_updates():
continue
if self._replication_torture_level:
@@ -174,6 +176,11 @@ class ReplicationStreamer:
except Exception:
logger.exception("Failed to replicate")
+ # for command in stream.extra_commands(
+ # sent_updates=bool(updates)
+ # ):
+ # self.command_handler.send_command(command)
+
logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 54dccd15a6..f3ea34f886 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -31,6 +31,7 @@ from typing import (
import attr
from synapse.replication.http.streams import ReplicationGetStreamUpdates
+from synapse.replication.tcp.commands import Command
if TYPE_CHECKING:
import synapse.server
@@ -187,6 +188,12 @@ class Stream:
)
return updates, upto_token, limited
+ def has_updates(self) -> bool:
+ return self.current_token(self.local_instance_name) != self.last_token
+
+ def extra_commands(self, sent_updates: bool) -> List[Command]:
+ return []
+
def current_token_without_instance(
current_token: Callable[[], int]
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..1aa7ba3da6 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,8 @@ from typing import List, Tuple, Type
import attr
-from ._base import Stream, StreamUpdateResult, Token
+from synapse.replication.tcp.streams._base import Stream, StreamUpdateResult, Token
+from synapse.replication.tcp.commands import Command, PersistedToCommand
"""Handling of the 'events' replication stream
@@ -222,3 +223,18 @@ class EventsStream(Stream):
(typ, data) = row
data = TypeToRow[typ].from_data(data)
return EventsStreamRow(typ, data)
+
+ def has_updates(self) -> bool:
+ return True
+
+ def extra_commands(self, sent_updates: bool) -> List[Command]:
+ if sent_updates:
+ return []
+
+ return [
+ PersistedToCommand(
+ self.NAME,
+ self.local_instance_name,
+ self._store._stream_id_gen.get_max_persisted_position_for_self(),
+ )
+ ]
|