-rw-r--r--  Cargo.lock | 4
-rw-r--r--  changelog.d/14376.misc | 1
-rw-r--r--  changelog.d/14393.bugfix | 1
-rw-r--r--  changelog.d/14400.misc | 1
-rw-r--r--  changelog.d/14403.misc | 1
-rw-r--r--  changelog.d/14404.misc | 1
-rw-r--r--  changelog.d/14412.misc | 1
-rw-r--r--  changelog.d/14449.misc | 1
-rw-r--r--  changelog.d/14452.misc | 1
-rw-r--r--  changelog.d/14468.misc | 1
-rw-r--r--  changelog.d/14476.misc | 1
-rw-r--r--  changelog.d/14479.misc | 1
-rw-r--r--  changelog.d/14487.misc | 1
-rw-r--r--  changelog.d/14490.misc | 1
-rw-r--r--  changelog.d/14499.doc | 1
-rw-r--r--  changelog.d/14500.misc | 1
-rw-r--r--  changelog.d/14501.misc | 1
-rw-r--r--  changelog.d/14502.misc | 1
-rw-r--r--  changelog.d/14503.misc | 1
-rw-r--r--  changelog.d/14504.misc | 1
-rw-r--r--  changelog.d/14505.misc | 1
-rwxr-xr-x  docker/configure_workers_and_start.py | 5
-rw-r--r--  docs/usage/administration/admin_api/README.md | 2
-rw-r--r--  docs/workers.md | 7
-rw-r--r--  mypy.ini | 4
-rw-r--r--  poetry.lock | 39
-rwxr-xr-x  scripts-dev/federation_client.py | 122
-rw-r--r--  synapse/api/errors.py | 2
-rw-r--r--  synapse/app/generic_worker.py | 103
-rw-r--r--  synapse/config/logger.py | 5
-rw-r--r--  synapse/config/ratelimiting.py | 5
-rw-r--r--  synapse/config/workers.py | 6
-rw-r--r--  synapse/crypto/keyring.py | 11
-rw-r--r--  synapse/events/__init__.py | 3
-rw-r--r--  synapse/events/builder.py | 1
-rw-r--r--  synapse/federation/sender/per_destination_queue.py | 1
-rw-r--r--  synapse/federation/transport/client.py | 11
-rw-r--r--  synapse/federation/transport/server/_base.py | 4
-rw-r--r--  synapse/handlers/e2e_keys.py | 2
-rw-r--r--  synapse/handlers/e2e_room_keys.py | 5
-rw-r--r--  synapse/handlers/federation.py | 19
-rw-r--r--  synapse/handlers/identity.py | 2
-rw-r--r--  synapse/handlers/oidc.py | 2
-rw-r--r--  synapse/handlers/presence.py | 4
-rw-r--r--  synapse/handlers/saml.py | 4
-rw-r--r--  synapse/http/additional_resource.py | 3
-rw-r--r--  synapse/http/federation/matrix_federation_agent.py | 9
-rw-r--r--  synapse/http/matrixfederationclient.py | 3
-rw-r--r--  synapse/http/proxyagent.py | 20
-rw-r--r--  synapse/http/server.py | 2
-rw-r--r--  synapse/http/site.py | 2
-rw-r--r--  synapse/logging/context.py | 39
-rw-r--r--  synapse/logging/opentracing.py | 4
-rw-r--r--  synapse/module_api/__init__.py | 7
-rw-r--r--  synapse/replication/http/_base.py | 2
-rw-r--r--  synapse/replication/http/devices.py | 67
-rw-r--r--  synapse/replication/slave/__init__.py | 13
-rw-r--r--  synapse/replication/slave/storage/__init__.py | 13
-rw-r--r--  synapse/replication/slave/storage/_slaved_id_tracker.py | 50
-rw-r--r--  synapse/replication/tcp/protocol.py | 2
-rw-r--r--  synapse/rest/admin/users.py | 5
-rw-r--r--  synapse/rest/client/keys.py | 68
-rw-r--r--  synapse/rest/client/login.py | 2
-rw-r--r--  synapse/rest/media/v1/media_repository.py | 4
-rw-r--r--  synapse/rest/media/v1/thumbnailer.py | 4
-rw-r--r--  synapse/server_notices/consent_server_notices.py | 5
-rw-r--r--  synapse/server_notices/resource_limits_server_notices.py | 12
-rw-r--r--  synapse/state/__init__.py | 8
-rw-r--r--  synapse/storage/controllers/persist_events.py | 5
-rw-r--r--  synapse/storage/database.py | 6
-rw-r--r--  synapse/storage/databases/main/account_data.py | 30
-rw-r--r--  synapse/storage/databases/main/devices.py | 38
-rw-r--r--  synapse/storage/databases/main/e2e_room_keys.py | 8
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 7
-rw-r--r--  synapse/storage/databases/main/events.py | 22
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 37
-rw-r--r--  synapse/storage/databases/main/monthly_active_users.py | 8
-rw-r--r--  synapse/storage/databases/main/push_rule.py | 17
-rw-r--r--  synapse/storage/databases/main/pusher.py | 24
-rw-r--r--  synapse/storage/databases/main/receipts.py | 18
-rw-r--r--  synapse/storage/databases/main/registration.py | 6
-rw-r--r--  synapse/storage/databases/main/room.py | 8
-rw-r--r--  synapse/storage/databases/main/user_directory.py | 9
-rw-r--r--  synapse/storage/util/id_generators.py | 13
-rw-r--r--  synapse/types.py | 4
-rw-r--r--  synapse/util/async_helpers.py | 3
-rw-r--r--  synapse/util/caches/__init__.py | 2
-rw-r--r--  synapse/util/caches/deferred_cache.py | 2
-rw-r--r--  synapse/util/caches/dictionary_cache.py | 9
-rw-r--r--  synapse/util/caches/expiringcache.py | 2
-rw-r--r--  synapse/util/caches/lrucache.py | 8
-rw-r--r--  synapse/util/ratelimitutils.py | 2
-rw-r--r--  synapse/util/threepids.py | 2
-rw-r--r--  synapse/util/wheel_timer.py | 4
-rw-r--r--  synapse/visibility.py | 29
-rw-r--r--  tests/crypto/test_keyring.py | 12
-rw-r--r--  tests/handlers/test_presence.py | 41
-rw-r--r--  tests/http/__init__.py | 7
-rw-r--r--  tests/module_api/test_api.py | 3
-rw-r--r--  tests/replication/_base.py | 7
-rw-r--r--  tests/replication/slave/storage/test_events.py | 7
-rw-r--r--  tests/replication/test_multi_media_repo.py | 14
-rw-r--r--  tests/server_notices/test_resource_limits_server_notices.py | 10
-rw-r--r--  tests/storage/test_id_generators.py | 162
-rw-r--r--  tests/test_visibility.py | 10
-rw-r--r--  tests/unittest.py | 18
106 files changed, 774 insertions(+), 577 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 8a8099bc6d..428cabc39a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -343,9 +343,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.87"
+version = "1.0.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45"
+checksum = "8e8b3801309262e8184d9687fb697586833e939767aea0dda89f5a8e650e8bd7"
 dependencies = [
  "itoa",
  "ryu",
diff --git a/changelog.d/14376.misc b/changelog.d/14376.misc
new file mode 100644
index 0000000000..2ca326fea6
--- /dev/null
+++ b/changelog.d/14376.misc
@@ -0,0 +1 @@
+Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).
diff --git a/changelog.d/14393.bugfix b/changelog.d/14393.bugfix
new file mode 100644
index 0000000000..97177bc62f
--- /dev/null
+++ b/changelog.d/14393.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.58.0 where a user with presence state 'org.matrix.msc3026.busy' would mistakenly be set to 'online' when calling `/sync` or `/events` on a worker process.
\ No newline at end of file
diff --git a/changelog.d/14400.misc b/changelog.d/14400.misc
new file mode 100644
index 0000000000..6e025329c4
--- /dev/null
+++ b/changelog.d/14400.misc
@@ -0,0 +1 @@
+Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication.
diff --git a/changelog.d/14403.misc b/changelog.d/14403.misc
new file mode 100644
index 0000000000..ff28a2712a
--- /dev/null
+++ b/changelog.d/14403.misc
@@ -0,0 +1 @@
+Faster joins: do not wait for full state when creating events to send.
diff --git a/changelog.d/14404.misc b/changelog.d/14404.misc
new file mode 100644
index 0000000000..b9ab525f2b
--- /dev/null
+++ b/changelog.d/14404.misc
@@ -0,0 +1 @@
+Faster joins: filter out non-local events when a room doesn't have its full state.
diff --git a/changelog.d/14412.misc b/changelog.d/14412.misc
new file mode 100644
index 0000000000..4da061d461
--- /dev/null
+++ b/changelog.d/14412.misc
@@ -0,0 +1 @@
+Remove duplicated type information from type hints.
diff --git a/changelog.d/14449.misc b/changelog.d/14449.misc
new file mode 100644
index 0000000000..320c0b6fae
--- /dev/null
+++ b/changelog.d/14449.misc
@@ -0,0 +1 @@
+Fix type logic in TCP replication code that prevented correctly ignoring blank commands.
\ No newline at end of file
diff --git a/changelog.d/14452.misc b/changelog.d/14452.misc
new file mode 100644
index 0000000000..cb190c0823
--- /dev/null
+++ b/changelog.d/14452.misc
@@ -0,0 +1 @@
+Enable mypy's [`strict_equality` check](https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-strict-equality) by default.
\ No newline at end of file
diff --git a/changelog.d/14468.misc b/changelog.d/14468.misc
new file mode 100644
index 0000000000..2ca326fea6
--- /dev/null
+++ b/changelog.d/14468.misc
@@ -0,0 +1 @@
+Remove old stream ID tracking code. Contributed by Nick @Beeper (@fizzadar).
diff --git a/changelog.d/14476.misc b/changelog.d/14476.misc
new file mode 100644
index 0000000000..6e025329c4
--- /dev/null
+++ b/changelog.d/14476.misc
@@ -0,0 +1 @@
+Remove the `worker_main_http_uri` configuration setting. This is now handled via internal replication.
diff --git a/changelog.d/14479.misc b/changelog.d/14479.misc
new file mode 100644
index 0000000000..08edd2f929
--- /dev/null
+++ b/changelog.d/14479.misc
@@ -0,0 +1 @@
+`scripts-dev/federation_client`: Fix routing on servers with `.well-known` files.
\ No newline at end of file
diff --git a/changelog.d/14487.misc b/changelog.d/14487.misc
new file mode 100644
index 0000000000..f6b47a1d8e
--- /dev/null
+++ b/changelog.d/14487.misc
@@ -0,0 +1 @@
+Reduce default third party invite rate limit to 216 invites per day.
diff --git a/changelog.d/14490.misc b/changelog.d/14490.misc
new file mode 100644
index 0000000000..c0a4daa885
--- /dev/null
+++ b/changelog.d/14490.misc
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 0.9 where it would fail to fetch server keys whose IDs contain a forward slash.
diff --git a/changelog.d/14499.doc b/changelog.d/14499.doc
new file mode 100644
index 0000000000..34ea57ef43
--- /dev/null
+++ b/changelog.d/14499.doc
@@ -0,0 +1 @@
+Fixed link to 'Synapse administration endpoints'.
diff --git a/changelog.d/14500.misc b/changelog.d/14500.misc
new file mode 100644
index 0000000000..c5d70a70f7
--- /dev/null
+++ b/changelog.d/14500.misc
@@ -0,0 +1 @@
+Bump pygithub from 1.56 to 1.57.
diff --git a/changelog.d/14501.misc b/changelog.d/14501.misc
new file mode 100644
index 0000000000..3c240d38b5
--- /dev/null
+++ b/changelog.d/14501.misc
@@ -0,0 +1 @@
+Bump sentry-sdk from 1.10.1 to 1.11.0.
diff --git a/changelog.d/14502.misc b/changelog.d/14502.misc
new file mode 100644
index 0000000000..86a19900f1
--- /dev/null
+++ b/changelog.d/14502.misc
@@ -0,0 +1 @@
+Bump types-pillow from 9.2.2.1 to 9.3.0.1.
diff --git a/changelog.d/14503.misc b/changelog.d/14503.misc
new file mode 100644
index 0000000000..e627d35cde
--- /dev/null
+++ b/changelog.d/14503.misc
@@ -0,0 +1 @@
+Bump towncrier from 21.9.0 to 22.8.0.
diff --git a/changelog.d/14504.misc b/changelog.d/14504.misc
new file mode 100644
index 0000000000..e228ee46a5
--- /dev/null
+++ b/changelog.d/14504.misc
@@ -0,0 +1 @@
+Bump phonenumbers from 8.12.56 to 8.13.0.
diff --git a/changelog.d/14505.misc b/changelog.d/14505.misc
new file mode 100644
index 0000000000..45d97ec461
--- /dev/null
+++ b/changelog.d/14505.misc
@@ -0,0 +1 @@
+Bump serde_json from 1.0.87 to 1.0.88.
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 62b1bab297..c1e1544536 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -213,10 +213,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "listener_resources": ["client", "replication"],
         "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
         "shared_extra_conf": {},
-        "worker_extra_conf": (
-            "worker_main_http_uri: http://127.0.0.1:%d"
-            % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
-        ),
+        "worker_extra_conf": "",
     },
     "account_data": {
         "app": "synapse.app.generic_worker",
diff --git a/docs/usage/administration/admin_api/README.md b/docs/usage/administration/admin_api/README.md
index f11e0b19a6..c00de2dd44 100644
--- a/docs/usage/administration/admin_api/README.md
+++ b/docs/usage/administration/admin_api/README.md
@@ -19,7 +19,7 @@ already on your `$PATH` depending on how Synapse was installed.
 Finding your user's `access_token` is client-dependent, but will usually be shown in the client's settings.
 
 ## Making an Admin API request
-For security reasons, we [recommend](reverse_proxy.md#synapse-administration-endpoints)
+For security reasons, we [recommend](../../../reverse_proxy.md#synapse-administration-endpoints)
 that the Admin API (`/_synapse/admin/...`) should be hidden from public view using a
 reverse proxy. This means you should typically query the Admin API from a terminal on
 the machine which runs Synapse.
diff --git a/docs/workers.md b/docs/workers.md
index 7ee8801161..27e54c5846 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -135,8 +135,8 @@ In the config file for each worker, you must specify:
    [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)).
  * If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option
    with an `http` listener.
- * If handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
-   the main process (`worker_main_http_uri`).
+ * **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for
+   the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer.
 
 For example:
 
@@ -221,7 +221,6 @@ information.
     ^/_matrix/client/(api/v1|r0|v3|unstable)/search$
 
     # Encryption requests
-    # Note that ^/_matrix/client/(r0|v3|unstable)/keys/upload/ requires `worker_main_http_uri`
     ^/_matrix/client/(r0|v3|unstable)/keys/query$
     ^/_matrix/client/(r0|v3|unstable)/keys/changes$
     ^/_matrix/client/(r0|v3|unstable)/keys/claim$
@@ -376,7 +375,7 @@ responsible for
 - persisting them to the DB, and finally
 - updating the events stream.
 
-Because load is sharded in this way, you *must* restart all worker instances when 
+Because load is sharded in this way, you *must* restart all worker instances when
 adding or removing event persisters.
 
 An `event_persister` should not be mistaken for an `event_creator`.
diff --git a/mypy.ini b/mypy.ini
index 8f1141a239..4cd61e0484 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,6 +11,7 @@ warn_unused_ignores = True
 local_partial_types = True
 no_implicit_optional = True
 disallow_untyped_defs = True
+strict_equality = True
 
 files =
   docker/,
@@ -117,6 +118,9 @@ disallow_untyped_defs = True
 [mypy-tests.state.test_profile]
 disallow_untyped_defs = True
 
+[mypy-tests.storage.test_id_generators]
+disallow_untyped_defs = True
+
 [mypy-tests.storage.test_profile]
 disallow_untyped_defs = True
 
diff --git a/poetry.lock b/poetry.lock
index 8d468adf12..d9e4803a5f 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -663,7 +663,7 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
 
 [[package]]
 name = "phonenumbers"
-version = "8.12.56"
+version = "8.13.0"
 description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
 category = "main"
 optional = false
@@ -814,15 +814,15 @@ python-versions = ">=3.6"
 
 [[package]]
 name = "pygithub"
-version = "1.56"
+version = "1.57"
 description = "Use the full Github API v3"
 category = "dev"
 optional = false
-python-versions = ">=3.6"
+python-versions = ">=3.7"
 
 [package.dependencies]
 deprecated = "*"
-pyjwt = ">=2.0"
+pyjwt = ">=2.4.0"
 pynacl = ">=1.4.0"
 requests = ">=2.14.0"
 
@@ -1076,7 +1076,7 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
 
 [[package]]
 name = "sentry-sdk"
-version = "1.10.1"
+version = "1.11.0"
 description = "Python client for Sentry (https://sentry.io)"
 category = "main"
 optional = true
@@ -1098,6 +1098,7 @@ fastapi = ["fastapi (>=0.79.0)"]
 flask = ["blinker (>=1.1)", "flask (>=0.11)"]
 httpx = ["httpx (>=0.16.0)"]
 pure-eval = ["asttokens", "executing", "pure-eval"]
+pymongo = ["pymongo (>=3.1)"]
 pyspark = ["pyspark (>=2.4.4)"]
 quart = ["blinker (>=1.1)", "quart (>=0.16.1)"]
 rq = ["rq (>=0.6)"]
@@ -1256,11 +1257,11 @@ python-versions = ">= 3.5"
 
 [[package]]
 name = "towncrier"
-version = "21.9.0"
+version = "22.8.0"
 description = "Building newsfiles for your project."
 category = "dev"
 optional = false
-python-versions = "*"
+python-versions = ">=3.7"
 
 [package.dependencies]
 click = "*"
@@ -1268,7 +1269,7 @@ click-default-group = "*"
 incremental = "*"
 jinja2 = "*"
 setuptools = "*"
-tomli = {version = "*", markers = "python_version >= \"3.6\""}
+tomli = "*"
 
 [package.extras]
 dev = ["packaging"]
@@ -1439,7 +1440,7 @@ python-versions = "*"
 
 [[package]]
 name = "types-pillow"
-version = "9.2.2.1"
+version = "9.3.0.1"
 description = "Typing stubs for Pillow"
 category = "dev"
 optional = false
@@ -2257,8 +2258,8 @@ pathspec = [
     {file = "pathspec-0.9.0.tar.gz", hash = "sha256:e564499435a2673d586f6b2130bb5b95f04a3ba06f81b8f895b651a3c76aabb1"},
 ]
 phonenumbers = [
-    {file = "phonenumbers-8.12.56-py2.py3-none-any.whl", hash = "sha256:80a7422cf0999a6f9b7a2e6cfbdbbfcc56ab5b75414dc3b805bbec91276b64a3"},
-    {file = "phonenumbers-8.12.56.tar.gz", hash = "sha256:82a4f226c930d02dcdf6d4b29e4cfd8678991fe65c2efd5fdd143557186f0868"},
+    {file = "phonenumbers-8.13.0-py2.py3-none-any.whl", hash = "sha256:dbaea9e4005a976bcf18fbe2bb87cb9cd0a3f119136f04188ac412d7741cebf0"},
+    {file = "phonenumbers-8.13.0.tar.gz", hash = "sha256:93745d7afd38e246660bb601b07deac54eeb76c8e5e43f5e83333b0383a0a1e4"},
 ]
 pillow = [
     {file = "Pillow-9.3.0-1-cp37-cp37m-win32.whl", hash = "sha256:e6ea6b856a74d560d9326c0f5895ef8050126acfdc7ca08ad703eb0081e82b74"},
@@ -2419,8 +2420,8 @@ pyflakes = [
     {file = "pyflakes-2.5.0.tar.gz", hash = "sha256:491feb020dca48ccc562a8c0cbe8df07ee13078df59813b83959cbdada312ea3"},
 ]
 pygithub = [
-    {file = "PyGithub-1.56-py3-none-any.whl", hash = "sha256:d15f13d82165306da8a68aefc0f848a6f6432d5febbff13b60a94758ce3ef8b5"},
-    {file = "PyGithub-1.56.tar.gz", hash = "sha256:80c6d85cf0f9418ffeb840fd105840af694c4f17e102970badbaf678251f2a01"},
+    {file = "PyGithub-1.57-py3-none-any.whl", hash = "sha256:5822febeac2391f1306c55a99af2bc8f86c8bf82ded000030cd02c18f31b731f"},
+    {file = "PyGithub-1.57.tar.gz", hash = "sha256:c273f252b278fb81f1769505cc6921bdb6791e1cebd6ac850cc97dad13c31ff3"},
 ]
 pygments = [
     {file = "Pygments-2.11.2-py3-none-any.whl", hash = "sha256:44238f1b60a76d78fc8ca0528ee429702aae011c265fe6a8dd8b63049ae41c65"},
@@ -2568,8 +2569,8 @@ semantic-version = [
     {file = "semantic_version-2.10.0.tar.gz", hash = "sha256:bdabb6d336998cbb378d4b9db3a4b56a1e3235701dc05ea2690d9a997ed5041c"},
 ]
 sentry-sdk = [
-    {file = "sentry-sdk-1.10.1.tar.gz", hash = "sha256:105faf7bd7b7fa25653404619ee261527266b14103fe1389e0ce077bd23a9691"},
-    {file = "sentry_sdk-1.10.1-py2.py3-none-any.whl", hash = "sha256:06c0fa9ccfdc80d7e3b5d2021978d6eb9351fa49db9b5847cf4d1f2a473414ad"},
+    {file = "sentry-sdk-1.11.0.tar.gz", hash = "sha256:e7b78a1ddf97a5f715a50ab8c3f7a93f78b114c67307785ee828ef67a5d6f117"},
+    {file = "sentry_sdk-1.11.0-py2.py3-none-any.whl", hash = "sha256:f467e6c7fac23d4d42bc83eb049c400f756cd2d65ab44f0cc1165d0c7c3d40bc"},
 ]
 service-identity = [
     {file = "service-identity-21.1.0.tar.gz", hash = "sha256:6e6c6086ca271dc11b033d17c3a8bea9f24ebff920c587da090afc9519419d34"},
@@ -2720,8 +2721,8 @@ tornado = [
     {file = "tornado-6.1.tar.gz", hash = "sha256:33c6e81d7bd55b468d2e793517c909b139960b6c790a60b7991b9b6b76fb9791"},
 ]
 towncrier = [
-    {file = "towncrier-21.9.0-py2.py3-none-any.whl", hash = "sha256:fc5a88a2a54988e3a8ed2b60d553599da8330f65722cc607c839614ed87e0f92"},
-    {file = "towncrier-21.9.0.tar.gz", hash = "sha256:9cb6f45c16e1a1eec9d0e7651165e7be60cd0ab81d13a5c96ca97a498ae87f48"},
+    {file = "towncrier-22.8.0-py2.py3-none-any.whl", hash = "sha256:3b780c3d966e1b26414830aec3d15000654b31e64e024f3e5fd128b4c6eb8f47"},
+    {file = "towncrier-22.8.0.tar.gz", hash = "sha256:7d3839b033859b45fb55df82b74cfd702431933c0cc9f287a5a7ea3e05d042cb"},
 ]
 treq = [
     {file = "treq-22.2.0-py3-none-any.whl", hash = "sha256:27d95b07c5c14be3e7b280416139b036087617ad5595be913b1f9b3ce981b9b2"},
@@ -2808,8 +2809,8 @@ types-opentracing = [
     {file = "types_opentracing-2.4.10-py3-none-any.whl", hash = "sha256:66d9cfbbdc4a6f8ca8189a15ad26f0fe41cee84c07057759c5d194e2505b84c2"},
 ]
 types-pillow = [
-    {file = "types-Pillow-9.2.2.1.tar.gz", hash = "sha256:85c139e06e1c46ec5f9c634d5c54a156b0958d5d0e8be024ed353db0c804b426"},
-    {file = "types_Pillow-9.2.2.1-py3-none-any.whl", hash = "sha256:3a6a871cade8428433a21ef459bb0a65532b87d05f9e836a0664431ce445bdcf"},
+    {file = "types-Pillow-9.3.0.1.tar.gz", hash = "sha256:f3b7cada3fa496c78d75253c6b1f07a843d625f42e5639b320a72acaff6f7cfb"},
+    {file = "types_Pillow-9.3.0.1-py3-none-any.whl", hash = "sha256:79837755fe9659f29efd1016e9903ac4a500e0c73260483f07296bd6ca47668b"},
 ]
 types-psycopg2 = [
     {file = "types-psycopg2-2.9.21.1.tar.gz", hash = "sha256:f5532cf15afdc6b5ebb1e59b7d896617217321f488fd1fbd74e7efb94decfab6"},
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index 763dd02c47..b1d5e2e616 100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -46,11 +46,12 @@ import signedjson.key
 import signedjson.types
 import srvlookup
 import yaml
+from requests import PreparedRequest, Response
 from requests.adapters import HTTPAdapter
 from urllib3 import HTTPConnectionPool
 
 # uncomment the following to enable debug logging of http requests
-# from httplib import HTTPConnection
+# from http.client import HTTPConnection
 # HTTPConnection.debuglevel = 1
 
 
@@ -103,6 +104,7 @@ def request(
     destination: str,
     path: str,
     content: Optional[str],
+    verify_tls: bool,
 ) -> requests.Response:
     if method is None:
         if content is None:
@@ -141,7 +143,6 @@ def request(
     s.mount("matrix://", MatrixConnectionAdapter())
 
     headers: Dict[str, str] = {
-        "Host": destination,
         "Authorization": authorization_headers[0],
     }
 
@@ -152,7 +153,7 @@ def request(
         method=method,
         url=dest,
         headers=headers,
-        verify=False,
+        verify=verify_tls,
         data=content,
         stream=True,
     )
@@ -203,6 +204,12 @@ def main() -> None:
     parser.add_argument("--body", help="Data to send as the body of the HTTP request")
 
     parser.add_argument(
+        "--insecure",
+        action="store_true",
+        help="Disable TLS certificate verification",
+    )
+
+    parser.add_argument(
         "path", help="request path, including the '/_matrix/federation/...' prefix."
     )
 
@@ -227,6 +234,7 @@ def main() -> None:
         args.destination,
         args.path,
         content=args.body,
+        verify_tls=not args.insecure,
     )
 
     sys.stderr.write("Status Code: %d\n" % (result.status_code,))
@@ -254,36 +262,93 @@ def read_args_from_config(args: argparse.Namespace) -> None:
 
 
 class MatrixConnectionAdapter(HTTPAdapter):
+    def send(
+        self,
+        request: PreparedRequest,
+        *args: Any,
+        **kwargs: Any,
+    ) -> Response:
+        # overrides the send() method in the base class.
+
+        # We need to look for .well-known redirects before passing the request up to
+        # HTTPAdapter.send().
+        assert isinstance(request.url, str)
+        parsed = urlparse.urlsplit(request.url)
+        server_name = parsed.netloc
+        well_known = self._get_well_known(parsed.netloc)
+
+        if well_known:
+            server_name = well_known
+
+        # replace the scheme in the uri with https, so that cert verification is done
+        # also replace the hostname if we got a .well-known result
+        request.url = urlparse.urlunsplit(
+            ("https", server_name, parsed.path, parsed.query, parsed.fragment)
+        )
+
+        # at this point we also add the host header (otherwise urllib will add one
+        # based on the `host` from the connection returned by `get_connection`,
+        # which will be wrong if there is an SRV record).
+        request.headers["Host"] = server_name
+
+        return super().send(request, *args, **kwargs)
+
+    def get_connection(
+        self, url: str, proxies: Optional[Dict[str, str]] = None
+    ) -> HTTPConnectionPool:
+        # overrides the get_connection() method in the base class
+        parsed = urlparse.urlsplit(url)
+        (host, port, ssl_server_name) = self._lookup(parsed.netloc)
+        print(
+            f"Connecting to {host}:{port} with SNI {ssl_server_name}", file=sys.stderr
+        )
+        return self.poolmanager.connection_from_host(
+            host,
+            port=port,
+            scheme="https",
+            pool_kwargs={"server_hostname": ssl_server_name},
+        )
+
     @staticmethod
-    def lookup(s: str, skip_well_known: bool = False) -> Tuple[str, int]:
-        if s[-1] == "]":
+    def _lookup(server_name: str) -> Tuple[str, int, str]:
+        """
+        Do an SRV lookup on a server name and return the host and port to connect to.
+        Given the server_name (after any .well-known lookup), return the host, port
+        and the TLS server name to send for SNI.
+        """
+        if server_name[-1] == "]":
             # ipv6 literal (with no port)
-            return s, 8448
+            return server_name, 8448, server_name
 
-        if ":" in s:
-            out = s.rsplit(":", 1)
+        if ":" in server_name:
+            # explicit port
+            out = server_name.rsplit(":", 1)
             try:
                 port = int(out[1])
             except ValueError:
-                raise ValueError("Invalid host:port '%s'" % s)
-            return out[0], port
-
-        # try a .well-known lookup
-        if not skip_well_known:
-            well_known = MatrixConnectionAdapter.get_well_known(s)
-            if well_known:
-                return MatrixConnectionAdapter.lookup(well_known, skip_well_known=True)
+                raise ValueError("Invalid host:port '%s'" % (server_name,))
+            return out[0], port, out[0]
 
         try:
-            srv = srvlookup.lookup("matrix", "tcp", s)[0]
-            return srv.host, srv.port
+            srv = srvlookup.lookup("matrix", "tcp", server_name)[0]
+            print(
+                f"SRV lookup on _matrix._tcp.{server_name} gave {srv}",
+                file=sys.stderr,
+            )
+            return srv.host, srv.port, server_name
         except Exception:
-            return s, 8448
+            return server_name, 8448, server_name
 
     @staticmethod
-    def get_well_known(server_name: str) -> Optional[str]:
-        uri = "https://%s/.well-known/matrix/server" % (server_name,)
-        print("fetching %s" % (uri,), file=sys.stderr)
+    def _get_well_known(server_name: str) -> Optional[str]:
+        if ":" in server_name:
+            # explicit port, or ipv6 literal. Either way, no .well-known
+            return None
+
+        # TODO: check for ipv4 literals
+
+        uri = f"https://{server_name}/.well-known/matrix/server"
+        print(f"fetching {uri}", file=sys.stderr)
 
         try:
             resp = requests.get(uri)
@@ -304,19 +369,6 @@ class MatrixConnectionAdapter(HTTPAdapter):
             print("Invalid response from %s: %s" % (uri, e), file=sys.stderr)
         return None
 
-    def get_connection(
-        self, url: str, proxies: Optional[Dict[str, str]] = None
-    ) -> HTTPConnectionPool:
-        parsed = urlparse.urlparse(url)
-
-        (host, port) = self.lookup(parsed.netloc)
-        netloc = "%s:%d" % (host, port)
-        print("Connecting to %s" % (netloc,), file=sys.stderr)
-        url = urlparse.urlunparse(
-            ("https", netloc, parsed.path, parsed.params, parsed.query, parsed.fragment)
-        )
-        return super().get_connection(url, proxies)
-
 
 if __name__ == "__main__":
     main()
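
The rewritten adapter above implements the Matrix server-name resolution order:
a `.well-known` delegation is consulted first (skipped when the name carries an
explicit port or is an IPv6 literal), then an SRV record, and finally the default
port 8448. A minimal standalone sketch of that order, with `fetch_well_known` and
`srv_lookup` as hypothetical stand-ins for the script's `_get_well_known` and
`srvlookup` calls:

    from typing import Callable, Optional, Tuple

    def resolve(
        server_name: str,
        fetch_well_known: Callable[[str], Optional[str]],
        srv_lookup: Callable[[str], Tuple[str, int]],
    ) -> Tuple[str, int, str]:
        """Return (host, port, TLS server name) for a Matrix server name."""
        # .well-known delegation applies only to bare server names
        # (no explicit port, no IPv6 literal).
        if ":" not in server_name:
            delegated = fetch_well_known(server_name)
            if delegated:
                server_name = delegated

        if server_name.endswith("]"):
            # IPv6 literal with no port: connect directly on 8448.
            return server_name, 8448, server_name

        if ":" in server_name:
            # Explicit port: connect there, using the bare host for SNI.
            host, _, port = server_name.rpartition(":")
            return host, int(port), host

        try:
            # SRV record: connect to its target, but keep the original
            # server name for SNI and the Host header.
            host, port = srv_lookup(server_name)
            return host, port, server_name
        except Exception:
            # No SRV record: fall back to the server name on port 8448.
            return server_name, 8448, server_name
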
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 400dd12aba..e2cfcea0f2 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -713,7 +713,7 @@ class HttpResponseException(CodeMessageException):
         set to the reason code from the HTTP response.
 
         Returns:
-            SynapseError:
+            The error converted to a SynapseError.
         """
         # try to parse the body as json, to get better errcode/msg, but
         # default to M_UNKNOWN with the HTTP status as the error text
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1d9aef45c2..74909b7d4a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -14,14 +14,12 @@
 # limitations under the License.
 import logging
 import sys
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List
 
-from twisted.internet import address
 from twisted.web.resource import Resource
 
 import synapse
 import synapse.events
-from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.api.urls import (
     CLIENT_API_PREFIX,
     FEDERATION_PREFIX,
@@ -43,8 +41,6 @@ from synapse.config.logger import setup_logging
 from synapse.config.server import ListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.server import JsonResource, OptionsResource
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.http.site import SynapseRequest
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -70,12 +66,12 @@ from synapse.rest.client import (
     versions,
     voip,
 )
-from synapse.rest.client._base import client_patterns
 from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
 from synapse.rest.client.devices import DevicesRestServlet
 from synapse.rest.client.keys import (
     KeyChangesServlet,
     KeyQueryServlet,
+    KeyUploadServlet,
     OneTimeKeyServlet,
 )
 from synapse.rest.client.register import (
@@ -132,107 +128,12 @@ from synapse.storage.databases.main.transactions import TransactionWorkerStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.databases.main.user_directory import UserDirectoryStore
 from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
-from synapse.types import JsonDict
 from synapse.util import SYNAPSE_VERSION
 from synapse.util.httpresourcetree import create_resource_tree
 
 logger = logging.getLogger("synapse.app.generic_worker")
 
 
-class KeyUploadServlet(RestServlet):
-    """An implementation of the `KeyUploadServlet` that responds to read only
-    requests, but otherwise proxies through to the master instance.
-    """
-
-    PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
-
-    def __init__(self, hs: HomeServer):
-        """
-        Args:
-            hs: server
-        """
-        super().__init__()
-        self.auth = hs.get_auth()
-        self.store = hs.get_datastores().main
-        self.http_client = hs.get_simple_http_client()
-        self.main_uri = hs.config.worker.worker_main_http_uri
-
-    async def on_POST(
-        self, request: SynapseRequest, device_id: Optional[str]
-    ) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request, allow_guest=True)
-        user_id = requester.user.to_string()
-        body = parse_json_object_from_request(request)
-
-        if device_id is not None:
-            # passing the device_id here is deprecated; however, we allow it
-            # for now for compatibility with older clients.
-            if requester.device_id is not None and device_id != requester.device_id:
-                logger.warning(
-                    "Client uploading keys for a different device "
-                    "(logged in as %s, uploading for %s)",
-                    requester.device_id,
-                    device_id,
-                )
-        else:
-            device_id = requester.device_id
-
-        if device_id is None:
-            raise SynapseError(
-                400, "To upload keys, you must pass device_id when authenticating"
-            )
-
-        if body:
-            # They're actually trying to upload something, proxy to main synapse.
-
-            # Proxy headers from the original request, such as the auth headers
-            # (in case the access token is there) and the original IP /
-            # User-Agent of the request.
-            headers: Dict[bytes, List[bytes]] = {
-                header: list(request.requestHeaders.getRawHeaders(header, []))
-                for header in (b"Authorization", b"User-Agent")
-            }
-            # Add the previous hop to the X-Forwarded-For header.
-            x_forwarded_for = list(
-                request.requestHeaders.getRawHeaders(b"X-Forwarded-For", [])
-            )
-            # we use request.client here, since we want the previous hop, not the
-            # original client (as returned by request.getClientAddress()).
-            if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
-                previous_host = request.client.host.encode("ascii")
-                # If the header exists, add to the comma-separated list of the first
-                # instance of the header. Otherwise, generate a new header.
-                if x_forwarded_for:
-                    x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
-                    x_forwarded_for.extend(x_forwarded_for[1:])
-                else:
-                    x_forwarded_for = [previous_host]
-            headers[b"X-Forwarded-For"] = x_forwarded_for
-
-            # Replicate the original X-Forwarded-Proto header. Note that
-            # XForwardedForRequest overrides isSecure() to give us the original protocol
-            # used by the client, as opposed to the protocol used by our upstream proxy
-            # - which is what we want here.
-            headers[b"X-Forwarded-Proto"] = [
-                b"https" if request.isSecure() else b"http"
-            ]
-
-            try:
-                result = await self.http_client.post_json_get_json(
-                    self.main_uri + request.uri.decode("ascii"), body, headers=headers
-                )
-            except HttpResponseException as e:
-                raise e.to_synapse_error() from e
-            except RequestSendFailed as e:
-                raise SynapseError(502, "Failed to talk to master") from e
-
-            return 200, result
-        else:
-            # Just interested in counts.
-            result = await self.store.count_e2e_one_time_keys(user_id, device_id)
-            return 200, {"one_time_key_counts": result}
-
-
 class GenericWorkerSlavedStore(
     # FIXME(#3714): We need to add UserDirectoryStore as we write directly
     # rather than going via the correct worker.
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 94d1150415..5468b963a2 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -317,10 +317,9 @@ def setup_logging(
     Set up the logging subsystem.
 
     Args:
-        config (LoggingConfig | synapse.config.worker.WorkerConfig):
-            configuration data
+        config: configuration data
 
-        use_worker_options (bool): True to use the 'worker_log_config' option
+        use_worker_options: True to use the 'worker_log_config' option
             instead of 'log_config'.
 
         logBeginner: The Twisted logBeginner to use.
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 1ed001e105..5c13fe428a 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -150,8 +150,5 @@ class RatelimitConfig(Config):
 
         self.rc_third_party_invite = RatelimitSettings(
             config.get("rc_third_party_invite", {}),
-            defaults={
-                "per_second": self.rc_message.per_second,
-                "burst_count": self.rc_message.burst_count,
-            },
+            defaults={"per_second": 0.0025, "burst_count": 5},
         )
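
For context, the changelog's figure of "216 invites per day" follows directly from
the new default rate: 0.0025 invites/second × 86,400 seconds/day = 216 invites/day,
with `burst_count` permitting short bursts of up to 5 invites before the limiter
kicks in.
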
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 88b3168cbc..913b83e174 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -162,7 +162,13 @@ class WorkerConfig(Config):
         self.worker_name = config.get("worker_name", self.worker_app)
         self.instance_name = self.worker_name or "master"
 
+        # FIXME: Remove this check after a suitable amount of time.
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        if self.worker_main_http_uri is not None:
+            logger.warning(
+                "The config option worker_main_http_uri is unused since Synapse 1.73. "
+                "It can be safely removed from your configuration."
+            )
 
         # This option is really only here to support `--manhole` command line
         # argument.
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index c88afb2986..ed15f88350 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -213,7 +213,7 @@ class Keyring:
 
     def verify_json_objects_for_server(
         self, server_and_json: Iterable[Tuple[str, dict, int]]
-    ) -> List[defer.Deferred]:
+    ) -> List["defer.Deferred[None]"]:
         """Bulk verifies signatures of json objects, bulk fetching keys as
         necessary.
 
@@ -226,10 +226,9 @@ class Keyring:
                 valid.
 
         Returns:
-            List<Deferred[None]>: for each input triplet, a deferred indicating success
-                or failure to verify each json object's signature for the given
-                server_name. The deferreds run their callbacks in the sentinel
-                logcontext.
+            For each input triplet, a deferred indicating success or failure to
+            verify each json object's signature for the given server_name. The
+            deferreds run their callbacks in the sentinel logcontext.
         """
         return [
             run_in_background(
@@ -858,7 +857,7 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
                 response = await self.client.get_json(
                     destination=server_name,
                     path="/_matrix/key/v2/server/"
-                    + urllib.parse.quote(requested_key_id),
+                    + urllib.parse.quote(requested_key_id, safe=""),
                     ignore_backoff=True,
                     # we only give the remote server 10s to respond. It should be an
                     # easy request to handle, so if it doesn't reply within 10s, it's
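
The `safe=""` argument is the substance of the key-fetching fix: by default
`urllib.parse.quote` treats "/" as safe and leaves it unescaped, so a key ID
containing a forward slash was spliced into the request path unencoded. A quick
illustration (the key ID is hypothetical):

    from urllib.parse import quote

    key_id = "ed25519:key/with/slash"  # hypothetical key ID containing slashes
    print(quote(key_id))           # ed25519%3Akey/with/slash  - slashes survive
    print(quote(key_id, safe=""))  # ed25519%3Akey%2Fwith%2Fslash
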
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 030c3ca408..8aca9a3ab9 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -597,8 +597,7 @@ def _event_type_from_format_version(
         format_version: The event format version
 
     Returns:
-        type: A type that can be initialized as per the initializer of
-        `FrozenEvent`
+        A type that can be initialized as per the initializer of `FrozenEvent`
     """
 
     if format_version == EventFormatVersions.ROOM_V1_V2:
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index e2ee10dd3d..d62906043f 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -128,6 +128,7 @@ class EventBuilder:
                 state_filter=StateFilter.from_types(
                     auth_types_for_event(self.room_version, self)
                 ),
+                await_full_state=False,
             )
             auth_event_ids = self._event_auth_handler.compute_auth_events(
                 self, state_ids
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 084c45a95c..3ae5e8634c 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -505,6 +505,7 @@ class PerDestinationQueue:
                     new_pdus = await filter_events_for_server(
                         self._storage_controllers,
                         self._destination,
+                        self._server_name,
                         new_pdus,
                         redact=False,
                     )
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index cd39d4d111..a3cfc701cd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -280,12 +280,11 @@ class TransportLayerClient:
         Note that this does not append any events to any graphs.
 
         Args:
-            destination (str): address of remote homeserver
-            room_id (str): room to join/leave
-            user_id (str): user to be joined/left
-            membership (str): one of join/leave
-            params (dict[str, str|Iterable[str]]): Query parameters to include in the
-                request.
+            destination: address of remote homeserver
+            room_id: room to join/leave
+            user_id: user to be joined/left
+            membership: one of join/leave
+            params: Query parameters to include in the request.
 
         Returns:
             Succeeds when we get a 2xx HTTP response. The result
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 1db8009d6c..cdaf0d5de7 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -224,10 +224,10 @@ class BaseFederationServlet:
 
         With arguments:
 
-            origin (unicode|None): The authenticated server_name of the calling server,
+            origin (str|None): The authenticated server_name of the calling server,
                 unless REQUIRE_AUTH is set to False and authentication failed.
 
-            content (unicode|None): decoded json body of the request. None if the
+            content (str|None): decoded json body of the request. None if the
                 request was a GET.
 
             query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index a9912c467d..bf1221f523 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -870,7 +870,7 @@ class E2eKeysHandler:
         - signatures of the user's master key by the user's devices.
 
         Args:
-            user_id (string): the user uploading the keys
+            user_id: the user uploading the keys
             signatures (dict[string, dict]): map of devices to signed keys
 
         Returns:
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 28dc08c22a..83f53ceb88 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -377,8 +377,9 @@ class E2eRoomKeysHandler:
         """Deletes a given version of the user's e2e_room_keys backup
 
         Args:
-            user_id(str): the user whose current backup version we're deleting
-            version(str): the version id of the backup being deleted
+            user_id: the user whose current backup version we're deleting
+            version: Optional. The version ID of the backup we're deleting.
+                If missing, we delete the current backup version info.
         Raises:
             NotFoundError: if this backup version doesn't exist
         """
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5fc3b8bc8c..d92582fd5c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -379,6 +379,7 @@ class FederationHandler:
             filtered_extremities = await filter_events_for_server(
                 self._storage_controllers,
                 self.server_name,
+                self.server_name,
                 events_to_check,
                 redact=False,
                 check_history_visibility_only=True,
@@ -1231,7 +1232,9 @@ class FederationHandler:
     async def on_backfill_request(
         self, origin: str, room_id: str, pdu_list: List[str], limit: int
     ) -> List[EventBase]:
-        await self._event_auth_handler.assert_host_in_room(room_id, origin)
+        # We allow partially joined rooms since in this case we are filtering out
+        # non-local events in `filter_events_for_server`.
+        await self._event_auth_handler.assert_host_in_room(room_id, origin, True)
 
         # Synapse asks for 100 events per backfill request. Do not allow more.
         limit = min(limit, 100)
@@ -1252,7 +1255,7 @@ class FederationHandler:
         )
 
         events = await filter_events_for_server(
-            self._storage_controllers, origin, events
+            self._storage_controllers, origin, self.server_name, events
         )
 
         return events
@@ -1283,7 +1286,7 @@ class FederationHandler:
         await self._event_auth_handler.assert_host_in_room(event.room_id, origin)
 
         events = await filter_events_for_server(
-            self._storage_controllers, origin, [event]
+            self._storage_controllers, origin, self.server_name, [event]
         )
         event = events[0]
         return event
@@ -1296,7 +1299,9 @@ class FederationHandler:
         latest_events: List[str],
         limit: int,
     ) -> List[EventBase]:
-        await self._event_auth_handler.assert_host_in_room(room_id, origin)
+        # We allow partially joined rooms since in this case we are filtering out
+        # non-local events in `filter_events_for_server`.
+        await self._event_auth_handler.assert_host_in_room(room_id, origin, True)
 
         # Only allow up to 20 events to be retrieved per request.
         limit = min(limit, 20)
@@ -1309,7 +1314,7 @@ class FederationHandler:
         )
 
         missing_events = await filter_events_for_server(
-            self._storage_controllers, origin, missing_events
+            self._storage_controllers, origin, self.server_name, missing_events
         )
 
         return missing_events
@@ -1596,8 +1601,8 @@ class FederationHandler:
         Fetch the complexity of a remote room over federation.
 
         Args:
-            remote_room_hosts (list[str]): The remote servers to ask.
-            room_id (str): The room ID to ask about.
+            remote_room_hosts: The remote servers to ask.
+            room_id: The room ID to ask about.
 
         Returns:
             Dict contains the complexity
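
All of the `filter_events_for_server` call sites above now pass two server names:
the requesting server and our own. Together with the relaxed
`assert_host_in_room(..., True)`, that is what lets partially joined rooms serve
backfill: events whose visibility cannot yet be checked against full state are
simply dropped. A hedged sketch of the updated call shape (parameter roles
inferred from the diff):

    events = await filter_events_for_server(
        self._storage_controllers,
        origin,            # the remote server asking for events
        self.server_name,  # our own name, used to filter out non-local
                           # events while the room state is only partial
        events,
        redact=False,
    )
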
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 93d09e9939..848e46eb9b 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -711,7 +711,7 @@ class IdentityHandler:
             inviter_display_name: The current display name of the
                 inviter.
             inviter_avatar_url: The URL of the inviter's avatar.
-            id_access_token (str): The access token to authenticate to the identity
+            id_access_token: The access token to authenticate to the identity
                 server with
 
         Returns:
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 867973dcca..41c675f408 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -787,7 +787,7 @@ class OidcProvider:
                 Must include an ``access_token`` field.
 
         Returns:
-            UserInfo: an object representing the user.
+            an object representing the user.
         """
         logger.debug("Using the OAuth2 access_token to request userinfo")
         metadata = await self.load_metadata()
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0066d63987..cf08737d11 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -201,7 +201,7 @@ class BasePresenceHandler(abc.ABC):
         """Get the current presence state for multiple users.
 
         Returns:
-            dict: `user_id` -> `UserPresenceState`
+            A mapping of `user_id` -> `UserPresenceState`
         """
         states = {}
         missing = []
@@ -478,7 +478,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
             return _NullContextManager()
 
         prev_state = await self.current_state_for_user(user_id)
-        if prev_state != PresenceState.BUSY:
+        if prev_state.state != PresenceState.BUSY:
             # We set state here but pass ignore_status_msg = True as we don't want to
             # cause the status message to be cleared.
             # Note that this causes last_active_ts to be incremented which is not
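
The small change above (`prev_state` → `prev_state.state`) is the busy-presence
bugfix: `prev_state` is a `UserPresenceState` object, so comparing it directly
against the string constant `PresenceState.BUSY` was always unequal, and busy
users were bumped back to online on `/sync`. This is exactly the class of mistake
the newly enabled mypy `strict_equality` check is meant to catch. The shape of
the bug in miniature (types simplified to a dataclass):

    from dataclasses import dataclass

    @dataclass
    class UserPresenceState:
        state: str

    BUSY = "org.matrix.msc3026.busy"

    prev_state = UserPresenceState(state=BUSY)
    assert prev_state != BUSY               # object vs. str: never equal (the bug)
    assert not (prev_state.state != BUSY)   # the intended comparison
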
diff --git a/synapse/handlers/saml.py b/synapse/handlers/saml.py
index 9602f0d0bb..874860d461 100644
--- a/synapse/handlers/saml.py
+++ b/synapse/handlers/saml.py
@@ -441,7 +441,7 @@ class DefaultSamlMappingProvider:
             client_redirect_url: where the client wants to redirect to
 
         Returns:
-            dict: A dict containing new user attributes. Possible keys:
+            A dict containing new user attributes. Possible keys:
                 * mxid_localpart (str): Required. The localpart of the user's mxid
                 * displayname (str): The displayname of the user
                 * emails (list[str]): Any emails for the user
@@ -483,7 +483,7 @@ class DefaultSamlMappingProvider:
         Args:
             config: A dictionary containing configuration options for this provider
         Returns:
-            SamlConfig: A custom config object for this module
+            A custom config object for this module
         """
         # Parse config options and use defaults where necessary
         mxid_source_attribute = config.get("mxid_source_attribute", "uid")
diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py
index 6a9f6635d2..8729630581 100644
--- a/synapse/http/additional_resource.py
+++ b/synapse/http/additional_resource.py
@@ -45,8 +45,7 @@ class AdditionalResource(DirectServeJsonResource):
 
         Args:
             hs: homeserver
-            handler ((twisted.web.server.Request) -> twisted.internet.defer.Deferred):
-                function to be called to handle the request.
+            handler: function to be called to handle the request.
         """
         super().__init__()
         self._handler = handler
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 2f0177f1e2..0359231e7d 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -155,11 +155,10 @@ class MatrixFederationAgent:
                 a file for a file upload).  Or None if the request is to have
                 no body.
         Returns:
-            Deferred[twisted.web.iweb.IResponse]:
-                fires when the header of the response has been received (regardless of the
-                response status code). Fails if there is any problem which prevents that
-                response from being received (including problems that prevent the request
-                from being sent).
+            A deferred which fires when the header of the response has been received
+            (regardless of the response status code). Fails if there is any problem
+            which prevents that response from being received (including problems that
+            prevent the request from being sent).
         """
         # We use urlparse as that will set `port` to None if there is no
         # explicit port.
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3c35b1d2c7..b92f1d3d1a 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -951,8 +951,7 @@ class MatrixFederationHttpClient:
 
             args: query params
         Returns:
-            dict|list: Succeeds when we get a 2xx HTTP response. The
-            result will be the decoded JSON body.
+            Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body.
 
         Raises:
             HttpResponseException: If we get an HTTP response code >= 300
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 1f8227896f..18899bc6d1 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -34,7 +34,7 @@ from twisted.web.client import (
 )
 from twisted.web.error import SchemeNotSupported
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS
+from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse
 
 from synapse.http import redact_uri
 from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
@@ -134,7 +134,7 @@ class ProxyAgent(_AgentBase):
         uri: bytes,
         headers: Optional[Headers] = None,
         bodyProducer: Optional[IBodyProducer] = None,
-    ) -> defer.Deferred:
+    ) -> "defer.Deferred[IResponse]":
         """
         Issue a request to the server indicated by the given uri.
 
@@ -157,17 +157,17 @@ class ProxyAgent(_AgentBase):
                 a file upload). Or, None if the request is to have no body.
 
         Returns:
-            Deferred[IResponse]: completes when the header of the response has
-                 been received (regardless of the response status code).
+            A deferred which completes when the header of the response has
+            been received (regardless of the response status code).
 
-                 Can fail with:
-                    SchemeNotSupported: if the uri is not http or https
+            Can fail with:
+                SchemeNotSupported: if the uri is not http or https
 
-                    twisted.internet.error.TimeoutError if the server we are connecting
-                        to (proxy or destination) does not accept a connection before
-                        connectTimeout.
+                twisted.internet.error.TimeoutError if the server we are connecting
+                    to (proxy or destination) does not accept a connection before
+                    connectTimeout.
 
-                    ... other things too.
+                ... other things too.
         """
         uri = uri.strip()
         if not _VALID_URI.match(uri):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index b26e34bceb..051a1899a0 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -267,7 +267,7 @@ class HttpServer(Protocol):
                 request. The first argument will be the request object and
                 subsequent arguments will be any matched groups from the regex.
                 This should return either tuple of (code, response), or None.
-            servlet_classname (str): The name of the handler to be used in prometheus
+            servlet_classname: The name of the handler to be used in prometheus
                 and opentracing logs.
         """
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 3dbd541fed..6a1dbf7f33 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -400,7 +400,7 @@ class SynapseRequest(Request):
         be sure to call finished_processing.
 
         Args:
-            servlet_name (str): the name of the servlet which will be
+            servlet_name: the name of the servlet which will be
                 processing this request. This is used in the metrics.
 
                 It is possible to update this afterwards by updating
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 6a08ffed64..f62bea968f 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -117,8 +117,7 @@ class ContextResourceUsage:
         """Create a new ContextResourceUsage
 
         Args:
-            copy_from (ContextResourceUsage|None): if not None, an object to
-                copy stats from
+            copy_from: if not None, an object to copy stats from
         """
         if copy_from is None:
             self.reset()
@@ -162,7 +161,7 @@ class ContextResourceUsage:
         """Add another ContextResourceUsage's stats to this one's.
 
         Args:
-            other (ContextResourceUsage): the other resource usage object
+            other: the other resource usage object
         """
         self.ru_utime += other.ru_utime
         self.ru_stime += other.ru_stime
@@ -342,7 +341,7 @@ class LoggingContext:
         called directly.
 
         Returns:
-            LoggingContext: the current logging context
+            The current logging context
         """
         warnings.warn(
             "synapse.logging.context.LoggingContext.current_context() is deprecated "
@@ -362,7 +361,8 @@ class LoggingContext:
         called directly.
 
         Args:
-            context(LoggingContext): The context to activate.
+            context: The context to activate.
+
         Returns:
             The context that was previously active
         """
@@ -474,8 +474,7 @@ class LoggingContext:
         """Get resources used by this logcontext so far.
 
         Returns:
-            ContextResourceUsage: a *copy* of the object tracking resource
-                usage so far
+            A *copy* of the object tracking resource usage so far
         """
         # we always return a copy, for consistency
         res = self._resource_usage.copy()
@@ -663,7 +662,8 @@ def current_context() -> LoggingContextOrSentinel:
 def set_current_context(context: LoggingContextOrSentinel) -> LoggingContextOrSentinel:
     """Set the current logging context in thread local storage
     Args:
-        context(LoggingContext): The context to activate.
+        context: The context to activate.
+
     Returns:
         The context that was previously active
     """
@@ -700,7 +700,7 @@ def nested_logging_context(suffix: str) -> LoggingContext:
         suffix: suffix to add to the parent context's 'name'.
 
     Returns:
-        LoggingContext: new logging context.
+        A new logging context.
     """
     curr_context = current_context()
     if not curr_context:
@@ -898,20 +898,19 @@ def defer_to_thread(
     on it.
 
     Args:
-        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
-            the Deferred will be invoked, and whose threadpool we should use for the
-            function.
+        reactor: The reactor in whose main thread the Deferred will be invoked,
+            and whose threadpool we should use for the function.
 
             Normally this will be hs.get_reactor().
 
-        f (callable): The function to call.
+        f: The function to call.
 
         args: positional arguments to pass to f.
 
         kwargs: keyword arguments to pass to f.
 
     Returns:
-        Deferred: A Deferred which fires a callback with the result of `f`, or an
+        A Deferred which fires a callback with the result of `f`, or an
             errback if `f` throws an exception.
     """
     return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
@@ -939,20 +938,20 @@ def defer_to_threadpool(
     on it.
 
     Args:
-        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
-            the Deferred will be invoked. Normally this will be hs.get_reactor().
+        reactor: The reactor in whose main thread the Deferred will be invoked.
+            Normally this will be hs.get_reactor().
 
-        threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
-            running `f`. Normally this will be hs.get_reactor().getThreadPool().
+        threadpool: The threadpool to use for running `f`. Normally this will be
+            hs.get_reactor().getThreadPool().
 
-        f (callable): The function to call.
+        f: The function to call.
 
         args: positional arguments to pass to f.
 
         kwargs: keyword arguments to pass to f.
 
     Returns:
-        Deferred: A Deferred which fires a callback with the result of `f`, or an
+        A Deferred which fires a callback with the result of `f`, or an
             errback if `f` throws an exception.
     """
     curr_context = current_context()
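
Both helpers above have the same shape; a minimal usage sketch (the blocking
function and its caller are hypothetical, not part of this changeset):

    from synapse.logging.context import defer_to_thread

    def _compute_digest(data: bytes) -> int:
        # hypothetical blocking, CPU-bound work
        return sum(data) % 256

    async def digest_off_main_thread(hs, data: bytes) -> int:
        # The returned Deferred fires with the result of the function,
        # or errbacks if it raises.
        return await defer_to_thread(hs.get_reactor(), _compute_digest, data)
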
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 8ce5a2a338..b69060854f 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -721,7 +721,7 @@ def inject_header_dict(
         destination: address of entity receiving the span context. Must be given unless
             check_destination is False. The context will only be injected if the
             destination matches the opentracing whitelist
-        check_destination (bool): If false, destination will be ignored and the context
+        check_destination: If false, destination will be ignored and the context
             will always be injected.
 
     Note:
@@ -780,7 +780,7 @@ def get_active_span_text_map(destination: Optional[str] = None) -> Dict[str, str
         destination: the name of the remote server.
 
     Returns:
-        dict: the active span's context if opentracing is enabled, otherwise empty.
+        the active span's context if opentracing is enabled, otherwise empty.
     """
 
     if destination and not whitelisted_homeserver(destination):
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 30e689d00d..1adc1fd64f 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -787,7 +787,7 @@ class ModuleApi:
         Added in Synapse v0.25.0.
 
         Args:
-            access_token(str): access token
+            access_token: access token
 
         Returns:
             twisted.internet.defer.Deferred - resolves once the access token
@@ -832,7 +832,7 @@ class ModuleApi:
             **kwargs: named args to be passed to func
 
         Returns:
-            Deferred[object]: result of func
+            Result of func
         """
         # type-ignore: See https://github.com/python/mypy/issues/8862
         return defer.ensureDeferred(
@@ -924,8 +924,7 @@ class ModuleApi:
                 to represent 'any') of the room state to acquire.
 
         Returns:
-            twisted.internet.defer.Deferred[list(synapse.events.FrozenEvent)]:
-                The filtered state events in the room.
+            The filtered state events in the room.
         """
         state_ids = yield defer.ensureDeferred(
             self._storage_controllers.state.get_current_state_ids(
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 5e661f8c73..3f4d3fc51a 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -153,7 +153,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         argument list.
 
         Returns:
-            dict: If POST/PUT request then dictionary must be JSON serialisable,
+            If POST/PUT request then dictionary must be JSON serialisable,
             otherwise must be appropriate for adding as query args.
         """
         return {}
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 3d63645726..c21629def8 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Tuple
 from twisted.web.server import Request
 
 from synapse.http.server import HttpServer
+from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
 from synapse.types import JsonDict
 
@@ -78,5 +79,71 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
         return 200, user_devices
 
 
+class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
+    """Ask master to upload keys for the user and send them out over federation to
+    update other servers.
+
+    For now, only the master is permitted to handle key upload requests;
+    any worker can handle key query requests (since they're read-only).
+
+    Calls e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
+    the main process to accomplish this.
+
+    Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
+    Request format (borrowed and expanded from KeyUploadServlet):
+
+        POST /_synapse/replication/upload_keys_for_user
+
+    {
+        "user_id": "<user_id>",
+        "device_id": "<device_id>",
+        "keys": {
+            ....this part can be found in KeyUploadServlet in rest/client/keys.py....
+        }
+    }
+
+    Response is equivalent to that of `/_matrix/client/v3/keys/upload`, as found in KeyUploadServlet.
+
+    """
+
+    NAME = "upload_keys_for_user"
+    PATH_ARGS = ()
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
+        self.store = hs.get_datastores().main
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, device_id: str, keys: JsonDict
+    ) -> JsonDict:
+
+        return {
+            "user_id": user_id,
+            "device_id": device_id,
+            "keys": keys,
+        }
+
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request
+    ) -> Tuple[int, JsonDict]:
+        content = parse_json_object_from_request(request)
+
+        user_id = content["user_id"]
+        device_id = content["device_id"]
+        keys = content["keys"]
+
+        results = await self.e2e_keys_handler.upload_keys_for_user(
+            user_id, device_id, keys
+        )
+
+        return 200, results
+
+
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
+    ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
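
For illustration, a worker-side caller of the new endpoint would look roughly
like this (a sketch; `hs` is the worker's HomeServer and `body` is whatever the
client sent to /keys/upload):

    async def upload_via_replication(hs, user_id: str, device_id: str, body: dict):
        # make_client() returns a callable that serialises its keyword
        # arguments via _serialize_payload and POSTs them to
        # /_synapse/replication/upload_keys_for_user on the main process,
        # where _handle_request() calls upload_keys_for_user.
        uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
        return await uploader(user_id=user_id, device_id=device_id, keys=body)
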
diff --git a/synapse/replication/slave/__init__.py b/synapse/replication/slave/__init__.py
deleted file mode 100644
index f43a360a80..0000000000
--- a/synapse/replication/slave/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/synapse/replication/slave/storage/__init__.py b/synapse/replication/slave/storage/__init__.py
deleted file mode 100644
index f43a360a80..0000000000
--- a/synapse/replication/slave/storage/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
deleted file mode 100644
index 8f3f953ed4..0000000000
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import List, Optional, Tuple
-
-from synapse.storage.database import LoggingDatabaseConnection
-from synapse.storage.util.id_generators import AbstractStreamIdTracker, _load_current_id
-
-
-class SlavedIdTracker(AbstractStreamIdTracker):
-    """Tracks the "current" stream ID of a stream with a single writer.
-
-    See `AbstractStreamIdTracker` for more details.
-
-    Note that this class does not work correctly when there are multiple
-    writers.
-    """
-
-    def __init__(
-        self,
-        db_conn: LoggingDatabaseConnection,
-        table: str,
-        column: str,
-        extra_tables: Optional[List[Tuple[str, str]]] = None,
-        step: int = 1,
-    ):
-        self.step = step
-        self._current = _load_current_id(db_conn, table, column, step)
-        if extra_tables:
-            for table, column in extra_tables:
-                self.advance(None, _load_current_id(db_conn, table, column))
-
-    def advance(self, instance_name: Optional[str], new_id: int) -> None:
-        self._current = (max if self.step > 0 else min)(self._current, new_id)
-
-    def get_current_token(self) -> int:
-        return self._current
-
-    def get_current_token_for_writer(self, instance_name: str) -> int:
-        return self.get_current_token()
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 7763ffb2d0..56a5c21910 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -245,7 +245,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self._parse_and_dispatch_line(line)
 
     def _parse_and_dispatch_line(self, line: bytes) -> None:
-        if line.strip() == "":
+        if line.strip() == b"":
             # Ignore blank lines
             return
 
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 1951b8a9f2..6e0c44be2a 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -903,8 +903,9 @@ class PushersRestServlet(RestServlet):
         @user:server/pushers
 
     Returns:
-        pushers: Dictionary containing pushers information.
-        total: Number of pushers in dictionary `pushers`.
+        A dictionary with keys:
+            pushers: Dictionary containing pushers information.
+            total: Number of pushers in dictionary `pushers`.
     """
 
     PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/pushers$")
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index f653d2a3e1..ee038c7192 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -27,6 +27,7 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag
+from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
 from synapse.rest.client._base import client_patterns, interactive_auth_handler
 from synapse.types import JsonDict, StreamToken
 from synapse.util.cancellation import cancellable
@@ -43,24 +44,48 @@ class KeyUploadServlet(RestServlet):
     Content-Type: application/json
 
     {
-      "device_keys": {
-        "user_id": "<user_id>",
-        "device_id": "<device_id>",
-        "valid_until_ts": <millisecond_timestamp>,
-        "algorithms": [
-          "m.olm.curve25519-aes-sha2",
-        ]
-        "keys": {
-          "<algorithm>:<device_id>": "<key_base64>",
+        "device_keys": {
+            "user_id": "<user_id>",
+            "device_id": "<device_id>",
+            "valid_until_ts": <millisecond_timestamp>,
+            "algorithms": [
+                "m.olm.curve25519-aes-sha2",
+            ],
+            "keys": {
+                "<algorithm>:<device_id>": "<key_base64>",
+            },
+            "signatures:" {
+                "<user_id>" {
+                    "<algorithm>:<device_id>": "<signature_base64>"
+                }
+            }
+        },
+        "fallback_keys": {
+            "<algorithm>:<device_id>": "<key_base64>",
+            "signed_<algorithm>:<device_id>": {
+                "fallback": true,
+                "key": "<key_base64>",
+                "signatures": {
+                    "<user_id>": {
+                        "<algorithm>:<device_id>": "<key_base64>"
+                    }
+                }
+            }
+        },
+        "one_time_keys": {
+            "<algorithm>:<key_id>": "<key_base64>"
         },
-        "signatures:" {
-          "<user_id>" {
-            "<algorithm>:<device_id>": "<signature_base64>"
-      } } },
-      "one_time_keys": {
-        "<algorithm>:<key_id>": "<key_base64>"
-      },
     }
+
+    Response, e.g.:
+
+    {
+        "one_time_key_counts": {
+            "curve25519": 10,
+            "signed_curve25519": 20
+        }
+    }
+
     """
 
     PATTERNS = client_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -71,6 +96,13 @@ class KeyUploadServlet(RestServlet):
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
         self.device_handler = hs.get_device_handler()
 
+        if hs.config.worker.worker_app is None:
+            # we are the main process: handle key uploads directly
+            self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
+        else:
+            # we are a worker: proxy key uploads to the main process
+            self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
+
     async def on_POST(
         self, request: SynapseRequest, device_id: Optional[str]
     ) -> Tuple[int, JsonDict]:
@@ -109,8 +141,8 @@ class KeyUploadServlet(RestServlet):
                 400, "To upload keys, you must pass device_id when authenticating"
             )
 
-        result = await self.e2e_keys_handler.upload_keys_for_user(
-            user_id, device_id, body
+        result = await self.key_uploader(
+            user_id=user_id, device_id=device_id, keys=body
         )
         return 200, result
 
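Note that the call site switches from positional to keyword arguments: the
callable returned by `make_client` accepts keyword arguments only (it forwards
them to `_serialize_payload`), so both branches of `key_uploader` must be
invoked the same way. Condensed, the pattern is (an illustrative sketch, with
a hypothetical wrapper name):

    async def on_post_key_upload(hs, user_id: str, device_id: str, body: dict):
        # One callable, two implementations: the local handler on the main
        # process, or a replication client on a worker.
        if hs.config.worker.worker_app is None:
            key_uploader = hs.get_e2e_keys_handler().upload_keys_for_user
        else:
            key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
        return await key_uploader(user_id=user_id, device_id=device_id, keys=body)
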
diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index 05706b598c..8adced41e5 100644
--- a/synapse/rest/client/login.py
+++ b/synapse/rest/client/login.py
@@ -350,7 +350,7 @@ class LoginRestServlet(RestServlet):
             auth_provider_session_id: The session ID got during login from the SSO IdP.
 
         Returns:
-            result: Dictionary of account information after successful login.
+            Dictionary of account information after successful login.
         """
 
         # Before we actually log them in we check if they've already logged in
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 328c0c5477..40b0d39eb2 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -344,8 +344,8 @@ class MediaRepository:
         download from remote server.
 
         Args:
-            server_name (str): Remote server_name where the media originated.
-            media_id (str): The media ID of the content (as defined by the
+            server_name: Remote server_name where the media originated.
+            media_id: The media ID of the content (as defined by the
                 remote server).
 
         Returns:
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index 9b93b9b4f6..a48a4de92a 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -138,7 +138,7 @@ class Thumbnailer:
         """Rescales the image to the given dimensions.
 
         Returns:
-            BytesIO: the bytes of the encoded image ready to be written to disk
+            The bytes of the encoded image ready to be written to disk
         """
         with self._resize(width, height) as scaled:
             return self._encode_image(scaled, output_type)
@@ -155,7 +155,7 @@ class Thumbnailer:
             max_height: The largest possible height.
 
         Returns:
-            BytesIO: the bytes of the encoded image ready to be written to disk
+            The bytes of the encoded image ready to be written to disk
         """
         if width * self.height > height * self.width:
             scaled_width = width
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 698ca742ed..94025ba41f 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -113,9 +113,8 @@ def copy_with_str_subst(x: Any, substitutions: Any) -> Any:
     """Deep-copy a structure, carrying out string substitutions on any strings
 
     Args:
-        x (object): structure to be copied
-        substitutions (object): substitutions to be made - passed into the
-            string '%' operator
+        x: structure to be copied
+        substitutions: substitutions to be made - passed into the string '%' operator
 
     Returns:
         copy of x
diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
index 3134cd2d3d..a31a2c99a7 100644
--- a/synapse/server_notices/resource_limits_server_notices.py
+++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -170,11 +170,13 @@ class ResourceLimitsServerNotices:
             room_id: The room id of the server notices room
 
         Returns:
-            bool: Is the room currently blocked
-            list: The list of pinned event IDs that are unrelated to limit blocking
-            This list can be used as a convenience in the case where the block
-            is to be lifted and the remaining pinned event references need to be
-            preserved
+            Tuple of:
+                Is the room currently blocked
+
+                The list of pinned event IDs that are unrelated to limit blocking.
+                This list can be used as a convenience in the case where the block
+                is to be lifted and the remaining pinned event references need to be
+                preserved.
         """
         currently_blocked = False
         pinned_state_event = None
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 6f3dd0463e..833ffec3de 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -190,6 +190,7 @@ class StateHandler:
         room_id: str,
         event_ids: Collection[str],
         state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> StateMap[str]:
         """Fetch the state after each of the given event IDs. Resolve them and return.
 
@@ -200,13 +201,18 @@ class StateHandler:
         Args:
             room_id: the room_id containing the given events.
             event_ids: the events whose state should be fetched and resolved.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at the given `event_id`s, regardless of whether `state_filter` is
+                satisfied by partial state.
 
         Returns:
             the state dict (a mapping from (event_type, state_key) -> event_id) which
             holds the resolution of the states after the given event IDs.
         """
         logger.debug("calling resolve_state_groups from compute_state_after_events")
-        ret = await self.resolve_state_groups_for_events(room_id, event_ids)
+        ret = await self.resolve_state_groups_for_events(
+            room_id, event_ids, await_full_state
+        )
         return await ret.get_state(self._state_storage_controller, state_filter)
 
     async def get_current_user_ids_in_room(
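
A hedged sketch of the new flag in use, assuming `state_handler` is the
`StateHandler` above and the caller only needs a slice of state that partial
state can satisfy (the `StateFilter` import path varies between versions):

    from synapse.api.constants import EventTypes
    from synapse.types.state import StateFilter  # import path is an assumption

    async def membership_after(state_handler, room_id: str, event_ids):
        # Fetch only membership state after the given events, without
        # blocking until the room's full state has been synced.
        return await state_handler.compute_state_after_events(
            room_id,
            event_ids,
            state_filter=StateFilter.from_types([(EventTypes.Member, None)]),
            await_full_state=False,
        )
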
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 48976dc570..33ffef521b 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -204,9 +204,8 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
         process to do so, calling the per_item_callback for each item.
 
         Args:
-            room_id (str):
-            task (_EventPersistQueueTask): A _PersistEventsTask or
-                _UpdateCurrentStateTask to process.
+            room_id: the room the task is for.
+            task: A _PersistEventsTask or _UpdateCurrentStateTask to process.
 
         Returns:
             the result returned by the `_per_item_callback` passed to
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 4717c9728a..0dc44b246c 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -569,15 +569,15 @@ class DatabasePool:
             retcols=["update_name"],
             desc="check_background_updates",
         )
-        updates = [x["update_name"] for x in updates]
+        background_update_names = [x["update_name"] for x in updates]
 
         for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items():
-            if update_name not in updates:
+            if update_name not in background_update_names:
                 logger.debug("Now safe to upsert in %s", table)
                 self._unsafe_to_upsert_tables.discard(table)
 
         # If there's any updates still running, reschedule to run.
-        if updates:
+        if background_update_names:
             self._clock.call_later(
                 15.0,
                 run_as_background_process,
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index c38b8a9e5a..282687ebce 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -27,7 +27,6 @@ from typing import (
 )
 
 from synapse.api.constants import AccountDataTypes
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
 from synapse.storage._base import db_to_json
 from synapse.storage.database import (
@@ -68,12 +67,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
         # to write account data. A value of `True` implies that `_account_data_id_gen`
         # is an `AbstractStreamIdGenerator` and not just a tracker.
         self._account_data_id_gen: AbstractStreamIdTracker
+        self._can_write_to_account_data = (
+            self._instance_name in hs.config.worker.writers.account_data
+        )
 
         if isinstance(database.engine, PostgresEngine):
-            self._can_write_to_account_data = (
-                self._instance_name in hs.config.worker.writers.account_data
-            )
-
             self._account_data_id_gen = MultiWriterIdGenerator(
                 db_conn=db_conn,
                 db=database,
@@ -95,21 +93,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if self._instance_name in hs.config.worker.writers.account_data:
-                self._can_write_to_account_data = True
-                self._account_data_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "room_account_data",
-                    "stream_id",
-                    extra_tables=[("room_tags_revisions", "stream_id")],
-                )
-            else:
-                self._account_data_id_gen = SlavedIdTracker(
-                    db_conn,
-                    "room_account_data",
-                    "stream_id",
-                    extra_tables=[("room_tags_revisions", "stream_id")],
-                )
+            self._account_data_id_gen = StreamIdGenerator(
+                db_conn,
+                "room_account_data",
+                "stream_id",
+                extra_tables=[("room_tags_revisions", "stream_id")],
+                is_writer=self._instance_name in hs.config.worker.writers.account_data,
+            )
 
         account_max = self.get_max_account_data_stream_id()
         self._account_data_stream_cache = StreamChangeCache(
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index aa58c2adc3..57230df5ae 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -38,7 +38,6 @@ from synapse.logging.opentracing import (
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -86,28 +85,19 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
     ):
         super().__init__(database, db_conn, hs)
 
-        if hs.config.worker.worker_app is None:
-            self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-                db_conn,
-                "device_lists_stream",
-                "stream_id",
-                extra_tables=[
-                    ("user_signature_stream", "stream_id"),
-                    ("device_lists_outbound_pokes", "stream_id"),
-                    ("device_lists_changes_in_room", "stream_id"),
-                ],
-            )
-        else:
-            self._device_list_id_gen = SlavedIdTracker(
-                db_conn,
-                "device_lists_stream",
-                "stream_id",
-                extra_tables=[
-                    ("user_signature_stream", "stream_id"),
-                    ("device_lists_outbound_pokes", "stream_id"),
-                    ("device_lists_changes_in_room", "stream_id"),
-                ],
-            )
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
+        self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+            db_conn,
+            "device_lists_stream",
+            "stream_id",
+            extra_tables=[
+                ("user_signature_stream", "stream_id"),
+                ("device_lists_outbound_pokes", "stream_id"),
+                ("device_lists_changes_in_room", "stream_id"),
+            ],
+            is_writer=hs.config.worker.worker_app is None,
+        )
 
         # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
         # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
@@ -535,7 +525,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
             limit: Maximum number of device updates to return
 
         Returns:
-            List: List of device update tuples:
+            List of device update tuples:
                 - user_id
                 - device_id
                 - stream_id
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index af59be6b48..6240f9a75e 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -391,10 +391,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         Returns:
             A dict giving the info metadata for this backup version, with
             fields including:
-                version(str)
-                algorithm(str)
-                auth_data(object): opaque dict supplied by the client
-                etag(int): tag of the keys in the backup
+                version (str)
+                algorithm (str)
+                auth_data (object): opaque dict supplied by the client
+                etag (int): tag of the keys in the backup
         """
 
         def _get_e2e_room_keys_version_info_txn(txn: LoggingTransaction) -> JsonDict:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 2a4f58ed92..cf33e73e2b 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -412,10 +412,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         """Retrieve a number of one-time keys for a user
 
         Args:
-            user_id(str): id of user to get keys for
-            device_id(str): id of device to get keys for
-            key_ids(list[str]): list of key ids (excluding algorithm) to
-                retrieve
+            user_id: id of user to get keys for
+            device_id: id of device to get keys for
+            key_ids: list of key ids (excluding algorithm) to retrieve
 
         Returns:
             A map from (algorithm, key_id) to json string for key
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index c4acff5be6..d68f127f9b 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1279,9 +1279,10 @@ class PersistEventsStore:
         Pick the earliest non-outlier if there is one, else the earliest one.
 
         Args:
-            events_and_contexts (list[(EventBase, EventContext)]):
+            events_and_contexts:
+
         Returns:
-            list[(EventBase, EventContext)]: filtered list
+            filtered list
         """
         new_events_and_contexts: OrderedDict[
             str, Tuple[EventBase, EventContext]
@@ -1307,9 +1308,8 @@ class PersistEventsStore:
         """Update min_depth for each room
 
         Args:
-            txn (twisted.enterprise.adbapi.Connection): db connection
-            events_and_contexts (list[(EventBase, EventContext)]): events
-                we are persisting
+            txn: db connection
+            events_and_contexts: events we are persisting
         """
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
@@ -1580,13 +1580,11 @@ class PersistEventsStore:
         """Update all the miscellaneous tables for new events
 
         Args:
-            txn (twisted.enterprise.adbapi.Connection): db connection
-            events_and_contexts (list[(EventBase, EventContext)]): events
-                we are persisting
-            all_events_and_contexts (list[(EventBase, EventContext)]): all
-                events that we were going to persist. This includes events
-                we've already persisted, etc, that wouldn't appear in
-                events_and_context.
+            txn: db connection
+            events_and_contexts: events we are persisting
+            all_events_and_contexts: all events that we were going to persist.
+                This includes events we've already persisted, etc, that wouldn't
+                appear in events_and_context.
             inhibit_local_membership_updates: Stop the local_current_membership
                 from being updated by these events. This should be set to True
                 for backfilled events because backfilled events in the past do
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 467d20253d..01e935edef 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -59,7 +59,6 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -213,26 +212,20 @@ class EventsWorkerStore(SQLBaseStore):
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.events:
-                self._stream_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                )
-                self._backfill_id_gen = StreamIdGenerator(
-                    db_conn,
-                    "events",
-                    "stream_ordering",
-                    step=-1,
-                    extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
-                )
-            else:
-                self._stream_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering"
-                )
-                self._backfill_id_gen = SlavedIdTracker(
-                    db_conn, "events", "stream_ordering", step=-1
-                )
+            self._stream_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+            )
+            self._backfill_id_gen = StreamIdGenerator(
+                db_conn,
+                "events",
+                "stream_ordering",
+                step=-1,
+                extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.events,
+            )
 
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
@@ -1589,7 +1582,7 @@ class EventsWorkerStore(SQLBaseStore):
             room_id: The room ID to query.
 
         Returns:
-            dict[str:float] of complexity version to complexity.
+            Map of complexity version to complexity.
         """
         state_events = await self.get_current_state_event_counts(room_id)
 
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index efd136a864..db9a24db5e 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -217,7 +217,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
         def _reap_users(txn: LoggingTransaction, reserved_users: List[str]) -> None:
             """
             Args:
-                reserved_users (tuple): reserved users to preserve
+                reserved_users: reserved users to preserve
             """
 
             thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
@@ -370,8 +370,8 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
         should not appear in the MAU stats).
 
         Args:
-            txn (cursor):
-            user_id (str): user to add/update
+            txn:
+            user_id: user to add/update
         """
         assert (
             self._update_on_this_worker
@@ -401,7 +401,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
         add the user to the monthly active tables
 
         Args:
-            user_id(str): the user_id to query
+            user_id: the user_id to query
         """
         assert (
             self._update_on_this_worker
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 8ae10f6127..12ad44dbb3 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -30,7 +30,6 @@ from typing import (
 
 from synapse.api.errors import StoreError
 from synapse.config.homeserver import ExperimentalConfig
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import PushRulesStream
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
@@ -111,14 +110,14 @@ class PushRulesWorkerStore(
     ):
         super().__init__(database, db_conn, hs)
 
-        if hs.config.worker.worker_app is None:
-            self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-                db_conn, "push_rules_stream", "stream_id"
-            )
-        else:
-            self._push_rules_stream_id_gen = SlavedIdTracker(
-                db_conn, "push_rules_stream", "stream_id"
-            )
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
+        self._push_rules_stream_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+            db_conn,
+            "push_rules_stream",
+            "stream_id",
+            is_writer=hs.config.worker.worker_app is None,
+        )
 
         push_rules_prefill, push_rules_id = self.db_pool.get_cache_dict(
             db_conn,
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index 4a01562d45..fee37b9ce4 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -27,7 +27,6 @@ from typing import (
 )
 
 from synapse.push import PusherConfig, ThrottleParams
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import PushersStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -59,20 +58,15 @@ class PusherWorkerStore(SQLBaseStore):
     ):
         super().__init__(database, db_conn, hs)
 
-        if hs.config.worker.worker_app is None:
-            self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
-                db_conn,
-                "pushers",
-                "id",
-                extra_tables=[("deleted_pushers", "stream_id")],
-            )
-        else:
-            self._pushers_id_gen = SlavedIdTracker(
-                db_conn,
-                "pushers",
-                "id",
-                extra_tables=[("deleted_pushers", "stream_id")],
-            )
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
+        self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator(
+            db_conn,
+            "pushers",
+            "id",
+            extra_tables=[("deleted_pushers", "stream_id")],
+            is_writer=hs.config.worker.worker_app is None,
+        )
 
         self.db_pool.updates.register_background_update_handler(
             "remove_deactivated_pushers",
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index fbf27497ec..a580e4bdda 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -27,7 +27,6 @@ from typing import (
 )
 
 from synapse.api.constants import EduTypes
-from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import ReceiptsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -61,6 +60,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
         hs: "HomeServer",
     ):
         self._instance_name = hs.get_instance_name()
+
+        # In the worker store this is an ID tracker which we overwrite in the non-worker
+        # class below that is used on the main process.
         self._receipts_id_gen: AbstractStreamIdTracker
 
         if isinstance(database.engine, PostgresEngine):
@@ -87,14 +89,12 @@ class ReceiptsWorkerStore(SQLBaseStore):
             # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
             # updated over replication. (Multiple writers are not supported for
             # SQLite).
-            if hs.get_instance_name() in hs.config.worker.writers.receipts:
-                self._receipts_id_gen = StreamIdGenerator(
-                    db_conn, "receipts_linearized", "stream_id"
-                )
-            else:
-                self._receipts_id_gen = SlavedIdTracker(
-                    db_conn, "receipts_linearized", "stream_id"
-                )
+            self._receipts_id_gen = StreamIdGenerator(
+                db_conn,
+                "receipts_linearized",
+                "stream_id",
+                is_writer=hs.get_instance_name() in hs.config.worker.writers.receipts,
+            )
 
         super().__init__(database, db_conn, hs)
 
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 5167089e03..31f0f2bd3d 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -953,7 +953,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         """Returns user id from threepid
 
         Args:
-            txn (cursor):
+            txn:
             medium: threepid medium e.g. email
             address: threepid address e.g. me@example.com
 
@@ -1283,8 +1283,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         """Sets an expiration date to the account with the given user ID.
 
         Args:
-             user_id (str): User ID to set an expiration date for.
-             use_delta (bool): If set to False, the expiration date for the user will be
+             user_id: User ID to set an expiration date for.
+             use_delta: If set to False, the expiration date for the user will be
                 now + validity period. If set to True, this expiration date will be a
                 random value in the [now + period - d ; now + period] range, d being a
                 delta equal to 10% of the validity period.
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 7d97f8f60e..4fbaefad73 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2057,7 +2057,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
         Args:
             report_id: ID of reported event in database
         Returns:
-            event_report: json list of information from event report
+            JSON dict of information from an event report or None if the
+            report does not exist.
         """
 
         def _get_event_report_txn(
@@ -2130,8 +2131,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             user_id: search for user_id. Ignored if user_id is None
             room_id: search for room_id. Ignored if room_id is None
         Returns:
-            event_reports: json list of event reports
-            count: total number of event reports matching the filter criteria
+            Tuple of:
+                json list of event reports
+                total number of event reports matching the filter criteria
         """
 
         def _get_event_reports_paginate_txn(
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index ddb25b5cea..698d6f7515 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -185,9 +185,8 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         - who should be in the user_directory.
 
         Args:
-            progress (dict)
-            batch_size (int): Maximum number of state events to process
-                per cycle.
+            progress: the progress of this background update
+            batch_size: Maximum number of state events to process per cycle.
 
         Returns:
             number of events processed.
@@ -708,10 +707,10 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
         Returns the rooms that a user is in.
 
         Args:
-            user_id(str): Must be a local user
+            user_id: Must be a local user
 
         Returns:
-            list: user_id
+            List of room IDs
         """
         rows = await self.db_pool.simple_select_onecol(
             table="users_who_share_private_rooms",
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 2dfe4c0b66..0d7108f01b 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -186,11 +186,13 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
         column: str,
         extra_tables: Iterable[Tuple[str, str]] = (),
         step: int = 1,
+        is_writer: bool = True,
     ) -> None:
         assert step != 0
         self._lock = threading.Lock()
         self._step: int = step
         self._current: int = _load_current_id(db_conn, table, column, step)
+        self._is_writer = is_writer
         for table, column in extra_tables:
             self._current = (max if step > 0 else min)(
                 self._current, _load_current_id(db_conn, table, column, step)
@@ -204,9 +206,11 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
         self._unfinished_ids: OrderedDict[int, int] = OrderedDict()
 
     def advance(self, instance_name: str, new_id: int) -> None:
-        # `StreamIdGenerator` should only be used when there is a single writer,
-        # so replication should never happen.
-        raise Exception("Replication is not supported by StreamIdGenerator")
+        # Advance should never be called on a writer instance, only over replication
+        if self._is_writer:
+            raise Exception("Replication is not supported by writer StreamIdGenerator")
+
+        self._current = (max if self._step > 0 else min)(self._current, new_id)
 
     def get_next(self) -> AsyncContextManager[int]:
         with self._lock:
@@ -249,6 +253,9 @@ class StreamIdGenerator(AbstractStreamIdGenerator):
         return _AsyncCtxManagerWrapper(manager())
 
     def get_current_token(self) -> int:
+        if not self._is_writer:
+            return self._current
+
         with self._lock:
             if self._unfinished_ids:
                 return next(iter(self._unfinished_ids)) - self._step
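
To make the two modes concrete, here is a sketch of how the stores above now
instantiate a single class for both roles (illustrative; the table names
mirror the push-rules store):

    id_gen = StreamIdGenerator(
        db_conn,
        "push_rules_stream",
        "stream_id",
        is_writer=hs.config.worker.worker_app is None,
    )

    # On a non-writer, the replication stream feeds the tracker:
    #     id_gen.advance(instance_name, new_id)
    # On the writer, ids are allocated as before, and advance() raises:
    #     async with id_gen.get_next() as stream_id:
    #         ...
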
diff --git a/synapse/types.py b/synapse/types.py
index 773f0438d5..f2d436ddc3 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -143,8 +143,8 @@ class Requester:
         Requester.
 
         Args:
-            store (DataStore): Used to convert AS ID to AS object
-            input (dict): A dict produced by `serialize`
+            store: Used to convert AS ID to AS object
+            input: A dict produced by `serialize`
 
         Returns:
             Requester
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 7f1d41eb3c..d24c4f68c4 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -217,7 +217,8 @@ async def concurrently_execute(
         limit: Maximum number of concurrent executions.
 
     Returns:
-        Deferred: Resolved when all function invocations have finished.
+        None, when all function invocations have finished. The return values
+        from those functions are discarded.
     """
     it = iter(args)
 
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index f7c3a6794e..9387632d0d 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -197,7 +197,7 @@ def register_cache(
         resize_callback: A function which can be called to resize the cache.
 
     Returns:
-        CacheMetric: an object which provides inc_{hits,misses,evictions} methods
+        an object which provides inc_{hits,misses,evictions} methods
     """
     if resizable:
         if not resize_callback:
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index bcb1cba362..bf7bd351e0 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -153,7 +153,7 @@ class DeferredCache(Generic[KT, VT]):
         Args:
             key:
             callback: Gets called when the entry in the cache is invalidated
-            update_metrics (bool): whether to update the cache hit rate metrics
+            update_metrics: whether to update the cache hit rate metrics
 
         Returns:
             A Deferred which completes with the result. Note that this may later fail
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index fa91479c97..5eaf70c7ab 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -169,10 +169,11 @@ class DictionaryCache(Generic[KT, DKT, DV]):
                 if it is in the cache.
 
         Returns:
-            DictionaryEntry: If `dict_keys` is not None then `DictionaryEntry`
-            will contain include the keys that are in the cache. If None then
-            will either return the full dict if in the cache, or the empty
-            dict (with `full` set to False) if it isn't.
+            If `dict_keys` is not None then the returned `DictionaryEntry` will
+            include only the keys that are in the cache.
+
+            If None then it will either return the full dict if it is in the
+            cache, or the empty dict (with `full` set to False) if it isn't.
         """
         if dict_keys is None:
             # The caller wants the full set of dictionary keys for this cache key
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index c6a5d0dfc0..01ad02af67 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -207,7 +207,7 @@ class ExpiringCache(Generic[KT, VT]):
         items from the cache.
 
         Returns:
-            bool: Whether the cache changed size or not.
+            Whether the cache changed size or not.
         """
         new_size = int(self._original_max_size * factor)
         if new_size != self._max_size:
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index aa93109d13..dcf0eac3bf 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -389,11 +389,11 @@ class LruCache(Generic[KT, VT]):
             cache_name: The name of this cache, for the prometheus metrics. If unset,
                 no metrics will be reported on this cache.
 
-            cache_type (type):
+            cache_type:
                 type of underlying cache to be used. Typically one of dict
                 or TreeCache.
 
-            size_callback (func(V) -> int | None):
+            size_callback: a function which, given a value, returns its size.
 
             metrics_collection_callback:
                 metrics collection callback. This is called early in the metrics
@@ -403,7 +403,7 @@ class LruCache(Generic[KT, VT]):
 
                 Ignored if cache_name is None.
 
-            apply_cache_factor_from_config (bool): If true, `max_size` will be
+            apply_cache_factor_from_config: If true, `max_size` will be
                 multiplied by a cache factor derived from the homeserver config
 
             clock:
@@ -796,7 +796,7 @@ class LruCache(Generic[KT, VT]):
         items from the cache.
 
         Returns:
-            bool: Whether the cache changed size or not.
+            Whether the cache changed size or not.
         """
         if not self.apply_cache_factor_from_config:
             return False
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 9f64fed0d7..2aceb1a47f 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -183,7 +183,7 @@ class FederationRateLimiter:
                 # Handle request ...
 
         Args:
-            host (str): Origin of incoming request.
+            host: Origin of incoming request.
 
         Returns:
             context manager which returns a deferred.
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 1e9c2faa64..54bc7589fd 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -48,7 +48,7 @@ async def check_3pid_allowed(
         registration: whether we want to bind the 3PID as part of registering a new user.
 
     Returns:
-        bool: whether the 3PID medium/address is allowed to be added to this HS
+        whether the 3PID medium/address is allowed to be added to this HS
     """
     if not await hs.get_password_auth_provider().is_3pid_allowed(
         medium, address, registration
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 177e198e7e..b1ec7f4bd8 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -90,10 +90,10 @@ class WheelTimer(Generic[T]):
         """Fetch any objects that have timed out
 
         Args:
-            now (ms): Current time in msec
+            now: Current time in msec
 
         Returns:
-            list: List of objects that have timed out
+            List of objects that have timed out
         """
         now_key = int(now / self.bucket_size)
 
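For context, a minimal usage sketch of the class (timings are illustrative,
and the `insert` signature is assumed rather than shown in this diff):

    # WheelTimer buckets objects by deadline; fetch(now) drains every
    # bucket whose deadline falls at or before `now`.
    timer: WheelTimer[str] = WheelTimer(bucket_size=1000)  # 1s buckets

    now = 10_000  # current time in msec
    timer.insert(now, "obj-b", then=now + 500)    # due at 10_500
    timer.insert(now, "obj-a", then=now + 2_500)  # due at 12_500

    timer.fetch(now + 1_000)  # -> ["obj-b"]
    timer.fetch(now + 5_000)  # -> ["obj-a"]
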
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 40a9c5b53f..b443857571 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -563,7 +563,8 @@ def get_effective_room_visibility_from_state(state: StateMap[EventBase]) -> str:
 
 async def filter_events_for_server(
     storage: StorageControllers,
-    server_name: str,
+    target_server_name: str,
+    local_server_name: str,
     events: List[EventBase],
     redact: bool = True,
     check_history_visibility_only: bool = False,
@@ -603,7 +604,7 @@ async def filter_events_for_server(
         # if the server is either in the room or has been invited
         # into the room.
         for ev in memberships.values():
-            assert get_domain_from_id(ev.state_key) == server_name
+            assert get_domain_from_id(ev.state_key) == target_server_name
 
             memtype = ev.membership
             if memtype == Membership.JOIN:
@@ -622,6 +623,24 @@ async def filter_events_for_server(
         # to no users having been erased.
         erased_senders = {}
 
+    # Filter out non-local events when we are in the middle of a partial join, since our list
+    # of servers in the room can be out of date and we could leak events to servers that are
+    # no longer in the room. This can also be true for local events, but we accept that risk.
+
+    # We do this check first, before retrieving membership events, because otherwise the room
+    # could become fully joined after those are retrieved, which would bypass this check while
+    # basing the filtering on an outdated view of the membership events.
+
+    partial_state_invisible_events = set()
+    if not check_history_visibility_only:
+        for e in events:
+            sender_domain = get_domain_from_id(e.sender)
+            if (
+                sender_domain != local_server_name
+                and await storage.main.is_partial_state_room(e.room_id)
+            ):
+                partial_state_invisible_events.add(e)
+
     # Let's check to see if all the events have a history visibility
     # of "shared" or "world_readable". If that's the case then we don't
     # need to check membership (as we know the server is in the room).
@@ -636,7 +655,7 @@ async def filter_events_for_server(
             if event_to_history_vis[e.event_id]
             not in (HistoryVisibility.SHARED, HistoryVisibility.WORLD_READABLE)
         ],
-        server_name,
+        target_server_name,
     )
 
     to_return = []
@@ -645,6 +664,10 @@ async def filter_events_for_server(
         visible = check_event_is_visible(
             event_to_history_vis[e.event_id], event_to_memberships.get(e.event_id, {})
         )
+
+        if e in partial_state_invisible_events:
+            visible = False
+
         if visible and not erased:
             to_return.append(e)
         elif redact:
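
A condensed sketch of the ordering concern described in the comments above: the partial-state verdicts must be recorded before the membership lookup, otherwise a join completing in between could widen visibility based on membership data fetched too late. `is_partial_state_room` is taken from the diff; `check_membership_visibility` is a hypothetical stand-in for the membership-based check:

    from synapse.types import get_domain_from_id

    # Sketch only; `check_membership_visibility` is a hypothetical stand-in.
    async def filter_sketch(events, local_server_name, storage):
        # 1. Snapshot the partial-state verdicts first.
        invisible = set()
        for e in events:
            sender_domain = get_domain_from_id(e.sender)
            if sender_domain != local_server_name and await storage.main.is_partial_state_room(e.room_id):
                invisible.add(e)
        # 2. Only then consult membership state; events flagged in step 1 stay
        #    hidden even if the room finishes joining mid-call.
        return [e for e in events if e not in invisible and await check_membership_visibility(e)]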
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 820a1a54e2..63628aa6b0 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -469,6 +469,18 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
         keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
         self.assertEqual(keys, {})
 
+    def test_keyid_containing_forward_slash(self) -> None:
+        """We should url-encode any url unsafe chars in key ids.
+
+        Detects https://github.com/matrix-org/synapse/issues/14488.
+        """
+        fetcher = ServerKeyFetcher(self.hs)
+        self.get_success(fetcher.get_keys("example.com", ["key/potato"], 0))
+
+        self.http_client.get_json.assert_called_once()
+        args, kwargs = self.http_client.get_json.call_args
+        self.assertEqual(kwargs["path"], "/_matrix/key/v2/server/key%2Fpotato")
+
 
 class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
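
The behaviour this test pins down amounts to percent-encoding the key ID before splicing it into the request path; `quote` with `safe=""` also escapes `/`. A minimal sketch using the standard library:

    from urllib.parse import quote

    key_id = "key/potato"
    path = "/_matrix/key/v2/server/" + quote(key_id, safe="")
    assert path == "/_matrix/key/v2/server/key%2Fpotato"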
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index c96dc6caf2..c5981ff965 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -15,6 +15,7 @@
 from typing import Optional
 from unittest.mock import Mock, call
 
+from parameterized import parameterized
 from signedjson.key import generate_signing_key
 
 from synapse.api.constants import EventTypes, Membership, PresenceState
@@ -37,6 +38,7 @@ from synapse.rest.client import room
 from synapse.types import UserID, get_domain_from_id
 
 from tests import unittest
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 
 
 class PresenceUpdateTestCase(unittest.HomeserverTestCase):
@@ -505,7 +507,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         self.assertEqual(state, new_state)
 
 
-class PresenceHandlerTestCase(unittest.HomeserverTestCase):
+class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
     def prepare(self, reactor, clock, hs):
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
@@ -716,20 +718,47 @@ class PresenceHandlerTestCase(unittest.HomeserverTestCase):
         # our status message should be the same as it was before
         self.assertEqual(state.status_msg, status_msg)
 
-    def test_set_presence_from_syncing_keeps_busy(self):
-        """Test that presence set by syncing doesn't affect busy status"""
-        # while this isn't the default
-        self.presence_handler._busy_presence_enabled = True
+    @parameterized.expand([(False,), (True,)])
+    @unittest.override_config(
+        {
+            "experimental_features": {
+                "msc3026_enabled": True,
+            },
+        }
+    )
+    def test_set_presence_from_syncing_keeps_busy(self, test_with_workers: bool):
+        """Test that presence set by syncing doesn't affect busy status
 
+        Args:
+            test_with_workers: If True, check the presence state of the user by calling
+                /sync against a worker, rather than the main process.
+        """
         user_id = "@test:server"
         status_msg = "I'm busy!"
 
+        # By default, we call /sync against the main process.
+        worker_to_sync_against = self.hs
+        if test_with_workers:
+            # Create a worker and use it to handle /sync traffic instead.
+            # This is used to test that presence changes get replicated from workers
+            # to the main process correctly.
+            worker_to_sync_against = self.make_worker_hs(
+                "synapse.app.generic_worker", {"worker_name": "presence_writer"}
+            )
+
+        # Set presence to BUSY
         self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg)
 
+        # Perform a sync with a presence state other than busy. This should NOT change
+        # our presence status; we only change from busy if we explicitly set it via
+        # /presence/*.
         self.get_success(
-            self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
+            worker_to_sync_against.get_presence_handler().user_syncing(
+                user_id, True, PresenceState.ONLINE
+            )
         )
 
+        # Check against the main process that the user's presence did not change.
         state = self.get_success(
             self.presence_handler.get_state(UserID.from_string(user_id))
         )
diff --git a/tests/http/__init__.py b/tests/http/__init__.py
index e74f7f5b48..093537adef 100644
--- a/tests/http/__init__.py
+++ b/tests/http/__init__.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 import os.path
 import subprocess
+from typing import List
 
 from zope.interface import implementer
 
@@ -70,14 +71,14 @@ subjectAltName = %(sanentries)s
 """
 
 
-def create_test_cert_file(sanlist):
+def create_test_cert_file(sanlist: List[bytes]) -> str:
     """build an x509 certificate file
 
     Args:
-        sanlist: list[bytes]: a list of subjectAltName values for the cert
+        sanlist: a list of subjectAltName values for the cert
 
     Returns:
-        str: the path to the file
+        The path to the file
     """
     global cert_file_count
     csr_filename = "server.csr"
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 02cef6f876..058ca57e55 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -778,8 +778,11 @@ def _test_sending_local_online_presence_to_local_user(
             worker process. The test users will still sync with the main process. The purpose of testing
             with a worker is to check whether a Synapse module running on a worker can inform other workers/
             the main process that they should include additional presence when a user next syncs.
+            If this argument is True, `test_case` MUST be an instance of BaseMultiWorkerStreamTestCase.
     """
     if test_with_workers:
+        assert isinstance(test_case, BaseMultiWorkerStreamTestCase)
+
         # Create a worker process to make module_api calls against
         worker_hs = test_case.make_worker_hs(
             "synapse.app.generic_worker", {"worker_name": "presence_writer"}
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 121f3d8d65..3029a16dda 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -542,8 +542,13 @@ class FakeRedisPubSubProtocol(Protocol):
             self.send("OK")
         elif command == b"GET":
             self.send(None)
+
+        # Connection keep-alives.
+        elif command == b"PING":
+            self.send("PONG")
+
         else:
-            raise Exception("Unknown command")
+            raise Exception(f"Unknown command: {command}")
 
     def send(self, msg):
         """Send a message back to the client."""
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 96f3880923..dce71f7334 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -143,6 +143,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
         self.persist(type="m.room.create", key="", creator=USER_ID)
         self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
         event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
+        assert event.internal_metadata.stream_ordering is not None
 
         self.replicate()
 
@@ -230,6 +231,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
         j2 = self.persist(
             type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
         )
+        assert j2.internal_metadata.stream_ordering is not None
         self.replicate()
 
         expected_pos = PersistedEventPosition(
@@ -287,6 +289,7 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
             )
         )
         self.replicate()
+        assert j2.internal_metadata.stream_ordering is not None
 
         event_source = RoomEventSource(self.hs)
         event_source.store = self.slaved_store
@@ -336,10 +339,10 @@ class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):
 
     event_id = 0
 
-    def persist(self, backfill=False, **kwargs):
+    def persist(self, backfill=False, **kwargs) -> FrozenEvent:
         """
         Returns:
-            synapse.events.FrozenEvent: The event that was persisted.
+            The event that was persisted.
         """
         event, context = self.build_event(**kwargs)
 
diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py
index 13aa5eb51a..96cdf2c45b 100644
--- a/tests/replication/test_multi_media_repo.py
+++ b/tests/replication/test_multi_media_repo.py
@@ -15,8 +15,9 @@ import logging
 import os
 from typing import Optional, Tuple
 
+from twisted.internet.interfaces import IOpenSSLServerConnectionCreator
 from twisted.internet.protocol import Factory
-from twisted.protocols.tls import TLSMemoryBIOFactory
+from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
 from twisted.web.http import HTTPChannel
 from twisted.web.server import Request
 
@@ -102,7 +103,7 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         # fish the test server back out of the server-side TLS protocol.
-        http_server = server_tls_protocol.wrappedProtocol
+        http_server: HTTPChannel = server_tls_protocol.wrappedProtocol  # type: ignore[assignment]
 
         # give the reactor a pump to get the TLS juices flowing.
         self.reactor.pump((0.1,))
@@ -238,16 +239,15 @@ def get_connection_factory():
     return test_server_connection_factory
 
 
-def _build_test_server(connection_creator):
+def _build_test_server(
+    connection_creator: IOpenSSLServerConnectionCreator,
+) -> TLSMemoryBIOProtocol:
     """Construct a test server
 
     This builds an HTTP channel, wrapped with a TLSMemoryBIOProtocol
 
     Args:
-        connection_creator (IOpenSSLServerConnectionCreator): thing to build
-            SSL connections
-        sanlist (list[bytes]): list of the SAN entries for the cert returned
-            by the server
+        connection_creator: thing to build SSL connections
 
     Returns:
         TLSMemoryBIOProtocol
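
For readers unfamiliar with the Twisted plumbing, wrapping an HTTP channel in a server-side TLS protocol looks roughly like this (a sketch under the same imports as the diff, not the test's exact code):

    from twisted.internet.protocol import Factory
    from twisted.protocols.tls import TLSMemoryBIOFactory
    from twisted.web.http import HTTPChannel

    def build_test_server_sketch(connection_creator):
        # Wrap a plain HTTPChannel factory in a TLS factory driven by in-memory BIOs.
        http_factory = Factory.forProtocol(HTTPChannel)
        tls_factory = TLSMemoryBIOFactory(
            connection_creator, isClient=False, wrappedFactory=http_factory
        )
        # buildProtocol returns a TLSMemoryBIOProtocol whose .wrappedProtocol
        # is the underlying HTTPChannel, as fished out earlier in the diff.
        return tls_factory.buildProtocol(None)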
diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py
index bf403045e9..7cbc40736c 100644
--- a/tests/server_notices/test_resource_limits_server_notices.py
+++ b/tests/server_notices/test_resource_limits_server_notices.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Tuple
 from unittest.mock import Mock
 
 from twisted.test.proto_helpers import MemoryReactor
@@ -350,14 +351,15 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
 
         self.assertTrue(notice_in_room, "No server notice in room")
 
-    def _trigger_notice_and_join(self):
+    def _trigger_notice_and_join(self) -> Tuple[str, str, str]:
         """Creates enough active users to hit the MAU limit and trigger a system notice
         about it, then joins the system notices room with one of the users created.
 
         Returns:
-            user_id (str): The ID of the user that joined the room.
-            tok (str): The access token of the user that joined the room.
-            room_id (str): The ID of the room that's been joined.
+            A tuple of:
+                user_id: The ID of the user that joined the room.
+                tok: The access token of the user that joined the room.
+                room_id: The ID of the room that's been joined.
         """
         user_id = None
         tok = None
diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py
index 2d8d1f860f..d6a2b8d274 100644
--- a/tests/storage/test_id_generators.py
+++ b/tests/storage/test_id_generators.py
@@ -16,15 +16,157 @@ from typing import List, Optional
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.server import HomeServer
-from synapse.storage.database import DatabasePool, LoggingTransaction
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
 from synapse.storage.engines import IncorrectDatabaseSetup
-from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.storage.types import Cursor
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
 from tests.utils import USE_POSTGRES_FOR_TESTS
 
 
+class StreamIdGeneratorTestCase(HomeserverTestCase):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.db_pool: DatabasePool = self.store.db_pool
+
+        self.get_success(self.db_pool.runInteraction("_setup_db", self._setup_db))
+
+    def _setup_db(self, txn: LoggingTransaction) -> None:
+        txn.execute(
+            """
+            CREATE TABLE foobar (
+                stream_id BIGINT NOT NULL,
+                data TEXT
+            );
+            """
+        )
+        txn.execute("INSERT INTO foobar VALUES (123, 'hello world');")
+
+    def _create_id_generator(self) -> StreamIdGenerator:
+        def _create(conn: LoggingDatabaseConnection) -> StreamIdGenerator:
+            return StreamIdGenerator(
+                db_conn=conn,
+                table="foobar",
+                column="stream_id",
+            )
+
+        return self.get_success_or_raise(self.db_pool.runWithConnection(_create))
+
+    def test_initial_value(self) -> None:
+        """Check that we read the current token from the DB."""
+        id_gen = self._create_id_generator()
+        self.assertEqual(id_gen.get_current_token(), 123)
+
+    def test_single_gen_next(self) -> None:
+        """Check that we correctly increment the current token from the DB."""
+        id_gen = self._create_id_generator()
+
+        async def test_gen_next() -> None:
+            async with id_gen.get_next() as next_id:
+                # We haven't persisted `next_id` yet; current token is still 123
+                self.assertEqual(id_gen.get_current_token(), 123)
+                # But we did learn what the next value is
+                self.assertEqual(next_id, 124)
+
+            # Once the context manager closes we assume that the `next_id` has been
+            # written to the DB.
+            self.assertEqual(id_gen.get_current_token(), 124)
+
+        self.get_success(test_gen_next())
+
+    def test_multiple_gen_nexts(self) -> None:
+        """Check that we handle overlapping calls to gen_next sensibly."""
+        id_gen = self._create_id_generator()
+
+        async def test_gen_next() -> None:
+            ctx1 = id_gen.get_next()
+            ctx2 = id_gen.get_next()
+            ctx3 = id_gen.get_next()
+
+            # Request three new stream IDs.
+            self.assertEqual(await ctx1.__aenter__(), 124)
+            self.assertEqual(await ctx2.__aenter__(), 125)
+            self.assertEqual(await ctx3.__aenter__(), 126)
+
+            # None are persisted: current token unchanged.
+            self.assertEqual(id_gen.get_current_token(), 123)
+
+            # Persist each in turn.
+            await ctx1.__aexit__(None, None, None)
+            self.assertEqual(id_gen.get_current_token(), 124)
+            await ctx2.__aexit__(None, None, None)
+            self.assertEqual(id_gen.get_current_token(), 125)
+            await ctx3.__aexit__(None, None, None)
+            self.assertEqual(id_gen.get_current_token(), 126)
+
+        self.get_success(test_gen_next())
+
+    def test_multiple_gen_nexts_closed_in_different_order(self) -> None:
+        """Check that we handle overlapping calls to gen_next, even when their IDs
+        created and persisted in different orders."""
+        id_gen = self._create_id_generator()
+
+        async def test_gen_next() -> None:
+            ctx1 = id_gen.get_next()
+            ctx2 = id_gen.get_next()
+            ctx3 = id_gen.get_next()
+
+            # Request three new stream IDs.
+            self.assertEqual(await ctx1.__aenter__(), 124)
+            self.assertEqual(await ctx2.__aenter__(), 125)
+            self.assertEqual(await ctx3.__aenter__(), 126)
+
+            # None are persisted: current token unchanged.
+            self.assertEqual(id_gen.get_current_token(), 123)
+
+            # Persist them in a different order, starting with 126 from ctx3.
+            await ctx3.__aexit__(None, None, None)
+            # We haven't persisted 124 from ctx1 yet; the current token is still 123.
+            self.assertEqual(id_gen.get_current_token(), 123)
+
+            # Now persist 124 from ctx1.
+            await ctx1.__aexit__(None, None, None)
+            # Current token is then 124, waiting for 125 to be persisted.
+            self.assertEqual(id_gen.get_current_token(), 124)
+
+            # Finally persist 125 from ctx2.
+            await ctx2.__aexit__(None, None, None)
+            # Current token jumps straight to 126, as 125 and 126 are now both persisted.
+            self.assertEqual(id_gen.get_current_token(), 126)
+
+        self.get_success(test_gen_next())
+
+    def test_gen_next_while_still_waiting_for_persistence(self) -> None:
+        """Check that we handle overlapping calls to gen_next."""
+        id_gen = self._create_id_generator()
+
+        async def test_gen_next() -> None:
+            ctx1 = id_gen.get_next()
+            ctx2 = id_gen.get_next()
+            ctx3 = id_gen.get_next()
+
+            # Request two new stream IDs.
+            self.assertEqual(await ctx1.__aenter__(), 124)
+            self.assertEqual(await ctx2.__aenter__(), 125)
+
+            # Persist ctx2 first.
+            await ctx2.__aexit__(None, None, None)
+            # Still waiting on ctx1's ID to be persisted.
+            self.assertEqual(id_gen.get_current_token(), 123)
+
+            # Now request a third stream ID. It should be 126 (the smallest ID that
+            # we've not yet handed out).
+            self.assertEqual(await ctx3.__aenter__(), 126)
+
+        self.get_success(test_gen_next())
+
+
 class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
     if not USE_POSTGRES_FOR_TESTS:
         skip = "Requires Postgres"
@@ -48,9 +190,9 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
         )
 
     def _create_id_generator(
-        self, instance_name="master", writers: Optional[List[str]] = None
+        self, instance_name: str = "master", writers: Optional[List[str]] = None
     ) -> MultiWriterIdGenerator:
-        def _create(conn):
+        def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator:
             return MultiWriterIdGenerator(
                 conn,
                 self.db_pool,
@@ -446,7 +588,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
         self._insert_row_with_id("master", 3)
 
         # Now we add a row *without* updating the stream ID
-        def _insert(txn):
+        def _insert(txn: Cursor) -> None:
             txn.execute("INSERT INTO foobar VALUES (26, 'master')")
 
         self.get_success(self.db_pool.runInteraction("_insert", _insert))
@@ -481,9 +623,9 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
         )
 
     def _create_id_generator(
-        self, instance_name="master", writers: Optional[List[str]] = None
+        self, instance_name: str = "master", writers: Optional[List[str]] = None
     ) -> MultiWriterIdGenerator:
-        def _create(conn):
+        def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator:
             return MultiWriterIdGenerator(
                 conn,
                 self.db_pool,
@@ -617,9 +759,9 @@ class MultiTableMultiWriterIdGeneratorTestCase(HomeserverTestCase):
         )
 
     def _create_id_generator(
-        self, instance_name="master", writers: Optional[List[str]] = None
+        self, instance_name: str = "master", writers: Optional[List[str]] = None
     ) -> MultiWriterIdGenerator:
-        def _create(conn):
+        def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator:
             return MultiWriterIdGenerator(
                 conn,
                 self.db_pool,
@@ -641,7 +783,7 @@ class MultiTableMultiWriterIdGeneratorTestCase(HomeserverTestCase):
         instance_name: str,
         number: int,
         update_stream_table: bool = True,
-    ):
+    ) -> None:
         """Insert N rows as the given instance, inserting with stream IDs pulled
         from the postgres sequence.
         """
diff --git a/tests/test_visibility.py b/tests/test_visibility.py
index c385b2f8d4..d0b9ad5454 100644
--- a/tests/test_visibility.py
+++ b/tests/test_visibility.py
@@ -61,7 +61,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase):
 
         filtered = self.get_success(
             filter_events_for_server(
-                self._storage_controllers, "test_server", events_to_filter
+                self._storage_controllers, "test_server", "hs", events_to_filter
             )
         )
 
@@ -83,7 +83,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase):
         self.assertEqual(
             self.get_success(
                 filter_events_for_server(
-                    self._storage_controllers, "remote_hs", [outlier]
+                    self._storage_controllers, "remote_hs", "hs", [outlier]
                 )
             ),
             [outlier],
@@ -94,7 +94,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase):
 
         filtered = self.get_success(
             filter_events_for_server(
-                self._storage_controllers, "remote_hs", [outlier, evt]
+                self._storage_controllers, "remote_hs", "local_hs", [outlier, evt]
             )
         )
         self.assertEqual(len(filtered), 2, f"expected 2 results, got: {filtered}")
@@ -106,7 +106,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase):
         # be redacted)
         filtered = self.get_success(
             filter_events_for_server(
-                self._storage_controllers, "other_server", [outlier, evt]
+                self._storage_controllers, "other_server", "local_hs", [outlier, evt]
             )
         )
         self.assertEqual(filtered[0], outlier)
@@ -141,7 +141,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase):
         # ... and the filtering happens.
         filtered = self.get_success(
             filter_events_for_server(
-                self._storage_controllers, "test_server", events_to_filter
+                self._storage_controllers, "test_server", "local_hs", events_to_filter
             )
         )
 
diff --git a/tests/unittest.py b/tests/unittest.py
index 5116be338e..a120c2976c 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -360,13 +360,13 @@ class HomeserverTestCase(TestCase):
                 store.db_pool.updates.do_next_background_update(False), by=0.1
             )
 
-    def make_homeserver(self, reactor, clock):
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock):
         """
         Make and return a homeserver.
 
         Args:
             reactor: A Twisted Reactor, or something that pretends to be one.
-            clock (synapse.util.Clock): The Clock, associated with the reactor.
+            clock: The Clock, associated with the reactor.
 
         Returns:
             A homeserver suitable for testing.
@@ -426,9 +426,8 @@ class HomeserverTestCase(TestCase):
 
         Args:
             reactor: A Twisted Reactor, or something that pretends to be one.
-            clock (synapse.util.Clock): The Clock, associated with the reactor.
-            homeserver (synapse.server.HomeServer): The HomeServer to test
-            against.
+            clock: The Clock, associated with the reactor.
+            homeserver: The HomeServer to test against.
 
         Function to optionally be overridden in subclasses.
         """
@@ -452,11 +451,10 @@ class HomeserverTestCase(TestCase):
         given content.
 
         Args:
-            method (bytes/unicode): The HTTP request method ("verb").
-            path (bytes/unicode): The HTTP path, suitably URL encoded (e.g.
-            escaped UTF-8 & spaces and such).
-            content (bytes or dict): The body of the request. JSON-encoded, if
-            a dict.
+            method: The HTTP request method ("verb").
+            path: The HTTP path, suitably URL encoded (e.g. escaped UTF-8 & spaces
+                and such).
+            content: The body of the request. JSON-encoded, if a dict.
             shorthand: Whether to try and be helpful and prefix the given URL
             with the usual REST API path, if it doesn't contain it.
             federation_auth_origin: if set to not-None, we will add a fake