summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2019-10-03 13:48:21 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2019-10-03 13:48:21 +0100
commit782dd72037cf71fb3f9e4922b07c56df2f59de75 (patch)
treed8d74a1d411cd83a762880715f47b2f33351994a /synapse/handlers
parentUpdate the issue template for new way of getting server version (#6051) (diff)
parent1.4.0 (diff)
downloadsynapse-782dd72037cf71fb3f9e4922b07c56df2f59de75.tar.xz
Merge tag 'v1.4.0'
Synapse 1.4.0 (2019-10-03)
==========================

Bugfixes
--------

- Redact `client_secret` in server logs. ([\#6158](https://github.com/matrix-org/synapse/issues/6158))

Synapse 1.4.0rc2 (2019-10-02)
=============================

Bugfixes
--------

- Fix bug in background update that adds last seen information to the `devices` table, and improve its performance on Postgres. ([\#6135](https://github.com/matrix-org/synapse/issues/6135))
- Fix bad performance of censoring redactions background task. ([\#6141](https://github.com/matrix-org/synapse/issues/6141))
- Fix fetching censored redactions from DB, which caused APIs like initial sync to fail if it tried to include the censored redaction. ([\#6145](https://github.com/matrix-org/synapse/issues/6145))
- Fix exceptions when storing large retry intervals for down remote servers. ([\#6146](https://github.com/matrix-org/synapse/issues/6146))

Internal Changes
----------------

- Fix up sample config entry for `redaction_retention_period` option. ([\#6117](https://github.com/matrix-org/synapse/issues/6117))

Synapse 1.4.0rc1 (2019-09-26)
=============================

Note that this release includes significant changes around 3pid
verification. Administrators are reminded to review the [upgrade notes](UPGRADE.rst#upgrading-to-v140).

Features
--------

- Changes to 3pid verification:
  - Add the ability to send registration emails from the homeserver rather than delegating to an identity server. ([\#5835](https://github.com/matrix-org/synapse/issues/5835), [\#5940](https://github.com/matrix-org/synapse/issues/5940), [\#5993](https://github.com/matrix-org/synapse/issues/5993), [\#5994](https://github.com/matrix-org/synapse/issues/5994), [\#5868](https://github.com/matrix-org/synapse/issues/5868))
  - Replace `trust_identity_server_for_password_resets` config option with `account_threepid_delegates`, and make the `id_server` parameteter optional on `*/requestToken` endpoints, as per [MSC2263](https://github.com/matrix-org/matrix-doc/pull/2263). ([\#5876](https://github.com/matrix-org/synapse/issues/5876), [\#5969](https://github.com/matrix-org/synapse/issues/5969), [\#6028](https://github.com/matrix-org/synapse/issues/6028))
  - Switch to using the v2 Identity Service `/lookup` API where available, with fallback to v1. (Implements [MSC2134](https://github.com/matrix-org/matrix-doc/pull/2134) plus `id_access_token authentication` for v2 Identity Service APIs from [MSC2140](https://github.com/matrix-org/matrix-doc/pull/2140)). ([\#5897](https://github.com/matrix-org/synapse/issues/5897))
  - Remove `bind_email` and `bind_msisdn` parameters from `/register` ala [MSC2140](https://github.com/matrix-org/matrix-doc/pull/2140). ([\#5964](https://github.com/matrix-org/synapse/issues/5964))
  - Add `m.id_access_token` to `unstable_features` in `/versions` as per [MSC2264](https://github.com/matrix-org/matrix-doc/pull/2264). ([\#5974](https://github.com/matrix-org/synapse/issues/5974))
  - Use the v2 Identity Service API for 3PID invites. ([\#5979](https://github.com/matrix-org/synapse/issues/5979))
  - Add `POST /_matrix/client/unstable/account/3pid/unbind` endpoint from [MSC2140](https://github.com/matrix-org/matrix-doc/pull/2140) for unbinding a 3PID from an identity server without removing it from the homeserver user account. ([\#5980](https://github.com/matrix-org/synapse/issues/5980), [\#6062](https://github.com/matrix-org/synapse/issues/6062))
  - Use `account_threepid_delegate.email` and `account_threepid_delegate.msisdn` for validating threepid sessions. ([\#6011](https://github.com/matrix-org/synapse/issues/6011))
  - Allow homeserver to handle or delegate email validation when adding an email to a user's account. ([\#6042](https://github.com/matrix-org/synapse/issues/6042))
  - Implement new Client Server API endpoints `/account/3pid/add` and `/account/3pid/bind` as per [MSC2290](https://github.com/matrix-org/matrix-doc/pull/2290). ([\#6043](https://github.com/matrix-org/synapse/issues/6043))
  - Add an unstable feature flag for separate add/bind 3pid APIs. ([\#6044](https://github.com/matrix-org/synapse/issues/6044))
  - Remove `bind` parameter from Client Server POST `/account` endpoint as per [MSC2290](https://github.com/matrix-org/matrix-doc/pull/2290/). ([\#6067](https://github.com/matrix-org/synapse/issues/6067))
  - Add `POST /add_threepid/msisdn/submit_token` endpoint for proxying submitToken on an `account_threepid_handler`. ([\#6078](https://github.com/matrix-org/synapse/issues/6078))
  - Add `submit_url` response parameter to `*/msisdn/requestToken` endpoints. ([\#6079](https://github.com/matrix-org/synapse/issues/6079))
  - Add `m.require_identity_server` flag to /version's unstable_features. ([\#5972](https://github.com/matrix-org/synapse/issues/5972))
- Enhancements to OpenTracing support:
  - Make OpenTracing work in worker mode. ([\#5771](https://github.com/matrix-org/synapse/issues/5771))
  - Pass OpenTracing contexts between servers when transmitting EDUs. ([\#5852](https://github.com/matrix-org/synapse/issues/5852))
  - OpenTracing for device list updates. ([\#5853](https://github.com/matrix-org/synapse/issues/5853))
  - Add a tag recording a request's authenticated entity and corresponding servlet in OpenTracing. ([\#5856](https://github.com/matrix-org/synapse/issues/5856))
  - Add minimum OpenTracing for client servlets. ([\#5983](https://github.com/matrix-org/synapse/issues/5983))
  - Check at setup that OpenTracing is installed if it's enabled in the config. ([\#5985](https://github.com/matrix-org/synapse/issues/5985))
  - Trace replication send times. ([\#5986](https://github.com/matrix-org/synapse/issues/5986))
  - Include missing OpenTracing contexts in outbout replication requests. ([\#5982](https://github.com/matrix-org/synapse/issues/5982))
  - Fix sending of EDUs when OpenTracing is enabled with an empty whitelist. ([\#5984](https://github.com/matrix-org/synapse/issues/5984))
  - Fix invalid references to None while OpenTracing if the log context slips. ([\#5988](https://github.com/matrix-org/synapse/issues/5988), [\#5991](https://github.com/matrix-org/synapse/issues/5991))
  - OpenTracing for room and e2e keys. ([\#5855](https://github.com/matrix-org/synapse/issues/5855))
  - Add OpenTracing span over HTTP push processing. ([\#6003](https://github.com/matrix-org/synapse/issues/6003))
- Add an admin API to purge old rooms from the database. ([\#5845](https://github.com/matrix-org/synapse/issues/5845))
- Retry well-known lookups if we have recently seen a valid well-known record for the server. ([\#5850](https://github.com/matrix-org/synapse/issues/5850))
- Add support for filtered room-directory search requests over federation ([MSC2197](https://github.com/matrix-org/matrix-doc/pull/2197), in order to allow upcoming room directory query performance improvements. ([\#5859](https://github.com/matrix-org/synapse/issues/5859))
- Correctly retry all hosts returned from SRV when we fail to connect. ([\#5864](https://github.com/matrix-org/synapse/issues/5864))
- Add admin API endpoint for setting whether or not a user is a server administrator. ([\#5878](https://github.com/matrix-org/synapse/issues/5878))
- Enable cleaning up extremities with dummy events by default to prevent undue build up of forward extremities. ([\#5884](https://github.com/matrix-org/synapse/issues/5884))
- Add config option to sign remote key query responses with a separate key. ([\#5895](https://github.com/matrix-org/synapse/issues/5895))
- Add support for config templating. ([\#5900](https://github.com/matrix-org/synapse/issues/5900))
- Users with the type of "support" or "bot" are no longer required to consent. ([\#5902](https://github.com/matrix-org/synapse/issues/5902))
- Let synctl accept a directory of config files. ([\#5904](https://github.com/matrix-org/synapse/issues/5904))
- Increase max display name size to 256. ([\#5906](https://github.com/matrix-org/synapse/issues/5906))
- Add admin API endpoint for getting whether or not a user is a server administrator. ([\#5914](https://github.com/matrix-org/synapse/issues/5914))
- Redact events in the database that have been redacted for a week. ([\#5934](https://github.com/matrix-org/synapse/issues/5934))
- New prometheus metrics:
  - `synapse_federation_known_servers`: represents the total number of servers your server knows about (i.e. is in rooms with), including itself. Enable by setting `metrics_flags.known_servers` to True in the configuration.([\#5981](https://github.com/matrix-org/synapse/issues/5981))
  - `synapse_build_info`: exposes the Python version, OS version, and Synapse version of the running server. ([\#6005](https://github.com/matrix-org/synapse/issues/6005))
- Give appropriate exit codes when synctl fails. ([\#5992](https://github.com/matrix-org/synapse/issues/5992))
- Apply the federation blacklist to requests to identity servers. ([\#6000](https://github.com/matrix-org/synapse/issues/6000))
- Add `report_stats_endpoint` option to configure where stats are reported to, if enabled. Contributed by @Sorunome. ([\#6012](https://github.com/matrix-org/synapse/issues/6012))
- Add config option to increase ratelimits for room admins redacting messages. ([\#6015](https://github.com/matrix-org/synapse/issues/6015))
- Stop sending federation transactions to servers which have been down for a long time. ([\#6026](https://github.com/matrix-org/synapse/issues/6026))
- Make the process for mapping SAML2 users to matrix IDs more flexible. ([\#6037](https://github.com/matrix-org/synapse/issues/6037))
- Return a clearer error message when a timeout occurs when attempting to contact an identity server. ([\#6073](https://github.com/matrix-org/synapse/issues/6073))
- Prevent password reset's submit_token endpoint from accepting trailing slashes. ([\#6074](https://github.com/matrix-org/synapse/issues/6074))
- Return 403 on `/register/available` if registration has been disabled. ([\#6082](https://github.com/matrix-org/synapse/issues/6082))
- Explicitly log when a homeserver does not have the `trusted_key_servers` config field configured. ([\#6090](https://github.com/matrix-org/synapse/issues/6090))
- Add support for pruning old rows in `user_ips` table. ([\#6098](https://github.com/matrix-org/synapse/issues/6098))

Bugfixes
--------

- Don't create broken room when `power_level_content_override.users` does not contain `creator_id`. ([\#5633](https://github.com/matrix-org/synapse/issues/5633))
- Fix database index so that different backup versions can have the same sessions. ([\#5857](https://github.com/matrix-org/synapse/issues/5857))
- Fix Synapse looking for config options `password_reset_failure_template` and `password_reset_success_template`, when they are actually `password_reset_template_failure_html`, `password_reset_template_success_html`. ([\#5863](https://github.com/matrix-org/synapse/issues/5863))
- Fix stack overflow when recovering an appservice which had an outage. ([\#5885](https://github.com/matrix-org/synapse/issues/5885))
- Fix error message which referred to `public_base_url` instead of `public_baseurl`. Thanks to @aaronraimist for the fix! ([\#5909](https://github.com/matrix-org/synapse/issues/5909))
- Fix 404 for thumbnail download when `dynamic_thumbnails` is `false` and the thumbnail was dynamically generated. Fix reported by rkfg. ([\#5915](https://github.com/matrix-org/synapse/issues/5915))
- Fix a cache-invalidation bug for worker-based deployments. ([\#5920](https://github.com/matrix-org/synapse/issues/5920))
- Fix admin API for listing media in a room not being available with an external media repo. ([\#5966](https://github.com/matrix-org/synapse/issues/5966))
- Fix list media admin API always returning an error. ([\#5967](https://github.com/matrix-org/synapse/issues/5967))
- Fix room and user stats tracking. ([\#5971](https://github.com/matrix-org/synapse/issues/5971), [\#5998](https://github.com/matrix-org/synapse/issues/5998), [\#6029](https://github.com/matrix-org/synapse/issues/6029))
- Return a `M_MISSING_PARAM` if `sid` is not provided to `/account/3pid`. ([\#5995](https://github.com/matrix-org/synapse/issues/5995))
- `federation_certificate_verification_whitelist` now will not cause `TypeErrors` to be raised (a regression in 1.3). Additionally, it now supports internationalised domain names in their non-canonical representation. ([\#5996](https://github.com/matrix-org/synapse/issues/5996))
- Only count real users when checking for auto-creation of auto-join room. ([\#6004](https://github.com/matrix-org/synapse/issues/6004))
- Ensure support users can be registered even if MAU limit is reached. ([\#6020](https://github.com/matrix-org/synapse/issues/6020))
- Fix bug where login error was shown incorrectly on SSO fallback login. ([\#6024](https://github.com/matrix-org/synapse/issues/6024))
- Fix bug in calculating the federation retry backoff period. ([\#6025](https://github.com/matrix-org/synapse/issues/6025))
- Prevent exceptions being logged when extremity-cleanup events fail due to lack of user consent to the terms of service. ([\#6053](https://github.com/matrix-org/synapse/issues/6053))
- Remove POST method from password-reset `submit_token` endpoint until we implement `submit_url` functionality. ([\#6056](https://github.com/matrix-org/synapse/issues/6056))
- Fix logcontext spam on non-Linux platforms. ([\#6059](https://github.com/matrix-org/synapse/issues/6059))
- Ensure query parameters in email validation links are URL-encoded. ([\#6063](https://github.com/matrix-org/synapse/issues/6063))
- Fix a bug which caused SAML attribute maps to be overridden by defaults. ([\#6069](https://github.com/matrix-org/synapse/issues/6069))
- Fix the logged number of updated items for the `users_set_deactivated_flag` background update. ([\#6092](https://github.com/matrix-org/synapse/issues/6092))
- Add `sid` to `next_link` for email validation. ([\#6097](https://github.com/matrix-org/synapse/issues/6097))
- Threepid validity checks on msisdns should not be dependent on `threepid_behaviour_email`. ([\#6104](https://github.com/matrix-org/synapse/issues/6104))
- Ensure that servers which are not configured to support email address verification do not offer it in the registration flows. ([\#6107](https://github.com/matrix-org/synapse/issues/6107))

Updates to the Docker image
---------------------------

- Avoid changing `UID/GID` if they are already correct. ([\#5970](https://github.com/matrix-org/synapse/issues/5970))
- Provide `SYNAPSE_WORKER` envvar to specify python module. ([\#6058](https://github.com/matrix-org/synapse/issues/6058))

Improved Documentation
----------------------

- Convert documentation to markdown (from rst) ([\#5849](https://github.com/matrix-org/synapse/issues/5849))
- Update `INSTALL.md` to say that Python 2 is no longer supported. ([\#5953](https://github.com/matrix-org/synapse/issues/5953))
- Add developer documentation for using SAML2. ([\#6032](https://github.com/matrix-org/synapse/issues/6032))
- Add some notes on rolling back to v1.3.1. ([\#6049](https://github.com/matrix-org/synapse/issues/6049))
- Update the upgrade notes. ([\#6050](https://github.com/matrix-org/synapse/issues/6050))

Deprecations and Removals
-------------------------

- Remove shared-secret registration from `/_matrix/client/r0/register` endpoint. Contributed by Awesome Technologies Innovationslabor GmbH. ([\#5877](https://github.com/matrix-org/synapse/issues/5877))
- Deprecate the `trusted_third_party_id_servers` option. ([\#5875](https://github.com/matrix-org/synapse/issues/5875))

Internal Changes
----------------

- Lay the groundwork for structured logging output. ([\#5680](https://github.com/matrix-org/synapse/issues/5680))
- Retry well-known lookup before the cache expires, giving a grace period where the remote well-known can be down but we still use the old result. ([\#5844](https://github.com/matrix-org/synapse/issues/5844))
- Remove log line for debugging issue #5407. ([\#5860](https://github.com/matrix-org/synapse/issues/5860))
- Refactor the Appservice scheduler code. ([\#5886](https://github.com/matrix-org/synapse/issues/5886))
- Compatibility with v2 Identity Service APIs other than /lookup. ([\#5892](https://github.com/matrix-org/synapse/issues/5892), [\#6013](https://github.com/matrix-org/synapse/issues/6013))
- Stop populating some unused tables. ([\#5893](https://github.com/matrix-org/synapse/issues/5893), [\#6047](https://github.com/matrix-org/synapse/issues/6047))
- Add missing index on `users_in_public_rooms` to improve the performance of directory queries. ([\#5894](https://github.com/matrix-org/synapse/issues/5894))
- Improve the logging when we have an error when fetching signing keys. ([\#5896](https://github.com/matrix-org/synapse/issues/5896))
- Add support for database engine-specific schema deltas, based on file extension. ([\#5911](https://github.com/matrix-org/synapse/issues/5911))
- Update Buildkite pipeline to use plugins instead of buildkite-agent commands. ([\#5922](https://github.com/matrix-org/synapse/issues/5922))
- Add link in sample config to the logging config schema. ([\#5926](https://github.com/matrix-org/synapse/issues/5926))
- Remove unnecessary parentheses in return statements. ([\#5931](https://github.com/matrix-org/synapse/issues/5931))
- Remove unused `jenkins/prepare_sytest.sh` file. ([\#5938](https://github.com/matrix-org/synapse/issues/5938))
- Move Buildkite pipeline config to the pipelines repo. ([\#5943](https://github.com/matrix-org/synapse/issues/5943))
- Remove unnecessary return statements in the codebase which were the result of a regex run. ([\#5962](https://github.com/matrix-org/synapse/issues/5962))
- Remove left-over methods from v1 registration API. ([\#5963](https://github.com/matrix-org/synapse/issues/5963))
- Cleanup event auth type initialisation. ([\#5975](https://github.com/matrix-org/synapse/issues/5975))
- Clean up dependency checking at setup. ([\#5989](https://github.com/matrix-org/synapse/issues/5989))
- Update OpenTracing docs to use the unified `trace` method. ([\#5776](https://github.com/matrix-org/synapse/issues/5776))
- Small refactor of function arguments and docstrings in` RoomMemberHandler`. ([\#6009](https://github.com/matrix-org/synapse/issues/6009))
- Remove unused `origin` argument on `FederationHandler.add_display_name_to_third_party_invite`. ([\#6010](https://github.com/matrix-org/synapse/issues/6010))
- Add a `failure_ts` column to the `destinations` database table. ([\#6016](https://github.com/matrix-org/synapse/issues/6016), [\#6072](https://github.com/matrix-org/synapse/issues/6072))
- Clean up some code in the retry logic. ([\#6017](https://github.com/matrix-org/synapse/issues/6017))
- Fix the structured logging tests stomping on the global log configuration for subsequent tests. ([\#6023](https://github.com/matrix-org/synapse/issues/6023))
- Clean up the sample config for SAML authentication. ([\#6064](https://github.com/matrix-org/synapse/issues/6064))
- Change mailer logging to reflect Synapse doesn't just do chat notifications by email now. ([\#6075](https://github.com/matrix-org/synapse/issues/6075))
- Move last-seen info into devices table. ([\#6089](https://github.com/matrix-org/synapse/issues/6089))
- Remove unused parameter to `get_user_id_by_threepid`. ([\#6099](https://github.com/matrix-org/synapse/issues/6099))
- Refactor the user-interactive auth handling. ([\#6105](https://github.com/matrix-org/synapse/issues/6105))
- Refactor code for calculating registration flows. ([\#6106](https://github.com/matrix-org/synapse/issues/6106))
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py43
-rw-r--r--synapse/handlers/account_data.py4
-rw-r--r--synapse/handlers/account_validity.py12
-rw-r--r--synapse/handlers/admin.py19
-rw-r--r--synapse/handlers/appservice.py2
-rw-r--r--synapse/handlers/auth.py173
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/device.py65
-rw-r--r--synapse/handlers/devicemessage.py30
-rw-r--r--synapse/handlers/e2e_keys.py52
-rw-r--r--synapse/handlers/e2e_room_keys.py28
-rw-r--r--synapse/handlers/events.py1
-rw-r--r--synapse/handlers/federation.py14
-rw-r--r--synapse/handlers/identity.py479
-rw-r--r--synapse/handlers/initial_sync.py6
-rw-r--r--synapse/handlers/message.py126
-rw-r--r--synapse/handlers/pagination.py17
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/register.py170
-rw-r--r--synapse/handlers/room.py19
-rw-r--r--synapse/handlers/room_list.py29
-rw-r--r--synapse/handlers/room_member.py334
-rw-r--r--synapse/handlers/saml_handler.py106
-rw-r--r--synapse/handlers/stats.py315
-rw-r--r--synapse/handlers/sync.py31
-rw-r--r--synapse/handlers/typing.py2
-rw-r--r--synapse/handlers/ui_auth/__init__.py22
-rw-r--r--synapse/handlers/ui_auth/checkers.py247
30 files changed, 1626 insertions, 734 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c29c78bd65..d15c6282fb 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -45,6 +45,7 @@ class BaseHandler(object):
         self.state_handler = hs.get_state_handler()
         self.distributor = hs.get_distributor()
         self.ratelimiter = hs.get_ratelimiter()
+        self.admin_redaction_ratelimiter = hs.get_admin_redaction_ratelimiter()
         self.clock = hs.get_clock()
         self.hs = hs
 
@@ -53,7 +54,7 @@ class BaseHandler(object):
         self.event_builder_factory = hs.get_event_builder_factory()
 
     @defer.inlineCallbacks
-    def ratelimit(self, requester, update=True):
+    def ratelimit(self, requester, update=True, is_admin_redaction=False):
         """Ratelimits requests.
 
         Args:
@@ -62,6 +63,9 @@ class BaseHandler(object):
                 Set to False when doing multiple checks for one request (e.g.
                 to check up front if we would reject the request), and set to
                 True for the last call for a given request.
+            is_admin_redaction (bool): Whether this is a room admin/moderator
+                redacting an event. If so then we may apply different
+                ratelimits depending on config.
 
         Raises:
             LimitExceededError if the request should be ratelimited
@@ -90,16 +94,33 @@ class BaseHandler(object):
             messages_per_second = override.messages_per_second
             burst_count = override.burst_count
         else:
-            messages_per_second = self.hs.config.rc_message.per_second
-            burst_count = self.hs.config.rc_message.burst_count
-
-        allowed, time_allowed = self.ratelimiter.can_do_action(
-            user_id,
-            time_now,
-            rate_hz=messages_per_second,
-            burst_count=burst_count,
-            update=update,
-        )
+            # We default to different values if this is an admin redaction and
+            # the config is set
+            if is_admin_redaction and self.hs.config.rc_admin_redaction:
+                messages_per_second = self.hs.config.rc_admin_redaction.per_second
+                burst_count = self.hs.config.rc_admin_redaction.burst_count
+            else:
+                messages_per_second = self.hs.config.rc_message.per_second
+                burst_count = self.hs.config.rc_message.burst_count
+
+        if is_admin_redaction and self.hs.config.rc_admin_redaction:
+            # If we have separate config for admin redactions we use a separate
+            # ratelimiter
+            allowed, time_allowed = self.admin_redaction_ratelimiter.can_do_action(
+                user_id,
+                time_now,
+                rate_hz=messages_per_second,
+                burst_count=burst_count,
+                update=update,
+            )
+        else:
+            allowed, time_allowed = self.ratelimiter.can_do_action(
+                user_id,
+                time_now,
+                rate_hz=messages_per_second,
+                burst_count=burst_count,
+                update=update,
+            )
         if not allowed:
             raise LimitExceededError(
                 retry_after_ms=int(1000 * (time_allowed - time_now))
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 8acd9f9a83..38bc67191c 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -51,8 +51,8 @@ class AccountDataEventSource(object):
                     {"type": account_data_type, "content": content, "room_id": room_id}
                 )
 
-        return (results, current_stream_id)
+        return results, current_stream_id
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        return ([], config.to_id)
+        return [], config.to_id
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 34574f1a12..d04e0fe576 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -38,6 +38,7 @@ logger = logging.getLogger(__name__)
 class AccountValidityHandler(object):
     def __init__(self, hs):
         self.hs = hs
+        self.config = hs.config
         self.store = self.hs.get_datastore()
         self.sendmail = self.hs.get_sendmail()
         self.clock = self.hs.get_clock()
@@ -62,9 +63,14 @@ class AccountValidityHandler(object):
             self._raw_from = email.utils.parseaddr(self._from_string)[1]
 
             self._template_html, self._template_text = load_jinja2_templates(
-                config=self.hs.config,
-                template_html_name=self.hs.config.email_expiry_template_html,
-                template_text_name=self.hs.config.email_expiry_template_text,
+                self.config.email_template_dir,
+                [
+                    self.config.email_expiry_template_html,
+                    self.config.email_expiry_template_text,
+                ],
+                apply_format_ts_filter=True,
+                apply_mxc_to_http_filter=True,
+                public_baseurl=self.config.public_baseurl,
             )
 
             # Check the renewal emails to send and send them every 30min.
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 2f22f56ca4..1a87b58838 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -94,6 +94,25 @@ class AdminHandler(BaseHandler):
 
         return ret
 
+    def get_user_server_admin(self, user):
+        """
+        Get the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+        """
+        return self.store.is_server_admin(user)
+
+    def set_user_server_admin(self, user, admin):
+        """
+        Set the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+            admin (bool): whether or not the user should be an admin of this server
+        """
+        return self.store.set_server_admin(user, admin)
+
     @defer.inlineCallbacks
     def export_user_data(self, user_id, writer):
         """Write all data we have on the user to the given writer.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index d1a51df6f9..3e9b298154 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -294,12 +294,10 @@ class ApplicationServicesHandler(object):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             return False
-            return
 
         user_info = yield self.store.get_user_by_id(user_id)
         if user_info:
             return False
-            return
 
         # user not found; could be the AS though, so check.
         services = self.store.get_app_services()
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 0f3ebf7ef8..333eb30625 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -21,10 +21,8 @@ import unicodedata
 import attr
 import bcrypt
 import pymacaroons
-from canonicaljson import json
 
 from twisted.internet import defer
-from twisted.web.client import PartialDownloadError
 
 import synapse.util.stringutils as stringutils
 from synapse.api.constants import LoginType
@@ -38,6 +36,8 @@ from synapse.api.errors import (
     UserDeactivatedError,
 )
 from synapse.api.ratelimiting import Ratelimiter
+from synapse.handlers.ui_auth import INTERACTIVE_AUTH_CHECKERS
+from synapse.handlers.ui_auth.checkers import UserInteractiveAuthChecker
 from synapse.logging.context import defer_to_thread
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
@@ -57,13 +57,13 @@ class AuthHandler(BaseHandler):
             hs (synapse.server.HomeServer):
         """
         super(AuthHandler, self).__init__(hs)
-        self.checkers = {
-            LoginType.RECAPTCHA: self._check_recaptcha,
-            LoginType.EMAIL_IDENTITY: self._check_email_identity,
-            LoginType.MSISDN: self._check_msisdn,
-            LoginType.DUMMY: self._check_dummy_auth,
-            LoginType.TERMS: self._check_terms_auth,
-        }
+
+        self.checkers = {}  # type: dict[str, UserInteractiveAuthChecker]
+        for auth_checker_class in INTERACTIVE_AUTH_CHECKERS:
+            inst = auth_checker_class(hs)
+            if inst.is_enabled():
+                self.checkers[inst.AUTH_TYPE] = inst
+
         self.bcrypt_rounds = hs.config.bcrypt_rounds
 
         # This is not a cache per se, but a store of all current sessions that
@@ -157,8 +157,16 @@ class AuthHandler(BaseHandler):
 
         return params
 
+    def get_enabled_auth_types(self):
+        """Return the enabled user-interactive authentication types
+
+        Returns the UI-Auth types which are supported by the homeserver's current
+        config.
+        """
+        return self.checkers.keys()
+
     @defer.inlineCallbacks
-    def check_auth(self, flows, clientdict, clientip, password_servlet=False):
+    def check_auth(self, flows, clientdict, clientip):
         """
         Takes a dictionary sent by the client in the login / registration
         protocol and handles the User-Interactive Auth flow.
@@ -182,16 +190,6 @@ class AuthHandler(BaseHandler):
 
             clientip (str): The IP address of the client.
 
-            password_servlet (bool): Whether the request originated from
-                PasswordRestServlet.
-                XXX: This is a temporary hack to distinguish between checking
-                for threepid validations locally (in the case of password
-                resets) and using the identity server (in the case of binding
-                a 3PID during registration). Once we start using the
-                homeserver for both tasks, this distinction will no longer be
-                necessary.
-
-
         Returns:
             defer.Deferred[dict, dict, str]: a deferred tuple of
                 (creds, params, session_id).
@@ -247,9 +245,7 @@ class AuthHandler(BaseHandler):
         if "type" in authdict:
             login_type = authdict["type"]
             try:
-                result = yield self._check_auth_dict(
-                    authdict, clientip, password_servlet=password_servlet
-                )
+                result = yield self._check_auth_dict(authdict, clientip)
                 if result:
                     creds[login_type] = result
                     self._save_session(session)
@@ -280,7 +276,7 @@ class AuthHandler(BaseHandler):
                     creds,
                     list(clientdict),
                 )
-                return (creds, clientdict, session["id"])
+                return creds, clientdict, session["id"]
 
         ret = self._auth_dict_for_flows(flows, session)
         ret["completed"] = list(creds)
@@ -303,7 +299,7 @@ class AuthHandler(BaseHandler):
             sess["creds"] = {}
         creds = sess["creds"]
 
-        result = yield self.checkers[stagetype](authdict, clientip)
+        result = yield self.checkers[stagetype].check_auth(authdict, clientip)
         if result:
             creds[stagetype] = result
             self._save_session(sess)
@@ -356,7 +352,7 @@ class AuthHandler(BaseHandler):
         return sess.setdefault("serverdict", {}).get(key, default)
 
     @defer.inlineCallbacks
-    def _check_auth_dict(self, authdict, clientip, password_servlet=False):
+    def _check_auth_dict(self, authdict, clientip):
         """Attempt to validate the auth dict provided by a client
 
         Args:
@@ -374,11 +370,7 @@ class AuthHandler(BaseHandler):
         login_type = authdict["type"]
         checker = self.checkers.get(login_type)
         if checker is not None:
-            # XXX: Temporary workaround for having Synapse handle password resets
-            # See AuthHandler.check_auth for further details
-            res = yield checker(
-                authdict, clientip=clientip, password_servlet=password_servlet
-            )
+            res = yield checker.check_auth(authdict, clientip=clientip)
             return res
 
         # build a v1-login-style dict out of the authdict and fall back to the
@@ -391,119 +383,6 @@ class AuthHandler(BaseHandler):
         (canonical_id, callback) = yield self.validate_login(user_id, authdict)
         return canonical_id
 
-    @defer.inlineCallbacks
-    def _check_recaptcha(self, authdict, clientip, **kwargs):
-        try:
-            user_response = authdict["response"]
-        except KeyError:
-            # Client tried to provide captcha but didn't give the parameter:
-            # bad request.
-            raise LoginError(
-                400, "Captcha response is required", errcode=Codes.CAPTCHA_NEEDED
-            )
-
-        logger.info(
-            "Submitting recaptcha response %s with remoteip %s", user_response, clientip
-        )
-
-        # TODO: get this from the homeserver rather than creating a new one for
-        # each request
-        try:
-            client = self.hs.get_simple_http_client()
-            resp_body = yield client.post_urlencoded_get_json(
-                self.hs.config.recaptcha_siteverify_api,
-                args={
-                    "secret": self.hs.config.recaptcha_private_key,
-                    "response": user_response,
-                    "remoteip": clientip,
-                },
-            )
-        except PartialDownloadError as pde:
-            # Twisted is silly
-            data = pde.response
-            resp_body = json.loads(data)
-
-        if "success" in resp_body:
-            # Note that we do NOT check the hostname here: we explicitly
-            # intend the CAPTCHA to be presented by whatever client the
-            # user is using, we just care that they have completed a CAPTCHA.
-            logger.info(
-                "%s reCAPTCHA from hostname %s",
-                "Successful" if resp_body["success"] else "Failed",
-                resp_body.get("hostname"),
-            )
-            if resp_body["success"]:
-                return True
-        raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
-
-    def _check_email_identity(self, authdict, **kwargs):
-        return self._check_threepid("email", authdict, **kwargs)
-
-    def _check_msisdn(self, authdict, **kwargs):
-        return self._check_threepid("msisdn", authdict)
-
-    def _check_dummy_auth(self, authdict, **kwargs):
-        return defer.succeed(True)
-
-    def _check_terms_auth(self, authdict, **kwargs):
-        return defer.succeed(True)
-
-    @defer.inlineCallbacks
-    def _check_threepid(self, medium, authdict, password_servlet=False, **kwargs):
-        if "threepid_creds" not in authdict:
-            raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
-
-        threepid_creds = authdict["threepid_creds"]
-
-        identity_handler = self.hs.get_handlers().identity_handler
-
-        logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
-        if (
-            not password_servlet
-            or self.hs.config.email_password_reset_behaviour == "remote"
-        ):
-            threepid = yield identity_handler.threepid_from_creds(threepid_creds)
-        elif self.hs.config.email_password_reset_behaviour == "local":
-            row = yield self.store.get_threepid_validation_session(
-                medium,
-                threepid_creds["client_secret"],
-                sid=threepid_creds["sid"],
-                validated=True,
-            )
-
-            threepid = (
-                {
-                    "medium": row["medium"],
-                    "address": row["address"],
-                    "validated_at": row["validated_at"],
-                }
-                if row
-                else None
-            )
-
-            if row:
-                # Valid threepid returned, delete from the db
-                yield self.store.delete_threepid_session(threepid_creds["sid"])
-        else:
-            raise SynapseError(
-                400, "Password resets are not enabled on this homeserver"
-            )
-
-        if not threepid:
-            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
-
-        if threepid["medium"] != medium:
-            raise LoginError(
-                401,
-                "Expecting threepid of type '%s', got '%s'"
-                % (medium, threepid["medium"]),
-                errcode=Codes.UNAUTHORIZED,
-            )
-
-        threepid["threepid_creds"] = authdict["threepid_creds"]
-
-        return threepid
-
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
@@ -722,7 +601,7 @@ class AuthHandler(BaseHandler):
                 known_login_type = True
                 is_valid = yield provider.check_password(qualified_user_id, password)
                 if is_valid:
-                    return (qualified_user_id, None)
+                    return qualified_user_id, None
 
             if not hasattr(provider, "get_supported_login_types") or not hasattr(
                 provider, "check_auth"
@@ -766,7 +645,7 @@ class AuthHandler(BaseHandler):
             )
 
             if canonical_user_id:
-                return (canonical_user_id, None)
+                return canonical_user_id, None
 
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
@@ -816,7 +695,7 @@ class AuthHandler(BaseHandler):
                         result = (result, None)
                     return result
 
-        return (None, None)
+        return None, None
 
     @defer.inlineCallbacks
     def _check_local_password(self, user_id, password):
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 5f804d1f13..d83912c9a4 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -73,7 +73,9 @@ class DeactivateAccountHandler(BaseHandler):
         # unbinding
         identity_server_supports_unbinding = True
 
-        threepids = yield self.store.user_get_threepids(user_id)
+        # Retrieve the 3PIDs this user has bound to an identity server
+        threepids = yield self.store.user_get_bound_threepids(user_id)
+
         for threepid in threepids:
             try:
                 result = yield self._identity_handler.try_unbind_threepid(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5c1cf83c9d..71a8f33da3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
     HttpResponseException,
     RequestSendFailed,
 )
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
         """
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
             defer.Deferred: list[dict[str, X]]: info on each device
         """
 
+        set_tag("user_id", user_id)
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
         for device in devices:
             _update_device_from_client_ips(device, ips)
 
+        log_kv(device_map)
         return devices
 
+    @trace
     @defer.inlineCallbacks
     def get_device(self, user_id, device_id):
         """ Retrieve the given device
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
         _update_device_from_client_ips(device, ips)
+
+        set_tag("device", device)
+        set_tag("ips", ips)
+
         return device
 
     @measure_func("device.get_user_ids_changed")
+    @trace
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
+
+        set_tag("user_id", user_id)
+        set_tag("from_token", from_token)
         now_room_key = yield self.store.get_room_events_max_id()
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
+                log_kv(
+                    {"event": "encountered empty previous state", "room_id": room_id}
+                )
                 for key, event_id in iteritems(current_state_ids):
                     etype, state_key = key
                     if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
             possibly_joined = []
             possibly_left = []
 
-        return {"changed": list(possibly_joined), "left": list(possibly_left)}
+        result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+
+        log_kv(result)
+
+        return result
 
 
 class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
+    @trace
     @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                set_tag("error", True)
+                log_kv(
+                    {"reason": "User doesn't have device id.", "device_id": device_id}
+                )
                 pass
             else:
                 raise
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         yield self.notify_device_update(user_id, [device_id])
 
+    @trace
     @defer.inlineCallbacks
     def delete_all_devices_for_user(self, user_id, except_device_id=None):
         """Delete all of the user's devices
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                set_tag("error", True)
+                set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
             else:
                 raise
 
+    @trace
     @measure_func("notify_device_update")
     @defer.inlineCallbacks
     def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
             hosts.update(get_domain_from_id(u) for u in users_who_share_room)
             hosts.discard(self.server_name)
 
+        set_tag("target_hosts", hosts)
+
         position = yield self.store.add_device_change_to_streams(
             user_id, device_ids, list(hosts)
         )
@@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler):
             )
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
+                log_kv({"message": "sent device update to host", "host": host})
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
@@ -451,12 +483,15 @@ class DeviceListUpdater(object):
             iterable=True,
         )
 
+    @trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
 
+        set_tag("origin", origin)
+        set_tag("edu_content", edu_content)
         user_id = edu_content.pop("user_id")
         device_id = edu_content.pop("device_id")
         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -471,12 +506,30 @@ class DeviceListUpdater(object):
                 device_id,
                 origin,
             )
+
+            set_tag("error", True)
+            log_kv(
+                {
+                    "message": "Got a device list update edu from a user and "
+                    "device which does not match the origin of the request.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             return
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
+            set_tag("error", True)
+            log_kv(
+                {
+                    "message": "Got an update from a user for which "
+                    "we don't share any rooms",
+                    "other user_id": user_id,
+                }
+            )
             logger.warning(
                 "Got device list update edu for %r/%r, but don't share a room",
                 user_id,
@@ -578,6 +631,7 @@ class DeviceListUpdater(object):
             request:
             https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
         """
+        log_kv({"message": "Doing resync to update device list."})
         # Fetch all devices for the user.
         origin = get_domain_from_id(user_id)
         try:
@@ -594,13 +648,20 @@ class DeviceListUpdater(object):
             # eventually become consistent.
             return
         except FederationDeniedError as e:
+            set_tag("error", True)
+            log_kv({"reason": "FederationDeniedError"})
             logger.info(e)
             return
-        except Exception:
+        except Exception as e:
             # TODO: Remember that we are now out of sync and try again
             # later
+            set_tag("error", True)
+            log_kv(
+                {"message": "Exception raised by federation request", "exception": e}
+            )
             logger.exception("Failed to handle device list update for %s", user_id)
             return
+        log_kv({"result": result})
         stream_id = result["stream_id"]
         devices = result["devices"]
 
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c..0043cbea17 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,9 +15,17 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    log_kv,
+    set_tag,
+    start_active_span,
+)
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
@@ -78,7 +86,8 @@ class DeviceMessageHandler(object):
 
     @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
-
+        set_tag("number_of_messages", len(messages))
+        set_tag("sender", sender_user_id)
         local_messages = {}
         remote_messages = {}
         for user_id, by_device in messages.items():
@@ -100,15 +109,21 @@ class DeviceMessageHandler(object):
 
         message_id = random_string(16)
 
+        context = get_active_span_text_map()
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with start_active_span("to_device_for_user"):
+                set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "org.matrix.opentracing_context": json.dumps(context),
+                }
 
+        log_kv({"local_messages": local_messages})
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -117,6 +132,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
+        log_kv({"remote_messages": remote_messages})
         for destination in remote_messages.keys():
             # Enqueue a new federation transaction to send the new
             # device messages to each remote destination.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d278..056fb97acb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @trace
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @trace
         @defer.inlineCallbacks
         def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
+    @trace
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
+        set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                log_kv(
+                    {
+                        "message": "Requested a local key for a user which"
+                        " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @trace
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
+        @trace
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
             ),
         )
 
+        log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
     @defer.inlineCallbacks
+    @tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
                 user_id,
                 time_now,
             )
+            log_kv(
+                {
+                    "message": "Updating device_keys for user.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             # TODO: Sign the JSON with the server key
             changed = yield self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
             if changed:
                 # Only notify about device updates *if* the keys actually changed
                 yield self.device_handler.notify_device_update(user_id, [device_id])
-
+        else:
+            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
+            log_kv(
+                {
+                    "message": "Updating one_time_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            log_kv(
+                {"message": "Did not update one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
     @defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
                     (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
                 )
 
+        log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
         yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..a9d80f708c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.opentracing import log_kv, trace
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @trace
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
+            log_kv(results)
             return results
 
+    @trace
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
             session_id(str): the session whose room_key we're setting
             room_key(dict): the room_key being set
         """
-
+        log_kv(
+            {
+                "message": "Trying to upload room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "user_id": user_id,
+            }
+        )
         # get the room_key for this particular row
         current_room_key = None
         try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
             )
         except StoreError as e:
             if e.code == 404:
-                pass
+                log_kv(
+                    {
+                        "message": "Room key not found.",
+                        "room_id": room_id,
+                        "user_id": user_id,
+                    }
+                )
             else:
                 raise
 
         if self._should_replace_room_key(current_room_key, room_key):
+            log_kv({"message": "Replacing room key."})
             yield self.store.set_e2e_room_key(
                 user_id, version, room_id, session_id, room_key
             )
+        else:
+            log_kv({"message": "Not replacing room_key."})
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
+    @trace
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
+    @trace
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
+    @trace
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 2f1f10a9af..5e748687e3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -167,7 +167,6 @@ class EventHandler(BaseHandler):
 
         if not event:
             return None
-            return
 
         users = yield self.store.get_users_in_room(event.room_id)
         is_peeking = user.to_string() not in users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c86903b98b..f72b81d419 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
                     ours = yield self.store.get_state_groups_ids(room_id, seen)
 
                     # state_maps is a list of mappings from (type, state_key) to event_id
-                    # type: list[dict[tuple[str, str], str]]
-                    state_maps = list(ours.values())
+                    state_maps = list(
+                        ours.values()
+                    )  # type: list[dict[tuple[str, str], str]]
 
                     # we don't need this any more, let's delete it.
                     del ours
@@ -1427,7 +1428,7 @@ class FederationHandler(BaseHandler):
         assert event.user_id == user_id
         assert event.state_key == user_id
         assert event.room_id == room_id
-        return (origin, event, format_ver)
+        return origin, event, format_ver
 
     @defer.inlineCallbacks
     @log_function
@@ -2529,12 +2530,17 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+    def on_exchange_third_party_invite_request(self, room_id, event_dict):
         """Handle an exchange_third_party_invite request from a remote server
 
         The remote server will call this when it wants to turn a 3pid invite
         into a normal m.room.member invite.
 
+        Args:
+            room_id (str): The ID of the room.
+
+            event_dict (dict[str, Any]): Dictionary containing the event body.
+
         Returns:
             Deferred: resolves (to None)
         """
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index d199521b58..6d42a1aed8 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -18,10 +18,12 @@
 """Utilities for interacting with Identity Servers"""
 
 import logging
+import urllib
 
 from canonicaljson import json
 
 from twisted.internet import defer
+from twisted.internet.error import TimeoutError
 
 from synapse.api.errors import (
     CodeMessageException,
@@ -29,6 +31,9 @@ from synapse.api.errors import (
     HttpResponseException,
     SynapseError,
 )
+from synapse.config.emailconfig import ThreepidBehaviour
+from synapse.http.client import SimpleHttpClient
+from synapse.util.stringutils import random_string
 
 from ._base import BaseHandler
 
@@ -39,90 +44,117 @@ class IdentityHandler(BaseHandler):
     def __init__(self, hs):
         super(IdentityHandler, self).__init__(hs)
 
-        self.http_client = hs.get_simple_http_client()
-        self.federation_http_client = hs.get_http_client()
-
-        self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
-        self.trust_any_id_server_just_for_testing_do_not_use = (
-            hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
+        self.http_client = SimpleHttpClient(hs)
+        # We create a blacklisting instance of SimpleHttpClient for contacting identity
+        # servers specified by clients
+        self.blacklisting_http_client = SimpleHttpClient(
+            hs, ip_blacklist=hs.config.federation_ip_range_blacklist
         )
-
-    def _should_trust_id_server(self, id_server):
-        if id_server not in self.trusted_id_servers:
-            if self.trust_any_id_server_just_for_testing_do_not_use:
-                logger.warn(
-                    "Trusting untrustworthy ID server %r even though it isn't"
-                    " in the trusted id list for testing because"
-                    " 'use_insecure_ssl_client_just_for_testing_do_not_use'"
-                    " is set in the config",
-                    id_server,
-                )
-            else:
-                return False
-        return True
+        self.federation_http_client = hs.get_http_client()
+        self.hs = hs
 
     @defer.inlineCallbacks
-    def threepid_from_creds(self, creds):
-        if "id_server" in creds:
-            id_server = creds["id_server"]
-        elif "idServer" in creds:
-            id_server = creds["idServer"]
-        else:
-            raise SynapseError(400, "No id_server in creds")
+    def threepid_from_creds(self, id_server, creds):
+        """
+        Retrieve and validate a threepid identifier from a "credentials" dictionary against a
+        given identity server
 
-        if "client_secret" in creds:
-            client_secret = creds["client_secret"]
-        elif "clientSecret" in creds:
-            client_secret = creds["clientSecret"]
-        else:
-            raise SynapseError(400, "No client_secret in creds")
+        Args:
+            id_server (str): The identity server to validate 3PIDs against. Must be a
+                complete URL including the protocol (http(s)://)
 
-        if not self._should_trust_id_server(id_server):
-            logger.warn(
-                "%s is not a trusted ID server: rejecting 3pid " + "credentials",
-                id_server,
+            creds (dict[str, str]): Dictionary containing the following keys:
+                * client_secret|clientSecret: A unique secret str provided by the client
+                * sid: The ID of the validation session
+
+        Returns:
+            Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
+                the /getValidated3pid endpoint of the Identity Service API, or None if the
+                threepid was not found
+        """
+        client_secret = creds.get("client_secret") or creds.get("clientSecret")
+        if not client_secret:
+            raise SynapseError(
+                400, "Missing param client_secret in creds", errcode=Codes.MISSING_PARAM
             )
-            return None
+        session_id = creds.get("sid")
+        if not session_id:
+            raise SynapseError(
+                400, "Missing param session_id in creds", errcode=Codes.MISSING_PARAM
+            )
+
+        query_params = {"sid": session_id, "client_secret": client_secret}
+
+        url = id_server + "/_matrix/identity/api/v1/3pid/getValidated3pid"
 
         try:
-            data = yield self.http_client.get_json(
-                "https://%s%s"
-                % (id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid"),
-                {"sid": creds["sid"], "client_secret": client_secret},
-            )
+            data = yield self.http_client.get_json(url, query_params)
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
         except HttpResponseException as e:
-            logger.info("getValidated3pid failed with Matrix error: %r", e)
-            raise e.to_synapse_error()
+            logger.info(
+                "%s returned %i for threepid validation for: %s",
+                id_server,
+                e.code,
+                creds,
+            )
+            return None
 
+        # Old versions of Sydent return a 200 http code even on a failed validation
+        # check. Thus, in addition to the HttpResponseException check above (which
+        # checks for non-200 errors), we need to make sure validation_session isn't
+        # actually an error, identified by the absence of a "medium" key
+        # See https://github.com/matrix-org/sydent/issues/215 for details
         if "medium" in data:
             return data
+
+        logger.info("%s reported non-validated threepid: %s", id_server, creds)
         return None
 
     @defer.inlineCallbacks
-    def bind_threepid(self, creds, mxid):
-        logger.debug("binding threepid %r to %s", creds, mxid)
-        data = None
-
-        if "id_server" in creds:
-            id_server = creds["id_server"]
-        elif "idServer" in creds:
-            id_server = creds["idServer"]
-        else:
-            raise SynapseError(400, "No id_server in creds")
+    def bind_threepid(
+        self, client_secret, sid, mxid, id_server, id_access_token=None, use_v2=True
+    ):
+        """Bind a 3PID to an identity server
+
+        Args:
+            client_secret (str): A unique secret provided by the client
+
+            sid (str): The ID of the validation session
+
+            mxid (str): The MXID to bind the 3PID to
+
+            id_server (str): The domain of the identity server to query
+
+            id_access_token (str): The access token to authenticate to the identity
+                server with, if necessary. Required if use_v2 is true
+
+            use_v2 (bool): Whether to use v2 Identity Service API endpoints. Defaults to True
 
-        if "client_secret" in creds:
-            client_secret = creds["client_secret"]
-        elif "clientSecret" in creds:
-            client_secret = creds["clientSecret"]
+        Returns:
+            Deferred[dict]: The response from the identity server
+        """
+        logger.debug("Proxying threepid bind request for %s to %s", mxid, id_server)
+
+        # If an id_access_token is not supplied, force usage of v1
+        if id_access_token is None:
+            use_v2 = False
+
+        # Decide which API endpoint URLs to use
+        headers = {}
+        bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
+        if use_v2:
+            bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
+            headers["Authorization"] = create_id_access_token_header(id_access_token)
         else:
-            raise SynapseError(400, "No client_secret in creds")
+            bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
 
         try:
-            data = yield self.http_client.post_json_get_json(
-                "https://%s%s" % (id_server, "/_matrix/identity/api/v1/3pid/bind"),
-                {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid},
+            # Use the blacklisting http client as this call is only to identity servers
+            # provided by a client
+            data = yield self.blacklisting_http_client.post_json_get_json(
+                bind_url, bind_data, headers=headers
             )
-            logger.debug("bound threepid %r to %s", creds, mxid)
 
             # Remember where we bound the threepid
             yield self.store.add_user_bound_threepid(
@@ -131,13 +163,28 @@ class IdentityHandler(BaseHandler):
                 address=data["address"],
                 id_server=id_server,
             )
+
+            return data
+        except HttpResponseException as e:
+            if e.code != 404 or not use_v2:
+                logger.error("3PID bind failed with Matrix error: %r", e)
+                raise e.to_synapse_error()
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
         except CodeMessageException as e:
             data = json.loads(e.msg)  # XXX WAT?
-        return data
+            return data
+
+        logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
+        res = yield self.bind_threepid(
+            client_secret, sid, mxid, id_server, id_access_token, use_v2=False
+        )
+        return res
 
     @defer.inlineCallbacks
     def try_unbind_threepid(self, mxid, threepid):
-        """Removes a binding from an identity server
+        """Attempt to remove a 3PID from an identity server, or if one is not provided, all
+        identity servers we're aware the binding is present on
 
         Args:
             mxid (str): Matrix user ID of binding to be removed
@@ -188,6 +235,8 @@ class IdentityHandler(BaseHandler):
             server doesn't support unbinding
         """
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+        url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
+
         content = {
             "mxid": mxid,
             "threepid": {"medium": threepid["medium"], "address": threepid["address"]},
@@ -199,14 +248,18 @@ class IdentityHandler(BaseHandler):
         auth_headers = self.federation_http_client.build_auth_headers(
             destination=None,
             method="POST",
-            url_bytes="/_matrix/identity/api/v1/3pid/unbind".encode("ascii"),
+            url_bytes=url_bytes,
             content=content,
             destination_is=id_server,
         )
         headers = {b"Authorization": auth_headers}
 
         try:
-            yield self.http_client.post_json_get_json(url, content, headers)
+            # Use the blacklisting http client as this call is only to identity servers
+            # provided by a client
+            yield self.blacklisting_http_client.post_json_get_json(
+                url, content, headers
+            )
             changed = True
         except HttpResponseException as e:
             changed = False
@@ -215,7 +268,9 @@ class IdentityHandler(BaseHandler):
                 logger.warn("Received %d response while unbinding threepid", e.code)
             else:
                 logger.error("Failed to unbind threepid on identity server: %s", e)
-                raise SynapseError(502, "Failed to contact identity server")
+                raise SynapseError(500, "Failed to contact identity server")
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
 
         yield self.store.remove_user_bound_threepid(
             user_id=mxid,
@@ -227,58 +282,310 @@ class IdentityHandler(BaseHandler):
         return changed
 
     @defer.inlineCallbacks
+    def send_threepid_validation(
+        self,
+        email_address,
+        client_secret,
+        send_attempt,
+        send_email_func,
+        next_link=None,
+    ):
+        """Send a threepid validation email for password reset or
+        registration purposes
+
+        Args:
+            email_address (str): The user's email address
+            client_secret (str): The provided client secret
+            send_attempt (int): Which send attempt this is
+            send_email_func (func): A function that takes an email address, token,
+                                    client_secret and session_id, sends an email
+                                    and returns a Deferred.
+            next_link (str|None): The URL to redirect the user to after validation
+
+        Returns:
+            The new session_id upon success
+
+        Raises:
+            SynapseError is an error occurred when sending the email
+        """
+        # Check that this email/client_secret/send_attempt combo is new or
+        # greater than what we've seen previously
+        session = yield self.store.get_threepid_validation_session(
+            "email", client_secret, address=email_address, validated=False
+        )
+
+        # Check to see if a session already exists and that it is not yet
+        # marked as validated
+        if session and session.get("validated_at") is None:
+            session_id = session["session_id"]
+            last_send_attempt = session["last_send_attempt"]
+
+            # Check that the send_attempt is higher than previous attempts
+            if send_attempt <= last_send_attempt:
+                # If not, just return a success without sending an email
+                return session_id
+        else:
+            # An non-validated session does not exist yet.
+            # Generate a session id
+            session_id = random_string(16)
+
+        if next_link:
+            # Manipulate the next_link to add the sid, because the caller won't get
+            # it until we send a response, by which time we've sent the mail.
+            if "?" in next_link:
+                next_link += "&"
+            else:
+                next_link += "?"
+            next_link += "sid=" + urllib.parse.quote(session_id)
+
+        # Generate a new validation token
+        token = random_string(32)
+
+        # Send the mail with the link containing the token, client_secret
+        # and session_id
+        try:
+            yield send_email_func(email_address, token, client_secret, session_id)
+        except Exception:
+            logger.exception(
+                "Error sending threepid validation email to %s", email_address
+            )
+            raise SynapseError(500, "An error was encountered when sending the email")
+
+        token_expires = (
+            self.hs.clock.time_msec() + self.hs.config.email_validation_token_lifetime
+        )
+
+        yield self.store.start_or_continue_validation_session(
+            "email",
+            email_address,
+            session_id,
+            client_secret,
+            send_attempt,
+            next_link,
+            token,
+            token_expires,
+        )
+
+        return session_id
+
+    @defer.inlineCallbacks
     def requestEmailToken(
         self, id_server, email, client_secret, send_attempt, next_link=None
     ):
-        if not self._should_trust_id_server(id_server):
-            raise SynapseError(
-                400, "Untrusted ID server '%s'" % id_server, Codes.SERVER_NOT_TRUSTED
-            )
+        """
+        Request an external server send an email on our behalf for the purposes of threepid
+        validation.
+
+        Args:
+            id_server (str): The identity server to proxy to
+            email (str): The email to send the message to
+            client_secret (str): The unique client_secret sends by the user
+            send_attempt (int): Which attempt this is
+            next_link: A link to redirect the user to once they submit the token
 
+        Returns:
+            The json response body from the server
+        """
         params = {
             "email": email,
             "client_secret": client_secret,
             "send_attempt": send_attempt,
         }
-
         if next_link:
-            params.update({"next_link": next_link})
+            params["next_link"] = next_link
+
+        if self.hs.config.using_identity_server_from_trusted_list:
+            # Warn that a deprecated config option is in use
+            logger.warn(
+                'The config option "trust_identity_server_for_password_resets" '
+                'has been replaced by "account_threepid_delegate". '
+                "Please consult the sample config at docs/sample_config.yaml for "
+                "details and update your config file."
+            )
 
         try:
             data = yield self.http_client.post_json_get_json(
-                "https://%s%s"
-                % (id_server, "/_matrix/identity/api/v1/validate/email/requestToken"),
+                id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
                 params,
             )
             return data
         except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e.to_synapse_error()
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
 
     @defer.inlineCallbacks
     def requestMsisdnToken(
-        self, id_server, country, phone_number, client_secret, send_attempt, **kwargs
+        self,
+        id_server,
+        country,
+        phone_number,
+        client_secret,
+        send_attempt,
+        next_link=None,
     ):
-        if not self._should_trust_id_server(id_server):
-            raise SynapseError(
-                400, "Untrusted ID server '%s'" % id_server, Codes.SERVER_NOT_TRUSTED
-            )
+        """
+        Request an external server send an SMS message on our behalf for the purposes of
+        threepid validation.
+        Args:
+            id_server (str): The identity server to proxy to
+            country (str): The country code of the phone number
+            phone_number (str): The number to send the message to
+            client_secret (str): The unique client_secret sends by the user
+            send_attempt (int): Which attempt this is
+            next_link: A link to redirect the user to once they submit the token
 
+        Returns:
+            The json response body from the server
+        """
         params = {
             "country": country,
             "phone_number": phone_number,
             "client_secret": client_secret,
             "send_attempt": send_attempt,
         }
-        params.update(kwargs)
+        if next_link:
+            params["next_link"] = next_link
+
+        if self.hs.config.using_identity_server_from_trusted_list:
+            # Warn that a deprecated config option is in use
+            logger.warn(
+                'The config option "trust_identity_server_for_password_resets" '
+                'has been replaced by "account_threepid_delegate". '
+                "Please consult the sample config at docs/sample_config.yaml for "
+                "details and update your config file."
+            )
 
         try:
             data = yield self.http_client.post_json_get_json(
-                "https://%s%s"
-                % (id_server, "/_matrix/identity/api/v1/validate/msisdn/requestToken"),
+                id_server + "/_matrix/identity/api/v1/validate/msisdn/requestToken",
                 params,
             )
-            return data
         except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e.to_synapse_error()
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+
+        assert self.hs.config.public_baseurl
+
+        # we need to tell the client to send the token back to us, since it doesn't
+        # otherwise know where to send it, so add submit_url response parameter
+        # (see also MSC2078)
+        data["submit_url"] = (
+            self.hs.config.public_baseurl
+            + "_matrix/client/unstable/add_threepid/msisdn/submit_token"
+        )
+        return data
+
+    @defer.inlineCallbacks
+    def validate_threepid_session(self, client_secret, sid):
+        """Validates a threepid session with only the client secret and session ID
+        Tries validating against any configured account_threepid_delegates as well as locally.
+
+        Args:
+            client_secret (str): A secret provided by the client
+
+            sid (str): The ID of the session
+
+        Returns:
+            Dict[str, str|int] if validation was successful, otherwise None
+        """
+        # XXX: We shouldn't need to keep wrapping and unwrapping this value
+        threepid_creds = {"client_secret": client_secret, "sid": sid}
+
+        # We don't actually know which medium this 3PID is. Thus we first assume it's email,
+        # and if validation fails we try msisdn
+        validation_session = None
+
+        # Try to validate as email
+        if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
+            # Ask our delegated email identity server
+            validation_session = yield self.threepid_from_creds(
+                self.hs.config.account_threepid_delegate_email, threepid_creds
+            )
+        elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
+            # Get a validated session matching these details
+            validation_session = yield self.store.get_threepid_validation_session(
+                "email", client_secret, sid=sid, validated=True
+            )
+
+        if validation_session:
+            return validation_session
+
+        # Try to validate as msisdn
+        if self.hs.config.account_threepid_delegate_msisdn:
+            # Ask our delegated msisdn identity server
+            validation_session = yield self.threepid_from_creds(
+                self.hs.config.account_threepid_delegate_msisdn, threepid_creds
+            )
+
+        return validation_session
+
+    @defer.inlineCallbacks
+    def proxy_msisdn_submit_token(self, id_server, client_secret, sid, token):
+        """Proxy a POST submitToken request to an identity server for verification purposes
+
+        Args:
+            id_server (str): The identity server URL to contact
+
+            client_secret (str): Secret provided by the client
+
+            sid (str): The ID of the session
+
+            token (str): The verification token
+
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
+        Returns:
+            Deferred[dict]: The response dict from the identity server
+        """
+        body = {"client_secret": client_secret, "sid": sid, "token": token}
+
+        try:
+            return (
+                yield self.http_client.post_json_get_json(
+                    id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
+                    body,
+                )
+            )
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+        except HttpResponseException as e:
+            logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
+            raise SynapseError(400, "Error contacting the identity server")
+
+
+def create_id_access_token_header(id_access_token):
+    """Create an Authorization header for passing to SimpleHttpClient as the header value
+    of an HTTP request.
+
+    Args:
+        id_access_token (str): An identity server access token.
+
+    Returns:
+        list[str]: The ascii-encoded bearer token encased in a list.
+    """
+    # Prefix with Bearer
+    bearer_token = "Bearer %s" % id_access_token
+
+    # Encode headers to standard ascii
+    bearer_token.encode("ascii")
+
+    # Return as a list as that's how SimpleHttpClient takes header values
+    return [bearer_token]
+
+
+class LookupAlgorithm:
+    """
+    Supported hashing algorithms when performing a 3PID lookup.
+
+    SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
+        encoding
+    NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
+    """
+
+    SHA256 = "sha256"
+    NONE = "none"
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 42d6650ed9..f991efeee3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -449,8 +449,7 @@ class InitialSyncHandler(BaseHandler):
             #  * The user is a guest user, and has joined the room
             # else it will throw.
             member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            return (member_event.membership, member_event.event_id)
-            return
+            return member_event.membership, member_event.event_id
         except AuthError:
             visibility = yield self.state_handler.get_current_state(
                 room_id, EventTypes.RoomHistoryVisibility, ""
@@ -459,8 +458,7 @@ class InitialSyncHandler(BaseHandler):
                 visibility
                 and visibility.content["history_visibility"] == "world_readable"
             ):
-                return (Membership.JOIN, None)
-                return
+                return Membership.JOIN, None
             raise AuthError(
                 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
             )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a5e23c4caf..0f8cce8ffe 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
 from twisted.internet.defer import succeed
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes
+from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -222,6 +222,13 @@ class MessageHandler(object):
         }
 
 
+# The duration (in ms) after which rooms should be removed
+# `_rooms_to_exclude_from_dummy_event_insertion` (with the effect that we will try
+# to generate a dummy event for them once more)
+#
+_DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
+
+
 class EventCreationHandler(object):
     def __init__(self, hs):
         self.hs = hs
@@ -258,6 +265,13 @@ class EventCreationHandler(object):
             self.config.block_events_without_consent_error
         )
 
+        # Rooms which should be excluded from dummy insertion. (For instance,
+        # those without local users who can send events into the room).
+        #
+        # map from room id to time-of-last-attempt.
+        #
+        self._rooms_to_exclude_from_dummy_event_insertion = {}  # type: dict[str, int]
+
         # we need to construct a ConsentURIBuilder here, as it checks that the necessary
         # config options, but *only* if we have a configuration for which we are
         # going to need it.
@@ -469,6 +483,9 @@ class EventCreationHandler(object):
 
         u = yield self.store.get_user_by_id(user_id)
         assert u is not None
+        if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
+            # support and bot users are not required to consent
+            return
         if u["appservice_id"] is not None:
             # users registered by an appservice are exempt
             return
@@ -726,7 +743,27 @@ class EventCreationHandler(object):
         assert not self.config.worker_app
 
         if ratelimit:
-            yield self.base_handler.ratelimit(requester)
+            # We check if this is a room admin redacting an event so that we
+            # can apply different ratelimiting. We do this by simply checking
+            # it's not a self-redaction (to avoid having to look up whether the
+            # user is actually admin or not).
+            is_admin_redaction = False
+            if event.type == EventTypes.Redaction:
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=True,
+                )
+
+                is_admin_redaction = (
+                    original_event and event.sender != original_event.sender
+                )
+
+            yield self.base_handler.ratelimit(
+                requester, is_admin_redaction=is_admin_redaction
+            )
 
         yield self.base_handler.maybe_kick_guest_users(event, context)
 
@@ -865,9 +902,11 @@ class EventCreationHandler(object):
         """Background task to send dummy events into rooms that have a large
         number of extremities
         """
-
+        self._expire_rooms_to_exclude_from_dummy_event_insertion()
         room_ids = yield self.store.get_rooms_with_many_extremities(
-            min_count=10, limit=5
+            min_count=10,
+            limit=5,
+            room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(),
         )
 
         for room_id in room_ids:
@@ -881,32 +920,61 @@ class EventCreationHandler(object):
             members = yield self.state.get_current_users_in_room(
                 room_id, latest_event_ids=latest_event_ids
             )
+            dummy_event_sent = False
+            for user_id in members:
+                if not self.hs.is_mine_id(user_id):
+                    continue
+                requester = create_requester(user_id)
+                try:
+                    event, context = yield self.create_event(
+                        requester,
+                        {
+                            "type": "org.matrix.dummy_event",
+                            "content": {},
+                            "room_id": room_id,
+                            "sender": user_id,
+                        },
+                        prev_events_and_hashes=prev_events_and_hashes,
+                    )
 
-            user_id = None
-            for member in members:
-                if self.hs.is_mine_id(member):
-                    user_id = member
-                    break
-
-            if not user_id:
-                # We don't have a joined user.
-                # TODO: We should do something here to stop the room from
-                # appearing next time.
-                continue
+                    event.internal_metadata.proactively_send = False
 
-            requester = create_requester(user_id)
+                    yield self.send_nonmember_event(
+                        requester, event, context, ratelimit=False
+                    )
+                    dummy_event_sent = True
+                    break
+                except ConsentNotGivenError:
+                    logger.info(
+                        "Failed to send dummy event into room %s for user %s due to "
+                        "lack of consent. Will try another user" % (room_id, user_id)
+                    )
+                except AuthError:
+                    logger.info(
+                        "Failed to send dummy event into room %s for user %s due to "
+                        "lack of power. Will try another user" % (room_id, user_id)
+                    )
 
-            event, context = yield self.create_event(
-                requester,
-                {
-                    "type": "org.matrix.dummy_event",
-                    "content": {},
-                    "room_id": room_id,
-                    "sender": user_id,
-                },
-                prev_events_and_hashes=prev_events_and_hashes,
+            if not dummy_event_sent:
+                # Did not find a valid user in the room, so remove from future attempts
+                # Exclusion is time limited, so the room will be rechecked in the future
+                # dependent on _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+                logger.info(
+                    "Failed to send dummy event into room %s. Will exclude it from "
+                    "future attempts until cache expires" % (room_id,)
+                )
+                now = self.clock.time_msec()
+                self._rooms_to_exclude_from_dummy_event_insertion[room_id] = now
+
+    def _expire_rooms_to_exclude_from_dummy_event_insertion(self):
+        expire_before = self.clock.time_msec() - _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY
+        to_expire = set()
+        for room_id, time in self._rooms_to_exclude_from_dummy_event_insertion.items():
+            if time < expire_before:
+                to_expire.add(room_id)
+        for room_id in to_expire:
+            logger.debug(
+                "Expiring room id %s from dummy event insertion exclusion cache",
+                room_id,
             )
-
-            event.internal_metadata.proactively_send = False
-
-            yield self.send_nonmember_event(requester, event, context, ratelimit=False)
+            del self._rooms_to_exclude_from_dummy_event_insertion[room_id]
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d83aab3f74..5744f4579d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -70,6 +70,7 @@ class PaginationHandler(object):
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
+        self._server_name = hs.hostname
 
         self.pagination_lock = ReadWriteLock()
         self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
         """
         return self._purges_by_id.get(purge_id)
 
+    async def purge_room(self, room_id):
+        """Purge the given room from the database"""
+        with (await self.pagination_lock.write(room_id)):
+            # check we know about the room
+            await self.store.get_room_version(room_id)
+
+            # first check that we have no users in this room
+            joined = await defer.maybeDeferred(
+                self.store.is_host_joined, room_id, self._server_name
+            )
+
+            if joined:
+                raise SynapseError(400, "Users are still joined to this room")
+
+            await self.store.purge_room(room_id)
+
     @defer.inlineCallbacks
     def get_messages(
         self,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 94a9ca0357..053cf66b28 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -255,7 +255,7 @@ class PresenceHandler(object):
         self.unpersisted_users_changes = set()
 
         if unpersisted:
-            logger.info("Persisting %d upersisted presence updates", len(unpersisted))
+            logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
             yield self.store.update_presence(
                 [self.user_to_current_state[user_id] for user_id in unpersisted]
             )
@@ -1032,7 +1032,7 @@ class PresenceEventSource(object):
                 #
                 # Hence this guard where we just return nothing so that the sync
                 # doesn't return. C.f. #5503.
-                return ([], max_token)
+                return [], max_token
 
             presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
@@ -1279,7 +1279,7 @@ def get_interested_parties(store, states):
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)
 
-    return (room_ids_to_states, users_to_states)
+    return room_ids_to_states, users_to_states
 
 
 @defer.inlineCallbacks
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 2cc237e6a5..8690f69d45 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -34,7 +34,7 @@ from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
-MAX_DISPLAYNAME_LEN = 100
+MAX_DISPLAYNAME_LEN = 256
 MAX_AVATAR_URL_LEN = 1000
 
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 73973502a4..6854c751a6 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -148,7 +148,7 @@ class ReceiptEventSource(object):
         to_key = yield self.get_current_key()
 
         if from_key == to_key:
-            return ([], to_key)
+            return [], to_key
 
         events = yield self.store.get_linearized_receipts_for_rooms(
             room_ids, from_key=from_key, to_key=to_key
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4631fab94e..06bd03b77c 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,13 +24,11 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     ConsentNotGivenError,
-    InvalidCaptchaError,
     LimitExceededError,
     RegistrationError,
     SynapseError,
 )
 from synapse.config.server import is_threepid_reserved
-from synapse.http.client import CaptchaServerHttpClient
 from synapse.http.servlet import assert_params_in_dict
 from synapse.replication.http.login import RegisterDeviceReplicationServlet
 from synapse.replication.http.register import (
@@ -39,7 +37,6 @@ from synapse.replication.http.register import (
 )
 from synapse.types import RoomAlias, RoomID, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
-from synapse.util.threepids import check_3pid_allowed
 
 from ._base import BaseHandler
 
@@ -59,7 +56,6 @@ class RegistrationHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
-        self.captcha_client = CaptchaServerHttpClient(hs)
         self.identity_handler = self.hs.get_handlers().identity_handler
         self.ratelimiter = hs.get_registration_ratelimiter()
 
@@ -279,16 +275,12 @@ class RegistrationHandler(BaseHandler):
         fake_requester = create_requester(user_id)
 
         # try to create the room if we're the first real user on the server. Note
-        # that an auto-generated support user is not a real user and will never be
+        # that an auto-generated support or bot user is not a real user and will never be
         # the user to create the room
         should_auto_create_rooms = False
-        is_support = yield self.store.is_support_user(user_id)
-        # There is an edge case where the first user is the support user, then
-        # the room is never created, though this seems unlikely and
-        # recoverable from given the support user being involved in the first
-        # place.
-        if self.hs.config.autocreate_auto_join_rooms and not is_support:
-            count = yield self.store.count_all_users()
+        is_real_user = yield self.store.is_real_user(user_id)
+        if self.hs.config.autocreate_auto_join_rooms and is_real_user:
+            count = yield self.store.count_real_users()
             should_auto_create_rooms = count == 1
         for r in self.hs.config.auto_join_rooms:
             logger.info("Auto-joining %s to %s", user_id, r)
@@ -362,70 +354,6 @@ class RegistrationHandler(BaseHandler):
         )
         return user_id
 
-    @defer.inlineCallbacks
-    def check_recaptcha(self, ip, private_key, challenge, response):
-        """
-        Checks a recaptcha is correct.
-
-        Used only by c/s api v1
-        """
-
-        captcha_response = yield self._validate_captcha(
-            ip, private_key, challenge, response
-        )
-        if not captcha_response["valid"]:
-            logger.info(
-                "Invalid captcha entered from %s. Error: %s",
-                ip,
-                captcha_response["error_url"],
-            )
-            raise InvalidCaptchaError(error_url=captcha_response["error_url"])
-        else:
-            logger.info("Valid captcha entered from %s", ip)
-
-    @defer.inlineCallbacks
-    def register_email(self, threepidCreds):
-        """
-        Registers emails with an identity server.
-
-        Used only by c/s api v1
-        """
-
-        for c in threepidCreds:
-            logger.info(
-                "validating threepidcred sid %s on id server %s",
-                c["sid"],
-                c["idServer"],
-            )
-            try:
-                threepid = yield self.identity_handler.threepid_from_creds(c)
-            except Exception:
-                logger.exception("Couldn't validate 3pid")
-                raise RegistrationError(400, "Couldn't validate 3pid")
-
-            if not threepid:
-                raise RegistrationError(400, "Couldn't validate 3pid")
-            logger.info(
-                "got threepid with medium '%s' and address '%s'",
-                threepid["medium"],
-                threepid["address"],
-            )
-
-            if not check_3pid_allowed(self.hs, threepid["medium"], threepid["address"]):
-                raise RegistrationError(403, "Third party identifier is not allowed")
-
-    @defer.inlineCallbacks
-    def bind_emails(self, user_id, threepidCreds):
-        """Links emails with a user ID and informs an identity server.
-
-        Used only by c/s api v1
-        """
-
-        # Now we have a matrix ID, bind it to the threepids we were given
-        for c in threepidCreds:
-            # XXX: This should be a deferred list, shouldn't it?
-            yield self.identity_handler.bind_threepid(c, user_id)
-
     def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
         # don't allow people to register the server notices mxid
         if self._server_notices_mxid is not None:
@@ -464,44 +392,7 @@ class RegistrationHandler(BaseHandler):
         return str(id)
 
     @defer.inlineCallbacks
-    def _validate_captcha(self, ip_addr, private_key, challenge, response):
-        """Validates the captcha provided.
-
-        Used only by c/s api v1
-
-        Returns:
-            dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
-
-        """
-        response = yield self._submit_captcha(ip_addr, private_key, challenge, response)
-        # parse Google's response. Lovely format..
-        lines = response.split("\n")
-        json = {
-            "valid": lines[0] == "true",
-            "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?"
-            + "error=%s" % lines[1],
-        }
-        return json
-
-    @defer.inlineCallbacks
-    def _submit_captcha(self, ip_addr, private_key, challenge, response):
-        """
-        Used only by c/s api v1
-        """
-        data = yield self.captcha_client.post_urlencoded_get_raw(
-            "http://www.recaptcha.net:80/recaptcha/api/verify",
-            args={
-                "privatekey": private_key,
-                "remoteip": ip_addr,
-                "challenge": challenge,
-                "response": response,
-            },
-        )
-        return data
-
-    @defer.inlineCallbacks
     def _join_user_to_room(self, requester, room_identifier):
-        room_id = None
         room_member_handler = self.hs.get_room_member_handler()
         if RoomID.is_valid(room_identifier):
             room_id = room_identifier
@@ -622,7 +513,7 @@ class RegistrationHandler(BaseHandler):
                 initial_display_name=initial_display_name,
                 is_guest=is_guest,
             )
-            return (r["device_id"], r["access_token"])
+            return r["device_id"], r["access_token"]
 
         valid_until_ms = None
         if self.session_lifetime is not None:
@@ -648,9 +539,7 @@ class RegistrationHandler(BaseHandler):
         return (device_id, access_token)
 
     @defer.inlineCallbacks
-    def post_registration_actions(
-        self, user_id, auth_result, access_token, bind_email, bind_msisdn
-    ):
+    def post_registration_actions(self, user_id, auth_result, access_token):
         """A user has completed registration
 
         Args:
@@ -659,18 +548,10 @@ class RegistrationHandler(BaseHandler):
                 registered user.
             access_token (str|None): The access token of the newly logged in
                 device, or None if `inhibit_login` enabled.
-            bind_email (bool): Whether to bind the email with the identity
-                server.
-            bind_msisdn (bool): Whether to bind the msisdn with the identity
-                server.
         """
         if self.hs.config.worker_app:
             yield self._post_registration_client(
-                user_id=user_id,
-                auth_result=auth_result,
-                access_token=access_token,
-                bind_email=bind_email,
-                bind_msisdn=bind_msisdn,
+                user_id=user_id, auth_result=auth_result, access_token=access_token
             )
             return
 
@@ -683,13 +564,11 @@ class RegistrationHandler(BaseHandler):
             ):
                 yield self.store.upsert_monthly_active_user(user_id)
 
-            yield self._register_email_threepid(
-                user_id, threepid, access_token, bind_email
-            )
+            yield self._register_email_threepid(user_id, threepid, access_token)
 
         if auth_result and LoginType.MSISDN in auth_result:
             threepid = auth_result[LoginType.MSISDN]
-            yield self._register_msisdn_threepid(user_id, threepid, bind_msisdn)
+            yield self._register_msisdn_threepid(user_id, threepid)
 
         if auth_result and LoginType.TERMS in auth_result:
             yield self._on_user_consented(user_id, self.hs.config.user_consent_version)
@@ -708,14 +587,12 @@ class RegistrationHandler(BaseHandler):
         yield self.post_consent_actions(user_id)
 
     @defer.inlineCallbacks
-    def _register_email_threepid(self, user_id, threepid, token, bind_email):
+    def _register_email_threepid(self, user_id, threepid, token):
         """Add an email address as a 3pid identifier
 
         Also adds an email pusher for the email address, if configured in the
         HS config
 
-        Also optionally binds emails to the given user_id on the identity server
-
         Must be called on master.
 
         Args:
@@ -723,8 +600,6 @@ class RegistrationHandler(BaseHandler):
             threepid (object): m.login.email.identity auth response
             token (str|None): access_token for the user, or None if not logged
                 in.
-            bind_email (bool): true if the client requested the email to be
-                bound at the identity server
         Returns:
             defer.Deferred:
         """
@@ -766,29 +641,15 @@ class RegistrationHandler(BaseHandler):
                 data={},
             )
 
-        if bind_email:
-            logger.info("bind_email specified: binding")
-            logger.debug("Binding emails %s to %s" % (threepid, user_id))
-            yield self.identity_handler.bind_threepid(
-                threepid["threepid_creds"], user_id
-            )
-        else:
-            logger.info("bind_email not specified: not binding email")
-
     @defer.inlineCallbacks
-    def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
+    def _register_msisdn_threepid(self, user_id, threepid):
         """Add a phone number as a 3pid identifier
 
-        Also optionally binds msisdn to the given user_id on the identity server
-
         Must be called on master.
 
         Args:
             user_id (str): id of user
             threepid (object): m.login.msisdn auth response
-            token (str): access_token for the user
-            bind_email (bool): true if the client requested the email to be
-                bound at the identity server
         Returns:
             defer.Deferred:
         """
@@ -804,12 +665,3 @@ class RegistrationHandler(BaseHandler):
         yield self._auth_handler.add_threepid(
             user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
         )
-
-        if bind_msisdn:
-            logger.info("bind_msisdn specified: binding")
-            logger.debug("Binding msisdn %s to %s", threepid, user_id)
-            yield self.identity_handler.bind_threepid(
-                threepid["threepid_creds"], user_id
-            )
-        else:
-            logger.info("bind_msisdn not specified: not binding msisdn")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5caa90c3b7..970be3c846 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -560,6 +560,18 @@ class RoomCreationHandler(BaseHandler):
 
         yield self.event_creation_handler.assert_accepted_privacy_policy(requester)
 
+        power_level_content_override = config.get("power_level_content_override")
+        if (
+            power_level_content_override
+            and "users" in power_level_content_override
+            and user_id not in power_level_content_override["users"]
+        ):
+            raise SynapseError(
+                400,
+                "Not a valid power_level_content_override: 'users' did not contain %s"
+                % (user_id,),
+            )
+
         invite_3pid_list = config.get("invite_3pid", [])
 
         visibility = config.get("visibility", None)
@@ -567,8 +579,8 @@ class RoomCreationHandler(BaseHandler):
 
         room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
 
+        directory_handler = self.hs.get_handlers().directory_handler
         if room_alias:
-            directory_handler = self.hs.get_handlers().directory_handler
             yield directory_handler.create_association(
                 requester=requester,
                 room_id=room_id,
@@ -604,7 +616,7 @@ class RoomCreationHandler(BaseHandler):
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
-            power_level_content_override=config.get("power_level_content_override"),
+            power_level_content_override=power_level_content_override,
             creator_join_profile=creator_join_profile,
         )
 
@@ -653,6 +665,7 @@ class RoomCreationHandler(BaseHandler):
 
         for invite_3pid in invite_3pid_list:
             id_server = invite_3pid["id_server"]
+            id_access_token = invite_3pid.get("id_access_token")  # optional
             address = invite_3pid["address"]
             medium = invite_3pid["medium"]
             yield self.hs.get_room_member_handler().do_3pid_invite(
@@ -663,6 +676,7 @@ class RoomCreationHandler(BaseHandler):
                 id_server,
                 requester,
                 txn_id=None,
+                id_access_token=id_access_token,
             )
 
         result = {"room_id": room_id}
@@ -840,7 +854,6 @@ class RoomContextHandler(object):
         )
         if not event:
             return None
-            return
 
         filtered = yield (filter_evts([event]))
         if not filtered:
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index e9094ad02b..a7e55f00e5 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -25,6 +25,7 @@ from unpaddedbase64 import decode_base64, encode_base64
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.errors import Codes, HttpResponseException
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -485,7 +486,33 @@ class RoomListHandler(BaseHandler):
             return {"chunk": [], "total_room_count_estimate": 0}
 
         if search_filter:
-            # We currently don't support searching across federation, so we have
+            # Searching across federation is defined in MSC2197.
+            # However, the remote homeserver may or may not actually support it.
+            # So we first try an MSC2197 remote-filtered search, then fall back
+            # to a locally-filtered search if we must.
+
+            try:
+                res = yield self._get_remote_list_cached(
+                    server_name,
+                    limit=limit,
+                    since_token=since_token,
+                    include_all_networks=include_all_networks,
+                    third_party_instance_id=third_party_instance_id,
+                    search_filter=search_filter,
+                )
+                return res
+            except HttpResponseException as hre:
+                syn_err = hre.to_synapse_error()
+                if hre.code in (404, 405) or syn_err.errcode in (
+                    Codes.UNRECOGNIZED,
+                    Codes.NOT_FOUND,
+                ):
+                    logger.debug("Falling back to locally-filtered /publicRooms")
+                else:
+                    raise  # Not an error that should trigger a fallback.
+
+            # if we reach this point, then we fall back to the situation where
+            # we currently don't support searching across federation, so we have
             # to do it manually without pagination
             limit = None
             since_token = None
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 249a6d9c5d..94cd0cf3ef 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -25,13 +25,17 @@ from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
+from twisted.internet.error import TimeoutError
 
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
+from synapse.handlers.identity import LookupAlgorithm, create_id_access_token_header
+from synapse.http.client import SimpleHttpClient
 from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.hash import sha256_and_url_safe_base64
 
 from ._base import BaseHandler
 
@@ -59,7 +63,11 @@ class RoomMemberHandler(object):
         self.auth = hs.get_auth()
         self.state_handler = hs.get_state_handler()
         self.config = hs.config
-        self.simple_http_client = hs.get_simple_http_client()
+        # We create a blacklisting instance of SimpleHttpClient for contacting identity
+        # servers specified by clients
+        self.simple_http_client = SimpleHttpClient(
+            hs, ip_blacklist=hs.config.federation_ip_range_blacklist
+        )
 
         self.federation_handler = hs.get_handlers().federation_handler
         self.directory_handler = hs.get_handlers().directory_handler
@@ -100,7 +108,7 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Attempt to reject an invite for a room this server is not in. If we
         fail to do so we locally mark the invite as rejected.
 
@@ -510,9 +518,7 @@ class RoomMemberHandler(object):
         return res
 
     @defer.inlineCallbacks
-    def send_membership_event(
-        self, requester, event, context, remote_room_hosts=None, ratelimit=True
-    ):
+    def send_membership_event(self, requester, event, context, ratelimit=True):
         """
         Change the membership status of a user in a room.
 
@@ -522,16 +528,10 @@ class RoomMemberHandler(object):
                 act as the sender, will be skipped.
             event (SynapseEvent): The membership event.
             context: The context of the event.
-            is_guest (bool): Whether the sender is a guest.
-            room_hosts ([str]): Homeservers which are likely to already be in
-                the room, and could be danced with in order to join this
-                homeserver for the first time.
             ratelimit (bool): Whether to rate limit this request.
         Raises:
             SynapseError if there was a problem changing the membership.
         """
-        remote_room_hosts = remote_room_hosts or []
-
         target_user = UserID.from_string(event.state_key)
         room_id = event.room_id
 
@@ -634,7 +634,7 @@ class RoomMemberHandler(object):
             servers.remove(room_alias.domain)
         servers.insert(0, room_alias.domain)
 
-        return (RoomID.from_string(room_id), servers)
+        return RoomID.from_string(room_id), servers
 
     @defer.inlineCallbacks
     def _get_inviter(self, user_id, room_id):
@@ -646,7 +646,15 @@ class RoomMemberHandler(object):
 
     @defer.inlineCallbacks
     def do_3pid_invite(
-        self, room_id, inviter, medium, address, id_server, requester, txn_id
+        self,
+        room_id,
+        inviter,
+        medium,
+        address,
+        id_server,
+        requester,
+        txn_id,
+        id_access_token=None,
     ):
         if self.config.block_non_admin_invites:
             is_requester_admin = yield self.auth.is_server_admin(requester.user)
@@ -669,7 +677,12 @@ class RoomMemberHandler(object):
                 Codes.FORBIDDEN,
             )
 
-        invitee = yield self._lookup_3pid(id_server, medium, address)
+        if not self._enable_lookup:
+            raise SynapseError(
+                403, "Looking up third-party identifiers is denied from this server"
+            )
+
+        invitee = yield self._lookup_3pid(id_server, medium, address, id_access_token)
 
         if invitee:
             yield self.update_membership(
@@ -677,11 +690,18 @@ class RoomMemberHandler(object):
             )
         else:
             yield self._make_and_store_3pid_invite(
-                requester, id_server, medium, address, room_id, inviter, txn_id=txn_id
+                requester,
+                id_server,
+                medium,
+                address,
+                room_id,
+                inviter,
+                txn_id=txn_id,
+                id_access_token=id_access_token,
             )
 
     @defer.inlineCallbacks
-    def _lookup_3pid(self, id_server, medium, address):
+    def _lookup_3pid(self, id_server, medium, address, id_access_token=None):
         """Looks up a 3pid in the passed identity server.
 
         Args:
@@ -689,14 +709,48 @@ class RoomMemberHandler(object):
                 of the identity server to use.
             medium (str): The type of the third party identifier (e.g. "email").
             address (str): The third party identifier (e.g. "foo@example.com").
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
+
+        Returns:
+            str|None: the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        if id_access_token is not None:
+            try:
+                results = yield self._lookup_3pid_v2(
+                    id_server, id_access_token, medium, address
+                )
+                return results
+
+            except Exception as e:
+                # Catch HttpResponseExcept for a non-200 response code
+                # Check if this identity server does not know about v2 lookups
+                if isinstance(e, HttpResponseException) and e.code == 404:
+                    # This is an old identity server that does not yet support v2 lookups
+                    logger.warning(
+                        "Attempted v2 lookup on v1 identity server %s. Falling "
+                        "back to v1",
+                        id_server,
+                    )
+                else:
+                    logger.warning("Error when looking up hashing details: %s", e)
+                    return None
+
+        return (yield self._lookup_3pid_v1(id_server, medium, address))
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v1(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server using v1 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
 
         Returns:
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
-        if not self._enable_lookup:
-            raise SynapseError(
-                403, "Looking up third-party identifiers is denied from this server"
-            )
         try:
             data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@@ -708,20 +762,136 @@ class RoomMemberHandler(object):
                     raise AuthError(401, "No signatures on 3pid binding")
                 yield self._verify_any_signature(data, id_server)
                 return data["mxid"]
-
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
         except IOError as e:
-            logger.warn("Error from identity server lookup: %s" % (e,))
+            logger.warning("Error from v1 identity server lookup: %s" % (e,))
+
+        return None
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+        """Looks up a 3pid in the passed identity server using v2 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            id_access_token (str): The access token to authenticate to the identity server with
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+        """
+        # Check what hashing details are supported by this identity server
+        try:
+            hash_details = yield self.simple_http_client.get_json(
+                "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
+                {"access_token": id_access_token},
+            )
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+
+        if not isinstance(hash_details, dict):
+            logger.warning(
+                "Got non-dict object when checking hash details of %s%s: %s",
+                id_server_scheme,
+                id_server,
+                hash_details,
+            )
+            raise SynapseError(
+                400,
+                "Non-dict object from %s%s during v2 hash_details request: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Extract information from hash_details
+        supported_lookup_algorithms = hash_details.get("algorithms")
+        lookup_pepper = hash_details.get("lookup_pepper")
+        if (
+            not supported_lookup_algorithms
+            or not isinstance(supported_lookup_algorithms, list)
+            or not lookup_pepper
+            or not isinstance(lookup_pepper, str)
+        ):
+            raise SynapseError(
+                400,
+                "Invalid hash details received from identity server %s%s: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Check if any of the supported lookup algorithms are present
+        if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
+            # Perform a hashed lookup
+            lookup_algorithm = LookupAlgorithm.SHA256
+
+            # Hash address, medium and the pepper with sha256
+            to_hash = "%s %s %s" % (address, medium, lookup_pepper)
+            lookup_value = sha256_and_url_safe_base64(to_hash)
+
+        elif LookupAlgorithm.NONE in supported_lookup_algorithms:
+            # Perform a non-hashed lookup
+            lookup_algorithm = LookupAlgorithm.NONE
+
+            # Combine together plaintext address and medium
+            lookup_value = "%s %s" % (address, medium)
+
+        else:
+            logger.warning(
+                "None of the provided lookup algorithms of %s are supported: %s",
+                id_server,
+                supported_lookup_algorithms,
+            )
+            raise SynapseError(
+                400,
+                "Provided identity server does not support any v2 lookup "
+                "algorithms that this homeserver supports.",
+            )
+
+        # Authenticate with identity server given the access token from the client
+        headers = {"Authorization": create_id_access_token_header(id_access_token)}
+
+        try:
+            lookup_results = yield self.simple_http_client.post_json_get_json(
+                "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
+                {
+                    "addresses": [lookup_value],
+                    "algorithm": lookup_algorithm,
+                    "pepper": lookup_pepper,
+                },
+                headers=headers,
+            )
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+        except Exception as e:
+            logger.warning("Error when performing a v2 3pid lookup: %s", e)
+            raise SynapseError(
+                500, "Unknown error occurred during identity server lookup"
+            )
+
+        # Check for a mapping from what we looked up to an MXID
+        if "mappings" not in lookup_results or not isinstance(
+            lookup_results["mappings"], dict
+        ):
+            logger.warning("No results from 3pid lookup")
             return None
 
+        # Return the MXID if it's available, or None otherwise
+        mxid = lookup_results["mappings"].get(lookup_value)
+        return mxid
+
     @defer.inlineCallbacks
     def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
             raise AuthError(401, "No signature from server %s" % (server_hostname,))
         for key_name, signature in data["signatures"][server_hostname].items():
-            key_data = yield self.simple_http_client.get_json(
-                "%s%s/_matrix/identity/api/v1/pubkey/%s"
-                % (id_server_scheme, server_hostname, key_name)
-            )
+            try:
+                key_data = yield self.simple_http_client.get_json(
+                    "%s%s/_matrix/identity/api/v1/pubkey/%s"
+                    % (id_server_scheme, server_hostname, key_name)
+                )
+            except TimeoutError:
+                raise SynapseError(500, "Timed out contacting identity server")
             if "public_key" not in key_data:
                 raise AuthError(
                     401, "No public key named %s from %s" % (key_name, server_hostname)
@@ -737,7 +907,15 @@ class RoomMemberHandler(object):
 
     @defer.inlineCallbacks
     def _make_and_store_3pid_invite(
-        self, requester, id_server, medium, address, room_id, user, txn_id
+        self,
+        requester,
+        id_server,
+        medium,
+        address,
+        room_id,
+        user,
+        txn_id,
+        id_access_token=None,
     ):
         room_state = yield self.state_handler.get_current_state(room_id)
 
@@ -786,6 +964,7 @@ class RoomMemberHandler(object):
                 room_name=room_name,
                 inviter_display_name=inviter_display_name,
                 inviter_avatar_url=inviter_avatar_url,
+                id_access_token=id_access_token,
             )
         )
 
@@ -823,6 +1002,7 @@ class RoomMemberHandler(object):
         room_name,
         inviter_display_name,
         inviter_avatar_url,
+        id_access_token=None,
     ):
         """
         Asks an identity server for a third party invite.
@@ -842,6 +1022,8 @@ class RoomMemberHandler(object):
             inviter_display_name (str): The current display name of the
                 inviter.
             inviter_avatar_url (str): The URL of the inviter's avatar.
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
 
         Returns:
             A deferred tuple containing:
@@ -852,12 +1034,6 @@ class RoomMemberHandler(object):
                 display_name (str): A user-friendly name to represent the invited
                     user.
         """
-
-        is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
-            id_server_scheme,
-            id_server,
-        )
-
         invite_config = {
             "medium": medium,
             "address": address,
@@ -871,22 +1047,70 @@ class RoomMemberHandler(object):
             "sender_avatar_url": inviter_avatar_url,
         }
 
-        try:
-            data = yield self.simple_http_client.post_json_get_json(
-                is_url, invite_config
-            )
-        except HttpResponseException as e:
-            # Some identity servers may only support application/x-www-form-urlencoded
-            # types. This is especially true with old instances of Sydent, see
-            # https://github.com/matrix-org/sydent/pull/170
-            logger.info(
-                "Failed to POST %s with JSON, falling back to urlencoded form: %s",
-                is_url,
-                e,
+        # Add the identity service access token to the JSON body and use the v2
+        # Identity Service endpoints if id_access_token is present
+        data = None
+        base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server)
+
+        if id_access_token:
+            key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
+                id_server_scheme,
+                id_server,
             )
-            data = yield self.simple_http_client.post_urlencoded_get_json(
-                is_url, invite_config
+
+            # Attempt a v2 lookup
+            url = base_url + "/v2/store-invite"
+            try:
+                data = yield self.simple_http_client.post_json_get_json(
+                    url,
+                    invite_config,
+                    {"Authorization": create_id_access_token_header(id_access_token)},
+                )
+            except TimeoutError:
+                raise SynapseError(500, "Timed out contacting identity server")
+            except HttpResponseException as e:
+                if e.code != 404:
+                    logger.info("Failed to POST %s with JSON: %s", url, e)
+                    raise e
+
+        if data is None:
+            key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+                id_server_scheme,
+                id_server,
             )
+            url = base_url + "/api/v1/store-invite"
+
+            try:
+                data = yield self.simple_http_client.post_json_get_json(
+                    url, invite_config
+                )
+            except TimeoutError:
+                raise SynapseError(500, "Timed out contacting identity server")
+            except HttpResponseException as e:
+                logger.warning(
+                    "Error trying to call /store-invite on %s%s: %s",
+                    id_server_scheme,
+                    id_server,
+                    e,
+                )
+
+            if data is None:
+                # Some identity servers may only support application/x-www-form-urlencoded
+                # types. This is especially true with old instances of Sydent, see
+                # https://github.com/matrix-org/sydent/pull/170
+                try:
+                    data = yield self.simple_http_client.post_urlencoded_get_json(
+                        url, invite_config
+                    )
+                except HttpResponseException as e:
+                    logger.warning(
+                        "Error calling /store-invite on %s%s with fallback "
+                        "encoding: %s",
+                        id_server_scheme,
+                        id_server,
+                        e,
+                    )
+                    raise e
 
         # TODO: Check for success
         token = data["token"]
@@ -894,8 +1118,7 @@ class RoomMemberHandler(object):
         if "public_key" in data:
             fallback_public_key = {
                 "public_key": data["public_key"],
-                "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid"
-                % (id_server_scheme, id_server),
+                "key_validity_url": key_validity_url,
             }
         else:
             fallback_public_key = public_keys[0]
@@ -903,7 +1126,7 @@ class RoomMemberHandler(object):
         if not public_keys:
             public_keys.append(fallback_public_key)
         display_name = data["display_name"]
-        return (token, public_keys, fallback_public_key, display_name)
+        return token, public_keys, fallback_public_key, display_name
 
     @defer.inlineCallbacks
     def _is_host_in_room(self, current_state_ids):
@@ -962,9 +1185,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         )
 
         if complexity:
-            if complexity["v1"] > max_complexity:
-                return True
-            return False
+            return complexity["v1"] > max_complexity
         return None
 
     @defer.inlineCallbacks
@@ -980,10 +1201,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         max_complexity = self.hs.config.limit_remote_rooms.complexity
         complexity = yield self.store.get_room_complexity(room_id)
 
-        if complexity["v1"] > max_complexity:
-            return True
-
-        return False
+        return complexity["v1"] > max_complexity
 
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -1062,7 +1280,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             # The 'except' clause is very broad, but we need to
             # capture everything from DNS failures upwards
             #
-            logger.warn("Failed to reject invite: %s", e)
+            logger.warning("Failed to reject invite: %s", e)
 
             yield self.store.locally_reject_invite(target.to_string(), room_id)
             return {}
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index a1ce6929cf..cc9e6b9bd0 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -21,6 +21,8 @@ from saml2.client import Saml2Client
 from synapse.api.errors import SynapseError
 from synapse.http.servlet import parse_string
 from synapse.rest.client.v1.login import SSOAuthHandler
+from synapse.types import UserID, map_username_to_mxid_localpart
+from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
 
@@ -29,12 +31,26 @@ class SamlHandler:
     def __init__(self, hs):
         self._saml_client = Saml2Client(hs.config.saml2_sp_config)
         self._sso_auth_handler = SSOAuthHandler(hs)
+        self._registration_handler = hs.get_registration_handler()
+
+        self._clock = hs.get_clock()
+        self._datastore = hs.get_datastore()
+        self._hostname = hs.hostname
+        self._saml2_session_lifetime = hs.config.saml2_session_lifetime
+        self._mxid_source_attribute = hs.config.saml2_mxid_source_attribute
+        self._grandfathered_mxid_source_attribute = (
+            hs.config.saml2_grandfathered_mxid_source_attribute
+        )
+        self._mxid_mapper = hs.config.saml2_mxid_mapper
+
+        # identifier for the external_ids table
+        self._auth_provider_id = "saml"
 
         # a map from saml session id to Saml2SessionData object
         self._outstanding_requests_dict = {}
 
-        self._clock = hs.get_clock()
-        self._saml2_session_lifetime = hs.config.saml2_session_lifetime
+        # a lock on the mappings
+        self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock)
 
     def handle_redirect_request(self, client_redirect_url):
         """Handle an incoming request to /login/sso/redirect
@@ -60,7 +76,7 @@ class SamlHandler:
         # this shouldn't happen!
         raise Exception("prepare_for_authenticate didn't return a Location header")
 
-    def handle_saml_response(self, request):
+    async def handle_saml_response(self, request):
         """Handle an incoming request to /_matrix/saml2/authn_response
 
         Args:
@@ -77,6 +93,10 @@ class SamlHandler:
         # the dict.
         self.expire_sessions()
 
+        user_id = await self._map_saml_response_to_user(resp_bytes)
+        self._sso_auth_handler.complete_sso_login(user_id, request, relay_state)
+
+    async def _map_saml_response_to_user(self, resp_bytes):
         try:
             saml2_auth = self._saml_client.parse_authn_request_response(
                 resp_bytes,
@@ -91,18 +111,88 @@ class SamlHandler:
             logger.warning("SAML2 response was not signed")
             raise SynapseError(400, "SAML2 response was not signed")
 
-        if "uid" not in saml2_auth.ava:
+        logger.info("SAML2 response: %s", saml2_auth.origxml)
+        logger.info("SAML2 mapped attributes: %s", saml2_auth.ava)
+
+        try:
+            remote_user_id = saml2_auth.ava["uid"][0]
+        except KeyError:
             logger.warning("SAML2 response lacks a 'uid' attestation")
             raise SynapseError(400, "uid not in SAML2 response")
 
+        try:
+            mxid_source = saml2_auth.ava[self._mxid_source_attribute][0]
+        except KeyError:
+            logger.warning(
+                "SAML2 response lacks a '%s' attestation", self._mxid_source_attribute
+            )
+            raise SynapseError(
+                400, "%s not in SAML2 response" % (self._mxid_source_attribute,)
+            )
+
         self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
 
-        username = saml2_auth.ava["uid"][0]
         displayName = saml2_auth.ava.get("displayName", [None])[0]
 
-        return self._sso_auth_handler.on_successful_auth(
-            username, request, relay_state, user_display_name=displayName
-        )
+        with (await self._mapping_lock.queue(self._auth_provider_id)):
+            # first of all, check if we already have a mapping for this user
+            logger.info(
+                "Looking for existing mapping for user %s:%s",
+                self._auth_provider_id,
+                remote_user_id,
+            )
+            registered_user_id = await self._datastore.get_user_by_external_id(
+                self._auth_provider_id, remote_user_id
+            )
+            if registered_user_id is not None:
+                logger.info("Found existing mapping %s", registered_user_id)
+                return registered_user_id
+
+            # backwards-compatibility hack: see if there is an existing user with a
+            # suitable mapping from the uid
+            if (
+                self._grandfathered_mxid_source_attribute
+                and self._grandfathered_mxid_source_attribute in saml2_auth.ava
+            ):
+                attrval = saml2_auth.ava[self._grandfathered_mxid_source_attribute][0]
+                user_id = UserID(
+                    map_username_to_mxid_localpart(attrval), self._hostname
+                ).to_string()
+                logger.info(
+                    "Looking for existing account based on mapped %s %s",
+                    self._grandfathered_mxid_source_attribute,
+                    user_id,
+                )
+
+                users = await self._datastore.get_users_by_id_case_insensitive(user_id)
+                if users:
+                    registered_user_id = list(users.keys())[0]
+                    logger.info("Grandfathering mapping to %s", registered_user_id)
+                    await self._datastore.record_user_external_id(
+                        self._auth_provider_id, remote_user_id, registered_user_id
+                    )
+                    return registered_user_id
+
+            # figure out a new mxid for this user
+            base_mxid_localpart = self._mxid_mapper(mxid_source)
+
+            suffix = 0
+            while True:
+                localpart = base_mxid_localpart + (str(suffix) if suffix else "")
+                if not await self._datastore.get_users_by_id_case_insensitive(
+                    UserID(localpart, self._hostname).to_string()
+                ):
+                    break
+                suffix += 1
+            logger.info("Allocating mxid for new user with localpart %s", localpart)
+
+            registered_user_id = await self._registration_handler.register_user(
+                localpart=localpart, default_display_name=displayName
+            )
+            await self._datastore.record_user_external_id(
+                self._auth_provider_id, remote_user_id, registered_user_id
+            )
+            return registered_user_id
 
     def expire_sessions(self):
         expire_before = self._clock.time_msec() - self._saml2_session_lifetime
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..cbac7c347a 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -14,15 +14,14 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled:
+        if not self.hs.config.stats_enabled or self._is_processing:
             return
 
-        if self._is_processing:
-            return
+        self._is_processing = True
 
         @defer.inlineCallbacks
         def process():
@@ -75,39 +73,78 @@ class StatsHandler(StateDeltasHandler):
             finally:
                 self._is_processing = False
 
-        self._is_processing = True
         run_as_background_process("stats.notify_new_event", process)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
-
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
-            return None
+            self.pos = yield self.store.get_stats_positions()
 
         # Loop round handling deltas until we're up to date
+
         while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
-                    return
+            # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+            # deltas, since there is otherwise a chance that we could miss updates which arrive
+            # after we check the deltas.
+            room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
+            if self.pos == room_max_stream_ordering:
+                break
+
+            deltas = yield self.store.get_current_state_deltas(self.pos)
+
+            if deltas:
+                logger.debug("Handling %d state deltas", len(deltas))
+                room_deltas, user_deltas = yield self._handle_deltas(deltas)
+
+                max_pos = deltas[-1]["stream_id"]
+            else:
+                room_deltas = {}
+                user_deltas = {}
+                max_pos = room_max_stream_ordering
+
+            # Then count deltas for total_events and total_event_bytes.
+            room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+                self.pos, max_pos
+            )
 
-                logger.info("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
+            for room_id, fields in room_count.items():
+                room_deltas.setdefault(room_id, {}).update(fields)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+            for user_id, fields in user_count.items():
+                user_deltas.setdefault(user_id, {}).update(fields)
 
-                event_processing_positions.labels("stats").set(self.pos)
+            logger.debug("room_deltas: %s", room_deltas)
+            logger.debug("user_deltas: %s", user_deltas)
+
+            # Always call this so that we update the stats position.
+            yield self.store.bulk_update_stats_delta(
+                self.clock.time_msec(),
+                updates={"room": room_deltas, "user": user_deltas},
+                stream_id=max_pos,
+            )
+
+            logger.debug("Handled room stats to %s -> %s", self.pos, max_pos)
+
+            event_processing_positions.labels("stats").set(max_pos)
+
+            self.pos = max_pos
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+
+        Returns:
+            Deferred[tuple[dict[str, Counter], dict[str, counter]]]
+            Resovles to two dicts, the room deltas and the user deltas,
+            mapping from room/user ID to changes in the various fields.
         """
-        Called with the state deltas to process
-        """
+
+        room_to_stats_deltas = {}
+        user_to_stats_deltas = {}
+
+        room_to_state_updates = {}
+
         for delta in deltas:
             typ = delta["type"]
             state_key = delta["state_key"]
@@ -115,11 +152,10 @@ class StatsHandler(StateDeltasHandler):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
 
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+            logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -131,203 +167,132 @@ class StatsHandler(StateDeltasHandler):
                 continue
 
             if event_id is None and prev_event_id is None:
-                # Errr...
+                logger.error(
+                    "event ID is None and so is the previous event ID. stream_id: %s",
+                    stream_id,
+                )
                 continue
 
             event_content = {}
 
+            sender = None
             if event_id is not None:
                 event = yield self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
+                    sender = event.sender
+
+            # All the values in this dict are deltas (RELATIVE changes)
+            room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
 
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            room_state = room_to_state_updates.setdefault(room_id, {})
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] += 1
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We take None rather than leave as a previous membership
+                # in the absence of a previous event because we do not want to
+                # reduce the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
-
-                if prev_membership == membership:
-                    continue
 
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
-                    )
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif membership == prev_membership:
+                    pass  # noop
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] -= 1
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
-                    )
+                    room_stats_delta["invited_members"] -= 1
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
-                    )
+                    room_stats_delta["left_members"] -= 1
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
-                    )
+                    room_stats_delta["banned_members"] -= 1
                 else:
-                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError(
+                        "%r is not a valid prev_membership" % (prev_membership,)
+                    )
 
+                if membership == prev_membership:
+                    pass  # noop
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
-                    )
+                    room_stats_delta["joined_members"] += 1
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
-                    )
+                    room_stats_delta["invited_members"] += 1
+
+                    if sender and self.is_mine_id(sender):
+                        user_to_stats_deltas.setdefault(sender, Counter())[
+                            "invites_sent"
+                        ] += 1
+
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
-                    )
+                    room_stats_delta["left_members"] += 1
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
-                    )
+                    room_stats_delta["banned_members"] += 1
                 else:
-                    err = "%s is not a valid membership" % (repr(membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError("%r is not a valid membership" % (membership,))
 
                 user_id = state_key
                 if self.is_mine_id(user_id):
-                    # update user_stats as it's one of our users
-                    public = yield self._is_public_room(room_id)
-
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
-                        )
+                    # this accounts for transitions like leave → ban and so on.
+                    has_changed_joinedness = (prev_membership == Membership.JOIN) != (
+                        membership == Membership.JOIN
+                    )
 
-            elif typ == EventTypes.Create:
-                # Newly created room. Add it with all blank portions.
-                yield self.store.update_room_state(
-                    room_id,
-                    {
-                        "join_rules": None,
-                        "history_visibility": None,
-                        "encryption": None,
-                        "name": None,
-                        "topic": None,
-                        "avatar": None,
-                        "canonical_alias": None,
-                    },
-                )
+                    if has_changed_joinedness:
+                        delta = +1 if membership == Membership.JOIN else -1
 
-            elif typ == EventTypes.JoinRules:
-                yield self.store.update_room_state(
-                    room_id, {"join_rules": event_content.get("join_rule")}
-                )
+                        user_to_stats_deltas.setdefault(user_id, Counter())[
+                            "joined_rooms"
+                        ] += delta
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
-                )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+                        room_stats_delta["local_users_in_room"] += delta
 
-            elif typ == EventTypes.RoomHistoryVisibility:
-                yield self.store.update_room_state(
-                    room_id,
-                    {"history_visibility": event_content.get("history_visibility")},
+            elif typ == EventTypes.Create:
+                room_state["is_federatable"] = (
+                    event_content.get("m.federate", True) is True
                 )
-
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                if sender and self.is_mine_id(sender):
+                    user_to_stats_deltas.setdefault(sender, Counter())[
+                        "rooms_created"
+                    ] += 1
+            elif typ == EventTypes.JoinRules:
+                room_state["join_rules"] = event_content.get("join_rule")
+            elif typ == EventTypes.RoomHistoryVisibility:
+                room_state["history_visibility"] = event_content.get(
+                    "history_visibility"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
-
             elif typ == EventTypes.Encryption:
-                yield self.store.update_room_state(
-                    room_id, {"encryption": event_content.get("algorithm")}
-                )
+                room_state["encryption"] = event_content.get("algorithm")
             elif typ == EventTypes.Name:
-                yield self.store.update_room_state(
-                    room_id, {"name": event_content.get("name")}
-                )
+                room_state["name"] = event_content.get("name")
             elif typ == EventTypes.Topic:
-                yield self.store.update_room_state(
-                    room_id, {"topic": event_content.get("topic")}
-                )
+                room_state["topic"] = event_content.get("topic")
             elif typ == EventTypes.RoomAvatar:
-                yield self.store.update_room_state(
-                    room_id, {"avatar": event_content.get("url")}
-                )
+                room_state["avatar"] = event_content.get("url")
             elif typ == EventTypes.CanonicalAlias:
-                yield self.store.update_room_state(
-                    room_id, {"canonical_alias": event_content.get("alias")}
-                )
-
-    @defer.inlineCallbacks
-    def update_public_room_stats(self, ts, room_id, is_public):
-        """
-        Increment/decrement a user's number of public rooms when a room they are
-        in changes to/from public visibility.
+                room_state["canonical_alias"] = event_content.get("alias")
+            elif typ == EventTypes.GuestAccess:
+                room_state["guest_access"] = event_content.get("guest_access")
 
-        Args:
-            ts (int): Timestamp in seconds
-            room_id (str)
-            is_public (bool)
-        """
-        # For now, blindly iterate over all local users in the room so that
-        # we can handle the whole problem of copying buckets over as needed
-        user_ids = yield self.store.get_users_in_room(room_id)
-
-        for user_id in user_ids:
-            if self.hs.is_mine(UserID.from_string(user_id)):
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
-                )
+        for room_id, state in room_to_state_updates.items():
+            yield self.store.update_room_state(room_id, state)
 
-    @defer.inlineCallbacks
-    def _is_public_room(self, room_id):
-        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
-        history_visibility = yield self.state.get_current_state(
-            room_id, EventTypes.RoomHistoryVisibility
-        )
-
-        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
-            (
-                history_visibility
-                and history_visibility.content.get("history_visibility")
-                == "world_readable"
-            )
-        ):
-            return True
-        else:
-            return False
+        return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 98da2318a0..19bca6717f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -378,7 +378,7 @@ class SyncHandler(object):
                 event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
-        return (now_token, ephemeral_by_room)
+        return now_token, ephemeral_by_room
 
     @defer.inlineCallbacks
     def _load_filtered_recents(
@@ -578,7 +578,6 @@ class SyncHandler(object):
 
         if not last_events:
             return None
-            return
 
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
@@ -786,9 +785,8 @@ class SyncHandler(object):
                         batch.events[0].event_id, state_filter=state_filter
                     )
                 else:
-                    # Its not clear how we get here, but empirically we do
-                    # (#5407). Logging has been added elsewhere to try and
-                    # figure out where this state comes from.
+                    # We can get here if the user has ignored the senders of all
+                    # the recent events.
                     state_at_timeline_start = yield self.get_state_at(
                         room_id, stream_position=now_token, state_filter=state_filter
                     )
@@ -1333,7 +1331,7 @@ class SyncHandler(object):
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        return ([], [], [], [])
+                        return [], [], [], []
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id
@@ -1643,7 +1641,7 @@ class SyncHandler(object):
                 )
             room_entries.append(entry)
 
-        return (room_entries, invited, newly_joined_rooms, newly_left_rooms)
+        return room_entries, invited, newly_joined_rooms, newly_left_rooms
 
     @defer.inlineCallbacks
     def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1717,7 +1715,7 @@ class SyncHandler(object):
                     )
                 )
 
-        return (room_entries, invited, [])
+        return room_entries, invited, []
 
     @defer.inlineCallbacks
     def _generate_room_entry(
@@ -1771,20 +1769,9 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
-        if not batch and batch.limited:
-            # This resulted in #5407, which is weird, so lets log! We do it
-            # here as we have the maximum amount of information.
-            user_id = sync_result_builder.sync_config.user.to_string()
-            logger.info(
-                "Issue #5407: Found limited batch with no events. user %s, room %s,"
-                " sync_config %s, newly_joined %s, events %s, batch %s.",
-                user_id,
-                room_id,
-                sync_config,
-                newly_joined,
-                events,
-                batch,
-            )
+        # Note: `batch` can be both empty and limited here in the case where
+        # `_load_filtered_recents` can't find any events the user should see
+        # (e.g. due to having ignored the sender of the last 50 events).
 
         if newly_joined:
             # debug for https://github.com/matrix-org/synapse/issues/4422
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f882330293..ca8ae9fb5b 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -319,4 +319,4 @@ class TypingNotificationEventSource(object):
         return self.get_typing_handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
-        return ([], pagination_config.from_key)
+        return [], pagination_config.from_key
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
new file mode 100644
index 0000000000..824f37f8f8
--- /dev/null
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module implements user-interactive auth verification.
+
+TODO: move more stuff out of AuthHandler in here.
+
+"""
+
+from synapse.handlers.ui_auth.checkers import INTERACTIVE_AUTH_CHECKERS  # noqa: F401
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
new file mode 100644
index 0000000000..29aa1e5aaf
--- /dev/null
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -0,0 +1,247 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from canonicaljson import json
+
+from twisted.internet import defer
+from twisted.web.client import PartialDownloadError
+
+from synapse.api.constants import LoginType
+from synapse.api.errors import Codes, LoginError, SynapseError
+from synapse.config.emailconfig import ThreepidBehaviour
+
+logger = logging.getLogger(__name__)
+
+
+class UserInteractiveAuthChecker:
+    """Abstract base class for an interactive auth checker"""
+
+    def __init__(self, hs):
+        pass
+
+    def is_enabled(self):
+        """Check if the configuration of the homeserver allows this checker to work
+
+        Returns:
+            bool: True if this login type is enabled.
+        """
+
+    def check_auth(self, authdict, clientip):
+        """Given the authentication dict from the client, attempt to check this step
+
+        Args:
+            authdict (dict): authentication dictionary from the client
+            clientip (str): The IP address of the client.
+
+        Raises:
+            SynapseError if authentication failed
+
+        Returns:
+            Deferred: the result of authentication (to pass back to the client?)
+        """
+        raise NotImplementedError()
+
+
+class DummyAuthChecker(UserInteractiveAuthChecker):
+    AUTH_TYPE = LoginType.DUMMY
+
+    def is_enabled(self):
+        return True
+
+    def check_auth(self, authdict, clientip):
+        return defer.succeed(True)
+
+
+class TermsAuthChecker(UserInteractiveAuthChecker):
+    AUTH_TYPE = LoginType.TERMS
+
+    def is_enabled(self):
+        return True
+
+    def check_auth(self, authdict, clientip):
+        return defer.succeed(True)
+
+
+class RecaptchaAuthChecker(UserInteractiveAuthChecker):
+    AUTH_TYPE = LoginType.RECAPTCHA
+
+    def __init__(self, hs):
+        super().__init__(hs)
+        self._enabled = bool(hs.config.recaptcha_private_key)
+        self._http_client = hs.get_simple_http_client()
+        self._url = hs.config.recaptcha_siteverify_api
+        self._secret = hs.config.recaptcha_private_key
+
+    def is_enabled(self):
+        return self._enabled
+
+    @defer.inlineCallbacks
+    def check_auth(self, authdict, clientip):
+        try:
+            user_response = authdict["response"]
+        except KeyError:
+            # Client tried to provide captcha but didn't give the parameter:
+            # bad request.
+            raise LoginError(
+                400, "Captcha response is required", errcode=Codes.CAPTCHA_NEEDED
+            )
+
+        logger.info(
+            "Submitting recaptcha response %s with remoteip %s", user_response, clientip
+        )
+
+        # TODO: get this from the homeserver rather than creating a new one for
+        # each request
+        try:
+            resp_body = yield self._http_client.post_urlencoded_get_json(
+                self._url,
+                args={
+                    "secret": self._secret,
+                    "response": user_response,
+                    "remoteip": clientip,
+                },
+            )
+        except PartialDownloadError as pde:
+            # Twisted is silly
+            data = pde.response
+            resp_body = json.loads(data)
+
+        if "success" in resp_body:
+            # Note that we do NOT check the hostname here: we explicitly
+            # intend the CAPTCHA to be presented by whatever client the
+            # user is using, we just care that they have completed a CAPTCHA.
+            logger.info(
+                "%s reCAPTCHA from hostname %s",
+                "Successful" if resp_body["success"] else "Failed",
+                resp_body.get("hostname"),
+            )
+            if resp_body["success"]:
+                return True
+        raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+
+class _BaseThreepidAuthChecker:
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def _check_threepid(self, medium, authdict):
+        if "threepid_creds" not in authdict:
+            raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
+
+        threepid_creds = authdict["threepid_creds"]
+
+        identity_handler = self.hs.get_handlers().identity_handler
+
+        logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
+
+        # msisdns are currently always ThreepidBehaviour.REMOTE
+        if medium == "msisdn":
+            if not self.hs.config.account_threepid_delegate_msisdn:
+                raise SynapseError(
+                    400, "Phone number verification is not enabled on this homeserver"
+                )
+            threepid = yield identity_handler.threepid_from_creds(
+                self.hs.config.account_threepid_delegate_msisdn, threepid_creds
+            )
+        elif medium == "email":
+            if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
+                assert self.hs.config.account_threepid_delegate_email
+                threepid = yield identity_handler.threepid_from_creds(
+                    self.hs.config.account_threepid_delegate_email, threepid_creds
+                )
+            elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
+                threepid = None
+                row = yield self.store.get_threepid_validation_session(
+                    medium,
+                    threepid_creds["client_secret"],
+                    sid=threepid_creds["sid"],
+                    validated=True,
+                )
+
+                if row:
+                    threepid = {
+                        "medium": row["medium"],
+                        "address": row["address"],
+                        "validated_at": row["validated_at"],
+                    }
+
+                    # Valid threepid returned, delete from the db
+                    yield self.store.delete_threepid_session(threepid_creds["sid"])
+            else:
+                raise SynapseError(
+                    400, "Email address verification is not enabled on this homeserver"
+                )
+        else:
+            # this can't happen!
+            raise AssertionError("Unrecognized threepid medium: %s" % (medium,))
+
+        if not threepid:
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+        if threepid["medium"] != medium:
+            raise LoginError(
+                401,
+                "Expecting threepid of type '%s', got '%s'"
+                % (medium, threepid["medium"]),
+                errcode=Codes.UNAUTHORIZED,
+            )
+
+        threepid["threepid_creds"] = authdict["threepid_creds"]
+
+        return threepid
+
+
+class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
+    AUTH_TYPE = LoginType.EMAIL_IDENTITY
+
+    def __init__(self, hs):
+        UserInteractiveAuthChecker.__init__(self, hs)
+        _BaseThreepidAuthChecker.__init__(self, hs)
+
+    def is_enabled(self):
+        return self.hs.config.threepid_behaviour_email in (
+            ThreepidBehaviour.REMOTE,
+            ThreepidBehaviour.LOCAL,
+        )
+
+    def check_auth(self, authdict, clientip):
+        return self._check_threepid("email", authdict)
+
+
+class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
+    AUTH_TYPE = LoginType.MSISDN
+
+    def __init__(self, hs):
+        UserInteractiveAuthChecker.__init__(self, hs)
+        _BaseThreepidAuthChecker.__init__(self, hs)
+
+    def is_enabled(self):
+        return bool(self.hs.config.account_threepid_delegate_msisdn)
+
+    def check_auth(self, authdict, clientip):
+        return self._check_threepid("msisdn", authdict)
+
+
+INTERACTIVE_AUTH_CHECKERS = [
+    DummyAuthChecker,
+    TermsAuthChecker,
+    RecaptchaAuthChecker,
+    EmailIdentityAuthChecker,
+    MsisdnAuthChecker,
+]
+"""A list of UserInteractiveAuthChecker classes"""