summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/latest_deps.yml3
-rw-r--r--.rustfmt.toml1
-rw-r--r--changelog.d/13667.feature1
-rw-r--r--changelog.d/13722.feature1
-rw-r--r--changelog.d/13768.misc1
-rw-r--r--changelog.d/13772.doc1
-rw-r--r--changelog.d/13792.misc1
-rw-r--r--changelog.d/13799.feature1
-rw-r--r--changelog.d/13809.misc1
-rw-r--r--changelog.d/13831.feature1
-rw-r--r--changelog.d/13832.feature1
-rw-r--r--changelog.d/13836.doc1
-rw-r--r--changelog.d/13840.bugfix1
-rw-r--r--changelog.d/13843.removal1
-rw-r--r--changelog.d/13850.misc1
-rw-r--r--changelog.d/13859.misc1
-rw-r--r--changelog.d/13860.feature1
-rw-r--r--changelog.d/13870.doc1
-rw-r--r--contrib/workers-bash-scripts/create-multiple-generic-workers.md4
-rw-r--r--docs/admin_api/register_api.md2
-rw-r--r--docs/sso_mapping_providers.md12
-rw-r--r--rust/Cargo.toml10
-rw-r--r--rust/src/lib.rs9
-rw-r--r--rust/src/push/base_rules.rs335
-rw-r--r--rust/src/push/mod.rs502
-rwxr-xr-xscripts-dev/make_full_schema.sh166
-rw-r--r--scripts-dev/mypy_synapse_plugin.py4
-rw-r--r--stubs/synapse/synapse_rust/__init__.pyi (renamed from stubs/synapse/synapse_rust.pyi)0
-rw-r--r--stubs/synapse/synapse_rust/push.pyi37
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py1
-rwxr-xr-xsynapse/_scripts/update_synapse_database.py14
-rw-r--r--synapse/config/experimental.py10
-rw-r--r--synapse/handlers/auth.py34
-rw-r--r--synapse/handlers/push_rules.py5
-rw-r--r--synapse/handlers/register.py4
-rw-r--r--synapse/handlers/sso.py3
-rw-r--r--synapse/module_api/__init__.py46
-rw-r--r--synapse/push/__init__.py4
-rw-r--r--synapse/push/baserules.py583
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/push/clientformat.py5
-rw-r--r--synapse/push/pusherpool.py85
-rw-r--r--synapse/replication/tcp/client.py10
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/admin/users.py4
-rw-r--r--synapse/rest/client/account.py19
-rw-r--r--synapse/rest/client/login_token_request.py94
-rw-r--r--synapse/rest/client/pusher.py21
-rw-r--r--synapse/rest/client/versions.py4
-rw-r--r--synapse/storage/_base.py23
-rw-r--r--synapse/storage/background_updates.py5
-rw-r--r--synapse/storage/databases/main/cache.py20
-rw-r--r--synapse/storage/databases/main/push_rule.py23
-rw-r--r--synapse/storage/databases/main/pusher.py140
-rw-r--r--synapse/storage/databases/main/relations.py38
-rw-r--r--synapse/storage/databases/main/stream.py6
-rw-r--r--synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql16
-rw-r--r--synapse/storage/schema/main/delta/73/03pusher_device_id.sql20
-rw-r--r--synapse/util/caches/descriptors.py14
-rw-r--r--tests/handlers/test_deactivate_account.py27
-rw-r--r--tests/push/test_email.py4
-rw-r--r--tests/push/test_http.py193
-rw-r--r--tests/replication/test_module_cache_invalidation.py79
-rw-r--r--tests/replication/test_pusher_shard.py2
-rw-r--r--tests/rest/admin/test_user.py2
-rw-r--r--tests/rest/client/test_login_token_request.py132
-rw-r--r--tests/rest/client/test_relations.py29
-rw-r--r--tests/unittest.py36
68 files changed, 2007 insertions, 858 deletions
diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml
index 8366ac9393..9a708286a4 100644
--- a/.github/workflows/latest_deps.yml
+++ b/.github/workflows/latest_deps.yml
@@ -201,10 +201,11 @@ jobs:
   open-issue:
     if: "failure() && github.event_name != 'push' && github.event_name != 'pull_request'"
     needs:
-      # TODO: should mypy be included here? It feels more brittle than the other two.
+      # TODO: should mypy be included here? It feels more brittle than the others.
       - mypy
       - trial
       - sytest
+      - complement
 
     runs-on: ubuntu-latest
 
