-rw-r--r-- CHANGES.md | 58
-rw-r--r-- README.rst | 2
-rw-r--r-- changelog.d/7021.bugfix | 1
-rw-r--r-- changelog.d/7732.bugfix | 1
-rw-r--r-- changelog.d/7760.bugfix | 1
-rw-r--r-- changelog.d/7765.misc | 1
-rw-r--r-- changelog.d/7768.misc | 1
-rw-r--r-- changelog.d/7769.misc | 1
-rw-r--r-- changelog.d/7770.misc | 1
-rw-r--r-- changelog.d/7779.bugfix | 1
-rw-r--r-- debian/changelog | 6
-rw-r--r-- docker/Dockerfile | 2
-rw-r--r-- docs/jwt.md | 90
-rw-r--r-- docs/reverse_proxy.md | 2
-rw-r--r-- docs/sample_config.yaml | 35
-rw-r--r-- scripts-dev/check_signature.py | 2
-rwxr-xr-x scripts-dev/lint.sh | 2
-rw-r--r-- setup.cfg | 1
-rw-r--r-- synapse/__init__.py | 2
-rw-r--r-- synapse/api/auth.py | 5
-rw-r--r-- synapse/app/generic_worker.py | 30
-rw-r--r-- synapse/appservice/api.py | 1
-rw-r--r-- synapse/config/__main__.py | 1
-rw-r--r-- synapse/config/emailconfig.py | 5
-rw-r--r-- synapse/config/jwt_config.py | 35
-rw-r--r-- synapse/events/builder.py | 2
-rw-r--r-- synapse/federation/federation_client.py | 6
-rw-r--r-- synapse/federation/federation_server.py | 6
-rw-r--r-- synapse/federation/send_queue.py | 2
-rw-r--r-- synapse/federation/sender/per_destination_queue.py | 4
-rw-r--r-- synapse/federation/transport/client.py | 2
-rw-r--r-- synapse/federation/transport/server.py | 4
-rw-r--r-- synapse/groups/attestations.py | 2
-rw-r--r-- synapse/groups/groups_server.py | 2
-rw-r--r-- synapse/handlers/appservice.py | 74
-rw-r--r-- synapse/handlers/auth.py | 3
-rw-r--r-- synapse/handlers/cas_handler.py | 3
-rw-r--r-- synapse/handlers/federation.py | 2
-rw-r--r-- synapse/handlers/groups_local.py | 2
-rw-r--r-- synapse/handlers/identity.py | 4
-rw-r--r-- synapse/handlers/message.py | 23
-rw-r--r-- synapse/handlers/room_member.py | 202
-rw-r--r-- synapse/handlers/room_member_worker.py | 19
-rw-r--r-- synapse/handlers/typing.py | 3
-rw-r--r-- synapse/http/matrixfederationclient.py | 12
-rw-r--r-- synapse/logging/opentracing.py | 4
-rw-r--r-- synapse/metrics/background_process_metrics.py | 10
-rw-r--r-- synapse/notifier.py | 2
-rw-r--r-- synapse/push/push_rule_evaluator.py | 31
-rw-r--r-- synapse/python_dependencies.py | 2
-rw-r--r-- synapse/replication/http/_base.py | 4
-rw-r--r-- synapse/replication/http/membership.py | 92
-rw-r--r-- synapse/replication/tcp/__init__.py | 2
-rw-r--r-- synapse/replication/tcp/client.py | 2
-rw-r--r-- synapse/replication/tcp/commands.py | 13
-rw-r--r-- synapse/replication/tcp/handler.py | 4
-rw-r--r-- synapse/replication/tcp/protocol.py | 2
-rw-r--r-- synapse/replication/tcp/redis.py | 2
-rw-r--r-- synapse/replication/tcp/streams/_base.py | 56
-rw-r--r-- synapse/replication/tcp/streams/events.py | 4
-rw-r--r-- synapse/rest/client/v1/login.py | 48
-rw-r--r-- synapse/rest/client/v1/room.py | 6
-rw-r--r-- synapse/rest/client/v1/voip.py | 2
-rw-r--r-- synapse/rest/media/v1/thumbnailer.py | 3
-rw-r--r-- synapse/secrets.py | 3
-rw-r--r-- synapse/server.py | 2
-rw-r--r-- synapse/storage/data_stores/main/cache.py | 36
-rw-r--r-- synapse/storage/data_stores/main/deviceinbox.py | 54
-rw-r--r-- synapse/storage/data_stores/main/devices.py | 70
-rw-r--r-- synapse/storage/data_stores/main/end_to_end_keys.py | 65
-rw-r--r-- synapse/storage/data_stores/main/events.py | 119
-rw-r--r-- synapse/storage/data_stores/main/events_worker.py | 5
-rw-r--r-- synapse/storage/data_stores/main/group_server.py | 52
-rw-r--r-- synapse/storage/data_stores/main/purge_events.py | 1
-rw-r--r-- synapse/storage/data_stores/main/pusher.py | 108
-rw-r--r-- synapse/storage/data_stores/main/room.py | 41
-rw-r--r-- synapse/storage/data_stores/main/schema/delta/25/fts.py | 6
-rw-r--r-- synapse/storage/data_stores/main/schema/delta/27/ts.py | 6
-rw-r--r-- synapse/storage/data_stores/main/schema/delta/31/search_update.py | 6
-rw-r--r-- synapse/storage/data_stores/main/schema/delta/33/event_fields.py | 6
-rw-r--r-- synapse/storage/data_stores/main/tags.py | 45
-rw-r--r-- synapse/storage/data_stores/main/ui_auth.py | 2
-rw-r--r-- synapse/storage/persist_events.py | 6
-rw-r--r-- synapse/storage/types.py | 2
-rw-r--r-- synapse/streams/config.py | 4
-rw-r--r-- synapse/streams/events.py | 2
-rw-r--r-- synapse/types.py | 2
-rw-r--r-- synapse/util/__init__.py | 2
-rw-r--r-- synapse/util/async_helpers.py | 2
-rw-r--r-- synapse/util/caches/descriptors.py | 2
-rw-r--r-- synapse/util/distributor.py | 2
-rw-r--r-- synapse/util/patch_inline_callbacks.py | 2
-rw-r--r-- synapse/util/retryutils.py | 4
-rw-r--r-- synapse/visibility.py | 4
-rw-r--r-- tests/crypto/test_keyring.py | 2
-rw-r--r-- tests/handlers/test_appservice.py | 68
-rw-r--r-- tests/handlers/test_e2e_keys.py | 4
-rw-r--r-- tests/push/test_push_rule_evaluator.py | 17
-rw-r--r-- tests/rest/admin/test_room.py | 1
-rw-r--r-- tests/rest/client/test_retention.py | 2
-rw-r--r-- tests/rest/client/v1/test_presence.py | 2
-rw-r--r-- tests/rest/client/v2_alpha/test_relations.py | 2
-rw-r--r-- tests/rest/media/v1/test_media_storage.py | 4
-rw-r--r-- tests/storage/test_base.py | 7
-rw-r--r-- tests/test_mau.py | 2
-rw-r--r-- tests/test_utils/event_injection.py | 2
-rw-r--r-- tests/util/test_logcontext.py | 4
-rw-r--r-- tox.ini | 4
108 files changed, 1028 insertions, 707 deletions
diff --git a/CHANGES.md b/CHANGES.md
index 1cdb0e3afc..bd8cf1a11f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,8 +1,59 @@
+Synapse 1.17.0rc1 (2020-07-09)
+==============================
+
+Bugfixes
+--------
+
+- Fix inconsistent handling of upper and lower case in email addresses when used as identifiers for login, etc. Contributed by @dklimpel. ([\#7021](https://github.com/matrix-org/synapse/issues/7021))
+- Fix "Tried to close a non-active scope!" error messages when opentracing is enabled. ([\#7732](https://github.com/matrix-org/synapse/issues/7732))
+- Fix incorrect error message when database CTYPE was set incorrectly. ([\#7760](https://github.com/matrix-org/synapse/issues/7760))
+- Fix to not ignore `set_tweak` actions in Push Rules that have no `value`, as permitted by the specification. ([\#7766](https://github.com/matrix-org/synapse/issues/7766))
+- Fix synctl to handle empty config files correctly. Contributed by @kotovalexarian. ([\#7779](https://github.com/matrix-org/synapse/issues/7779))
+- Fix a long-standing bug in worker mode where worker information was saved in the devices table instead of the original IP address and user agent. ([\#7797](https://github.com/matrix-org/synapse/issues/7797))
+- Fix 'stuck invites' which happen when we are unable to reject a room invite received over federation. ([\#7804](https://github.com/matrix-org/synapse/issues/7804), [\#7809](https://github.com/matrix-org/synapse/issues/7809), [\#7810](https://github.com/matrix-org/synapse/issues/7810))
+
+
+Updates to the Docker image
+---------------------------
+
+- Include libwebp in the Docker file to properly handle webp image uploads. ([\#7791](https://github.com/matrix-org/synapse/issues/7791))
+
+
+Improved Documentation
+----------------------
+
+- Improve the documentation of the non-standard JSON web token login type. ([\#7776](https://github.com/matrix-org/synapse/issues/7776))
+- Update doc links for caddy. Contributed by Nicolai Søborg. ([\#7789](https://github.com/matrix-org/synapse/issues/7789))
+
+
+Internal Changes
+----------------
+
+- Refactor getting replication updates from database. ([\#7740](https://github.com/matrix-org/synapse/issues/7740))
+- Send push notifications with a high or low priority depending upon whether they may generate user-observable effects. ([\#7765](https://github.com/matrix-org/synapse/issues/7765))
+- Use symbolic names for replication stream names. ([\#7768](https://github.com/matrix-org/synapse/issues/7768))
+- Add early returns to `_check_for_soft_fail`. ([\#7769](https://github.com/matrix-org/synapse/issues/7769))
+- Fix up `synapse.handlers.federation` to pass mypy. ([\#7770](https://github.com/matrix-org/synapse/issues/7770))
+- Convert the appserver handler to async/await. ([\#7775](https://github.com/matrix-org/synapse/issues/7775))
+- Allow to use higher versions of prometheus_client <0.9.0 which are expected to introduce no breaking changes. Contributed by Oliver Kurz. ([\#7780](https://github.com/matrix-org/synapse/issues/7780))
+- Update linting scripts and codebase to be compatible with `isort` v5. ([\#7786](https://github.com/matrix-org/synapse/issues/7786))
+- Stop populating unused table `local_invites`. ([\#7793](https://github.com/matrix-org/synapse/issues/7793))
+- Ensure that strings (not bytes) are passed into JSON serialization. ([\#7799](https://github.com/matrix-org/synapse/issues/7799))
+- Switch from simplejson to the standard library json. ([\#7800](https://github.com/matrix-org/synapse/issues/7800))
+- Add `signing_key` property to `HomeServer` to save code duplication. ([\#7805](https://github.com/matrix-org/synapse/issues/7805))
+- Improve stacktraces from exceptions in background processes. ([\#7808](https://github.com/matrix-org/synapse/issues/7808))
+- Fix various spelling errors in comments and log lines. ([\#7811](https://github.com/matrix-org/synapse/issues/7811))
+
+
 Synapse 1.16.0 (2020-07-08)
 ===========================
 
-No significant changes.
+No significant changes since 1.16.0rc2.
 
+Note that this release deprecates the `m.login.jwt` login method, renaming it
+to `org.matrix.login.jwt`, as `m.login.jwt` is not part of the Matrix spec.
+Otherwise the behaviour is identical. Synapse will accept both names for now,
+but this may change in a future release.
 
 Synapse 1.16.0rc2 (2020-07-02)
 ==============================
@@ -45,11 +96,6 @@ Security advisory
 Synapse 1.16.0rc1 (2020-07-01)
 ==============================
 
-Note that this release deprecates the `m.login.jwt` login method, renaming it
-to `org.matrix.login.jwt`, as `m.login.jwt` is not part of the Matrix spec.
-Otherwise the behaviour is identical. Synapse will accept both names for now,
-but this may change in a future release.
-
 Features
 --------
 
diff --git a/README.rst b/README.rst
index 2441b6a35c..38376e23c2 100644
--- a/README.rst
+++ b/README.rst
@@ -215,7 +215,7 @@ Using a reverse proxy with Synapse
 It is recommended to put a reverse proxy such as
 `nginx <https://nginx.org/en/docs/http/ngx_http_proxy_module.html>`_,
 `Apache <https://httpd.apache.org/docs/current/mod/mod_proxy_http.html>`_,
-`Caddy <https://caddyserver.com/docs/proxy>`_ or
+`Caddy <https://caddyserver.com/docs/quick-starts/reverse-proxy>`_ or
 `HAProxy <https://www.haproxy.org/>`_ in front of Synapse. One advantage of
 doing so is that it means that you can expose the default https port (443) to
 Matrix clients without needing to run Synapse with root privileges.
diff --git a/changelog.d/7021.bugfix b/changelog.d/7021.bugfix
deleted file mode 100644
index 140fe37b2d..0000000000
--- a/changelog.d/7021.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix inconsistent handling of upper and lower case in email addresses when used as identifiers for login, etc. Contributed by @dklimpel.
diff --git a/changelog.d/7732.bugfix b/changelog.d/7732.bugfix
deleted file mode 100644
index d5e352e141..0000000000
--- a/changelog.d/7732.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix "Tried to close a non-active scope!" error messages when opentracing is enabled.
diff --git a/changelog.d/7760.bugfix b/changelog.d/7760.bugfix
deleted file mode 100644
index f6081f3d30..0000000000
--- a/changelog.d/7760.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix incorrect error message when database CTYPE was set incorrectly.
diff --git a/changelog.d/7765.misc b/changelog.d/7765.misc
deleted file mode 100644
index fa9cfd24cb..0000000000
--- a/changelog.d/7765.misc
+++ /dev/null
@@ -1 +0,0 @@
-Send push notifications with a high or low priority depending upon whether they may generate user-observable effects.
diff --git a/changelog.d/7768.misc b/changelog.d/7768.misc
deleted file mode 100644
index dfb3d24c7d..0000000000
--- a/changelog.d/7768.misc
+++ /dev/null
@@ -1 +0,0 @@
-Use symbolic names for replication stream names.
diff --git a/changelog.d/7769.misc b/changelog.d/7769.misc
deleted file mode 100644
index 2e200286ce..0000000000
--- a/changelog.d/7769.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add early returns to `_check_for_soft_fail`.
diff --git a/changelog.d/7770.misc b/changelog.d/7770.misc
deleted file mode 100644
index 5b864084be..0000000000
--- a/changelog.d/7770.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix up `synapse.handlers.federation` to pass mypy.
diff --git a/changelog.d/7779.bugfix b/changelog.d/7779.bugfix
deleted file mode 100644
index 61de45d570..0000000000
--- a/changelog.d/7779.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix synctl to handle empty config files correctly. Contributed by @kotovalexarian.
diff --git a/debian/changelog b/debian/changelog
index 1e7d7191ad..f7c146d777 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.17.0rc1) stable; urgency=medium
+
+  * New synapse release 1.17.0rc1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Thu, 09 Jul 2020 16:53:12 +0100
+
 matrix-synapse-py3 (1.16.0) stable; urgency=medium
 
   * New synapse release 1.16.0.
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 9a3cf7b3f5..093e89af6c 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -24,6 +24,7 @@ RUN apk add \
         build-base \
         libffi-dev \
         libjpeg-turbo-dev \
+        libwebp-dev \
         libressl-dev \
         libxslt-dev \
         linux-headers \
@@ -61,6 +62,7 @@ FROM docker.io/python:${PYTHON_VERSION}-alpine3.11
 RUN apk add --no-cache --virtual .runtime_deps \
         libffi \
         libjpeg-turbo \
+        libwebp \
         libressl \
         libxslt \
         libpq \
diff --git a/docs/jwt.md b/docs/jwt.md
new file mode 100644
index 0000000000..289d66b365
--- /dev/null
+++ b/docs/jwt.md
@@ -0,0 +1,90 @@
+# JWT Login Type
+
+Synapse comes with a non-standard login type to support
+[JSON Web Tokens](https://en.wikipedia.org/wiki/JSON_Web_Token). In general the
+documentation for
+[the login endpoint](https://matrix.org/docs/spec/client_server/r0.6.1#login)
+is still valid (and the mechanism works similarly to the
+[token based login](https://matrix.org/docs/spec/client_server/r0.6.1#token-based)).
+
+To log in using a JSON Web Token, clients should submit a `/login` request as
+follows:
+
+```json
+{
+  "type": "org.matrix.login.jwt",
+  "token": "<jwt>"
+}
+```
+
+Note that the login type of `m.login.jwt` is supported, but is deprecated. This
+will be removed in a future version of Synapse.
+
+The `jwt` should encode the local part of the user ID as the standard `sub`
+claim. In the case that the token is not valid, the homeserver must respond with
+`401 Unauthorized` and an error code of `M_UNAUTHORIZED`.
+
+(Note that this differs from token-based logins, which return a
+`403 Forbidden` and an error code of `M_FORBIDDEN` if an error occurs.)
+
+As with other login types, there are additional fields (e.g. `device_id` and
+`initial_device_display_name`) which can be included in the above request.
+
+## Preparing Synapse
+
+The JSON Web Token integration in Synapse uses the
+[`PyJWT`](https://pypi.org/project/pyjwt/) library, which must be installed
+as follows:
+
+ * The relevant libraries are included in the Docker images and Debian packages
+   provided by `matrix.org` so no further action is needed.
+
+ * If you installed Synapse into a virtualenv, run `/path/to/env/bin/pip
+   install synapse[pyjwt]` to install the necessary dependencies.
+
+ * For other installation mechanisms, see the documentation provided by the
+   maintainer.
+
+To enable the JSON web token integration, you should then add a `jwt_config` section
+to your configuration file (or uncomment the `enabled: true` line in the
+existing section). See [sample_config.yaml](./sample_config.yaml) for some
+sample settings.
+
+## How to test JWT as a developer
+
+Although JSON Web Tokens are typically generated from an external server, the
+examples below use [PyJWT](https://pyjwt.readthedocs.io/en/latest/) directly.
+
+1.  Configure Synapse with JWT logins:
+
+    ```yaml
+    jwt_config:
+        enabled: true
+        secret: "my-secret-token"
+        algorithm: "HS256"
+    ```
+2.  Generate a JSON web token:
+
+    ```bash
+    $ pyjwt --key=my-secret-token --alg=HS256 encode sub=test-user
+    eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.Ag71GT8v01UO3w80aqRPTeuVPBIBZkYhNTJJ-_-zQIc
+    ```
+3.  Query for the login types and ensure `org.matrix.login.jwt` is there:
+
+    ```bash
+    curl http://localhost:8080/_matrix/client/r0/login
+    ```
+4.  Log in using the generated JSON web token from above:
+
+    ```bash
+    $ curl http://localhost:8082/_matrix/client/r0/login -X POST \
+        --data '{"type":"org.matrix.login.jwt","token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.Ag71GT8v01UO3w80aqRPTeuVPBIBZkYhNTJJ-_-zQIc"}'
+    {
+        "access_token": "<access token>",
+        "device_id": "ACBDEFGHI",
+        "home_server": "localhost:8080",
+        "user_id": "@test-user:localhost:8480"
+    }
+    ```
+
+You should now be able to use the returned access token to query the client API.
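
As an aside (not part of the commit above): the same login flow described in the new docs/jwt.md can be exercised from Python instead of curl. The sketch below is illustrative only; it assumes PyJWT and requests are installed, a local homeserver at `http://localhost:8080`, and the HS256 shared secret from the sample config. The helper name `login_with_jwt` is hypothetical.

```python
# Hedged sketch: drive the org.matrix.login.jwt flow end to end from Python.
# The secret and homeserver URL are placeholders for your own deployment.
import jwt
import requests


def login_with_jwt(homeserver: str, secret: str, localpart: str) -> dict:
    # The localpart of the desired mxid goes in the standard "sub" claim.
    token = jwt.encode({"sub": localpart}, secret, algorithm="HS256")
    if isinstance(token, bytes):
        # PyJWT < 2.0 returns bytes; the login body needs a string token.
        token = token.decode("ascii")
    resp = requests.post(
        homeserver + "/_matrix/client/r0/login",
        json={"type": "org.matrix.login.jwt", "token": token},
    )
    resp.raise_for_status()
    return resp.json()  # access_token, device_id, home_server, user_id


# creds = login_with_jwt("http://localhost:8080", "my-secret-token", "test-user")
# print(creds["user_id"], creds["access_token"])
```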
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index cbb8269568..131990001a 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -3,7 +3,7 @@
 It is recommended to put a reverse proxy such as
 [nginx](https://nginx.org/en/docs/http/ngx_http_proxy_module.html),
 [Apache](https://httpd.apache.org/docs/current/mod/mod_proxy_http.html),
-[Caddy](https://caddyserver.com/docs/proxy) or
+[Caddy](https://caddyserver.com/docs/quick-starts/reverse-proxy) or
 [HAProxy](https://www.haproxy.org/) in front of Synapse. One advantage
 of doing so is that it means that you can expose the default https port
 (443) to Matrix clients without needing to run Synapse with root
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 2169082470..390cd7d607 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -1979,12 +1979,39 @@ sso:
     #template_dir: "res/templates"
 
 
-# The JWT needs to contain a globally unique "sub" (subject) claim.
+# JSON web token integration. The following settings can be used to make
+# Synapse use JSON web tokens for authentication, instead of its internal
+# password database.
+#
+# Each JSON Web Token needs to contain a "sub" (subject) claim, which is
+# used as the localpart of the mxid.
+#
+# Note that this is a non-standard login type and client support is
+# expected to be non-existent.
+#
+# See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md.
 #
 #jwt_config:
-#   enabled: true
-#   secret: "a secret"
-#   algorithm: "HS256"
+    # Uncomment the following to enable authorization using JSON web
+    # tokens. Defaults to false.
+    #
+    #enabled: true
+
+    # This is either the private shared secret or the public key used to
+    # decode the contents of the JSON web token.
+    #
+    # Required if 'enabled' is true.
+    #
+    #secret: "provided-by-your-issuer"
+
+    # The algorithm used to sign the JSON web token.
+    #
+    # Supported algorithms are listed at
+    # https://pyjwt.readthedocs.io/en/latest/algorithms.html
+    #
+    # Required if 'enabled' is true.
+    #
+    #algorithm: "provided-by-your-issuer"
 
 
 password_config:
diff --git a/scripts-dev/check_signature.py b/scripts-dev/check_signature.py
index ecda103cf7..6755bc5282 100644
--- a/scripts-dev/check_signature.py
+++ b/scripts-dev/check_signature.py
@@ -2,9 +2,9 @@ import argparse
 import json
 import logging
 import sys
-import urllib2
 
 import dns.resolver
+import urllib2
 from signedjson.key import decode_verify_key_bytes, write_signing_keys
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
diff --git a/scripts-dev/lint.sh b/scripts-dev/lint.sh
index 6f1ba22931..66b0568858 100755
--- a/scripts-dev/lint.sh
+++ b/scripts-dev/lint.sh
@@ -15,7 +15,7 @@ else
 fi
 
 echo "Linting these locations: $files"
-isort -y -rc $files
+isort $files
 python3 -m black $files
 ./scripts-dev/config-lint.sh
 flake8 $files
diff --git a/setup.cfg b/setup.cfg
index f2bca272e1..a32278ea8a 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -26,7 +26,6 @@ ignore=W503,W504,E203,E731,E501
 
 [isort]
 line_length = 88
-not_skip = __init__.py
 sections=FUTURE,STDLIB,COMPAT,THIRDPARTY,TWISTED,FIRSTPARTY,TESTS,LOCALFOLDER
 default_section=THIRDPARTY
 known_first_party = synapse
diff --git a/synapse/__init__.py b/synapse/__init__.py
index de65ce6db8..5bb09a37d7 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.16.0"
+__version__ = "1.17.0rc1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e615533ea3..768b840b6a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 from typing import Optional
 
@@ -22,7 +21,6 @@ from netaddr import IPAddress
 from twisted.internet import defer
 from twisted.web.server import Request
 
-import synapse.logging.opentracing as opentracing
 import synapse.types
 from synapse import event_auth
 from synapse.api.auth_blocking import AuthBlocking
@@ -35,6 +33,7 @@ from synapse.api.errors import (
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
+from synapse.logging import opentracing as opentracing
 from synapse.types import StateMap, UserID
 from synapse.util.caches import register_cache
 from synapse.util.caches.lrucache import LruCache
@@ -543,7 +542,7 @@ class Auth(object):
         # Currently we ignore the `for_verification` flag even though there are
         # some situations where we can drop particular auth events when adding
         # to the event's `auth_events` (e.g. joins pointing to previous joins
-        # when room is publically joinable). Dropping event IDs has the
+        # when room is publicly joinable). Dropping event IDs has the
         # advantage that the auth chain for the room grows slower, but we use
         # the auth chain in state resolution v2 to order events, which means
         # care must be taken if dropping events to ensure that it doesn't
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 27a3fc9ed6..f6792d9fc8 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
 
 from typing_extensions import ContextManager
 
-from twisted.internet import defer, reactor
+from twisted.internet import address, defer, reactor
 
 import synapse
 import synapse.events
@@ -206,10 +206,30 @@ class KeyUploadServlet(RestServlet):
 
         if body:
             # They're actually trying to upload something, proxy to main synapse.
-            # Pass through the auth headers, if any, in case the access token
-            # is there.
-            auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
-            headers = {"Authorization": auth_headers}
+
+            # Proxy headers from the original request, such as the auth headers
+            # (in case the access token is there) and the original IP /
+            # User-Agent of the request.
+            headers = {
+                header: request.requestHeaders.getRawHeaders(header, [])
+                for header in (b"Authorization", b"User-Agent")
+            }
+            # Add the previous hop to the X-Forwarded-For header.
+            x_forwarded_for = request.requestHeaders.getRawHeaders(
+                b"X-Forwarded-For", []
+            )
+            if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
+                previous_host = request.client.host.encode("ascii")
+                # If the header exists, add to the comma-separated list of the first
+                # instance of the header. Otherwise, generate a new header.
+                if x_forwarded_for:
+                    x_forwarded_for = [
+                        x_forwarded_for[0] + b", " + previous_host
+                    ] + x_forwarded_for[1:]
+                else:
+                    x_forwarded_for = [previous_host]
+            headers[b"X-Forwarded-For"] = x_forwarded_for
+
             try:
                 result = await self.http_client.post_json_get_json(
                     self.main_uri + request.uri.decode("ascii"), body, headers=headers
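
An illustrative sketch (not part of the diff) of the header chaining the worker now performs before proxying key uploads to the main process: the previous hop is appended to the first `X-Forwarded-For` value if one exists, otherwise a new header is created. The helper name below is hypothetical, not a function from the Synapse codebase.

```python
# Hedged sketch of the X-Forwarded-For handling shown above.
from typing import Dict, List


def add_forwarded_hop(headers: Dict[bytes, List[bytes]], client_ip: str) -> None:
    """Append client_ip to the first X-Forwarded-For value, or create one."""
    hop = client_ip.encode("ascii")
    existing = headers.get(b"X-Forwarded-For", [])
    if existing:
        headers[b"X-Forwarded-For"] = [existing[0] + b", " + hop] + existing[1:]
    else:
        headers[b"X-Forwarded-For"] = [hop]


# headers = {b"X-Forwarded-For": [b"203.0.113.5"]}
# add_forwarded_hop(headers, "10.0.0.2")
# headers == {b"X-Forwarded-For": [b"203.0.113.5, 10.0.0.2"]}
```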
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index da9a5e86d4..f92bfb420b 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -98,7 +98,6 @@ class ApplicationServiceApi(SimpleHttpClient):
         if service.url is None:
             return False
         uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
-        response = None
         try:
             response = yield self.get_json(uri, {"access_token": service.hs_token})
             if response is not None:  # just an empty json object
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index fca35b008c..65043d5b5b 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -16,6 +16,7 @@ from synapse.config._base import ConfigError
 
 if __name__ == "__main__":
     import sys
+
     from synapse.config.homeserver import HomeServerConfig
 
     action = sys.argv[1]
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index ca61214454..b1dc7ad502 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -14,7 +14,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from __future__ import print_function
 
 # This file can't be called email.py because if it is, we cannot:
@@ -73,7 +72,7 @@ class EmailConfig(Config):
 
         template_dir = email_config.get("template_dir")
         # we need an absolute path, because we change directory after starting (and
-        # we don't yet know what auxilliary templates like mail.css we will need).
+        # we don't yet know what auxiliary templates like mail.css we will need).
         # (Note that loading as package_resources with jinja.PackageLoader doesn't
         # work for the same reason.)
         if not template_dir:
@@ -145,8 +144,8 @@ class EmailConfig(Config):
             or self.threepid_behaviour_email == ThreepidBehaviour.LOCAL
         ):
             # make sure we can import the required deps
-            import jinja2
             import bleach
+            import jinja2
 
             # prevent unused warnings
             jinja2
diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py
index a568726985..fce96b4acf 100644
--- a/synapse/config/jwt_config.py
+++ b/synapse/config/jwt_config.py
@@ -45,10 +45,37 @@ class JWTConfig(Config):
 
     def generate_config_section(self, **kwargs):
         return """\
-        # The JWT needs to contain a globally unique "sub" (subject) claim.
+        # JSON web token integration. The following settings can be used to make
+        # Synapse use JSON web tokens for authentication, instead of its internal
+        # password database.
+        #
+        # Each JSON Web Token needs to contain a "sub" (subject) claim, which is
+        # used as the localpart of the mxid.
+        #
+        # Note that this is a non-standard login type and client support is
+        # expected to be non-existent.
+        #
+        # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md.
         #
         #jwt_config:
-        #   enabled: true
-        #   secret: "a secret"
-        #   algorithm: "HS256"
+            # Uncomment the following to enable authorization using JSON web
+            # tokens. Defaults to false.
+            #
+            #enabled: true
+
+            # This is either the private shared secret or the public key used to
+            # decode the contents of the JSON web token.
+            #
+            # Required if 'enabled' is true.
+            #
+            #secret: "provided-by-your-issuer"
+
+            # The algorithm used to sign the JSON web token.
+            #
+            # Supported algorithms are listed at
+            # https://pyjwt.readthedocs.io/en/latest/algorithms.html
+            #
+            # Required if 'enabled' is true.
+            #
+            #algorithm: "provided-by-your-issuer"
         """
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index a0c4a40c27..92aadfe7ef 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -162,7 +162,7 @@ class EventBuilderFactory(object):
     def __init__(self, hs):
         self.clock = hs.get_clock()
         self.hostname = hs.hostname
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
 
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 687cd841ac..a37cc9cb4a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -87,7 +87,7 @@ class FederationClient(FederationBase):
         self.transport_layer = hs.get_federation_transport_client()
 
         self.hostname = hs.hostname
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
 
         self._get_pdu_cache = ExpiringCache(
             cache_name="get_pdu_cache",
@@ -245,7 +245,7 @@ class FederationClient(FederationBase):
             event_id: event to fetch
             room_version: version of the room
             outlier: Indicates whether the PDU is an `outlier`, i.e. if
-                it's from an arbitary point in the context as opposed to part
+                it's from an arbitrary point in the context as opposed to part
                 of the current block of PDUs. Defaults to `False`
             timeout: How long to try (in ms) each destination for before
                 moving to the next destination. None indicates no timeout.
@@ -351,7 +351,7 @@ class FederationClient(FederationBase):
         outlier: bool = False,
         include_none: bool = False,
     ) -> List[EventBase]:
-        """Takes a list of PDUs and checks the signatures and hashs of each
+        """Takes a list of PDUs and checks the signatures and hashes of each
         one. If a PDU fails its signature check then we check if we have it in
         the database and if not then request if from the originating server of
         that PDU.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e704cf2f44..86051decd4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -717,7 +717,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
     # server name is a literal IP
     allow_ip_literals = acl_event.content.get("allow_ip_literals", True)
     if not isinstance(allow_ip_literals, bool):
-        logger.warning("Ignorning non-bool allow_ip_literals flag")
+        logger.warning("Ignoring non-bool allow_ip_literals flag")
         allow_ip_literals = True
     if not allow_ip_literals:
         # check for ipv6 literals. These start with '['.
@@ -731,7 +731,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
     # next,  check the deny list
     deny = acl_event.content.get("deny", [])
     if not isinstance(deny, (list, tuple)):
-        logger.warning("Ignorning non-list deny ACL %s", deny)
+        logger.warning("Ignoring non-list deny ACL %s", deny)
         deny = []
     for e in deny:
         if _acl_entry_matches(server_name, e):
@@ -741,7 +741,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
     # then the allow list.
     allow = acl_event.content.get("allow", [])
     if not isinstance(allow, (list, tuple)):
-        logger.warning("Ignorning non-list allow ACL %s", allow)
+        logger.warning("Ignoring non-list allow ACL %s", allow)
         allow = []
     for e in allow:
         if _acl_entry_matches(server_name, e):
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 6bbd762681..860b03f7b9 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -359,7 +359,7 @@ class BaseFederationRow(object):
     Specifies how to identify, serialize and deserialize the different types.
     """
 
-    TypeId = ""  # Unique string that ids the type. Must be overriden in sub classes.
+    TypeId = ""  # Unique string that ids the type. Must be overridden in sub classes.
 
     @staticmethod
     def from_data(data):
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 4e698981a4..12966e239b 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -119,7 +119,7 @@ class PerDestinationQueue(object):
         )
 
     def send_pdu(self, pdu: EventBase, order: int) -> None:
-        """Add a PDU to the queue, and start the transmission loop if neccessary
+        """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
             pdu: pdu to send
@@ -129,7 +129,7 @@ class PerDestinationQueue(object):
         self.attempt_new_transaction()
 
     def send_presence(self, states: Iterable[UserPresenceState]) -> None:
-        """Add presence updates to the queue. Start the transmission loop if neccessary.
+        """Add presence updates to the queue. Start the transmission loop if necessary.
 
         Args:
             states: presence to send
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 1ac522c8a1..e502c12050 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -746,7 +746,7 @@ class TransportLayerClient(object):
     def remove_user_from_group(
         self, destination, group_id, requester_user_id, user_id, content
     ):
-        """Remove a user fron a group
+        """Remove a user from a group
         """
         path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id)
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 506f35e39c..1478ee03a5 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -110,7 +110,7 @@ class Authenticator(object):
         self.server_name = hs.hostname
         self.store = hs.get_datastore()
         self.federation_domain_whitelist = hs.config.federation_domain_whitelist
-        self.notifer = hs.get_notifier()
+        self.notifier = hs.get_notifier()
 
         self.replication_client = None
         if hs.config.worker.worker_app:
@@ -176,7 +176,7 @@ class Authenticator(object):
             await self.store.set_destination_retry_timings(origin, None, 0, 0)
 
             # Inform the relevant places that the remote server is back up.
-            self.notifer.notify_remote_server_up(origin)
+            self.notifier.notify_remote_server_up(origin)
             if self.replication_client:
                 # If we're on a worker we try and inform master about this. The
                 # replication client doesn't hook into the notifier to avoid
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 27b0c02655..dab13c243f 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -70,7 +70,7 @@ class GroupAttestationSigning(object):
         self.keyring = hs.get_keyring()
         self.clock = hs.get_clock()
         self.server_name = hs.hostname
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
 
     @defer.inlineCallbacks
     def verify_attestation(self, attestation, group_id, user_id, server_name=None):
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 8db8ab1b7b..8cb922ddc7 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -41,7 +41,7 @@ class GroupsServerWorkerHandler(object):
         self.clock = hs.get_clock()
         self.keyring = hs.get_keyring()
         self.is_mine_id = hs.is_mine_id
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
         self.server_name = hs.hostname
         self.attestations = hs.get_groups_attestation_signing()
         self.transport_client = hs.get_federation_transport_client()
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 904c96eeec..92d4c6e16c 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -48,8 +48,7 @@ class ApplicationServicesHandler(object):
         self.current_max = 0
         self.is_processing = False
 
-    @defer.inlineCallbacks
-    def notify_interested_services(self, current_id):
+    async def notify_interested_services(self, current_id):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
@@ -74,7 +73,7 @@ class ApplicationServicesHandler(object):
                     (
                         upper_bound,
                         events,
-                    ) = yield self.store.get_new_events_for_appservice(
+                    ) = await self.store.get_new_events_for_appservice(
                         self.current_max, limit
                     )
 
@@ -85,10 +84,9 @@ class ApplicationServicesHandler(object):
                     for event in events:
                         events_by_room.setdefault(event.room_id, []).append(event)
 
-                    @defer.inlineCallbacks
-                    def handle_event(event):
+                    async def handle_event(event):
                         # Gather interested services
-                        services = yield self._get_services_for_event(event)
+                        services = await self._get_services_for_event(event)
                         if len(services) == 0:
                             return  # no services need notifying
 
@@ -96,9 +94,9 @@ class ApplicationServicesHandler(object):
                         # query API for all services which match that user regex.
                         # This needs to block as these user queries need to be
                         # made BEFORE pushing the event.
-                        yield self._check_user_exists(event.sender)
+                        await self._check_user_exists(event.sender)
                         if event.type == EventTypes.Member:
-                            yield self._check_user_exists(event.state_key)
+                            await self._check_user_exists(event.state_key)
 
                         if not self.started_scheduler:
 
@@ -115,17 +113,16 @@ class ApplicationServicesHandler(object):
                             self.scheduler.submit_event_for_as(service, event)
 
                         now = self.clock.time_msec()
-                        ts = yield self.store.get_received_ts(event.event_id)
+                        ts = await self.store.get_received_ts(event.event_id)
                         synapse.metrics.event_processing_lag_by_event.labels(
                             "appservice_sender"
                         ).observe((now - ts) / 1000)
 
-                    @defer.inlineCallbacks
-                    def handle_room_events(events):
+                    async def handle_room_events(events):
                         for event in events:
-                            yield handle_event(event)
+                            await handle_event(event)
 
-                    yield make_deferred_yieldable(
+                    await make_deferred_yieldable(
                         defer.gatherResults(
                             [
                                 run_in_background(handle_room_events, evs)
@@ -135,10 +132,10 @@ class ApplicationServicesHandler(object):
                         )
                     )
 
-                    yield self.store.set_appservice_last_pos(upper_bound)
+                    await self.store.set_appservice_last_pos(upper_bound)
 
                     now = self.clock.time_msec()
-                    ts = yield self.store.get_received_ts(events[-1].event_id)
+                    ts = await self.store.get_received_ts(events[-1].event_id)
 
                     synapse.metrics.event_processing_positions.labels(
                         "appservice_sender"
@@ -161,8 +158,7 @@ class ApplicationServicesHandler(object):
             finally:
                 self.is_processing = False
 
-    @defer.inlineCallbacks
-    def query_user_exists(self, user_id):
+    async def query_user_exists(self, user_id):
         """Check if any application service knows this user_id exists.
 
         Args:
@@ -170,15 +166,14 @@ class ApplicationServicesHandler(object):
         Returns:
             True if this user exists on at least one application service.
         """
-        user_query_services = yield self._get_services_for_user(user_id=user_id)
+        user_query_services = self._get_services_for_user(user_id=user_id)
         for user_service in user_query_services:
-            is_known_user = yield self.appservice_api.query_user(user_service, user_id)
+            is_known_user = await self.appservice_api.query_user(user_service, user_id)
             if is_known_user:
                 return True
         return False
 
-    @defer.inlineCallbacks
-    def query_room_alias_exists(self, room_alias):
+    async def query_room_alias_exists(self, room_alias):
         """Check if an application service knows this room alias exists.
 
         Args:
@@ -193,19 +188,18 @@ class ApplicationServicesHandler(object):
             s for s in services if (s.is_interested_in_alias(room_alias_str))
         ]
         for alias_service in alias_query_services:
-            is_known_alias = yield self.appservice_api.query_alias(
+            is_known_alias = await self.appservice_api.query_alias(
                 alias_service, room_alias_str
             )
             if is_known_alias:
                 # the alias exists now so don't query more ASes.
-                result = yield self.store.get_association_from_room_alias(room_alias)
+                result = await self.store.get_association_from_room_alias(room_alias)
                 return result
 
-    @defer.inlineCallbacks
-    def query_3pe(self, kind, protocol, fields):
-        services = yield self._get_services_for_3pn(protocol)
+    async def query_3pe(self, kind, protocol, fields):
+        services = self._get_services_for_3pn(protocol)
 
-        results = yield make_deferred_yieldable(
+        results = await make_deferred_yieldable(
             defer.DeferredList(
                 [
                     run_in_background(
@@ -224,8 +218,7 @@ class ApplicationServicesHandler(object):
 
         return ret
 
-    @defer.inlineCallbacks
-    def get_3pe_protocols(self, only_protocol=None):
+    async def get_3pe_protocols(self, only_protocol=None):
         services = self.store.get_app_services()
         protocols = {}
 
@@ -238,7 +231,7 @@ class ApplicationServicesHandler(object):
                 if p not in protocols:
                     protocols[p] = []
 
-                info = yield self.appservice_api.get_3pe_protocol(s, p)
+                info = await self.appservice_api.get_3pe_protocol(s, p)
 
                 if info is not None:
                     protocols[p].append(info)
@@ -263,8 +256,7 @@ class ApplicationServicesHandler(object):
 
         return protocols
 
-    @defer.inlineCallbacks
-    def _get_services_for_event(self, event):
+    async def _get_services_for_event(self, event):
         """Retrieve a list of application services interested in this event.
 
         Args:
@@ -280,7 +272,7 @@ class ApplicationServicesHandler(object):
         # inside of a list comprehension anymore.
         interested_list = []
         for s in services:
-            if (yield s.is_interested(event, self.store)):
+            if await s.is_interested(event, self.store):
                 interested_list.append(s)
 
         return interested_list
@@ -288,21 +280,20 @@ class ApplicationServicesHandler(object):
     def _get_services_for_user(self, user_id):
         services = self.store.get_app_services()
         interested_list = [s for s in services if (s.is_interested_in_user(user_id))]
-        return defer.succeed(interested_list)
+        return interested_list
 
     def _get_services_for_3pn(self, protocol):
         services = self.store.get_app_services()
         interested_list = [s for s in services if s.is_interested_in_protocol(protocol)]
-        return defer.succeed(interested_list)
+        return interested_list
 
-    @defer.inlineCallbacks
-    def _is_unknown_user(self, user_id):
+    async def _is_unknown_user(self, user_id):
         if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             return False
 
-        user_info = yield self.store.get_user_by_id(user_id)
+        user_info = await self.store.get_user_by_id(user_id)
         if user_info:
             return False
 
@@ -311,10 +302,9 @@ class ApplicationServicesHandler(object):
         service_list = [s for s in services if s.sender == user_id]
         return len(service_list) == 0
 
-    @defer.inlineCallbacks
-    def _check_user_exists(self, user_id):
-        unknown_user = yield self._is_unknown_user(user_id)
+    async def _check_user_exists(self, user_id):
+        unknown_user = await self._is_unknown_user(user_id)
         if unknown_user:
-            exists = yield self.query_user_exists(user_id)
+            exists = await self.query_user_exists(user_id)
             return exists
         return True
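
The appservice handler changes above follow the same mechanical pattern used throughout this release to move handlers off Twisted's `inlineCallbacks`. A minimal, illustrative sketch of that pattern is below; `store` stands in for a Synapse datastore and is not a real object here.

```python
# Hedged sketch of the inlineCallbacks -> async/await conversion pattern.
from twisted.internet import defer


@defer.inlineCallbacks
def user_exists_old(store, user_id):
    # Old style: a generator decorated with inlineCallbacks, yielding Deferreds.
    user = yield store.get_user_by_id(user_id)
    return user is not None


async def user_exists_new(store, user_id):
    # New style: a native coroutine; callers await it directly, or wrap it
    # with defer.ensureDeferred when a Deferred is required.
    user = await store.get_user_by_id(user_id)
    return user is not None
```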
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d713a06bf9..a162392e4c 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 import time
 import unicodedata
@@ -24,7 +23,6 @@ import attr
 import bcrypt  # type: ignore[import]
 import pymacaroons
 
-import synapse.util.stringutils as stringutils
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
     AuthError,
@@ -45,6 +43,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import ModuleApi
 from synapse.push.mailer import load_jinja2_templates
 from synapse.types import Requester, UserID
+from synapse.util import stringutils as stringutils
 from synapse.util.threepids import canonicalise_email
 
 from ._base import BaseHandler
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 76f213723a..d79ffefdb5 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -12,11 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 import urllib
-import xml.etree.ElementTree as ET
 from typing import Dict, Optional, Tuple
+from xml.etree import ElementTree as ET
 
 from twisted.web.client import PartialDownloadError
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3326535410..a94e467aef 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1577,7 +1577,7 @@ class FederationHandler(BaseHandler):
                 room_version,
                 event.get_pdu_json(),
                 self.hs.hostname,
-                self.hs.config.signing_key[0],
+                self.hs.signing_key,
             )
         )
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 7cb106e365..ecdb12a7bf 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -70,7 +70,7 @@ class GroupsLocalWorkerHandler(object):
         self.clock = hs.get_clock()
         self.keyring = hs.get_keyring()
         self.is_mine_id = hs.is_mine_id
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
         self.server_name = hs.hostname
         self.notifier = hs.get_notifier()
         self.attestations = hs.get_groups_attestation_signing()
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index a77088e295..88537a5be3 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -268,10 +268,10 @@ class IdentityHandler(BaseHandler):
         url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
         auth_headers = self.federation_http_client.build_auth_headers(
             destination=None,
-            method="POST",
+            method=b"POST",
             url_bytes=url_bytes,
             content=content,
-            destination_is=id_server,
+            destination_is=id_server.encode("ascii"),
         )
         headers = {b"Authorization": auth_headers}
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 665ad19b5d..da206e1ec1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional, Tuple
+from typing import TYPE_CHECKING, Optional, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
@@ -55,6 +55,9 @@ from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -349,7 +352,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
 
 
 class EventCreationHandler(object):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
@@ -814,11 +817,17 @@ class EventCreationHandler(object):
                 403, "This event is not allowed in this context", Codes.FORBIDDEN
             )
 
-        try:
-            await self.auth.check_from_context(room_version, event, context)
-        except AuthError as err:
-            logger.warning("Denying new event %r because %s", event, err)
-            raise err
+        if event.internal_metadata.is_out_of_band_membership():
+            # the only sort of out-of-band-membership events we expect to see here
+            # are invite rejections we have generated ourselves.
+            assert event.type == EventTypes.Member
+            assert event.content["membership"] == Membership.LEAVE
+        else:
+            try:
+                await self.auth.check_from_context(room_version, event, context)
+            except AuthError as err:
+                logger.warning("Denying new event %r because %s", event, err)
+                raise err
 
         # Ensure that we can round trip before trying to persist in db
         try:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index d585114f2d..c3fee116c2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1,7 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2016-2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,17 +16,21 @@
 import abc
 import logging
 from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple, Union
+
+from unpaddedbase64 import encode_base64
 
 from synapse import types
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
+from synapse.api.room_versions import EventFormatVersions
+from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase
+from synapse.events.builder import create_local_event_from_event_dict
 from synapse.events.snapshot import EventContext
-from synapse.replication.http.membership import (
-    ReplicationLocallyRejectInviteRestServlet,
-)
-from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
+from synapse.events.validator import EventValidator
+from synapse.storage.roommember import RoomsForUser
+from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
 
@@ -75,10 +77,6 @@ class RoomMemberHandler(object):
         )
         if self._is_on_event_persistence_instance:
             self.persist_event_storage = hs.get_storage().persistence
-        else:
-            self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
-                hs
-            )
 
         # This is only used to get at ratelimit function, and
         # maybe_kick_guest_users. It's fine there are multiple of these as
@@ -106,46 +104,28 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
-        content: dict,
-    ) -> Tuple[Optional[str], int]:
-        """Attempt to reject an invite for a room this server is not in. If we
-        fail to do so we locally mark the invite as rejected.
+        content: JsonDict,
+    ) -> Tuple[str, int]:
+        """
+        Rejects an out-of-band invite we have received from a remote server
 
         Args:
-            requester
-            remote_room_hosts: List of servers to use to try and reject invite
-            room_id
-            target: The user rejecting the invite
-            content: The content for the rejection event
+            invite_event_id: ID of the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester: user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
 
         Returns:
-            A dictionary to be returned to the client, may
-            include event_id etc, or nothing if we locally rejected
+            event id, stream_id of the leave event
         """
         raise NotImplementedError()
 
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-        if self._is_on_event_persistence_instance:
-            return await self.persist_event_storage.locally_reject_invite(
-                user_id, room_id
-            )
-        else:
-            result = await self._locally_reject_client(
-                instance_name=self._event_stream_writer_instance,
-                user_id=user_id,
-                room_id=room_id,
-            )
-            return result["stream_id"]
-
     @abc.abstractmethod
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Notifies distributor on master process that the user has joined the
@@ -290,7 +270,7 @@ class RoomMemberHandler(object):
         content: Optional[dict] = None,
         new_room: bool = False,
         require_consent: bool = True,
-    ) -> Tuple[Optional[str], int]:
+    ) -> Tuple[str, int]:
         """Update a user's membership in a room"""
         key = (room_id,)
 
@@ -324,7 +304,7 @@ class RoomMemberHandler(object):
         content: Optional[dict] = None,
         new_room: bool = False,
         require_consent: bool = True,
-    ) -> Tuple[Optional[str], int]:
+    ) -> Tuple[str, int]:
         content_specified = bool(content)
         if content is None:
             content = {}
@@ -516,11 +496,17 @@ class RoomMemberHandler(object):
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
                 # perhaps we've been invited
-                inviter = await self._get_inviter(target.to_string(), room_id)
-                if not inviter:
+                invite = await self.store.get_invite_for_local_user_in_room(
+                    user_id=target.to_string(), room_id=room_id
+                )  # type: Optional[RoomsForUser]
+                if not invite:
                     raise SynapseError(404, "Not a known room")
 
-                if self.hs.is_mine(inviter):
+                logger.info(
+                    "%s rejects invite to %s from %s", target, room_id, invite.sender
+                )
+
+                if self.hs.is_mine_id(invite.sender):
                     # the inviter was on our server, but has now left. Carry on
                     # with the normal rejection codepath.
                     #
@@ -528,10 +514,10 @@ class RoomMemberHandler(object):
                     # active on other servers.
                     pass
                 else:
-                    # send the rejection to the inviter's HS.
-                    remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    return await self._remote_reject_invite(
-                        requester, remote_room_hosts, room_id, target, content,
+                    # send the rejection to the inviter's HS (with fallback to
+                    # local event)
+                    return await self.remote_reject_invite(
+                        invite.event_id, txn_id, requester, content,
                     )
 
         return await self._local_membership_update(
@@ -1069,33 +1055,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         return event_id, stream_id
 
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
-        content: dict,
-    ) -> Tuple[Optional[str], int]:
-        """Implements RoomMemberHandler._remote_reject_invite
+        content: JsonDict,
+    ) -> Tuple[str, int]:
+        """
+        Rejects an out-of-band invite received from a remote user
+
+        Implements RoomMemberHandler.remote_reject_invite
         """
+        invite_event = await self.store.get_event(invite_event_id)
+        room_id = invite_event.room_id
+        target_user = invite_event.state_key
+
+        # first of all, try doing a rejection via the inviting server
         fed_handler = self.federation_handler
         try:
+            inviter_id = UserID.from_string(invite_event.sender)
             event, stream_id = await fed_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, target.to_string(), content=content,
+                [inviter_id.domain], room_id, target_user, content=content
             )
             return event.event_id, stream_id
         except Exception as e:
-            # if we were unable to reject the exception, just mark
-            # it as rejected on our end and plough ahead.
+            # if we were unable to reject the invite, we will generate our own
+            # leave event.
             #
             # The 'except' clause is very broad, but we need to
             # capture everything from DNS failures upwards
             #
             logger.warning("Failed to reject invite: %s", e)
 
-            stream_id = await self.locally_reject_invite(target.to_string(), room_id)
-            return None, stream_id
+            return await self._locally_reject_invite(
+                invite_event, txn_id, requester, content
+            )
+
+    async def _locally_reject_invite(
+        self,
+        invite_event: EventBase,
+        txn_id: Optional[str],
+        requester: Requester,
+        content: JsonDict,
+    ) -> Tuple[str, int]:
+        """Generate a local invite rejection
+
+        This is called after we fail to reject an invite via a remote server. It
+        generates an out-of-band membership event locally.
+
+        Args:
+            invite_event: the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester: user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
+        """
+
+        room_id = invite_event.room_id
+        target_user = invite_event.state_key
+        room_version = await self.store.get_room_version(room_id)
+
+        content["membership"] = Membership.LEAVE
+
+        # the auth events for the new event are the same as that of the invite, plus
+        # the invite itself.
+        #
+        # the prev_events are just the invite.
+        invite_hash = invite_event.event_id  # type: Union[str, Tuple]
+        if room_version.event_format == EventFormatVersions.V1:
+            alg, h = compute_event_reference_hash(invite_event)
+            invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
+
+        auth_events = tuple(invite_event.auth_events) + (invite_hash,)
+        prev_events = (invite_hash,)
+
+        # we cap depth of generated events, to ensure that they are not
+        # rejected by other servers (and so that they can be persisted in
+        # the db)
+        depth = min(invite_event.depth + 1, MAX_DEPTH)
+
+        event_dict = {
+            "depth": depth,
+            "auth_events": auth_events,
+            "prev_events": prev_events,
+            "type": EventTypes.Member,
+            "room_id": room_id,
+            "sender": target_user,
+            "content": content,
+            "state_key": target_user,
+        }
+
+        event = create_local_event_from_event_dict(
+            clock=self.clock,
+            hostname=self.hs.hostname,
+            signing_key=self.hs.signing_key,
+            room_version=room_version,
+            event_dict=event_dict,
+        )
+        event.internal_metadata.outlier = True
+        event.internal_metadata.out_of_band_membership = True
+        if txn_id is not None:
+            event.internal_metadata.txn_id = txn_id
+        if requester.access_token_id is not None:
+            event.internal_metadata.token_id = requester.access_token_id
+
+        EventValidator().validate_new(event, self.config)
+
+        context = await self.state_handler.compute_event_context(event)
+        context.app_service = requester.app_service
+        stream_id = await self.event_creation_handler.handle_new_client_event(
+            requester, event, context, extra_users=[UserID.from_string(target_user)],
+        )
+        return event.event_id, stream_id
 
     async def _user_joined_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_joined_room
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 02e0c4103d..897338fd54 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -61,21 +61,22 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
 
         return ret["event_id"], ret["stream_id"]
 
-    async def _remote_reject_invite(
+    async def remote_reject_invite(
         self,
+        invite_event_id: str,
+        txn_id: Optional[str],
         requester: Requester,
-        remote_room_hosts: List[str],
-        room_id: str,
-        target: UserID,
         content: dict,
-    ) -> Tuple[Optional[str], int]:
-        """Implements RoomMemberHandler._remote_reject_invite
+    ) -> Tuple[str, int]:
+        """
+        Rejects an out-of-band invite received from a remote user
+
+        Implements RoomMemberHandler.remote_reject_invite
         """
         ret = await self._remote_reject_client(
+            invite_event_id=invite_event_id,
+            txn_id=txn_id,
             requester=requester,
-            remote_room_hosts=remote_room_hosts,
-            room_id=room_id,
-            user_id=target.to_string(),
             content=content,
         )
         return ret["event_id"], ret["stream_id"]
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 6c7abaa578..879c4c07c6 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -294,6 +294,9 @@ class TypingHandler(object):
         rows.sort()
 
         limited = False
+        # We, unusually, use a strict limit here as we have all the rows in
+        # memory rather than pulling them out of the database with a `LIMIT ?`
+        # clause.
         if len(rows) > limit:
             rows = rows[:limit]
             current_id = rows[-1][0]
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 18f6a8fd29..148eeb19dc 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -176,7 +176,7 @@ class MatrixFederationHttpClient(object):
 
     def __init__(self, hs, tls_client_options_factory):
         self.hs = hs
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
         self.server_name = hs.hostname
 
         real_reactor = hs.get_reactor()
@@ -562,13 +562,17 @@ class MatrixFederationHttpClient(object):
         Returns:
             list[bytes]: a list of headers to be added as "Authorization:" headers
         """
-        request = {"method": method, "uri": url_bytes, "origin": self.server_name}
+        request = {
+            "method": method.decode("ascii"),
+            "uri": url_bytes.decode("ascii"),
+            "origin": self.server_name,
+        }
 
         if destination is not None:
-            request["destination"] = destination
+            request["destination"] = destination.decode("ascii")
 
         if destination_is is not None:
-            request["destination_is"] = destination_is
+            request["destination_is"] = destination_is.decode("ascii")
 
         if content is not None:
             request["content"] = content
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 1676771ef0..c6c0e623c1 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -164,7 +164,6 @@ Gotchas
   than one caller? Will all of those calling functions have be in a context
   with an active span?
 """
-
 import contextlib
 import inspect
 import logging
@@ -180,8 +179,8 @@ from twisted.internet import defer
 from synapse.config import ConfigError
 
 if TYPE_CHECKING:
-    from synapse.server import HomeServer
     from synapse.http.site import SynapseRequest
+    from synapse.server import HomeServer
 
 # Helper class
 
@@ -227,6 +226,7 @@ except ImportError:
     tags = _DummyTagNames
 try:
     from jaeger_client import Config as JaegerConfig
+
     from synapse.logging.scopecontextmanager import LogContextScopeManager
 except ImportError:
     JaegerConfig = None  # type: ignore
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 13785038ad..a9269196b3 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Dict, Optional, Set
 from prometheus_client.core import REGISTRY, Counter, Gauge
 
 from twisted.internet import defer
+from twisted.python.failure import Failure
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
 
@@ -212,7 +213,14 @@ def run_as_background_process(desc, func, *args, **kwargs):
 
                 return (yield result)
             except Exception:
-                logger.exception("Background process '%s' threw an exception", desc)
+                # failure.Failure() fishes the original Failure out of our stack, and
+                # thus gives us a sensible stack trace.
+                f = Failure()
+                logger.error(
+                    "Background process '%s' threw an exception",
+                    desc,
+                    exc_info=(f.type, f.value, f.getTracebackObject()),
+                )
             finally:
                 _background_process_in_flight_count.labels(desc).dec()
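A standalone sketch of the logging technique used here, assuming only that it runs inside an except block: twisted's Failure() with no arguments captures the exception currently being handled, and its parts can be passed to the stdlib logger as exc_info to recover a sensible stack trace.

import logging
from twisted.python.failure import Failure

logger = logging.getLogger(__name__)

def log_current_exception(desc: str) -> None:
    # must be called while an exception is being handled
    f = Failure()
    logger.error(
        "Background process '%s' threw an exception",
        desc,
        exc_info=(f.type, f.value, f.getTracebackObject()),
    )

try:
    raise ValueError("boom")
except Exception:
    log_current_exception("example")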
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 87c120a59c..bd41f77852 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -83,7 +83,7 @@ class _NotifierUserStream(object):
         self.current_token = current_token
 
         # The last token for which we should wake up any streams that have a
-        # token that comes before it. This gets updated everytime we get poked.
+        # token that comes before it. This gets updated every time we get poked.
         # We start it at the current token since if we get any streams
         # that have a token from before we have no idea whether they should be
         # woken up or not, so lets just wake them up.
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 8e0d3a416d..2d79ada189 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -16,7 +16,7 @@
 
 import logging
 import re
-from typing import Pattern
+from typing import Any, Dict, List, Pattern, Union
 
 from synapse.events import EventBase
 from synapse.types import UserID
@@ -72,13 +72,36 @@ def _test_ineq_condition(condition, number):
         return False
 
 
-def tweaks_for_actions(actions):
+def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
+    """
+    Converts a list of actions into a `tweaks` dict (which can then be passed to
+    the push gateway).
+
+    This function ignores all actions other than `set_tweak` actions, and treats
+    absent `value`s as `True`, which agrees with the only spec-defined treatment
+    of absent `value`s (namely, for `highlight` tweaks).
+
+    Args:
+        actions: list of actions
+            e.g. [
+                {"set_tweak": "a", "value": "AAA"},
+                {"set_tweak": "b", "value": "BBB"},
+                {"set_tweak": "highlight"},
+                "notify"
+            ]
+
+    Returns:
+        dictionary of tweaks for those actions
+            e.g. {"a": "AAA", "b": "BBB", "highlight": True}
+    """
     tweaks = {}
     for a in actions:
         if not isinstance(a, dict):
             continue
-        if "set_tweak" in a and "value" in a:
-            tweaks[a["set_tweak"]] = a["value"]
+        if "set_tweak" in a:
+            # value is allowed to be absent in which case the value assumed
+            # should be True.
+            tweaks[a["set_tweak"]] = a.get("value", True)
     return tweaks
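A minimal usage sketch of the behaviour described in the docstring above (assuming synapse is importable): non-dict actions are skipped and a `set_tweak` with no `value` defaults to True.

from synapse.push.push_rule_evaluator import tweaks_for_actions

actions = [
    {"set_tweak": "sound", "value": "default"},
    {"set_tweak": "highlight"},  # no "value": treated as True
    "notify",  # not a set_tweak action: ignored
]
assert tweaks_for_actions(actions) == {"sound": "default", "highlight": True}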
 
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index b1cac901eb..8cfcdb0573 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,7 +66,7 @@ REQUIREMENTS = [
     "pymacaroons>=0.13.0",
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
-    "prometheus_client>=0.0.18,<0.8.0",
+    "prometheus_client>=0.0.18,<0.9.0",
     # we use attr.validators.deep_iterable, which arrived in 19.1.0
     "attrs>=19.1.0",
     "netaddr>=0.7.18",
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 0843d28d4b..fb0dd04f88 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -92,11 +92,11 @@ class ReplicationEndpoint(object):
         # assert here that sub classes don't try and use the name.
         assert (
             "instance_name" not in self.PATH_ARGS
-        ), "`instance_name` is a reserved paramater name"
+        ), "`instance_name` is a reserved parameter name"
         assert (
             "instance_name"
             not in signature(self.__class__._serialize_payload).parameters
-        ), "`instance_name` is a reserved paramater name"
+        ), "`instance_name` is a reserved parameter name"
 
         assert self.METHOD in ("PUT", "POST", "GET")
 
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index a7174c4a8f..63ef6eb7be 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,11 +14,11 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
 
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import Requester, UserID
+from synapse.types import JsonDict, Requester, UserID
 from synapse.util.distributor import user_joined_room, user_left_room
 
 if TYPE_CHECKING:
@@ -88,49 +88,54 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
 
 
 class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
-    """Rejects the invite for the user and room.
+    """Rejects an out-of-band invite we have received from a remote server
 
     Request format:
 
-        POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
+        POST /_synapse/replication/remote_reject_invite/:event_id
 
         {
+            "txn_id": ...,
             "requester": ...,
-            "remote_room_hosts": [...],
             "content": { ... }
         }
     """
 
     NAME = "remote_reject_invite"
-    PATH_ARGS = ("room_id", "user_id")
+    PATH_ARGS = ("invite_event_id",)
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
 
-        self.federation_handler = hs.get_handlers().federation_handler
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
         self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
-    def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content):
+    def _serialize_payload(  # type: ignore
+        invite_event_id: str,
+        txn_id: Optional[str],
+        requester: Requester,
+        content: JsonDict,
+    ):
         """
         Args:
-            requester(Requester)
-            room_id (str)
-            user_id (str)
-            remote_room_hosts (list[str]): Servers to try and reject via
+            invite_event_id: ID of the invite to be rejected
+            txn_id: optional transaction ID supplied by the client
+            requester: user making the rejection request, according to the access token
+            content: additional content to include in the rejection event.
+               Normally an empty dict.
         """
         return {
+            "txn_id": txn_id,
             "requester": requester.serialize(),
-            "remote_room_hosts": remote_room_hosts,
             "content": content,
         }
 
-    async def _handle_request(self, request, room_id, user_id):
+    async def _handle_request(self, request, invite_event_id):
         content = parse_json_object_from_request(request)
 
-        remote_room_hosts = content["remote_room_hosts"]
+        txn_id = content["txn_id"]
         event_content = content["content"]
 
         requester = Requester.deserialize(self.store, content["requester"])
@@ -138,60 +143,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         if requester.user:
             request.authenticated_entity = requester.user.to_string()
 
-        logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
-
-        try:
-            event, stream_id = await self.federation_handler.do_remotely_reject_invite(
-                remote_room_hosts, room_id, user_id, event_content,
-            )
-            event_id = event.event_id
-        except Exception as e:
-            # if we were unable to reject the exception, just mark
-            # it as rejected on our end and plough ahead.
-            #
-            # The 'except' clause is very broad, but we need to
-            # capture everything from DNS failures upwards
-            #
-            logger.warning("Failed to reject invite: %s", e)
-
-            stream_id = await self.member_handler.locally_reject_invite(
-                user_id, room_id
-            )
-            event_id = None
+        # hopefully we're now on the master, so this won't recurse!
+        event_id, stream_id = await self.member_handler.remote_reject_invite(
+            invite_event_id, txn_id, requester, event_content,
+        )
 
         return 200, {"event_id": event_id, "stream_id": stream_id}
 
 
-class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
-    """Rejects the invite for the user and room locally.
-
-    Request format:
-
-        POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
-
-        {}
-    """
-
-    NAME = "locally_reject_invite"
-    PATH_ARGS = ("room_id", "user_id")
-
-    def __init__(self, hs: "HomeServer"):
-        super().__init__(hs)
-
-        self.member_handler = hs.get_room_member_handler()
-
-    @staticmethod
-    def _serialize_payload(room_id, user_id):
-        return {}
-
-    async def _handle_request(self, request, room_id, user_id):
-        logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)
-
-        stream_id = await self.member_handler.locally_reject_invite(user_id, room_id)
-
-        return 200, {"stream_id": stream_id}
-
-
 class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
     """Notifies that a user has joined or left the room
 
@@ -245,4 +204,3 @@ def register_servlets(hs, http_server):
     ReplicationRemoteJoinRestServlet(hs).register(http_server)
     ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
     ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
-    ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py
index 523a1358d4..1b8718b11d 100644
--- a/synapse/replication/tcp/__init__.py
+++ b/synapse/replication/tcp/__init__.py
@@ -25,7 +25,7 @@ Structure of the module:
  * command.py  - the definitions of all the valid commands
  * protocol.py - the TCP protocol classes
  * resource.py - handles streaming stream updates to replications
- * streams/    - the definitons of all the valid streams
+ * streams/    - the definitions of all the valid streams
 
 
 The general interaction of the classes are:
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index df29732f51..4985e40b1f 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -33,8 +33,8 @@ from synapse.util.async_helpers import timeout_deferred
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
-    from synapse.server import HomeServer
     from synapse.replication.tcp.handler import ReplicationCommandHandler
+    from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index ea5937a20c..ccc7f1f0d1 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -18,18 +18,11 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
 allowed to be sent by which side.
 """
 import abc
+import json
 import logging
-import platform
 from typing import Tuple, Type
 
-if platform.python_implementation() == "PyPy":
-    import json
-
-    _json_encoder = json.JSONEncoder()
-else:
-    import simplejson as json  # type: ignore[no-redef]  # noqa: F821
-
-    _json_encoder = json.JSONEncoder(namedtuple_as_object=False)  # type: ignore[call-arg]  # noqa: F821
+_json_encoder = json.JSONEncoder()
 
 logger = logging.getLogger(__name__)
 
@@ -54,7 +47,7 @@ class Command(metaclass=abc.ABCMeta):
 
     @abc.abstractmethod
     def to_line(self) -> str:
-        """Serialises the comamnd for the wire. Does not include the command
+        """Serialises the command for the wire. Does not include the command
         prefix.
         """
 
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e6a2e2598b..55b3b79008 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar
 
@@ -149,10 +148,11 @@ class ReplicationCommandHandler:
         using TCP.
         """
         if hs.config.redis.redis_enabled:
+            import txredisapi
+
             from synapse.replication.tcp.redis import (
                 RedisDirectTcpReplicationClientFactory,
             )
-            import txredisapi
 
             logger.info(
                 "Connecting to redis (host=%r port=%r)",
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 4198eece71..ca47f5cc88 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -317,7 +317,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
     def _queue_command(self, cmd):
         """Queue the command until the connection is ready to write to again.
         """
-        logger.debug("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd)
+        logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd)
         self.pending_commands.append(cmd)
 
         if len(self.pending_commands) > self.max_line_buffer:
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index e776b63183..0a7e7f67be 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -177,7 +177,7 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
     Args:
         hs
         outbound_redis_connection: A connection to redis that will be used to
-            send outbound commands (this is seperate to the redis connection
+            send outbound commands (this is separate to the redis connection
             used to subscribe).
     """
 
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f196eff072..9076bbe9f1 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -198,26 +198,6 @@ def current_token_without_instance(
     return lambda instance_name: current_token()
 
 
-def db_query_to_update_function(
-    query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]]
-) -> UpdateFunction:
-    """Wraps a db query function which returns a list of rows to make it
-    suitable for use as an `update_function` for the Stream class
-    """
-
-    async def update_function(instance_name, from_token, upto_token, limit):
-        rows = await query_function(from_token, upto_token, limit)
-        updates = [(row[0], row[1:]) for row in rows]
-        limited = False
-        if len(updates) >= limit:
-            upto_token = updates[-1][0]
-            limited = True
-
-        return updates, upto_token, limited
-
-    return update_function
-
-
 def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
     """Makes a suitable function for use as an `update_function` that queries
     the master process for updates.
@@ -393,7 +373,7 @@ class PushersStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_pushers_stream_token),
-            db_query_to_update_function(store.get_all_updated_pushers_rows),
+            store.get_all_updated_pushers_rows,
         )
 
 
@@ -421,26 +401,12 @@ class CachesStream(Stream):
     ROW_TYPE = CachesStreamRow
 
     def __init__(self, hs):
-        self.store = hs.get_datastore()
+        store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            self.store.get_cache_stream_token,
-            self._update_function,
-        )
-
-    async def _update_function(
-        self, instance_name: str, from_token: int, upto_token: int, limit: int
-    ):
-        rows = await self.store.get_all_updated_caches(
-            instance_name, from_token, upto_token, limit
+            store.get_cache_stream_token,
+            store.get_all_updated_caches,
         )
-        updates = [(row[0], row[1:]) for row in rows]
-        limited = False
-        if len(updates) >= limit:
-            upto_token = updates[-1][0]
-            limited = True
-
-        return updates, upto_token, limited
 
 
 class PublicRoomsStream(Stream):
@@ -465,7 +431,7 @@ class PublicRoomsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_current_public_room_stream_id),
-            db_query_to_update_function(store.get_all_new_public_rooms),
+            store.get_all_new_public_rooms,
         )
 
 
@@ -486,7 +452,7 @@ class DeviceListsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_device_stream_token),
-            db_query_to_update_function(store.get_all_device_list_changes_for_remotes),
+            store.get_all_device_list_changes_for_remotes,
         )
 
 
@@ -504,7 +470,7 @@ class ToDeviceStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_to_device_stream_token),
-            db_query_to_update_function(store.get_all_new_device_messages),
+            store.get_all_new_device_messages,
         )
 
 
@@ -524,7 +490,7 @@ class TagAccountDataStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_max_account_data_stream_id),
-            db_query_to_update_function(store.get_all_updated_tags),
+            store.get_all_updated_tags,
         )
 
 
@@ -612,7 +578,7 @@ class GroupServerStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_group_stream_token),
-            db_query_to_update_function(store.get_all_groups_changes),
+            store.get_all_groups_changes,
         )
 
 
@@ -630,7 +596,5 @@ class UserSignatureStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_device_stream_token),
-            db_query_to_update_function(
-                store.get_all_user_signature_changes_for_remotes
-            ),
+            store.get_all_user_signature_changes_for_remotes,
         )
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index f370390331..1c2a4cce7f 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import heapq
 from collections import Iterable
 from typing import List, Tuple, Type
@@ -22,7 +21,6 @@ import attr
 
 from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
 
-
 """Handling of the 'events' replication stream
 
 This stream contains rows of various types. Each row therefore contains a 'type'
@@ -64,7 +62,7 @@ class BaseEventsStreamRow(object):
     Specifies how to identify, serialize and deserialize the different types.
     """
 
-    # Unique string that ids the type. Must be overriden in sub classes.
+    # Unique string that ids the type. Must be overridden in sub classes.
     TypeId = None  # type: str
 
     @classmethod
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index f6eef7afee..64d5c58b65 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+from typing import Awaitable, Callable, Dict, Optional
 
 from synapse.api.errors import Codes, LoginError, SynapseError
 from synapse.api.ratelimiting import Ratelimiter
@@ -26,7 +27,7 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.rest.client.v2_alpha._base import client_patterns
 from synapse.rest.well_known import WellKnownBuilder
-from synapse.types import UserID
+from synapse.types import JsonDict, UserID
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.threepids import canonicalise_email
 
@@ -114,7 +115,7 @@ class LoginRestServlet(RestServlet):
             burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
         )
 
-    def on_GET(self, request):
+    def on_GET(self, request: SynapseRequest):
         flows = []
         if self.jwt_enabled:
             flows.append({"type": LoginRestServlet.JWT_TYPE})
@@ -142,10 +143,10 @@ class LoginRestServlet(RestServlet):
 
         return 200, {"flows": flows}
 
-    def on_OPTIONS(self, request):
+    def on_OPTIONS(self, request: SynapseRequest):
         return 200, {}
 
-    async def on_POST(self, request):
+    async def on_POST(self, request: SynapseRequest):
         self._address_ratelimiter.ratelimit(request.getClientIP())
 
         login_submission = parse_json_object_from_request(request)
@@ -154,9 +155,9 @@ class LoginRestServlet(RestServlet):
                 login_submission["type"] == LoginRestServlet.JWT_TYPE
                 or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED
             ):
-                result = await self.do_jwt_login(login_submission)
+                result = await self._do_jwt_login(login_submission)
             elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE:
-                result = await self.do_token_login(login_submission)
+                result = await self._do_token_login(login_submission)
             else:
                 result = await self._do_other_login(login_submission)
         except KeyError:
@@ -167,14 +168,14 @@ class LoginRestServlet(RestServlet):
             result["well_known"] = well_known_data
         return 200, result
 
-    async def _do_other_login(self, login_submission):
+    async def _do_other_login(self, login_submission: JsonDict) -> Dict[str, str]:
         """Handle non-token/saml/jwt logins
 
         Args:
             login_submission:
 
         Returns:
-            dict: HTTP response
+            HTTP response
         """
         # Log the request we got, but only certain fields to minimise the chance of
         # logging someone's password (even if they accidentally put it in the wrong
@@ -292,25 +293,30 @@ class LoginRestServlet(RestServlet):
         return result
 
     async def _complete_login(
-        self, user_id, login_submission, callback=None, create_non_existent_users=False
-    ):
+        self,
+        user_id: str,
+        login_submission: JsonDict,
+        callback: Optional[
+            Callable[[Dict[str, str]], Awaitable[Dict[str, str]]]
+        ] = None,
+        create_non_existent_users: bool = False,
+    ) -> Dict[str, str]:
         """Called when we've successfully authed the user and now need to
         actually login them in (e.g. create devices). This gets called on
-        all succesful logins.
+        all successful logins.
 
-        Applies the ratelimiting for succesful login attempts against an
+        Applies the ratelimiting for successful login attempts against an
         account.
 
         Args:
-            user_id (str): ID of the user to register.
-            login_submission (dict): Dictionary of login information.
-            callback (func|None): Callback function to run after registration.
-            create_non_existent_users (bool): Whether to create the user if
-                they don't exist. Defaults to False.
+            user_id: ID of the user to register.
+            login_submission: Dictionary of login information.
+            callback: Callback function to run after registration.
+            create_non_existent_users: Whether to create the user if they don't
+                exist. Defaults to False.
 
         Returns:
-            result (Dict[str,str]): Dictionary of account information after
-                successful registration.
+            result: Dictionary of account information after successful registration.
         """
 
         # Before we actually log them in we check if they've already logged in
@@ -344,7 +350,7 @@ class LoginRestServlet(RestServlet):
 
         return result
 
-    async def do_token_login(self, login_submission):
+    async def _do_token_login(self, login_submission: JsonDict) -> Dict[str, str]:
         token = login_submission["token"]
         auth_handler = self.auth_handler
         user_id = await auth_handler.validate_short_term_login_token_and_get_user_id(
@@ -354,7 +360,7 @@ class LoginRestServlet(RestServlet):
         result = await self._complete_login(user_id, login_submission)
         return result
 
-    async def do_jwt_login(self, login_submission):
+    async def _do_jwt_login(self, login_submission: JsonDict) -> Dict[str, str]:
         token = login_submission.get("token", None)
         if token is None:
             raise LoginError(
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index bbf98a5cfe..01b80b86fa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -217,10 +217,8 @@ class RoomStateEventRestServlet(TransactionRestServlet):
             )
             event_id = event.event_id
 
-        ret = {}  # type: dict
-        if event_id:
-            set_tag("event_id", event_id)
-            ret = {"event_id": event_id}
+        set_tag("event_id", event_id)
+        ret = {"event_id": event_id}
         return 200, ret
 
 
diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py
index 747d46eac2..50277c6cf6 100644
--- a/synapse/rest/client/v1/voip.py
+++ b/synapse/rest/client/v1/voip.py
@@ -50,7 +50,7 @@ class VoipRestServlet(RestServlet):
             # We need to use standard padded base64 encoding here
             # encode_base64 because we need to add the standard padding to get the
             # same result as the TURN server.
-            password = base64.b64encode(mac.digest())
+            password = base64.b64encode(mac.digest()).decode("ascii")
 
         elif turnUris and turnUsername and turnPassword and userLifetime:
             username = turnUsername
diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py
index c234ea7421..7126997134 100644
--- a/synapse/rest/media/v1/thumbnailer.py
+++ b/synapse/rest/media/v1/thumbnailer.py
@@ -12,11 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
 from io import BytesIO
 
-import PIL.Image as Image
+from PIL import Image as Image
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/secrets.py b/synapse/secrets.py
index 0b327a0f82..5f43f81eb0 100644
--- a/synapse/secrets.py
+++ b/synapse/secrets.py
@@ -19,7 +19,6 @@ Injectable secrets module for Synapse.
 See https://docs.python.org/3/library/secrets.html#module-secrets for the API
 used in Python 3.6, and the API emulated in Python 2.7.
 """
-
 import sys
 
 # secrets is available since python 3.6
@@ -31,8 +30,8 @@ if sys.version_info[0:2] >= (3, 6):
 
 
 else:
-    import os
     import binascii
+    import os
 
     class Secrets(object):
         def token_bytes(self, nbytes=32):
diff --git a/synapse/server.py b/synapse/server.py
index d14b6b722c..f1078a3805 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -234,6 +234,8 @@ class HomeServer(object):
 
         self._reactor = reactor
         self.hostname = hostname
+        # the key we use to sign events and requests
+        self.signing_key = config.key.signing_key[0]
         self.config = config
         self._building = {}
         self._listening_services = []
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index d30766e543..f39f556c20 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -16,7 +16,7 @@
 
 import itertools
 import logging
-from typing import Any, Iterable, Optional, Tuple
+from typing import Any, Iterable, List, Optional, Tuple
 
 from synapse.api.constants import EventTypes
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
@@ -46,13 +46,30 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
     async def get_all_updated_caches(
         self, instance_name: str, last_id: int, current_id: int, limit: int
-    ):
-        """Fetches cache invalidation rows between the two given IDs written
-        by the given instance. Returns at most `limit` rows.
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for caches replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exist
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         def get_all_updated_caches_txn(txn):
             # We purposefully don't bound by the current token, as we want to
@@ -66,7 +83,14 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, instance_name, limit))
-            return txn.fetchall()
+            updates = [(row[0], row[1:]) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
 
         return await self.db.runInteraction(
             "get_all_updated_caches", get_all_updated_caches_txn
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index 9a1178fb39..d313b9705f 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -207,31 +208,46 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
         )
 
-    def get_all_new_device_messages(self, last_pos, current_pos, limit):
-        """
+    async def get_all_new_device_messages(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for to device replication stream.
+
         Args:
-            last_pos(int):
-            current_pos(int):
-            limit(int):
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
         Returns:
-            A deferred list of rows from the device inbox
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exist
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
-        if last_pos == current_pos:
-            return defer.succeed([])
+
+        if last_id == current_id:
+            return [], current_id, False
 
         def get_all_new_device_messages_txn(txn):
             # We limit like this as we might have multiple rows per stream_id, and
             # we want to make sure we always get all entries for any stream_id
             # we return.
-            upper_pos = min(current_pos, last_pos + limit)
+            upper_pos = min(current_id, last_id + limit)
             sql = (
                 "SELECT max(stream_id), user_id"
                 " FROM device_inbox"
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " GROUP BY user_id"
             )
-            txn.execute(sql, (last_pos, upper_pos))
-            rows = txn.fetchall()
+            txn.execute(sql, (last_id, upper_pos))
+            updates = [(row[0], row[1:]) for row in txn]
 
             sql = (
                 "SELECT max(stream_id), destination"
@@ -239,15 +255,21 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " WHERE ? < stream_id AND stream_id <= ?"
                 " GROUP BY destination"
             )
-            txn.execute(sql, (last_pos, upper_pos))
-            rows.extend(txn)
+            txn.execute(sql, (last_id, upper_pos))
+            updates.extend((row[0], row[1:]) for row in txn)
 
             # Order by ascending stream ordering
-            rows.sort()
+            updates.sort()
 
-            return rows
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
 
-        return self.db.runInteraction(
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_new_device_messages", get_all_new_device_messages_txn
         )
 
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 0ff0542453..343cf9a2d5 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -582,32 +582,58 @@ class DeviceWorkerStore(SQLBaseStore):
             return set()
 
     async def get_all_device_list_changes_for_remotes(
-        self, from_key: int, to_key: int, limit: int,
-    ) -> List[Tuple[int, str]]:
-        """Return a list of `(stream_id, entity)` which is the combined list of
-        changes to devices and which destinations need to be poked. Entity is
-        either a user ID (starting with '@') or a remote destination.
-        """
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for device lists replication stream.
 
-        # This query Does The Right Thing where it'll correctly apply the
-        # bounds to the inner queries.
-        sql = """
-            SELECT stream_id, entity FROM (
-                SELECT stream_id, user_id AS entity FROM device_lists_stream
-                UNION ALL
-                SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
-            ) AS e
-            WHERE ? < stream_id AND stream_id <= ?
-            LIMIT ?
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exist
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
-        return await self.db.execute(
+        if last_id == current_id:
+            return [], current_id, False
+
+        def _get_all_device_list_changes_for_remotes(txn):
+            # This query Does The Right Thing where it'll correctly apply the
+            # bounds to the inner queries.
+            sql = """
+                SELECT stream_id, entity FROM (
+                    SELECT stream_id, user_id AS entity FROM device_lists_stream
+                    UNION ALL
+                    SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes
+                ) AS e
+                WHERE ? < stream_id AND stream_id <= ?
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_id, current_id, limit))
+            updates = [(row[0], row[1:]) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_device_list_changes_for_remotes",
-            None,
-            sql,
-            from_key,
-            to_key,
-            limit,
+            _get_all_device_list_changes_for_remotes,
         )
 
     @cached(max_entries=10000)
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 1a0842d4b0..6c3cff82e1 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -14,7 +14,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Dict, List
+from typing import Dict, List, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
@@ -479,34 +479,61 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return result
 
-    def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit):
-        """Return a list of changes from the user signature stream to notify remotes.
+    async def get_all_user_signature_changes_for_remotes(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for groups replication stream.
+
         Note that the user signature stream represents when a user signs their
         device with their user-signing key, which is not published to other
         users or servers, so no `destination` is needed in the returned
         list. However, this is needed to poke workers.
 
         Args:
-            from_key (int): the stream ID to start at (exclusive)
-            to_key (int): the stream ID to end at (inclusive)
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
 
         Returns:
-            Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
-        """
-        sql = """
-            SELECT stream_id, from_user_id AS user_id
-            FROM user_signature_stream
-            WHERE ? < stream_id AND stream_id <= ?
-            ORDER BY stream_id ASC
-            LIMIT ?
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exist
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
-        return self.db.execute(
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def _get_all_user_signature_changes_for_remotes_txn(txn):
+            sql = """
+                SELECT stream_id, from_user_id AS user_id
+                FROM user_signature_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
+
+            updates = [(row[0], (row[1:])) for row in txn]
+
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_user_signature_changes_for_remotes",
-            None,
-            sql,
-            from_key,
-            to_key,
-            limit,
+            _get_all_user_signature_changes_for_remotes_txn,
         )
 
 
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index cfd24d2f06..230fb5cd7f 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -14,7 +14,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import itertools
 import logging
 from collections import OrderedDict, namedtuple
@@ -28,12 +27,7 @@ from prometheus_client import Counter
 from twisted.internet import defer
 
 import synapse.metrics
-from synapse.api.constants import (
-    EventContentFields,
-    EventTypes,
-    Membership,
-    RelationTypes,
-)
+from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
 from synapse.api.room_versions import RoomVersions
 from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase  # noqa: F401
@@ -48,8 +42,8 @@ from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
-    from synapse.storage.data_stores.main import DataStore
     from synapse.server import HomeServer
+    from synapse.storage.data_stores.main import DataStore
 
 
 logger = logging.getLogger(__name__)
@@ -820,7 +814,6 @@ class PersistEventsStore:
             "event_reference_hashes",
             "event_search",
             "event_to_state_groups",
-            "local_invites",
             "state_events",
             "rejections",
             "redactions",
@@ -1197,65 +1190,27 @@ class PersistEventsStore:
                 (event.state_key,),
             )
 
-            # We update the local_invites table only if the event is "current",
-            # i.e., its something that has just happened. If the event is an
-            # outlier it is only current if its an "out of band membership",
-            # like a remote invite or a rejection of a remote invite.
-            is_new_state = not backfilled and (
-                not event.internal_metadata.is_outlier()
-                or event.internal_metadata.is_out_of_band_membership()
-            )
-            is_mine = self.is_mine_id(event.state_key)
-            if is_new_state and is_mine:
-                if event.membership == Membership.INVITE:
-                    self.db.simple_insert_txn(
-                        txn,
-                        table="local_invites",
-                        values={
-                            "event_id": event.event_id,
-                            "invitee": event.state_key,
-                            "inviter": event.sender,
-                            "room_id": event.room_id,
-                            "stream_id": event.internal_metadata.stream_ordering,
-                        },
-                    )
-                else:
-                    sql = (
-                        "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
-                        " room_id = ? AND invitee = ? AND locally_rejected is NULL"
-                        " AND replaced_by is NULL"
-                    )
-
-                    txn.execute(
-                        sql,
-                        (
-                            event.internal_metadata.stream_ordering,
-                            event.event_id,
-                            event.room_id,
-                            event.state_key,
-                        ),
-                    )
-
-                # We also update the `local_current_membership` table with
-                # latest invite info. This will usually get updated by the
-                # `current_state_events` handling, unless its an outlier.
-                if event.internal_metadata.is_outlier():
-                    # This should only happen for out of band memberships, so
-                    # we add a paranoia check.
-                    assert event.internal_metadata.is_out_of_band_membership()
-
-                    self.db.simple_upsert_txn(
-                        txn,
-                        table="local_current_membership",
-                        keyvalues={
-                            "room_id": event.room_id,
-                            "user_id": event.state_key,
-                        },
-                        values={
-                            "event_id": event.event_id,
-                            "membership": event.membership,
-                        },
-                    )
+            # We update the local_current_membership table only if the event is
+            # "current", i.e., its something that has just happened.
+            #
+            # This will usually get updated by the `current_state_events` handling,
+            # unless it's an outlier, and an outlier is only "current" if it's an "out of
+            # band membership", like a remote invite or a rejection of a remote invite.
+            if (
+                self.is_mine_id(event.state_key)
+                and not backfilled
+                and event.internal_metadata.is_outlier()
+                and event.internal_metadata.is_out_of_band_membership()
+            ):
+                self.db.simple_upsert_txn(
+                    txn,
+                    table="local_current_membership",
+                    keyvalues={"room_id": event.room_id, "user_id": event.state_key},
+                    values={
+                        "event_id": event.event_id,
+                        "membership": event.membership,
+                    },
+                )
 
     def _handle_event_relations(self, txn, event):
         """Handles inserting relation data during peristence of events
@@ -1586,31 +1541,3 @@ class PersistEventsStore:
                 if not ev.internal_metadata.is_outlier()
             ],
         )
-
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-
-        sql = (
-            "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
-            " room_id = ? AND invitee = ? AND locally_rejected is NULL"
-            " AND replaced_by is NULL"
-        )
-
-        def f(txn, stream_ordering):
-            txn.execute(sql, (stream_ordering, True, room_id, user_id))
-
-            # We also clear this entry from `local_current_membership`.
-            # Ideally we'd point to a leave event, but we don't have one, so
-            # nevermind.
-            self.db.simple_delete_txn(
-                txn,
-                table="local_current_membership",
-                keyvalues={"room_id": room_id, "user_id": user_id},
-            )
-
-        with self._stream_id_gen.get_next() as stream_ordering:
-            await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
-
-        return stream_ordering
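
The guard added earlier in this file replaces the old local_invites bookkeeping: a
membership event for a local user is written straight to local_current_membership only
when it is a non-backfilled out-of-band outlier. Pulled out as a predicate it reads as
follows (an illustrative refactor only; the patch keeps the condition inline):

    def is_current_out_of_band_membership(event, backfilled, is_mine_id) -> bool:
        """True when a membership event should be upserted directly into
        local_current_membership instead of waiting for the usual
        current_state_events handling."""
        return (
            is_mine_id(event.state_key)
            and not backfilled
            and event.internal_metadata.is_outlier()
            and event.internal_metadata.is_out_of_band_membership()
        )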
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 47a3e63589..01cad7d4fa 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -82,10 +82,7 @@ class EventsWorkerStore(SQLBaseStore):
             # We are the process in charge of generating stream ids for events,
             # so instantiate ID generators based on the database
             self._stream_id_gen = StreamIdGenerator(
-                db_conn,
-                "events",
-                "stream_ordering",
-                extra_tables=[("local_invites", "stream_id")],
+                db_conn, "events", "stream_ordering",
             )
             self._backfill_id_gen = StreamIdGenerator(
                 db_conn,
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index fb1361f1c1..4fb9f9850c 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -14,6 +14,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Tuple
+
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -526,13 +528,35 @@ class GroupServerWorkerStore(SQLBaseStore):
             "get_groups_changes_for_user", _get_groups_changes_for_user_txn
         )
 
-    def get_all_groups_changes(self, from_token, to_token, limit):
-        from_token = int(from_token)
-        has_changed = self._group_updates_stream_cache.has_any_entity_changed(
-            from_token
-        )
+    async def get_all_groups_changes(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for groups replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        last_id = int(last_id)
+        has_changed = self._group_updates_stream_cache.has_any_entity_changed(last_id)
+
         if not has_changed:
-            return defer.succeed([])
+            return [], current_id, False
 
         def _get_all_groups_changes_txn(txn):
             sql = """
@@ -541,13 +565,21 @@ class GroupServerWorkerStore(SQLBaseStore):
                 WHERE ? < stream_id AND stream_id <= ?
                 LIMIT ?
             """
-            txn.execute(sql, (from_token, to_token, limit))
-            return [
-                (stream_id, group_id, user_id, gtype, json.loads(content_json))
+            txn.execute(sql, (last_id, current_id, limit))
+            updates = [
+                (stream_id, (group_id, user_id, gtype, json.loads(content_json)))
                 for stream_id, group_id, user_id, gtype, content_json in txn
             ]
 
-        return self.db.runInteraction(
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db.runInteraction(
             "get_all_groups_changes", _get_all_groups_changes_txn
         )
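
The limit handling added to this getter (and repeated in the other converted getters) is
the same three-line pattern each time; as a standalone helper it would look like this
(hypothetical helper, not in the patch):

    from typing import List, Tuple

    def clamp_token(updates: List[Tuple[int, tuple]], current_id: int, limit: int) -> Tuple[int, bool]:
        """Work out (upto_token, limited) for one page of stream updates."""
        if len(updates) >= limit:
            # We may have stopped part-way through the range, so only claim to
            # have reached the last row we actually returned.
            return updates[-1][0], True
        return current_id, False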
 
diff --git a/synapse/storage/data_stores/main/purge_events.py b/synapse/storage/data_stores/main/purge_events.py
index a93e1ef198..6546569139 100644
--- a/synapse/storage/data_stores/main/purge_events.py
+++ b/synapse/storage/data_stores/main/purge_events.py
@@ -361,7 +361,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             "event_push_summary",
             "pusher_throttle",
             "group_summary_rooms",
-            "local_invites",
             "room_account_data",
             "room_tags",
             "local_current_membership",
diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py
index 547b9d69cb..5461016240 100644
--- a/synapse/storage/data_stores/main/pusher.py
+++ b/synapse/storage/data_stores/main/pusher.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import Iterable, Iterator
+from typing import Iterable, Iterator, List, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
@@ -98,77 +98,69 @@ class PusherWorkerStore(SQLBaseStore):
         rows = yield self.db.runInteraction("get_all_pushers", get_pushers)
         return rows
 
-    def get_all_updated_pushers(self, last_id, current_id, limit):
-        if last_id == current_id:
-            return defer.succeed(([], []))
-
-        def get_all_updated_pushers_txn(txn):
-            sql = (
-                "SELECT id, user_name, access_token, profile_tag, kind,"
-                " app_id, app_display_name, device_display_name, pushkey, ts,"
-                " lang, data"
-                " FROM pushers"
-                " WHERE ? < id AND id <= ?"
-                " ORDER BY id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, current_id, limit))
-            updated = txn.fetchall()
-
-            sql = (
-                "SELECT stream_id, user_id, app_id, pushkey"
-                " FROM deleted_pushers"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, current_id, limit))
-            deleted = txn.fetchall()
+    async def get_all_updated_pushers_rows(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for pushers replication stream.
 
-            return updated, deleted
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
 
-        return self.db.runInteraction(
-            "get_all_updated_pushers", get_all_updated_pushers_txn
-        )
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
 
-    def get_all_updated_pushers_rows(self, last_id, current_id, limit):
-        """Get all the pushers that have changed between the given tokens.
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
 
-        Returns:
-            Deferred(list(tuple)): each tuple consists of:
-                stream_id (str)
-                user_id (str)
-                app_id (str)
-                pushkey (str)
-                was_deleted (bool): whether the pusher was added/updated (False)
-                    or deleted (True)
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_updated_pushers_rows_txn(txn):
-            sql = (
-                "SELECT id, user_name, app_id, pushkey"
-                " FROM pushers"
-                " WHERE ? < id AND id <= ?"
-                " ORDER BY id ASC LIMIT ?"
-            )
+            sql = """
+                SELECT id, user_name, app_id, pushkey
+                FROM pushers
+                WHERE ? < id AND id <= ?
+                ORDER BY id ASC LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
-            results = [list(row) + [False] for row in txn]
-
-            sql = (
-                "SELECT stream_id, user_id, app_id, pushkey"
-                " FROM deleted_pushers"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
+            updates = [
+                (stream_id, (user_name, app_id, pushkey, False))
+                for stream_id, user_name, app_id, pushkey in txn
+            ]
+
+            sql = """
+                SELECT stream_id, user_id, app_id, pushkey
+                FROM deleted_pushers
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
+            updates.extend(
+                (stream_id, (user_name, app_id, pushkey, True))
+                for stream_id, user_name, app_id, pushkey in txn
+            )
+
+            updates.sort()  # Sort so that they're ordered by stream id
 
-            results.extend(list(row) + [True] for row in txn)
-            results.sort()  # Sort so that they're ordered by stream id
+            limited = False
+            upper_bound = current_id
+            if len(updates) >= limit:
+                limited = True
+                upper_bound = updates[-1][0]
 
-            return results
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
         )
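
The pushers stream now folds inserted/updated and deleted pushers into a single
stream-ordered list, with the trailing boolean of each row marking deletion. A consumer
could split them back apart like this (hypothetical helper, not part of the change):

    from typing import Iterable, List, Tuple

    # (user_name, app_id, pushkey, was_deleted), matching the queries above.
    PusherRow = Tuple[str, str, str, bool]

    def split_pusher_updates(
        updates: Iterable[Tuple[int, PusherRow]]
    ) -> Tuple[List[PusherRow], List[PusherRow]]:
        """Separate changed pushers from deleted ones, keeping stream order."""
        changed = []  # type: List[PusherRow]
        deleted = []  # type: List[PusherRow]
        for _stream_id, row in updates:
            (deleted if row[3] else changed).append(row)
        return changed, deleted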
 
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index d72d6affb8..98a8b5a11a 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -826,7 +826,32 @@ class RoomWorkerStore(SQLBaseStore):
 
         return total_media_quarantined
 
-    def get_all_new_public_rooms(self, prev_id, current_id, limit):
+    async def get_all_new_public_rooms(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for public rooms replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+        if last_id == current_id:
+            return [], current_id, False
+
         def get_all_new_public_rooms(txn):
             sql = """
                 SELECT stream_id, room_id, visibility, appservice_id, network_id
@@ -836,13 +861,17 @@ class RoomWorkerStore(SQLBaseStore):
                 LIMIT ?
             """
 
-            txn.execute(sql, (prev_id, current_id, limit))
-            return txn.fetchall()
+            txn.execute(sql, (last_id, current_id, limit))
+            updates = [(row[0], row[1:]) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
 
-        if prev_id == current_id:
-            return defer.succeed([])
+            return updates, upto_token, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_new_public_rooms", get_all_new_public_rooms
         )
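
Each public-rooms update row keeps the SELECT column order minus the leading stream_id,
i.e. (room_id, visibility, appservice_id, network_id). Unpacking one row, for example for
logging, might look like this (illustrative only):

    def describe_public_room_update(row: tuple) -> str:
        """Render one public-rooms stream row for a log line."""
        room_id, visibility, appservice_id, network_id = row
        return "room %s visibility=%s appservice=%s network=%s" % (
            room_id,
            visibility,
            appservice_id,
            network_id,
        )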
 
diff --git a/synapse/storage/data_stores/main/schema/delta/25/fts.py b/synapse/storage/data_stores/main/schema/delta/25/fts.py
index 4b2ffd35fd..ee675e71ff 100644
--- a/synapse/storage/data_stores/main/schema/delta/25/fts.py
+++ b/synapse/storage/data_stores/main/schema/delta/25/fts.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.prepare_database import get_statements
 
@@ -66,7 +64,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/schema/delta/27/ts.py b/synapse/storage/data_stores/main/schema/delta/27/ts.py
index 414f9f5aa0..b7972cfa8e 100644
--- a/synapse/storage/data_stores/main/schema/delta/27/ts.py
+++ b/synapse/storage/data_stores/main/schema/delta/27/ts.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/schema/delta/31/search_update.py b/synapse/storage/data_stores/main/schema/delta/31/search_update.py
index 7d8ca5f93f..63b757ade6 100644
--- a/synapse/storage/data_stores/main/schema/delta/31/search_update.py
+++ b/synapse/storage/data_stores/main/schema/delta/31/search_update.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
@@ -50,7 +48,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "rows_inserted": 0,
             "have_added_indexes": False,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py
index bff1256a7b..a3e81eeac7 100644
--- a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py
+++ b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py
@@ -11,11 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 
-import simplejson
-
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs):
             "max_stream_id_exclusive": max_stream_id + 1,
             "rows_inserted": 0,
         }
-        progress_json = simplejson.dumps(progress)
+        progress_json = json.dumps(progress)
 
         sql = (
             "INSERT into background_updates (update_name, progress_json)"
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index f8c776be3f..290317fd94 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -53,18 +54,32 @@ class TagsWorkerStore(AccountDataWorkerStore):
 
         return deferred
 
-    @defer.inlineCallbacks
-    def get_all_updated_tags(self, last_id, current_id, limit):
-        """Get all the client tags that have changed on the server
+    async def get_all_updated_tags(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for tags replication stream.
+
         Args:
-            last_id(int): The position to fetch from.
-            current_id(int): The position to fetch up to.
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
         Returns:
-            A deferred list of tuples of stream_id int, user_id string,
-            room_id string, tag string and content string.
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
+
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         def get_all_updated_tags_txn(txn):
             sql = (
@@ -76,7 +91,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             txn.execute(sql, (last_id, current_id, limit))
             return txn.fetchall()
 
-        tag_ids = yield self.db.runInteraction(
+        tag_ids = await self.db.runInteraction(
             "get_all_updated_tags", get_all_updated_tags_txn
         )
 
@@ -89,21 +104,27 @@ class TagsWorkerStore(AccountDataWorkerStore):
                 for tag, content in txn:
                     tags.append(json.dumps(tag) + ":" + content)
                 tag_json = "{" + ",".join(tags) + "}"
-                results.append((stream_id, user_id, room_id, tag_json))
+                results.append((stream_id, (user_id, room_id, tag_json)))
 
             return results
 
         batch_size = 50
         results = []
         for i in range(0, len(tag_ids), batch_size):
-            tags = yield self.db.runInteraction(
+            tags = await self.db.runInteraction(
                 "get_all_updated_tag_content",
                 get_tag_content,
                 tag_ids[i : i + batch_size],
             )
             results.extend(tags)
 
-        return results
+        limited = False
+        upto_token = current_id
+        if len(results) >= limit:
+            upto_token = results[-1][0]
+            limited = True
+
+        return results, upto_token, limited
 
     @defer.inlineCallbacks
     def get_updated_tags(self, user_id, stream_id):
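
The tag rows above still serialise a room's tag map by string concatenation, which works
because each stored content value is itself a JSON document. The same construction as a
standalone function, under that assumption (sketch, not part of the patch):

    import json
    from typing import Iterable, Tuple

    def build_tag_json(pairs: Iterable[Tuple[str, str]]) -> str:
        """pairs is (tag, content_json) as read from room_tags."""
        return "{" + ",".join(json.dumps(tag) + ":" + content for tag, content in pairs) + "}"

    # build_tag_json([("m.favourite", '{"order": 0.5}')])
    # returns '{"m.favourite":{"order": 0.5}}'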
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index ec2f38c373..4c044b1a15 100644
--- a/synapse/storage/data_stores/main/ui_auth.py
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -17,10 +17,10 @@ from typing import Any, Dict, Optional, Union
 
 import attr
 
-import synapse.util.stringutils as stringutils
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
 from synapse.types import JsonDict
+from synapse.util import stringutils as stringutils
 
 
 @attr.s
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index ec894a91cb..fa46041676 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -783,9 +783,3 @@ class EventsPersistenceStorage(object):
 
         for user_id in left_users:
             await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
-
-    async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
-        """Mark the invite has having been rejected even though we failed to
-        create a leave event for it.
-        """
-        return await self.persist_events_store.locally_reject_invite(user_id, room_id)
diff --git a/synapse/storage/types.py b/synapse/storage/types.py
index daff81c5ee..2d2b560e74 100644
--- a/synapse/storage/types.py
+++ b/synapse/storage/types.py
@@ -12,12 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from typing import Any, Iterable, Iterator, List, Tuple
 
 from typing_extensions import Protocol
 
-
 """
 Some very basic protocol definitions for the DB-API2 classes specified in PEP-249
 """
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index cd56cd91ed..ca7c16ff65 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -68,13 +68,13 @@ class PaginationConfig(object):
             elif from_tok:
                 from_tok = StreamToken.from_string(from_tok)
         except Exception:
-            raise SynapseError(400, "'from' paramater is invalid")
+            raise SynapseError(400, "'from' parameter is invalid")
 
         try:
             if to_tok:
                 to_tok = StreamToken.from_string(to_tok)
         except Exception:
-            raise SynapseError(400, "'to' paramater is invalid")
+            raise SynapseError(400, "'to' parameter is invalid")
 
         limit = parse_integer(request, "limit", default=default_limit)
 
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index fcd2aaa9c9..5d3eddcfdc 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -68,7 +68,7 @@ class EventSources(object):
         The returned token does not have the current values for fields other
         than `room`, since they are not used during pagination.
 
-        Retuns:
+        Returns:
             Deferred[StreamToken]
         """
         token = StreamToken(
diff --git a/synapse/types.py b/synapse/types.py
index 9f28f9a192..40cab6ed74 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -31,7 +31,7 @@ from synapse.api.errors import Codes, SynapseError
 if sys.version_info[:3] >= (3, 6, 0):
     from typing import Collection
 else:
-    from typing import Sized, Iterable, Container
+    from typing import Container, Iterable, Sized
 
     T_co = TypeVar("T_co", covariant=True)
 
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 60f0de70f7..c63256d3bd 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -55,7 +55,7 @@ class Clock(object):
         return self._reactor.seconds()
 
     def time_msec(self):
-        """Returns the current system time in miliseconds since epoch."""
+        """Returns the current system time in milliseconds since epoch."""
         return int(self.time() * 1000)
 
     def looping_call(self, f, msec, *args, **kwargs):
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 65abf0846e..f562770922 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -352,7 +352,7 @@ class ReadWriteLock(object):
     # resolved when they release the lock).
     #
     # Read: We know its safe to acquire a read lock when the latest writer has
-    # been resolved. The new reader is appeneded to the list of latest readers.
+    # been resolved. The new reader is appended to the list of latest readers.
     #
     # Write: We know its safe to acquire the write lock when both the latest
     # writers and readers have been resolved. The new writer replaces the latest
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 64f35fc288..9b09c08b89 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -516,7 +516,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
         """
         Args:
             orig (function)
-            cached_method_name (str): The name of the chached method.
+            cached_method_name (str): The name of the cached method.
             list_name (str): Name of the argument which is the bulk lookup list
             num_args (int): number of positional arguments (excluding ``self``,
                 but including list_name) to use as cache keys. Defaults to all
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 45af8d3eeb..da20523b70 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -39,7 +39,7 @@ class Distributor(object):
     Signals are named simply by strings.
 
     TODO(paul): It would be nice to give signals stronger object identities,
-      so we can attach metadata, docstrings, detect typoes, etc... But this
+      so we can attach metadata, docstrings, detect typos, etc... But this
       model will do for today.
     """
 
diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py
index 2605f3c65b..54c046b6e1 100644
--- a/synapse/util/patch_inline_callbacks.py
+++ b/synapse/util/patch_inline_callbacks.py
@@ -192,7 +192,7 @@ def _check_yield_points(f: Callable, changes: List[str]):
                 result = yield d
             except Exception:
                 # this will fish an earlier Failure out of the stack where possible, and
-                # thus is preferable to passing in an exeception to the Failure
+                # thus is preferable to passing in an exception to the Failure
                 # constructor, since it results in less stack-mangling.
                 result = Failure()
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index af69587196..8794317caa 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -22,7 +22,7 @@ from synapse.api.errors import CodeMessageException
 
 logger = logging.getLogger(__name__)
 
-# the intial backoff, after the first transaction fails
+# the initial backoff, after the first transaction fails
 MIN_RETRY_INTERVAL = 10 * 60 * 1000
 
 # how much we multiply the backoff by after each subsequent fail
@@ -174,7 +174,7 @@ class RetryDestinationLimiter(object):
             # has been decommissioned.
             # If we get a 401, then we should probably back off since they
             # won't accept our requests for at least a while.
-            # 429 is us being aggresively rate limited, so lets rate limit
+            # 429 is us being aggressively rate limited, so lets rate limit
             # ourselves.
             if exc_val.code == 404 and self.backoff_on_404:
                 valid_err_code = False
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 3dfd4af26c..0f042c5696 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -319,7 +319,7 @@ def filter_events_for_server(
         return True
 
     # Lets check to see if all the events have a history visibility
-    # of "shared" or "world_readable". If thats the case then we don't
+    # of "shared" or "world_readable". If that's the case then we don't
     # need to check membership (as we know the server is in the room).
     event_to_state_ids = yield storage.state.get_state_ids_for_events(
         frozenset(e.event_id for e in events),
@@ -335,7 +335,7 @@ def filter_events_for_server(
             visibility_ids.add(hist)
 
     # If we failed to find any history visibility events then the default
-    # is "shared" visiblity.
+    # is "shared" visibility.
     if not visibility_ids:
         all_open = True
     else:
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 70c8e72303..f9ce609923 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -192,7 +192,7 @@ class KeyringTestCase(unittest.HomeserverTestCase):
         d = _verify_json_for_server(kr, "server9", {}, 0, "test unsigned")
         self.failureResultOf(d, SynapseError)
 
-        # should suceed on a signed object
+        # should succeed on a signed object
         d = _verify_json_for_server(kr, "server9", json1, 500, "test signed")
         # self.assertFalse(d.called)
         self.get_success(d)
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index ba7148ec01..ebabe9a7d6 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -32,10 +32,11 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.mock_as_api = Mock()
         self.mock_scheduler = Mock()
         hs = Mock()
-        hs.get_datastore = Mock(return_value=self.mock_store)
-        self.mock_store.get_received_ts.return_value = 0
-        hs.get_application_service_api = Mock(return_value=self.mock_as_api)
-        hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler)
+        hs.get_datastore.return_value = self.mock_store
+        self.mock_store.get_received_ts.return_value = defer.succeed(0)
+        self.mock_store.set_appservice_last_pos.return_value = defer.succeed(None)
+        hs.get_application_service_api.return_value = self.mock_as_api
+        hs.get_application_service_scheduler.return_value = self.mock_scheduler
         hs.get_clock.return_value = MockClock()
         self.handler = ApplicationServicesHandler(hs)
 
@@ -48,18 +49,18 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             self._mkservice(is_interested=False),
         ]
 
-        self.mock_store.get_app_services = Mock(return_value=services)
-        self.mock_store.get_user_by_id = Mock(return_value=[])
+        self.mock_as_api.query_user.return_value = defer.succeed(True)
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_user_by_id.return_value = defer.succeed([])
 
         event = Mock(
             sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
         )
         self.mock_store.get_new_events_for_appservice.side_effect = [
-            (0, [event]),
-            (0, []),
+            defer.succeed((0, [event])),
+            defer.succeed((0, [])),
         ]
-        self.mock_as_api.push = Mock()
-        yield self.handler.notify_interested_services(0)
+        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
         self.mock_scheduler.submit_event_for_as.assert_called_once_with(
             interested_service, event
         )
@@ -68,36 +69,34 @@ class AppServiceHandlerTestCase(unittest.TestCase):
     def test_query_user_exists_unknown_user(self):
         user_id = "@someone:anywhere"
         services = [self._mkservice(is_interested=True)]
-        services[0].is_interested_in_user = Mock(return_value=True)
-        self.mock_store.get_app_services = Mock(return_value=services)
-        self.mock_store.get_user_by_id = Mock(return_value=None)
+        services[0].is_interested_in_user.return_value = True
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_user_by_id.return_value = defer.succeed(None)
 
         event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
-        self.mock_as_api.push = Mock()
-        self.mock_as_api.query_user = Mock()
+        self.mock_as_api.query_user.return_value = defer.succeed(True)
         self.mock_store.get_new_events_for_appservice.side_effect = [
-            (0, [event]),
-            (0, []),
+            defer.succeed((0, [event])),
+            defer.succeed((0, [])),
         ]
-        yield self.handler.notify_interested_services(0)
+        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
         self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
 
     @defer.inlineCallbacks
     def test_query_user_exists_known_user(self):
         user_id = "@someone:anywhere"
         services = [self._mkservice(is_interested=True)]
-        services[0].is_interested_in_user = Mock(return_value=True)
-        self.mock_store.get_app_services = Mock(return_value=services)
-        self.mock_store.get_user_by_id = Mock(return_value={"name": user_id})
+        services[0].is_interested_in_user.return_value = True
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_user_by_id.return_value = defer.succeed({"name": user_id})
 
         event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
-        self.mock_as_api.push = Mock()
-        self.mock_as_api.query_user = Mock()
+        self.mock_as_api.query_user.return_value = defer.succeed(True)
         self.mock_store.get_new_events_for_appservice.side_effect = [
-            (0, [event]),
-            (0, []),
+            defer.succeed((0, [event])),
+            defer.succeed((0, [])),
         ]
-        yield self.handler.notify_interested_services(0)
+        yield defer.ensureDeferred(self.handler.notify_interested_services(0))
         self.assertFalse(
             self.mock_as_api.query_user.called,
             "query_user called when it shouldn't have been.",
@@ -107,7 +106,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
     def test_query_room_alias_exists(self):
         room_alias_str = "#foo:bar"
         room_alias = Mock()
-        room_alias.to_string = Mock(return_value=room_alias_str)
+        room_alias.to_string.return_value = room_alias_str
 
         room_id = "!alpha:bet"
         servers = ["aperture"]
@@ -118,12 +117,15 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             self._mkservice_alias(is_interested_in_alias=False),
         ]
 
-        self.mock_store.get_app_services = Mock(return_value=services)
-        self.mock_store.get_association_from_room_alias = Mock(
-            return_value=Mock(room_id=room_id, servers=servers)
+        self.mock_as_api.query_alias.return_value = defer.succeed(True)
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_association_from_room_alias.return_value = defer.succeed(
+            Mock(room_id=room_id, servers=servers)
         )
 
-        result = yield self.handler.query_room_alias_exists(room_alias)
+        result = yield defer.ensureDeferred(
+            self.handler.query_room_alias_exists(room_alias)
+        )
 
         self.mock_as_api.query_alias.assert_called_once_with(
             interested_service, room_alias_str
@@ -133,14 +135,14 @@ class AppServiceHandlerTestCase(unittest.TestCase):
 
     def _mkservice(self, is_interested):
         service = Mock()
-        service.is_interested = Mock(return_value=is_interested)
+        service.is_interested.return_value = defer.succeed(is_interested)
         service.token = "mock_service_token"
         service.url = "mock_service_url"
         return service
 
     def _mkservice_alias(self, is_interested_in_alias):
         service = Mock()
-        service.is_interested_in_alias = Mock(return_value=is_interested_in_alias)
+        service.is_interested_in_alias.return_value = is_interested_in_alias
         service.token = "mock_service_token"
         service.url = "mock_service_url"
         return service
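
The common thread in these test changes: anything the handler now awaits (or yields) has
to be mocked with an already-resolved Deferred rather than a bare value. A standalone
illustration of the pattern (not taken from the patch):

    from unittest.mock import Mock

    from twisted.internet import defer

    store = Mock()
    # A plain dict here would break the `yield` in the code under test.
    store.get_user_by_id.return_value = defer.succeed({"name": "@someone:anywhere"})

    @defer.inlineCallbacks
    def lookup_display_name():
        user = yield store.get_user_by_id("@someone:anywhere")
        return user["name"]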
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 6c1dc72bd1..1acf287ca4 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -14,11 +14,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import mock
 
-import signedjson.key as key
-import signedjson.sign as sign
+from signedjson import key as key, sign as sign
 
 from twisted.internet import defer
 
diff --git a/tests/push/test_push_rule_evaluator.py b/tests/push/test_push_rule_evaluator.py
index af35d23aea..1f4b5ca2ac 100644
--- a/tests/push/test_push_rule_evaluator.py
+++ b/tests/push/test_push_rule_evaluator.py
@@ -15,6 +15,7 @@
 
 from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent
+from synapse.push import push_rule_evaluator
 from synapse.push.push_rule_evaluator import PushRuleEvaluatorForEvent
 
 from tests import unittest
@@ -84,3 +85,19 @@ class PushRuleEvaluatorTestCase(unittest.TestCase):
         for body in (1, True, {"foo": "bar"}):
             evaluator = self._get_evaluator({"body": body})
             self.assertFalse(evaluator.matches(condition, "@user:test", "foo"))
+
+    def test_tweaks_for_actions(self):
+        """
+        This tests the behaviour of tweaks_for_actions.
+        """
+
+        actions = [
+            {"set_tweak": "sound", "value": "default"},
+            {"set_tweak": "highlight"},
+            "notify",
+        ]
+
+        self.assertEqual(
+            push_rule_evaluator.tweaks_for_actions(actions),
+            {"sound": "default", "highlight": True},
+        )
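
For reference, the assertion above pins down the behaviour under test: "set_tweak"
actions collapse into a single dict, with a missing "value" defaulting to True. A sketch
consistent with the test (not necessarily the actual implementation in
synapse.push.push_rule_evaluator):

    def tweaks_for_actions(actions):
        """Extract the set_tweak/value pairs from a list of push rule actions."""
        tweaks = {}
        for action in actions:
            if isinstance(action, dict) and "set_tweak" in action:
                tweaks[action["set_tweak"]] = action.get("value", True)
        return tweaks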
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 54cd24bf64..ae6d05a043 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -213,7 +213,6 @@ class PurgeRoomTestCase(unittest.HomeserverTestCase):
             "event_push_summary",

             "pusher_throttle",

             "group_summary_rooms",

-            "local_invites",

             "room_account_data",

             "room_tags",

             # "state_groups",  # Current impl leaves orphaned state groups around.

diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py
index 9e549d8a91..cc264cf0b5 100644
--- a/tests/rest/client/test_retention.py
+++ b/tests/rest/client/test_retention.py
@@ -127,7 +127,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
 
         events.append(self.get_success(store.get_event(valid_event_id)))
 
-        # Advance the time by anothe 2 days. After this, the first event should be
+        # Advance the time by another 2 days. After this, the first event should be
         # outdated but not the second one.
         self.reactor.advance(one_day_ms * 2 / 1000)
 
diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py
index 0fdff79aa7..3c66255dac 100644
--- a/tests/rest/client/v1/test_presence.py
+++ b/tests/rest/client/v1/test_presence.py
@@ -60,7 +60,7 @@ class PresenceTestCase(unittest.HomeserverTestCase):
 
     def test_put_presence_disabled(self):
         """
-        PUT to the status endpoint with use_presence disbled will NOT call
+        PUT to the status endpoint with use_presence disabled will NOT call
         set_state on the presence handler.
         """
         self.hs.config.use_presence = False
diff --git a/tests/rest/client/v2_alpha/test_relations.py b/tests/rest/client/v2_alpha/test_relations.py
index fd641a7c2f..99c9f4e928 100644
--- a/tests/rest/client/v2_alpha/test_relations.py
+++ b/tests/rest/client/v2_alpha/test_relations.py
@@ -99,7 +99,7 @@ class RelationsTestCase(unittest.HomeserverTestCase):
         self.assertEquals(400, channel.code, channel.json_body)
 
     def test_basic_paginate_relations(self):
-        """Tests that calling pagination API corectly the latest relations.
+        """Tests that calling pagination API correctly the latest relations.
         """
         channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction")
         self.assertEquals(200, channel.code, channel.json_body)
diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py
index 2ed9312d56..66fa5978b2 100644
--- a/tests/rest/media/v1/test_media_storage.py
+++ b/tests/rest/media/v1/test_media_storage.py
@@ -12,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
 import os
 import shutil
 import tempfile
@@ -25,8 +23,8 @@ from urllib import parse
 from mock import Mock
 
 import attr
-import PIL.Image as Image
 from parameterized import parameterized_class
+from PIL import Image as Image
 
 from twisted.internet.defer import Deferred
 
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index 278961c331..b589506c60 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -25,7 +25,7 @@ from synapse.storage.database import Database
 from synapse.storage.engines import create_engine
 
 from tests import unittest
-from tests.utils import TestHomeServer
+from tests.utils import TestHomeServer, default_config
 
 
 class SQLBaseStoreTestCase(unittest.TestCase):
@@ -49,10 +49,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
 
         self.db_pool.runWithConnection = runWithConnection
 
-        config = Mock()
-        config._disable_native_upserts = True
-        config.caches = Mock()
-        config.caches.event_cache_size = 1
+        config = default_config(name="test", parse=True)
         hs = TestHomeServer("test", config=config)
 
         sqlite_config = {"name": "sqlite3"}
diff --git a/tests/test_mau.py b/tests/test_mau.py
index 49667ed7f4..654a6fa42d 100644
--- a/tests/test_mau.py
+++ b/tests/test_mau.py
@@ -166,7 +166,7 @@ class TestMauLimit(unittest.HomeserverTestCase):
         self.do_sync_for_user(token5)
         self.do_sync_for_user(token6)
 
-        # But old user cant
+        # But old user can't
         with self.assertRaises(SynapseError) as cm:
             self.do_sync_for_user(token1)
 
diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py
index 431e9f8e5e..43297b530c 100644
--- a/tests/test_utils/event_injection.py
+++ b/tests/test_utils/event_injection.py
@@ -13,7 +13,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from typing import Optional, Tuple
 
 import synapse.server
@@ -25,7 +24,6 @@ from synapse.types import Collection
 
 from tests.test_utils import get_awaitable_result
 
-
 """
 Utility functions for poking events into the storage of the server under test.
 """
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index 95301c013c..58ee918f65 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -124,7 +124,7 @@ class LoggingContextTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_make_deferred_yieldable(self):
-        # a function which retuns an incomplete deferred, but doesn't follow
+        # a function which returns an incomplete deferred, but doesn't follow
         # the synapse rules.
         def blocking_function():
             d = defer.Deferred()
@@ -183,7 +183,7 @@ class LoggingContextTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_make_deferred_yieldable_with_await(self):
-        # an async function which retuns an incomplete coroutine, but doesn't
+        # an async function which returns an incomplete coroutine, but doesn't
         # follow the synapse rules.
 
         async def blocking_function():
diff --git a/tox.ini b/tox.ini
index 83641266bb..e5aef3c062 100644
--- a/tox.ini
+++ b/tox.ini
@@ -132,8 +132,8 @@ commands =
 
 [testenv:check_isort]
 skip_install = True
-deps = isort
-commands = /bin/sh -c "isort -c -df -sp setup.cfg -rc synapse tests scripts-dev scripts"
+deps = isort==5.0.3
+commands = /bin/sh -c "isort -c --df --sp setup.cfg synapse tests scripts-dev scripts"
 
 [testenv:check-newsfragment]
 skip_install = True