summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-09-19 18:13:31 +0100
committerRichard van der Hoff <richard@matrix.org>2019-09-19 18:13:31 +0100
commitb65327ff661b87991566314cf07088a7e789b69b (patch)
tree75da1c79db8165ee6b3676b784a0d567551cb82d /synapse/handlers
parentbetter logging (diff)
parentfix sample config (diff)
downloadsynapse-b65327ff661b87991566314cf07088a7e789b69b.tar.xz
Merge branch 'develop' into rav/saml_mapping_work
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/_base.py43
-rw-r--r--synapse/handlers/account_data.py4
-rw-r--r--synapse/handlers/account_validity.py12
-rw-r--r--synapse/handlers/admin.py19
-rw-r--r--synapse/handlers/appservice.py2
-rw-r--r--synapse/handlers/auth.py53
-rw-r--r--synapse/handlers/device.py65
-rw-r--r--synapse/handlers/devicemessage.py30
-rw-r--r--synapse/handlers/e2e_keys.py52
-rw-r--r--synapse/handlers/e2e_room_keys.py28
-rw-r--r--synapse/handlers/events.py1
-rw-r--r--synapse/handlers/federation.py14
-rw-r--r--synapse/handlers/identity.py370
-rw-r--r--synapse/handlers/initial_sync.py6
-rw-r--r--synapse/handlers/message.py27
-rw-r--r--synapse/handlers/pagination.py17
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/profile.py2
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/register.py170
-rw-r--r--synapse/handlers/room.py19
-rw-r--r--synapse/handlers/room_list.py29
-rw-r--r--synapse/handlers/room_member.py303
-rw-r--r--synapse/handlers/stats.py315
-rw-r--r--synapse/handlers/sync.py31
-rw-r--r--synapse/handlers/typing.py2
26 files changed, 1051 insertions, 571 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index c29c78bd65..d15c6282fb 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -45,6 +45,7 @@ class BaseHandler(object):
         self.state_handler = hs.get_state_handler()
         self.distributor = hs.get_distributor()
         self.ratelimiter = hs.get_ratelimiter()
+        self.admin_redaction_ratelimiter = hs.get_admin_redaction_ratelimiter()
         self.clock = hs.get_clock()
         self.hs = hs
 
@@ -53,7 +54,7 @@ class BaseHandler(object):
         self.event_builder_factory = hs.get_event_builder_factory()
 
     @defer.inlineCallbacks
-    def ratelimit(self, requester, update=True):
+    def ratelimit(self, requester, update=True, is_admin_redaction=False):
         """Ratelimits requests.
 
         Args:
@@ -62,6 +63,9 @@ class BaseHandler(object):
                 Set to False when doing multiple checks for one request (e.g.
                 to check up front if we would reject the request), and set to
                 True for the last call for a given request.
+            is_admin_redaction (bool): Whether this is a room admin/moderator
+                redacting an event. If so then we may apply different
+                ratelimits depending on config.
 
         Raises:
             LimitExceededError if the request should be ratelimited
@@ -90,16 +94,33 @@ class BaseHandler(object):
             messages_per_second = override.messages_per_second
             burst_count = override.burst_count
         else:
-            messages_per_second = self.hs.config.rc_message.per_second
-            burst_count = self.hs.config.rc_message.burst_count
-
-        allowed, time_allowed = self.ratelimiter.can_do_action(
-            user_id,
-            time_now,
-            rate_hz=messages_per_second,
-            burst_count=burst_count,
-            update=update,
-        )
+            # We default to different values if this is an admin redaction and
+            # the config is set
+            if is_admin_redaction and self.hs.config.rc_admin_redaction:
+                messages_per_second = self.hs.config.rc_admin_redaction.per_second
+                burst_count = self.hs.config.rc_admin_redaction.burst_count
+            else:
+                messages_per_second = self.hs.config.rc_message.per_second
+                burst_count = self.hs.config.rc_message.burst_count
+
+        if is_admin_redaction and self.hs.config.rc_admin_redaction:
+            # If we have separate config for admin redactions we use a separate
+            # ratelimiter
+            allowed, time_allowed = self.admin_redaction_ratelimiter.can_do_action(
+                user_id,
+                time_now,
+                rate_hz=messages_per_second,
+                burst_count=burst_count,
+                update=update,
+            )
+        else:
+            allowed, time_allowed = self.ratelimiter.can_do_action(
+                user_id,
+                time_now,
+                rate_hz=messages_per_second,
+                burst_count=burst_count,
+                update=update,
+            )
         if not allowed:
             raise LimitExceededError(
                 retry_after_ms=int(1000 * (time_allowed - time_now))
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 8acd9f9a83..38bc67191c 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -51,8 +51,8 @@ class AccountDataEventSource(object):
                     {"type": account_data_type, "content": content, "room_id": room_id}
                 )
 
-        return (results, current_stream_id)
+        return results, current_stream_id
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        return ([], config.to_id)
+        return [], config.to_id
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 34574f1a12..d04e0fe576 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -38,6 +38,7 @@ logger = logging.getLogger(__name__)
 class AccountValidityHandler(object):
     def __init__(self, hs):
         self.hs = hs
+        self.config = hs.config
         self.store = self.hs.get_datastore()
         self.sendmail = self.hs.get_sendmail()
         self.clock = self.hs.get_clock()
@@ -62,9 +63,14 @@ class AccountValidityHandler(object):
             self._raw_from = email.utils.parseaddr(self._from_string)[1]
 
             self._template_html, self._template_text = load_jinja2_templates(
-                config=self.hs.config,
-                template_html_name=self.hs.config.email_expiry_template_html,
-                template_text_name=self.hs.config.email_expiry_template_text,
+                self.config.email_template_dir,
+                [
+                    self.config.email_expiry_template_html,
+                    self.config.email_expiry_template_text,
+                ],
+                apply_format_ts_filter=True,
+                apply_mxc_to_http_filter=True,
+                public_baseurl=self.config.public_baseurl,
             )
 
             # Check the renewal emails to send and send them every 30min.
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 2f22f56ca4..1a87b58838 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -94,6 +94,25 @@ class AdminHandler(BaseHandler):
 
         return ret
 
+    def get_user_server_admin(self, user):
+        """
+        Get the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+        """
+        return self.store.is_server_admin(user)
+
+    def set_user_server_admin(self, user, admin):
+        """
+        Set the admin bit on a user.
+
+        Args:
+            user_id (UserID): the (necessarily local) user to manipulate
+            admin (bool): whether or not the user should be an admin of this server
+        """
+        return self.store.set_server_admin(user, admin)
+
     @defer.inlineCallbacks
     def export_user_data(self, user_id, writer):
         """Write all data we have on the user to the given writer.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index d1a51df6f9..3e9b298154 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -294,12 +294,10 @@ class ApplicationServicesHandler(object):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             return False
-            return
 
         user_info = yield self.store.get_user_by_id(user_id)
         if user_info:
             return False
-            return
 
         # user not found; could be the AS though, so check.
         services = self.store.get_app_services()
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 0f3ebf7ef8..374372b69e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -38,6 +38,7 @@ from synapse.api.errors import (
     UserDeactivatedError,
 )
 from synapse.api.ratelimiting import Ratelimiter
+from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.logging.context import defer_to_thread
 from synapse.module_api import ModuleApi
 from synapse.types import UserID
@@ -158,7 +159,7 @@ class AuthHandler(BaseHandler):
         return params
 
     @defer.inlineCallbacks
-    def check_auth(self, flows, clientdict, clientip, password_servlet=False):
+    def check_auth(self, flows, clientdict, clientip):
         """
         Takes a dictionary sent by the client in the login / registration
         protocol and handles the User-Interactive Auth flow.
@@ -182,16 +183,6 @@ class AuthHandler(BaseHandler):
 
             clientip (str): The IP address of the client.
 
-            password_servlet (bool): Whether the request originated from
-                PasswordRestServlet.
-                XXX: This is a temporary hack to distinguish between checking
-                for threepid validations locally (in the case of password
-                resets) and using the identity server (in the case of binding
-                a 3PID during registration). Once we start using the
-                homeserver for both tasks, this distinction will no longer be
-                necessary.
-
-
         Returns:
             defer.Deferred[dict, dict, str]: a deferred tuple of
                 (creds, params, session_id).
@@ -247,9 +238,7 @@ class AuthHandler(BaseHandler):
         if "type" in authdict:
             login_type = authdict["type"]
             try:
-                result = yield self._check_auth_dict(
-                    authdict, clientip, password_servlet=password_servlet
-                )
+                result = yield self._check_auth_dict(authdict, clientip)
                 if result:
                     creds[login_type] = result
                     self._save_session(session)
@@ -280,7 +269,7 @@ class AuthHandler(BaseHandler):
                     creds,
                     list(clientdict),
                 )
-                return (creds, clientdict, session["id"])
+                return creds, clientdict, session["id"]
 
         ret = self._auth_dict_for_flows(flows, session)
         ret["completed"] = list(creds)
@@ -356,7 +345,7 @@ class AuthHandler(BaseHandler):
         return sess.setdefault("serverdict", {}).get(key, default)
 
     @defer.inlineCallbacks
-    def _check_auth_dict(self, authdict, clientip, password_servlet=False):
+    def _check_auth_dict(self, authdict, clientip):
         """Attempt to validate the auth dict provided by a client
 
         Args:
@@ -374,11 +363,7 @@ class AuthHandler(BaseHandler):
         login_type = authdict["type"]
         checker = self.checkers.get(login_type)
         if checker is not None:
-            # XXX: Temporary workaround for having Synapse handle password resets
-            # See AuthHandler.check_auth for further details
-            res = yield checker(
-                authdict, clientip=clientip, password_servlet=password_servlet
-            )
+            res = yield checker(authdict, clientip=clientip)
             return res
 
         # build a v1-login-style dict out of the authdict and fall back to the
@@ -449,7 +434,7 @@ class AuthHandler(BaseHandler):
         return defer.succeed(True)
 
     @defer.inlineCallbacks
-    def _check_threepid(self, medium, authdict, password_servlet=False, **kwargs):
+    def _check_threepid(self, medium, authdict, **kwargs):
         if "threepid_creds" not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -458,12 +443,18 @@ class AuthHandler(BaseHandler):
         identity_handler = self.hs.get_handlers().identity_handler
 
         logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