diff --git a/.rustfmt.toml b/.rustfmt.toml
new file mode 100644
index 0000000000..bf96e7743d
--- /dev/null
+++ b/.rustfmt.toml
@@ -0,0 +1 @@
+group_imports = "StdExternalCrate"
diff --git a/changelog.d/13667.feature b/changelog.d/13667.feature
new file mode 100644
index 0000000000..a0b3cfe18c
--- /dev/null
+++ b/changelog.d/13667.feature
@@ -0,0 +1 @@
+Add cache invalidation across workers to module API.
diff --git a/changelog.d/13722.feature b/changelog.d/13722.feature
new file mode 100644
index 0000000000..588d143c0f
--- /dev/null
+++ b/changelog.d/13722.feature
@@ -0,0 +1 @@
+Experimental implementation of MSC3882 to allow an existing device/session to generate a login token for use on a new device/session.
diff --git a/changelog.d/13768.misc b/changelog.d/13768.misc
new file mode 100644
index 0000000000..28bddb7059
--- /dev/null
+++ b/changelog.d/13768.misc
@@ -0,0 +1 @@
+Port push rules to using Rust.
diff --git a/changelog.d/13772.doc b/changelog.d/13772.doc
new file mode 100644
index 0000000000..3398ff3765
--- /dev/null
+++ b/changelog.d/13772.doc
@@ -0,0 +1 @@
+Add `worker_main_http_uri` for the worker generator bash script.
diff --git a/changelog.d/13792.misc b/changelog.d/13792.misc
new file mode 100644
index 0000000000..36ac91400a
--- /dev/null
+++ b/changelog.d/13792.misc
@@ -0,0 +1 @@
+Update the script which makes full schema dumps.
diff --git a/changelog.d/13799.feature b/changelog.d/13799.feature
new file mode 100644
index 0000000000..6c8e5cffe2
--- /dev/null
+++ b/changelog.d/13799.feature
@@ -0,0 +1 @@
+Add experimental support for [MSC3881: Remotely toggle push notifications for another client](https://github.com/matrix-org/matrix-spec-proposals/pull/3881).
diff --git a/changelog.d/13809.misc b/changelog.d/13809.misc
new file mode 100644
index 0000000000..c2dacca2f2
--- /dev/null
+++ b/changelog.d/13809.misc
@@ -0,0 +1 @@
+Improve the `synapse.api.auth.Auth` mock used in unit tests.
diff --git a/changelog.d/13831.feature b/changelog.d/13831.feature
new file mode 100644
index 0000000000..6c8e5cffe2
--- /dev/null
+++ b/changelog.d/13831.feature
@@ -0,0 +1 @@
+Add experimental support for [MSC3881: Remotely toggle push notifications for another client](https://github.com/matrix-org/matrix-spec-proposals/pull/3881).
diff --git a/changelog.d/13832.feature b/changelog.d/13832.feature
new file mode 100644
index 0000000000..1dc1d66efe
--- /dev/null
+++ b/changelog.d/13832.feature
@@ -0,0 +1 @@
+Improve validation for the unspecced, internal-only `_matrix/client/unstable/add_threepid/msisdn/submit_token` endpoint.
diff --git a/changelog.d/13836.doc b/changelog.d/13836.doc
new file mode 100644
index 0000000000..f2edab00f4
--- /dev/null
+++ b/changelog.d/13836.doc
@@ -0,0 +1 @@
+Fix a mistake in sso_mapping_providers.md: `map_user_attributes` is expected to return `display_name` not `displayname`.
diff --git a/changelog.d/13840.bugfix b/changelog.d/13840.bugfix
new file mode 100644
index 0000000000..0f014439a8
--- /dev/null
+++ b/changelog.d/13840.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse v1.53.0 where the experimental implementation of [MSC3715](https://github.com/matrix-org/matrix-spec-proposals/pull/3715) would give incorrect results when paginating forward.
diff --git a/changelog.d/13843.removal b/changelog.d/13843.removal
new file mode 100644
index 0000000000..f6caaa8895
--- /dev/null
+++ b/changelog.d/13843.removal
@@ -0,0 +1 @@
+Remove the `complete_sso_login` method from the Module API which was deprecated in Synapse 1.13.0.
diff --git a/changelog.d/13850.misc b/changelog.d/13850.misc
new file mode 100644
index 0000000000..a973118aaf
--- /dev/null
+++ b/changelog.d/13850.misc
@@ -0,0 +1 @@
+Fix the release script not publishing binary wheels.
\ No newline at end of file
diff --git a/changelog.d/13859.misc b/changelog.d/13859.misc
new file mode 100644
index 0000000000..2780a4af3c
--- /dev/null
+++ b/changelog.d/13859.misc
@@ -0,0 +1 @@
+Raise issue if complement fails with latest deps.
diff --git a/changelog.d/13860.feature b/changelog.d/13860.feature
new file mode 100644
index 0000000000..6c8e5cffe2
--- /dev/null
+++ b/changelog.d/13860.feature
@@ -0,0 +1 @@
+Add experimental support for [MSC3881: Remotely toggle push notifications for another client](https://github.com/matrix-org/matrix-spec-proposals/pull/3881).
diff --git a/changelog.d/13870.doc b/changelog.d/13870.doc
new file mode 100644
index 0000000000..2598bc270c
--- /dev/null
+++ b/changelog.d/13870.doc
@@ -0,0 +1 @@
+Fix a cross-link from the register admin API to the `registration_shared_secret` configuration documentation.
diff --git a/contrib/workers-bash-scripts/create-multiple-generic-workers.md b/contrib/workers-bash-scripts/create-multiple-generic-workers.md
index d303101429..c9be707b3c 100644
--- a/contrib/workers-bash-scripts/create-multiple-generic-workers.md
+++ b/contrib/workers-bash-scripts/create-multiple-generic-workers.md
@@ -7,7 +7,7 @@ You can alternatively create multiple worker configuration files with a simple `
 #!/bin/bash
 for i in {1..5}
 do
-cat << EOF >> generic_worker$i.yaml
+cat << EOF > generic_worker$i.yaml
 worker_app: synapse.app.generic_worker
 worker_name: generic_worker$i
 
@@ -15,6 +15,8 @@ worker_name: generic_worker$i
 worker_replication_host: 127.0.0.1
 worker_replication_http_port: 9093
 
+worker_main_http_uri: http://localhost:8008/
+
 worker_listeners:
   - type: http
     port: 808$i
diff --git a/docs/admin_api/register_api.md b/docs/admin_api/register_api.md
index f6be31b443..dd2830f3a1 100644
--- a/docs/admin_api/register_api.md
+++ b/docs/admin_api/register_api.md
@@ -5,7 +5,7 @@ non-interactive way. This is generally used for bootstrapping a Synapse
 instance with administrator accounts.
 
 To authenticate yourself to the server, you will need both the shared secret
-([`registration_shared_secret`](../configuration/config_documentation.md#registration_shared_secret)
+([`registration_shared_secret`](../usage/configuration/config_documentation.md#registration_shared_secret)
 in the homeserver configuration), and a one-time nonce. If the registration
 shared secret is not configured, this API is not enabled.
 
diff --git a/docs/sso_mapping_providers.md b/docs/sso_mapping_providers.md
index 817499149f..9f5e5fbbe1 100644
--- a/docs/sso_mapping_providers.md
+++ b/docs/sso_mapping_providers.md
@@ -73,8 +73,8 @@ A custom mapping provider must specify the following methods:
 * `async def map_user_attributes(self, userinfo, token, failures)`
     - This method must be async.
     - Arguments:
-      - `userinfo` - A `authlib.oidc.core.claims.UserInfo` object to extract user
-                     information from.
+      - `userinfo` - An [`authlib.oidc.core.claims.UserInfo`](https://docs.authlib.org/en/latest/specs/oidc.html#authlib.oidc.core.UserInfo)
+                     object to extract user information from.
       - `token` - A dictionary which includes information necessary to make
                   further requests to the OpenID provider.
       - `failures` - An `int` that represents the amount of times the returned
@@ -91,7 +91,13 @@ A custom mapping provider must specify the following methods:
         `None`, the user is prompted to pick their own username. This is only used
         during a user's first login. Once a localpart has been associated with a
         remote user ID (see `get_remote_user_id`) it cannot be updated.
-      - `displayname`: An optional string, the display name for the user.
+      - `confirm_localpart`: A boolean. If set to `True`, when a `localpart`
+        string is returned from this method, Synapse will prompt the user to
+        either accept this localpart or pick their own username. Otherwise this
+        option has no effect. If omitted, defaults to `False`.
+      - `display_name`: An optional string, the display name for the user.
+      - `emails`: A list of strings, the email address(es) to associate with
+        this user. If omitted, defaults to an empty list.
 * `async def get_extra_attributes(self, userinfo, token)`
     - This method must be async.
     - Arguments:
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 657f78c0b1..44263bf77e 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -18,7 +18,15 @@ crate-type = ["cdylib"]
 name = "synapse.synapse_rust"
 
 [dependencies]
-pyo3 = { version = "0.16.5", features = ["extension-module", "macros", "abi3", "abi3-py37"] }
+anyhow = "1.0.63"
+lazy_static = "1.4.0"
+log = "0.4.17"
+pyo3 = { version = "0.17.1", features = ["extension-module", "macros", "anyhow", "abi3", "abi3-py37"] }
+pyo3-log = "0.7.0"
+pythonize = "0.17.0"
+regex = "1.6.0"
+serde = { version = "1.0.144", features = ["derive"] }
+serde_json = "1.0.85"
 
 [build-dependencies]
 blake2 = "0.10.4"
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index ba42465fb8..c7b60e58a7 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -1,5 +1,7 @@
 use pyo3::prelude::*;
 
+pub mod push;
+
 /// Returns the hash of all the rust source files at the time it was compiled.
 ///
 /// Used by python to detect if the rust library is outdated.
@@ -17,8 +19,13 @@ fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
 
 /// The entry point for defining the Python module.
 #[pymodule]
-fn synapse_rust(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
+fn synapse_rust(py: Python<'_>, m: &PyModule) -> PyResult<()> {
+    pyo3_log::init();
+
     m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
     m.add_function(wrap_pyfunction!(get_rust_file_digest, m)?)?;
+
+    push::register_module(py, m)?;
+
     Ok(())
 }
diff --git a/rust/src/push/base_rules.rs b/rust/src/push/base_rules.rs
new file mode 100644
index 0000000000..7c62bc4849
--- /dev/null
+++ b/rust/src/push/base_rules.rs
@@ -0,0 +1,335 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Contains the definitions of the "base" push rules.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+
+use lazy_static::lazy_static;
+use serde_json::Value;
+
+use super::KnownCondition;
+use crate::push::Action;
+use crate::push::Condition;
+use crate::push::EventMatchCondition;
+use crate::push::PushRule;
+use crate::push::SetTweak;
+use crate::push::TweakValue;
+
+const HIGHLIGHT_ACTION: Action = Action::SetTweak(SetTweak {
+    set_tweak: Cow::Borrowed("highlight"),
+    value: None,
+    other_keys: Value::Null,
+});
+
+const HIGHLIGHT_FALSE_ACTION: Action = Action::SetTweak(SetTweak {
+    set_tweak: Cow::Borrowed("highlight"),
+    value: Some(TweakValue::Other(Value::Bool(false))),
+    other_keys: Value::Null,
+});
+
+const SOUND_ACTION: Action = Action::SetTweak(SetTweak {
+    set_tweak: Cow::Borrowed("sound"),
+    value: Some(TweakValue::String(Cow::Borrowed("default"))),
+    other_keys: Value::Null,
+});
+
+const RING_ACTION: Action = Action::SetTweak(SetTweak {
+    set_tweak: Cow::Borrowed("sound"),
+    value: Some(TweakValue::String(Cow::Borrowed("ring"))),
+    other_keys: Value::Null,
+});
+
+pub const BASE_PREPEND_OVERRIDE_RULES: &[PushRule] = &[PushRule {
+    rule_id: Cow::Borrowed("global/override/.m.rule.master"),
+    priority_class: 5,
+    conditions: Cow::Borrowed(&[]),
+    actions: Cow::Borrowed(&[Action::DontNotify]),
+    default: true,
+    default_enabled: false,
+}];
+
+pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.suppress_notices"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+            EventMatchCondition {
+                key: Cow::Borrowed("content.msgtype"),
+                pattern: Some(Cow::Borrowed("m.notice")),
+                pattern_type: None,
+            },
+        ))]),
+        actions: Cow::Borrowed(&[Action::DontNotify]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.invite_for_me"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.member")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("content.membership"),
+                pattern: Some(Cow::Borrowed("invite")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("state_key"),
+                pattern: None,
+                pattern_type: Some(Cow::Borrowed("user_id")),
+            })),
+        ]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_FALSE_ACTION, SOUND_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.member_event"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+            EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.member")),
+                pattern_type: None,
+            },
+        ))]),
+        actions: Cow::Borrowed(&[Action::DontNotify]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.contains_display_name"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::ContainsDisplayName)]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_ACTION, SOUND_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.roomnotif"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::SenderNotificationPermission {
+                key: Cow::Borrowed("room"),
+            }),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("content.body"),
+                pattern: Some(Cow::Borrowed("@room")),
+                pattern_type: None,
+            })),
+        ]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.tombstone"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.tombstone")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("state_key"),
+                pattern: Some(Cow::Borrowed("")),
+                pattern_type: None,
+            })),
+        ]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.m.rule.reaction"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+            EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.reaction")),
+                pattern_type: None,
+            },
+        ))]),
+        actions: Cow::Borrowed(&[Action::DontNotify]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.org.matrix.msc3786.rule.room.server_acl"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.server_acl")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("state_key"),
+                pattern: Some(Cow::Borrowed("")),
+                pattern_type: None,
+            })),
+        ]),
+        actions: Cow::Borrowed(&[]),
+        default: true,
+        default_enabled: true,
+    },
+];
+
+pub const BASE_APPEND_CONTENT_RULES: &[PushRule] = &[PushRule {
+    rule_id: Cow::Borrowed("global/content/.m.rule.contains_user_name"),
+    priority_class: 4,
+    conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+        EventMatchCondition {
+            key: Cow::Borrowed("content.body"),
+            pattern: None,
+            pattern_type: Some(Cow::Borrowed("user_localpart")),
+        },
+    ))]),
+    actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_ACTION, SOUND_ACTION]),
+    default: true,
+    default_enabled: true,
+}];
+
+pub const BASE_APPEND_UNDERRIDE_RULES: &[PushRule] = &[
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.m.rule.call"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+            EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.call.invite")),
+                pattern_type: None,
+            },
+        ))]),
+        actions: Cow::Borrowed(&[Action::Notify, RING_ACTION, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.m.rule.room_one_to_one"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.message")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::RoomMemberCount {
+                is: Some(Cow::Borrowed("2")),
+            }),
+        ]),
+        actions: Cow::Borrowed(&[Action::Notify, SOUND_ACTION, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.m.rule.encrypted_room_one_to_one"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.encrypted")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::RoomMemberCount {
+                is: Some(Cow::Borrowed("2")),
+            }),
+        ]),
+        actions: Cow::Borrowed(&[Action::Notify, SOUND_ACTION, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.org.matrix.msc3772.thread_reply"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::RelationMatch {
+            rel_type: Cow::Borrowed("m.thread"),
+            sender: None,
+            sender_type: Some(Cow::Borrowed("user_id")),
+        })]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.m.rule.message"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+            EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.message")),
+                pattern_type: None,
+            },
+        ))]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.m.rule.encrypted"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch(
+            EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("m.room.encrypted")),
+                pattern_type: None,
+            },
+        ))]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+    PushRule {
+        rule_id: Cow::Borrowed("global/underride/.im.vector.jitsi"),
+        priority_class: 1,
+        conditions: Cow::Borrowed(&[
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("type"),
+                pattern: Some(Cow::Borrowed("im.vector.modular.widgets")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("content.type"),
+                pattern: Some(Cow::Borrowed("jitsi")),
+                pattern_type: None,
+            })),
+            Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+                key: Cow::Borrowed("state_key"),
+                pattern: Some(Cow::Borrowed("*")),
+                pattern_type: None,
+            })),
+        ]),
+        actions: Cow::Borrowed(&[Action::Notify, HIGHLIGHT_FALSE_ACTION]),
+        default: true,
+        default_enabled: true,
+    },
+];
+
+lazy_static! {
+    pub static ref BASE_RULES_BY_ID: HashMap<&'static str, &'static PushRule> =
+        BASE_PREPEND_OVERRIDE_RULES
+            .iter()
+            .chain(BASE_APPEND_OVERRIDE_RULES.iter())
+            .chain(BASE_APPEND_CONTENT_RULES.iter())
+            .chain(BASE_APPEND_UNDERRIDE_RULES.iter())
+            .map(|rule| { (&*rule.rule_id, rule) })
+            .collect();
+}
diff --git a/rust/src/push/mod.rs b/rust/src/push/mod.rs
new file mode 100644
index 0000000000..de6764e7c5
--- /dev/null
+++ b/rust/src/push/mod.rs
@@ -0,0 +1,502 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! An implementation of Matrix push rules.
+//!
+//! The `Cow<_>` type is used extensively within this module to allow creating
+//! the base rules as constants (in Rust constants can't require explicit
+//! allocation atm).
+//!
+//! ---
+//!
+//! Push rules is the system used to determine which events trigger a push (and a
+//! bump in notification counts).
+//!
+//! This consists of a list of "push rules" for each user, where a push rule is a
+//! pair of "conditions" and "actions". When a user receives an event Synapse
+//! iterates over the list of push rules until it finds one where all the conditions
+//! match the event, at which point "actions" describe the outcome (e.g. notify,
+//! highlight, etc).
+//!
+//! Push rules are split up into 5 different "kinds" (aka "priority classes"), which
+//! are run in order:
+//!     1. Override — highest priority rules, e.g. always ignore notices
+//!     2. Content — content specific rules, e.g. @ notifications
+//!     3. Room — per room rules, e.g. enable/disable notifications for all messages
+//!        in a room
+//!     4. Sender — per sender rules, e.g. never notify for messages from a given
+//!        user
+//!     5. Underride — the lowest priority "default" rules, e.g. notify for every
+//!        message.
+//!
+//! The set of "base rules" are the list of rules that every user has by default. A
+//! user can modify their copy of the push rules in one of three ways:
+//!
+//!     1. Adding a new push rule of a certain kind
+//!     2. Changing the actions of a base rule
+//!     3. Enabling/disabling a base rule.
+//!
+//! The base rules are split into whether they come before or after a particular
+//! kind, so the order of push rule evaluation would be: base rules for before
+//! "override" kind, user defined "override" rules, base rules after "override"
+//! kind, etc, etc.
+
+use std::borrow::Cow;
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+use anyhow::{Context, Error};
+use log::warn;
+use pyo3::prelude::*;
+use pythonize::pythonize;
+use serde::de::Error as _;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+mod base_rules;
+
+/// Called when registering modules with python.
+pub fn register_module(py: Python<'_>, m: &PyModule) -> PyResult<()> {
+    let child_module = PyModule::new(py, "push")?;
+    child_module.add_class::<PushRule>()?;
+    child_module.add_class::<PushRules>()?;
+    child_module.add_class::<FilteredPushRules>()?;
+    child_module.add_function(wrap_pyfunction!(get_base_rule_ids, m)?)?;
+
+    m.add_submodule(child_module)?;
+
+    // We need to manually add the module to sys.modules to make `from
+    // synapse.synapse_rust import push` work.
+    py.import("sys")?
+        .getattr("modules")?
+        .set_item("synapse.synapse_rust.push", child_module)?;
+
+    Ok(())
+}
+
+#[pyfunction]
+fn get_base_rule_ids() -> HashSet<&'static str> {
+    base_rules::BASE_RULES_BY_ID.keys().copied().collect()
+}
+
+/// A single push rule for a user.
+#[derive(Debug, Clone)]
+#[pyclass(frozen)]
+pub struct PushRule {
+    /// A unique ID for this rule
+    pub rule_id: Cow<'static, str>,
+    /// The "kind" of push rule this is (see `PRIORITY_CLASS_MAP` in Python)
+    #[pyo3(get)]
+    pub priority_class: i32,
+    /// The conditions that must all match for actions to be applied
+    pub conditions: Cow<'static, [Condition]>,
+    /// The actions to apply if all conditions are met
+    pub actions: Cow<'static, [Action]>,
+    /// Whether this is a base rule
+    #[pyo3(get)]
+    pub default: bool,
+    /// Whether this is enabled by default
+    #[pyo3(get)]
+    pub default_enabled: bool,
+}
+
+#[pymethods]
+impl PushRule {
+    #[staticmethod]
+    pub fn from_db(
+        rule_id: String,
+        priority_class: i32,
+        conditions: &str,
+        actions: &str,
+    ) -> Result<PushRule, Error> {
+        let conditions = serde_json::from_str(conditions).context("parsing conditions")?;
+        let actions = serde_json::from_str(actions).context("parsing actions")?;
+
+        Ok(PushRule {
+            rule_id: Cow::Owned(rule_id),
+            priority_class,
+            conditions,
+            actions,
+            default: false,
+            default_enabled: true,
+        })
+    }
+
+    #[getter]
+    fn rule_id(&self) -> &str {
+        &self.rule_id
+    }
+
+    #[getter]
+    fn actions(&self) -> Vec<Action> {
+        self.actions.clone().into_owned()
+    }
+
+    #[getter]
+    fn conditions(&self) -> Vec<Condition> {
+        self.conditions.clone().into_owned()
+    }
+
+    fn __repr__(&self) -> String {
+        format!(
+            "<PushRule rule_id={}, conditions={:?}, actions={:?}>",
+            self.rule_id, self.conditions, self.actions
+        )
+    }
+}
+
+/// The "action" Synapse should perform for a matching push rule.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Action {
+    DontNotify,
+    Notify,
+    Coalesce,
+    SetTweak(SetTweak),
+
+    // An unrecognized custom action.
+    Unknown(Value),
+}
+
+impl IntoPy<PyObject> for Action {
+    fn into_py(self, py: Python<'_>) -> PyObject {
+        // When we pass the `Action` struct to Python we want it to be converted
+        // to a dict. We use `pythonize`, which converts the struct using the
+        // `serde` serialization.
+        pythonize(py, &self).expect("valid action")
+    }
+}
+
+/// The body of a `SetTweak` push action.
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+pub struct SetTweak {
+    set_tweak: Cow<'static, str>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    value: Option<TweakValue>,
+
+    // This picks up any other fields that may have been added by clients.
+    // These get added when we convert the `Action` to a python object.
+    #[serde(flatten)]
+    other_keys: Value,
+}
+
+/// The value of a `set_tweak`.
+///
+/// We need this (rather than using `TweakValue` directly) so that we can use
+/// `&'static str` in the value when defining the constant base rules.
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
+#[serde(untagged)]
+pub enum TweakValue {
+    String(Cow<'static, str>),
+    Other(Value),
+}
+
+impl Serialize for Action {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        match self {
+            Action::DontNotify => serializer.serialize_str("dont_notify"),
+            Action::Notify => serializer.serialize_str("notify"),
+            Action::Coalesce => serializer.serialize_str("coalesce"),
+            Action::SetTweak(tweak) => tweak.serialize(serializer),
+            Action::Unknown(value) => value.serialize(serializer),
+        }
+    }
+}
+
+/// Simple helper class for deserializing Action from JSON.
+#[derive(Deserialize)]
+#[serde(untagged)]
+enum ActionDeserializeHelper {
+    Str(String),
+    SetTweak(SetTweak),
+    Unknown(Value),
+}
+
+impl<'de> Deserialize<'de> for Action {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        let helper: ActionDeserializeHelper = Deserialize::deserialize(deserializer)?;
+        match helper {
+            ActionDeserializeHelper::Str(s) => match &*s {
+                "dont_notify" => Ok(Action::DontNotify),
+                "notify" => Ok(Action::Notify),
+                "coalesce" => Ok(Action::Coalesce),
+                _ => Err(D::Error::custom("unrecognized action")),
+            },
+            ActionDeserializeHelper::SetTweak(set_tweak) => Ok(Action::SetTweak(set_tweak)),
+            ActionDeserializeHelper::Unknown(value) => Ok(Action::Unknown(value)),
+        }
+    }
+}
+
+/// A condition used in push rules to match against an event.
+///
+/// We need this split as `serde` doesn't give us the ability to have a
+/// "catchall" variant in tagged enums.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(untagged)]
+pub enum Condition {
+    /// A recognized condition that we can match against
+    Known(KnownCondition),
+    /// An unrecognized condition that we ignore.
+    Unknown(Value),
+}
+
+/// The set of "known" conditions that we can handle.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(rename_all = "snake_case")]
+#[serde(tag = "kind")]
+pub enum KnownCondition {
+    EventMatch(EventMatchCondition),
+    ContainsDisplayName,
+    RoomMemberCount {
+        #[serde(skip_serializing_if = "Option::is_none")]
+        is: Option<Cow<'static, str>>,
+    },
+    SenderNotificationPermission {
+        key: Cow<'static, str>,
+    },
+    #[serde(rename = "org.matrix.msc3772.relation_match")]
+    RelationMatch {
+        rel_type: Cow<'static, str>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        sender: Option<Cow<'static, str>>,
+        #[serde(skip_serializing_if = "Option::is_none")]
+        sender_type: Option<Cow<'static, str>>,
+    },
+}
+
+impl IntoPy<PyObject> for Condition {
+    fn into_py(self, py: Python<'_>) -> PyObject {
+        pythonize(py, &self).expect("valid condition")
+    }
+}
+
+/// The body of a [`Condition::EventMatch`]
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct EventMatchCondition {
+    key: Cow<'static, str>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pattern: Option<Cow<'static, str>>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pattern_type: Option<Cow<'static, str>>,
+}
+
+/// The collection of push rules for a user.
+#[derive(Debug, Clone, Default)]
+#[pyclass(frozen)]
+struct PushRules {
+    /// Custom push rules that override a base rule.
+    overridden_base_rules: HashMap<Cow<'static, str>, PushRule>,
+
+    /// Custom rules that come between the prepend/append override base rules.
+    override_rules: Vec<PushRule>,
+    /// Custom rules that come before the base content rules.
+    content: Vec<PushRule>,
+    /// Custom rules that come before the base room rules.
+    room: Vec<PushRule>,
+    /// Custom rules that come before the base sender rules.
+    sender: Vec<PushRule>,
+    /// Custom rules that come before the base underride rules.
+    underride: Vec<PushRule>,
+}
+
+#[pymethods]
+impl PushRules {
+    #[new]
+    fn new(rules: Vec<PushRule>) -> PushRules {
+        let mut push_rules: PushRules = Default::default();
+
+        for rule in rules {
+            if let Some(&o) = base_rules::BASE_RULES_BY_ID.get(&*rule.rule_id) {
+                push_rules.overridden_base_rules.insert(
+                    rule.rule_id.clone(),
+                    PushRule {
+                        actions: rule.actions.clone(),
+                        ..o.clone()
+                    },
+                );
+
+                continue;
+            }
+
+            match rule.priority_class {
+                5 => push_rules.override_rules.push(rule),
+                4 => push_rules.content.push(rule),
+                3 => push_rules.room.push(rule),
+                2 => push_rules.sender.push(rule),
+                1 => push_rules.underride.push(rule),
+                _ => {
+                    warn!(
+                        "Unrecognized priority class for rule {}: {}",
+                        rule.rule_id, rule.priority_class
+                    );
+                }
+            }
+        }
+
+        push_rules
+    }
+
+    /// Returns the list of all rules, including base rules, in the order they
+    /// should be executed in.
+    fn rules(&self) -> Vec<PushRule> {
+        self.iter().cloned().collect()
+    }
+}
+
+impl PushRules {
+    /// Iterates over all the rules, including base rules, in the order they
+    /// should be executed in.
+    pub fn iter(&self) -> impl Iterator<Item = &PushRule> {
+        base_rules::BASE_PREPEND_OVERRIDE_RULES
+            .iter()
+            .chain(self.override_rules.iter())
+            .chain(base_rules::BASE_APPEND_OVERRIDE_RULES.iter())
+            .chain(self.content.iter())
+            .chain(base_rules::BASE_APPEND_CONTENT_RULES.iter())
+            .chain(self.room.iter())
+            .chain(self.sender.iter())
+            .chain(self.underride.iter())
+            .chain(base_rules::BASE_APPEND_UNDERRIDE_RULES.iter())
+            .map(|rule| {
+                self.overridden_base_rules
+                    .get(&*rule.rule_id)
+                    .unwrap_or(rule)
+            })
+    }
+}
+
+/// A wrapper around `PushRules` that checks the enabled state of rules and
+/// filters out disabled experimental rules.
+#[derive(Debug, Clone, Default)]
+#[pyclass(frozen)]
+pub struct FilteredPushRules {
+    push_rules: PushRules,
+    enabled_map: BTreeMap<String, bool>,
+    msc3786_enabled: bool,
+    msc3772_enabled: bool,
+}
+
+#[pymethods]
+impl FilteredPushRules {
+    #[new]
+    fn py_new(
+        push_rules: PushRules,
+        enabled_map: BTreeMap<String, bool>,
+        msc3786_enabled: bool,
+        msc3772_enabled: bool,
+    ) -> Self {
+        Self {
+            push_rules,
+            enabled_map,
+            msc3786_enabled,
+            msc3772_enabled,
+        }
+    }
+
+    /// Returns the list of all rules and their enabled state, including base
+    /// rules, in the order they should be executed in.
+    fn rules(&self) -> Vec<(PushRule, bool)> {
+        self.iter().map(|(r, e)| (r.clone(), e)).collect()
+    }
+}
+
+impl FilteredPushRules {
+    /// Iterates over all the rules and their enabled state, including base
+    /// rules, in the order they should be executed in.
+    fn iter(&self) -> impl Iterator<Item = (&PushRule, bool)> {
+        self.push_rules
+            .iter()
+            .filter(|rule| {
+                // Ignore disabled experimental push rules
+                if !self.msc3786_enabled
+                    && rule.rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
+                {
+                    return false;
+                }
+
+                if !self.msc3772_enabled
+                    && rule.rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
+                {
+                    return false;
+                }
+
+                true
+            })
+            .map(|r| {
+                let enabled = *self
+                    .enabled_map
+                    .get(&*r.rule_id)
+                    .unwrap_or(&r.default_enabled);
+                (r, enabled)
+            })
+    }
+}
+
+#[test]
+fn test_serialize_condition() {
+    let condition = Condition::Known(KnownCondition::EventMatch(EventMatchCondition {
+        key: "content.body".into(),
+        pattern: Some("coffee".into()),
+        pattern_type: None,
+    }));
+
+    let json = serde_json::to_string(&condition).unwrap();
+    assert_eq!(
+        json,
+        r#"{"kind":"event_match","key":"content.body","pattern":"coffee"}"#
+    )
+}
+
+#[test]
+fn test_deserialize_condition() {
+    let json = r#"{"kind":"event_match","key":"content.body","pattern":"coffee"}"#;
+
+    let _: Condition = serde_json::from_str(json).unwrap();
+}
+
+#[test]
+fn test_deserialize_custom_condition() {
+    let json = r#"{"kind":"custom_tag"}"#;
+
+    let condition: Condition = serde_json::from_str(json).unwrap();
+    assert!(matches!(condition, Condition::Unknown(_)));
+
+    let new_json = serde_json::to_string(&condition).unwrap();
+    assert_eq!(json, new_json);
+}
+
+#[test]
+fn test_deserialize_action() {
+    let _: Action = serde_json::from_str(r#""notify""#).unwrap();
+    let _: Action = serde_json::from_str(r#""dont_notify""#).unwrap();
+    let _: Action = serde_json::from_str(r#""coalesce""#).unwrap();
+    let _: Action = serde_json::from_str(r#"{"set_tweak": "highlight"}"#).unwrap();
+}
+
+#[test]
+fn test_custom_action() {
+    let json = r#"{"some_custom":"action_fields"}"#;
+
+    let action: Action = serde_json::from_str(json).unwrap();
+    assert!(matches!(action, Action::Unknown(_)));
+
+    let new_json = serde_json::to_string(&action).unwrap();
+    assert_eq!(json, new_json);
+}
diff --git a/scripts-dev/make_full_schema.sh b/scripts-dev/make_full_schema.sh
index 61394360ce..d8cd06ee4f 100755
--- a/scripts-dev/make_full_schema.sh
+++ b/scripts-dev/make_full_schema.sh
@@ -2,23 +2,16 @@
 #
 # This script generates SQL files for creating a brand new Synapse DB with the latest
 # schema, on both SQLite3 and Postgres.
-#
-# It does so by having Synapse generate an up-to-date SQLite DB, then running
-# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
 
 export PGHOST="localhost"
-POSTGRES_DB_NAME="synapse_full_schema.$$"
-
-SQLITE_SCHEMA_FILE="schema.sql.sqlite"
-SQLITE_ROWS_FILE="rows.sql.sqlite"
-POSTGRES_SCHEMA_FILE="full.sql.postgres"
-POSTGRES_ROWS_FILE="rows.sql.postgres"
-
+POSTGRES_MAIN_DB_NAME="synapse_full_schema_main.$$"
+POSTGRES_COMMON_DB_NAME="synapse_full_schema_common.$$"
+POSTGRES_STATE_DB_NAME="synapse_full_schema_state.$$"
 REQUIRED_DEPS=("matrix-synapse" "psycopg2")
 
 usage() {
   echo
-  echo "Usage: $0 -p <postgres_username> -o <path> [-c] [-n] [-h]"
+  echo "Usage: $0 -p <postgres_username> -o <path> [-c] [-n <schema number>] [-h]"
   echo
   echo "-p <postgres_username>"
   echo "  Username to connect to local postgres instance. The password will be requested"
@@ -27,11 +20,16 @@ usage() {
   echo "  CI mode. Prints every command that the script runs."
   echo "-o <path>"
   echo "  Directory to output full schema files to."
+  echo "-n <schema number>"
+  echo "  Schema number for the new snapshot. Used to set the location of files within "
+  echo "  the output directory, mimicking that of synapse/storage/schemas."
+  echo "  Defaults to 9999."
   echo "-h"
   echo "  Display this help text."
 }
 
-while getopts "p:co:h" opt; do
+SCHEMA_NUMBER="9999"
+while getopts "p:co:hn:" opt; do
   case $opt in
     p)
       export PGUSER=$OPTARG
@@ -48,6 +46,9 @@ while getopts "p:co:h" opt; do
       usage
       exit
       ;;
+    n)
+      SCHEMA_NUMBER="$OPTARG"
+      ;;
     \?)
       echo "ERROR: Invalid option: -$OPTARG" >&2
       usage
@@ -95,12 +96,21 @@ cd "$(dirname "$0")/.."
 TMPDIR=$(mktemp -d)
 KEY_FILE=$TMPDIR/test.signing.key # default Synapse signing key path
 SQLITE_CONFIG=$TMPDIR/sqlite.conf
-SQLITE_DB=$TMPDIR/homeserver.db
+SQLITE_MAIN_DB=$TMPDIR/main.db
+SQLITE_STATE_DB=$TMPDIR/state.db
+SQLITE_COMMON_DB=$TMPDIR/common.db
 POSTGRES_CONFIG=$TMPDIR/postgres.conf
 
 # Ensure these files are delete on script exit
-# TODO: the trap should also drop the temp postgres DB
-trap 'rm -rf $TMPDIR' EXIT
+cleanup() {
+  echo "Cleaning up temporary sqlite database and config files..."
+  rm -r "$TMPDIR"
+  echo "Cleaning up temporary Postgres database..."
+  dropdb --if-exists "$POSTGRES_COMMON_DB_NAME"
+  dropdb --if-exists "$POSTGRES_MAIN_DB_NAME"
+  dropdb --if-exists "$POSTGRES_STATE_DB_NAME"
+}
+trap 'cleanup' EXIT
 
 cat > "$SQLITE_CONFIG" <<EOF
 server_name: "test"
@@ -110,10 +120,22 @@ macaroon_secret_key: "abcde"
 
 report_stats: false
 
-database:
-  name: "sqlite3"
-  args:
-    database: "$SQLITE_DB"
+databases:
+  common:
+    name: "sqlite3"
+    data_stores: []
+    args:
+      database: "$SQLITE_COMMON_DB"
+  main:
+    name: "sqlite3"
+    data_stores: ["main"]
+    args:
+      database: "$SQLITE_MAIN_DB"
+  state:
+    name: "sqlite3"
+    data_stores: ["state"]
+    args:
+      database: "$SQLITE_STATE_DB"
 
 # Suppress the key server warning.
 trusted_key_servers: []
@@ -127,13 +149,32 @@ macaroon_secret_key: "abcde"
 
 report_stats: false
 
-database:
-  name: "psycopg2"
-  args:
-    user: "$PGUSER"
-    host: "$PGHOST"
-    password: "$PGPASSWORD"
-    database: "$POSTGRES_DB_NAME"
+databases:
+  common:
+    name: "psycopg2"
+    data_stores: []
+    args:
+      user: "$PGUSER"
+      host: "$PGHOST"
+      password: "$PGPASSWORD"
+      database: "$POSTGRES_COMMON_DB_NAME"
+  main:
+    name: "psycopg2"
+    data_stores: ["main"]
+    args:
+      user: "$PGUSER"
+      host: "$PGHOST"
+      password: "$PGPASSWORD"
+      database: "$POSTGRES_MAIN_DB_NAME"
+  state:
+    name: "psycopg2"
+    data_stores: ["state"]
+    args:
+      user: "$PGUSER"
+      host: "$PGHOST"
+      password: "$PGPASSWORD"
+      database: "$POSTGRES_STATE_DB_NAME"
+
 
 # Suppress the key server warning.
 trusted_key_servers: []
@@ -148,33 +189,76 @@ echo "Running db background jobs..."
 synapse/_scripts/update_synapse_database.py --database-config "$SQLITE_CONFIG" --run-background-updates
 
 # Create the PostgreSQL database.
-echo "Creating postgres database..."
-createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_DB_NAME"
+echo "Creating postgres databases..."
+createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_COMMON_DB_NAME"
+createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_MAIN_DB_NAME"
+createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_STATE_DB_NAME"
 
 echo "Running db background jobs..."
 synapse/_scripts/update_synapse_database.py --database-config "$POSTGRES_CONFIG" --run-background-updates
 
 
-# Delete schema_version, applied_schema_deltas and applied_module_schemas tables
-# Also delete any shadow tables from fts4
 echo "Dropping unwanted db tables..."
-SQL="
+
+# Some common tables are created and updated by Synapse itself and do not belong in the
+# schema.
+DROP_APP_MANAGED_TABLES="
 DROP TABLE schema_version;
+DROP TABLE schema_compat_version;
 DROP TABLE applied_schema_deltas;
 DROP TABLE applied_module_schemas;
 "
-sqlite3 "$SQLITE_DB" <<< "$SQL"
-psql "$POSTGRES_DB_NAME" -w <<< "$SQL"
+# Other common tables are not created by Synapse and do belong in the schema.
+# TODO: we could derive DROP_COMMON_TABLES from the dump of the common-only DB. But
+#       since there's only one table there, I haven't bothered to do so.
+DROP_COMMON_TABLES="$DROP_APP_MANAGED_TABLES
+DROP TABLE background_updates;
+"
+
+sqlite3 "$SQLITE_COMMON_DB" <<< "$DROP_APP_MANAGED_TABLES"
+sqlite3 "$SQLITE_MAIN_DB" <<< "$DROP_COMMON_TABLES"
+sqlite3 "$SQLITE_STATE_DB" <<< "$DROP_COMMON_TABLES"
+psql "$POSTGRES_COMMON_DB_NAME" -w <<< "$DROP_APP_MANAGED_TABLES"
+psql "$POSTGRES_MAIN_DB_NAME" -w <<< "$DROP_COMMON_TABLES"
+psql "$POSTGRES_STATE_DB_NAME" -w <<< "$DROP_COMMON_TABLES"
+
+# For Reasons(TM), SQLite's `.schema` also dumps out "shadow tables", the implementation
+# details behind full text search tables. Omit these from the dumps.
+
+sqlite3 "$SQLITE_MAIN_DB" <<< "
+DROP TABLE event_search_content;
+DROP TABLE event_search_segments;
+DROP TABLE event_search_segdir;
+DROP TABLE event_search_docsize;
+DROP TABLE event_search_stat;
+DROP TABLE user_directory_search_content;
+DROP TABLE user_directory_search_segments;
+DROP TABLE user_directory_search_segdir;
+DROP TABLE user_directory_search_docsize;
+DROP TABLE user_directory_search_stat;
+"
 
-echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_SCHEMA_FILE' and '$OUTPUT_DIR/$SQLITE_ROWS_FILE'..."
-sqlite3 "$SQLITE_DB" ".schema --indent" > "$OUTPUT_DIR/$SQLITE_SCHEMA_FILE"
-sqlite3 "$SQLITE_DB" ".dump --data-only --nosys" > "$OUTPUT_DIR/$SQLITE_ROWS_FILE"
+echo "Dumping SQLite3 schema..."
+
+mkdir -p "$OUTPUT_DIR/"{common,main,state}"/full_schema/$SCHEMA_NUMBER"
+sqlite3 "$SQLITE_COMMON_DB" ".schema --indent"           > "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
+sqlite3 "$SQLITE_COMMON_DB" ".dump --data-only --nosys" >> "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
+sqlite3 "$SQLITE_MAIN_DB"   ".schema --indent"           > "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
+sqlite3 "$SQLITE_MAIN_DB"   ".dump --data-only --nosys" >> "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
+sqlite3 "$SQLITE_STATE_DB"  ".schema --indent"           > "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
+sqlite3 "$SQLITE_STATE_DB"  ".dump --data-only --nosys" >> "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.sqlite"
+
+cleanup_pg_schema() {
+   sed -e '/^$/d' -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d'
+}
 
-echo "Dumping Postgres schema to '$OUTPUT_DIR/$POSTGRES_SCHEMA_FILE' and '$OUTPUT_DIR/$POSTGRES_ROWS_FILE'..."
-pg_dump --format=plain --schema-only         --no-tablespaces --no-acl --no-owner "$POSTGRES_DB_NAME" | sed -e '/^$/d' -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_SCHEMA_FILE"
-pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_DB_NAME" | sed -e '/^$/d' -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_ROWS_FILE"
+echo "Dumping Postgres schema..."
 
-echo "Cleaning up temporary Postgres database..."
-dropdb $POSTGRES_DB_NAME
+pg_dump --format=plain --schema-only         --no-tablespaces --no-acl --no-owner "$POSTGRES_COMMON_DB_NAME" | cleanup_pg_schema  > "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
+pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_COMMON_DB_NAME" | cleanup_pg_schema >> "$OUTPUT_DIR/common/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
+pg_dump --format=plain --schema-only         --no-tablespaces --no-acl --no-owner "$POSTGRES_MAIN_DB_NAME"   | cleanup_pg_schema  > "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
+pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_MAIN_DB_NAME"   | cleanup_pg_schema >> "$OUTPUT_DIR/main/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
+pg_dump --format=plain --schema-only         --no-tablespaces --no-acl --no-owner "$POSTGRES_STATE_DB_NAME"  | cleanup_pg_schema  > "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
+pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_STATE_DB_NAME"  | cleanup_pg_schema >> "$OUTPUT_DIR/state/full_schema/$SCHEMA_NUMBER/full.sql.postgres"
 
 echo "Done! Files dumped to: $OUTPUT_DIR"
diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py
index d08517a953..2c377533c0 100644
--- a/scripts-dev/mypy_synapse_plugin.py
+++ b/scripts-dev/mypy_synapse_plugin.py
@@ -29,7 +29,7 @@ class SynapsePlugin(Plugin):
         self, fullname: str
     ) -> Optional[Callable[[MethodSigContext], CallableType]]:
         if fullname.startswith(
-            "synapse.util.caches.descriptors._CachedFunction.__call__"
+            "synapse.util.caches.descriptors.CachedFunction.__call__"
         ) or fullname.startswith(
             "synapse.util.caches.descriptors._LruCachedFunction.__call__"
         ):
@@ -38,7 +38,7 @@ class SynapsePlugin(Plugin):
 
 
 def cached_function_method_signature(ctx: MethodSigContext) -> CallableType:
-    """Fixes the `_CachedFunction.__call__` signature to be correct.
+    """Fixes the `CachedFunction.__call__` signature to be correct.
 
     It already has *almost* the correct signature, except:
 
diff --git a/stubs/synapse/synapse_rust.pyi b/stubs/synapse/synapse_rust/__init__.pyi
index 8658d3138f..8658d3138f 100644
--- a/stubs/synapse/synapse_rust.pyi
+++ b/stubs/synapse/synapse_rust/__init__.pyi
diff --git a/stubs/synapse/synapse_rust/push.pyi b/stubs/synapse/synapse_rust/push.pyi
new file mode 100644
index 0000000000..93c4e69d42
--- /dev/null
+++ b/stubs/synapse/synapse_rust/push.pyi
@@ -0,0 +1,37 @@
+from typing import Any, Collection, Dict, Mapping, Sequence, Tuple, Union
+
+from synapse.types import JsonDict
+
+class PushRule:
+    @property
+    def rule_id(self) -> str: ...
+    @property
+    def priority_class(self) -> int: ...
+    @property
+    def conditions(self) -> Sequence[Mapping[str, str]]: ...
+    @property
+    def actions(self) -> Sequence[Union[Mapping[str, Any], str]]: ...
+    @property
+    def default(self) -> bool: ...
+    @property
+    def default_enabled(self) -> bool: ...
+    @staticmethod
+    def from_db(
+        rule_id: str, priority_class: int, conditions: str, actions: str
+    ) -> "PushRule": ...
+
+class PushRules:
+    def __init__(self, rules: Collection[PushRule]): ...
+    def rules(self) -> Collection[PushRule]: ...
+
+class FilteredPushRules:
+    def __init__(
+        self,
+        push_rules: PushRules,
+        enabled_map: Dict[str, bool],
+        msc3786_enabled: bool,
+        msc3772_enabled: bool,
+    ): ...
+    def rules(self) -> Collection[Tuple[PushRule, bool]]: ...
+
+def get_base_rule_ids() -> Collection[str]: ...
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 30983c47fb..450ba462ba 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -111,6 +111,7 @@ BOOLEAN_COLUMNS = {
     "e2e_fallback_keys_json": ["used"],
     "access_tokens": ["used"],
     "device_lists_changes_in_room": ["converted_to_destinations"],
+    "pushers": ["enabled"],
 }
 
 
diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py
index b4aeae6dd5..fb1fb83f50 100755
--- a/synapse/_scripts/update_synapse_database.py
+++ b/synapse/_scripts/update_synapse_database.py
@@ -48,10 +48,13 @@ class MockHomeserver(HomeServer):
 
 
 def run_background_updates(hs: HomeServer) -> None:
-    store = hs.get_datastores().main
+    main = hs.get_datastores().main
+    state = hs.get_datastores().state
 
     async def run_background_updates() -> None:
-        await store.db_pool.updates.run_background_updates(sleep=False)
+        await main.db_pool.updates.run_background_updates(sleep=False)
+        if state:
+            await state.db_pool.updates.run_background_updates(sleep=False)
         # Stop the reactor to exit the script once every background update is run.
         reactor.stop()
 
@@ -97,8 +100,11 @@ def main() -> None:
     # Load, process and sanity-check the config.
     hs_config = yaml.safe_load(args.database_config)
 
-    if "database" not in hs_config:
-        sys.stderr.write("The configuration file must have a 'database' section.\n")
+    if "database" not in hs_config and "databases" not in hs_config:
+        sys.stderr.write(
+            "The configuration file must have a 'database' or 'databases' section. "
+            "See https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#database"
+        )
         sys.exit(4)
 
     config = HomeServerConfig()
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 702b81e636..bf27f6c101 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -93,3 +93,13 @@ class ExperimentalConfig(Config):
 
         # MSC3852: Expose last seen user agent field on /_matrix/client/v3/devices.
         self.msc3852_enabled: bool = experimental.get("msc3852_enabled", False)
+
+        # MSC3881: Remotely toggle push notifications for another client
+        self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
+
+        # MSC3882: Allow an existing session to sign in a new session
+        self.msc3882_enabled: bool = experimental.get("msc3882_enabled", False)
+        self.msc3882_ui_auth: bool = experimental.get("msc3882_ui_auth", True)
+        self.msc3882_token_timeout = self.parse_duration(
+            experimental.get("msc3882_token_timeout", "5m")
+        )
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 0327fc57a4..eacd631ee0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -63,7 +63,6 @@ from synapse.http.server import finish_request, respond_with_html
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import defer_to_thread
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.roommember import ProfileInfo
 from synapse.types import JsonDict, Requester, UserID
 from synapse.util import stringutils as stringutils
 from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
@@ -1687,41 +1686,10 @@ class AuthHandler:
             respond_with_html(request, 403, self._sso_account_deactivated_template)
             return
 
-        profile = await self.store.get_profileinfo(
+        user_profile_data = await self.store.get_profileinfo(
             UserID.from_string(registered_user_id).localpart
         )
 
-        self._complete_sso_login(
-            registered_user_id,
-            auth_provider_id,
-            request,
-            client_redirect_url,
-            extra_attributes,
-            new_user=new_user,
-            user_profile_data=profile,
-            auth_provider_session_id=auth_provider_session_id,
-        )
-
-    def _complete_sso_login(
-        self,
-        registered_user_id: str,
-        auth_provider_id: str,
-        request: Request,
-        client_redirect_url: str,
-        extra_attributes: Optional[JsonDict] = None,
-        new_user: bool = False,
-        user_profile_data: Optional[ProfileInfo] = None,
-        auth_provider_session_id: Optional[str] = None,
-    ) -> None:
-        """
-        The synchronous portion of complete_sso_login.
-
-        This exists purely for backwards compatibility of synapse.module_api.ModuleApi.
-        """
-
-        if user_profile_data is None:
-            user_profile_data = ProfileInfo(None, None)
-
         # Store any extra attributes which will be passed in the login response.
         # Note that this is per-user so it may overwrite a previous value, this
         # is considered OK since the newest SSO attributes should be most valid.
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 2599160bcc..1219672a59 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -16,14 +16,17 @@ from typing import TYPE_CHECKING, List, Optional, Union
 import attr
 
 from synapse.api.errors import SynapseError, UnrecognizedRequestError
-from synapse.push.baserules import BASE_RULE_IDS
 from synapse.storage.push_rule import RuleNotFoundException
+from synapse.synapse_rust.push import get_base_rule_ids
 from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
+BASE_RULE_IDS = get_base_rule_ids()
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class RuleSpec:
     scope: str
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 20ec22105a..cfcadb34db 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -997,7 +997,7 @@ class RegistrationHandler:
             assert user_tuple
             token_id = user_tuple.token_id
 
-            await self.pusher_pool.add_pusher(
+            await self.pusher_pool.add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="email",
@@ -1005,7 +1005,7 @@ class RegistrationHandler:
                 app_display_name="Email Notifications",
                 device_display_name=threepid["address"],
                 pushkey=threepid["address"],
-                lang=None,  # We don't know a user's language here
+                lang=None,
                 data={},
             )
 
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 1e171f3f71..6bc1cbd787 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -128,6 +128,9 @@ class SsoIdentityProvider(Protocol):
 
 @attr.s(auto_attribs=True)
 class UserAttributes:
+    # NB: This struct is documented in docs/sso_mapping_providers.md so that users can
+    # populate it with data from their own mapping providers.
+
     # the localpart of the mxid that the mapper has assigned to the user.
     # if `None`, the mapper has not picked a userid, and the user should be prompted to
     # enter one.
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 87ba154cb7..59755bff6d 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -125,7 +125,7 @@ from synapse.types import (
 )
 from synapse.util import Clock
 from synapse.util.async_helpers import maybe_awaitable
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import CachedFunction, cached
 from synapse.util.frozenutils import freeze
 
 if TYPE_CHECKING:
@@ -836,29 +836,35 @@ class ModuleApi:
             self._store.db_pool.runInteraction(desc, func, *args, **kwargs)  # type: ignore[arg-type]
         )
 
-    def complete_sso_login(
-        self, registered_user_id: str, request: SynapseRequest, client_redirect_url: str
-    ) -> None:
-        """Complete a SSO login by redirecting the user to a page to confirm whether they
-        want their access token sent to `client_redirect_url`, or redirect them to that
-        URL with a token directly if the URL matches with one of the whitelisted clients.
+    def register_cached_function(self, cached_func: CachedFunction) -> None:
+        """Register a cached function that should be invalidated across workers.
+        Invalidation local to a worker can be done directly using `cached_func.invalidate`,
+        however invalidation that needs to go to other workers needs to call `invalidate_cache`
+        on the module API instead.
 
-        This is deprecated in favor of complete_sso_login_async.
+        Args:
+            cached_function: The cached function that will be registered to receive invalidation
+            locally and from other workers.
+        """
+        self._store.register_external_cached_function(
+            f"{cached_func.__module__}.{cached_func.__name__}", cached_func
+        )
 
-        Added in Synapse v1.11.1.
+    async def invalidate_cache(
+        self, cached_func: CachedFunction, keys: Tuple[Any, ...]
+    ) -> None:
+        """Invalidate a cache entry of a cached function across workers. The cached function
+        needs to be registered on all workers first with `register_cached_function`.
 
         Args:
-            registered_user_id: The MXID that has been registered as a previous step of
-                of this SSO login.
-            request: The request to respond to.
-            client_redirect_url: The URL to which to offer to redirect the user (or to
-                redirect them directly if whitelisted).
-        """
-        self._auth_handler._complete_sso_login(
-            registered_user_id,
-            "<unknown>",
-            request,
-            client_redirect_url,
+            cached_function: The cached function that needs an invalidation
+            keys: keys of the entry to invalidate, usually matching the arguments of the
+            cached function.
+        """
+        cached_func.invalidate(keys)
+        await self._store.send_invalidation_to_replication(
+            f"{cached_func.__module__}.{cached_func.__name__}",
+            keys,
         )
 
     async def complete_sso_login_async(
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 57c4d70466..a0c760239d 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -116,6 +116,8 @@ class PusherConfig:
     last_stream_ordering: int
     last_success: Optional[int]
     failing_since: Optional[int]
+    enabled: bool
+    device_id: Optional[str]
 
     def as_dict(self) -> Dict[str, Any]:
         """Information that can be retrieved about a pusher after creation."""
@@ -128,6 +130,8 @@ class PusherConfig:
             "lang": self.lang,
             "profile_tag": self.profile_tag,
             "pushkey": self.pushkey,
+            "enabled": self.enabled,
+            "device_id": self.device_id,
         }
 
 
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
deleted file mode 100644
index 440205e80c..0000000000
--- a/synapse/push/baserules.py
+++ /dev/null
@@ -1,583 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Push rules is the system used to determine which events trigger a push (and a
-bump in notification counts).
-
-This consists of a list of "push rules" for each user, where a push rule is a
-pair of "conditions" and "actions". When a user receives an event Synapse
-iterates over the list of push rules until it finds one where all the conditions
-match the event, at which point "actions" describe the outcome (e.g. notify,
-highlight, etc).
-
-Push rules are split up into 5 different "kinds" (aka "priority classes"), which
-are run in order:
-    1. Override — highest priority rules, e.g. always ignore notices
-    2. Content — content specific rules, e.g. @ notifications
-    3. Room — per room rules, e.g. enable/disable notifications for all messages
-       in a room
-    4. Sender — per sender rules, e.g. never notify for messages from a given
-       user
-    5. Underride — the lowest priority "default" rules, e.g. notify for every
-       message.
-
-The set of "base rules" are the list of rules that every user has by default. A
-user can modify their copy of the push rules in one of three ways:
-
-    1. Adding a new push rule of a certain kind
-    2. Changing the actions of a base rule
-    3. Enabling/disabling a base rule.
-
-The base rules are split into whether they come before or after a particular
-kind, so the order of push rule evaluation would be: base rules for before
-"override" kind, user defined "override" rules, base rules after "override"
-kind, etc, etc.
-"""
-
-import itertools
-import logging
-from typing import Dict, Iterator, List, Mapping, Sequence, Tuple, Union
-
-import attr
-
-from synapse.config.experimental import ExperimentalConfig
-from synapse.push.rulekinds import PRIORITY_CLASS_MAP
-
-logger = logging.getLogger(__name__)
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True)
-class PushRule:
-    """A push rule
-
-    Attributes:
-        rule_id: a unique ID for this rule
-        priority_class: what "kind" of push rule this is (see
-            `PRIORITY_CLASS_MAP` for mapping between int and kind)
-        conditions: the sequence of conditions that all need to match
-        actions: the actions to apply if all conditions are met
-        default: is this a base rule?
-        default_enabled: is this enabled by default?
-    """
-
-    rule_id: str
-    priority_class: int
-    conditions: Sequence[Mapping[str, str]]
-    actions: Sequence[Union[str, Mapping]]
-    default: bool = False
-    default_enabled: bool = True
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False)
-class PushRules:
-    """A collection of push rules for an account.
-
-    Can be iterated over, producing push rules in priority order.
-    """
-
-    # A mapping from rule ID to push rule that overrides a base rule. These will
-    # be returned instead of the base rule.
-    overriden_base_rules: Dict[str, PushRule] = attr.Factory(dict)
-
-    # The following stores the custom push rules at each priority class.
-    #
-    # We keep these separate (rather than combining into one big list) to avoid
-    # copying the base rules around all the time.
-    override: List[PushRule] = attr.Factory(list)
-    content: List[PushRule] = attr.Factory(list)
-    room: List[PushRule] = attr.Factory(list)
-    sender: List[PushRule] = attr.Factory(list)
-    underride: List[PushRule] = attr.Factory(list)
-
-    def __iter__(self) -> Iterator[PushRule]:
-        # When iterating over the push rules we need to return the base rules
-        # interspersed at the correct spots.
-        for rule in itertools.chain(
-            BASE_PREPEND_OVERRIDE_RULES,
-            self.override,
-            BASE_APPEND_OVERRIDE_RULES,
-            self.content,
-            BASE_APPEND_CONTENT_RULES,
-            self.room,
-            self.sender,
-            self.underride,
-            BASE_APPEND_UNDERRIDE_RULES,
-        ):
-            # Check if a base rule has been overriden by a custom rule. If so
-            # return that instead.
-            override_rule = self.overriden_base_rules.get(rule.rule_id)
-            if override_rule:
-                yield override_rule
-            else:
-                yield rule
-
-    def __len__(self) -> int:
-        # The length is mostly used by caches to get a sense of "size" / amount
-        # of memory this object is using, so we only count the number of custom
-        # rules.
-        return (
-            len(self.overriden_base_rules)
-            + len(self.override)
-            + len(self.content)
-            + len(self.room)
-            + len(self.sender)
-            + len(self.underride)
-        )
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False)
-class FilteredPushRules:
-    """A wrapper around `PushRules` that filters out disabled experimental push
-    rules, and includes the "enabled" state for each rule when iterated over.
-    """
-
-    push_rules: PushRules
-    enabled_map: Dict[str, bool]
-    experimental_config: ExperimentalConfig
-
-    def __iter__(self) -> Iterator[Tuple[PushRule, bool]]:
-        for rule in self.push_rules:
-            if not _is_experimental_rule_enabled(
-                rule.rule_id, self.experimental_config
-            ):
-                continue
-
-            enabled = self.enabled_map.get(rule.rule_id, rule.default_enabled)
-
-            yield rule, enabled
-
-    def __len__(self) -> int:
-        return len(self.push_rules)
-
-
-DEFAULT_EMPTY_PUSH_RULES = PushRules()
-
-
-def compile_push_rules(rawrules: List[PushRule]) -> PushRules:
-    """Given a set of custom push rules return a `PushRules` instance (which
-    includes the base rules).
-    """
-
-    if not rawrules:
-        # Fast path to avoid allocating empty lists when there are no custom
-        # rules for the user.
-        return DEFAULT_EMPTY_PUSH_RULES
-
-    rules = PushRules()
-
-    for rule in rawrules:
-        # We need to decide which bucket each custom push rule goes into.
-
-        # If it has the same ID as a base rule then it overrides that...
-        overriden_base_rule = BASE_RULES_BY_ID.get(rule.rule_id)
-        if overriden_base_rule:
-            rules.overriden_base_rules[rule.rule_id] = attr.evolve(
-                overriden_base_rule, actions=rule.actions
-            )
-            continue
-
-        # ... otherwise it gets added to the appropriate priority class bucket
-        collection: List[PushRule]
-        if rule.priority_class == 5:
-            collection = rules.override
-        elif rule.priority_class == 4:
-            collection = rules.content
-        elif rule.priority_class == 3:
-            collection = rules.room
-        elif rule.priority_class == 2:
-            collection = rules.sender
-        elif rule.priority_class == 1:
-            collection = rules.underride
-        elif rule.priority_class <= 0:
-            logger.info(
-                "Got rule with priority class less than zero, but doesn't override a base rule: %s",
-                rule,
-            )
-            continue
-        else:
-            # We log and continue here so as not to break event sending
-            logger.error("Unknown priority class: %", rule.priority_class)
-            continue
-
-        collection.append(rule)
-
-    return rules
-
-
-def _is_experimental_rule_enabled(
-    rule_id: str, experimental_config: ExperimentalConfig
-) -> bool:
-    """Used by `FilteredPushRules` to filter out experimental rules when they
-    have not been enabled.
-    """
-    if (
-        rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
-        and not experimental_config.msc3786_enabled
-    ):
-        return False
-    if (
-        rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
-        and not experimental_config.msc3772_enabled
-    ):
-        return False
-    return True
-
-
-BASE_APPEND_CONTENT_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["content"],
-        rule_id="global/content/.m.rule.contains_user_name",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                # Match the localpart of the requester's MXID.
-                "pattern_type": "user_localpart",
-            }
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight"},
-        ],
-    )
-]
-
-
-BASE_PREPEND_OVERRIDE_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.master",
-        default_enabled=False,
-        conditions=[],
-        actions=["dont_notify"],
-    )
-]
-
-
-BASE_APPEND_OVERRIDE_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.suppress_notices",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "content.msgtype",
-                "pattern": "m.notice",
-                "_cache_key": "_suppress_notices",
-            }
-        ],
-        actions=["dont_notify"],
-    ),
-    # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
-    # otherwise invites will be matched by .m.rule.member_event
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.invite_for_me",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.member",
-                "_cache_key": "_member",
-            },
-            {
-                "kind": "event_match",
-                "key": "content.membership",
-                "pattern": "invite",
-                "_cache_key": "_invite_member",
-            },
-            # Match the requester's MXID.
-            {"kind": "event_match", "key": "state_key", "pattern_type": "user_id"},
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    # Will we sometimes want to know about people joining and leaving?
-    # Perhaps: if so, this could be expanded upon. Seems the most usual case
-    # is that we don't though. We add this override rule so that even if
-    # the room rule is set to notify, we don't get notifications about
-    # join/leave/avatar/displayname events.
-    # See also: https://matrix.org/jira/browse/SYN-607
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.member_event",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.member",
-                "_cache_key": "_member",
-            }
-        ],
-        actions=["dont_notify"],
-    ),
-    # This was changed from underride to override so it's closer in priority
-    # to the content rules where the user name highlight rule lives. This
-    # way a room rule is lower priority than both but a custom override rule
-    # is higher priority than both.
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.contains_display_name",
-        conditions=[{"kind": "contains_display_name"}],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight"},
-        ],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.roomnotif",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                "pattern": "@room",
-                "_cache_key": "_roomnotif_content",
-            },
-            {
-                "kind": "sender_notification_permission",
-                "key": "room",
-                "_cache_key": "_roomnotif_pl",
-            },
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": True}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.tombstone",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.tombstone",
-                "_cache_key": "_tombstone",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "",
-                "_cache_key": "_tombstone_statekey",
-            },
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": True}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.reaction",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.reaction",
-                "_cache_key": "_reaction",
-            }
-        ],
-        actions=["dont_notify"],
-    ),
-    # XXX: This is an experimental rule that is only enabled if msc3786_enabled
-    # is enabled, if it is not the rule gets filtered out in _load_rules() in
-    # PushRulesWorkerStore
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.org.matrix.msc3786.rule.room.server_acl",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.server_acl",
-                "_cache_key": "_room_server_acl",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "",
-                "_cache_key": "_room_server_acl_state_key",
-            },
-        ],
-        actions=[],
-    ),
-]
-
-
-BASE_APPEND_UNDERRIDE_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.call",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.call.invite",
-                "_cache_key": "_call",
-            }
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "ring"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    # XXX: once m.direct is standardised everywhere, we should use it to detect
-    # a DM from the user's perspective rather than this heuristic.
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.room_one_to_one",
-        conditions=[
-            {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"},
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.message",
-                "_cache_key": "_message",
-            },
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    # XXX: this is going to fire for events which aren't m.room.messages
-    # but are encrypted (e.g. m.call.*)...
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.encrypted_room_one_to_one",
-        conditions=[
-            {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"},
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.encrypted",
-                "_cache_key": "_encrypted",
-            },
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.org.matrix.msc3772.thread_reply",
-        conditions=[
-            {
-                "kind": "org.matrix.msc3772.relation_match",
-                "rel_type": "m.thread",
-                # Match the requester's MXID.
-                "sender_type": "user_id",
-            }
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.message",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.message",
-                "_cache_key": "_message",
-            }
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-    # XXX: this is going to fire for events which aren't m.room.messages
-    # but are encrypted (e.g. m.call.*)...
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.encrypted",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.encrypted",
-                "_cache_key": "_encrypted",
-            }
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.im.vector.jitsi",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "im.vector.modular.widgets",
-                "_cache_key": "_type_modular_widgets",
-            },
-            {
-                "kind": "event_match",
-                "key": "content.type",
-                "pattern": "jitsi",
-                "_cache_key": "_content_type_jitsi",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "*",
-                "_cache_key": "_is_state_event",
-            },
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-]
-
-
-BASE_RULE_IDS = set()
-
-BASE_RULES_BY_ID: Dict[str, PushRule] = {}
-
-for r in BASE_APPEND_CONTENT_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
-
-for r in BASE_PREPEND_OVERRIDE_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
-
-for r in BASE_APPEND_OVERRIDE_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
-
-for r in BASE_APPEND_UNDERRIDE_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 3846fbc5f0..404379ef67 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -37,11 +37,11 @@ from synapse.events.snapshot import EventContext
 from synapse.state import POWER_KEY
 from synapse.storage.databases.main.roommember import EventIdMembership
 from synapse.storage.state import StateFilter
+from synapse.synapse_rust.push import FilteredPushRules, PushRule
 from synapse.util.caches import register_cache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_event_for_clients_with_state
 
-from .baserules import FilteredPushRules, PushRule
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 if TYPE_CHECKING:
@@ -280,7 +280,8 @@ class BulkPushRuleEvaluator:
         thread_id = "main"
         if relation:
             relations = await self._get_mutual_relations(
-                relation.parent_id, itertools.chain(*rules_by_user.values())
+                relation.parent_id,
+                itertools.chain(*(r.rules() for r in rules_by_user.values())),
             )
             if relation.rel_type == RelationTypes.THREAD:
                 thread_id = relation.parent_id
@@ -333,7 +334,7 @@ class BulkPushRuleEvaluator:
                 # current user, it'll be added to the dict later.
                 actions_by_user[uid] = []
 
-            for rule, enabled in rules:
+            for rule, enabled in rules.rules():
                 if not enabled:
                     continue
 
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 73618d9234..ebc13beda1 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -16,10 +16,9 @@ import copy
 from typing import Any, Dict, List, Optional
 
 from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+from synapse.synapse_rust.push import FilteredPushRules, PushRule
 from synapse.types import UserID
 
-from .baserules import FilteredPushRules, PushRule
-
 
 def format_push_rules_for_user(
     user: UserID, ruleslist: FilteredPushRules
@@ -34,7 +33,7 @@ def format_push_rules_for_user(
 
     rules["global"] = _add_empty_priority_class_arrays(rules["global"])
 
-    for r, enabled in ruleslist:
+    for r, enabled in ruleslist.rules():
         template_name = _priority_class_to_template_name(r.priority_class)
 
         rulearray = rules["global"][template_name]
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 1e0ef44fc7..e2648cbc93 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -94,7 +94,7 @@ class PusherPool:
             return
         run_as_background_process("start_pushers", self._start_pushers)
 
-    async def add_pusher(
+    async def add_or_update_pusher(
         self,
         user_id: str,
         access_token: Optional[int],
@@ -106,6 +106,8 @@ class PusherPool:
         lang: Optional[str],
         data: JsonDict,
         profile_tag: str = "",
+        enabled: bool = True,
+        device_id: Optional[str] = None,
     ) -> Optional[Pusher]:
         """Creates a new pusher and adds it to the pool
 
@@ -147,9 +149,22 @@ class PusherPool:
                 last_stream_ordering=last_stream_ordering,
                 last_success=None,
                 failing_since=None,
+                enabled=enabled,
+                device_id=device_id,
             )
         )
 
