-rw-r--r--  changelog.d/14255.misc                                |   1
-rw-r--r--  changelog.d/14490.feature                             |   1
-rw-r--r--  changelog.d/14517.doc                                 |   1
-rw-r--r--  changelog.d/14525.feature                             |   1
-rw-r--r--  changelog.d/14549.misc                                |   1
-rw-r--r--  changelog.d/14568.misc                                |   1
-rw-r--r--  changelog.d/14576.feature                             |   1
-rw-r--r--  docs/setup/installation.md                            |  19
-rw-r--r--  synapse/crypto/keyring.py                             | 107
-rw-r--r--  synapse/federation/sender/__init__.py                 |   2
-rw-r--r--  synapse/push/push_tools.py                            |  28
-rw-r--r--  synapse/rest/client/versions.py                       |   1
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py | 149
-rw-r--r--  tests/crypto/test_keyring.py                          |  14
-rw-r--r--  tests/events/test_presence_router.py                  |  16
-rw-r--r--  tests/federation/test_federation_catch_up.py          |  21
-rw-r--r--  tests/federation/test_federation_sender.py            |  31
-rw-r--r--  tests/handlers/test_presence.py                       |   3
-rw-r--r--  tests/handlers/test_typing.py                         |   6
-rw-r--r--  tests/handlers/test_user_directory.py                 |   7
-rw-r--r--  tests/module_api/test_api.py                          |   3
-rw-r--r--  tests/push/test_email.py                              |   1
-rw-r--r--  tests/push/test_http.py                               |   7
-rw-r--r--  tests/replication/_base.py                            |   2
-rw-r--r--  tests/replication/tcp/streams/test_federation.py      |   5
-rw-r--r--  tests/replication/test_auth.py                        |   4
-rw-r--r--  tests/replication/test_client_reader_shard.py         |  14
-rw-r--r--  tests/replication/test_federation_ack.py              |   5
-rw-r--r--  tests/replication/test_federation_sender_shard.py     |  59
-rw-r--r--  tests/replication/test_pusher_shard.py                |  15
-rw-r--r--  tests/rest/key/v2/test_remote_key_resource.py         |   5
-rw-r--r--  tests/storage/test_event_push_actions.py              |  47
-rw-r--r--  tests/utils.py                                        |   8
33 files changed, 390 insertions, 196 deletions
diff --git a/changelog.d/14255.misc b/changelog.d/14255.misc
new file mode 100644
index 0000000000..39924659c7
--- /dev/null
+++ b/changelog.d/14255.misc
@@ -0,0 +1 @@
+Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/changelog.d/14490.feature b/changelog.d/14490.feature
new file mode 100644
index 0000000000..c7cb571294
--- /dev/null
+++ b/changelog.d/14490.feature
@@ -0,0 +1 @@
+Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
diff --git a/changelog.d/14517.doc b/changelog.d/14517.doc
new file mode 100644
index 0000000000..2c9de68971
--- /dev/null
+++ b/changelog.d/14517.doc
@@ -0,0 +1 @@
+Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages.
diff --git a/changelog.d/14525.feature b/changelog.d/14525.feature
new file mode 100644
index 0000000000..c7cb571294
--- /dev/null
+++ b/changelog.d/14525.feature
@@ -0,0 +1 @@
+Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
diff --git a/changelog.d/14549.misc b/changelog.d/14549.misc
new file mode 100644
index 0000000000..d9d863dd20
--- /dev/null
+++ b/changelog.d/14549.misc
@@ -0,0 +1 @@
+Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room.
\ No newline at end of file
diff --git a/changelog.d/14568.misc b/changelog.d/14568.misc
new file mode 100644
index 0000000000..99973de1c1
--- /dev/null
+++ b/changelog.d/14568.misc
@@ -0,0 +1 @@
+Modernize unit test configuration related to workers.
diff --git a/changelog.d/14576.feature b/changelog.d/14576.feature
new file mode 100644
index 0000000000..4fe8cb2667
--- /dev/null
+++ b/changelog.d/14576.feature
@@ -0,0 +1 @@
+Advertise support for Matrix 1.5 on `/_matrix/client/versions`.
diff --git a/docs/setup/installation.md b/docs/setup/installation.md
index dcd8f17c5e..436041f8a8 100644
--- a/docs/setup/installation.md
+++ b/docs/setup/installation.md
@@ -84,7 +84,9 @@ file when you upgrade the Debian package to a later version.
 
 ##### Downstream Debian packages
 
-Andrej Shadura maintains a `matrix-synapse` package in the Debian repositories.
+Andrej Shadura maintains a
+[`matrix-synapse`](https://packages.debian.org/sid/matrix-synapse) package in
+the Debian repositories.
 For `bookworm` and `sid`, it can be installed simply with:
 
 ```sh
@@ -100,23 +102,27 @@ for information on how to use backports.
 ##### Downstream Ubuntu packages
 
 We do not recommend using the packages in the default Ubuntu repository
-at this time, as they are old and suffer from known security vulnerabilities.
+at this time, as they are [old and suffer from known security vulnerabilities](
+    https://bugs.launchpad.net/ubuntu/+source/matrix-synapse/+bug/1848709
+).
 The latest version of Synapse can be installed from [our repository](#matrixorg-packages).
 
 #### Fedora
 
-Synapse is in the Fedora repositories as `matrix-synapse`:
+Synapse is in the Fedora repositories as
+[`matrix-synapse`](https://src.fedoraproject.org/rpms/matrix-synapse):
 
 ```sh
 sudo dnf install matrix-synapse
 ```
 
-Oleg Girko provides Fedora RPMs at
+Additionally, Oleg Girko provides Fedora RPMs at
 <https://obs.infoserver.lv/project/monitor/matrix-synapse>
 
 #### OpenSUSE
 
-Synapse is in the OpenSUSE repositories as `matrix-synapse`:
+Synapse is in the OpenSUSE repositories as
+[`matrix-synapse`](https://software.opensuse.org/package/matrix-synapse):
 
 ```sh
 sudo zypper install matrix-synapse
@@ -151,7 +157,8 @@ sudo pip install py-bcrypt
 
 #### Void Linux
 
-Synapse can be found in the void repositories as 'synapse':
+Synapse can be found in the void repositories as
+['synapse'](https://github.com/void-linux/void-packages/tree/master/srcpkgs/synapse):
 
 ```sh
 xbps-install -Su
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ed15f88350..69310d9035 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -14,7 +14,6 @@
 
 import abc
 import logging
-import urllib
 from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
 
 import attr
@@ -813,31 +812,27 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
 
         results = {}
 
-        async def get_key(key_to_fetch_item: _FetchKeyRequest) -> None:
+        async def get_keys(key_to_fetch_item: _FetchKeyRequest) -> None:
             server_name = key_to_fetch_item.server_name
-            key_ids = key_to_fetch_item.key_ids
 
             try:
-                keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
+                keys = await self.get_server_verify_keys_v2_direct(server_name)
                 results[server_name] = keys
             except KeyLookupError as e:
-                logger.warning(
-                    "Error looking up keys %s from %s: %s", key_ids, server_name, e
-                )
+                logger.warning("Error looking up keys from %s: %s", server_name, e)
             except Exception:
-                logger.exception("Error getting keys %s from %s", key_ids, server_name)
+                logger.exception("Error getting keys from %s", server_name)
 
-        await yieldable_gather_results(get_key, keys_to_fetch)
+        await yieldable_gather_results(get_keys, keys_to_fetch)
         return results
 
-    async def get_server_verify_key_v2_direct(
-        self, server_name: str, key_ids: Iterable[str]
+    async def get_server_verify_keys_v2_direct(
+        self, server_name: str
     ) -> Dict[str, FetchKeyResult]:
         """
 
         Args:
-            server_name:
-            key_ids:
+            server_name: Server to request keys from
 
         Returns:
             Map from key ID to lookup result
@@ -845,57 +840,41 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
         Raises:
             KeyLookupError if there was a problem making the lookup
         """
-        keys: Dict[str, FetchKeyResult] = {}
-
-        for requested_key_id in key_ids:
-            # we may have found this key as a side-effect of asking for another.
-            if requested_key_id in keys:
-                continue
-
-            time_now_ms = self.clock.time_msec()
-            try:
-                response = await self.client.get_json(
-                    destination=server_name,
-                    path="/_matrix/key/v2/server/"
-                    + urllib.parse.quote(requested_key_id, safe=""),
-                    ignore_backoff=True,
-                    # we only give the remote server 10s to respond. It should be an
-                    # easy request to handle, so if it doesn't reply within 10s, it's
-                    # probably not going to.
-                    #
-                    # Furthermore, when we are acting as a notary server, we cannot
-                    # wait all day for all of the origin servers, as the requesting
-                    # server will otherwise time out before we can respond.
-                    #
-                    # (Note that get_json may make 4 attempts, so this can still take
-                    # almost 45 seconds to fetch the headers, plus up to another 60s to
-                    # read the response).
-                    timeout=10000,
-                )
-            except (NotRetryingDestination, RequestSendFailed) as e:
-                # these both have str() representations which we can't really improve
-                # upon
-                raise KeyLookupError(str(e))
-            except HttpResponseException as e:
-                raise KeyLookupError("Remote server returned an error: %s" % (e,))
-
-            assert isinstance(response, dict)
-            if response["server_name"] != server_name:
-                raise KeyLookupError(
-                    "Expected a response for server %r not %r"
-                    % (server_name, response["server_name"])
-                )
-
-            response_keys = await self.process_v2_response(
-                from_server=server_name,
-                response_json=response,
-                time_added_ms=time_now_ms,
+        time_now_ms = self.clock.time_msec()
+        try:
+            response = await self.client.get_json(
+                destination=server_name,
+                path="/_matrix/key/v2/server",
+                ignore_backoff=True,
+                # we only give the remote server 10s to respond. It should be an
+                # easy request to handle, so if it doesn't reply within 10s, it's
+                # probably not going to.
+                #
+                # Furthermore, when we are acting as a notary server, we cannot
+                # wait all day for all of the origin servers, as the requesting
+                # server will otherwise time out before we can respond.
+                #
+                # (Note that get_json may make 4 attempts, so this can still take
+                # almost 45 seconds to fetch the headers, plus up to another 60s to
+                # read the response).
+                timeout=10000,
             )
-            await self.store.store_server_verify_keys(
-                server_name,
-                time_now_ms,
-                ((server_name, key_id, key) for key_id, key in response_keys.items()),
+        except (NotRetryingDestination, RequestSendFailed) as e:
+            # these both have str() representations which we can't really improve
+            # upon
+            raise KeyLookupError(str(e))
+        except HttpResponseException as e:
+            raise KeyLookupError("Remote server returned an error: %s" % (e,))
+
+        assert isinstance(response, dict)
+        if response["server_name"] != server_name:
+            raise KeyLookupError(
+                "Expected a response for server %r not %r"
+                % (server_name, response["server_name"])
             )
-            keys.update(response_keys)
 
-        return keys
+        return await self.process_v2_response(
+            from_server=server_name,
+            response_json=response,
+            time_added_ms=time_now_ms,
+        )
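
Editor's note: the keyring change above replaces the per-key requests to `/_matrix/key/v2/server/<keyId>` with a single request to `/_matrix/key/v2/server`, which returns every key the remote server currently advertises. The following is a minimal sketch of the new fetch shape, assuming a simplified JSON-over-federation client; `get_json`, its parameters, and the `server_name` check mirror the diff, everything else is illustrative rather than Synapse's actual API.

```python
from typing import Any, Dict


async def fetch_all_server_keys(client: Any, server_name: str) -> Dict[str, Any]:
    """Sketch: fetch all signing keys a remote server advertises in one request."""
    response = await client.get_json(
        destination=server_name,
        path="/_matrix/key/v2/server",  # no keyId suffix any more
        ignore_backoff=True,
        timeout=10000,  # 10s, as in the diff
    )
    if response.get("server_name") != server_name:
        raise ValueError(
            "Expected a response for %r, got %r"
            % (server_name, response.get("server_name"))
        )
    # The response carries all current verify_keys; callers pick the IDs they need.
    return response.get("verify_keys", {})
```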
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fc1d8c88a7..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+        domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
             room_id
         )
         domains = [
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index edeba27a45..7ee07e4bee 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,7 +17,6 @@ from synapse.events import EventBase
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage.controllers import StorageControllers
 from synapse.storage.databases.main import DataStore
-from synapse.util.async_helpers import concurrently_execute
 
 
 async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
@@ -26,23 +25,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
 
     badge = len(invites)
 
-    room_notifs = []
-
-    async def get_room_unread_count(room_id: str) -> None:
-        room_notifs.append(
-            await store.get_unread_event_push_actions_by_room_for_user(
-                room_id,
-                user_id,
-            )
-        )
-
-    await concurrently_execute(get_room_unread_count, joins, 10)
-
-    for notifs in room_notifs:
-        # Combine the counts from all the threads.
-        notify_count = notifs.main_timeline.notify_count + sum(
-            n.notify_count for n in notifs.threads.values()
-        )
+    room_to_count = await store.get_unread_counts_by_room_for_user(user_id)
+    for room_id, notify_count in room_to_count.items():
+        # room_to_count may include rooms which the user has left;
+        # ignore those.
+        if room_id not in joins:
+            continue
 
         if notify_count == 0:
             continue
@@ -51,8 +39,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
             # return one badge count per conversation
             badge += 1
         else:
-            # increment the badge count by the number of unread messages in the room
+            # Increase badge by number of notifications in room
+            # NOTE: this includes threaded and unthreaded notifications.
             badge += notify_count
+
     return badge
 
 
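
Editor's note: the badge calculation now works from a single per-room notification count map rather than one query per joined room. Below is a rough restatement of the aggregation, assuming a plain `Dict[str, int]` like the one returned by the new store method; the helper name and signature are illustrative, not part of Synapse.

```python
from typing import Dict, Iterable


def compute_badge(
    invite_count: int,
    joined_room_ids: Iterable[str],
    room_to_notif_count: Dict[str, int],
    group_by_room: bool,
) -> int:
    """Sketch of the badge logic in get_badge_count (illustrative only)."""
    joined = set(joined_room_ids)
    badge = invite_count
    for room_id, notify_count in room_to_notif_count.items():
        # The map may contain rooms the user has since left; skip them.
        if room_id not in joined or notify_count == 0:
            continue
        if group_by_room:
            badge += 1  # one badge per room with notifications
        else:
            badge += notify_count  # total notifications (threaded + unthreaded)
    return badge


# e.g. compute_badge(1, {"!a", "!b"}, {"!a": 3, "!c": 5}, group_by_room=False) == 4
```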
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 3c0a90010b..e19c0946c0 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -77,6 +77,7 @@ class VersionsRestServlet(RestServlet):
                     "v1.2",
                     "v1.3",
                     "v1.4",
+                    "v1.5",
                 ],
                 # as per MSC1497:
                 "unstable_features": {
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b283ab0f9c..7ebe34f773 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -74,6 +74,7 @@ receipt.
 """
 
 import logging
+from collections import defaultdict
 from typing import (
     TYPE_CHECKING,
     Collection,
@@ -95,6 +96,7 @@ from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
+    PostgresEngine,
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -463,6 +465,153 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         return result
 
+    async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
+        """Get the notification count by room for a user. Only considers notifications,
+        not highlight or unread counts, and threads are currently aggregated under their room.
+
+        This function is intentionally not cached because it is called to calculate the
+        unread badge for push notifications and thus the result is expected to change.
+
+        Note that this function assumes the user is a member of the room. Because
+        summary rows are not removed when a user leaves a room, the caller must
+        filter out rooms the user has left from the result.
+
+        Returns:
+            A map of room ID to notification counts for the given user.
+        """
+        return await self.db_pool.runInteraction(
+            "get_unread_counts_by_room_for_user",
+            self._get_unread_counts_by_room_for_user_txn,
+            user_id,
+        )
+
+    def _get_unread_counts_by_room_for_user_txn(
+        self, txn: LoggingTransaction, user_id: str
+    ) -> Dict[str, int]:
+        receipt_types_clause, args = make_in_list_sql_clause(
+            self.database_engine,
+            "receipt_type",
+            (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+        )
+        args.extend([user_id, user_id])
+
+        receipts_cte = f"""
+            WITH all_receipts AS (
+                SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    {receipt_types_clause}
+                    AND user_id = ?
+                GROUP BY room_id, thread_id
+            )
+        """
+
+        receipts_joins = """
+            LEFT JOIN (
+                SELECT room_id, thread_id,
+                max_receipt_stream_ordering AS threaded_receipt_stream_ordering
+                FROM all_receipts
+                WHERE thread_id IS NOT NULL
+            ) AS threaded_receipts USING (room_id, thread_id)
+            LEFT JOIN (
+                SELECT room_id, thread_id,
+                max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
+                FROM all_receipts
+                WHERE thread_id IS NULL
+            ) AS unthreaded_receipts USING (room_id)
+        """
+
+        # First get summary counts by room / thread for the user. We use the max receipt
+        # stream ordering of both threaded & unthreaded receipts to compare against the
+        # summary table.
+        #
+        # PostgreSQL and SQLite differ in comparing scalar numerics.
+        if isinstance(self.database_engine, PostgresEngine):
+            # GREATEST ignores NULLs.
+            max_clause = """GREATEST(
+                threaded_receipt_stream_ordering,
+                unthreaded_receipt_stream_ordering
+            )"""
+        else:
+            # MAX returns NULL if any are NULL, so COALESCE to 0 first.
+            max_clause = """MAX(
+                COALESCE(threaded_receipt_stream_ordering, 0),
+                COALESCE(unthreaded_receipt_stream_ordering, 0)
+            )"""
+
+        sql = f"""
+            {receipts_cte}
+            SELECT eps.room_id, eps.thread_id, notif_count
+            FROM event_push_summary AS eps
+            {receipts_joins}
+            WHERE user_id = ?
+                AND notif_count != 0
+                AND (
+                    (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause})
+                    OR last_receipt_stream_ordering = {max_clause}
+                )
+        """
+        txn.execute(sql, args)
+
+        seen_thread_ids = set()
+        room_to_count: Dict[str, int] = defaultdict(int)
+
+        for room_id, thread_id, notif_count in txn:
+            room_to_count[room_id] += notif_count
+            seen_thread_ids.add(thread_id)
+
+        # Now get any event push actions that haven't been rotated into the summary, using
+        # the same receipt joins and the stream ordering that summaries have been rotated up to.
+        sql = f"""
+            {receipts_cte}
+            SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+            FROM event_push_actions AS epa
+            {receipts_joins}
+            WHERE user_id = ?
+                AND epa.notif = 1
+                AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
+                AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+                AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+            GROUP BY epa.room_id, epa.thread_id
+        """
+        txn.execute(sql, args)
+
+        for room_id, thread_id, notif_count in txn:
+            # Note: only count actions for threads we already have an up-to-date summary for; the rest are handled below.
+            if thread_id not in seen_thread_ids:
+                continue
+            room_to_count[room_id] += notif_count
+
+        thread_id_clause, thread_ids_args = make_in_list_sql_clause(
+            self.database_engine, "epa.thread_id", seen_thread_ids
+        )
+
+        # Finally, re-check event_push_actions for any threads not in the summary, ignoring
+        # the rotated up-to position. This handles the case where a read receipt has arrived
+        # but has not yet been rotated, meaning the summary table is out of date, so we go
+        # back to the push actions table.
+        sql = f"""
+            {receipts_cte}
+            SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+            FROM event_push_actions AS epa
+            {receipts_joins}
+            WHERE user_id = ?
+            AND NOT {thread_id_clause}
+            AND epa.notif = 1
+            AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+            AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+            GROUP BY epa.room_id
+        """
+
+        args.extend(thread_ids_args)
+        txn.execute(sql, args)
+
+        for room_id, notif_count in txn:
+            room_to_count[room_id] += notif_count
+
+        return room_to_count
+
     @cached(tree=True, max_entries=5000, iterable=True)
     async def get_unread_event_push_actions_by_room_for_user(
         self,
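
Editor's note: to make the flow of the new `_get_unread_counts_by_room_for_user_txn` easier to follow, here is a rough Python restatement of how its three result sets are merged. The row shapes are assumptions read off the SELECT lists above, and the function itself is illustrative rather than part of Synapse; the receipt filtering happens in the SQL, not here.

```python
from collections import defaultdict
from typing import Dict, Iterable, Set, Tuple


def merge_unread_counts(
    summary_rows: Iterable[Tuple[str, str, int]],     # (room_id, thread_id, notif_count)
    unrotated_rows: Iterable[Tuple[str, str, int]],   # same shape, post-rotation actions
    unsummarised_rows: Iterable[Tuple[str, int]],     # (room_id, notif_count), no summary row
) -> Dict[str, int]:
    """Sketch of how the three queries are combined into a per-room count."""
    room_to_count: Dict[str, int] = defaultdict(int)
    seen_thread_ids: Set[str] = set()

    # 1. Counts from event_push_summary, keyed by room and thread.
    for room_id, thread_id, notif_count in summary_rows:
        room_to_count[room_id] += notif_count
        seen_thread_ids.add(thread_id)

    # 2. Actions newer than the last rotation, but only for threads we already
    #    have a summary (and hence an up-to-date receipt position) for.
    for room_id, thread_id, notif_count in unrotated_rows:
        if thread_id in seen_thread_ids:
            room_to_count[room_id] += notif_count

    # 3. Remaining actions for threads with no summary row at all.
    for room_id, notif_count in unsummarised_rows:
        room_to_count[room_id] += notif_count

    return dict(room_to_count)
```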
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 63628aa6b0..f7c309cad0 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -433,7 +433,7 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
 
         async def get_json(destination, path, **kwargs):
             self.assertEqual(destination, SERVER_NAME)
-            self.assertEqual(path, "/_matrix/key/v2/server/key1")
+            self.assertEqual(path, "/_matrix/key/v2/server")
             return response
 
         self.http_client.get_json.side_effect = get_json
@@ -469,18 +469,6 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
         keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
         self.assertEqual(keys, {})
 
-    def test_keyid_containing_forward_slash(self) -> None:
-        """We should url-encode any url unsafe chars in key ids.
-
-        Detects https://github.com/matrix-org/synapse/issues/14488.
-        """
-        fetcher = ServerKeyFetcher(self.hs)
-        self.get_success(fetcher.get_keys("example.com", ["key/potato"], 0))
-
-        self.http_client.get_json.assert_called_once()
-        args, kwargs = self.http_client.get_json.call_args
-        self.assertEqual(kwargs["path"], "/_matrix/key/v2/server/key%2Fpotato")
-
 
 class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py
index 685a9a6d52..b703e4472e 100644
--- a/tests/events/test_presence_router.py
+++ b/tests/events/test_presence_router.py
@@ -126,6 +126,13 @@ class PresenceRouterTestModule:
 
 
 class PresenceRouterTestCase(FederatingHomeserverTestCase):
+    """
+    Test cases using a custom PresenceRouter
+
+    By default in test cases, federation sending is disabled. This class re-enables it
+    for the main process by setting `federation_sender_instances` to None.
+    """
+
     servlets = [
         admin.register_servlets,
         login.register_servlets,
@@ -150,6 +157,11 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
         self.sync_handler = self.hs.get_sync_handler()
         self.module_api = homeserver.get_module_api()
 
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        config["federation_sender_instances"] = None
+        return config
+
     @override_config(
         {
             "presence": {
@@ -162,7 +174,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 }
             },
-            "send_federation": True,
         }
     )
     def test_receiving_all_presence_legacy(self):
@@ -180,7 +191,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 },
             ],
-            "send_federation": True,
         }
     )
     def test_receiving_all_presence(self):
@@ -290,7 +300,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 }
             },
-            "send_federation": True,
         }
     )
     def test_send_local_online_presence_to_with_module_legacy(self):
@@ -310,7 +319,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 },
             ],
-            "send_federation": True,
         }
     )
     def test_send_local_online_presence_to_with_module(self):
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 2873b4d430..b8fee72898 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -7,13 +7,21 @@ from synapse.federation.sender import PerDestinationQueue, TransactionManager
 from synapse.federation.units import Edu
 from synapse.rest import admin
 from synapse.rest.client import login, room
+from synapse.types import JsonDict
 from synapse.util.retryutils import NotRetryingDestination
 
 from tests.test_utils import event_injection, make_awaitable
-from tests.unittest import FederatingHomeserverTestCase, override_config
+from tests.unittest import FederatingHomeserverTestCase
 
 
 class FederationCatchUpTestCases(FederatingHomeserverTestCase):
+    """
+    Test cases for catching up over federation.
+
+    By default, federation sending is disabled in test cases. This test class
+    re-enables it for the main process.
+    """
+
     servlets = [
         admin.register_servlets,
         room.register_servlets,
@@ -42,6 +50,11 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             self.record_transaction
         )
 
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        config["federation_sender_instances"] = None
+        return config
+
     async def record_transaction(self, txn, json_cb):
         if self.is_online:
             data = json_cb()
@@ -79,7 +92,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         )[0]
         return {"event_id": event_id, "stream_ordering": stream_ordering}
 
-    @override_config({"send_federation": True})
     def test_catch_up_destination_rooms_tracking(self):
         """
         Tests that we populate the `destination_rooms` table as needed.
@@ -105,7 +117,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         self.assertEqual(row_2["event_id"], event_id_2)
         self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
 
-    @override_config({"send_federation": True})
     def test_catch_up_last_successful_stream_ordering_tracking(self):
         """
         Tests that we populate the `destination_rooms` table as needed.
@@ -163,7 +174,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             "Send succeeded but not marked as last_successful_stream_ordering",
         )
 
-    @override_config({"send_federation": True})  # critical to federate
     def test_catch_up_from_blank_state(self):
         """
         Runs an overall test of federation catch-up from scratch.
@@ -260,7 +270,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
 
         return per_dest_queue, results_list
 
-    @override_config({"send_federation": True})
     def test_catch_up_loop(self):
         """
         Tests the behaviour of _catch_up_transmission_loop.
@@ -325,7 +334,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             event_5.internal_metadata.stream_ordering,
         )
 
-    @override_config({"send_federation": True})
     def test_catch_up_on_synapse_startup(self):
         """
         Tests the behaviour of get_catch_up_outstanding_destinations and
@@ -424,7 +432,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         # - all destinations are woken exactly once; they appear once in woken.
         self.assertCountEqual(woken, server_names[:-1])
 
-    @override_config({"send_federation": True})
     def test_not_latest_event(self):
         """Test that we send the latest event in the room even if its not ours."""
 
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index 01f147418b..8692d8190f 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -25,10 +25,17 @@ from synapse.rest.client import login
 from synapse.types import JsonDict, ReadReceipt
 
 from tests.test_utils import make_awaitable
-from tests.unittest import HomeserverTestCase, override_config
+from tests.unittest import HomeserverTestCase
 
 
 class FederationSenderReceiptsTestCases(HomeserverTestCase):
+    """
+    Test federation sending to update receipts.
+
+    By default, federation sending is disabled in test cases. This test class
+    re-enables it for the main process.
+    """
+
     def make_homeserver(self, reactor, clock):
         hs = self.setup_test_homeserver(
             federation_transport_client=Mock(spec=["send_transaction"]),
@@ -38,9 +45,17 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             return_value=make_awaitable({"test", "host2"})
         )
 
+        hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = (
+            hs.get_storage_controllers().state.get_current_hosts_in_room
+        )
+
         return hs
 
-    @override_config({"send_federation": True})
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        config["federation_sender_instances"] = None
+        return config
+
     def test_send_receipts(self):
         mock_send_transaction = (
             self.hs.get_federation_transport_client().send_transaction
@@ -83,7 +98,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             ],
         )
 
-    @override_config({"send_federation": True})
     def test_send_receipts_thread(self):
         mock_send_transaction = (
             self.hs.get_federation_transport_client().send_transaction
@@ -160,7 +174,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             ],
         )
 
-    @override_config({"send_federation": True})
     def test_send_receipts_with_backoff(self):
         """Send two receipts in quick succession; the second should be flushed, but
         only after 20ms"""
@@ -247,6 +260,13 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
 
 
 class FederationSenderDevicesTestCases(HomeserverTestCase):
+    """
+    Test federation sending to update devices.
+
+    By default, federation sending is disabled in test cases. This test class
+    re-enables it for the main process.
+    """
+
     servlets = [
         admin.register_servlets,
         login.register_servlets,
@@ -261,7 +281,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
 
     def default_config(self):
         c = super().default_config()
-        c["send_federation"] = True
+        # Enable federation sending on the main process.
+        c["federation_sender_instances"] = None
         return c
 
     def prepare(self, reactor, clock, hs):
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index c5981ff965..584e7b8971 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -992,7 +992,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
 
     def default_config(self):
         config = super().default_config()
-        config["send_federation"] = True
+        # Enable federation sending on the main process.
+        config["federation_sender_instances"] = None
         return config
 
     def prepare(self, reactor, clock, hs):
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 9c821b3042..efbb5a8dbb 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -200,7 +200,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
             ],
         )
 
-    @override_config({"send_federation": True})
+    # Enable federation sending on the main process.
+    @override_config({"federation_sender_instances": None})
     def test_started_typing_remote_send(self) -> None:
         self.room_members = [U_APPLE, U_ONION]
 
@@ -305,7 +306,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
         self.assertEqual(events[0], [])
         self.assertEqual(events[1], 0)
 
-    @override_config({"send_federation": True})
+    # Enable federation sending on the main process.
+    @override_config({"federation_sender_instances": None})
     def test_stopped_typing(self) -> None:
         self.room_members = [U_APPLE, U_BANANA, U_ONION]
 
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 9e39cd97e5..75fc5a17a4 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -56,7 +56,8 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         config = self.default_config()
-        config["update_user_directory"] = True
+        # Re-enables updating the user directory, as that functionality is needed below.
+        config["update_user_directory_from_worker"] = None
 
         self.appservice = ApplicationService(
             token="i_am_an_app_service",
@@ -1045,7 +1046,9 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase):
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         config = self.default_config()
-        config["update_user_directory"] = True
+        # Re-enables updating the user directory, as that functionality is needed below.
+        # It will be force-disabled later.
+        config["update_user_directory_from_worker"] = None
         hs = self.setup_test_homeserver(config=config)
 
         self.config = hs.config
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 058ca57e55..b0f3f4374d 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -336,7 +336,8 @@ class ModuleApiTestCase(HomeserverTestCase):
         # Test sending local online presence to users from the main process
         _test_sending_local_online_presence_to_local_user(self, test_with_workers=False)
 
-    @override_config({"send_federation": True})
+    # Enable federation sending on the main process.
+    @override_config({"federation_sender_instances": None})
     def test_send_local_online_presence_to_federation(self):
         """Tests that send_local_presence_to_users sends local online presence to remote users."""
         # Create a user who will send presence updates
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index fd14568f55..57b2f0536e 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -66,7 +66,6 @@ class EmailPusherTests(HomeserverTestCase):
             "riot_base_url": None,
         }
         config["public_baseurl"] = "http://aaa"
-        config["start_pushers"] = True
 
         hs = self.setup_test_homeserver(config=config)
 
diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index b383b8401f..afaafe79aa 100644
--- a/tests/push/test_http.py
+++ b/tests/push/test_http.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Dict, List, Optional, Tuple
+from typing import List, Optional, Tuple
 from unittest.mock import Mock
 
 from twisted.internet.defer import Deferred
@@ -41,11 +41,6 @@ class HTTPPusherTests(HomeserverTestCase):
     user_id = True
     hijack_auth = False
 
-    def default_config(self) -> Dict[str, Any]:
-        config = super().default_config()
-        config["start_pushers"] = True
-        return config
-
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         self.push_attempts: List[Tuple[Deferred, str, dict]] = []
 
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 3029a16dda..6a7174b333 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -307,7 +307,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         stream to the master HS.
 
         Args:
-            worker_app: Type of worker, e.g. `synapse.app.federation_sender`.
+            worker_app: Type of worker, e.g. `synapse.app.generic_worker`.
             extra_config: Any extra config to use for this instances.
             **kwargs: Options that get passed to `self.setup_test_homeserver`,
                 useful to e.g. pass some mocks for things like `federation_http_client`
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
index ffec06a0d6..bcb82c9c80 100644
--- a/tests/replication/tcp/streams/test_federation.py
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -22,9 +22,8 @@ class FederationStreamTestCase(BaseStreamTestCase):
     def _get_worker_hs_config(self) -> dict:
         # enable federation sending on the worker
         config = super()._get_worker_hs_config()
-        # TODO: make it so we don't need both of these
-        config["send_federation"] = False
-        config["worker_app"] = "synapse.app.federation_sender"
+        config["worker_name"] = "federation_sender1"
+        config["federation_sender_instances"] = ["federation_sender1"]
         return config
 
     def test_catchup(self):
diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py
index 43a16bb141..5d7a89e0c7 100644
--- a/tests/replication/test_auth.py
+++ b/tests/replication/test_auth.py
@@ -38,7 +38,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase):
 
     def _get_worker_hs_config(self) -> dict:
         config = self.default_config()
-        config["worker_app"] = "synapse.app.client_reader"
+        config["worker_app"] = "synapse.app.generic_worker"
         config["worker_replication_host"] = "testserv"
         config["worker_replication_http_port"] = "8765"
 
@@ -53,7 +53,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase):
         4. Return the final request.
 
         """
-        worker_hs = self.make_worker_hs("synapse.app.client_reader")
+        worker_hs = self.make_worker_hs("synapse.app.generic_worker")
         site = self._hs_to_site[worker_hs]
 
         channel_1 = make_request(
diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py
index 995097d72c..eb5b376534 100644
--- a/tests/replication/test_client_reader_shard.py
+++ b/tests/replication/test_client_reader_shard.py
@@ -22,20 +22,20 @@ logger = logging.getLogger(__name__)
 
 
 class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
-    """Test using one or more client readers for registration."""
+    """Test using one or more generic workers for registration."""
 
     servlets = [register.register_servlets]
 
     def _get_worker_hs_config(self) -> dict:
         config = self.default_config()
-        config["worker_app"] = "synapse.app.client_reader"
+        config["worker_app"] = "synapse.app.generic_worker"
         config["worker_replication_host"] = "testserv"
         config["worker_replication_http_port"] = "8765"
         return config
 
     def test_register_single_worker(self):
-        """Test that registration works when using a single client reader worker."""
-        worker_hs = self.make_worker_hs("synapse.app.client_reader")
+        """Test that registration works when using a single generic worker."""
+        worker_hs = self.make_worker_hs("synapse.app.generic_worker")
         site = self._hs_to_site[worker_hs]
 
         channel_1 = make_request(
@@ -64,9 +64,9 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
         self.assertEqual(channel_2.json_body["user_id"], "@user:test")
 
     def test_register_multi_worker(self):
-        """Test that registration works when using multiple client reader workers."""
-        worker_hs_1 = self.make_worker_hs("synapse.app.client_reader")
-        worker_hs_2 = self.make_worker_hs("synapse.app.client_reader")
+        """Test that registration works when using multiple generic workers."""
+        worker_hs_1 = self.make_worker_hs("synapse.app.generic_worker")
+        worker_hs_2 = self.make_worker_hs("synapse.app.generic_worker")
 
         site_1 = self._hs_to_site[worker_hs_1]
         channel_1 = make_request(
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 26b8bd512a..63b1dd40b5 100644
--- a/tests/replication/test_federation_ack.py
+++ b/tests/replication/test_federation_ack.py
@@ -25,8 +25,9 @@ from tests.unittest import HomeserverTestCase
 class FederationAckTestCase(HomeserverTestCase):
     def default_config(self) -> dict:
         config = super().default_config()
-        config["worker_app"] = "synapse.app.federation_sender"
-        config["send_federation"] = False
+        config["worker_app"] = "synapse.app.generic_worker"
+        config["worker_name"] = "federation_sender1"
+        config["federation_sender_instances"] = ["federation_sender1"]
         return config
 
     def make_homeserver(self, reactor, clock):
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 6104a55aa1..c28073b8f7 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -27,17 +27,19 @@ logger = logging.getLogger(__name__)
 
 
 class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
+    """
+    Various tests for federation sending on workers.
+
+    Federation sending is disabled by default; it is enabled in each test by
+    updating `federation_sender_instances`.
+    """
+
     servlets = [
         login.register_servlets,
         register_servlets_for_client_rest_resource,
         room.register_servlets,
     ]
 
-    def default_config(self):
-        conf = super().default_config()
-        conf["send_federation"] = False
-        return conf
-
     def test_send_event_single_sender(self):
         """Test that using a single federation sender worker correctly sends a
         new event.
@@ -46,8 +48,11 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client.put_json.return_value = make_awaitable({})
 
         self.make_worker_hs(
-            "synapse.app.federation_sender",
-            {"send_federation": False},
+            "synapse.app.generic_worker",
+            {
+                "worker_name": "federation_sender1",
+                "federation_sender_instances": ["federation_sender1"],
+            },
             federation_http_client=mock_client,
         )
 
@@ -73,11 +78,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client1 = Mock(spec=["put_json"])
         mock_client1.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender1",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender1",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client1,
         )
@@ -85,11 +92,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client2 = Mock(spec=["put_json"])
         mock_client2.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender2",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender2",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client2,
         )
@@ -136,11 +145,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client1 = Mock(spec=["put_json"])
         mock_client1.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender1",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender1",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client1,
         )
@@ -148,11 +159,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client2 = Mock(spec=["put_json"])
         mock_client2.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender2",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender2",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client2,
         )
diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index 59fea93e49..ca18ad6553 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -38,11 +38,6 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         self.other_user_id = self.register_user("otheruser", "pass")
         self.other_access_token = self.login("otheruser", "pass")
 
-    def default_config(self):
-        conf = super().default_config()
-        conf["start_pushers"] = False
-        return conf
-
     def _create_pusher_and_send_msg(self, localpart):
         # Create a user that will get push notifications
         user_id = self.register_user(localpart, "pass")
@@ -92,8 +87,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         self.make_worker_hs(
-            "synapse.app.pusher",
-            {"start_pushers": False},
+            "synapse.app.generic_worker",
+            {"worker_name": "pusher1", "pusher_instances": ["pusher1"]},
             proxied_blacklisted_http_client=http_client_mock,
         )
 
@@ -122,9 +117,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         self.make_worker_hs(
-            "synapse.app.pusher",
+            "synapse.app.generic_worker",
             {
-                "start_pushers": True,
                 "worker_name": "pusher1",
                 "pusher_instances": ["pusher1", "pusher2"],
             },
@@ -137,9 +131,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         self.make_worker_hs(
-            "synapse.app.pusher",
+            "synapse.app.generic_worker",
             {
-                "start_pushers": True,
                 "worker_name": "pusher2",
                 "pusher_instances": ["pusher1", "pusher2"],
             },
diff --git a/tests/rest/key/v2/test_remote_key_resource.py b/tests/rest/key/v2/test_remote_key_resource.py
index 7f1fba1086..2bb6e27d94 100644
--- a/tests/rest/key/v2/test_remote_key_resource.py
+++ b/tests/rest/key/v2/test_remote_key_resource.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import urllib.parse
 from io import BytesIO, StringIO
 from typing import Any, Dict, Optional, Union
 from unittest.mock import Mock
@@ -65,9 +64,7 @@ class BaseRemoteKeyResourceTestCase(unittest.HomeserverTestCase):
             self.assertTrue(ignore_backoff)
             self.assertEqual(destination, server_name)
             key_id = "%s:%s" % (signing_key.alg, signing_key.version)
-            self.assertEqual(
-                path, "/_matrix/key/v2/server/%s" % (urllib.parse.quote(key_id),)
-            )
+            self.assertEqual(path, "/_matrix/key/v2/server")
 
             response = {
                 "server_name": server_name,
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index ee48920f84..5fa8bd2d98 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -156,7 +156,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
 
         last_event_id: str
 
-        def _assert_counts(noitf_count: int, highlight_count: int) -> None:
+        def _assert_counts(notif_count: int, highlight_count: int) -> None:
             counts = self.get_success(
                 self.store.db_pool.runInteraction(
                     "get-unread-counts",
@@ -168,13 +168,22 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count,
+                    notify_count=notif_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
             )
             self.assertEqual(counts.threads, {})
 
+            aggregate_counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-aggregate-unread-counts",
+                    self.store._get_unread_counts_by_room_for_user_txn,
+                    user_id,
+                )
+            )
+            self.assertEqual(aggregate_counts[room_id], notif_count)
+
         def _create_event(highlight: bool = False) -> str:
             result = self.helper.send_event(
                 room_id,
@@ -283,7 +292,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         last_event_id: str
 
         def _assert_counts(
-            noitf_count: int,
+            notif_count: int,
             highlight_count: int,
             thread_notif_count: int,
             thread_highlight_count: int,
@@ -299,7 +308,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count,
+                    notify_count=notif_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
@@ -318,6 +327,17 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             else:
                 self.assertEqual(counts.threads, {})
 
+            aggregate_counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-aggregate-unread-counts",
+                    self.store._get_unread_counts_by_room_for_user_txn,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                aggregate_counts[room_id], notif_count + thread_notif_count
+            )
+
         def _create_event(
             highlight: bool = False, thread_id: Optional[str] = None
         ) -> str:
@@ -454,7 +474,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         last_event_id: str
 
         def _assert_counts(
-            noitf_count: int,
+            notif_count: int,
             highlight_count: int,
             thread_notif_count: int,
             thread_highlight_count: int,
@@ -470,7 +490,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count,
+                    notify_count=notif_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
@@ -489,6 +509,17 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             else:
                 self.assertEqual(counts.threads, {})
 
+            aggregate_counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-aggregate-unread-counts",
+                    self.store._get_unread_counts_by_room_for_user_txn,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                aggregate_counts[room_id], notif_count + thread_notif_count
+            )
+
         def _create_event(
             highlight: bool = False, thread_id: Optional[str] = None
         ) -> str:
@@ -646,7 +677,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             )
             return result["event_id"]
 
-        def _assert_counts(noitf_count: int, thread_notif_count: int) -> None:
+        def _assert_counts(notif_count: int, thread_notif_count: int) -> None:
             counts = self.get_success(
                 self.store.db_pool.runInteraction(
                     "get-unread-counts",
@@ -658,7 +689,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count, unread_count=0, highlight_count=0
+                    notify_count=notif_count, unread_count=0, highlight_count=0
                 ),
             )
             if thread_notif_count:
diff --git a/tests/utils.py b/tests/utils.py
index 045a8b5fa7..d76bf9716a 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -125,7 +125,8 @@ def default_config(
     """
     config_dict = {
         "server_name": name,
-        "send_federation": False,
+        # Setting this to an empty list turns off federation sending.
+        "federation_sender_instances": [],
         "media_store_path": "media",
         # the test signing key is just an arbitrary ed25519 key to keep the config
         # parser happy
@@ -183,8 +184,9 @@ def default_config(
         # rooms will fail.
         "default_room_version": DEFAULT_ROOM_VERSION,
         # disable user directory updates, because they get done in the
-        # background, which upsets the test runner.
-        "update_user_directory": False,
+        # background, which upsets the test runner. Setting this to an
+        # (obviously) fake worker name disables updating the user directory.
+        "update_user_directory_from_worker": "does_not_exist_worker_name",
         "caches": {"global_factor": 1, "sync_response_cache_duration": 0},
         "listeners": [{"port": 0, "type": "http"}],
     }
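
Editor's note: the test changes above replace the old boolean flags (`send_federation`, `start_pushers`, `update_user_directory`, plus the dedicated `synapse.app.federation_sender` / `synapse.app.pusher` / `synapse.app.client_reader` apps) with the worker-name and instance-list settings used in real deployments. A rough illustration of the three states the tests now exercise, as plain dict literals rather than a real homeserver config; the option names come from the diff, the surrounding dicts are only sketches.

```python
# 1. Federation sending disabled everywhere (the new tests/utils.py default):
disabled = {"federation_sender_instances": []}

# 2. Federation sending handled by the main process (what the per-test
#    default_config/override_config hooks now set):
main_process = {"federation_sender_instances": None}

# 3. Federation sending sharded across dedicated generic workers:
sharded_worker = {
    "worker_app": "synapse.app.generic_worker",
    "worker_name": "federation_sender1",
    "federation_sender_instances": ["federation_sender1", "federation_sender2"],
}
```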