Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/_scripts/export_signing_key.py | 103
-rwxr-xr-x | synapse/_scripts/generate_config.py | 83
-rwxr-xr-x | synapse/_scripts/generate_log_config.py | 49
-rwxr-xr-x | synapse/_scripts/generate_signing_key.py | 41
-rwxr-xr-x | synapse/_scripts/hash_password.py | 83
-rwxr-xr-x | synapse/_scripts/move_remote_media_to_new_store.py | 118
-rwxr-xr-x | synapse/_scripts/synapse_port_db.py | 1257
-rwxr-xr-x | synapse/_scripts/synctl.py | 360
-rwxr-xr-x | synapse/_scripts/update_synapse_database.py | 117
-rw-r--r-- | synapse/appservice/__init__.py | 133
-rw-r--r-- | synapse/appservice/api.py | 24
-rw-r--r-- | synapse/config/_base.py | 2
-rw-r--r-- | synapse/events/utils.py | 81
-rw-r--r-- | synapse/federation/federation_client.py | 8
-rw-r--r-- | synapse/federation/send_queue.py | 2
-rw-r--r-- | synapse/federation/sender/__init__.py | 26
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 10
-rw-r--r-- | synapse/handlers/appservice.py | 4
-rw-r--r-- | synapse/handlers/device.py | 2
-rw-r--r-- | synapse/handlers/directory.py | 6
-rw-r--r-- | synapse/handlers/events.py | 3
-rw-r--r-- | synapse/handlers/initial_sync.py | 9
-rw-r--r-- | synapse/handlers/message.py | 3
-rw-r--r-- | synapse/handlers/pagination.py | 7
-rw-r--r-- | synapse/handlers/receipts.py | 2
-rw-r--r-- | synapse/handlers/room_member.py | 2
-rw-r--r-- | synapse/handlers/room_summary.py | 2
-rw-r--r-- | synapse/handlers/typing.py | 4
-rw-r--r-- | synapse/replication/tcp/client.py | 2
-rw-r--r-- | synapse/rest/client/notifications.py | 9
-rw-r--r-- | synapse/rest/client/relations.py | 55
-rw-r--r-- | synapse/rest/client/sync.py | 132
-rw-r--r-- | synapse/state/__init__.py | 6
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 3
-rw-r--r-- | synapse/storage/relations.py | 31
-rw-r--r-- | synapse/storage/state.py | 8
-rw-r--r-- | synapse/util/async_helpers.py | 14
-rw-r--r-- | synapse/visibility.py | 17
38 files changed, 2525 insertions, 293 deletions
diff --git a/synapse/_scripts/export_signing_key.py b/synapse/_scripts/export_signing_key.py
new file mode 100755
index 0000000000..3d254348f1
--- /dev/null
+++ b/synapse/_scripts/export_signing_key.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import argparse
+import sys
+import time
+from typing import Optional
+
+import nacl.signing
+from signedjson.key import encode_verify_key_base64, get_verify_key, read_signing_keys
+
+
+def exit(status: int = 0, message: Optional[str] = None):
+    if message:
+        print(message, file=sys.stderr)
+    sys.exit(status)
+
+
+def format_plain(public_key: nacl.signing.VerifyKey):
+    print(
+        "%s:%s %s"
+        % (
+            public_key.alg,
+            public_key.version,
+            encode_verify_key_base64(public_key),
+        )
+    )
+
+
+def format_for_config(public_key: nacl.signing.VerifyKey, expiry_ts: int):
+    print(
+        '  "%s:%s": { key: "%s", expired_ts: %i }'
+        % (
+            public_key.alg,
+            public_key.version,
+            encode_verify_key_base64(public_key),
+            expiry_ts,
+        )
+    )
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "key_file",
+        nargs="+",
+        type=argparse.FileType("r"),
+        help="The key file to read",
+    )
+
+    parser.add_argument(
+        "-x",
+        action="store_true",
+        dest="for_config",
+        help="format the output for inclusion in the old_signing_keys config setting",
+    )
+
+    parser.add_argument(
+        "--expiry-ts",
+        type=int,
+        default=int(time.time() * 1000) + 6 * 3600000,
+        help=(
+            "The expiry time to use for -x, in milliseconds since 1970. The default "
+            "is (now+6h)."
+        ),
+    )
+
+    args = parser.parse_args()
+
+    formatter = (
+        (lambda k: format_for_config(k, args.expiry_ts))
+        if args.for_config
+        else format_plain
+    )
+
+    for file in args.key_file:
+        try:
+            res = read_signing_keys(file)
+        except Exception as e:
+            exit(
+                status=1,
+                message="Error reading key from file %s: %s %s"
+                % (file.name, type(e), e),
+            )
+            res = []
+        for key in res:
+            formatter(get_verify_key(key))
+
+
+if __name__ == "__main__":
+    main()
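
The -x output of this script is meant to be pasted into the old_signing_keys section of the homeserver config. For reference, the core of the export flow can be reproduced with the same signedjson helpers the script imports; a minimal sketch (not part of this diff), assuming a placeholder key path::

    from signedjson.key import (
        encode_verify_key_base64,
        get_verify_key,
        read_signing_keys,
    )

    # Placeholder path; point this at an existing signing key file.
    with open("/path/to/signing.key") as f:
        signing_keys = read_signing_keys(f)

    for signing_key in signing_keys:
        # Derive the public (verify) half and print it in "alg:version base64" form,
        # matching format_plain() above.
        verify_key = get_verify_key(signing_key)
        print(
            "%s:%s %s"
            % (verify_key.alg, verify_key.version, encode_verify_key_base64(verify_key))
        )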
diff --git a/synapse/_scripts/generate_config.py b/synapse/_scripts/generate_config.py
new file mode 100755
index 0000000000..75fce20b12
--- /dev/null
+++ b/synapse/_scripts/generate_config.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python3
+
+import argparse
+import shutil
+import sys
+
+from synapse.config.homeserver import HomeServerConfig
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--config-dir",
+        default="CONFDIR",
+        help="The path where the config files are kept. Used to create filenames for "
+        "things like the log config and the signing key. Default: %(default)s",
+    )
+
+    parser.add_argument(
+        "--data-dir",
+        default="DATADIR",
+        help="The path where the data files are kept. Used to create filenames for "
+        "things like the database and media store. Default: %(default)s",
+    )
+
+    parser.add_argument(
+        "--server-name",
+        default="SERVERNAME",
+        help="The server name. Used to initialise the server_name config param, but also "
+        "used in the names of some of the config files. Default: %(default)s",
+    )
+
+    parser.add_argument(
+        "--report-stats",
+        action="store",
+        help="Whether the generated config reports anonymized usage statistics",
+        choices=["yes", "no"],
+    )
+
+    parser.add_argument(
+        "--generate-secrets",
+        action="store_true",
+        help="Enable generation of new secrets for things like the macaroon_secret_key."
+        "By default, these parameters will be left unset.",
+    )
+
+    parser.add_argument(
+        "-o",
+        "--output-file",
+        type=argparse.FileType("w"),
+        default=sys.stdout,
+        help="File to write the configuration to. Default: stdout",
+    )
+
+    parser.add_argument(
+        "--header-file",
+        type=argparse.FileType("r"),
+        help="File from which to read a header, which will be printed before the "
+        "generated config.",
+    )
+
+    args = parser.parse_args()
+
+    report_stats = args.report_stats
+    if report_stats is not None:
+        report_stats = report_stats == "yes"
+
+    conf = HomeServerConfig().generate_config(
+        config_dir_path=args.config_dir,
+        data_dir_path=args.data_dir,
+        server_name=args.server_name,
+        generate_secrets=args.generate_secrets,
+        report_stats=report_stats,
+    )
+
+    if args.header_file:
+        shutil.copyfileobj(args.header_file, args.output_file)
+
+    args.output_file.write(conf)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/synapse/_scripts/generate_log_config.py b/synapse/_scripts/generate_log_config.py
new file mode 100755
index 0000000000..82fc763140
--- /dev/null
+++ b/synapse/_scripts/generate_log_config.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import sys
+
+from synapse.config.logger import DEFAULT_LOG_CONFIG
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "-o",
+        "--output-file",
+        type=argparse.FileType("w"),
+        default=sys.stdout,
+        help="File to write the configuration to. Default: stdout",
+    )
+
+    parser.add_argument(
+        "-f",
+        "--log-file",
+        type=str,
+        default="/var/log/matrix-synapse/homeserver.log",
+        help="name of the log file",
+    )
+
+    args = parser.parse_args()
+    out = args.output_file
+    out.write(DEFAULT_LOG_CONFIG.substitute(log_file=args.log_file))
+    out.flush()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/synapse/_scripts/generate_signing_key.py b/synapse/_scripts/generate_signing_key.py
new file mode 100755
index 0000000000..bc26d25bfd
--- /dev/null
+++ b/synapse/_scripts/generate_signing_key.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import argparse
+import sys
+
+from signedjson.key import generate_signing_key, write_signing_keys
+
+from synapse.util.stringutils import random_string
+
+
+def main():
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "-o",
+        "--output_file",
+        type=argparse.FileType("w"),
+        default=sys.stdout,
+        help="Where to write the output to",
+    )
+    args = parser.parse_args()
+
+    key_id = "a_" + random_string(4)
+    key = (generate_signing_key(key_id),)
+    write_signing_keys(args.output_file, key)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/synapse/_scripts/hash_password.py b/synapse/_scripts/hash_password.py
new file mode 100755
index 0000000000..708640c7de
--- /dev/null
+++ b/synapse/_scripts/hash_password.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+
+import argparse
+import getpass
+import sys
+import unicodedata
+
+import bcrypt
+import yaml
+
+
+def prompt_for_pass():
+    password = getpass.getpass("Password: ")
+
+    if not password:
+        raise Exception("Password cannot be blank.")
+
+    confirm_password = getpass.getpass("Confirm password: ")
+
+    if password != confirm_password:
+        raise Exception("Passwords do not match.")
+
+    return password
+
+
+def main():
+    bcrypt_rounds = 12
+    password_pepper = ""
+
+    parser = argparse.ArgumentParser(
+        description=(
+            "Calculate the hash of a new password, so that passwords can be reset"
+        )
+    )
+    parser.add_argument(
+        "-p",
+        "--password",
+        default=None,
+        help="New password for user. Will prompt if omitted.",
+    )
+    parser.add_argument(
+        "-c",
+        "--config",
+        type=argparse.FileType("r"),
+        help=(
+            "Path to server config file. "
+            "Used to read in bcrypt_rounds and password_pepper."
+        ),
+    )
+
+    args = parser.parse_args()
+    if "config" in args and args.config:
+        config = yaml.safe_load(args.config)
+        bcrypt_rounds = config.get("bcrypt_rounds", bcrypt_rounds)
+        password_config = config.get("password_config", None) or {}
+        password_pepper = password_config.get("pepper", password_pepper)
+    password = args.password
+
+    if not password:
+        password = prompt_for_pass()
+
+    # On Python 2, make sure we decode it to Unicode before we normalise it
+    if isinstance(password, bytes):
+        try:
+            password = password.decode(sys.stdin.encoding)
+        except UnicodeDecodeError:
+            print(
+                "ERROR! Your password is not decodable using your terminal encoding (%s)."
+                % (sys.stdin.encoding,)
+            )
+
+    pw = unicodedata.normalize("NFKC", password)
+
+    hashed = bcrypt.hashpw(
+        pw.encode("utf8") + password_pepper.encode("utf8"),
+        bcrypt.gensalt(bcrypt_rounds),
+    ).decode("ascii")
+
+    print(hashed)
+
+
+if __name__ == "__main__":
+    main()
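
The printed hash is a standard bcrypt hash of the NFKC-normalised password with the configured pepper appended, so it can be checked with bcrypt.checkpw. A minimal sketch (not part of this diff), assuming the same pepper convention as above::

    import unicodedata

    import bcrypt

    def check_password(password: str, stored_hash: str, pepper: str = "") -> bool:
        # Normalise and append the pepper exactly as hash_password.py does before
        # hashing, then let bcrypt compare against the stored hash.
        pw = unicodedata.normalize("NFKC", password)
        return bcrypt.checkpw(
            pw.encode("utf8") + pepper.encode("utf8"),
            stored_hash.encode("ascii"),
        )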
diff --git a/synapse/_scripts/move_remote_media_to_new_store.py b/synapse/_scripts/move_remote_media_to_new_store.py
new file mode 100755
index 0000000000..9667d95dfe
--- /dev/null
+++ b/synapse/_scripts/move_remote_media_to_new_store.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Moves a list of remote media from one media store to another.
+
+The input should be a list of media files to be moved, one per line. Each line
+should be formatted::
+
+    <origin server>|<file id>
+
+This can be extracted from postgres with::
+
+    psql --tuples-only -A -c "select media_origin, filesystem_id from
+        matrix.remote_media_cache where ..."
+
+To use, pipe the above into::
+
+    PYTHONPATH=. synapse/_scripts/move_remote_media_to_new_store.py <source repo> <dest repo>
+"""
+
+import argparse
+import logging
+import os
+import shutil
+import sys
+
+from synapse.rest.media.v1.filepath import MediaFilePaths
+
+logger = logging.getLogger()
+
+
+def main(src_repo, dest_repo):
+    src_paths = MediaFilePaths(src_repo)
+    dest_paths = MediaFilePaths(dest_repo)
+    for line in sys.stdin:
+        line = line.strip()
+        parts = line.split("|")
+        if len(parts) != 2:
+            print("Unable to parse input line %s" % line, file=sys.stderr)
+            sys.exit(1)
+
+        move_media(parts[0], parts[1], src_paths, dest_paths)
+
+
+def move_media(origin_server, file_id, src_paths, dest_paths):
+    """Move the given file, and any thumbnails, to the dest repo
+
+    Args:
+        origin_server (str):
+        file_id (str):
+        src_paths (MediaFilePaths):
+        dest_paths (MediaFilePaths):
+    """
+    logger.info("%s/%s", origin_server, file_id)
+
+    # check that the original exists
+    original_file = src_paths.remote_media_filepath(origin_server, file_id)
+    if not os.path.exists(original_file):
+        logger.warning(
+            "Original for %s/%s (%s) does not exist",
+            origin_server,
+            file_id,
+            original_file,
+        )
+    else:
+        mkdir_and_move(
+            original_file, dest_paths.remote_media_filepath(origin_server, file_id)
+        )
+
+    # now look for thumbnails
+    original_thumb_dir = src_paths.remote_media_thumbnail_dir(origin_server, file_id)
+    if not os.path.exists(original_thumb_dir):
+        return
+
+    mkdir_and_move(
+        original_thumb_dir,
+        dest_paths.remote_media_thumbnail_dir(origin_server, file_id),
+    )
+
+
+def mkdir_and_move(original_file, dest_file):
+    dirname = os.path.dirname(dest_file)
+    if not os.path.exists(dirname):
+        logger.debug("mkdir %s", dirname)
+        os.makedirs(dirname)
+    logger.debug("mv %s %s", original_file, dest_file)
+    shutil.move(original_file, dest_file)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
+    )
+    parser.add_argument("-v", action="store_true", help="enable debug logging")
+    parser.add_argument("src_repo", help="Path to source content repo")
+    parser.add_argument("dest_repo", help="Path to source content repo")
+    args = parser.parse_args()
+
+    logging_config = {
+        "level": logging.DEBUG if args.v else logging.INFO,
+        "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
+    }
+    logging.basicConfig(**logging_config)
+
+    main(args.src_repo, args.dest_repo)
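
The docstring above extracts the input list from PostgreSQL; on a deployment that has not yet been ported from SQLite, the same "<origin server>|<file id>" lines can be produced with the standard library sqlite3 module. A minimal sketch (not part of this diff), assuming a placeholder database path and the same remote_media_cache columns::

    import sqlite3

    # Placeholder path to the (unported) SQLite homeserver database.
    conn = sqlite3.connect("/path/to/homeserver.db")
    for media_origin, filesystem_id in conn.execute(
        "SELECT media_origin, filesystem_id FROM remote_media_cache"
    ):
        print("%s|%s" % (media_origin, filesystem_id))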
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
new file mode 100755
index 0000000000..c38666da18
--- /dev/null
+++ b/synapse/_scripts/synapse_port_db.py
@@ -0,0 +1,1257 @@
+#!/usr/bin/env python
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import curses
+import logging
+import sys
+import time
+import traceback
+from typing import Dict, Iterable, Optional, Set
+
+import yaml
+from matrix_common.versionstring import get_distribution_version_string
+
+from twisted.internet import defer, reactor
+
+from synapse.config.database import DatabaseConnectionConfig
+from synapse.config.homeserver import HomeServerConfig
+from synapse.logging.context import (
+    LoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
+)
+from synapse.storage.database import DatabasePool, make_conn
+from synapse.storage.databases.main import PushRuleStore
+from synapse.storage.databases.main.account_data import AccountDataWorkerStore
+from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
+from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
+from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
+from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackgroundStore
+from synapse.storage.databases.main.events_bg_updates import (
+    EventsBackgroundUpdatesStore,
+)
+from synapse.storage.databases.main.group_server import GroupServerWorkerStore
+from synapse.storage.databases.main.media_repository import (
+    MediaRepositoryBackgroundUpdateStore,
+)
+from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
+from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.registration import (
+    RegistrationBackgroundUpdateStore,
+    find_max_generated_user_id_localpart,
+)
+from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
+from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
+from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
+from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.databases.main.user_directory import (
+    UserDirectoryBackgroundUpdateStore,
+)
+from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
+from synapse.storage.engines import create_engine
+from synapse.storage.prepare_database import prepare_database
+from synapse.util import Clock
+
+logger = logging.getLogger("synapse_port_db")
+
+
+BOOLEAN_COLUMNS = {
+    "events": ["processed", "outlier", "contains_url"],
+    "rooms": ["is_public", "has_auth_chain_index"],
+    "event_edges": ["is_state"],
+    "presence_list": ["accepted"],
+    "presence_stream": ["currently_active"],
+    "public_room_list_stream": ["visibility"],
+    "devices": ["hidden"],
+    "device_lists_outbound_pokes": ["sent"],
+    "users_who_share_rooms": ["share_private"],
+    "groups": ["is_public"],
+    "group_rooms": ["is_public"],
+    "group_users": ["is_public", "is_admin"],
+    "group_summary_rooms": ["is_public"],
+    "group_room_categories": ["is_public"],
+    "group_summary_users": ["is_public"],
+    "group_roles": ["is_public"],
+    "local_group_membership": ["is_publicised", "is_admin"],
+    "e2e_room_keys": ["is_verified"],
+    "account_validity": ["email_sent"],
+    "redactions": ["have_censored"],
+    "room_stats_state": ["is_federatable"],
+    "local_media_repository": ["safe_from_quarantine"],
+    "users": ["shadow_banned"],
+    "e2e_fallback_keys_json": ["used"],
+    "access_tokens": ["used"],
+}
+
+
+APPEND_ONLY_TABLES = [
+    "event_reference_hashes",
+    "events",
+    "event_json",
+    "state_events",
+    "room_memberships",
+    "topics",
+    "room_names",
+    "rooms",
+    "local_media_repository",
+    "local_media_repository_thumbnails",
+    "remote_media_cache",
+    "remote_media_cache_thumbnails",
+    "redactions",
+    "event_edges",
+    "event_auth",
+    "received_transactions",
+    "sent_transactions",
+    "transaction_id_to_pdu",
+    "users",
+    "state_groups",
+    "state_groups_state",
+    "event_to_state_groups",
+    "rejections",
+    "event_search",
+    "presence_stream",
+    "push_rules_stream",
+    "ex_outlier_stream",
+    "cache_invalidation_stream_by_instance",
+    "public_room_list_stream",
+    "state_group_edges",
+    "stream_ordering_to_exterm",
+]
+
+
+IGNORED_TABLES = {
+    # We don't port these tables, as they're a faff and we can regenerate
+    # them anyway.
+    "user_directory",
+    "user_directory_search",
+    "user_directory_search_content",
+    "user_directory_search_docsize",
+    "user_directory_search_segdir",
+    "user_directory_search_segments",
+    "user_directory_search_stat",
+    "user_directory_search_pos",
+    "users_who_share_private_rooms",
+    "users_in_public_room",
+    # UI auth sessions have foreign keys so additional care needs to be taken,
+    # the sessions are transient anyway, so ignore them.
+    "ui_auth_sessions",
+    "ui_auth_sessions_credentials",
+    "ui_auth_sessions_ips",
+}
+
+
+# Error returned by the run function. Used at the top-level part of the script to
+# handle errors and return codes.
+end_error = None  # type: Optional[str]
+# The exec_info for the error, if any. If the error is defined but not exec_info, the
+# script will show only the error message without the stacktrace. If exec_info is defined
+# but not the error, the script will show nothing outside of what's printed in the run
+# function. If both are defined, the script will print both the error and the stacktrace.
+end_error_exec_info = None
+
+
+class Store(
+    ClientIpBackgroundUpdateStore,
+    DeviceInboxBackgroundUpdateStore,
+    DeviceBackgroundUpdateStore,
+    EventsBackgroundUpdatesStore,
+    MediaRepositoryBackgroundUpdateStore,
+    RegistrationBackgroundUpdateStore,
+    RoomBackgroundUpdateStore,
+    RoomMemberBackgroundUpdateStore,
+    SearchBackgroundUpdateStore,
+    StateBackgroundUpdateStore,
+    MainStateBackgroundUpdateStore,
+    UserDirectoryBackgroundUpdateStore,
+    EndToEndKeyBackgroundStore,
+    StatsStore,
+    AccountDataWorkerStore,
+    PushRuleStore,
+    PusherWorkerStore,
+    PresenceBackgroundUpdateStore,
+    GroupServerWorkerStore,
+):
+    def execute(self, f, *args, **kwargs):
+        return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
+
+    def execute_sql(self, sql, *args):
+        def r(txn):
+            txn.execute(sql, args)
+            return txn.fetchall()
+
+        return self.db_pool.runInteraction("execute_sql", r)
+
+    def insert_many_txn(self, txn, table, headers, rows):
+        sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+            table,
+            ", ".join(k for k in headers),
+            ", ".join("%s" for _ in headers),
+        )
+
+        try:
+            txn.executemany(sql, rows)
+        except Exception:
+            logger.exception("Failed to insert: %s", table)
+            raise
+
+    def set_room_is_public(self, room_id, is_public):
+        raise Exception(
+            "Attempt to set room_is_public during port_db: database not empty?"
+        )
+
+
+class MockHomeserver:
+    def __init__(self, config):
+        self.clock = Clock(reactor)
+        self.config = config
+        self.hostname = config.server.server_name
+        self.version_string = "Synapse/" + get_distribution_version_string(
+            "matrix-synapse"
+        )
+
+    def get_clock(self):
+        return self.clock
+
+    def get_reactor(self):
+        return reactor
+
+    def get_instance_name(self):
+        return "master"
+
+
+class Porter(object):
+    def __init__(self, **kwargs):
+        self.__dict__.update(kwargs)
+
+    async def setup_table(self, table):
+        if table in APPEND_ONLY_TABLES:
+            # It's safe to just carry on inserting.
+            row = await self.postgres_store.db_pool.simple_select_one(
+                table="port_from_sqlite3",
+                keyvalues={"table_name": table},
+                retcols=("forward_rowid", "backward_rowid"),
+                allow_none=True,
+            )
+
+            total_to_port = None
+            if row is None:
+                if table == "sent_transactions":
+                    (
+                        forward_chunk,
+                        already_ported,
+                        total_to_port,
+                    ) = await self._setup_sent_transactions()
+                    backward_chunk = 0
+                else:
+                    await self.postgres_store.db_pool.simple_insert(
+                        table="port_from_sqlite3",
+                        values={
+                            "table_name": table,
+                            "forward_rowid": 1,
+                            "backward_rowid": 0,
+                        },
+                    )
+
+                    forward_chunk = 1
+                    backward_chunk = 0
+                    already_ported = 0
+            else:
+                forward_chunk = row["forward_rowid"]
+                backward_chunk = row["backward_rowid"]
+
+            if total_to_port is None:
+                already_ported, total_to_port = await self._get_total_count_to_port(
+                    table, forward_chunk, backward_chunk
+                )
+        else:
+
+            def delete_all(txn):
+                txn.execute(
+                    "DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,)
+                )
+                txn.execute("TRUNCATE %s CASCADE" % (table,))
+
+            await self.postgres_store.execute(delete_all)
+
+            await self.postgres_store.db_pool.simple_insert(
+                table="port_from_sqlite3",
+                values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
+            )
+
+            forward_chunk = 1
+            backward_chunk = 0
+
+            already_ported, total_to_port = await self._get_total_count_to_port(
+                table, forward_chunk, backward_chunk
+            )
+
+        return table, already_ported, total_to_port, forward_chunk, backward_chunk
+
+    async def get_table_constraints(self) -> Dict[str, Set[str]]:
+        """Returns a map of tables that have foreign key constraints to tables they depend on."""
+
+        def _get_constraints(txn):
+            # We can pull the information about foreign key constraints out from
+            # the postgres schema tables.
+            sql = """
+                SELECT DISTINCT
+                    tc.table_name,
+                    ccu.table_name AS foreign_table_name
+                FROM
+                    information_schema.table_constraints AS tc
+                    INNER JOIN information_schema.constraint_column_usage AS ccu
+                    USING (table_schema, constraint_name)
+                WHERE tc.constraint_type = 'FOREIGN KEY'
+                  AND tc.table_name != ccu.table_name;
+            """
+            txn.execute(sql)
+
+            results = {}
+            for table, foreign_table in txn:
+                results.setdefault(table, set()).add(foreign_table)
+            return results
+
+        return await self.postgres_store.db_pool.runInteraction(
+            "get_table_constraints", _get_constraints
+        )
+
+    async def handle_table(
+        self, table, postgres_size, table_size, forward_chunk, backward_chunk
+    ):
+        logger.info(
+            "Table %s: %i/%i (rows %i-%i) already ported",
+            table,
+            postgres_size,
+            table_size,
+            backward_chunk + 1,
+            forward_chunk - 1,
+        )
+
+        if not table_size:
+            return
+
+        self.progress.add_table(table, postgres_size, table_size)
+
+        if table == "event_search":
+            await self.handle_search_table(
+                postgres_size, table_size, forward_chunk, backward_chunk
+            )
+            return
+
+        if table in IGNORED_TABLES:
+            self.progress.update(table, table_size)  # Mark table as done
+            return
+
+        if table == "user_directory_stream_pos":
+            # We need to make sure there is a single row, `(X, null)`, as that is
+            # what synapse expects to be there.
+            await self.postgres_store.db_pool.simple_insert(
+                table=table, values={"stream_id": None}
+            )
+            self.progress.update(table, table_size)  # Mark table as done
+            return
+
+        forward_select = (
+            "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,)
+        )
+
+        backward_select = (
+            "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" % (table,)
+        )
+
+        do_forward = [True]
+        do_backward = [True]
+
+        while True:
+
+            def r(txn):
+                forward_rows = []
+                backward_rows = []
+                if do_forward[0]:
+                    txn.execute(forward_select, (forward_chunk, self.batch_size))
+                    forward_rows = txn.fetchall()
+                    if not forward_rows:
+                        do_forward[0] = False
+
+                if do_backward[0]:
+                    txn.execute(backward_select, (backward_chunk, self.batch_size))
+                    backward_rows = txn.fetchall()
+                    if not backward_rows:
+                        do_backward[0] = False
+
+                if forward_rows or backward_rows:
+                    headers = [column[0] for column in txn.description]
+                else:
+                    headers = None
+
+                return headers, forward_rows, backward_rows
+
+            headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
+                "select", r
+            )
+
+            if frows or brows:
+                if frows:
+                    forward_chunk = max(row[0] for row in frows) + 1
+                if brows:
+                    backward_chunk = min(row[0] for row in brows) - 1
+
+                rows = frows + brows
+                rows = self._convert_rows(table, headers, rows)
+
+                def insert(txn):
+                    self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
+
+                    self.postgres_store.db_pool.simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": table},
+                        updatevalues={
+                            "forward_rowid": forward_chunk,
+                            "backward_rowid": backward_chunk,
+                        },
+                    )
+
+                await self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update(table, postgres_size)
+            else:
+                return
+
+    async def handle_search_table(
+        self, postgres_size, table_size, forward_chunk, backward_chunk
+    ):
+        select = (
+            "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
+            " FROM event_search as es"
+            " INNER JOIN events AS e USING (event_id, room_id)"
+            " WHERE es.rowid >= ?"
+            " ORDER BY es.rowid LIMIT ?"
+        )
+
+        while True:
+
+            def r(txn):
+                txn.execute(select, (forward_chunk, self.batch_size))
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                return headers, rows
+
+            headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
+
+            if rows:
+                forward_chunk = rows[-1][0] + 1
+
+                # We have to treat event_search differently since it has a
+                # different structure in the two different databases.
+                def insert(txn):
+                    sql = (
+                        "INSERT INTO event_search (event_id, room_id, key,"
+                        " sender, vector, origin_server_ts, stream_ordering)"
+                        " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
+                    )
+
+                    rows_dict = []
+                    for row in rows:
+                        d = dict(zip(headers, row))
+                        if "\0" in d["value"]:
+                            logger.warning("dropping search row %s", d)
+                        else:
+                            rows_dict.append(d)
+
+                    txn.executemany(
+                        sql,
+                        [
+                            (
+                                row["event_id"],
+                                row["room_id"],
+                                row["key"],
+                                row["sender"],
+                                row["value"],
+                                row["origin_server_ts"],
+                                row["stream_ordering"],
+                            )
+                            for row in rows_dict
+                        ],
+                    )
+
+                    self.postgres_store.db_pool.simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": "event_search"},
+                        updatevalues={
+                            "forward_rowid": forward_chunk,
+                            "backward_rowid": backward_chunk,
+                        },
+                    )
+
+                await self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update("event_search", postgres_size)
+
+            else:
+                return
+
+    def build_db_store(
+        self,
+        db_config: DatabaseConnectionConfig,
+        allow_outdated_version: bool = False,
+    ):
+        """Builds and returns a database store using the provided configuration.
+
+        Args:
+            db_config: The database configuration
+            allow_outdated_version: True to suppress errors about the database server
+                version being too old to run a complete synapse
+
+        Returns:
+            The built Store object.
+        """
+        self.progress.set_state("Preparing %s" % db_config.config["name"])
+
+        engine = create_engine(db_config.config)
+
+        hs = MockHomeserver(self.hs_config)
+
+        with make_conn(db_config, engine, "portdb") as db_conn:
+            engine.check_database(
+                db_conn, allow_outdated_version=allow_outdated_version
+            )
+            prepare_database(db_conn, engine, config=self.hs_config)
+            store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
+            db_conn.commit()
+
+        return store
+
+    async def run_background_updates_on_postgres(self):
+        # Manually apply all background updates on the PostgreSQL database.
+        postgres_ready = (
+            await self.postgres_store.db_pool.updates.has_completed_background_updates()
+        )
+
+        if not postgres_ready:
+            # Only say that we're running background updates when there are background
+            # updates to run.
+            self.progress.set_state("Running background updates on PostgreSQL")
+
+        while not postgres_ready:
+            await self.postgres_store.db_pool.updates.do_next_background_update(100)
+            postgres_ready = await (
+                self.postgres_store.db_pool.updates.has_completed_background_updates()
+            )
+
+    async def run(self):
+        """Ports the SQLite database to a PostgreSQL database.
+
+        When a fatal error is met, its message is assigned to the global "end_error"
+        variable. When this error comes with a stacktrace, its exec_info is assigned to
+        the global "end_error_exec_info" variable.
+        """
+        global end_error
+
+        try:
+            # we allow people to port away from outdated versions of sqlite.
+            self.sqlite_store = self.build_db_store(
+                DatabaseConnectionConfig("master-sqlite", self.sqlite_config),
+                allow_outdated_version=True,
+            )
+
+            # Check if all background updates are done, abort if not.
+            updates_complete = (
+                await self.sqlite_store.db_pool.updates.has_completed_background_updates()
+            )
+            if not updates_complete:
+                end_error = (
+                    "Pending background updates exist in the SQLite3 database."
+                    " Please start Synapse again and wait until every update has finished"
+                    " before running this script.\n"
+                )
+                return
+
+            self.postgres_store = self.build_db_store(
+                self.hs_config.database.get_single_database()
+            )
+
+            await self.run_background_updates_on_postgres()
+
+            self.progress.set_state("Creating port tables")
+
+            def create_port_table(txn):
+                txn.execute(
+                    "CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
+                    " table_name varchar(100) NOT NULL UNIQUE,"
+                    " forward_rowid bigint NOT NULL,"
+                    " backward_rowid bigint NOT NULL"
+                    ")"
+                )
+
+            # The old port script created a table with just a "rowid" column.
+            # We want people to be able to rerun this script from an old port
+            # so that they can pick up any missing events that were not
+            # ported across.
+            def alter_table(txn):
+                txn.execute(
+                    "ALTER TABLE IF EXISTS port_from_sqlite3"
+                    " RENAME rowid TO forward_rowid"
+                )
+                txn.execute(
+                    "ALTER TABLE IF EXISTS port_from_sqlite3"
+                    " ADD backward_rowid bigint NOT NULL DEFAULT 0"
+                )
+
+            try:
+                await self.postgres_store.db_pool.runInteraction(
+                    "alter_table", alter_table
+                )
+            except Exception:
+                # On Error Resume Next
+                pass
+
+            await self.postgres_store.db_pool.runInteraction(
+                "create_port_table", create_port_table
+            )
+
+            # Step 2. Set up sequences
+            #
+            # We do this before porting the tables so that even if we fail half
+            # way through, the postgres DB always has sequences that are greater
+            # than their respective tables. If we don't, then creating the
+            # `DataStore` object will fail due to the inconsistency.
+            self.progress.set_state("Setting up sequence generators")
+            await self._setup_state_group_id_seq()
+            await self._setup_user_id_seq()
+            await self._setup_events_stream_seqs()
+            await self._setup_sequence(
+                "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+            )
+            await self._setup_sequence(
+                "account_data_sequence",
+                ("room_account_data", "room_tags_revisions", "account_data"),
+            )
+            await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
+            await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
+            await self._setup_auth_chain_sequence()
+
+            # Step 3. Get tables.
+            self.progress.set_state("Fetching tables")
+            sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
+                table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
+            )
+
+            postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
+                table="information_schema.tables",
+                keyvalues={},
+                retcol="distinct table_name",
+            )
+
+            tables = set(sqlite_tables) & set(postgres_tables)
+            logger.info("Found %d tables", len(tables))
+
+            # Step 4. Figure out what still needs copying
+            self.progress.set_state("Checking on port progress")
+            setup_res = await make_deferred_yieldable(
+                defer.gatherResults(
+                    [
+                        run_in_background(self.setup_table, table)
+                        for table in tables
+                        if table not in ["schema_version", "applied_schema_deltas"]
+                        and not table.startswith("sqlite_")
+                    ],
+                    consumeErrors=True,
+                )
+            )
+            # Map from table name to args passed to `handle_table`, i.e. a tuple
+            # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
+            tables_to_port_info_map = {r[0]: r[1:] for r in setup_res}
+
+            # Step 5. Do the copying.
+            #
+            # This is slightly convoluted as we need to ensure tables are ported
+            # in the correct order due to foreign key constraints.
+            self.progress.set_state("Copying to postgres")
+
+            constraints = await self.get_table_constraints()
+            tables_ported = set()  # type: Set[str]
+
+            while tables_to_port_info_map:
+                # Pulls out all tables that are still to be ported and which
+                # only depend on tables that are already ported (if any).
+                tables_to_port = [
+                    table
+                    for table in tables_to_port_info_map
+                    if not constraints.get(table, set()) - tables_ported
+                ]
+
+                await make_deferred_yieldable(
+                    defer.gatherResults(
+                        [
+                            run_in_background(
+                                self.handle_table,
+                                table,
+                                *tables_to_port_info_map.pop(table),
+                            )
+                            for table in tables_to_port
+                        ],
+                        consumeErrors=True,
+                    )
+                )
+
+                tables_ported.update(tables_to_port)
+
+            self.progress.done()
+        except Exception as e:
+            global end_error_exec_info
+            end_error = str(e)
+            end_error_exec_info = sys.exc_info()
+            logger.exception("")
+        finally:
+            reactor.stop()
+
+    def _convert_rows(self, table, headers, rows):
+        bool_col_names = BOOLEAN_COLUMNS.get(table, [])
+
+        bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names]
+
+        class BadValueException(Exception):
+            pass
+
+        def conv(j, col):
+            if j in bool_cols:
+                return bool(col)
+            if isinstance(col, bytes):
+                return bytearray(col)
+            elif isinstance(col, str) and "\0" in col:
+                logger.warning(
+                    "DROPPING ROW: NUL value in table %s col %s: %r",
+                    table,
+                    headers[j],
+                    col,
+                )
+                raise BadValueException()
+            return col
+
+        outrows = []
+        for row in rows:
+            try:
+                outrows.append(
+                    tuple(conv(j, col) for j, col in enumerate(row) if j > 0)
+                )
+            except BadValueException:
+                pass
+
+        return outrows
+
+    async def _setup_sent_transactions(self):
+        # Only save things from the last day
+        yesterday = int(time.time() * 1000) - 86400000
+
+        # And save the max transaction id from each destination
+        select = (
+            "SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
+            "SELECT max(rowid) FROM sent_transactions"
+            " GROUP BY destination"
+            ")"
+        )
+
+        def r(txn):
+            txn.execute(select)
+            rows = txn.fetchall()
+            headers = [column[0] for column in txn.description]
+
+            ts_ind = headers.index("ts")
+
+            return headers, [r for r in rows if r[ts_ind] < yesterday]
+
+        headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
+
+        rows = self._convert_rows("sent_transactions", headers, rows)
+
+        inserted_rows = len(rows)
+        if inserted_rows:
+            max_inserted_rowid = max(r[0] for r in rows)
+
+            def insert(txn):
+                self.postgres_store.insert_many_txn(
+                    txn, "sent_transactions", headers[1:], rows
+                )
+
+            await self.postgres_store.execute(insert)
+        else:
+            max_inserted_rowid = 0
+
+        def get_start_id(txn):
+            txn.execute(
+                "SELECT rowid FROM sent_transactions WHERE ts >= ?"
+                " ORDER BY rowid ASC LIMIT 1",
+                (yesterday,),
+            )
+
+            rows = txn.fetchall()
+            if rows:
+                return rows[0][0]
+            else:
+                return 1
+
+        next_chunk = await self.sqlite_store.execute(get_start_id)
+        next_chunk = max(max_inserted_rowid + 1, next_chunk)
+
+        await self.postgres_store.db_pool.simple_insert(
+            table="port_from_sqlite3",
+            values={
+                "table_name": "sent_transactions",
+                "forward_rowid": next_chunk,
+                "backward_rowid": 0,
+            },
+        )
+
+        def get_sent_table_size(txn):
+            txn.execute(
+                "SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
+            )
+            (size,) = txn.fetchone()
+            return int(size)
+
+        remaining_count = await self.sqlite_store.execute(get_sent_table_size)
+
+        total_count = remaining_count + inserted_rows
+
+        return next_chunk, inserted_rows, total_count
+
+    async def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk):
+        frows = await self.sqlite_store.execute_sql(
+            "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk
+        )
+
+        brows = await self.sqlite_store.execute_sql(
+            "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk
+        )
+
+        return frows[0][0] + brows[0][0]
+
+    async def _get_already_ported_count(self, table):
+        rows = await self.postgres_store.execute_sql(
+            "SELECT count(*) FROM %s" % (table,)
+        )
+
+        return rows[0][0]
+
+    async def _get_total_count_to_port(self, table, forward_chunk, backward_chunk):
+        remaining, done = await make_deferred_yieldable(
+            defer.gatherResults(
+                [
+                    run_in_background(
+                        self._get_remaining_count_to_port,
+                        table,
+                        forward_chunk,
+                        backward_chunk,
+                    ),
+                    run_in_background(self._get_already_ported_count, table),
+                ],
+            )
+        )
+
+        remaining = int(remaining) if remaining else 0
+        done = int(done) if done else 0
+
+        return done, remaining + done
+
+    async def _setup_state_group_id_seq(self) -> None:
+        curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
+        )
+
+        if not curr_id:
+            return
+
+        def r(txn):
+            next_id = curr_id + 1
+            txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
+
+        await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
+
+    async def _setup_user_id_seq(self) -> None:
+        curr_id = await self.sqlite_store.db_pool.runInteraction(
+            "setup_user_id_seq", find_max_generated_user_id_localpart
+        )
+
+        def r(txn):
+            next_id = curr_id + 1
+            txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
+
+        await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
+
+    async def _setup_events_stream_seqs(self) -> None:
+        """Set the event stream sequences to the correct values."""
+
+        # We get called before we've ported the events table, so we need to
+        # fetch the current positions from the SQLite store.
+        curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True
+        )
+
+        curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="events",
+            keyvalues={},
+            retcol="MAX(-MIN(stream_ordering), 1)",
+            allow_none=True,
+        )
+
+        def _setup_events_stream_seqs_set_pos(txn):
+            if curr_forward_id:
+                txn.execute(
+                    "ALTER SEQUENCE events_stream_seq RESTART WITH %s",
+                    (curr_forward_id + 1,),
+                )
+
+            if curr_backward_id:
+                txn.execute(
+                    "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
+                    (curr_backward_id + 1,),
+                )
+
+        await self.postgres_store.db_pool.runInteraction(
+            "_setup_events_stream_seqs",
+            _setup_events_stream_seqs_set_pos,
+        )
+
+    async def _setup_sequence(
+        self, sequence_name: str, stream_id_tables: Iterable[str]
+    ) -> None:
+        """Set a sequence to the correct value."""
+        current_stream_ids = []
+        for stream_id_table in stream_id_tables:
+            max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+                table=stream_id_table,
+                keyvalues={},
+                retcol="COALESCE(MAX(stream_id), 1)",
+                allow_none=True,
+            )
+            current_stream_ids.append(max_stream_id)
+
+        next_id = max(current_stream_ids) + 1
+
+        def r(txn):
+            sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name,)
+            txn.execute(sql + " %s", (next_id,))
+
+        await self.postgres_store.db_pool.runInteraction(
+            "_setup_%s" % (sequence_name,), r
+        )
+
+    async def _setup_auth_chain_sequence(self) -> None:
+        curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="event_auth_chains",
+            keyvalues={},
+            retcol="MAX(chain_id)",
+            allow_none=True,
+        )
+
+        def r(txn):
+            txn.execute(
+                "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
+                (curr_chain_id + 1,),
+            )
+
+        if curr_chain_id is not None:
+            await self.postgres_store.db_pool.runInteraction(
+                "_setup_event_auth_chain_id",
+                r,
+            )
+
+
+##############################################
+# The following is simply UI stuff
+##############################################
+
+
+class Progress(object):
+    """Used to report progress of the port"""
+
+    def __init__(self):
+        self.tables = {}
+
+        self.start_time = int(time.time())
+
+    def add_table(self, table, cur, size):
+        self.tables[table] = {
+            "start": cur,
+            "num_done": cur,
+            "total": size,
+            "perc": int(cur * 100 / size),
+        }
+
+    def update(self, table, num_done):
+        data = self.tables[table]
+        data["num_done"] = num_done
+        data["perc"] = int(num_done * 100 / data["total"])
+
+    def done(self):
+        pass
+
+
+class CursesProgress(Progress):
+    """Reports progress to a curses window"""
+
+    def __init__(self, stdscr):
+        self.stdscr = stdscr
+
+        curses.use_default_colors()
+        curses.curs_set(0)
+
+        curses.init_pair(1, curses.COLOR_RED, -1)
+        curses.init_pair(2, curses.COLOR_GREEN, -1)
+
+        self.last_update = 0
+
+        self.finished = False
+
+        self.total_processed = 0
+        self.total_remaining = 0
+
+        super(CursesProgress, self).__init__()
+
+    def update(self, table, num_done):
+        super(CursesProgress, self).update(table, num_done)
+
+        self.total_processed = 0
+        self.total_remaining = 0
+        for data in self.tables.values():
+            self.total_processed += data["num_done"] - data["start"]
+            self.total_remaining += data["total"] - data["num_done"]
+
+        self.render()
+
+    def render(self, force=False):
+        now = time.time()
+
+        if not force and now - self.last_update < 0.2:
+            # reactor.callLater(1, self.render)
+            return
+
+        self.stdscr.clear()
+
+        rows, cols = self.stdscr.getmaxyx()
+
+        duration = int(now) - int(self.start_time)
+
+        minutes, seconds = divmod(duration, 60)
+        duration_str = "%02dm %02ds" % (minutes, seconds)
+
+        if self.finished:
+            status = "Time spent: %s (Done!)" % (duration_str,)
+        else:
+
+            if self.total_processed > 0:
+                left = float(self.total_remaining) / self.total_processed
+
+                est_remaining = (int(now) - self.start_time) * left
+                est_remaining_str = "%02dm %02ds remaining" % divmod(est_remaining, 60)
+            else:
+                est_remaining_str = "Unknown"
+            status = "Time spent: %s (est. remaining: %s)" % (
+                duration_str,
+                est_remaining_str,
+            )
+
+        self.stdscr.addstr(0, 0, status, curses.A_BOLD)
+
+        max_len = max(len(t) for t in self.tables.keys())
+
+        left_margin = 5
+        middle_space = 1
+
+        items = self.tables.items()
+        items = sorted(items, key=lambda i: (i[1]["perc"], i[0]))
+
+        for i, (table, data) in enumerate(items):
+            if i + 2 >= rows:
+                break
+
+            perc = data["perc"]
+
+            color = curses.color_pair(2) if perc == 100 else curses.color_pair(1)
+
+            self.stdscr.addstr(
+                i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color
+            )
+
+            size = 20
+
+            progress = "[%s%s]" % (
+                "#" * int(perc * size / 100),
+                " " * (size - int(perc * size / 100)),
+            )
+
+            self.stdscr.addstr(
+                i + 2,
+                left_margin + max_len + middle_space,
+                "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]),
+            )
+
+        if self.finished:
+            self.stdscr.addstr(rows - 1, 0, "Press any key to exit...")
+
+        self.stdscr.refresh()
+        self.last_update = time.time()
+
+    def done(self):
+        self.finished = True
+        self.render(True)
+        self.stdscr.getch()
+
+    def set_state(self, state):
+        self.stdscr.clear()
+        self.stdscr.addstr(0, 0, state + "...", curses.A_BOLD)
+        self.stdscr.refresh()
+
+
+class TerminalProgress(Progress):
+    """Just prints progress to the terminal"""
+
+    def update(self, table, num_done):
+        super(TerminalProgress, self).update(table, num_done)
+
+        data = self.tables[table]
+
+        print(
+            "%s: %d%% (%d/%d)" % (table, data["perc"], data["num_done"], data["total"])
+        )
+
+    def set_state(self, state):
+        print(state + "...")
+
+
+##############################################
+##############################################
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="A script to port an existing synapse SQLite database to"
+        " a new PostgreSQL database."
+    )
+    parser.add_argument("-v", action="store_true")
+    parser.add_argument(
+        "--sqlite-database",
+        required=True,
+        help="The snapshot of the SQLite database file. This must not be"
+        " currently used by a running synapse server",
+    )
+    parser.add_argument(
+        "--postgres-config",
+        type=argparse.FileType("r"),
+        required=True,
+        help="The database config file for the PostgreSQL database",
+    )
+    parser.add_argument(
+        "--curses", action="store_true", help="display a curses based progress UI"
+    )
+
+    parser.add_argument(
+        "--batch-size",
+        type=int,
+        default=1000,
+        help="The number of rows to select from the SQLite table each"
+        " iteration [default=1000]",
+    )
+
+    args = parser.parse_args()
+
+    logging_config = {
+        "level": logging.DEBUG if args.v else logging.INFO,
+        "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
+    }
+
+    if args.curses:
+        logging_config["filename"] = "port-synapse.log"
+
+    logging.basicConfig(**logging_config)
+
+    sqlite_config = {
+        "name": "sqlite3",
+        "args": {
+            "database": args.sqlite_database,
+            "cp_min": 1,
+            "cp_max": 1,
+            "check_same_thread": False,
+        },
+    }
+
+    hs_config = yaml.safe_load(args.postgres_config)
+
+    if "database" not in hs_config:
+        sys.stderr.write("The configuration file must have a 'database' section.\n")
+        sys.exit(4)
+
+    postgres_config = hs_config["database"]
+
+    if "name" not in postgres_config:
+        sys.stderr.write("Malformed database config: no 'name'\n")
+        sys.exit(2)
+    if postgres_config["name"] != "psycopg2":
+        sys.stderr.write("Database must use the 'psycopg2' connector.\n")
+        sys.exit(3)
+
+    config = HomeServerConfig()
+    config.parse_config_dict(hs_config, "", "")
+
+    def start(stdscr=None):
+        if stdscr:
+            progress = CursesProgress(stdscr)
+        else:
+            progress = TerminalProgress()
+
+        porter = Porter(
+            sqlite_config=sqlite_config,
+            progress=progress,
+            batch_size=args.batch_size,
+            hs_config=config,
+        )
+
+        @defer.inlineCallbacks
+        def run():
+            with LoggingContext("synapse_port_db_run"):
+                yield defer.ensureDeferred(porter.run())
+
+        reactor.callWhenRunning(run)
+
+        reactor.run()
+
+    if args.curses:
+        curses.wrapper(start)
+    else:
+        start()
+
+    if end_error:
+        if end_error_exec_info:
+            exc_type, exc_value, exc_traceback = end_error_exec_info
+            traceback.print_exception(exc_type, exc_value, exc_traceback)
+
+        sys.stderr.write(end_error)
+
+        sys.exit(5)
+
+
+if __name__ == "__main__":
+    main()
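For orientation, the progress reporters defined above can be exercised on their own, outside the porter (a minimal sketch, assuming the Progress classes above are in scope; the table name and row counts are invented):

    progress = TerminalProgress()
    progress.set_state("Copying rows")
    progress.add_table("events", cur=0, size=400)
    for num_done in (100, 200, 300, 400):
        progress.update("events", num_done)   # prints e.g. "events: 25% (100/400)"
    progress.done()                           # a no-op for the terminal variant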
diff --git a/synapse/_scripts/synctl.py b/synapse/_scripts/synctl.py
new file mode 100755
index 0000000000..1ab36949c7
--- /dev/null
+++ b/synapse/_scripts/synctl.py
@@ -0,0 +1,360 @@
+#!/usr/bin/env python
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import collections
+import errno
+import glob
+import os
+import os.path
+import signal
+import subprocess
+import sys
+import time
+from typing import Iterable, Optional
+
+import yaml
+
+from synapse.config import find_config_files
+
+MAIN_PROCESS = "synapse.app.homeserver"
+
+GREEN = "\x1b[1;32m"
+YELLOW = "\x1b[1;33m"
+RED = "\x1b[1;31m"
+NORMAL = "\x1b[m"
+
+SYNCTL_CACHE_FACTOR_WARNING = """\
+Setting 'synctl_cache_factor' in the config is deprecated. Instead, please do
+one of the following:
+ - Either set the environment variable 'SYNAPSE_CACHE_FACTOR'
+ - or set 'caches.global_factor' in the homeserver config.
+--------------------------------------------------------------------------------"""
+
+
+def pid_running(pid):
+    try:
+        os.kill(pid, 0)
+    except OSError as err:
+        if err.errno == errno.EPERM:
+            pass  # process exists
+        else:
+            return False
+
+    # When running in a container, orphan processes may not get reaped and their
+    # PIDs may remain valid. Try to work around the issue.
+    try:
+        with open(f"/proc/{pid}/status") as status_file:
+            if "zombie" in status_file.read():
+                return False
+    except Exception:
+        # This isn't Linux or `/proc/` is unavailable.
+        # Assume that the process is still running.
+        pass
+
+    return True
+
+
+def write(message, colour=NORMAL, stream=sys.stdout):
+    # Let's check if we're writing to a TTY before colouring
+    should_colour = False
+    try:
+        should_colour = stream.isatty()
+    except AttributeError:
+        # Just in case `isatty` isn't defined on everything. The python
+        # docs are incredibly vague.
+        pass
+
+    if not should_colour:
+        stream.write(message + "\n")
+    else:
+        stream.write(colour + message + NORMAL + "\n")
+
+
+def abort(message, colour=RED, stream=sys.stderr):
+    write(message, colour, stream)
+    sys.exit(1)
+
+
+def start(pidfile: str, app: str, config_files: Iterable[str], daemonize: bool) -> bool:
+    """Attempts to start a synapse main or worker process.
+    Args:
+        pidfile: the pidfile we expect the process to create
+        app: the python module to run
+        config_files: config files to pass to synapse
+        daemonize: if True, will include a --daemonize argument to synapse
+
+    Returns:
+        True if the process started successfully or was already running
+        False if there was an error starting the process
+    """
+
+    if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
+        print(app + " already running")
+        return True
+
+    args = [sys.executable, "-m", app]
+    for c in config_files:
+        args += ["-c", c]
+    if daemonize:
+        args.append("--daemonize")
+
+    try:
+        subprocess.check_call(args)
+        write("started %s(%s)" % (app, ",".join(config_files)), colour=GREEN)
+        return True
+    except subprocess.CalledProcessError as e:
+        err = "%s(%s) failed to start (exit code: %d). Check the Synapse logfile" % (
+            app,
+            ",".join(config_files),
+            e.returncode,
+        )
+        if daemonize:
+            err += ", or run synctl with --no-daemonize"
+        err += "."
+        write(err, colour=RED, stream=sys.stderr)
+        return False
+
+
+def stop(pidfile: str, app: str) -> Optional[int]:
+    """Attempts to kill a synapse worker from the pidfile.
+    Args:
+        pidfile: path to file containing worker's pid
+        app: name of the worker's app
+
+    Returns:
+        process id, or None if the process was not running
+    """
+
+    if os.path.exists(pidfile):
+        pid = int(open(pidfile).read())
+        try:
+            os.kill(pid, signal.SIGTERM)
+            write("stopped %s" % (app,), colour=GREEN)
+            return pid
+        except OSError as err:
+            if err.errno == errno.ESRCH:
+                write("%s not running" % (app,), colour=YELLOW)
+            elif err.errno == errno.EPERM:
+                abort("Cannot stop %s: Operation not permitted" % (app,))
+            else:
+                abort("Cannot stop %s: Unknown error" % (app,))
+    else:
+        write(
+            "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
+            % (app, pidfile),
+            colour=YELLOW,
+        )
+    return None
+
+
+Worker = collections.namedtuple(
+    "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"]
+)
+
+
+def main():
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "action",
+        choices=["start", "stop", "restart"],
+        help="whether to start, stop or restart the synapse",
+    )
+    parser.add_argument(
+        "configfile",
+        nargs="?",
+        default="homeserver.yaml",
+        help="the homeserver config file. Defaults to homeserver.yaml. May also be"
+        " a directory with *.yaml files",
+    )
+    parser.add_argument(
+        "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
+    )
+    parser.add_argument(
+        "-a",
+        "--all-processes",
+        metavar="WORKERCONFIGDIR",
+        help="start or stop all the workers in the given directory"
+        " and the main synapse process",
+    )
+    parser.add_argument(
+        "--no-daemonize",
+        action="store_false",
+        dest="daemonize",
+        help="Run synapse in the foreground for debugging. "
+        "Will work only if the daemonize option is not set in the config.",
+    )
+
+    options = parser.parse_args()
+
+    if options.worker and options.all_processes:
+        write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
+        sys.exit(1)
+    if not options.daemonize and options.all_processes:
+        write('Cannot use "--no-daemonize" with "--all-processes"', stream=sys.stderr)
+        sys.exit(1)
+
+    configfile = options.configfile
+
+    if not os.path.exists(configfile):
+        write(
+            f"Config file {configfile} does not exist.\n"
+            f"To generate a config file, run:\n"
+            f"    {sys.executable} -m {MAIN_PROCESS}"
+            f" -c {configfile} --generate-config"
+            f" --server-name=<server name> --report-stats=<yes/no>\n",
+            stream=sys.stderr,
+        )
+        sys.exit(1)
+
+    config_files = find_config_files([configfile])
+    config = {}
+    for config_file in config_files:
+        with open(config_file) as file_stream:
+            yaml_config = yaml.safe_load(file_stream)
+        if yaml_config is not None:
+            config.update(yaml_config)
+
+    pidfile = config["pid_file"]
+    cache_factor = config.get("synctl_cache_factor")
+    start_stop_synapse = True
+
+    if cache_factor:
+        write(SYNCTL_CACHE_FACTOR_WARNING)
+        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+
+    cache_factors = config.get("synctl_cache_factors", {})
+    for cache_name, factor in cache_factors.items():
+        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
+    worker_configfiles = []
+    if options.worker:
+        start_stop_synapse = False
+        worker_configfile = options.worker
+        if not os.path.exists(worker_configfile):
+            write(
+                "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
+            )
+            sys.exit(1)
+        worker_configfiles.append(worker_configfile)
+
+    if options.all_processes:
+        # To start the main synapse with -a you need to add a worker file
+        # with worker_app == "synapse.app.homeserver"
+        start_stop_synapse = False
+        worker_configdir = options.all_processes
+        if not os.path.isdir(worker_configdir):
+            write(
+                "No worker config directory found at %r" % (worker_configdir,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.extend(
+            sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
+        )
+
+    workers = []
+    for worker_configfile in worker_configfiles:
+        with open(worker_configfile) as stream:
+            worker_config = yaml.safe_load(stream)
+        worker_app = worker_config["worker_app"]
+        if worker_app == "synapse.app.homeserver":
+            # We need to special case all of this to pick up options that may
+            # be set in the main config file or in this worker config file.
+            worker_pidfile = worker_config.get("pid_file") or pidfile
+            worker_cache_factor = (
+                worker_config.get("synctl_cache_factor") or cache_factor
+            )
+            worker_cache_factors = (
+                worker_config.get("synctl_cache_factors") or cache_factors
+            )
+            # The master process doesn't support using worker_* config.
+            for key in worker_config:
+                if key == "worker_app":  # But we allow worker_app
+                    continue
+                assert not key.startswith(
+                    "worker_"
+                ), "Main process cannot use worker_* config"
+        else:
+            worker_pidfile = worker_config["worker_pid_file"]
+            worker_cache_factor = worker_config.get("synctl_cache_factor")
+            worker_cache_factors = worker_config.get("synctl_cache_factors", {})
+        workers.append(
+            Worker(
+                worker_app,
+                worker_configfile,
+                worker_pidfile,
+                worker_cache_factor,
+                worker_cache_factors,
+            )
+        )
+
+    action = options.action
+
+    if action == "stop" or action == "restart":
+        running_pids = []
+        for worker in workers:
+            pid = stop(worker.pidfile, worker.app)
+            if pid is not None:
+                running_pids.append(pid)
+
+        if start_stop_synapse:
+            pid = stop(pidfile, MAIN_PROCESS)
+            if pid is not None:
+                running_pids.append(pid)
+
+        if len(running_pids) > 0:
+            write("Waiting for processes to exit...")
+            for running_pid in running_pids:
+                while pid_running(running_pid):
+                    time.sleep(0.2)
+            write("All processes exited")
+
+    if action == "start" or action == "restart":
+        error = False
+        if start_stop_synapse:
+            if not start(pidfile, MAIN_PROCESS, (configfile,), options.daemonize):
+                error = True
+
+        for worker in workers:
+            env = os.environ.copy()
+
+            if worker.cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+            for cache_name, factor in worker.cache_factors.items():
+                os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
+            if not start(
+                worker.pidfile,
+                worker.app,
+                (configfile, worker.configfile),
+                options.daemonize,
+            ):
+                error = True
+
+            # Reset env back to the original
+            os.environ.clear()
+            os.environ.update(env)
+
+        if error:
+            exit(1)
+
+
+if __name__ == "__main__":
+    main()
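The start/stop logic above hinges on the pidfile convention: a process is treated as running if the pid recorded in its pidfile still answers to signal 0 and is not a zombie. Condensed into a standalone sketch (the function name is illustrative and the /proc zombie check is omitted):

    import os

    def is_alive(pidfile_path: str) -> bool:
        # Read the pid written by the daemonized process.
        try:
            with open(pidfile_path) as f:
                pid = int(f.read().strip())
        except (OSError, ValueError):
            return False
        # Signal 0 probes for existence without delivering a real signal.
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            return False  # no such process
        except PermissionError:
            pass          # process exists but belongs to another user
        return True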
diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py
new file mode 100755
index 0000000000..f43676afaa
--- /dev/null
+++ b/synapse/_scripts/update_synapse_database.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import logging
+import sys
+
+import yaml
+from matrix_common.versionstring import get_distribution_version_string
+
+from twisted.internet import defer, reactor
+
+from synapse.config.homeserver import HomeServerConfig
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.server import HomeServer
+from synapse.storage import DataStore
+
+logger = logging.getLogger("update_database")
+
+
+class MockHomeserver(HomeServer):
+    DATASTORE_CLASS = DataStore
+
+    def __init__(self, config, **kwargs):
+        super(MockHomeserver, self).__init__(
+            config.server.server_name, reactor=reactor, config=config, **kwargs
+        )
+
+        self.version_string = "Synapse/" + get_distribution_version_string(
+            "matrix-synapse"
+        )
+
+
+def run_background_updates(hs):
+    store = hs.get_datastores().main
+
+    async def run_background_updates():
+        await store.db_pool.updates.run_background_updates(sleep=False)
+        # Stop the reactor to exit the script once every background update is run.
+        reactor.stop()
+
+    def run():
+        # Apply all background updates on the database.
+        defer.ensureDeferred(
+            run_as_background_process("background_updates", run_background_updates)
+        )
+
+    reactor.callWhenRunning(run)
+
+    reactor.run()
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description=(
+            "Updates a synapse database to the latest schema and optionally runs background updates"
+            " on it."
+        )
+    )
+    parser.add_argument("-v", action="store_true")
+    parser.add_argument(
+        "--database-config",
+        type=argparse.FileType("r"),
+        required=True,
+        help="Synapse configuration file, giving the details of the database to be updated",
+    )
+    parser.add_argument(
+        "--run-background-updates",
+        action="store_true",
+        required=False,
+        help="run background updates after upgrading the database schema",
+    )
+
+    args = parser.parse_args()
+
+    logging_config = {
+        "level": logging.DEBUG if args.v else logging.INFO,
+        "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s",
+    }
+
+    logging.basicConfig(**logging_config)
+
+    # Load, process and sanity-check the config.
+    hs_config = yaml.safe_load(args.database_config)
+
+    if "database" not in hs_config:
+        sys.stderr.write("The configuration file must have a 'database' section.\n")
+        sys.exit(4)
+
+    config = HomeServerConfig()
+    config.parse_config_dict(hs_config, "", "")
+
+    # Instantiate and initialise the homeserver object.
+    hs = MockHomeserver(config)
+
+    # Setup instantiates the store within the homeserver object and updates the
+    # DB.
+    hs.setup()
+
+    if args.run_background_updates:
+        run_background_updates(hs)
+
+
+if __name__ == "__main__":
+    main()
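The reactor handling in run_background_updates() follows a common Twisted pattern: schedule a coroutine once the reactor starts, then stop the reactor from inside it when the work is finished. A simplified, self-contained sketch (the real script additionally wraps the coroutine in run_as_background_process for metrics):

    from twisted.internet import defer, reactor

    async def do_work() -> None:
        # ... long-running database work goes here ...
        reactor.stop()  # let reactor.run() return once we are done

    def schedule() -> None:
        # ensureDeferred turns the coroutine into a Deferred the reactor can drive.
        defer.ensureDeferred(do_work())

    reactor.callWhenRunning(schedule)
    reactor.run()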
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 4d3f8e4923..07ec95f1d6 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -175,27 +175,14 @@ class ApplicationService:
             return namespace.exclusive
         return False
 
-    async def _matches_user(self, event: EventBase, store: "DataStore") -> bool:
-        if self.is_interested_in_user(event.sender):
-            return True
-
-        # also check m.room.member state key
-        if event.type == EventTypes.Member and self.is_interested_in_user(
-            event.state_key
-        ):
-            return True
-
-        does_match = await self.matches_user_in_member_list(event.room_id, store)
-        return does_match
-
     @cached(num_args=1, cache_context=True)
-    async def matches_user_in_member_list(
+    async def _matches_user_in_member_list(
         self,
         room_id: str,
         store: "DataStore",
         cache_context: _CacheContext,
     ) -> bool:
-        """Check if this service is interested a room based upon it's membership
+        """Check if this service is interested a room based upon its membership
 
         Args:
             room_id: The room to check.
@@ -214,47 +201,110 @@ class ApplicationService:
                 return True
         return False
 
-    def _matches_room_id(self, event: EventBase) -> bool:
-        if hasattr(event, "room_id"):
-            return self.is_interested_in_room(event.room_id)
-        return False
+    def is_interested_in_user(
+        self,
+        user_id: str,
+    ) -> bool:
+        """
+        Returns whether the application service is interested in a given user ID.
+
+        The appservice is considered to be interested in a user if either: the
+        user ID is in the appservice's user namespace, or if the user is the
+        appservice's configured sender_localpart.
+
+        Args:
+            user_id: The ID of the user to check.
+
+        Returns:
+            True if the application service is interested in the user, False if not.
+        """
+        return (
+            # User is the appservice's sender_localpart user
+            user_id == self.sender
+            # User is in the appservice's user namespace
+            or self.is_user_in_namespace(user_id)
+        )
+
+    @cached(num_args=1, cache_context=True)
+    async def is_interested_in_room(
+        self,
+        room_id: str,
+        store: "DataStore",
+        cache_context: _CacheContext,
+    ) -> bool:
+        """
+        Returns whether the application service is interested in a given room ID.
+
+        The appservice is considered to be interested in the room if either: the ID or one
+        of the aliases of the room is in the appservice's room ID or alias namespace
+        respectively, or if one of the members of the room falls into the appservice's user
+        namespace.
 
-    async def _matches_aliases(self, event: EventBase, store: "DataStore") -> bool:
-        alias_list = await store.get_aliases_for_room(event.room_id)
+        Args:
+            room_id: The ID of the room to check.
+            store: The homeserver's datastore class.
+
+        Returns:
+            True if the application service is interested in the room, False if not.
+        """
+        # Check if we have interest in this room ID
+        if self.is_room_id_in_namespace(room_id):
+            return True
+
+        # likewise with the room's aliases (if it has any)
+        alias_list = await store.get_aliases_for_room(room_id)
         for alias in alias_list:
-            if self.is_interested_in_alias(alias):
+            if self.is_room_alias_in_namespace(alias):
                 return True
 
-        return False
+        # And finally, perform an expensive check on whether any of the
+        # users in the room match the appservice's user namespace
+        return await self._matches_user_in_member_list(
+            room_id, store, on_invalidate=cache_context.invalidate
+        )
 
-    async def is_interested(self, event: EventBase, store: "DataStore") -> bool:
+    @cached(num_args=1, cache_context=True)
+    async def is_interested_in_event(
+        self,
+        event_id: str,
+        event: EventBase,
+        store: "DataStore",
+        cache_context: _CacheContext,
+    ) -> bool:
         """Check if this service is interested in this event.
 
         Args:
+            event_id: The ID of the event to check. This is purely used for simplifying the
+                caching of calls to this method.
             event: The event to check.
             store: The datastore to query.
 
         Returns:
-            True if this service would like to know about this event.
+            True if this service would like to know about this event, otherwise False.
         """
-        # Do cheap checks first
-        if self._matches_room_id(event):
+        # Check if we're interested in this event's sender by namespace (or if they're the
+        # sender_localpart user)
+        if self.is_interested_in_user(event.sender):
             return True
 
-        # This will check the namespaces first before
-        # checking the store, so should be run before _matches_aliases
-        if await self._matches_user(event, store):
+        # additionally, if this is a membership event, perform the same checks on
+        # the user it references
+        if event.type == EventTypes.Member and self.is_interested_in_user(
+            event.state_key
+        ):
             return True
 
-        # This will check the store, so should be run last
-        if await self._matches_aliases(event, store):
+        # This will check the datastore, so should be run last
+        if await self.is_interested_in_room(
+            event.room_id, store, on_invalidate=cache_context.invalidate
+        ):
             return True
 
         return False
 
-    @cached(num_args=1)
+    @cached(num_args=1, cache_context=True)
     async def is_interested_in_presence(
-        self, user_id: UserID, store: "DataStore"
+        self, user_id: UserID, store: "DataStore", cache_context: _CacheContext
     ) -> bool:
         """Check if this service is interested a user's presence
 
@@ -272,20 +322,19 @@ class ApplicationService:
 
         # Then find out if the appservice is interested in any of those rooms
         for room_id in room_ids:
-            if await self.matches_user_in_member_list(room_id, store):
+            if await self.is_interested_in_room(
+                room_id, store, on_invalidate=cache_context.invalidate
+            ):
                 return True
         return False
 
-    def is_interested_in_user(self, user_id: str) -> bool:
-        return (
-            bool(self._matches_regex(ApplicationService.NS_USERS, user_id))
-            or user_id == self.sender
-        )
+    def is_user_in_namespace(self, user_id: str) -> bool:
+        return bool(self._matches_regex(ApplicationService.NS_USERS, user_id))
 
-    def is_interested_in_alias(self, alias: str) -> bool:
+    def is_room_alias_in_namespace(self, alias: str) -> bool:
         return bool(self._matches_regex(ApplicationService.NS_ALIASES, alias))
 
-    def is_interested_in_room(self, room_id: str) -> bool:
+    def is_room_id_in_namespace(self, room_id: str) -> bool:
         return bool(self._matches_regex(ApplicationService.NS_ROOMS, room_id))
 
     def is_exclusive_user(self, user_id: str) -> bool:
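The net effect of this refactor is a clean split between cheap, regex-only namespace checks and the potentially expensive, cached interest checks. A rough sketch of how callers use the renamed methods (the helper names here are illustrative, not from the patch):

    # Synchronous, regex-only: no datastore access.
    def sender_in_namespace(service, event) -> bool:
        return service.is_user_in_namespace(event.sender)

    # Asynchronous, cached, and may query the datastore.
    async def wants_event(service, event, store) -> bool:
        return await service.is_interested_in_event(event.event_id, event, store)

    async def wants_room(service, room_id: str, store) -> bool:
        return await service.is_interested_in_room(room_id, store)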
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index a0ea958af6..98fe354014 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -25,7 +25,7 @@ from synapse.appservice import (
     TransactionUnusedFallbackKeys,
 )
 from synapse.events import EventBase
-from synapse.events.utils import serialize_event
+from synapse.events.utils import SerializeEventConfig, serialize_event
 from synapse.http.client import SimpleHttpClient
 from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util.caches.response_cache import ResponseCache
@@ -321,16 +321,18 @@ class ApplicationServiceApi(SimpleHttpClient):
             serialize_event(
                 e,
                 time_now,
-                as_client_event=True,
-                # If this is an invite or a knock membership event, and we're interested
-                # in this user, then include any stripped state alongside the event.
-                include_stripped_room_state=(
-                    e.type == EventTypes.Member
-                    and (
-                        e.membership == Membership.INVITE
-                        or e.membership == Membership.KNOCK
-                    )
-                    and service.is_interested_in_user(e.state_key)
+                config=SerializeEventConfig(
+                    as_client_event=True,
+                    # If this is an invite or a knock membership event, and we're interested
+                    # in this user, then include any stripped state alongside the event.
+                    include_stripped_room_state=(
+                        e.type == EventTypes.Member
+                        and (
+                            e.membership == Membership.INVITE
+                            or e.membership == Membership.KNOCK
+                        )
+                        and service.is_interested_in_user(e.state_key)
+                    ),
                 ),
             )
             for e in events
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 1265738dc1..8e19e2fc26 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -383,7 +383,7 @@ class RootConfig:
         Build a default configuration file
 
         This is used when the user explicitly asks us to generate a config file
-        (eg with --generate_config).
+        (eg with --generate-config).
 
         Args:
             config_dir_path: The path where the config files are kept. Used to
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 9386fa29dd..ee34cb46e4 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -26,6 +26,7 @@ from typing import (
     Union,
 )
 
+import attr
 from frozendict import frozendict
 
 from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
@@ -303,29 +304,37 @@ def format_event_for_client_v2_without_room_id(d: JsonDict) -> JsonDict:
     return d
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class SerializeEventConfig:
+    as_client_event: bool = True
+    # Function to convert from federation format to client format
+    event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1
+    # ID of the user's auth token - used for namespacing of transaction IDs
+    token_id: Optional[int] = None
+    # List of event fields to include. If empty, all fields will be returned.
+    only_event_fields: Optional[List[str]] = None
+    # Some events can have stripped room state stored in the `unsigned` field.
+    # This is required for invite and knock functionality. If this option is
+    # False, that state will be removed from the event before it is returned.
+    # Otherwise, it will be kept.
+    include_stripped_room_state: bool = False
+
+
+_DEFAULT_SERIALIZE_EVENT_CONFIG = SerializeEventConfig()
+
+
 def serialize_event(
     e: Union[JsonDict, EventBase],
     time_now_ms: int,
     *,
-    as_client_event: bool = True,
-    event_format: Callable[[JsonDict], JsonDict] = format_event_for_client_v1,
-    token_id: Optional[str] = None,
-    only_event_fields: Optional[List[str]] = None,
-    include_stripped_room_state: bool = False,
+    config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
 ) -> JsonDict:
     """Serialize event for clients
 
     Args:
         e
         time_now_ms
-        as_client_event
-        event_format
-        token_id
-        only_event_fields
-        include_stripped_room_state: Some events can have stripped room state
-            stored in the `unsigned` field. This is required for invite and knock
-            functionality. If this option is False, that state will be removed from the
-            event before it is returned. Otherwise, it will be kept.
+        config: Event serialization config
 
     Returns:
         The serialized event dictionary.
@@ -348,11 +357,11 @@ def serialize_event(
 
     if "redacted_because" in e.unsigned:
         d["unsigned"]["redacted_because"] = serialize_event(
-            e.unsigned["redacted_because"], time_now_ms, event_format=event_format
+            e.unsigned["redacted_because"], time_now_ms, config=config
         )
 
-    if token_id is not None:
-        if token_id == getattr(e.internal_metadata, "token_id", None):
+    if config.token_id is not None:
+        if config.token_id == getattr(e.internal_metadata, "token_id", None):
             txn_id = getattr(e.internal_metadata, "txn_id", None)
             if txn_id is not None:
                 d["unsigned"]["transaction_id"] = txn_id
@@ -361,13 +370,14 @@ def serialize_event(
     # that are meant to provide metadata about a room to an invitee/knocker. They are
     # intended to only be included in specific circumstances, such as down sync, and
     # should not be included in any other case.
-    if not include_stripped_room_state:
+    if not config.include_stripped_room_state:
         d["unsigned"].pop("invite_room_state", None)
         d["unsigned"].pop("knock_room_state", None)
 
-    if as_client_event:
-        d = event_format(d)
+    if config.as_client_event:
+        d = config.event_format(d)
 
+    only_event_fields = config.only_event_fields
     if only_event_fields:
         if not isinstance(only_event_fields, list) or not all(
             isinstance(f, str) for f in only_event_fields
@@ -390,18 +400,18 @@ class EventClientSerializer:
         event: Union[JsonDict, EventBase],
         time_now: int,
         *,
+        config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
         bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None,
-        **kwargs: Any,
     ) -> JsonDict:
         """Serializes a single event.
 
         Args:
             event: The event being serialized.
             time_now: The current time in milliseconds
+            config: Event serialization config
             bundle_aggregations: Whether to include the bundled aggregations for this
                 event. Only applies to non-state events. (State events never include
                 bundled aggregations.)
-            **kwargs: Arguments to pass to `serialize_event`
 
         Returns:
             The serialized event
@@ -410,7 +420,7 @@ class EventClientSerializer:
         if not isinstance(event, EventBase):
             return event
 
-        serialized_event = serialize_event(event, time_now, **kwargs)
+        serialized_event = serialize_event(event, time_now, config=config)
 
         # Check if there are any bundled aggregations to include with the event.
         if bundle_aggregations:
@@ -419,6 +429,7 @@ class EventClientSerializer:
                 self._inject_bundled_aggregations(
                     event,
                     time_now,
+                    config,
                     bundle_aggregations[event.event_id],
                     serialized_event,
                 )
@@ -456,6 +467,7 @@ class EventClientSerializer:
         self,
         event: EventBase,
         time_now: int,
+        config: SerializeEventConfig,
         aggregations: "BundledAggregations",
         serialized_event: JsonDict,
     ) -> None:
@@ -466,6 +478,7 @@ class EventClientSerializer:
             time_now: The current time in milliseconds
             aggregations: The bundled aggregation to serialize.
             serialized_event: The serialized event which may be modified.
+            config: Event serialization config
 
         """
         serialized_aggregations = {}
@@ -493,8 +506,8 @@ class EventClientSerializer:
             thread = aggregations.thread
 
             # Don't bundle aggregations as this could recurse forever.
-            serialized_latest_event = self.serialize_event(
-                thread.latest_event, time_now, bundle_aggregations=None
+            serialized_latest_event = serialize_event(
+                thread.latest_event, time_now, config=config
             )
             # Manually apply an edit, if one exists.
             if thread.latest_edit:
@@ -515,20 +528,34 @@ class EventClientSerializer:
             )
 
     def serialize_events(
-        self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
+        self,
+        events: Iterable[Union[JsonDict, EventBase]],
+        time_now: int,
+        *,
+        config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG,
+        bundle_aggregations: Optional[Dict[str, "BundledAggregations"]] = None,
     ) -> List[JsonDict]:
         """Serializes multiple events.
 
         Args:
             event
             time_now: The current time in milliseconds
-            **kwargs: Arguments to pass to `serialize_event`
+            config: Event serialization config
+            bundle_aggregations: A map from event ID to the bundled aggregations
+                to include with that event. Only applies to non-state events.
+                (State events never include bundled aggregations.)
 
         Returns:
             The list of serialized events
         """
         return [
-            self.serialize_event(event, time_now=time_now, **kwargs) for event in events
+            self.serialize_event(
+                event,
+                time_now,
+                config=config,
+                bundle_aggregations=bundle_aggregations,
+            )
+            for event in events
         ]
 
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 64e595e748..467275b98c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1428,7 +1428,7 @@ class FederationClient(FederationBase):
 
             # Validate children_state of the room.
             children_state = room.pop("children_state", [])
-            if not isinstance(children_state, Sequence):
+            if not isinstance(children_state, list):
                 raise InvalidResponseError("'room.children_state' must be a list")
             if any(not isinstance(e, dict) for e in children_state):
                 raise InvalidResponseError("Invalid event in 'children_state' list")
@@ -1440,14 +1440,14 @@ class FederationClient(FederationBase):
 
             # Validate the children rooms.
             children = res.get("children", [])
-            if not isinstance(children, Sequence):
+            if not isinstance(children, list):
                 raise InvalidResponseError("'children' must be a list")
             if any(not isinstance(r, dict) for r in children):
                 raise InvalidResponseError("Invalid room in 'children' list")
 
             # Validate the inaccessible children.
             inaccessible_children = res.get("inaccessible_children", [])
-            if not isinstance(inaccessible_children, Sequence):
+            if not isinstance(inaccessible_children, list):
                 raise InvalidResponseError("'inaccessible_children' must be a list")
             if any(not isinstance(r, str) for r in inaccessible_children):
                 raise InvalidResponseError(
@@ -1630,7 +1630,7 @@ def _validate_hierarchy_event(d: JsonDict) -> None:
         raise ValueError("Invalid event: 'content' must be a dict")
 
     via = content.get("via")
-    if not isinstance(via, Sequence):
+    if not isinstance(via, list):
         raise ValueError("Invalid event: 'via' must be a list")
     if any(not isinstance(v, str) for v in via):
         raise ValueError("Invalid event: 'via' must be a list of strings")
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 0d7c4f5067..d720b5fd3f 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -244,7 +244,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         self.notifier.on_new_replication_data()
 
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = False) -> None:
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
         # stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6106a486d1..30e2421efc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -118,7 +118,12 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = True) -> None:
+        """Tells the sender that a new device message is ready to be sent to the
+        destination. The `immediate` flag specifies whether we should try to send
+        the messages immediately, or whether sending can be delayed for a short
+        while (to aid performance).
+        """
         raise NotImplementedError()
 
     @abc.abstractmethod
@@ -146,9 +151,8 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
 
 
 @attr.s
-class _PresenceQueue:
-    """A queue of destinations that need to be woken up due to new presence
-    updates.
+class _DestinationWakeupQueue:
+    """A queue of destinations that need to be woken up due to new updates.
 
     Staggers waking up of per destination queues to ensure that we don't attempt
     to start TLS connections with many hosts all at once, leading to pinned CPU.
@@ -175,7 +179,7 @@ class _PresenceQueue:
         if not self.processing:
             self._handle()
 
-    @wrap_as_background_process("_PresenceQueue.handle")
+    @wrap_as_background_process("_DestinationWakeupQueue.handle")
     async def _handle(self) -> None:
         """Background process to drain the queue."""
 
@@ -297,7 +301,7 @@ class FederationSender(AbstractFederationSender):
 
         self._external_cache = hs.get_external_cache()
 
-        self._presence_queue = _PresenceQueue(self, self.clock)
+        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
 
     def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
         """Get or create a PerDestinationQueue for the given destination
@@ -614,7 +618,7 @@ class FederationSender(AbstractFederationSender):
                 states, start_loop=False
             )
 
-            self._presence_queue.add_to_queue(destination)
+            self._destination_wakeup_queue.add_to_queue(destination)
 
     def build_and_send_edu(
         self,
@@ -667,7 +671,7 @@ class FederationSender(AbstractFederationSender):
         else:
             queue.send_edu(edu)
 
-    def send_device_messages(self, destination: str) -> None:
+    def send_device_messages(self, destination: str, immediate: bool = False) -> None:
         if destination == self.server_name:
             logger.warning("Not sending device update to ourselves")
             return
@@ -677,7 +681,11 @@ class FederationSender(AbstractFederationSender):
         ):
             return
 
-        self._get_per_destination_queue(destination).attempt_new_transaction()
+        if immediate:
+            self._get_per_destination_queue(destination).attempt_new_transaction()
+        else:
+            self._get_per_destination_queue(destination).mark_new_data()
+            self._destination_wakeup_queue.add_to_queue(destination)
 
     def wake_destination(self, destination: str) -> None:
         """Called when we want to retry sending transactions to a remote.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index c8768f22bc..d80f0ac5e8 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -219,6 +219,16 @@ class PerDestinationQueue:
         self._pending_edus.append(edu)
         self.attempt_new_transaction()
 
+    def mark_new_data(self) -> None:
+        """Marks that the destination has new data to send, without starting a
+        new transaction.
+
+        If a transaction loop is already in progress then a new transaction will
+        be attempted when the current one finishes.
+        """
+
+        self._new_data_to_send = True
+
     def attempt_new_transaction(self) -> None:
         """Try to start a new transaction to this destination
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index e6461cc3c9..bd913e524e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -571,7 +571,7 @@ class ApplicationServicesHandler:
         room_alias_str = room_alias.to_string()
         services = self.store.get_app_services()
         alias_query_services = [
-            s for s in services if (s.is_interested_in_alias(room_alias_str))
+            s for s in services if (s.is_room_alias_in_namespace(room_alias_str))
         ]
         for alias_service in alias_query_services:
             is_known_alias = await self.appservice_api.query_alias(
@@ -660,7 +660,7 @@ class ApplicationServicesHandler:
         # inside of a list comprehension anymore.
         interested_list = []
         for s in services:
-            if await s.is_interested(event, self.store):
+            if await s.is_interested_in_event(event.event_id, event, self.store):
                 interested_list.append(s)
 
         return interested_list
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 934b5bd734..d90cb259a6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -506,7 +506,7 @@ class DeviceHandler(DeviceWorkerHandler):
                 "Sending device list update notif for %r to: %r", user_id, hosts
             )
             for host in hosts:
-                self.federation_sender.send_device_messages(host)
+                self.federation_sender.send_device_messages(host, immediate=False)
                 log_kv({"message": "sent device update to host", "host": host})
 
     async def notify_user_signature_update(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index b7064c6624..33d827a45b 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler:
 
         service = requester.app_service
         if service:
-            if not service.is_interested_in_alias(room_alias_str):
+            if not service.is_room_alias_in_namespace(room_alias_str):
                 raise SynapseError(
                     400,
                     "This application service has not reserved this kind of alias.",
@@ -221,7 +221,7 @@ class DirectoryHandler:
     async def delete_appservice_association(
         self, service: ApplicationService, room_alias: RoomAlias
     ) -> None:
-        if not service.is_interested_in_alias(room_alias.to_string()):
+        if not service.is_room_alias_in_namespace(room_alias.to_string()):
             raise SynapseError(
                 400,
                 "This application service has not reserved this kind of alias",
@@ -376,7 +376,7 @@ class DirectoryHandler:
         # non-exclusive locks on the alias (or there are no interested services)
         services = self.store.get_app_services()
         interested_services = [
-            s for s in services if s.is_interested_in_alias(alias.to_string())
+            s for s in services if s.is_room_alias_in_namespace(alias.to_string())
         ]
 
         for service in interested_services:
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 97e75e60c3..d2ccb5c5d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Iterable, List, Optional
 from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
+from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, UserID
@@ -120,7 +121,7 @@ class EventStreamHandler:
             chunks = self._event_serializer.serialize_events(
                 events,
                 time_now,
-                as_client_event=as_client_event,
+                config=SerializeEventConfig(as_client_event=as_client_event),
             )
 
             chunk = {
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 344f20f37c..316cfae24f 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase
+from synapse.events.utils import SerializeEventConfig
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.receipts import ReceiptEventSource
@@ -156,6 +157,8 @@ class InitialSyncHandler:
         if limit is None:
             limit = 10
 
+        serializer_options = SerializeEventConfig(as_client_event=as_client_event)
+
         async def handle_room(event: RoomsForUser) -> None:
             d: JsonDict = {
                 "room_id": event.room_id,
@@ -173,7 +176,7 @@ class InitialSyncHandler:
                 d["invite"] = self._event_serializer.serialize_event(
                     invite_event,
                     time_now,
-                    as_client_event=as_client_event,
+                    config=serializer_options,
                 )
 
             rooms_ret.append(d)
@@ -225,7 +228,7 @@ class InitialSyncHandler:
                         self._event_serializer.serialize_events(
                             messages,
                             time_now=time_now,
-                            as_client_event=as_client_event,
+                            config=serializer_options,
                         )
                     ),
                     "start": await start_token.to_string(self.store),
@@ -235,7 +238,7 @@ class InitialSyncHandler:
                 d["state"] = self._event_serializer.serialize_events(
                     current_state.values(),
                     time_now=time_now,
-                    as_client_event=as_client_event,
+                    config=serializer_options,
                 )
 
                 account_data_events = []
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 61cb133ef2..0799ec9a84 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1069,6 +1069,9 @@ class EventCreationHandler:
         if relation_type == RelationTypes.ANNOTATION:
             aggregation_key = relation["key"]
 
+            if len(aggregation_key) > 500:
+                raise SynapseError(400, "Aggregation key is too long")
+
             already_exists = await self.store.has_user_annotated_event(
                 relates_to, event.type, aggregation_key, event.sender
             )
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5c01a426ff..183fabcfc0 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -22,6 +22,7 @@ from twisted.python.failure import Failure
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
+from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.room import ShutdownRoomResponse
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
@@ -541,13 +542,15 @@ class PaginationHandler:
 
         time_now = self.clock.time_msec()
 
+        serialize_options = SerializeEventConfig(as_client_event=as_client_event)
+
         chunk = {
             "chunk": (
                 self._event_serializer.serialize_events(
                     events,
                     time_now,
+                    config=serialize_options,
                     bundle_aggregations=aggregations,
-                    as_client_event=as_client_event,
                 )
             ),
             "start": await from_token.to_string(self.store),
@@ -556,7 +559,7 @@ class PaginationHandler:
 
         if state:
             chunk["state"] = self._event_serializer.serialize_events(
-                state, time_now, as_client_event=as_client_event
+                state, time_now, config=serialize_options
             )
 
         return chunk
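The handler changes above all follow the same conversion: the loose as_client_event / event_format keyword arguments are bundled into a single SerializeEventConfig. A representative call-site sketch (serializer stands in for an EventClientSerializer instance; everything else is illustrative):

    from typing import Iterable

    from synapse.events import EventBase
    from synapse.events.utils import SerializeEventConfig

    def serialize_for_client(serializer, events: Iterable[EventBase], time_now: int):
        config = SerializeEventConfig(as_client_event=True)
        return serializer.serialize_events(events, time_now, config=config)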
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index b4132c353a..6250bb3bdf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -269,7 +269,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
         # Then filter down to rooms that the AS can read
         events = []
         for room_id, event in rooms_to_events.items():
-            if not await service.matches_user_in_member_list(room_id, self.store):
+            if not await service.is_interested_in_room(room_id, self.store):
                 continue
 
             events.append(event)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a582837cf0..7cbc484b06 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1736,8 +1736,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             txn_id=txn_id,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
+            outlier=True,
         )
-        event.internal_metadata.outlier = True
         event.internal_metadata.out_of_band_membership = True
 
         result_event = await self.event_creation_handler.handle_new_client_event(
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 55c2cbdba8..3979cbba71 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -857,7 +857,7 @@ class _RoomEntry:
 
 def _has_valid_via(e: EventBase) -> bool:
     via = e.content.get("via")
-    if not via or not isinstance(via, Sequence):
+    if not via or not isinstance(via, list):
         return False
     for v in via:
         if not isinstance(v, str):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 843c68eb0f..3b89126528 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -486,9 +486,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
                 if handler._room_serials[room_id] <= from_key:
                     continue
 
-                if not await service.matches_user_in_member_list(
-                    room_id, self._main_store
-                ):
+                if not await service.is_interested_in_room(room_id, self._main_store):
                     continue
 
                 events.append(self._make_event_for(room_id))
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 1b8479b0b4..b8fc1d4db9 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -380,7 +380,7 @@ class FederationSenderHandler:
             # changes.
             hosts = {row.entity for row in rows if not row.entity.startswith("@")}
             for host in hosts:
-                self.federation_sender.send_device_messages(host)
+                self.federation_sender.send_device_messages(host, immediate=False)
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local
diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py
index 20377a9ac6..ff040de6b8 100644
--- a/synapse/rest/client/notifications.py
+++ b/synapse/rest/client/notifications.py
@@ -16,7 +16,10 @@ import logging
 from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.constants import ReceiptTypes
-from synapse.events.utils import format_event_for_client_v2_without_room_id
+from synapse.events.utils import (
+    SerializeEventConfig,
+    format_event_for_client_v2_without_room_id,
+)
 from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
@@ -75,7 +78,9 @@ class NotificationsServlet(RestServlet):
                     self._event_serializer.serialize_event(
                         notif_events[pa.event_id],
                         self.clock.time_msec(),
-                        event_format=format_event_for_client_v2_without_room_id,
+                        config=SerializeEventConfig(
+                            event_format=format_event_for_client_v2_without_room_id
+                        ),
                     )
                 ),
             }
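
Here (and throughout sync.py below) the serializer's growing list of per-call keyword arguments is folded into a single `SerializeEventConfig` object. A minimal sketch of the pattern, assuming a frozen options class with defaults so call sites only spell out what they override; the field names follow the diff, but the class body and the `serialize_event` stub are assumptions:

    from dataclasses import dataclass
    from typing import Callable, List, Optional

    @dataclass(frozen=True)
    class SerializeEventConfig:
        event_format: Callable[[dict], dict] = lambda event: event
        token_id: Optional[int] = None
        only_event_fields: Optional[List[str]] = None
        include_stripped_room_state: bool = False

    def serialize_event(event: dict, time_now: int,
                        config: SerializeEventConfig = SerializeEventConfig()) -> dict:
        # A real serializer would also filter fields, compute ages from time_now, etc.
        return config.event_format(dict(event))

    # Call sites build one config up front and reuse it for every event:
    options = SerializeEventConfig(only_event_fields=["type", "content"])
    serialized = serialize_event({"type": "m.room.message", "content": {}}, 0, config=options)
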
diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py
index 487ea38b55..07fa1cdd4c 100644
--- a/synapse/rest/client/relations.py
+++ b/synapse/rest/client/relations.py
@@ -27,50 +27,15 @@ from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
 from synapse.rest.client._base import client_patterns
-from synapse.storage.relations import (
-    AggregationPaginationToken,
-    PaginationChunk,
-    RelationPaginationToken,
-)
-from synapse.types import JsonDict, RoomStreamToken, StreamToken
+from synapse.storage.relations import AggregationPaginationToken, PaginationChunk
+from synapse.types import JsonDict, StreamToken
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
 
 
-async def _parse_token(
-    store: "DataStore", token: Optional[str]
-) -> Optional[StreamToken]:
-    """
-    For backwards compatibility support RelationPaginationToken, but new pagination
-    tokens are generated as full StreamTokens, to be compatible with /sync and /messages.
-    """
-    if not token:
-        return None
-    # Luckily the format for StreamToken and RelationPaginationToken differ enough
-    # that they can easily be separated. An "_" appears in the serialization of
-    # RoomStreamToken (as part of StreamToken), but RelationPaginationToken uses
-    # "-" only for separators.
-    if "_" in token:
-        return await StreamToken.from_string(store, token)
-    else:
-        relation_token = RelationPaginationToken.from_string(token)
-        return StreamToken(
-            room_key=RoomStreamToken(relation_token.topological, relation_token.stream),
-            presence_key=0,
-            typing_key=0,
-            receipt_key=0,
-            account_data_key=0,
-            push_rules_key=0,
-            to_device_key=0,
-            device_list_key=0,
-            groups_key=0,
-        )
-
-
 class RelationPaginationServlet(RestServlet):
     """API to paginate relations on an event by topological ordering, optionally
     filtered by relation type and event type.
@@ -122,8 +87,12 @@ class RelationPaginationServlet(RestServlet):
             pagination_chunk = PaginationChunk(chunk=[])
         else:
             # Return the relations
-            from_token = await _parse_token(self.store, from_token_str)
-            to_token = await _parse_token(self.store, to_token_str)
+            from_token = None
+            if from_token_str:
+                from_token = await StreamToken.from_string(self.store, from_token_str)
+            to_token = None
+            if to_token_str:
+                to_token = await StreamToken.from_string(self.store, to_token_str)
 
             pagination_chunk = await self.store.get_relations_for_event(
                 event_id=parent_id,
@@ -317,8 +286,12 @@ class RelationAggregationGroupPaginationServlet(RestServlet):
         from_token_str = parse_string(request, "from")
         to_token_str = parse_string(request, "to")
 
-        from_token = await _parse_token(self.store, from_token_str)
-        to_token = await _parse_token(self.store, to_token_str)
+        from_token = None
+        if from_token_str:
+            from_token = await StreamToken.from_string(self.store, from_token_str)
+        to_token = None
+        if to_token_str:
+            to_token = await StreamToken.from_string(self.store, to_token_str)
 
         result = await self.store.get_relations_for_event(
             event_id=parent_id,
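
The deleted `_parse_token` helper above existed only to accept the legacy `RelationPaginationToken` format alongside full stream tokens, telling them apart by separator: an `_` appears in serialized stream tokens, while the legacy format used only `-`. With that shim gone, both `from` and `to` are parsed as plain `StreamToken`s. A standalone sketch of the separator-based dispatch the removed code performed, with made-up parser stubs standing in for the real token classes:

    from typing import Optional

    def parse_legacy_token(token: str) -> tuple:
        topological, stream = token.split("-")
        return int(topological), int(stream)

    def parse_stream_token(token: str) -> str:
        return token  # stand-in for StreamToken.from_string

    def parse_pagination_token(token: Optional[str]):
        if not token:
            return None
        # "_" only ever appears in the newer stream-token serialization,
        # so its presence is enough to pick a parser.
        return parse_stream_token(token) if "_" in token else parse_legacy_token(token)

    print(parse_pagination_token("12-34"))      # legacy: (12, 34)
    print(parse_pagination_token("s72_3_0_1"))  # treated as a stream token string
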
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index f3018ff690..53c385a86c 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -14,24 +14,14 @@
 import itertools
 import logging
 from collections import defaultdict
-from typing import (
-    TYPE_CHECKING,
-    Any,
-    Callable,
-    Dict,
-    Iterable,
-    List,
-    Optional,
-    Tuple,
-    Union,
-)
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
 
 from synapse.api.constants import Membership, PresenceState
 from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
-from synapse.events import EventBase
 from synapse.events.utils import (
+    SerializeEventConfig,
     format_event_for_client_v2_without_room_id,
     format_event_raw,
 )
@@ -48,7 +38,6 @@ from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import trace
-from synapse.storage.databases.main.relations import BundledAggregations
 from synapse.types import JsonDict, StreamToken
 from synapse.util import json_decoder
 
@@ -239,28 +228,31 @@ class SyncRestServlet(RestServlet):
         else:
             raise Exception("Unknown event format %s" % (filter.event_format,))
 
+        serialize_options = SerializeEventConfig(
+            event_format=event_formatter,
+            token_id=access_token_id,
+            only_event_fields=filter.event_fields,
+        )
+        stripped_serialize_options = SerializeEventConfig(
+            event_format=event_formatter,
+            token_id=access_token_id,
+            include_stripped_room_state=True,
+        )
+
         joined = await self.encode_joined(
-            sync_result.joined,
-            time_now,
-            access_token_id,
-            filter.event_fields,
-            event_formatter,
+            sync_result.joined, time_now, serialize_options
         )
 
         invited = await self.encode_invited(
-            sync_result.invited, time_now, access_token_id, event_formatter
+            sync_result.invited, time_now, stripped_serialize_options
         )
 
         knocked = await self.encode_knocked(
-            sync_result.knocked, time_now, access_token_id, event_formatter
+            sync_result.knocked, time_now, stripped_serialize_options
         )
 
         archived = await self.encode_archived(
-            sync_result.archived,
-            time_now,
-            access_token_id,
-            filter.event_fields,
-            event_formatter,
+            sync_result.archived, time_now, serialize_options
         )
 
         logger.debug("building sync response dict")
@@ -339,9 +331,7 @@ class SyncRestServlet(RestServlet):
         self,
         rooms: List[JoinedSyncResult],
         time_now: int,
-        token_id: Optional[int],
-        event_fields: List[str],
-        event_formatter: Callable[[JsonDict], JsonDict],
+        serialize_options: SerializeEventConfig,
     ) -> JsonDict:
         """
         Encode the joined rooms in a sync result
@@ -349,24 +339,14 @@ class SyncRestServlet(RestServlet):
         Args:
             rooms: list of sync results for rooms this user is joined to
             time_now: current time - used as a baseline for age calculations
-            token_id: ID of the user's auth token - used for namespacing
-                of transaction IDs
-            event_fields: List of event fields to include. If empty,
-                all fields will be returned.
-            event_formatter: function to convert from federation format
-                to client format
+            serialize_options: Event serializer options
         Returns:
             The joined rooms list, in our response format
         """
         joined = {}
         for room in rooms:
             joined[room.room_id] = await self.encode_room(
-                room,
-                time_now,
-                token_id,
-                joined=True,
-                only_fields=event_fields,
-                event_formatter=event_formatter,
+                room, time_now, joined=True, serialize_options=serialize_options
             )
 
         return joined
@@ -376,8 +356,7 @@ class SyncRestServlet(RestServlet):
         self,
         rooms: List[InvitedSyncResult],
         time_now: int,
-        token_id: Optional[int],
-        event_formatter: Callable[[JsonDict], JsonDict],
+        serialize_options: SerializeEventConfig,
     ) -> JsonDict:
         """
         Encode the invited rooms in a sync result
@@ -385,10 +364,7 @@ class SyncRestServlet(RestServlet):
         Args:
             rooms: list of sync results for rooms this user is invited to
             time_now: current time - used as a baseline for age calculations
-            token_id: ID of the user's auth token - used for namespacing
-                of transaction IDs
-            event_formatter: function to convert from federation format
-                to client format
+            serialize_options: Event serializer options
 
         Returns:
             The invited rooms list, in our response format
@@ -396,11 +372,7 @@ class SyncRestServlet(RestServlet):
         invited = {}
         for room in rooms:
             invite = self._event_serializer.serialize_event(
-                room.invite,
-                time_now,
-                token_id=token_id,
-                event_format=event_formatter,
-                include_stripped_room_state=True,
+                room.invite, time_now, config=serialize_options
             )
             unsigned = dict(invite.get("unsigned", {}))
             invite["unsigned"] = unsigned
@@ -415,8 +387,7 @@ class SyncRestServlet(RestServlet):
         self,
         rooms: List[KnockedSyncResult],
         time_now: int,
-        token_id: Optional[int],
-        event_formatter: Callable[[Dict], Dict],
+        serialize_options: SerializeEventConfig,
     ) -> Dict[str, Dict[str, Any]]:
         """
         Encode the rooms we've knocked on in a sync result.
@@ -424,8 +395,7 @@ class SyncRestServlet(RestServlet):
         Args:
             rooms: list of sync results for rooms this user is knocking on
             time_now: current time - used as a baseline for age calculations
-            token_id: ID of the user's auth token - used for namespacing of transaction IDs
-            event_formatter: function to convert from federation format to client format
+            serialize_options: Event serializer options
 
         Returns:
             The list of rooms the user has knocked on, in our response format.
@@ -433,11 +403,7 @@ class SyncRestServlet(RestServlet):
         knocked = {}
         for room in rooms:
             knock = self._event_serializer.serialize_event(
-                room.knock,
-                time_now,
-                token_id=token_id,
-                event_format=event_formatter,
-                include_stripped_room_state=True,
+                room.knock, time_now, config=serialize_options
             )
 
             # Extract the `unsigned` key from the knock event.
@@ -470,9 +436,7 @@ class SyncRestServlet(RestServlet):
         self,
         rooms: List[ArchivedSyncResult],
         time_now: int,
-        token_id: Optional[int],
-        event_fields: List[str],
-        event_formatter: Callable[[JsonDict], JsonDict],
+        serialize_options: SerializeEventConfig,
     ) -> JsonDict:
         """
         Encode the archived rooms in a sync result
@@ -480,23 +444,14 @@ class SyncRestServlet(RestServlet):
         Args:
             rooms: list of sync results for rooms this user is joined to
             time_now: current time - used as a baseline for age calculations
-            token_id: ID of the user's auth token - used for namespacing
-                of transaction IDs
-            event_fields: List of event fields to include. If empty,
-                all fields will be returned.
-            event_formatter: function to convert from federation format to client format
+            serialize_options: Event serializer options
         Returns:
             The archived rooms list, in our response format
         """
         joined = {}
         for room in rooms:
             joined[room.room_id] = await self.encode_room(
-                room,
-                time_now,
-                token_id,
-                joined=False,
-                only_fields=event_fields,
-                event_formatter=event_formatter,
+                room, time_now, joined=False, serialize_options=serialize_options
             )
 
         return joined
@@ -505,10 +460,8 @@ class SyncRestServlet(RestServlet):
         self,
         room: Union[JoinedSyncResult, ArchivedSyncResult],
         time_now: int,
-        token_id: Optional[int],
         joined: bool,
-        only_fields: Optional[List[str]],
-        event_formatter: Callable[[JsonDict], JsonDict],
+        serialize_options: SerializeEventConfig,
     ) -> JsonDict:
         """
         Args:
@@ -524,20 +477,6 @@ class SyncRestServlet(RestServlet):
         Returns:
             The room, encoded in our response format
         """
-
-        def serialize(
-            events: Iterable[EventBase],
-            aggregations: Optional[Dict[str, BundledAggregations]] = None,
-        ) -> List[JsonDict]:
-            return self._event_serializer.serialize_events(
-                events,
-                time_now=time_now,
-                bundle_aggregations=aggregations,
-                token_id=token_id,
-                event_format=event_formatter,
-                only_event_fields=only_fields,
-            )
-
         state_dict = room.state
         timeline_events = room.timeline.events
 
@@ -554,9 +493,14 @@ class SyncRestServlet(RestServlet):
                     event.room_id,
                 )
 
-        serialized_state = serialize(state_events)
-        serialized_timeline = serialize(
-            timeline_events, room.timeline.bundled_aggregations
+        serialized_state = self._event_serializer.serialize_events(
+            state_events, time_now, config=serialize_options
+        )
+        serialized_timeline = self._event_serializer.serialize_events(
+            timeline_events,
+            time_now,
+            config=serialize_options,
+            bundle_aggregations=room.timeline.bundled_aggregations,
         )
 
         account_data = room.account_data
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 6babd5963c..21888cc8c5 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -194,7 +194,7 @@ class StateHandler:
         }
 
     async def get_current_state_ids(
-        self, room_id: str, latest_event_ids: Optional[Iterable[str]] = None
+        self, room_id: str, latest_event_ids: Optional[Collection[str]] = None
     ) -> StateMap[str]:
         """Get the current state, or the state at a set of events, for a room
 
@@ -243,7 +243,7 @@ class StateHandler:
         return await self.get_hosts_in_room_at_events(room_id, event_ids)
 
     async def get_hosts_in_room_at_events(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> Set[str]:
         """Get the hosts that were in a room at the given event ids
 
@@ -404,7 +404,7 @@ class StateHandler:
 
     @measure_func()
     async def resolve_state_groups_for_events(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> _StateCacheEntry:
         """Given a list of event_ids this method fetches the state at each
         event, resolves conflicts between them and returns them.
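
Several signatures in this commit (here and in synapse/storage/state.py below) narrow `Iterable[str]` to `Collection[str]`. A `Collection` is sized, supports `in`, and can safely be iterated more than once, which rules out callers passing a one-shot generator. A small standalone illustration of the failure mode the stricter annotation guards against:

    from typing import Collection, Iterable

    def count_twice(event_ids: Iterable[str]) -> int:
        # Iterating twice silently yields nothing the second time for a generator.
        return len(list(event_ids)) + len(list(event_ids))

    gen = (f"$event{i}" for i in range(3))
    print(count_twice(gen))  # 3, not 6 -- the generator was exhausted on the first pass

    def count_twice_safe(event_ids: Collection[str]) -> int:
        return len(event_ids) + len(event_ids)

    print(count_twice_safe([f"$event{i}" for i in range(3)]))  # 6
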
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 1392363de1..b4a1b041b1 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -298,6 +298,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 # This user has new messages sent to them. Query messages for them
                 user_ids_to_query.add(user_id)
 
+        if not user_ids_to_query:
+            return {}, to_stream_id
+
         def get_device_messages_txn(txn: LoggingTransaction):
             # Build a query to select messages from any of the given devices that
             # are between the given stream id bounds.
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 36ca2b8273..fba270150b 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -55,37 +55,6 @@ class PaginationChunk:
 
 
 @attr.s(frozen=True, slots=True, auto_attribs=True)
-class RelationPaginationToken:
-    """Pagination token for relation pagination API.
-
-    As the results are in topological order, we can use the
-    `topological_ordering` and `stream_ordering` fields of the events at the
-    boundaries of the chunk as pagination tokens.
-
-    Attributes:
-        topological: The topological ordering of the boundary event
-        stream: The stream ordering of the boundary event.
-    """
-
-    topological: int
-    stream: int
-
-    @staticmethod
-    def from_string(string: str) -> "RelationPaginationToken":
-        try:
-            t, s = string.split("-")
-            return RelationPaginationToken(int(t), int(s))
-        except ValueError:
-            raise SynapseError(400, "Invalid relation pagination token")
-
-    async def to_string(self, store: "DataStore") -> str:
-        return "%d-%d" % (self.topological, self.stream)
-
-    def as_tuple(self) -> Tuple[Any, ...]:
-        return attr.astuple(self)
-
-
-@attr.s(frozen=True, slots=True, auto_attribs=True)
 class AggregationPaginationToken:
     """Pagination token for relation aggregation pagination API.
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e79ecf64a0..86f1a5373b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -561,7 +561,7 @@ class StateGroupStorage:
         return state_group_delta.prev_group, state_group_delta.delta_ids
 
     async def get_state_groups_ids(
-        self, _room_id: str, event_ids: Iterable[str]
+        self, _room_id: str, event_ids: Collection[str]
     ) -> Dict[int, MutableStateMap[str]]:
         """Get the event IDs of all the state for the state groups for the given events
 
@@ -596,7 +596,7 @@ class StateGroupStorage:
         return group_to_state[state_group]
 
     async def get_state_groups(
-        self, room_id: str, event_ids: Iterable[str]
+        self, room_id: str, event_ids: Collection[str]
     ) -> Dict[int, List[EventBase]]:
         """Get the state groups for the given list of event_ids
 
@@ -648,7 +648,7 @@ class StateGroupStorage:
         return self.stores.state._get_state_groups_from_groups(groups, state_filter)
 
     async def get_state_for_events(
-        self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None
+        self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
     ) -> Dict[str, StateMap[EventBase]]:
         """Given a list of event_ids and type tuples, return a list of state
         dicts for each event.
@@ -684,7 +684,7 @@ class StateGroupStorage:
         return {event: event_to_state[event] for event in event_ids}
 
     async def get_state_ids_for_events(
-        self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None
+        self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
     ) -> Dict[str, StateMap[str]]:
         """
         Get the state dicts corresponding to a list of events, containing the event_ids
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 60c03a66fd..a9f67dcbac 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -40,7 +40,7 @@ from typing import (
 )
 
 import attr
-from typing_extensions import ContextManager
+from typing_extensions import ContextManager, Literal
 
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
@@ -96,6 +96,10 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
 
     __slots__ = ["_deferred", "_observers", "_result"]
 
+    _deferred: "defer.Deferred[_T]"
+    _observers: Union[List["defer.Deferred[_T]"], Tuple[()]]
+    _result: Union[None, Tuple[Literal[True], _T], Tuple[Literal[False], Failure]]
+
     def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
         object.__setattr__(self, "_deferred", deferred)
         object.__setattr__(self, "_result", None)
@@ -158,12 +162,14 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
         effect the underlying deferred.
         """
         if not self._result:
+            assert isinstance(self._observers, list)
             d: "defer.Deferred[_T]" = defer.Deferred()
             self._observers.append(d)
             return d
+        elif self._result[0]:
+            return defer.succeed(self._result[1])
         else:
-            success, res = self._result
-            return defer.succeed(res) if success else defer.fail(res)
+            return defer.fail(self._result[1])
 
     def observers(self) -> "Collection[defer.Deferred[_T]]":
         return self._observers
@@ -175,6 +181,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
         return self._result is not None and self._result[0] is True
 
     def get_result(self) -> Union[_T, Failure]:
+        if self._result is None:
+            raise ValueError(f"{self!r} has no result yet")
         return self._result[1]
 
     def __getattr__(self, name: str) -> Any:
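
The new annotation on `_result` is a tagged union: `(True, value)` on success and `(False, Failure)` on failure. Indexing with `[0]`/`[1]` rather than tuple-unpacking is what lets the type checker narrow the second element once the first has been tested, which is also why `observe` was rewritten above. A self-contained sketch of the same idea without Twisted:

    from typing import Literal, Tuple, Union

    Result = Union[Tuple[Literal[True], int], Tuple[Literal[False], Exception]]

    def describe(result: Result) -> str:
        if result[0]:
            # the checker can narrow result to Tuple[Literal[True], int] here
            return "succeeded with %d" % (result[1] + 1)
        # ...and to Tuple[Literal[False], Exception] here
        return "failed with %r" % (result[1],)

    print(describe((True, 41)))                      # succeeded with 42
    print(describe((False, RuntimeError("boom"))))   # failed with RuntimeError('boom')
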
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 1b970ce479..281cbe4d88 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -81,8 +81,9 @@ async def filter_events_for_client(
 
     types = ((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id))
 
+    # we exclude outliers at this point, and then handle them separately later
     event_id_to_state = await storage.state.get_state_for_events(
-        frozenset(e.event_id for e in events),
+        frozenset(e.event_id for e in events if not e.internal_metadata.outlier),
         state_filter=StateFilter.from_types(types),
     )
 
@@ -154,6 +155,17 @@ async def filter_events_for_client(
         if event.event_id in always_include_ids:
             return event
 
+        # we need to handle outliers separately, since we don't have the room state.
+        if event.internal_metadata.outlier:
+            # Normally these can't be seen by clients, but we make an exception for
+            # out-of-band membership events (eg, incoming invites, or rejections of
+            # said invite) for the user themselves.
+            if event.type == EventTypes.Member and event.state_key == user_id:
+                logger.debug("Returning out-of-band-membership event %s", event)
+                return event
+
+            return None
+
         state = event_id_to_state[event.event_id]
 
         # get the room_visibility at the time of the event.
@@ -198,6 +210,9 @@ async def filter_events_for_client(
 
             # Always allow the user to see their own leave events, otherwise
             # they won't see the room disappear if they reject the invite
+            #
+            # (Note this doesn't work for out-of-band invite rejections, which don't
+            # have prev_state populated. They are handled above in the outlier code.)
             if membership == "leave" and (
                 prev_membership == "join" or prev_membership == "invite"
             ):
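
Since outliers are now excluded from the bulk state lookup earlier in this function, the per-event check has to special-case them: with no room state to apply history-visibility rules to, only the user's own out-of-band membership events (incoming invites and invite rejections) are let through. A standalone sketch of that filtering rule, with a simplified event type standing in for Synapse's EventBase:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class Event:
        type: str
        state_key: Optional[str]
        outlier: bool

    def visible_outlier(event: Event, user_id: str) -> Optional[Event]:
        # Outliers have no state to check against; only show the user their own membership.
        if event.type == "m.room.member" and event.state_key == user_id:
            return event
        return None

    invite = Event("m.room.member", "@alice:example.org", outlier=True)
    print(visible_outlier(invite, "@alice:example.org") is invite)  # True
    print(visible_outlier(invite, "@bob:example.org"))              # None
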