+        # Before we actually persist the pusher, we check if the user already has one
+        # this app ID and pushkey. If so, we want to keep the access token and device ID
+        # in place, since this could be one device modifying (e.g. enabling/disabling)
+        # another device's pusher.
+        existing_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+            user_id, app_id, pushkey
+        )
+        if existing_config:
+            access_token = existing_config.access_token
+            device_id = existing_config.device_id
+
         await self.store.add_pusher(
             user_id=user_id,
             access_token=access_token,
@@ -163,8 +178,10 @@ class PusherPool:
             data=data,
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
+            enabled=enabled,
+            device_id=device_id,
         )
-        pusher = await self.start_pusher_by_id(app_id, pushkey, user_id)
+        pusher = await self.process_pusher_change_by_id(app_id, pushkey, user_id)
 
         return pusher
 
@@ -276,10 +293,25 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
-    async def start_pusher_by_id(
+    async def _get_pusher_config_for_user_by_app_id_and_pushkey(
+        self, user_id: str, app_id: str, pushkey: str
+    ) -> Optional[PusherConfig]:
+        resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
+
+        pusher_config = None
+        for r in resultlist:
+            if r.user_name == user_id:
+                pusher_config = r
+
+        return pusher_config
+
+    async def process_pusher_change_by_id(
         self, app_id: str, pushkey: str, user_id: str
     ) -> Optional[Pusher]:
-        """Look up the details for the given pusher, and start it
+        """Look up the details for the given pusher, and either start it if its
+        "enabled" flag is True, or try to stop it otherwise.
+
+        If the pusher is new and its "enabled" flag is False, the stop is a noop.
 
         Returns:
             The pusher started, if any
@@ -290,12 +322,13 @@ class PusherPool:
         if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
             return None
 
-        resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
+        pusher_config = await self._get_pusher_config_for_user_by_app_id_and_pushkey(
+            user_id, app_id, pushkey
+        )
 
-        pusher_config = None
-        for r in resultlist:
-            if r.user_name == user_id:
-                pusher_config = r
+        if pusher_config and not pusher_config.enabled:
+            self.maybe_stop_pusher(app_id, pushkey, user_id)
+            return None
 
         pusher = None
         if pusher_config:
@@ -305,7 +338,7 @@ class PusherPool:
 
     async def _start_pushers(self) -> None:
         """Start all the pushers"""
-        pushers = await self.store.get_all_pushers()
+        pushers = await self.store.get_enabled_pushers()
 
         # Stagger starting up the pushers so we don't completely drown the
         # process on start up.
@@ -363,6 +396,8 @@ class PusherPool:
 
         synapse_pushers.labels(type(pusher).__name__, pusher.app_id).inc()
 
+        logger.info("Starting pusher %s / %s", pusher.user_id, appid_pushkey)
+
         # Check if there *may* be push to process. We do this as this check is a
         # lot cheaper to do than actually fetching the exact rows we need to
         # push.
@@ -382,16 +417,7 @@ class PusherPool:
         return pusher
 
     async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
-        appid_pushkey = "%s:%s" % (app_id, pushkey)
-
-        byuser = self.pushers.get(user_id, {})
-
-        if appid_pushkey in byuser:
-            logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
-            pusher = byuser.pop(appid_pushkey)
-            pusher.on_stop()
-
-            synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
+        self.maybe_stop_pusher(app_id, pushkey, user_id)
 
         # We can only delete pushers on master.
         if self._remove_pusher_client:
@@ -402,3 +428,22 @@ class PusherPool:
             await self.store.delete_pusher_by_app_id_pushkey_user_id(
                 app_id, pushkey, user_id
             )
+
+    def maybe_stop_pusher(self, app_id: str, pushkey: str, user_id: str) -> None:
+        """Stops a pusher with the given app ID and push key if one is running.
+
+        Args:
+            app_id: the pusher's app ID.
+            pushkey: the pusher's push key.
+            user_id: the user the pusher belongs to. Only used for logging.
+        """
+        appid_pushkey = "%s:%s" % (app_id, pushkey)
+
+        byuser = self.pushers.get(user_id, {})
+
+        if appid_pushkey in byuser:
+            logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
+            pusher = byuser.pop(appid_pushkey)
+            pusher.on_stop()
+
+            synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e4f2201c92..cf9cd6833b 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -189,7 +189,9 @@ class ReplicationDataHandler:
                 if row.deleted:
                     self.stop_pusher(row.user_id, row.app_id, row.pushkey)
                 else:
-                    await self.start_pusher(row.user_id, row.app_id, row.pushkey)
+                    await self.process_pusher_change(
+                        row.user_id, row.app_id, row.pushkey
+                    )
         elif stream_name == EventsStream.NAME:
             # We shouldn't get multiple rows per token for events stream, so
             # we don't need to optimise this for multiple rows.
@@ -334,13 +336,15 @@ class ReplicationDataHandler:
         logger.info("Stopping pusher %r / %r", user_id, key)
         pusher.on_stop()
 
-    async def start_pusher(self, user_id: str, app_id: str, pushkey: str) -> None:
+    async def process_pusher_change(
+        self, user_id: str, app_id: str, pushkey: str
+    ) -> None:
         if not self._notify_pushers:
             return
 
         key = "%s:%s" % (app_id, pushkey)
         logger.info("Starting pusher %r / %r", user_id, key)
-        await self._pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
+        await self._pusher_pool.process_pusher_change_by_id(app_id, pushkey, user_id)
 
 
 class FederationSenderHandler:
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index b712215112..9a2ab99ede 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -30,6 +30,7 @@ from synapse.rest.client import (
     keys,
     knock,
     login as v1_login,
+    login_token_request,
     logout,
     mutual_rooms,
     notifications,
@@ -130,3 +131,4 @@ class ClientRestResource(JsonResource):
 
         # unstable
         mutual_rooms.register_servlets(hs, client_resource)
+        login_token_request.register_servlets(hs, client_resource)
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 2ca6b2d08a..1274773d7e 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -375,7 +375,7 @@ class UserRestServletV2(RestServlet):
                         and self.hs.config.email.email_notif_for_new_users
                         and medium == "email"
                     ):
-                        await self.pusher_pool.add_pusher(
+                        await self.pusher_pool.add_or_update_pusher(
                             user_id=user_id,
                             access_token=None,
                             kind="email",
@@ -383,7 +383,7 @@ class UserRestServletV2(RestServlet):
                             app_display_name="Email Notifications",
                             device_display_name=address,
                             pushkey=address,
-                            lang=None,  # We don't know a user's language here
+                            lang=None,
                             data={},
                         )
 
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 2db2a04f95..44f622bcce 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -534,6 +534,11 @@ class AddThreepidMsisdnSubmitTokenServlet(RestServlet):
         "/add_threepid/msisdn/submit_token$", releases=(), unstable=True
     )
 
+    class PostBody(RequestBodyModel):
+        client_secret: ClientSecretStr
+        sid: StrictStr
+        token: StrictStr
+
     def __init__(self, hs: "HomeServer"):
         super().__init__()
         self.config = hs.config
@@ -549,16 +554,14 @@ class AddThreepidMsisdnSubmitTokenServlet(RestServlet):
                 "instead.",
             )
 
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(body, ["client_secret", "sid", "token"])
-        assert_valid_client_secret(body["client_secret"])
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         # Proxy submit_token request to msisdn threepid delegate
         response = await self.identity_handler.proxy_msisdn_submit_token(
             self.config.registration.account_threepid_delegate_msisdn,
-            body["client_secret"],
-            body["sid"],
-            body["token"],
+            body.client_secret,
+            body.sid,
+            body.token,
         )
         return 200, response
 
@@ -581,6 +584,10 @@ class ThreepidRestServlet(RestServlet):
 
         return 200, {"threepids": threepids}
 
+    # NOTE(dmr): I have chosen not to use Pydantic to parse this request's body, because
+    # the endpoint is deprecated. (If you really want to, you could do this by reusing
+    # ThreePidBindRestServelet.PostBody with an `alias_generator` to handle
+    # `threePidCreds` versus `three_pid_creds`.
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         if not self.hs.config.registration.enable_3pid_changes:
             raise SynapseError(
diff --git a/synapse/rest/client/login_token_request.py b/synapse/rest/client/login_token_request.py
new file mode 100644
index 0000000000..ca5c54bf17
--- /dev/null
+++ b/synapse/rest/client/login_token_request.py
@@ -0,0 +1,94 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.http.server import HttpServer
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.site import SynapseRequest
+from synapse.rest.client._base import client_patterns, interactive_auth_handler
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class LoginTokenRequestServlet(RestServlet):
+    """
+    Get a token that can be used with `m.login.token` to log in a second device.
+
+    Request:
+
+    POST /login/token HTTP/1.1
+    Content-Type: application/json
+
+    {}
+
+    Response:
+
+    HTTP/1.1 200 OK
+    {
+        "login_token": "ABDEFGH",
+        "expires_in": 3600,
+    }
+    """
+
+    PATTERNS = client_patterns("/login/token$")
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastores().main
+        self.clock = hs.get_clock()
+        self.server_name = hs.config.server.server_name
+        self.macaroon_gen = hs.get_macaroon_generator()
+        self.auth_handler = hs.get_auth_handler()
+        self.token_timeout = hs.config.experimental.msc3882_token_timeout
+        self.ui_auth = hs.config.experimental.msc3882_ui_auth
+
+    @interactive_auth_handler
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        body = parse_json_object_from_request(request)
+
+        if self.ui_auth:
+            await self.auth_handler.validate_user_via_ui_auth(
+                requester,
+                request,
+                body,
+                "issue a new access token for your account",
+                can_skip_ui_auth=False,  # Don't allow skipping of UI auth
+            )
+
+        login_token = self.macaroon_gen.generate_short_term_login_token(
+            user_id=requester.user.to_string(),
+            auth_provider_id="org.matrix.msc3882.login_token_request",
+            duration_in_ms=self.token_timeout,
+        )
+
+        return (
+            200,
+            {
+                "login_token": login_token,
+                "expires_in": self.token_timeout // 1000,
+            },
+        )
+
+
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3882_enabled:
+        LoginTokenRequestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/pusher.py b/synapse/rest/client/pusher.py
index 9a1f10f4be..975eef2144 100644
--- a/synapse/rest/client/pusher.py
+++ b/synapse/rest/client/pusher.py
@@ -42,6 +42,7 @@ class PushersRestServlet(RestServlet):
         super().__init__()
         self.hs = hs
         self.auth = hs.get_auth()
+        self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled
 
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request)
@@ -51,9 +52,16 @@ class PushersRestServlet(RestServlet):
             user.to_string()
         )
 
