summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/commands.py32
-rw-r--r--synapse/replication/tcp/handler.py4
-rw-r--r--synapse/replication/tcp/resource.py13
-rw-r--r--synapse/replication/tcp/streams/_base.py7
-rw-r--r--synapse/replication/tcp/streams/events.py18
5 files changed, 70 insertions, 4 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py

index 8cd47770c1..3bc06d59d5 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -171,6 +171,37 @@ class PositionCommand(Command): return " ".join((self.stream_name, self.instance_name, str(self.token))) +class PersistedToCommand(Command): + """Sent by writers to inform others that it has persisted up to the included + token. + + The included `token` will *not* have been persisted by the instance. + + Format:: + + PERSISTED_TO <stream_name> <instance_name> <token> + + On receipt the client should mark that the given instances has persisted + everything up to the given token. Note: this does *not* mean that other + instances have also persisted all their rows up to that point. + """ + + NAME = "PERSISTED_TO" + + def __init__(self, stream_name, instance_name, token): + self.stream_name = stream_name + self.instance_name = instance_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, instance_name, token = line.split(" ", 2) + return cls(stream_name, instance_name, int(token)) + + def to_line(self): + return " ".join((self.stream_name, self.instance_name, str(self.token))) + + class ErrorCommand(_SimpleCommand): """Sent by either side if there was an ERROR. The data is a string describing the error. @@ -405,6 +436,7 @@ _COMMANDS = ( UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, + PersistedToCommand, ) # type: Tuple[Type[Command], ...] # Map of command name to command type. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index b323841f73..08049fe2e0 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -47,6 +47,7 @@ from synapse.replication.tcp.commands import ( ReplicateCommand, UserIpCommand, UserSyncCommand, + PersistedToCommand, ) from synapse.replication.tcp.protocol import AbstractConnection from synapse.replication.tcp.streams import ( @@ -387,6 +388,9 @@ class ReplicationCommandHandler: assert self._server_notices_sender is not None await self._server_notices_sender.on_user_ip(cmd.user_id) + def on_PERSISTED_TO(self, conn: AbstractConnection, cmd: PersistedToCommand): + pass + def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand): if cmd.instance_name == self._instance_name: # Ignore RDATA that are just our own echoes diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 687984e7a8..623d7fff3f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -24,6 +24,7 @@ from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol +from synapse.replication.tcp.streams import EventsStream from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -84,6 +85,9 @@ class ReplicationStreamer: # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() + if self.streams: + self.clock.looping_call(self.on_notifier_poke, 1000.0) + def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -126,9 +130,7 @@ class ReplicationStreamer: random.shuffle(all_streams) for stream in all_streams: - if stream.last_token == stream.current_token( - self._instance_name - ): + if not stream.has_updates(): continue if self._replication_torture_level: @@ -174,6 +176,11 @@ class ReplicationStreamer: except Exception: logger.exception("Failed to replicate") + # for command in stream.extra_commands( + # sent_updates=bool(updates) + # ): + # self.command_handler.send_command(command) + logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 54dccd15a6..f3ea34f886 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -31,6 +31,7 @@ from typing import ( import attr from synapse.replication.http.streams import ReplicationGetStreamUpdates +from synapse.replication.tcp.commands import Command if TYPE_CHECKING: import synapse.server @@ -187,6 +188,12 @@ class Stream: ) return updates, upto_token, limited + def has_updates(self) -> bool: + return self.current_token(self.local_instance_name) != self.last_token + + def extra_commands(self, sent_updates: bool) -> List[Command]: + return [] + def current_token_without_instance( current_token: Callable[[], int] diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8..1aa7ba3da6 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py
@@ -19,7 +19,8 @@ from typing import List, Tuple, Type import attr -from ._base import Stream, StreamUpdateResult, Token +from synapse.replication.tcp.streams._base import Stream, StreamUpdateResult, Token +from synapse.replication.tcp.commands import Command, PersistedToCommand """Handling of the 'events' replication stream @@ -222,3 +223,18 @@ class EventsStream(Stream): (typ, data) = row data = TypeToRow[typ].from_data(data) return EventsStreamRow(typ, data) + + def has_updates(self) -> bool: + return True + + def extra_commands(self, sent_updates: bool) -> List[Command]: + if sent_updates: + return [] + + return [ + PersistedToCommand( + self.NAME, + self.local_instance_name, + self._store._stream_id_gen.get_max_persisted_position_for_self(), + ) + ]