summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 14:55:02 +0000
committerErik Johnston <erik@matrix.org>2020-03-25 14:55:02 +0000
commit83ecaeecbf9997b5ceb532fa965f071084ca61ee (patch)
treef62a181a3a651ff595311bfa166778b4d4bfb000
parentPass instance name through to rdata (diff)
downloadsynapse-erikj/split_out_fed_stream.tar.xz
-rw-r--r--synapse/app/generic_worker.py28
-rw-r--r--synapse/config/workers.py2
-rw-r--r--synapse/federation/federation_server.py11
-rw-r--r--synapse/handlers/typing.py26
-rw-r--r--synapse/replication/http/__init__.py14
-rw-r--r--synapse/replication/http/_base.py14
-rw-r--r--synapse/replication/http/federation.py10
-rw-r--r--synapse/replication/http/streams.py2
-rw-r--r--synapse/replication/tcp/handler.py18
-rw-r--r--synapse/replication/tcp/resource.py7
-rw-r--r--synapse/replication/tcp/streams/_base.py2
-rw-r--r--synapse/replication/tcp/streams/federation.py12
-rw-r--r--synapse/server.py4
13 files changed, 90 insertions, 60 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 383edf07ad..c265bc0803 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -604,7 +604,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         self.pusher_pool = hs.get_pusherpool()
 
         if hs.config.send_federation:
-            self.send_handler = FederationSenderHandler(hs, self)
+            self.send_handler = FederationSenderHandler(hs)
         else:
             self.send_handler = None
 
@@ -624,7 +624,9 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
     async def process_and_notify(self, stream_name, instance_name, token, rows):
         try:
             if self.send_handler:
-                self.send_handler.process_replication_rows(stream_name, token, rows)
+                self.send_handler.process_replication_rows(
+                    stream_name, instance_name, token, rows
+                )
 
             if stream_name == EventsStream.NAME:
                 # We shouldn't get multiple rows per token for events stream, so
@@ -724,13 +726,14 @@ class FederationSenderHandler(object):
     to the federation sender.
     """
 
-    def __init__(self, hs: GenericWorkerServer, replication_client):
+    def __init__(self, hs: GenericWorkerServer):
+        self.hs = hs
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
         self.federation_sender = hs.get_federation_sender()
-        self.replication_client = replication_client
+        # self.replication_client = hs.get_tcp_replication()
 
-        self.federation_position = self.store.federation_out_pos_startup
+        self.federation_position = {"master": self.store.federation_out_pos_startup}
         self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer")
 
         self._last_ack = self.federation_position
@@ -749,14 +752,14 @@ class FederationSenderHandler(object):
         self.federation_sender.wake_destination(server)
 
     def stream_positions(self):
-        return {"federation": {"master": self.federation_position}}
+        return {"federation": self.federation_position}
 
-    def process_replication_rows(self, stream_name, token, rows):
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
         # The federation stream contains things that we want to send out, e.g.
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            run_in_background(self.update_token, token)
+            run_in_background(self.update_token, instance_name, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -804,9 +807,12 @@ class FederationSenderHandler(object):
             )
             await self.federation_sender.send_read_receipt(receipt_info)
 
-    async def update_token(self, token):
+    async def update_token(self, instance_name, token):
         try:
-            self.federation_position = token
+            self.federation_position[instance_name] = token
+            return
+
+            # FIXME
 
             # We linearize here to ensure we don't have races updating the token
             with (await self._fed_position_linearizer.queue(None)):
@@ -817,7 +823,7 @@ class FederationSenderHandler(object):
 
                     # We ACK this token over replication so that the master can drop
                     # its in memory queues
-                    self.replication_client.send_federation_ack(
+                    self.hs.get_tcp_replication().send_federation_ack(
                         self.federation_position
                     )
                     self._last_ack = self.federation_position
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index fef72ed974..43bdf039e7 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -48,6 +48,8 @@ class WorkerConfig(Config):
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
 
+        self.instance_http_map = config.get("instance_http_map", {})
+
         # This option is really only here to support `--manhole` command line
         # argument.
         manhole = config.get("worker_manhole")
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 89d521bc31..6fb65b5dfb 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -819,7 +819,16 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
                 edu_type, origin, content
             )
 
-        return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
+        if edu_type == "m.typing":
+            instance_name = "synapse.app.client_reader"
+        else:
+            instance_name = "master"
+        return await self._send_edu(
+            instance_name=instance_name,
+            edu_type=edu_type,
+            origin=origin,
+            content=content,
+        )
 
     async def on_query(self, query_type, args):
         """Overrides FederationHandlerRegistry
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f8a347fa85..3e1eb27d55 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
         self.clock = hs.get_clock()
         self.wheel_timer = WheelTimer(bucket_size=5000)
 
