summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-16 15:12:54 +0100
committerGitHub <noreply@github.com>2020-07-16 15:12:54 +0100
commitf2e38ca86711a8f80cf45d3182e426ed8967fc81 (patch)
tree5a46223ed7b3e50f018d96a09776b7e442619377 /synapse
parentAdd ability to run multiple pusher instances (#7855) (diff)
downloadsynapse-f2e38ca86711a8f80cf45d3182e426ed8967fc81.tar.xz
Allow moving typing off master (#7869)
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/generic_worker.py36
-rw-r--r--synapse/config/workers.py19
-rw-r--r--synapse/federation/federation_server.py125
-rw-r--r--synapse/handlers/typing.py241
-rw-r--r--synapse/replication/tcp/handler.py9
-rw-r--r--synapse/replication/tcp/streams/_base.py7
-rw-r--r--synapse/rest/client/v1/room.py9
-rw-r--r--synapse/server.py13
-rw-r--r--synapse/server.pyi2
9 files changed, 283 insertions, 178 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index e90695f026..c0853eef22 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -111,6 +111,7 @@ from synapse.rest.client.v1.room import (
     RoomSendEventRestServlet,
     RoomStateEventRestServlet,
     RoomStateRestServlet,
+    RoomTypingRestServlet,
 )
 from synapse.rest.client.v1.voip import VoipRestServlet
 from synapse.rest.client.v2_alpha import groups, sync, user_directory
@@ -451,37 +452,6 @@ class GenericWorkerPresence(BasePresenceHandler):
         await self._bump_active_client(user_id=user_id)
 
 
-class GenericWorkerTyping(object):
-    def __init__(self, hs):
-        self._latest_room_serial = 0
-        self._reset()
-
-    def _reset(self):
-        """
-        Reset the typing handler's data caches.
-        """
-        # map room IDs to serial numbers
-        self._room_serials = {}
-        # map room IDs to sets of users currently typing
-        self._room_typing = {}
-
-    def process_replication_rows(self, token, rows):
-        if self._latest_room_serial > token:
-            # The master has gone backwards. To prevent inconsistent data, just
-            # clear everything.
-            self._reset()
-
-        # Set the latest serial token to whatever the server gave us.
-        self._latest_room_serial = token
-
-        for row in rows:
-            self._room_serials[row.room_id] = token
-            self._room_typing[row.room_id] = row.user_ids
-
-    def get_current_token(self) -> int:
-        return self._latest_room_serial
-
-
 class GenericWorkerSlavedStore(
     # FIXME(#3714): We need to add UserDirectoryStore as we write directly
     # rather than going via the correct worker.
@@ -558,6 +528,7 @@ class GenericWorkerServer(HomeServer):
                     KeyUploadServlet(self).register(resource)
                     AccountDataServlet(self).register(resource)
                     RoomAccountDataServlet(self).register(resource)
+                    RoomTypingRestServlet(self).register(resource)
 
                     sync.register_servlets(self, resource)
                     events.register_servlets(self, resource)
@@ -669,9 +640,6 @@ class GenericWorkerServer(HomeServer):
     def build_presence_handler(self):
         return GenericWorkerPresence(self)
 
-    def build_typing_handler(self):
-        return GenericWorkerTyping(self)
-
 
 class GenericWorkerReplicationHandler(ReplicationDataHandler):
     def __init__(self, hs):
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index dbc661630c..2574cd3aa1 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -34,9 +34,11 @@ class WriterLocations:
 
     Attributes:
         events: The instance that writes to the event and backfill streams.
+        events: The instance that writes to the typing stream.
     """
 
     events = attr.ib(default="master", type=str)
+    typing = attr.ib(default="master", type=str)
 
 
 class WorkerConfig(Config):
@@ -93,16 +95,15 @@ class WorkerConfig(Config):
         writers = config.get("stream_writers") or {}
         self.writers = WriterLocations(**writers)
 
-        # Check that the configured writer for events also appears in
+        # Check that the configured writer for events and typing also appears in
         # `instance_map`.
-        if (
-            self.writers.events != "master"
-            and self.writers.events not in self.instance_map
-        ):
-            raise ConfigError(
-                "Instance %r is configured to write events but does not appear in `instance_map` config."
-                % (self.writers.events,)
-            )
+        for stream in ("events", "typing"):
+            instance = getattr(self.writers, stream)
+            if instance != "master" and instance not in self.instance_map:
+                raise ConfigError(
+                    "Instance %r is configured to write %s but does not appear in `instance_map` config."
+                    % (instance, stream)
+                )
 
     def read_arguments(self, args):
         # We support a bunch of command line arguments that override options in
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8c53330c49..23625ba995 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -15,7 +15,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Match,
+    Optional,
+    Tuple,
+    Union,
+)
 
 from canonicaljson import json
 from prometheus_client import Counter, Histogram
@@ -56,6 +67,9 @@ from synapse.util import glob_to_regex, unwrapFirstError
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -768,11 +782,30 @@ class FederationHandlerRegistry(object):
     query type for incoming federation traffic.
     """
 
-    def __init__(self):
-        self.edu_handlers = {}
-        self.query_handlers = {}
+    def __init__(self, hs: "HomeServer"):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+        self._instance_name = hs.get_instance_name()
 
-    def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]):
+        # These are safe to load in monolith mode, but will explode if we try
+        # and use them. However we have guards before we use them to ensure that
+        # we don't route to ourselves, and in monolith mode that will always be
+        # the case.
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        self.edu_handlers = (
+            {}
+        )  # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
+        self.query_handlers = {}  # type: Dict[str, Callable[[dict], Awaitable[None]]]
+
+        # Map from type to instance name that we should route EDU handling to.
+        self._edu_type_to_instance = {}  # type: Dict[str, str]
+
+    def register_edu_handler(
+        self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]]
+    ):
         """Sets the handler callable that will be used to handle an incoming
         federation EDU of the given type.
 
@@ -809,66 +842,56 @@ class FederationHandlerRegistry(object):
 
         self.query_handlers[query_type] = handler
 
+    def register_instance_for_edu(self, edu_type: str, instance_name: str):
+        """Register that the EDU handler is on a different instance than master.
+        """
+        self._edu_type_to_instance[edu_type] = instance_name
+
     async def on_edu(self, edu_type: str, origin: str, content: dict):
+        if not self.config.use_presence and edu_type == "m.presence":
+            return
+
+        # Check if we have a handler on this instance
         handler = self.edu_handlers.get(edu_type)
-        if not handler:
-            logger.warning("No handler registered for EDU type %s", edu_type)
+        if handler:
+            with start_active_span_from_edu(content, "handle_edu"):
+                try:
+                    await handler(origin, content)
+                except SynapseError as e:
+                    logger.info("Failed to handle edu %r: %r", edu_type, e)
+                except Exception:
+                    logger.exception("Failed to handle edu %r", edu_type)
             return
 
-        with start_active_span_from_edu(content, "handle_edu"):
+        # Check if we can route it somewhere else that isn't us
+        route_to = self._edu_type_to_instance.get(edu_type, "master")
+        if route_to != self._instance_name:
             try:
-                await handler(origin, content)
+                await self._send_edu(
+                    instance_name=route_to,
+                    edu_type=edu_type,
+                    origin=origin,
+                    content=content,
+                )
             except SynapseError as e:
                 logger.info("Failed to handle edu %r: %r", edu_type, e)
             except Exception:
                 logger.exception("Failed to handle edu %r", edu_type)
-
-    def on_query(self, query_type: str, args: dict) -> defer.Deferred:
-        handler = self.query_handlers.get(query_type)
-        if not handler:
-            logger.warning("No handler registered for query type %s", query_type)
-            raise NotFoundError("No handler for Query type '%s'" % (query_type,))
-
-        return handler(args)
-
-
-class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
-    """A FederationHandlerRegistry for worker processes.
-
-    When receiving EDU or queries it will check if an appropriate handler has
-    been registered on the worker, if there isn't one then it calls off to the
-    master process.
-    """
-
-    def __init__(self, hs):
-        self.config = hs.config
-        self.http_client = hs.get_simple_http_client()
-        self.clock = hs.get_clock()
-
-        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
-        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
-
-        super(ReplicationFederationHandlerRegistry, self).__init__()
-
-    async def on_edu(self, edu_type: str, origin: str, content: dict):
-        """Overrides FederationHandlerRegistry
-        """
-        if not self.config.use_presence and edu_type == "m.presence":
             return
 
-        handler = self.edu_handlers.get(edu_type)
-        if handler:
-            return await super(ReplicationFederationHandlerRegistry, self).on_edu(
-                edu_type, origin, content
-            )
-
-        return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
+        # Oh well, let's just log and move on.
+        logger.warning("No handler registered for EDU type %s", edu_type)
 
     async def on_query(self, query_type: str, args: dict):
-        """Overrides FederationHandlerRegistry
-        """
         handler = self.query_handlers.get(query_type)
         if handler:
             return await handler(args)
 
-        return await self._get_query_client(query_type=query_type, args=args)
+        # Check if we can route it somewhere else that isn't us
+        if self._instance_name == "master":
+            return await self._get_query_client(query_type=query_type, args=args)
+
+        # Uh oh, no handler! Let's raise an exception so the request returns an
+        # error.
+        logger.warning("No handler registered for query type %s", query_type)
+        raise NotFoundError("No handler for Query type '%s'" % (query_type,))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 846ddbdc6c..a86ac0150e 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,15 +15,19 @@
 
 import logging
 from collections import namedtuple
-from typing import List, Tuple
+from typing import TYPE_CHECKING, List, Set, Tuple
 
 from synapse.api.errors import AuthError, SynapseError
-from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.streams import TypingStream
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -39,48 +43,48 @@ FEDERATION_TIMEOUT = 60 * 1000
 FEDERATION_PING_INTERVAL = 40 * 1000
 
 
-class TypingHandler(object):
-    def __init__(self, hs):
+class FollowerTypingHandler:
+    """A typing handler on a different process than the writer that is updated
+    via replication.
+    """
+
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
         self.server_name = hs.config.server_name
-        self.auth = hs.get_auth()
-        self.is_mine_id = hs.is_mine_id
-        self.notifier = hs.get_notifier()
-        self.state = hs.get_state_handler()
-
-        self.hs = hs
-
         self.clock = hs.get_clock()
-        self.wheel_timer = WheelTimer(bucket_size=5000)
+        self.is_mine_id = hs.is_mine_id
 
-        self.federation = hs.get_federation_sender()
+        self.federation = None
+        if hs.should_send_federation():
+            self.federation = hs.get_federation_sender()
 
-        hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
+        if hs.config.worker.writers.typing != hs.get_instance_name():
+            hs.get_federation_registry().register_instance_for_edu(
+                "m.typing", hs.config.worker.writers.typing,
+            )
 
-        hs.get_distributor().observe("user_left_room", self.user_left_room)
+        # map room IDs to serial numbers
+        self._room_serials = {}
+        # map room IDs to sets of users currently typing
+        self._room_typing = {}
 
-        self._member_typing_until = {}  # clock time we expect to stop
         self._member_last_federation_poke = {}
-
+        self.wheel_timer = WheelTimer(bucket_size=5000)
         self._latest_room_serial = 0
-        self._reset()
-
-        # caches which room_ids changed at which serials
-        self._typing_stream_change_cache = StreamChangeCache(
-            "TypingStreamChangeCache", self._latest_room_serial
-        )
 
         self.clock.looping_call(self._handle_timeouts, 5000)
 
     def _reset(self):
-        """
-        Reset the typing handler's data caches.
+        """Reset the typing handler's data caches.
         """
         # map room IDs to serial numbers
         self._room_serials = {}
         # map room IDs to sets of users currently typing
         self._room_typing = {}
 