-        filtered_pushers = [p.as_dict() for p in pushers]
+        pusher_dicts = [p.as_dict() for p in pushers]
 
-        return 200, {"pushers": filtered_pushers}
+        for pusher in pusher_dicts:
+            if self._msc3881_enabled:
+                pusher["org.matrix.msc3881.enabled"] = pusher["enabled"]
+                pusher["org.matrix.msc3881.device_id"] = pusher["device_id"]
+            del pusher["enabled"]
+            del pusher["device_id"]
+
+        return 200, {"pushers": pusher_dicts}
 
 
 class PushersSetRestServlet(RestServlet):
@@ -65,6 +73,7 @@ class PushersSetRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.notifier = hs.get_notifier()
         self.pusher_pool = self.hs.get_pusherpool()
+        self._msc3881_enabled = self.hs.config.experimental.msc3881_enabled
 
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request)
@@ -103,6 +112,10 @@ class PushersSetRestServlet(RestServlet):
         if "append" in content:
             append = content["append"]
 
+        enabled = True
+        if self._msc3881_enabled and "org.matrix.msc3881.enabled" in content:
+            enabled = content["org.matrix.msc3881.enabled"]
+
         if not append:
             await self.pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
                 app_id=content["app_id"],
@@ -111,7 +124,7 @@ class PushersSetRestServlet(RestServlet):
             )
 
         try:
