-rw-r--r--  .gitignore  3
-rw-r--r--  Cargo.lock  4
-rw-r--r--  changelog.d/14629.feature  1
-rw-r--r--  changelog.d/14667.doc  1
-rw-r--r--  changelog.d/14747.feature  1
-rw-r--r--  changelog.d/14749.misc  1
-rw-r--r--  changelog.d/14773.doc  1
-rw-r--r--  changelog.d/14775.feature  1
-rw-r--r--  changelog.d/14799.bugfix  1
-rw-r--r--  changelog.d/14803.doc  1
-rw-r--r--  changelog.d/14804.misc  1
-rw-r--r--  changelog.d/14807.misc  1
-rw-r--r--  changelog.d/14811.feature  1
-rw-r--r--  changelog.d/14812.bugfix  1
-rw-r--r--  changelog.d/14816.misc  1
-rw-r--r--  changelog.d/14818.doc  1
-rw-r--r--  changelog.d/14819.misc  1
-rw-r--r--  changelog.d/14821.misc  1
-rw-r--r--  changelog.d/14822.misc  1
-rw-r--r--  changelog.d/14824.doc  1
-rw-r--r--  changelog.d/14825.misc  1
-rw-r--r--  changelog.d/14826.misc  1
-rw-r--r--  changelog.d/14832.misc  1
-rw-r--r--  changelog.d/14833.misc  1
-rw-r--r--  changelog.d/14839.feature  1
-rw-r--r--  changelog.d/14841.misc  1
-rw-r--r--  changelog.d/14842.bugfix  1
-rw-r--r--  changelog.d/14843.misc  1
-rw-r--r--  changelog.d/14845.doc  1
-rw-r--r--  changelog.d/14848.misc  1
-rw-r--r--  changelog.d/14855.misc  1
-rw-r--r--  changelog.d/14856.misc  1
-rw-r--r--  contrib/workers-bash-scripts/create-multiple-generic-workers.md  6
-rw-r--r--  contrib/workers-bash-scripts/create-multiple-stream-writers.md  10
-rw-r--r--  docker/complement/conf/workers-shared-extra.yaml.j2  4
-rw-r--r--  docs/application_services.md  1
-rw-r--r--  docs/code_style.md  15
-rw-r--r--  docs/systemd-with-workers/workers/event_persister.yaml  1
-rw-r--r--  docs/systemd-with-workers/workers/generic_worker.yaml  3
-rw-r--r--  docs/systemd-with-workers/workers/media_worker.yaml  1
-rw-r--r--  docs/upgrade.md  13
-rw-r--r--  docs/usage/administration/request_log.md  4
-rw-r--r--  docs/usage/configuration/config_documentation.md  35
-rw-r--r--  mypy.ini  5
-rw-r--r--  pyproject.toml  8
-rwxr-xr-x  scripts-dev/complement.sh  2
-rwxr-xr-x  scripts-dev/database-save.sh  1
-rwxr-xr-x  scripts-dev/lint.sh  33
-rw-r--r--  stubs/sortedcontainers/sortedlist.pyi  1
-rw-r--r--  stubs/sortedcontainers/sortedset.pyi  2
-rw-r--r--  stubs/synapse/synapse_rust/push.pyi  17
-rw-r--r--  synapse/api/constants.py  1
-rw-r--r--  synapse/app/generic_worker.py  3
-rw-r--r--  synapse/app/homeserver.py  3
-rw-r--r--  synapse/config/_base.pyi  10
-rw-r--r--  synapse/config/experimental.py  24
-rw-r--r--  synapse/crypto/keyring.py  61
-rw-r--r--  synapse/events/utils.py  31
-rw-r--r--  synapse/federation/federation_client.py  27
-rw-r--r--  synapse/federation/federation_server.py  4
-rw-r--r--  synapse/federation/transport/client.py  37
-rw-r--r--  synapse/federation/transport/server/federation.py  10
-rw-r--r--  synapse/handlers/account_data.py  7
-rw-r--r--  synapse/handlers/device.py  11
-rw-r--r--  synapse/handlers/initial_sync.py  8
-rw-r--r--  synapse/handlers/message.py  21
-rw-r--r--  synapse/handlers/presence.py  5
-rw-r--r--  synapse/handlers/sync.py  11
-rw-r--r--  synapse/module_api/__init__.py  27
-rw-r--r--  synapse/replication/tcp/client.py  13
-rw-r--r--  synapse/replication/tcp/handler.py  3
-rw-r--r--  synapse/replication/tcp/streams/__init__.py  6
-rw-r--r--  synapse/replication/tcp/streams/_base.py  123
-rw-r--r--  synapse/server.py  2
-rw-r--r--  synapse/storage/databases/main/account_data.py  6
-rw-r--r--  synapse/storage/databases/main/devices.py  13
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py  12
-rw-r--r--  synapse/storage/databases/main/tags.py  54
-rw-r--r--  synapse/storage/schema/main/delta/73/24_events_jump_to_date_index.sql  17
-rw-r--r--  synapse/storage/schema/main/delta/73/25drop_presence.sql  17
-rw-r--r--  synapse/util/ratelimitutils.py  26
-rw-r--r--  tests/federation/test_federation_server.py  5
-rw-r--r--  tests/federation/transport/test_client.py  81
-rw-r--r--  tests/module_api/test_api.py  18
-rw-r--r--  tests/push/test_bulk_push_rule_evaluator.py  85
-rw-r--r--  tests/push/test_email.py  46
-rw-r--r--  tests/push/test_http.py  2
-rw-r--r--  tests/push/test_presentable_names.py  44
-rw-r--r--  tests/push/test_push_rule_evaluator.py  26
-rw-r--r--  tests/replication/tcp/test_handler.py  78
-rw-r--r--  tests/rest/client/test_relations.py  185
-rw-r--r--  tests/storage/test_event_push_actions.py  6
-rw-r--r--  tests/util/test_ratelimitutils.py  39
93 files changed, 1007 insertions, 400 deletions
diff --git a/.gitignore b/.gitignore
index 2b09bddf18..6937de88bc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -69,3 +69,6 @@ book/
 
 # Poetry will create a setup.py, which we don't want to include.
 /setup.py
+
+# Don't include users' poetry configs
+/poetry.toml
diff --git a/Cargo.lock b/Cargo.lock
index ace6a8c50a..079a3f854e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -294,9 +294,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.7.0"
+version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a"
+checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733"
 dependencies = [
  "aho-corasick",
  "memchr",
diff --git a/changelog.d/14629.feature b/changelog.d/14629.feature
new file mode 100644
index 0000000000..78f5fc2403
--- /dev/null
+++ b/changelog.d/14629.feature
@@ -0,0 +1 @@
+Adds a `set_displayname()` method to the module API for setting a user's display name.
diff --git a/changelog.d/14667.doc b/changelog.d/14667.doc
new file mode 100644
index 0000000000..86d6288121
--- /dev/null
+++ b/changelog.d/14667.doc
@@ -0,0 +1 @@
+Include `x_forwarded` entry in the HTTP listener example configs and remove the remaining `worker_main_http_uri` entries.
diff --git a/changelog.d/14747.feature b/changelog.d/14747.feature
new file mode 100644
index 0000000000..0b8066159c
--- /dev/null
+++ b/changelog.d/14747.feature
@@ -0,0 +1 @@
+Add a dedicated listener configuration for `health` endpoint.
\ No newline at end of file
diff --git a/changelog.d/14749.misc b/changelog.d/14749.misc
new file mode 100644
index 0000000000..ff81325225
--- /dev/null
+++ b/changelog.d/14749.misc
@@ -0,0 +1 @@
+Faster remote room joins (worker mode): do not populate external hosts-in-room cache when sending events as this requires blocking for full state.
\ No newline at end of file
diff --git a/changelog.d/14773.doc b/changelog.d/14773.doc
new file mode 100644
index 0000000000..0992444be0
--- /dev/null
+++ b/changelog.d/14773.doc
@@ -0,0 +1 @@
+Remove duplicate commands from the Code Style documentation page; point to the Contributing Guide instead.
\ No newline at end of file
diff --git a/changelog.d/14775.feature b/changelog.d/14775.feature
new file mode 100644
index 0000000000..7b7ee42cac
--- /dev/null
+++ b/changelog.d/14775.feature
@@ -0,0 +1 @@
+Implement support for MSC3890: Remotely silence local notifications.
\ No newline at end of file
diff --git a/changelog.d/14799.bugfix b/changelog.d/14799.bugfix
new file mode 100644
index 0000000000..dc867bd93a
--- /dev/null
+++ b/changelog.d/14799.bugfix
@@ -0,0 +1 @@
+Add index to improve performance of the `/timestamp_to_event` endpoint used for jumping to a specific date in the timeline of a room.
\ No newline at end of file
diff --git a/changelog.d/14803.doc b/changelog.d/14803.doc
new file mode 100644
index 0000000000..30d8ec8dbc
--- /dev/null
+++ b/changelog.d/14803.doc
@@ -0,0 +1 @@
+Add missing documentation for `tag` to `listeners` section.
\ No newline at end of file
diff --git a/changelog.d/14804.misc b/changelog.d/14804.misc
new file mode 100644
index 0000000000..24302332bd
--- /dev/null
+++ b/changelog.d/14804.misc
@@ -0,0 +1 @@
+Add some clarifying comments and refactor a portion of the `Keyring` class for readability.
\ No newline at end of file
diff --git a/changelog.d/14807.misc b/changelog.d/14807.misc
new file mode 100644
index 0000000000..eef9cd3a44
--- /dev/null
+++ b/changelog.d/14807.misc
@@ -0,0 +1 @@
+Add local poetry config files (`poetry.toml`) to `.gitignore`.
\ No newline at end of file
diff --git a/changelog.d/14811.feature b/changelog.d/14811.feature
new file mode 100644
index 0000000000..87542835c3
--- /dev/null
+++ b/changelog.d/14811.feature
@@ -0,0 +1 @@
+Per [MSC3925](https://github.com/matrix-org/matrix-spec-proposals/pull/3925), bundle the whole of the replacement with any edited events, and optionally inhibit server-side replacement.
diff --git a/changelog.d/14812.bugfix b/changelog.d/14812.bugfix
new file mode 100644
index 0000000000..94e0d70cbc
--- /dev/null
+++ b/changelog.d/14812.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where Synapse would exhaust the stack when processing many federation requests where the remote homeserver has disconnected early.
diff --git a/changelog.d/14816.misc b/changelog.d/14816.misc
new file mode 100644
index 0000000000..d44571b731
--- /dev/null
+++ b/changelog.d/14816.misc
@@ -0,0 +1 @@
+Add missing type hints.
diff --git a/changelog.d/14818.doc b/changelog.d/14818.doc
new file mode 100644
index 0000000000..7a47cc8ab3
--- /dev/null
+++ b/changelog.d/14818.doc
@@ -0,0 +1 @@
+Updated documentation in configuration manual for `user_directory.search_all_users`.
\ No newline at end of file
diff --git a/changelog.d/14819.misc b/changelog.d/14819.misc
new file mode 100644
index 0000000000..9c568dbc9c
--- /dev/null
+++ b/changelog.d/14819.misc
@@ -0,0 +1 @@
+Refactor push tests.
diff --git a/changelog.d/14821.misc b/changelog.d/14821.misc
new file mode 100644
index 0000000000..99e4e5e8a1
--- /dev/null
+++ b/changelog.d/14821.misc
@@ -0,0 +1 @@
+Re-enable some linting that was disabled when we switched to ruff.
diff --git a/changelog.d/14822.misc b/changelog.d/14822.misc
new file mode 100644
index 0000000000..5e02cc8488
--- /dev/null
+++ b/changelog.d/14822.misc
@@ -0,0 +1 @@
+Add `cargo fmt` and `cargo clippy` to the lint script.
\ No newline at end of file
diff --git a/changelog.d/14824.doc b/changelog.d/14824.doc
new file mode 100644
index 0000000000..172d37baf2
--- /dev/null
+++ b/changelog.d/14824.doc
@@ -0,0 +1 @@
+Add `worker_manhole` to configuration manual.
\ No newline at end of file
diff --git a/changelog.d/14825.misc b/changelog.d/14825.misc
new file mode 100644
index 0000000000..64312ac09e
--- /dev/null
+++ b/changelog.d/14825.misc
@@ -0,0 +1 @@
+Drop unused table `presence`.
\ No newline at end of file
diff --git a/changelog.d/14826.misc b/changelog.d/14826.misc
new file mode 100644
index 0000000000..e80673a721
--- /dev/null
+++ b/changelog.d/14826.misc
@@ -0,0 +1 @@
+Merge the two account data and the two device list replication streams.
diff --git a/changelog.d/14832.misc b/changelog.d/14832.misc
new file mode 100644
index 0000000000..61e7401e43
--- /dev/null
+++ b/changelog.d/14832.misc
@@ -0,0 +1 @@
+Faster joins: use stable identifiers from [MSC3706](https://github.com/matrix-org/matrix-spec-proposals/pull/3706).
diff --git a/changelog.d/14833.misc b/changelog.d/14833.misc
new file mode 100644
index 0000000000..e80673a721
--- /dev/null
+++ b/changelog.d/14833.misc
@@ -0,0 +1 @@
+Merge the two account data and the two device list replication streams.
diff --git a/changelog.d/14839.feature b/changelog.d/14839.feature
new file mode 100644
index 0000000000..a4206be007
--- /dev/null
+++ b/changelog.d/14839.feature
@@ -0,0 +1 @@
+Faster joins: always serve a partial join response to servers that request it with the stable query param.
diff --git a/changelog.d/14841.misc b/changelog.d/14841.misc
new file mode 100644
index 0000000000..61e7401e43
--- /dev/null
+++ b/changelog.d/14841.misc
@@ -0,0 +1 @@
+Faster joins: use stable identifiers from [MSC3706](https://github.com/matrix-org/matrix-spec-proposals/pull/3706).
diff --git a/changelog.d/14842.bugfix b/changelog.d/14842.bugfix
new file mode 100644
index 0000000000..94e0d70cbc
--- /dev/null
+++ b/changelog.d/14842.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where Synapse would exhaust the stack when processing many federation requests where the remote homeserver has disconnected early.
diff --git a/changelog.d/14843.misc b/changelog.d/14843.misc
new file mode 100644
index 0000000000..bec3c216bc
--- /dev/null
+++ b/changelog.d/14843.misc
@@ -0,0 +1 @@
+Add a parameter to control whether the federation client performs a partial state join.
diff --git a/changelog.d/14845.doc b/changelog.d/14845.doc
new file mode 100644
index 0000000000..dd969aa05c
--- /dev/null
+++ b/changelog.d/14845.doc
@@ -0,0 +1 @@
+Fix the example config missing the `id` field in [application service documentation](https://matrix-org.github.io/synapse/latest/application_services.html).
diff --git a/changelog.d/14848.misc b/changelog.d/14848.misc
new file mode 100644
index 0000000000..32aa6c9bc8
--- /dev/null
+++ b/changelog.d/14848.misc
@@ -0,0 +1 @@
+Bump regex from 1.7.0 to 1.7.1.
diff --git a/changelog.d/14855.misc b/changelog.d/14855.misc
new file mode 100644
index 0000000000..f0e292f287
--- /dev/null
+++ b/changelog.d/14855.misc
@@ -0,0 +1 @@
+Add an early return when handling no-op presence updates.
diff --git a/changelog.d/14856.misc b/changelog.d/14856.misc
new file mode 100644
index 0000000000..3731d6cbf1
--- /dev/null
+++ b/changelog.d/14856.misc
@@ -0,0 +1 @@
+Fix `wait_for_stream_position` to correctly wait for the right instance to advance its token.
diff --git a/contrib/workers-bash-scripts/create-multiple-generic-workers.md b/contrib/workers-bash-scripts/create-multiple-generic-workers.md
index c9be707b3c..63d0038a7d 100644
--- a/contrib/workers-bash-scripts/create-multiple-generic-workers.md
+++ b/contrib/workers-bash-scripts/create-multiple-generic-workers.md
@@ -15,19 +15,19 @@ worker_name: generic_worker$i
 worker_replication_host: 127.0.0.1
 worker_replication_http_port: 9093
 
-worker_main_http_uri: http://localhost:8008/
-
 worker_listeners:
   - type: http
     port: 808$i
+    x_forwarded: true
     resources:
       - names: [client, federation]
 
 worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml
+#worker_pid_file: DATADIR/generic_worker$i.pid
 EOF
 done
 ```
 
 This would create five generic workers with a unique `worker_name` field in each file and listening on ports 8081-8085.
 
-Customise the script to your needs.
+Customise the script to your needs. Note that `worker_pid_file` is required if `worker_daemonize` is `true`. Uncomment and/or modify the line if needed.
diff --git a/contrib/workers-bash-scripts/create-multiple-stream-writers.md b/contrib/workers-bash-scripts/create-multiple-stream-writers.md
index 0d2ca780a6..efa5dea305 100644
--- a/contrib/workers-bash-scripts/create-multiple-stream-writers.md
+++ b/contrib/workers-bash-scripts/create-multiple-stream-writers.md
@@ -8,7 +8,9 @@ It also prints out the example lines for Synapse main configuration file.
 
 Remember to route necessary endpoints directly to a worker associated with it.
 
-If you run the script as-is, it will create workers with the replication listener starting from port 8034 and another, regular http listener starting from 8044. If you don't need all of the stream writers listed in the script, just remove them from the ```STREAM_WRITERS``` array.
+If you run the script as-is, it will create workers with the replication listener starting from port 8034 and another, regular http listener starting from 8044. If you don't need all of the stream writers listed in the script, just remove them from the ```STREAM_WRITERS``` array. 
+
+Note that `worker_pid_file` is required if `worker_daemonize` is `true`. Uncomment and/or modify the line if needed.
 
 ```sh
 #!/bin/bash
@@ -46,9 +48,11 @@ worker_listeners:
 
   - type: http
     port: $(expr $HTTP_START_PORT + $i)
+    x_forwarded: true
     resources:
       - names: [client]
 
+#worker_pid_file: DATADIR/${STREAM_WRITERS[$i]}.pid
 worker_log_config: /etc/matrix-synapse/stream-writer-log.yaml
 EOF
 HOMESERVER_YAML_INSTANCE_MAP+=$"  ${STREAM_WRITERS[$i]}_stream_writer:
@@ -91,7 +95,9 @@ Simply run the script to create YAML files in the current folder and print out t
 
 ```console
 $ ./create_stream_writers.sh
-
+```
+You should see output similar to the following:
+```console
 # Add these lines to your homeserver.yaml.
 # Don't forget to configure your reverse proxy and
 # necessary endpoints to their respective worker.
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
index cb839fed07..7e9ec23808 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -94,14 +94,14 @@ allow_device_name_lookup_over_federation: true
 experimental_features:
   # Enable history backfilling support
   msc2716_enabled: true
-  # server-side support for partial state in /send_join responses
-  msc3706_enabled: true
   {% if not workers_in_use %}
   # client-side support for partial state in /send_join responses
   faster_joins: true
   {% endif %}
   # Filtering /messages by relation type.
   msc3874_enabled: true
+  # Enable deleting device-specific notification settings stored in account data
+  msc3890_enabled: true
   # Enable removing account data support
   msc3391_enabled: true
 
diff --git a/docs/application_services.md b/docs/application_services.md
index e4592010a2..1f988185a9 100644
--- a/docs/application_services.md
+++ b/docs/application_services.md
@@ -15,6 +15,7 @@ app_service_config_files:
 The format of the AS configuration file is as follows:
 
 ```yaml
+id: <your-AS-id>
 url: <base url of AS>
 as_token: <token AS will add to requests to HS>
 hs_token: <token HS will add to requests to AS>
diff --git a/docs/code_style.md b/docs/code_style.md
index 3aa7d0d741..026001b8a3 100644
--- a/docs/code_style.md
+++ b/docs/code_style.md
@@ -13,23 +13,14 @@ The necessary tools are:
 - [ruff](https://github.com/charliermarsh/ruff), which can spot common errors; and
 - [mypy](https://mypy.readthedocs.io/en/stable/), a type checker.
 
-Install them with:
-
-```sh
-pip install -e ".[lint,mypy]"
-```
-
-The easiest way to run the lints is to invoke the linter script as follows.
-
-```sh
-scripts-dev/lint.sh
-```
+See [the contributing guide](development/contributing_guide.md#run-the-linters) for instructions
+on how to install the above tools and run the linters.
 
 It's worth noting that modern IDEs and text editors can run these tools
 automatically on save. It may be worth looking into whether this
 functionality is supported in your editor for a more convenient
 development workflow. It is not, however, recommended to run `mypy`
-on save as they take a while and can be very resource intensive.
+on save as it takes a while and can be very resource intensive.
 
 ## General rules
 
diff --git a/docs/systemd-with-workers/workers/event_persister.yaml b/docs/systemd-with-workers/workers/event_persister.yaml
index 9bc6997bad..c11d5897b1 100644
--- a/docs/systemd-with-workers/workers/event_persister.yaml
+++ b/docs/systemd-with-workers/workers/event_persister.yaml
@@ -17,6 +17,7 @@ worker_listeners:
   #
   #- type: http
   #  port: 8035
+  #  x_forwarded: true
   #  resources:
   #    - names: [client]
 
diff --git a/docs/systemd-with-workers/workers/generic_worker.yaml b/docs/systemd-with-workers/workers/generic_worker.yaml
index 6e7b60886e..a858f99ed1 100644
--- a/docs/systemd-with-workers/workers/generic_worker.yaml
+++ b/docs/systemd-with-workers/workers/generic_worker.yaml
@@ -5,11 +5,10 @@ worker_name: generic_worker1
 worker_replication_host: 127.0.0.1
 worker_replication_http_port: 9093
 
-worker_main_http_uri: http://localhost:8008/
-
 worker_listeners:
   - type: http
     port: 8083
+    x_forwarded: true
     resources:
       - names: [client, federation]
 
diff --git a/docs/systemd-with-workers/workers/media_worker.yaml b/docs/systemd-with-workers/workers/media_worker.yaml
index eb34d12492..8ad046f11a 100644
--- a/docs/systemd-with-workers/workers/media_worker.yaml
+++ b/docs/systemd-with-workers/workers/media_worker.yaml
@@ -8,6 +8,7 @@ worker_replication_http_port: 9093
 worker_listeners:
   - type: http
     port: 8085
+    x_forwarded: true
     resources:
       - names: [media]
 
diff --git a/docs/upgrade.md b/docs/upgrade.md
index c4bc5889a9..270c33b656 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -88,6 +88,19 @@ process, for example:
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
 
+# Upgrading to v1.76.0
+
+## Changes to the account data replication streams
+
+Synapse has changed the format of the account data and devices replication
+streams (between workers). This is a forwards- and backwards-incompatible
+change: v1.75 workers cannot process account data replicated by v1.76 workers,
+and vice versa.
+
+Once all workers are upgraded to v1.76 (or downgraded to v1.75), account data
+and device replication will resume as normal.
+
+
 # Upgrading to v1.74.0
 
 ## Unicode support in user search
diff --git a/docs/usage/administration/request_log.md b/docs/usage/administration/request_log.md
index 7dd9969d86..292e3449f1 100644
--- a/docs/usage/administration/request_log.md
+++ b/docs/usage/administration/request_log.md
@@ -10,10 +10,10 @@ See the following for how to decode the dense data available from the default lo
 ```
 
 
-| Part  | Explanation | 
+| Part  | Explanation |
 | ----- | ------------ |
 | AAAA  | Timestamp request was logged (not received) |
-| BBBB  | Logger name (`synapse.access.(http\|https).<tag>`, where 'tag' is defined in the `listeners` config section, normally the port) |
+| BBBB  | Logger name (`synapse.access.(http\|https).<tag>`, where 'tag' is defined in the [`listeners`](../configuration/config_documentation.md#listeners) config section, normally the port) |
 | CCCC  | Line number in code |
 | DDDD  | Log Level |
 | EEEE  | Request Identifier (This identifier is shared by related log lines)|
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 93d6c7fb02..3481e866f7 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -422,6 +422,10 @@ Sub-options for each listener include:
 
 * `port`: the TCP port to bind to.
 
+* `tag`: An alias for the port in the logger name. If set, the tag is logged instead
+of the port. Defaults to `None`; it is optional and only valid for listeners with `type: http`.
+See the docs on the [request log format](../administration/request_log.md).
+
 * `bind_addresses`: a list of local addresses to listen on. The default is
        'all local interfaces'.
 
@@ -476,6 +480,12 @@ Valid resource names are:
 
 * `static`: static resources under synapse/static (/_matrix/static). (Mostly useful for 'fallback authentication'.)
 
+* `health`: the [health check endpoint](../../reverse_proxy.md#health-check-endpoint). This endpoint
+  is active by default for all other resources and does not have to be activated separately.
+  This is only useful if you want to expose the health endpoint explicitly on a dedicated port, or
+  for [workers](../../workers.md) and containers without a listener, e.g.
+  [application services](../../workers.md#notifying-application-services).
+
 Example configuration #1:
 ```yaml
 listeners:
@@ -3462,8 +3472,8 @@ This setting defines options related to the user directory.
 This option has the following sub-options:
 * `enabled`:  Defines whether users can search the user directory. If false then
    empty responses are returned to all queries. Defaults to true.
-* `search_all_users`: Defines whether to search all users visible to your HS when searching
-   the user directory. If false, search results will only contain users
+* `search_all_users`: Defines whether to search all users visible to your HS at the time the search is performed. If set to true, the search will return all users who share a room with the user from the homeserver.
+   If false, search results will only contain users
     visible in public rooms and users sharing a room with the requester.
     Defaults to false.
 
@@ -4020,6 +4030,27 @@ worker_listeners:
       - names: [client, federation]
 ```
 ---
+### `worker_manhole`
+
+A worker may have a listener for [`manhole`](../../manhole.md).
+It allows server administrators to access a Python shell on the worker.
+
+Example configuration:
+```yaml
+worker_manhole: 9000
+```
+
+This is a short form for:
+```yaml
+worker_listeners:
+  - port: 9000
+    bind_addresses: ['127.0.0.1']
+    type: manhole
+```
+
+It also requires an additional [`manhole_settings`](#manhole_settings) configuration.
+
+---
 ### `worker_daemonize`
 
 Specifies whether the worker should be started as a daemon process.
diff --git a/mypy.ini b/mypy.ini
index 013fbbdfc0..468bfe588c 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -48,9 +48,6 @@ exclude = (?x)
    |tests/logging/__init__.py
    |tests/logging/test_terse_json.py
    |tests/module_api/test_api.py
-   |tests/push/test_email.py
-   |tests/push/test_presentable_names.py
-   |tests/push/test_push_rule_evaluator.py
    |tests/rest/client/test_transactions.py
    |tests/rest/media/v1/test_media_storage.py
    |tests/server.py
@@ -101,7 +98,7 @@ disallow_untyped_defs = True
 [mypy-tests.metrics.*]
 disallow_untyped_defs = True
 
-[mypy-tests.push.test_bulk_push_rule_evaluator]
+[mypy-tests.push.*]
 disallow_untyped_defs = True
 
 [mypy-tests.rest.*]
diff --git a/pyproject.toml b/pyproject.toml
index d3fa8b6b86..15dc010fd8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -48,11 +48,6 @@ line-length = 88
 #  E731: do not assign a lambda expression, use a def
 #  E501: Line too long (black enforces this for us)
 #
-# See https://github.com/charliermarsh/ruff/#pyflakes
-#  F401: unused import
-#  F811: Redefinition of unused
-#  F821: Undefined name
-#
 # flake8-bugbear compatible checks. Its error codes are described at
 # https://github.com/charliermarsh/ruff/#flake8-bugbear
 #  B019: Use of functools.lru_cache or functools.cache on methods can lead to memory leaks
@@ -64,9 +59,6 @@ ignore = [
     "B024",
     "E501",
     "E731",
-    "F401",
-    "F811",
-    "F821",
 ]
 select = [
     # pycodestyle checks.
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 51d1bac618..7c48d8bccb 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -190,7 +190,7 @@ fi
 
 extra_test_args=()
 
-test_tags="synapse_blacklist,msc3787,msc3874,msc3391"
+test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391"
 
 # All environment variables starting with PASS_ will be shared.
 # (The prefix is stripped off before reaching the container.)
diff --git a/scripts-dev/database-save.sh b/scripts-dev/database-save.sh
index 040c8a4943..91674027ae 100755
--- a/scripts-dev/database-save.sh
+++ b/scripts-dev/database-save.sh
@@ -11,6 +11,5 @@
 sqlite3 "$1" <<'EOF' >table-save.sql
 .dump users
 .dump access_tokens
-.dump presence
 .dump profiles
 EOF
diff --git a/scripts-dev/lint.sh b/scripts-dev/lint.sh
index 2bf58ac5d4..392c509a8a 100755
--- a/scripts-dev/lint.sh
+++ b/scripts-dev/lint.sh
@@ -101,10 +101,43 @@ echo
 # Print out the commands being run
 set -x
 
+# Ensure the sort order of imports.
 isort "${files[@]}"
+
+# Ensure Python code conforms to an opinionated style.
 python3 -m black "${files[@]}"
+
+# Ensure the sample configuration file conforms to style checks.
 ./scripts-dev/config-lint.sh
+
+# Catch any common programming mistakes in Python code.
 # --quiet suppresses the update check.
 ruff --quiet "${files[@]}"
+
+# Catch any common programming mistakes in Rust code.
+#
+# --bins, --examples, --lib, --tests combined explicitly disable checking
+# the benchmarks, which can fail due to `#![feature]` macros not being
+# allowed on the stable rust toolchain (rustc error E0554).
+#
+# --allow-staged and --allow-dirty suppress clippy raising errors
+# for uncommitted files. Only needed when using --fix.
+#
+# -D warnings turns all warnings into hard errors ("deny warnings").
+#
+# Using --fix has a tendency to cause subsequent runs of clippy to recompile
+# rust code, which can slow down this script. Thus we run clippy without --fix
+# first which is quick, and then re-run it with --fix if an error was found.
+if ! cargo-clippy --bins --examples --lib --tests -- -D warnings > /dev/null 2>&1; then
+  cargo-clippy \
+    --bins --examples --lib --tests --allow-staged --allow-dirty --fix -- -D warnings
+fi
+
+# Ensure the formatting of Rust code.
+cargo-fmt
+
+# Ensure all Pydantic models use strict types.
 ./scripts-dev/check_pydantic_models.py lint
+
+# Ensure type hints are correct.
 mypy
diff --git a/stubs/sortedcontainers/sortedlist.pyi b/stubs/sortedcontainers/sortedlist.pyi
index cd4c969849..1fe1a136f1 100644
--- a/stubs/sortedcontainers/sortedlist.pyi
+++ b/stubs/sortedcontainers/sortedlist.pyi
@@ -7,7 +7,6 @@ from __future__ import annotations
 from typing import (
     Any,
     Callable,
-    Generic,
     Iterable,
     Iterator,
     List,
diff --git a/stubs/sortedcontainers/sortedset.pyi b/stubs/sortedcontainers/sortedset.pyi
index d761c438f7..6db11eacbe 100644
--- a/stubs/sortedcontainers/sortedset.pyi
+++ b/stubs/sortedcontainers/sortedset.pyi
@@ -5,10 +5,8 @@
 from __future__ import annotations
 
 from typing import (
-    AbstractSet,
     Any,
     Callable,
-    Generic,
     Hashable,
     Iterable,
     Iterator,
diff --git a/stubs/synapse/synapse_rust/push.pyi b/stubs/synapse/synapse_rust/push.pyi
index dab5d4aff7..373b40740b 100644
--- a/stubs/synapse/synapse_rust/push.pyi
+++ b/stubs/synapse/synapse_rust/push.pyi
@@ -1,3 +1,17 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from typing import Any, Collection, Dict, Mapping, Optional, Sequence, Tuple, Union
 
 from synapse.types import JsonDict
@@ -54,3 +68,6 @@ class PushRuleEvaluator:
         user_id: Optional[str],
         display_name: Optional[str],
     ) -> Collection[Union[Mapping, str]]: ...
+    def matches(
+        self, condition: JsonDict, user_id: Optional[str], display_name: Optional[str]
+    ) -> bool: ...
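
For context, a hedged sketch of how the new `matches` stub might be exercised; the condition dict below is illustrative, not taken from this diff, and only the signature comes from the stub above:

```python
from typing import Optional

from synapse.synapse_rust.push import PushRuleEvaluator
from synapse.types import JsonDict

def body_matches(
    evaluator: PushRuleEvaluator, pattern: str, user_id: Optional[str]
) -> bool:
    # An `event_match` push-rule condition, per the Matrix push rules spec.
    condition: JsonDict = {
        "kind": "event_match",
        "key": "content.body",
        "pattern": pattern,
    }
    # `matches` evaluates a single condition against the event the
    # evaluator was constructed with.
    return evaluator.matches(condition, user_id, None)
```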
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 6a5e7171da..6432d32d83 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -249,6 +249,7 @@ class RoomEncryptionAlgorithms:
 class AccountDataTypes:
     DIRECT: Final = "m.direct"
     IGNORED_USER_LIST: Final = "m.ignored_user_list"
+    TAG: Final = "m.tag"
 
 
 class HistoryVisibility:
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index bcc8abe20c..8108b1e98f 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -199,6 +199,9 @@ class GenericWorkerServer(HomeServer):
                             "A 'media' listener is configured but the media"
                             " repository is disabled. Ignoring."
                         )
+                elif name == "health":
+                    # Skip loading, health resource is always included
+                    continue
 
                 if name == "openid" and "federation" not in res.names:
                     # Only load the openid resource separately if federation resource
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b9be558c7e..6176a70eb2 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -96,6 +96,9 @@ class SynapseHomeServer(HomeServer):
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
+                if name == "health":
+                    # Skip loading, health resource is always included
+                    continue
                 resources.update(self._configure_named_resource(name, res.compress))
 
         additional_resources = listener_config.http_options.additional_resources
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index bd265de536..b5cec132b4 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -1,5 +1,3 @@
-from __future__ import annotations
-
 import argparse
 from typing import (
     Any,
@@ -20,7 +18,7 @@ from typing import (
 
 import jinja2
 
-from synapse.config import (
+from synapse.config import (  # noqa: F401
     account_validity,
     api,
     appservice,
@@ -169,7 +167,7 @@ class RootConfig:
         self, section_name: Literal["caches"]
     ) -> cache.CacheConfig: ...
     @overload
-    def reload_config_section(self, section_name: str) -> Config: ...
+    def reload_config_section(self, section_name: str) -> "Config": ...
 
 class Config:
     root: RootConfig
@@ -202,9 +200,9 @@ def find_config_files(search_paths: List[str]) -> List[str]: ...
 class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
-    def should_handle(self, instance_name: str, key: str) -> bool: ...
+    def should_handle(self, instance_name: str, key: str) -> bool: ...  # noqa: F811
 
 class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
-    def get_instance(self, key: str) -> str: ...
+    def get_instance(self, key: str) -> str: ...  # noqa: F811
 
 def read_file(file_path: Any, config_path: Iterable[str]) -> str: ...
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 0f3870bfe1..0444ef8244 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -17,6 +17,7 @@ from typing import Any, Optional
 import attr
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
+from synapse.config import ConfigError
 from synapse.config._base import Config
 from synapse.types import JsonDict
 
@@ -74,11 +75,15 @@ class ExperimentalConfig(Config):
         )
 
         # MSC3706 (server-side support for partial state in /send_join responses)
+        # Synapse will always serve partial state responses to requests using the stable
+        # query parameter `omit_members`. If this flag is set, Synapse will also serve
+        # partial state responses to requests using the unstable query parameter
+        # `org.matrix.msc3706.partial_state`.
         self.msc3706_enabled: bool = experimental.get("msc3706_enabled", False)
 
         # experimental support for faster joins over federation
         # (MSC2775, MSC3706, MSC3895)
-        # requires a target server with msc3706_enabled enabled.
+        # requires a target server that can provide a partial join response (MSC3706)
         self.faster_joins_enabled: bool = experimental.get("faster_joins", False)
 
         # MSC3720 (Account status endpoint)
@@ -93,6 +98,9 @@ class ExperimentalConfig(Config):
         # MSC2815 (allow room moderators to view redacted event content)
         self.msc2815_enabled: bool = experimental.get("msc2815_enabled", False)
 
+        # MSC3391: Removing account data.
+        self.msc3391_enabled = experimental.get("msc3391_enabled", False)
+
         # MSC3773: Thread notifications
         self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False)
 
@@ -127,6 +135,17 @@ class ExperimentalConfig(Config):
             "msc3886_endpoint", None
         )
 
+        # MSC3890: Remotely silence local notifications
+        # Note: This option requires "experimental_features.msc3391_enabled" to be
+        # set to "true", in order to communicate account data deletions to clients.
+        self.msc3890_enabled: bool = experimental.get("msc3890_enabled", False)
+        if self.msc3890_enabled and not self.msc3391_enabled:
+            raise ConfigError(
+                "Option 'experimental_features.msc3391' must be set to 'true' to "
+                "enable 'experimental_features.msc3890'. MSC3391 functionality is "
+                "required to communicate account data deletions to clients."
+            )
+
         # MSC3912: Relation-based redactions.
         self.msc3912_enabled: bool = experimental.get("msc3912_enabled", False)
 
@@ -139,3 +158,6 @@ class ExperimentalConfig(Config):
 
         # MSC3391: Removing account data.
         self.msc3391_enabled = experimental.get("msc3391_enabled", False)
+
+        # MSC3925: do not replace events with their edits
+        self.msc3925_inhibit_edit = experimental.get("msc3925_inhibit_edit", False)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 69310d9035..86cd4af9bd 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -154,17 +154,21 @@ class Keyring:
 
         if key_fetchers is None:
             key_fetchers = (
+                # Fetch keys from the database.
                 StoreKeyFetcher(hs),
+                # Fetch keys from a configured Perspectives server.
                 PerspectivesKeyFetcher(hs),
+                # Fetch keys from the origin server directly.
                 ServerKeyFetcher(hs),
             )
         self._key_fetchers = key_fetchers
 
-        self._server_queue: BatchingQueue[
+        self._fetch_keys_queue: BatchingQueue[
             _FetchKeyRequest, Dict[str, Dict[str, FetchKeyResult]]
         ] = BatchingQueue(
             "keyring_server",
             clock=hs.get_clock(),
+            # The method called to fetch each key
             process_batch_callback=self._inner_fetch_key_requests,
         )
 
@@ -287,7 +291,7 @@ class Keyring:
                 minimum_valid_until_ts=verify_request.minimum_valid_until_ts,
                 key_ids=list(key_ids_to_find),
             )
-            found_keys_by_server = await self._server_queue.add_to_queue(
+            found_keys_by_server = await self._fetch_keys_queue.add_to_queue(
                 key_request, key=verify_request.server_name
             )
 
@@ -352,7 +356,17 @@ class Keyring:
     async def _inner_fetch_key_requests(
         self, requests: List[_FetchKeyRequest]
     ) -> Dict[str, Dict[str, FetchKeyResult]]:
-        """Processing function for the queue of `_FetchKeyRequest`."""
+        """Processing function for the queue of `_FetchKeyRequest`.
+
+        Takes a list of key fetch requests, de-duplicates them and then carries out
+        each request by invoking self._inner_fetch_key_request.
+
+        Args:
+            requests: A list of requests for homeserver verify keys.
+
+        Returns:
+            {server name: {key id: fetch key result}}
+        """
 
         logger.debug("Starting fetch for %s", requests)
 
@@ -397,8 +411,23 @@ class Keyring:
     async def _inner_fetch_key_request(
         self, verify_request: _FetchKeyRequest
     ) -> Dict[str, FetchKeyResult]:
-        """Attempt to fetch the given key by calling each key fetcher one by
-        one.
+        """Attempt to fetch the given key by calling each key fetcher one by one.
+
+        If a key is found, check whether its `valid_until_ts` attribute satisfies the
+        `minimum_valid_until_ts` attribute of the `verify_request`. If it does, we
+        refrain from asking subsequent fetchers for that key.
+
+        Even if the above check fails, we still return the found key - the caller may
+        still find the invalid key result useful. In this case, we continue to ask
+        subsequent fetchers for the invalid key, in case they return a valid result
+        for it. This can happen when fetching a stale key result from the database,
+        before querying the origin server for an up-to-date result.
+
+        Args:
+            verify_request: The request for a verify key. Can include multiple key IDs.
+
+        Returns:
+            A map of {key_id: the key fetch result}.
         """
         logger.debug("Starting fetch for %s", verify_request)
 
@@ -420,26 +449,22 @@ class Keyring:
                 if not key:
                     continue
 
-                # If we already have a result for the given key ID we keep the
+                # If we already have a result for the given key ID, we keep the
                 # one with the highest `valid_until_ts`.
                 existing_key = found_keys.get(key_id)
-                if existing_key:
-                    if key.valid_until_ts <= existing_key.valid_until_ts:
-                        continue
+                if existing_key and existing_key.valid_until_ts > key.valid_until_ts:
+                    continue
+
+                # Check if this key's expiry timestamp is valid for the verify request.
+                if key.valid_until_ts >= verify_request.minimum_valid_until_ts:
+                    # Stop looking for this key from subsequent fetchers.
+                    missing_key_ids.discard(key_id)
 
-                # We always store the returned key even if it doesn't the
+                # We always store the returned key even if it doesn't meet the
                 # `minimum_valid_until_ts` requirement, as some verification
                 # requests may still be able to be satisfied by it.
-                #
-                # We still keep looking for the key from other fetchers in that
-                # case though.
                 found_keys[key_id] = key
 
-                if key.valid_until_ts < verify_request.minimum_valid_until_ts:
-                    continue
-
-                missing_key_ids.discard(key_id)
-
         return found_keys
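
The reworked loop can be summarised as a standalone merge policy. A sketch, with names simplified from the actual `Keyring` internals:

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Set, Tuple

@dataclass
class FetchKeyResult:
    verify_key: bytes
    valid_until_ts: int

def merge_fetched_keys(
    found_keys: Dict[str, FetchKeyResult],
    missing_key_ids: Set[str],
    fetched: Iterable[Tuple[str, FetchKeyResult]],
    minimum_valid_until_ts: int,
) -> None:
    for key_id, key in fetched:
        existing = found_keys.get(key_id)
        # Keep the result with the highest valid_until_ts.
        if existing and existing.valid_until_ts > key.valid_until_ts:
            continue
        # A sufficiently fresh key means later fetchers need not be asked.
        if key.valid_until_ts >= minimum_valid_until_ts:
            missing_key_ids.discard(key_id)
        # Record the key even if stale: some verify requests may accept it.
        found_keys[key_id] = key
```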
 
 
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 13fa93afb8..ae57a4df5e 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -403,6 +403,14 @@ class EventClientSerializer:
     clients.
     """
 
+    def __init__(self, inhibit_replacement_via_edits: bool = False):
+        """
+        Args:
+            inhibit_replacement_via_edits: If this is set to True, then events are
+               never replaced by their edits.
+        """
+        self._inhibit_replacement_via_edits = inhibit_replacement_via_edits
+
     def serialize_event(
         self,
         event: Union[JsonDict, EventBase],
@@ -422,6 +430,8 @@ class EventClientSerializer:
                into the event.
             apply_edits: Whether the content of the event should be modified to reflect
                any replacement in `bundle_aggregations[<event_id>].replace`.
+               See also the `inhibit_replacement_via_edits` constructor arg: if that is
+               set to True, then this argument is ignored.
         Returns:
             The serialized event
         """
@@ -495,7 +505,8 @@ class EventClientSerializer:
                 again for additional events in a recursive manner.
             serialized_event: The serialized event which may be modified.
             apply_edits: Whether the content of the event should be modified to reflect
-               any replacement in `aggregations.replace`.
+               any replacement in `aggregations.replace` (subject to the
+               `inhibit_replacement_via_edits` constructor arg).
         """
 
         # We have already checked that aggregations exist for this event.
@@ -518,15 +529,21 @@ class EventClientSerializer:
         if event_aggregations.replace:
             # If there is an edit, optionally apply it to the event.
             edit = event_aggregations.replace
-            if apply_edits:
+            if apply_edits and not self._inhibit_replacement_via_edits:
                 self._apply_edit(event, serialized_event, edit)
 
             # Include information about it in the relations dict.
-            serialized_aggregations[RelationTypes.REPLACE] = {
-                "event_id": edit.event_id,
-                "origin_server_ts": edit.origin_server_ts,
-                "sender": edit.sender,
-            }
+            #
+            # Matrix spec v1.5 (https://spec.matrix.org/v1.5/client-server-api/#server-side-aggregation-of-mreplace-relationships)
+            # said that we should only include the `event_id`, `origin_server_ts` and
+            # `sender` of the edit; however MSC3925 proposes extending it to the whole
+            # of the edit, which is what we do here.
+            serialized_aggregations[RelationTypes.REPLACE] = self.serialize_event(
+                edit,
+                time_now,
+                config=config,
+                apply_edits=False,
+            )
 
         # Include any threaded replies to this event.
         if event_aggregations.thread:
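
For illustration, the bundled `m.replace` aggregation now carries the whole serialized edit rather than the three-field summary; roughly (all values invented):

```python
serialized_aggregations = {
    "m.replace": {
        "event_id": "$edit_event_id",
        "origin_server_ts": 1673000000000,
        "sender": "@alice:example.org",
        "type": "m.room.message",
        "content": {
            "msgtype": "m.text",
            "body": "* corrected text",
            "m.new_content": {"msgtype": "m.text", "body": "corrected text"},
            "m.relates_to": {"rel_type": "m.replace", "event_id": "$original"},
        },
    }
}
```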
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 137cfb3346..15a9a88302 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1014,7 +1014,11 @@ class FederationClient(FederationBase):
         )
 
     async def send_join(
-        self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion
+        self,
+        destinations: Iterable[str],
+        pdu: EventBase,
+        room_version: RoomVersion,
+        partial_state: bool = True,
     ) -> SendJoinResult:
         """Sends a join event to one of a list of homeservers.
 
@@ -1027,6 +1031,10 @@ class FederationClient(FederationBase):
             pdu: event to be sent
             room_version: the version of the room (according to the server that
                 did the make_join)
+            partial_state: whether to ask the remote server to omit membership state
+                events from the response. If the remote server complies,
+                `partial_state` in the send join result will be set. Defaults to
+                `True`.
 
         Returns:
             The result of the send join request.
@@ -1037,7 +1045,9 @@ class FederationClient(FederationBase):
         """
 
         async def send_request(destination: str) -> SendJoinResult:
-            response = await self._do_send_join(room_version, destination, pdu)
+            response = await self._do_send_join(
+                room_version, destination, pdu, omit_members=partial_state
+            )
 
             # If an event was returned (and expected to be returned):
             #
@@ -1142,9 +1152,9 @@ class FederationClient(FederationBase):
                     % (auth_chain_create_events,)
                 )
 
-            if response.partial_state and not response.servers_in_room:
+            if response.members_omitted and not response.servers_in_room:
                 raise InvalidResponseError(
-                    "partial_state was set, but no servers were listed in the room"
+                    "members_omitted was set, but no servers were listed in the room"
                 )
 
             return SendJoinResult(
@@ -1152,7 +1162,7 @@ class FederationClient(FederationBase):
                 state=signed_state,
                 auth_chain=signed_auth,
                 origin=destination,
-                partial_state=response.partial_state,
+                partial_state=response.members_omitted,
                 servers_in_room=response.servers_in_room or [],
             )
 
@@ -1177,7 +1187,11 @@ class FederationClient(FederationBase):
         )
 
     async def _do_send_join(
-        self, room_version: RoomVersion, destination: str, pdu: EventBase
+        self,
+        room_version: RoomVersion,
+        destination: str,
+        pdu: EventBase,
+        omit_members: bool,
     ) -> SendJoinResponse:
         time_now = self._clock.time_msec()
 
@@ -1188,6 +1202,7 @@ class FederationClient(FederationBase):
                 room_id=pdu.room_id,
                 event_id=pdu.event_id,
                 content=pdu.get_pdu_json(time_now),
+                omit_members=omit_members,
             )
         except HttpResponseException as e:
             # If an error is received that is due to an unrecognised endpoint,
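
A minimal sketch of a caller using the new `partial_state` parameter to opt out of a partial ("faster joins") response; the wrapper itself is hypothetical:

```python
from typing import List

from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase
from synapse.federation.federation_client import FederationClient, SendJoinResult

async def join_with_full_state(
    client: FederationClient,
    destinations: List[str],
    pdu: EventBase,
    room_version: RoomVersion,
) -> SendJoinResult:
    # Ask resident servers for the full room state up front instead of a
    # partial-state response; `result.partial_state` will then be False
    # for compliant servers.
    return await client.send_join(
        destinations, pdu, room_version, partial_state=False
    )
```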
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bb20af6e91..3197939a36 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -725,10 +725,12 @@ class FederationServer(FederationBase):
             "state": [p.get_pdu_json(time_now) for p in state_events],
             "auth_chain": [p.get_pdu_json(time_now) for p in auth_chain_events],
             "org.matrix.msc3706.partial_state": caller_supports_partial_state,
+            "members_omitted": caller_supports_partial_state,
         }
 
         if servers_in_room is not None:
             resp["org.matrix.msc3706.servers_in_room"] = list(servers_in_room)
+            resp["servers_in_room"] = list(servers_in_room)
 
         return resp
 
@@ -1500,7 +1502,7 @@ def _get_event_ids_for_partial_state_join(
     prev_state_ids: StateMap[str],
     summary: Dict[str, MemberSummary],
 ) -> Collection[str]:
-    """Calculate state to be retuned in a partial_state send_join
+    """Calculate state to be returned in a partial_state send_join
 
     Args:
         join_event: the join event being send_joined
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 77f1f39cac..556883f079 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -351,12 +351,16 @@ class TransportLayerClient:
         room_id: str,
         event_id: str,
         content: JsonDict,
+        omit_members: bool,
     ) -> "SendJoinResponse":
         path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
         query_params: Dict[str, str] = {}
         if self._faster_joins_enabled:
             # lazy-load state on join
-            query_params["org.matrix.msc3706.partial_state"] = "true"
+            query_params["org.matrix.msc3706.partial_state"] = (
+                "true" if omit_members else "false"
+            )
+            query_params["omit_members"] = "true" if omit_members else "false"
 
         return await self.client.put_json(
             destination=destination,
@@ -794,7 +798,7 @@ class SendJoinResponse:
     event: Optional[EventBase] = None
 
     # The room state is incomplete
-    partial_state: bool = False
+    members_omitted: bool = False
 
     # List of servers in the room
     servers_in_room: Optional[List[str]] = None
@@ -834,16 +838,18 @@ def _event_list_parser(
 
 
 @ijson.coroutine
-def _partial_state_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
+def _members_omitted_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
     """Helper function for use with `ijson.items_coro`
 
-    Parses the partial_state field in send_join responses
+    Parses the members_omitted field in send_join responses
     """
     while True:
         val = yield
         if not isinstance(val, bool):
-            raise TypeError("partial_state must be a boolean")
-        response.partial_state = val
+            raise TypeError(
+                "members_omitted (formerly org.matrix.msc370c.partial_state) must be a boolean"
+            )
+        response.members_omitted = val
 
 
 @ijson.coroutine
@@ -904,11 +910,19 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
         if not v1_api:
             self._coros.append(
                 ijson.items_coro(
-                    _partial_state_parser(self._response),
+                    _members_omitted_parser(self._response),
                     "org.matrix.msc3706.partial_state",
                     use_float="True",
                 )
             )
+            # The stable field name comes last, so it "wins" if the fields disagree
+            self._coros.append(
+                ijson.items_coro(
+                    _members_omitted_parser(self._response),
+                    "members_omitted",
+                    use_float="True",
+                )
+            )
 
             self._coros.append(
                 ijson.items_coro(
@@ -918,6 +932,15 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
                 )
             )
 
+            # Again, stable field name comes last
+            self._coros.append(
+                ijson.items_coro(
+                    _servers_in_room_parser(self._response),
+                    "servers_in_room",
+                    use_float="True",
+                )
+            )
+
     def write(self, data: bytes) -> int:
         for c in self._coros:
             c.send(data)
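
To see why registering the stable-field parser last makes it "win", a small standalone sketch; it reuses the module's private helper for illustration, and passes `use_float` as a plain boolean:

```python
import ijson

from synapse.federation.transport.client import (
    SendJoinResponse,
    _members_omitted_parser,
)

response = SendJoinResponse(auth_events=[], state=[], event_dict={})
coros = [
    # Unstable field name, registered first...
    ijson.items_coro(
        _members_omitted_parser(response),
        "org.matrix.msc3706.partial_state",
        use_float=True,
    ),
    # ...stable field name, registered last, so its assignment happens last.
    ijson.items_coro(
        _members_omitted_parser(response), "members_omitted", use_float=True
    ),
]
data = b'{"org.matrix.msc3706.partial_state": true, "members_omitted": false}'
for c in coros:
    c.send(data)
print(response.members_omitted)  # False: the stable-field parser ran last
```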
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 53e77b4bb6..17c427387e 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -422,7 +422,7 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
         server_name: str,
     ):
         super().__init__(hs, authenticator, ratelimiter, server_name)
-        self._msc3706_enabled = hs.config.experimental.msc3706_enabled
+        self._read_msc3706_query_param = hs.config.experimental.msc3706_enabled
 
     async def on_PUT(
         self,
@@ -436,10 +436,16 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
         #   match those given in content
 
         partial_state = False
-        if self._msc3706_enabled:
+        # The stable query parameter wins, if it disagrees with the unstable
+        # parameter for some reason.
+        stable_param = parse_boolean_from_args(query, "omit_members", default=None)
+        if stable_param is not None:
+            partial_state = stable_param
+        elif self._read_msc3706_query_param:
             partial_state = parse_boolean_from_args(
                 query, "org.matrix.msc3706.partial_state", default=False
             )
+
         result = await self.handler.on_send_join_request(
             origin, content, room_id, caller_supports_partial_state=partial_state
         )
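
The precedence rule can be isolated into a small sketch; the inline boolean parsing is a simplified stand-in for Synapse's `parse_boolean_from_args`:

```python
from typing import Dict, List, Optional

def resolve_partial_state(
    query: Dict[bytes, List[bytes]], read_msc3706_query_param: bool
) -> bool:
    def parse_boolean(name: bytes, default: Optional[bool]) -> Optional[bool]:
        values = query.get(name)
        if not values:
            return default
        return values[0] == b"true"

    # The stable query parameter wins if both are supplied.
    stable = parse_boolean(b"omit_members", None)
    if stable is not None:
        return stable
    if read_msc3706_query_param:
        return bool(parse_boolean(b"org.matrix.msc3706.partial_state", False))
    return False

# e.g. a request sending both parameters: the stable one is honoured.
print(resolve_partial_state(
    {b"omit_members": [b"true"], b"org.matrix.msc3706.partial_state": [b"false"]},
    read_msc3706_query_param=True,
))  # True
```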
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index aba7315cf7..834006356a 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -16,6 +16,7 @@ import logging
 import random
 from typing import TYPE_CHECKING, Awaitable, Callable, Collection, List, Optional, Tuple
 
+from synapse.api.constants import AccountDataTypes
 from synapse.replication.http.account_data import (
     ReplicationAddRoomAccountDataRestServlet,
     ReplicationAddTagRestServlet,
@@ -335,7 +336,11 @@ class AccountDataEventSource(EventSource[int, JsonDict]):
 
         for room_id, room_tags in tags.items():
             results.append(
-                {"type": "m.tag", "content": {"tags": room_tags}, "room_id": room_id}
+                {
+                    "type": AccountDataTypes.TAG,
+                    "content": {"tags": room_tags},
+                    "room_id": room_id,
+                }
             )
 
         (
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 89864e1119..0640ea79a0 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -346,6 +346,7 @@ class DeviceHandler(DeviceWorkerHandler):
         super().__init__(hs)
 
         self.federation_sender = hs.get_federation_sender()
+        self._account_data_handler = hs.get_account_data_handler()
         self._storage_controllers = hs.get_storage_controllers()
 
         self.device_list_updater = DeviceListUpdater(hs, self)
@@ -502,7 +503,7 @@ class DeviceHandler(DeviceWorkerHandler):
             else:
                 raise
 
-        # Delete access tokens and e2e keys for each device. Not optimised as it is not
+        # Delete data specific to each device. Not optimised as it is not
         # considered as part of a critical path.
         for device_id in device_ids:
             await self._auth_handler.delete_access_tokens_for_user(
@@ -512,6 +513,14 @@ class DeviceHandler(DeviceWorkerHandler):
                 user_id=user_id, device_id=device_id
             )
 
+            if self.hs.config.experimental.msc3890_enabled:
+                # Remove any local notification settings for this device in accordance
+                # with MSC3890.
+                await self._account_data_handler.remove_account_data_for_user(
+                    user_id,
+                    f"org.matrix.msc3890.local_notification_settings.{device_id}",
+                )
+
         await self.notify_device_update(user_id, device_ids)
 
     async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 9c335e6863..8c2260ad7d 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -15,7 +15,7 @@
 import logging
 from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 
-from synapse.api.constants import EduTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EduTypes, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import SerializeEventConfig
@@ -239,7 +239,7 @@ class InitialSyncHandler:
                 tags = tags_by_room.get(event.room_id)
                 if tags:
                     account_data_events.append(
-                        {"type": "m.tag", "content": {"tags": tags}}
+                        {"type": AccountDataTypes.TAG, "content": {"tags": tags}}
                     )
 
                 account_data = account_data_by_room.get(event.room_id, {})
@@ -326,7 +326,9 @@ class InitialSyncHandler:
         account_data_events = []
         tags = await self.store.get_tags_for_room(user_id, room_id)
         if tags:
-            account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
+            account_data_events.append(
+                {"type": AccountDataTypes.TAG, "content": {"tags": tags}}
+            )
 
         account_data = await self.store.get_account_data_for_room(user_id, room_id)
         for account_data_type, content in account_data.items():
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 88fc51a4c9..3278a695ed 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1531,12 +1531,23 @@ class EventCreationHandler:
         external federation senders don't have to recalculate it themselves.
         """
 
-        for event, _ in events_and_context:
-            if not self._external_cache.is_enabled():
-                return
+        if not self._external_cache.is_enabled():
+            return
 
-            # If external cache is enabled we should always have this.
-            assert self._external_cache_joined_hosts_updates is not None
+        # If external cache is enabled we should always have this.
+        assert self._external_cache_joined_hosts_updates is not None
+
+        for event, event_context in events_and_context:
+            if event_context.partial_state:
+                # To populate the cache for a partial-state event, we either have to
+                # block until full state, which the code below does, or change the
+                # meaning of cache values to be the list of hosts to which we plan to
+                # send events and calculate that instead.
+                #
+                # The federation senders don't use the external cache when sending
+                # events in partial-state rooms anyway, so let's not bother populating
+                # the cache.
+                continue
 
             # We actually store two mappings, event ID -> prev state group,
             # state group -> joined hosts, which is much more space efficient
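
The control-flow change above can be sketched in isolation: the enablement check happens once before the loop, and partial-state entries are skipped individually rather than aborting the whole batch. All names below are illustrative stand-ins, not Synapse types:

    from dataclasses import dataclass
    from typing import Iterable, List, Tuple

    @dataclass
    class Ctx:
        partial_state: bool

    def populate(cache_enabled: bool, events: Iterable[Tuple[str, Ctx]]) -> List[str]:
        if not cache_enabled:
            # Previously this check sat inside the loop.
            return []
        populated = []
        for event_id, ctx in events:
            if ctx.partial_state:
                # Nothing useful to cache for partial-state rooms.
                continue
            populated.append(event_id)
        return populated

    assert populate(True, [("e1", Ctx(False)), ("e2", Ctx(True))]) == ["e1"]
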
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2af90b25a3..43e4e7b1b4 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2155,6 +2155,11 @@ class PresenceFederationQueue:
         # This should only be called on a presence writer.
         assert self._presence_writer
 
+        if not states or not destinations:
+            # Ignore calls which either don't have any new states or don't need
+            # to be sent anywhere.
+            return
+
         if self._federation:
             self._federation.send_presence_to_destinations(
                 states=states,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 20ee2f203a..78d488f2b1 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -31,7 +31,12 @@ from typing import (
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import (
+    AccountDataTypes,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@@ -2331,7 +2336,9 @@ class SyncHandler:
 
             account_data_events = []
             if tags is not None:
-                account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
+                account_data_events.append(
+                    {"type": AccountDataTypes.TAG, "content": {"tags": tags}}
+                )
 
             for account_data_type, content in account_data.items():
                 account_data_events.append(
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 6f4a934b05..6153a48257 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -1585,6 +1585,33 @@ class ModuleApi:
 
         return room_id_and_alias["room_id"], room_id_and_alias.get("room_alias", None)
 
+    async def set_displayname(
+        self,
+        user_id: UserID,
+        new_displayname: str,
+        deactivation: bool = False,
+    ) -> None:
+        """Sets a user's display name.
+
+        Added in Synapse v1.76.0.
+
+        Args:
+            user_id:
+                The user whose display name is to be changed.
+            new_displayname:
+                The new display name to give the user.
+            deactivation:
+                Whether this change was made while deactivating the user.
+        """
+        requester = create_requester(user_id)
+        await self._hs.get_profile_handler().set_displayname(
+            target_user=user_id,
+            requester=requester,
+            new_displayname=new_displayname,
+            by_admin=True,
+            deactivation=deactivation,
+        )
+
 
 class PublicRoomListManager:
     """Contains methods for adding to, removing from and querying whether a room
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index b5e40da533..322d695bc7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -33,7 +33,6 @@ from synapse.replication.tcp.streams import (
     PushersStream,
     PushRulesStream,
     ReceiptsStream,
-    TagAccountDataStream,
     ToDeviceStream,
     TypingStream,
     UnPartialStatedEventStream,
@@ -168,7 +167,7 @@ class ReplicationDataHandler:
             self.notifier.on_new_event(
                 StreamKeyType.PUSH_RULES, token, users=[row.user_id for row in rows]
             )
-        elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
+        elif stream_name == AccountDataStream.NAME:
             self.notifier.on_new_event(
                 StreamKeyType.ACCOUNT_DATA, token, users=[row.user_id for row in rows]
             )
@@ -188,7 +187,7 @@ class ReplicationDataHandler:
         elif stream_name == DeviceListsStream.NAME:
             all_room_ids: Set[str] = set()
             for row in rows:
-                if row.entity.startswith("@"):
+                if row.entity.startswith("@") and not row.is_signature:
                     room_ids = await self.store.get_rooms_for_user(row.entity)
                     all_room_ids.update(room_ids)
             self.notifier.on_new_event(
@@ -326,7 +325,7 @@ class ReplicationDataHandler:
             # anyway in that case we don't need to wait.
             return
 
-        current_position = self._streams[stream_name].current_token(self._instance_name)
+        current_position = self._streams[stream_name].current_token(instance_name)
         if position <= current_position:
             # We're already past the position
             return
@@ -423,7 +422,11 @@ class FederationSenderHandler:
             # The entities are either user IDs (starting with '@') whose devices
             # have changed, or remote servers that we need to tell about
             # changes.
-            hosts = {row.entity for row in rows if not row.entity.startswith("@")}
+            hosts = {
+                row.entity
+                for row in rows
+                if not row.entity.startswith("@") and not row.is_signature
+            }
             for host in hosts:
                 self.federation_sender.send_device_messages(host, immediate=False)
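
The routing rule above, sketched standalone (`Row` stands in for `DeviceListsStreamRow`): signature rows are ignored here, user rows are handled elsewhere, and anything left is treated as a remote host to poke:

    from typing import Iterable, NamedTuple, Set

    class Row(NamedTuple):
        entity: str
        is_signature: bool

    def hosts_to_poke(rows: Iterable[Row]) -> Set[str]:
        return {
            row.entity
            for row in rows
            if not row.entity.startswith("@") and not row.is_signature
        }

    assert hosts_to_poke(
        [Row("@alice:a", False), Row("hs.example", False), Row("@bob:a", True)]
    ) == {"hs.example"}
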
 
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 0f166d16aa..d03a53d764 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -58,7 +58,6 @@ from synapse.replication.tcp.streams import (
     PresenceStream,
     ReceiptsStream,
     Stream,
-    TagAccountDataStream,
     ToDeviceStream,
     TypingStream,
 )
@@ -145,7 +144,7 @@ class ReplicationCommandHandler:
 
                 continue
 
-            if isinstance(stream, (AccountDataStream, TagAccountDataStream)):
+            if isinstance(stream, AccountDataStream):
-                # Only add AccountDataStream and TagAccountDataStream as a source on the
-                # instance in charge of account_data persistence.
+                # Only add AccountDataStream as a source on the instance in
+                # charge of account_data persistence.
                 if hs.get_instance_name() in hs.config.worker.writers.account_data:
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index 110f10aab9..9c67f661a3 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -35,10 +35,8 @@ from synapse.replication.tcp.streams._base import (
     PushRulesStream,
     ReceiptsStream,
     Stream,
-    TagAccountDataStream,
     ToDeviceStream,
     TypingStream,
-    UserSignatureStream,
 )
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.replication.tcp.streams.federation import FederationStream
@@ -62,9 +60,7 @@ STREAMS_MAP = {
         DeviceListsStream,
         ToDeviceStream,
         FederationStream,
-        TagAccountDataStream,
         AccountDataStream,
-        UserSignatureStream,
         UnPartialStatedRoomStream,
         UnPartialStatedEventStream,
     )
@@ -83,9 +79,7 @@ __all__ = [
     "CachesStream",
     "DeviceListsStream",
     "ToDeviceStream",
-    "TagAccountDataStream",
     "AccountDataStream",
-    "UserSignatureStream",
     "UnPartialStatedRoomStream",
     "UnPartialStatedEventStream",
 ]
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index e01155ad59..a4bdb48c0c 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -28,8 +28,8 @@ from typing import (
 
 import attr
 
+from synapse.api.constants import AccountDataTypes
 from synapse.replication.http.streams import ReplicationGetStreamUpdates
-from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -463,18 +463,67 @@ class DeviceListsStream(Stream):
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class DeviceListsStreamRow:
         entity: str
+        # Indicates that a user has signed their own device with their user-signing key
+        is_signature: bool
 
     NAME = "device_lists"
     ROW_TYPE = DeviceListsStreamRow
 
     def __init__(self, hs: "HomeServer"):
-        store = hs.get_datastores().main
+        self.store = hs.get_datastores().main
         super().__init__(
             hs.get_instance_name(),
-            current_token_without_instance(store.get_device_stream_token),
-            store.get_all_device_list_changes_for_remotes,
+            current_token_without_instance(self.store.get_device_stream_token),
+            self._update_function,
         )
 
+    async def _update_function(
+        self,
+        instance_name: str,
+        from_token: Token,
+        current_token: Token,
+        target_row_count: int,
+    ) -> StreamUpdateResult:
+        (
+            device_updates,
+            devices_to_token,
+            devices_limited,
+        ) = await self.store.get_all_device_list_changes_for_remotes(
+            instance_name, from_token, current_token, target_row_count
+        )
+
+        (
+            signatures_updates,
+            signatures_to_token,
+            signatures_limited,
+        ) = await self.store.get_all_user_signature_changes_for_remotes(
+            instance_name, from_token, current_token, target_row_count
+        )
+
+        upper_limit_token = current_token
+        if devices_limited:
+            upper_limit_token = min(upper_limit_token, devices_to_token)
+        if signatures_limited:
+            upper_limit_token = min(upper_limit_token, signatures_to_token)
+
+        device_updates = [
+            (stream_id, (entity, False))
+            for stream_id, (entity,) in device_updates
+            if stream_id <= upper_limit_token
+        ]
+
+        signatures_updates = [
+            (stream_id, (entity, True))
+            for stream_id, (entity,) in signatures_updates
+            if stream_id <= upper_limit_token
+        ]
+
+        updates = list(
+            heapq.merge(device_updates, signatures_updates, key=lambda row: row[0])
+        )
+
+        return updates, upper_limit_token, devices_limited or signatures_limited
+
 
 class ToDeviceStream(Stream):
     """New to_device messages for a client"""
@@ -495,27 +544,6 @@ class ToDeviceStream(Stream):
         )
 
 
-class TagAccountDataStream(Stream):
-    """Someone added/removed a tag for a room"""
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class TagAccountDataStreamRow:
-        user_id: str
-        room_id: str
-        data: JsonDict
-
-    NAME = "tag_account_data"
-    ROW_TYPE = TagAccountDataStreamRow
-
-    def __init__(self, hs: "HomeServer"):
-        store = hs.get_datastores().main
-        super().__init__(
-            hs.get_instance_name(),
-            current_token_without_instance(store.get_max_account_data_stream_id),
-            store.get_all_updated_tags,
-        )
-
-
 class AccountDataStream(Stream):
     """Global or per room account data was changed"""
 
@@ -560,6 +588,19 @@ class AccountDataStream(Stream):
             to_token = room_results[-1][0]
             limited = True
 
+        tags, tag_to_token, tags_limited = await self.store.get_all_updated_tags(
+            instance_name,
+            from_token,
+            to_token,
+            limit,
+        )
+
+        # again, if the tag results hit the limit, limit the global results to
+        # the same stream token.
+        if tags_limited:
+            to_token = tag_to_token
+            limited = True
+
         # convert the global results to the right format, and limit them to the to_token
         # at the same time
         global_rows = (
@@ -568,11 +609,16 @@ class AccountDataStream(Stream):
             if stream_id <= to_token
         )
 
-        # we know that the room_results are already limited to `to_token` so no need
-        # for a check on `stream_id` here.
         room_rows = (
             (stream_id, (user_id, room_id, account_data_type))
             for stream_id, user_id, room_id, account_data_type in room_results
+            if stream_id <= to_token
+        )
+
+        tag_rows = (
+            (stream_id, (user_id, room_id, AccountDataTypes.TAG))
+            for stream_id, user_id, room_id in tags
+            if stream_id <= to_token
         )
 
         # We need to return a sorted list, so merge them together.
@@ -582,24 +628,7 @@ class AccountDataStream(Stream):
         # leading to a comparison between the data tuples. The comparison could
         # fail due to attempting to compare the `room_id` which results in a
         # `TypeError` from comparing a `str` vs `None`.
-        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
-        return updates, to_token, limited
-
-
-class UserSignatureStream(Stream):
-    """A user has signed their own device with their user-signing key"""
-
-    @attr.s(slots=True, frozen=True, auto_attribs=True)
-    class UserSignatureStreamRow:
-        user_id: str
-
-    NAME = "user_signature"
-    ROW_TYPE = UserSignatureStreamRow
-
-    def __init__(self, hs: "HomeServer"):
-        store = hs.get_datastores().main
-        super().__init__(
-            hs.get_instance_name(),
-            current_token_without_instance(store.get_device_stream_token),
-            store.get_all_user_signature_changes_for_remotes,
+        updates = list(
+            heapq.merge(room_rows, global_rows, tag_rows, key=lambda row: row[0])
         )
+        return updates, to_token, limited
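
The merge-and-clamp pattern used above by both `DeviceListsStream` and `AccountDataStream` can be captured in a standalone sketch: fetch each sub-stream up to the row limit, clamp the upper token to the lowest sub-stream that hit its limit, drop rows beyond the clamp, then `heapq.merge` by stream ID so the combined updates stay sorted. The function and data below are illustrative only:

    import heapq
    from typing import List, Tuple

    def merge_substreams(
        current_token: int,
        subs: List[Tuple[List[Tuple[int, tuple]], int, bool]],
    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
        # Clamp to the lowest token among sub-streams that hit their limit.
        upper = current_token
        for _updates, to_token, limited in subs:
            if limited:
                upper = min(upper, to_token)
        # Drop rows past the clamp, then merge by stream ID to keep ordering.
        merged = list(
            heapq.merge(
                *(
                    [(sid, row) for sid, row in updates if sid <= upper]
                    for updates, _to, _lim in subs
                ),
                key=lambda item: item[0],
            )
        )
        return merged, upper, any(lim for _u, _t, lim in subs)

    updates, upper, limited = merge_substreams(
        10,
        [
            ([(1, ("a",)), (4, ("b",))], 4, True),  # hit its limit at token 4
            ([(2, ("c",)), (7, ("d",))], 10, False),
        ],
    )
    assert updates == [(1, ("a",)), (2, ("c",)), (4, ("b",))] and upper == 4 and limited
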
diff --git a/synapse/server.py b/synapse/server.py
index 5baae2325e..f4ab94c4f3 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -743,7 +743,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_event_client_serializer(self) -> EventClientSerializer:
-        return EventClientSerializer()
+        return EventClientSerializer(self.config.experimental.msc3925_inhibit_edit)
 
     @cache_in_self
     def get_password_policy_handler(self) -> PasswordPolicyHandler:
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 86032897f5..881d7089db 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -27,7 +27,7 @@ from typing import (
 )
 
 from synapse.api.constants import AccountDataTypes
-from synapse.replication.tcp.streams import AccountDataStream, TagAccountDataStream
+from synapse.replication.tcp.streams import AccountDataStream
 from synapse.storage._base import db_to_json
 from synapse.storage.database import (
     DatabasePool,
@@ -454,9 +454,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
     def process_replication_position(
         self, stream_name: str, instance_name: str, token: int
     ) -> None:
-        if stream_name == TagAccountDataStream.NAME:
-            self._account_data_id_gen.advance(instance_name, token)
-        elif stream_name == AccountDataStream.NAME:
+        if stream_name == AccountDataStream.NAME:
             self._account_data_id_gen.advance(instance_name, token)
         super().process_replication_position(stream_name, instance_name, token)
 
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b067664473..cd186c8472 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -38,7 +38,7 @@ from synapse.logging.opentracing import (
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
+from synapse.replication.tcp.streams._base import DeviceListsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -163,9 +163,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
             self._invalidate_caches_for_devices(token, rows)
-        elif stream_name == UserSignatureStream.NAME:
-            for row in rows:
-                self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
+
         return super().process_replication_rows(stream_name, instance_name, token, rows)
 
     def process_replication_position(
@@ -173,14 +171,17 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
             self._device_list_id_gen.advance(instance_name, token)
-        elif stream_name == UserSignatureStream.NAME:
-            self._device_list_id_gen.advance(instance_name, token)
+
         super().process_replication_position(stream_name, instance_name, token)
 
     def _invalidate_caches_for_devices(
         self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
     ) -> None:
         for row in rows:
+            if row.is_signature:
+                self._user_signature_stream_cache.entity_has_changed(row.entity, token)
+                continue
+
             # The entities are either user IDs (starting with '@') whose devices
             # have changed, or remote servers that we need to tell about
             # changes.
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 9e31798ab1..b9d3c36d60 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -69,6 +69,8 @@ class _BackgroundUpdates:
 
     EVENTS_POPULATE_STATE_KEY_REJECTIONS = "events_populate_state_key_rejections"
 
+    EVENTS_JUMP_TO_DATE_INDEX = "events_jump_to_date_index"
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -260,6 +262,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._background_events_populate_state_key_rejections,
         )
 
+        # Add an index that would be useful for jumping to date using
+        # get_event_id_for_timestamp.
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.EVENTS_JUMP_TO_DATE_INDEX,
+            index_name="events_jump_to_date_idx",
+            table="events",
+            columns=["room_id", "origin_server_ts"],
+            where_clause="NOT outlier",
+        )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
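
The background update registered above amounts, roughly, to the following partial index (an assumption of the Postgres form; the background updater itself handles concurrent creation and retries):

    # Assumed Postgres form of the index built by events_jump_to_date_index.
    EVENTS_JUMP_TO_DATE_SQL = (
        "CREATE INDEX events_jump_to_date_idx "
        "ON events (room_id, origin_server_ts) "
        "WHERE NOT outlier"
    )
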
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index e23c927e02..d5500cdd47 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -17,7 +17,8 @@
 import logging
 from typing import Any, Dict, Iterable, List, Tuple, cast
 
-from synapse.replication.tcp.streams import TagAccountDataStream
+from synapse.api.constants import AccountDataTypes
+from synapse.replication.tcp.streams import AccountDataStream
 from synapse.storage._base import db_to_json
 from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases.main.account_data import AccountDataWorkerStore
@@ -54,7 +55,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
     async def get_all_updated_tags(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ) -> Tuple[List[Tuple[int, Tuple[str, str, str]]], int, bool]:
+    ) -> Tuple[List[Tuple[int, str, str]], int, bool]:
         """Get updates for tags replication stream.
 
         Args:
@@ -73,7 +74,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             The token returned can be used in a subsequent call to this
-            function to get further updatees.
+            function to get further updates.
 
-            The updates are a list of 2-tuples of stream ID and the row data
+            The updates are a list of tuples of stream ID, user ID and room ID
         """
 
         if last_id == current_id:
@@ -96,38 +97,13 @@ class TagsWorkerStore(AccountDataWorkerStore):
             "get_all_updated_tags", get_all_updated_tags_txn
         )
 
-        def get_tag_content(
-            txn: LoggingTransaction, tag_ids: List[Tuple[int, str, str]]
-        ) -> List[Tuple[int, Tuple[str, str, str]]]:
-            sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
-            results = []
-            for stream_id, user_id, room_id in tag_ids:
-                txn.execute(sql, (user_id, room_id))
-                tags = []
-                for tag, content in txn:
-                    tags.append(json_encoder.encode(tag) + ":" + content)
-                tag_json = "{" + ",".join(tags) + "}"
-                results.append((stream_id, (user_id, room_id, tag_json)))
-
-            return results
-
-        batch_size = 50
-        results = []
-        for i in range(0, len(tag_ids), batch_size):
-            tags = await self.db_pool.runInteraction(
-                "get_all_updated_tag_content",
-                get_tag_content,
-                tag_ids[i : i + batch_size],
-            )
-            results.extend(tags)
-
         limited = False
         upto_token = current_id
-        if len(results) >= limit:
-            upto_token = results[-1][0]
+        if len(tag_ids) >= limit:
+            upto_token = tag_ids[-1][0]
             limited = True
 
-        return results, upto_token, limited
+        return tag_ids, upto_token, limited
 
     async def get_updated_tags(
         self, user_id: str, stream_id: int
@@ -299,20 +275,16 @@ class TagsWorkerStore(AccountDataWorkerStore):
         token: int,
         rows: Iterable[Any],
     ) -> None:
-        if stream_name == TagAccountDataStream.NAME:
+        if stream_name == AccountDataStream.NAME:
             for row in rows:
-                self.get_tags_for_user.invalidate((row.user_id,))
-                self._account_data_stream_cache.entity_has_changed(row.user_id, token)
+                if row.data_type == AccountDataTypes.TAG:
+                    self.get_tags_for_user.invalidate((row.user_id,))
+                    self._account_data_stream_cache.entity_has_changed(
+                        row.user_id, token
+                    )
 
         super().process_replication_rows(stream_name, instance_name, token, rows)
 
-    def process_replication_position(
-        self, stream_name: str, instance_name: str, token: int
-    ) -> None:
-        if stream_name == TagAccountDataStream.NAME:
-            self._account_data_id_gen.advance(instance_name, token)
-        super().process_replication_position(stream_name, instance_name, token)
-
 
 class TagsStore(TagsWorkerStore):
     pass
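
With `tag_account_data` gone, a consumer distinguishes tag updates on the merged stream by data type alone. A standalone sketch (`Row` stands in for `AccountDataStreamRow`):

    from typing import NamedTuple, Optional

    M_TAG = "m.tag"  # the value of AccountDataTypes.TAG

    class Row(NamedTuple):
        user_id: str
        room_id: Optional[str]
        data_type: str

    def is_tag_update(row: Row) -> bool:
        return row.data_type == M_TAG

    assert is_tag_update(Row("@u:a", "!r:a", M_TAG))
    assert not is_tag_update(Row("@u:a", None, "m.push_rules"))
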
diff --git a/synapse/storage/schema/main/delta/73/24_events_jump_to_date_index.sql b/synapse/storage/schema/main/delta/73/24_events_jump_to_date_index.sql
new file mode 100644
index 0000000000..67059909a1
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/24_events_jump_to_date_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7324, 'events_jump_to_date_index', '{}');
diff --git a/synapse/storage/schema/main/delta/73/25drop_presence.sql b/synapse/storage/schema/main/delta/73/25drop_presence.sql
new file mode 100644
index 0000000000..9f6ffa20b6
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/25drop_presence.sql
@@ -0,0 +1,17 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- this table is unused
+DROP TABLE presence;
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 2aceb1a47f..f262bf95a0 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -364,12 +364,22 @@ class _PerHostRatelimiter:
 
     def _on_exit(self, request_id: object) -> None:
         logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id))
-        self.current_processing.discard(request_id)
-        try:
-            # start processing the next item on the queue.
-            _, deferred = self.ready_request_queue.popitem(last=False)
 
-            with PreserveLoggingContext():
-                deferred.callback(None)
-        except KeyError:
-            pass
+        # When requests complete synchronously, we will recursively start the next
+        # request in the queue. To avoid stack exhaustion, we defer starting the next
+        # request until the next reactor tick.
+
+        def start_next_request() -> None:
+            # We only remove the completed request from the list when we're about to
+            # start the next one, otherwise we can allow extra requests through.
+            self.current_processing.discard(request_id)
+            try:
+                # start processing the next item on the queue.
+                _, deferred = self.ready_request_queue.popitem(last=False)
+
+                with PreserveLoggingContext():
+                    deferred.callback(None)
+            except KeyError:
+                pass
+
+        self.clock.call_later(0.0, start_next_request)
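
The deferral above is a trampoline: instead of recursing from one completed request into the next, the continuation is queued for the next reactor tick. A scheduler-free sketch of why that flattens the recursion into a loop, with a plain deque standing in for the reactor's queue:

    from collections import deque

    def run_all(jobs: deque) -> int:
        processed = 0
        pending = deque()
        pending.append(None)  # seed: "start the next request"
        while pending:
            pending.popleft()
            if jobs:
                jobs.popleft()()      # run one job...
                processed += 1
                pending.append(None)  # ...and schedule, rather than recurse into, the next
        return processed

    # Deep queues complete without growing the call stack.
    assert run_all(deque([lambda: None] * 10000)) == 10000
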
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index 177e5b5afc..be719e49c0 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -211,9 +211,8 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
         )
         self.assertEqual(r[("m.room.member", joining_user)].membership, "join")
 
-    @override_config({"experimental_features": {"msc3706_enabled": True}})
     def test_send_join_partial_state(self) -> None:
-        """When MSC3706 support is enabled, /send_join should return partial state"""
+        """/send_join should return partial state, if requested"""
         joining_user = "@misspiggy:" + self.OTHER_SERVER_NAME
         join_result = self._make_join(joining_user)
 
@@ -224,7 +223,7 @@ class SendJoinFederationTests(unittest.FederatingHomeserverTestCase):
         )
         channel = self.make_signed_federation_request(
             "PUT",
-            f"/_matrix/federation/v2/send_join/{self._room_id}/x?org.matrix.msc3706.partial_state=true",
+            f"/_matrix/federation/v2/send_join/{self._room_id}/x?omit_members=true",
             content=join_event_dict,
         )
         self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
diff --git a/tests/federation/transport/test_client.py b/tests/federation/transport/test_client.py
index b84c74fc0e..3d61b1e8a9 100644
--- a/tests/federation/transport/test_client.py
+++ b/tests/federation/transport/test_client.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 import json
+from typing import List, Optional
 from unittest.mock import Mock
 
 import ijson.common
 
 from synapse.api.room_versions import RoomVersions
 from synapse.federation.transport.client import SendJoinParser
+from synapse.types import JsonDict
 from synapse.util import ExceptionBundle
 
 from tests.unittest import TestCase
@@ -66,38 +68,73 @@ class SendJoinParserTestCase(TestCase):
         self.assertEqual(len(parsed_response.state), 1, parsed_response)
         self.assertEqual(parsed_response.event_dict, {}, parsed_response)
         self.assertIsNone(parsed_response.event, parsed_response)
-        self.assertFalse(parsed_response.partial_state, parsed_response)
+        self.assertFalse(parsed_response.members_omitted, parsed_response)
         self.assertEqual(parsed_response.servers_in_room, None, parsed_response)
 
     def test_partial_state(self) -> None:
-        """Check that the partial_state flag is correctly parsed"""
-        parser = SendJoinParser(RoomVersions.V1, False)
-        response = {
-            "org.matrix.msc3706.partial_state": True,
-        }
+        """Check that the members_omitted flag is correctly parsed"""
 
-        serialised_response = json.dumps(response).encode()
+        def parse(response: JsonDict) -> bool:
+            parser = SendJoinParser(RoomVersions.V1, False)
+            serialised_response = json.dumps(response).encode()
 
-        # Send data to the parser
-        parser.write(serialised_response)
+            # Send data to the parser
+            parser.write(serialised_response)
 
-        # Retrieve and check the parsed SendJoinResponse
-        parsed_response = parser.finish()
-        self.assertTrue(parsed_response.partial_state)
+            # Retrieve and check the parsed SendJoinResponse
+            parsed_response = parser.finish()
+            return parsed_response.members_omitted
 
-    def test_servers_in_room(self) -> None:
-        """Check that the servers_in_room field is correctly parsed"""
-        parser = SendJoinParser(RoomVersions.V1, False)
-        response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
+        self.assertTrue(parse({"members_omitted": True}))
+        self.assertTrue(parse({"org.matrix.msc3706.partial_state": True}))
 
-        serialised_response = json.dumps(response).encode()
+        self.assertFalse(parse({"members_omitted": False}))
+        self.assertFalse(parse({"org.matrix.msc3706.partial_state": False}))
 
-        # Send data to the parser
-        parser.write(serialised_response)
+        # If there's a conflict, the stable field wins.
+        self.assertTrue(
+            parse({"members_omitted": True, "org.matrix.msc3706.partial_state": False})
+        )
+        self.assertFalse(
+            parse({"members_omitted": False, "org.matrix.msc3706.partial_state": True})
+        )
 
-        # Retrieve and check the parsed SendJoinResponse
-        parsed_response = parser.finish()
-        self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])
+    def test_servers_in_room(self) -> None:
+        """Check that the servers_in_room field is correctly parsed"""
+
+        def parse(response: JsonDict) -> Optional[List[str]]:
+            parser = SendJoinParser(RoomVersions.V1, False)
+            serialised_response = json.dumps(response).encode()
+
+            # Send data to the parser
+            parser.write(serialised_response)
+
+            # Retrieve and check the parsed SendJoinResponse
+            parsed_response = parser.finish()
+            return parsed_response.servers_in_room
+
+        self.assertEqual(
+            parse({"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}),
+            ["hs1", "hs2"],
+        )
+        self.assertEqual(parse({"servers_in_room": ["example.com"]}), ["example.com"])
+
+        # If both are provided, the stable identifier should win
+        self.assertEqual(
+            parse(
+                {
+                    "org.matrix.msc3706.servers_in_room": ["old"],
+                    "servers_in_room": ["new"],
+                }
+            ),
+            ["new"],
+        )
+
+        # And lastly, we should be able to tell if neither field was present.
+        self.assertEqual(
+            parse({}),
+            None,
+        )
 
     def test_errors_closing_coroutines(self) -> None:
         """Check we close all coroutines, even if closing the first raises an Exception.
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index b0f3f4374d..9919938e80 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -110,6 +110,24 @@ class ModuleApiTestCase(HomeserverTestCase):
         self.assertEqual(found_user.user_id.to_string(), user_id)
         self.assertIdentical(found_user.is_admin, True)
 
+    def test_can_set_displayname(self):
+        localpart = "alice_wants_a_new_displayname"
+        user_id = self.register_user(
+            localpart, "1234", displayname="Alice", admin=False
+        )
+        found_userinfo = self.get_success(self.module_api.get_userinfo_by_id(user_id))
+
+        self.get_success(
+            self.module_api.set_displayname(
+                found_userinfo.user_id, "Bob", deactivation=False
+            )
+        )
+        found_profile = self.get_success(
+            self.module_api.get_profile_for_user(localpart)
+        )
+
+        self.assertEqual(found_profile.display_name, "Bob")
+
     def test_get_userinfo_by_id(self):
         user_id = self.register_user("alice", "1234")
         found_user = self.get_success(self.module_api.get_userinfo_by_id(user_id))
diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py
index 1cd453248e..9c17a42b65 100644
--- a/tests/push/test_bulk_push_rule_evaluator.py
+++ b/tests/push/test_bulk_push_rule_evaluator.py
@@ -1,10 +1,28 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from unittest.mock import patch
 
+from twisted.test.proto_helpers import MemoryReactor
+
 from synapse.api.room_versions import RoomVersions
 from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
 from synapse.rest import admin
 from synapse.rest.client import login, register, room
+from synapse.server import HomeServer
 from synapse.types import create_requester
+from synapse.util import Clock
 
 from tests.test_utils import simple_async_mock
 from tests.unittest import HomeserverTestCase, override_config
@@ -19,6 +37,20 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
         register.register_servlets,
     ]
 
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
+        # Create a new user and room.
+        self.alice = self.register_user("alice", "pass")
+        self.token = self.login(self.alice, "pass")
+        self.requester = create_requester(self.alice)
+
+        self.room_id = self.helper.create_room_as(
+            self.alice, room_version=RoomVersions.V9.identifier, tok=self.token
+        )
+
+        self.event_creation_handler = self.hs.get_event_creation_handler()
+
     def test_action_for_event_by_user_handles_noninteger_power_levels(self) -> None:
         """We should convert floats and strings to integers before passing to Rust.
 
@@ -26,46 +58,37 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
 
         A lack of validation: the gift that keeps on giving.
         """
-        # Create a new user and room.
-        alice = self.register_user("alice", "pass")
-        token = self.login(alice, "pass")
-
-        room_id = self.helper.create_room_as(
-            alice, room_version=RoomVersions.V9.identifier, tok=token
-        )
 
         # Alter the power levels in that room to include stringy and floaty levels.
         # We need to suppress the validation logic or else it will reject these dodgy
         # values. (Presumably this validation was not always present.)
-        event_creation_handler = self.hs.get_event_creation_handler()
-        requester = create_requester(alice)
         with patch("synapse.events.validator.validate_canonicaljson"), patch(
             "synapse.events.validator.jsonschema.validate"
         ):
             self.helper.send_state(
-                room_id,
+                self.room_id,
                 "m.room.power_levels",
                 {
-                    "users": {alice: "100"},  # stringy
+                    "users": {self.alice: "100"},  # stringy
                     "notifications": {"room": 100.0},  # float
                 },
-                token,
+                self.token,
                 state_key="",
             )
 
         # Create a new message event, and try to evaluate it under the dodgy
         # power level event.
         event, context = self.get_success(
-            event_creation_handler.create_event(
-                requester,
+            self.event_creation_handler.create_event(
+                self.requester,
                 {
                     "type": "m.room.message",
-                    "room_id": room_id,
+                    "room_id": self.room_id,
                     "content": {
                         "msgtype": "m.text",
                         "body": "helo",
                     },
-                    "sender": alice,
+                    "sender": self.alice,
                 },
             )
         )
@@ -77,39 +100,29 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
     @override_config({"push": {"enabled": False}})
     def test_action_for_event_by_user_disabled_by_config(self) -> None:
         """Ensure that push rules are not calculated when disabled in the config"""
-        # Create a new user and room.
-        alice = self.register_user("alice", "pass")
-        token = self.login(alice, "pass")
 
-        room_id = self.helper.create_room_as(
-            alice, room_version=RoomVersions.V9.identifier, tok=token
-        )
-
-        # Alter the power levels in that room to include stringy and floaty levels.
-        # We need to suppress the validation logic or else it will reject these dodgy
-        # values. (Presumably this validation was not always present.)
-        event_creation_handler = self.hs.get_event_creation_handler()
-        requester = create_requester(alice)
-
-        # Create a new message event, and try to evaluate it under the dodgy
-        # power level event.
+        # Create a new message event which should cause a notification.
         event, context = self.get_success(
-            event_creation_handler.create_event(
-                requester,
+            self.event_creation_handler.create_event(
+                self.requester,
                 {
                     "type": "m.room.message",
-                    "room_id": room_id,
+                    "room_id": self.room_id,
                     "content": {
                         "msgtype": "m.text",
                         "body": "helo",
                     },
-                    "sender": alice,
+                    "sender": self.alice,
                 },
             )
         )
 
         bulk_evaluator = BulkPushRuleEvaluator(self.hs)
+        # Mock the method which calculates push rules -- we do this instead of
+        # e.g. checking the results in the database because we want to ensure
+        # that code isn't even running.
         bulk_evaluator._action_for_event_by_user = simple_async_mock()  # type: ignore[assignment]
-        # should not raise
+
+        # Ensure no actions are generated!
         self.get_success(bulk_evaluator.action_for_events_by_user([(event, context)]))
         bulk_evaluator._action_for_event_by_user.assert_not_called()
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 57b2f0536e..ab8bb417e7 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -13,25 +13,28 @@
 # limitations under the License.
 import email.message
 import os
-from typing import Dict, List, Sequence, Tuple
+from typing import Any, Dict, List, Sequence, Tuple
 
 import attr
 import pkg_resources
 
 from twisted.internet.defer import Deferred
+from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.api.errors import Codes, SynapseError
 from synapse.rest.client import login, room
+from synapse.server import HomeServer
+from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
 
 
-@attr.s
+@attr.s(auto_attribs=True)
 class _User:
     "Helper wrapper for user ID and access token"
-    id = attr.ib()
-    token = attr.ib()
+    id: str
+    token: str
 
 
 class EmailPusherTests(HomeserverTestCase):
@@ -41,10 +44,9 @@ class EmailPusherTests(HomeserverTestCase):
         room.register_servlets,
         login.register_servlets,
     ]
-    user_id = True
     hijack_auth = False
 
-    def make_homeserver(self, reactor, clock):
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
 
         config = self.default_config()
         config["email"] = {
@@ -72,17 +74,17 @@ class EmailPusherTests(HomeserverTestCase):
         # List[Tuple[Deferred, args, kwargs]]
         self.email_attempts: List[Tuple[Deferred, Sequence, Dict]] = []
 
-        def sendmail(*args, **kwargs):
+        def sendmail(*args: Any, **kwargs: Any) -> Deferred:
             # This mocks out synapse.reactor.send_email._sendmail.
-            d = Deferred()
+            d: Deferred = Deferred()
             self.email_attempts.append((d, args, kwargs))
             return d
 
-        hs.get_send_email_handler()._sendmail = sendmail
+        hs.get_send_email_handler()._sendmail = sendmail  # type: ignore[assignment]
 
         return hs
 
-    def prepare(self, reactor, clock, hs):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         # Register the user who gets notified
         self.user_id = self.register_user("user", "pass")
         self.access_token = self.login("user", "pass")
@@ -129,7 +131,7 @@ class EmailPusherTests(HomeserverTestCase):
         self.auth_handler = hs.get_auth_handler()
         self.store = hs.get_datastores().main
 
-    def test_need_validated_email(self):
+    def test_need_validated_email(self) -> None:
         """Test that we can only add an email pusher if the user has validated
         their email.
         """
@@ -151,7 +153,7 @@ class EmailPusherTests(HomeserverTestCase):
         self.assertEqual(400, cm.exception.code)
         self.assertEqual(Codes.THREEPID_NOT_FOUND, cm.exception.errcode)
 
-    def test_simple_sends_email(self):
+    def test_simple_sends_email(self) -> None:
         # Create a simple room with two users
         room = self.helper.create_room_as(self.user_id, tok=self.access_token)
         self.helper.invite(
@@ -171,7 +173,7 @@ class EmailPusherTests(HomeserverTestCase):
 
         self._check_for_mail()
 
-    def test_invite_sends_email(self):
+    def test_invite_sends_email(self) -> None:
         # Create a room and invite the user to it
         room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token)
         self.helper.invite(
@@ -184,7 +186,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about the invite
         self._check_for_mail()
 
-    def test_invite_to_empty_room_sends_email(self):
+    def test_invite_to_empty_room_sends_email(self) -> None:
         # Create a room and invite the user to it
         room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token)
         self.helper.invite(
@@ -200,7 +202,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about the invite
         self._check_for_mail()
 
-    def test_multiple_members_email(self):
+    def test_multiple_members_email(self) -> None:
         # We want to test multiple notifications, so we pause processing of push
         # while we send messages.
         self.pusher._pause_processing()
@@ -227,7 +229,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about those messages
         self._check_for_mail()
 
-    def test_multiple_rooms(self):
+    def test_multiple_rooms(self) -> None:
         # We want to test multiple notifications from multiple rooms, so we pause
         # processing of push while we send messages.
         self.pusher._pause_processing()
@@ -257,7 +259,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about those messages
         self._check_for_mail()
 
-    def test_room_notifications_include_avatar(self):
+    def test_room_notifications_include_avatar(self) -> None:
         # Create a room and set its avatar.
         room = self.helper.create_room_as(self.user_id, tok=self.access_token)
         self.helper.send_state(
@@ -290,7 +292,7 @@ class EmailPusherTests(HomeserverTestCase):
         )
         self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html)
 
-    def test_empty_room(self):
+    def test_empty_room(self) -> None:
         """All users leaving a room shouldn't cause the pusher to break."""
         # Create a simple room with two users
         room = self.helper.create_room_as(self.user_id, tok=self.access_token)
@@ -309,7 +311,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about that message
         self._check_for_mail()
 
-    def test_empty_room_multiple_messages(self):
+    def test_empty_room_multiple_messages(self) -> None:
         """All users leaving a room shouldn't cause the pusher to break."""
         # Create a simple room with two users
         room = self.helper.create_room_as(self.user_id, tok=self.access_token)
@@ -329,7 +331,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about that message
         self._check_for_mail()
 
-    def test_encrypted_message(self):
+    def test_encrypted_message(self) -> None:
         room = self.helper.create_room_as(self.user_id, tok=self.access_token)
         self.helper.invite(
             room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id
@@ -342,7 +344,7 @@ class EmailPusherTests(HomeserverTestCase):
         # We should get emailed about that message
         self._check_for_mail()
 
-    def test_no_email_sent_after_removed(self):
+    def test_no_email_sent_after_removed(self) -> None:
         # Create a simple room with two users
         room = self.helper.create_room_as(self.user_id, tok=self.access_token)
         self.helper.invite(
@@ -379,7 +381,7 @@ class EmailPusherTests(HomeserverTestCase):
         pushers = list(pushers)
         self.assertEqual(len(pushers), 0)
 
-    def test_remove_unlinked_pushers_background_job(self):
+    def test_remove_unlinked_pushers_background_job(self) -> None:
         """Checks that all existing pushers associated with unlinked email addresses are removed
         upon running the remove_deleted_email_pushers background update.
         """
diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index afaafe79aa..23447cc310 100644
--- a/tests/push/test_http.py
+++ b/tests/push/test_http.py
@@ -46,7 +46,7 @@ class HTTPPusherTests(HomeserverTestCase):
 
         m = Mock()
 
-        def post_json_get_json(url, body):
+        def post_json_get_json(url: str, body: JsonDict) -> Deferred:
             d: Deferred = Deferred()
             self.push_attempts.append((d, url, body))
             return make_deferred_yieldable(d)
diff --git a/tests/push/test_presentable_names.py b/tests/push/test_presentable_names.py
index aff563919d..d37f8ce262 100644
--- a/tests/push/test_presentable_names.py
+++ b/tests/push/test_presentable_names.py
@@ -12,11 +12,11 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple, cast
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.room_versions import RoomVersions
-from synapse.events import FrozenEvent
+from synapse.events import EventBase, FrozenEvent
 from synapse.push.presentable_names import calculate_room_name
 from synapse.types import StateKey, StateMap
 
@@ -51,13 +51,15 @@ class MockDataStore:
             )
 
     async def get_event(
-        self, event_id: StateKey, allow_none: bool = False
+        self, event_id: str, allow_none: bool = False
     ) -> Optional[FrozenEvent]:
         assert allow_none, "Mock not configured for allow_none = False"
 
-        return self._events.get(event_id)
+        # Decode the state key from the event ID.
+        state_key = cast(Tuple[str, str], tuple(event_id.split("|", 1)))
+        return self._events.get(state_key)
 
-    async def get_events(self, event_ids: Iterable[StateKey]):
+    async def get_events(self, event_ids: Iterable[StateKey]) -> StateMap[EventBase]:
         # This is cheating since it just returns all events.
         return self._events
 
@@ -68,17 +70,17 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
 
     def _calculate_room_name(
         self,
-        events: StateMap[dict],
+        events: Iterable[Tuple[Tuple[str, str], dict]],
         user_id: str = "",
         fallback_to_members: bool = True,
         fallback_to_single_member: bool = True,
-    ):
-        # This isn't 100% accurate, but works with MockDataStore.
-        room_state_ids = {k[0]: k[0] for k in events}
+    ) -> Optional[str]:
+        # Encode the state key into the event ID.
+        room_state_ids = {k[0]: "|".join(k[0]) for k in events}
 
         return self.get_success(
             calculate_room_name(
-                MockDataStore(events),
+                MockDataStore(events),  # type: ignore[arg-type]
                 room_state_ids,
                 user_id or self.USER_ID,
                 fallback_to_members,
@@ -86,9 +88,9 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
             )
         )
 
-    def test_name(self):
+    def test_name(self) -> None:
         """A room name event should be used."""
-        events = [
+        events: List[Tuple[Tuple[str, str], dict]] = [
             ((EventTypes.Name, ""), {"name": "test-name"}),
         ]
         self.assertEqual("test-name", self._calculate_room_name(events))
@@ -100,9 +102,9 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
         events = [((EventTypes.Name, ""), {"name": 1})]
         self.assertEqual(1, self._calculate_room_name(events))
 
-    def test_canonical_alias(self):
+    def test_canonical_alias(self) -> None:
         """An canonical alias should be used."""
-        events = [
+        events: List[Tuple[Tuple[str, str], dict]] = [
             ((EventTypes.CanonicalAlias, ""), {"alias": "#test-name:test"}),
         ]
         self.assertEqual("#test-name:test", self._calculate_room_name(events))
@@ -114,9 +116,9 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
         events = [((EventTypes.CanonicalAlias, ""), {"alias": "test-name"})]
         self.assertEqual("Empty Room", self._calculate_room_name(events))
 
-    def test_invite(self):
+    def test_invite(self) -> None:
         """An invite has special behaviour."""
-        events = [
+        events: List[Tuple[Tuple[str, str], dict]] = [
             ((EventTypes.Member, self.USER_ID), {"membership": Membership.INVITE}),
             ((EventTypes.Member, self.OTHER_USER_ID), {"displayname": "Other User"}),
         ]
@@ -140,9 +142,9 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
         ]
         self.assertEqual("Room Invite", self._calculate_room_name(events))
 
-    def test_no_members(self):
+    def test_no_members(self) -> None:
         """Behaviour of an empty room."""
-        events = []
+        events: List[Tuple[Tuple[str, str], dict]] = []
         self.assertEqual("Empty Room", self._calculate_room_name(events))
 
         # Note that events with invalid (or missing) membership are ignored.
@@ -152,7 +154,7 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
         ]
         self.assertEqual("Empty Room", self._calculate_room_name(events))
 
-    def test_no_other_members(self):
+    def test_no_other_members(self) -> None:
         """Behaviour of a room with no other members in it."""
         events = [
             (
@@ -185,7 +187,7 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
             self._calculate_room_name(events, user_id=self.OTHER_USER_ID),
         )
 
-    def test_one_other_member(self):
+    def test_one_other_member(self) -> None:
         """Behaviour of a room with a single other member."""
         events = [
             ((EventTypes.Member, self.USER_ID), {"membership": Membership.JOIN}),
@@ -209,7 +211,7 @@ class PresentableNamesTestCase(unittest.HomeserverTestCase):
         ]
         self.assertEqual("@user:test", self._calculate_room_name(events))
 
-    def test_other_members(self):
+    def test_other_members(self) -> None:
         """Behaviour of a room with multiple other members."""
         # Two other members.
         events = [
diff --git a/tests/push/test_push_rule_evaluator.py b/tests/push/test_push_rule_evaluator.py
index 5ababe6a39..1b87756b75 100644
--- a/tests/push/test_push_rule_evaluator.py
+++ b/tests/push/test_push_rule_evaluator.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Dict, Optional, Union
+from typing import Dict, List, Optional, Union, cast
 
 import frozendict
 
@@ -30,7 +30,7 @@ from synapse.rest.client import login, register, room
 from synapse.server import HomeServer
 from synapse.storage.databases.main.appservice import _make_exclusive_regex
 from synapse.synapse_rust.push import PushRuleEvaluator
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, JsonMapping, UserID
 from synapse.util import Clock
 
 from tests import unittest
@@ -39,7 +39,7 @@ from tests.test_utils.event_injection import create_event, inject_member_event
 
 class PushRuleEvaluatorTestCase(unittest.TestCase):
     def _get_evaluator(
-        self, content: JsonDict, related_events=None
+        self, content: JsonMapping, related_events: Optional[JsonDict] = None
     ) -> PushRuleEvaluator:
         event = FrozenEvent(
             {
@@ -59,7 +59,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
             _flatten_dict(event),
             room_member_count,
             sender_power_level,
-            power_levels.get("notifications", {}),
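+            # cast: the evaluator takes notification power levels as a
+            # Dict[str, int], which cannot be inferred from the JSON mapping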
+            cast(Dict[str, int], power_levels.get("notifications", {})),
             {} if related_events is None else related_events,
             True,
             event.room_version.msc3931_push_features,
@@ -70,9 +70,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
         """Check for a matching display name in the body of the event."""
         evaluator = self._get_evaluator({"body": "foo bar baz"})
 
-        condition = {
-            "kind": "contains_display_name",
-        }
+        condition = {"kind": "contains_display_name"}
 
         # Blank names are skipped.
         self.assertFalse(evaluator.matches(condition, "@user:test", ""))
@@ -93,7 +91,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
         self.assertTrue(evaluator.matches(condition, "@user:test", "foo bar"))
 
     def _assert_matches(
-        self, condition: JsonDict, content: JsonDict, msg: Optional[str] = None
+        self, condition: JsonDict, content: JsonMapping, msg: Optional[str] = None
     ) -> None:
         evaluator = self._get_evaluator(content)
         self.assertTrue(evaluator.matches(condition, "@user:test", "display_name"), msg)
@@ -287,7 +285,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
         This tests the behaviour of tweaks_for_actions.
         """
 
-        actions = [
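+        # actions mix tweak dictionaries with plain strings such as "notify"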
+        actions: List[Union[Dict[str, str], str]] = [
             {"set_tweak": "sound", "value": "default"},
             {"set_tweak": "highlight"},
             "notify",
@@ -298,7 +296,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
             {"sound": "default", "highlight": True},
         )
 
-    def test_related_event_match(self):
+    def test_related_event_match(self) -> None:
         evaluator = self._get_evaluator(
             {
                 "m.relates_to": {
@@ -397,7 +395,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
             )
         )
 
-    def test_related_event_match_with_fallback(self):
+    def test_related_event_match_with_fallback(self) -> None:
         evaluator = self._get_evaluator(
             {
                 "m.relates_to": {
@@ -469,7 +467,7 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
             )
         )
 
-    def test_related_event_match_no_related_event(self):
+    def test_related_event_match_no_related_event(self) -> None:
         evaluator = self._get_evaluator(
             {"msgtype": "m.text", "body": "Message without related event"}
         )
@@ -518,7 +516,9 @@ class TestBulkPushRuleEvaluator(unittest.HomeserverTestCase):
         room.register_servlets,
     ]
 
-    def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer):
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
         # Define an application service so that we can register appservice users
         self._service_token = "some_token"
         self._service = ApplicationService(
diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py
index 1e299d2d67..555922409d 100644
--- a/tests/replication/tcp/test_handler.py
+++ b/tests/replication/tcp/test_handler.py
@@ -12,6 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
+
+from synapse.replication.tcp.commands import PositionCommand, RdataCommand
+
 from tests.replication._base import BaseMultiWorkerStreamTestCase
 
 
@@ -71,3 +75,77 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
         self.assertEqual(
             len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1
         )
+
+    def test_wait_for_stream_position(self) -> None:
+        """Check that wait for stream position correctly waits for an update from the
+        correct instance.
+        """
+        store = self.hs.get_datastores().main
+        cmd_handler = self.hs.get_replication_command_handler()
+        data_handler = self.hs.get_replication_data_handler()
+
+        worker1 = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            extra_config={
+                "worker_name": "worker1",
+                "run_background_tasks_on": "worker1",
+                "redis": {"enabled": True},
+            },
+        )
+
+        cache_id_gen = worker1.get_datastores().main._cache_id_gen
+        assert cache_id_gen is not None
+
+        self.replicate()
+
+        # First, make sure the master knows that `worker1` exists.
+        initial_token = cache_id_gen.get_current_token()
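+        # (args: stream name, instance name, previous token, new token)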
+        cmd_handler.send_command(
+            PositionCommand("caches", "worker1", initial_token, initial_token)
+        )
+        self.replicate()
+
+        # Next send out a normal RDATA, and check that waiting for that stream
+        # ID returns immediately.
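+        # (`get_next` returns an async context manager: entering it allocates a
+        # new stream ID, exiting marks it as persisted.)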
+        ctx = cache_id_gen.get_next()
+        next_token = self.get_success(ctx.__aenter__())
+        self.get_success(ctx.__aexit__(None, None, None))
+
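+        # The row payload follows the caches stream format:
+        # (cache name, keys, invalidation timestamp).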
+        cmd_handler.send_command(
+            RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
+        )
+        self.replicate()
+
+        self.get_success(
+            data_handler.wait_for_stream_position("worker1", "caches", next_token)
+        )
+
+        # `wait_for_stream_position` should only return once the master receives
+        # an RDATA from the worker.
+        ctx = cache_id_gen.get_next()
+        next_token = self.get_success(ctx.__aenter__())
+        self.get_success(ctx.__aexit__(None, None, None))
+
+        d = defer.ensureDeferred(
+            data_handler.wait_for_stream_position("worker1", "caches", next_token)
+        )
+        self.assertFalse(d.called)
+
+        # ... updating the cache ID gen on the master still shouldn't cause the
+        # deferred to wake up.
+        ctx = store._cache_id_gen.get_next()
+        self.get_success(ctx.__aenter__())
+        self.get_success(ctx.__aexit__(None, None, None))
+
+        d = defer.ensureDeferred(
+            data_handler.wait_for_stream_position("worker1", "caches", next_token)
+        )
+        self.assertFalse(d.called)
+
+        # ... but receiving the RDATA should
+        cmd_handler.send_command(
+            RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
+        )
+        self.replicate()
+
+        self.assertTrue(d.called)
diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py
index b86f341ff5..c8a6911d5e 100644
--- a/tests/rest/client/test_relations.py
+++ b/tests/rest/client/test_relations.py
@@ -30,6 +30,7 @@ from tests import unittest
 from tests.server import FakeChannel
 from tests.test_utils import make_awaitable
 from tests.test_utils.event_injection import inject_event
+from tests.unittest import override_config
 
 
 class BaseRelationsTestCase(unittest.HomeserverTestCase):
@@ -355,30 +356,67 @@ class RelationsTestCase(BaseRelationsTestCase):
         self.assertEqual(200, channel.code, channel.json_body)
         self.assertNotIn("m.relations", channel.json_body["unsigned"])
 
+    def _assert_edit_bundle(
+        self, event_json: JsonDict, edit_event_id: str, edit_event_content: JsonDict
+    ) -> None:
+        """
+        Assert that the given event has a correctly serialised edit event in its
+        bundled aggregations.
+
+        Args:
+            event_json: the serialised event to be checked
+            edit_event_id: the ID of the edit event that we expect to be bundled
+            edit_event_content: the content of that event, excluding the
+               `m.relates_to` property
+        """
+        relations_dict = event_json["unsigned"].get("m.relations")
+        self.assertIn(RelationTypes.REPLACE, relations_dict)
+
+        m_replace_dict = relations_dict[RelationTypes.REPLACE]
+        for key in [
+            "event_id",
+            "sender",
+            "origin_server_ts",
+            "content",
+            "type",
+            "unsigned",
+        ]:
+            self.assertIn(key, m_replace_dict)
+
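+        # The edit's content is expected to include the m.relates_to mixin
+        # pointing back at the edited event.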
+        expected_edit_content = {
+            "m.relates_to": {
+                "event_id": event_json["event_id"],
+                "rel_type": "m.replace",
+            }
+        }
+        expected_edit_content.update(edit_event_content)
+
+        self.assert_dict(
+            {
+                "event_id": edit_event_id,
+                "sender": self.user_id,
+                "content": expected_edit_content,
+                "type": "m.room.message",
+            },
+            m_replace_dict,
+        )
+
     def test_edit(self) -> None:
         """Test that a simple edit works."""
 
         new_body = {"msgtype": "m.text", "body": "I've been edited!"}
+        edit_event_content = {
+            "msgtype": "m.text",
+            "body": "foo",
+            "m.new_content": new_body,
+        }
         channel = self._send_relation(
             RelationTypes.REPLACE,
             "m.room.message",
-            content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
+            content=edit_event_content,
         )
         edit_event_id = channel.json_body["event_id"]
 
-        def assert_bundle(event_json: JsonDict) -> None:
-            """Assert the expected values of the bundled aggregations."""
-            relations_dict = event_json["unsigned"].get("m.relations")
-            self.assertIn(RelationTypes.REPLACE, relations_dict)
-
-            m_replace_dict = relations_dict[RelationTypes.REPLACE]
-            for key in ["event_id", "sender", "origin_server_ts"]:
-                self.assertIn(key, m_replace_dict)
-
-            self.assert_dict(
-                {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
-            )
-
         # /event should return the *original* event
         channel = self.make_request(
             "GET",
@@ -389,7 +427,7 @@ class RelationsTestCase(BaseRelationsTestCase):
         self.assertEqual(
             channel.json_body["content"], {"body": "Hi!", "msgtype": "m.text"}
         )
-        assert_bundle(channel.json_body)
+        self._assert_edit_bundle(channel.json_body, edit_event_id, edit_event_content)
 
         # Request the room messages.
         channel = self.make_request(
@@ -398,7 +436,11 @@ class RelationsTestCase(BaseRelationsTestCase):
             access_token=self.user_token,
         )
         self.assertEqual(200, channel.code, channel.json_body)
-        assert_bundle(self._find_event_in_chunk(channel.json_body["chunk"]))
+        self._assert_edit_bundle(
+            self._find_event_in_chunk(channel.json_body["chunk"]),
+            edit_event_id,
+            edit_event_content,
+        )
 
         # Request the room context.
         # /context should return the edited event.
@@ -408,7 +450,9 @@ class RelationsTestCase(BaseRelationsTestCase):
             access_token=self.user_token,
         )
         self.assertEqual(200, channel.code, channel.json_body)
-        assert_bundle(channel.json_body["event"])
+        self._assert_edit_bundle(
+            channel.json_body["event"], edit_event_id, edit_event_content
+        )
         self.assertEqual(channel.json_body["event"]["content"], new_body)
 
         # Request sync, but limit the timeline so it becomes limited (and includes
@@ -420,7 +464,11 @@ class RelationsTestCase(BaseRelationsTestCase):
         self.assertEqual(200, channel.code, channel.json_body)
         room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"]
         self.assertTrue(room_timeline["limited"])
-        assert_bundle(self._find_event_in_chunk(room_timeline["events"]))
+        self._assert_edit_bundle(
+            self._find_event_in_chunk(room_timeline["events"]),
+            edit_event_id,
+            edit_event_content,
+        )
 
         # Request search.
         channel = self.make_request(
@@ -437,7 +485,45 @@ class RelationsTestCase(BaseRelationsTestCase):
                 "results"
             ]
         ]
-        assert_bundle(self._find_event_in_chunk(chunk))
+        self._assert_edit_bundle(
+            self._find_event_in_chunk(chunk),
+            edit_event_id,
+            edit_event_content,
+        )
+
+    @override_config({"experimental_features": {"msc3925_inhibit_edit": True}})
+    def test_edit_inhibit_replace(self) -> None:
+        """
+        If msc3925_inhibit_edit is enabled, then the original event should not be
+        replaced.
+        """
+
+        new_body = {"msgtype": "m.text", "body": "I've been edited!"}
+        edit_event_content = {
+            "msgtype": "m.text",
+            "body": "foo",
+            "m.new_content": new_body,
+        }
+        channel = self._send_relation(
+            RelationTypes.REPLACE,
+            "m.room.message",
+            content=edit_event_content,
+        )
+        edit_event_id = channel.json_body["event_id"]
+
+        # /context should return the *original* event.
+        channel = self.make_request(
+            "GET",
+            f"/rooms/{self.room}/context/{self.parent_id}",
+            access_token=self.user_token,
+        )
+        self.assertEqual(200, channel.code, channel.json_body)
+        self.assertEqual(
+            channel.json_body["event"]["content"], {"body": "Hi!", "msgtype": "m.text"}
+        )
+        self._assert_edit_bundle(
+            channel.json_body["event"], edit_event_id, edit_event_content
+        )
 
     def test_multi_edit(self) -> None:
         """Test that multiple edits, including attempts by people who
@@ -455,10 +541,15 @@ class RelationsTestCase(BaseRelationsTestCase):
         )
 
         new_body = {"msgtype": "m.text", "body": "I've been edited!"}
+        edit_event_content = {
+            "msgtype": "m.text",
+            "body": "foo",
+            "m.new_content": new_body,
+        }
         channel = self._send_relation(
             RelationTypes.REPLACE,
             "m.room.message",
-            content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
+            content=edit_event_content,
         )
         edit_event_id = channel.json_body["event_id"]
 
@@ -480,16 +571,8 @@ class RelationsTestCase(BaseRelationsTestCase):
         self.assertEqual(200, channel.code, channel.json_body)
 
         self.assertEqual(channel.json_body["event"]["content"], new_body)
-
-        relations_dict = channel.json_body["event"]["unsigned"].get("m.relations")
-        self.assertIn(RelationTypes.REPLACE, relations_dict)
-
-        m_replace_dict = relations_dict[RelationTypes.REPLACE]
-        for key in ["event_id", "sender", "origin_server_ts"]:
-            self.assertIn(key, m_replace_dict)
-
-        self.assert_dict(
-            {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
+        self._assert_edit_bundle(
+            channel.json_body["event"], edit_event_id, edit_event_content
         )
 
     def test_edit_reply(self) -> None:
@@ -502,11 +585,15 @@ class RelationsTestCase(BaseRelationsTestCase):
         )
         reply = channel.json_body["event_id"]
 
-        new_body = {"msgtype": "m.text", "body": "I've been edited!"}
+        edit_event_content = {
+            "msgtype": "m.text",
+            "body": "foo",
+            "m.new_content": {"msgtype": "m.text", "body": "I've been edited!"},
+        }
         channel = self._send_relation(
             RelationTypes.REPLACE,
             "m.room.message",
-            content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
+            content=edit_event_content,
             parent_id=reply,
         )
         edit_event_id = channel.json_body["event_id"]
@@ -549,28 +636,22 @@ class RelationsTestCase(BaseRelationsTestCase):
 
             # We expect that the edit relation appears in the unsigned relations
             # section.
-            relations_dict = result_event_dict["unsigned"].get("m.relations")
-            self.assertIn(RelationTypes.REPLACE, relations_dict, desc)
-
-            m_replace_dict = relations_dict[RelationTypes.REPLACE]
-            for key in ["event_id", "sender", "origin_server_ts"]:
-                self.assertIn(key, m_replace_dict, desc)
-
-            self.assert_dict(
-                {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
+            self._assert_edit_bundle(
+                result_event_dict, edit_event_id, edit_event_content
             )
 
     def test_edit_edit(self) -> None:
         """Test that an edit cannot be edited."""
         new_body = {"msgtype": "m.text", "body": "Initial edit"}
+        edit_event_content = {
+            "msgtype": "m.text",
+            "body": "Wibble",
+            "m.new_content": new_body,
+        }
         channel = self._send_relation(
             RelationTypes.REPLACE,
             "m.room.message",
-            content={
-                "msgtype": "m.text",
-                "body": "Wibble",
-                "m.new_content": new_body,
-            },
+            content=edit_event_content,
         )
         edit_event_id = channel.json_body["event_id"]
 
@@ -599,8 +680,7 @@ class RelationsTestCase(BaseRelationsTestCase):
         )
 
         # The relations information should not include the edit to the edit.
-        relations_dict = channel.json_body["unsigned"].get("m.relations")
-        self.assertIn(RelationTypes.REPLACE, relations_dict)
+        self._assert_edit_bundle(channel.json_body, edit_event_id, edit_event_content)
 
         # /context should return the event updated for the *first* edit
         # (The edit to the edit should be ignored.)
@@ -611,13 +691,8 @@ class RelationsTestCase(BaseRelationsTestCase):
         )
         self.assertEqual(200, channel.code, channel.json_body)
         self.assertEqual(channel.json_body["event"]["content"], new_body)
-
-        m_replace_dict = relations_dict[RelationTypes.REPLACE]
-        for key in ["event_id", "sender", "origin_server_ts"]:
-            self.assertIn(key, m_replace_dict)
-
-        self.assert_dict(
-            {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
+        self._assert_edit_bundle(
+            channel.json_body["event"], edit_event_id, edit_event_content
         )
 
         # Directly requesting the edit should not have the edit to the edit applied.
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index 5fa8bd2d98..76c06a9d1e 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -154,7 +154,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         # Create a user to receive notifications and send receipts.
         user_id, token, _, other_token, room_id = self._create_users_and_room()
 
-        last_event_id: str
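+        # dummy value, overwritten with a real event ID before it is read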
+        last_event_id = ""
 
         def _assert_counts(notif_count: int, highlight_count: int) -> None:
             counts = self.get_success(
@@ -289,7 +289,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         user_id, token, _, other_token, room_id = self._create_users_and_room()
         thread_id: str
 
-        last_event_id: str
+        last_event_id = ""
 
         def _assert_counts(
             notif_count: int,
@@ -471,7 +471,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         user_id, token, _, other_token, room_id = self._create_users_and_room()
         thread_id: str
 
-        last_event_id: str
+        last_event_id = ""
 
         def _assert_counts(
             notif_count: int,
diff --git a/tests/util/test_ratelimitutils.py b/tests/util/test_ratelimitutils.py
index 5b327b390e..fe4961dcf3 100644
--- a/tests/util/test_ratelimitutils.py
+++ b/tests/util/test_ratelimitutils.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 from typing import Optional
 
+from twisted.internet import defer
 from twisted.internet.defer import Deferred
 
 from synapse.config.homeserver import HomeServerConfig
@@ -57,6 +58,7 @@ class FederationRateLimiterTestCase(TestCase):
 
             # ... until we complete an earlier request
             cm2.__exit__(None, None, None)
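+            # pump the reactor so that the queued request can now complete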
+            reactor.advance(0.0)
             self.successResultOf(d3)
 
     def test_sleep_limit(self) -> None:
@@ -81,6 +83,43 @@ class FederationRateLimiterTestCase(TestCase):
             sleep_time = _await_resolution(reactor, d3)
             self.assertAlmostEqual(sleep_time, 500, places=3)
 
+    def test_lots_of_queued_things(self) -> None:
+        """Tests lots of synchronous things queued up behind a slow thing.
+
+        The stack should *not* explode when the slow thing completes.
+        """
+        reactor, clock = get_clock()
+        rc_config = build_rc_config(
+            {
+                "rc_federation": {
+                    "sleep_limit": 1000000000,  # never sleep
+                    "reject_limit": 1000000000,  # never reject requests
+                    "concurrent": 1,
+                }
+            }
+        )
+        ratelimiter = FederationRateLimiter(clock, rc_config)
+
+        with ratelimiter.ratelimit("testhost") as d:
+            # shouldn't block
+            self.successResultOf(d)
+
+            async def task() -> None:
+                with ratelimiter.ratelimit("testhost") as d:
+                    await d
+
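+            # queue up many more requests behind the one we are holding; each
+            # resolves synchronously once the limiter releases it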
+            for _ in range(1, 100):
+                defer.ensureDeferred(task())
+
+            last_task = defer.ensureDeferred(task())
+
+            # Upon exiting the context manager, all the queued requests resume.
+            # If a stack overflow occurs, the final task will not complete.
+
+        # Wait for all the queued requests to complete.
+        reactor.advance(0.0)
+        self.successResultOf(last_task)
+
 
 def _await_resolution(reactor: ThreadedMemoryReactorClock, d: Deferred) -> float:
     """advance the clock until the deferred completes.