summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-07-28 10:58:45 +0100
committerErik Johnston <erik@matrix.org>2021-07-28 10:58:45 +0100
commit13f9422e38cbd1f25aa94d086c5be98084280fdf (patch)
tree3b71c9c9b48a0f69bdf7e403ca18cb0a6dd88c0c
parentAdd a PeriodicallyFlushingMemoryHandler to prevent logging silence (#10407) (diff)
downloadsynapse-13f9422e38cbd1f25aa94d086c5be98084280fdf.tar.xz
Allow /typing to be handled by any worker
-rw-r--r--synapse/handlers/typing.py29
-rw-r--r--synapse/replication/http/__init__.py2
-rw-r--r--synapse/replication/http/typing.py89
-rw-r--r--synapse/rest/client/v1/room.py16
4 files changed, 122 insertions, 14 deletions
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py

index 0cb651a400..70e3aee4ce 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py
@@ -22,6 +22,7 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) +from synapse.replication.http.typing import ReplicationTypingRestServlet from synapse.replication.tcp.streams import TypingStream from synapse.types import JsonDict, Requester, UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -61,7 +62,9 @@ class FollowerTypingHandler: if hs.should_send_federation(): self.federation = hs.get_federation_sender() - if hs.config.worker.writers.typing != hs.get_instance_name(): + self._typing_repl_client = ReplicationTypingRestServlet.make_client(hs) + self._typing_worker = hs.config.worker.writers.typing + if self._typing_worker != hs.get_instance_name(): hs.get_federation_registry().register_instance_for_edu( "m.typing", hs.config.worker.writers.typing, @@ -199,6 +202,30 @@ class FollowerTypingHandler: def get_current_token(self) -> int: return self._latest_room_serial + async def started_typing( + self, target_user: UserID, requester: Requester, room_id: str, timeout: int + ) -> None: + await self._typing_repl_client( + typing=True, + instance_name=self._typing_worker, + user_id=target_user.to_string(), + requester=requester, + room_id=room_id, + timeout=timeout, + ) + + async def stopped_typing( + self, target_user: UserID, requester: Requester, room_id: str + ) -> None: + await self._typing_repl_client( + typing=True, + instance_name=self._typing_worker, + user_id=target_user.to_string(), + requester=requester, + room_id=room_id, + timeout=None, + ) + class TypingWriterHandler(FollowerTypingHandler): def __init__(self, hs: "HomeServer"): diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index ba8114ac9e..de17af748b 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py
@@ -24,6 +24,7 @@ from synapse.replication.http import ( register, send_event, streams, + typing, ) REPLICATION_PREFIX = "/_synapse/replication" @@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource): streams.register_servlets(hs, self) account_data.register_servlets(hs, self) push.register_servlets(hs, self) + typing.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/typing.py b/synapse/replication/http/typing.py new file mode 100644
index 0000000000..da8e54da3e --- /dev/null +++ b/synapse/replication/http/typing.py
@@ -0,0 +1,89 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.types import Requester, UserID +from typing import TYPE_CHECKING +import logging + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationTypingRestServlet(ReplicationEndpoint): + """Call to start or stop a user typing in a room. + + Request format: + + POST /_synapse/replication/typing/:room_id/:user_id + + { + "requester": ..., + "typing": true, + "timeout": 30000 + } + + """ + + NAME = "typing" + PATH_ARGS = ("room_id", "user_id") + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.handler = hs.get_typing_handler() + self.store = hs.get_datastore() + + @staticmethod + async def _serialize_payload(requester, room_id, user_id, typing, timeout): + payload = { + "requester": requester.serialize(), + "typing": typing, + "timeout": timeout, + } + + return payload + + async def _handle_request(self, request, room_id, user_id): + content = parse_json_object_from_request(request) + + requester = Requester.deserialize(self.store, content["requester"]) + request.requester = requester + + target_user = UserID.from_string(user_id) + + if content["typing"]: + await self.handler.started_typing( + target_user, + requester, + room_id, + content["timeout"], + ) + else: + await self.handler.stopped_typing( + target_user, + requester, + room_id, + ) + + return 200, {} + + +def register_servlets(hs, http_server): + ReplicationTypingRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 25ba52c624..f5b5cff636 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py
@@ -1254,18 +1254,11 @@ class RoomTypingRestServlet(RestServlet): self.presence_handler = hs.get_presence_handler() self.auth = hs.get_auth() - # If we're not on the typing writer instance we should scream if we get - # requests. - self._is_typing_writer = ( - hs.config.worker.writers.typing == hs.get_instance_name() - ) + self.handler = hs.get_typing_handler() async def on_PUT(self, request, room_id, user_id): requester = await self.auth.get_user_by_req(request) - if not self._is_typing_writer: - raise Exception("Got /typing request on instance that is not typing writer") - room_id = urlparse.unquote(room_id) target_user = UserID.from_string(urlparse.unquote(user_id)) @@ -1276,19 +1269,16 @@ class RoomTypingRestServlet(RestServlet): # Limit timeout to stop people from setting silly typing timeouts. timeout = min(content.get("timeout", 30000), 120000) - # Defer getting the typing handler since it will raise on workers. - typing_handler = self.hs.get_typing_writer_handler() - try: if content["typing"]: - await typing_handler.started_typing( + await self.handler.started_typing( target_user=target_user, requester=requester, room_id=room_id, timeout=timeout, ) else: - await typing_handler.stopped_typing( + await self.handler.stopped_typing( target_user=target_user, requester=requester, room_id=room_id ) except ShadowBanError: