diff options
author | Andrew Morgan <andrewm@element.io> | 2022-11-11 18:34:59 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2022-12-19 16:44:24 +0000 |
commit | f66d743eae12835885be40120f390caab9b6a95d (patch) | |
tree | f86b27e0cad4746a212b3ed4310a603c8261b7d8 /synapse | |
parent | Enable Complement tests for MSC3391 (diff) | |
download | synapse-f66d743eae12835885be40120f390caab9b6a95d.tar.xz |
Add replication methods for removing account data
Also rename existing account data methods, explicitly stating that they're for adding account data.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/account_data.py | 28 | ||||
-rw-r--r-- | synapse/replication/http/account_data.py | 88 |
2 files changed, 104 insertions, 12 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index fc21d58001..543e575b10 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -17,10 +17,12 @@ import random from typing import TYPE_CHECKING, Awaitable, Callable, Collection, List, Optional, Tuple from synapse.replication.http.account_data import ( + ReplicationAddRoomAccountDataRestServlet, ReplicationAddTagRestServlet, + ReplicationAddUserAccountDataRestServlet, + ReplicationRemoveRoomAccountDataRestServlet, ReplicationRemoveTagRestServlet, - ReplicationRoomAccountDataRestServlet, - ReplicationUserAccountDataRestServlet, + ReplicationRemoveUserAccountDataRestServlet, ) from synapse.streams import EventSource from synapse.types import JsonDict, StreamKeyType, UserID @@ -41,8 +43,18 @@ class AccountDataHandler: self._instance_name = hs.get_instance_name() self._notifier = hs.get_notifier() - self._user_data_client = ReplicationUserAccountDataRestServlet.make_client(hs) - self._room_data_client = ReplicationRoomAccountDataRestServlet.make_client(hs) + self._add_user_data_client = ( + ReplicationAddUserAccountDataRestServlet.make_client(hs) + ) + self._remove_user_data_client = ( + ReplicationRemoveUserAccountDataRestServlet.make_client(hs) + ) + self._add_room_data_client = ( + ReplicationAddRoomAccountDataRestServlet.make_client(hs) + ) + self._remove_room_data_client = ( + ReplicationRemoveRoomAccountDataRestServlet.make_client(hs) + ) self._add_tag_client = ReplicationAddTagRestServlet.make_client(hs) self._remove_tag_client = ReplicationRemoveTagRestServlet.make_client(hs) self._account_data_writers = hs.config.worker.writers.account_data @@ -112,7 +124,7 @@ class AccountDataHandler: return max_stream_id else: - response = await self._room_data_client( + response = await self._add_room_data_client( instance_name=random.choice(self._account_data_writers), user_id=user_id, room_id=room_id, @@ -127,9 +139,9 @@ class AccountDataHandler: """Add some global account_data for a user. Args: - user_id: The user to add a tag for. + user_id: The user to add some account data for. account_data_type: The type of account_data to add. - content: A json object to associate with the tag. + content: The content json dictionary. Returns: The maximum stream ID. @@ -148,7 +160,7 @@ class AccountDataHandler: return max_stream_id else: - response = await self._user_data_client( + response = await self._add_user_data_client( instance_name=random.choice(self._account_data_writers), user_id=user_id, account_data_type=account_data_type, diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py index 310f609153..201a81cb40 100644 --- a/synapse/replication/http/account_data.py +++ b/synapse/replication/http/account_data.py @@ -28,7 +28,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class ReplicationUserAccountDataRestServlet(ReplicationEndpoint): +class ReplicationAddUserAccountDataRestServlet(ReplicationEndpoint): """Add user account data on the appropriate account data worker. Request format: @@ -73,7 +73,46 @@ class ReplicationUserAccountDataRestServlet(ReplicationEndpoint): return 200, {"max_stream_id": max_stream_id} -class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint): +class ReplicationRemoveUserAccountDataRestServlet(ReplicationEndpoint): + """Remove user account data on the appropriate account data worker. + + Request format: + + POST /_synapse/replication/remove_user_account_data/:user_id/:type + + { + "content": { ... }, + } + + """ + + NAME = "remove_user_account_data" + PATH_ARGS = ("user_id", "account_data_type") + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.handler = hs.get_account_data_handler() + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + user_id: str, account_data_type: str + ) -> JsonDict: + return {} + + async def _handle_request( # type: ignore[override] + self, request: Request, user_id: str, account_data_type: str + ) -> Tuple[int, JsonDict]: + max_stream_id = await self.handler.remove_account_data_for_user( + user_id, account_data_type + ) + + return 200, {"max_stream_id": max_stream_id} + + +class ReplicationAddRoomAccountDataRestServlet(ReplicationEndpoint): """Add room account data on the appropriate account data worker. Request format: @@ -118,6 +157,45 @@ class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint): return 200, {"max_stream_id": max_stream_id} +class ReplicationRemoveRoomAccountDataRestServlet(ReplicationEndpoint): + """Remove room account data on the appropriate account data worker. + + Request format: + + POST /_synapse/replication/remove_room_account_data/:user_id/:room_id/:account_data_type + + { + "content": { ... }, + } + + """ + + NAME = "remove_room_account_data" + PATH_ARGS = ("user_id", "room_id", "account_data_type") + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.handler = hs.get_account_data_handler() + self.clock = hs.get_clock() + + @staticmethod + async def _serialize_payload( # type: ignore[override] + user_id: str, room_id: str, account_data_type: str, content: JsonDict + ) -> JsonDict: + return {} + + async def _handle_request( # type: ignore[override] + self, request: Request, user_id: str, room_id: str, account_data_type: str + ) -> Tuple[int, JsonDict]: + max_stream_id = await self.handler.remove_account_data_for_room( + user_id, room_id, account_data_type + ) + + return 200, {"max_stream_id": max_stream_id} + + class ReplicationAddTagRestServlet(ReplicationEndpoint): """Add tag on the appropriate account data worker. @@ -206,7 +284,9 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint): def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - ReplicationUserAccountDataRestServlet(hs).register(http_server) - ReplicationRoomAccountDataRestServlet(hs).register(http_server) + ReplicationAddUserAccountDataRestServlet(hs).register(http_server) + ReplicationRemoveUserAccountDataRestServlet(hs).register(http_server) + ReplicationAddRoomAccountDataRestServlet(hs).register(http_server) + ReplicationRemoveRoomAccountDataRestServlet(hs).register(http_server) ReplicationAddTagRestServlet(hs).register(http_server) ReplicationRemoveTagRestServlet(hs).register(http_server) |