+        self._member_last_federation_poke = {}
+        self.wheel_timer = WheelTimer(bucket_size=5000)
+
     def _handle_timeouts(self):
         logger.debug("Checking for typing timeouts")
 
@@ -89,30 +93,140 @@ class TypingHandler(object):
         members = set(self.wheel_timer.fetch(now))
 
         for member in members:
-            if not self.is_typing(member):
-                # Nothing to do if they're no longer typing
-                continue
-
-            until = self._member_typing_until.get(member, None)
-            if not until or until <= now:
-                logger.info("Timing out typing for: %s", member.user_id)
-                self._stopped_typing(member)
-                continue
-
-            # Check if we need to resend a keep alive over federation for this
-            # user.
-            if self.hs.is_mine_id(member.user_id):
-                last_fed_poke = self._member_last_federation_poke.get(member, None)
-                if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
-                    run_in_background(self._push_remote, member=member, typing=True)
-
-            # Add a paranoia timer to ensure that we always have a timer for
-            # each person typing.
-            self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
+            self._handle_timeout_for_member(now, member)
+
+    def _handle_timeout_for_member(self, now: int, member: RoomMember):
+        if not self.is_typing(member):
+            # Nothing to do if they're no longer typing
+            return
+
+        # Check if we need to resend a keep alive over federation for this
+        # user.
+        if self.federation and self.is_mine_id(member.user_id):
+            last_fed_poke = self._member_last_federation_poke.get(member, None)
+            if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
+                run_as_background_process(
+                    "typing._push_remote", self._push_remote, member=member, typing=True
+                )
+
+        # Add a paranoia timer to ensure that we always have a timer for
+        # each person typing.
+        self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
 
     def is_typing(self, member):
         return member.user_id in self._room_typing.get(member.room_id, [])
 
