diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 0cb651a400..70e3aee4ce 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -22,6 +22,7 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process,
wrap_as_background_process,
)
+from synapse.replication.http.typing import ReplicationTypingRestServlet
from synapse.replication.tcp.streams import TypingStream
from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -61,7 +62,9 @@ class FollowerTypingHandler:
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
- if hs.config.worker.writers.typing != hs.get_instance_name():
+ self._typing_repl_client = ReplicationTypingRestServlet.make_client(hs)
+ self._typing_worker = hs.config.worker.writers.typing
+ if self._typing_worker != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(
"m.typing",
hs.config.worker.writers.typing,
@@ -199,6 +202,30 @@ class FollowerTypingHandler:
def get_current_token(self) -> int:
return self._latest_room_serial
+ async def started_typing(
+ self, target_user: UserID, requester: Requester, room_id: str, timeout: int
+ ) -> None:
+ await self._typing_repl_client(
+ typing=True,
+ instance_name=self._typing_worker,
+ user_id=target_user.to_string(),
+ requester=requester,
+ room_id=room_id,
+ timeout=timeout,
+ )
+
+ async def stopped_typing(
+ self, target_user: UserID, requester: Requester, room_id: str
+ ) -> None:
+ await self._typing_repl_client(
+ typing=True,
+ instance_name=self._typing_worker,
+ user_id=target_user.to_string(),
+ requester=requester,
+ room_id=room_id,
+ timeout=None,
+ )
+
class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index ba8114ac9e..de17af748b 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -24,6 +24,7 @@ from synapse.replication.http import (
register,
send_event,
streams,
+ typing,
)
REPLICATION_PREFIX = "/_synapse/replication"
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
streams.register_servlets(hs, self)
account_data.register_servlets(hs, self)
push.register_servlets(hs, self)
+ typing.register_servlets(hs, self)
# The following can't currently be instantiated on workers.
if hs.config.worker.worker_app is None:
diff --git a/synapse/replication/http/typing.py b/synapse/replication/http/typing.py
new file mode 100644
index 0000000000..da8e54da3e
--- /dev/null
+++ b/synapse/replication/http/typing.py
@@ -0,0 +1,89 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.types import Requester, UserID
+from typing import TYPE_CHECKING
+import logging
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationTypingRestServlet(ReplicationEndpoint):
+ """Call to start or stop a user typing in a room.
+
+ Request format:
+
+ POST /_synapse/replication/typing/:room_id/:user_id
+
+ {
+ "requester": ...,
+ "typing": true,
+ "timeout": 30000
+ }
+
+ """
+
+ NAME = "typing"
+ PATH_ARGS = ("room_id", "user_id")
+ CACHE = False
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+
+ self.handler = hs.get_typing_handler()
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ async def _serialize_payload(requester, room_id, user_id, typing, timeout):
+ payload = {
+ "requester": requester.serialize(),
+ "typing": typing,
+ "timeout": timeout,
+ }
+
+ return payload
+
+ async def _handle_request(self, request, room_id, user_id):
+ content = parse_json_object_from_request(request)
+
+ requester = Requester.deserialize(self.store, content["requester"])
+ request.requester = requester
+
+ target_user = UserID.from_string(user_id)
+
+ if content["typing"]:
+ await self.handler.started_typing(
+ target_user,
+ requester,
+ room_id,
+ content["timeout"],
+ )
+ else:
+ await self.handler.stopped_typing(
+ target_user,
+ requester,
+ room_id,
+ )
+
+ return 200, {}
+
+
+def register_servlets(hs, http_server):
+ ReplicationTypingRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 25ba52c624..f5b5cff636 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -1254,18 +1254,11 @@ class RoomTypingRestServlet(RestServlet):
self.presence_handler = hs.get_presence_handler()
self.auth = hs.get_auth()
- # If we're not on the typing writer instance we should scream if we get
- # requests.
- self._is_typing_writer = (
- hs.config.worker.writers.typing == hs.get_instance_name()
- )
+ self.handler = hs.get_typing_handler()
async def on_PUT(self, request, room_id, user_id):
requester = await self.auth.get_user_by_req(request)
- if not self._is_typing_writer:
- raise Exception("Got /typing request on instance that is not typing writer")
-
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))
@@ -1276,19 +1269,16 @@ class RoomTypingRestServlet(RestServlet):
# Limit timeout to stop people from setting silly typing timeouts.
timeout = min(content.get("timeout", 30000), 120000)
- # Defer getting the typing handler since it will raise on workers.
- typing_handler = self.hs.get_typing_writer_handler()
-
try:
if content["typing"]:
- await typing_handler.started_typing(
+ await self.handler.started_typing(
target_user=target_user,
requester=requester,
room_id=room_id,
timeout=timeout,
)
else:
- await typing_handler.stopped_typing(
+ await self.handler.stopped_typing(
target_user=target_user, requester=requester, room_id=room_id
)
except ShadowBanError:
|