summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py14
-rw-r--r--synapse/replication/http/_base.py14
-rw-r--r--synapse/replication/http/federation.py10
-rw-r--r--synapse/replication/http/streams.py2
-rw-r--r--synapse/replication/tcp/handler.py18
-rw-r--r--synapse/replication/tcp/resource.py7
-rw-r--r--synapse/replication/tcp/streams/_base.py2
-rw-r--r--synapse/replication/tcp/streams/federation.py12
8 files changed, 46 insertions, 33 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py

index 4613b2538c..f43a4ed009 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py
@@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource): self.register_servlets(hs) def register_servlets(self, hs): - send_event.register_servlets(hs, self) - membership.register_servlets(hs, self) - federation.register_servlets(hs, self) - login.register_servlets(hs, self) - register.register_servlets(hs, self) - devices.register_servlets(hs, self) + if hs.config.worker_app is None: + send_event.register_servlets(hs, self) + membership.register_servlets(hs, self) + login.register_servlets(hs, self) + register.register_servlets(hs, self) + devices.register_servlets(hs, self) + streams.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a9ad601132..cdbc406c97 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -128,15 +128,23 @@ class ReplicationEndpoint(object): Returns a callable that accepts the same parameters as `_serialize_payload`. """ clock = hs.get_clock() - host = hs.config.worker_replication_host - port = hs.config.worker_replication_http_port + master_host = hs.config.worker_replication_host + master_port = hs.config.worker_replication_http_port + + instance_http_map = hs.config.instance_http_map client = hs.get_simple_http_client() @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): - if instance_name != "master": + if instance_name == "master": + host = master_host + port = master_port + elif instance_name in instance_http_map: + host = instance_http_map[instance_name]["host"] + port = instance_http_map[instance_name]["port"] + else: raise Exception("Unknown instance") data = yield cls._serialize_payload(**kwargs) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7e23b565b9..0a11743845 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py
@@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint): def register_servlets(hs, http_server): - ReplicationFederationSendEventsRestServlet(hs).register(http_server) + if hs.config.worker_app is None: + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) + ReplicationCleanRoomRestServlet(hs).register(http_server) + ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) - ReplicationGetQueryRestServlet(hs).register(http_server) - ReplicationCleanRoomRestServlet(hs).register(http_server) - ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 9c1fc9fb25..11a0270699 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py
@@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): # them ourselves we end up in an import loop). self.streams = hs.get_replication_streamer().get_streams() + self.instance_name = hs.config.worker_name or "master" + @staticmethod def _serialize_payload(stream_name, from_token, upto_token, limit): return {"from_token": from_token, "upto_token": upto_token, "limit": limit} diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cfba255897..ac4d6d1dd1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler: self.presence_handler = hs.get_presence_handler() self.instance_id = hs.get_instance_id() + self.instance_name = hs.config.worker.worker_name or "master" + self.connections = [] # type: List[Any] self.streams = { @@ -134,7 +136,9 @@ class ReplicationClientHandler: for stream_name, stream in self.streams.items(): current_token = stream.current_token() - self.send_command(PositionCommand(stream_name, "master", current_token)) + self.send_command( + PositionCommand(stream_name, self.instance_name, current_token) + ) async def on_USER_SYNC(self, cmd: UserSyncCommand): user_sync_counter.inc() @@ -232,17 +236,17 @@ class ReplicationClientHandler: return # Find where we previously streamed up to. - current_token = ( - self.replication_data_handler.get_streams_to_replicate() - .get(cmd.stream_name, {}) - .get(cmd.instance_name) + current_tokens = self.replication_data_handler.get_streams_to_replicate().get( + cmd.stream_name ) - if current_token is None: + if current_tokens is None: logger.debug( "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name ) return + current_token = current_tokens.get(cmd.instance_name, 0) + # Fetch all updates between then and now. limited = cmd.token != current_token while limited: @@ -335,7 +339,7 @@ class ReplicationClientHandler: We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, "master", token, data)) + self.send_command(RdataCommand(stream_name, self.instance_name, token, data)) class ReplicationDataHandler: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index bbd2c6ec41..d421cc477f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -74,9 +74,7 @@ class ReplicationStreamer(object): self.streams = [] # type: List[Stream] if hs.config.worker_app is None: for stream in STREAMS_MAP.values(): - if stream == FederationStream and hs.config.send_federation: - # We only support federation stream if federation sending - # hase been disabled on the master. + if stream == FederationStream: continue if stream == TypingStream: @@ -87,6 +85,9 @@ class ReplicationStreamer(object): if hs.config.server.handle_typing: self.streams.append(TypingStream(hs)) + # We always add federation stream + self.streams.append(FederationStream(hs)) + self.streams_by_name = {stream.NAME: stream for stream in self.streams} self.notifier.add_replication_callback(self.on_notifier_poke) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 87ab631fd4..e68b220956 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -256,7 +256,7 @@ class TypingStream(Stream): self.current_token = typing_handler.get_current_token # type: ignore - if hs.config.worker_app is None: + if hs.config.handle_typing: self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore else: # Query master process diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..40951e81c9 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@ # limitations under the License. from collections import namedtuple -from twisted.internet import defer - from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function @@ -41,12 +39,8 @@ class FederationStream(Stream): # Not all synapse instances will have a federation sender instance, # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, # so we stub the stream out when that is the case. - if hs.config.worker_app is None or hs.should_send_federation(): - federation_sender = hs.get_federation_sender() - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore - else: - self.current_token = lambda: 0 # type: ignore - self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore + federation_sender = hs.get_federation_sender() + self.current_token = federation_sender.get_current_token # type: ignore + self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore super(FederationStream, self).__init__(hs)