-        if (
-            not password_servlet
-            or self.hs.config.email_password_reset_behaviour == "remote"
-        ):
-            threepid = yield identity_handler.threepid_from_creds(threepid_creds)
-        elif self.hs.config.email_password_reset_behaviour == "local":
+        if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
+            if medium == "email":
+                threepid = yield identity_handler.threepid_from_creds(
+                    self.hs.config.account_threepid_delegate_email, threepid_creds
+                )
+            elif medium == "msisdn":
+                threepid = yield identity_handler.threepid_from_creds(
+                    self.hs.config.account_threepid_delegate_msisdn, threepid_creds
+                )
+            else:
+                raise SynapseError(400, "Unrecognized threepid medium: %s" % (medium,))
+        elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
             row = yield self.store.get_threepid_validation_session(
                 medium,
                 threepid_creds["client_secret"],
@@ -722,7 +713,7 @@ class AuthHandler(BaseHandler):
                 known_login_type = True
                 is_valid = yield provider.check_password(qualified_user_id, password)
                 if is_valid:
-                    return (qualified_user_id, None)
+                    return qualified_user_id, None
 
             if not hasattr(provider, "get_supported_login_types") or not hasattr(
                 provider, "check_auth"
@@ -766,7 +757,7 @@ class AuthHandler(BaseHandler):
             )
 
             if canonical_user_id:
-                return (canonical_user_id, None)
+                return canonical_user_id, None
 
         if not known_login_type:
             raise SynapseError(400, "Unknown login type %s" % login_type)
@@ -816,7 +807,7 @@ class AuthHandler(BaseHandler):
                         result = (result, None)
                     return result
 
-        return (None, None)
+        return None, None
 
     @defer.inlineCallbacks
     def _check_local_password(self, user_id, password):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5c1cf83c9d..71a8f33da3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
     HttpResponseException,
     RequestSendFailed,
 )
+from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -45,6 +46,7 @@ class DeviceWorkerHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self._auth_handler = hs.get_auth_handler()
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_user(self, user_id):
         """
@@ -56,6 +58,7 @@ class DeviceWorkerHandler(BaseHandler):
             defer.Deferred: list[dict[str, X]]: info on each device
         """
 
+        set_tag("user_id", user_id)
         device_map = yield self.store.get_devices_by_user(user_id)
 
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
@@ -64,8 +67,10 @@ class DeviceWorkerHandler(BaseHandler):
         for device in devices:
             _update_device_from_client_ips(device, ips)
 
+        log_kv(device_map)
         return devices
 
+    @trace
     @defer.inlineCallbacks
     def get_device(self, user_id, device_id):
         """ Retrieve the given device
@@ -85,9 +90,14 @@ class DeviceWorkerHandler(BaseHandler):
             raise errors.NotFoundError
         ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
         _update_device_from_client_ips(device, ips)
+
+        set_tag("device", device)
+        set_tag("ips", ips)
+
         return device
 
     @measure_func("device.get_user_ids_changed")
+    @trace
     @defer.inlineCallbacks
     def get_user_ids_changed(self, user_id, from_token):
         """Get list of users that have had the devices updated, or have newly
@@ -97,6 +107,9 @@ class DeviceWorkerHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
+
+        set_tag("user_id", user_id)
+        set_tag("from_token", from_token)
         now_room_key = yield self.store.get_room_events_max_id()
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
@@ -148,6 +161,9 @@ class DeviceWorkerHandler(BaseHandler):
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
+                log_kv(
+                    {"event": "encountered empty previous state", "room_id": room_id}
+                )
                 for key, event_id in iteritems(current_state_ids):
                     etype, state_key = key
                     if etype != EventTypes.Member:
@@ -200,7 +216,11 @@ class DeviceWorkerHandler(BaseHandler):
             possibly_joined = []
             possibly_left = []
 
-        return {"changed": list(possibly_joined), "left": list(possibly_left)}
+        result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+
+        log_kv(result)
+
+        return result
 
 
 class DeviceHandler(DeviceWorkerHandler):
@@ -267,6 +287,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
 
+    @trace
     @defer.inlineCallbacks
     def delete_device(self, user_id, device_id):
         """ Delete the given device
@@ -284,6 +305,10 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                set_tag("error", True)
+                log_kv(
+                    {"reason": "User doesn't have device id.", "device_id": device_id}
+                )
                 pass
             else:
                 raise
@@ -296,6 +321,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         yield self.notify_device_update(user_id, [device_id])
 
+    @trace
     @defer.inlineCallbacks
     def delete_all_devices_for_user(self, user_id, except_device_id=None):
         """Delete all of the user's devices
@@ -331,6 +357,8 @@ class DeviceHandler(DeviceWorkerHandler):
         except errors.StoreError as e:
             if e.code == 404:
                 # no match
+                set_tag("error", True)
+                set_tag("reason", "User doesn't have that device id.")
                 pass
             else:
                 raise
@@ -371,6 +399,7 @@ class DeviceHandler(DeviceWorkerHandler):
             else:
                 raise
 
+    @trace
     @measure_func("notify_device_update")
     @defer.inlineCallbacks
     def notify_device_update(self, user_id, device_ids):
@@ -386,6 +415,8 @@ class DeviceHandler(DeviceWorkerHandler):
             hosts.update(get_domain_from_id(u) for u in users_who_share_room)
             hosts.discard(self.server_name)
 
+        set_tag("target_hosts", hosts)
+
         position = yield self.store.add_device_change_to_streams(
             user_id, device_ids, list(hosts)
         )
@@ -405,6 +436,7 @@ class DeviceHandler(DeviceWorkerHandler):
             )
             for host in hosts:
                 self.federation_sender.send_device_messages(host)
+                log_kv({"message": "sent device update to host", "host": host})
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
@@ -451,12 +483,15 @@ class DeviceListUpdater(object):
             iterable=True,
         )
 
+    @trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
 
+        set_tag("origin", origin)
+        set_tag("edu_content", edu_content)
         user_id = edu_content.pop("user_id")
         device_id = edu_content.pop("device_id")
         stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
@@ -471,12 +506,30 @@ class DeviceListUpdater(object):
                 device_id,
                 origin,
             )
