diff --git a/changelog.d/7869.feature b/changelog.d/7869.feature
new file mode 100644
index 0000000000..1982049a52
--- /dev/null
+++ b/changelog.d/7869.feature
@@ -0,0 +1 @@
+Add experimental support for moving typing off master.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index e90695f026..c0853eef22 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -111,6 +111,7 @@ from synapse.rest.client.v1.room import (
RoomSendEventRestServlet,
RoomStateEventRestServlet,
RoomStateRestServlet,
+ RoomTypingRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync, user_directory
@@ -451,37 +452,6 @@ class GenericWorkerPresence(BasePresenceHandler):
await self._bump_active_client(user_id=user_id)
-class GenericWorkerTyping(object):
- def __init__(self, hs):
- self._latest_room_serial = 0
- self._reset()
-
- def _reset(self):
- """
- Reset the typing handler's data caches.
- """
- # map room IDs to serial numbers
- self._room_serials = {}
- # map room IDs to sets of users currently typing
- self._room_typing = {}
-
- def process_replication_rows(self, token, rows):
- if self._latest_room_serial > token:
- # The master has gone backwards. To prevent inconsistent data, just
- # clear everything.
- self._reset()
-
- # Set the latest serial token to whatever the server gave us.
- self._latest_room_serial = token
-
- for row in rows:
- self._room_serials[row.room_id] = token
- self._room_typing[row.room_id] = row.user_ids
-
- def get_current_token(self) -> int:
- return self._latest_room_serial
-
-
class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
# rather than going via the correct worker.
@@ -558,6 +528,7 @@ class GenericWorkerServer(HomeServer):
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
+ RoomTypingRestServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
@@ -669,9 +640,6 @@ class GenericWorkerServer(HomeServer):
def build_presence_handler(self):
return GenericWorkerPresence(self)
- def build_typing_handler(self):
- return GenericWorkerTyping(self)
-
class GenericWorkerReplicationHandler(ReplicationDataHandler):
def __init__(self, hs):
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index dbc661630c..2574cd3aa1 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -34,9 +34,11 @@ class WriterLocations:
Attributes:
events: The instance that writes to the event and backfill streams.
+ events: The instance that writes to the typing stream.
"""
events = attr.ib(default="master", type=str)
+ typing = attr.ib(default="master", type=str)
class WorkerConfig(Config):
@@ -93,16 +95,15 @@ class WorkerConfig(Config):
writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers)
- # Check that the configured writer for events also appears in
+ # Check that the configured writer for events and typing also appears in
# `instance_map`.
- if (
- self.writers.events != "master"
- and self.writers.events not in self.instance_map
- ):
- raise ConfigError(
- "Instance %r is configured to write events but does not appear in `instance_map` config."
- % (self.writers.events,)
- )
+ for stream in ("events", "typing"):
+ instance = getattr(self.writers, stream)
+ if instance != "master" and instance not in self.instance_map:
+ raise ConfigError(
+ "Instance %r is configured to write %s but does not appear in `instance_map` config."
+ % (instance, stream)
+ )
def read_arguments(self, args):
# We support a bunch of command line arguments that override options in
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8c53330c49..23625ba995 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -15,7 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ List,
+ Match,
+ Optional,
+ Tuple,
+ Union,
+)
from canonicaljson import json
from prometheus_client import Counter, Histogram
@@ -56,6 +67,9 @@ from synapse.util import glob_to_regex, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -768,11 +782,30 @@ class FederationHandlerRegistry(object):
query type for incoming federation traffic.
"""
- def __init__(self):
- self.edu_handlers = {}
- self.query_handlers = {}
+ def __init__(self, hs: "HomeServer"):
+ self.config = hs.config
+ self.http_client = hs.get_simple_http_client()
+ self.clock = hs.get_clock()
+ self._instance_name = hs.get_instance_name()
- def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]):
+ # These are safe to load in monolith mode, but will explode if we try
+ # and use them. However we have guards before we use them to ensure that
+ # we don't route to ourselves, and in monolith mode that will always be
+ # the case.
+ self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+ self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+ self.edu_handlers = (
+ {}
+ ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
+ self.query_handlers = {} # type: Dict[str, Callable[[dict], Awaitable[None]]]
+
+ # Map from type to instance name that we should route EDU handling to.
+ self._edu_type_to_instance = {} # type: Dict[str, str]
+
+ def register_edu_handler(
+ self, edu_type: str, handler: Callable[[str, dict], Awaitable[None]]
+ ):
"""Sets the handler callable that will be used to handle an incoming
federation EDU of the given type.
@@ -809,66 +842,56 @@ class FederationHandlerRegistry(object):
self.query_handlers[query_type] = handler
+ def register_instance_for_edu(self, edu_type: str, instance_name: str):
+ """Register that the EDU handler is on a different instance than master.
+ """
+ self._edu_type_to_instance[edu_type] = instance_name
+
async def on_edu(self, edu_type: str, origin: str, content: dict):
+ if not self.config.use_presence and edu_type == "m.presence":
+ return
+
+ # Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
- if not handler:
- logger.warning("No handler registered for EDU type %s", edu_type)
+ if handler:
+ with start_active_span_from_edu(content, "handle_edu"):
+ try:
+ await handler(origin, content)
+ except SynapseError as e:
+ logger.info("Failed to handle edu %r: %r", edu_type, e)
+ except Exception:
+ logger.exception("Failed to handle edu %r", edu_type)
return
- with start_active_span_from_edu(content, "handle_edu"):
+ # Check if we can route it somewhere else that isn't us
+ route_to = self._edu_type_to_instance.get(edu_type, "master")
+ if route_to != self._instance_name:
try:
- await handler(origin, content)
+ await self._send_edu(
+ instance_name=route_to,
+ edu_type=edu_type,
+ origin=origin,
+ content=content,
+ )
except SynapseError as e:
logger.info("Failed to handle edu %r: %r", edu_type, e)
except Exception:
logger.exception("Failed to handle edu %r", edu_type)
-
- def on_query(self, query_type: str, args: dict) -> defer.Deferred:
- handler = self.query_handlers.get(query_type)
- if not handler:
- logger.warning("No handler registered for query type %s", query_type)
- raise NotFoundError("No handler for Query type '%s'" % (query_type,))
-
- return handler(args)
-
-
-class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
- """A FederationHandlerRegistry for worker processes.
-
- When receiving EDU or queries it will check if an appropriate handler has
- been registered on the worker, if there isn't one then it calls off to the
- master process.
- """
-
- def __init__(self, hs):
- self.config = hs.config
- self.http_client = hs.get_simple_http_client()
- self.clock = hs.get_clock()
-
- self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
- self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
-
- super(ReplicationFederationHandlerRegistry, self).__init__()
-
- async def on_edu(self, edu_type: str, origin: str, content: dict):
- """Overrides FederationHandlerRegistry
- """
- if not self.config.use_presence and edu_type == "m.presence":
return
- handler = self.edu_handlers.get(edu_type)
- if handler:
- return await super(ReplicationFederationHandlerRegistry, self).on_edu(
- edu_type, origin, content
- )
-
- return await self._send_edu(edu_type=edu_type, origin=origin, content=content)
+ # Oh well, let's just log and move on.
+ logger.warning("No handler registered for EDU type %s", edu_type)
async def on_query(self, query_type: str, args: dict):
- """Overrides FederationHandlerRegistry
- """
handler = self.query_handlers.get(query_type)
if handler:
return await handler(args)
- return await self._get_query_client(query_type=query_type, args=args)
+ # Check if we can route it somewhere else that isn't us
+ if self._instance_name == "master":
+ return await self._get_query_client(query_type=query_type, args=args)
+
+ # Uh oh, no handler! Let's raise an exception so the request returns an
+ # error.
+ logger.warning("No handler registered for query type %s", query_type)
+ raise NotFoundError("No handler for Query type '%s'" % (query_type,))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 846ddbdc6c..a86ac0150e 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,15 +15,19 @@
import logging
from collections import namedtuple
-from typing import List, Tuple
+from typing import TYPE_CHECKING, List, Set, Tuple
from synapse.api.errors import AuthError, SynapseError
-from synapse.logging.context import run_in_background
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.tcp.streams import TypingStream
from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -39,48 +43,48 @@ FEDERATION_TIMEOUT = 60 * 1000
FEDERATION_PING_INTERVAL = 40 * 1000
-class TypingHandler(object):
- def __init__(self, hs):
+class FollowerTypingHandler:
+ """A typing handler on a different process than the writer that is updated
+ via replication.
+ """
+
+ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self.server_name = hs.config.server_name
- self.auth = hs.get_auth()
- self.is_mine_id = hs.is_mine_id
- self.notifier = hs.get_notifier()
- self.state = hs.get_state_handler()
-
- self.hs = hs
-
self.clock = hs.get_clock()
- self.wheel_timer = WheelTimer(bucket_size=5000)
+ self.is_mine_id = hs.is_mine_id
- self.federation = hs.get_federation_sender()
+ self.federation = None
+ if hs.should_send_federation():
+ self.federation = hs.get_federation_sender()
- hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
+ if hs.config.worker.writers.typing != hs.get_instance_name():
+ hs.get_federation_registry().register_instance_for_edu(
+ "m.typing", hs.config.worker.writers.typing,
+ )
- hs.get_distributor().observe("user_left_room", self.user_left_room)
+ # map room IDs to serial numbers
+ self._room_serials = {}
+ # map room IDs to sets of users currently typing
+ self._room_typing = {}
- self._member_typing_until = {} # clock time we expect to stop
self._member_last_federation_poke = {}
-
+ self.wheel_timer = WheelTimer(bucket_size=5000)
self._latest_room_serial = 0
- self._reset()
-
- # caches which room_ids changed at which serials
- self._typing_stream_change_cache = StreamChangeCache(
- "TypingStreamChangeCache", self._latest_room_serial
- )
self.clock.looping_call(self._handle_timeouts, 5000)
def _reset(self):
- """
- Reset the typing handler's data caches.
+ """Reset the typing handler's data caches.
"""
# map room IDs to serial numbers
self._room_serials = {}
# map room IDs to sets of users currently typing
self._room_typing = {}
+ self._member_last_federation_poke = {}
+ self.wheel_timer = WheelTimer(bucket_size=5000)
+
def _handle_timeouts(self):
logger.debug("Checking for typing timeouts")
@@ -89,30 +93,140 @@ class TypingHandler(object):
members = set(self.wheel_timer.fetch(now))
for member in members:
- if not self.is_typing(member):
- # Nothing to do if they're no longer typing
- continue
-
- until = self._member_typing_until.get(member, None)
- if not until or until <= now:
- logger.info("Timing out typing for: %s", member.user_id)
- self._stopped_typing(member)
- continue
-
- # Check if we need to resend a keep alive over federation for this
- # user.
- if self.hs.is_mine_id(member.user_id):
- last_fed_poke = self._member_last_federation_poke.get(member, None)
- if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
- run_in_background(self._push_remote, member=member, typing=True)
-
- # Add a paranoia timer to ensure that we always have a timer for
- # each person typing.
- self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
+ self._handle_timeout_for_member(now, member)
+
+ def _handle_timeout_for_member(self, now: int, member: RoomMember):
+ if not self.is_typing(member):
+ # Nothing to do if they're no longer typing
+ return
+
+ # Check if we need to resend a keep alive over federation for this
+ # user.
+ if self.federation and self.is_mine_id(member.user_id):
+ last_fed_poke = self._member_last_federation_poke.get(member, None)
+ if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
+ run_as_background_process(
+ "typing._push_remote", self._push_remote, member=member, typing=True
+ )
+
+ # Add a paranoia timer to ensure that we always have a timer for
+ # each person typing.
+ self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
def is_typing(self, member):
return member.user_id in self._room_typing.get(member.room_id, [])
+ async def _push_remote(self, member, typing):
+ if not self.federation:
+ return
+
+ try:
+ users = await self.store.get_users_in_room(member.room_id)
+ self._member_last_federation_poke[member] = self.clock.time_msec()
+
+ now = self.clock.time_msec()
+ self.wheel_timer.insert(
+ now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
+ )
+
+ for domain in {get_domain_from_id(u) for u in users}:
+ if domain != self.server_name:
+ logger.debug("sending typing update to %s", domain)
+ self.federation.build_and_send_edu(
+ destination=domain,
+ edu_type="m.typing",
+ content={
+ "room_id": member.room_id,
+ "user_id": member.user_id,
+ "typing": typing,
+ },
+ key=member,
+ )
+ except Exception:
+ logger.exception("Error pushing typing notif to remotes")
+
+ def process_replication_rows(
+ self, token: int, rows: List[TypingStream.TypingStreamRow]
+ ):
+ """Should be called whenever we receive updates for typing stream.
+ """
+
+ if self._latest_room_serial > token:
+ # The master has gone backwards. To prevent inconsistent data, just
+ # clear everything.
+ self._reset()
+
+ # Set the latest serial token to whatever the server gave us.
+ self._latest_room_serial = token
+
+ for row in rows:
+ self._room_serials[row.room_id] = token
+
+ prev_typing = set(self._room_typing.get(row.room_id, []))
+ now_typing = set(row.user_ids)
+ self._room_typing[row.room_id] = row.user_ids
+
+ run_as_background_process(
+ "_handle_change_in_typing",
+ self._handle_change_in_typing,
+ row.room_id,
+ prev_typing,
+ now_typing,
+ )
+
+ async def _handle_change_in_typing(
+ self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
+ ):
+ """Process a change in typing of a room from replication, sending EDUs
+ for any local users.
+ """
+ for user_id in now_typing - prev_typing:
+ if self.is_mine_id(user_id):
+ await self._push_remote(RoomMember(room_id, user_id), True)
+
+ for user_id in prev_typing - now_typing:
+ if self.is_mine_id(user_id):
+ await self._push_remote(RoomMember(room_id, user_id), False)
+
+ def get_current_token(self):
+ return self._latest_room_serial
+
+
+class TypingWriterHandler(FollowerTypingHandler):
+ def __init__(self, hs):
+ super().__init__(hs)
+
+ assert hs.config.worker.writers.typing == hs.get_instance_name()
+
+ self.auth = hs.get_auth()
+ self.notifier = hs.get_notifier()
+
+ self.hs = hs
+
+ hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
+
+ hs.get_distributor().observe("user_left_room", self.user_left_room)
+
+ self._member_typing_until = {} # clock time we expect to stop
+
+ # caches which room_ids changed at which serials
+ self._typing_stream_change_cache = StreamChangeCache(
+ "TypingStreamChangeCache", self._latest_room_serial
+ )
+
+ def _handle_timeout_for_member(self, now: int, member: RoomMember):
+ super()._handle_timeout_for_member(now, member)
+
+ if not self.is_typing(member):
+ # Nothing to do if they're no longer typing
+ return
+
+ until = self._member_typing_until.get(member, None)
+ if not until or until <= now:
+ logger.info("Timing out typing for: %s", member.user_id)
+ self._stopped_typing(member)
+ return
+
async def started_typing(self, target_user, auth_user, room_id, timeout):
target_user_id = target_user.to_string()
auth_user_id = auth_user.to_string()
@@ -179,35 +293,11 @@ class TypingHandler(object):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
- run_in_background(self._push_remote, member, typing)
-
- self._push_update_local(member=member, typing=typing)
-
- async def _push_remote(self, member, typing):
- try:
- users = await self.store.get_users_in_room(member.room_id)
- self._member_last_federation_poke[member] = self.clock.time_msec()
-
- now = self.clock.time_msec()
- self.wheel_timer.insert(
- now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
+ run_as_background_process(
+ "typing._push_remote", self._push_remote, member, typing
)
- for domain in {get_domain_from_id(u) for u in users}:
- if domain != self.server_name:
- logger.debug("sending typing update to %s", domain)
- self.federation.build_and_send_edu(
- destination=domain,
- edu_type="m.typing",
- content={
- "room_id": member.room_id,
- "user_id": member.user_id,
- "typing": typing,
- },
- key=member,
- )
- except Exception:
- logger.exception("Error pushing typing notif to remotes")
+ self._push_update_local(member=member, typing=typing)
async def _recv_edu(self, origin, content):
room_id = content["room_id"]
@@ -304,8 +394,11 @@ class TypingHandler(object):
return rows, current_id, limited
- def get_current_token(self):
- return self._latest_room_serial
+ def process_replication_rows(
+ self, token: int, rows: List[TypingStream.TypingStreamRow]
+ ):
+ # The writing process should never get updates from replication.
+ raise Exception("Typing writer instance got typing info over replication")
class TypingNotificationEventSource(object):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 80f5df60f9..30d8de48fa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams import (
EventsStream,
FederationStream,
Stream,
+ TypingStream,
)
from synapse.util.async_helpers import Linearizer
@@ -96,6 +97,14 @@ class ReplicationCommandHandler:
continue
+ if isinstance(stream, TypingStream):
+ # Only add TypingStream as a source on the instance in charge of
+ # typing.
+ if hs.config.worker.writers.typing == hs.get_instance_name():
+ self._streams_to_replicate.append(stream)
+
+ continue
+
# Only add any other streams if we're on master.
if hs.config.worker_app is not None:
continue
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 9076bbe9f1..7a42de3f7d 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -294,11 +294,12 @@ class TypingStream(Stream):
def __init__(self, hs):
typing_handler = hs.get_typing_handler()
- if hs.config.worker_app is None:
- # on the master, query the typing handler
+ writer_instance = hs.config.worker.writers.typing
+ if writer_instance == hs.get_instance_name():
+ # On the writer, query the typing handler
update_function = typing_handler.get_all_typing_updates
else:
- # Query master process
+ # Query the typing writer process
update_function = make_http_update_function(hs, self.NAME)
super().__init__(
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index ea5912d4e4..26d5a51cb2 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -817,9 +817,18 @@ class RoomTypingRestServlet(RestServlet):
self.typing_handler = hs.get_typing_handler()
self.auth = hs.get_auth()
+ # If we're not on the typing writer instance we should scream if we get
+ # requests.
+ self._is_typing_writer = (
+ hs.config.worker.writers.typing == hs.get_instance_name()
+ )
+
async def on_PUT(self, request, room_id, user_id):
requester = await self.auth.get_user_by_req(request)
+ if not self._is_typing_writer:
+ raise Exception("Got /typing request on instance that is not typing writer")
+
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))
diff --git a/synapse/server.py b/synapse/server.py
index 0e6ea96b33..8e41112530 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -44,7 +44,6 @@ from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
FederationHandlerRegistry,
FederationServer,
- ReplicationFederationHandlerRegistry,
)
from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import FederationSender
@@ -84,7 +83,7 @@ from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
-from synapse.handlers.typing import TypingHandler
+from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@@ -378,7 +377,10 @@ class HomeServer(object):
return PresenceHandler(self)
def build_typing_handler(self):
- return TypingHandler(self)
+ if self.config.worker.writers.typing == self.get_instance_name():
+ return TypingWriterHandler(self)
+ else:
+ return FollowerTypingHandler(self)
def build_sync_handler(self):
return SyncHandler(self)
@@ -534,10 +536,7 @@ class HomeServer(object):
return RoomMemberMasterHandler(self)
def build_federation_registry(self):
- if self.config.worker_app:
- return ReplicationFederationHandlerRegistry(self)
- else:
- return FederationHandlerRegistry()
+ return FederationHandlerRegistry(self)
def build_server_notices_manager(self):
if self.config.worker_app:
diff --git a/synapse/server.pyi b/synapse/server.pyi
index cd50c721b8..90a673778f 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -148,3 +148,5 @@ class HomeServer(object):
self,
) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient:
pass
+ def should_send_federation(self) -> bool:
+ pass
|