-            await self.pusher_pool.add_pusher(
+            await self.pusher_pool.add_or_update_pusher(
                 user_id=user.to_string(),
                 access_token=requester.access_token_id,
                 kind=content["kind"],
@@ -122,6 +135,8 @@ class PushersSetRestServlet(RestServlet):
                 lang=content["lang"],
                 data=content["data"],
                 profile_tag=content.get("profile_tag", ""),
+                enabled=enabled,
+                device_id=requester.device_id,
             )
         except PusherConfigException as pce:
             raise SynapseError(
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index c516cda95d..b3917a5abc 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -105,6 +105,10 @@ class VersionsRestServlet(RestServlet):
                     "org.matrix.msc3440.stable": True,  # TODO: remove when "v1.3" is added above
                     # Allows moderators to fetch redacted event content as described in MSC2815
                     "fi.mau.msc2815": self.config.experimental.msc2815_enabled,
+                    # Adds support for login token requests as per MSC3882
+                    "org.matrix.msc3882": self.config.experimental.msc3882_enabled,
+                    # Adds support for remotely enabling/disabling pushers, as per MSC3881
+                    "org.matrix.msc3881": self.config.experimental.msc3881_enabled,
                 },
             },
         )
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e30f9c76d4..303a5d5298 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -15,12 +15,13 @@
 # limitations under the License.
 import logging
 from abc import ABCMeta
-from typing import TYPE_CHECKING, Any, Collection, Iterable, Optional, Union
+from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Union
 
 from synapse.storage.database import make_in_list_sql_clause  # noqa: F401; noqa: F401
 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
 from synapse.types import get_domain_from_id
 from synapse.util import json_decoder
+from synapse.util.caches.descriptors import CachedFunction
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -47,6 +48,8 @@ class SQLBaseStore(metaclass=ABCMeta):
         self.database_engine = database.engine
         self.db_pool = database
 
+        self.external_cached_functions: Dict[str, CachedFunction] = {}
+
     def process_replication_rows(
         self,
         stream_name: str,
@@ -95,7 +98,7 @@ class SQLBaseStore(metaclass=ABCMeta):
 
     def _attempt_to_invalidate_cache(
         self, cache_name: str, key: Optional[Collection[Any]]
-    ) -> None:
+    ) -> bool:
         """Attempts to invalidate the cache of the given name, ignoring if the
         cache doesn't exist. Mainly used for invalidating caches on workers,
         where they may not have the cache.
@@ -113,9 +116,12 @@ class SQLBaseStore(metaclass=ABCMeta):
         try:
             cache = getattr(self, cache_name)
         except AttributeError:
-            # We probably haven't pulled in the cache in this worker,
-            # which is fine.
-            return
+            # Check if an externally defined module cache has been registered
+            cache = self.external_cached_functions.get(cache_name)
+            if not cache:
+                # We probably haven't pulled in the cache in this worker,
+                # which is fine.
+                return False
 
         if key is None:
             cache.invalidate_all()
@@ -125,6 +131,13 @@ class SQLBaseStore(metaclass=ABCMeta):
             invalidate_method = getattr(cache, "invalidate_local", cache.invalidate)
             invalidate_method(tuple(key))
 
+        return True
+
+    def register_external_cached_function(
+        self, cache_name: str, func: CachedFunction
+    ) -> None:
+        self.external_cached_functions[cache_name] = func
+
 
 def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
     """
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index bf5e7ee7be..2056ecb2c3 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -285,7 +285,10 @@ class BackgroundUpdater:
         back_to_back_failures = 0
 
         try:
-            logger.info("Starting background schema updates")
+            logger.info(
+                "Starting background schema updates for database %s",
+                self._database_name,
+            )
             while self.enabled:
                 try:
                     result = await self.do_next_background_update(sleep)
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 12e9a42382..2c421151c1 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -33,7 +33,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.util.caches.descriptors import _CachedFunction
+from synapse.util.caches.descriptors import CachedFunction
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -269,9 +269,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             return
 
         cache_func.invalidate(keys)
-        await self.db_pool.runInteraction(
-            "invalidate_cache_and_stream",
-            self._send_invalidation_to_replication,
+        await self.send_invalidation_to_replication(
             cache_func.__name__,
             keys,
         )
@@ -279,7 +277,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
     def _invalidate_cache_and_stream(
         self,
         txn: LoggingTransaction,
-        cache_func: _CachedFunction,
+        cache_func: CachedFunction,
         keys: Tuple[Any, ...],
     ) -> None:
         """Invalidates the cache and adds it to the cache stream so slaves
@@ -293,7 +291,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
 
     def _invalidate_all_cache_and_stream(
-        self, txn: LoggingTransaction, cache_func: _CachedFunction
+        self, txn: LoggingTransaction, cache_func: CachedFunction
     ) -> None:
         """Invalidates the entire cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
@@ -334,6 +332,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 txn, CURRENT_STATE_CACHE_NAME, [room_id]
             )
 
+    async def send_invalidation_to_replication(
+        self, cache_name: str, keys: Optional[Collection[Any]]
+    ) -> None:
+        await self.db_pool.runInteraction(
+            "send_invalidation_to_replication",
+            self._send_invalidation_to_replication,
+            cache_name,
+            keys,
+        )
+
     def _send_invalidation_to_replication(
         self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]]
     ) -> None:
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 5079edd1e0..ed17b2e70c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -30,9 +30,8 @@ from typing import (
 
 from synapse.api.errors import StoreError
 from synapse.config.homeserver import ExperimentalConfig
-from synapse.push.baserules import FilteredPushRules, PushRule, compile_push_rules
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
@@ -51,6 +50,7 @@ from synapse.storage.util.id_generators import (
     IdGenerator,
     StreamIdGenerator,
 )
+from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
@@ -72,18 +72,25 @@ def _load_rules(
     """
 
     ruleslist = [
-        PushRule(
+        PushRule.from_db(
             rule_id=rawrule["rule_id"],
             priority_class=rawrule["priority_class"],
-            conditions=db_to_json(rawrule["conditions"]),
-            actions=db_to_json(rawrule["actions"]),
+            conditions=rawrule["conditions"],
+            actions=rawrule["actions"],
         )
         for rawrule in rawrules
     ]
 
-    push_rules = compile_push_rules(ruleslist)
+    push_rules = PushRules(
+        ruleslist,
+    )
 
-    filtered_rules = FilteredPushRules(push_rules, enabled_map, experimental_config)
+    filtered_rules = FilteredPushRules(
+        push_rules,
+        enabled_map,
+        msc3786_enabled=experimental_config.msc3786_enabled,
+        msc3772_enabled=experimental_config.msc3772_enabled,
+    )
 
     return filtered_rules
 
@@ -845,7 +852,7 @@ class PushRuleStore(PushRulesWorkerStore):
         user_push_rules = await self.get_push_rules_for_user(user_id)
 
         # Get rules relating to the old room and copy them to the new room
-        for rule, enabled in user_push_rules:
+        for rule, enabled in user_push_rules.rules():
             if not enabled:
                 continue
 
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index bd0cfa7f32..01206950a9 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -89,6 +89,11 @@ class PusherWorkerStore(SQLBaseStore):
                 )
                 continue
 
+            # If we're using SQLite, then boolean values are integers. This is
+            # troublesome since some code using the return value of this method might
+            # expect it to be a boolean, or will expose it to clients (in responses).
+            r["enabled"] = bool(r["enabled"])
+
             yield PusherConfig(**r)
 
     async def get_pushers_by_app_id_and_pushkey(
@@ -100,38 +105,52 @@ class PusherWorkerStore(SQLBaseStore):
         return await self.get_pushers_by({"user_name": user_id})
 
     async def get_pushers_by(self, keyvalues: Dict[str, Any]) -> Iterator[PusherConfig]:
-        ret = await self.db_pool.simple_select_list(
-            "pushers",
-            keyvalues,
-            [
-                "id",
-                "user_name",
-                "access_token",
-                "profile_tag",
-                "kind",
-                "app_id",
-                "app_display_name",
-                "device_display_name",
-                "pushkey",
-                "ts",
-                "lang",
-                "data",
-                "last_stream_ordering",
-                "last_success",
-                "failing_since",
-            ],
+        """Retrieve pushers that match the given criteria.
+
+        Args:
+            keyvalues: A {column: value} dictionary.
+
+        Returns:
+            The pushers for which the given columns have the given values.
+        """
+
+        def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+            # We could technically use simple_select_list here, but we need to call
+            # COALESCE on the 'enabled' column. While it is technically possible to give
+            # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it
+            # feels a bit hacky, so it's probably better to just inline the query.
+            sql = """
+            SELECT
+                id, user_name, access_token, profile_tag, kind, app_id,
+                app_display_name, device_display_name, pushkey, ts, lang, data,
+                last_stream_ordering, last_success, failing_since,
+                COALESCE(enabled, TRUE) AS enabled, device_id
+            FROM pushers
+            """
+
+            sql += "WHERE %s" % (" AND ".join("%s = ?" % (k,) for k in keyvalues),)
+
+            txn.execute(sql, list(keyvalues.values()))
+
+            return self.db_pool.cursor_to_dict(txn)
+
+        ret = await self.db_pool.runInteraction(
             desc="get_pushers_by",
+            func=get_pushers_by_txn,
         )
+
         return self._decode_pushers_rows(ret)
 
-    async def get_all_pushers(self) -> Iterator[PusherConfig]:
-        def get_pushers(txn: LoggingTransaction) -> Iterator[PusherConfig]:
-            txn.execute("SELECT * FROM pushers")
+    async def get_enabled_pushers(self) -> Iterator[PusherConfig]:
+        def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]:
+            txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)")
             rows = self.db_pool.cursor_to_dict(txn)
 
             return self._decode_pushers_rows(rows)
 
-        return await self.db_pool.runInteraction("get_all_pushers", get_pushers)
+        return await self.db_pool.runInteraction(
+            "get_enabled_pushers", get_enabled_pushers_txn
+        )
 
     async def get_all_updated_pushers_rows(
         self, instance_name: str, last_id: int, current_id: int, limit: int
@@ -458,7 +477,74 @@ class PusherWorkerStore(SQLBaseStore):
         return number_deleted
 
 
-class PusherStore(PusherWorkerStore):
+class PusherBackgroundUpdatesStore(SQLBaseStore):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            "set_device_id_for_pushers", self._set_device_id_for_pushers
+        )
+
+    async def _set_device_id_for_pushers(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to populate the device_id column of the pushers table."""
+        last_pusher_id = progress.get("pusher_id", 0)
+
+        def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int:
+            txn.execute(
+                """
+                    SELECT p.id, at.device_id
+                    FROM pushers AS p
+                    INNER JOIN access_tokens AS at
+                        ON p.access_token = at.id
+                    WHERE
+                        p.access_token IS NOT NULL
+                        AND at.device_id IS NOT NULL
+                        AND p.id > ?
+                    ORDER BY p.id
+                    LIMIT ?
+                """,
+                (last_pusher_id, batch_size),
+            )
+
+            rows = self.db_pool.cursor_to_dict(txn)
+            if len(rows) == 0:
+                return 0
+
+            self.db_pool.simple_update_many_txn(
+                txn=txn,
+                table="pushers",
+                key_names=("id",),
+                key_values=[(row["id"],) for row in rows],
+                value_names=("device_id",),
+                value_values=[(row["device_id"],) for row in rows],
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["id"]}
+            )
+
+            return len(rows)
+
+        nb_processed = await self.db_pool.runInteraction(
+            "set_device_id_for_pushers", set_device_id_for_pushers_txn
+        )
+
+        if nb_processed < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "set_device_id_for_pushers"
+            )
+
+        return nb_processed
+
+
+class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
@@ -476,6 +562,8 @@ class PusherStore(PusherWorkerStore):
         data: Optional[JsonDict],
         last_stream_ordering: int,
         profile_tag: str = "",