+
+            set_tag("error", True)
+            log_kv(
+                {
+                    "message": "Got a device list update edu from a user and "
+                    "device which does not match the origin of the request.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             return
 
         room_ids = yield self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
+            set_tag("error", True)
+            log_kv(
+                {
+                    "message": "Got an update from a user for which "
+                    "we don't share any rooms",
+                    "other user_id": user_id,
+                }
+            )
             logger.warning(
                 "Got device list update edu for %r/%r, but don't share a room",
                 user_id,
@@ -578,6 +631,7 @@ class DeviceListUpdater(object):
             request:
             https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
         """
+        log_kv({"message": "Doing resync to update device list."})
         # Fetch all devices for the user.
         origin = get_domain_from_id(user_id)
         try:
@@ -594,13 +648,20 @@ class DeviceListUpdater(object):
             # eventually become consistent.
             return
         except FederationDeniedError as e:
+            set_tag("error", True)
+            log_kv({"reason": "FederationDeniedError"})
             logger.info(e)
             return
-        except Exception:
+        except Exception as e:
             # TODO: Remember that we are now out of sync and try again
             # later
+            set_tag("error", True)
+            log_kv(
+                {"message": "Exception raised by federation request", "exception": e}
+            )
             logger.exception("Failed to handle device list update for %s", user_id)
             return
+        log_kv({"result": result})
         stream_id = result["stream_id"]
         devices = result["devices"]
 
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c..0043cbea17 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,9 +15,17 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    log_kv,
+    set_tag,
+    start_active_span,
+)
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
@@ -78,7 +86,8 @@ class DeviceMessageHandler(object):
 
     @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
-
+        set_tag("number_of_messages", len(messages))
+        set_tag("sender", sender_user_id)
         local_messages = {}
         remote_messages = {}
         for user_id, by_device in messages.items():
@@ -100,15 +109,21 @@ class DeviceMessageHandler(object):
 
         message_id = random_string(16)
 
+        context = get_active_span_text_map()
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with start_active_span("to_device_for_user"):
+                set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "org.matrix.opentracing_context": json.dumps(context),
+                }
 
+        log_kv({"local_messages": local_messages})
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
@@ -117,6 +132,7 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
+        log_kv({"remote_messages": remote_messages})
         for destination in remote_messages.keys():
             # Enqueue a new federation transaction to send the new
             # device messages to each remote destination.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1f90b0d278..056fb97acb 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -24,6 +24,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException, SynapseError
 from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import unwrapFirstError
 from synapse.util.retryutils import NotRetryingDestination
@@ -46,6 +47,7 @@ class E2eKeysHandler(object):
             "client_keys", self.on_federation_query_client_keys
         )
 
+    @trace
     @defer.inlineCallbacks
     def query_devices(self, query_body, timeout):
         """ Handle a device key query from a client
@@ -81,6 +83,9 @@ class E2eKeysHandler(object):
             else:
                 remote_queries[user_id] = device_ids
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         # First get local devices.
         failures = {}
         results = {}
@@ -121,6 +126,7 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Now fetch any devices that we don't have in our cache
+        @trace
         @defer.inlineCallbacks
         def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
@@ -185,6 +191,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -198,6 +206,7 @@ class E2eKeysHandler(object):
 
         return {"device_keys": results, "failures": failures}
 
+    @trace
     @defer.inlineCallbacks
     def query_local_devices(self, query):
         """Get E2E device keys for local users
@@ -210,6 +219,7 @@ class E2eKeysHandler(object):
             defer.Deferred: (resolves to dict[string, dict[string, dict]]):
                  map from user_id -> device_id -> device details
         """
+        set_tag("local_query", query)
         local_query = []
 
         result_dict = {}
@@ -217,6 +227,14 @@ class E2eKeysHandler(object):
             # we use UserID.from_string to catch invalid user ids
             if not self.is_mine(UserID.from_string(user_id)):
                 logger.warning("Request for keys for non-local user %s", user_id)
+                log_kv(
+                    {
+                        "message": "Requested a local key for a user which"
+                        " was not local to the homeserver",
+                        "user_id": user_id,
+                    }
+                )
+                set_tag("error", True)
                 raise SynapseError(400, "Not a user here")
 
             if not device_ids:
@@ -241,6 +259,7 @@ class E2eKeysHandler(object):
                     r["unsigned"]["device_display_name"] = display_name
                 result_dict[user_id][device_id] = r
 
+        log_kv(results)
         return result_dict
 
     @defer.inlineCallbacks
@@ -251,6 +270,7 @@ class E2eKeysHandler(object):
         res = yield self.query_local_devices(device_keys_query)
         return {"device_keys": res}
 
+    @trace
     @defer.inlineCallbacks
     def claim_one_time_keys(self, query, timeout):
         local_query = []
@@ -265,6 +285,9 @@ class E2eKeysHandler(object):
                 domain = get_domain_from_id(user_id)
                 remote_queries.setdefault(domain, {})[user_id] = device_keys
 
+        set_tag("local_key_query", local_query)
+        set_tag("remote_key_query", remote_queries)
+
         results = yield self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
@@ -276,8 +299,10 @@ class E2eKeysHandler(object):
                         key_id: json.loads(json_bytes)
                     }
 
+        @trace
         @defer.inlineCallbacks
         def claim_client_keys(destination):
+            set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
                 remote_result = yield self.federation.claim_client_keys(
@@ -290,6 +315,8 @@ class E2eKeysHandler(object):
             except Exception as e:
                 failure = _exception_to_failure(e)
                 failures[destination] = failure
+                set_tag("error", True)
+                set_tag("reason", failure)
 
         yield make_deferred_yieldable(
             defer.gatherResults(
@@ -313,9 +340,11 @@ class E2eKeysHandler(object):
             ),
         )
 
+        log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
     @defer.inlineCallbacks
+    @tag_args
     def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
@@ -329,6 +358,13 @@ class E2eKeysHandler(object):
                 user_id,
                 time_now,
             )
+            log_kv(
+                {
+                    "message": "Updating device_keys for user.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             # TODO: Sign the JSON with the server key
             changed = yield self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
@@ -336,12 +372,24 @@ class E2eKeysHandler(object):
             if changed:
                 # Only notify about device updates *if* the keys actually changed
                 yield self.device_handler.notify_device_update(user_id, [device_id])
-
+        else:
+            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
+            log_kv(
+                {
+                    "message": "Updating one_time_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
             yield self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
+        else:
+            log_kv(
+                {"message": "Did not update one_time_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
@@ -352,6 +400,7 @@ class E2eKeysHandler(object):
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
+        set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
     @defer.inlineCallbacks
@@ -395,6 +444,7 @@ class E2eKeysHandler(object):
                     (algorithm, key_id, encode_canonical_json(key).decode("ascii"))
                 )
 
+        log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
         yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
 
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 41b871fc59..a9d80f708c 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -26,6 +26,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
+from synapse.logging.opentracing import log_kv, trace
 from synapse.util.async_helpers import Linearizer
 
 logger = logging.getLogger(__name__)
@@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
         # changed.
         self._upload_linearizer = Linearizer("upload_room_keys_lock")
 
+    @trace
     @defer.inlineCallbacks
     def get_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
@@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
                 user_id, version, room_id, session_id
             )
 
+            log_kv(results)
             return results
 
+    @trace
     @defer.inlineCallbacks
     def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
@@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
         with (yield self._upload_linearizer.queue(user_id)):
             yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
 
+    @trace
     @defer.inlineCallbacks
     def upload_room_keys(self, user_id, version, room_keys):
         """Bulk upload a list of room keys into a given backup version, asserting
@@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
             session_id(str): the session whose room_key we're setting
             room_key(dict): the room_key being set
         """
-
+        log_kv(
+            {
+                "message": "Trying to upload room key",
+                "room_id": room_id,
+                "session_id": session_id,
+                "user_id": user_id,
+            }
+        )
         # get the room_key for this particular row
         current_room_key = None
         try:
@@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
             )
         except StoreError as e:
             if e.code == 404:
-                pass
+                log_kv(
+                    {
+                        "message": "Room key not found.",
+                        "room_id": room_id,
+                        "user_id": user_id,
+                    }
+                )
             else:
                 raise
 
         if self._should_replace_room_key(current_room_key, room_key):
+            log_kv({"message": "Replacing room key."})
             yield self.store.set_e2e_room_key(
                 user_id, version, room_id, session_id, room_key
             )
+        else:
+            log_kv({"message": "Not replacing room_key."})
 
     @staticmethod
     def _should_replace_room_key(current_room_key, room_key):
@@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
                 return False
         return True
 
+    @trace
     @defer.inlineCallbacks
     def create_version(self, user_id, version_info):
         """Create a new backup version.  This automatically becomes the new
@@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
                     raise
             return res
 
+    @trace
     @defer.inlineCallbacks
     def delete_version(self, user_id, version=None):
         """Deletes a given version of the user's e2e_room_keys backup
@@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
                 else:
                     raise
 
+    @trace
     @defer.inlineCallbacks
     def update_version(self, user_id, version, version_info):
         """Update the info about a given version of the user's backup
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 2f1f10a9af..5e748687e3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -167,7 +167,6 @@ class EventHandler(BaseHandler):
 
         if not event:
             return None
-            return
 
         users = yield self.store.get_users_in_room(event.room_id)
         is_peeking = user.to_string() not in users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c86903b98b..f72b81d419 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
                     ours = yield self.store.get_state_groups_ids(room_id, seen)
 
                     # state_maps is a list of mappings from (type, state_key) to event_id
-                    # type: list[dict[tuple[str, str], str]]
-                    state_maps = list(ours.values())
+                    state_maps = list(
+                        ours.values()
+                    )  # type: list[dict[tuple[str, str], str]]
 
                     # we don't need this any more, let's delete it.
                     del ours
@@ -1427,7 +1428,7 @@ class FederationHandler(BaseHandler):
         assert event.user_id == user_id
         assert event.state_key == user_id
         assert event.room_id == room_id
-        return (origin, event, format_ver)
+        return origin, event, format_ver
 
     @defer.inlineCallbacks
     @log_function
@@ -2529,12 +2530,17 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+    def on_exchange_third_party_invite_request(self, room_id, event_dict):
         """Handle an exchange_third_party_invite request from a remote server
 
         The remote server will call this when it wants to turn a 3pid invite
         into a normal m.room.member invite.
 
+        Args:
+            room_id (str): The ID of the room.
+
+            event_dict (dict[str, Any]): Dictionary containing the event body.
+
         Returns:
             Deferred: resolves (to None)
         """
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index d199521b58..512f38e5a6 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -29,6 +29,7 @@ from synapse.api.errors import (
     HttpResponseException,
     SynapseError,
 )
+from synapse.util.stringutils import random_string
 
 from ._base import BaseHandler
 
@@ -41,86 +42,130 @@ class IdentityHandler(BaseHandler):
 
         self.http_client = hs.get_simple_http_client()
         self.federation_http_client = hs.get_http_client()
+        self.hs = hs
 
-        self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
-        self.trust_any_id_server_just_for_testing_do_not_use = (
-            hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
-        )
+    def _extract_items_from_creds_dict(self, creds):
+        """
+        Retrieve entries from a "credentials" dictionary
 
-    def _should_trust_id_server(self, id_server):
-        if id_server not in self.trusted_id_servers:
-            if self.trust_any_id_server_just_for_testing_do_not_use:
-                logger.warn(
-                    "Trusting untrustworthy ID server %r even though it isn't"
-                    " in the trusted id list for testing because"
-                    " 'use_insecure_ssl_client_just_for_testing_do_not_use'"
-                    " is set in the config",
-                    id_server,
-                )
-            else:
-                return False
-        return True
+        Args:
+            creds (dict[str, str]): Dictionary of credentials that contain the following keys:
+                * client_secret|clientSecret: A unique secret str provided by the client
+                * id_server|idServer: the domain of the identity server to query
+                * id_access_token: The access token to authenticate to the identity
+                    server with.
+
+        Returns:
+            tuple(str, str, str|None): A tuple containing the client_secret, the id_server,
+                and the id_access_token value if available.
+        """
+        client_secret = creds.get("client_secret") or creds.get("clientSecret")
+        if not client_secret:
+            raise SynapseError(
+                400, "No client_secret in creds", errcode=Codes.MISSING_PARAM
+            )
+
+        id_server = creds.get("id_server") or creds.get("idServer")
+        if not id_server:
+            raise SynapseError(
+                400, "No id_server in creds", errcode=Codes.MISSING_PARAM
+            )
+
+        id_access_token = creds.get("id_access_token")
+        return client_secret, id_server, id_access_token
 
     @defer.inlineCallbacks
-    def threepid_from_creds(self, creds):
-        if "id_server" in creds:
-            id_server = creds["id_server"]
-        elif "idServer" in creds:
-            id_server = creds["idServer"]
-        else:
-            raise SynapseError(400, "No id_server in creds")
+    def threepid_from_creds(self, id_server, creds):
+        """
+        Retrieve and validate a threepid identifier from a "credentials" dictionary against a
+        given identity server
 
-        if "client_secret" in creds:
-            client_secret = creds["client_secret"]
-        elif "clientSecret" in creds:
-            client_secret = creds["clientSecret"]
-        else:
-            raise SynapseError(400, "No client_secret in creds")
+        Args:
+            id_server (str|None): The identity server to validate 3PIDs against. If None,
+                we will attempt to extract id_server creds
 
-        if not self._should_trust_id_server(id_server):
-            logger.warn(
-                "%s is not a trusted ID server: rejecting 3pid " + "credentials",
-                id_server,
-            )
-            return None
+            creds (dict[str, str]): Dictionary containing the following keys:
+                * id_server|idServer: An optional domain name of an identity server
+                * client_secret|clientSecret: A unique secret str provided by the client
+                * sid: The ID of the validation session
 
-        try:
-            data = yield self.http_client.get_json(
-                "https://%s%s"
-                % (id_server, "/_matrix/identity/api/v1/3pid/getValidated3pid"),
-                {"sid": creds["sid"], "client_secret": client_secret},
+        Returns:
+            Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
+                the /getValidated3pid endpoint of the Identity Service API, or None if the
+                threepid was not found
+        """
+        client_secret = creds.get("client_secret") or creds.get("clientSecret")
+        if not client_secret:
+            raise SynapseError(
+                400, "Missing param client_secret in creds", errcode=Codes.MISSING_PARAM
             )
-        except HttpResponseException as e:
-            logger.info("getValidated3pid failed with Matrix error: %r", e)
-            raise e.to_synapse_error()
+        session_id = creds.get("sid")
+        if not session_id:
+            raise SynapseError(
+                400, "Missing param session_id in creds", errcode=Codes.MISSING_PARAM
+            )
+        if not id_server:
+            # Attempt to get the id_server from the creds dict
+            id_server = creds.get("id_server") or creds.get("idServer")
+            if not id_server:
+                raise SynapseError(
+                    400, "Missing param id_server in creds", errcode=Codes.MISSING_PARAM
+                )
 
-        if "medium" in data:
-            return data
-        return None
+        query_params = {"sid": session_id, "client_secret": client_secret}
+
+        url = "https://%s%s" % (
+            id_server,
+            "/_matrix/identity/api/v1/3pid/getValidated3pid",
+        )
+
+        data = yield self.http_client.get_json(url, query_params)
+        return data if "medium" in data else None
 
     @defer.inlineCallbacks
-    def bind_threepid(self, creds, mxid):
+    def bind_threepid(self, creds, mxid, use_v2=True):
+        """Bind a 3PID to an identity server
+
+        Args:
+            creds (dict[str, str]): Dictionary of credentials that contain the following keys:
+                * client_secret|clientSecret: A unique secret str provided by the client
+                * id_server|idServer: the domain of the identity server to query
+                * id_access_token: The access token to authenticate to the identity
+                    server with. Required if use_v2 is true
+            mxid (str): The MXID to bind the 3PID to
+            use_v2 (bool): Whether to use v2 Identity Service API endpoints
+
+        Returns:
+            Deferred[dict]: The response from the identity server
+        """
         logger.debug("binding threepid %r to %s", creds, mxid)
-        data = None
 
-        if "id_server" in creds:
-            id_server = creds["id_server"]
-        elif "idServer" in creds:
-            id_server = creds["idServer"]
-        else:
-            raise SynapseError(400, "No id_server in creds")
+        client_secret, id_server, id_access_token = self._extract_items_from_creds_dict(
+            creds
+        )
 
-        if "client_secret" in creds:
-            client_secret = creds["client_secret"]
-        elif "clientSecret" in creds:
-            client_secret = creds["clientSecret"]
+        sid = creds.get("sid")
+        if not sid:
+            raise SynapseError(
+                400, "No sid in three_pid_creds", errcode=Codes.MISSING_PARAM
+            )
+
+        # If an id_access_token is not supplied, force usage of v1
+        if id_access_token is None:
+            use_v2 = False
+
+        # Decide which API endpoint URLs to use
+        headers = {}
+        bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
+        if use_v2:
+            bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
+            headers["Authorization"] = create_id_access_token_header(id_access_token)
         else:
-            raise SynapseError(400, "No client_secret in creds")
+            bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
 
         try:
             data = yield self.http_client.post_json_get_json(
-                "https://%s%s" % (id_server, "/_matrix/identity/api/v1/3pid/bind"),
-                {"sid": creds["sid"], "client_secret": client_secret, "mxid": mxid},
+                bind_url, bind_data, headers=headers
             )
             logger.debug("bound threepid %r to %s", creds, mxid)
 
@@ -131,13 +176,23 @@ class IdentityHandler(BaseHandler):
                 address=data["address"],
                 id_server=id_server,
             )
+
+            return data
+        except HttpResponseException as e:
+            if e.code != 404 or not use_v2:
+                logger.error("3PID bind failed with Matrix error: %r", e)
+                raise e.to_synapse_error()
         except CodeMessageException as e:
             data = json.loads(e.msg)  # XXX WAT?
-        return data
+            return data
+
+        logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
+        return (yield self.bind_threepid(creds, mxid, use_v2=False))
 
     @defer.inlineCallbacks
     def try_unbind_threepid(self, mxid, threepid):
-        """Removes a binding from an identity server
+        """Attempt to remove a 3PID from an identity server, or if one is not provided, all
+        identity servers we're aware the binding is present on
 
         Args:
             mxid (str): Matrix user ID of binding to be removed
@@ -188,6 +243,8 @@ class IdentityHandler(BaseHandler):
             server doesn't support unbinding
         """
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
+        url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
+
         content = {
             "mxid": mxid,
             "threepid": {"medium": threepid["medium"], "address": threepid["address"]},
@@ -199,7 +256,7 @@ class IdentityHandler(BaseHandler):
         auth_headers = self.federation_http_client.build_auth_headers(
             destination=None,
             method="POST",
-            url_bytes="/_matrix/identity/api/v1/3pid/unbind".encode("ascii"),
+            url_bytes=url_bytes,
             content=content,
             destination_is=id_server,
         )
@@ -227,27 +284,121 @@ class IdentityHandler(BaseHandler):
         return changed
 
     @defer.inlineCallbacks
+    def send_threepid_validation(
+        self,
+        email_address,
+        client_secret,
+        send_attempt,
+        send_email_func,
+        next_link=None,
+    ):
+        """Send a threepid validation email for password reset or
+        registration purposes
+
+        Args:
+            email_address (str): The user's email address
+            client_secret (str): The provided client secret
+            send_attempt (int): Which send attempt this is
+            send_email_func (func): A function that takes an email address, token,
+                                    client_secret and session_id, sends an email
+                                    and returns a Deferred.
+            next_link (str|None): The URL to redirect the user to after validation
+
+        Returns:
+            The new session_id upon success
+
+        Raises:
+            SynapseError is an error occurred when sending the email
+        """
+        # Check that this email/client_secret/send_attempt combo is new or
+        # greater than what we've seen previously
+        session = yield self.store.get_threepid_validation_session(
+            "email", client_secret, address=email_address, validated=False
+        )
+
+        # Check to see if a session already exists and that it is not yet
+        # marked as validated
+        if session and session.get("validated_at") is None:
+            session_id = session["session_id"]
+            last_send_attempt = session["last_send_attempt"]
+
+            # Check that the send_attempt is higher than previous attempts
+            if send_attempt <= last_send_attempt:
+                # If not, just return a success without sending an email
+                return session_id
+        else:
+            # An non-validated session does not exist yet.
+            # Generate a session id
+            session_id = random_string(16)
+
+        # Generate a new validation token
+        token = random_string(32)
+
+        # Send the mail with the link containing the token, client_secret
+        # and session_id
+        try:
+            yield send_email_func(email_address, token, client_secret, session_id)
+        except Exception:
+            logger.exception(
+                "Error sending threepid validation email to %s", email_address
+            )
+            raise SynapseError(500, "An error was encountered when sending the email")
+
+        token_expires = (
+            self.hs.clock.time_msec() + self.hs.config.email_validation_token_lifetime
+        )
+
+        yield self.store.start_or_continue_validation_session(
+            "email",
+            email_address,
+            session_id,
+            client_secret,
+            send_attempt,
+            next_link,
+            token,
+            token_expires,
+        )
+
+        return session_id
+
+    @defer.inlineCallbacks
     def requestEmailToken(
         self, id_server, email, client_secret, send_attempt, next_link=None
     ):
-        if not self._should_trust_id_server(id_server):
-            raise SynapseError(
-                400, "Untrusted ID server '%s'" % id_server, Codes.SERVER_NOT_TRUSTED
-            )
+        """
+        Request an external server send an email on our behalf for the purposes of threepid
+        validation.
+
+        Args:
+            id_server (str): The identity server to proxy to
+            email (str): The email to send the message to
+            client_secret (str): The unique client_secret sends by the user
+            send_attempt (int): Which attempt this is
+            next_link: A link to redirect the user to once they submit the token
 
+        Returns:
+            The json response body from the server
+        """
         params = {
             "email": email,
             "client_secret": client_secret,
             "send_attempt": send_attempt,
         }
-
         if next_link:
-            params.update({"next_link": next_link})
+            params["next_link"] = next_link
+
+        if self.hs.config.using_identity_server_from_trusted_list:
+            # Warn that a deprecated config option is in use
+            logger.warn(
+                'The config option "trust_identity_server_for_password_resets" '
+                'has been replaced by "account_threepid_delegate". '
+                "Please consult the sample config at docs/sample_config.yaml for "
+                "details and update your config file."
+            )
 
         try:
             data = yield self.http_client.post_json_get_json(
-                "https://%s%s"
-                % (id_server, "/_matrix/identity/api/v1/validate/email/requestToken"),
+                id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
                 params,
             )
             return data
@@ -257,28 +408,85 @@ class IdentityHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def requestMsisdnToken(
-        self, id_server, country, phone_number, client_secret, send_attempt, **kwargs
+        self,
+        id_server,
+        country,
+        phone_number,
+        client_secret,
+        send_attempt,
+        next_link=None,
     ):
-        if not self._should_trust_id_server(id_server):
-            raise SynapseError(
-                400, "Untrusted ID server '%s'" % id_server, Codes.SERVER_NOT_TRUSTED
-            )
+        """
+        Request an external server send an SMS message on our behalf for the purposes of
+        threepid validation.
+        Args:
+            id_server (str): The identity server to proxy to
+            country (str): The country code of the phone number
+            phone_number (str): The number to send the message to
+            client_secret (str): The unique client_secret sends by the user
+            send_attempt (int): Which attempt this is
+            next_link: A link to redirect the user to once they submit the token
 
+        Returns:
+            The json response body from the server
+        """
         params = {
             "country": country,
             "phone_number": phone_number,
             "client_secret": client_secret,
             "send_attempt": send_attempt,
         }
-        params.update(kwargs)
+        if next_link:
+            params["next_link"] = next_link
+
+        if self.hs.config.using_identity_server_from_trusted_list:
+            # Warn that a deprecated config option is in use
+            logger.warn(
+                'The config option "trust_identity_server_for_password_resets" '
+                'has been replaced by "account_threepid_delegate". '
+                "Please consult the sample config at docs/sample_config.yaml for "
+                "details and update your config file."
+            )
 
         try:
             data = yield self.http_client.post_json_get_json(
-                "https://%s%s"
-                % (id_server, "/_matrix/identity/api/v1/validate/msisdn/requestToken"),
+                id_server + "/_matrix/identity/api/v1/validate/msisdn/requestToken",
                 params,
             )
             return data
         except HttpResponseException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e.to_synapse_error()
+
+
+def create_id_access_token_header(id_access_token):
+    """Create an Authorization header for passing to SimpleHttpClient as the header value
+    of an HTTP request.
+
+    Args:
+        id_access_token (str): An identity server access token.
+
+    Returns:
+        list[str]: The ascii-encoded bearer token encased in a list.
+    """
+    # Prefix with Bearer
+    bearer_token = "Bearer %s" % id_access_token
+
+    # Encode headers to standard ascii
+    bearer_token.encode("ascii")
+
+    # Return as a list as that's how SimpleHttpClient takes header values
+    return [bearer_token]
+
+
+class LookupAlgorithm:
+    """
+    Supported hashing algorithms when performing a 3PID lookup.
+
+    SHA256 - Hashing an (address, medium, pepper) combo with sha256, then url-safe base64
+        encoding
+    NONE - Not performing any hashing. Simply sending an (address, medium) combo in plaintext
+    """
+
+    SHA256 = "sha256"
+    NONE = "none"
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 42d6650ed9..f991efeee3 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -449,8 +449,7 @@ class InitialSyncHandler(BaseHandler):
             #  * The user is a guest user, and has joined the room
             # else it will throw.
             member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
-            return (member_event.membership, member_event.event_id)
-            return
+            return member_event.membership, member_event.event_id
         except AuthError:
             visibility = yield self.state_handler.get_current_state(
                 room_id, EventTypes.RoomHistoryVisibility, ""
@@ -459,8 +458,7 @@ class InitialSyncHandler(BaseHandler):
                 visibility
                 and visibility.content["history_visibility"] == "world_readable"
             ):
-                return (Membership.JOIN, None)
-                return
+                return Membership.JOIN, None
             raise AuthError(
                 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
             )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a5e23c4caf..1f8272784e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
 from twisted.internet.defer import succeed
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RelationTypes
+from synapse.api.constants import EventTypes, Membership, RelationTypes, UserTypes
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -469,6 +469,9 @@ class EventCreationHandler(object):
 
         u = yield self.store.get_user_by_id(user_id)
         assert u is not None
+        if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
+            # support and bot users are not required to consent
+            return
         if u["appservice_id"] is not None:
             # users registered by an appservice are exempt
             return
@@ -726,7 +729,27 @@ class EventCreationHandler(object):
         assert not self.config.worker_app
 
         if ratelimit:
-            yield self.base_handler.ratelimit(requester)
+            # We check if this is a room admin redacting an event so that we
+            # can apply different ratelimiting. We do this by simply checking
+            # it's not a self-redaction (to avoid having to look up whether the
+            # user is actually admin or not).
+            is_admin_redaction = False
+            if event.type == EventTypes.Redaction:
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=True,
+                )
+
+                is_admin_redaction = (
+                    original_event and event.sender != original_event.sender
+                )
+
+            yield self.base_handler.ratelimit(
+                requester, is_admin_redaction=is_admin_redaction
+            )
 
         yield self.base_handler.maybe_kick_guest_users(event, context)
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d83aab3f74..5744f4579d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -70,6 +70,7 @@ class PaginationHandler(object):
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
         self.clock = hs.get_clock()
+        self._server_name = hs.hostname
 
         self.pagination_lock = ReadWriteLock()
         self._purges_in_progress_by_room = set()
@@ -153,6 +154,22 @@ class PaginationHandler(object):
         """
         return self._purges_by_id.get(purge_id)
 
+    async def purge_room(self, room_id):
+        """Purge the given room from the database"""
+        with (await self.pagination_lock.write(room_id)):
+            # check we know about the room
+            await self.store.get_room_version(room_id)
+
+            # first check that we have no users in this room
+            joined = await defer.maybeDeferred(
+                self.store.is_host_joined, room_id, self._server_name
+            )
+
+            if joined:
+                raise SynapseError(400, "Users are still joined to this room")
+
+            await self.store.purge_room(room_id)
+
     @defer.inlineCallbacks
     def get_messages(
         self,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 94a9ca0357..053cf66b28 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -255,7 +255,7 @@ class PresenceHandler(object):
         self.unpersisted_users_changes = set()
 
         if unpersisted:
-            logger.info("Persisting %d upersisted presence updates", len(unpersisted))
+            logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
             yield self.store.update_presence(
                 [self.user_to_current_state[user_id] for user_id in unpersisted]
             )
@@ -1032,7 +1032,7 @@ class PresenceEventSource(object):
                 #
                 # Hence this guard where we just return nothing so that the sync
                 # doesn't return. C.f. #5503.
-                return ([], max_token)
+                return [], max_token
 
             presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
@@ -1279,7 +1279,7 @@ def get_interested_parties(store, states):
         # Always notify self
         users_to_states.setdefault(state.user_id, []).append(state)
 
-    return (room_ids_to_states, users_to_states)
+    return room_ids_to_states, users_to_states
 
 
 @defer.inlineCallbacks
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 2cc237e6a5..8690f69d45 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -34,7 +34,7 @@ from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
-MAX_DISPLAYNAME_LEN = 100
+MAX_DISPLAYNAME_LEN = 256
 MAX_AVATAR_URL_LEN = 1000
 
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 73973502a4..6854c751a6 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -148,7 +148,7 @@ class ReceiptEventSource(object):
         to_key = yield self.get_current_key()
 
         if from_key == to_key:
-            return ([], to_key)
+            return [], to_key
 
         events = yield self.store.get_linearized_receipts_for_rooms(
             room_ids, from_key=from_key, to_key=to_key
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 4631fab94e..06bd03b77c 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -24,13 +24,11 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     ConsentNotGivenError,
-    InvalidCaptchaError,
     LimitExceededError,
     RegistrationError,
     SynapseError,
 )
 from synapse.config.server import is_threepid_reserved
-from synapse.http.client import CaptchaServerHttpClient
 from synapse.http.servlet import assert_params_in_dict
 from synapse.replication.http.login import RegisterDeviceReplicationServlet
 from synapse.replication.http.register import (
@@ -39,7 +37,6 @@ from synapse.replication.http.register import (
 )
 from synapse.types import RoomAlias, RoomID, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
-from synapse.util.threepids import check_3pid_allowed
 
 from ._base import BaseHandler
 
@@ -59,7 +56,6 @@ class RegistrationHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
-        self.captcha_client = CaptchaServerHttpClient(hs)
         self.identity_handler = self.hs.get_handlers().identity_handler
         self.ratelimiter = hs.get_registration_ratelimiter()
 
@@ -279,16 +275,12 @@ class RegistrationHandler(BaseHandler):
         fake_requester = create_requester(user_id)
 
         # try to create the room if we're the first real user on the server. Note
-        # that an auto-generated support user is not a real user and will never be
+        # that an auto-generated support or bot user is not a real user and will never be
         # the user to create the room
         should_auto_create_rooms = False
-        is_support = yield self.store.is_support_user(user_id)
-        # There is an edge case where the first user is the support user, then
-        # the room is never created, though this seems unlikely and
-        # recoverable from given the support user being involved in the first
-        # place.
-        if self.hs.config.autocreate_auto_join_rooms and not is_support:
-            count = yield self.store.count_all_users()
+        is_real_user = yield self.store.is_real_user(user_id)
+        if self.hs.config.autocreate_auto_join_rooms and is_real_user:
+            count = yield self.store.count_real_users()
             should_auto_create_rooms = count == 1
         for r in self.hs.config.auto_join_rooms:
             logger.info("Auto-joining %s to %s", user_id, r)
@@ -362,70 +354,6 @@ class RegistrationHandler(BaseHandler):
         )
         return user_id
 
-    @defer.inlineCallbacks
-    def check_recaptcha(self, ip, private_key, challenge, response):
-        """
-        Checks a recaptcha is correct.
-
-        Used only by c/s api v1
-        """
-
-        captcha_response = yield self._validate_captcha(
-            ip, private_key, challenge, response
-        )
-        if not captcha_response["valid"]:
-            logger.info(
-                "Invalid captcha entered from %s. Error: %s",
-                ip,
-                captcha_response["error_url"],
-            )
-            raise InvalidCaptchaError(error_url=captcha_response["error_url"])
-        else:
-            logger.info("Valid captcha entered from %s", ip)
-
-    @defer.inlineCallbacks
-    def register_email(self, threepidCreds):
-        """
-        Registers emails with an identity server.
-
-        Used only by c/s api v1
-        """
-
-        for c in threepidCreds:
-            logger.info(
-                "validating threepidcred sid %s on id server %s",
-                c["sid"],
-                c["idServer"],
-            )
-            try:
-                threepid = yield self.identity_handler.threepid_from_creds(c)
-            except Exception:
-                logger.exception("Couldn't validate 3pid")
-                raise RegistrationError(400, "Couldn't validate 3pid")
-
-            if not threepid:
-                raise RegistrationError(400, "Couldn't validate 3pid")
-            logger.info(
-                "got threepid with medium '%s' and address '%s'",
-                threepid["medium"],
-                threepid["address"],
-            )
-
-            if not check_3pid_allowed(self.hs, threepid["medium"], threepid["address"]):
-                raise RegistrationError(403, "Third party identifier is not allowed")
-
-    @defer.inlineCallbacks
-    def bind_emails(self, user_id, threepidCreds):
-        """Links emails with a user ID and informs an identity server.
-
-        Used only by c/s api v1
-        """
-
-        # Now we have a matrix ID, bind it to the threepids we were given
-        for c in threepidCreds:
-            # XXX: This should be a deferred list, shouldn't it?
-            yield self.identity_handler.bind_threepid(c, user_id)
-
     def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
         # don't allow people to register the server notices mxid
         if self._server_notices_mxid is not None:
@@ -464,44 +392,7 @@ class RegistrationHandler(BaseHandler):
         return str(id)
 
     @defer.inlineCallbacks
-    def _validate_captcha(self, ip_addr, private_key, challenge, response):
-        """Validates the captcha provided.
-
-        Used only by c/s api v1
-
-        Returns:
-            dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
-
-        """
-        response = yield self._submit_captcha(ip_addr, private_key, challenge, response)
-        # parse Google's response. Lovely format..
-        lines = response.split("\n")
-        json = {
-            "valid": lines[0] == "true",
-            "error_url": "http://www.recaptcha.net/recaptcha/api/challenge?"
-            + "error=%s" % lines[1],
-        }
-        return json
-
-    @defer.inlineCallbacks
-    def _submit_captcha(self, ip_addr, private_key, challenge, response):
-        """
-        Used only by c/s api v1
-        """
-        data = yield self.captcha_client.post_urlencoded_get_raw(
-            "http://www.recaptcha.net:80/recaptcha/api/verify",
-            args={
-                "privatekey": private_key,
-                "remoteip": ip_addr,
-                "challenge": challenge,
-                "response": response,
-            },
-        )
-        return data
-
-    @defer.inlineCallbacks
     def _join_user_to_room(self, requester, room_identifier):
-        room_id = None
         room_member_handler = self.hs.get_room_member_handler()
         if RoomID.is_valid(room_identifier):
             room_id = room_identifier
@@ -622,7 +513,7 @@ class RegistrationHandler(BaseHandler):
                 initial_display_name=initial_display_name,
                 is_guest=is_guest,
             )
-            return (r["device_id"], r["access_token"])
+            return r["device_id"], r["access_token"]
 
         valid_until_ms = None
         if self.session_lifetime is not None:
@@ -648,9 +539,7 @@ class RegistrationHandler(BaseHandler):
         return (device_id, access_token)
 
     @defer.inlineCallbacks
-    def post_registration_actions(
-        self, user_id, auth_result, access_token, bind_email, bind_msisdn
-    ):
+    def post_registration_actions(self, user_id, auth_result, access_token):
         """A user has completed registration
 
         Args:
@@ -659,18 +548,10 @@ class RegistrationHandler(BaseHandler):
                 registered user.
             access_token (str|None): The access token of the newly logged in
                 device, or None if `inhibit_login` enabled.
-            bind_email (bool): Whether to bind the email with the identity
-                server.
-            bind_msisdn (bool): Whether to bind the msisdn with the identity
-                server.
         """
         if self.hs.config.worker_app:
             yield self._post_registration_client(
-                user_id=user_id,
-                auth_result=auth_result,
-                access_token=access_token,
-                bind_email=bind_email,
-                bind_msisdn=bind_msisdn,
+                user_id=user_id, auth_result=auth_result, access_token=access_token
             )
             return
 
@@ -683,13 +564,11 @@ class RegistrationHandler(BaseHandler):
             ):
                 yield self.store.upsert_monthly_active_user(user_id)
 
-            yield self._register_email_threepid(
-                user_id, threepid, access_token, bind_email
-            )
+            yield self._register_email_threepid(user_id, threepid, access_token)
 
         if auth_result and LoginType.MSISDN in auth_result:
             threepid = auth_result[LoginType.MSISDN]
-            yield self._register_msisdn_threepid(user_id, threepid, bind_msisdn)
+            yield self._register_msisdn_threepid(user_id, threepid)
 
         if auth_result and LoginType.TERMS in auth_result:
             yield self._on_user_consented(user_id, self.hs.config.user_consent_version)
@@ -708,14 +587,12 @@ class RegistrationHandler(BaseHandler):
         yield self.post_consent_actions(user_id)
 
     @defer.inlineCallbacks
-    def _register_email_threepid(self, user_id, threepid, token, bind_email):
+    def _register_email_threepid(self, user_id, threepid, token):
         """Add an email address as a 3pid identifier
 
         Also adds an email pusher for the email address, if configured in the
         HS config
 
-        Also optionally binds emails to the given user_id on the identity server
-
         Must be called on master.
 
         Args:
@@ -723,8 +600,6 @@ class RegistrationHandler(BaseHandler):
             threepid (object): m.login.email.identity auth response
             token (str|None): access_token for the user, or None if not logged
                 in.
-            bind_email (bool): true if the client requested the email to be
-                bound at the identity server
         Returns:
             defer.Deferred:
         """
@@ -766,29 +641,15 @@ class RegistrationHandler(BaseHandler):
                 data={},
             )
 
-        if bind_email:
-            logger.info("bind_email specified: binding")
-            logger.debug("Binding emails %s to %s" % (threepid, user_id))
-            yield self.identity_handler.bind_threepid(
-                threepid["threepid_creds"], user_id
-            )
-        else:
-            logger.info("bind_email not specified: not binding email")
-
     @defer.inlineCallbacks
-    def _register_msisdn_threepid(self, user_id, threepid, bind_msisdn):
+    def _register_msisdn_threepid(self, user_id, threepid):
         """Add a phone number as a 3pid identifier
 
-        Also optionally binds msisdn to the given user_id on the identity server
-
         Must be called on master.
 
         Args:
             user_id (str): id of user
             threepid (object): m.login.msisdn auth response
-            token (str): access_token for the user
-            bind_email (bool): true if the client requested the email to be
-                bound at the identity server
         Returns:
             defer.Deferred:
         """
@@ -804,12 +665,3 @@ class RegistrationHandler(BaseHandler):
         yield self._auth_handler.add_threepid(
             user_id, threepid["medium"], threepid["address"], threepid["validated_at"]
         )
-
-        if bind_msisdn:
-            logger.info("bind_msisdn specified: binding")
-            logger.debug("Binding msisdn %s to %s", threepid, user_id)
-            yield self.identity_handler.bind_threepid(
-                threepid["threepid_creds"], user_id
-            )
-        else:
-            logger.info("bind_msisdn not specified: not binding msisdn")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5caa90c3b7..970be3c846 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -560,6 +560,18 @@ class RoomCreationHandler(BaseHandler):
 
         yield self.event_creation_handler.assert_accepted_privacy_policy(requester)
 
+        power_level_content_override = config.get("power_level_content_override")
+        if (
+            power_level_content_override
+            and "users" in power_level_content_override
+            and user_id not in power_level_content_override["users"]
+        ):
+            raise SynapseError(
+                400,
+                "Not a valid power_level_content_override: 'users' did not contain %s"
+                % (user_id,),
+            )
+
         invite_3pid_list = config.get("invite_3pid", [])
 
         visibility = config.get("visibility", None)
@@ -567,8 +579,8 @@ class RoomCreationHandler(BaseHandler):
 
         room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
 
+        directory_handler = self.hs.get_handlers().directory_handler
         if room_alias:
-            directory_handler = self.hs.get_handlers().directory_handler
             yield directory_handler.create_association(
                 requester=requester,
                 room_id=room_id,
@@ -604,7 +616,7 @@ class RoomCreationHandler(BaseHandler):
             initial_state=initial_state,
             creation_content=creation_content,
             room_alias=room_alias,
-            power_level_content_override=config.get("power_level_content_override"),
+            power_level_content_override=power_level_content_override,
             creator_join_profile=creator_join_profile,
         )
 
@@ -653,6 +665,7 @@ class RoomCreationHandler(BaseHandler):
 
         for invite_3pid in invite_3pid_list:
             id_server = invite_3pid["id_server"]
+            id_access_token = invite_3pid.get("id_access_token")  # optional
             address = invite_3pid["address"]
             medium = invite_3pid["medium"]
             yield self.hs.get_room_member_handler().do_3pid_invite(
@@ -663,6 +676,7 @@ class RoomCreationHandler(BaseHandler):
                 id_server,
                 requester,
                 txn_id=None,
+                id_access_token=id_access_token,
             )
 
         result = {"room_id": room_id}
@@ -840,7 +854,6 @@ class RoomContextHandler(object):
         )
         if not event:
             return None
-            return
 
         filtered = yield (filter_evts([event]))
         if not filtered:
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index e9094ad02b..a7e55f00e5 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -25,6 +25,7 @@ from unpaddedbase64 import decode_base64, encode_base64
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.errors import Codes, HttpResponseException
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -485,7 +486,33 @@ class RoomListHandler(BaseHandler):
             return {"chunk": [], "total_room_count_estimate": 0}
 
         if search_filter:
-            # We currently don't support searching across federation, so we have
+            # Searching across federation is defined in MSC2197.
+            # However, the remote homeserver may or may not actually support it.
+            # So we first try an MSC2197 remote-filtered search, then fall back
+            # to a locally-filtered search if we must.
+
+            try:
+                res = yield self._get_remote_list_cached(
+                    server_name,
+                    limit=limit,
+                    since_token=since_token,
+                    include_all_networks=include_all_networks,
+                    third_party_instance_id=third_party_instance_id,
+                    search_filter=search_filter,
+                )
+                return res
+            except HttpResponseException as hre:
+                syn_err = hre.to_synapse_error()
+                if hre.code in (404, 405) or syn_err.errcode in (
+                    Codes.UNRECOGNIZED,
+                    Codes.NOT_FOUND,
+                ):
+                    logger.debug("Falling back to locally-filtered /publicRooms")
+                else:
+                    raise  # Not an error that should trigger a fallback.
+
+            # if we reach this point, then we fall back to the situation where
+            # we currently don't support searching across federation, so we have
             # to do it manually without pagination
             limit = None
             since_token = None
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 249a6d9c5d..35450feb6f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -29,9 +29,11 @@ from twisted.internet import defer
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
+from synapse.handlers.identity import LookupAlgorithm, create_id_access_token_header
 from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
+from synapse.util.hash import sha256_and_url_safe_base64
 
 from ._base import BaseHandler
 
@@ -100,7 +102,7 @@ class RoomMemberHandler(object):
         raise NotImplementedError()
 
     @abc.abstractmethod
-    def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+    def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Attempt to reject an invite for a room this server is not in. If we
         fail to do so we locally mark the invite as rejected.
 
@@ -510,9 +512,7 @@ class RoomMemberHandler(object):
         return res
 
     @defer.inlineCallbacks
-    def send_membership_event(
-        self, requester, event, context, remote_room_hosts=None, ratelimit=True
-    ):
+    def send_membership_event(self, requester, event, context, ratelimit=True):
         """
         Change the membership status of a user in a room.
 
@@ -522,16 +522,10 @@ class RoomMemberHandler(object):
                 act as the sender, will be skipped.
             event (SynapseEvent): The membership event.
             context: The context of the event.
-            is_guest (bool): Whether the sender is a guest.
-            room_hosts ([str]): Homeservers which are likely to already be in
-                the room, and could be danced with in order to join this
-                homeserver for the first time.
             ratelimit (bool): Whether to rate limit this request.
         Raises:
             SynapseError if there was a problem changing the membership.
         """
-        remote_room_hosts = remote_room_hosts or []
-
         target_user = UserID.from_string(event.state_key)
         room_id = event.room_id
 
@@ -634,7 +628,7 @@ class RoomMemberHandler(object):
             servers.remove(room_alias.domain)
         servers.insert(0, room_alias.domain)
 
-        return (RoomID.from_string(room_id), servers)
+        return RoomID.from_string(room_id), servers
 
     @defer.inlineCallbacks
     def _get_inviter(self, user_id, room_id):
@@ -646,7 +640,15 @@ class RoomMemberHandler(object):
 
     @defer.inlineCallbacks
     def do_3pid_invite(
-        self, room_id, inviter, medium, address, id_server, requester, txn_id
+        self,
+        room_id,
+        inviter,
+        medium,
+        address,
+        id_server,
+        requester,
+        txn_id,
+        id_access_token=None,
     ):
         if self.config.block_non_admin_invites:
             is_requester_admin = yield self.auth.is_server_admin(requester.user)
@@ -669,7 +671,12 @@ class RoomMemberHandler(object):
                 Codes.FORBIDDEN,
             )
 
-        invitee = yield self._lookup_3pid(id_server, medium, address)
+        if not self._enable_lookup:
+            raise SynapseError(
+                403, "Looking up third-party identifiers is denied from this server"
+            )
+
+        invitee = yield self._lookup_3pid(id_server, medium, address, id_access_token)
 
         if invitee:
             yield self.update_membership(
@@ -677,11 +684,18 @@ class RoomMemberHandler(object):
             )
         else:
             yield self._make_and_store_3pid_invite(
-                requester, id_server, medium, address, room_id, inviter, txn_id=txn_id
+                requester,
+                id_server,
+                medium,
+                address,
+                room_id,
+                inviter,
+                txn_id=txn_id,
+                id_access_token=id_access_token,
             )
 
     @defer.inlineCallbacks
-    def _lookup_3pid(self, id_server, medium, address):
+    def _lookup_3pid(self, id_server, medium, address, id_access_token=None):
         """Looks up a 3pid in the passed identity server.
 
         Args:
@@ -689,14 +703,48 @@ class RoomMemberHandler(object):
                 of the identity server to use.
             medium (str): The type of the third party identifier (e.g. "email").
             address (str): The third party identifier (e.g. "foo@example.com").
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
+
+        Returns:
+            str|None: the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        if id_access_token is not None:
+            try:
+                results = yield self._lookup_3pid_v2(
+                    id_server, id_access_token, medium, address
+                )
+                return results
+
+            except Exception as e:
+                # Catch HttpResponseExcept for a non-200 response code
+                # Check if this identity server does not know about v2 lookups
+                if isinstance(e, HttpResponseException) and e.code == 404:
+                    # This is an old identity server that does not yet support v2 lookups
+                    logger.warning(
+                        "Attempted v2 lookup on v1 identity server %s. Falling "
+                        "back to v1",
+                        id_server,
+                    )
+                else:
+                    logger.warning("Error when looking up hashing details: %s", e)
+                    return None
+
+        return (yield self._lookup_3pid_v1(id_server, medium, address))
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v1(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server using v1 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
 
         Returns:
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
-        if not self._enable_lookup:
-            raise SynapseError(
-                403, "Looking up third-party identifiers is denied from this server"
-            )
         try:
             data = yield self.simple_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
@@ -710,9 +758,116 @@ class RoomMemberHandler(object):
                 return data["mxid"]
 
         except IOError as e:
-            logger.warn("Error from identity server lookup: %s" % (e,))
+            logger.warning("Error from v1 identity server lookup: %s" % (e,))
+
+        return None
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+        """Looks up a 3pid in the passed identity server using v2 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            id_access_token (str): The access token to authenticate to the identity server with
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+        """
+        # Check what hashing details are supported by this identity server
+        hash_details = yield self.simple_http_client.get_json(
+            "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
+            {"access_token": id_access_token},
+        )
+
+        if not isinstance(hash_details, dict):
+            logger.warning(
+                "Got non-dict object when checking hash details of %s%s: %s",
+                id_server_scheme,
+                id_server,
+                hash_details,
+            )
+            raise SynapseError(
+                400,
+                "Non-dict object from %s%s during v2 hash_details request: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Extract information from hash_details
+        supported_lookup_algorithms = hash_details.get("algorithms")
+        lookup_pepper = hash_details.get("lookup_pepper")
+        if (
+            not supported_lookup_algorithms
+            or not isinstance(supported_lookup_algorithms, list)
+            or not lookup_pepper
+            or not isinstance(lookup_pepper, str)
+        ):
+            raise SynapseError(
+                400,
+                "Invalid hash details received from identity server %s%s: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Check if any of the supported lookup algorithms are present
+        if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
+            # Perform a hashed lookup
+            lookup_algorithm = LookupAlgorithm.SHA256
+
+            # Hash address, medium and the pepper with sha256
+            to_hash = "%s %s %s" % (address, medium, lookup_pepper)
+            lookup_value = sha256_and_url_safe_base64(to_hash)
+
+        elif LookupAlgorithm.NONE in supported_lookup_algorithms:
+            # Perform a non-hashed lookup
+            lookup_algorithm = LookupAlgorithm.NONE
+
+            # Combine together plaintext address and medium
+            lookup_value = "%s %s" % (address, medium)
+
+        else:
+            logger.warning(
+                "None of the provided lookup algorithms of %s are supported: %s",
+                id_server,
+                supported_lookup_algorithms,
+            )
+            raise SynapseError(
+                400,
+                "Provided identity server does not support any v2 lookup "
+                "algorithms that this homeserver supports.",
+            )
+
+        # Authenticate with identity server given the access token from the client
+        headers = {"Authorization": create_id_access_token_header(id_access_token)}
+
+        try:
+            lookup_results = yield self.simple_http_client.post_json_get_json(
+                "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
+                {
+                    "addresses": [lookup_value],
+                    "algorithm": lookup_algorithm,
+                    "pepper": lookup_pepper,
+                },
+                headers=headers,
+            )
+        except Exception as e:
+            logger.warning("Error when performing a v2 3pid lookup: %s", e)
+            raise SynapseError(
+                500, "Unknown error occurred during identity server lookup"
+            )
+
+        # Check for a mapping from what we looked up to an MXID
+        if "mappings" not in lookup_results or not isinstance(
+            lookup_results["mappings"], dict
+        ):
+            logger.warning("No results from 3pid lookup")
             return None
 
+        # Return the MXID if it's available, or None otherwise
+        mxid = lookup_results["mappings"].get(lookup_value)
+        return mxid
+
     @defer.inlineCallbacks
     def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
@@ -737,7 +892,15 @@ class RoomMemberHandler(object):
 
     @defer.inlineCallbacks
     def _make_and_store_3pid_invite(
-        self, requester, id_server, medium, address, room_id, user, txn_id
+        self,
+        requester,
+        id_server,
+        medium,
+        address,
+        room_id,
+        user,
+        txn_id,
+        id_access_token=None,
     ):
         room_state = yield self.state_handler.get_current_state(room_id)
 
@@ -786,6 +949,7 @@ class RoomMemberHandler(object):
                 room_name=room_name,
                 inviter_display_name=inviter_display_name,
                 inviter_avatar_url=inviter_avatar_url,
+                id_access_token=id_access_token,
             )
         )
 
@@ -823,6 +987,7 @@ class RoomMemberHandler(object):
         room_name,
         inviter_display_name,
         inviter_avatar_url,
+        id_access_token=None,
     ):
         """
         Asks an identity server for a third party invite.
@@ -842,6 +1007,8 @@ class RoomMemberHandler(object):
             inviter_display_name (str): The current display name of the
                 inviter.
             inviter_avatar_url (str): The URL of the inviter's avatar.
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
 
         Returns:
             A deferred tuple containing:
@@ -852,12 +1019,6 @@ class RoomMemberHandler(object):
                 display_name (str): A user-friendly name to represent the invited
                     user.
         """
-
-        is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
-            id_server_scheme,
-            id_server,
-        )
-
         invite_config = {
             "medium": medium,
             "address": address,
@@ -871,22 +1032,66 @@ class RoomMemberHandler(object):
             "sender_avatar_url": inviter_avatar_url,
         }
 
-        try:
-            data = yield self.simple_http_client.post_json_get_json(
-                is_url, invite_config
-            )
-        except HttpResponseException as e:
-            # Some identity servers may only support application/x-www-form-urlencoded
-            # types. This is especially true with old instances of Sydent, see
-            # https://github.com/matrix-org/sydent/pull/170
-            logger.info(
-                "Failed to POST %s with JSON, falling back to urlencoded form: %s",
-                is_url,
-                e,
+        # Add the identity service access token to the JSON body and use the v2
+        # Identity Service endpoints if id_access_token is present
+        data = None
+        base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server)
+
+        if id_access_token:
+            key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
+                id_server_scheme,
+                id_server,
             )
-            data = yield self.simple_http_client.post_urlencoded_get_json(
-                is_url, invite_config
+
+            # Attempt a v2 lookup
+            url = base_url + "/v2/store-invite"
+            try:
+                data = yield self.simple_http_client.post_json_get_json(
+                    url,
+                    invite_config,
+                    {"Authorization": create_id_access_token_header(id_access_token)},
+                )
+            except HttpResponseException as e:
+                if e.code != 404:
+                    logger.info("Failed to POST %s with JSON: %s", url, e)
+                    raise e
+
+        if data is None:
+            key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+                id_server_scheme,
+                id_server,
             )
+            url = base_url + "/api/v1/store-invite"
+
+            try:
+                data = yield self.simple_http_client.post_json_get_json(
+                    url, invite_config
+                )
+            except HttpResponseException as e:
+                logger.warning(
+                    "Error trying to call /store-invite on %s%s: %s",
+                    id_server_scheme,
+                    id_server,
+                    e,
+                )
+
+            if data is None:
+                # Some identity servers may only support application/x-www-form-urlencoded
+                # types. This is especially true with old instances of Sydent, see
+                # https://github.com/matrix-org/sydent/pull/170
+                try:
+                    data = yield self.simple_http_client.post_urlencoded_get_json(
+                        url, invite_config
+                    )
+                except HttpResponseException as e:
+                    logger.warning(
+                        "Error calling /store-invite on %s%s with fallback "
+                        "encoding: %s",
+                        id_server_scheme,
+                        id_server,
+                        e,
+                    )
+                    raise e
 
         # TODO: Check for success
         token = data["token"]
@@ -894,8 +1099,7 @@ class RoomMemberHandler(object):
         if "public_key" in data:
             fallback_public_key = {
                 "public_key": data["public_key"],
-                "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid"
-                % (id_server_scheme, id_server),
+                "key_validity_url": key_validity_url,
             }
         else:
             fallback_public_key = public_keys[0]
@@ -903,7 +1107,7 @@ class RoomMemberHandler(object):
         if not public_keys:
             public_keys.append(fallback_public_key)
         display_name = data["display_name"]
-        return (token, public_keys, fallback_public_key, display_name)
+        return token, public_keys, fallback_public_key, display_name
 
     @defer.inlineCallbacks
     def _is_host_in_room(self, current_state_ids):
@@ -962,9 +1166,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         )
 
         if complexity:
-            if complexity["v1"] > max_complexity:
-                return True
-            return False
+            return complexity["v1"] > max_complexity
         return None
 
     @defer.inlineCallbacks
@@ -980,10 +1182,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         max_complexity = self.hs.config.limit_remote_rooms.complexity
         complexity = yield self.store.get_room_complexity(room_id)
 
-        if complexity["v1"] > max_complexity:
-            return True
-
-        return False
+        return complexity["v1"] > max_complexity
 
     @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
@@ -1062,7 +1261,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             # The 'except' clause is very broad, but we need to
             # capture everything from DNS failures upwards
             #
-            logger.warn("Failed to reject invite: %s", e)
+            logger.warning("Failed to reject invite: %s", e)
 
             yield self.store.locally_reject_invite(target.to_string(), room_id)
             return {}
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..cbac7c347a 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -14,15 +14,14 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled:
+        if not self.hs.config.stats_enabled or self._is_processing:
             return
 
-        if self._is_processing:
-            return
+        self._is_processing = True
 
         @defer.inlineCallbacks
         def process():
@@ -75,39 +73,78 @@ class StatsHandler(StateDeltasHandler):
             finally:
                 self._is_processing = False
 
-        self._is_processing = True
         run_as_background_process("stats.notify_new_event", process)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
-
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
-            return None
+            self.pos = yield self.store.get_stats_positions()
 
         # Loop round handling deltas until we're up to date
+
         while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
-                    return
+            # Be sure to read the max stream_ordering *before* checking if there are any outstanding
+            # deltas, since there is otherwise a chance that we could miss updates which arrive
+            # after we check the deltas.
+            room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
+            if self.pos == room_max_stream_ordering:
+                break
+
+            deltas = yield self.store.get_current_state_deltas(self.pos)
+
+            if deltas:
+                logger.debug("Handling %d state deltas", len(deltas))
+                room_deltas, user_deltas = yield self._handle_deltas(deltas)
+
+                max_pos = deltas[-1]["stream_id"]
+            else:
+                room_deltas = {}
+                user_deltas = {}
+                max_pos = room_max_stream_ordering
+
+            # Then count deltas for total_events and total_event_bytes.
+            room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+                self.pos, max_pos
+            )
 
-                logger.info("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
+            for room_id, fields in room_count.items():
+                room_deltas.setdefault(room_id, {}).update(fields)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+            for user_id, fields in user_count.items():
+                user_deltas.setdefault(user_id, {}).update(fields)
 
-                event_processing_positions.labels("stats").set(self.pos)
+            logger.debug("room_deltas: %s", room_deltas)
+            logger.debug("user_deltas: %s", user_deltas)
+
+            # Always call this so that we update the stats position.
+            yield self.store.bulk_update_stats_delta(
+                self.clock.time_msec(),
+                updates={"room": room_deltas, "user": user_deltas},
+                stream_id=max_pos,
+            )
+
+            logger.debug("Handled room stats to %s -> %s", self.pos, max_pos)
+
+            event_processing_positions.labels("stats").set(max_pos)
+
+            self.pos = max_pos
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+
+        Returns:
+            Deferred[tuple[dict[str, Counter], dict[str, counter]]]
+            Resovles to two dicts, the room deltas and the user deltas,
+            mapping from room/user ID to changes in the various fields.
         """
-        Called with the state deltas to process
-        """
+
+        room_to_stats_deltas = {}
+        user_to_stats_deltas = {}
+
+        room_to_state_updates = {}
+
         for delta in deltas:
             typ = delta["type"]
             state_key = delta["state_key"]
@@ -115,11 +152,10 @@ class StatsHandler(StateDeltasHandler):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
 
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+            logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -131,203 +167,132 @@ class StatsHandler(StateDeltasHandler):
                 continue
 
             if event_id is None and prev_event_id is None:
-                # Errr...
+                logger.error(
+                    "event ID is None and so is the previous event ID. stream_id: %s",
+                    stream_id,
+                )
                 continue
 
             event_content = {}
 
+            sender = None
             if event_id is not None:
                 event = yield self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
+                    sender = event.sender
+
+            # All the values in this dict are deltas (RELATIVE changes)
+            room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
 
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            room_state = room_to_state_updates.setdefault(room_id, {})
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] += 1
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We take None rather than leave as a previous membership
+                # in the absence of a previous event because we do not want to
+                # reduce the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
-
-                if prev_membership == membership:
-                    continue
 
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
-                    )
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif membership == prev_membership:
+                    pass  # noop
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] -= 1
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
-                    )
+                    room_stats_delta["invited_members"] -= 1
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
-                    )
+                    room_stats_delta["left_members"] -= 1
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
-                    )
+                    room_stats_delta["banned_members"] -= 1
                 else:
-                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError(
+                        "%r is not a valid prev_membership" % (prev_membership,)
+                    )
 
+                if membership == prev_membership:
+                    pass  # noop
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
-                    )
+                    room_stats_delta["joined_members"] += 1
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
-                    )
+                    room_stats_delta["invited_members"] += 1
+
+                    if sender and self.is_mine_id(sender):
+                        user_to_stats_deltas.setdefault(sender, Counter())[
+                            "invites_sent"
+                        ] += 1
+
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
-                    )
+                    room_stats_delta["left_members"] += 1
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
-                    )
+                    room_stats_delta["banned_members"] += 1
                 else:
-                    err = "%s is not a valid membership" % (repr(membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError("%r is not a valid membership" % (membership,))
 
                 user_id = state_key
                 if self.is_mine_id(user_id):
-                    # update user_stats as it's one of our users
-                    public = yield self._is_public_room(room_id)
-
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
-                        )
+                    # this accounts for transitions like leave → ban and so on.
+                    has_changed_joinedness = (prev_membership == Membership.JOIN) != (
+                        membership == Membership.JOIN
+                    )
 
-            elif typ == EventTypes.Create:
-                # Newly created room. Add it with all blank portions.
-                yield self.store.update_room_state(
-                    room_id,
-                    {
-                        "join_rules": None,
-                        "history_visibility": None,
-                        "encryption": None,
-                        "name": None,
-                        "topic": None,
-                        "avatar": None,
-                        "canonical_alias": None,
-                    },
-                )
+                    if has_changed_joinedness:
+                        delta = +1 if membership == Membership.JOIN else -1
 
-            elif typ == EventTypes.JoinRules:
-                yield self.store.update_room_state(
-                    room_id, {"join_rules": event_content.get("join_rule")}
-                )
+                        user_to_stats_deltas.setdefault(user_id, Counter())[
+                            "joined_rooms"
+                        ] += delta
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
-                )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+                        room_stats_delta["local_users_in_room"] += delta
 
