summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 14:55:02 +0000
committerErik Johnston <erik@matrix.org>2020-03-25 14:55:02 +0000
commit83ecaeecbf9997b5ceb532fa965f071084ca61ee (patch)
treef62a181a3a651ff595311bfa166778b4d4bfb000 /synapse/replication/tcp
parentPass instance name through to rdata (diff)
downloadsynapse-github/erikj/split_out_fed_stream.tar.xz
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/handler.py18
-rw-r--r--synapse/replication/tcp/resource.py7
-rw-r--r--synapse/replication/tcp/streams/_base.py2
-rw-r--r--synapse/replication/tcp/streams/federation.py12
4 files changed, 19 insertions, 20 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index cfba255897..ac4d6d1dd1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler: self.presence_handler = hs.get_presence_handler() self.instance_id = hs.get_instance_id() + self.instance_name = hs.config.worker.worker_name or "master" + self.connections = [] # type: List[Any] self.streams = { @@ -134,7 +136,9 @@ class ReplicationClientHandler: for stream_name, stream in self.streams.items(): current_token = stream.current_token() - self.send_command(PositionCommand(stream_name, "master", current_token)) + self.send_command( + PositionCommand(stream_name, self.instance_name, current_token) + ) async def on_USER_SYNC(self, cmd: UserSyncCommand): user_sync_counter.inc() @@ -232,17 +236,17 @@ class ReplicationClientHandler: return # Find where we previously streamed up to. - current_token = ( - self.replication_data_handler.get_streams_to_replicate() - .get(cmd.stream_name, {}) - .get(cmd.instance_name) + current_tokens = self.replication_data_handler.get_streams_to_replicate().get( + cmd.stream_name ) - if current_token is None: + if current_tokens is None: logger.debug( "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name ) return + current_token = current_tokens.get(cmd.instance_name, 0) + # Fetch all updates between then and now. limited = cmd.token != current_token while limited: @@ -335,7 +339,7 @@ class ReplicationClientHandler: We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, "master", token, data)) + self.send_command(RdataCommand(stream_name, self.instance_name, token, data)) class ReplicationDataHandler: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index bbd2c6ec41..d421cc477f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -74,9 +74,7 @@ class ReplicationStreamer(object): self.streams = [] # type: List[Stream] if hs.config.worker_app is None: for stream in STREAMS_MAP.values(): - if stream == FederationStream and hs.config.send_federation: - # We only support federation stream if federation sending - # hase been disabled on the master. + if stream == FederationStream: continue if stream == TypingStream: @@ -87,6 +85,9 @@ class ReplicationStreamer(object): if hs.config.server.handle_typing: self.streams.append(TypingStream(hs)) + # We always add federation stream + self.streams.append(FederationStream(hs)) + self.streams_by_name = {stream.NAME: stream for stream in self.streams} self.notifier.add_replication_callback(self.on_notifier_poke) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 87ab631fd4..e68b220956 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -256,7 +256,7 @@ class TypingStream(Stream): self.current_token = typing_handler.get_current_token # type: ignore - if hs.config.worker_app is None: + if hs.config.handle_typing: self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore else: # Query master process diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..40951e81c9 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@ # limitations under the License. from collections import namedtuple -from twisted.internet import defer - from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function @@ -41,12 +39,8 @@ class FederationStream(Stream): # Not all synapse instances will have a federation sender instance, # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, # so we stub the stream out when that is the case. - if hs.config.worker_app is None or hs.should_send_federation(): - federation_sender = hs.get_federation_sender() - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore - else: - self.current_token = lambda: 0 # type: ignore - self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore + federation_sender = hs.get_federation_sender() + self.current_token = federation_sender.get_current_token # type: ignore + self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore super(FederationStream, self).__init__(hs)