From 9f514dd0fbf539d97b6d9f8f590aaf668fb3280e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 9 Nov 2023 14:40:30 -0500 Subject: Use _invalidate_cache_and_stream_bulk in more places. (#16616) This takes advantage of the new bulk method in more places to invalidate caches for many keys at once (and then to stream that over replication). --- synapse/storage/databases/main/account_data.py | 24 ++++++++++++----- .../storage/databases/main/events_bg_updates.py | 15 +++++------ synapse/storage/databases/main/keys.py | 17 +++++++----- synapse/storage/databases/main/presence.py | 9 ++++--- synapse/storage/databases/main/purge_events.py | 31 ++++++++++++++-------- synapse/storage/databases/main/registration.py | 20 +++++++------- 6 files changed, 70 insertions(+), 46 deletions(-) (limited to 'synapse/storage/databases') diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index d7482a1f4e..07f9b65af3 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -747,8 +747,16 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) # Invalidate the cache for any ignored users which were added or removed. - for ignored_user_id in previously_ignored_users ^ currently_ignored_users: - self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) + self._invalidate_cache_and_stream_bulk( + txn, + self.ignored_by, + [ + (ignored_user_id,) + for ignored_user_id in ( + previously_ignored_users ^ currently_ignored_users + ) + ], + ) self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) async def remove_account_data_for_user( @@ -824,10 +832,14 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) # Invalidate the cache for ignored users which were removed. - for ignored_user_id in previously_ignored_users: - self._invalidate_cache_and_stream( - txn, self.ignored_by, (ignored_user_id,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self.ignored_by, + [ + (ignored_user_id,) + for ignored_user_id in previously_ignored_users + ], + ) # Invalidate for this user the cache tracking ignored users. self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 0061805150..9c46c5d7bd 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1222,14 +1222,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) # Iterate the parent IDs and invalidate caches. 
- for parent_id in {r[1] for r in relations_to_insert}: - cache_tuple = (parent_id,) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_relations_for_event, cache_tuple # type: ignore[attr-defined] - ) - self._invalidate_cache_and_stream( # type: ignore[attr-defined] - txn, self.get_thread_summary, cache_tuple # type: ignore[attr-defined] - ) + cache_tuples = {(r[1],) for r in relations_to_insert} + self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + txn, self.get_relations_for_event, cache_tuples # type: ignore[attr-defined] + ) + self._invalidate_cache_and_stream_bulk( # type: ignore[attr-defined] + txn, self.get_thread_summary, cache_tuples # type: ignore[attr-defined] + ) if results: latest_event_id = results[-1][0] diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index ce88772f9e..c700872fdc 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -107,13 +107,16 @@ class KeyStore(CacheInvalidationWorkerStore): # invalidate takes a tuple corresponding to the params of # _get_server_keys_json. _get_server_keys_json only takes one # param, which is itself the 2-tuple (server_name, key_id). - for key_id in verify_keys: - self._invalidate_cache_and_stream( - txn, self._get_server_keys_json, ((server_name, key_id),) - ) - self._invalidate_cache_and_stream( - txn, self.get_server_key_json_for_remote, (server_name, key_id) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self._get_server_keys_json, + [((server_name, key_id),) for key_id in verify_keys], + ) + self._invalidate_cache_and_stream_bulk( + txn, + self.get_server_key_json_for_remote, + [(server_name, key_id) for key_id in verify_keys], + ) await self.db_pool.runInteraction( "store_server_keys_response", store_server_keys_response_txn diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 3b444d2d07..0198bb09d2 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -363,10 +363,11 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) # for their user ID. value_values=[(presence_stream_id,) for _ in user_ids], ) - for user_id in user_ids: - self._invalidate_cache_and_stream( - txn, self._get_full_presence_stream_token_for_user, (user_id,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self._get_full_presence_stream_token_for_user, + [(user_id,) for user_id in user_ids], + ) return await self.db_pool.runInteraction( "add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 1e11bf2706..c3b3e2baaf 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -295,19 +295,28 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # so make sure to keep this actually last. txn.execute("DROP TABLE events_to_purge") - for event_id, should_delete in event_rows: - self._invalidate_cache_and_stream( - txn, self._get_state_group_for_event, (event_id,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self._get_state_group_for_event, + [(event_id,) for event_id, _ in event_rows], + ) - # XXX: This is racy, since have_seen_events could be called between the - # transaction completing and the invalidation running. 
On the other hand, - # that's no different to calling `have_seen_events` just before the - # event is deleted from the database. + # XXX: This is racy, since have_seen_events could be called between the + # transaction completing and the invalidation running. On the other hand, + # that's no different to calling `have_seen_events` just before the + # event is deleted from the database. + self._invalidate_cache_and_stream_bulk( + txn, + self.have_seen_event, + [ + (room_id, event_id) + for event_id, should_delete in event_rows + if should_delete + ], + ) + + for event_id, should_delete in event_rows: if should_delete: - self._invalidate_cache_and_stream( - txn, self.have_seen_event, (room_id, event_id) - ) self.invalidate_get_event_cache_after_txn(txn, event_id) logger.info("[purge] done") diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index dec9858575..2c3f30e2eb 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -561,16 +561,15 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): updatevalues={"shadow_banned": shadow_banned}, ) # In order for this to apply immediately, clear the cache for this user. - tokens = self.db_pool.simple_select_onecol_txn( + tokens = self.db_pool.simple_select_list_txn( txn, table="access_tokens", keyvalues={"user_id": user_id}, - retcol="token", + retcols=("token",), + ) + self._invalidate_cache_and_stream_bulk( + txn, self.get_user_by_access_token, tokens ) - for token in tokens: - self._invalidate_cache_and_stream( - txn, self.get_user_by_access_token, (token,) - ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) await self.db_pool.runInteraction("set_shadow_banned", set_shadow_banned_txn) @@ -2683,10 +2682,11 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): ) tokens_and_devices = [(r[0], r[1], r[2]) for r in txn] - for token, _, _ in tokens_and_devices: - self._invalidate_cache_and_stream( - txn, self.get_user_by_access_token, (token,) - ) + self._invalidate_cache_and_stream_bulk( + txn, + self.get_user_by_access_token, + [(token,) for token, _, _ in tokens_and_devices], + ) txn.execute("DELETE FROM access_tokens WHERE %s" % where_clause, values) -- cgit 1.5.1 From 2c6a7dfcbfc6e3782c5d2d88b28a54efca8aff8b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 9 Nov 2023 16:19:42 -0500 Subject: Use attempt_to_set_autocommit everywhere. (#16615) To avoid asserting the type of the database connection. --- changelog.d/16615.misc | 1 + synapse/storage/background_updates.py | 18 ++++++++++++------ synapse/storage/databases/main/search.py | 8 ++++---- synapse/storage/databases/state/bg_updates.py | 4 ++-- tests/server.py | 15 +++++---------- 5 files changed, 24 insertions(+), 22 deletions(-) create mode 100644 changelog.d/16615.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/16615.misc b/changelog.d/16615.misc new file mode 100644 index 0000000000..37ab711dc6 --- /dev/null +++ b/changelog.d/16615.misc @@ -0,0 +1 @@ +Use more generic database methods. 
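For context on the change below: Postgres refuses to run `CREATE INDEX CONCURRENTLY` inside a transaction block, so the connection has to be flipped into autocommit mode first. A minimal standalone sketch of that dance, using psycopg2 directly rather than Synapse's engine wrapper (the `dsn`, table and index names are placeholders):

```python
# Sketch only: CREATE INDEX CONCURRENTLY cannot run inside a transaction
# block, so autocommit must be enabled for the duration of the statement.
import psycopg2

def create_index_concurrently(dsn: str) -> None:
    conn = psycopg2.connect(dsn)
    try:
        conn.rollback()          # make sure no transaction is open
        conn.autocommit = True   # required for CONCURRENTLY
        with conn.cursor() as cur:
            cur.execute(
                "CREATE INDEX CONCURRENTLY IF NOT EXISTS my_idx"
                " ON my_table (my_col)"
            )
    finally:
        conn.autocommit = False  # restore normal transactional behaviour
        conn.close()
```

The `attempt_to_set_autocommit` engine method used below wraps exactly this toggle, so callers can work against `LoggingDatabaseConnection` without asserting the concrete driver type, which is the stated goal of this commit.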
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 7426dbcad6..62fbd05534 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -49,7 +49,11 @@ else: if TYPE_CHECKING: from synapse.server import HomeServer - from synapse.storage.database import DatabasePool, LoggingTransaction + from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + ) logger = logging.getLogger(__name__) @@ -746,10 +750,10 @@ class BackgroundUpdater: The named index will be dropped upon completion of the new index. """ - def create_index_psql(conn: Connection) -> None: + def create_index_psql(conn: "LoggingDatabaseConnection") -> None: conn.rollback() # postgres insists on autocommit for the index - conn.set_session(autocommit=True) # type: ignore + conn.engine.attempt_to_set_autocommit(conn.conn, True) try: c = conn.cursor() @@ -793,9 +797,9 @@ class BackgroundUpdater: undo_timeout_sql = f"SET statement_timeout = {default_timeout}" conn.cursor().execute(undo_timeout_sql) - conn.set_session(autocommit=False) # type: ignore + conn.engine.attempt_to_set_autocommit(conn.conn, False) - def create_index_sqlite(conn: Connection) -> None: + def create_index_sqlite(conn: "LoggingDatabaseConnection") -> None: # Sqlite doesn't support concurrent creation of indexes. # # We assume that sqlite doesn't give us invalid indices; however @@ -825,7 +829,9 @@ class BackgroundUpdater: c.execute(sql) if isinstance(self.db_pool.engine, engines.PostgresEngine): - runner: Optional[Callable[[Connection], None]] = create_index_psql + runner: Optional[ + Callable[[LoggingDatabaseConnection], None] + ] = create_index_psql elif psql_only: runner = None else: diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index f4bef4c99b..e25d86818b 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -275,7 +275,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # we have to set autocommit, because postgres refuses to # CREATE INDEX CONCURRENTLY without it. - conn.set_session(autocommit=True) + conn.engine.attempt_to_set_autocommit(conn.conn, True) try: c = conn.cursor() @@ -301,7 +301,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # we should now be able to delete the GIST index. 
c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist") finally: - conn.set_session(autocommit=False) + conn.engine.attempt_to_set_autocommit(conn.conn, False) if isinstance(self.database_engine, PostgresEngine): await self.db_pool.runWithConnection(create_index) @@ -323,7 +323,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): def create_index(conn: LoggingDatabaseConnection) -> None: conn.rollback() - conn.set_session(autocommit=True) + conn.engine.attempt_to_set_autocommit(conn.conn, True) c = conn.cursor() # We create with NULLS FIRST so that when we search *backwards* @@ -340,7 +340,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): ON event_search(origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST) """ ) - conn.set_session(autocommit=False) + conn.engine.attempt_to_set_autocommit(conn.conn, False) await self.db_pool.runWithConnection(create_index) diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 0f9c550b27..2c3151526d 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -492,7 +492,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): conn.rollback() if isinstance(self.database_engine, PostgresEngine): # postgres insists on autocommit for the index - conn.set_session(autocommit=True) + conn.engine.attempt_to_set_autocommit(conn.conn, True) try: txn = conn.cursor() txn.execute( @@ -501,7 +501,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): ) txn.execute("DROP INDEX IF EXISTS state_groups_state_id") finally: - conn.set_session(autocommit=False) + conn.engine.attempt_to_set_autocommit(conn.conn, False) else: txn = conn.cursor() txn.execute( diff --git a/tests/server.py b/tests/server.py index 5a63ecee9f..2b63ed3dd8 100644 --- a/tests/server.py +++ b/tests/server.py @@ -88,7 +88,7 @@ from synapse.module_api.callbacks.third_party_event_rules_callbacks import ( from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.database import LoggingDatabaseConnection -from synapse.storage.engines import PostgresEngine, create_engine +from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database from synapse.types import ISynapseReactor, JsonDict from synapse.util import Clock @@ -1029,9 +1029,7 @@ def setup_test_homeserver( # Create the database before we actually try and connect to it, based off # the template database we generate in setupdb() - if isinstance(db_engine, PostgresEngine): - import psycopg2.extensions - + if USE_POSTGRES_FOR_TESTS: db_conn = db_engine.module.connect( dbname=POSTGRES_BASE_DB, user=POSTGRES_USER, @@ -1039,8 +1037,7 @@ def setup_test_homeserver( port=POSTGRES_PORT, password=POSTGRES_PASSWORD, ) - assert isinstance(db_conn, psycopg2.extensions.connection) - db_conn.autocommit = True + db_engine.attempt_to_set_autocommit(db_conn, True) cur = db_conn.cursor() cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) cur.execute( @@ -1065,13 +1062,12 @@ def setup_test_homeserver( hs.setup() - if isinstance(db_engine, PostgresEngine): + if USE_POSTGRES_FOR_TESTS: database_pool = hs.get_datastores().databases[0] # We need to do cleanup on PostgreSQL def cleanup() -> None: import psycopg2 - import psycopg2.extensions # Close all the db pools database_pool._db_pool.close() @@ -1086,8 +1082,7 @@ def setup_test_homeserver( port=POSTGRES_PORT, password=POSTGRES_PASSWORD, ) - assert isinstance(db_conn, 
psycopg2.extensions.connection) - db_conn.autocommit = True + db_engine.attempt_to_set_autocommit(db_conn, True) cur = db_conn.cursor() # Try a few times to drop the DB. Some things may hold on to the -- cgit 1.5.1 From 0e36a57b60f37ef1cb2d031bd988afe42077940e Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 13 Nov 2023 16:57:44 +0000 Subject: Remove whole table locks on push rule add/delete (#16051) The statements are already executed within a transaction thus a table level lock is unnecessary. --- changelog.d/16051.misc | 1 + synapse/storage/databases/main/push_rule.py | 43 ++++++++++++++++++----------- 2 files changed, 28 insertions(+), 16 deletions(-) create mode 100644 changelog.d/16051.misc (limited to 'synapse/storage/databases') diff --git a/changelog.d/16051.misc b/changelog.d/16051.misc new file mode 100644 index 0000000000..1420d2eb3f --- /dev/null +++ b/changelog.d/16051.misc @@ -0,0 +1 @@ +Remove whole table locks on push rule modifications. Contributed by Nick @ Beeper (@fizzadar). diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index f72a23c584..cf622e195c 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -449,26 +449,28 @@ class PushRuleStore(PushRulesWorkerStore): before: str, after: str, ) -> None: - # Lock the table since otherwise we'll have annoying races between the - # SELECT here and the UPSERT below. - self.database_engine.lock_table(txn, "push_rules") - relative_to_rule = before or after - res = self.db_pool.simple_select_one_txn( - txn, - table="push_rules", - keyvalues={"user_name": user_id, "rule_id": relative_to_rule}, - retcols=["priority_class", "priority"], - allow_none=True, - ) + sql = """ + SELECT priority, priority_class FROM push_rules + WHERE user_name = ? AND rule_id = ? + """ + + if isinstance(self.database_engine, PostgresEngine): + sql += " FOR UPDATE" + else: + # Annoyingly SQLite doesn't support row level locking, so lock the whole table + self.database_engine.lock_table(txn, "push_rules") + + txn.execute(sql, (user_id, relative_to_rule)) + row = txn.fetchone() - if not res: + if row is None: raise RuleNotFoundException( "before/after rule not found: %s" % (relative_to_rule,) ) - base_priority_class, base_rule_priority = res + base_rule_priority, base_priority_class = row if base_priority_class != priority_class: raise InconsistentRuleException( @@ -516,9 +518,18 @@ class PushRuleStore(PushRulesWorkerStore): conditions_json: str, actions_json: str, ) -> None: - # Lock the table since otherwise we'll have annoying races between the - # SELECT here and the UPSERT below. - self.database_engine.lock_table(txn, "push_rules") + if isinstance(self.database_engine, PostgresEngine): + # Postgres doesn't do FOR UPDATE on aggregate functions, so select the rows first + # then re-select the count/max below. + sql = """ + SELECT * FROM push_rules + WHERE user_name = ? and priority_class = ? + FOR UPDATE + """ + txn.execute(sql, (user_id, priority_class)) + else: + # Annoyingly SQLite doesn't support row level locking, so lock the whole table + self.database_engine.lock_table(txn, "push_rules") # find the highest priority rule in that class sql = ( -- cgit 1.5.1 From f2f2c7c1f05de87f43cc2d18d5dc9bd636b3ed0a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 15 Nov 2023 08:02:11 -0500 Subject: Use full GitHub links instead of bare issue numbers. 
(#16637) --- .git-blame-ignore-revs | 12 ++++++------ changelog.d/16637.misc | 1 + debian/changelog | 2 +- pyproject.toml | 9 +++++---- synapse/app/generic_worker.py | 4 ++-- synapse/federation/sender/__init__.py | 4 ++-- synapse/handlers/federation_event.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/sync.py | 4 ++-- synapse/handlers/user_directory.py | 8 ++++---- synapse/http/matrixfederationclient.py | 14 ++++++++------ .../callbacks/third_party_event_rules_callbacks.py | 3 ++- synapse/storage/databases/__init__.py | 2 +- synapse/storage/databases/main/events_bg_updates.py | 4 ++-- synapse/storage/databases/main/events_worker.py | 6 ++++-- synapse/storage/databases/main/monthly_active_users.py | 2 +- synapse/storage/databases/main/purge_events.py | 2 +- synapse/storage/engines/postgres.py | 3 ++- synapse/storage/schema/__init__.py | 3 ++- .../main/delta/54/delete_forward_extremities.sql | 2 +- .../56/remove_tombstoned_rooms_from_directory.sql | 3 ++- .../main/delta/70/01clean_table_purged_rooms.sql | 3 ++- synapse/util/check_dependencies.py | 3 ++- sytest-blacklist | 2 +- tests/federation/test_federation_sender.py | 2 +- tests/handlers/test_federation.py | 6 +++--- tests/http/test_matrixfederationclient.py | 3 ++- tests/push/test_bulk_push_rule_evaluator.py | 2 +- tests/replication/tcp/streams/test_to_device.py | 2 +- tests/rest/admin/test_user.py | 2 +- tests/rest/client/test_events.py | 2 +- tests/rest/client/test_profile.py | 6 ++++-- tests/rest/client/test_rooms.py | 3 ++- tests/rest/client/test_sync.py | 2 +- tests/storage/databases/main/test_lock.py | 18 +++++++++--------- tests/storage/test_database.py | 5 +++-- tests/storage/test_event_federation.py | 2 +- tests/storage/test_room_search.py | 2 +- tests/util/test_check_dependencies.py | 10 +++++++--- 39 files changed, 94 insertions(+), 73 deletions(-) create mode 100644 changelog.d/16637.misc (limited to 'synapse/storage/databases') diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index 839b895c82..4c7b0335e6 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -8,21 +8,21 @@ # If ignoring a pull request that was not squash merged, only the merge # commit needs to be put here. Child commits will be resolved from it. -# Run black (#3679). +# Run black (https://github.com/matrix-org/synapse/pull/3679). 8b3d9b6b199abb87246f982d5db356f1966db925 -# Black reformatting (#5482). +# Black reformatting (https://github.com/matrix-org/synapse/pull/5482). 32e7c9e7f20b57dd081023ac42d6931a8da9b3a3 -# Target Python 3.5 with black (#8664). +# Target Python 3.5 with black (https://github.com/matrix-org/synapse/pull/8664). aff1eb7c671b0a3813407321d2702ec46c71fa56 -# Update black to 20.8b1 (#9381). +# Update black to 20.8b1 (https://github.com/matrix-org/synapse/pull/9381). 0a00b7ff14890987f09112a2ae696c61001e6cf1 -# Convert tests/rest/admin/test_room.py to unix file endings (#7953). +# Convert tests/rest/admin/test_room.py to unix file endings (https://github.com/matrix-org/synapse/pull/7953). c4268e3da64f1abb5b31deaeb5769adb6510c0a7 -# Update black to 23.1.0 (#15103) +# Update black to 23.1.0 (https://github.com/matrix-org/synapse/pull/15103) 9bb2eac71962970d02842bca441f4bcdbbf93a11 diff --git a/changelog.d/16637.misc b/changelog.d/16637.misc new file mode 100644 index 0000000000..f5068ac291 --- /dev/null +++ b/changelog.d/16637.misc @@ -0,0 +1 @@ +Improve references to GitHub issues. 
diff --git a/debian/changelog b/debian/changelog index cbfcb8f44d..5470f3877f 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1637,7 +1637,7 @@ matrix-synapse-py3 (0.99.3.1) stable; urgency=medium matrix-synapse-py3 (0.99.3) stable; urgency=medium [ Richard van der Hoff ] - * Fix warning during preconfiguration. (Fixes: #4819) + * Fix warning during preconfiguration. (Fixes: https://github.com/matrix-org/synapse/issues/4819) [ Synapse Packaging team ] * New synapse release 0.99.3. diff --git a/pyproject.toml b/pyproject.toml index df132c0236..825ff73f95 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -192,7 +192,7 @@ phonenumbers = ">=8.2.0" # we use GaugeHistogramMetric, which was added in prom-client 0.4.0. prometheus-client = ">=0.4.0" # we use `order`, which arrived in attrs 19.2.0. -# Note: 21.1.0 broke `/sync`, see #9936 +# Note: 21.1.0 broke `/sync`, see https://github.com/matrix-org/synapse/issues/9936 attrs = ">=19.2.0,!=21.1.0" netaddr = ">=0.7.18" # Jinja 2.x is incompatible with MarkupSafe>=2.1. To ensure that admins do not @@ -357,7 +357,7 @@ commonmark = ">=0.9.1" pygithub = ">=1.55" # The following are executed as commands by the release script. twine = "*" -# Towncrier min version comes from #3425. Rationale unclear. +# Towncrier min version comes from https://github.com/matrix-org/synapse/pull/3425. Rationale unclear. towncrier = ">=18.6.0rc1" # Used for checking the Poetry lockfile @@ -377,8 +377,9 @@ furo = ">=2022.12.7,<2024.0.0" [build-system] # The upper bounds here are defensive, intended to prevent situations like -# #13849 and #14079 where we see buildtime or runtime errors caused by build -# system changes. +# https://github.com/matrix-org/synapse/issues/13849 and +# https://github.com/matrix-org/synapse/issues/14079 where we see buildtime or +# runtime errors caused by build system changes. # We are happy to raise these upper bounds upon request, # provided we check that it's safe to do so (i.e. that CI passes). requires = ["poetry-core>=1.1.0,<=1.7.0", "setuptools_rust>=1.3,<=1.8.1"] diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index f7c80eee21..bcfb7a7200 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -104,8 +104,8 @@ logger = logging.getLogger("synapse.app.generic_worker") class GenericWorkerStore( - # FIXME(#3714): We need to add UserDirectoryStore as we write directly - # rather than going via the correct worker. + # FIXME(https://github.com/matrix-org/synapse/issues/3714): We need to add + # UserDirectoryStore as we write directly rather than going via the correct worker. 
UserDirectoryStore, StatsStore, UIAuthWorkerStore, diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 7980d1a322..948fde6658 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -581,14 +581,14 @@ class FederationSender(AbstractFederationSender): "get_joined_hosts", str(sg) ) if destinations is None: - # Add logging to help track down #13444 + # Add logging to help track down https://github.com/matrix-org/synapse/issues/13444 logger.info( "Unexpectedly did not have cached destinations for %s / %s", sg, event.event_id, ) else: - # Add logging to help track down #13444 + # Add logging to help track down https://github.com/matrix-org/synapse/issues/13444 logger.info( "Unexpectedly did not have cached prev group for %s", event.event_id, diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 0cc8e990d9..ba6b94a8b7 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -748,7 +748,7 @@ class FederationEventHandler: # fetching fresh state for the room if the missing event # can't be found, which slightly reduces our security. # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. + # causing additional state resolution? See https://github.com/matrix-org/synapse/issues/1760. # However, fetching state doesn't hold the linearizer lock # apparently. # diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 202beee738..4137fd50b1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1816,7 +1816,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): # the same token repeatedly. # # Hence this guard where we just return nothing so that the sync - # doesn't return. C.f. #5503. + # doesn't return. C.f. https://github.com/matrix-org/synapse/issues/5503. return [], max_token # Figure out which other users this user should explicitly receive diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2f1bc5a015..bf0106c6e7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -399,7 +399,7 @@ class SyncHandler: # # If that happens, we mustn't cache it, so that when the client comes back # with the same cache token, we don't immediately return the same empty - # result, causing a tightloop. (#8518) + # result, causing a tightloop. (https://github.com/matrix-org/synapse/issues/8518) if result.next_batch == since_token: cache_context.should_cache = False @@ -1003,7 +1003,7 @@ class SyncHandler: # always make sure we LL ourselves so we know we're in the room # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 # We only need apply this on full state syncs given we disabled - # LL for incr syncs in #3840. + # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840. # We don't insert ourselves into `members_to_fetch`, because in some # rare cases (an empty event batch with a now_token after the user's # leave in a partial state room which another local user has diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 75717ba4f9..3c19ea56f8 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -184,8 +184,8 @@ class UserDirectoryHandler(StateDeltasHandler): """Called to update index of our local user profiles when they change irrespective of any rooms the user may be in. 
""" - # FIXME(#3714): We should probably do this in the same worker as all - # the other changes. + # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should + # probably do this in the same worker as all the other changes. if await self.store.should_include_local_user_in_dir(user_id): await self.store.update_profile_in_user_dir( @@ -194,8 +194,8 @@ class UserDirectoryHandler(StateDeltasHandler): async def handle_local_user_deactivated(self, user_id: str) -> None: """Called when a user ID is deactivated""" - # FIXME(#3714): We should probably do this in the same worker as all - # the other changes. + # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should + # probably do this in the same worker as all the other changes. await self.store.remove_from_user_dir(user_id) async def _unsafe_process(self) -> None: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 08c7fc1631..d5013e8e97 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -465,7 +465,7 @@ class MatrixFederationHttpClient: """Wrapper for _send_request which can optionally retry the request upon receiving a combination of a 400 HTTP response code and a 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3 - due to #3622. + due to https://github.com/matrix-org/synapse/issues/3622. Args: request: details of request to be sent @@ -958,9 +958,9 @@ class MatrixFederationHttpClient: requests). try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED response we should try appending a trailing slash to the end - of the request. Workaround for #3622 in Synapse <= v0.99.3. This - will be attempted before backing off if backing off has been - enabled. + of the request. Workaround for https://github.com/matrix-org/synapse/issues/3622 + in Synapse <= v0.99.3. This will be attempted before backing off if + backing off has been enabled. parser: The parser to use to decode the response. Defaults to parsing as JSON. backoff_on_all_error_codes: Back off if we get any error response @@ -1155,7 +1155,8 @@ class MatrixFederationHttpClient: try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED response we should try appending a trailing slash to the end of - the request. Workaround for #3622 in Synapse <= v0.99.3. + the request. Workaround for https://github.com/matrix-org/synapse/issues/3622 + in Synapse <= v0.99.3. parser: The parser to use to decode the response. Defaults to parsing as JSON. @@ -1250,7 +1251,8 @@ class MatrixFederationHttpClient: try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED response we should try appending a trailing slash to the end of - the request. Workaround for #3622 in Synapse <= v0.99.3. + the request. Workaround for https://github.com/matrix-org/synapse/issues/3622 + in Synapse <= v0.99.3. parser: The parser to use to decode the response. Defaults to parsing as JSON. diff --git a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py index ecaeef3511..7419785aff 100644 --- a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py +++ b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py @@ -295,7 +295,8 @@ class ThirdPartyEventRulesModuleApiCallbacks: raise except SynapseError as e: # FIXME: Being able to throw SynapseErrors is relied upon by - # some modules. PR #10386 accidentally broke this ability. + # some modules. 
PR https://github.com/matrix-org/synapse/pull/10386 + # accidentally broke this ability. # That said, we aren't keen on exposing this implementation detail # to modules and we should one day have a proper way to do what # is wanted. diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 7aa24ccf21..b57e260fe0 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -45,7 +45,7 @@ class Databases(Generic[DataStoreT]): """ databases: List[DatabasePool] - main: "DataStore" # FIXME: #11165: actually an instance of `main_store_class` + main: "DataStore" # FIXME: https://github.com/matrix-org/synapse/issues/11165: actually an instance of `main_store_class` state: StateGroupDataStore persist_events: Optional[PersistEventsStore] diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 9c46c5d7bd..0c91f19c8e 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -425,7 +425,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """Background update to clean out extremities that should have been deleted previously. - Mainly used to deal with the aftermath of #5269. + Mainly used to deal with the aftermath of https://github.com/matrix-org/synapse/issues/5269. """ # This works by first copying all existing forward extremities into the @@ -558,7 +558,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) logger.info( - "Deleted %d forward extremities of %d checked, to clean up #5269", + "Deleted %d forward extremities of %d checked, to clean up matrix-org/synapse#5269", deleted, len(original_set), ) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4e63a16fa2..4125059061 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1312,7 +1312,8 @@ class EventsWorkerStore(SQLBaseStore): room_version: Optional[RoomVersion] if not room_version_id: # this should only happen for out-of-band membership events which - # arrived before #6983 landed. For all other events, we should have + # arrived before https://github.com/matrix-org/synapse/issues/6983 + # landed. For all other events, we should have # an entry in the 'rooms' table. # # However, the 'out_of_band_membership' flag is unreliable for older @@ -1323,7 +1324,8 @@ class EventsWorkerStore(SQLBaseStore): "Room %s for event %s is unknown" % (d["room_id"], event_id) ) - # so, assuming this is an out-of-band-invite that arrived before #6983 + # so, assuming this is an out-of-band-invite that arrived before + # https://github.com/matrix-org/synapse/issues/6983 # landed, we know that the room version must be v5 or earlier (because # v6 hadn't been invented at that point, so invites from such rooms # would have been rejected.) 
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index 4b1061e6d7..2911e53310 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -317,7 +317,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): if user_id: is_support = self.is_support_user_txn(txn, user_id) if not is_support: - # We do this manually here to avoid hitting #6791 + # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791 self.db_pool.simple_upsert_txn( txn, table="monthly_active_users", diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index c3b3e2baaf..1a5b5731bb 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -494,7 +494,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # - room_tags_revisions # The problem with these is that they are largeish and there is no room_id # index on them. In any case we should be clearing out 'stream' tables - # periodically anyway (#5888) + # periodically anyway (https://github.com/matrix-org/synapse/issues/5888) self._invalidate_caches_for_room_and_stream(txn, room_id) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 6309363217..ec4c4041b7 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -38,7 +38,8 @@ class PostgresEngine( super().__init__(psycopg2, database_config) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) - # Disables passing `bytes` to txn.execute, c.f. #6186. If you do + # Disables passing `bytes` to txn.execute, c.f. + # https://github.com/matrix-org/synapse/issues/6186. If you do # actually want to use bytes than wrap it in `bytearray`. def _disable_bytes_adapter(_: bytes) -> NoReturn: raise Exception("Passing bytes to DB is disabled.") diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 158b528dce..03e5a0f55d 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -109,7 +109,8 @@ Changes in SCHEMA_VERSION = 78 Changes in SCHEMA_VERSION = 79 - Add tables to handle in DB read-write locks. - - Add some mitigations for a painful race between foreground and background updates, cf #15677. + - Add some mitigations for a painful race between foreground and background updates, cf + https://github.com/matrix-org/synapse/issues/15677. Changes in SCHEMA_VERSION = 80 - The event_txn_id_device_id is always written to for new events. diff --git a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql index b062ec840c..f713e42aa0 100644 --- a/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql +++ b/synapse/storage/schema/main/delta/54/delete_forward_extremities.sql @@ -14,7 +14,7 @@ */ -- Start a background job to cleanup extremities that were incorrectly added --- by bug #5269. +-- by bug https://github.com/matrix-org/synapse/issues/5269. 
INSERT INTO background_updates (update_name, progress_json) VALUES ('delete_soft_failed_extremities', '{}'); diff --git a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql index aeb17813d3..246c3359f7 100644 --- a/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql +++ b/synapse/storage/schema/main/delta/56/remove_tombstoned_rooms_from_directory.sql @@ -13,6 +13,7 @@ * limitations under the License. */ --- Now that #6232 is a thing, we can remove old rooms from the directory. +-- Now that https://github.com/matrix-org/synapse/pull/6232 is a thing, we can +-- remove old rooms from the directory. INSERT INTO background_updates (update_name, progress_json) VALUES ('remove_tombstoned_rooms_from_directory', '{}'); diff --git a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql index aed79635b2..31a61defa7 100644 --- a/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql +++ b/synapse/storage/schema/main/delta/70/01clean_table_purged_rooms.sql @@ -13,7 +13,8 @@ * limitations under the License. */ --- Clean up left over rows from bug #11833, which was fixed in #12770. +-- Clean up left over rows from bug https://github.com/matrix-org/synapse/issues/11833, +-- which was fixed in https://github.com/matrix-org/synapse/pull/12770. DELETE FROM federation_inbound_events_staging WHERE room_id not in ( SELECT room_id FROM rooms ); diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py index f7cead9e12..6f008734a0 100644 --- a/synapse/util/check_dependencies.py +++ b/synapse/util/check_dependencies.py @@ -189,7 +189,8 @@ def check_requirements(extra: Optional[str] = None) -> None: errors.append(_not_installed(requirement, extra)) else: if dist.version is None: - # This shouldn't happen---it suggests a borked virtualenv. (See #12223) + # This shouldn't happen---it suggests a borked virtualenv. (See + # https://github.com/matrix-org/synapse/issues/12223) # Try to give a vaguely helpful error message anyway. # Type-ignore: the annotations don't reflect reality: see # https://github.com/python/typeshed/issues/7513 diff --git a/sytest-blacklist b/sytest-blacklist index d5fa36cec7..9ec0cecfd4 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -29,5 +29,5 @@ We can't peek into rooms with joined history_visibility Local users can peek by room alias Peeked rooms only turn up in the sync for the device who peeked them -# Validation needs to be added to Synapse: #10554 +# Validation needs to be added to Synapse: https://github.com/matrix-org/synapse/issues/10554 Rejects invalid device keys diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index caf04b54cb..2c970fc827 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -478,7 +478,7 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # expect two edus, in one or two transactions. We don't know what order the # devices will be updated. 
self.assertEqual(len(self.edus), 2) - stream_id = None # FIXME: there is a discontinuity in the stream IDs: see #7142 + stream_id = None # FIXME: there is a discontinuity in the stream IDs: see https://github.com/matrix-org/synapse/issues/7142 for edu in self.edus: self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE) c = edu["content"] diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 4fc0742413..a035232905 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -112,7 +112,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): """ Check that we store the state group correctly for rejected non-state events. - Regression test for #6289. + Regression test for https://github.com/matrix-org/synapse/issues/6289. """ OTHER_SERVER = "otherserver" OTHER_USER = "@otheruser:" + OTHER_SERVER @@ -165,7 +165,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): """ Check that we store the state group correctly for rejected state events. - Regression test for #6289. + Regression test for https://github.com/matrix-org/synapse/issues/6289. """ OTHER_SERVER = "otherserver" OTHER_USER = "@otheruser:" + OTHER_SERVER @@ -222,7 +222,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): of backwards extremities(the magic number is more than 5), no errors are thrown. - Regression test, see #11027 + Regression test, see https://github.com/matrix-org/synapse/pull/11027 """ # create the room user_id = self.register_user("kermit", "test") diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index bf1d287699..b7337d3926 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -368,7 +368,8 @@ class FederationClientTests(HomeserverTestCase): """ If a connection is made to a client but the client rejects it due to requiring a trailing slash. We need to retry the request with a - trailing slash. Workaround for Synapse <= v0.99.3, explained in #3622. + trailing slash. Workaround for Synapse <= v0.99.3, explained in + https://github.com/matrix-org/synapse/issues/3622. """ d = defer.ensureDeferred( self.cl.get_json("testserv:8008", "foo/bar", try_trailing_slash_on_400=True) diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py index 7c23b77e0a..907ee1488c 100644 --- a/tests/push/test_bulk_push_rule_evaluator.py +++ b/tests/push/test_bulk_push_rule_evaluator.py @@ -92,7 +92,7 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase): - the bad power level value for "room", before JSON serisalistion - whether Bob should expect the message to be highlighted - Reproduces #14060. + Reproduces https://github.com/matrix-org/synapse/issues/14060. A lack of validation: the gift that keeps on giving. 
""" diff --git a/tests/replication/tcp/streams/test_to_device.py b/tests/replication/tcp/streams/test_to_device.py index ab379e8cf1..85adf84ece 100644 --- a/tests/replication/tcp/streams/test_to_device.py +++ b/tests/replication/tcp/streams/test_to_device.py @@ -62,7 +62,7 @@ class ToDeviceStreamTestCase(BaseStreamTestCase): ) # add one more message, for user2 this time - # this message would be dropped before fixing #15335 + # this message would be dropped before fixing https://github.com/matrix-org/synapse/issues/15335 msg["content"] = {"device": {}} messages = {user2: {"device": msg}} diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 42b065d883..492adb6160 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -1478,7 +1478,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase): def test_deactivate_user_erase_true_avatar_nonnull_but_empty(self) -> None: """Check we can erase a user whose avatar is the empty string. - Reproduces #12257. + Reproduces https://github.com/matrix-org/synapse/issues/12257. """ # Patch `self.other_user` to have an empty string as their avatar. self.get_success( diff --git a/tests/rest/client/test_events.py b/tests/rest/client/test_events.py index 141e0f57a3..8bea860beb 100644 --- a/tests/rest/client/test_events.py +++ b/tests/rest/client/test_events.py @@ -64,7 +64,7 @@ class EventStreamPermissionsTestCase(unittest.HomeserverTestCase): # 403. However, since the v1 spec no longer exists and the v1 # implementation is now part of the r0 implementation, the newer # behaviour is used instead to be consistent with the r0 spec. - # see issue #2602 + # see issue https://github.com/matrix-org/synapse/issues/2602 channel = self.make_request( "GET", "/events?access_token=%s" % ("invalid" + self.token,) ) diff --git a/tests/rest/client/test_profile.py b/tests/rest/client/test_profile.py index ecae092b47..8f923fd40f 100644 --- a/tests/rest/client/test_profile.py +++ b/tests/rest/client/test_profile.py @@ -170,7 +170,8 @@ class ProfileTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.code, 200, channel.result) # FIXME: If a user has no displayname set, Synapse returns 200 and omits a - # displayname from the response. This contradicts the spec, see #13137. + # displayname from the response. This contradicts the spec, see + # https://github.com/matrix-org/synapse/issues/13137. return channel.json_body.get("displayname") def _get_avatar_url(self, name: Optional[str] = None) -> Optional[str]: @@ -179,7 +180,8 @@ class ProfileTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.code, 200, channel.result) # FIXME: If a user has no avatar set, Synapse returns 200 and omits an - # avatar_url from the response. This contradicts the spec, see #13137. + # avatar_url from the response. This contradicts the spec, see + # https://github.com/matrix-org/synapse/issues/13137. return channel.json_body.get("avatar_url") @unittest.override_config({"max_avatar_size": 50}) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index aaa4f3bba0..bb24ed6aa7 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -888,7 +888,8 @@ class RoomsCreateTestCase(RoomBase): ) def test_room_creation_ratelimiting(self) -> None: """ - Regression test for #14312, where ratelimiting was made too strict. + Regression test for https://github.com/matrix-org/synapse/issues/14312, + where ratelimiting was made too strict. 
Clients should be able to create 10 rooms in a row without hitting rate limits, using default rate limit config. (We override rate limiting config back to its default value.) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index d60665254e..07c81d7f76 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -642,7 +642,7 @@ class SyncCacheTestCase(unittest.HomeserverTestCase): def test_noop_sync_does_not_tightloop(self) -> None: """If the sync times out, we shouldn't cache the result - Essentially a regression test for #8518. + Essentially a regression test for https://github.com/matrix-org/synapse/issues/8518. """ self.user_id = self.register_user("kermit", "monkey") self.tok = self.login("kermit", "monkey") diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 35f77052a7..6c4d44c05c 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -66,9 +66,9 @@ class LockTestCase(unittest.HomeserverTestCase): # Run the tasks to completion. # To work around `Linearizer`s using a different reactor to sleep when - # contended (#12841), we call `runUntilCurrent` on - # `twisted.internet.reactor`, which is a different reactor to that used - # by the homeserver. + # contended (https://github.com/matrix-org/synapse/issues/12841), we call + # `runUntilCurrent` on `twisted.internet.reactor`, which is a different + # reactor to that used by the homeserver. assert isinstance(reactor, ReactorBase) self.get_success(task1) reactor.runUntilCurrent() @@ -217,9 +217,9 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): # Run the tasks to completion. # To work around `Linearizer`s using a different reactor to sleep when - # contended (#12841), we call `runUntilCurrent` on - # `twisted.internet.reactor`, which is a different reactor to that used - # by the homeserver. + # contended (https://github.com/matrix-org/synapse/issues/12841), we call + # `runUntilCurrent` on `twisted.internet.reactor`, which is a different + # reactor to that used by the homeserver. assert isinstance(reactor, ReactorBase) self.get_success(task1) reactor.runUntilCurrent() @@ -269,9 +269,9 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): # Run the tasks to completion. # To work around `Linearizer`s using a different reactor to sleep when - # contended (#12841), we call `runUntilCurrent` on - # `twisted.internet.reactor`, which is a different reactor to that used - # by the homeserver. + # contended (https://github.com/matrix-org/synapse/issues/12841), we call + # `runUntilCurrent` on `twisted.internet.reactor`, which is a different + # reactor to that used by the homeserver. assert isinstance(reactor, ReactorBase) self.get_success(task1) reactor.runUntilCurrent() diff --git a/tests/storage/test_database.py b/tests/storage/test_database.py index 92ddaa6f4c..d60176b1d4 100644 --- a/tests/storage/test_database.py +++ b/tests/storage/test_database.py @@ -214,7 +214,8 @@ class CallbacksTestCase(unittest.HomeserverTestCase): after_callback, exception_callback = self._run_interaction(_test_txn) # Calling both `after_callback`s when the first attempt failed is rather - # surprising (#12184). Let's document the behaviour in a test. + # surprising (https://github.com/matrix-org/synapse/issues/12184). + # Let's document the behaviour in a test. 
after_callback.assert_has_calls( [ call(123, 456, extra=789), @@ -293,7 +294,7 @@ class PostgresReplicaIdentityTestCase(unittest.HomeserverTestCase): def test_all_tables_have_postgres_replica_identity(self) -> None: """ Tests that all tables have a Postgres REPLICA IDENTITY. - (See #16224). + (See https://github.com/matrix-org/synapse/issues/16224). Tables with a PRIMARY KEY have an implied REPLICA IDENTITY and are fine. Other tables need them to be set with `ALTER TABLE`. diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index d3e20f44b2..66a027887d 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1060,7 +1060,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): self, ) -> None: """ - A test that reproduces #13929 (Postgres only). + A test that reproduces https://github.com/matrix-org/synapse/issues/13929 (Postgres only). Test to make sure we can still get backfill points after many failed pull attempts that cause us to backoff to the limit. Even if the backoff formula diff --git a/tests/storage/test_room_search.py b/tests/storage/test_room_search.py index 52ffa91c81..e3dc3623cb 100644 --- a/tests/storage/test_room_search.py +++ b/tests/storage/test_room_search.py @@ -93,7 +93,7 @@ class EventSearchInsertionTest(HomeserverTestCase): both strings and integers. When using Postgres, integers are automatically converted to strings. - Regression test for #11918. + Regression test for https://github.com/matrix-org/synapse/issues/11918. """ store = self.hs.get_datastores().main diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py index aa20fe6780..c1392d8bfc 100644 --- a/tests/util/test_check_dependencies.py +++ b/tests/util/test_check_dependencies.py @@ -89,7 +89,8 @@ class TestDependencyChecker(TestCase): def test_version_reported_as_none(self) -> None: """Complain if importlib.metadata.version() returns None. - This shouldn't normally happen, but it was seen in the wild (#12223). + This shouldn't normally happen, but it was seen in the wild + (https://github.com/matrix-org/synapse/issues/12223). """ with patch( "synapse.util.check_dependencies.metadata.requires", @@ -148,7 +149,7 @@ class TestDependencyChecker(TestCase): """ Tests that release candidates count as far as satisfying a dependency is concerned. - (Regression test, see #12176.) + (Regression test, see https://github.com/matrix-org/synapse/issues/12176.) """ with patch( "synapse.util.check_dependencies.metadata.requires", @@ -162,7 +163,10 @@ class TestDependencyChecker(TestCase): check_requirements() def test_setuptools_rust_ignored(self) -> None: - """Test a workaround for a `poetry build` problem. Reproduces #13926.""" + """ + Test a workaround for a `poetry build` problem. Reproduces + https://github.com/matrix-org/synapse/issues/13926. + """ with patch( "synapse.util.check_dependencies.metadata.requires", return_value=["setuptools_rust >= 1.3"], -- cgit 1.5.1 From 999bd77d3abb7b0a4430f31f5912956c3bc100ee Mon Sep 17 00:00:00 2001 From: Sumner Evans Date: Wed, 15 Nov 2023 07:19:24 -0700 Subject: Asynchronous Uploads (#15503) Support asynchronous uploads as defined in MSC2246. 
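In outline: a client first reserves a media ID, gets back an `mxc://` URI that it can reference immediately (e.g. in an event), and uploads the actual bytes afterwards. A rough client-side sketch of that flow follows; the endpoint paths are taken from MSC2246 (`/_matrix/media/v1/create` also appears in the configuration documentation below), while the homeserver URL, access token and file name are placeholders:

```python
# Hedged sketch of the MSC2246 asynchronous upload flow from a client's
# point of view; not part of this patch.
import requests

homeserver = "https://example.org"                    # placeholder
headers = {"Authorization": "Bearer <access_token>"}  # placeholder

# 1. Reserve a media ID before any content exists.
resp = requests.post(f"{homeserver}/_matrix/media/v1/create", headers=headers)
resp.raise_for_status()
content_uri = resp.json()["content_uri"]  # e.g. mxc://example.org/abc123
server_name, media_id = content_uri[len("mxc://"):].split("/", 1)

# 2. The mxc:// URI can already be referenced; upload the bytes later,
#    before the server-side expiry (unused_expiration_time) passes.
with open("photo.jpg", "rb") as f:
    upload = requests.put(
        f"{homeserver}/_matrix/media/v3/upload/{server_name}/{media_id}",
        headers={**headers, "Content-Type": "image/jpeg"},
        data=f,
    )
upload.raise_for_status()
```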
--- changelog.d/15503.feature | 1 + docs/usage/configuration/config_documentation.md | 34 ++++ synapse/api/errors.py | 2 + synapse/config/ratelimiting.py | 7 + synapse/config/repository.py | 6 + synapse/media/_base.py | 6 + synapse/media/media_repository.py | 220 +++++++++++++++++++-- synapse/rest/media/create_resource.py | 83 ++++++++ synapse/rest/media/download_resource.py | 22 ++- synapse/rest/media/media_repository_resource.py | 8 +- synapse/rest/media/thumbnail_resource.py | 69 ++++--- synapse/rest/media/upload_resource.py | 75 ++++++- synapse/storage/databases/main/media_repository.py | 90 ++++++++- tests/media/test_media_storage.py | 4 +- 14 files changed, 568 insertions(+), 59 deletions(-) create mode 100644 changelog.d/15503.feature create mode 100644 synapse/rest/media/create_resource.py (limited to 'synapse/storage/databases') diff --git a/changelog.d/15503.feature b/changelog.d/15503.feature new file mode 100644 index 0000000000..b6ca97a2cf --- /dev/null +++ b/changelog.d/15503.feature @@ -0,0 +1 @@ +Add support for asynchronous uploads as defined by [MSC2246](https://github.com/matrix-org/matrix-spec-proposals/pull/2246). Contributed by @sumnerevans at @beeper. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 4200e70c83..7c4e742cd5 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1753,6 +1753,19 @@ rc_third_party_invite: burst_count: 10 ``` --- +### `rc_media_create` + +This option ratelimits creation of MXC URIs via the `/_matrix/media/v1/create` +endpoint based on the account that's creating the media. Defaults to +`per_second: 10`, `burst_count: 50`. + +Example configuration: +```yaml +rc_media_create: + per_second: 10 + burst_count: 50 +``` +--- ### `rc_federation` Defines limits on federation requests. @@ -1814,6 +1827,27 @@ Example configuration: media_store_path: "DATADIR/media_store" ``` --- +### `max_pending_media_uploads` + +How many *pending media uploads* can a given user have? A pending media upload +is a created MXC URI that (a) is not expired (the `unused_expires_at` timestamp +has not passed) and (b) the media has not yet been uploaded for. Defaults to 5. + +Example configuration: +```yaml +max_pending_media_uploads: 5 +``` +--- +### `unused_expiration_time` + +How long to wait in milliseconds before expiring created media IDs. 
Defaults to +"24h" + +Example configuration: +```yaml +unused_expiration_time: "1h" +``` +--- ### `media_storage_providers` Media storage providers allow media to be stored in different diff --git a/synapse/api/errors.py b/synapse/api/errors.py index fdb2955be8..fbd8b16ec3 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -83,6 +83,8 @@ class Codes(str, Enum): USER_DEACTIVATED = "M_USER_DEACTIVATED" # USER_LOCKED = "M_USER_LOCKED" USER_LOCKED = "ORG_MATRIX_MSC3939_USER_LOCKED" + NOT_YET_UPLOADED = "M_NOT_YET_UPLOADED" + CANNOT_OVERWRITE_MEDIA = "M_CANNOT_OVERWRITE_MEDIA" # Part of MSC3848 # https://github.com/matrix-org/matrix-spec-proposals/pull/3848 diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 4efbaeac0d..b1fcaf71a3 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -204,3 +204,10 @@ class RatelimitConfig(Config): "rc_third_party_invite", defaults={"per_second": 0.0025, "burst_count": 5}, ) + + # Ratelimit create media requests: + self.rc_media_create = RatelimitSettings.parse( + config, + "rc_media_create", + defaults={"per_second": 10, "burst_count": 50}, + ) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index f6cfdd3e04..839c026d70 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -141,6 +141,12 @@ class ContentRepositoryConfig(Config): "prevent_media_downloads_from", [] ) + self.unused_expiration_time = self.parse_duration( + config.get("unused_expiration_time", "24h") + ) + + self.max_pending_media_uploads = config.get("max_pending_media_uploads", 5) + self.media_store_path = self.ensure_directory( config.get("media_store_path", "media_store") ) diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 860e5ddca2..9d88a711cf 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -83,6 +83,12 @@ INLINE_CONTENT_TYPES = [ "audio/x-flac", ] +# Default timeout_ms for download and thumbnail requests +DEFAULT_MAX_TIMEOUT_MS = 20_000 + +# Maximum allowed timeout_ms for download and thumbnail requests +MAXIMUM_ALLOWED_MAX_TIMEOUT_MS = 60_000 + def respond_404(request: SynapseRequest) -> None: assert request.path is not None diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 1957426c6a..bf976b9e7c 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -27,13 +27,16 @@ import twisted.web.http from twisted.internet.defer import Deferred from synapse.api.errors import ( + Codes, FederationDeniedError, HttpResponseException, NotFoundError, RequestSendFailed, SynapseError, + cs_error, ) from synapse.config.repository import ThumbnailRequirement +from synapse.http.server import respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread from synapse.logging.opentracing import trace @@ -51,7 +54,7 @@ from synapse.media.storage_provider import StorageProviderWrapper from synapse.media.thumbnailer import Thumbnailer, ThumbnailError from synapse.media.url_previewer import UrlPreviewer from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage.databases.main.media_repository import RemoteMedia +from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia from synapse.types import UserID from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination @@ -80,6 +83,8 @@ class MediaRepository: self.store = 
hs.get_datastores().main
         self.max_upload_size = hs.config.media.max_upload_size
         self.max_image_pixels = hs.config.media.max_image_pixels
+        self.unused_expiration_time = hs.config.media.unused_expiration_time
+        self.max_pending_media_uploads = hs.config.media.max_pending_media_uploads
 
         Thumbnailer.set_limits(self.max_image_pixels)
 
@@ -185,6 +190,117 @@ class MediaRepository:
         else:
             self.recently_accessed_locals.add(media_id)
 
+    @trace
+    async def create_media_id(self, auth_user: UserID) -> Tuple[str, int]:
+        """Create and store a media ID for a local user and return the MXC URI and its
+        expiration.
+
+        Args:
+            auth_user: The user_id of the uploader
+
+        Returns:
+            A tuple containing the MXC URI of the stored content and the timestamp at
+            which the MXC URI expires.
+        """
+        media_id = random_string(24)
+        now = self.clock.time_msec()
+        await self.store.store_local_media_id(
+            media_id=media_id,
+            time_now_ms=now,
+            user_id=auth_user,
+        )
+        return f"mxc://{self.server_name}/{media_id}", now + self.unused_expiration_time
+
+    @trace
+    async def reached_pending_media_limit(self, auth_user: UserID) -> Tuple[bool, int]:
+        """Check if the user is over the limit for pending media uploads.
+
+        Args:
+            auth_user: The user_id of the uploader
+
+        Returns:
+            A tuple with a boolean and an integer indicating whether the user has too
+            many pending media uploads and the timestamp at which the first pending
+            media will expire, respectively.
+        """
+        pending, first_expiration_ts = await self.store.count_pending_media(
+            user_id=auth_user
+        )
+        return pending >= self.max_pending_media_uploads, first_expiration_ts
+
+    @trace
+    async def verify_can_upload(self, media_id: str, auth_user: UserID) -> None:
+        """Verify that the media ID can be uploaded to by the given user. This
+        function checks that:
+
+        * the media ID exists
+        * the media ID does not already have content
+        * the user uploading is the same as the one who created the media ID
+        * the media ID has not expired
+
+        Args:
+            media_id: The media ID to verify
+            auth_user: The user_id of the uploader
+        """
+        media = await self.store.get_local_media(media_id)
+        if media is None:
+            raise SynapseError(404, "Unknown media ID", errcode=Codes.NOT_FOUND)
+
+        if media.user_id != auth_user.to_string():
+            raise SynapseError(
+                403,
+                "Only the creator of the media ID can upload to it",
+                errcode=Codes.FORBIDDEN,
+            )
+
+        if media.media_length is not None:
+            raise SynapseError(
+                409,
+                "Media ID already has content",
+                errcode=Codes.CANNOT_OVERWRITE_MEDIA,
+            )
+
+        expired_time_ms = self.clock.time_msec() - self.unused_expiration_time
+        if media.created_ts < expired_time_ms:
+            raise NotFoundError("Media ID has expired")
+
+    @trace
+    async def update_content(
+        self,
+        media_id: str,
+        media_type: str,
+        upload_name: Optional[str],
+        content: IO,
+        content_length: int,
+        auth_user: UserID,
+    ) -> None:
+        """Update the content of the given media ID.
+
+        Args:
+            media_id: The media ID to replace.
+            media_type: The content type of the file.
+            upload_name: The name of the file, if provided.
+ content: A file like object that is the content to store + content_length: The length of the content + auth_user: The user_id of the uploader + """ + file_info = FileInfo(server_name=None, file_id=media_id) + fname = await self.media_storage.store_file(content, file_info) + logger.info("Stored local media in file %r", fname) + + await self.store.update_local_media( + media_id=media_id, + media_type=media_type, + upload_name=upload_name, + media_length=content_length, + user_id=auth_user, + ) + + try: + await self._generate_thumbnails(None, media_id, media_id, media_type) + except Exception as e: + logger.info("Failed to generate thumbnails: %s", e) + @trace async def create_content( self, @@ -231,8 +347,74 @@ class MediaRepository: return MXCUri(self.server_name, media_id) + def respond_not_yet_uploaded(self, request: SynapseRequest) -> None: + respond_with_json( + request, + 504, + cs_error("Media has not been uploaded yet", code=Codes.NOT_YET_UPLOADED), + send_cors=True, + ) + + async def get_local_media_info( + self, request: SynapseRequest, media_id: str, max_timeout_ms: int + ) -> Optional[LocalMedia]: + """Gets the info dictionary for given local media ID. If the media has + not been uploaded yet, this function will wait up to ``max_timeout_ms`` + milliseconds for the media to be uploaded. + + Args: + request: The incoming request. + media_id: The media ID of the content. (This is the same as + the file_id for local content.) + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. + + Returns: + Either the info dictionary for the given local media ID or + ``None``. If ``None``, then no further processing is necessary as + this function will send the necessary JSON response. + """ + wait_until = self.clock.time_msec() + max_timeout_ms + while True: + # Get the info for the media + media_info = await self.store.get_local_media(media_id) + if not media_info: + logger.info("Media %s is unknown", media_id) + respond_404(request) + return None + + if media_info.quarantined_by: + logger.info("Media %s is quarantined", media_id) + respond_404(request) + return None + + # The file has been uploaded, so stop looping + if media_info.media_length is not None: + return media_info + + # Check if the media ID has expired and still hasn't been uploaded to. + now = self.clock.time_msec() + expired_time_ms = now - self.unused_expiration_time + if media_info.created_ts < expired_time_ms: + logger.info("Media %s has expired without being uploaded", media_id) + respond_404(request) + return None + + if now >= wait_until: + break + + await self.clock.sleep(0.5) + + logger.info("Media %s has not yet been uploaded", media_id) + self.respond_not_yet_uploaded(request) + return None + async def get_local_media( - self, request: SynapseRequest, media_id: str, name: Optional[str] + self, + request: SynapseRequest, + media_id: str, + name: Optional[str], + max_timeout_ms: int, ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -242,13 +424,14 @@ class MediaRepository: the file_id for local content.) name: Optional name that, if specified, will be used as the filename in the Content-Disposition header of the response. + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. 
Returns: Resolves once a response has successfully been written to request """ - media_info = await self.store.get_local_media(media_id) - if not media_info or media_info.quarantined_by: - respond_404(request) + media_info = await self.get_local_media_info(request, media_id, max_timeout_ms) + if not media_info: return self.mark_recently_accessed(None, media_id) @@ -273,6 +456,7 @@ class MediaRepository: server_name: str, media_id: str, name: Optional[str], + max_timeout_ms: int, ) -> None: """Respond to requests for remote media. @@ -282,6 +466,8 @@ class MediaRepository: media_id: The media ID of the content (as defined by the remote server). name: Optional name that, if specified, will be used as the filename in the Content-Disposition header of the response. + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: Resolves once a response has successfully been written to request @@ -307,11 +493,11 @@ class MediaRepository: key = (server_name, media_id) async with self.remote_media_linearizer.queue(key): responder, media_info = await self._get_remote_media_impl( - server_name, media_id + server_name, media_id, max_timeout_ms ) # We deliberately stream the file outside the lock - if responder: + if responder and media_info: upload_name = name if name else media_info.upload_name await respond_with_responder( request, @@ -324,7 +510,7 @@ class MediaRepository: respond_404(request) async def get_remote_media_info( - self, server_name: str, media_id: str + self, server_name: str, media_id: str, max_timeout_ms: int ) -> RemoteMedia: """Gets the media info associated with the remote file, downloading if necessary. @@ -332,6 +518,8 @@ class MediaRepository: Args: server_name: Remote server_name where the media originated. media_id: The media ID of the content (as defined by the remote server). + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: The media info of the file @@ -347,7 +535,7 @@ class MediaRepository: key = (server_name, media_id) async with self.remote_media_linearizer.queue(key): responder, media_info = await self._get_remote_media_impl( - server_name, media_id + server_name, media_id, max_timeout_ms ) # Ensure we actually use the responder so that it releases resources @@ -358,7 +546,7 @@ class MediaRepository: return media_info async def _get_remote_media_impl( - self, server_name: str, media_id: str + self, server_name: str, media_id: str, max_timeout_ms: int ) -> Tuple[Optional[Responder], RemoteMedia]: """Looks for media in local cache, if not there then attempt to download from remote server. @@ -367,6 +555,8 @@ class MediaRepository: server_name: Remote server_name where the media originated. media_id: The media ID of the content (as defined by the remote server). + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: A tuple of responder and the media info of the file. @@ -399,8 +589,7 @@ class MediaRepository: try: media_info = await self._download_remote_file( - server_name, - media_id, + server_name, media_id, max_timeout_ms ) except SynapseError: raise @@ -433,6 +622,7 @@ class MediaRepository: self, server_name: str, media_id: str, + max_timeout_ms: int, ) -> RemoteMedia: """Attempt to download the remote file from the given server name, using the given file_id as the local id. @@ -442,7 +632,8 @@ class MediaRepository: media_id: The media ID of the content (as defined by the remote server). 
This is different than the file_id, which is locally generated. - file_id: Local file ID + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. Returns: The media info of the file. @@ -466,7 +657,8 @@ class MediaRepository: # tell the remote server to 404 if it doesn't # recognise the server_name, to make sure we don't # end up with a routing loop. - "allow_remote": "false" + "allow_remote": "false", + "timeout_ms": str(max_timeout_ms), }, ) except RequestSendFailed as e: diff --git a/synapse/rest/media/create_resource.py b/synapse/rest/media/create_resource.py new file mode 100644 index 0000000000..994afdf13c --- /dev/null +++ b/synapse/rest/media/create_resource.py @@ -0,0 +1,83 @@ +# Copyright 2023 Beeper Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +from typing import TYPE_CHECKING + +from synapse.api.errors import LimitExceededError +from synapse.api.ratelimiting import Ratelimiter +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet +from synapse.http.site import SynapseRequest + +if TYPE_CHECKING: + from synapse.media.media_repository import MediaRepository + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class CreateResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/v1/create")] + + def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): + super().__init__() + + self.media_repo = media_repo + self.clock = hs.get_clock() + self.auth = hs.get_auth() + self.max_pending_media_uploads = hs.config.media.max_pending_media_uploads + + # A rate limiter for creating new media IDs. + self._create_media_rate_limiter = Ratelimiter( + store=hs.get_datastores().main, + clock=self.clock, + cfg=hs.config.ratelimiting.rc_media_create, + ) + + async def on_POST(self, request: SynapseRequest) -> None: + requester = await self.auth.get_user_by_req(request) + + # If the create media requests for the user are over the limit, drop them. 
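+        # NB: `Ratelimiter.ratelimit` raises a `LimitExceededError` once the
+        # `rc_media_create` allowance is exhausted, so over-limit requests are
+        # rejected before the pending-media check below runs.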
+ await self._create_media_rate_limiter.ratelimit(requester) + + ( + reached_pending_limit, + first_expiration_ts, + ) = await self.media_repo.reached_pending_media_limit(requester.user) + if reached_pending_limit: + raise LimitExceededError( + limiter_name="max_pending_media_uploads", + retry_after_ms=first_expiration_ts - self.clock.time_msec(), + ) + + content_uri, unused_expires_at = await self.media_repo.create_media_id( + requester.user + ) + + logger.info( + "Created Media URI %r that if unused will expire at %d", + content_uri, + unused_expires_at, + ) + respond_with_json( + request, + 200, + { + "content_uri": content_uri, + "unused_expires_at": unused_expires_at, + }, + send_cors=True, + ) diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 65b9ff52fa..60cd87548c 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -17,9 +17,13 @@ import re from typing import TYPE_CHECKING, Optional from synapse.http.server import set_corp_headers, set_cors_headers -from synapse.http.servlet import RestServlet, parse_boolean +from synapse.http.servlet import RestServlet, parse_boolean, parse_integer from synapse.http.site import SynapseRequest -from synapse.media._base import respond_404 +from synapse.media._base import ( + DEFAULT_MAX_TIMEOUT_MS, + MAXIMUM_ALLOWED_MAX_TIMEOUT_MS, + respond_404, +) from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: @@ -65,12 +69,16 @@ class DownloadResource(RestServlet): ) # Limited non-standard form of CSP for IE11 request.setHeader(b"X-Content-Security-Policy", b"sandbox;") - request.setHeader( - b"Referrer-Policy", - b"no-referrer", + request.setHeader(b"Referrer-Policy", b"no-referrer") + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + if self._is_mine_server_name(server_name): - await self.media_repo.get_local_media(request, media_id, file_name) + await self.media_repo.get_local_media( + request, media_id, file_name, max_timeout_ms + ) else: allow_remote = parse_boolean(request, "allow_remote", default=True) if not allow_remote: @@ -83,5 +91,5 @@ class DownloadResource(RestServlet): return await self.media_repo.get_remote_media( - request, server_name, media_id, file_name + request, server_name, media_id, file_name, max_timeout_ms ) diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py index 2089bb1029..ca65116b84 100644 --- a/synapse/rest/media/media_repository_resource.py +++ b/synapse/rest/media/media_repository_resource.py @@ -18,10 +18,11 @@ from synapse.config._base import ConfigError from synapse.http.server import HttpServer, JsonResource from .config_resource import MediaConfigResource +from .create_resource import CreateResource from .download_resource import DownloadResource from .preview_url_resource import PreviewUrlResource from .thumbnail_resource import ThumbnailResource -from .upload_resource import UploadResource +from .upload_resource import AsyncUploadServlet, UploadServlet if TYPE_CHECKING: from synapse.server import HomeServer @@ -91,8 +92,9 @@ class MediaRepositoryResource(JsonResource): # Note that many of these should not exist as v1 endpoints, but empirically # a lot of traffic still goes to them. 
-
-        UploadResource(hs, media_repo).register(http_server)
+        CreateResource(hs, media_repo).register(http_server)
+        UploadServlet(hs, media_repo).register(http_server)
+        AsyncUploadServlet(hs, media_repo).register(http_server)
         DownloadResource(hs, media_repo).register(http_server)
         ThumbnailResource(hs, media_repo, media_repo.media_storage).register(
             http_server
diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py
index efda8b4ab4..681f2a5a27 100644
--- a/synapse/rest/media/thumbnail_resource.py
+++ b/synapse/rest/media/thumbnail_resource.py
@@ -23,6 +23,8 @@ from synapse.http.server import respond_with_json, set_corp_headers, set_cors_he
 from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
 from synapse.media._base import (
+    DEFAULT_MAX_TIMEOUT_MS,
+    MAXIMUM_ALLOWED_MAX_TIMEOUT_MS,
     FileInfo,
     ThumbnailInfo,
     respond_404,
@@ -75,15 +77,19 @@ class ThumbnailResource(RestServlet):
         method = parse_string(request, "method", "scale")
         # TODO Parse the Accept header to get a prioritised list of thumbnail types.
         m_type = "image/png"
+        max_timeout_ms = parse_integer(
+            request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
+        )
+        max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
 
         if self._is_mine_server_name(server_name):
             if self.dynamic_thumbnails:
                 await self._select_or_generate_local_thumbnail(
-                    request, media_id, width, height, method, m_type
+                    request, media_id, width, height, method, m_type, max_timeout_ms
                 )
             else:
                 await self._respond_local_thumbnail(
-                    request, media_id, width, height, method, m_type
+                    request, media_id, width, height, method, m_type, max_timeout_ms
                 )
             self.media_repo.mark_recently_accessed(None, media_id)
         else:
@@ -95,14 +101,21 @@ class ThumbnailResource(RestServlet):
                 respond_404(request)
                 return
 
-            if self.dynamic_thumbnails:
-                await self._select_or_generate_remote_thumbnail(
-                    request, server_name, media_id, width, height, method, m_type
-                )
-            else:
-                await self._respond_remote_thumbnail(
-                    request, server_name, media_id, width, height, method, m_type
-                )
+            remote_resp_function = (
+                self._select_or_generate_remote_thumbnail
+                if self.dynamic_thumbnails
+                else self._respond_remote_thumbnail
+            )
+            await remote_resp_function(
+                request,
+                server_name,
+                media_id,
+                width,
+                height,
+                method,
+                m_type,
+                max_timeout_ms,
+            )
             self.media_repo.mark_recently_accessed(server_name, media_id)
 
     async def _respond_local_thumbnail(
@@ -113,15 +126,12 @@ class ThumbnailResource(RestServlet):
         height: int,
         method: str,
         m_type: str,
+        max_timeout_ms: int,
     ) -> None:
-        media_info = await self.store.get_local_media(media_id)
-
+        media_info = await self.media_repo.get_local_media_info(
+            request, media_id, max_timeout_ms
+        )
         if not media_info:
-            respond_404(request)
-            return
-        if media_info.quarantined_by:
-            logger.info("Media is quarantined")
-            respond_404(request)
             return
 
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
@@ -146,15 +156,13 @@ class ThumbnailResource(RestServlet):
         desired_height: int,
         desired_method: str,
         desired_type: str,
+        max_timeout_ms: int,
     ) -> None:
-        media_info = await self.store.get_local_media(media_id)
+        media_info = await self.media_repo.get_local_media_info(
+            request, media_id, max_timeout_ms
+        )
 
         if not media_info:
-            respond_404(request)
-            return
-        if media_info.quarantined_by:
-            logger.info("Media is quarantined")
-            respond_404(request)
             return
 
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
@@ -206,8 +214,14 @@ class ThumbnailResource(RestServlet):
         desired_height: int,
         desired_method: str,
         desired_type: str,
+        max_timeout_ms: int,
     ) -> None:
-        media_info = await self.media_repo.get_remote_media_info(server_name, media_id)
+        media_info = await self.media_repo.get_remote_media_info(
+            server_name, media_id, max_timeout_ms
+        )
+        if not media_info:
+            respond_404(request)
+            return
 
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
@@ -263,11 +277,16 @@ class ThumbnailResource(RestServlet):
         height: int,
         method: str,
         m_type: str,
+        max_timeout_ms: int,
     ) -> None:
         # TODO: Don't download the whole remote file
         # We should proxy the thumbnail from the remote server instead of
         # downloading the remote file and generating our own thumbnails.
-        media_info = await self.media_repo.get_remote_media_info(server_name, media_id)
+        media_info = await self.media_repo.get_remote_media_info(
+            server_name, media_id, max_timeout_ms
+        )
+        if not media_info:
+            return
 
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py
index 949326d85d..62d3e228a8 100644
--- a/synapse/rest/media/upload_resource.py
+++ b/synapse/rest/media/upload_resource.py
@@ -15,7 +15,7 @@ import logging
 import re
-from typing import IO, TYPE_CHECKING, Dict, List, Optional
+from typing import IO, TYPE_CHECKING, Dict, List, Optional, Tuple
 
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import respond_with_json
@@ -29,23 +29,24 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# The name of the lock to use when uploading media.
+_UPLOAD_MEDIA_LOCK_NAME = "upload_media"
 
-class UploadResource(RestServlet):
-    PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")]
 
+class BaseUploadServlet(RestServlet):
     def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"):
         super().__init__()
 
         self.media_repo = media_repo
         self.filepaths = media_repo.filepaths
         self.store = hs.get_datastores().main
-        self.clock = hs.get_clock()
+        self.server_name = hs.hostname
         self.auth = hs.get_auth()
         self.max_upload_size = hs.config.media.max_upload_size
-        self.clock = hs.get_clock()
 
-    async def on_POST(self, request: SynapseRequest) -> None:
-        requester = await self.auth.get_user_by_req(request)
+    def _get_file_metadata(
+        self, request: SynapseRequest
+    ) -> Tuple[int, Optional[str], str]:
         raw_content_length = request.getHeader("Content-Length")
         if raw_content_length is None:
             raise SynapseError(msg="Request must specify a Content-Length", code=400)
@@ -88,6 +89,16 @@ class UploadResource(RestServlet):
         # disposition = headers.getRawHeaders(b"Content-Disposition")[0]
         # TODO(markjh): parse content-disposition
 
+        return content_length, upload_name, media_type
+
+
+class UploadServlet(BaseUploadServlet):
+    PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload$")]
+
+    async def on_POST(self, request: SynapseRequest) -> None:
+        requester = await self.auth.get_user_by_req(request)
+        content_length, upload_name, media_type = self._get_file_metadata(request)
+
         try:
             content: IO = request.content  # type: ignore
             content_uri = await self.media_repo.create_content(
@@ -103,3 +114,53 @@ class UploadResource(RestServlet):
         respond_with_json(
             request, 200, {"content_uri": str(content_uri)}, send_cors=True
         )
+
+
+class AsyncUploadServlet(BaseUploadServlet):
+    PATTERNS = [
+        re.compile(
+            "/_matrix/media/v3/upload/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$"
+        )
+    ]
+
+    async def on_PUT(
+        self, request:
SynapseRequest, server_name: str, media_id: str + ) -> None: + requester = await self.auth.get_user_by_req(request) + + if server_name != self.server_name: + raise SynapseError( + 404, + "Non-local server name specified", + errcode=Codes.NOT_FOUND, + ) + + lock = await self.store.try_acquire_lock(_UPLOAD_MEDIA_LOCK_NAME, media_id) + if not lock: + raise SynapseError( + 409, + "Media ID cannot be overwritten", + errcode=Codes.CANNOT_OVERWRITE_MEDIA, + ) + + async with lock: + await self.media_repo.verify_can_upload(media_id, requester.user) + content_length, upload_name, media_type = self._get_file_metadata(request) + + try: + content: IO = request.content # type: ignore + await self.media_repo.update_content( + media_id, + media_type, + upload_name, + content, + content_length, + requester.user, + ) + except SpamMediaException: + # For uploading of media we want to respond with a 400, instead of + # the default 404, as that would just be confusing. + raise SynapseError(400, "Bad content") + + logger.info("Uploaded content for media ID %r", media_id) + respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 3f80a64dc5..149135b8b5 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -49,13 +49,14 @@ BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = ( class LocalMedia: media_id: str media_type: str - media_length: int + media_length: Optional[int] upload_name: str created_ts: int url_cache: Optional[str] last_access_ts: int quarantined_by: Optional[str] safe_from_quarantine: bool + user_id: Optional[str] @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -149,6 +150,13 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): self._drop_media_index_without_method, ) + if hs.config.media.can_load_media_repo: + self.unused_expiration_time: Optional[ + int + ] = hs.config.media.unused_expiration_time + else: + self.unused_expiration_time = None + async def _drop_media_index_without_method( self, progress: JsonDict, batch_size: int ) -> int: @@ -202,6 +210,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "url_cache", "last_access_ts", "safe_from_quarantine", + "user_id", ), allow_none=True, desc="get_local_media", @@ -218,6 +227,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): url_cache=row[5], last_access_ts=row[6], safe_from_quarantine=row[7], + user_id=row[8], ) async def get_local_media_by_user_paginate( @@ -272,7 +282,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): url_cache, last_access_ts, quarantined_by, - safe_from_quarantine + safe_from_quarantine, + user_id FROM local_media_repository WHERE user_id = ? 
ORDER BY {order_by_column} {order}, media_id ASC @@ -295,6 +306,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): last_access_ts=row[6], quarantined_by=row[7], safe_from_quarantine=bool(row[8]), + user_id=row[9], ) for row in txn ] @@ -391,6 +403,23 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "get_local_media_ids", _get_local_media_ids_txn ) + @trace + async def store_local_media_id( + self, + media_id: str, + time_now_ms: int, + user_id: UserID, + ) -> None: + await self.db_pool.simple_insert( + "local_media_repository", + { + "media_id": media_id, + "created_ts": time_now_ms, + "user_id": user_id.to_string(), + }, + desc="store_local_media_id", + ) + @trace async def store_local_media( self, @@ -416,6 +445,30 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_local_media", ) + async def update_local_media( + self, + media_id: str, + media_type: str, + upload_name: Optional[str], + media_length: int, + user_id: UserID, + url_cache: Optional[str] = None, + ) -> None: + await self.db_pool.simple_update_one( + "local_media_repository", + keyvalues={ + "user_id": user_id.to_string(), + "media_id": media_id, + }, + updatevalues={ + "media_type": media_type, + "upload_name": upload_name, + "media_length": media_length, + "url_cache": url_cache, + }, + desc="update_local_media", + ) + async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None: """Mark a local media as safe or unsafe from quarantining.""" await self.db_pool.simple_update_one( @@ -425,6 +478,39 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="mark_local_media_as_safe", ) + async def count_pending_media(self, user_id: UserID) -> Tuple[int, int]: + """Count the number of pending media for a user. + + Returns: + A tuple of two integers: the total pending media requests and the earliest + expiration timestamp. + """ + + def get_pending_media_txn(txn: LoggingTransaction) -> Tuple[int, int]: + sql = """ + SELECT COUNT(*), MIN(created_ts) + FROM local_media_repository + WHERE user_id = ? + AND created_ts > ? 
+ AND media_length IS NULL + """ + assert self.unused_expiration_time is not None + txn.execute( + sql, + ( + user_id.to_string(), + self._clock.time_msec() - self.unused_expiration_time, + ), + ) + row = txn.fetchone() + if not row: + return 0, 0 + return row[0], (row[1] + self.unused_expiration_time if row[1] else 0) + + return await self.db_pool.runInteraction( + "get_pending_media", get_pending_media_txn + ) + async def get_url_cache(self, url: str, ts: int) -> Optional[UrlCache]: """Get the media_id and ts for a cached URL as of the given timestamp Returns: diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index a8e7a76b29..f262304c3d 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -318,7 +318,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): self.assertEqual( self.fetches[0][2], "/_matrix/media/r0/download/" + self.media_id ) - self.assertEqual(self.fetches[0][3], {"allow_remote": "false"}) + self.assertEqual( + self.fetches[0][3], {"allow_remote": "false", "timeout_ms": "20000"} + ) headers = { b"Content-Length": [b"%d" % (len(self.test_image.data))], -- cgit 1.5.1 From 43d1aa75e8cbf9d522b425d51d5ac1a742b59ffb Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 15 Nov 2023 17:28:10 +0000 Subject: Add an Admin API to temporarily grant the ability to update an existing cross-signing key without UIA (#16634) --- changelog.d/16634.misc | 1 + docs/admin_api/user_admin_api.md | 37 ++++ synapse/handlers/e2e_keys.py | 20 ++- synapse/rest/admin/__init__.py | 2 + synapse/rest/admin/users.py | 40 +++++ synapse/rest/client/keys.py | 16 +- synapse/storage/databases/main/end_to_end_keys.py | 84 +++++++++ .../delta/83/05_cross_signing_key_update_grant.sql | 15 ++ tests/handlers/test_e2e_keys.py | 47 ++++++ tests/rest/admin/test_user.py | 56 ++++++ tests/rest/client/test_keys.py | 188 ++++++++++++++++++++- .../storage/databases/main/test_end_to_end_keys.py | 121 +++++++++++++ 12 files changed, 613 insertions(+), 14 deletions(-) create mode 100644 changelog.d/16634.misc create mode 100644 synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql create mode 100644 tests/storage/databases/main/test_end_to_end_keys.py (limited to 'synapse/storage/databases') diff --git a/changelog.d/16634.misc b/changelog.d/16634.misc new file mode 100644 index 0000000000..f81cf39691 --- /dev/null +++ b/changelog.d/16634.misc @@ -0,0 +1 @@ +Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA. diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md index b91848dd27..66089c634b 100644 --- a/docs/admin_api/user_admin_api.md +++ b/docs/admin_api/user_admin_api.md @@ -773,6 +773,43 @@ Note: The token will expire if the *admin* user calls `/logout/all` from any of their devices, but the token will *not* expire if the target user does the same. +## Allow replacing master cross-signing key without User-Interactive Auth + +This endpoint is not intended for server administrator usage; +we describe it here for completeness. + +This API temporarily permits a user to replace their master cross-signing key +without going through +[user-interactive authentication](https://spec.matrix.org/v1.8/client-server-api/#user-interactive-authentication-api) (UIA). 
+This is useful when Synapse has delegated its authentication to the
+[Matrix Authentication Service](https://github.com/matrix-org/matrix-authentication-service/);
+as Synapse cannot perform UIA in these circumstances.
+
+The API is
+
+```http request
+POST /_synapse/admin/v1/users/<user_id>/_allow_cross_signing_replacement_without_uia
+{}
+```
+
+If the user does not exist, or does exist but has no master cross-signing key,
+this will return with status code `404 Not Found`.
+
+Otherwise, a response body like the following is returned, with status `200 OK`:
+
+```json
+{
+    "updatable_without_uia_before_ms": 1234567890
+}
+```
+
+The response body is a JSON object with a single field:
+
+- `updatable_without_uia_before_ms`: integer. The timestamp in milliseconds
+  before which the user is permitted to replace their cross-signing key without
+  going through UIA.
+
+_Added in Synapse 1.97.0._
 
 ## User devices
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d06524495f..70fa931d17 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1450,19 +1450,25 @@ class E2eKeysHandler:
 
         return desired_key_data
 
-    async def is_cross_signing_set_up_for_user(self, user_id: str) -> bool:
+    async def check_cross_signing_setup(self, user_id: str) -> Tuple[bool, bool]:
         """Checks if the user has cross-signing set up
 
         Args:
             user_id: The user to check
 
-        Returns:
-            True if the user has cross-signing set up, False otherwise
+        Returns: a 2-tuple of booleans
+            - whether the user has cross-signing set up, and
+            - whether the user's master cross-signing key may be replaced without UIA.
         """
-        existing_master_key = await self.store.get_e2e_cross_signing_key(
-            user_id, "master"
-        )
-        return existing_master_key is not None
+        (
+            exists,
+            ts_replaceable_without_uia_before,
+        ) = await self.store.get_master_cross_signing_key_updatable_before(user_id)
+
+        if ts_replaceable_without_uia_before is None:
+            return exists, False
+        else:
+            return exists, self.clock.time_msec() < ts_replaceable_without_uia_before
 
     def _check_cross_signing_key(
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 9bd0d764f8..91edfd45d7 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -88,6 +88,7 @@ from synapse.rest.admin.users import (
     UserByThreePid,
     UserMembershipRestServlet,
     UserRegisterServlet,
+    UserReplaceMasterCrossSigningKeyRestServlet,
     UserRestServletV2,
     UsersRestServletV2,
     UserTokenRestServlet,
@@ -292,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListDestinationsRestServlet(hs).register(http_server)
     RoomMessagesRestServlet(hs).register(http_server)
     RoomTimestampToEventRestServlet(hs).register(http_server)
+    UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
     UserByExternalId(hs).register(http_server)
     UserByThreePid(hs).register(http_server)
 
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 73878dd99d..9900498fbe 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -1270,6 +1270,46 @@ class AccountDataRestServlet(RestServlet):
         }
 
 
+class UserReplaceMasterCrossSigningKeyRestServlet(RestServlet):
+    """Allow a given user to replace their master cross-signing key without UIA.
+
+    This replacement is permitted for a limited period (currently 10 minutes).
+
+    While this is exposed via the admin API, this is intended for use by the
+    Matrix Authentication Service rather than server admins.
+    """
+
+    PATTERNS = admin_patterns(
+        "/users/(?P<user_id>[^/]*)/_allow_cross_signing_replacement_without_uia"
+    )
+    REPLACEMENT_PERIOD_MS = 10 * 60 * 1000  # 10 minutes
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_POST(
+        self,
+        request: SynapseRequest,
+        user_id: str,
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        if user_id is None:
+            raise NotFoundError("User not found")
+
+        timestamp = (
+            await self._store.allow_master_cross_signing_key_replacement_without_uia(
+                user_id, self.REPLACEMENT_PERIOD_MS
+            )
+        )
+
+        if timestamp is None:
+            raise NotFoundError("User has no master cross-signing key")
+
+        return HTTPStatus.OK, {"updatable_without_uia_before_ms": timestamp}
+
+
 class UserByExternalId(RestServlet):
     """Find a user based on an external ID from an auth provider"""
 
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 70b8be1aa2..add8045439 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -376,9 +376,10 @@ class SigningKeyUploadServlet(RestServlet):
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
 
-        is_cross_signing_setup = (
-            await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id)
-        )
+        (
+            is_cross_signing_setup,
+            master_key_updatable_without_uia,
+        ) = await self.e2e_keys_handler.check_cross_signing_setup(user_id)
 
         # Before MSC3967 we required UIA both when setting up cross signing for the
         # first time and when resetting the device signing key. With MSC3967 we only
         # require UIA when resetting cross-signing, and not when setting up the first
         # time. Because there is no UIA in MSC3861, for now we throw an error if the
         # user tries to reset the device signing key when MSC3861 is enabled, but allow
         # first-time setup.
+        #
+        # XXX: We now have a get-out clause by which MAS can temporarily mark the master
+        # key as replaceable. It should do its own equivalent of user interactive auth
+        # before doing so.
         if self.hs.config.experimental.msc3861.enabled:
-            # There is no way to reset the device signing key with MSC3861
-            if is_cross_signing_setup:
+            # The auth service has to explicitly mark the master key as replaceable
+            # without UIA to reset the device signing key with MSC3861.
+            if is_cross_signing_setup and not master_key_updatable_without_uia:
                 raise SynapseError(
                     HTTPStatus.NOT_IMPLEMENTED,
                     "Resetting cross signing keys is not yet supported with MSC3861",
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return otk_rows
 
+    async def get_master_cross_signing_key_updatable_before(
+        self, user_id: str
+    ) -> Tuple[bool, Optional[int]]:
+        """Get time before which a master cross-signing key may be replaced without UIA.
+
+        (UIA means "User-Interactive Auth".)
+
+        There are three cases to distinguish:
+        (1) No master cross-signing key.
+        (2) The key exists, but there is no replace-without-UIA timestamp in the DB.
+        (3) The key exists, and has such a timestamp recorded.
+
+        Returns: a 2-tuple of:
+        - a boolean: is there a master cross-signing key already?
+        - an optional timestamp, directly taken from the DB.
+
+        In terms of the cases above, these are:
+        (1) (False, None).
+        (2) (True, None).
+        (3) (True, <timestamp>).
+
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+            # We want to distinguish between three cases:
+            txn.execute(
+                """
+                SELECT updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ? AND keytype = 'master'
+                ORDER BY stream_id DESC
+                LIMIT 1
+                """,
+                (user_id,),
+            )
+            row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+            if row is None:
+                return False, None
+            return True, row[0]
+
+        return await self.db_pool.runInteraction(
+            "e2e_cross_signing_keys",
+            impl,
+        )
+
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             ],
             desc="add_e2e_signing_key",
         )
+
+    async def allow_master_cross_signing_key_replacement_without_uia(
+        self, user_id: str, duration_ms: int
+    ) -> Optional[int]:
+        """Mark this user's latest master key as being replaceable without UIA.
+
+        Said replacement will only be permitted for a short time after calling this
+        function. That time period is controlled by the duration argument.
+
+        Returns:
+            None, if there is no such key.
+            Otherwise, the timestamp before which replacement is allowed without UIA.
+        """
+        timestamp = self._clock.time_msec() + duration_ms
+
+        def impl(txn: LoggingTransaction) -> Optional[int]:
+            txn.execute(
+                """
+                UPDATE e2e_cross_signing_keys
+                SET updatable_without_uia_before_ms = ?
+                WHERE stream_id = (
+                    SELECT stream_id
+                    FROM e2e_cross_signing_keys
+                    WHERE user_id = ? AND keytype = 'master'
+                    ORDER BY stream_id DESC
+                    LIMIT 1
+                )
+                """,
+                (timestamp, user_id),
+            )
+            if txn.rowcount == 0:
+                return None
+
+            return timestamp
+
+        return await self.db_pool.runInteraction(
+            "allow_master_cross_signing_key_replacement_without_uia",
+            impl,
+        )
diff --git a/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
new file mode 100644
index 0000000000..b74bdd71fa
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
@@ -0,0 +1,15 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;
\ No newline at end of file
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 90b4da9ad5..07eb63f95e 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -1602,3 +1602,50 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
                 }
             },
         )
+
+    def test_check_cross_signing_setup(self) -> None:
+        # First check what happens with no master key.
+        alice = "@alice:test"
+        exists, replaceable_without_uia = self.get_success(
+            self.handler.check_cross_signing_setup(alice)
+        )
+        self.assertIs(exists, False)
+        self.assertIs(replaceable_without_uia, False)
+
+        # Upload a master key but don't specify a replacement timestamp.
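+        # (`set_e2e_cross_signing_key` leaves `updatable_without_uia_before_ms`
+        # NULL, i.e. case (2) of `get_master_cross_signing_key_updatable_before`.)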
+ dummy_key = {"keys": {"a": "b"}} + self.get_success( + self.store.set_e2e_cross_signing_key("@alice:test", "master", dummy_key) + ) + + # Should now find the key exists. + exists, replaceable_without_uia = self.get_success( + self.handler.check_cross_signing_setup(alice) + ) + self.assertIs(exists, True) + self.assertIs(replaceable_without_uia, False) + + # Set an expiry timestamp in the future. + self.get_success( + self.store.allow_master_cross_signing_key_replacement_without_uia( + alice, + 1000, + ) + ) + + # Should now be allowed to replace the key without UIA. + exists, replaceable_without_uia = self.get_success( + self.handler.check_cross_signing_setup(alice) + ) + self.assertIs(exists, True) + self.assertIs(replaceable_without_uia, True) + + # Wait 2 seconds, so that the timestamp is in the past. + self.reactor.advance(2.0) + + # Should no longer be allowed to replace the key without UIA. + exists, replaceable_without_uia = self.get_success( + self.handler.check_cross_signing_setup(alice) + ) + self.assertIs(exists, True) + self.assertIs(replaceable_without_uia, False) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 492adb6160..cf71bbb461 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -4854,3 +4854,59 @@ class UsersByThreePidTestCase(unittest.HomeserverTestCase): {"user_id": self.other_user}, channel.json_body, ) + + +class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + ] + + @staticmethod + def url(user: str) -> str: + template = ( + "/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia" + ) + return template.format(urllib.parse.quote(user)) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.other_user = self.register_user("user", "pass") + + def test_error_cases(self) -> None: + fake_user = "@bums:other" + channel = self.make_request( + "POST", self.url(fake_user), access_token=self.admin_user_tok + ) + # Fail: user doesn't exist + self.assertEqual(404, channel.code, msg=channel.json_body) + + channel = self.make_request( + "POST", self.url(self.other_user), access_token=self.admin_user_tok + ) + # Fail: user exists, but has no master cross-signing key + self.assertEqual(404, channel.code, msg=channel.json_body) + + def test_success(self) -> None: + # Upload a master key. + dummy_key = {"keys": {"a": "b"}} + self.get_success( + self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key) + ) + + channel = self.make_request( + "POST", self.url(self.other_user), access_token=self.admin_user_tok + ) + # Success! + self.assertEqual(200, channel.code, msg=channel.json_body) + + # Should now find that the key exists. + _, timestamp = self.get_success( + self.store.get_master_cross_signing_key_updatable_before(self.other_user) + ) + assert timestamp is not None + self.assertGreater(timestamp, self.clock.time_msec()) diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py index 8ee5489057..9f81a695fa 100644 --- a/tests/rest/client/test_keys.py +++ b/tests/rest/client/test_keys.py @@ -11,8 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License - +import urllib.parse from http import HTTPStatus +from unittest.mock import patch from signedjson.key import ( encode_verify_key_base64, @@ -24,12 +25,19 @@ from signedjson.sign import sign_json from synapse.api.errors import Codes from synapse.rest import admin from synapse.rest.client import keys, login -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester, create_requester from tests import unittest from tests.http.server._base import make_request_with_cancellation_test from tests.unittest import override_config +try: + import authlib # noqa: F401 + + HAS_AUTHLIB = True +except ImportError: + HAS_AUTHLIB = False + class KeyQueryTestCase(unittest.HomeserverTestCase): servlets = [ @@ -259,3 +267,179 @@ class KeyQueryTestCase(unittest.HomeserverTestCase): alice_token, ) self.assertEqual(channel.code, HTTPStatus.OK, channel.result) + + +class SigningKeyUploadServletTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + keys.register_servlets, + ] + + OIDC_ADMIN_TOKEN = "_oidc_admin_token" + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "enable_registration": False, + "experimental_features": { + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "account_management_url": "https://my-account.issuer", + "client_id": "id", + "client_auth_method": "client_secret_post", + "client_secret": "secret", + "admin_token": OIDC_ADMIN_TOKEN, + }, + }, + } + ) + def test_master_cross_signing_key_replacement_msc3861(self) -> None: + # Provision a user like MAS would, cribbing from + # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L224-L229 + alice = "@alice:test" + channel = self.make_request( + "PUT", + f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}", + access_token=self.OIDC_ADMIN_TOKEN, + content={}, + ) + self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body) + + # Provision a device like MAS would, cribbing from + # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L260-L262 + alice_device = "alice_device" + channel = self.make_request( + "POST", + f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}/devices", + access_token=self.OIDC_ADMIN_TOKEN, + content={"device_id": alice_device}, + ) + self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body) + + # Prepare a mock MAS access token. 
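+        # (With MSC3861 enabled, Synapse would normally introspect this token
+        # against MAS; patching `get_user_by_access_token` below keeps the test
+        # self-contained.)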
+ alice_token = "alice_token_1234_oidcwhatyoudidthere" + + async def mocked_get_user_by_access_token( + token: str, allow_expired: bool = False + ) -> Requester: + self.assertEqual(token, alice_token) + return create_requester( + user_id=alice, + device_id=alice_device, + scope=[], + is_guest=False, + ) + + patch_get_user_by_access_token = patch.object( + self.hs.get_auth(), + "get_user_by_access_token", + wraps=mocked_get_user_by_access_token, + ) + + # Copied from E2eKeysHandlerTestCase + master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk" + master_pubkey2 = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM" + master_pubkey3 = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY" + + master_key: JsonDict = { + "user_id": alice, + "usage": ["master"], + "keys": {"ed25519:" + master_pubkey: master_pubkey}, + } + master_key2: JsonDict = { + "user_id": alice, + "usage": ["master"], + "keys": {"ed25519:" + master_pubkey2: master_pubkey2}, + } + master_key3: JsonDict = { + "user_id": alice, + "usage": ["master"], + "keys": {"ed25519:" + master_pubkey3: master_pubkey3}, + } + + with patch_get_user_by_access_token: + # Upload an initial cross-signing key. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key, + }, + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + # Should not be able to upload another master key. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key2, + }, + ) + self.assertEqual( + channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body + ) + + # Pretend that MAS did UIA and allowed us to replace the master key. + channel = self.make_request( + "POST", + f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia", + access_token=self.OIDC_ADMIN_TOKEN, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + + with patch_get_user_by_access_token: + # Should now be able to upload master key2. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key2, + }, + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + # Even though we're still in the grace period, we shouldn't be able to + # upload master key 3 immediately after uploading key 2. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key3, + }, + ) + self.assertEqual( + channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body + ) + + # Pretend that MAS did UIA and allowed us to replace the master key. + channel = self.make_request( + "POST", + f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia", + access_token=self.OIDC_ADMIN_TOKEN, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + timestamp_ms = channel.json_body["updatable_without_uia_before_ms"] + + # Advance to 1 second after the replacement period ends. + self.reactor.advance(timestamp_ms - self.clock.time_msec() + 1000) + + with patch_get_user_by_access_token: + # We should not be able to upload master key3 because the replacement has + # expired. 
+ channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key3, + }, + ) + self.assertEqual( + channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body + ) diff --git a/tests/storage/databases/main/test_end_to_end_keys.py b/tests/storage/databases/main/test_end_to_end_keys.py new file mode 100644 index 0000000000..23e6f82c75 --- /dev/null +++ b/tests/storage/databases/main/test_end_to_end_keys.py @@ -0,0 +1,121 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import List, Optional, Tuple + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.server import HomeServer +from synapse.storage._base import db_to_json +from synapse.storage.database import LoggingTransaction +from synapse.types import JsonDict +from synapse.util import Clock + +from tests.unittest import HomeserverTestCase + + +class EndToEndKeyWorkerStoreTestCase(HomeserverTestCase): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + def test_get_master_cross_signing_key_updatable_before(self) -> None: + # Should return False, None when there is no master key. + alice = "@alice:test" + exists, timestamp = self.get_success( + self.store.get_master_cross_signing_key_updatable_before(alice) + ) + self.assertIs(exists, False) + self.assertIsNone(timestamp) + + # Upload a master key. + dummy_key = {"keys": {"a": "b"}} + self.get_success( + self.store.set_e2e_cross_signing_key(alice, "master", dummy_key) + ) + + # Should now find that the key exists. + exists, timestamp = self.get_success( + self.store.get_master_cross_signing_key_updatable_before(alice) + ) + self.assertIs(exists, True) + self.assertIsNone(timestamp) + + # Write an updateable_before timestamp. + written_timestamp = self.get_success( + self.store.allow_master_cross_signing_key_replacement_without_uia( + alice, 1000 + ) + ) + + # Should now find that the key exists. + exists, timestamp = self.get_success( + self.store.get_master_cross_signing_key_updatable_before(alice) + ) + self.assertIs(exists, True) + self.assertEqual(timestamp, written_timestamp) + + def test_master_replacement_only_applies_to_latest_master_key( + self, + ) -> None: + """We shouldn't allow updates w/o UIA to old master keys or other key types.""" + alice = "@alice:test" + # Upload two master keys. + key1 = {"keys": {"a": "b"}} + key2 = {"keys": {"c": "d"}} + key3 = {"keys": {"e": "f"}} + self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key1)) + self.get_success(self.store.set_e2e_cross_signing_key(alice, "other", key2)) + self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key3)) + + # Third key should be the current one. 
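+        # (key2 was stored under keytype "other", so only key1 and key3 are
+        # candidates for the latest master key.)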
+        key = self.get_success(
+            self.store.get_e2e_cross_signing_key(alice, "master", alice)
+        )
+        self.assertEqual(key, key3)
+
+        timestamp = self.get_success(
+            self.store.allow_master_cross_signing_key_replacement_without_uia(
+                alice, 1000
+            )
+        )
+        assert timestamp is not None
+
+        def check_timestamp_column(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[JsonDict, Optional[int]]]:
+            """Fetch all rows for Alice's keys."""
+            txn.execute(
+                """
+                SELECT keydata, updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ?
+                ORDER BY stream_id ASC;
+                """,
+                (alice,),
+            )
+            return [(db_to_json(keydata), ts) for keydata, ts in txn.fetchall()]
+
+        values = self.get_success(
+            self.store.db_pool.runInteraction(
+                "check_timestamp_column",
+                check_timestamp_column,
+            )
+        )
+        self.assertEqual(
+            values,
+            [
+                (key1, None),
+                (key2, None),
+                (key3, timestamp),
+            ],
+        )
-- cgit 1.5.1


From 3e8531d3baf205733693f9ae8b43aa0b4c82b744 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 16 Nov 2023 15:19:35 +0000
Subject: Speed up deleting device messages (#16643)

Keeping track of a lower bound of stream ID where we've deleted
everything below makes the queries much faster. Otherwise, every time
we scan for rows to delete we'd re-scan across all the rows that have
previously been deleted (until the next table VACUUM).
---
 changelog.d/16643.misc | 1 +
 synapse/handlers/device.py | 8 +-
 synapse/storage/databases/main/deviceinbox.py | 106 ++++++++++++++++++++------
 synapse/util/task_scheduler.py | 2 +-
 4 files changed, 88 insertions(+), 29 deletions(-)
 create mode 100644 changelog.d/16643.misc

(limited to 'synapse/storage/databases')

diff --git a/changelog.d/16643.misc b/changelog.d/16643.misc
new file mode 100644
index 0000000000..cc0cf0901f
--- /dev/null
+++ b/changelog.d/16643.misc
@@ -0,0 +1 @@
+Speed up deleting of device messages when deleting a device.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 93472d0117..1af6d77545 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -396,15 +396,17 @@ class DeviceWorkerHandler:
         up_to_stream_id = task.params["up_to_stream_id"]
 
         # Delete the messages in batches to avoid too much DB load.
+        from_stream_id = None
         while True:
-            res = await self.store.delete_messages_for_device(
+            from_stream_id, _ = await self.store.delete_messages_for_device_between(
                 user_id=user_id,
                 device_id=device_id,
-                up_to_stream_id=up_to_stream_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
                 limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
             )
 
-            if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+            if from_stream_id is None:
                 return TaskStatus.COMPLETE, None, None
 
             await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 3e7425d4a6..02dddd1da4 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
-        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
-            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             log_kv({"message": "No changes in cache since last check"})
             return 0
 
-        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            limit_statement = "" if limit is None else f"LIMIT {limit}"
-            sql = f"""
-            DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
-                SELECT MAX(stream_id) FROM (
-                    SELECT stream_id FROM device_inbox
-                    WHERE user_id = ? AND device_id = ? AND stream_id <= ?
-                    ORDER BY stream_id
-                    {limit_statement}
-                ) AS q1
-            )
-            """
-            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = await self.db_pool.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
+        from_stream_id = None
+        count = 0
+        while True:
+            from_stream_id, loop_count = await self.delete_messages_for_device_between(
+                user_id,
+                device_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
+                limit=1000,
+            )
+            count += loop_count
+            if from_stream_id is None:
+                break
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
-        # In this case we don't know if we hit the limit or the delete is complete
-        # so let's not update the cache.
-        if count == limit:
-            return count
-
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -514,6 +502,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         return count
 
+    @trace
+    async def delete_messages_for_device_between(
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        from_stream_id: Optional[int],
+        to_stream_id: int,
+        limit: int,
+    ) -> Tuple[Optional[int], int]:
+        """Delete up to `limit` device messages between the stream IDs,
+        returning the highest stream ID deleted (or None if all messages in
+        the range have been deleted) and the number of messages deleted.
+
+        This is more efficient than `delete_messages_for_device` when called
+        in a loop to batch-delete messages.
+        """
+
+        # Keeping track of a lower-bound stream ID, below which everything
+        # has been deleted, makes the queries much faster. Otherwise, every
+        # time we scan for rows to delete we'd re-scan across all the rows
+        # that have previously been deleted (until the next table VACUUM).
+
+        if from_stream_id is None:
+            # Minimum device stream ID is 1.
+            from_stream_id = 0
+
+        def delete_messages_for_device_between_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[Optional[int], int]:
+            txn.execute(
+                """
+                SELECT MAX(stream_id) FROM (
+                    SELECT stream_id FROM device_inbox
+                    WHERE user_id = ? AND device_id = ?
+                        AND ? < stream_id AND stream_id <= ?
+                    ORDER BY stream_id
+                    LIMIT ?
+                ) AS d
+                """,
+                (user_id, device_id, from_stream_id, to_stream_id, limit),
+            )
+            row = txn.fetchone()
+            if row is None or row[0] is None:
+                return None, 0
+
+            (max_stream_id,) = row
+
+            txn.execute(
+                """
+                DELETE FROM device_inbox
+                WHERE user_id = ? AND device_id = ?
+                    AND ? < stream_id AND stream_id <= ?
+ """, + (user_id, device_id, from_stream_id, max_stream_id), + ) + + num_deleted = txn.rowcount + if num_deleted < limit: + return None, num_deleted + + return max_stream_id, num_deleted + + return await self.db_pool.runInteraction( + "delete_messages_for_device_between", + delete_messages_for_device_between_txn, + db_autocommit=True, # We don't need to run in a transaction + ) + @trace async def get_new_device_msgs_for_remote( self, destination: str, last_stream_id: int, current_stream_id: int, limit: int diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index caf13b3474..29c561e555 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -193,7 +193,7 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse. -- cgit 1.5.1