From 0a6e837aaadec0342f31f29c48800f5ce5e3531b Mon Sep 17 00:00:00 2001
From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Date: Wed, 27 May 2020 16:26:59 +0100
Subject: Fix incorrect placeholder syntax in database preparation code (#7575)

We were using `logger`-style `%s` placeholder arguments, which
`Exception`s don't support: the extra arguments are stored on the
exception rather than interpolated into the message.
---
 synapse/storage/prepare_database.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 9afc145340..b95434f031 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -366,9 +366,8 @@ def _upgrade_existing_database(
         if duplicates:
             # We don't support using the same file name in the same delta version.
             raise PrepareDatabaseException(
-                "Found multiple delta files with the same name in v%d: %s",
-                v,
-                duplicates,
+                "Found multiple delta files with the same name in v%d: %s"
+                % (v, duplicates,)
             )

         # We sort to ensure that we apply the delta files in a consistent
-- cgit 1.5.1
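The distinction this fix relies on: `logging` calls interpolate `%` placeholders lazily from the extra positional arguments, while `Exception` simply stores its arguments and never formats the message. A minimal standalone sketch of the difference (illustrative values, not Synapse code):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

v, duplicates = 58, ["foo.sql"]

# logger-style: the logging module interpolates the extra arguments for us.
logger.info("Found multiple delta files with the same name in v%d: %s", v, duplicates)

# The same calling convention on an exception does NOT interpolate; the
# placeholders survive verbatim and the arguments just end up in `e.args`.
e = Exception("Found multiple delta files with the same name in v%d: %s", v, duplicates)
print(e)  # ('Found multiple delta files with the same name in v%d: %s', 58, ['foo.sql'])

# The fix from the patch: %-format the message before constructing the exception.
e = Exception("Found multiple delta files with the same name in v%d: %s" % (v, duplicates))
print(e)  # Found multiple delta files with the same name in v58: ['foo.sql']
```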
From 2dc430d36ef793b38d6d79ec8db4ea60588df2ee Mon Sep 17 00:00:00 2001
From: Dagfinn Ilmari Mannsåker
Date: Mon, 1 Jun 2020 10:53:06 +0100
Subject: Use upsert when inserting read receipts (#7607)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fixes #7469

Signed-off-by: Dagfinn Ilmari Mannsåker
---
 changelog.d/7607.bugfix                      |  1 +
 synapse/storage/data_stores/main/receipts.py | 13 ++++---------
 2 files changed, 5 insertions(+), 9 deletions(-)
 create mode 100644 changelog.d/7607.bugfix

diff --git a/changelog.d/7607.bugfix b/changelog.d/7607.bugfix
new file mode 100644
index 0000000000..04b22e5ffe
--- /dev/null
+++ b/changelog.d/7607.bugfix
@@ -0,0 +1 @@
+Fix duplicate key violation when persisting read markers.

diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0d932a0672..cebdcd409f 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -391,7 +391,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type),
         )

-        self.db.simple_delete_txn(
+        self.db.simple_upsert_txn(
             txn,
             table="receipts_linearized",
             keyvalues={
@@ -399,19 +399,14 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 "receipt_type": receipt_type,
                 "user_id": user_id,
             },
-        )
-
-        self.db.simple_insert_txn(
-            txn,
-            table="receipts_linearized",
             values={
                 "stream_id": stream_id,
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
                 "event_id": event_id,
                 "data": json.dumps(data),
             },
+            # receipts_linearized has a unique constraint on
+            # (user_id, room_id, receipt_type), so no need to lock
+            lock=False,
         )

         if receipt_type == "m.read" and stream_ordering is not None:
-- cgit 1.5.1
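The patch above collapses a separate `DELETE` plus `INSERT` into a single upsert, so nothing can slip between the two statements and trigger the duplicate key violation. A rough sketch of the underlying SQL pattern, using Python's built-in `sqlite3` and a simplified stand-in for `receipts_linearized` (assumes SQLite 3.24+ for `ON CONFLICT ... DO UPDATE`; PostgreSQL accepts the same clause):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE receipts_linearized (
        room_id TEXT, receipt_type TEXT, user_id TEXT,
        stream_id INTEGER, event_id TEXT, data TEXT,
        UNIQUE (room_id, receipt_type, user_id)
    )
    """
)

def upsert_receipt(txn, room_id, receipt_type, user_id, stream_id, event_id, data):
    # One atomic statement instead of DELETE-then-INSERT: if a row with the
    # same unique key exists it is updated in place, otherwise it is inserted.
    txn.execute(
        """
        INSERT INTO receipts_linearized
            (room_id, receipt_type, user_id, stream_id, event_id, data)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT (room_id, receipt_type, user_id) DO UPDATE SET
            stream_id = excluded.stream_id,
            event_id = excluded.event_id,
            data = excluded.data
        """,
        (room_id, receipt_type, user_id, stream_id, event_id, data),
    )

upsert_receipt(conn, "!room:hs", "m.read", "@alice:hs", 1, "$ev1", "{}")
upsert_receipt(conn, "!room:hs", "m.read", "@alice:hs", 2, "$ev2", "{}")
print(conn.execute("SELECT stream_id, event_id FROM receipts_linearized").fetchall())
# [(2, '$ev2')]: one row per key, no duplicate key violation
```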
From df8a3cef6b997f9eb2be7780293a64c873f7580f Mon Sep 17 00:00:00 2001
From: Dagfinn Ilmari Mannsåker
Date: Mon, 1 Jun 2020 15:23:43 +0100
Subject: Improve performance of _get_state_groups_from_groups_txn (#7567)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The query keeps showing up in my slow query log.

This changes the plan under the top-level Sort node from

```
WindowAgg  (cost=280335.88..292963.15 rows=561212 width=80) (actual time=138.651..160.562 rows=27112 loops=1)
  ->  Sort  (cost=280335.88..281738.91 rows=561212 width=84) (actual time=138.597..140.622 rows=27112 loops=1)
        Sort Key: state_groups_state.type, state_groups_state.state_key, state_groups_state.state_group
        Sort Method: quicksort  Memory: 4581kB
        ->  Nested Loop  (cost=2.83..226745.22 rows=561212 width=84) (actual time=21.548..47.657 rows=27112 loops=1)
              ->  HashAggregate  (cost=2.27..3.28 rows=101 width=8) (actual time=21.526..21.535 rows=20 loops=1)
                    Group Key: state.state_group
                    ->  CTE Scan on state  (cost=0.00..2.02 rows=101 width=8) (actual time=21.280..21.493 rows=20 loops=1)
              ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.56..2189.40 rows=5557 width=84) (actual time=0.005..0.991 rows=1356 loops=20)
                    Index Cond: (state_group = state.state_group)
```

to

```
Nested Loop  (cost=2.83..226745.22 rows=561212 width=84) (actual time=24.194..52.834 rows=27112 loops=1)
  ->  HashAggregate  (cost=2.27..3.28 rows=101 width=8) (actual time=24.130..24.138 rows=20 loops=1)
        Group Key: state.state_group
        ->  CTE Scan on state  (cost=0.00..2.02 rows=101 width=8) (actual time=23.887..24.113 rows=20 loops=1)
  ->  Index Scan using state_groups_state_type_idx on state_groups_state  (cost=0.56..2189.40 rows=5557 width=84) (actual time=0.016..1.159 rows=1356 loops=20)
        Index Cond: (state_group = state.state_group)
```

This cuts the execution time from ~190ms to ~130ms, i.e. a reduction of ~30%.

The full plans are visualised at https://explain.depesz.com/s/WpbT and
https://explain.depesz.com/s/KlEk

Signed-off-by: Dagfinn Ilmari Mannsåker
---
 changelog.d/7567.misc                           |  1 +
 synapse/storage/data_stores/state/bg_updates.py | 12 ++++++------
 2 files changed, 7 insertions(+), 6 deletions(-)
 create mode 100644 changelog.d/7567.misc

diff --git a/changelog.d/7567.misc b/changelog.d/7567.misc
new file mode 100644
index 0000000000..b086d5d026
--- /dev/null
+++ b/changelog.d/7567.misc
@@ -0,0 +1 @@
+Improve query performance for fetching state from a PostgreSQL database.

diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index e8edaf9f7b..ff000bc9ec 100644
--- a/synapse/storage/data_stores/state/bg_updates.py
+++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -109,20 +109,20 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                     SELECT prev_state_group FROM state_group_edges e, state s
                     WHERE s.state_group = e.state_group
                 )
-                SELECT DISTINCT type, state_key, last_value(event_id) OVER (
-                    PARTITION BY type, state_key ORDER BY state_group ASC
-                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
-                ) AS event_id FROM state_groups_state
+                SELECT DISTINCT ON (type, state_key)
+                    type, state_key, event_id
+                FROM state_groups_state
                 WHERE state_group IN (
                     SELECT state_group FROM state
-                )
+                ) %s
+                ORDER BY type, state_key, state_group DESC
             """

             for group in groups:
                 args = [group]
                 args.extend(where_args)

-                txn.execute(sql + where_clause, args)
+                txn.execute(sql % (where_clause,), args)
                 for row in txn:
                     typ, state_key, event_id = row
                     key = (typ, state_key)
-- cgit 1.5.1
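`DISTINCT ON (type, state_key)` keeps only the first row of each group under the query's `ORDER BY`, so sorting by `state_group DESC` selects the event from the highest state group, the same row the old `last_value(...)` window function produced over the whole partition. A self-contained sketch of those semantics in plain Python (made-up rows, not the real schema):

```python
from itertools import groupby

# Rows of (type, state_key, state_group, event_id), as in state_groups_state.
rows = [
    ("m.room.member", "@alice:hs", 101, "$join"),
    ("m.room.member", "@alice:hs", 105, "$leave"),
    ("m.room.name", "", 103, "$name1"),
    ("m.room.name", "", 104, "$name2"),
]

# SELECT DISTINCT ON (type, state_key) type, state_key, event_id
#   FROM ... ORDER BY type, state_key, state_group DESC
# keeps the FIRST row per (type, state_key) group, i.e. the one with the
# highest state_group once that column is sorted descending.
rows.sort(key=lambda r: (r[0], r[1], -r[2]))
latest = {
    key: next(group)[3]
    for key, group in groupby(rows, key=lambda r: (r[0], r[1]))
}
print(latest)
# {('m.room.member', '@alice:hs'): '$leave', ('m.room.name', ''): '$name2'}
```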
From e91abfd2919bcd42322099ecca8387a2dae9b06e Mon Sep 17 00:00:00 2001
From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Date: Wed, 3 Jun 2020 17:15:57 +0100
Subject: async/await get_user_id_by_threepid (#7620)

Based on #7619, this converts `get_user_id_by_threepid` and its call
stack to async/await.
---
 changelog.d/7620.misc                            |  1 +
 synapse/app/homeserver.py                        |  9 ++++----
 .../data_stores/main/monthly_active_users.py     | 25 +++++++++++-----------
 synapse/storage/data_stores/main/registration.py | 22 ++++++++++---------
 4 files changed, 29 insertions(+), 28 deletions(-)
 create mode 100644 changelog.d/7620.misc

diff --git a/changelog.d/7620.misc b/changelog.d/7620.misc
new file mode 100644
index 0000000000..f8357ee3b6
--- /dev/null
+++ b/changelog.d/7620.misc
@@ -0,0 +1 @@
+Convert `get_user_id_by_threepid` to async/await.
\ No newline at end of file

diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 93a5ba2100..730a2c015b 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -617,18 +617,17 @@ def run(hs):
     clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
     reap_monthly_active_users()

-    @defer.inlineCallbacks
-    def generate_monthly_active_users():
+    async def generate_monthly_active_users():
         current_mau_count = 0
         current_mau_count_by_service = {}
         reserved_users = ()
         store = hs.get_datastore()
         if hs.config.limit_usage_by_mau or hs.config.mau_stats_only:
-            current_mau_count = yield store.get_monthly_active_count()
+            current_mau_count = await store.get_monthly_active_count()
             current_mau_count_by_service = (
-                yield store.get_monthly_active_count_by_service()
+                await store.get_monthly_active_count_by_service()
             )
-            reserved_users = yield store.get_registered_reserved_users()
+            reserved_users = await store.get_registered_reserved_users()
         current_mau_gauge.set(float(current_mau_count))

         for app_service, count in current_mau_count_by_service.items():

diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 1310d39069..e459cf49a0 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import List

 from twisted.internet import defer

@@ -77,20 +78,19 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):

         return self.db.runInteraction("count_users_by_service", _count_users_by_service)

-    @defer.inlineCallbacks
-    def get_registered_reserved_users(self):
-        """Of the reserved threepids defined in config, which are associated
-        with registered users?
+    async def get_registered_reserved_users(self) -> List[str]:
+        """Of the reserved threepids defined in config, retrieve those that are
+        associated with registered users

         Returns:
-            Defered[list]: Real reserved users
+            User IDs of actual users that are reserved
         """
         users = []

         for tp in self.hs.config.mau_limits_reserved_threepids[
             : self.hs.config.max_mau_value
         ]:
-            user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+            user_id = await self.hs.get_datastore().get_user_id_by_threepid(
                 tp["medium"], tp["address"]
             )
             if user_id:
@@ -171,13 +171,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
             else:
                 logger.warning("mau limit reserved threepid %s not found in db" % tp)

-    @defer.inlineCallbacks
-    def reap_monthly_active_users(self):
+    async def reap_monthly_active_users(self):
         """Cleans out monthly active user table to ensure that no stale
         entries exist.
-
-        Returns:
-            Deferred[]
         """

         def _reap_users(txn, reserved_users):
@@ -249,8 +245,8 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
             )
             self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())

-        reserved_users = yield self.get_registered_reserved_users()
-        yield self.db.runInteraction(
+        reserved_users = await self.get_registered_reserved_users()
+        await self.db.runInteraction(
             "reap_monthly_active_users", _reap_users, reserved_users
         )

@@ -261,6 +257,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):

         Args:
             user_id (str): user to add/update
+
+        Returns:
+            Deferred
         """
         # Support user never to be included in MAU stats. Note I can't easily call this
         # from upsert_monthly_active_user_txn because then I need a _txn form of

diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index efcdd2100b..9768981891 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -17,6 +17,7 @@

 import logging
 import re
+from typing import Optional

 from six import iterkeys

@@ -342,7 +343,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
         return res

-    @cachedInlineCallbacks()
+    @cached()
     def is_support_user(self, user_id):
         """Determines if the user is of type UserTypes.SUPPORT

@@ -352,10 +353,9 @@ class RegistrationWorkerStore(SQLBaseStore):
         Returns:
             Deferred[bool]: True if user is of type UserTypes.SUPPORT
         """
-        res = yield self.db.runInteraction(
+        return self.db.runInteraction(
             "is_support_user", self.is_support_user_txn, user_id
         )
-        return res

     def is_real_user_txn(self, txn, user_id):
         res = self.db.simple_select_one_onecol_txn(
@@ -516,18 +516,17 @@ class RegistrationWorkerStore(SQLBaseStore):
             )
         )

-    @defer.inlineCallbacks
-    def get_user_id_by_threepid(self, medium, address):
+    async def get_user_id_by_threepid(self, medium: str, address: str) -> Optional[str]:
         """Returns user id from threepid

         Args:
-            medium (str): threepid medium e.g. email
-            address (str): threepid address e.g. me@example.com
+            medium: threepid medium e.g. email
+            address: threepid address e.g. me@example.com

         Returns:
-            Deferred[str|None]: user id or None if no user id/threepid mapping exists
+            The user ID or None if no user id/threepid mapping exists
         """
-        user_id = yield self.db.runInteraction(
+        user_id = await self.db.runInteraction(
             "get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address
         )
         return user_id
@@ -993,7 +992,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):

         Args:
             user_id (str): The desired user ID to register.
-            password_hash (str): Optional. The password hash for this user.
+            password_hash (str|None): Optional. The password hash for this user.
             was_guest (bool): Optional. Whether this is a guest account being
                 upgraded to a non-guest account.
             make_guest (boolean): True if the new user should be guest,
@@ -1007,6 +1006,9 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):

         Raises:
             StoreError if the user_id could not be registered.
+
+        Returns:
+            Deferred
         """
         return self.db.runInteraction(
             "register_user",
-- cgit 1.5.1
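All of the conversions in this patch follow the same mechanical recipe: drop `@defer.inlineCallbacks`, change `def` to `async def`, and replace each `yield` of a Deferred with `await`, which works because Twisted Deferreds are awaitable from native coroutines. A generic before-and-after sketch with hypothetical names (`store` here is a stand-in, and Twisted must be installed):

```python
from twisted.internet import defer

# Before: a Twisted inlineCallbacks generator that pauses at each `yield`.
@defer.inlineCallbacks
def lookup_user(store, medium, address):
    user_id = yield store.get_user_id_by_threepid(medium, address)
    return user_id

# After: a native coroutine. Twisted Deferreds implement __await__, so they
# can be awaited directly; legacy inlineCallbacks callers can still consume
# this via defer.ensureDeferred(lookup_user_async(store, "email", "a@b.c")).
async def lookup_user_async(store, medium, address):
    user_id = await store.get_user_id_by_threepid(medium, address)
    return user_id
```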
From 664409b1694f102b3fd03d825ae82b31a4311560 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 9 Jun 2020 16:28:57 +0100
Subject: Fix bug in account data replication stream. (#7656)

* Ensure account data stream IDs are unique.

The account data stream is shared between three tables, and the maximum
allocated ID was tracked in a dedicated table. Updating the max ID
happened outside the transaction that allocated the ID, leading to a
race: if the server was restarted, the same ID could be allocated again
while the max ID was never updated, causing the ID to be reused.

The ID generators have support for tracking across multiple tables, so
we may as well use that instead of a dedicated table.

* Fix bug in account data replication stream.

If the same stream ID was used in both global and room account data,
then getting updates for the replication stream would fail due to
`heapq.merge(..)` trying to compare a `str` with a `None`. (This is
because you'd have two rows like `(534, '!room')` and `(534, None)` from
the room and global account data tables.)

The fix is just to order by stream ID, since we don't rely on the
ordering beyond that. The bug where stream IDs can be reused should be
fixed now, so this case shouldn't happen going forward.

Fixes #7617
---
 changelog.d/7656.bugfix                           |  1 +
 synapse/replication/slave/storage/account_data.py |  8 +++++++-
 synapse/replication/tcp/streams/_base.py          | 10 ++++++++--
 synapse/storage/data_stores/main/account_data.py  | 16 +++++++++++++++-
 synapse/storage/data_stores/main/tags.py          |  3 +++
 synapse/storage/prepare_database.py               |  1 +
 6 files changed, 35 insertions(+), 4 deletions(-)
 create mode 100644 changelog.d/7656.bugfix

diff --git a/changelog.d/7656.bugfix b/changelog.d/7656.bugfix
new file mode 100644
index 0000000000..1aeddb5fb9
--- /dev/null
+++ b/changelog.d/7656.bugfix
@@ -0,0 +1 @@
+Fix bug in account data replication stream.

diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py
index 2a4f5c7cfd..9db6c62bc7 100644
--- a/synapse/replication/slave/storage/account_data.py
+++ b/synapse/replication/slave/storage/account_data.py
@@ -24,7 +24,13 @@ from synapse.storage.database import Database
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )

         super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)

diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d42aaff055..4acefc8a96 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -600,8 +600,14 @@ class AccountDataStream(Stream):
             for stream_id, user_id, room_id, account_data_type in room_results
         )

-        # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(room_rows, global_rows))
+        # We need to return a sorted list, so merge them together.
+        #
+        # Note: We order only by the stream ID to work around a bug where the
+        # same stream ID could appear in both `global_rows` and `room_rows`,
+        # leading to a comparison between the data tuples. The comparison could
+        # fail due to attempting to compare the `room_id` which results in a
+        # `TypeError` from comparing a `str` vs `None`.
+        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))

         return updates, to_token, limited
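To see why the added `key` matters: without it, `heapq.merge` compares the row tuples themselves, and when two stream IDs tie the comparison falls through to the second element, a room ID string on one side and `None` on the other. A standalone reproduction with illustrative values:

```python
import heapq

room_rows = [(533, "!a:hs"), (534, "!room:hs")]
global_rows = [(532, None), (534, None)]

# Comparing (534, '!room:hs') with (534, None) tie-breaks on the second
# element and raises TypeError on Python 3.
try:
    list(heapq.merge(room_rows, global_rows))
except TypeError as e:
    print("without key:", e)

# Keying on the stream ID alone never touches the payloads; ties keep the
# order in which the iterables were passed.
print(list(heapq.merge(room_rows, global_rows, key=lambda row: row[0])))
# [(532, None), (533, '!a:hs'), (534, '!room:hs'), (534, None)]
```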
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index f9eef1b78e..b58f04d00d 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -297,7 +297,13 @@ class AccountDataWorkerStore(SQLBaseStore):
 class AccountDataStore(AccountDataWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = StreamIdGenerator(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data_max_stream_id",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )

         super(AccountDataStore, self).__init__(database, db_conn, hs)
@@ -387,6 +393,10 @@ class AccountDataStore(AccountDataWorkerStore):
             # doesn't sound any worse than the whole update getting lost,
             # which is what would happen if we combined the two into one
             # transaction.
+            #
+            # Note: This is only here for backwards compat to allow admins to
+            # roll back to a previous Synapse version. Next time we update the
+            # database version we can remove this table.
             yield self._update_max_stream_id(next_id)

             self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -405,6 +415,10 @@ class AccountDataStore(AccountDataWorkerStore):
             next_id(int): The revision to advance to.
         """

+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
+
         def _update(txn):
             update_max_id_sql = (
                 "UPDATE account_data_max_stream_id"

diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 2aa1bafd48..4219018302 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -233,6 +233,9 @@ class TagsStore(TagsWorkerStore):
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )

+        # Note: This is only here for backwards compat to allow admins to
+        # roll back to a previous Synapse version. Next time we update the
+        # database version we can remove this table.
         update_max_id_sql = (
             "UPDATE account_data_max_stream_id"
             " SET stream_id = ?"

diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index b95434f031..9cc3b51fe6 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -33,6 +33,7 @@ logger = logging.getLogger(__name__)
 # schema files, so the users will be informed on server restarts.
 # XXX: If you're about to bump this to 59 (or higher) please create an update
 # that drops the unused `cache_invalidation_stream` table, as per #7436!
+# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
 SCHEMA_VERSION = 58

 dir_path = os.path.abspath(os.path.dirname(__file__))
-- cgit 1.5.1
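Finally, the `extra_tables` arguments mean the ID generators seed their position from the maximum stream ID across all three account data tables, rather than from the soon-to-be-dropped `account_data_max_stream_id` table. A rough sketch of that seeding logic (a hypothetical helper, not the real `StreamIdGenerator`, which also tracks in-flight IDs):

```python
def load_current_stream_id(txn, tables):
    """Seed a stream ID generator from several tables.

    `tables` is a list of (table, column) pairs, e.g.
    [("account_data", "stream_id"),
     ("room_account_data", "stream_id"),
     ("room_tags_revisions", "stream_id")].
    """
    current = 0
    for table, column in tables:
        # COALESCE handles empty tables, where MAX(...) returns NULL.
        # Interpolating the identifiers is safe here because they come
        # from code, not from user input.
        txn.execute("SELECT COALESCE(MAX(%s), 0) FROM %s" % (column, table))
        current = max(current, int(txn.fetchone()[0]))
    return current
```

Because the starting position is derived from the data tables themselves, there is no separate max-ID row that can fall out of sync when the server restarts, which is exactly the race the first half of this commit message describes.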