diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-25 14:55:02 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-25 14:55:02 +0000 |
commit | 83ecaeecbf9997b5ceb532fa965f071084ca61ee (patch) | |
tree | f62a181a3a651ff595311bfa166778b4d4bfb000 | |
parent | Pass instance name through to rdata (diff) | |
download | synapse-github/erikj/split_out_fed_stream.tar.xz |
dkjfhsdklfhsdlkjf github/erikj/split_out_fed_stream erikj/split_out_fed_stream
-rw-r--r-- | synapse/app/generic_worker.py | 28 | ||||
-rw-r--r-- | synapse/config/workers.py | 2 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 11 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 26 | ||||
-rw-r--r-- | synapse/replication/http/__init__.py | 14 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 14 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 10 | ||||
-rw-r--r-- | synapse/replication/http/streams.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 18 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/federation.py | 12 | ||||
-rw-r--r-- | synapse/server.py | 4 |
13 files changed, 90 insertions, 60 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 383edf07ad..c265bc0803 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): self.pusher_pool = hs.get_pusherpool() if hs.config.send_federation: - self.send_handler = FederationSenderHandler(hs, self) + self.send_handler = FederationSenderHandler(hs) else: self.send_handler = None @@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler): async def process_and_notify(self, stream_name, instance_name, token, rows): try: if self.send_handler: - self.send_handler.process_replication_rows(stream_name, token, rows) + self.send_handler.process_replication_rows( + stream_name, instance_name, token, rows + ) if stream_name == EventsStream.NAME: # We shouldn't get multiple rows per token for events stream, so @@ -724,13 +726,14 @@ class FederationSenderHandler(object): to the federation sender. """ - def __init__(self, hs: GenericWorkerServer, replication_client): + def __init__(self, hs: GenericWorkerServer): + self.hs = hs self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() - self.replication_client = replication_client + # self.replication_client = hs.get_tcp_replication() - self.federation_position = self.store.federation_out_pos_startup + self.federation_position = {"master": self.store.federation_out_pos_startup} self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") self._last_ack = self.federation_position @@ -749,14 +752,14 @@ class FederationSenderHandler(object): self.federation_sender.wake_destination(server) def stream_positions(self): - return {"federation": {"master": self.federation_position}} + return {"federation": self.federation_position} - def process_replication_rows(self, stream_name, token, rows): + def process_replication_rows(self, stream_name, instance_name, token, rows): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - run_in_background(self.update_token, token) + run_in_background(self.update_token, instance_name, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": @@ -804,9 +807,12 @@ class FederationSenderHandler(object): ) await self.federation_sender.send_read_receipt(receipt_info) - async def update_token(self, token): + async def update_token(self, instance_name, token): try: - self.federation_position = token + self.federation_position[instance_name] = token + return + + # FIXME # We linearize here to ensure we don't have races updating the token with (await self._fed_position_linearizer.queue(None)): @@ -817,7 +823,7 @@ class FederationSenderHandler(object): # We ACK this token over replication so that the master can drop # its in memory queues - self.replication_client.send_federation_ack( + self.hs.get_tcp_replication().send_federation_ack( self.federation_position ) self._last_ack = self.federation_position diff --git a/synapse/config/workers.py b/synapse/config/workers.py index fef72ed974..43bdf039e7 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -48,6 +48,8 @@ class WorkerConfig(Config): self.worker_main_http_uri = config.get("worker_main_http_uri", None) + self.instance_http_map = config.get("instance_http_map", {}) + # This option is really only here to support `--manhole` command line # argument. manhole = config.get("worker_manhole") diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 89d521bc31..6fb65b5dfb 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -819,7 +819,16 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): edu_type, origin, content ) - return await self._send_edu(edu_type=edu_type, origin=origin, content=content) + if edu_type == "m.typing": + instance_name = "synapse.app.client_reader" + else: + instance_name = "master" + return await self._send_edu( + instance_name=instance_name, + edu_type=edu_type, + origin=origin, + content=content, + ) async def on_query(self, query_type, args): """Overrides FederationHandlerRegistry diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index f8a347fa85..3e1eb27d55 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -56,7 +56,7 @@ class TypingHandler(object): self.clock = hs.get_clock() self.wheel_timer = WheelTimer(bucket_size=5000) - # self.federation = hs.get_federation_sender() + self.federation = hs.get_federation_sender() hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu) @@ -203,16 +203,16 @@ class TypingHandler(object): for domain in {get_domain_from_id(u) for u in users}: if domain != self.server_name: logger.debug("sending typing update to %s", domain) - # self.federation.build_and_send_edu( - # destination=domain, - # edu_type="m.typing", - # content={ - # "room_id": member.room_id, - # "user_id": member.user_id, - # "typing": typing, - # }, - # key=member, - # ) + self.federation.build_and_send_edu( + destination=domain, + edu_type="m.typing", + content={ + "room_id": member.room_id, + "user_id": member.user_id, + "typing": typing, + }, + key=member, + ) except Exception: logger.exception("Error pushing typing notif to remotes") @@ -309,7 +309,9 @@ class TypingSlaveHandler(object): # We must update this typing token from the response of the previous # sync. In particular, the stream id may "reset" back to zero/a low # value which we *must* use for the next replication request. - return {"typing": {"master": self._latest_room_serial}} + return { + "typing": {"synapse.app.client_reader": self._latest_room_serial} + } # FIXME def process_replication_rows(self, stream_name, token, rows): if stream_name != TypingStream.NAME: diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 4613b2538c..f43a4ed009 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource): self.register_servlets(hs) def register_servlets(self, hs): - send_event.register_servlets(hs, self) - membership.register_servlets(hs, self) - federation.register_servlets(hs, self) - login.register_servlets(hs, self) - register.register_servlets(hs, self) - devices.register_servlets(hs, self) + if hs.config.worker_app is None: + send_event.register_servlets(hs, self) + membership.register_servlets(hs, self) + login.register_servlets(hs, self) + register.register_servlets(hs, self) + devices.register_servlets(hs, self) + streams.register_servlets(hs, self) + federation.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index a9ad601132..cdbc406c97 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -128,15 +128,23 @@ class ReplicationEndpoint(object): Returns a callable that accepts the same parameters as `_serialize_payload`. """ clock = hs.get_clock() - host = hs.config.worker_replication_host - port = hs.config.worker_replication_http_port + master_host = hs.config.worker_replication_host + master_port = hs.config.worker_replication_http_port + + instance_http_map = hs.config.instance_http_map client = hs.get_simple_http_client() @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(instance_name="master", **kwargs): - if instance_name != "master": + if instance_name == "master": + host = master_host + port = master_port + elif instance_name in instance_http_map: + host = instance_http_map[instance_name]["host"] + port = instance_http_map[instance_name]["port"] + else: raise Exception("Unknown instance") data = yield cls._serialize_payload(**kwargs) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7e23b565b9..0a11743845 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint): def register_servlets(hs, http_server): - ReplicationFederationSendEventsRestServlet(hs).register(http_server) + if hs.config.worker_app is None: + ReplicationFederationSendEventsRestServlet(hs).register(http_server) + ReplicationGetQueryRestServlet(hs).register(http_server) + ReplicationCleanRoomRestServlet(hs).register(http_server) + ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) + ReplicationFederationSendEduRestServlet(hs).register(http_server) - ReplicationGetQueryRestServlet(hs).register(http_server) - ReplicationCleanRoomRestServlet(hs).register(http_server) - ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index 9c1fc9fb25..11a0270699 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): # them ourselves we end up in an import loop). self.streams = hs.get_replication_streamer().get_streams() + self.instance_name = hs.config.worker_name or "master" + @staticmethod def _serialize_payload(stream_name, from_token, upto_token, limit): return {"from_token": from_token, "upto_token": upto_token, "limit": limit} diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index cfba255897..ac4d6d1dd1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -63,6 +63,8 @@ class ReplicationClientHandler: self.presence_handler = hs.get_presence_handler() self.instance_id = hs.get_instance_id() + self.instance_name = hs.config.worker.worker_name or "master" + self.connections = [] # type: List[Any] self.streams = { @@ -134,7 +136,9 @@ class ReplicationClientHandler: for stream_name, stream in self.streams.items(): current_token = stream.current_token() - self.send_command(PositionCommand(stream_name, "master", current_token)) + self.send_command( + PositionCommand(stream_name, self.instance_name, current_token) + ) async def on_USER_SYNC(self, cmd: UserSyncCommand): user_sync_counter.inc() @@ -232,17 +236,17 @@ class ReplicationClientHandler: return # Find where we previously streamed up to. - current_token = ( - self.replication_data_handler.get_streams_to_replicate() - .get(cmd.stream_name, {}) - .get(cmd.instance_name) + current_tokens = self.replication_data_handler.get_streams_to_replicate().get( + cmd.stream_name ) - if current_token is None: + if current_tokens is None: logger.debug( "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name ) return + current_token = current_tokens.get(cmd.instance_name, 0) + # Fetch all updates between then and now. limited = cmd.token != current_token while limited: @@ -335,7 +339,7 @@ class ReplicationClientHandler: We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, "master", token, data)) + self.send_command(RdataCommand(stream_name, self.instance_name, token, data)) class ReplicationDataHandler: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index bbd2c6ec41..d421cc477f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -74,9 +74,7 @@ class ReplicationStreamer(object): self.streams = [] # type: List[Stream] if hs.config.worker_app is None: for stream in STREAMS_MAP.values(): - if stream == FederationStream and hs.config.send_federation: - # We only support federation stream if federation sending - # hase been disabled on the master. + if stream == FederationStream: continue if stream == TypingStream: @@ -87,6 +85,9 @@ class ReplicationStreamer(object): if hs.config.server.handle_typing: self.streams.append(TypingStream(hs)) + # We always add federation stream + self.streams.append(FederationStream(hs)) + self.streams_by_name = {stream.NAME: stream for stream in self.streams} self.notifier.add_replication_callback(self.on_notifier_poke) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 87ab631fd4..e68b220956 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -256,7 +256,7 @@ class TypingStream(Stream): self.current_token = typing_handler.get_current_token # type: ignore - if hs.config.worker_app is None: + if hs.config.handle_typing: self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates) # type: ignore else: # Query master process diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py index 48c1d45718..40951e81c9 100644 --- a/synapse/replication/tcp/streams/federation.py +++ b/synapse/replication/tcp/streams/federation.py @@ -15,8 +15,6 @@ # limitations under the License. from collections import namedtuple -from twisted.internet import defer - from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function @@ -41,12 +39,8 @@ class FederationStream(Stream): # Not all synapse instances will have a federation sender instance, # whether that's a `FederationSender` or a `FederationRemoteSendQueue`, # so we stub the stream out when that is the case. - if hs.config.worker_app is None or hs.should_send_federation(): - federation_sender = hs.get_federation_sender() - self.current_token = federation_sender.get_current_token # type: ignore - self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore - else: - self.current_token = lambda: 0 # type: ignore - self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool)) # type: ignore + federation_sender = hs.get_federation_sender() + self.current_token = federation_sender.get_current_token # type: ignore + self.update_function = db_query_to_update_function(federation_sender.get_replication_rows) # type: ignore super(FederationStream, self).__init__(hs) diff --git a/synapse/server.py b/synapse/server.py index 8955c32cc4..2136061f1c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -457,10 +457,8 @@ class HomeServer(object): def build_federation_sender(self): if self.should_send_federation(): return FederationSender(self) - elif not self.config.worker_app: - return FederationRemoteSendQueue(self) else: - raise Exception("Workers cannot send federation traffic") + return FederationRemoteSendQueue(self) def build_receipts_handler(self): return ReceiptsHandler(self) |