+        enabled: bool = True,
+        device_id: Optional[str] = None,
     ) -> None:
         async with self._pushers_id_gen.get_next() as stream_id:
             # no need to lock because `pushers` has a unique key on
@@ -494,6 +582,8 @@ class PusherStore(PusherWorkerStore):
                     "last_stream_ordering": last_stream_ordering,
                     "profile_tag": profile_tag,
                     "id": stream_id,
+                    "enabled": enabled,
+                    "device_id": device_id,
                 },
                 desc="add_pusher",
                 lock=False,
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 7bd27790eb..898947af95 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -51,6 +51,8 @@ class _RelatedEvent:
     event_id: str
     # The sender of the related event.
     sender: str
+    topological_ordering: Optional[int]
+    stream_ordering: int
 
 
 class RelationsWorkerStore(SQLBaseStore):
@@ -91,6 +93,9 @@ class RelationsWorkerStore(SQLBaseStore):
         # it. The `event_id` must match the `event.event_id`.
         assert event.event_id == event_id
 
+        # Ensure bad limits aren't being passed in.
+        assert limit >= 0
+
         where_clause = ["relates_to_id = ?", "room_id = ?"]
         where_args: List[Union[str, int]] = [event.event_id, room_id]
         is_redacted = event.internal_metadata.is_redacted()
@@ -139,21 +144,34 @@ class RelationsWorkerStore(SQLBaseStore):
         ) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]:
             txn.execute(sql, where_args + [limit + 1])
 
-            last_topo_id = None
-            last_stream_id = None
             events = []
-            for row in txn:
+            for event_id, relation_type, sender, topo_ordering, stream_ordering in txn:
                 # Do not include edits for redacted events as they leak event
                 # content.
-                if not is_redacted or row[1] != RelationTypes.REPLACE:
-                    events.append(_RelatedEvent(row[0], row[2]))
-                last_topo_id = row[3]
-                last_stream_id = row[4]
+                if not is_redacted or relation_type != RelationTypes.REPLACE:
+                    events.append(
+                        _RelatedEvent(event_id, sender, topo_ordering, stream_ordering)
+                    )
 
-            # If there are more events, generate the next pagination key.
+            # If there are more events, generate the next pagination key from the
+            # last event returned.
             next_token = None
-            if len(events) > limit and last_topo_id and last_stream_id:
-                next_key = RoomStreamToken(last_topo_id, last_stream_id)
+            if len(events) > limit:
+                # Instead of using the last row (which tells us there is more
+                # data), use the last row to be returned.
+                events = events[:limit]
+
+                topo = events[-1].topological_ordering
+                token = events[-1].stream_ordering
+                if direction == "b":
+                    # Tokens are positions between events.
+                    # This token points *after* the last event in the chunk.
+                    # We need it to point to the event before it in the chunk
+                    # when we are going backwards so we subtract one from the
+                    # stream part.
+                    token -= 1
+                next_key = RoomStreamToken(topo, token)
+
                 if from_token:
                     next_token = from_token.copy_and_replace(
                         StreamKeyType.ROOM, next_key
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 3f9bfaeac5..530f04e149 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1334,15 +1334,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         if rows:
             topo = rows[-1].topological_ordering
-            toke = rows[-1].stream_ordering
+            token = rows[-1].stream_ordering
             if direction == "b":
                 # Tokens are positions between events.
                 # This token points *after* the last event in the chunk.
                 # We need it to point to the event before it in the chunk
                 # when we are going backwards so we subtract one from the
                 # stream part.
-                toke -= 1
-            next_token = RoomStreamToken(topo, toke)
+                token -= 1
+            next_token = RoomStreamToken(topo, token)
         else:
             # TODO (erikj): We should work out what to do here instead.
             next_token = to_token if to_token else from_token
diff --git a/synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql b/synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql
new file mode 100644
index 0000000000..dba3b4900b
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/02add_pusher_enabled.sql
@@ -0,0 +1,16 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE pushers ADD COLUMN enabled BOOLEAN;
\ No newline at end of file
diff --git a/synapse/storage/schema/main/delta/73/03pusher_device_id.sql b/synapse/storage/schema/main/delta/73/03pusher_device_id.sql
new file mode 100644
index 0000000000..1b4ffbeebe
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/03pusher_device_id.sql
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a device_id column to track the device ID that created the pusher. It's NULLable
+-- on purpose, because a) it might not be possible to track down the device that created
+-- old pushers (pushers.access_token and access_tokens.device_id are both NULLable), and
+-- b) access tokens retrieved via the admin API don't have a device associated to them.
+ALTER TABLE pushers ADD COLUMN device_id TEXT;
\ No newline at end of file
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 10aff4d04a..3909f1caea 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -53,7 +53,7 @@ CacheKey = Union[Tuple, Any]
 F = TypeVar("F", bound=Callable[..., Any])
 
 
-class _CachedFunction(Generic[F]):
+class CachedFunction(Generic[F]):
     invalidate: Any = None
     invalidate_all: Any = None
     prefill: Any = None
@@ -242,7 +242,7 @@ class LruCacheDescriptor(_CacheDescriptorBase):
 
             return ret2
 
-        wrapped = cast(_CachedFunction, _wrapped)
+        wrapped = cast(CachedFunction, _wrapped)
         wrapped.cache = cache
         obj.__dict__[self.name] = wrapped
 
@@ -363,7 +363,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
 
             return make_deferred_yieldable(ret)
 
-        wrapped = cast(_CachedFunction, _wrapped)
+        wrapped = cast(CachedFunction, _wrapped)
 
         if self.num_args == 1:
             assert not self.tree