-            elif typ == EventTypes.RoomHistoryVisibility:
-                yield self.store.update_room_state(
-                    room_id,
-                    {"history_visibility": event_content.get("history_visibility")},
+            elif typ == EventTypes.Create:
+                room_state["is_federatable"] = (
+                    event_content.get("m.federate", True) is True
                 )
-
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                if sender and self.is_mine_id(sender):
+                    user_to_stats_deltas.setdefault(sender, Counter())[
+                        "rooms_created"
+                    ] += 1
+            elif typ == EventTypes.JoinRules:
+                room_state["join_rules"] = event_content.get("join_rule")
+            elif typ == EventTypes.RoomHistoryVisibility:
+                room_state["history_visibility"] = event_content.get(
+                    "history_visibility"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
-
             elif typ == EventTypes.Encryption:
-                yield self.store.update_room_state(
-                    room_id, {"encryption": event_content.get("algorithm")}
-                )
+                room_state["encryption"] = event_content.get("algorithm")
             elif typ == EventTypes.Name:
-                yield self.store.update_room_state(
-                    room_id, {"name": event_content.get("name")}
-                )
+                room_state["name"] = event_content.get("name")
             elif typ == EventTypes.Topic:
-                yield self.store.update_room_state(
-                    room_id, {"topic": event_content.get("topic")}
-                )
+                room_state["topic"] = event_content.get("topic")
             elif typ == EventTypes.RoomAvatar:
-                yield self.store.update_room_state(
-                    room_id, {"avatar": event_content.get("url")}
-                )
+                room_state["avatar"] = event_content.get("url")
             elif typ == EventTypes.CanonicalAlias:
-                yield self.store.update_room_state(
-                    room_id, {"canonical_alias": event_content.get("alias")}
-                )
-
-    @defer.inlineCallbacks
-    def update_public_room_stats(self, ts, room_id, is_public):
-        """
-        Increment/decrement a user's number of public rooms when a room they are
-        in changes to/from public visibility.
+                room_state["canonical_alias"] = event_content.get("alias")
+            elif typ == EventTypes.GuestAccess:
+                room_state["guest_access"] = event_content.get("guest_access")
 
-        Args:
-            ts (int): Timestamp in seconds
-            room_id (str)
-            is_public (bool)
-        """
-        # For now, blindly iterate over all local users in the room so that
-        # we can handle the whole problem of copying buckets over as needed
-        user_ids = yield self.store.get_users_in_room(room_id)
-
-        for user_id in user_ids:
-            if self.hs.is_mine(UserID.from_string(user_id)):
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
-                )
+        for room_id, state in room_to_state_updates.items():
+            yield self.store.update_room_state(room_id, state)
 
-    @defer.inlineCallbacks
-    def _is_public_room(self, room_id):
-        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
-        history_visibility = yield self.state.get_current_state(
-            room_id, EventTypes.RoomHistoryVisibility
-        )
-
-        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
-            (
-                history_visibility
-                and history_visibility.content.get("history_visibility")
-                == "world_readable"
-            )
-        ):
-            return True
-        else:
-            return False
+        return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 98da2318a0..19bca6717f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -378,7 +378,7 @@ class SyncHandler(object):
                 event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
