diff --git a/changelog.d/11237.misc b/changelog.d/11237.misc
new file mode 100644
index 0000000000..b90efc6535
--- /dev/null
+++ b/changelog.d/11237.misc
@@ -0,0 +1 @@
+Allow `stream_writers.typing` config to be a list of one worker.
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 462630201d..4507992031 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -63,7 +63,8 @@ class WriterLocations:
Attributes:
events: The instances that write to the event and backfill streams.
- typing: The instance that writes to the typing stream.
+ typing: The instances that write to the typing stream. Currently
+ can only be a single instance.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
@@ -75,9 +76,15 @@ class WriterLocations:
"""
events = attr.ib(
- default=["master"], type=List[str], converter=_instance_to_list_converter
+ default=["master"],
+ type=List[str],
+ converter=_instance_to_list_converter,
+ )
+ typing = attr.ib(
+ default=["master"],
+ type=List[str],
+ converter=_instance_to_list_converter,
)
- typing = attr.ib(default="master", type=str)
to_device = attr.ib(
default=["master"],
type=List[str],
@@ -217,6 +224,11 @@ class WorkerConfig(Config):
% (instance, stream)
)
+ if len(self.writers.typing) != 1:
+ raise ConfigError(
+ "Must only specify one instance to handle `typing` messages."
+ )
+
if len(self.writers.to_device) != 1:
raise ConfigError(
"Must only specify one instance to handle `to_device` messages."
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a75993d9..42e3acecb4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1232,10 +1232,6 @@ class FederationHandlerRegistry:
self.query_handlers[query_type] = handler
- def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
- """Register that the EDU handler is on a different instance than master."""
- self._edu_type_to_instance[edu_type] = [instance_name]
-
def register_instances_for_edu(
self, edu_type: str, instance_names: List[str]
) -> None:
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c411d69924..22c6174821 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -62,8 +62,8 @@ class FollowerTypingHandler:
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
- if hs.config.worker.writers.typing != hs.get_instance_name():
- hs.get_federation_registry().register_instance_for_edu(
+ if hs.get_instance_name() not in hs.config.worker.writers.typing:
+ hs.get_federation_registry().register_instances_for_edu(
"m.typing",
hs.config.worker.writers.typing,
)
@@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- assert hs.config.worker.writers.typing == hs.get_instance_name()
+ assert hs.get_instance_name() in hs.config.worker.writers.typing
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 06fd06fdf3..21293038ef 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -138,7 +138,7 @@ class ReplicationCommandHandler:
if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
- if hs.config.worker.writers.typing == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.typing:
self._streams_to_replicate.append(stream)
continue
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index c8b188ae4e..743a01da08 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -328,8 +328,7 @@ class TypingStream(Stream):
ROW_TYPE = TypingStreamRow
def __init__(self, hs: "HomeServer"):
- writer_instance = hs.config.worker.writers.typing
- if writer_instance == hs.get_instance_name():
+ if hs.get_instance_name() in hs.config.worker.writers.typing:
# On the writer, query the typing handler
typing_writer_handler = hs.get_typing_writer_handler()
update_function: Callable[
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index ed95189b6d..6a876cfa2f 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -914,7 +914,7 @@ class RoomTypingRestServlet(RestServlet):
# If we're not on the typing writer instance we should scream if we get
# requests.
self._is_typing_writer = (
- hs.config.worker.writers.typing == hs.get_instance_name()
+ hs.get_instance_name() in hs.config.worker.writers.typing
)
async def on_PUT(
diff --git a/synapse/server.py b/synapse/server.py
index 0fbf36ba99..013a7bacaa 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -463,7 +463,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_typing_writer_handler(self) -> TypingWriterHandler:
- if self.config.worker.writers.typing == self.get_instance_name():
+ if self.get_instance_name() in self.config.worker.writers.typing:
return TypingWriterHandler(self)
else:
raise Exception("Workers cannot write typing")
@@ -474,7 +474,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_typing_handler(self) -> FollowerTypingHandler:
- if self.config.worker.writers.typing == self.get_instance_name():
+ if self.get_instance_name() in self.config.worker.writers.typing:
# Use get_typing_writer_handler to ensure that we use the same
# cached version.
return self.get_typing_writer_handler()
|