+    async def _push_remote(self, member, typing):
+        if not self.federation:
+            return
+
+        try:
+            users = await self.store.get_users_in_room(member.room_id)
+            self._member_last_federation_poke[member] = self.clock.time_msec()
+
+            now = self.clock.time_msec()
+            self.wheel_timer.insert(
+                now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
+            )
+
+            for domain in {get_domain_from_id(u) for u in users}:
+                if domain != self.server_name:
+                    logger.debug("sending typing update to %s", domain)
+                    self.federation.build_and_send_edu(
+                        destination=domain,
+                        edu_type="m.typing",
+                        content={
+                            "room_id": member.room_id,
+                            "user_id": member.user_id,
+                            "typing": typing,
+                        },
+                        key=member,
+                    )
+        except Exception:
+            logger.exception("Error pushing typing notif to remotes")
+
+    def process_replication_rows(
+        self, token: int, rows: List[TypingStream.TypingStreamRow]
+    ):
+        """Should be called whenever we receive updates for typing stream.
+        """
+
+        if self._latest_room_serial > token:
+            # The master has gone backwards. To prevent inconsistent data, just
+            # clear everything.
+            self._reset()
+
+        # Set the latest serial token to whatever the server gave us.
+        self._latest_room_serial = token
+
+        for row in rows:
+            self._room_serials[row.room_id] = token
+
+            prev_typing = set(self._room_typing.get(row.room_id, []))
+            now_typing = set(row.user_ids)
+            self._room_typing[row.room_id] = row.user_ids
+
+            run_as_background_process(
+                "_handle_change_in_typing",
+                self._handle_change_in_typing,
+                row.room_id,
+                prev_typing,
+                now_typing,
+            )
+
+    async def _handle_change_in_typing(
+        self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
+    ):
+        """Process a change in typing of a room from replication, sending EDUs
+        for any local users.
+        """
+        for user_id in now_typing - prev_typing:
+            if self.is_mine_id(user_id):
+                await self._push_remote(RoomMember(room_id, user_id), True)
+
+        for user_id in prev_typing - now_typing:
+            if self.is_mine_id(user_id):
+                await self._push_remote(RoomMember(room_id, user_id), False)
+
+    def get_current_token(self):
+        return self._latest_room_serial
+
+
+class TypingWriterHandler(FollowerTypingHandler):
+    def __init__(self, hs):
+        super().__init__(hs)
+
+        assert hs.config.worker.writers.typing == hs.get_instance_name()
+
+        self.auth = hs.get_auth()
+        self.notifier = hs.get_notifier()
+
+        self.hs = hs
+
+        hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
+
+        hs.get_distributor().observe("user_left_room", self.user_left_room)
+
+        self._member_typing_until = {}  # clock time we expect to stop
+
+        # caches which room_ids changed at which serials
+        self._typing_stream_change_cache = StreamChangeCache(
+            "TypingStreamChangeCache", self._latest_room_serial
+        )
+
+    def _handle_timeout_for_member(self, now: int, member: RoomMember):
+        super()._handle_timeout_for_member(now, member)
+
+        if not self.is_typing(member):
+            # Nothing to do if they're no longer typing
+            return
+
+        until = self._member_typing_until.get(member, None)
+        if not until or until <= now:
+            logger.info("Timing out typing for: %s", member.user_id)
+            self._stopped_typing(member)
+            return
+
     async def started_typing(self, target_user, auth_user, room_id, timeout):
         target_user_id = target_user.to_string()
         auth_user_id = auth_user.to_string()