-        return (now_token, ephemeral_by_room)
+        return now_token, ephemeral_by_room
 
     @defer.inlineCallbacks
     def _load_filtered_recents(
@@ -578,7 +578,6 @@ class SyncHandler(object):
 
         if not last_events:
             return None
-            return
 
         last_event = last_events[-1]
         state_ids = yield self.store.get_state_ids_for_event(
@@ -786,9 +785,8 @@ class SyncHandler(object):
                         batch.events[0].event_id, state_filter=state_filter
                     )
                 else:
-                    # Its not clear how we get here, but empirically we do
-                    # (#5407). Logging has been added elsewhere to try and
-                    # figure out where this state comes from.
+                    # We can get here if the user has ignored the senders of all
+                    # the recent events.
                     state_at_timeline_start = yield self.get_state_at(
                         room_id, stream_position=now_token, state_filter=state_filter
                     )
@@ -1333,7 +1331,7 @@ class SyncHandler(object):
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        return ([], [], [], [])
+                        return [], [], [], []
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id
@@ -1643,7 +1641,7 @@ class SyncHandler(object):
                 )
             room_entries.append(entry)
 
-        return (room_entries, invited, newly_joined_rooms, newly_left_rooms)
+        return room_entries, invited, newly_joined_rooms, newly_left_rooms
 
     @defer.inlineCallbacks
     def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1717,7 +1715,7 @@ class SyncHandler(object):
                     )
                 )
 
-        return (room_entries, invited, [])
+        return room_entries, invited, []
 
     @defer.inlineCallbacks
     def _generate_room_entry(
@@ -1771,20 +1769,9 @@ class SyncHandler(object):
             newly_joined_room=newly_joined,
         )
 
-        if not batch and batch.limited:
-            # This resulted in #5407, which is weird, so lets log! We do it
-            # here as we have the maximum amount of information.
-            user_id = sync_result_builder.sync_config.user.to_string()
-            logger.info(
-                "Issue #5407: Found limited batch with no events. user %s, room %s,"
-                " sync_config %s, newly_joined %s, events %s, batch %s.",
-                user_id,
-                room_id,
-                sync_config,
-                newly_joined,
-                events,
-                batch,
-            )
+        # Note: `batch` can be both empty and limited here in the case where
+        # `_load_filtered_recents` can't find any events the user should see
+        # (e.g. due to having ignored the sender of the last 50 events).
 
         if newly_joined:
             # debug for https://github.com/matrix-org/synapse/issues/4422
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f882330293..ca8ae9fb5b 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -319,4 +319,4 @@ class TypingNotificationEventSource(object):
         return self.get_typing_handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
-        return ([], pagination_config.from_key)
+        return [], pagination_config.from_key