diff options
33 files changed, 390 insertions, 196 deletions
diff --git a/changelog.d/14255.misc b/changelog.d/14255.misc new file mode 100644 index 0000000000..39924659c7 --- /dev/null +++ b/changelog.d/14255.misc @@ -0,0 +1 @@ +Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar). diff --git a/changelog.d/14490.feature b/changelog.d/14490.feature new file mode 100644 index 0000000000..c7cb571294 --- /dev/null +++ b/changelog.d/14490.feature @@ -0,0 +1 @@ +Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`. diff --git a/changelog.d/14517.doc b/changelog.d/14517.doc new file mode 100644 index 0000000000..2c9de68971 --- /dev/null +++ b/changelog.d/14517.doc @@ -0,0 +1 @@ +Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages. diff --git a/changelog.d/14525.feature b/changelog.d/14525.feature new file mode 100644 index 0000000000..c7cb571294 --- /dev/null +++ b/changelog.d/14525.feature @@ -0,0 +1 @@ +Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`. diff --git a/changelog.d/14549.misc b/changelog.d/14549.misc new file mode 100644 index 0000000000..d9d863dd20 --- /dev/null +++ b/changelog.d/14549.misc @@ -0,0 +1 @@ +Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room. \ No newline at end of file diff --git a/changelog.d/14568.misc b/changelog.d/14568.misc new file mode 100644 index 0000000000..99973de1c1 --- /dev/null +++ b/changelog.d/14568.misc @@ -0,0 +1 @@ +Modernize unit tests configuration related to workers. diff --git a/changelog.d/14576.feature b/changelog.d/14576.feature new file mode 100644 index 0000000000..4fe8cb2667 --- /dev/null +++ b/changelog.d/14576.feature @@ -0,0 +1 @@ +Advertise support for Matrix 1.5 on `/_matrix/client/versions`. diff --git a/docs/setup/installation.md b/docs/setup/installation.md index dcd8f17c5e..436041f8a8 100644 --- a/docs/setup/installation.md +++ b/docs/setup/installation.md @@ -84,7 +84,9 @@ file when you upgrade the Debian package to a later version. ##### Downstream Debian packages -Andrej Shadura maintains a `matrix-synapse` package in the Debian repositories. +Andrej Shadura maintains a +[`matrix-synapse`](https://packages.debian.org/sid/matrix-synapse) package in +the Debian repositories. For `bookworm` and `sid`, it can be installed simply with: ```sh @@ -100,23 +102,27 @@ for information on how to use backports. ##### Downstream Ubuntu packages We do not recommend using the packages in the default Ubuntu repository -at this time, as they are old and suffer from known security vulnerabilities. +at this time, as they are [old and suffer from known security vulnerabilities]( + https://bugs.launchpad.net/ubuntu/+source/matrix-synapse/+bug/1848709 +). The latest version of Synapse can be installed from [our repository](#matrixorg-packages). #### Fedora -Synapse is in the Fedora repositories as `matrix-synapse`: +Synapse is in the Fedora repositories as +[`matrix-synapse`](https://src.fedoraproject.org/rpms/matrix-synapse): ```sh sudo dnf install matrix-synapse ``` -Oleg Girko provides Fedora RPMs at +Additionally, Oleg Girko provides Fedora RPMs at <https://obs.infoserver.lv/project/monitor/matrix-synapse> #### OpenSUSE -Synapse is in the OpenSUSE repositories as `matrix-synapse`: +Synapse is in the OpenSUSE repositories as +[`matrix-synapse`](https://software.opensuse.org/package/matrix-synapse): ```sh sudo zypper install matrix-synapse @@ -151,7 +157,8 @@ sudo pip install py-bcrypt #### Void Linux -Synapse can be found in the void repositories as 'synapse': +Synapse can be found in the void repositories as +['synapse'](https://github.com/void-linux/void-packages/tree/master/srcpkgs/synapse): ```sh xbps-install -Su diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index ed15f88350..69310d9035 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -14,7 +14,6 @@ import abc import logging -import urllib from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple import attr @@ -813,31 +812,27 @@ class ServerKeyFetcher(BaseV2KeyFetcher): results = {} - async def get_key(key_to_fetch_item: _FetchKeyRequest) -> None: + async def get_keys(key_to_fetch_item: _FetchKeyRequest) -> None: server_name = key_to_fetch_item.server_name - key_ids = key_to_fetch_item.key_ids try: - keys = await self.get_server_verify_key_v2_direct(server_name, key_ids) + keys = await self.get_server_verify_keys_v2_direct(server_name) results[server_name] = keys except KeyLookupError as e: - logger.warning( - "Error looking up keys %s from %s: %s", key_ids, server_name, e - ) + logger.warning("Error looking up keys from %s: %s", server_name, e) except Exception: - logger.exception("Error getting keys %s from %s", key_ids, server_name) + logger.exception("Error getting keys from %s", server_name) - await yieldable_gather_results(get_key, keys_to_fetch) + await yieldable_gather_results(get_keys, keys_to_fetch) return results - async def get_server_verify_key_v2_direct( - self, server_name: str, key_ids: Iterable[str] + async def get_server_verify_keys_v2_direct( + self, server_name: str ) -> Dict[str, FetchKeyResult]: """ Args: - server_name: - key_ids: + server_name: Server to request keys from Returns: Map from key ID to lookup result @@ -845,57 +840,41 @@ class ServerKeyFetcher(BaseV2KeyFetcher): Raises: KeyLookupError if there was a problem making the lookup """ - keys: Dict[str, FetchKeyResult] = {} - - for requested_key_id in key_ids: - # we may have found this key as a side-effect of asking for another. - if requested_key_id in keys: - continue - - time_now_ms = self.clock.time_msec() - try: - response = await self.client.get_json( - destination=server_name, - path="/_matrix/key/v2/server/" - + urllib.parse.quote(requested_key_id, safe=""), - ignore_backoff=True, - # we only give the remote server 10s to respond. It should be an - # easy request to handle, so if it doesn't reply within 10s, it's - # probably not going to. - # - # Furthermore, when we are acting as a notary server, we cannot - # wait all day for all of the origin servers, as the requesting - # server will otherwise time out before we can respond. - # - # (Note that get_json may make 4 attempts, so this can still take - # almost 45 seconds to fetch the headers, plus up to another 60s to - # read the response). - timeout=10000, - ) - except (NotRetryingDestination, RequestSendFailed) as e: - # these both have str() representations which we can't really improve - # upon - raise KeyLookupError(str(e)) - except HttpResponseException as e: - raise KeyLookupError("Remote server returned an error: %s" % (e,)) - - assert isinstance(response, dict) - if response["server_name"] != server_name: - raise KeyLookupError( - "Expected a response for server %r not %r" - % (server_name, response["server_name"]) - ) - - response_keys = await self.process_v2_response( - from_server=server_name, - response_json=response, - time_added_ms=time_now_ms, + time_now_ms = self.clock.time_msec() + try: + response = await self.client.get_json( + destination=server_name, + path="/_matrix/key/v2/server", + ignore_backoff=True, + # we only give the remote server 10s to respond. It should be an + # easy request to handle, so if it doesn't reply within 10s, it's + # probably not going to. + # + # Furthermore, when we are acting as a notary server, we cannot + # wait all day for all of the origin servers, as the requesting + # server will otherwise time out before we can respond. + # + # (Note that get_json may make 4 attempts, so this can still take + # almost 45 seconds to fetch the headers, plus up to another 60s to + # read the response). + timeout=10000, ) - await self.store.store_server_verify_keys( - server_name, - time_now_ms, - ((server_name, key_id, key) for key_id, key in response_keys.items()), + except (NotRetryingDestination, RequestSendFailed) as e: + # these both have str() representations which we can't really improve + # upon + raise KeyLookupError(str(e)) + except HttpResponseException as e: + raise KeyLookupError("Remote server returned an error: %s" % (e,)) + + assert isinstance(response, dict) + if response["server_name"] != server_name: + raise KeyLookupError( + "Expected a response for server %r not %r" + % (server_name, response["server_name"]) ) - keys.update(response_keys) - return keys + return await self.process_v2_response( + from_server=server_name, + response_json=response, + time_added_ms=time_now_ms, + ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index fc1d8c88a7..30ebd62883 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender): room_id = receipt.room_id # Work out which remote servers should be poked and poke them. - domains_set = await self._storage_controllers.state.get_current_hosts_in_room( + domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) domains = [ diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index edeba27a45..7ee07e4bee 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,7 +17,6 @@ from synapse.events import EventBase from synapse.push.presentable_names import calculate_room_name, name_from_member_event from synapse.storage.controllers import StorageControllers from synapse.storage.databases.main import DataStore -from synapse.util.async_helpers import concurrently_execute async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int: @@ -26,23 +25,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - badge = len(invites) - room_notifs = [] - - async def get_room_unread_count(room_id: str) -> None: - room_notifs.append( - await store.get_unread_event_push_actions_by_room_for_user( - room_id, - user_id, - ) - ) - - await concurrently_execute(get_room_unread_count, joins, 10) - - for notifs in room_notifs: - # Combine the counts from all the threads. - notify_count = notifs.main_timeline.notify_count + sum( - n.notify_count for n in notifs.threads.values() - ) + room_to_count = await store.get_unread_counts_by_room_for_user(user_id) + for room_id, notify_count in room_to_count.items(): + # room_to_count may include rooms which the user has left, + # ignore those. + if room_id not in joins: + continue if notify_count == 0: continue @@ -51,8 +39,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) - # return one badge count per conversation badge += 1 else: - # increment the badge count by the number of unread messages in the room + # Increase badge by number of notifications in room + # NOTE: this includes threaded and unthreaded notifications. badge += notify_count + return badge diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 3c0a90010b..e19c0946c0 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -77,6 +77,7 @@ class VersionsRestServlet(RestServlet): "v1.2", "v1.3", "v1.4", + "v1.5", ], # as per MSC1497: "unstable_features": { diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b283ab0f9c..7ebe34f773 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -74,6 +74,7 @@ receipt. """ import logging +from collections import defaultdict from typing import ( TYPE_CHECKING, Collection, @@ -95,6 +96,7 @@ from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + PostgresEngine, ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore @@ -463,6 +465,153 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas return result + async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: + """Get the notification count by room for a user. Only considers notifications, + not highlight or unread counts, and threads are currently aggregated under their room. + + This function is intentionally not cached because it is called to calculate the + unread badge for push notifications and thus the result is expected to change. + + Note that this function assumes the user is a member of the room. Because + summary rows are not removed when a user leaves a room, the caller must + filter out those results from the result. + + Returns: + A map of room ID to notification counts for the given user. + """ + return await self.db_pool.runInteraction( + "get_unread_counts_by_room_for_user", + self._get_unread_counts_by_room_for_user_txn, + user_id, + ) + + def _get_unread_counts_by_room_for_user_txn( + self, txn: LoggingTransaction, user_id: str + ) -> Dict[str, int]: + receipt_types_clause, args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) + args.extend([user_id, user_id]) + + receipts_cte = f""" + WITH all_receipts AS ( + SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + {receipt_types_clause} + AND user_id = ? + GROUP BY room_id, thread_id + ) + """ + + receipts_joins = """ + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS threaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NOT NULL + ) AS threaded_receipts USING (room_id, thread_id) + LEFT JOIN ( + SELECT room_id, thread_id, + max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering + FROM all_receipts + WHERE thread_id IS NULL + ) AS unthreaded_receipts USING (room_id) + """ + + # First get summary counts by room / thread for the user. We use the max receipt + # stream ordering of both threaded & unthreaded receipts to compare against the + # summary table. + # + # PostgreSQL and SQLite differ in comparing scalar numerics. + if isinstance(self.database_engine, PostgresEngine): + # GREATEST ignores NULLs. + max_clause = """GREATEST( + threaded_receipt_stream_ordering, + unthreaded_receipt_stream_ordering + )""" + else: + # MAX returns NULL if any are NULL, so COALESCE to 0 first. + max_clause = """MAX( + COALESCE(threaded_receipt_stream_ordering, 0), + COALESCE(unthreaded_receipt_stream_ordering, 0) + )""" + + sql = f""" + {receipts_cte} + SELECT eps.room_id, eps.thread_id, notif_count + FROM event_push_summary AS eps + {receipts_joins} + WHERE user_id = ? + AND notif_count != 0 + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) + OR last_receipt_stream_ordering = {max_clause} + ) + """ + txn.execute(sql, args) + + seen_thread_ids = set() + room_to_count: Dict[str, int] = defaultdict(int) + + for room_id, thread_id, notif_count in txn: + room_to_count[room_id] += notif_count + seen_thread_ids.add(thread_id) + + # Now get any event push actions that haven't been rotated using the same OR + # join and filter by receipt and event push summary rotated up to stream ordering. + sql = f""" + {receipts_cte} + SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + {receipts_joins} + WHERE user_id = ? + AND epa.notif = 1 + AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + GROUP BY epa.room_id, epa.thread_id + """ + txn.execute(sql, args) + + for room_id, thread_id, notif_count in txn: + # Note: only count push actions we have valid summaries for with up to date receipt. + if thread_id not in seen_thread_ids: + continue + room_to_count[room_id] += notif_count + + thread_id_clause, thread_ids_args = make_in_list_sql_clause( + self.database_engine, "epa.thread_id", seen_thread_ids + ) + + # Finally re-check event_push_actions for any rooms not in the summary, ignoring + # the rotated up-to position. This handles the case where a read receipt has arrived + # but not been rotated meaning the summary table is out of date, so we go back to + # the push actions table. + sql = f""" + {receipts_cte} + SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count + FROM event_push_actions AS epa + {receipts_joins} + WHERE user_id = ? + AND NOT {thread_id_clause} + AND epa.notif = 1 + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) + GROUP BY epa.room_id + """ + + args.extend(thread_ids_args) + txn.execute(sql, args) + + for room_id, notif_count in txn: + room_to_count[room_id] += notif_count + + return room_to_count + @cached(tree=True, max_entries=5000, iterable=True) async def get_unread_event_push_actions_by_room_for_user( self, diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 63628aa6b0..f7c309cad0 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -433,7 +433,7 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase): async def get_json(destination, path, **kwargs): self.assertEqual(destination, SERVER_NAME) - self.assertEqual(path, "/_matrix/key/v2/server/key1") + self.assertEqual(path, "/_matrix/key/v2/server") return response self.http_client.get_json.side_effect = get_json @@ -469,18 +469,6 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase): keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0)) self.assertEqual(keys, {}) - def test_keyid_containing_forward_slash(self) -> None: - """We should url-encode any url unsafe chars in key ids. - - Detects https://github.com/matrix-org/synapse/issues/14488. - """ - fetcher = ServerKeyFetcher(self.hs) - self.get_success(fetcher.get_keys("example.com", ["key/potato"], 0)) - - self.http_client.get_json.assert_called_once() - args, kwargs = self.http_client.get_json.call_args - self.assertEqual(kwargs["path"], "/_matrix/key/v2/server/key%2Fpotato") - class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py index 685a9a6d52..b703e4472e 100644 --- a/tests/events/test_presence_router.py +++ b/tests/events/test_presence_router.py @@ -126,6 +126,13 @@ class PresenceRouterTestModule: class PresenceRouterTestCase(FederatingHomeserverTestCase): + """ + Test cases using a custom PresenceRouter + + By default in test cases, federation sending is disabled. This class re-enables it + for the main process by setting `federation_sender_instances` to None. + """ + servlets = [ admin.register_servlets, login.register_servlets, @@ -150,6 +157,11 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): self.sync_handler = self.hs.get_sync_handler() self.module_api = homeserver.get_module_api() + def default_config(self) -> JsonDict: + config = super().default_config() + config["federation_sender_instances"] = None + return config + @override_config( { "presence": { @@ -162,7 +174,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): }, } }, - "send_federation": True, } ) def test_receiving_all_presence_legacy(self): @@ -180,7 +191,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): }, }, ], - "send_federation": True, } ) def test_receiving_all_presence(self): @@ -290,7 +300,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): }, } }, - "send_federation": True, } ) def test_send_local_online_presence_to_with_module_legacy(self): @@ -310,7 +319,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase): }, }, ], - "send_federation": True, } ) def test_send_local_online_presence_to_with_module(self): diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py index 2873b4d430..b8fee72898 100644 --- a/tests/federation/test_federation_catch_up.py +++ b/tests/federation/test_federation_catch_up.py @@ -7,13 +7,21 @@ from synapse.federation.sender import PerDestinationQueue, TransactionManager from synapse.federation.units import Edu from synapse.rest import admin from synapse.rest.client import login, room +from synapse.types import JsonDict from synapse.util.retryutils import NotRetryingDestination from tests.test_utils import event_injection, make_awaitable -from tests.unittest import FederatingHomeserverTestCase, override_config +from tests.unittest import FederatingHomeserverTestCase class FederationCatchUpTestCases(FederatingHomeserverTestCase): + """ + Tests cases of catching up over federation. + + By default for test cases federation sending is disabled. This Test class has it + re-enabled for the main process. + """ + servlets = [ admin.register_servlets, room.register_servlets, @@ -42,6 +50,11 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): self.record_transaction ) + def default_config(self) -> JsonDict: + config = super().default_config() + config["federation_sender_instances"] = None + return config + async def record_transaction(self, txn, json_cb): if self.is_online: data = json_cb() @@ -79,7 +92,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): )[0] return {"event_id": event_id, "stream_ordering": stream_ordering} - @override_config({"send_federation": True}) def test_catch_up_destination_rooms_tracking(self): """ Tests that we populate the `destination_rooms` table as needed. @@ -105,7 +117,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): self.assertEqual(row_2["event_id"], event_id_2) self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1) - @override_config({"send_federation": True}) def test_catch_up_last_successful_stream_ordering_tracking(self): """ Tests that we populate the `destination_rooms` table as needed. @@ -163,7 +174,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): "Send succeeded but not marked as last_successful_stream_ordering", ) - @override_config({"send_federation": True}) # critical to federate def test_catch_up_from_blank_state(self): """ Runs an overall test of federation catch-up from scratch. @@ -260,7 +270,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): return per_dest_queue, results_list - @override_config({"send_federation": True}) def test_catch_up_loop(self): """ Tests the behaviour of _catch_up_transmission_loop. @@ -325,7 +334,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): event_5.internal_metadata.stream_ordering, ) - @override_config({"send_federation": True}) def test_catch_up_on_synapse_startup(self): """ Tests the behaviour of get_catch_up_outstanding_destinations and @@ -424,7 +432,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase): # - all destinations are woken exactly once; they appear once in woken. self.assertCountEqual(woken, server_names[:-1]) - @override_config({"send_federation": True}) def test_not_latest_event(self): """Test that we send the latest event in the room even if its not ours.""" diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 01f147418b..8692d8190f 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -25,10 +25,17 @@ from synapse.rest.client import login from synapse.types import JsonDict, ReadReceipt from tests.test_utils import make_awaitable -from tests.unittest import HomeserverTestCase, override_config +from tests.unittest import HomeserverTestCase class FederationSenderReceiptsTestCases(HomeserverTestCase): + """ + Test federation sending to update receipts. + + By default for test cases federation sending is disabled. This Test class has it + re-enabled for the main process. + """ + def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver( federation_transport_client=Mock(spec=["send_transaction"]), @@ -38,9 +45,17 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): return_value=make_awaitable({"test", "host2"}) ) + hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = ( + hs.get_storage_controllers().state.get_current_hosts_in_room + ) + return hs - @override_config({"send_federation": True}) + def default_config(self) -> JsonDict: + config = super().default_config() + config["federation_sender_instances"] = None + return config + def test_send_receipts(self): mock_send_transaction = ( self.hs.get_federation_transport_client().send_transaction @@ -83,7 +98,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): ], ) - @override_config({"send_federation": True}) def test_send_receipts_thread(self): mock_send_transaction = ( self.hs.get_federation_transport_client().send_transaction @@ -160,7 +174,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): ], ) - @override_config({"send_federation": True}) def test_send_receipts_with_backoff(self): """Send two receipts in quick succession; the second should be flushed, but only after 20ms""" @@ -247,6 +260,13 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): class FederationSenderDevicesTestCases(HomeserverTestCase): + """ + Test federation sending to update devices. + + By default for test cases federation sending is disabled. This Test class has it + re-enabled for the main process. + """ + servlets = [ admin.register_servlets, login.register_servlets, @@ -261,7 +281,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): def default_config(self): c = super().default_config() - c["send_federation"] = True + # Enable federation sending on the main process. + c["federation_sender_instances"] = None return c def prepare(self, reactor, clock, hs): diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index c5981ff965..584e7b8971 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -992,7 +992,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): def default_config(self): config = super().default_config() - config["send_federation"] = True + # Enable federation sending on the main process. + config["federation_sender_instances"] = None return config def prepare(self, reactor, clock, hs): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 9c821b3042..efbb5a8dbb 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -200,7 +200,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ], ) - @override_config({"send_federation": True}) + # Enable federation sending on the main process. + @override_config({"federation_sender_instances": None}) def test_started_typing_remote_send(self) -> None: self.room_members = [U_APPLE, U_ONION] @@ -305,7 +306,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.assertEqual(events[0], []) self.assertEqual(events[1], 0) - @override_config({"send_federation": True}) + # Enable federation sending on the main process. + @override_config({"federation_sender_instances": None}) def test_stopped_typing(self) -> None: self.room_members = [U_APPLE, U_BANANA, U_ONION] diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 9e39cd97e5..75fc5a17a4 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -56,7 +56,8 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: config = self.default_config() - config["update_user_directory"] = True + # Re-enables updating the user directory, as that function is needed below. + config["update_user_directory_from_worker"] = None self.appservice = ApplicationService( token="i_am_an_app_service", @@ -1045,7 +1046,9 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: config = self.default_config() - config["update_user_directory"] = True + # Re-enables updating the user directory, as that function is needed below. It + # will be force disabled later + config["update_user_directory_from_worker"] = None hs = self.setup_test_homeserver(config=config) self.config = hs.config diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 058ca57e55..b0f3f4374d 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -336,7 +336,8 @@ class ModuleApiTestCase(HomeserverTestCase): # Test sending local online presence to users from the main process _test_sending_local_online_presence_to_local_user(self, test_with_workers=False) - @override_config({"send_federation": True}) + # Enable federation sending on the main process. + @override_config({"federation_sender_instances": None}) def test_send_local_online_presence_to_federation(self): """Tests that send_local_presence_to_users sends local online presence to remote users.""" # Create a user who will send presence updates diff --git a/tests/push/test_email.py b/tests/push/test_email.py index fd14568f55..57b2f0536e 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -66,7 +66,6 @@ class EmailPusherTests(HomeserverTestCase): "riot_base_url": None, } config["public_baseurl"] = "http://aaa" - config["start_pushers"] = True hs = self.setup_test_homeserver(config=config) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index b383b8401f..afaafe79aa 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Dict, List, Optional, Tuple +from typing import List, Optional, Tuple from unittest.mock import Mock from twisted.internet.defer import Deferred @@ -41,11 +41,6 @@ class HTTPPusherTests(HomeserverTestCase): user_id = True hijack_auth = False - def default_config(self) -> Dict[str, Any]: - config = super().default_config() - config["start_pushers"] = True - return config - def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.push_attempts: List[Tuple[Deferred, str, dict]] = [] diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 3029a16dda..6a7174b333 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -307,7 +307,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): stream to the master HS. Args: - worker_app: Type of worker, e.g. `synapse.app.federation_sender`. + worker_app: Type of worker, e.g. `synapse.app.generic_worker`. extra_config: Any extra config to use for this instances. **kwargs: Options that get passed to `self.setup_test_homeserver`, useful to e.g. pass some mocks for things like `federation_http_client` diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py index ffec06a0d6..bcb82c9c80 100644 --- a/tests/replication/tcp/streams/test_federation.py +++ b/tests/replication/tcp/streams/test_federation.py @@ -22,9 +22,8 @@ class FederationStreamTestCase(BaseStreamTestCase): def _get_worker_hs_config(self) -> dict: # enable federation sending on the worker config = super()._get_worker_hs_config() - # TODO: make it so we don't need both of these - config["send_federation"] = False - config["worker_app"] = "synapse.app.federation_sender" + config["worker_name"] = "federation_sender1" + config["federation_sender_instances"] = ["federation_sender1"] return config def test_catchup(self): diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py index 43a16bb141..5d7a89e0c7 100644 --- a/tests/replication/test_auth.py +++ b/tests/replication/test_auth.py @@ -38,7 +38,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase): def _get_worker_hs_config(self) -> dict: config = self.default_config() - config["worker_app"] = "synapse.app.client_reader" + config["worker_app"] = "synapse.app.generic_worker" config["worker_replication_host"] = "testserv" config["worker_replication_http_port"] = "8765" @@ -53,7 +53,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase): 4. Return the final request. """ - worker_hs = self.make_worker_hs("synapse.app.client_reader") + worker_hs = self.make_worker_hs("synapse.app.generic_worker") site = self._hs_to_site[worker_hs] channel_1 = make_request( diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py index 995097d72c..eb5b376534 100644 --- a/tests/replication/test_client_reader_shard.py +++ b/tests/replication/test_client_reader_shard.py @@ -22,20 +22,20 @@ logger = logging.getLogger(__name__) class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): - """Test using one or more client readers for registration.""" + """Test using one or more generic workers for registration.""" servlets = [register.register_servlets] def _get_worker_hs_config(self) -> dict: config = self.default_config() - config["worker_app"] = "synapse.app.client_reader" + config["worker_app"] = "synapse.app.generic_worker" config["worker_replication_host"] = "testserv" config["worker_replication_http_port"] = "8765" return config def test_register_single_worker(self): - """Test that registration works when using a single client reader worker.""" - worker_hs = self.make_worker_hs("synapse.app.client_reader") + """Test that registration works when using a single generic worker.""" + worker_hs = self.make_worker_hs("synapse.app.generic_worker") site = self._hs_to_site[worker_hs] channel_1 = make_request( @@ -64,9 +64,9 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): self.assertEqual(channel_2.json_body["user_id"], "@user:test") def test_register_multi_worker(self): - """Test that registration works when using multiple client reader workers.""" - worker_hs_1 = self.make_worker_hs("synapse.app.client_reader") - worker_hs_2 = self.make_worker_hs("synapse.app.client_reader") + """Test that registration works when using multiple generic workers.""" + worker_hs_1 = self.make_worker_hs("synapse.app.generic_worker") + worker_hs_2 = self.make_worker_hs("synapse.app.generic_worker") site_1 = self._hs_to_site[worker_hs_1] channel_1 = make_request( diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py index 26b8bd512a..63b1dd40b5 100644 --- a/tests/replication/test_federation_ack.py +++ b/tests/replication/test_federation_ack.py @@ -25,8 +25,9 @@ from tests.unittest import HomeserverTestCase class FederationAckTestCase(HomeserverTestCase): def default_config(self) -> dict: config = super().default_config() - config["worker_app"] = "synapse.app.federation_sender" - config["send_federation"] = False + config["worker_app"] = "synapse.app.generic_worker" + config["worker_name"] = "federation_sender1" + config["federation_sender_instances"] = ["federation_sender1"] return config def make_homeserver(self, reactor, clock): diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 6104a55aa1..c28073b8f7 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -27,17 +27,19 @@ logger = logging.getLogger(__name__) class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): + """ + Various tests for federation sending on workers. + + Federation sending is disabled by default, it will be enabled in each test by + updating 'federation_sender_instances'. + """ + servlets = [ login.register_servlets, register_servlets_for_client_rest_resource, room.register_servlets, ] - def default_config(self): - conf = super().default_config() - conf["send_federation"] = False - return conf - def test_send_event_single_sender(self): """Test that using a single federation sender worker correctly sends a new event. @@ -46,8 +48,11 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): mock_client.put_json.return_value = make_awaitable({}) self.make_worker_hs( - "synapse.app.federation_sender", - {"send_federation": False}, + "synapse.app.generic_worker", + { + "worker_name": "federation_sender1", + "federation_sender_instances": ["federation_sender1"], + }, federation_http_client=mock_client, ) @@ -73,11 +78,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): mock_client1 = Mock(spec=["put_json"]) mock_client1.put_json.return_value = make_awaitable({}) self.make_worker_hs( - "synapse.app.federation_sender", + "synapse.app.generic_worker", { - "send_federation": True, - "worker_name": "sender1", - "federation_sender_instances": ["sender1", "sender2"], + "worker_name": "federation_sender1", + "federation_sender_instances": [ + "federation_sender1", + "federation_sender2", + ], }, federation_http_client=mock_client1, ) @@ -85,11 +92,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): mock_client2 = Mock(spec=["put_json"]) mock_client2.put_json.return_value = make_awaitable({}) self.make_worker_hs( - "synapse.app.federation_sender", + "synapse.app.generic_worker", { - "send_federation": True, - "worker_name": "sender2", - "federation_sender_instances": ["sender1", "sender2"], + "worker_name": "federation_sender2", + "federation_sender_instances": [ + "federation_sender1", + "federation_sender2", + ], }, federation_http_client=mock_client2, ) @@ -136,11 +145,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): mock_client1 = Mock(spec=["put_json"]) mock_client1.put_json.return_value = make_awaitable({}) self.make_worker_hs( - "synapse.app.federation_sender", + "synapse.app.generic_worker", { - "send_federation": True, - "worker_name": "sender1", - "federation_sender_instances": ["sender1", "sender2"], + "worker_name": "federation_sender1", + "federation_sender_instances": [ + "federation_sender1", + "federation_sender2", + ], }, federation_http_client=mock_client1, ) @@ -148,11 +159,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): mock_client2 = Mock(spec=["put_json"]) mock_client2.put_json.return_value = make_awaitable({}) self.make_worker_hs( - "synapse.app.federation_sender", + "synapse.app.generic_worker", { - "send_federation": True, - "worker_name": "sender2", - "federation_sender_instances": ["sender1", "sender2"], + "worker_name": "federation_sender2", + "federation_sender_instances": [ + "federation_sender1", + "federation_sender2", + ], }, federation_http_client=mock_client2, ) diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py index 59fea93e49..ca18ad6553 100644 --- a/tests/replication/test_pusher_shard.py +++ b/tests/replication/test_pusher_shard.py @@ -38,11 +38,6 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): self.other_user_id = self.register_user("otheruser", "pass") self.other_access_token = self.login("otheruser", "pass") - def default_config(self): - conf = super().default_config() - conf["start_pushers"] = False - return conf - def _create_pusher_and_send_msg(self, localpart): # Create a user that will get push notifications user_id = self.register_user(localpart, "pass") @@ -92,8 +87,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): ) self.make_worker_hs( - "synapse.app.pusher", - {"start_pushers": False}, + "synapse.app.generic_worker", + {"worker_name": "pusher1", "pusher_instances": ["pusher1"]}, proxied_blacklisted_http_client=http_client_mock, ) @@ -122,9 +117,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): ) self.make_worker_hs( - "synapse.app.pusher", + "synapse.app.generic_worker", { - "start_pushers": True, "worker_name": "pusher1", "pusher_instances": ["pusher1", "pusher2"], }, @@ -137,9 +131,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase): ) self.make_worker_hs( - "synapse.app.pusher", + "synapse.app.generic_worker", { - "start_pushers": True, "worker_name": "pusher2", "pusher_instances": ["pusher1", "pusher2"], }, diff --git a/tests/rest/key/v2/test_remote_key_resource.py b/tests/rest/key/v2/test_remote_key_resource.py index 7f1fba1086..2bb6e27d94 100644 --- a/tests/rest/key/v2/test_remote_key_resource.py +++ b/tests/rest/key/v2/test_remote_key_resource.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import urllib.parse from io import BytesIO, StringIO from typing import Any, Dict, Optional, Union from unittest.mock import Mock @@ -65,9 +64,7 @@ class BaseRemoteKeyResourceTestCase(unittest.HomeserverTestCase): self.assertTrue(ignore_backoff) self.assertEqual(destination, server_name) key_id = "%s:%s" % (signing_key.alg, signing_key.version) - self.assertEqual( - path, "/_matrix/key/v2/server/%s" % (urllib.parse.quote(key_id),) - ) + self.assertEqual(path, "/_matrix/key/v2/server") response = { "server_name": server_name, diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index ee48920f84..5fa8bd2d98 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -156,7 +156,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): last_event_id: str - def _assert_counts(noitf_count: int, highlight_count: int) -> None: + def _assert_counts(notif_count: int, highlight_count: int) -> None: counts = self.get_success( self.store.db_pool.runInteraction( "get-unread-counts", @@ -168,13 +168,22 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, + notify_count=notif_count, unread_count=0, highlight_count=highlight_count, ), ) self.assertEqual(counts.threads, {}) + aggregate_counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual(aggregate_counts[room_id], notif_count) + def _create_event(highlight: bool = False) -> str: result = self.helper.send_event( room_id, @@ -283,7 +292,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): last_event_id: str def _assert_counts( - noitf_count: int, + notif_count: int, highlight_count: int, thread_notif_count: int, thread_highlight_count: int, @@ -299,7 +308,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, + notify_count=notif_count, unread_count=0, highlight_count=highlight_count, ), @@ -318,6 +327,17 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): else: self.assertEqual(counts.threads, {}) + aggregate_counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual( + aggregate_counts[room_id], notif_count + thread_notif_count + ) + def _create_event( highlight: bool = False, thread_id: Optional[str] = None ) -> str: @@ -454,7 +474,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): last_event_id: str def _assert_counts( - noitf_count: int, + notif_count: int, highlight_count: int, thread_notif_count: int, thread_highlight_count: int, @@ -470,7 +490,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, + notify_count=notif_count, unread_count=0, highlight_count=highlight_count, ), @@ -489,6 +509,17 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): else: self.assertEqual(counts.threads, {}) + aggregate_counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual( + aggregate_counts[room_id], notif_count + thread_notif_count + ) + def _create_event( highlight: bool = False, thread_id: Optional[str] = None ) -> str: @@ -646,7 +677,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): ) return result["event_id"] - def _assert_counts(noitf_count: int, thread_notif_count: int) -> None: + def _assert_counts(notif_count: int, thread_notif_count: int) -> None: counts = self.get_success( self.store.db_pool.runInteraction( "get-unread-counts", @@ -658,7 +689,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase): self.assertEqual( counts.main_timeline, NotifCounts( - notify_count=noitf_count, unread_count=0, highlight_count=0 + notify_count=notif_count, unread_count=0, highlight_count=0 ), ) if thread_notif_count: diff --git a/tests/utils.py b/tests/utils.py index 045a8b5fa7..d76bf9716a 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -125,7 +125,8 @@ def default_config( """ config_dict = { "server_name": name, - "send_federation": False, + # Setting this to an empty list turns off federation sending. + "federation_sender_instances": [], "media_store_path": "media", # the test signing key is just an arbitrary ed25519 key to keep the config # parser happy @@ -183,8 +184,9 @@ def default_config( # rooms will fail. "default_room_version": DEFAULT_ROOM_VERSION, # disable user directory updates, because they get done in the - # background, which upsets the test runner. - "update_user_directory": False, + # background, which upsets the test runner. Setting this to an + # (obviously) fake worker name disables updating the user directory. + "update_user_directory_from_worker": "does_not_exist_worker_name", "caches": {"global_factor": 1, "sync_response_cache_duration": 0}, "listeners": [{"port": 0, "type": "http"}], } |