@@ -572,7 +572,7 @@ def cached(
     iterable: bool = False,
     prune_unread_entries: bool = True,
     name: Optional[str] = None,
-) -> Callable[[F], _CachedFunction[F]]:
+) -> Callable[[F], CachedFunction[F]]:
     func = lambda orig: DeferredCacheDescriptor(
         orig,
         max_entries=max_entries,
@@ -585,7 +585,7 @@ def cached(
         name=name,
     )
 
-    return cast(Callable[[F], _CachedFunction[F]], func)
+    return cast(Callable[[F], CachedFunction[F]], func)
 
 
 def cachedList(
@@ -594,7 +594,7 @@ def cachedList(
     list_name: str,
     num_args: Optional[int] = None,
     name: Optional[str] = None,
-) -> Callable[[F], _CachedFunction[F]]:
+) -> Callable[[F], CachedFunction[F]]:
     """Creates a descriptor that wraps a function in a `DeferredCacheListDescriptor`.
 
     Used to do batch lookups for an already created cache. One of the arguments
@@ -631,7 +631,7 @@ def cachedList(
         name=name,
     )
 
-    return cast(Callable[[F], _CachedFunction[F]], func)
+    return cast(Callable[[F], CachedFunction[F]], func)
 
 
 def _get_cache_key_builder(
diff --git a/tests/handlers/test_deactivate_account.py b/tests/handlers/test_deactivate_account.py
index 7b9b711521..bce65fab7d 100644
--- a/tests/handlers/test_deactivate_account.py
+++ b/tests/handlers/test_deactivate_account.py
@@ -15,11 +15,11 @@
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import AccountDataTypes
-from synapse.push.baserules import PushRule
 from synapse.push.rulekinds import PRIORITY_CLASS_MAP
 from synapse.rest import admin
 from synapse.rest.client import account, login
 from synapse.server import HomeServer
+from synapse.synapse_rust.push import PushRule
 from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
@@ -161,20 +161,15 @@ class DeactivateAccountTestCase(HomeserverTestCase):
             self._store.get_push_rules_for_user(self.user)
         )
         # Filter out default rules; we don't care
-        push_rules = [r for r, _ in filtered_push_rules if self._is_custom_rule(r)]
+        push_rules = [
+            r for r, _ in filtered_push_rules.rules() if self._is_custom_rule(r)
+        ]
         # Check our rule made it
-        self.assertEqual(
-            push_rules,
-            [
-                PushRule(
-                    rule_id="personal.override.rule1",
-                    priority_class=5,
-                    conditions=[],
-                    actions=[],
-                )
-            ],
-            push_rules,
-        )
+        self.assertEqual(len(push_rules), 1)
+        self.assertEqual(push_rules[0].rule_id, "personal.override.rule1")
+        self.assertEqual(push_rules[0].priority_class, 5)
+        self.assertEqual(push_rules[0].conditions, [])
+        self.assertEqual(push_rules[0].actions, [])
 
         # Request the deactivation of our account
         self._deactivate_my_account()
@@ -183,7 +178,9 @@ class DeactivateAccountTestCase(HomeserverTestCase):
             self._store.get_push_rules_for_user(self.user)
         )
         # Filter out default rules; we don't care
-        push_rules = [r for r, _ in filtered_push_rules if self._is_custom_rule(r)]
+        push_rules = [
+            r for r, _ in filtered_push_rules.rules() if self._is_custom_rule(r)
+        ]
         # Check our rule no longer exists
         self.assertEqual(push_rules, [], push_rules)
 
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 7a3b0d6755..fd14568f55 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -114,7 +114,7 @@ class EmailPusherTests(HomeserverTestCase):
         )
 
         self.pusher = self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=self.user_id,
                 access_token=self.token_id,
                 kind="email",
@@ -136,7 +136,7 @@ class EmailPusherTests(HomeserverTestCase):
         """
         with self.assertRaises(SynapseError) as cm:
             self.get_success_or_raise(
-                self.hs.get_pusherpool().add_pusher(
+                self.hs.get_pusherpool().add_or_update_pusher(
                     user_id=self.user_id,
                     access_token=self.token_id,
                     kind="email",
diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index d9c68cdd2d..b383b8401f 100644
--- a/tests/push/test_http.py
+++ b/tests/push/test_http.py
@@ -19,9 +19,10 @@ from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.logging.context import make_deferred_yieldable
-from synapse.push import PusherConfigException
-from synapse.rest.client import login, push_rule, receipts, room
+from synapse.push import PusherConfig, PusherConfigException
+from synapse.rest.client import login, push_rule, pusher, receipts, room
 from synapse.server import HomeServer
+from synapse.storage.databases.main.registration import TokenLookupResult
 from synapse.types import JsonDict
 from synapse.util import Clock
 
@@ -35,6 +36,7 @@ class HTTPPusherTests(HomeserverTestCase):
         login.register_servlets,
         receipts.register_servlets,
         push_rule.register_servlets,
+        pusher.register_servlets,
     ]
     user_id = True
     hijack_auth = False
@@ -74,7 +76,7 @@ class HTTPPusherTests(HomeserverTestCase):
 
         def test_data(data: Optional[JsonDict]) -> None:
             self.get_failure(
-                self.hs.get_pusherpool().add_pusher(
+                self.hs.get_pusherpool().add_or_update_pusher(
                     user_id=user_id,
                     access_token=token_id,
                     kind="http",
@@ -119,7 +121,7 @@ class HTTPPusherTests(HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -235,7 +237,7 @@ class HTTPPusherTests(HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -355,7 +357,7 @@ class HTTPPusherTests(HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -441,7 +443,7 @@ class HTTPPusherTests(HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -518,7 +520,7 @@ class HTTPPusherTests(HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -624,7 +626,7 @@ class HTTPPusherTests(HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -728,18 +730,38 @@ class HTTPPusherTests(HomeserverTestCase):
         )
         self.assertEqual(channel.code, 200, channel.json_body)
 
-    def _make_user_with_pusher(self, username: str) -> Tuple[str, str]:
+    def _make_user_with_pusher(
+        self, username: str, enabled: bool = True
+    ) -> Tuple[str, str]:
+        """Registers a user and creates a pusher for them.
+
+        Args:
+            username: the localpart of the new user's Matrix ID.
+            enabled: whether to create the pusher in an enabled or disabled state.
+        """
         user_id = self.register_user(username, "pass")
         access_token = self.login(username, "pass")
 
         # Register the pusher
+        self._set_pusher(user_id, access_token, enabled)
+
+        return user_id, access_token
+
+    def _set_pusher(self, user_id: str, access_token: str, enabled: bool) -> None:
+        """Creates or updates the pusher for the given user.
+
+        Args:
+            user_id: the user's Matrix ID.
+            access_token: the access token associated with the pusher.
+            enabled: whether to enable or disable the pusher.
+        """
         user_tuple = self.get_success(
             self.hs.get_datastores().main.get_user_by_access_token(access_token)
         )
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
@@ -749,11 +771,11 @@ class HTTPPusherTests(HomeserverTestCase):
                 pushkey="a@example.com",
                 lang=None,
                 data={"url": "http://example.com/_matrix/push/v1/notify"},
+                enabled=enabled,
+                device_id=user_tuple.device_id,
             )
         )
 
-        return user_id, access_token
-
     def test_dont_notify_rule_overrides_message(self) -> None:
         """
         The override push rule will suppress notification
@@ -791,3 +813,148 @@ class HTTPPusherTests(HomeserverTestCase):
         # The user sends a message back (sends a notification)
         self.helper.send(room, body="Hello", tok=access_token)
         self.assertEqual(len(self.push_attempts), 1)
+
+    @override_config({"experimental_features": {"msc3881_enabled": True}})
+    def test_disable(self) -> None:
+        """Tests that disabling a pusher means it's not pushed to anymore."""
+        user_id, access_token = self._make_user_with_pusher("user")
+        other_user_id, other_access_token = self._make_user_with_pusher("otheruser")
+
+        room = self.helper.create_room_as(user_id, tok=access_token)
+        self.helper.join(room=room, user=other_user_id, tok=other_access_token)
+
+        # Send a message and check that it generated a push.
+        self.helper.send(room, body="Hi!", tok=other_access_token)
+        self.assertEqual(len(self.push_attempts), 1)
+
+        # Disable the pusher.
+        self._set_pusher(user_id, access_token, enabled=False)
+
+        # Send another message and check that it did not generate a push.
+        self.helper.send(room, body="Hi!", tok=other_access_token)
+        self.assertEqual(len(self.push_attempts), 1)
+
+        # Get the pushers for the user and check that it is marked as disabled.
+        channel = self.make_request("GET", "/pushers", access_token=access_token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(len(channel.json_body["pushers"]), 1)
+
+        enabled = channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]
+        self.assertFalse(enabled)
+        self.assertTrue(isinstance(enabled, bool))
+
+    @override_config({"experimental_features": {"msc3881_enabled": True}})
+    def test_enable(self) -> None:
+        """Tests that enabling a disabled pusher means it gets pushed to."""
+        # Create the user with the pusher already disabled.
+        user_id, access_token = self._make_user_with_pusher("user", enabled=False)
+        other_user_id, other_access_token = self._make_user_with_pusher("otheruser")
+
+        room = self.helper.create_room_as(user_id, tok=access_token)
+        self.helper.join(room=room, user=other_user_id, tok=other_access_token)
+
+        # Send a message and check that it did not generate a push.
+        self.helper.send(room, body="Hi!", tok=other_access_token)
+        self.assertEqual(len(self.push_attempts), 0)
+
+        # Enable the pusher.
+        self._set_pusher(user_id, access_token, enabled=True)
+
+        # Send another message and check that it did generate a push.
+        self.helper.send(room, body="Hi!", tok=other_access_token)
+        self.assertEqual(len(self.push_attempts), 1)
+
+        # Get the pushers for the user and check that it is marked as enabled.
+        channel = self.make_request("GET", "/pushers", access_token=access_token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(len(channel.json_body["pushers"]), 1)
+
+        enabled = channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"]
+        self.assertTrue(enabled)
+        self.assertTrue(isinstance(enabled, bool))
+
+    @override_config({"experimental_features": {"msc3881_enabled": True}})
+    def test_null_enabled(self) -> None:
+        """Tests that a pusher that has an 'enabled' column set to NULL (eg pushers
+        created before the column was introduced) is considered enabled.
+        """
+        # We intentionally set 'enabled' to None so that it's stored as NULL in the
+        # database.
+        user_id, access_token = self._make_user_with_pusher("user", enabled=None)  # type: ignore[arg-type]
+
+        channel = self.make_request("GET", "/pushers", access_token=access_token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(len(channel.json_body["pushers"]), 1)
+        self.assertTrue(channel.json_body["pushers"][0]["org.matrix.msc3881.enabled"])
+
+    def test_update_different_device_access_token_device_id(self) -> None:
+        """Tests that if we create a pusher from one device, the update it from another
+        device, the access token and device ID associated with the pusher stays the
+        same.
+        """
+        # Create a user with a pusher.
+        user_id, access_token = self._make_user_with_pusher("user")
+
+        # Get the token ID for the current access token, since that's what we store in
+        # the pushers table. Also get the device ID from it.
+        user_tuple = self.get_success(
+            self.hs.get_datastores().main.get_user_by_access_token(access_token)
+        )
+        token_id = user_tuple.token_id
+        device_id = user_tuple.device_id
+
+        # Generate a new access token, and update the pusher with it.
+        new_token = self.login("user", "pass")
+        self._set_pusher(user_id, new_token, enabled=False)
+
+        # Get the current list of pushers for the user.
+        ret = self.get_success(
+            self.hs.get_datastores().main.get_pushers_by({"user_name": user_id})
+        )
+        pushers: List[PusherConfig] = list(ret)
+
+        # Check that we still have one pusher, and that the access token and device ID
+        # associated with it didn't change.
+        self.assertEqual(len(pushers), 1)
+        self.assertEqual(pushers[0].access_token, token_id)
+        self.assertEqual(pushers[0].device_id, device_id)
+
+    @override_config({"experimental_features": {"msc3881_enabled": True}})
+    def test_device_id(self) -> None:
+        """Tests that a pusher created with a given device ID shows that device ID in
+        GET /pushers requests.
+        """
+        self.register_user("user", "pass")
+        access_token = self.login("user", "pass")
+
+        # We create the pusher with an HTTP request rather than with
+        # _make_user_with_pusher so that we can test the device ID is correctly set when
+        # creating a pusher via an API call.
+        self.make_request(
+            method="POST",
+            path="/pushers/set",
+            content={
+                "kind": "http",
+                "app_id": "m.http",
+                "app_display_name": "HTTP Push Notifications",
+                "device_display_name": "pushy push",
+                "pushkey": "a@example.com",
+                "lang": "en",
+                "data": {"url": "http://example.com/_matrix/push/v1/notify"},
+            },
+            access_token=access_token,
+        )
+
+        # Look up the user info for the access token so we can compare the device ID.
+        lookup_result: TokenLookupResult = self.get_success(
+            self.hs.get_datastores().main.get_user_by_access_token(access_token)
+        )
+
+        # Get the user's devices and check it has the correct device ID.
+        channel = self.make_request("GET", "/pushers", access_token=access_token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(len(channel.json_body["pushers"]), 1)
+        self.assertEqual(
+            channel.json_body["pushers"][0]["org.matrix.msc3881.device_id"],
+            lookup_result.device_id,
+        )
diff --git a/tests/replication/test_module_cache_invalidation.py b/tests/replication/test_module_cache_invalidation.py
new file mode 100644
index 0000000000..b93cae67d3
--- /dev/null
+++ b/tests/replication/test_module_cache_invalidation.py
@@ -0,0 +1,79 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+import synapse
+from synapse.module_api import cached
+
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+
+logger = logging.getLogger(__name__)
+
+FIRST_VALUE = "one"
+SECOND_VALUE = "two"
+
+KEY = "mykey"
+
+
+class TestCache:
+    current_value = FIRST_VALUE
+
+    @cached()
+    async def cached_function(self, user_id: str) -> str:
+        return self.current_value
+
+
+class ModuleCacheInvalidationTestCase(BaseMultiWorkerStreamTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+    ]
+
+    def test_module_cache_full_invalidation(self):
+        main_cache = TestCache()
+        self.hs.get_module_api().register_cached_function(main_cache.cached_function)
+
+        worker_hs = self.make_worker_hs("synapse.app.generic_worker")
+
+        worker_cache = TestCache()
+        worker_hs.get_module_api().register_cached_function(
+            worker_cache.cached_function
+        )
+
+        self.assertEqual(FIRST_VALUE, self.get_success(main_cache.cached_function(KEY)))
+        self.assertEqual(
+            FIRST_VALUE, self.get_success(worker_cache.cached_function(KEY))
+        )
+
+        main_cache.current_value = SECOND_VALUE
+        worker_cache.current_value = SECOND_VALUE
+        # No invalidation yet, should return the cached value on both the main process and the worker
+        self.assertEqual(FIRST_VALUE, self.get_success(main_cache.cached_function(KEY)))
+        self.assertEqual(
+            FIRST_VALUE, self.get_success(worker_cache.cached_function(KEY))
+        )
+
+        # Full invalidation on the main process, should be replicated on the worker that
+        # should returned the updated value too
+        self.get_success(
+            self.hs.get_module_api().invalidate_cache(
+                main_cache.cached_function, (KEY,)
+            )
+        )
+
+        self.assertEqual(
+            SECOND_VALUE, self.get_success(main_cache.cached_function(KEY))
+        )
+        self.assertEqual(
+            SECOND_VALUE, self.get_success(worker_cache.cached_function(KEY))
+        )
diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index 8f4f6688ce..59fea93e49 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -55,7 +55,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         token_id = user_dict.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=user_id,
                 access_token=token_id,
                 kind="http",
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 9f536ceeb3..1847e6ad6b 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -2839,7 +2839,7 @@ class PushersRestTestCase(unittest.HomeserverTestCase):
         token_id = user_tuple.token_id
 
         self.get_success(
-            self.hs.get_pusherpool().add_pusher(
+            self.hs.get_pusherpool().add_or_update_pusher(
                 user_id=self.other_user,
                 access_token=token_id,
                 kind="http",
diff --git a/tests/rest/client/test_login_token_request.py b/tests/rest/client/test_login_token_request.py
new file mode 100644
index 0000000000..d5bb16c98d
--- /dev/null
+++ b/tests/rest/client/test_login_token_request.py
@@ -0,0 +1,132 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.rest import admin
+from synapse.rest.client import login, login_token_request
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests import unittest
+from tests.unittest import override_config
+
+
+class LoginTokenRequestServletTestCase(unittest.HomeserverTestCase):
+
+    servlets = [
+        login.register_servlets,
+        admin.register_servlets,
+        login_token_request.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        self.hs = self.setup_test_homeserver()
+        self.hs.config.registration.enable_registration = True
+        self.hs.config.registration.registrations_require_3pid = []
+        self.hs.config.registration.auto_join_rooms = []
+        self.hs.config.captcha.enable_registration_captcha = False
+
+        return self.hs
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.user = "user123"
+        self.password = "password"
+
+    def test_disabled(self) -> None:
+        channel = self.make_request("POST", "/login/token", {}, access_token=None)
+        self.assertEqual(channel.code, 400)
+
+        self.register_user(self.user, self.password)
+        token = self.login(self.user, self.password)
+
+        channel = self.make_request("POST", "/login/token", {}, access_token=token)
+        self.assertEqual(channel.code, 400)
+
+    @override_config({"experimental_features": {"msc3882_enabled": True}})
+    def test_require_auth(self) -> None:
+        channel = self.make_request("POST", "/login/token", {}, access_token=None)
+        self.assertEqual(channel.code, 401)
+
+    @override_config({"experimental_features": {"msc3882_enabled": True}})
+    def test_uia_on(self) -> None:
+        user_id = self.register_user(self.user, self.password)
+        token = self.login(self.user, self.password)
+
+        channel = self.make_request("POST", "/login/token", {}, access_token=token)
+        self.assertEqual(channel.code, 401)
+        self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
+
+        session = channel.json_body["session"]
+
+        uia = {
+            "auth": {
+                "type": "m.login.password",
+                "identifier": {"type": "m.id.user", "user": self.user},
+                "password": self.password,
+                "session": session,
+            },
+        }
+
+        channel = self.make_request("POST", "/login/token", uia, access_token=token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(channel.json_body["expires_in"], 300)
+
+        login_token = channel.json_body["login_token"]
+
+        channel = self.make_request(
+            "POST",
+            "/login",
+            content={"type": "m.login.token", "token": login_token},
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+        self.assertEqual(channel.json_body["user_id"], user_id)
+
+    @override_config(
+        {"experimental_features": {"msc3882_enabled": True, "msc3882_ui_auth": False}}
+    )
+    def test_uia_off(self) -> None:
+        user_id = self.register_user(self.user, self.password)
+        token = self.login(self.user, self.password)
+
+        channel = self.make_request("POST", "/login/token", {}, access_token=token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(channel.json_body["expires_in"], 300)
+
+        login_token = channel.json_body["login_token"]
+
+        channel = self.make_request(
+            "POST",
+            "/login",
+            content={"type": "m.login.token", "token": login_token},
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+        self.assertEqual(channel.json_body["user_id"], user_id)
+
+    @override_config(
+        {
+            "experimental_features": {
+                "msc3882_enabled": True,
+                "msc3882_ui_auth": False,
+                "msc3882_token_timeout": "15s",
+            }
+        }
+    )
+    def test_expires_in(self) -> None:
+        self.register_user(self.user, self.password)
+        token = self.login(self.user, self.password)
+
+        channel = self.make_request("POST", "/login/token", {}, access_token=token)
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(channel.json_body["expires_in"], 15)
diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py
index 651f4f415d..d33e34d829 100644
--- a/tests/rest/client/test_relations.py
+++ b/tests/rest/client/test_relations.py
@@ -788,6 +788,7 @@ class RelationPaginationTestCase(BaseRelationsTestCase):
             channel.json_body["chunk"][0],
         )
 
+    @unittest.override_config({"experimental_features": {"msc3715_enabled": True}})
     def test_repeated_paginate_relations(self) -> None:
         """Test that if we paginate using a limit and tokens then we get the
         expected events.
@@ -809,7 +810,7 @@ class RelationPaginationTestCase(BaseRelationsTestCase):
 
             channel = self.make_request(
                 "GET",
-                f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}?limit=1{from_token}",
+                f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}?limit=3{from_token}",
                 access_token=self.user_token,
             )
             self.assertEqual(200, channel.code, channel.json_body)
@@ -827,6 +828,32 @@ class RelationPaginationTestCase(BaseRelationsTestCase):
         found_event_ids.reverse()
         self.assertEqual(found_event_ids, expected_event_ids)
 
+        # Test forward pagination.
+        prev_token = ""
+        found_event_ids = []
+        for _ in range(20):
+            from_token = ""
+            if prev_token:
+                from_token = "&from=" + prev_token
+
+            channel = self.make_request(
+                "GET",
+                f"/_matrix/client/v1/rooms/{self.room}/relations/{self.parent_id}?org.matrix.msc3715.dir=f&limit=3{from_token}",
+                access_token=self.user_token,
+            )
+            self.assertEqual(200, channel.code, channel.json_body)
+
+            found_event_ids.extend(e["event_id"] for e in channel.json_body["chunk"])
+            next_batch = channel.json_body.get("next_batch")
+
+            self.assertNotEqual(prev_token, next_batch)
+            prev_token = next_batch
+
+            if not prev_token:
+                break
+
+        self.assertEqual(found_event_ids, expected_event_ids)
+
     def test_pagination_from_sync_and_messages(self) -> None:
         """Pagination tokens from /sync and /messages can be used to paginate /relations."""
         channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "A")
diff --git a/tests/unittest.py b/tests/unittest.py
index 975b0a23a7..00cb023198 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -300,47 +300,31 @@ class HomeserverTestCase(TestCase):
         if hasattr(self, "user_id"):
             if self.hijack_auth:
                 assert self.helper.auth_user_id is not None
+                token = "some_fake_token"
 
                 # We need a valid token ID to satisfy foreign key constraints.
                 token_id = self.get_success(
                     self.hs.get_datastores().main.add_access_token_to_user(
                         self.helper.auth_user_id,
-                        "some_fake_token",
+                        token,
                         None,
                         None,
                     )
                 )
 
-                async def get_user_by_access_token(
-                    token: Optional[str] = None, allow_guest: bool = False
-                ) -> JsonDict:
-                    assert self.helper.auth_user_id is not None
-                    return {
-                        "user": UserID.from_string(self.helper.auth_user_id),
-                        "token_id": token_id,
-                        "is_guest": False,
-                    }
-
-                async def get_user_by_req(
-                    request: SynapseRequest,
-                    allow_guest: bool = False,
-                    allow_expired: bool = False,
-                ) -> Requester:
+                # This has to be a function and not just a Mock, because
+                # `self.helper.auth_user_id` is temporarily reassigned in some tests
+                async def get_requester(*args, **kwargs) -> Requester:
                     assert self.helper.auth_user_id is not None
                     return create_requester(
-                        UserID.from_string(self.helper.auth_user_id),
-                        token_id,
-                        False,
-                        False,
-                        None,
+                        user_id=UserID.from_string(self.helper.auth_user_id),
+                        access_token_id=token_id,
                     )
 
                 # Type ignore: mypy doesn't like us assigning to methods.
-                self.hs.get_auth().get_user_by_req = get_user_by_req  # type: ignore[assignment]
-                self.hs.get_auth().get_user_by_access_token = get_user_by_access_token  # type: ignore[assignment]
-                self.hs.get_auth().get_access_token_from_request = Mock(  # type: ignore[assignment]
-                    return_value="1234"
-                )
+                self.hs.get_auth().get_user_by_req = get_requester  # type: ignore[assignment]
+                self.hs.get_auth().get_user_by_access_token = get_requester  # type: ignore[assignment]
+                self.hs.get_auth().get_access_token_from_request = Mock(return_value=token)  # type: ignore[assignment]
 
         if self.needs_threadpool:
             self.reactor.threadpool = ThreadPool()  # type: ignore[assignment]