From bd4919fb72b2a75f1c0a7f0c78bd619fd2ae30e8 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Thu, 24 Jun 2021 15:33:20 +0200 Subject: MSC2918 Refresh tokens implementation (#9450) This implements refresh tokens, as defined by MSC2918 This MSC has been implemented client side in Hydrogen Web: vector-im/hydrogen-web#235 The basics of the MSC works: requesting refresh tokens on login, having the access tokens expire, and using the refresh token to get a new one. Signed-off-by: Quentin Gliech --- synapse/storage/databases/main/registration.py | 207 ++++++++++++++++++++++++- 1 file changed, 203 insertions(+), 4 deletions(-) (limited to 'synapse/storage/databases') diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index e5c5cf8ff0..e31c5864ac 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -53,6 +53,9 @@ class TokenLookupResult: valid_until_ms: The timestamp the token expires, if any. token_owner: The "owner" of the token. This is either the same as the user, or a server admin who is logged in as the user. + token_used: True if this token was used at least once in a request. + This field can be out of date since `get_user_by_access_token` is + cached. """ user_id = attr.ib(type=str) @@ -62,6 +65,7 @@ class TokenLookupResult: device_id = attr.ib(type=Optional[str], default=None) valid_until_ms = attr.ib(type=Optional[int], default=None) token_owner = attr.ib(type=str) + token_used = attr.ib(type=bool, default=False) # Make the token owner default to the user ID, which is the common case. @token_owner.default @@ -69,6 +73,29 @@ class TokenLookupResult: return self.user_id +@attr.s(frozen=True, slots=True) +class RefreshTokenLookupResult: + """Result of looking up a refresh token.""" + + user_id = attr.ib(type=str) + """The user this token belongs to.""" + + device_id = attr.ib(type=str) + """The device associated with this refresh token.""" + + token_id = attr.ib(type=int) + """The ID of this refresh token.""" + + next_token_id = attr.ib(type=Optional[int]) + """The ID of the refresh token which replaced this one.""" + + has_next_refresh_token_been_refreshed = attr.ib(type=bool) + """True if the next refresh token was used for another refresh.""" + + has_next_access_token_been_used = attr.ib(type=bool) + """True if the next access token was already used at least once.""" + + class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -441,7 +468,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): access_tokens.id as token_id, access_tokens.device_id, access_tokens.valid_until_ms, - access_tokens.user_id as token_owner + access_tokens.user_id as token_owner, + access_tokens.used as token_used FROM users INNER JOIN access_tokens on users.name = COALESCE(puppets_user_id, access_tokens.user_id) WHERE token = ? 
@@ -449,8 +477,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, (token,)) rows = self.db_pool.cursor_to_dict(txn) + if rows: - return TokenLookupResult(**rows[0]) + row = rows[0] + + # This field is nullable, ensure it comes out as a boolean + if row["token_used"] is None: + row["token_used"] = False + + return TokenLookupResult(**row) return None @@ -1072,6 +1107,111 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): desc="update_access_token_last_validated", ) + @cached() + async def mark_access_token_as_used(self, token_id: int) -> None: + """ + Mark the access token as used, which invalidates the refresh token used + to obtain it. + + Because get_user_by_access_token is cached, this function might be + called multiple times for the same token, effectively doing unnecessary + SQL updates. Because updating the `used` field only goes one way (from + False to True) it is safe to cache this function as well to avoid this + issue. + + Args: + token_id: The ID of the access token to update. + Raises: + StoreError if there was a problem updating this. + """ + await self.db_pool.simple_update_one( + "access_tokens", + {"id": token_id}, + {"used": True}, + desc="mark_access_token_as_used", + ) + + async def lookup_refresh_token( + self, token: str + ) -> Optional[RefreshTokenLookupResult]: + """Lookup a refresh token with hints about its validity.""" + + def _lookup_refresh_token_txn(txn) -> Optional[RefreshTokenLookupResult]: + txn.execute( + """ + SELECT + rt.id token_id, + rt.user_id, + rt.device_id, + rt.next_token_id, + (nrt.next_token_id IS NOT NULL) has_next_refresh_token_been_refreshed, + at.used has_next_access_token_been_used + FROM refresh_tokens rt + LEFT JOIN refresh_tokens nrt ON rt.next_token_id = nrt.id + LEFT JOIN access_tokens at ON at.refresh_token_id = nrt.id + WHERE rt.token = ? + """, + (token,), + ) + row = txn.fetchone() + + if row is None: + return None + + return RefreshTokenLookupResult( + token_id=row[0], + user_id=row[1], + device_id=row[2], + next_token_id=row[3], + has_next_refresh_token_been_refreshed=row[4], + # This column is nullable, ensure it's a boolean + has_next_access_token_been_used=(row[5] or False), + ) + + return await self.db_pool.runInteraction( + "lookup_refresh_token", _lookup_refresh_token_txn + ) + + async def replace_refresh_token(self, token_id: int, next_token_id: int) -> None: + """ + Set the successor of a refresh token, removing the existing successor + if any. + + Args: + token_id: ID of the refresh token to update. + next_token_id: ID of its successor. + """ + + def _replace_refresh_token_txn(txn) -> None: + # First check if there was an existing refresh token + old_next_token_id = self.db_pool.simple_select_one_onecol_txn( + txn, + "refresh_tokens", + {"id": token_id}, + "next_token_id", + allow_none=True, + ) + + self.db_pool.simple_update_one_txn( + txn, + "refresh_tokens", + {"id": token_id}, + {"next_token_id": next_token_id}, + ) + + # Delete the old "next" token if it exists. 
This should cascade and + # delete the associated access_token + if old_next_token_id is not None: + self.db_pool.simple_delete_one_txn( + txn, + "refresh_tokens", + {"id": old_next_token_id}, + ) + + await self.db_pool.runInteraction( + "replace_refresh_token", _replace_refresh_token_txn + ) + class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): def __init__( @@ -1263,6 +1403,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") + self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") async def add_access_token_to_user( self, @@ -1271,14 +1412,18 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): device_id: Optional[str], valid_until_ms: Optional[int], puppets_user_id: Optional[str] = None, + refresh_token_id: Optional[int] = None, ) -> int: """Adds an access token for the given user. Args: user_id: The user ID. token: The new access token to add. - device_id: ID of the device to associate with the access token + device_id: ID of the device to associate with the access token. valid_until_ms: when the token is valid until. None for no expiry. + puppets_user_id + refresh_token_id: ID of the refresh token generated alongside this + access token. Raises: StoreError if there was a problem adding this. Returns: @@ -1297,12 +1442,47 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): "valid_until_ms": valid_until_ms, "puppets_user_id": puppets_user_id, "last_validated": now, + "refresh_token_id": refresh_token_id, + "used": False, }, desc="add_access_token_to_user", ) return next_id + async def add_refresh_token_to_user( + self, + user_id: str, + token: str, + device_id: Optional[str], + ) -> int: + """Adds a refresh token for the given user. + + Args: + user_id: The user ID. + token: The new access token to add. + device_id: ID of the device to associate with the refresh token. + Raises: + StoreError if there was a problem adding this. + Returns: + The token ID + """ + next_id = self._refresh_tokens_id_gen.get_next() + + await self.db_pool.simple_insert( + "refresh_tokens", + { + "id": next_id, + "user_id": user_id, + "device_id": device_id, + "token": token, + "next_token_id": None, + }, + desc="add_refresh_token_to_user", + ) + + return next_id + def _set_device_for_access_token_txn(self, txn, token: str, device_id: str) -> str: old_device_id = self.db_pool.simple_select_one_onecol_txn( txn, "access_tokens", {"token": token}, "device_id" @@ -1545,7 +1725,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): device_id: Optional[str] = None, ) -> List[Tuple[str, int, Optional[str]]]: """ - Invalidate access tokens belonging to a user + Invalidate access and refresh tokens belonging to a user Args: user_id: ID of user the tokens belong to @@ -1565,7 +1745,13 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): items = keyvalues.items() where_clause = " AND ".join(k + " = ?" for k, _ in items) values = [v for _, v in items] # type: List[Union[str, int]] + # Conveniently, refresh_tokens and access_tokens both use the user_id and device_id fields. Only caveat + # is the `except_token_id` param that is tricky to get right, so for now we're just using the same where + # clause and values before we handle that. This seems to be only used in the "set password" handler. 
+ refresh_where_clause = where_clause + refresh_values = values.copy() if except_token_id: + # TODO: support that for refresh tokens where_clause += " AND id != ?" values.append(except_token_id) @@ -1583,6 +1769,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values) + txn.execute( + "DELETE FROM refresh_tokens WHERE %s" % refresh_where_clause, + refresh_values, + ) + return tokens_and_devices return await self.db_pool.runInteraction("user_delete_access_tokens", f) @@ -1599,6 +1790,14 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): await self.db_pool.runInteraction("delete_access_token", f) + async def delete_refresh_token(self, refresh_token: str) -> None: + def f(txn): + self.db_pool.simple_delete_one_txn( + txn, table="refresh_tokens", keyvalues={"token": refresh_token} + ) + + await self.db_pool.runInteraction("delete_refresh_token", f) + async def add_user_pending_deactivation(self, user_id: str) -> None: """ Adds a user to the table of users who need to be parted from all the rooms they're -- cgit 1.5.1 From 60efc51a2bbc31f18a71ad1338afc430bfa65597 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 29 Jun 2021 11:25:34 +0100 Subject: Migrate stream_ordering to a bigint (#10264) * Move background update names out to a separate class `EventsBackgroundUpdatesStore` gets inherited and we don't really want to further pollute the namespace. * Migrate stream_ordering to a bigint * changelog --- changelog.d/10264.bugfix | 1 + .../storage/databases/main/events_bg_updates.py | 136 ++++++++++++++++++--- synapse/storage/schema/__init__.py | 2 +- .../60/01recreate_stream_ordering.sql.postgres | 40 ++++++ 4 files changed, 163 insertions(+), 16 deletions(-) create mode 100644 changelog.d/10264.bugfix create mode 100644 synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres (limited to 'synapse/storage/databases') diff --git a/changelog.d/10264.bugfix b/changelog.d/10264.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10264.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index cbe4be1437..39aaee743c 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -29,6 +29,25 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) +_REPLACE_STREAM_ORDRING_SQL_COMMANDS = ( + # there should be no leftover rows without a stream_ordering2, but just in case... 
+ "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL", + # finally, we can drop the rule and switch the columns + "DROP RULE populate_stream_ordering2 ON events", + "ALTER TABLE events DROP COLUMN stream_ordering", + "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering", +) + + +class _BackgroundUpdates: + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" + DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" + POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" + INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" + + @attr.s(slots=True, frozen=True) class _CalculateChainCover: """Return value for _calculate_chain_cover_txn.""" @@ -48,19 +67,15 @@ class _CalculateChainCover: class EventsBackgroundUpdatesStore(SQLBaseStore): - - EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" - EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" - DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" - def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_update_handler( - self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts + _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, + self._background_reindex_origin_server_ts, ) self.db_pool.updates.register_background_update_handler( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, + _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, self._background_reindex_fields_sender, ) @@ -85,7 +100,8 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) self.db_pool.updates.register_background_update_handler( - self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update + _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES, + self._cleanup_extremities_bg_update, ) self.db_pool.updates.register_background_update_handler( @@ -139,6 +155,24 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + # bg updates for replacing stream_ordering with a BIGINT + # (these only run on postgres.) 
+ self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_STREAM_ORDERING2, + self._background_populate_stream_ordering2, + ) + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2, + index_name="events_stream_ordering", + table="events", + columns=["stream_ordering2"], + unique=True, + ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, + self._background_replace_stream_ordering_column, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -190,18 +224,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): } self.db_pool.updates._background_update_progress_txn( - txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress + txn, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress ) return len(rows) result = await self.db_pool.runInteraction( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn + _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn ) if not result: await self.db_pool.updates._end_background_update( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME + _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME ) return result @@ -264,18 +298,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): } self.db_pool.updates._background_update_progress_txn( - txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress + txn, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, progress ) return len(rows_to_update) result = await self.db_pool.runInteraction( - self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn + _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn ) if not result: await self.db_pool.updates._end_background_update( - self.EVENT_ORIGIN_SERVER_TS_NAME + _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME ) return result @@ -454,7 +488,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if not num_handled: await self.db_pool.updates._end_background_update( - self.DELETE_SOFT_FAILED_EXTREMITIES + _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES ) def _drop_table_txn(txn): @@ -1009,3 +1043,75 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): await self.db_pool.updates._end_background_update("purged_chain_cover") return result + + async def _background_populate_stream_ordering2( + self, progress: JsonDict, batch_size: int + ) -> int: + """Populate events.stream_ordering2, then replace stream_ordering + + This is to deal with the fact that stream_ordering was initially created as a + 32-bit integer field. + """ + batch_size = max(batch_size, 1) + + def process(txn: Cursor) -> int: + # if this is the first pass, find the minimum stream ordering + last_stream = progress.get("last_stream") + if last_stream is None: + txn.execute( + """ + SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1 + """ + ) + rows = txn.fetchall() + if not rows: + return 0 + last_stream = rows[0][0] - 1 + + txn.execute( + """ + UPDATE events SET stream_ordering2=stream_ordering + WHERE stream_ordering > ? AND stream_ordering <= ? 
+ """, + (last_stream, last_stream + batch_size), + ) + row_count = txn.rowcount + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_STREAM_ORDERING2, + {"last_stream": last_stream + batch_size}, + ) + return row_count + + result = await self.db_pool.runInteraction( + "_background_populate_stream_ordering2", process + ) + + if result != 0: + return result + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_STREAM_ORDERING2 + ) + return 0 + + async def _background_replace_stream_ordering_column( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place.""" + + def process(txn: Cursor) -> None: + for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: + logger.info("completing stream_ordering migration: %s", sql) + txn.execute(sql) + + await self.db_pool.runInteraction( + "_background_replace_stream_ordering_column", process + ) + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN + ) + + return 0 diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index d36ba1d773..0a53b73ccc 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 59 +SCHEMA_VERSION = 60 """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres new file mode 100644 index 0000000000..88c9f8bd0d --- /dev/null +++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres @@ -0,0 +1,40 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- This migration handles the process of changing the type of `stream_ordering` to +-- a BIGINT. +-- +-- Note that this is only a problem on postgres as sqlite only has one "integer" type +-- which can cope with values up to 2^63. + +-- First add a new column to contain the bigger stream_ordering +ALTER TABLE events ADD COLUMN stream_ordering2 BIGINT; + +-- Create a rule which will populate it for new rows. +CREATE OR REPLACE RULE "populate_stream_ordering2" AS + ON INSERT TO events + DO UPDATE events SET stream_ordering2=NEW.stream_ordering WHERE stream_ordering=NEW.stream_ordering; + +-- Start a bg process to populate it for old events +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6001, 'populate_stream_ordering2', '{}'); + +-- ... 
and another to build an index on it +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'); + +-- ... and another to do the switcheroo +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES + (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2'); -- cgit 1.5.1 From 7647b0337fb5d936c88c5949fa92c07bf2137ad0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 29 Jun 2021 12:43:36 +0100 Subject: Fix `populate_stream_ordering2` background job (#10267) It was possible for us not to find any rows in a batch, and hence conclude that we had finished. Let's not do that. --- changelog.d/10267.bugfix | 1 + .../storage/databases/main/events_bg_updates.py | 28 ++++++++++------------ 2 files changed, 13 insertions(+), 16 deletions(-) create mode 100644 changelog.d/10267.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10267.bugfix b/changelog.d/10267.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10267.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 39aaee743c..da3a7df27b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1055,32 +1055,28 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): batch_size = max(batch_size, 1) def process(txn: Cursor) -> int: - # if this is the first pass, find the minimum stream ordering - last_stream = progress.get("last_stream") - if last_stream is None: - txn.execute( - """ - SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1 - """ - ) - rows = txn.fetchall() - if not rows: - return 0 - last_stream = rows[0][0] - 1 - + last_stream = progress.get("last_stream", -(1 << 31)) txn.execute( """ UPDATE events SET stream_ordering2=stream_ordering - WHERE stream_ordering > ? AND stream_ordering <= ? + WHERE stream_ordering IN ( + SELECT stream_ordering FROM events WHERE stream_ordering > ? + ORDER BY stream_ordering LIMIT ? 
+ ) + RETURNING stream_ordering; """, - (last_stream, last_stream + batch_size), + (last_stream, batch_size), ) row_count = txn.rowcount + if row_count == 0: + return 0 + last_stream = max(row[0] for row in txn) + logger.info("populated stream_ordering2 up to %i", last_stream) self.db_pool.updates._background_update_progress_txn( txn, _BackgroundUpdates.POPULATE_STREAM_ORDERING2, - {"last_stream": last_stream + batch_size}, + {"last_stream": last_stream}, ) return row_count -- cgit 1.5.1 From 329ef5c715d81b538e8b071de046c698a82eae10 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 30 Jun 2021 12:07:16 +0100 Subject: Fix the inbound PDU metric (#10279) This broke in #10272 --- changelog.d/10279.bugfix | 1 + synapse/federation/federation_server.py | 37 ++++++------ synapse/storage/databases/main/event_federation.py | 66 ++++++++++++++++++---- synapse/storage/engines/_base.py | 6 ++ synapse/storage/engines/postgres.py | 5 ++ synapse/storage/engines/sqlite.py | 5 ++ 6 files changed, 93 insertions(+), 27 deletions(-) create mode 100644 changelog.d/10279.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10279.bugfix b/changelog.d/10279.bugfix new file mode 100644 index 0000000000..ac8b64ead9 --- /dev/null +++ b/changelog.d/10279.bugfix @@ -0,0 +1 @@ +Fix the prometheus `synapse_federation_server_pdu_process_time` metric. Broke in v1.37.1. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 742d29291e..e93b7577fe 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -369,22 +369,21 @@ class FederationServer(FederationBase): async def process_pdu(pdu: EventBase) -> JsonDict: event_id = pdu.event_id - with pdu_process_time.time(): - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - return {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - return {"error": str(e)} - except Exception as e: - f = failure.Failure() - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore - ) - return {"error": str(e)} + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + return {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + return {"error": str(e)} + except Exception as e: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + return {"error": str(e)} await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT @@ -932,9 +931,13 @@ class FederationServer(FederationBase): exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) - await self.store.remove_received_event_from_staging( + received_ts = await self.store.remove_received_event_from_staging( origin, event.event_id ) + if received_ts is not None: + pdu_process_time.observe( + (self._clock.time_msec() - received_ts) / 1000 + ) # We need to do this check outside the lock to avoid a race between # a new event being inserted by another instance and it attempting diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f23f8c6ecf..f2d27ee893 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1075,16 +1075,62 @@ 
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas self, origin: str, event_id: str, - ) -> None: - """Remove the given event from the staging area""" - await self.db_pool.simple_delete( - table="federation_inbound_events_staging", - keyvalues={ - "origin": origin, - "event_id": event_id, - }, - desc="remove_received_event_from_staging", - ) + ) -> Optional[int]: + """Remove the given event from the staging area. + + Returns: + The received_ts of the row that was deleted, if any. + """ + if self.db_pool.engine.supports_returning: + + def _remove_received_event_from_staging_txn(txn): + sql = """ + DELETE FROM federation_inbound_events_staging + WHERE origin = ? AND event_id = ? + RETURNING received_ts + """ + + txn.execute(sql, (origin, event_id)) + return txn.fetchone() + + row = await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + db_autocommit=True, + ) + if row is None: + return None + + return row[0] + + else: + + def _remove_received_event_from_staging_txn(txn): + received_ts = self.db_pool.simple_select_one_onecol_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + retcol="received_ts", + allow_none=True, + ) + self.db_pool.simple_delete_txn( + txn, + table="federation_inbound_events_staging", + keyvalues={ + "origin": origin, + "event_id": event_id, + }, + ) + + return received_ts + + return await self.db_pool.runInteraction( + "remove_received_event_from_staging", + _remove_received_event_from_staging_txn, + ) async def get_next_staged_event_id_for_room( self, diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 1882bfd9cf..20cd63c330 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -49,6 +49,12 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta): """ ... + @property + @abc.abstractmethod + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + ... 
+ @abc.abstractmethod def check_database( self, db_conn: ConnectionType, allow_outdated_version: bool = False diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 21411c5fea..30f948a0f7 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -133,6 +133,11 @@ class PostgresEngine(BaseDatabaseEngine): """Do we support using `a = ANY(?)` and passing a list""" return True + @property + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 5fe1b205e1..70d17d4f2c 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -60,6 +60,11 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): """Do we support using `a = ANY(?)` and passing a list""" return False + @property + def supports_returning(self) -> bool: + """Do we support the `RETURNING` clause in insert/update/delete?""" + return self.module.sqlite_version_info >= (3, 35, 0) + def check_database(self, db_conn, allow_outdated_version: bool = False): if not allow_outdated_version: version = self.module.sqlite_version_info -- cgit 1.5.1 From 859dc05b3692a3672c1a0db8deaaa9274b6aa6f5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 30 Jun 2021 15:01:24 +0100 Subject: Rebuild other indexes using `stream_ordering` (#10282) We need to rebuild *all* of the indexes that use the current `stream_ordering` column. --- changelog.d/10282.bugfix | 1 + .../storage/databases/main/events_bg_updates.py | 50 ++++++++++++++++++++-- .../60/01recreate_stream_ordering.sql.postgres | 11 +++-- 3 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 changelog.d/10282.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10282.bugfix b/changelog.d/10282.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10282.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index da3a7df27b..1c95c66648 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -29,13 +29,18 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -_REPLACE_STREAM_ORDRING_SQL_COMMANDS = ( +_REPLACE_STREAM_ORDERING_SQL_COMMANDS = ( # there should be no leftover rows without a stream_ordering2, but just in case... "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL", - # finally, we can drop the rule and switch the columns + # now we can drop the rule and switch the columns "DROP RULE populate_stream_ordering2 ON events", "ALTER TABLE events DROP COLUMN stream_ordering", "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering", + # ... 
and finally, rename the indexes into place for consistency with sqlite + "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index", + "ALTER INDEX events_order_room2 RENAME TO events_order_room", + "ALTER INDEX events_room_stream2 RENAME TO events_room_stream", + "ALTER INDEX events_ts2 RENAME TO events_ts", ) @@ -45,6 +50,10 @@ class _BackgroundUpdates: DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url" + INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order" + INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream" + INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" @@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + ################################################################################ + # bg updates for replacing stream_ordering with a BIGINT # (these only run on postgres.) + self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.POPULATE_STREAM_ORDERING2, self._background_populate_stream_ordering2, ) + # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2); self.db_pool.updates.register_background_index_update( _BackgroundUpdates.INDEX_STREAM_ORDERING2, index_name="events_stream_ordering", @@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): columns=["stream_ordering2"], unique=True, ) + # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false; + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL, + index_name="event_contains_url_index2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + where_clause="contains_url = true AND outlier = false", + ) + # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER, + index_name="events_order_room2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + ) + # CREATE INDEX events_room_stream ON events(room_id, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM, + index_name="events_room_stream2", + table="events", + columns=["room_id", "stream_ordering2"], + ) + # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS, + index_name="events_ts2", + table="events", + columns=["origin_server_ts", "stream_ordering2"], + ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, self._background_replace_stream_ordering_column, ) + ################################################################################ + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """Drop the old 'stream_ordering' column 
and rename 'stream_ordering2' into its place.""" def process(txn: Cursor) -> None: - for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: + for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS: logger.info("completing stream_ordering migration: %s", sql) txn.execute(sql) diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres index 88c9f8bd0d..b5fb763ddd 100644 --- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres +++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres @@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (6001, 'populate_stream_ordering2', '{}'); --- ... and another to build an index on it +-- ... and some more to build indexes on it. These aren't really interdependent +-- but the backround_updates manager can only handle a single dependency per update. INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'); + (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'), + (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'), + (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'), + (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'), + (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream'); -- ... and another to do the switcheroo INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2'); + (6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts'); -- cgit 1.5.1 From 76addadd7c807a3412e6a104db0fdc9b79888688 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Jul 2021 10:18:25 +0100 Subject: Add some metrics to staging area (#10284) --- changelog.d/10284.feature | 1 + synapse/storage/databases/main/event_federation.py | 39 ++++++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 changelog.d/10284.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/10284.feature b/changelog.d/10284.feature new file mode 100644 index 0000000000..379155e8cf --- /dev/null +++ b/changelog.d/10284.feature @@ -0,0 +1 @@ +Add metrics for new inbound federation staging area. 
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f2d27ee893..08d75b0d41 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -16,6 +16,8 @@ import logging from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple +from prometheus_client import Gauge + from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion @@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter +oldest_pdu_in_federation_staging = Gauge( + "synapse_federation_server_oldest_inbound_pdu_in_staging", + "The age in seconds since we received the oldest pdu in the federation staging area", +) + +number_pdus_in_federation_queue = Gauge( + "synapse_federation_server_number_inbound_pdu_in_staging", + "The total number of events in the inbound federation staging", +) + logger = logging.getLogger(__name__) @@ -54,6 +66,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas 500000, "_event_auth_cache", size_callback=len ) # type: LruCache[str, List[Tuple[str, int]]] + self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000) + async def get_auth_chain( self, room_id: str, event_ids: Collection[str], include_given: bool = False ) -> List[EventBase]: @@ -1193,6 +1207,31 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return origin, event + @wrap_as_background_process("_get_stats_for_federation_staging") + async def _get_stats_for_federation_staging(self): + """Update the prometheus metrics for the inbound federation staging area.""" + + def _get_stats_for_federation_staging_txn(txn): + txn.execute( + "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging" + ) + (count,) = txn.fetchone() + + txn.execute( + "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging" + ) + + (age,) = txn.fetchone() + + return count, age + + count, age = await self.db_pool.runInteraction( + "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn + ) + + number_pdus_in_federation_queue.set(count) + oldest_pdu_in_federation_staging.set(age) + class EventFederationStore(EventFederationWorkerStore): """Responsible for storing and serving up the various graphs associated -- cgit 1.5.1 From c65067d67307de7688fa39246426370421e56452 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Jul 2021 13:02:37 +0100 Subject: Handle old staged inbound events (#10303) We might have events in the staging area if the service was restarted while there were unhandled events in the staging area. Fixes #10295 --- changelog.d/10303.bugfix | 1 + synapse/federation/federation_server.py | 67 ++++++++++++++++++---- synapse/storage/databases/main/event_federation.py | 9 +++ 3 files changed, 67 insertions(+), 10 deletions(-) create mode 100644 changelog.d/10303.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10303.bugfix b/changelog.d/10303.bugfix new file mode 100644 index 0000000000..c0577c9f73 --- /dev/null +++ b/changelog.d/10303.bugfix @@ -0,0 +1 @@ +Ensure that inbound events from federation that were being processed when Synapse was restarted get promptly processed on start up. 
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b312d0b809..bf67d0f574 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -148,6 +148,41 @@ class FederationServer(FederationBase): self._room_prejoin_state_types = hs.config.api.room_prejoin_state + # Whether we have started handling old events in the staging area. + self._started_handling_of_staged_events = False + + @wrap_as_background_process("_handle_old_staged_events") + async def _handle_old_staged_events(self) -> None: + """Handle old staged events by fetching all rooms that have staged + events and start the processing of each of those rooms. + """ + + # Get all the rooms IDs with staged events. + room_ids = await self.store.get_all_rooms_with_staged_incoming_events() + + # We then shuffle them so that if there are multiple instances doing + # this work they're less likely to collide. + random.shuffle(room_ids) + + for room_id in room_ids: + room_version = await self.store.get_room_version(room_id) + + # Try and acquire the processing lock for the room, if we get it start a + # background process for handling the events in the room. + lock = await self.store.try_acquire_lock( + _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id + ) + if lock: + logger.info("Handling old staged inbound events in %s", room_id) + self._process_incoming_pdus_in_room_inner( + room_id, + room_version, + lock, + ) + + # We pause a bit so that we don't start handling all rooms at once. + await self._clock.sleep(random.uniform(0, 0.1)) + async def on_backfill_request( self, origin: str, room_id: str, versions: List[str], limit: int ) -> Tuple[int, Dict[str, Any]]: @@ -166,6 +201,12 @@ class FederationServer(FederationBase): async def on_incoming_transaction( self, origin: str, transaction_data: JsonDict ) -> Tuple[int, Dict[str, Any]]: + # If we receive a transaction we should make sure that kick off handling + # any old events in the staging area. + if not self._started_handling_of_staged_events: + self._started_handling_of_staged_events = True + self._handle_old_staged_events() + # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() @@ -882,25 +923,28 @@ class FederationServer(FederationBase): room_id: str, room_version: RoomVersion, lock: Lock, - latest_origin: str, - latest_event: EventBase, + latest_origin: Optional[str] = None, + latest_event: Optional[EventBase] = None, ) -> None: """Process events in the staging area for the given room. The latest_origin and latest_event args are the latest origin and event - received. + received (or None to simply pull the next event from the database). """ # The common path is for the event we just received be the only event in # the room, so instead of pulling the event out of the DB and parsing # the event we just pull out the next event ID and check if that matches. 
- next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room( - room_id - ) - if next_origin == latest_origin and next_event_id == latest_event.event_id: - origin = latest_origin - event = latest_event - else: + if latest_event is not None and latest_origin is not None: + ( + next_origin, + next_event_id, + ) = await self.store.get_next_staged_event_id_for_room(room_id) + if next_origin != latest_origin or next_event_id != latest_event.event_id: + latest_origin = None + latest_event = None + + if latest_origin is None or latest_event is None: next = await self.store.get_next_staged_event_for_room( room_id, room_version ) @@ -908,6 +952,9 @@ class FederationServer(FederationBase): return origin, event = next + else: + origin = latest_origin + event = latest_event # We loop round until there are no more events in the room in the # staging area, or we fail to get the lock (which means another process diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 08d75b0d41..c4474df975 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1207,6 +1207,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return origin, event + async def get_all_rooms_with_staged_incoming_events(self) -> List[str]: + """Get the room IDs of all events currently staged.""" + return await self.db_pool.simple_select_onecol( + table="federation_inbound_events_staging", + keyvalues={}, + retcol="DISTINCT room_id", + desc="get_all_rooms_with_staged_incoming_events", + ) + @wrap_as_background_process("_get_stats_for_federation_staging") async def _get_stats_for_federation_staging(self): """Update the prometheus metrics for the inbound federation staging area.""" -- cgit 1.5.1 From bcb0962a7250d6c1430ad42f5ed234ffea8f2468 Mon Sep 17 00:00:00 2001 From: Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> Date: Tue, 6 Jul 2021 14:08:53 +0200 Subject: Fix deactivate a user if he does not have a profile (#10252) --- changelog.d/10252.bugfix | 1 + synapse/storage/databases/main/profile.py | 8 +-- tests/rest/admin/test_user.py | 86 ++++++++++++++++++++++++------- 3 files changed, 73 insertions(+), 22 deletions(-) create mode 100644 changelog.d/10252.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10252.bugfix b/changelog.d/10252.bugfix new file mode 100644 index 0000000000..c8ddd14528 --- /dev/null +++ b/changelog.d/10252.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in v1.26.0 where only users who have set profile information could be deactivated with erasure enabled. 
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index 9b4e95e134..ba7075caa5 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -73,20 +73,20 @@ class ProfileWorkerStore(SQLBaseStore): async def set_profile_displayname( self, user_localpart: str, new_displayname: Optional[str] ) -> None: - await self.db_pool.simple_update_one( + await self.db_pool.simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"displayname": new_displayname}, + values={"displayname": new_displayname}, desc="set_profile_displayname", ) async def set_profile_avatar_url( self, user_localpart: str, new_avatar_url: Optional[str] ) -> None: - await self.db_pool.simple_update_one( + await self.db_pool.simple_upsert( table="profiles", keyvalues={"user_id": user_localpart}, - updatevalues={"avatar_url": new_avatar_url}, + values={"avatar_url": new_avatar_url}, desc="set_profile_avatar_url", ) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index a34d051734..4fccce34fd 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -939,7 +939,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): """ channel = self.make_request("POST", self.url, b"{}") - self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(401, channel.code, msg=channel.json_body) self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"]) def test_requester_is_not_admin(self): @@ -950,7 +950,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): channel = self.make_request("POST", url, access_token=self.other_user_token) - self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual("You are not a server admin", channel.json_body["error"]) channel = self.make_request( @@ -960,7 +960,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): content=b"{}", ) - self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(403, channel.code, msg=channel.json_body) self.assertEqual("You are not a server admin", channel.json_body["error"]) def test_user_does_not_exist(self): @@ -990,7 +990,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(400, channel.code, msg=channel.json_body) self.assertEqual(Codes.BAD_JSON, channel.json_body["errcode"]) def test_user_is_not_local(self): @@ -1006,7 +1006,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): def test_deactivate_user_erase_true(self): """ - Test deactivating an user and set `erase` to `true` + Test deactivating a user and set `erase` to `true` """ # Get user @@ -1016,24 +1016,22 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(False, channel.json_body["deactivated"]) self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"]) self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"]) self.assertEqual("User1", channel.json_body["displayname"]) - # 
Deactivate user - body = json.dumps({"erase": True}) - + # Deactivate and erase user channel = self.make_request( "POST", self.url, access_token=self.admin_user_tok, - content=body.encode(encoding="utf_8"), + content={"erase": True}, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) # Get user channel = self.make_request( @@ -1042,7 +1040,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(True, channel.json_body["deactivated"]) self.assertEqual(0, len(channel.json_body["threepids"])) @@ -1053,7 +1051,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): def test_deactivate_user_erase_false(self): """ - Test deactivating an user and set `erase` to `false` + Test deactivating a user and set `erase` to `false` """ # Get user @@ -1063,7 +1061,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(False, channel.json_body["deactivated"]) self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"]) @@ -1071,13 +1069,11 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self.assertEqual("User1", channel.json_body["displayname"]) # Deactivate user - body = json.dumps({"erase": False}) - channel = self.make_request( "POST", self.url, access_token=self.admin_user_tok, - content=body.encode(encoding="utf_8"), + content={"erase": False}, ) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) @@ -1089,7 +1085,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): access_token=self.admin_user_tok, ) - self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual(200, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(True, channel.json_body["deactivated"]) self.assertEqual(0, len(channel.json_body["threepids"])) @@ -1098,6 +1094,60 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): self._is_erased("@user:test", False) + def test_deactivate_user_erase_true_no_profile(self): + """ + Test deactivating a user and set `erase` to `true` + if user has no profile information (stored in the database table `profiles`). + """ + + # Users normally have an entry in `profiles`, but occasionally they are created without one. + # To test deactivation for users without a profile, we delete the profile information for our user. 
+ self.get_success( + self.store.db_pool.simple_delete_one( + table="profiles", keyvalues={"user_id": "user"} + ) + ) + + # Get user + channel = self.make_request( + "GET", + self.url_other_user, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(False, channel.json_body["deactivated"]) + self.assertEqual("foo@bar.com", channel.json_body["threepids"][0]["address"]) + self.assertIsNone(channel.json_body["avatar_url"]) + self.assertIsNone(channel.json_body["displayname"]) + + # Deactivate and erase user + channel = self.make_request( + "POST", + self.url, + access_token=self.admin_user_tok, + content={"erase": True}, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + + # Get user + channel = self.make_request( + "GET", + self.url_other_user, + access_token=self.admin_user_tok, + ) + + self.assertEqual(200, channel.code, msg=channel.json_body) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(True, channel.json_body["deactivated"]) + self.assertEqual(0, len(channel.json_body["threepids"])) + self.assertIsNone(channel.json_body["avatar_url"]) + self.assertIsNone(channel.json_body["displayname"]) + + self._is_erased("@user:test", True) + def _is_erased(self, user_id: str, expect: bool) -> None: """Assert that the user is erased or not""" d = self.store.is_user_erased(user_id) -- cgit 1.5.1 From 9ad84558951dd970dc2a362c923552141a42a5f3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 7 Jul 2021 11:56:17 +0200 Subject: ANALYZE new stream ordering column (#10326) Fixes #10325 --- changelog.d/10326.bugfix | 1 + synapse/storage/databases/main/events_bg_updates.py | 10 ++++++++++ 2 files changed, 11 insertions(+) create mode 100644 changelog.d/10326.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10326.bugfix b/changelog.d/10326.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10326.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 1c95c66648..29f33bac55 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1146,6 +1146,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): logger.info("completing stream_ordering migration: %s", sql) txn.execute(sql) + # ANALYZE the new column to build stats on it, to encourage PostgreSQL to use the + # indexes on it. + # We need to pass execute a dummy function to handle the txn's result otherwise + # it tries to call fetchall() on it and fails because there's no result to fetch. 
+ await self.db_pool.execute( + "background_analyze_new_stream_ordering_column", + lambda txn: None, + "ANALYZE events(stream_ordering2)", + ) + await self.db_pool.runInteraction( "_background_replace_stream_ordering_column", process ) -- cgit 1.5.1 From 1579fdd54a9aab6b65ddb8de4e83b61c3384e2fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jul 2021 10:16:54 +0100 Subject: Ensure we always drop the federation inbound lock (#10336) --- changelog.d/10336.bugfix | 1 + synapse/federation/federation_server.py | 1 + synapse/storage/databases/main/lock.py | 15 +++++++++++++-- 3 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 changelog.d/10336.bugfix (limited to 'synapse/storage/databases') diff --git a/changelog.d/10336.bugfix b/changelog.d/10336.bugfix new file mode 100644 index 0000000000..5e75ed3335 --- /dev/null +++ b/changelog.d/10336.bugfix @@ -0,0 +1 @@ +Fix bug where inbound federation in a room could be delayed due to not correctly dropping a lock. Introduced in v1.37.1. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bf67d0f574..ac0f2ccfb3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -949,6 +949,7 @@ class FederationServer(FederationBase): room_id, room_version ) if not next: + await lock.release() return origin, event = next diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index e76188328c..774861074c 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -310,14 +310,25 @@ class Lock: _excinst: Optional[BaseException], _exctb: Optional[TracebackType], ) -> bool: + await self.release() + + return False + + async def release(self) -> None: + """Release the lock. + + This is automatically called when using the lock as a context manager. + """ + + if self._dropped: + return + if self._looping_call.running: self._looping_call.stop() await self._store._drop_lock(self._lock_name, self._lock_key, self._token) self._dropped = True - return False - def __del__(self) -> None: if not self._dropped: # We should not be dropped without the lock being released (unless -- cgit 1.5.1
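For readers unfamiliar with the lock semantics the final patch (#10336) relies on, below is a minimal, self-contained sketch of the pattern it establishes: `Lock.release()` becomes safe to call explicitly before an early return, while the async context manager still releases on exit, guarded by a flag so the lock is only dropped once. This is an illustration only, not Synapse code: `SimpleLock` and `process_room` are hypothetical stand-ins for the real `Lock` and `_process_incoming_pdus_in_room_inner`, and the real lock also renews itself via a looping call and drops a row in the database, which is omitted here.

```python
import asyncio
from types import TracebackType
from typing import Optional, Type


class SimpleLock:
    """Toy stand-in for Synapse's database-backed Lock (illustrative only)."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._dropped = False

    async def release(self) -> None:
        # Idempotent: calling release() explicitly and then again via
        # __aexit__ must not try to drop the lock a second time.
        if self._dropped:
            return
        print(f"dropping lock {self._name}")
        self._dropped = True

    async def __aenter__(self) -> "SimpleLock":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> bool:
        # The context manager path simply delegates to release().
        await self.release()
        return False


async def process_room(lock: SimpleLock, has_staged_event: bool) -> None:
    # Mirrors the shape of the fix: if there is nothing to process, release
    # the lock before returning early, otherwise the room would stay locked
    # and later inbound federation for it would be delayed.
    if not has_staged_event:
        await lock.release()
        return
    async with lock:
        print("processing staged event")


# Both paths end with the lock dropped exactly once.
asyncio.run(process_room(SimpleLock("federation_inbound:room1"), has_staged_event=False))
asyncio.run(process_room(SimpleLock("federation_inbound:room2"), has_staged_event=True))
```

The design choice worth noting is that release remains the single exit point: the early-return path and the context-manager path converge on the same idempotent method, so double-release cannot occur even if both are exercised.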