From b849e46139675c3098fdaca8ceff6b76be3f2f02 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Thu, 7 Jan 2021 23:01:59 +0200 Subject: Add forward extremities endpoint to rooms admin API GET /_synapse/admin/v1/rooms//forward_extremities now gets forward extremities for a room, returning count and the list of extremities. Signed-off-by: Jason Robinson --- synapse/storage/databases/main/__init__.py | 2 ++ .../databases/main/events_forward_extremities.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 synapse/storage/databases/main/events_forward_extremities.py (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index c4de07a0a8..93b25af057 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -43,6 +43,7 @@ from .end_to_end_keys import EndToEndKeyStore from .event_federation import EventFederationStore from .event_push_actions import EventPushActionsStore from .events_bg_updates import EventsBackgroundUpdatesStore +from .events_forward_extremities import EventForwardExtremitiesStore from .filtering import FilteringStore from .group_server import GroupServerStore from .keys import KeyStore @@ -118,6 +119,7 @@ class DataStore( UIAuthStore, CacheInvalidationWorkerStore, ServerMetricsStore, + EventForwardExtremitiesStore, ): def __init__(self, database: DatabasePool, db_conn, hs): self.hs = hs diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py new file mode 100644 index 0000000000..250a424cc0 --- /dev/null +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -0,0 +1,20 @@ +from typing import List, Dict + +from synapse.storage._base import SQLBaseStore + + +class EventForwardExtremitiesStore(SQLBaseStore): + async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: + def get_forward_extremities_for_room_txn(txn): + sql = ( + "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " + "WHERE room_id = ?" + ) + + txn.execute(sql, (room_id,)) + rows = txn.fetchall() + return [{"event_id": row[0], "state_group": row[1]} for row in rows] + + return await self.db_pool.runInteraction( + "get_forward_extremities_for_room", get_forward_extremities_for_room_txn + ) -- cgit 1.5.1 From 85c0999bfb70f2e8438a9730b8858e7845027190 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Fri, 8 Jan 2021 00:12:23 +0200 Subject: Add Rooms admin forward extremities DELETE endpoint Signed-off-by: Jason Robinson --- synapse/rest/admin/rooms.py | 5 +++ .../databases/main/events_forward_extremities.py | 49 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 1 deletion(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 1f7b7daea9..76f8603821 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -543,6 +543,11 @@ class ForwardExtremitiesRestServlet(RestServlet): room_id = await self.resolve_room_id(room_identifier) + deleted_count = await self.store.delete_forward_extremities_for_room(room_id) + return 200, { + "deleted": deleted_count, + } + async def on_GET(self, request, room_identifier): requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 250a424cc0..cc684a94fe 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -4,7 +4,54 @@ from synapse.storage._base import SQLBaseStore class EventForwardExtremitiesStore(SQLBaseStore): + + async def delete_forward_extremities_for_room(self, room_id: str) -> int: + """Delete any extra forward extremities for a room. + + Returns count deleted. + """ + def delete_forward_extremities_for_room_txn(txn): + # First we need to get the event_id to not delete + sql = ( + "SELECT " + " last_value(event_id) OVER w AS event_id" + " FROM event_forward_extremities" + " NATURAL JOIN events" + " where room_id = ?" + " WINDOW w AS (" + " PARTITION BY room_id" + " ORDER BY stream_ordering" + " range between unbounded preceding and unbounded following" + " )" + " ORDER BY stream_ordering" + ) + txn.execute(sql, (room_id,)) + rows = txn.fetchall() + + # TODO: should this raise a SynapseError instead of better to blow? + event_id = rows[0][0] + + # Now delete the extra forward extremities + sql = ( + "DELETE FROM event_forward_extremities " + "WHERE" + " event_id != ?" + " AND room_id = ?" + ) + + # TODO we should not commit yet + txn.execute(sql, (event_id, room_id)) + + # TODO flush the cache then commit + + return txn.rowcount + + return await self.db_pool.runInteraction( + "delete_forward_extremities_for_room", delete_forward_extremities_for_room_txn, + ) + async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: + """Get list of forward extremities for a room.""" def get_forward_extremities_for_room_txn(txn): sql = ( "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " @@ -16,5 +63,5 @@ class EventForwardExtremitiesStore(SQLBaseStore): return [{"event_id": row[0], "state_group": row[1]} for row in rows] return await self.db_pool.runInteraction( - "get_forward_extremities_for_room", get_forward_extremities_for_room_txn + "get_forward_extremities_for_room", get_forward_extremities_for_room_txn, ) -- cgit 1.5.1 From 90ad4d443a109ad95741b499d914006578acceef Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Sat, 9 Jan 2021 21:57:41 +0200 Subject: Implement clearing cache after deleting forward extremities Also run linter. Signed-off-by: Jason Robinson --- synapse/rest/admin/rooms.py | 21 +++++------ .../databases/main/events_forward_extremities.py | 41 +++++++++++++++++----- 2 files changed, 42 insertions(+), 20 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 76f8603821..6757a8100b 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -524,18 +524,20 @@ class ForwardExtremitiesRestServlet(RestServlet): async def resolve_room_id(self, room_identifier: str) -> str: """Resolve to a room ID, if necessary.""" if RoomID.is_valid(room_identifier): - room_id = room_identifier + resolved_room_id = room_identifier elif RoomAlias.is_valid(room_identifier): room_alias = RoomAlias.from_string(room_identifier) room_id, _ = await self.room_member_handler.lookup_room_alias(room_alias) - room_id = room_id.to_string() + resolved_room_id = room_id.to_string() else: raise SynapseError( 400, "%s was not legal room ID or room alias" % (room_identifier,) ) - if not room_id: - raise SynapseError(400, "Unknown room ID or room alias %s" % room_identifier) - return room_id + if not resolved_room_id: + raise SynapseError( + 400, "Unknown room ID or room alias %s" % room_identifier + ) + return resolved_room_id async def on_DELETE(self, request, room_identifier): requester = await self.auth.get_user_by_req(request) @@ -544,9 +546,7 @@ class ForwardExtremitiesRestServlet(RestServlet): room_id = await self.resolve_room_id(room_identifier) deleted_count = await self.store.delete_forward_extremities_for_room(room_id) - return 200, { - "deleted": deleted_count, - } + return 200, {"deleted": deleted_count} async def on_GET(self, request, room_identifier): requester = await self.auth.get_user_by_req(request) @@ -555,7 +555,4 @@ class ForwardExtremitiesRestServlet(RestServlet): room_id = await self.resolve_room_id(room_identifier) extremities = await self.store.get_forward_extremities_for_room(room_id) - return 200, { - "count": len(extremities), - "results": extremities, - } + return 200, {"count": len(extremities), "results": extremities} diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index cc684a94fe..6b8da52fee 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -1,15 +1,22 @@ -from typing import List, Dict +import logging +from typing import Dict, List +from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore +logger = logging.getLogger(__name__) -class EventForwardExtremitiesStore(SQLBaseStore): +class EventForwardExtremitiesStore(SQLBaseStore): async def delete_forward_extremities_for_room(self, room_id: str) -> int: """Delete any extra forward extremities for a room. + Invalidates the "get_latest_event_ids_in_room" cache if any forward + extremities were deleted. + Returns count deleted. """ + def delete_forward_extremities_for_room_txn(txn): # First we need to get the event_id to not delete sql = ( @@ -27,9 +34,17 @@ class EventForwardExtremitiesStore(SQLBaseStore): ) txn.execute(sql, (room_id,)) rows = txn.fetchall() - - # TODO: should this raise a SynapseError instead of better to blow? - event_id = rows[0][0] + try: + event_id = rows[0][0] + logger.debug( + "Found event_id %s as the forward extremity to keep for room %s", + event_id, + room_id, + ) + except KeyError: + msg = f"No forward extremity event found for room {room_id}" + logger.warning(msg) + raise SynapseError(400, msg) # Now delete the extra forward extremities sql = ( @@ -39,19 +54,29 @@ class EventForwardExtremitiesStore(SQLBaseStore): " AND room_id = ?" ) - # TODO we should not commit yet txn.execute(sql, (event_id, room_id)) + logger.info( + "Deleted %s extra forward extremities for room %s", + txn.rowcount, + room_id, + ) - # TODO flush the cache then commit + if txn.rowcount > 0: + # Invalidate the cache + self._invalidate_cache_and_stream( + txn, self.get_latest_event_ids_in_room, (room_id,), + ) return txn.rowcount return await self.db_pool.runInteraction( - "delete_forward_extremities_for_room", delete_forward_extremities_for_room_txn, + "delete_forward_extremities_for_room", + delete_forward_extremities_for_room_txn, ) async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: """Get list of forward extremities for a room.""" + def get_forward_extremities_for_room_txn(txn): sql = ( "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " -- cgit 1.5.1 From b52fb703f788b3de3afa1142852354b876f6bacf Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 09:47:03 +0200 Subject: Don't try to use f-strings Signed-off-by: Jason Robinson --- synapse/storage/databases/main/events_forward_extremities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 6b8da52fee..83f751cf5b 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -42,7 +42,7 @@ class EventForwardExtremitiesStore(SQLBaseStore): room_id, ) except KeyError: - msg = f"No forward extremity event found for room {room_id}" + msg = "No forward extremity event found for room %s" % room_id logger.warning(msg) raise SynapseError(400, msg) -- cgit 1.5.1 From da16d06301aec83d144812d727c24192eb890c93 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 23:43:58 +0200 Subject: Address pr feedback * docs updates * prettify SQL * add missing copyright * cursor_to_dict * update touched files copyright years Signed-off-by: Jason Robinson --- docs/admin_api/rooms.md | 12 +--- synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/rooms.py | 2 +- synapse/storage/databases/main/__init__.py | 2 +- .../databases/main/events_forward_extremities.py | 64 +++++++++++++--------- 5 files changed, 46 insertions(+), 36 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 1d59bb5c4b..86daa393a7 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -516,11 +516,8 @@ optionally be specified, e.g.: # Forward Extremities Admin API Enables querying and deleting forward extremities from rooms. When a lot of forward -extremities accumulate in a room, performance can become degraded. - -When using this API endpoint to delete any extra forward extremities for a room, -the server does not need to be restarted as the relevant caches will be cleared -in the API call. +extremities accumulate in a room, performance can become degraded. For details, see +[#1760](https://github.com/matrix-org/synapse/issues/1760). ## Check for forward extremities @@ -537,7 +534,7 @@ A response as follows will be returned: "count": 1, "results": [ { - "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdfgh", + "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdefgh", "state_group": 439 } ] @@ -561,6 +558,3 @@ that were deleted. "deleted": 1 } ``` - -The cache `get_latest_event_ids_in_room` will be invalidated, if any forward extremities -were deleted. diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index b80b036090..319ad7bf7f 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd +# Copyright 2020, 2021 The Matrix.org Foundation C.I.C. + # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 6757a8100b..da1499cab3 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 93b25af057..b936f54f1e 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 83f751cf5b..e6c2d6e122 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging from typing import Dict, List @@ -19,19 +34,19 @@ class EventForwardExtremitiesStore(SQLBaseStore): def delete_forward_extremities_for_room_txn(txn): # First we need to get the event_id to not delete - sql = ( - "SELECT " - " last_value(event_id) OVER w AS event_id" - " FROM event_forward_extremities" - " NATURAL JOIN events" - " where room_id = ?" - " WINDOW w AS (" - " PARTITION BY room_id" - " ORDER BY stream_ordering" - " range between unbounded preceding and unbounded following" - " )" - " ORDER BY stream_ordering" - ) + sql = """ + SELECT + last_value(event_id) OVER w AS event_id + FROM event_forward_extremities + NATURAL JOIN events + WHERE room_id = ? + WINDOW w AS ( + PARTITION BY room_id + ORDER BY stream_ordering + range between unbounded preceding and unbounded following + ) + ORDER BY stream_ordering + """ txn.execute(sql, (room_id,)) rows = txn.fetchall() try: @@ -47,12 +62,10 @@ class EventForwardExtremitiesStore(SQLBaseStore): raise SynapseError(400, msg) # Now delete the extra forward extremities - sql = ( - "DELETE FROM event_forward_extremities " - "WHERE" - " event_id != ?" - " AND room_id = ?" - ) + sql = """ + DELETE FROM event_forward_extremities + WHERE event_id != ? AND room_id = ? + """ txn.execute(sql, (event_id, room_id)) logger.info( @@ -78,14 +91,15 @@ class EventForwardExtremitiesStore(SQLBaseStore): """Get list of forward extremities for a room.""" def get_forward_extremities_for_room_txn(txn): - sql = ( - "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " - "WHERE room_id = ?" - ) + sql = """ + SELECT event_id, state_group + FROM event_forward_extremities + NATURAL JOIN event_to_state_groups + WHERE room_id = ? + """ txn.execute(sql, (room_id,)) - rows = txn.fetchall() - return [{"event_id": row[0], "state_group": row[1]} for row in rows] + return self.db_pool.cursor_to_dict(txn) return await self.db_pool.runInteraction( "get_forward_extremities_for_room", get_forward_extremities_for_room_txn, -- cgit 1.5.1 From 49c619a9a2203da61f496fe6e3ae308be87efda8 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 23:49:58 +0200 Subject: Simplify delete_forward_extremities_for_room_txn SQL As per feedback. Signed-off-by: Jason Robinson --- .../storage/databases/main/events_forward_extremities.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index e6c2d6e122..c7ec08469d 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -35,17 +35,11 @@ class EventForwardExtremitiesStore(SQLBaseStore): def delete_forward_extremities_for_room_txn(txn): # First we need to get the event_id to not delete sql = """ - SELECT - last_value(event_id) OVER w AS event_id - FROM event_forward_extremities - NATURAL JOIN events + SELECT event_id FROM event_forward_extremities + INNER JOIN events USING (room_id, event_id) WHERE room_id = ? - WINDOW w AS ( - PARTITION BY room_id - ORDER BY stream_ordering - range between unbounded preceding and unbounded following - ) - ORDER BY stream_ordering + ORDER BY stream_ordering DESC + LIMIT 1 """ txn.execute(sql, (room_id,)) rows = txn.fetchall() -- cgit 1.5.1 From c177faf5a92d8ef02dd59e16dcf6ca9fb5ca6a33 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 23:55:44 +0200 Subject: Remove trailing whitespace to appease the linter Signed-off-by: Jason Robinson --- synapse/storage/databases/main/events_forward_extremities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index c7ec08469d..5fea974050 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -86,8 +86,8 @@ class EventForwardExtremitiesStore(SQLBaseStore): def get_forward_extremities_for_room_txn(txn): sql = """ - SELECT event_id, state_group - FROM event_forward_extremities + SELECT event_id, state_group + FROM event_forward_extremities NATURAL JOIN event_to_state_groups WHERE room_id = ? """ -- cgit 1.5.1 From 758ed5f1bc16f4b73d73d94129761a8680fd71c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jan 2021 17:00:12 +0000 Subject: Speed up chain cover calculation (#9176) --- changelog.d/9176.misc | 1 + synapse/storage/databases/main/events.py | 199 ++++++++++++++++++++++--------- synapse/storage/util/sequence.py | 16 +++ 3 files changed, 161 insertions(+), 55 deletions(-) create mode 100644 changelog.d/9176.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9176.misc b/changelog.d/9176.misc new file mode 100644 index 0000000000..9c41d7b0f9 --- /dev/null +++ b/changelog.d/9176.misc @@ -0,0 +1 @@ +Speed up chain cover calculation when persisting a batch of state events at once. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 5db7d7aaa8..ccda9f1caa 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -473,8 +473,9 @@ class PersistEventsStore: txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain, ) - @staticmethod + @classmethod def _add_chain_cover_index( + cls, txn, db_pool: DatabasePool, event_to_room_id: Dict[str, str], @@ -614,60 +615,17 @@ class PersistEventsStore: if not events_to_calc_chain_id_for: return - # We now calculate the chain IDs/sequence numbers for the events. We - # do this by looking at the chain ID and sequence number of any auth - # event with the same type/state_key and incrementing the sequence - # number by one. If there was no match or the chain ID/sequence - # number is already taken we generate a new chain. - # - # We need to do this in a topologically sorted order as we want to - # generate chain IDs/sequence numbers of an event's auth events - # before the event itself. - chains_tuples_allocated = set() # type: Set[Tuple[int, int]] - new_chain_tuples = {} # type: Dict[str, Tuple[int, int]] - for event_id in sorted_topologically( - events_to_calc_chain_id_for, event_to_auth_chain - ): - existing_chain_id = None - for auth_id in event_to_auth_chain.get(event_id, []): - if event_to_types.get(event_id) == event_to_types.get(auth_id): - existing_chain_id = chain_map[auth_id] - break - - new_chain_tuple = None - if existing_chain_id: - # We found a chain ID/sequence number candidate, check its - # not already taken. - proposed_new_id = existing_chain_id[0] - proposed_new_seq = existing_chain_id[1] + 1 - if (proposed_new_id, proposed_new_seq) not in chains_tuples_allocated: - already_allocated = db_pool.simple_select_one_onecol_txn( - txn, - table="event_auth_chains", - keyvalues={ - "chain_id": proposed_new_id, - "sequence_number": proposed_new_seq, - }, - retcol="event_id", - allow_none=True, - ) - if already_allocated: - # Mark it as already allocated so we don't need to hit - # the DB again. - chains_tuples_allocated.add((proposed_new_id, proposed_new_seq)) - else: - new_chain_tuple = ( - proposed_new_id, - proposed_new_seq, - ) - - if not new_chain_tuple: - new_chain_tuple = (db_pool.event_chain_id_gen.get_next_id_txn(txn), 1) - - chains_tuples_allocated.add(new_chain_tuple) - - chain_map[event_id] = new_chain_tuple - new_chain_tuples[event_id] = new_chain_tuple + # Allocate chain ID/sequence numbers to each new event. + new_chain_tuples = cls._allocate_chain_ids( + txn, + db_pool, + event_to_room_id, + event_to_types, + event_to_auth_chain, + events_to_calc_chain_id_for, + chain_map, + ) + chain_map.update(new_chain_tuples) db_pool.simple_insert_many_txn( txn, @@ -794,6 +752,137 @@ class PersistEventsStore: ], ) + @staticmethod + def _allocate_chain_ids( + txn, + db_pool: DatabasePool, + event_to_room_id: Dict[str, str], + event_to_types: Dict[str, Tuple[str, str]], + event_to_auth_chain: Dict[str, List[str]], + events_to_calc_chain_id_for: Set[str], + chain_map: Dict[str, Tuple[int, int]], + ) -> Dict[str, Tuple[int, int]]: + """Allocates, but does not persist, chain ID/sequence numbers for the + events in `events_to_calc_chain_id_for`. (c.f. _add_chain_cover_index + for info on args) + """ + + # We now calculate the chain IDs/sequence numbers for the events. We do + # this by looking at the chain ID and sequence number of any auth event + # with the same type/state_key and incrementing the sequence number by + # one. If there was no match or the chain ID/sequence number is already + # taken we generate a new chain. + # + # We try to reduce the number of times that we hit the database by + # batching up calls, to make this more efficient when persisting large + # numbers of state events (e.g. during joins). + # + # We do this by: + # 1. Calculating for each event which auth event will be used to + # inherit the chain ID, i.e. converting the auth chain graph to a + # tree that we can allocate chains on. We also keep track of which + # existing chain IDs have been referenced. + # 2. Fetching the max allocated sequence number for each referenced + # existing chain ID, generating a map from chain ID to the max + # allocated sequence number. + # 3. Iterating over the tree and allocating a chain ID/seq no. to the + # new event, by incrementing the sequence number from the + # referenced event's chain ID/seq no. and checking that the + # incremented sequence number hasn't already been allocated (by + # looking in the map generated in the previous step). We generate a + # new chain if the sequence number has already been allocated. + # + + existing_chains = set() # type: Set[int] + tree = [] # type: List[Tuple[str, Optional[str]]] + + # We need to do this in a topologically sorted order as we want to + # generate chain IDs/sequence numbers of an event's auth events before + # the event itself. + for event_id in sorted_topologically( + events_to_calc_chain_id_for, event_to_auth_chain + ): + for auth_id in event_to_auth_chain.get(event_id, []): + if event_to_types.get(event_id) == event_to_types.get(auth_id): + existing_chain_id = chain_map.get(auth_id) + if existing_chain_id: + existing_chains.add(existing_chain_id[0]) + + tree.append((event_id, auth_id)) + break + else: + tree.append((event_id, None)) + + # Fetch the current max sequence number for each existing referenced chain. + sql = """ + SELECT chain_id, MAX(sequence_number) FROM event_auth_chains + WHERE %s + GROUP BY chain_id + """ + clause, args = make_in_list_sql_clause( + db_pool.engine, "chain_id", existing_chains + ) + txn.execute(sql % (clause,), args) + + chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int] + + # Allocate the new events chain ID/sequence numbers. + # + # To reduce the number of calls to the database we don't allocate a + # chain ID number in the loop, instead we use a temporary `object()` for + # each new chain ID. Once we've done the loop we generate the necessary + # number of new chain IDs in one call, replacing all temporary + # objects with real allocated chain IDs. + + unallocated_chain_ids = set() # type: Set[object] + new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]] + for event_id, auth_event_id in tree: + # If we reference an auth_event_id we fetch the allocated chain ID, + # either from the existing `chain_map` or the newly generated + # `new_chain_tuples` map. + existing_chain_id = None + if auth_event_id: + existing_chain_id = new_chain_tuples.get(auth_event_id) + if not existing_chain_id: + existing_chain_id = chain_map[auth_event_id] + + new_chain_tuple = None # type: Optional[Tuple[Any, int]] + if existing_chain_id: + # We found a chain ID/sequence number candidate, check its + # not already taken. + proposed_new_id = existing_chain_id[0] + proposed_new_seq = existing_chain_id[1] + 1 + + if chain_to_max_seq_no[proposed_new_id] < proposed_new_seq: + new_chain_tuple = ( + proposed_new_id, + proposed_new_seq, + ) + + # If we need to start a new chain we allocate a temporary chain ID. + if not new_chain_tuple: + new_chain_tuple = (object(), 1) + unallocated_chain_ids.add(new_chain_tuple[0]) + + new_chain_tuples[event_id] = new_chain_tuple + chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1] + + # Generate new chain IDs for all unallocated chain IDs. + newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn( + txn, len(unallocated_chain_ids) + ) + + # Map from potentially temporary chain ID to real chain ID + chain_id_to_allocated_map = dict( + zip(unallocated_chain_ids, newly_allocated_chain_ids) + ) # type: Dict[Any, int] + chain_id_to_allocated_map.update((c, c) for c in existing_chains) + + return { + event_id: (chain_id_to_allocated_map[chain_id], seq) + for event_id, (chain_id, seq) in new_chain_tuples.items() + } + def _persist_transaction_ids_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index c780ade077..0ec4dc2918 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -69,6 +69,11 @@ class SequenceGenerator(metaclass=abc.ABCMeta): """Gets the next ID in the sequence""" ... + @abc.abstractmethod + def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: + """Get the next `n` IDs in the sequence""" + ... + @abc.abstractmethod def check_consistency( self, @@ -219,6 +224,17 @@ class LocalSequenceGenerator(SequenceGenerator): self._current_max_id += 1 return self._current_max_id + def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: + with self._lock: + if self._current_max_id is None: + assert self._callback is not None + self._current_max_id = self._callback(txn) + self._callback = None + + first_id = self._current_max_id + 1 + self._current_max_id += n + return [first_id + i for i in range(n)] + def check_consistency( self, db_conn: Connection, -- cgit 1.5.1 From 930ba009719788ebc2004c6ef89329dae1b9689b Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Sat, 23 Jan 2021 21:34:32 +0200 Subject: Add depth and received_ts to forward_extremities admin API response Also add a warning on the admin API documentation. Signed-off-by: Jason Robinson --- docs/admin_api/rooms.md | 8 +++++++- synapse/storage/databases/main/events_forward_extremities.py | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 86daa393a7..f34cec1ff7 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -535,7 +535,9 @@ A response as follows will be returned: "results": [ { "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdefgh", - "state_group": 439 + "state_group": 439, + "depth": 123, + "received_ts": 1611263016761 } ] } @@ -543,6 +545,10 @@ A response as follows will be returned: ## Deleting forward extremities +**WARNING**: Please ensure you know what you're doing and have read +the related issue [#1760](https://github.com/matrix-org/synapse/issues/1760). +Under no situations should this API be executed as an automated maintenance task! + If a room has lots of forward extremities, the extra can be deleted as follows: diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 5fea974050..84aaa919fb 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -86,9 +86,10 @@ class EventForwardExtremitiesStore(SQLBaseStore): def get_forward_extremities_for_room_txn(txn): sql = """ - SELECT event_id, state_group + SELECT event_id, state_group, depth, received_ts FROM event_forward_extremities NATURAL JOIN event_to_state_groups + NATURAL JOIN events WHERE room_id = ? """ -- cgit 1.5.1