-        # self.federation = hs.get_federation_sender()
+        self.federation = hs.get_federation_sender()
 
         hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
 
@@ -203,16 +203,16 @@ class TypingHandler(object):
             for domain in {get_domain_from_id(u) for u in users}:
                 if domain != self.server_name:
                     logger.debug("sending typing update to %s", domain)
-                    # self.federation.build_and_send_edu(
-                    #     destination=domain,
-                    #     edu_type="m.typing",
-                    #     content={
-                    #         "room_id": member.room_id,
-                    #         "user_id": member.user_id,
-                    #         "typing": typing,
-                    #     },
-                    #     key=member,
-                    # )
+                    self.federation.build_and_send_edu(
+                        destination=domain,
+                        edu_type="m.typing",
+                        content={
+                            "room_id": member.room_id,
+                            "user_id": member.user_id,
+                            "typing": typing,
+                        },
+                        key=member,
+                    )
         except Exception:
             logger.exception("Error pushing typing notif to remotes")
 
@@ -309,7 +309,9 @@ class TypingSlaveHandler(object):
         # We must update this typing token from the response of the previous
         # sync. In particular, the stream id may "reset" back to zero/a low
         # value which we *must* use for the next replication request.
-        return {"typing": {"master": self._latest_room_serial}}
+        return {
+            "typing": {"synapse.app.client_reader": self._latest_room_serial}
+        }  # FIXME
 
     def process_replication_rows(self, stream_name, token, rows):
         if stream_name != TypingStream.NAME:
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 4613b2538c..f43a4ed009 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -33,10 +33,12 @@ class ReplicationRestResource(JsonResource):
         self.register_servlets(hs)
 
     def register_servlets(self, hs):
-        send_event.register_servlets(hs, self)
-        membership.register_servlets(hs, self)
-        federation.register_servlets(hs, self)
-        login.register_servlets(hs, self)
-        register.register_servlets(hs, self)
-        devices.register_servlets(hs, self)
+        if hs.config.worker_app is None:
+            send_event.register_servlets(hs, self)
+            membership.register_servlets(hs, self)
+            login.register_servlets(hs, self)
+            register.register_servlets(hs, self)
+            devices.register_servlets(hs, self)
+
         streams.register_servlets(hs, self)
+        federation.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index a9ad601132..cdbc406c97 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -128,15 +128,23 @@ class ReplicationEndpoint(object):
         Returns a callable that accepts the same parameters as `_serialize_payload`.
         """
         clock = hs.get_clock()
-        host = hs.config.worker_replication_host
-        port = hs.config.worker_replication_http_port
+        master_host = hs.config.worker_replication_host
+        master_port = hs.config.worker_replication_http_port
+
+        instance_http_map = hs.config.instance_http_map
 
         client = hs.get_simple_http_client()
 
         @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
         def send_request(instance_name="master", **kwargs):
-            if instance_name != "master":
+            if instance_name == "master":
+                host = master_host
+                port = master_port
+            elif instance_name in instance_http_map:
+                host = instance_http_map[instance_name]["host"]
+                port = instance_http_map[instance_name]["port"]
+            else:
                 raise Exception("Unknown instance")
 
             data = yield cls._serialize_payload(**kwargs)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 7e23b565b9..0a11743845 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -277,8 +277,10 @@ class ReplicationStoreRoomOnInviteRestServlet(ReplicationEndpoint):
 
 
 def register_servlets(hs, http_server):
-    ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+    if hs.config.worker_app is None:
+        ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+        ReplicationGetQueryRestServlet(hs).register(http_server)
+        ReplicationCleanRoomRestServlet(hs).register(http_server)
+        ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
+
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
-    ReplicationGetQueryRestServlet(hs).register(http_server)
-    ReplicationCleanRoomRestServlet(hs).register(http_server)
-    ReplicationStoreRoomOnInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 9c1fc9fb25..11a0270699 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -51,6 +51,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         # them ourselves we end up in an import loop).
         self.streams = hs.get_replication_streamer().get_streams()
 
+        self.instance_name = hs.config.worker_name or "master"
+
     @staticmethod
     def _serialize_payload(stream_name, from_token, upto_token, limit):
         return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cfba255897..ac4d6d1dd1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler:
         self.presence_handler = hs.get_presence_handler()
         self.instance_id = hs.get_instance_id()
 
+        self.instance_name = hs.config.worker.worker_name or "master"
+
         self.connections = []  # type: List[Any]
 
         self.streams = {
@@ -134,7 +136,9 @@ class ReplicationClientHandler:
 
         for stream_name, stream in self.streams.items():
             current_token = stream.current_token()
-            self.send_command(PositionCommand(stream_name, "master", current_token))
+            self.send_command(
+                PositionCommand(stream_name, self.instance_name, current_token)
+            )
 
     async def on_USER_SYNC(self, cmd: UserSyncCommand):
         user_sync_counter.inc()
@@ -232,17 +236,17 @@ class ReplicationClientHandler:
             return
 
         # Find where we previously streamed up to.
-        current_token = (
-            self.replication_data_handler.get_streams_to_replicate()
-            .get(cmd.stream_name, {})
-            .get(cmd.instance_name)
+        current_tokens = self.replication_data_handler.get_streams_to_replicate().get(
+            cmd.stream_name
         )
-        if current_token is None:
+        if current_tokens is None:
             logger.debug(
                 "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
             )
             return
 
+        current_token = current_tokens.get(cmd.instance_name, 0)
+
         # Fetch all updates between then and now.
         limited = cmd.token != current_token
         while limited:
@@ -335,7 +339,7 @@ class ReplicationClientHandler:
 
         We need to check if the client is interested in the stream or not
         """