@@ -179,35 +293,11 @@ class TypingHandler(object):
     def _push_update(self, member, typing):
         if self.hs.is_mine_id(member.user_id):
             # Only send updates for changes to our own users.
-            run_in_background(self._push_remote, member, typing)
-
-        self._push_update_local(member=member, typing=typing)
-
-    async def _push_remote(self, member, typing):
-        try:
-            users = await self.store.get_users_in_room(member.room_id)
-            self._member_last_federation_poke[member] = self.clock.time_msec()
-
-            now = self.clock.time_msec()
-            self.wheel_timer.insert(
-                now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
+            run_as_background_process(
+                "typing._push_remote", self._push_remote, member, typing
             )
 
-            for domain in {get_domain_from_id(u) for u in users}:
-                if domain != self.server_name:
-                    logger.debug("sending typing update to %s", domain)
-                    self.federation.build_and_send_edu(
-                        destination=domain,
-                        edu_type="m.typing",
-                        content={
-                            "room_id": member.room_id,
-                            "user_id": member.user_id,
-                            "typing": typing,
-                        },
-                        key=member,
-                    )
-        except Exception:
-            logger.exception("Error pushing typing notif to remotes")
+        self._push_update_local(member=member, typing=typing)
 
     async def _recv_edu(self, origin, content):
         room_id = content["room_id"]
@@ -304,8 +394,11 @@ class TypingHandler(object):
 
         return rows, current_id, limited
 
-    def get_current_token(self):
-        return self._latest_room_serial
+    def process_replication_rows(
+        self, token: int, rows: List[TypingStream.TypingStreamRow]
+    ):
+        # The writing process should never get updates from replication.
+        raise Exception("Typing writer instance got typing info over replication")
 
 
 class TypingNotificationEventSource(object):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 80f5df60f9..30d8de48fa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams import (
     EventsStream,
     FederationStream,
     Stream,
+    TypingStream,
 )
 from synapse.util.async_helpers import Linearizer
 
@@ -96,6 +97,14 @@ class ReplicationCommandHandler:
 
                 continue
 
+            if isinstance(stream, TypingStream):
+                # Only add TypingStream as a source on the instance in charge of
+                # typing.
+                if hs.config.worker.writers.typing == hs.get_instance_name():
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
             # Only add any other streams if we're on master.
             if hs.config.worker_app is not None:
                 continue
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 9076bbe9f1..7a42de3f7d 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -294,11 +294,12 @@ class TypingStream(Stream):
     def __init__(self, hs):
         typing_handler = hs.get_typing_handler()
 
-        if hs.config.worker_app is None:
-            # on the master, query the typing handler
+        writer_instance = hs.config.worker.writers.typing
+        if writer_instance == hs.get_instance_name():
+            # On the writer, query the typing handler
             update_function = typing_handler.get_all_typing_updates
         else:
-            # Query master process
+            # Query the typing writer process
             update_function = make_http_update_function(hs, self.NAME)
 
         super().__init__(
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index ea5912d4e4..26d5a51cb2 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -817,9 +817,18 @@ class RoomTypingRestServlet(RestServlet):
         self.typing_handler = hs.get_typing_handler()
         self.auth = hs.get_auth()
 
+        # If we're not on the typing writer instance we should scream if we get
+        # requests.
+        self._is_typing_writer = (
+            hs.config.worker.writers.typing == hs.get_instance_name()
+        )
+
     async def on_PUT(self, request, room_id, user_id):
         requester = await self.auth.get_user_by_req(request)
 
+        if not self._is_typing_writer:
+            raise Exception("Got /typing request on instance that is not typing writer")
+
         room_id = urlparse.unquote(room_id)
         target_user = UserID.from_string(urlparse.unquote(user_id))
 
diff --git a/synapse/server.py b/synapse/server.py
index 0e6ea96b33..8e41112530 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -44,7 +44,6 @@ from synapse.federation.federation_client import FederationClient
 from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
-    ReplicationFederationHandlerRegistry,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.sender import FederationSender
@@ -84,7 +83,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
 from synapse.handlers.set_password import SetPasswordHandler
 from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
-from synapse.handlers.typing import TypingHandler
+from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
 from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@@ -378,7 +377,10 @@ class HomeServer(object):
         return PresenceHandler(self)
 
     def build_typing_handler(self):
-        return TypingHandler(self)
+        if self.config.worker.writers.typing == self.get_instance_name():
+            return TypingWriterHandler(self)
+        else:
+            return FollowerTypingHandler(self)
 
     def build_sync_handler(self):
         return SyncHandler(self)
@@ -534,10 +536,7 @@ class HomeServer(object):
         return RoomMemberMasterHandler(self)
 
     def build_federation_registry(self):
-        if self.config.worker_app:
-            return ReplicationFederationHandlerRegistry(self)
-        else:
-            return FederationHandlerRegistry()
+        return FederationHandlerRegistry(self)
 
     def build_server_notices_manager(self):
         if self.config.worker_app:
diff --git a/synapse/server.pyi b/synapse/server.pyi
index cd50c721b8..90a673778f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -148,3 +148,5 @@ class HomeServer(object):
         self,
     ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient:
         pass
+    def should_send_federation(self) -> bool:
+        pass