diff options
author | Jason Robinson <jasonr@matrix.org> | 2021-01-26 12:57:38 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-26 12:57:38 +0200 |
commit | e5b659e9e12e0f7d1467a154b152666da31db02c (patch) | |
tree | b3be45139814147228555ee67cd4064862d56b04 /synapse | |
parent | Periodically send pings to detect dead Redis connections (#9218) (diff) | |
parent | Fix get forward extremities query (diff) | |
download | synapse-e5b659e9e12e0f7d1467a154b152666da31db02c.tar.xz |
Merge pull request #9062 from matrix-org/jaywink/admin-forward-extremities
Add forward extremities endpoint to rooms admin API
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/rest/admin/__init__.py | 4 | ||||
-rw-r--r-- | synapse/rest/admin/rooms.py | 59 | ||||
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_forward_extremities.py | 101 |
4 files changed, 166 insertions, 2 deletions
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index f04740cd38..57e0a10837 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd +# Copyright 2020, 2021 The Matrix.org Foundation C.I.C. + # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,6 +38,7 @@ from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_medi from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet from synapse.rest.admin.rooms import ( DeleteRoomRestServlet, + ForwardExtremitiesRestServlet, JoinRoomAliasServlet, ListRoomRestServlet, MakeRoomAdminRestServlet, @@ -232,6 +235,7 @@ def register_servlets(hs, http_server): PushersRestServlet(hs).register(http_server) MakeRoomAdminRestServlet(hs).register(http_server) ShadowBanRestServlet(hs).register(http_server) + ForwardExtremitiesRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource(hs, http_server): diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index ab7cc9102a..da1499cab3 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -499,3 +499,60 @@ class MakeRoomAdminRestServlet(RestServlet): ) return 200, {} + + +class ForwardExtremitiesRestServlet(RestServlet): + """Allows a server admin to get or clear forward extremities. + + Clearing does not require restarting the server. + + Clear forward extremities: + DELETE /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities + + Get forward_extremities: + GET /_synapse/admin/v1/rooms/<room_id_or_alias>/forward_extremities + """ + + PATTERNS = admin_patterns("/rooms/(?P<room_identifier>[^/]*)/forward_extremities") + + def __init__(self, hs: "HomeServer"): + self.hs = hs + self.auth = hs.get_auth() + self.room_member_handler = hs.get_room_member_handler() + self.store = hs.get_datastore() + + async def resolve_room_id(self, room_identifier: str) -> str: + """Resolve to a room ID, if necessary.""" + if RoomID.is_valid(room_identifier): + resolved_room_id = room_identifier + elif RoomAlias.is_valid(room_identifier): + room_alias = RoomAlias.from_string(room_identifier) + room_id, _ = await self.room_member_handler.lookup_room_alias(room_alias) + resolved_room_id = room_id.to_string() + else: + raise SynapseError( + 400, "%s was not legal room ID or room alias" % (room_identifier,) + ) + if not resolved_room_id: + raise SynapseError( + 400, "Unknown room ID or room alias %s" % room_identifier + ) + return resolved_room_id + + async def on_DELETE(self, request, room_identifier): + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + room_id = await self.resolve_room_id(room_identifier) + + deleted_count = await self.store.delete_forward_extremities_for_room(room_id) + return 200, {"deleted": deleted_count} + + async def on_GET(self, request, room_identifier): + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + room_id = await self.resolve_room_id(room_identifier) + + extremities = await self.store.get_forward_extremities_for_room(room_id) + return 200, {"count": len(extremities), "results": extremities} diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index ae561a2da3..5d0845588c 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ from .end_to_end_keys import EndToEndKeyStore from .event_federation import EventFederationStore from .event_push_actions import EventPushActionsStore from .events_bg_updates import EventsBackgroundUpdatesStore +from .events_forward_extremities import EventForwardExtremitiesStore from .filtering import FilteringStore from .group_server import GroupServerStore from .keys import KeyStore @@ -118,6 +119,7 @@ class DataStore( UIAuthStore, CacheInvalidationWorkerStore, ServerMetricsStore, + EventForwardExtremitiesStore, ): def __init__(self, database: DatabasePool, db_conn, hs): self.hs = hs diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py new file mode 100644 index 0000000000..0ac1da9c35 --- /dev/null +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Dict, List + +from synapse.api.errors import SynapseError +from synapse.storage._base import SQLBaseStore + +logger = logging.getLogger(__name__) + + +class EventForwardExtremitiesStore(SQLBaseStore): + async def delete_forward_extremities_for_room(self, room_id: str) -> int: + """Delete any extra forward extremities for a room. + + Invalidates the "get_latest_event_ids_in_room" cache if any forward + extremities were deleted. + + Returns count deleted. + """ + + def delete_forward_extremities_for_room_txn(txn): + # First we need to get the event_id to not delete + sql = """ + SELECT event_id FROM event_forward_extremities + INNER JOIN events USING (room_id, event_id) + WHERE room_id = ? + ORDER BY stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (room_id,)) + rows = txn.fetchall() + try: + event_id = rows[0][0] + logger.debug( + "Found event_id %s as the forward extremity to keep for room %s", + event_id, + room_id, + ) + except KeyError: + msg = "No forward extremity event found for room %s" % room_id + logger.warning(msg) + raise SynapseError(400, msg) + + # Now delete the extra forward extremities + sql = """ + DELETE FROM event_forward_extremities + WHERE event_id != ? AND room_id = ? + """ + + txn.execute(sql, (event_id, room_id)) + logger.info( + "Deleted %s extra forward extremities for room %s", + txn.rowcount, + room_id, + ) + + if txn.rowcount > 0: + # Invalidate the cache + self._invalidate_cache_and_stream( + txn, self.get_latest_event_ids_in_room, (room_id,), + ) + + return txn.rowcount + + return await self.db_pool.runInteraction( + "delete_forward_extremities_for_room", + delete_forward_extremities_for_room_txn, + ) + + async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: + """Get list of forward extremities for a room.""" + + def get_forward_extremities_for_room_txn(txn): + sql = """ + SELECT event_id, state_group, depth, received_ts + FROM event_forward_extremities + INNER JOIN event_to_state_groups USING (event_id) + INNER JOIN events USING (room_id, event_id) + WHERE room_id = ? + """ + + txn.execute(sql, (room_id,)) + return self.db_pool.cursor_to_dict(txn) + + return await self.db_pool.runInteraction( + "get_forward_extremities_for_room", get_forward_extremities_for_room_txn, + ) |