From b849e46139675c3098fdaca8ceff6b76be3f2f02 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Thu, 7 Jan 2021 23:01:59 +0200 Subject: Add forward extremities endpoint to rooms admin API GET /_synapse/admin/v1/rooms//forward_extremities now gets forward extremities for a room, returning count and the list of extremities. Signed-off-by: Jason Robinson --- synapse/storage/databases/main/__init__.py | 2 ++ .../databases/main/events_forward_extremities.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 synapse/storage/databases/main/events_forward_extremities.py (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index c4de07a0a8..93b25af057 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -43,6 +43,7 @@ from .end_to_end_keys import EndToEndKeyStore from .event_federation import EventFederationStore from .event_push_actions import EventPushActionsStore from .events_bg_updates import EventsBackgroundUpdatesStore +from .events_forward_extremities import EventForwardExtremitiesStore from .filtering import FilteringStore from .group_server import GroupServerStore from .keys import KeyStore @@ -118,6 +119,7 @@ class DataStore( UIAuthStore, CacheInvalidationWorkerStore, ServerMetricsStore, + EventForwardExtremitiesStore, ): def __init__(self, database: DatabasePool, db_conn, hs): self.hs = hs diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py new file mode 100644 index 0000000000..250a424cc0 --- /dev/null +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -0,0 +1,20 @@ +from typing import List, Dict + +from synapse.storage._base import SQLBaseStore + + +class EventForwardExtremitiesStore(SQLBaseStore): + async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: + def get_forward_extremities_for_room_txn(txn): + sql = ( + "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " + "WHERE room_id = ?" + ) + + txn.execute(sql, (room_id,)) + rows = txn.fetchall() + return [{"event_id": row[0], "state_group": row[1]} for row in rows] + + return await self.db_pool.runInteraction( + "get_forward_extremities_for_room", get_forward_extremities_for_room_txn + ) -- cgit 1.5.1 From 85c0999bfb70f2e8438a9730b8858e7845027190 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Fri, 8 Jan 2021 00:12:23 +0200 Subject: Add Rooms admin forward extremities DELETE endpoint Signed-off-by: Jason Robinson --- synapse/rest/admin/rooms.py | 5 +++ .../databases/main/events_forward_extremities.py | 49 +++++++++++++++++++++- 2 files changed, 53 insertions(+), 1 deletion(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 1f7b7daea9..76f8603821 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -543,6 +543,11 @@ class ForwardExtremitiesRestServlet(RestServlet): room_id = await self.resolve_room_id(room_identifier) + deleted_count = await self.store.delete_forward_extremities_for_room(room_id) + return 200, { + "deleted": deleted_count, + } + async def on_GET(self, request, room_identifier): requester = await self.auth.get_user_by_req(request) await assert_user_is_admin(self.auth, requester.user) diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 250a424cc0..cc684a94fe 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -4,7 +4,54 @@ from synapse.storage._base import SQLBaseStore class EventForwardExtremitiesStore(SQLBaseStore): + + async def delete_forward_extremities_for_room(self, room_id: str) -> int: + """Delete any extra forward extremities for a room. + + Returns count deleted. + """ + def delete_forward_extremities_for_room_txn(txn): + # First we need to get the event_id to not delete + sql = ( + "SELECT " + " last_value(event_id) OVER w AS event_id" + " FROM event_forward_extremities" + " NATURAL JOIN events" + " where room_id = ?" + " WINDOW w AS (" + " PARTITION BY room_id" + " ORDER BY stream_ordering" + " range between unbounded preceding and unbounded following" + " )" + " ORDER BY stream_ordering" + ) + txn.execute(sql, (room_id,)) + rows = txn.fetchall() + + # TODO: should this raise a SynapseError instead of better to blow? + event_id = rows[0][0] + + # Now delete the extra forward extremities + sql = ( + "DELETE FROM event_forward_extremities " + "WHERE" + " event_id != ?" + " AND room_id = ?" + ) + + # TODO we should not commit yet + txn.execute(sql, (event_id, room_id)) + + # TODO flush the cache then commit + + return txn.rowcount + + return await self.db_pool.runInteraction( + "delete_forward_extremities_for_room", delete_forward_extremities_for_room_txn, + ) + async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: + """Get list of forward extremities for a room.""" def get_forward_extremities_for_room_txn(txn): sql = ( "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " @@ -16,5 +63,5 @@ class EventForwardExtremitiesStore(SQLBaseStore): return [{"event_id": row[0], "state_group": row[1]} for row in rows] return await self.db_pool.runInteraction( - "get_forward_extremities_for_room", get_forward_extremities_for_room_txn + "get_forward_extremities_for_room", get_forward_extremities_for_room_txn, ) -- cgit 1.5.1 From 90ad4d443a109ad95741b499d914006578acceef Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Sat, 9 Jan 2021 21:57:41 +0200 Subject: Implement clearing cache after deleting forward extremities Also run linter. Signed-off-by: Jason Robinson --- synapse/rest/admin/rooms.py | 21 +++++------ .../databases/main/events_forward_extremities.py | 41 +++++++++++++++++----- 2 files changed, 42 insertions(+), 20 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 76f8603821..6757a8100b 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -524,18 +524,20 @@ class ForwardExtremitiesRestServlet(RestServlet): async def resolve_room_id(self, room_identifier: str) -> str: """Resolve to a room ID, if necessary.""" if RoomID.is_valid(room_identifier): - room_id = room_identifier + resolved_room_id = room_identifier elif RoomAlias.is_valid(room_identifier): room_alias = RoomAlias.from_string(room_identifier) room_id, _ = await self.room_member_handler.lookup_room_alias(room_alias) - room_id = room_id.to_string() + resolved_room_id = room_id.to_string() else: raise SynapseError( 400, "%s was not legal room ID or room alias" % (room_identifier,) ) - if not room_id: - raise SynapseError(400, "Unknown room ID or room alias %s" % room_identifier) - return room_id + if not resolved_room_id: + raise SynapseError( + 400, "Unknown room ID or room alias %s" % room_identifier + ) + return resolved_room_id async def on_DELETE(self, request, room_identifier): requester = await self.auth.get_user_by_req(request) @@ -544,9 +546,7 @@ class ForwardExtremitiesRestServlet(RestServlet): room_id = await self.resolve_room_id(room_identifier) deleted_count = await self.store.delete_forward_extremities_for_room(room_id) - return 200, { - "deleted": deleted_count, - } + return 200, {"deleted": deleted_count} async def on_GET(self, request, room_identifier): requester = await self.auth.get_user_by_req(request) @@ -555,7 +555,4 @@ class ForwardExtremitiesRestServlet(RestServlet): room_id = await self.resolve_room_id(room_identifier) extremities = await self.store.get_forward_extremities_for_room(room_id) - return 200, { - "count": len(extremities), - "results": extremities, - } + return 200, {"count": len(extremities), "results": extremities} diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index cc684a94fe..6b8da52fee 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -1,15 +1,22 @@ -from typing import List, Dict +import logging +from typing import Dict, List +from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore +logger = logging.getLogger(__name__) -class EventForwardExtremitiesStore(SQLBaseStore): +class EventForwardExtremitiesStore(SQLBaseStore): async def delete_forward_extremities_for_room(self, room_id: str) -> int: """Delete any extra forward extremities for a room. + Invalidates the "get_latest_event_ids_in_room" cache if any forward + extremities were deleted. + Returns count deleted. """ + def delete_forward_extremities_for_room_txn(txn): # First we need to get the event_id to not delete sql = ( @@ -27,9 +34,17 @@ class EventForwardExtremitiesStore(SQLBaseStore): ) txn.execute(sql, (room_id,)) rows = txn.fetchall() - - # TODO: should this raise a SynapseError instead of better to blow? - event_id = rows[0][0] + try: + event_id = rows[0][0] + logger.debug( + "Found event_id %s as the forward extremity to keep for room %s", + event_id, + room_id, + ) + except KeyError: + msg = f"No forward extremity event found for room {room_id}" + logger.warning(msg) + raise SynapseError(400, msg) # Now delete the extra forward extremities sql = ( @@ -39,19 +54,29 @@ class EventForwardExtremitiesStore(SQLBaseStore): " AND room_id = ?" ) - # TODO we should not commit yet txn.execute(sql, (event_id, room_id)) + logger.info( + "Deleted %s extra forward extremities for room %s", + txn.rowcount, + room_id, + ) - # TODO flush the cache then commit + if txn.rowcount > 0: + # Invalidate the cache + self._invalidate_cache_and_stream( + txn, self.get_latest_event_ids_in_room, (room_id,), + ) return txn.rowcount return await self.db_pool.runInteraction( - "delete_forward_extremities_for_room", delete_forward_extremities_for_room_txn, + "delete_forward_extremities_for_room", + delete_forward_extremities_for_room_txn, ) async def get_forward_extremities_for_room(self, room_id: str) -> List[Dict]: """Get list of forward extremities for a room.""" + def get_forward_extremities_for_room_txn(txn): sql = ( "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " -- cgit 1.5.1 From b52fb703f788b3de3afa1142852354b876f6bacf Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 09:47:03 +0200 Subject: Don't try to use f-strings Signed-off-by: Jason Robinson --- synapse/storage/databases/main/events_forward_extremities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 6b8da52fee..83f751cf5b 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -42,7 +42,7 @@ class EventForwardExtremitiesStore(SQLBaseStore): room_id, ) except KeyError: - msg = f"No forward extremity event found for room {room_id}" + msg = "No forward extremity event found for room %s" % room_id logger.warning(msg) raise SynapseError(400, msg) -- cgit 1.5.1 From da16d06301aec83d144812d727c24192eb890c93 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 23:43:58 +0200 Subject: Address pr feedback * docs updates * prettify SQL * add missing copyright * cursor_to_dict * update touched files copyright years Signed-off-by: Jason Robinson --- docs/admin_api/rooms.md | 12 +--- synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/rooms.py | 2 +- synapse/storage/databases/main/__init__.py | 2 +- .../databases/main/events_forward_extremities.py | 64 +++++++++++++--------- 5 files changed, 46 insertions(+), 36 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 1d59bb5c4b..86daa393a7 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -516,11 +516,8 @@ optionally be specified, e.g.: # Forward Extremities Admin API Enables querying and deleting forward extremities from rooms. When a lot of forward -extremities accumulate in a room, performance can become degraded. - -When using this API endpoint to delete any extra forward extremities for a room, -the server does not need to be restarted as the relevant caches will be cleared -in the API call. +extremities accumulate in a room, performance can become degraded. For details, see +[#1760](https://github.com/matrix-org/synapse/issues/1760). ## Check for forward extremities @@ -537,7 +534,7 @@ A response as follows will be returned: "count": 1, "results": [ { - "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdfgh", + "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdefgh", "state_group": 439 } ] @@ -561,6 +558,3 @@ that were deleted. "deleted": 1 } ``` - -The cache `get_latest_event_ids_in_room` will be invalidated, if any forward extremities -were deleted. diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index b80b036090..319ad7bf7f 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd +# Copyright 2020, 2021 The Matrix.org Foundation C.I.C. + # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 6757a8100b..da1499cab3 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 93b25af057..b936f54f1e 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 83f751cf5b..e6c2d6e122 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import logging from typing import Dict, List @@ -19,19 +34,19 @@ class EventForwardExtremitiesStore(SQLBaseStore): def delete_forward_extremities_for_room_txn(txn): # First we need to get the event_id to not delete - sql = ( - "SELECT " - " last_value(event_id) OVER w AS event_id" - " FROM event_forward_extremities" - " NATURAL JOIN events" - " where room_id = ?" - " WINDOW w AS (" - " PARTITION BY room_id" - " ORDER BY stream_ordering" - " range between unbounded preceding and unbounded following" - " )" - " ORDER BY stream_ordering" - ) + sql = """ + SELECT + last_value(event_id) OVER w AS event_id + FROM event_forward_extremities + NATURAL JOIN events + WHERE room_id = ? + WINDOW w AS ( + PARTITION BY room_id + ORDER BY stream_ordering + range between unbounded preceding and unbounded following + ) + ORDER BY stream_ordering + """ txn.execute(sql, (room_id,)) rows = txn.fetchall() try: @@ -47,12 +62,10 @@ class EventForwardExtremitiesStore(SQLBaseStore): raise SynapseError(400, msg) # Now delete the extra forward extremities - sql = ( - "DELETE FROM event_forward_extremities " - "WHERE" - " event_id != ?" - " AND room_id = ?" - ) + sql = """ + DELETE FROM event_forward_extremities + WHERE event_id != ? AND room_id = ? + """ txn.execute(sql, (event_id, room_id)) logger.info( @@ -78,14 +91,15 @@ class EventForwardExtremitiesStore(SQLBaseStore): """Get list of forward extremities for a room.""" def get_forward_extremities_for_room_txn(txn): - sql = ( - "SELECT event_id, state_group FROM event_forward_extremities NATURAL JOIN event_to_state_groups " - "WHERE room_id = ?" - ) + sql = """ + SELECT event_id, state_group + FROM event_forward_extremities + NATURAL JOIN event_to_state_groups + WHERE room_id = ? + """ txn.execute(sql, (room_id,)) - rows = txn.fetchall() - return [{"event_id": row[0], "state_group": row[1]} for row in rows] + return self.db_pool.cursor_to_dict(txn) return await self.db_pool.runInteraction( "get_forward_extremities_for_room", get_forward_extremities_for_room_txn, -- cgit 1.5.1 From 49c619a9a2203da61f496fe6e3ae308be87efda8 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 23:49:58 +0200 Subject: Simplify delete_forward_extremities_for_room_txn SQL As per feedback. Signed-off-by: Jason Robinson --- .../storage/databases/main/events_forward_extremities.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index e6c2d6e122..c7ec08469d 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -35,17 +35,11 @@ class EventForwardExtremitiesStore(SQLBaseStore): def delete_forward_extremities_for_room_txn(txn): # First we need to get the event_id to not delete sql = """ - SELECT - last_value(event_id) OVER w AS event_id - FROM event_forward_extremities - NATURAL JOIN events + SELECT event_id FROM event_forward_extremities + INNER JOIN events USING (room_id, event_id) WHERE room_id = ? - WINDOW w AS ( - PARTITION BY room_id - ORDER BY stream_ordering - range between unbounded preceding and unbounded following - ) - ORDER BY stream_ordering + ORDER BY stream_ordering DESC + LIMIT 1 """ txn.execute(sql, (room_id,)) rows = txn.fetchall() -- cgit 1.5.1 From c177faf5a92d8ef02dd59e16dcf6ca9fb5ca6a33 Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Mon, 11 Jan 2021 23:55:44 +0200 Subject: Remove trailing whitespace to appease the linter Signed-off-by: Jason Robinson --- synapse/storage/databases/main/events_forward_extremities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index c7ec08469d..5fea974050 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -86,8 +86,8 @@ class EventForwardExtremitiesStore(SQLBaseStore): def get_forward_extremities_for_room_txn(txn): sql = """ - SELECT event_id, state_group - FROM event_forward_extremities + SELECT event_id, state_group + FROM event_forward_extremities NATURAL JOIN event_to_state_groups WHERE room_id = ? """ -- cgit 1.5.1 From eee6fcf5fa857af95c46185fc11d540343c77d2d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jan 2021 10:22:53 +0000 Subject: Use execute_batch instead of executemany in places (#9181) `execute_batch` does fewer round trips in postgres than `executemany`, but does not give a correct `txn.rowcount` result after. --- changelog.d/9181.misc | 1 + synapse/storage/database.py | 5 ++--- synapse/storage/databases/main/events.py | 18 +++++++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) create mode 100644 changelog.d/9181.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9181.misc b/changelog.d/9181.misc new file mode 100644 index 0000000000..7820d09cd0 --- /dev/null +++ b/changelog.d/9181.misc @@ -0,0 +1 @@ +Speed up batch insertion when using PostgreSQL. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a19d65ad23..c7220bc778 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -267,8 +267,7 @@ class LoggingTransaction: self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args) else: - for val in args: - self.execute(sql, val) + self.executemany(sql, args) def execute_values(self, sql: str, *args: Any) -> List[Tuple]: """Corresponds to psycopg2.extras.execute_values. Only available when @@ -888,7 +887,7 @@ class DatabasePool: ", ".join("?" for _ in keys[0]), ) - txn.executemany(sql, vals) + txn.execute_batch(sql, vals) async def simple_upsert( self, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 3216b3f3c8..5db7d7aaa8 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -876,7 +876,7 @@ class PersistEventsStore: WHERE room_id = ? AND type = ? AND state_key = ? ) """ - txn.executemany( + txn.execute_batch( sql, ( ( @@ -895,7 +895,7 @@ class PersistEventsStore: ) # Now we actually update the current_state_events table - txn.executemany( + txn.execute_batch( "DELETE FROM current_state_events" " WHERE room_id = ? AND type = ? AND state_key = ?", ( @@ -907,7 +907,7 @@ class PersistEventsStore: # We include the membership in the current state table, hence we do # a lookup when we insert. This assumes that all events have already # been inserted into room_memberships. - txn.executemany( + txn.execute_batch( """INSERT INTO current_state_events (room_id, type, state_key, event_id, membership) VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) @@ -927,7 +927,7 @@ class PersistEventsStore: # we have no record of the fact the user *was* a member of the # room but got, say, state reset out of it. if to_delete or to_insert: - txn.executemany( + txn.execute_batch( "DELETE FROM local_current_membership" " WHERE room_id = ? AND user_id = ?", ( @@ -938,7 +938,7 @@ class PersistEventsStore: ) if to_insert: - txn.executemany( + txn.execute_batch( """INSERT INTO local_current_membership (room_id, user_id, event_id, membership) VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?)) @@ -1738,7 +1738,7 @@ class PersistEventsStore: """ if events_and_contexts: - txn.executemany( + txn.execute_batch( sql, ( ( @@ -1767,7 +1767,7 @@ class PersistEventsStore: # Now we delete the staging area for *all* events that were being # persisted. - txn.executemany( + txn.execute_batch( "DELETE FROM event_push_actions_staging WHERE event_id = ?", ((event.event_id,) for event, _ in all_events_and_contexts), ) @@ -1886,7 +1886,7 @@ class PersistEventsStore: " )" ) - txn.executemany( + txn.execute_batch( query, [ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) @@ -1900,7 +1900,7 @@ class PersistEventsStore: "DELETE FROM event_backward_extremities" " WHERE event_id = ? AND room_id = ?" ) - txn.executemany( + txn.execute_batch( query, [ (ev.event_id, ev.room_id) -- cgit 1.5.1 From 7a43482f1916622967f5a4b389f93944dd5deb07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jan 2021 14:44:12 +0000 Subject: Use execute_batch in more places (#9188) * Use execute_batch in more places * Newsfile --- changelog.d/9188.misc | 1 + synapse/storage/database.py | 6 ++++++ synapse/storage/databases/main/devices.py | 4 ++-- synapse/storage/databases/main/event_push_actions.py | 4 ++-- synapse/storage/databases/main/events_bg_updates.py | 12 ++---------- synapse/storage/databases/main/media_repository.py | 10 +++++----- synapse/storage/databases/main/purge_events.py | 2 +- synapse/storage/databases/main/registration.py | 2 +- synapse/storage/databases/main/roommember.py | 6 +----- .../storage/databases/main/schema/delta/59/01ignored_user.py | 2 +- synapse/storage/databases/main/search.py | 4 ++-- synapse/storage/databases/state/store.py | 4 ++-- 12 files changed, 26 insertions(+), 31 deletions(-) create mode 100644 changelog.d/9188.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9188.misc b/changelog.d/9188.misc new file mode 100644 index 0000000000..7820d09cd0 --- /dev/null +++ b/changelog.d/9188.misc @@ -0,0 +1 @@ +Speed up batch insertion when using PostgreSQL. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index c7220bc778..d2ba4bd2fc 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -262,6 +262,12 @@ class LoggingTransaction: return self.txn.description def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None: + """Similar to `executemany`, except `txn.rowcount` will not be correct + afterwards. + + More efficient than `executemany` on PostgreSQL + """ + if isinstance(self.database_engine, PostgresEngine): from psycopg2.extras import execute_batch # type: ignore diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9097677648..659d8f245f 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -897,7 +897,7 @@ class DeviceWorkerStore(SQLBaseStore): DELETE FROM device_lists_outbound_last_success WHERE destination = ? AND user_id = ? """ - txn.executemany(sql, ((row[0], row[1]) for row in rows)) + txn.execute_batch(sql, ((row[0], row[1]) for row in rows)) logger.info("Pruned %d device list outbound pokes", count) @@ -1343,7 +1343,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): # Delete older entries in the table, as we really only care about # when the latest change happened. - txn.executemany( + txn.execute_batch( """ DELETE FROM device_lists_stream WHERE user_id = ? AND device_id = ? AND stream_id < ? diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 1b657191a9..438383abe1 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -487,7 +487,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): VALUES (?, ?, ?, ?, ?, ?) """ - txn.executemany( + txn.execute_batch( sql, ( _gen_entry(user_id, actions) @@ -803,7 +803,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ], ) - txn.executemany( + txn.execute_batch( """ UPDATE event_push_summary SET notif_count = ?, unread_count = ?, stream_ordering = ? diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index e46e44ba54..5ca4fa6817 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -139,8 +139,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 - def reindex_txn(txn): sql = ( "SELECT stream_ordering, event_id, json FROM events" @@ -178,9 +176,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?" - for index in range(0, len(update_rows), INSERT_CLUMP_SIZE): - clump = update_rows[index : index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) + txn.execute_batch(sql, update_rows) progress = { "target_min_stream_id_inclusive": target_min_stream_id, @@ -210,8 +206,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) - INSERT_CLUMP_SIZE = 1000 - def reindex_search_txn(txn): sql = ( "SELECT stream_ordering, event_id FROM events" @@ -256,9 +250,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?" - for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE): - clump = rows_to_update[index : index + INSERT_CLUMP_SIZE] - txn.executemany(sql, clump) + txn.execute_batch(sql, rows_to_update) progress = { "target_min_stream_id_inclusive": target_min_stream_id, diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 283c8a5e22..e017177655 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -417,7 +417,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " WHERE media_origin = ? AND media_id = ?" ) - txn.executemany( + txn.execute_batch( sql, ( (time_ms, media_origin, media_id) @@ -430,7 +430,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " WHERE media_id = ?" ) - txn.executemany(sql, ((time_ms, media_id) for media_id in local_media)) + txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media)) return await self.db_pool.runInteraction( "update_cached_last_access_time", update_cache_txn @@ -557,7 +557,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?" def _delete_url_cache_txn(txn): - txn.executemany(sql, [(media_id,) for media_id in media_ids]) + txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) return await self.db_pool.runInteraction( "delete_url_cache", _delete_url_cache_txn @@ -586,11 +586,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): def _delete_url_cache_media_txn(txn): sql = "DELETE FROM local_media_repository WHERE media_id = ?" - txn.executemany(sql, [(media_id,) for media_id in media_ids]) + txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?" - txn.executemany(sql, [(media_id,) for media_id in media_ids]) + txn.execute_batch(sql, [(media_id,) for media_id in media_ids]) return await self.db_pool.runInteraction( "delete_url_cache_media", _delete_url_cache_media_txn diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 5d668aadb2..ecfc9f20b1 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -172,7 +172,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore): ) # Update backward extremeties - txn.executemany( + txn.execute_batch( "INSERT INTO event_backward_extremities (room_id, event_id)" " VALUES (?, ?)", [(room_id, event_id) for event_id, in new_backwards_extrems], diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 8d05288ed4..585b4049d6 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1104,7 +1104,7 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): FROM user_threepids """ - txn.executemany(sql, [(id_server,) for id_server in id_servers]) + txn.execute_batch(sql, [(id_server,) for id_server in id_servers]) if id_servers: await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index dcdaf09682..92382bed28 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -873,8 +873,6 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): "max_stream_id_exclusive", self._stream_order_on_start + 1 ) - INSERT_CLUMP_SIZE = 1000 - def add_membership_profile_txn(txn): sql = """ SELECT stream_ordering, event_id, events.room_id, event_json.json @@ -915,9 +913,7 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): UPDATE room_memberships SET display_name = ?, avatar_url = ? WHERE event_id = ? AND room_id = ? """ - for index in range(0, len(to_update), INSERT_CLUMP_SIZE): - clump = to_update[index : index + INSERT_CLUMP_SIZE] - txn.executemany(to_update_sql, clump) + txn.execute_batch(to_update_sql, to_update) progress = { "target_min_stream_id_inclusive": target_min_stream_id, diff --git a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py index f35c70b699..9e8f35c1d2 100644 --- a/synapse/storage/databases/main/schema/delta/59/01ignored_user.py +++ b/synapse/storage/databases/main/schema/delta/59/01ignored_user.py @@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs # { "ignored_users": "@someone:example.org": {} } ignored_users = content.get("ignored_users", {}) if isinstance(ignored_users, dict) and ignored_users: - cur.executemany(insert_sql, [(user_id, u) for u in ignored_users]) + cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users]) # Add indexes after inserting data for efficiency. logger.info("Adding constraints to ignored_users table") diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index e34fce6281..871af64b11 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -63,7 +63,7 @@ class SearchWorkerStore(SQLBaseStore): for entry in entries ) - txn.executemany(sql, args) + txn.execute_batch(sql, args) elif isinstance(self.database_engine, Sqlite3Engine): sql = ( @@ -75,7 +75,7 @@ class SearchWorkerStore(SQLBaseStore): for entry in entries ) - txn.executemany(sql, args) + txn.execute_batch(sql, args) else: # This should be unreachable. raise Exception("Unrecognized database engine") diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 0e31cc811a..89cdc84a9c 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -565,11 +565,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ) logger.info("[purge] removing redundant state groups") - txn.executemany( + txn.execute_batch( "DELETE FROM state_groups_state WHERE state_group = ?", ((sg,) for sg in state_groups_to_delete), ) - txn.executemany( + txn.execute_batch( "DELETE FROM state_groups WHERE id = ?", ((sg,) for sg in state_groups_to_delete), ) -- cgit 1.5.1 From 758ed5f1bc16f4b73d73d94129761a8680fd71c5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jan 2021 17:00:12 +0000 Subject: Speed up chain cover calculation (#9176) --- changelog.d/9176.misc | 1 + synapse/storage/databases/main/events.py | 199 ++++++++++++++++++++++--------- synapse/storage/util/sequence.py | 16 +++ 3 files changed, 161 insertions(+), 55 deletions(-) create mode 100644 changelog.d/9176.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9176.misc b/changelog.d/9176.misc new file mode 100644 index 0000000000..9c41d7b0f9 --- /dev/null +++ b/changelog.d/9176.misc @@ -0,0 +1 @@ +Speed up chain cover calculation when persisting a batch of state events at once. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 5db7d7aaa8..ccda9f1caa 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -473,8 +473,9 @@ class PersistEventsStore: txn, self.db_pool, event_to_room_id, event_to_types, event_to_auth_chain, ) - @staticmethod + @classmethod def _add_chain_cover_index( + cls, txn, db_pool: DatabasePool, event_to_room_id: Dict[str, str], @@ -614,60 +615,17 @@ class PersistEventsStore: if not events_to_calc_chain_id_for: return - # We now calculate the chain IDs/sequence numbers for the events. We - # do this by looking at the chain ID and sequence number of any auth - # event with the same type/state_key and incrementing the sequence - # number by one. If there was no match or the chain ID/sequence - # number is already taken we generate a new chain. - # - # We need to do this in a topologically sorted order as we want to - # generate chain IDs/sequence numbers of an event's auth events - # before the event itself. - chains_tuples_allocated = set() # type: Set[Tuple[int, int]] - new_chain_tuples = {} # type: Dict[str, Tuple[int, int]] - for event_id in sorted_topologically( - events_to_calc_chain_id_for, event_to_auth_chain - ): - existing_chain_id = None - for auth_id in event_to_auth_chain.get(event_id, []): - if event_to_types.get(event_id) == event_to_types.get(auth_id): - existing_chain_id = chain_map[auth_id] - break - - new_chain_tuple = None - if existing_chain_id: - # We found a chain ID/sequence number candidate, check its - # not already taken. - proposed_new_id = existing_chain_id[0] - proposed_new_seq = existing_chain_id[1] + 1 - if (proposed_new_id, proposed_new_seq) not in chains_tuples_allocated: - already_allocated = db_pool.simple_select_one_onecol_txn( - txn, - table="event_auth_chains", - keyvalues={ - "chain_id": proposed_new_id, - "sequence_number": proposed_new_seq, - }, - retcol="event_id", - allow_none=True, - ) - if already_allocated: - # Mark it as already allocated so we don't need to hit - # the DB again. - chains_tuples_allocated.add((proposed_new_id, proposed_new_seq)) - else: - new_chain_tuple = ( - proposed_new_id, - proposed_new_seq, - ) - - if not new_chain_tuple: - new_chain_tuple = (db_pool.event_chain_id_gen.get_next_id_txn(txn), 1) - - chains_tuples_allocated.add(new_chain_tuple) - - chain_map[event_id] = new_chain_tuple - new_chain_tuples[event_id] = new_chain_tuple + # Allocate chain ID/sequence numbers to each new event. + new_chain_tuples = cls._allocate_chain_ids( + txn, + db_pool, + event_to_room_id, + event_to_types, + event_to_auth_chain, + events_to_calc_chain_id_for, + chain_map, + ) + chain_map.update(new_chain_tuples) db_pool.simple_insert_many_txn( txn, @@ -794,6 +752,137 @@ class PersistEventsStore: ], ) + @staticmethod + def _allocate_chain_ids( + txn, + db_pool: DatabasePool, + event_to_room_id: Dict[str, str], + event_to_types: Dict[str, Tuple[str, str]], + event_to_auth_chain: Dict[str, List[str]], + events_to_calc_chain_id_for: Set[str], + chain_map: Dict[str, Tuple[int, int]], + ) -> Dict[str, Tuple[int, int]]: + """Allocates, but does not persist, chain ID/sequence numbers for the + events in `events_to_calc_chain_id_for`. (c.f. _add_chain_cover_index + for info on args) + """ + + # We now calculate the chain IDs/sequence numbers for the events. We do + # this by looking at the chain ID and sequence number of any auth event + # with the same type/state_key and incrementing the sequence number by + # one. If there was no match or the chain ID/sequence number is already + # taken we generate a new chain. + # + # We try to reduce the number of times that we hit the database by + # batching up calls, to make this more efficient when persisting large + # numbers of state events (e.g. during joins). + # + # We do this by: + # 1. Calculating for each event which auth event will be used to + # inherit the chain ID, i.e. converting the auth chain graph to a + # tree that we can allocate chains on. We also keep track of which + # existing chain IDs have been referenced. + # 2. Fetching the max allocated sequence number for each referenced + # existing chain ID, generating a map from chain ID to the max + # allocated sequence number. + # 3. Iterating over the tree and allocating a chain ID/seq no. to the + # new event, by incrementing the sequence number from the + # referenced event's chain ID/seq no. and checking that the + # incremented sequence number hasn't already been allocated (by + # looking in the map generated in the previous step). We generate a + # new chain if the sequence number has already been allocated. + # + + existing_chains = set() # type: Set[int] + tree = [] # type: List[Tuple[str, Optional[str]]] + + # We need to do this in a topologically sorted order as we want to + # generate chain IDs/sequence numbers of an event's auth events before + # the event itself. + for event_id in sorted_topologically( + events_to_calc_chain_id_for, event_to_auth_chain + ): + for auth_id in event_to_auth_chain.get(event_id, []): + if event_to_types.get(event_id) == event_to_types.get(auth_id): + existing_chain_id = chain_map.get(auth_id) + if existing_chain_id: + existing_chains.add(existing_chain_id[0]) + + tree.append((event_id, auth_id)) + break + else: + tree.append((event_id, None)) + + # Fetch the current max sequence number for each existing referenced chain. + sql = """ + SELECT chain_id, MAX(sequence_number) FROM event_auth_chains + WHERE %s + GROUP BY chain_id + """ + clause, args = make_in_list_sql_clause( + db_pool.engine, "chain_id", existing_chains + ) + txn.execute(sql % (clause,), args) + + chain_to_max_seq_no = {row[0]: row[1] for row in txn} # type: Dict[Any, int] + + # Allocate the new events chain ID/sequence numbers. + # + # To reduce the number of calls to the database we don't allocate a + # chain ID number in the loop, instead we use a temporary `object()` for + # each new chain ID. Once we've done the loop we generate the necessary + # number of new chain IDs in one call, replacing all temporary + # objects with real allocated chain IDs. + + unallocated_chain_ids = set() # type: Set[object] + new_chain_tuples = {} # type: Dict[str, Tuple[Any, int]] + for event_id, auth_event_id in tree: + # If we reference an auth_event_id we fetch the allocated chain ID, + # either from the existing `chain_map` or the newly generated + # `new_chain_tuples` map. + existing_chain_id = None + if auth_event_id: + existing_chain_id = new_chain_tuples.get(auth_event_id) + if not existing_chain_id: + existing_chain_id = chain_map[auth_event_id] + + new_chain_tuple = None # type: Optional[Tuple[Any, int]] + if existing_chain_id: + # We found a chain ID/sequence number candidate, check its + # not already taken. + proposed_new_id = existing_chain_id[0] + proposed_new_seq = existing_chain_id[1] + 1 + + if chain_to_max_seq_no[proposed_new_id] < proposed_new_seq: + new_chain_tuple = ( + proposed_new_id, + proposed_new_seq, + ) + + # If we need to start a new chain we allocate a temporary chain ID. + if not new_chain_tuple: + new_chain_tuple = (object(), 1) + unallocated_chain_ids.add(new_chain_tuple[0]) + + new_chain_tuples[event_id] = new_chain_tuple + chain_to_max_seq_no[new_chain_tuple[0]] = new_chain_tuple[1] + + # Generate new chain IDs for all unallocated chain IDs. + newly_allocated_chain_ids = db_pool.event_chain_id_gen.get_next_mult_txn( + txn, len(unallocated_chain_ids) + ) + + # Map from potentially temporary chain ID to real chain ID + chain_id_to_allocated_map = dict( + zip(unallocated_chain_ids, newly_allocated_chain_ids) + ) # type: Dict[Any, int] + chain_id_to_allocated_map.update((c, c) for c in existing_chains) + + return { + event_id: (chain_id_to_allocated_map[chain_id], seq) + for event_id, (chain_id, seq) in new_chain_tuples.items() + } + def _persist_transaction_ids_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/util/sequence.py b/synapse/storage/util/sequence.py index c780ade077..0ec4dc2918 100644 --- a/synapse/storage/util/sequence.py +++ b/synapse/storage/util/sequence.py @@ -69,6 +69,11 @@ class SequenceGenerator(metaclass=abc.ABCMeta): """Gets the next ID in the sequence""" ... + @abc.abstractmethod + def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: + """Get the next `n` IDs in the sequence""" + ... + @abc.abstractmethod def check_consistency( self, @@ -219,6 +224,17 @@ class LocalSequenceGenerator(SequenceGenerator): self._current_max_id += 1 return self._current_max_id + def get_next_mult_txn(self, txn: Cursor, n: int) -> List[int]: + with self._lock: + if self._current_max_id is None: + assert self._callback is not None + self._current_max_id = self._callback(txn) + self._callback = None + + first_id = self._current_max_id + 1 + self._current_max_id += n + return [first_id + i for i in range(n)] + def check_consistency( self, db_conn: Connection, -- cgit 1.5.1 From 930ba009719788ebc2004c6ef89329dae1b9689b Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Sat, 23 Jan 2021 21:34:32 +0200 Subject: Add depth and received_ts to forward_extremities admin API response Also add a warning on the admin API documentation. Signed-off-by: Jason Robinson --- docs/admin_api/rooms.md | 8 +++++++- synapse/storage/databases/main/events_forward_extremities.py | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 86daa393a7..f34cec1ff7 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -535,7 +535,9 @@ A response as follows will be returned: "results": [ { "event_id": "$M5SP266vsnxctfwFgFLNceaCo3ujhRtg_NiiHabcdefgh", - "state_group": 439 + "state_group": 439, + "depth": 123, + "received_ts": 1611263016761 } ] } @@ -543,6 +545,10 @@ A response as follows will be returned: ## Deleting forward extremities +**WARNING**: Please ensure you know what you're doing and have read +the related issue [#1760](https://github.com/matrix-org/synapse/issues/1760). +Under no situations should this API be executed as an automated maintenance task! + If a room has lots of forward extremities, the extra can be deleted as follows: diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 5fea974050..84aaa919fb 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -86,9 +86,10 @@ class EventForwardExtremitiesStore(SQLBaseStore): def get_forward_extremities_for_room_txn(txn): sql = """ - SELECT event_id, state_group + SELECT event_id, state_group, depth, received_ts FROM event_forward_extremities NATURAL JOIN event_to_state_groups + NATURAL JOIN events WHERE room_id = ? """ -- cgit 1.5.1 From 4a55d267eef1388690e6781b580910e341358f95 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 25 Jan 2021 14:49:39 -0500 Subject: Add an admin API for shadow-banning users. (#9209) This expands the current shadow-banning feature to be usable via the admin API and adds documentation for it. A shadow-banned users receives successful responses to their client-server API requests, but the events are not propagated into rooms. Shadow-banning a user should be used as a tool of last resort and may lead to confusing or broken behaviour for the client. --- changelog.d/9209.feature | 1 + docs/admin_api/user_admin_api.rst | 30 ++++++++++++ stubs/txredisapi.pyi | 1 - synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/users.py | 36 +++++++++++++++ synapse/storage/databases/main/registration.py | 29 ++++++++++++ tests/rest/admin/test_user.py | 64 ++++++++++++++++++++++++++ tests/rest/client/test_shadow_banned.py | 8 +--- 8 files changed, 164 insertions(+), 7 deletions(-) create mode 100644 changelog.d/9209.feature (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9209.feature b/changelog.d/9209.feature new file mode 100644 index 0000000000..ec926e8eb4 --- /dev/null +++ b/changelog.d/9209.feature @@ -0,0 +1 @@ +Add an admin API endpoint for shadow-banning users. diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst index b3d413cf57..1eb674939e 100644 --- a/docs/admin_api/user_admin_api.rst +++ b/docs/admin_api/user_admin_api.rst @@ -760,3 +760,33 @@ The following fields are returned in the JSON response body: - ``total`` - integer - Number of pushers. See also `Client-Server API Spec `_ + +Shadow-banning users +==================== + +Shadow-banning is a useful tool for moderating malicious or egregiously abusive users. +A shadow-banned users receives successful responses to their client-server API requests, +but the events are not propagated into rooms. This can be an effective tool as it +(hopefully) takes longer for the user to realise they are being moderated before +pivoting to another account. + +Shadow-banning a user should be used as a tool of last resort and may lead to confusing +or broken behaviour for the client. A shadow-banned user will not receive any +notification and it is generally more appropriate to ban or kick abusive users. +A shadow-banned user will be unable to contact anyone on the server. + +The API is:: + + POST /_synapse/admin/v1/users//shadow_ban + +To use it, you will need to authenticate by providing an ``access_token`` for a +server admin: see `README.rst `_. + +An empty JSON dict is returned. + +**Parameters** + +The following parameters should be set in the URL: + +- ``user_id`` - The fully qualified MXID: for example, ``@user:server.com``. The user must + be local. diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index bfac6840e6..726454ba31 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -15,7 +15,6 @@ """Contains *incomplete* type hints for txredisapi. """ - from typing import List, Optional, Type, Union class RedisProtocol: diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 6f7dc06503..f04740cd38 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -51,6 +51,7 @@ from synapse.rest.admin.users import ( PushersRestServlet, ResetPasswordRestServlet, SearchUsersRestServlet, + ShadowBanRestServlet, UserAdminServlet, UserMediaRestServlet, UserMembershipRestServlet, @@ -230,6 +231,7 @@ def register_servlets(hs, http_server): EventReportsRestServlet(hs).register(http_server) PushersRestServlet(hs).register(http_server) MakeRoomAdminRestServlet(hs).register(http_server) + ShadowBanRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource(hs, http_server): diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 86198bab30..68c3c64a0d 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -890,3 +890,39 @@ class UserTokenRestServlet(RestServlet): ) return 200, {"access_token": token} + + +class ShadowBanRestServlet(RestServlet): + """An admin API for shadow-banning a user. + + A shadow-banned users receives successful responses to their client-server + API requests, but the events are not propagated into rooms. + + Shadow-banning a user should be used as a tool of last resort and may lead + to confusing or broken behaviour for the client. + + Example: + + POST /_synapse/admin/v1/users/@test:example.com/shadow_ban + {} + + 200 OK + {} + """ + + PATTERNS = admin_patterns("/users/(?P[^/]*)/shadow_ban") + + def __init__(self, hs: "HomeServer"): + self.hs = hs + self.store = hs.get_datastore() + self.auth = hs.get_auth() + + async def on_POST(self, request, user_id): + await assert_requester_is_admin(self.auth, request) + + if not self.hs.is_mine_id(user_id): + raise SynapseError(400, "Only local users can be shadow-banned") + + await self.store.set_shadow_banned(UserID.from_string(user_id), True) + + return 200, {} diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 585b4049d6..0618b4387a 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -360,6 +360,35 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn) + async def set_shadow_banned(self, user: UserID, shadow_banned: bool) -> None: + """Sets whether a user shadow-banned. + + Args: + user: user ID of the user to test + shadow_banned: true iff the user is to be shadow-banned, false otherwise. + """ + + def set_shadow_banned_txn(txn): + self.db_pool.simple_update_one_txn( + txn, + table="users", + keyvalues={"name": user.to_string()}, + updatevalues={"shadow_banned": shadow_banned}, + ) + # In order for this to apply immediately, clear the cache for this user. + tokens = self.db_pool.simple_select_onecol_txn( + txn, + table="access_tokens", + keyvalues={"user_id": user.to_string()}, + retcol="token", + ) + for token in tokens: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (token,) + ) + + await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn) + def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]: sql = """ SELECT users.name as user_id, diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index e48f8c1d7b..ee05ee60bc 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -2380,3 +2380,67 @@ class WhoisRestTestCase(unittest.HomeserverTestCase): self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual(self.other_user, channel.json_body["user_id"]) self.assertIn("devices", channel.json_body) + + +class ShadowBanRestTestCase(unittest.HomeserverTestCase): + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.other_user = self.register_user("user", "pass") + + self.url = "/_synapse/admin/v1/users/%s/shadow_ban" % urllib.parse.quote( + self.other_user + ) + + def test_no_auth(self): + """ + Try to get information of an user without authentication. + """ + channel = self.make_request("POST", self.url) + self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) + + def test_requester_is_not_admin(self): + """ + If the user is not a server admin, an error is returned. + """ + other_user_token = self.login("user", "pass") + + channel = self.make_request("POST", self.url, access_token=other_user_token) + self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"]) + + def test_user_is_not_local(self): + """ + Tests that shadow-banning for a user that is not a local returns a 400 + """ + url = "/_synapse/admin/v1/whois/@unknown_person:unknown_domain" + + channel = self.make_request("POST", url, access_token=self.admin_user_tok) + self.assertEqual(400, channel.code, msg=channel.json_body) + + def test_success(self): + """ + Shadow-banning should succeed for an admin. + """ + # The user starts off as not shadow-banned. + other_user_token = self.login("user", "pass") + result = self.get_success(self.store.get_user_by_access_token(other_user_token)) + self.assertFalse(result.shadow_banned) + + channel = self.make_request("POST", self.url, access_token=self.admin_user_tok) + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual({}, channel.json_body) + + # Ensure the user is shadow-banned (and the cache was cleared). + result = self.get_success(self.store.get_user_by_access_token(other_user_token)) + self.assertTrue(result.shadow_banned) diff --git a/tests/rest/client/test_shadow_banned.py b/tests/rest/client/test_shadow_banned.py index e689c3fbea..0ebdf1415b 100644 --- a/tests/rest/client/test_shadow_banned.py +++ b/tests/rest/client/test_shadow_banned.py @@ -18,6 +18,7 @@ import synapse.rest.admin from synapse.api.constants import EventTypes from synapse.rest.client.v1 import directory, login, profile, room from synapse.rest.client.v2_alpha import room_upgrade_rest_servlet +from synapse.types import UserID from tests import unittest @@ -31,12 +32,7 @@ class _ShadowBannedBase(unittest.HomeserverTestCase): self.store = self.hs.get_datastore() self.get_success( - self.store.db_pool.simple_update( - table="users", - keyvalues={"name": self.banned_user_id}, - updatevalues={"shadow_banned": True}, - desc="shadow_ban", - ) + self.store.set_shadow_banned(UserID.from_string(self.banned_user_id), True) ) self.other_user_id = self.register_user("otheruser", "pass") -- cgit 1.5.1 From 5b857b77f7de62bb9be0aa88a3fffcf7cb11efe6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 25 Jan 2021 14:52:30 -0500 Subject: Don't error if deleting a non-existent pusher. (#9121) --- changelog.d/9121.bugfix | 1 + synapse/storage/databases/main/pusher.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changelog.d/9121.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9121.bugfix b/changelog.d/9121.bugfix new file mode 100644 index 0000000000..a566878ec0 --- /dev/null +++ b/changelog.d/9121.bugfix @@ -0,0 +1 @@ +Fix spurious errors in logs when deleting a non-existant pusher. diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index bc7621b8d6..2687ef3e43 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -344,7 +344,9 @@ class PusherStore(PusherWorkerStore): txn, self.get_if_user_has_pusher, (user_id,) ) - self.db_pool.simple_delete_one_txn( + # It is expected that there is exactly one pusher to delete, but + # if it isn't there (or there are multiple) delete them all. + self.db_pool.simple_delete_txn( txn, "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, -- cgit 1.5.1 From e20f18a76680bc16fd8299a61dd81dc07f1a3ffd Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Tue, 26 Jan 2021 10:13:35 +0200 Subject: Make natural join inner join Co-authored-by: Erik Johnston --- synapse/storage/databases/main/events_forward_extremities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 84aaa919fb..68b64838bb 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -88,8 +88,8 @@ class EventForwardExtremitiesStore(SQLBaseStore): sql = """ SELECT event_id, state_group, depth, received_ts FROM event_forward_extremities - NATURAL JOIN event_to_state_groups - NATURAL JOIN events + INNER JOIN event_to_state_groups USING (event_id) + INNER JOIN events INNER JOIN USING (event_id) WHERE room_id = ? """ -- cgit 1.5.1 From 4936fc59fcf23582c940cb1cbf4286039b3504de Mon Sep 17 00:00:00 2001 From: Jason Robinson Date: Tue, 26 Jan 2021 10:21:02 +0200 Subject: Fix get forward extremities query Signed-off-by: Jason Robinson --- synapse/storage/databases/main/events_forward_extremities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 68b64838bb..0ac1da9c35 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -89,7 +89,7 @@ class EventForwardExtremitiesStore(SQLBaseStore): SELECT event_id, state_group, depth, received_ts FROM event_forward_extremities INNER JOIN event_to_state_groups USING (event_id) - INNER JOIN events INNER JOIN USING (event_id) + INNER JOIN events USING (room_id, event_id) WHERE room_id = ? """ -- cgit 1.5.1