summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.md83
-rw-r--r--INSTALL.md12
-rw-r--r--changelog.d/5727.feature1
-rw-r--r--changelog.d/6140.misc1
-rw-r--r--changelog.d/6164.doc1
-rw-r--r--changelog.d/6213.bugfix1
-rw-r--r--changelog.d/6218.misc1
-rw-r--r--changelog.d/6220.feature1
-rw-r--r--changelog.d/6232.bugfix1
-rw-r--r--changelog.d/6235.bugfix1
-rw-r--r--changelog.d/6238.feature1
-rw-r--r--changelog.d/6240.misc1
-rw-r--r--changelog.d/6250.misc1
-rw-r--r--changelog.d/6251.misc1
-rw-r--r--changelog.d/6253.bugfix1
-rw-r--r--changelog.d/6254.bugfix1
-rw-r--r--changelog.d/6257.doc1
-rw-r--r--changelog.d/6259.misc1
-rw-r--r--changelog.d/6263.misc1
-rw-r--r--changelog.d/6269.misc1
-rw-r--r--changelog.d/6270.misc1
-rw-r--r--changelog.d/6271.misc1
-rw-r--r--changelog.d/6272.doc1
-rw-r--r--changelog.d/6273.doc1
-rw-r--r--changelog.d/6274.misc1
-rw-r--r--changelog.d/6275.misc1
-rw-r--r--changelog.d/6276.misc1
-rw-r--r--changelog.d/6277.misc1
-rw-r--r--changelog.d/6278.bugfix1
-rw-r--r--changelog.d/6279.misc1
-rw-r--r--changelog.d/6280.misc1
-rw-r--r--changelog.d/6284.bugfix1
-rw-r--r--changelog.d/6291.misc1
-rw-r--r--changelog.d/6294.misc1
-rw-r--r--changelog.d/6295.misc1
-rw-r--r--changelog.d/6298.misc1
-rw-r--r--changelog.d/6300.misc1
-rw-r--r--changelog.d/6301.feature1
-rw-r--r--changelog.d/6304.misc1
-rw-r--r--changelog.d/6305.misc1
-rw-r--r--changelog.d/6306.bugfix1
-rw-r--r--changelog.d/6307.bugfix1
-rw-r--r--changelog.d/6310.feature1
-rw-r--r--changelog.d/6312.misc1
-rw-r--r--changelog.d/6313.bugfix1
-rw-r--r--changelog.d/6314.misc1
-rw-r--r--changelog.d/6318.misc1
-rw-r--r--changelog.d/6319.misc1
-rw-r--r--changelog.d/6320.bugfix1
-rw-r--r--changelog.d/6322.misc1
-rw-r--r--changelog.d/6330.misc1
-rw-r--r--changelog.d/6332.bugfix1
-rw-r--r--changelog.d/6336.misc1
-rw-r--r--changelog.d/6338.bugfix1
-rw-r--r--changelog.d/6340.feature1
-rw-r--r--changelog.d/6341.misc1
-rw-r--r--changelog.d/6361.misc1
-rw-r--r--changelog.d/6362.misc1
-rw-r--r--changelog.d/6388.doc1
-rw-r--r--changelog.d/6390.doc1
-rw-r--r--changelog.d/6392.misc1
-rw-r--r--changelog.d/6408.bugfix1
-rw-r--r--docker/README.md12
-rwxr-xr-xdocker/start.py6
-rw-r--r--docs/sample_config.yaml8
-rw-r--r--docs/user_directory.md3
-rw-r--r--snap/snapcraft.yaml20
-rw-r--r--synapse/__init__.py4
-rw-r--r--synapse/_scripts/register_new_matrix_user.py6
-rw-r--r--synapse/api/errors.py2
-rw-r--r--synapse/app/federation_sender.py2
-rw-r--r--synapse/appservice/api.py2
-rw-r--r--synapse/config/appservice.py2
-rw-r--r--synapse/config/captcha.py4
-rw-r--r--synapse/config/emailconfig.py2
-rw-r--r--synapse/config/room_directory.py2
-rw-r--r--synapse/config/server.py10
-rw-r--r--synapse/federation/federation_client.py6
-rw-r--r--synapse/federation/persistence.py4
-rw-r--r--synapse/federation/sender/__init__.py2
-rw-r--r--synapse/federation/sender/transaction_manager.py4
-rw-r--r--synapse/federation/transport/__init__.py4
-rw-r--r--synapse/federation/transport/client.py6
-rw-r--r--synapse/federation/transport/server.py2
-rw-r--r--synapse/handlers/auth.py88
-rw-r--r--synapse/handlers/directory.py4
-rw-r--r--synapse/handlers/e2e_keys.py19
-rw-r--r--synapse/handlers/federation.py4
-rw-r--r--synapse/handlers/profile.py6
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/http/matrixfederationclient.py2
-rw-r--r--synapse/http/servlet.py2
-rw-r--r--synapse/logging/_structured.py14
-rw-r--r--synapse/logging/_terse_json.py108
-rw-r--r--synapse/push/httppusher.py5
-rw-r--r--synapse/push/mailer.py4
-rw-r--r--synapse/replication/http/__init__.py10
-rw-r--r--synapse/replication/http/devices.py73
-rw-r--r--synapse/rest/admin/__init__.py567
-rw-r--r--synapse/rest/admin/groups.py46
-rw-r--r--synapse/rest/admin/rooms.py157
-rw-r--r--synapse/rest/admin/users.py406
-rw-r--r--synapse/rest/client/v1/login.py111
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py2
-rw-r--r--synapse/server_notices/consent_server_notices.py2
-rw-r--r--synapse/storage/_base.py62
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py19
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py6
-rw-r--r--synapse/storage/data_stores/main/events.py8
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py43
-rw-r--r--synapse/storage/data_stores/main/filtering.py2
-rw-r--r--synapse/storage/data_stores/main/media_repository.py6
-rw-r--r--synapse/storage/data_stores/main/receipts.py2
-rw-r--r--synapse/storage/data_stores/main/registration.py4
-rw-r--r--synapse/storage/data_stores/main/stream.py2
-rw-r--r--synapse/storage/data_stores/main/tags.py4
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/streams/config.py9
-rw-r--r--synapse/util/httpresourcetree.py2
-rw-r--r--tests/rest/client/v1/test_rooms.py74
-rw-r--r--tests/server.py2
122 files changed, 1312 insertions, 839 deletions
diff --git a/CHANGES.md b/CHANGES.md
index 9312dc2941..d26bc7a86f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,86 @@
+Synapse 1.6.0rc2 (2019-11-25)
+=============================
+
+Bugfixes
+--------
+
+- Fix a bug which could cause the background database update hander for event labels to get stuck in a loop raising exceptions. ([\#6407](https://github.com/matrix-org/synapse/issues/6407))
+
+
+Synapse 1.6.0rc1 (2019-11-20)
+=============================
+
+Features
+--------
+
+- Add federation support for cross-signing. ([\#5727](https://github.com/matrix-org/synapse/issues/5727))
+- Increase default room version from 4 to 5, thereby enforcing server key validity period checks. ([\#6220](https://github.com/matrix-org/synapse/issues/6220))
+- Add support for outbound http proxying via http_proxy/HTTPS_PROXY env vars. ([\#6238](https://github.com/matrix-org/synapse/issues/6238))
+- Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)). ([\#6301](https://github.com/matrix-org/synapse/issues/6301), [\#6310](https://github.com/matrix-org/synapse/issues/6310), [\#6340](https://github.com/matrix-org/synapse/issues/6340))
+
+
+Bugfixes
+--------
+
+- Fix LruCache callback deduplication for Python 3.8. Contributed by @V02460. ([\#6213](https://github.com/matrix-org/synapse/issues/6213))
+- Remove a room from a server's public rooms list on room upgrade. ([\#6232](https://github.com/matrix-org/synapse/issues/6232), [\#6235](https://github.com/matrix-org/synapse/issues/6235))
+- Delete keys from key backup when deleting backup versions. ([\#6253](https://github.com/matrix-org/synapse/issues/6253))
+- Make notification of cross-signing signatures work with workers. ([\#6254](https://github.com/matrix-org/synapse/issues/6254))
+- Fix exception when remote servers attempt to join a room that they're not allowed to join. ([\#6278](https://github.com/matrix-org/synapse/issues/6278))
+- Prevent errors from appearing on Synapse startup if `git` is not installed. ([\#6284](https://github.com/matrix-org/synapse/issues/6284))
+- Appservice requests will no longer contain a double slash prefix when the appservice url provided ends in a slash. ([\#6306](https://github.com/matrix-org/synapse/issues/6306))
+- Fix `/purge_room` admin API. ([\#6307](https://github.com/matrix-org/synapse/issues/6307))
+- Fix the `hidden` field in the `devices` table for SQLite versions prior to 3.23.0. ([\#6313](https://github.com/matrix-org/synapse/issues/6313))
+- Fix bug which casued rejected events to be persisted with the wrong room state. ([\#6320](https://github.com/matrix-org/synapse/issues/6320))
+- Fix bug where `rc_login` ratelimiting would prematurely kick in. ([\#6335](https://github.com/matrix-org/synapse/issues/6335))
+- Prevent the server taking a long time to start up when guest registration is enabled. ([\#6338](https://github.com/matrix-org/synapse/issues/6338))
+- Fix bug where upgrading a guest account to a full user would fail when account validity is enabled. ([\#6359](https://github.com/matrix-org/synapse/issues/6359))
+- Fix `to_device` stream ID getting reset every time Synapse restarts, which had the potential to cause unable to decrypt errors. ([\#6363](https://github.com/matrix-org/synapse/issues/6363))
+- Fix permission denied error when trying to generate a config file with the docker image. ([\#6389](https://github.com/matrix-org/synapse/issues/6389))
+
+
+Improved Documentation
+----------------------
+
+- Contributor documentation now mentions script to run linters. ([\#6164](https://github.com/matrix-org/synapse/issues/6164))
+- Modify CAPTCHA_SETUP.md to update the terms `private key` and `public key` to `secret key` and `site key` respectively. Contributed by Yash Jipkate. ([\#6257](https://github.com/matrix-org/synapse/issues/6257))
+- Update `INSTALL.md` Email section to talk about `account_threepid_delegates`. ([\#6272](https://github.com/matrix-org/synapse/issues/6272))
+- Fix a small typo in `account_threepid_delegates` configuration option. ([\#6273](https://github.com/matrix-org/synapse/issues/6273))
+
+
+Internal Changes
+----------------
+
+- Add a CI job to test the `synapse_port_db` script. ([\#6140](https://github.com/matrix-org/synapse/issues/6140), [\#6276](https://github.com/matrix-org/synapse/issues/6276))
+- Convert EventContext to an attrs. ([\#6218](https://github.com/matrix-org/synapse/issues/6218))
+- Move `persist_events` out from main data store. ([\#6240](https://github.com/matrix-org/synapse/issues/6240), [\#6300](https://github.com/matrix-org/synapse/issues/6300))
+- Reduce verbosity of user/room stats. ([\#6250](https://github.com/matrix-org/synapse/issues/6250))
+- Reduce impact of debug logging. ([\#6251](https://github.com/matrix-org/synapse/issues/6251))
+- Expose some homeserver functionality to spam checkers. ([\#6259](https://github.com/matrix-org/synapse/issues/6259))
+- Change cache descriptors to always return deferreds. ([\#6263](https://github.com/matrix-org/synapse/issues/6263), [\#6291](https://github.com/matrix-org/synapse/issues/6291))
+- Fix incorrect comment regarding the functionality of an `if` statement. ([\#6269](https://github.com/matrix-org/synapse/issues/6269))
+- Update CI to run `isort` over the `scripts` and `scripts-dev` directories. ([\#6270](https://github.com/matrix-org/synapse/issues/6270))
+- Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated. ([\#6271](https://github.com/matrix-org/synapse/issues/6271), [\#6314](https://github.com/matrix-org/synapse/issues/6314))
+- Port replication http server endpoints to async/await. ([\#6274](https://github.com/matrix-org/synapse/issues/6274))
+- Port room rest handlers to async/await. ([\#6275](https://github.com/matrix-org/synapse/issues/6275))
+- Remove redundant CLI parameters on CI's `flake8` step. ([\#6277](https://github.com/matrix-org/synapse/issues/6277))
+- Port `federation_server.py` to async/await. ([\#6279](https://github.com/matrix-org/synapse/issues/6279))
+- Port receipt and read markers to async/wait. ([\#6280](https://github.com/matrix-org/synapse/issues/6280))
+- Split out state storage into separate data store. ([\#6294](https://github.com/matrix-org/synapse/issues/6294), [\#6295](https://github.com/matrix-org/synapse/issues/6295))
+- Refactor EventContext for clarity. ([\#6298](https://github.com/matrix-org/synapse/issues/6298))
+- Update the version of black used to 19.10b0. ([\#6304](https://github.com/matrix-org/synapse/issues/6304))
+- Add some documentation about worker replication. ([\#6305](https://github.com/matrix-org/synapse/issues/6305))
+- Move admin endpoints into separate files. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#6308](https://github.com/matrix-org/synapse/issues/6308))
+- Document the use of `lint.sh` for code style enforcement & extend it to run on specified paths only. ([\#6312](https://github.com/matrix-org/synapse/issues/6312))
+- Add optional python dependencies and dependant binary libraries to snapcraft packaging. ([\#6317](https://github.com/matrix-org/synapse/issues/6317))
+- Remove the dependency on psutil and replace functionality with the stdlib `resource` module. ([\#6318](https://github.com/matrix-org/synapse/issues/6318), [\#6336](https://github.com/matrix-org/synapse/issues/6336))
+- Improve documentation for EventContext fields. ([\#6319](https://github.com/matrix-org/synapse/issues/6319))
+- Add some checks that we aren't using state from rejected events. ([\#6330](https://github.com/matrix-org/synapse/issues/6330))
+- Add continuous integration for python 3.8. ([\#6341](https://github.com/matrix-org/synapse/issues/6341))
+- Correct spacing/case of various instances of the word "homeserver". ([\#6357](https://github.com/matrix-org/synapse/issues/6357))
+- Temporarily blacklist the failing unit test PurgeRoomTestCase.test_purge_room. ([\#6361](https://github.com/matrix-org/synapse/issues/6361))
+
+
 Synapse 1.5.1 (2019-11-06)
 ==========================
 
diff --git a/INSTALL.md b/INSTALL.md
index 29e0abafd3..9b7360f0ef 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -133,9 +133,9 @@ sudo yum install libtiff-devel libjpeg-devel libzip-devel freetype-devel \
 sudo yum groupinstall "Development Tools"
 ```
 
-#### Mac OS X
+#### macOS
 
-Installing prerequisites on Mac OS X:
+Installing prerequisites on macOS:
 
 ```
 xcode-select --install
@@ -144,6 +144,14 @@ sudo pip install virtualenv
 brew install pkg-config libffi
 ```
 
+On macOS Catalina (10.15) you may need to explicitly install OpenSSL
+via brew and inform `pip` about it so that `psycopg2` builds:
+
+```
+brew install openssl@1.1
+export LDFLAGS=-L/usr/local/Cellar/openssl\@1.1/1.1.1d/lib/
+```
+
 #### OpenSUSE
 
 Installing prerequisites on openSUSE:
diff --git a/changelog.d/5727.feature b/changelog.d/5727.feature
deleted file mode 100644
index 819bebf2d7..0000000000
--- a/changelog.d/5727.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add federation support for cross-signing.
diff --git a/changelog.d/6140.misc b/changelog.d/6140.misc
deleted file mode 100644
index 0feb97ec61..0000000000
--- a/changelog.d/6140.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add a CI job to test the `synapse_port_db` script.
\ No newline at end of file
diff --git a/changelog.d/6164.doc b/changelog.d/6164.doc
deleted file mode 100644
index f9395b02b3..0000000000
--- a/changelog.d/6164.doc
+++ /dev/null
@@ -1 +0,0 @@
-Contributor documentation now mentions script to run linters.
diff --git a/changelog.d/6213.bugfix b/changelog.d/6213.bugfix
deleted file mode 100644
index 2bb2d08851..0000000000
--- a/changelog.d/6213.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix LruCache callback deduplication for Python 3.8. Contributed by @V02460.
diff --git a/changelog.d/6218.misc b/changelog.d/6218.misc
deleted file mode 100644
index 49d10c36cf..0000000000
--- a/changelog.d/6218.misc
+++ /dev/null
@@ -1 +0,0 @@
-Convert EventContext to an attrs.
diff --git a/changelog.d/6220.feature b/changelog.d/6220.feature
deleted file mode 100644
index 8343e9912b..0000000000
--- a/changelog.d/6220.feature
+++ /dev/null
@@ -1 +0,0 @@
-Increase default room version from 4 to 5, thereby enforcing server key validity period checks.
diff --git a/changelog.d/6232.bugfix b/changelog.d/6232.bugfix
deleted file mode 100644
index 12718ba934..0000000000
--- a/changelog.d/6232.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Remove a room from a server's public rooms list on room upgrade.
\ No newline at end of file
diff --git a/changelog.d/6235.bugfix b/changelog.d/6235.bugfix
deleted file mode 100644
index 12718ba934..0000000000
--- a/changelog.d/6235.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Remove a room from a server's public rooms list on room upgrade.
\ No newline at end of file
diff --git a/changelog.d/6238.feature b/changelog.d/6238.feature
deleted file mode 100644
index d225ac33b6..0000000000
--- a/changelog.d/6238.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add support for outbound http proxying via http_proxy/HTTPS_PROXY env vars.
diff --git a/changelog.d/6240.misc b/changelog.d/6240.misc
deleted file mode 100644
index 0b3d7a14a1..0000000000
--- a/changelog.d/6240.misc
+++ /dev/null
@@ -1 +0,0 @@
-Move `persist_events` out from main data store.
diff --git a/changelog.d/6250.misc b/changelog.d/6250.misc
deleted file mode 100644
index 12e3fe66b0..0000000000
--- a/changelog.d/6250.misc
+++ /dev/null
@@ -1 +0,0 @@
-Reduce verbosity of user/room stats.
diff --git a/changelog.d/6251.misc b/changelog.d/6251.misc
deleted file mode 100644
index 371c6983be..0000000000
--- a/changelog.d/6251.misc
+++ /dev/null
@@ -1 +0,0 @@
-Reduce impact of debug logging.
diff --git a/changelog.d/6253.bugfix b/changelog.d/6253.bugfix
deleted file mode 100644
index 266fae381c..0000000000
--- a/changelog.d/6253.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Delete keys from key backup when deleting backup versions.
diff --git a/changelog.d/6254.bugfix b/changelog.d/6254.bugfix
deleted file mode 100644
index 3181484b88..0000000000
--- a/changelog.d/6254.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Make notification of cross-signing signatures work with workers.
diff --git a/changelog.d/6257.doc b/changelog.d/6257.doc
deleted file mode 100644
index e985afde0e..0000000000
--- a/changelog.d/6257.doc
+++ /dev/null
@@ -1 +0,0 @@
-Modify CAPTCHA_SETUP.md to update the terms `private key` and `public key` to `secret key` and `site key` respectively. Contributed by Yash Jipkate.
diff --git a/changelog.d/6259.misc b/changelog.d/6259.misc
deleted file mode 100644
index 3ff81b1ac7..0000000000
--- a/changelog.d/6259.misc
+++ /dev/null
@@ -1 +0,0 @@
-Expose some homeserver functionality to spam checkers.
diff --git a/changelog.d/6263.misc b/changelog.d/6263.misc
deleted file mode 100644
index 7b1bb4b679..0000000000
--- a/changelog.d/6263.misc
+++ /dev/null
@@ -1 +0,0 @@
-Change cache descriptors to always return deferreds.
diff --git a/changelog.d/6269.misc b/changelog.d/6269.misc
deleted file mode 100644
index 9fd333cc89..0000000000
--- a/changelog.d/6269.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix incorrect comment regarding the functionality of an `if` statement.
\ No newline at end of file
diff --git a/changelog.d/6270.misc b/changelog.d/6270.misc
deleted file mode 100644
index d1c5811323..0000000000
--- a/changelog.d/6270.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update CI to run `isort` over the `scripts` and `scripts-dev` directories.
\ No newline at end of file
diff --git a/changelog.d/6271.misc b/changelog.d/6271.misc
deleted file mode 100644
index 2369760272..0000000000
--- a/changelog.d/6271.misc
+++ /dev/null
@@ -1 +0,0 @@
-Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated.
\ No newline at end of file
diff --git a/changelog.d/6272.doc b/changelog.d/6272.doc
deleted file mode 100644
index 232180bcdc..0000000000
--- a/changelog.d/6272.doc
+++ /dev/null
@@ -1 +0,0 @@
-Update `INSTALL.md` Email section to talk about `account_threepid_delegates`.
\ No newline at end of file
diff --git a/changelog.d/6273.doc b/changelog.d/6273.doc
deleted file mode 100644
index 21a41d987d..0000000000
--- a/changelog.d/6273.doc
+++ /dev/null
@@ -1 +0,0 @@
-Fix a small typo in `account_threepid_delegates` configuration option.
\ No newline at end of file
diff --git a/changelog.d/6274.misc b/changelog.d/6274.misc
deleted file mode 100644
index eb4966124f..0000000000
--- a/changelog.d/6274.misc
+++ /dev/null
@@ -1 +0,0 @@
-Port replication http server endpoints to async/await.
diff --git a/changelog.d/6275.misc b/changelog.d/6275.misc
deleted file mode 100644
index f57e2c4adb..0000000000
--- a/changelog.d/6275.misc
+++ /dev/null
@@ -1 +0,0 @@
-Port room rest handlers to async/await.
diff --git a/changelog.d/6276.misc b/changelog.d/6276.misc
deleted file mode 100644
index 4a4428251e..0000000000
--- a/changelog.d/6276.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add a CI job to test the `synapse_port_db` script.
diff --git a/changelog.d/6277.misc b/changelog.d/6277.misc
deleted file mode 100644
index 490713577f..0000000000
--- a/changelog.d/6277.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove redundant CLI parameters on CI's `flake8` step.
\ No newline at end of file
diff --git a/changelog.d/6278.bugfix b/changelog.d/6278.bugfix
deleted file mode 100644
index c107270461..0000000000
--- a/changelog.d/6278.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix exception when remote servers attempt to join a room that they're not allowed to join.
diff --git a/changelog.d/6279.misc b/changelog.d/6279.misc
deleted file mode 100644
index 5f5144a9ee..0000000000
--- a/changelog.d/6279.misc
+++ /dev/null
@@ -1 +0,0 @@
-Port `federation_server.py` to async/await.
diff --git a/changelog.d/6280.misc b/changelog.d/6280.misc
deleted file mode 100644
index 96a0eb21b2..0000000000
--- a/changelog.d/6280.misc
+++ /dev/null
@@ -1 +0,0 @@
-Port receipt and read markers to async/wait.
diff --git a/changelog.d/6284.bugfix b/changelog.d/6284.bugfix
deleted file mode 100644
index cf15053d2d..0000000000
--- a/changelog.d/6284.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Prevent errors from appearing on Synapse startup if `git` is not installed.
\ No newline at end of file
diff --git a/changelog.d/6291.misc b/changelog.d/6291.misc
deleted file mode 100644
index 7b1bb4b679..0000000000
--- a/changelog.d/6291.misc
+++ /dev/null
@@ -1 +0,0 @@
-Change cache descriptors to always return deferreds.
diff --git a/changelog.d/6294.misc b/changelog.d/6294.misc
deleted file mode 100644
index a3e6b8296e..0000000000
--- a/changelog.d/6294.misc
+++ /dev/null
@@ -1 +0,0 @@
-Split out state storage into separate data store.
diff --git a/changelog.d/6295.misc b/changelog.d/6295.misc
deleted file mode 100644
index a3e6b8296e..0000000000
--- a/changelog.d/6295.misc
+++ /dev/null
@@ -1 +0,0 @@
-Split out state storage into separate data store.
diff --git a/changelog.d/6298.misc b/changelog.d/6298.misc
deleted file mode 100644
index d4190730b2..0000000000
--- a/changelog.d/6298.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor EventContext for clarity.
\ No newline at end of file
diff --git a/changelog.d/6300.misc b/changelog.d/6300.misc
deleted file mode 100644
index 0b3d7a14a1..0000000000
--- a/changelog.d/6300.misc
+++ /dev/null
@@ -1 +0,0 @@
-Move `persist_events` out from main data store.
diff --git a/changelog.d/6301.feature b/changelog.d/6301.feature
deleted file mode 100644
index 78a187a1dc..0000000000
--- a/changelog.d/6301.feature
+++ /dev/null
@@ -1 +0,0 @@
-Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).
diff --git a/changelog.d/6304.misc b/changelog.d/6304.misc
deleted file mode 100644
index 20372b4f7c..0000000000
--- a/changelog.d/6304.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update the version of black used to 19.10b0.
diff --git a/changelog.d/6305.misc b/changelog.d/6305.misc
deleted file mode 100644
index f047fc3062..0000000000
--- a/changelog.d/6305.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some documentation about worker replication.
diff --git a/changelog.d/6306.bugfix b/changelog.d/6306.bugfix
deleted file mode 100644
index c7dcbcdce8..0000000000
--- a/changelog.d/6306.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Appservice requests will no longer contain a double slash prefix when the appservice url provided ends in a slash.
diff --git a/changelog.d/6307.bugfix b/changelog.d/6307.bugfix
deleted file mode 100644
index f2917c5053..0000000000
--- a/changelog.d/6307.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix `/purge_room` admin API.
diff --git a/changelog.d/6310.feature b/changelog.d/6310.feature
deleted file mode 100644
index 78a187a1dc..0000000000
--- a/changelog.d/6310.feature
+++ /dev/null
@@ -1 +0,0 @@
-Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).
diff --git a/changelog.d/6312.misc b/changelog.d/6312.misc
deleted file mode 100644
index 55e3e1654d..0000000000
--- a/changelog.d/6312.misc
+++ /dev/null
@@ -1 +0,0 @@
-Document the use of `lint.sh` for code style enforcement & extend it to run on specified paths only.
diff --git a/changelog.d/6313.bugfix b/changelog.d/6313.bugfix
deleted file mode 100644
index f4d4a97f00..0000000000
--- a/changelog.d/6313.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix the `hidden` field in the `devices` table for SQLite versions prior to 3.23.0.
diff --git a/changelog.d/6314.misc b/changelog.d/6314.misc
deleted file mode 100644
index 2369760272..0000000000
--- a/changelog.d/6314.misc
+++ /dev/null
@@ -1 +0,0 @@
-Replace every instance of `logger.warn` method with `logger.warning` as the former is deprecated.
\ No newline at end of file
diff --git a/changelog.d/6318.misc b/changelog.d/6318.misc
deleted file mode 100644
index 63527ccef4..0000000000
--- a/changelog.d/6318.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove the dependency on psutil and replace functionality with the stdlib `resource` module.
diff --git a/changelog.d/6319.misc b/changelog.d/6319.misc
deleted file mode 100644
index 9711ef21ed..0000000000
--- a/changelog.d/6319.misc
+++ /dev/null
@@ -1 +0,0 @@
-Improve documentation for EventContext fields.
diff --git a/changelog.d/6320.bugfix b/changelog.d/6320.bugfix
deleted file mode 100644
index 2c3fad5655..0000000000
--- a/changelog.d/6320.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix bug which casued rejected events to be persisted with the wrong room state.
diff --git a/changelog.d/6322.misc b/changelog.d/6322.misc
new file mode 100644
index 0000000000..70ef36ca80
--- /dev/null
+++ b/changelog.d/6322.misc
@@ -0,0 +1 @@
+Improve the performance of outputting structured logging.
diff --git a/changelog.d/6330.misc b/changelog.d/6330.misc
deleted file mode 100644
index 6239cba263..0000000000
--- a/changelog.d/6330.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some checks that we aren't using state from rejected events.
diff --git a/changelog.d/6332.bugfix b/changelog.d/6332.bugfix
new file mode 100644
index 0000000000..67d5170ba0
--- /dev/null
+++ b/changelog.d/6332.bugfix
@@ -0,0 +1 @@
+Fix caching devices for remote users when using workers, so that we don't attempt to refetch (and potentially fail) each time a user requests devices.
diff --git a/changelog.d/6336.misc b/changelog.d/6336.misc
deleted file mode 100644
index 63527ccef4..0000000000
--- a/changelog.d/6336.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove the dependency on psutil and replace functionality with the stdlib `resource` module.
diff --git a/changelog.d/6338.bugfix b/changelog.d/6338.bugfix
deleted file mode 100644
index 8e469f0fb6..0000000000
--- a/changelog.d/6338.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Prevent the server taking a long time to start up when guest registration is enabled.
\ No newline at end of file
diff --git a/changelog.d/6340.feature b/changelog.d/6340.feature
deleted file mode 100644
index 78a187a1dc..0000000000
--- a/changelog.d/6340.feature
+++ /dev/null
@@ -1 +0,0 @@
-Implement label-based filtering on `/sync` and `/messages` ([MSC2326](https://github.com/matrix-org/matrix-doc/pull/2326)).
diff --git a/changelog.d/6341.misc b/changelog.d/6341.misc
deleted file mode 100644
index 359b9bf1d7..0000000000
--- a/changelog.d/6341.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add continuous integration for python 3.8.
\ No newline at end of file
diff --git a/changelog.d/6361.misc b/changelog.d/6361.misc
deleted file mode 100644
index 324d74ebf9..0000000000
--- a/changelog.d/6361.misc
+++ /dev/null
@@ -1 +0,0 @@
-Temporarily blacklist the failing unit test PurgeRoomTestCase.test_purge_room.
diff --git a/changelog.d/6362.misc b/changelog.d/6362.misc
new file mode 100644
index 0000000000..b79a5bea99
--- /dev/null
+++ b/changelog.d/6362.misc
@@ -0,0 +1 @@
+Clean up some unnecessary quotation marks around the codebase.
\ No newline at end of file
diff --git a/changelog.d/6388.doc b/changelog.d/6388.doc
new file mode 100644
index 0000000000..c777cb6b8f
--- /dev/null
+++ b/changelog.d/6388.doc
@@ -0,0 +1 @@
+Fix link in the user directory documentation.
diff --git a/changelog.d/6390.doc b/changelog.d/6390.doc
new file mode 100644
index 0000000000..093411bec1
--- /dev/null
+++ b/changelog.d/6390.doc
@@ -0,0 +1 @@
+Add build instructions to the docker readme.
\ No newline at end of file
diff --git a/changelog.d/6392.misc b/changelog.d/6392.misc
new file mode 100644
index 0000000000..a00257944f
--- /dev/null
+++ b/changelog.d/6392.misc
@@ -0,0 +1 @@
+Add a test scenario to make sure room history purges don't break `/messages` in the future.
diff --git a/changelog.d/6408.bugfix b/changelog.d/6408.bugfix
new file mode 100644
index 0000000000..c9babe599b
--- /dev/null
+++ b/changelog.d/6408.bugfix
@@ -0,0 +1 @@
+Fix an intermittent exception when handling read-receipts.
diff --git a/docker/README.md b/docker/README.md
index 24dfa77dcc..9f112a01d0 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -130,3 +130,15 @@ docker run -it --rm \
 This will generate the same configuration file as the legacy mode used, but
 will store it in `/data/homeserver.yaml` instead of a temporary location. You
 can then use it as shown above at [Running synapse](#running-synapse).
+
+## Building the image
+
+If you need to build the image from a Synapse checkout, use the following `docker
+ build` command from the repo's root:
+ 
+```
+docker build -t matrixdotorg/synapse -f docker/Dockerfile .
+```
+
+You can choose to build a different docker image by changing the value of the `-f` flag to
+point to another Dockerfile.
diff --git a/docker/start.py b/docker/start.py
index 6e1cb807a1..97fd247f8f 100755
--- a/docker/start.py
+++ b/docker/start.py
@@ -169,11 +169,11 @@ def run_generate_config(environ, ownership):
     # log("running %s" % (args, ))
 
     if ownership is not None:
-        args = ["su-exec", ownership] + args
-        os.execv("/sbin/su-exec", args)
-
         # make sure that synapse has perms to write to the data dir.
         subprocess.check_output(["chown", ownership, data_dir])
+
+        args = ["su-exec", ownership] + args
+        os.execv("/sbin/su-exec", args)
     else:
         os.execv("/usr/local/bin/python", args)
 
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 35b0bdf6f5..896159394c 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -287,7 +287,7 @@ listeners:
 # Used by phonehome stats to group together related servers.
 #server_context: context
 
-# Resource-constrained Homeserver Settings
+# Resource-constrained homeserver Settings
 #
 # If limit_remote_rooms.enabled is True, the room complexity will be
 # checked before a user joins a new remote room. If it is above
@@ -743,11 +743,11 @@ uploads_path: "DATADIR/uploads"
 ## Captcha ##
 # See docs/CAPTCHA_SETUP for full details of configuring this.
 
-# This Home Server's ReCAPTCHA public key.
+# This homeserver's ReCAPTCHA public key.
 #
 #recaptcha_public_key: "YOUR_PUBLIC_KEY"
 
-# This Home Server's ReCAPTCHA private key.
+# This homeserver's ReCAPTCHA private key.
 #
 #recaptcha_private_key: "YOUR_PRIVATE_KEY"
 
@@ -1270,7 +1270,7 @@ password_config:
 #   smtp_user: "exampleusername"
 #   smtp_pass: "examplepassword"
 #   require_transport_security: false
-#   notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
+#   notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
 #   app_name: Matrix
 #
 #   # Enable email notifications by default
diff --git a/docs/user_directory.md b/docs/user_directory.md
index e64aa453cc..37dc71e751 100644
--- a/docs/user_directory.md
+++ b/docs/user_directory.md
@@ -7,7 +7,6 @@ who are present in a publicly viewable room present on the server.
 
 The directory info is stored in various tables, which can (typically after
 DB corruption) get stale or out of sync.  If this happens, for now the
-solution to fix it is to execute the SQL here
-https://github.com/matrix-org/synapse/blob/master/synapse/storage/schema/delta/53/user_dir_populate.sql
+solution to fix it is to execute the SQL [here](../synapse/storage/data_stores/main/schema/delta/53/user_dir_populate.sql)
 and then restart synapse. This should then start a background task to
 flush the current tables and regenerate the directory.
diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml
index 1f7df71db2..9e644e8567 100644
--- a/snap/snapcraft.yaml
+++ b/snap/snapcraft.yaml
@@ -20,3 +20,23 @@ parts:
     source: .
     plugin: python
     python-version: python3
+    python-packages:
+      - '.[all]'
+    build-packages:
+      - libffi-dev
+      - libturbojpeg0-dev
+      - libssl-dev
+      - libxslt1-dev
+      - libpq-dev
+      - zlib1g-dev
+    stage-packages:
+      - libasn1-8-heimdal
+      - libgssapi3-heimdal
+      - libhcrypto4-heimdal
+      - libheimbase1-heimdal
+      - libheimntlm0-heimdal
+      - libhx509-5-heimdal
+      - libkrb5-26-heimdal
+      - libldap-2.4-2
+      - libpq5
+      - libsasl2-2
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ec16f54a49..051c83774e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-""" This is a reference implementation of a Matrix home server.
+""" This is a reference implementation of a Matrix homeserver.
 """
 
 import os
@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.5.1"
+__version__ = "1.6.0rc2"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index bdcd915bbe..d528450c78 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -144,8 +144,8 @@ def main():
     logging.captureWarnings(True)
 
     parser = argparse.ArgumentParser(
-        description="Used to register new users with a given home server when"
-        " registration has been disabled. The home server must be"
+        description="Used to register new users with a given homeserver when"
+        " registration has been disabled. The homeserver must be"
         " configured with the 'registration_shared_secret' option"
         " set."
     )
@@ -202,7 +202,7 @@ def main():
         "server_url",
         default="https://localhost:8448",
         nargs="?",
-        help="URL to use to talk to the home server. Defaults to "
+        help="URL to use to talk to the homeserver. Defaults to "
         " 'https://localhost:8448'.",
     )
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index cca92c34ba..5853a54c95 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -457,7 +457,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
 
 
 class FederationError(RuntimeError):
-    """  This class is used to inform remote home servers about erroneous
+    """  This class is used to inform remote homeservers about erroneous
     PDUs they sent us.
 
     FATAL: The remote server could not interpret the source event.
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 139221ad34..448e45e00f 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
         self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
 
     def _get_federation_out_pos(self, db_conn):
-        sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
+        sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
         sql = self.database_engine.convert_param_style(sql)
 
         txn = db_conn.cursor()
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 3e25bf5747..57174da021 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -185,7 +185,7 @@ class ApplicationServiceApi(SimpleHttpClient):
 
                 if not _is_valid_3pe_metadata(info):
                     logger.warning(
-                        "query_3pe_protocol to %s did not return a" " valid result", uri
+                        "query_3pe_protocol to %s did not return a valid result", uri
                     )
                     return None
 
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index e77d3387ff..ca43e96bd1 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -134,7 +134,7 @@ def _load_appservice(hostname, as_info, config_filename):
             for regex_obj in as_info["namespaces"][ns]:
                 if not isinstance(regex_obj, dict):
                     raise ValueError(
-                        "Expected namespace entry in %s to be an object," " but got %s",
+                        "Expected namespace entry in %s to be an object, but got %s",
                         ns,
                         regex_obj,
                     )
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index 44bd5c6799..f0171bb5b2 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -35,11 +35,11 @@ class CaptchaConfig(Config):
         ## Captcha ##
         # See docs/CAPTCHA_SETUP for full details of configuring this.
 
-        # This Home Server's ReCAPTCHA public key.
+        # This homeserver's ReCAPTCHA public key.
         #
         #recaptcha_public_key: "YOUR_PUBLIC_KEY"
 
-        # This Home Server's ReCAPTCHA private key.
+        # This homeserver's ReCAPTCHA private key.
         #
         #recaptcha_private_key: "YOUR_PRIVATE_KEY"
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 39e7a1dddb..43fad0bf8b 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -305,7 +305,7 @@ class EmailConfig(Config):
         #   smtp_user: "exampleusername"
         #   smtp_pass: "examplepassword"
         #   require_transport_security: false
-        #   notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
+        #   notif_from: "Your Friendly %(app)s homeserver <noreply@example.com>"
         #   app_name: Matrix
         #
         #   # Enable email notifications by default
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 7c9f05bde4..7ac7699676 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -170,7 +170,7 @@ class _RoomDirectoryRule(object):
             self.action = action
         else:
             raise ConfigError(
-                "%s rules can only have action of 'allow'" " or 'deny'" % (option_name,)
+                "%s rules can only have action of 'allow' or 'deny'" % (option_name,)
             )
 
         self._alias_matches_all = alias == "*"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 9015ca5f87..11336d7549 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -223,7 +223,7 @@ class ServerConfig(Config):
             self.federation_ip_range_blacklist.update(["0.0.0.0", "::"])
         except Exception as e:
             raise ConfigError(
-                "Invalid range(s) provided in " "federation_ip_range_blacklist: %s" % e
+                "Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
             )
 
         if self.public_baseurl is not None:
@@ -721,7 +721,7 @@ class ServerConfig(Config):
         # Used by phonehome stats to group together related servers.
         #server_context: context
 
-        # Resource-constrained Homeserver Settings
+        # Resource-constrained homeserver Settings
         #
         # If limit_remote_rooms.enabled is True, the room complexity will be
         # checked before a user joins a new remote room. If it is above
@@ -781,20 +781,20 @@ class ServerConfig(Config):
             "--daemonize",
             action="store_true",
             default=None,
-            help="Daemonize the home server",
+            help="Daemonize the homeserver",
         )
         server_group.add_argument(
             "--print-pidfile",
             action="store_true",
             default=None,
-            help="Print the path to the pidfile just" " before daemonizing",
+            help="Print the path to the pidfile just before daemonizing",
         )
         server_group.add_argument(
             "--manhole",
             metavar="PORT",
             dest="manhole",
             type=int,
-            help="Turn on the twisted telnet manhole" " service on the given port.",
+            help="Turn on the twisted telnet manhole service on the given port.",
         )
 
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 545d719652..27f6aff004 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -177,7 +177,7 @@ class FederationClient(FederationBase):
         given destination server.
 
         Args:
-            dest (str): The remote home server to ask.
+            dest (str): The remote homeserver to ask.
             room_id (str): The room_id to backfill.
             limit (int): The maximum number of PDUs to return.
             extremities (list): List of PDU id and origins of the first pdus
@@ -227,7 +227,7 @@ class FederationClient(FederationBase):
         one succeeds.
 
         Args:
-            destinations (list): Which home servers to query
+            destinations (list): Which homeservers to query
             event_id (str): event to fetch
             room_version (str): version of the room
             outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
@@ -312,7 +312,7 @@ class FederationClient(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def get_state_for_room(self, destination, room_id, event_id):
-        """Requests all of the room state at a given event from a remote home server.
+        """Requests all of the room state at a given event from a remote homeserver.
 
         Args:
             destination (str): The remote homeserver to query for the state.
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 44edcabed4..d68b4bd670 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -44,7 +44,7 @@ class TransactionActions(object):
             response code and response body.
         """
         if not transaction.transaction_id:
-            raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
+            raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
         return self.store.get_received_txn_response(transaction.transaction_id, origin)
 
@@ -56,7 +56,7 @@ class TransactionActions(object):
             Deferred
         """
         if not transaction.transaction_id:
-            raise RuntimeError("Cannot persist a transaction with no " "transaction_id")
+            raise RuntimeError("Cannot persist a transaction with no transaction_id")
 
         return self.store.set_received_txn_response(
             transaction.transaction_id, origin, code, response
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 2b2ee8612a..4ebb0e8bc0 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -49,7 +49,7 @@ sent_pdus_destination_dist_count = Counter(
 
 sent_pdus_destination_dist_total = Counter(
     "synapse_federation_client_sent_pdu_destinations:total",
-    "" "Total number of PDUs queued for sending across all destinations",
+    "Total number of PDUs queued for sending across all destinations",
 )
 
 
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 67b3e1ab6e..5fed626d5b 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -84,7 +84,7 @@ class TransactionManager(object):
             txn_id = str(self._next_txn_id)
 
             logger.debug(
-                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                "TX [%s] {%s} Attempting new transaction (pdus: %d, edus: %d)",
                 destination,
                 txn_id,
                 len(pdus),
@@ -103,7 +103,7 @@ class TransactionManager(object):
             self._next_txn_id += 1
 
             logger.info(
-                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                "TX [%s] {%s} Sending transaction [%s], (PDUs: %d, EDUs: %d)",
                 destination,
                 txn_id,
                 transaction.transaction_id,
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index d9fcc520a0..5db733af98 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -14,9 +14,9 @@
 # limitations under the License.
 
 """The transport layer is responsible for both sending transactions to remote
-home servers and receiving a variety of requests from other home servers.
+homeservers and receiving a variety of requests from other homeservers.
 
-By default this is done over HTTPS (and all home servers are required to
+By default this is done over HTTPS (and all homeservers are required to
 support HTTPS), however individual pairings of servers may decide to
 communicate over a different (albeit still reliable) protocol.
 """
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 920fa86853..dc95ab2113 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -44,7 +44,7 @@ class TransportLayerClient(object):
         given event.
 
         Args:
-            destination (str): The host name of the remote home server we want
+            destination (str): The host name of the remote homeserver we want
                 to get the state from.
             context (str): The name of the context we want the state of
             event_id (str): The event we want the context at.
@@ -68,7 +68,7 @@ class TransportLayerClient(object):
         given event. Returns the state's event_id's
 
         Args:
-            destination (str): The host name of the remote home server we want
+            destination (str): The host name of the remote homeserver we want
                 to get the state from.
             context (str): The name of the context we want the state of
             event_id (str): The event we want the context at.
@@ -91,7 +91,7 @@ class TransportLayerClient(object):
         """ Requests the pdu with give id and origin from the given server.
 
         Args:
-            destination (str): The host name of the remote home server we want
+            destination (str): The host name of the remote homeserver we want
                 to get the state from.
             event_id (str): The id of the event being requested.
             timeout (int): How long to try (in ms) the destination for before
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d6c23f22bd..09baa9c57d 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -714,7 +714,7 @@ class PublicRoomList(BaseFederationServlet):
 
     This API returns information in the same format as /publicRooms on the
     client API, but will only ever include local public rooms and hence is
-    intended for consumption by other home servers.
+    intended for consumption by other homeservers.
 
     GET /publicRooms HTTP/1.1
 
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7a0f54ca24..54a71c49d2 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -102,8 +102,9 @@ class AuthHandler(BaseHandler):
                         login_types.append(t)
         self._supported_login_types = login_types
 
-        self._account_ratelimiter = Ratelimiter()
-        self._failed_attempts_ratelimiter = Ratelimiter()
+        # Ratelimiter for failed auth during UIA. Uses same ratelimit config
+        # as per `rc_login.failed_attempts`.
+        self._failed_uia_attempts_ratelimiter = Ratelimiter()
 
         self._clock = self.hs.get_clock()
 
@@ -133,12 +134,38 @@ class AuthHandler(BaseHandler):
 
             AuthError if the client has completed a login flow, and it gives
                 a different user to `requester`
+
+            LimitExceededError if the ratelimiter's failed request count for this
+                user is too high to proceed
+
         """
 
+        user_id = requester.user.to_string()
+
+        # Check if we should be ratelimited due to too many previous failed attempts
+        self._failed_uia_attempts_ratelimiter.ratelimit(
+            user_id,
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
+        )
+
         # build a list of supported flows
         flows = [[login_type] for login_type in self._supported_login_types]
 
-        result, params, _ = yield self.check_auth(flows, request_body, clientip)
+        try:
+            result, params, _ = yield self.check_auth(flows, request_body, clientip)
+        except LoginError:
+            # Update the ratelimite to say we failed (`can_do_action` doesn't raise).
+            self._failed_uia_attempts_ratelimiter.can_do_action(
+                user_id,
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=True,
+            )
+            raise
 
         # find the completed login type
         for login_type in self._supported_login_types:
@@ -223,7 +250,7 @@ class AuthHandler(BaseHandler):
             # could continue registration from your phone having clicked the
             # email auth link on there). It's probably too open to abuse
             # because it lets unauthenticated clients store arbitrary objects
-            # on a home server.
+            # on a homeserver.
             # Revisit: Assumimg the REST APIs do sensible validation, the data
             # isn't arbintrary.
             session["clientdict"] = clientdict
@@ -501,11 +528,8 @@ class AuthHandler(BaseHandler):
             multiple matches
 
         Raises:
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
             UserDeactivatedError if a user is found but is deactivated.
         """
-        self.ratelimit_login_per_account(user_id)
         res = yield self._find_user_id_and_pwd_hash(user_id)
         if res is not None:
             return res[0]
@@ -572,8 +596,6 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem accessing the database
             SynapseError if there was a problem with the request
             LoginError if there was an authentication problem.
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
         """
 
         if username.startswith("@"):
@@ -581,8 +603,6 @@ class AuthHandler(BaseHandler):
         else:
             qualified_user_id = UserID(username, self.hs.hostname).to_string()
 
-        self.ratelimit_login_per_account(qualified_user_id)
-
         login_type = login_submission.get("type")
         known_login_type = False
 
@@ -650,15 +670,6 @@ class AuthHandler(BaseHandler):
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
 
-        # unknown username or invalid password.
-        self._failed_attempts_ratelimiter.ratelimit(
-            qualified_user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
-            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
-            update=True,
-        )
-
         # We raise a 403 here, but note that if we're doing user-interactive
         # login, it turns all LoginErrors into a 401 anyway.
         raise LoginError(403, "Invalid password", errcode=Codes.FORBIDDEN)
@@ -710,10 +721,6 @@ class AuthHandler(BaseHandler):
         Returns:
             Deferred[unicode] the canonical_user_id, or Deferred[None] if
                 unknown user/bad password
-
-        Raises:
-            LimitExceededError if the ratelimiter's login requests count for this
-                user is too high too proceed.
         """
         lookupres = yield self._find_user_id_and_pwd_hash(user_id)
         if not lookupres:
@@ -742,7 +749,7 @@ class AuthHandler(BaseHandler):
             auth_api.validate_macaroon(macaroon, "login", user_id)
         except Exception:
             raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN)
-        self.ratelimit_login_per_account(user_id)
+
         yield self.auth.check_auth_blocking(user_id)
         return user_id
 
@@ -810,7 +817,7 @@ class AuthHandler(BaseHandler):
     @defer.inlineCallbacks
     def add_threepid(self, user_id, medium, address, validated_at):
         # 'Canonicalise' email addresses down to lower case.
-        # We've now moving towards the Home Server being the entity that
+        # We've now moving towards the homeserver being the entity that
         # is responsible for validating threepids used for resetting passwords
         # on accounts, so in future Synapse will gain knowledge of specific
         # types (mediums) of threepid. For now, we still use the existing
@@ -912,35 +919,6 @@ class AuthHandler(BaseHandler):
         else:
             return defer.succeed(False)
 
-    def ratelimit_login_per_account(self, user_id):
-        """Checks whether the process must be stopped because of ratelimiting.
-
-        Checks against two ratelimiters: the generic one for login attempts per
-        account and the one specific to failed attempts.
-
-        Args:
-            user_id (unicode): complete @user:id
-
-        Raises:
-            LimitExceededError if one of the ratelimiters' login requests count
-                for this user is too high too proceed.
-        """
-        self._failed_attempts_ratelimiter.ratelimit(
-            user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
-            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
-            update=False,
-        )
-
-        self._account_ratelimiter.ratelimit(
-            user_id.lower(),
-            time_now_s=self._clock.time(),
-            rate_hz=self.hs.config.rc_login_account.per_second,
-            burst_count=self.hs.config.rc_login_account.burst_count,
-            update=True,
-        )
-
 
 @attr.s
 class MacaroonGenerator(object):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index c4632f8984..a07d2f1a17 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler(BaseHandler):
             if not service.is_interested_in_alias(room_alias.to_string()):
                 raise SynapseError(
                     400,
-                    "This application service has not reserved" " this kind of alias.",
+                    "This application service has not reserved this kind of alias.",
                     errcode=Codes.EXCLUSIVE,
                 )
         else:
@@ -283,7 +283,7 @@ class DirectoryHandler(BaseHandler):
     def on_directory_query(self, args):
         room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
-            raise SynapseError(400, "Room Alias is not hosted on this Home Server")
+            raise SynapseError(400, "Room Alias is not hosted on this homeserver")
 
         result = yield self.get_association_from_room_alias(room_alias)
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f09a0b73c8..28c12753c1 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
 from synapse.api.errors import CodeMessageException, Codes, NotFoundError, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import (
     UserID,
     get_domain_from_id,
@@ -53,6 +54,12 @@ class E2eKeysHandler(object):
 
         self._edu_updater = SigningKeyEduUpdater(hs, self)
 
+        self._is_master = hs.config.worker_app is None
+        if not self._is_master:
+            self._user_device_resync_client = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
+
         federation_registry = hs.get_federation_registry()
 
         # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
@@ -191,9 +198,15 @@ class E2eKeysHandler(object):
                 # probably be tracking their device lists. However, we haven't
                 # done an initial sync on the device list so we do it now.
                 try:
-                    user_devices = yield self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+                    if self._is_master:
+                        user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                            user_id
+                        )
+                    else:
+                        user_devices = yield self._user_device_resync_client(
+                            user_id=user_id
+                        )
+
                     user_devices = user_devices["devices"]
                     for device in user_devices:
                         results[user_id] = {device["device_id"]: device["keys"]}
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 05dd8d2671..0e904f2da0 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -97,9 +97,9 @@ class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
         a) handling received Pdus before handing them on as Events to the rest
-        of the home server (including auth and state conflict resoultion)
+        of the homeserver (including auth and state conflict resoultion)
         b) converting events that were produced by local clients that may need
-        to be sent to remote home servers.
+        to be sent to remote homeservers.
         c) doing the necessary dances to invite remote users and join remote
         rooms.
     """
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 22e0a04da4..1e5a4613c9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -152,7 +152,7 @@ class BaseProfileHandler(BaseHandler):
             by_admin (bool): Whether this change was made by an administrator.
         """
         if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
@@ -207,7 +207,7 @@ class BaseProfileHandler(BaseHandler):
         """target_user is the user whose avatar_url is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if not by_admin and target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
@@ -231,7 +231,7 @@ class BaseProfileHandler(BaseHandler):
     def on_profile_query(self, args):
         user = UserID.from_string(args["user_id"])
         if not self.hs.is_mine(user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         just_field = args.get("field", None)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 235f11c322..95806af41e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -630,7 +630,7 @@ class RegistrationHandler(BaseHandler):
         # And we add an email pusher for them by default, but only
         # if email notifications are enabled (so people don't start
         # getting mail spam where they weren't before if email
-        # notifs are set up on a home server)
+        # notifs are set up on a homeserver)
         if (
             self.hs.config.email_enable_notifs
             and self.hs.config.email_notif_for_new_users
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index ca8ae9fb5b..856337b7e2 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -120,7 +120,7 @@ class TypingHandler(object):
         auth_user_id = auth_user.to_string()
 
         if not self.is_mine_id(target_user_id):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
@@ -150,7 +150,7 @@ class TypingHandler(object):
         auth_user_id = auth_user.to_string()
 
         if not self.is_mine_id(target_user_id):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+            raise SynapseError(400, "User is not hosted on this homeserver")
 
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 691380abda..16765d54e0 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -530,7 +530,7 @@ class MatrixFederationHttpClient(object):
         """
         Builds the Authorization headers for a federation request
         Args:
-            destination (bytes|None): The desination home server of the request.
+            destination (bytes|None): The desination homeserver of the request.
                 May be None if the destination is an identity server, in which case
                 destination_is must be non-None.
             method (bytes): The HTTP method of the request
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index e9a5e46ced..13fcb408a6 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -96,7 +96,7 @@ def parse_boolean_from_args(args, name, default=None, required=False):
             return {b"true": True, b"false": False}[args[name][0]]
         except Exception:
             message = (
-                "Boolean query parameter %r must be one of" " ['true', 'false']"
+                "Boolean query parameter %r must be one of ['true', 'false']"
             ) % (name,)
             raise SynapseError(400, message)
     else:
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 334ddaf39a..ffa7b20ca8 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -261,6 +261,18 @@ def parse_drain_configs(
             )
 
 
+class StoppableLogPublisher(LogPublisher):
+    """
+    A log publisher that can tell its observers to shut down any external
+    communications.
+    """
+
+    def stop(self):
+        for obs in self._observers:
+            if hasattr(obs, "stop"):
+                obs.stop()
+
+
 def setup_structured_logging(
     hs,
     config,
@@ -336,7 +348,7 @@ def setup_structured_logging(
             # We should never get here, but, just in case, throw an error.
             raise ConfigError("%s drain type cannot be configured" % (observer.type,))
 
-    publisher = LogPublisher(*observers)
+    publisher = StoppableLogPublisher(*observers)
     log_filter = LogLevelFilterPredicate()
 
     for namespace, namespace_config in log_config.get(
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 0ebbde06f2..05fc64f409 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -17,25 +17,29 @@
 Log formatters that output terse JSON.
 """
 
+import json
 import sys
+import traceback
 from collections import deque
 from ipaddress import IPv4Address, IPv6Address, ip_address
 from math import floor
-from typing import IO
+from typing import IO, Optional
 
 import attr
-from simplejson import dumps
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
+from twisted.internet.defer import Deferred
 from twisted.internet.endpoints import (
     HostnameEndpoint,
     TCP4ClientEndpoint,
     TCP6ClientEndpoint,
 )
+from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
 from twisted.logger import FileLogObserver, ILogObserver, Logger
-from twisted.python.failure import Failure
+
+_encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
 
 
 def flatten_event(event: dict, metadata: dict, include_time: bool = False):
@@ -141,19 +145,57 @@ def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogOb
 
     def formatEvent(_event: dict) -> str:
         flattened = flatten_event(_event, metadata)
-        return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
+        return _encoder.encode(flattened) + "\n"
 
     return FileLogObserver(outFile, formatEvent)
 
 
 @attr.s
+@implementer(IPushProducer)
+class LogProducer(object):
+    """
+    An IPushProducer that writes logs from its buffer to its transport when it
+    is resumed.
+
+    Args:
+        buffer: Log buffer to read logs from.
+        transport: Transport to write to.
+    """
+
+    transport = attr.ib(type=ITransport)
+    _buffer = attr.ib(type=deque)
+    _paused = attr.ib(default=False, type=bool, init=False)
+
+    def pauseProducing(self):
+        self._paused = True
+
+    def stopProducing(self):
+        self._paused = True
+        self._buffer = None
+
+    def resumeProducing(self):
+        self._paused = False
+
+        while self._paused is False and (self._buffer and self.transport.connected):
+            try:
+                event = self._buffer.popleft()
+                self.transport.write(_encoder.encode(event).encode("utf8"))
+                self.transport.write(b"\n")
+            except Exception:
+                # Something has gone wrong writing to the transport -- log it
+                # and break out of the while.
+                traceback.print_exc(file=sys.__stderr__)
+                break
+
+
+@attr.s
 @implementer(ILogObserver)
 class TerseJSONToTCPLogObserver(object):
     """
     An IObserver that writes JSON logs to a TCP target.
 
     Args:
-        hs (HomeServer): The Homeserver that is being logged for.
+        hs (HomeServer): The homeserver that is being logged for.
         host: The host of the logging target.
         port: The logging target's port.
         metadata: Metadata to be added to each log entry.
@@ -165,8 +207,9 @@ class TerseJSONToTCPLogObserver(object):
     metadata = attr.ib(type=dict)
     maximum_buffer = attr.ib(type=int)
     _buffer = attr.ib(default=attr.Factory(deque), type=deque)
-    _writer = attr.ib(default=None)
+    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
     _logger = attr.ib(default=attr.Factory(Logger))
+    _producer = attr.ib(default=None, type=Optional[LogProducer])
 
     def start(self) -> None:
 
@@ -187,38 +230,43 @@ class TerseJSONToTCPLogObserver(object):
         factory = Factory.forProtocol(Protocol)
         self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
         self._service.startService()
+        self._connect()
 
-    def _write_loop(self) -> None:
+    def stop(self):
+        self._service.stopService()
+
+    def _connect(self) -> None:
         """
-        Implement the write loop.
+        Triggers an attempt to connect then write to the remote if not already writing.
         """
-        if self._writer:
+        if self._connection_waiter:
             return
 
-        self._writer = self._service.whenConnected()
+        self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
+
+        @self._connection_waiter.addErrback
+        def fail(r):
+            r.printTraceback(file=sys.__stderr__)
+            self._connection_waiter = None
+            self._connect()
 
-        @self._writer.addBoth
+        @self._connection_waiter.addCallback
         def writer(r):
-            if isinstance(r, Failure):
-                r.printTraceback(file=sys.__stderr__)
-                self._writer = None
-                self.hs.get_reactor().callLater(1, self._write_loop)
+            # We have a connection. If we already have a producer, and its
+            # transport is the same, just trigger a resumeProducing.
+            if self._producer and r.transport is self._producer.transport:
+                self._producer.resumeProducing()
                 return
 
-            try:
-                for event in self._buffer:
-                    r.transport.write(
-                        dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
-                            "utf8"
-                        )
-                    )
-                    r.transport.write(b"\n")
-                self._buffer.clear()
-            except Exception as e:
-                sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
-
-            self._writer = False
-            self.hs.get_reactor().callLater(1, self._write_loop)
+            # If the producer is still producing, stop it.
+            if self._producer:
+                self._producer.stopProducing()
+
+            # Make a new producer and start it.
+            self._producer = LogProducer(buffer=self._buffer, transport=r.transport)
+            r.transport.registerProducer(self._producer, True)
+            self._producer.resumeProducing()
+            self._connection_waiter = None
 
     def _handle_pressure(self) -> None:
         """
@@ -277,4 +325,4 @@ class TerseJSONToTCPLogObserver(object):
             self._logger.failure("Failed clearing backpressure")
 
         # Try and write immediately.
-        self._write_loop()
+        self._connect()
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index e994037be6..d0879b0490 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -246,7 +246,7 @@ class HttpPusher(object):
                     # fixed, we don't suddenly deliver a load
                     # of old notifications.
                     logger.warning(
-                        "Giving up on a notification to user %s, " "pushkey %s",
+                        "Giving up on a notification to user %s, pushkey %s",
                         self.user_id,
                         self.pushkey,
                     )
@@ -299,8 +299,7 @@ class HttpPusher(object):
                     # for sanity, we only remove the pushkey if it
                     # was the one we actually sent...
                     logger.warning(
-                        ("Ignoring rejected pushkey %s because we" " didn't send it"),
-                        pk,
+                        ("Ignoring rejected pushkey %s because we didn't send it"), pk,
                     )
                 else:
                     logger.info("Pushkey %s was rejected: removing", pk)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 1d15a06a58..b13b646bfd 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
 
 
 MESSAGE_FROM_PERSON_IN_ROOM = (
-    "You have a message on %(app)s from %(person)s " "in the %(room)s room..."
+    "You have a message on %(app)s from %(person)s in the %(room)s room..."
 )
 MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
 MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
@@ -55,7 +55,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = (
     "You have messages on %(app)s from %(person)s and others..."
 )
 INVITE_FROM_PERSON_TO_ROOM = (
-    "%(person)s has invited you to join the " "%(room)s room on %(app)s..."
+    "%(person)s has invited you to join the %(room)s room on %(app)s..."
 )
 INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..."
 
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 81b85352b1..28dbc6fcba 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,14 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import federation, login, membership, register, send_event
+from synapse.replication.http import (
+    devices,
+    federation,
+    login,
+    membership,
+    register,
+    send_event,
+)
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -30,3 +37,4 @@ class ReplicationRestResource(JsonResource):
         federation.register_servlets(hs, self)
         login.register_servlets(hs, self)
         register.register_servlets(hs, self)
+        devices.register_servlets(hs, self)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
new file mode 100644
index 0000000000..e32aac0a25
--- /dev/null
+++ b/synapse/replication/http/devices.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
+    """Ask master to resync the device list for a user by contacting their
+    server.
+
+    This must happen on master so that the results can be correctly cached in
+    the database and streamed to workers.
+
+    Request format:
+
+        POST /_synapse/replication/user_device_resync/:user_id
+
+        {}
+
+    Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
+    response, e.g.:
+
+        {
+            "user_id": "@alice:example.org",
+            "devices": [
+                {
+                    "device_id": "JLAFKJWSCS",
+                    "keys": { ... },
+                    "device_display_name": "Alice's Mobile Phone"
+                }
+            ]
+        }
+    """
+
+    NAME = "user_device_resync"
+    PATH_ARGS = ("user_id",)
+    CACHE = False
+
+    def __init__(self, hs):
+        super(ReplicationUserDevicesResyncRestServlet, self).__init__(hs)
+
+        self.device_list_updater = hs.get_device_handler().device_list_updater
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    def _serialize_payload(user_id):
+        return {}
+
+    async def _handle_request(self, request, user_id):
+        user_devices = await self.device_list_updater.user_device_resync(user_id)
+
+        return 200, user_devices
+
+
+def register_servlets(hs, http_server):
+    ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5c2a2eb593..68a59a3424 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -14,62 +14,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import hashlib
-import hmac
 import logging
 import platform
 import re
 
-from six import text_type
-from six.moves import http_client
-
 import synapse
-from synapse.api.constants import Membership, UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.server import JsonResource
-from synapse.http.servlet import (
-    RestServlet,
-    assert_params_in_dict,
-    parse_integer,
-    parse_json_object_from_request,
-    parse_string,
-)
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.rest.admin._base import (
     assert_requester_is_admin,
-    assert_user_is_admin,
     historical_admin_path_patterns,
 )
+from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
 from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
 from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
+from synapse.rest.admin.rooms import ShutdownRoomRestServlet
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
-from synapse.rest.admin.users import UserAdminServlet
-from synapse.types import UserID, create_requester
-from synapse.util.async_helpers import maybe_awaitable
+from synapse.rest.admin.users import (
+    AccountValidityRenewServlet,
+    DeactivateAccountRestServlet,
+    GetUsersPaginatedRestServlet,
+    ResetPasswordRestServlet,
+    SearchUsersRestServlet,
+    UserAdminServlet,
+    UserRegisterServlet,
+    UsersRestServlet,
+    WhoisRestServlet,
+)
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger(__name__)
 
 
-class UsersRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, user_id):
-        target_user = UserID.from_string(user_id)
-        await assert_requester_is_admin(self.auth, request)
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only users a local user")
-
-        ret = await self.handlers.admin_handler.get_users()
-
-        return 200, ret
-
-
 class VersionServlet(RestServlet):
     PATTERNS = (re.compile("^/_synapse/admin/v1/server_version$"),)
 
@@ -83,159 +60,6 @@ class VersionServlet(RestServlet):
         return 200, self.res
 
 
-class UserRegisterServlet(RestServlet):
-    """
-    Attributes:
-         NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
-         nonces (dict[str, int]): The nonces that we will accept. A dict of
-             nonce to the time it was generated, in int seconds.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/register")
-    NONCE_TIMEOUT = 60
-
-    def __init__(self, hs):
-        self.handlers = hs.get_handlers()
-        self.reactor = hs.get_reactor()
-        self.nonces = {}
-        self.hs = hs
-
-    def _clear_old_nonces(self):
-        """
-        Clear out old nonces that are older than NONCE_TIMEOUT.
-        """
-        now = int(self.reactor.seconds())
-
-        for k, v in list(self.nonces.items()):
-            if now - v > self.NONCE_TIMEOUT:
-                del self.nonces[k]
-
-    def on_GET(self, request):
-        """
-        Generate a new nonce.
-        """
-        self._clear_old_nonces()
-
-        nonce = self.hs.get_secrets().token_hex(64)
-        self.nonces[nonce] = int(self.reactor.seconds())
-        return 200, {"nonce": nonce}
-
-    async def on_POST(self, request):
-        self._clear_old_nonces()
-
-        if not self.hs.config.registration_shared_secret:
-            raise SynapseError(400, "Shared secret registration is not enabled")
-
-        body = parse_json_object_from_request(request)
-
-        if "nonce" not in body:
-            raise SynapseError(400, "nonce must be specified", errcode=Codes.BAD_JSON)
-
-        nonce = body["nonce"]
-
-        if nonce not in self.nonces:
-            raise SynapseError(400, "unrecognised nonce")
-
-        # Delete the nonce, so it can't be reused, even if it's invalid
-        del self.nonces[nonce]
-
-        if "username" not in body:
-            raise SynapseError(
-                400, "username must be specified", errcode=Codes.BAD_JSON
-            )
-        else:
-            if (
-                not isinstance(body["username"], text_type)
-                or len(body["username"]) > 512
-            ):
-                raise SynapseError(400, "Invalid username")
-
-            username = body["username"].encode("utf-8")
-            if b"\x00" in username:
-                raise SynapseError(400, "Invalid username")
-
-        if "password" not in body:
-            raise SynapseError(
-                400, "password must be specified", errcode=Codes.BAD_JSON
-            )
-        else:
-            if (
-                not isinstance(body["password"], text_type)
-                or len(body["password"]) > 512
-            ):
-                raise SynapseError(400, "Invalid password")
-
-            password = body["password"].encode("utf-8")
-            if b"\x00" in password:
-                raise SynapseError(400, "Invalid password")
-
-        admin = body.get("admin", None)
-        user_type = body.get("user_type", None)
-
-        if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
-            raise SynapseError(400, "Invalid user type")
-
-        got_mac = body["mac"]
-
-        want_mac = hmac.new(
-            key=self.hs.config.registration_shared_secret.encode(),
-            digestmod=hashlib.sha1,
-        )
-        want_mac.update(nonce.encode("utf8"))
-        want_mac.update(b"\x00")
-        want_mac.update(username)
-        want_mac.update(b"\x00")
-        want_mac.update(password)
-        want_mac.update(b"\x00")
-        want_mac.update(b"admin" if admin else b"notadmin")
-        if user_type:
-            want_mac.update(b"\x00")
-            want_mac.update(user_type.encode("utf8"))
-        want_mac = want_mac.hexdigest()
-
-        if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
-            raise SynapseError(403, "HMAC incorrect")
-
-        # Reuse the parts of RegisterRestServlet to reduce code duplication
-        from synapse.rest.client.v2_alpha.register import RegisterRestServlet
-
-        register = RegisterRestServlet(self.hs)
-
-        user_id = await register.registration_handler.register_user(
-            localpart=body["username"].lower(),
-            password=body["password"],
-            admin=bool(admin),
-            user_type=user_type,
-        )
-
-        result = await register._create_registration_details(user_id, body)
-        return 200, result
-
-
-class WhoisRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/whois/(?P<user_id>[^/]*)")
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, user_id):
-        target_user = UserID.from_string(user_id)
-        requester = await self.auth.get_user_by_req(request)
-        auth_user = requester.user
-
-        if target_user != auth_user:
-            await assert_user_is_admin(self.auth, auth_user)
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only whois a local user")
-
-        ret = await self.handlers.admin_handler.get_whois(target_user)
-
-        return 200, ret
-
-
 class PurgeHistoryRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns(
         "/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -342,369 +166,6 @@ class PurgeHistoryStatusRestServlet(RestServlet):
         return 200, purge_status.asdict()
 
 
-class DeactivateAccountRestServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/deactivate/(?P<target_user_id>[^/]*)")
-
-    def __init__(self, hs):
-        self._deactivate_account_handler = hs.get_deactivate_account_handler()
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request, target_user_id):
-        await assert_requester_is_admin(self.auth, request)
-        body = parse_json_object_from_request(request, allow_empty_body=True)
-        erase = body.get("erase", False)
-        if not isinstance(erase, bool):
-            raise SynapseError(
-                http_client.BAD_REQUEST,
-                "Param 'erase' must be a boolean, if given",
-                Codes.BAD_JSON,
-            )
-
-        UserID.from_string(target_user_id)
-
-        result = await self._deactivate_account_handler.deactivate_account(
-            target_user_id, erase
-        )
-        if result:
-            id_server_unbind_result = "success"
-        else:
-            id_server_unbind_result = "no-support"
-
-        return 200, {"id_server_unbind_result": id_server_unbind_result}
-
-
-class ShutdownRoomRestServlet(RestServlet):
-    """Shuts down a room by removing all local users from the room and blocking
-    all future invites and joins to the room. Any local aliases will be repointed
-    to a new room created by `new_room_user_id` and kicked users will be auto
-    joined to the new room.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/shutdown_room/(?P<room_id>[^/]+)")
-
-    DEFAULT_MESSAGE = (
-        "Sharing illegal content on this server is not permitted and rooms in"
-        " violation will be blocked."
-    )
-
-    def __init__(self, hs):
-        self.hs = hs
-        self.store = hs.get_datastore()
-        self.state = hs.get_state_handler()
-        self._room_creation_handler = hs.get_room_creation_handler()
-        self.event_creation_handler = hs.get_event_creation_handler()
-        self.room_member_handler = hs.get_room_member_handler()
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request, room_id):
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        content = parse_json_object_from_request(request)
-        assert_params_in_dict(content, ["new_room_user_id"])
-        new_room_user_id = content["new_room_user_id"]
-
-        room_creator_requester = create_requester(new_room_user_id)
-
-        message = content.get("message", self.DEFAULT_MESSAGE)
-        room_name = content.get("room_name", "Content Violation Notification")
-
-        info = await self._room_creation_handler.create_room(
-            room_creator_requester,
-            config={
-                "preset": "public_chat",
-                "name": room_name,
-                "power_level_content_override": {"users_default": -10},
-            },
-            ratelimit=False,
-        )
-        new_room_id = info["room_id"]
-
-        requester_user_id = requester.user.to_string()
-
-        logger.info(
-            "Shutting down room %r, joining to new room: %r", room_id, new_room_id
-        )
-
-        # This will work even if the room is already blocked, but that is
-        # desirable in case the first attempt at blocking the room failed below.
-        await self.store.block_room(room_id, requester_user_id)
-
-        users = await self.state.get_current_users_in_room(room_id)
-        kicked_users = []
-        failed_to_kick_users = []
-        for user_id in users:
-            if not self.hs.is_mine_id(user_id):
-                continue
-
-            logger.info("Kicking %r from %r...", user_id, room_id)
-
-            try:
-                target_requester = create_requester(user_id)
-                await self.room_member_handler.update_membership(
-                    requester=target_requester,
-                    target=target_requester.user,
-                    room_id=room_id,
-                    action=Membership.LEAVE,
-                    content={},
-                    ratelimit=False,
-                    require_consent=False,
-                )
-
-                await self.room_member_handler.forget(target_requester.user, room_id)
-
-                await self.room_member_handler.update_membership(
-                    requester=target_requester,
-                    target=target_requester.user,
-                    room_id=new_room_id,
-                    action=Membership.JOIN,
-                    content={},
-                    ratelimit=False,
-                    require_consent=False,
-                )
-
-                kicked_users.append(user_id)
-            except Exception:
-                logger.exception(
-                    "Failed to leave old room and join new room for %r", user_id
-                )
-                failed_to_kick_users.append(user_id)
-
-        await self.event_creation_handler.create_and_send_nonmember_event(
-            room_creator_requester,
-            {
-                "type": "m.room.message",
-                "content": {"body": message, "msgtype": "m.text"},
-                "room_id": new_room_id,
-                "sender": new_room_user_id,
-            },
-            ratelimit=False,
-        )
-
-        aliases_for_room = await maybe_awaitable(
-            self.store.get_aliases_for_room(room_id)
-        )
-
-        await self.store.update_aliases_for_room(
-            room_id, new_room_id, requester_user_id
-        )
-
-        return (
-            200,
-            {
-                "kicked_users": kicked_users,
-                "failed_to_kick_users": failed_to_kick_users,
-                "local_aliases": aliases_for_room,
-                "new_room_id": new_room_id,
-            },
-        )
-
-
-class ResetPasswordRestServlet(RestServlet):
-    """Post request to allow an administrator reset password for a user.
-    This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/reset_password/
-            @user:to_reset_password?access_token=admin_access_token
-        JsonBodyToSend:
-            {
-                "new_password": "secret"
-            }
-        Returns:
-            200 OK with empty object if success otherwise an error.
-        """
-
-    PATTERNS = historical_admin_path_patterns(
-        "/reset_password/(?P<target_user_id>[^/]*)"
-    )
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self._set_password_handler = hs.get_set_password_handler()
-
-    async def on_POST(self, request, target_user_id):
-        """Post request to allow an administrator reset password for a user.
-        This needs user to have administrator access in Synapse.
-        """
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        UserID.from_string(target_user_id)
-
-        params = parse_json_object_from_request(request)
-        assert_params_in_dict(params, ["new_password"])
-        new_password = params["new_password"]
-
-        await self._set_password_handler.set_password(
-            target_user_id, new_password, requester
-        )
-        return 200, {}
-
-
-class GetUsersPaginatedRestServlet(RestServlet):
-    """Get request to get specific number of users from Synapse.
-    This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/users_paginate/
-            @admin:user?access_token=admin_access_token&start=0&limit=10
-        Returns:
-            200 OK with json object {list[dict[str, Any]], count} or empty object.
-        """
-
-    PATTERNS = historical_admin_path_patterns(
-        "/users_paginate/(?P<target_user_id>[^/]*)"
-    )
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, target_user_id):
-        """Get request to get specific number of users from Synapse.
-        This needs user to have administrator access in Synapse.
-        """
-        await assert_requester_is_admin(self.auth, request)
-
-        target_user = UserID.from_string(target_user_id)
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only users a local user")
-
-        order = "name"  # order by name in user table
-        start = parse_integer(request, "start", required=True)
-        limit = parse_integer(request, "limit", required=True)
-
-        logger.info("limit: %s, start: %s", limit, start)
-
-        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
-        return 200, ret
-
-    async def on_POST(self, request, target_user_id):
-        """Post request to get specific number of users from Synapse..
-        This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/users_paginate/
-            @admin:user?access_token=admin_access_token
-        JsonBodyToSend:
-            {
-                "start": "0",
-                "limit": "10
-            }
-        Returns:
-            200 OK with json object {list[dict[str, Any]], count} or empty object.
-        """
-        await assert_requester_is_admin(self.auth, request)
-        UserID.from_string(target_user_id)
-
-        order = "name"  # order by name in user table
-        params = parse_json_object_from_request(request)
-        assert_params_in_dict(params, ["limit", "start"])
-        limit = params["limit"]
-        start = params["start"]
-        logger.info("limit: %s, start: %s", limit, start)
-
-        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
-        return 200, ret
-
-
-class SearchUsersRestServlet(RestServlet):
-    """Get request to search user table for specific users according to
-    search term.
-    This needs user to have administrator access in Synapse.
-        Example:
-            http://localhost:8008/_synapse/admin/v1/search_users/
-            @admin:user?access_token=admin_access_token&term=alice
-        Returns:
-            200 OK with json object {list[dict[str, Any]], count} or empty object.
-    """
-
-    PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
-
-    def __init__(self, hs):
-        self.store = hs.get_datastore()
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
-
-    async def on_GET(self, request, target_user_id):
-        """Get request to search user table for specific users according to
-        search term.
-        This needs user to have a administrator access in Synapse.
-        """
-        await assert_requester_is_admin(self.auth, request)
-
-        target_user = UserID.from_string(target_user_id)
-
-        # To allow all users to get the users list
-        # if not is_admin and target_user != auth_user:
-        #     raise AuthError(403, "You are not a server admin")
-
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "Can only users a local user")
-
-        term = parse_string(request, "term", required=True)
-        logger.info("term: %s ", term)
-
-        ret = await self.handlers.admin_handler.search_users(term)
-        return 200, ret
-
-
-class DeleteGroupAdminRestServlet(RestServlet):
-    """Allows deleting of local groups
-    """
-
-    PATTERNS = historical_admin_path_patterns("/delete_group/(?P<group_id>[^/]*)")
-
-    def __init__(self, hs):
-        self.group_server = hs.get_groups_server_handler()
-        self.is_mine_id = hs.is_mine_id
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request, group_id):
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        if not self.is_mine_id(group_id):
-            raise SynapseError(400, "Can only delete local groups")
-
-        await self.group_server.delete_group(group_id, requester.user.to_string())
-        return 200, {}
-
-
-class AccountValidityRenewServlet(RestServlet):
-    PATTERNS = historical_admin_path_patterns("/account_validity/validity$")
-
-    def __init__(self, hs):
-        """
-        Args:
-            hs (synapse.server.HomeServer): server
-        """
-        self.hs = hs
-        self.account_activity_handler = hs.get_account_validity_handler()
-        self.auth = hs.get_auth()
-
-    async def on_POST(self, request):
-        await assert_requester_is_admin(self.auth, request)
-
-        body = parse_json_object_from_request(request)
-
-        if "user_id" not in body:
-            raise SynapseError(400, "Missing property 'user_id' in the request body")
-
-        expiration_ts = await self.account_activity_handler.renew_account_for_user(
-            body["user_id"],
-            body.get("expiration_ts"),
-            not body.get("enable_renewal_emails", True),
-        )
-
-        res = {"expiration_ts": expiration_ts}
-        return 200, res
-
-
 ########################################################################################
 #
 # please don't add more servlets here: this file is already long and unwieldy. Put
diff --git a/synapse/rest/admin/groups.py b/synapse/rest/admin/groups.py
new file mode 100644
index 0000000000..0b54ca09f4
--- /dev/null
+++ b/synapse/rest/admin/groups.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import RestServlet
+from synapse.rest.admin._base import (
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class DeleteGroupAdminRestServlet(RestServlet):
+    """Allows deleting of local groups
+    """
+
+    PATTERNS = historical_admin_path_patterns("/delete_group/(?P<group_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.group_server = hs.get_groups_server_handler()
+        self.is_mine_id = hs.is_mine_id
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, group_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        if not self.is_mine_id(group_id):
+            raise SynapseError(400, "Can only delete local groups")
+
+        await self.group_server.delete_group(group_id, requester.user.to_string())
+        return 200, {}
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
new file mode 100644
index 0000000000..f7cc5e9be9
--- /dev/null
+++ b/synapse/rest/admin/rooms.py
@@ -0,0 +1,157 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from synapse.api.constants import Membership
+from synapse.http.servlet import (
+    RestServlet,
+    assert_params_in_dict,
+    parse_json_object_from_request,
+)
+from synapse.rest.admin._base import (
+    assert_user_is_admin,
+    historical_admin_path_patterns,
+)
+from synapse.types import create_requester
+from synapse.util.async_helpers import maybe_awaitable
+
+logger = logging.getLogger(__name__)
+
+
+class ShutdownRoomRestServlet(RestServlet):
+    """Shuts down a room by removing all local users from the room and blocking
+    all future invites and joins to the room. Any local aliases will be repointed
+    to a new room created by `new_room_user_id` and kicked users will be auto
+    joined to the new room.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/shutdown_room/(?P<room_id>[^/]+)")
+
+    DEFAULT_MESSAGE = (
+        "Sharing illegal content on this server is not permitted and rooms in"
+        " violation will be blocked."
+    )
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state = hs.get_state_handler()
+        self._room_creation_handler = hs.get_room_creation_handler()
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, room_id):
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        content = parse_json_object_from_request(request)
+        assert_params_in_dict(content, ["new_room_user_id"])
+        new_room_user_id = content["new_room_user_id"]
+
+        room_creator_requester = create_requester(new_room_user_id)
+
+        message = content.get("message", self.DEFAULT_MESSAGE)
+        room_name = content.get("room_name", "Content Violation Notification")
+
+        info = await self._room_creation_handler.create_room(
+            room_creator_requester,
+            config={
+                "preset": "public_chat",
+                "name": room_name,
+                "power_level_content_override": {"users_default": -10},
+            },
+            ratelimit=False,
+        )
+        new_room_id = info["room_id"]
+
+        requester_user_id = requester.user.to_string()
+
+        logger.info(
+            "Shutting down room %r, joining to new room: %r", room_id, new_room_id
+        )
+
+        # This will work even if the room is already blocked, but that is
+        # desirable in case the first attempt at blocking the room failed below.
+        await self.store.block_room(room_id, requester_user_id)
+
+        users = await self.state.get_current_users_in_room(room_id)
+        kicked_users = []
+        failed_to_kick_users = []
+        for user_id in users:
+            if not self.hs.is_mine_id(user_id):
+                continue
+
+            logger.info("Kicking %r from %r...", user_id, room_id)
+
+            try:
+                target_requester = create_requester(user_id)
+                await self.room_member_handler.update_membership(
+                    requester=target_requester,
+                    target=target_requester.user,
+                    room_id=room_id,
+                    action=Membership.LEAVE,
+                    content={},
+                    ratelimit=False,
+                    require_consent=False,
+                )
+
+                await self.room_member_handler.forget(target_requester.user, room_id)
+
+                await self.room_member_handler.update_membership(
+                    requester=target_requester,
+                    target=target_requester.user,
+                    room_id=new_room_id,
+                    action=Membership.JOIN,
+                    content={},
+                    ratelimit=False,
+                    require_consent=False,
+                )
+
+                kicked_users.append(user_id)
+            except Exception:
+                logger.exception(
+                    "Failed to leave old room and join new room for %r", user_id
+                )
+                failed_to_kick_users.append(user_id)
+
+        await self.event_creation_handler.create_and_send_nonmember_event(
+            room_creator_requester,
+            {
+                "type": "m.room.message",
+                "content": {"body": message, "msgtype": "m.text"},
+                "room_id": new_room_id,
+                "sender": new_room_user_id,
+            },
+            ratelimit=False,
+        )
+
+        aliases_for_room = await maybe_awaitable(
+            self.store.get_aliases_for_room(room_id)
+        )
+
+        await self.store.update_aliases_for_room(
+            room_id, new_room_id, requester_user_id
+        )
+
+        return (
+            200,
+            {
+                "kicked_users": kicked_users,
+                "failed_to_kick_users": failed_to_kick_users,
+                "local_aliases": aliases_for_room,
+                "new_room_id": new_room_id,
+            },
+        )
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index d5d124a0dc..58a83f93af 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -12,17 +12,419 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import hashlib
+import hmac
+import logging
 import re
 
-from synapse.api.errors import SynapseError
+from six import text_type
+from six.moves import http_client
+
+from synapse.api.constants import UserTypes
+from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import (
     RestServlet,
     assert_params_in_dict,
+    parse_integer,
     parse_json_object_from_request,
+    parse_string,
+)
+from synapse.rest.admin._base import (
+    assert_requester_is_admin,
+    assert_user_is_admin,
+    historical_admin_path_patterns,
 )
-from synapse.rest.admin import assert_requester_is_admin, assert_user_is_admin
 from synapse.types import UserID
 
+logger = logging.getLogger(__name__)
+
+
+class UsersRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.admin_handler = hs.get_handlers().admin_handler
+
+    async def on_GET(self, request, user_id):
+        target_user = UserID.from_string(user_id)
+        await assert_requester_is_admin(self.auth, request)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        ret = await self.admin_handler.get_users()
+
+        return 200, ret
+
+
+class GetUsersPaginatedRestServlet(RestServlet):
+    """Get request to get specific number of users from Synapse.
+    This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/users_paginate/
+            @admin:user?access_token=admin_access_token&start=0&limit=10
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+        """
+
+    PATTERNS = historical_admin_path_patterns(
+        "/users_paginate/(?P<target_user_id>[^/]*)"
+    )
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    async def on_GET(self, request, target_user_id):
+        """Get request to get specific number of users from Synapse.
+        This needs user to have administrator access in Synapse.
+        """
+        await assert_requester_is_admin(self.auth, request)
+
+        target_user = UserID.from_string(target_user_id)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        order = "name"  # order by name in user table
+        start = parse_integer(request, "start", required=True)
+        limit = parse_integer(request, "limit", required=True)
+
+        logger.info("limit: %s, start: %s", limit, start)
+
+        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
+        return 200, ret
+
+    async def on_POST(self, request, target_user_id):
+        """Post request to get specific number of users from Synapse..
+        This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/users_paginate/
+            @admin:user?access_token=admin_access_token
+        JsonBodyToSend:
+            {
+                "start": "0",
+                "limit": "10
+            }
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+        """
+        await assert_requester_is_admin(self.auth, request)
+        UserID.from_string(target_user_id)
+
+        order = "name"  # order by name in user table
+        params = parse_json_object_from_request(request)
+        assert_params_in_dict(params, ["limit", "start"])
+        limit = params["limit"]
+        start = params["start"]
+        logger.info("limit: %s, start: %s", limit, start)
+
+        ret = await self.handlers.admin_handler.get_users_paginate(order, start, limit)
+        return 200, ret
+
+
+class UserRegisterServlet(RestServlet):
+    """
+    Attributes:
+         NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
+         nonces (dict[str, int]): The nonces that we will accept. A dict of
+             nonce to the time it was generated, in int seconds.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/register")
+    NONCE_TIMEOUT = 60
+
+    def __init__(self, hs):
+        self.handlers = hs.get_handlers()
+        self.reactor = hs.get_reactor()
+        self.nonces = {}
+        self.hs = hs
+
+    def _clear_old_nonces(self):
+        """
+        Clear out old nonces that are older than NONCE_TIMEOUT.
+        """
+        now = int(self.reactor.seconds())
+
+        for k, v in list(self.nonces.items()):
+            if now - v > self.NONCE_TIMEOUT:
+                del self.nonces[k]
+
+    def on_GET(self, request):
+        """
+        Generate a new nonce.
+        """
+        self._clear_old_nonces()
+
+        nonce = self.hs.get_secrets().token_hex(64)
+        self.nonces[nonce] = int(self.reactor.seconds())
+        return 200, {"nonce": nonce}
+
+    async def on_POST(self, request):
+        self._clear_old_nonces()
+
+        if not self.hs.config.registration_shared_secret:
+            raise SynapseError(400, "Shared secret registration is not enabled")
+
+        body = parse_json_object_from_request(request)
+
+        if "nonce" not in body:
+            raise SynapseError(400, "nonce must be specified", errcode=Codes.BAD_JSON)
+
+        nonce = body["nonce"]
+
+        if nonce not in self.nonces:
+            raise SynapseError(400, "unrecognised nonce")
+
+        # Delete the nonce, so it can't be reused, even if it's invalid
+        del self.nonces[nonce]
+
+        if "username" not in body:
+            raise SynapseError(
+                400, "username must be specified", errcode=Codes.BAD_JSON
+            )
+        else:
+            if (
+                not isinstance(body["username"], text_type)
+                or len(body["username"]) > 512
+            ):
+                raise SynapseError(400, "Invalid username")
+
+            username = body["username"].encode("utf-8")
+            if b"\x00" in username:
+                raise SynapseError(400, "Invalid username")
+
+        if "password" not in body:
+            raise SynapseError(
+                400, "password must be specified", errcode=Codes.BAD_JSON
+            )
+        else:
+            if (
+                not isinstance(body["password"], text_type)
+                or len(body["password"]) > 512
+            ):
+                raise SynapseError(400, "Invalid password")
+
+            password = body["password"].encode("utf-8")
+            if b"\x00" in password:
+                raise SynapseError(400, "Invalid password")
+
+        admin = body.get("admin", None)
+        user_type = body.get("user_type", None)
+
+        if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
+            raise SynapseError(400, "Invalid user type")
+
+        got_mac = body["mac"]
+
+        want_mac = hmac.new(
+            key=self.hs.config.registration_shared_secret.encode(),
+            digestmod=hashlib.sha1,
+        )
+        want_mac.update(nonce.encode("utf8"))
+        want_mac.update(b"\x00")
+        want_mac.update(username)
+        want_mac.update(b"\x00")
+        want_mac.update(password)
+        want_mac.update(b"\x00")
+        want_mac.update(b"admin" if admin else b"notadmin")
+        if user_type:
+            want_mac.update(b"\x00")
+            want_mac.update(user_type.encode("utf8"))
+        want_mac = want_mac.hexdigest()
+
+        if not hmac.compare_digest(want_mac.encode("ascii"), got_mac.encode("ascii")):
+            raise SynapseError(403, "HMAC incorrect")
+
+        # Reuse the parts of RegisterRestServlet to reduce code duplication
+        from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+
+        register = RegisterRestServlet(self.hs)
+
+        user_id = await register.registration_handler.register_user(
+            localpart=body["username"].lower(),
+            password=body["password"],
+            admin=bool(admin),
+            user_type=user_type,
+        )
+
+        result = await register._create_registration_details(user_id, body)
+        return 200, result
+
+
+class WhoisRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/whois/(?P<user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    async def on_GET(self, request, user_id):
+        target_user = UserID.from_string(user_id)
+        requester = await self.auth.get_user_by_req(request)
+        auth_user = requester.user
+
+        if target_user != auth_user:
+            await assert_user_is_admin(self.auth, auth_user)
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only whois a local user")
+
+        ret = await self.handlers.admin_handler.get_whois(target_user)
+
+        return 200, ret
+
+
+class DeactivateAccountRestServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/deactivate/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self._deactivate_account_handler = hs.get_deactivate_account_handler()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request, target_user_id):
+        await assert_requester_is_admin(self.auth, request)
+        body = parse_json_object_from_request(request, allow_empty_body=True)
+        erase = body.get("erase", False)
+        if not isinstance(erase, bool):
+            raise SynapseError(
+                http_client.BAD_REQUEST,
+                "Param 'erase' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        UserID.from_string(target_user_id)
+
+        result = await self._deactivate_account_handler.deactivate_account(
+            target_user_id, erase
+        )
+        if result:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        return 200, {"id_server_unbind_result": id_server_unbind_result}
+
+
+class AccountValidityRenewServlet(RestServlet):
+    PATTERNS = historical_admin_path_patterns("/account_validity/validity$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        self.hs = hs
+        self.account_activity_handler = hs.get_account_validity_handler()
+        self.auth = hs.get_auth()
+
+    async def on_POST(self, request):
+        await assert_requester_is_admin(self.auth, request)
+
+        body = parse_json_object_from_request(request)
+
+        if "user_id" not in body:
+            raise SynapseError(400, "Missing property 'user_id' in the request body")
+
+        expiration_ts = await self.account_activity_handler.renew_account_for_user(
+            body["user_id"],
+            body.get("expiration_ts"),
+            not body.get("enable_renewal_emails", True),
+        )
+
+        res = {"expiration_ts": expiration_ts}
+        return 200, res
+
+
+class ResetPasswordRestServlet(RestServlet):
+    """Post request to allow an administrator reset password for a user.
+    This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/reset_password/
+            @user:to_reset_password?access_token=admin_access_token
+        JsonBodyToSend:
+            {
+                "new_password": "secret"
+            }
+        Returns:
+            200 OK with empty object if success otherwise an error.
+        """
+
+    PATTERNS = historical_admin_path_patterns(
+        "/reset_password/(?P<target_user_id>[^/]*)"
+    )
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self._set_password_handler = hs.get_set_password_handler()
+
+    async def on_POST(self, request, target_user_id):
+        """Post request to allow an administrator reset password for a user.
+        This needs user to have administrator access in Synapse.
+        """
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        UserID.from_string(target_user_id)
+
+        params = parse_json_object_from_request(request)
+        assert_params_in_dict(params, ["new_password"])
+        new_password = params["new_password"]
+
+        await self._set_password_handler.set_password(
+            target_user_id, new_password, requester
+        )
+        return 200, {}
+
+
+class SearchUsersRestServlet(RestServlet):
+    """Get request to search user table for specific users according to
+    search term.
+    This needs user to have administrator access in Synapse.
+        Example:
+            http://localhost:8008/_synapse/admin/v1/search_users/
+            @admin:user?access_token=admin_access_token&term=alice
+        Returns:
+            200 OK with json object {list[dict[str, Any]], count} or empty object.
+    """
+
+    PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.handlers = hs.get_handlers()
+
+    async def on_GET(self, request, target_user_id):
+        """Get request to search user table for specific users according to
+        search term.
+        This needs user to have a administrator access in Synapse.
+        """
+        await assert_requester_is_admin(self.auth, request)
+
+        target_user = UserID.from_string(target_user_id)
+
+        # To allow all users to get the users list
+        # if not is_admin and target_user != auth_user:
+        #     raise AuthError(403, "You are not a server admin")
+
+        if not self.hs.is_mine(target_user):
+            raise SynapseError(400, "Can only users a local user")
+
+        term = parse_string(request, "term", required=True)
+        logger.info("term: %s ", term)
+
+        ret = await self.handlers.admin_handler.search_users(term)
+        return 200, ret
+
 
 class UserAdminServlet(RestServlet):
     """
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 24a0ce74f2..19eb15003d 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -92,8 +92,11 @@ class LoginRestServlet(RestServlet):
         self.auth_handler = self.hs.get_auth_handler()
         self.registration_handler = hs.get_registration_handler()
         self.handlers = hs.get_handlers()
+        self._clock = hs.get_clock()
         self._well_known_builder = WellKnownBuilder(hs)
         self._address_ratelimiter = Ratelimiter()
+        self._account_ratelimiter = Ratelimiter()
+        self._failed_attempts_ratelimiter = Ratelimiter()
 
     def on_GET(self, request):
         flows = []
@@ -202,6 +205,16 @@ class LoginRestServlet(RestServlet):
                 # (See add_threepid in synapse/handlers/auth.py)
                 address = address.lower()
 
+            # We also apply account rate limiting using the 3PID as a key, as
+            # otherwise using 3PID bypasses the ratelimiting based on user ID.
+            self._failed_attempts_ratelimiter.ratelimit(
+                (medium, address),
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=False,
+            )
+
             # Check for login providers that support 3pid login types
             (
                 canonical_user_id,
@@ -211,7 +224,8 @@ class LoginRestServlet(RestServlet):
             )
             if canonical_user_id:
                 # Authentication through password provider and 3pid succeeded
-                result = yield self._register_device_with_callback(
+
+                result = yield self._complete_login(
                     canonical_user_id, login_submission, callback_3pid
                 )
                 return result
@@ -225,6 +239,21 @@ class LoginRestServlet(RestServlet):
                 logger.warning(
                     "unknown 3pid identifier medium %s, address %r", medium, address
                 )
+                # We mark that we've failed to log in here, as
+                # `check_password_provider_3pid` might have returned `None` due
+                # to an incorrect password, rather than the account not
+                # existing.
+                #
+                # If it returned None but the 3PID was bound then we won't hit
+                # this code path, which is fine as then the per-user ratelimit
+                # will kick in below.
+                self._failed_attempts_ratelimiter.can_do_action(
+                    (medium, address),
+                    time_now_s=self._clock.time(),
+                    rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                    burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                    update=True,
+                )
                 raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
             identifier = {"type": "m.id.user", "user": user_id}
@@ -236,29 +265,84 @@ class LoginRestServlet(RestServlet):
         if "user" not in identifier:
             raise SynapseError(400, "User identifier is missing 'user' key")
 
-        canonical_user_id, callback = yield self.auth_handler.validate_login(
-            identifier["user"], login_submission
+        if identifier["user"].startswith("@"):
+            qualified_user_id = identifier["user"]
+        else:
+            qualified_user_id = UserID(identifier["user"], self.hs.hostname).to_string()
+
+        # Check if we've hit the failed ratelimit (but don't update it)
+        self._failed_attempts_ratelimiter.ratelimit(
+            qualified_user_id.lower(),
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+            burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+            update=False,
         )
 
-        result = yield self._register_device_with_callback(
+        try:
+            canonical_user_id, callback = yield self.auth_handler.validate_login(
+                identifier["user"], login_submission
+            )
+        except LoginError:
+            # The user has failed to log in, so we need to update the rate
+            # limiter. Using `can_do_action` avoids us raising a ratelimit
+            # exception and masking the LoginError. The actual ratelimiting
+            # should have happened above.
+            self._failed_attempts_ratelimiter.can_do_action(
+                qualified_user_id.lower(),
+                time_now_s=self._clock.time(),
+                rate_hz=self.hs.config.rc_login_failed_attempts.per_second,
+                burst_count=self.hs.config.rc_login_failed_attempts.burst_count,
+                update=True,
+            )
+            raise
+
+        result = yield self._complete_login(
             canonical_user_id, login_submission, callback
         )
         return result
 
     @defer.inlineCallbacks
-    def _register_device_with_callback(self, user_id, login_submission, callback=None):
-        """ Registers a device with a given user_id. Optionally run a callback
-        function after registration has completed.
+    def _complete_login(
+        self, user_id, login_submission, callback=None, create_non_existant_users=False
+    ):
+        """Called when we've successfully authed the user and now need to
+        actually login them in (e.g. create devices). This gets called on
+        all succesful logins.
+
+        Applies the ratelimiting for succesful login attempts against an
+        account.
 
         Args:
             user_id (str): ID of the user to register.
             login_submission (dict): Dictionary of login information.
             callback (func|None): Callback function to run after registration.
+            create_non_existant_users (bool): Whether to create the user if
+                they don't exist. Defaults to False.
 
         Returns:
             result (Dict[str,str]): Dictionary of account information after
                 successful registration.
         """
+
+        # Before we actually log them in we check if they've already logged in
+        # too often. This happens here rather than before as we don't
+        # necessarily know the user before now.
+        self._account_ratelimiter.ratelimit(
+            user_id.lower(),
+            time_now_s=self._clock.time(),
+            rate_hz=self.hs.config.rc_login_account.per_second,
+            burst_count=self.hs.config.rc_login_account.burst_count,
+            update=True,
+        )
+
+        if create_non_existant_users:
+            user_id = yield self.auth_handler.check_user_exists(user_id)
+            if not user_id:
+                user_id = yield self.registration_handler.register_user(
+                    localpart=UserID.from_string(user_id).localpart
+                )
+
         device_id = login_submission.get("device_id")
         initial_display_name = login_submission.get("initial_device_display_name")
         device_id, access_token = yield self.registration_handler.register_device(
@@ -285,7 +369,7 @@ class LoginRestServlet(RestServlet):
             token
         )
 
-        result = yield self._register_device_with_callback(user_id, login_submission)
+        result = yield self._complete_login(user_id, login_submission)
         return result
 
     @defer.inlineCallbacks
@@ -313,15 +397,8 @@ class LoginRestServlet(RestServlet):
             raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
 
         user_id = UserID(user, self.hs.hostname).to_string()
-
-        registered_user_id = yield self.auth_handler.check_user_exists(user_id)
-        if not registered_user_id:
-            registered_user_id = yield self.registration_handler.register_user(
-                localpart=user
-            )
-
-        result = yield self._register_device_with_callback(
-            registered_user_id, login_submission
+        result = yield self._complete_login(
+            user_id, login_submission, create_non_existant_users=True
         )
         return result
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 15c15a12f5..a23d6f5c75 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -122,7 +122,7 @@ class PreviewUrlResource(DirectServeResource):
                 pattern = entry[attrib]
                 value = getattr(url_tuple, attrib)
                 logger.debug(
-                    "Matching attrib '%s' with value '%s' against" " pattern '%s'",
+                    "Matching attrib '%s' with value '%s' against pattern '%s'",
                     attrib,
                     value,
                     pattern,
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 415e9c17d8..5736c56032 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -54,7 +54,7 @@ class ConsentServerNotices(object):
                 )
             if "body" not in self._server_notice_content:
                 raise ConfigError(
-                    "user_consent server_notice_consent must contain a 'body' " "key."
+                    "user_consent server_notice_consent must contain a 'body' key."
                 )
 
             self._consent_uri_builder = ConsentURIBuilder(hs.config)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 1a2b7ebe25..459901ac60 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -361,14 +361,11 @@ class SQLBaseStore(object):
                 expiration_ts,
             )
 
-        self._simple_insert_txn(
+        self._simple_upsert_txn(
             txn,
             "account_validity",
-            values={
-                "user_id": user_id,
-                "expiration_ts_ms": expiration_ts,
-                "email_sent": False,
-            },
+            keyvalues={"user_id": user_id},
+            values={"expiration_ts_ms": expiration_ts, "email_sent": False},
         )
 
     def start_profiling(self):
@@ -412,16 +409,15 @@ class SQLBaseStore(object):
             i = 0
             N = 5
             while True:
+                cursor = LoggingTransaction(
+                    conn.cursor(),
+                    name,
+                    self.database_engine,
+                    after_callbacks,
+                    exception_callbacks,
+                )
                 try:
-                    txn = conn.cursor()
-                    txn = LoggingTransaction(
-                        txn,
-                        name,
-                        self.database_engine,
-                        after_callbacks,
-                        exception_callbacks,
-                    )
-                    r = func(txn, *args, **kwargs)
+                    r = func(cursor, *args, **kwargs)
                     conn.commit()
                     return r
                 except self.database_engine.module.OperationalError as e:
@@ -459,6 +455,40 @@ class SQLBaseStore(object):
                                 )
                             continue
                     raise
+                finally:
+                    # we're either about to retry with a new cursor, or we're about to
+                    # release the connection. Once we release the connection, it could
+                    # get used for another query, which might do a conn.rollback().
+                    #
+                    # In the latter case, even though that probably wouldn't affect the
+                    # results of this transaction, python's sqlite will reset all
+                    # statements on the connection [1], which will make our cursor
+                    # invalid [2].
+                    #
+                    # In any case, continuing to read rows after commit()ing seems
+                    # dubious from the PoV of ACID transactional semantics
+                    # (sqlite explicitly says that once you commit, you may see rows
+                    # from subsequent updates.)
+                    #
+                    # In psycopg2, cursors are essentially a client-side fabrication -
+                    # all the data is transferred to the client side when the statement
+                    # finishes executing - so in theory we could go on streaming results
+                    # from the cursor, but attempting to do so would make us
+                    # incompatible with sqlite, so let's make sure we're not doing that
+                    # by closing the cursor.
+                    #
+                    # (*named* cursors in psycopg2 are different and are proper server-
+                    # side things, but (a) we don't use them and (b) they are implicitly
+                    # closed by ending the transaction anyway.)
+                    #
+                    # In short, if we haven't finished with the cursor yet, that's a
+                    # problem waiting to bite us.
+                    #
+                    # TL;DR: we're done with the cursor, so we can close it.
+                    #
+                    # [1]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/connection.c#L465
+                    # [2]: https://github.com/python/cpython/blob/v3.8.0/Modules/_sqlite/cursor.c#L236
+                    cursor.close()
         except Exception as e:
             logger.debug("[TXN FAIL] {%s} %s", name, e)
             raise
@@ -854,7 +884,7 @@ class SQLBaseStore(object):
             allvalues.update(values)
             latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
 
-        sql = ("INSERT INTO %s (%s) VALUES (%s) " "ON CONFLICT (%s) DO %s") % (
+        sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index f04aad0743..a23744f11c 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -358,8 +358,21 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
     def _add_messages_to_local_device_inbox_txn(
         self, txn, stream_id, messages_by_user_then_device
     ):
-        sql = "UPDATE device_max_stream_id" " SET stream_id = ?" " WHERE stream_id < ?"
-        txn.execute(sql, (stream_id, stream_id))
+        # Compatible method of performing an upsert
+        sql = "SELECT stream_id FROM device_max_stream_id"
+
+        txn.execute(sql)
+        rows = txn.fetchone()
+        if rows:
+            db_stream_id = rows[0]
+            if db_stream_id < stream_id:
+                # Insert the new stream_id
+                sql = "UPDATE device_max_stream_id SET stream_id = ?"
+        else:
+            # No rows, perform an insert
+            sql = "INSERT INTO device_max_stream_id (stream_id) VALUES (?)"
+
+        txn.execute(sql, (stream_id,))
 
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
@@ -367,7 +380,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
                 # Handle wildcard device_ids.
-                sql = "SELECT device_id FROM devices" " WHERE user_id = ?"
+                sql = "SELECT device_id FROM devices WHERE user_id = ?"
                 txn.execute(sql, (user_id,))
                 message_json = json.dumps(messages_by_device["*"])
                 for row in txn:
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 073412a78d..d8ad59ad93 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -138,9 +138,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 result.setdefault(user_id, {})[device_id] = None
 
         # get signatures on the device
-        signature_sql = (
-            "SELECT * " "  FROM e2e_cross_signing_signatures " " WHERE %s"
-        ) % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
+        signature_sql = ("SELECT *  FROM e2e_cross_signing_signatures WHERE %s") % (
+            " OR ".join("(" + q + ")" for q in signature_query_clauses)
+        )
 
         txn.execute(signature_sql, signature_query_params)
         rows = self.cursor_to_dict(txn)
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 878f7568a6..627c0b67f1 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -713,9 +713,7 @@ class EventsStore(
 
                 metadata_json = encode_json(event.internal_metadata.get_dict())
 
-                sql = (
-                    "UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
-                )
+                sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
                 txn.execute(sql, (metadata_json, event.event_id))
 
                 # Add an entry to the ex_outlier_stream table to replicate the
@@ -732,7 +730,7 @@ class EventsStore(
                     },
                 )
 
-                sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
+                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
                 txn.execute(sql, (False, event.event_id))
 
                 # Update the event_backward_extremities table now that this
@@ -1479,7 +1477,7 @@ class EventsStore(
 
         # We do joins against events_to_purge for e.g. calculating state
         # groups to purge, etc., so lets make an index.
-        txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
+        txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
 
         txn.execute("SELECT event_id, should_delete FROM events_to_purge")
         event_rows = txn.fetchall()
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 0ed59ef48e..aa87f9abc5 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -530,24 +530,31 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             nbrows = 0
             last_row_event_id = ""
             for (event_id, event_json_raw) in results:
-                event_json = json.loads(event_json_raw)
-
-                self._simple_insert_many_txn(
-                    txn=txn,
-                    table="event_labels",
-                    values=[
-                        {
-                            "event_id": event_id,
-                            "label": label,
-                            "room_id": event_json["room_id"],
-                            "topological_ordering": event_json["depth"],
-                        }
-                        for label in event_json["content"].get(
-                            EventContentFields.LABELS, []
-                        )
-                        if isinstance(label, str)
-                    ],
-                )
+                try:
+                    event_json = json.loads(event_json_raw)
+
+                    self._simple_insert_many_txn(
+                        txn=txn,
+                        table="event_labels",
+                        values=[
+                            {
+                                "event_id": event_id,
+                                "label": label,
+                                "room_id": event_json["room_id"],
+                                "topological_ordering": event_json["depth"],
+                            }
+                            for label in event_json["content"].get(
+                                EventContentFields.LABELS, []
+                            )
+                            if isinstance(label, str)
+                        ],
+                    )
+                except Exception as e:
+                    logger.warning(
+                        "Unable to load event %s (no labels will be imported): %s",
+                        event_id,
+                        e,
+                    )
 
                 nbrows += 1
                 last_row_event_id = event_id
diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py
index a2a2a67927..f05ace299a 100644
--- a/synapse/storage/data_stores/main/filtering.py
+++ b/synapse/storage/data_stores/main/filtering.py
@@ -55,7 +55,7 @@ class FilteringStore(SQLBaseStore):
             if filter_id_response is not None:
                 return filter_id_response[0]
 
-            sql = "SELECT MAX(filter_id) FROM user_filters " "WHERE user_id = ?"
+            sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
             txn.execute(sql, (user_localpart,))
             max_id = txn.fetchone()[0]
             if max_id is None:
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 84b5f3ad5e..0f2887bdce 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -337,7 +337,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         if len(media_ids) == 0:
             return
 
-        sql = "DELETE FROM local_media_repository_url_cache" " WHERE media_id = ?"
+        sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
@@ -365,11 +365,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             return
 
         def _delete_url_cache_media_txn(txn):
-            sql = "DELETE FROM local_media_repository" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
-            sql = "DELETE FROM local_media_repository_thumbnails" " WHERE media_id = ?"
+            sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
             txn.executemany(sql, [(media_id,) for media_id in media_ids])
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0c24430f28..8b17334ff4 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -280,7 +280,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 args.append(limit)
             txn.execute(sql, args)
 
-            return (r[0:5] + (json.loads(r[5]),) for r in txn)
+            return list(r[0:5] + (json.loads(r[5]),) for r in txn)
 
         return self.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index ee1b2b2bbf..6a594c160c 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -377,9 +377,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         """
 
         def f(txn):
-            sql = (
-                "SELECT name, password_hash FROM users" " WHERE lower(name) = lower(?)"
-            )
+            sql = "SELECT name, password_hash FROM users WHERE lower(name) = lower(?)"
             txn.execute(sql, (user_id,))
             return dict(txn)
 
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 8780fdd989..9ae4a913a1 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -616,7 +616,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     def _get_max_topological_txn(self, txn, room_id):
         txn.execute(
-            "SELECT MAX(topological_ordering) FROM events" " WHERE room_id = ?",
+            "SELECT MAX(topological_ordering) FROM events WHERE room_id = ?",
             (room_id,),
         )
 
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 10d1887f75..aa24339717 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -83,9 +83,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
         )
 
         def get_tag_content(txn, tag_ids):
-            sql = (
-                "SELECT tag, content" " FROM room_tags" " WHERE user_id=? AND room_id=?"
-            )
+            sql = "SELECT tag, content FROM room_tags WHERE user_id=? AND room_id=?"
             results = []
             for stream_id, user_id, room_id in tag_ids:
                 txn.execute(sql, (user_id, room_id))
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 2e7753820e..731e1c9d9c 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -447,7 +447,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
         # Mark as done.
         cur.execute(
             database_engine.convert_param_style(
-                "INSERT INTO applied_module_schemas (module_name, file)" " VALUES (?,?)"
+                "INSERT INTO applied_module_schemas (module_name, file) VALUES (?,?)"
             ),
             (modname, name),
         )
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 02994ab2a5..cd56cd91ed 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -88,9 +88,12 @@ class PaginationConfig(object):
             raise SynapseError(400, "Invalid request.")
 
     def __repr__(self):
-        return (
-            "PaginationConfig(from_tok=%r, to_tok=%r," " direction=%r, limit=%r)"
-        ) % (self.from_token, self.to_token, self.direction, self.limit)
+        return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % (
+            self.from_token,
+            self.to_token,
+            self.direction,
+            self.limit,
+        )
 
     def get_source_config(self, source_name):
         keyname = "%s_key" % source_name
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index 1a20c596bf..3c0e8469f3 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -20,7 +20,7 @@ logger = logging.getLogger(__name__)
 
 
 def create_resource_tree(desired_tree, root_resource):
-    """Create the resource tree for this Home Server.
+    """Create the resource tree for this homeserver.
 
     This in unduly complicated because Twisted does not support putting
     child resources more than 1 level deep at a time.
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 5e38fd6ced..e84e578f99 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -25,7 +25,9 @@ from twisted.internet import defer
 
 import synapse.rest.admin
 from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.handlers.pagination import PurgeStatus
 from synapse.rest.client.v1 import login, profile, room
+from synapse.util.stringutils import random_string
 
 from tests import unittest
 
@@ -910,6 +912,78 @@ class RoomMessageListTestCase(RoomBase):
 
         return channel.json_body["chunk"]
 
+    def test_room_messages_purge(self):
+        store = self.hs.get_datastore()
+        pagination_handler = self.hs.get_pagination_handler()
+
+        # Send a first message in the room, which will be removed by the purge.
+        first_event_id = self.helper.send(self.room_id, "message 1")["event_id"]
+        first_token = self.get_success(
+            store.get_topological_token_for_event(first_event_id)
+        )
+
+        # Send a second message in the room, which won't be removed, and which we'll
+        # use as the marker to purge events before.
+        second_event_id = self.helper.send(self.room_id, "message 2")["event_id"]
+        second_token = self.get_success(
+            store.get_topological_token_for_event(second_event_id)
+        )
+
+        # Send a third event in the room to ensure we don't fall under any edge case
+        # due to our marker being the latest forward extremity in the room.
+        self.helper.send(self.room_id, "message 3")
+
+        # Check that we get the first and second message when querying /messages.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
+            % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})),
+        )
+        self.render(request)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
+
+        # Purge every event before the second event.
+        purge_id = random_string(16)
+        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
+        self.get_success(
+            pagination_handler._purge_history(
+                purge_id=purge_id,
+                room_id=self.room_id,
+                token=second_token,
+                delete_local_events=True,
+            )
+        )
+
+        # Check that we only get the second message through /message now that the first
+        # has been purged.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
+            % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})),
+        )
+        self.render(request)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 1, [event["content"] for event in chunk])
+
+        # Check that we get no event, but also no error, when querying /messages with
+        # the token that was pointing at the first event, because we don't have it
+        # anymore.
+        request, channel = self.make_request(
+            "GET",
+            "/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
+            % (self.room_id, first_token, json.dumps({"types": [EventTypes.Message]})),
+        )
+        self.render(request)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 0, [event["content"] for event in chunk])
+
 
 class RoomSearchTestCase(unittest.HomeserverTestCase):
     servlets = [
diff --git a/tests/server.py b/tests/server.py
index f878aeaada..2b7cf4242e 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -379,6 +379,7 @@ class FakeTransport(object):
 
     disconnecting = False
     disconnected = False
+    connected = True
     buffer = attr.ib(default=b"")
     producer = attr.ib(default=None)
     autoflush = attr.ib(default=True)
@@ -402,6 +403,7 @@ class FakeTransport(object):
                     "FakeTransport: Delaying disconnect until buffer is flushed"
                 )
             else:
+                self.connected = False
                 self.disconnected = True
 
     def abortConnection(self):