-        self.send_command(RdataCommand(stream_name, "master", token, data))
+        self.send_command(RdataCommand(stream_name, self.instance_name, token, data))
 
 
 class ReplicationDataHandler:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index bbd2c6ec41..d421cc477f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -74,9 +74,7 @@ class ReplicationStreamer(object):
         self.streams = []  # type: List[Stream]
         if hs.config.worker_app is None:
             for stream in STREAMS_MAP.values():
-                if stream == FederationStream and hs.config.send_federation:
-                    # We only support federation stream if federation sending
-                    # hase been disabled on the master.
+                if stream == FederationStream:
                     continue
 
                 if stream == TypingStream:
@@ -87,6 +85,9 @@ class ReplicationStreamer(object):
         if hs.config.server.handle_typing:
             self.streams.append(TypingStream(hs))
 
+        # We always add federation stream
+        self.streams.append(FederationStream(hs))
+
         self.streams_by_name = {stream.NAME: stream for stream in self.streams}
 
         self.notifier.add_replication_callback(self.on_notifier_poke)
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 87ab631fd4..e68b220956 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -256,7 +256,7 @@ class TypingStream(Stream):
 
         self.current_token = typing_handler.get_current_token  # type: ignore
 
-        if hs.config.worker_app is None:
+        if hs.config.handle_typing:
             self.update_function = db_query_to_update_function(typing_handler.get_all_typing_updates)  # type: ignore
         else:
             # Query master process
diff --git a/synapse/replication/tcp/streams/federation.py b/synapse/replication/tcp/streams/federation.py
index 48c1d45718..40951e81c9 100644
--- a/synapse/replication/tcp/streams/federation.py
+++ b/synapse/replication/tcp/streams/federation.py
@@ -15,8 +15,6 @@
 # limitations under the License.
 from collections import namedtuple
 
-from twisted.internet import defer
-
 from synapse.replication.tcp.streams._base import Stream, db_query_to_update_function
 
 
@@ -41,12 +39,8 @@ class FederationStream(Stream):
         # Not all synapse instances will have a federation sender instance,
         # whether that's a `FederationSender` or a `FederationRemoteSendQueue`,
         # so we stub the stream out when that is the case.
-        if hs.config.worker_app is None or hs.should_send_federation():
-            federation_sender = hs.get_federation_sender()
-            self.current_token = federation_sender.get_current_token  # type: ignore
-            self.update_function = db_query_to_update_function(federation_sender.get_replication_rows)  # type: ignore
-        else:
-            self.current_token = lambda: 0  # type: ignore
-            self.update_function = lambda from_token, upto_token, limit: defer.succeed(([], upto_token, bool))  # type: ignore
+        federation_sender = hs.get_federation_sender()
+        self.current_token = federation_sender.get_current_token  # type: ignore
+        self.update_function = db_query_to_update_function(federation_sender.get_replication_rows)  # type: ignore
 
         super(FederationStream, self).__init__(hs)
diff --git a/synapse/server.py b/synapse/server.py
index 8955c32cc4..2136061f1c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -457,10 +457,8 @@ class HomeServer(object):
     def build_federation_sender(self):
         if self.should_send_federation():
             return FederationSender(self)
-        elif not self.config.worker_app:
-            return FederationRemoteSendQueue(self)
         else:
-            raise Exception("Workers cannot send federation traffic")
+            return FederationRemoteSendQueue(self)
 
     def build_receipts_handler(self):
         return ReceiptsHandler(self)