summary refs log tree commit diff
diff options
context:
space:
mode:
authorNeil Johnson <neil@matrix.org>2019-06-06 17:37:08 +0100
committerNeil Johnson <neil@matrix.org>2019-06-06 17:37:08 +0100
commit7668b73676b2ee4e1c18690e158fbe1e9e76cf12 (patch)
tree17e634e9ee12f024e5e287758694e960bcefe8b3
parentuse DEFAULT_ROOM_VERSION (diff)
parentAdd ability to perform password reset via email without trusting the identity... (diff)
downloadsynapse-7668b73676b2ee4e1c18690e158fbe1e9e76cf12.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into neilj/default-room-version-v4
Diffstat (limited to '')
-rw-r--r--.buildkite/pipeline.yml4
-rw-r--r--INSTALL.md25
-rw-r--r--UPGRADE.rst49
-rw-r--r--changelog.d/5221.bugfix1
-rw-r--r--changelog.d/5369.bugfix1
-rw-r--r--changelog.d/5370.misc1
-rw-r--r--changelog.d/5371.feature1
-rw-r--r--changelog.d/5374.feature1
-rw-r--r--changelog.d/5377.feature1
-rw-r--r--docs/MSC1711_certificates_FAQ.md12
-rw-r--r--docs/sample_config.yaml103
-rw-r--r--synapse/api/errors.py9
-rwxr-xr-xsynapse/app/homeserver.py1
-rw-r--r--synapse/config/emailconfig.py153
-rw-r--r--synapse/config/key.py228
-rw-r--r--synapse/crypto/keyring.py72
-rw-r--r--synapse/handlers/auth.py64
-rw-r--r--synapse/handlers/identity.py13
-rw-r--r--synapse/handlers/presence.py8
-rw-r--r--synapse/push/mailer.py85
-rw-r--r--synapse/push/pusher.py4
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/res/templates/password_reset.html9
-rw-r--r--synapse/res/templates/password_reset.txt7
-rw-r--r--synapse/res/templates/password_reset_failure.html6
-rw-r--r--synapse/res/templates/password_reset_success.html6
-rw-r--r--synapse/rest/client/v2_alpha/account.py243
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/registration.py290
-rw-r--r--synapse/storage/schema/delta/55/track_threepid_validations.sql31
-rw-r--r--synapse/storage/stream.py12
-rw-r--r--tests/crypto/test_keyring.py43
-rw-r--r--tests/http/federation/test_matrix_federation_agent.py1
-rw-r--r--tests/utils.py1
35 files changed, 1317 insertions, 178 deletions
diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
index 44b258dca6..b805b2d839 100644
--- a/.buildkite/pipeline.yml
+++ b/.buildkite/pipeline.yml
@@ -36,8 +36,6 @@ steps:
           image: "python:3.6"
           propagate-environment: true
 
-  - wait
-
   - command:
       - "python -m pip install tox"
       - "tox -e check-sampleconfig"
@@ -46,6 +44,8 @@ steps:
       - docker#v3.0.1:
           image: "python:3.6"
 
+  - wait
+
   - command:
       - "python -m pip install tox"
       - "tox -e py27,codecov"
diff --git a/INSTALL.md b/INSTALL.md
index 1934593148..d3a450f40f 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -5,6 +5,7 @@
   * [Prebuilt packages](#prebuilt-packages)
 * [Setting up Synapse](#setting-up-synapse)
   * [TLS certificates](#tls-certificates)
+  * [Email](#email)
   * [Registering a user](#registering-a-user)
   * [Setting up a TURN server](#setting-up-a-turn-server)
   * [URL previews](#url-previews)
@@ -394,9 +395,31 @@ To configure Synapse to expose an HTTPS port, you will need to edit
   instance, if using certbot, use `fullchain.pem` as your certificate, not
   `cert.pem`).
 
-For those of you upgrading your TLS certificate in readiness for Synapse 1.0,
+For those of you upgrading your TLS certificate for Synapse 1.0 compliance,
 please take a look at [our guide](docs/MSC1711_certificates_FAQ.md#configuring-certificates-for-compatibility-with-synapse-100).
 
+## Email
+
+It is desirable for Synapse to have the capability to send email. For example,
+this is required to support the 'password reset' feature.
+
+To configure an SMTP server for Synapse, modify the configuration section
+headed ``email``, and be sure to have at least the ``smtp_host``, ``smtp_port``
+and ``notif_from`` fields filled out. You may also need to set ``smtp_user``,
+``smtp_pass``, and ``require_transport_security``..
+
+If Synapse is not configured with an SMTP server, password reset via email will
+ be disabled by default.
+
+Alternatively it is possible delegate the sending of email to the server's
+identity server. Doing so is convenient but not recommended, since a malicious
+or compromised identity server could theoretically hijack a given user's
+account by redirecting mail.
+
+If you are absolutely certain that you wish to use the server's identity server
+for password resets, set ``trust_identity_server_for_password_resets`` to
+``true`` under the ``email:`` configuration section.
+
 ## Registering a user
 
 You will need at least one user on your server in order to use a Matrix
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 228222d534..6032a505c9 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -49,6 +49,55 @@ returned by the Client-Server API:
     # configured on port 443.
     curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
 
+Upgrading to v1.0
+=================
+
+Validation of TLS certificates
+------------------------------
+
+Synapse v1.0 is the first release to enforce
+validation of TLS certificates for the federation API. It is therefore
+essential that your certificates are correctly configured. See the `FAQ
+<docs/MSC1711_certificates_FAQ.md>`_ for more information.
+
+Note, v1.0 installations will also no longer be able to federate with servers
+that have not correctly configured their certificates.
+
+In rare cases, it may be desirable to disable certificate checking: for
+example, it might be essential to be able to federate with a given legacy
+server in a closed federation. This can be done in one of two ways:-
+
+* Configure the global switch ``federation_verify_certificates`` to ``false``.
+* Configure a whitelist of server domains to trust via ``federation_certificate_verification_whitelist``.
+
+See the `sample configuration file <docs/sample_config.yaml>`_
+for more details on these settings. 
+
+Email
+-----
+When a user requests a password reset, Synapse will send an email to the 
+user to confirm the request.
+
+Previous versions of Synapse delegated the job of sending this email to an
+identity server. If the identity server was somehow malicious or became
+compromised, it would be theoretically possible to hijack an account through
+this means.
+
+Therefore, by default, Synapse v1.0 will send the confirmation email itself. If
+Synapse is not configured with an SMTP server, password reset via email will be
+disabled.
+
+To configure an SMTP server for Synapse, modify the configuration section
+headed ``email``, and be sure to have at least the ``smtp_host``, ``smtp_port``
+and ``notif_from`` fields filled out. You may also need to set ``smtp_user``,
+``smtp_pass``, and ``require_transport_security``.
+
+If you are absolutely certain that you wish to continue using an identity
+server for password resets, set ``trust_identity_server_for_password_resets`` to ``true``.
+
+See the `sample configuration file <docs/sample_config.yaml>`_
+for more details on these settings.
+
 Upgrading to v0.99.0
 ====================
 
diff --git a/changelog.d/5221.bugfix b/changelog.d/5221.bugfix
new file mode 100644
index 0000000000..03aa363d15
--- /dev/null
+++ b/changelog.d/5221.bugfix
@@ -0,0 +1 @@
+Fix race when backfilling in rooms with worker mode.
diff --git a/changelog.d/5369.bugfix b/changelog.d/5369.bugfix
new file mode 100644
index 0000000000..cc61618f39
--- /dev/null
+++ b/changelog.d/5369.bugfix
@@ -0,0 +1 @@
+Fix missing logcontext warnings on shutdown.
diff --git a/changelog.d/5370.misc b/changelog.d/5370.misc
new file mode 100644
index 0000000000..b0473ef280
--- /dev/null
+++ b/changelog.d/5370.misc
@@ -0,0 +1 @@
+Don't run CI build checks until sample config check has passed.
diff --git a/changelog.d/5371.feature b/changelog.d/5371.feature
new file mode 100644
index 0000000000..7f960630e0
--- /dev/null
+++ b/changelog.d/5371.feature
@@ -0,0 +1 @@
+Update upgrade and installation guides ahead of 1.0.
diff --git a/changelog.d/5374.feature b/changelog.d/5374.feature
new file mode 100644
index 0000000000..17937637ab
--- /dev/null
+++ b/changelog.d/5374.feature
@@ -0,0 +1 @@
+Replace the `perspectives` configuration section with `trusted_key_servers`, and make validating the signatures on responses optional (since TLS will do this job for us).
diff --git a/changelog.d/5377.feature b/changelog.d/5377.feature
new file mode 100644
index 0000000000..6aae41847a
--- /dev/null
+++ b/changelog.d/5377.feature
@@ -0,0 +1 @@
+Add ability to perform password reset via email without trusting the identity server.
diff --git a/docs/MSC1711_certificates_FAQ.md b/docs/MSC1711_certificates_FAQ.md
index 37f7f669c9..599462bdcb 100644
--- a/docs/MSC1711_certificates_FAQ.md
+++ b/docs/MSC1711_certificates_FAQ.md
@@ -68,16 +68,14 @@ Admins should upgrade and configure a valid CA cert. Homeservers that require a
 .well-known entry (see below), should retain their SRV record and use it
 alongside their .well-known record.
 
-**>= 5th March 2019  - Synapse 1.0.0 is released**
+**10th June 2019  - Synapse 1.0.0 is released**
 
-1.0.0 will land no sooner than 1 month after 0.99.0, leaving server admins one
-month after 5th February to upgrade to 0.99.0 and deploy their certificates. In
+1.0.0 is scheduled for release on 10th June. In
 accordance with the the [S2S spec](https://matrix.org/docs/spec/server_server/r0.1.0.html)
 1.0.0 will enforce certificate validity. This means that any homeserver without a
 valid certificate after this point will no longer be able to federate with
 1.0.0 servers.
 
-
 ## Configuring certificates for compatibility with Synapse 1.0.0
 
 ### If you do not currently have an SRV record
@@ -146,9 +144,9 @@ You can do this with a `.well-known` file as follows:
     with Synapse 0.34 and earlier.
 
   2. Give Synapse a certificate corresponding to the target domain
-    (`customer.example.net` in the above example). You can either use Synapse's 
-    built-in [ACME support](./ACME.md) for this (via the `domain` parameter in 
-    the `acme` section), or acquire a certificate yourself and give it to 
+    (`customer.example.net` in the above example). You can either use Synapse's
+    built-in [ACME support](./ACME.md) for this (via the `domain` parameter in
+    the `acme` section), or acquire a certificate yourself and give it to
     Synapse via `tls_certificate_path` and `tls_private_key_path`.
 
  3. Restart Synapse to ensure the new certificate is loaded.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 2f37e71601..ea73306fb9 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -952,12 +952,43 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
 
 # The trusted servers to download signing keys from.
 #
-#perspectives:
-#  servers:
-#    "matrix.org":
-#      verify_keys:
-#        "ed25519:auto":
-#          key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+# When we need to fetch a signing key, each server is tried in parallel.
+#
+# Normally, the connection to the key server is validated via TLS certificates.
+# Additional security can be provided by configuring a `verify key`, which
+# will make synapse check that the response is signed by that key.
+#
+# This setting supercedes an older setting named `perspectives`. The old format
+# is still supported for backwards-compatibility, but it is deprecated.
+#
+# Options for each entry in the list include:
+#
+#    server_name: the name of the server. required.
+#
+#    verify_keys: an optional map from key id to base64-encoded public key.
+#       If specified, we will check that the response is signed by at least
+#       one of the given keys.
+#
+#    accept_keys_insecurely: a boolean. Normally, if `verify_keys` is unset,
+#       and federation_verify_certificates is not `true`, synapse will refuse
+#       to start, because this would allow anyone who can spoof DNS responses
+#       to masquerade as the trusted key server. If you know what you are doing
+#       and are sure that your network environment provides a secure connection
+#       to the key server, you can set this to `true` to override this
+#       behaviour.
+#
+# An example configuration might look like:
+#
+#trusted_key_servers:
+#  - server_name: "my_trusted_server.example.com"
+#    verify_keys:
+#      "ed25519:auto": "abcdefghijklmnopqrstuvwxyzabcdefghijklmopqr"
+#  - server_name: "my_other_trusted_server.example.com"
+#
+# The default configuration is:
+#
+#trusted_key_servers:
+#  - server_name: "matrix.org"
 
 
 # Enable SAML2 for registration and login. Uses pysaml2.
@@ -1034,10 +1065,8 @@ password_config:
 
 
 
-# Enable sending emails for notification events or expiry notices
-# Defining a custom URL for Riot is only needed if email notifications
-# should contain links to a self-hosted installation of Riot; when set
-# the "app_name" setting is ignored.
+# Enable sending emails for password resets, notification events or
+# account expiry notices
 #
 # If your SMTP server requires authentication, the optional smtp_user &
 # smtp_pass variables should be used
@@ -1045,22 +1074,64 @@ password_config:
 #email:
 #   enable_notifs: false
 #   smtp_host: "localhost"
-#   smtp_port: 25
+#   smtp_port: 25 # SSL: 465, STARTTLS: 587
 #   smtp_user: "exampleusername"
 #   smtp_pass: "examplepassword"
 #   require_transport_security: False
 #   notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
 #   app_name: Matrix
-#   # if template_dir is unset, uses the example templates that are part of
-#   # the Synapse distribution.
+#
+#   # Enable email notifications by default
+#   notif_for_new_users: True
+#
+#   # Defining a custom URL for Riot is only needed if email notifications
+#   # should contain links to a self-hosted installation of Riot; when set
+#   # the "app_name" setting is ignored
+#   riot_base_url: "http://localhost/riot"
+#
+#   # Enable sending password reset emails via the configured, trusted
+#   # identity servers
+#   #
+#   # IMPORTANT! This will give a malicious or overtaken identity server
+#   # the ability to reset passwords for your users! Make absolutely sure
+#   # that you want to do this! It is strongly recommended that password
+#   # reset emails be sent by the homeserver instead
+#   #
+#   # If this option is set to false and SMTP options have not been
+#   # configured, resetting user passwords via email will be disabled
+#   #trust_identity_server_for_password_resets: false
+#
+#   # Configure the time that a validation email or text message code
+#   # will expire after sending
+#   #
+#   # This is currently used for password resets
+#   #validation_token_lifetime: 1h
+#
+#   # Template directory. All template files should be stored within this
+#   # directory
+#   #
 #   #template_dir: res/templates
+#
+#   # Templates for email notifications
+#   #
 #   notif_template_html: notif_mail.html
 #   notif_template_text: notif_mail.txt
-#   # Templates for account expiry notices.
+#
+#   # Templates for account expiry notices
+#   #
 #   expiry_template_html: notice_expiry.html
 #   expiry_template_text: notice_expiry.txt
-#   notif_for_new_users: True
-#   riot_base_url: "http://localhost/riot"
+#
+#   # Templates for password reset emails sent by the homeserver
+#   #
+#   #password_reset_template_html: password_reset.html
+#   #password_reset_template_text: password_reset.txt
+#
+#   # Templates for password reset success and failure pages that a user
+#   # will see after attempting to reset their password
+#   #
+#   #password_reset_template_success_html: password_reset_success.html
+#   #password_reset_template_failure_html: password_reset_failure.html
 
 
 #password_providers:
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index e91697049c..66201d6efe 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -339,6 +339,15 @@ class UnsupportedRoomVersionError(SynapseError):
         )
 
 
+class ThreepidValidationError(SynapseError):
+    """An error raised when there was a problem authorising an event."""
+
+    def __init__(self, *args, **kwargs):
+        if "errcode" not in kwargs:
+            kwargs["errcode"] = Codes.FORBIDDEN
+        super(ThreepidValidationError, self).__init__(*args, **kwargs)
+
+
 class IncompatibleRoomVersionError(SynapseError):
     """A server is trying to join a room whose version it does not support.
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 1045d28949..df524a23dd 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -176,6 +176,7 @@ class SynapseHomeServer(HomeServer):
 
             resources.update({
                 "/_matrix/client/api/v1": client_resource,
+                "/_synapse/password_reset": client_resource,
                 "/_matrix/client/r0": client_resource,
                 "/_matrix/client/unstable": client_resource,
                 "/_matrix/client/v2_alpha": client_resource,
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 8400471f40..ae04252906 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -50,6 +50,11 @@ class EmailConfig(Config):
         else:
             self.email_app_name = "Matrix"
 
+        # TODO: Rename notif_from to something more generic, or have a separate
+        # from for password resets, message notifications, etc?
+        # Currently the email section is a bit bogged down with settings for
+        # multiple functions. Would be good to split it out into separate
+        # sections and only put the common ones under email:
         self.email_notif_from = email_config.get("notif_from", None)
         if self.email_notif_from is not None:
             # make sure it's valid
@@ -74,7 +79,28 @@ class EmailConfig(Config):
             "account_validity", {},
         ).get("renew_at")
 
-        if self.email_enable_notifs or account_validity_renewal_enabled:
+        email_trust_identity_server_for_password_resets = email_config.get(
+            "trust_identity_server_for_password_resets", False,
+        )
+        self.email_password_reset_behaviour = (
+            "remote" if email_trust_identity_server_for_password_resets else "local"
+        )
+        if self.email_password_reset_behaviour == "local" and email_config == {}:
+            logger.warn(
+                "User password resets have been disabled due to lack of email config"
+            )
+            self.email_password_reset_behaviour = "off"
+
+        # Get lifetime of a validation token in milliseconds
+        self.email_validation_token_lifetime = self.parse_duration(
+            email_config.get("validation_token_lifetime", "1h")
+        )
+
+        if (
+            self.email_enable_notifs
+            or account_validity_renewal_enabled
+            or self.email_password_reset_behaviour == "local"
+        ):
             # make sure we can import the required deps
             import jinja2
             import bleach
@@ -82,6 +108,67 @@ class EmailConfig(Config):
             jinja2
             bleach
 
+        if self.email_password_reset_behaviour == "local":
+            required = [
+                "smtp_host",
+                "smtp_port",
+                "notif_from",
+            ]
+
+            missing = []
+            for k in required:
+                if k not in email_config:
+                    missing.append(k)
+
+            if (len(missing) > 0):
+                raise RuntimeError(
+                    "email.password_reset_behaviour is set to 'local' "
+                    "but required keys are missing: %s" %
+                    (", ".join(["email." + k for k in missing]),)
+                )
+
+            # Templates for password reset emails
+            self.email_password_reset_template_html = email_config.get(
+                "password_reset_template_html", "password_reset.html",
+            )
+            self.email_password_reset_template_text = email_config.get(
+                "password_reset_template_text", "password_reset.txt",
+            )
+            self.email_password_reset_failure_template = email_config.get(
+                "password_reset_failure_template", "password_reset_failure.html",
+            )
+            # This template does not support any replaceable variables, so we will
+            # read it from the disk once during setup
+            email_password_reset_success_template = email_config.get(
+                "password_reset_success_template", "password_reset_success.html",
+            )
+
+            # Check templates exist
+            for f in [self.email_password_reset_template_html,
+                      self.email_password_reset_template_text,
+                      self.email_password_reset_failure_template,
+                      email_password_reset_success_template]:
+                p = os.path.join(self.email_template_dir, f)
+                if not os.path.isfile(p):
+                    raise ConfigError("Unable to find template file %s" % (p, ))
+
+            # Retrieve content of web templates
+            filepath = os.path.join(
+                self.email_template_dir,
+                email_password_reset_success_template,
+            )
+            self.email_password_reset_success_html_content = self.read_file(
+                filepath,
+                "email.password_reset_template_success_html",
+            )
+
+            if config.get("public_baseurl") is None:
+                raise RuntimeError(
+                    "email.password_reset_behaviour is set to 'local' but no "
+                    "public_baseurl is set. This is necessary to generate password "
+                    "reset links"
+                )
+
         if self.email_enable_notifs:
             required = [
                 "smtp_host",
@@ -121,10 +208,6 @@ class EmailConfig(Config):
             self.email_riot_base_url = email_config.get(
                 "riot_base_url", None
             )
-        else:
-            self.email_enable_notifs = False
-            # Not much point setting defaults for the rest: it would be an
-            # error for them to be used.
 
         if account_validity_renewal_enabled:
             self.email_expiry_template_html = email_config.get(
@@ -141,10 +224,8 @@ class EmailConfig(Config):
 
     def default_config(self, config_dir_path, server_name, **kwargs):
         return """
-        # Enable sending emails for notification events or expiry notices
-        # Defining a custom URL for Riot is only needed if email notifications
-        # should contain links to a self-hosted installation of Riot; when set
-        # the "app_name" setting is ignored.
+        # Enable sending emails for password resets, notification events or
+        # account expiry notices
         #
         # If your SMTP server requires authentication, the optional smtp_user &
         # smtp_pass variables should be used
@@ -152,20 +233,62 @@ class EmailConfig(Config):
         #email:
         #   enable_notifs: false
         #   smtp_host: "localhost"
-        #   smtp_port: 25
+        #   smtp_port: 25 # SSL: 465, STARTTLS: 587
         #   smtp_user: "exampleusername"
         #   smtp_pass: "examplepassword"
         #   require_transport_security: False
         #   notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>"
         #   app_name: Matrix
-        #   # if template_dir is unset, uses the example templates that are part of
-        #   # the Synapse distribution.
+        #
+        #   # Enable email notifications by default
+        #   notif_for_new_users: True
+        #
+        #   # Defining a custom URL for Riot is only needed if email notifications
+        #   # should contain links to a self-hosted installation of Riot; when set
+        #   # the "app_name" setting is ignored
+        #   riot_base_url: "http://localhost/riot"
+        #
+        #   # Enable sending password reset emails via the configured, trusted
+        #   # identity servers
+        #   #
+        #   # IMPORTANT! This will give a malicious or overtaken identity server
+        #   # the ability to reset passwords for your users! Make absolutely sure
+        #   # that you want to do this! It is strongly recommended that password
+        #   # reset emails be sent by the homeserver instead
+        #   #
+        #   # If this option is set to false and SMTP options have not been
+        #   # configured, resetting user passwords via email will be disabled
+        #   #trust_identity_server_for_password_resets: false
+        #
+        #   # Configure the time that a validation email or text message code
+        #   # will expire after sending
+        #   #
+        #   # This is currently used for password resets
+        #   #validation_token_lifetime: 1h
+        #
+        #   # Template directory. All template files should be stored within this
+        #   # directory
+        #   #
         #   #template_dir: res/templates
+        #
+        #   # Templates for email notifications
+        #   #
         #   notif_template_html: notif_mail.html
         #   notif_template_text: notif_mail.txt
-        #   # Templates for account expiry notices.
+        #
+        #   # Templates for account expiry notices
+        #   #
         #   expiry_template_html: notice_expiry.html
         #   expiry_template_text: notice_expiry.txt
-        #   notif_for_new_users: True
-        #   riot_base_url: "http://localhost/riot"
+        #
+        #   # Templates for password reset emails sent by the homeserver
+        #   #
+        #   #password_reset_template_html: password_reset.html
+        #   #password_reset_template_text: password_reset.txt
+        #
+        #   # Templates for password reset success and failure pages that a user
+        #   # will see after attempting to reset their password
+        #   #
+        #   #password_reset_template_success_html: password_reset_success.html
+        #   #password_reset_template_failure_html: password_reset_failure.html
         """
diff --git a/synapse/config/key.py b/synapse/config/key.py
index eb10259818..aba7092ccd 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,6 +18,8 @@ import hashlib
 import logging
 import os
 
+import attr
+import jsonschema
 from signedjson.key import (
     NACL_ED25519,
     decode_signing_key_base64,
@@ -32,11 +35,27 @@ from synapse.util.stringutils import random_string, random_string_with_symbols
 
 from ._base import Config, ConfigError
 
+INSECURE_NOTARY_ERROR = """\
+Your server is configured to accept key server responses without signature
+validation or TLS certificate validation. This is likely to be very insecure. If
+you are *sure* you want to do this, set 'accept_keys_insecurely' on the
+keyserver configuration."""
+
+
 logger = logging.getLogger(__name__)
 
 
-class KeyConfig(Config):
+@attr.s
+class TrustedKeyServer(object):
+    # string: name of the server.
+    server_name = attr.ib()
 
+    # dict[str,VerifyKey]|None: map from key id to key object, or None to disable
+    # signature verification.
+    verify_keys = attr.ib(default=None)
+
+
+class KeyConfig(Config):
     def read_config(self, config):
         # the signing key can be specified inline or in a separate file
         if "signing_key" in config:
@@ -49,16 +68,27 @@ class KeyConfig(Config):
             config.get("old_signing_keys", {})
         )
         self.key_refresh_interval = self.parse_duration(
-            config.get("key_refresh_interval", "1d"),
+            config.get("key_refresh_interval", "1d")
         )
-        self.perspectives = self.read_perspectives(
-            config.get("perspectives", {}).get("servers", {
-                "matrix.org": {"verify_keys": {
-                    "ed25519:auto": {
-                        "key": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw",
-                    }
-                }}
-            })
+
+        # if neither trusted_key_servers nor perspectives are given, use the default.
+        if "perspectives" not in config and "trusted_key_servers" not in config:
+            key_servers = [{"server_name": "matrix.org"}]
+        else:
+            key_servers = config.get("trusted_key_servers", [])
+
+            if not isinstance(key_servers, list):
+                raise ConfigError(
+                    "trusted_key_servers, if given, must be a list, not a %s"
+                    % (type(key_servers).__name__,)
+                )
+
+            # merge the 'perspectives' config into the 'trusted_key_servers' config.
+            key_servers.extend(_perspectives_to_key_servers(config))
+
+        # list of TrustedKeyServer objects
+        self.key_servers = list(
+            _parse_key_servers(key_servers, self.federation_verify_certificates)
         )
 
         self.macaroon_secret_key = config.get(
@@ -78,8 +108,9 @@ class KeyConfig(Config):
         # falsification of values
         self.form_secret = config.get("form_secret", None)
 
-    def default_config(self, config_dir_path, server_name, generate_secrets=False,
-                       **kwargs):
+    def default_config(
+        self, config_dir_path, server_name, generate_secrets=False, **kwargs
+    ):
         base_key_name = os.path.join(config_dir_path, server_name)
 
         if generate_secrets:
@@ -91,7 +122,8 @@ class KeyConfig(Config):
             macaroon_secret_key = "# macaroon_secret_key: <PRIVATE STRING>"
             form_secret = "# form_secret: <PRIVATE STRING>"
 
-        return """\
+        return (
+            """\
         # a secret which is used to sign access tokens. If none is specified,
         # the registration_shared_secret is used, if one is given; otherwise,
         # a secret key is derived from the signing key.
@@ -133,33 +165,53 @@ class KeyConfig(Config):
 
         # The trusted servers to download signing keys from.
         #
-        #perspectives:
-        #  servers:
-        #    "matrix.org":
-        #      verify_keys:
-        #        "ed25519:auto":
-        #          key: "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
-        """ % locals()
-
-    def read_perspectives(self, perspectives_servers):
-        servers = {}
-        for server_name, server_config in perspectives_servers.items():
-            for key_id, key_data in server_config["verify_keys"].items():
-                if is_signing_algorithm_supported(key_id):
-                    key_base64 = key_data["key"]
-                    key_bytes = decode_base64(key_base64)
-                    verify_key = decode_verify_key_bytes(key_id, key_bytes)
-                    servers.setdefault(server_name, {})[key_id] = verify_key
-        return servers
+        # When we need to fetch a signing key, each server is tried in parallel.
+        #
+        # Normally, the connection to the key server is validated via TLS certificates.
+        # Additional security can be provided by configuring a `verify key`, which
+        # will make synapse check that the response is signed by that key.
+        #
+        # This setting supercedes an older setting named `perspectives`. The old format
+        # is still supported for backwards-compatibility, but it is deprecated.
+        #
+        # Options for each entry in the list include:
+        #
+        #    server_name: the name of the server. required.
+        #
+        #    verify_keys: an optional map from key id to base64-encoded public key.
+        #       If specified, we will check that the response is signed by at least
+        #       one of the given keys.
+        #
+        #    accept_keys_insecurely: a boolean. Normally, if `verify_keys` is unset,
+        #       and federation_verify_certificates is not `true`, synapse will refuse
+        #       to start, because this would allow anyone who can spoof DNS responses
+        #       to masquerade as the trusted key server. If you know what you are doing
+        #       and are sure that your network environment provides a secure connection
+        #       to the key server, you can set this to `true` to override this
+        #       behaviour.
+        #
+        # An example configuration might look like:
+        #
+        #trusted_key_servers:
+        #  - server_name: "my_trusted_server.example.com"
+        #    verify_keys:
+        #      "ed25519:auto": "abcdefghijklmnopqrstuvwxyzabcdefghijklmopqr"
+        #  - server_name: "my_other_trusted_server.example.com"
+        #
+        # The default configuration is:
+        #
+        #trusted_key_servers:
+        #  - server_name: "matrix.org"
+        """
+            % locals()
+        )
 
     def read_signing_key(self, signing_key_path):
         signing_keys = self.read_file(signing_key_path, "signing_key")
         try:
             return read_signing_keys(signing_keys.splitlines(True))
         except Exception as e:
-            raise ConfigError(
-                "Error reading signing_key: %s" % (str(e))
-            )
+            raise ConfigError("Error reading signing_key: %s" % (str(e)))
 
     def read_old_signing_keys(self, old_signing_keys):
         keys = {}
@@ -182,9 +234,7 @@ class KeyConfig(Config):
         if not self.path_exists(signing_key_path):
             with open(signing_key_path, "w") as signing_key_file:
                 key_id = "a_" + random_string(4)
-                write_signing_keys(
-                    signing_key_file, (generate_signing_key(key_id),),
-                )
+                write_signing_keys(signing_key_file, (generate_signing_key(key_id),))
         else:
             signing_keys = self.read_file(signing_key_path, "signing_key")
             if len(signing_keys.split("\n")[0].split()) == 1:
@@ -194,6 +244,106 @@ class KeyConfig(Config):
                     NACL_ED25519, key_id, signing_keys.split("\n")[0]
                 )
                 with open(signing_key_path, "w") as signing_key_file:
-                    write_signing_keys(
-                        signing_key_file, (key,),
+                    write_signing_keys(signing_key_file, (key,))
+
+
+def _perspectives_to_key_servers(config):
+    """Convert old-style 'perspectives' configs into new-style 'trusted_key_servers'
+
+    Returns an iterable of entries to add to trusted_key_servers.
+    """
+
+    # 'perspectives' looks like:
+    #
+    # {
+    #     "servers": {
+    #         "matrix.org": {
+    #             "verify_keys": {
+    #                 "ed25519:auto": {
+    #                     "key": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+    #                 }
+    #             }
+    #         }
+    #     }
+    # }
+    #
+    # 'trusted_keys' looks like:
+    #
+    # [
+    #     {
+    #         "server_name": "matrix.org",
+    #         "verify_keys": {
+    #             "ed25519:auto": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw",
+    #         }
+    #     }
+    # ]
+
+    perspectives_servers = config.get("perspectives", {}).get("servers", {})
+
+    for server_name, server_opts in perspectives_servers.items():
+        trusted_key_server_entry = {"server_name": server_name}
+        verify_keys = server_opts.get("verify_keys")
+        if verify_keys is not None:
+            trusted_key_server_entry["verify_keys"] = {
+                key_id: key_data["key"] for key_id, key_data in verify_keys.items()
+            }
+        yield trusted_key_server_entry
+
+
+TRUSTED_KEY_SERVERS_SCHEMA = {
+    "$schema": "http://json-schema.org/draft-04/schema#",
+    "description": "schema for the trusted_key_servers setting",
+    "type": "array",
+    "items": {
+        "type": "object",
+        "properties": {
+            "server_name": {"type": "string"},
+            "verify_keys": {
+                "type": "object",
+                # each key must be a base64 string
+                "additionalProperties": {"type": "string"},
+            },
+        },
+        "required": ["server_name"],
+    },
+}
+
+
+def _parse_key_servers(key_servers, federation_verify_certificates):
+    try:
+        jsonschema.validate(key_servers, TRUSTED_KEY_SERVERS_SCHEMA)
+    except jsonschema.ValidationError as e:
+        raise ConfigError("Unable to parse 'trusted_key_servers': " + e.message)
+
+    for server in key_servers:
+        server_name = server["server_name"]
+        result = TrustedKeyServer(server_name=server_name)
+
+        verify_keys = server.get("verify_keys")
+        if verify_keys is not None:
+            result.verify_keys = {}
+            for key_id, key_base64 in verify_keys.items():
+                if not is_signing_algorithm_supported(key_id):
+                    raise ConfigError(
+                        "Unsupported signing algorithm on key %s for server %s in "
+                        "trusted_key_servers" % (key_id, server_name)
                     )
+                try:
+                    key_bytes = decode_base64(key_base64)
+                    verify_key = decode_verify_key_bytes(key_id, key_bytes)
+                except Exception as e:
+                    raise ConfigError(
+                        "Unable to parse key %s for server %s in "
+                        "trusted_key_servers: %s" % (key_id, server_name, e)
+                    )
+
+                result.verify_keys[key_id] = verify_key
+
+        if (
+            not verify_keys
+            and not server.get("accept_keys_insecurely")
+            and not federation_verify_certificates
+        ):
+            raise ConfigError(INSECURE_NOTARY_ERROR)
+
+        yield result
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 2b6b5913bc..96964b0d50 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -585,25 +585,27 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
         super(PerspectivesKeyFetcher, self).__init__(hs)
         self.clock = hs.get_clock()
         self.client = hs.get_http_client()
-        self.perspective_servers = self.config.perspectives
+        self.key_servers = self.config.key_servers
 
     @defer.inlineCallbacks
     def get_keys(self, keys_to_fetch):
         """see KeyFetcher.get_keys"""
 
         @defer.inlineCallbacks
-        def get_key(perspective_name, perspective_keys):
+        def get_key(key_server):
             try:
                 result = yield self.get_server_verify_key_v2_indirect(
-                    keys_to_fetch, perspective_name, perspective_keys
+                    keys_to_fetch, key_server
                 )
                 defer.returnValue(result)
             except KeyLookupError as e:
-                logger.warning("Key lookup failed from %r: %s", perspective_name, e)
+                logger.warning(
+                    "Key lookup failed from %r: %s", key_server.server_name, e
+                )
             except Exception as e:
                 logger.exception(
                     "Unable to get key from %r: %s %s",
-                    perspective_name,
+                    key_server.server_name,
                     type(e).__name__,
                     str(e),
                 )
@@ -613,8 +615,8 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
         results = yield logcontext.make_deferred_yieldable(
             defer.gatherResults(
                 [
-                    run_in_background(get_key, p_name, p_keys)
-                    for p_name, p_keys in self.perspective_servers.items()
+                    run_in_background(get_key, server)
+                    for server in self.key_servers
                 ],
                 consumeErrors=True,
             ).addErrback(unwrapFirstError)
@@ -629,17 +631,15 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
 
     @defer.inlineCallbacks
     def get_server_verify_key_v2_indirect(
-        self, keys_to_fetch, perspective_name, perspective_keys
+        self, keys_to_fetch, key_server
     ):
         """
         Args:
             keys_to_fetch (dict[str, dict[str, int]]):
                 the keys to be fetched. server_name -> key_id -> min_valid_ts
 
-            perspective_name (str): name of the notary server to query for the keys
-
-            perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
-                notary server
+            key_server (synapse.config.key.TrustedKeyServer): notary server to query for
+                the keys
 
         Returns:
             Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult]]]: map
@@ -649,6 +649,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
             KeyLookupError if there was an error processing the entire response from
                 the server
         """
+        perspective_name = key_server.server_name
         logger.info(
             "Requesting keys %s from notary server %s",
             keys_to_fetch.items(),
@@ -689,11 +690,13 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                 )
 
             try:
-                processed_response = yield self._process_perspectives_response(
-                    perspective_name,
-                    perspective_keys,
+                self._validate_perspectives_response(
+                    key_server,
                     response,
-                    time_added_ms=time_now_ms,
+                )
+
+                processed_response = yield self.process_v2_response(
+                    perspective_name, response, time_added_ms=time_now_ms
                 )
             except KeyLookupError as e:
                 logger.warning(
@@ -717,28 +720,24 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
 
         defer.returnValue(keys)
 
-    def _process_perspectives_response(
-        self, perspective_name, perspective_keys, response, time_added_ms
+    def _validate_perspectives_response(
+        self, key_server, response,
     ):
-        """Parse a 'Server Keys' structure from the result of a /key/query request
-
-        Checks that the entry is correctly signed by the perspectives server, and then
-        passes over to process_v2_response
+        """Optionally check the signature on the result of a /key/query request
 
         Args:
-            perspective_name (str): the name of the notary server that produced this
-                result
-
-            perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
-                notary server
+            key_server (synapse.config.key.TrustedKeyServer): the notary server that
+                produced this result
 
             response (dict): the json-decoded Server Keys response object
+        """
+        perspective_name = key_server.server_name
+        perspective_keys = key_server.verify_keys
 
-            time_added_ms (int): the timestamp to record in server_keys_json
+        if perspective_keys is None:
+            # signature checking is disabled on this server
+            return
 
-        Returns:
-            Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
-        """
         if (
             u"signatures" not in response
             or perspective_name not in response[u"signatures"]
@@ -751,6 +750,13 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                 verify_signed_json(response, perspective_name, perspective_keys[key_id])
                 verified = True
 
+                if perspective_name == "matrix.org" and key_id == "ed25519:auto":
+                    logger.warning(
+                        "Trusting trusted_key_server responses signed by the "
+                        "compromised matrix.org signing key 'ed25519:auto'. "
+                        "This is a placebo."
+                    )
+
         if not verified:
             raise KeyLookupError(
                 "Response not signed with a known key: signed with: %r, known keys: %r"
@@ -760,10 +766,6 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                 )
             )
 
-        return self.process_v2_response(
-            perspective_name, response, time_added_ms=time_added_ms
-        )
-
 
 class ServerKeyFetcher(BaseV2KeyFetcher):
     """KeyFetcher impl which fetches keys from the origin servers"""
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index aa5d89a9ac..7f8ddc99c6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
         defer.returnValue(params)
 
     @defer.inlineCallbacks
-    def check_auth(self, flows, clientdict, clientip):
+    def check_auth(self, flows, clientdict, clientip, password_servlet=False):
         """
         Takes a dictionary sent by the client in the login / registration
         protocol and handles the User-Interactive Auth flow.
@@ -186,6 +186,16 @@ class AuthHandler(BaseHandler):
 
             clientip (str): The IP address of the client.
 
+            password_servlet (bool): Whether the request originated from
+                PasswordRestServlet.
+                XXX: This is a temporary hack to distinguish between checking
+                for threepid validations locally (in the case of password
+                resets) and using the identity server (in the case of binding
+                a 3PID during registration). Once we start using the
+                homeserver for both tasks, this distinction will no longer be
+                necessary.
+
+
         Returns:
             defer.Deferred[dict, dict, str]: a deferred tuple of
                 (creds, params, session_id).
@@ -241,7 +251,9 @@ class AuthHandler(BaseHandler):
         if 'type' in authdict:
             login_type = authdict['type']
             try:
-                result = yield self._check_auth_dict(authdict, clientip)
+                result = yield self._check_auth_dict(
+                    authdict, clientip, password_servlet=password_servlet,
+                )
                 if result:
                     creds[login_type] = result
                     self._save_session(session)
@@ -351,7 +363,7 @@ class AuthHandler(BaseHandler):
         return sess.setdefault('serverdict', {}).get(key, default)
 
     @defer.inlineCallbacks
-    def _check_auth_dict(self, authdict, clientip):
+    def _check_auth_dict(self, authdict, clientip, password_servlet=False):
         """Attempt to validate the auth dict provided by a client
 
         Args:
@@ -369,7 +381,13 @@ class AuthHandler(BaseHandler):
         login_type = authdict['type']
         checker = self.checkers.get(login_type)
         if checker is not None:
-            res = yield checker(authdict, clientip)
+            # XXX: Temporary workaround for having Synapse handle password resets
+            # See AuthHandler.check_auth for further details
+            res = yield checker(
+                authdict,
+                clientip=clientip,
+                password_servlet=password_servlet,
+            )
             defer.returnValue(res)
 
         # build a v1-login-style dict out of the authdict and fall back to the
@@ -383,7 +401,7 @@ class AuthHandler(BaseHandler):
         defer.returnValue(canonical_id)
 
     @defer.inlineCallbacks
-    def _check_recaptcha(self, authdict, clientip):
+    def _check_recaptcha(self, authdict, clientip, **kwargs):
         try:
             user_response = authdict["response"]
         except KeyError:
@@ -429,20 +447,20 @@ class AuthHandler(BaseHandler):
                 defer.returnValue(True)
         raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-    def _check_email_identity(self, authdict, _):
-        return self._check_threepid('email', authdict)
+    def _check_email_identity(self, authdict, **kwargs):
+        return self._check_threepid('email', authdict, **kwargs)
 
-    def _check_msisdn(self, authdict, _):
+    def _check_msisdn(self, authdict, **kwargs):
         return self._check_threepid('msisdn', authdict)
 
-    def _check_dummy_auth(self, authdict, _):
+    def _check_dummy_auth(self, authdict, **kwargs):
         return defer.succeed(True)
 
-    def _check_terms_auth(self, authdict, _):
+    def _check_terms_auth(self, authdict, **kwargs):
         return defer.succeed(True)
 
     @defer.inlineCallbacks
-    def _check_threepid(self, medium, authdict):
+    def _check_threepid(self, medium, authdict, password_servlet=False, **kwargs):
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -451,7 +469,29 @@ class AuthHandler(BaseHandler):
         identity_handler = self.hs.get_handlers().identity_handler
 
         logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
-        threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+        if (
+            not password_servlet
+            or self.hs.config.email_password_reset_behaviour == "remote"
+        ):
+            threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+        elif self.hs.config.email_password_reset_behaviour == "local":
+            row = yield self.store.get_threepid_validation_session(
+                medium,
+                threepid_creds["client_secret"],
+                sid=threepid_creds["sid"],
+            )
+
+            threepid = {
+                "medium": row["medium"],
+                "address": row["address"],
+                "validated_at": row["validated_at"],
+            } if row else None
+
+            if row:
+                # Valid threepid returned, delete from the db
+                yield self.store.delete_threepid_session(threepid_creds["sid"])
+        else:
+            raise SynapseError(400, "Password resets are not enabled on this homeserver")
 
         if not threepid:
             raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 22469486d7..04caf65793 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -247,7 +247,14 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(changed)
 
     @defer.inlineCallbacks
-    def requestEmailToken(self, id_server, email, client_secret, send_attempt, **kwargs):
+    def requestEmailToken(
+        self,
+        id_server,
+        email,
+        client_secret,
+        send_attempt,
+        next_link=None,
+    ):
         if not self._should_trust_id_server(id_server):
             raise SynapseError(
                 400, "Untrusted ID server '%s'" % id_server,
@@ -259,7 +266,9 @@ class IdentityHandler(BaseHandler):
             'client_secret': client_secret,
             'send_attempt': send_attempt,
         }
-        params.update(kwargs)
+
+        if next_link:
+            params.update({'next_link': next_link})
 
         try:
             data = yield self.http_client.post_json_get_json(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e49c8203ef..557fb5f83d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -158,7 +158,13 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger(
+            "before",
+            "shutdown",
+            run_as_background_process,
+            "presence.on_shutdown",
+            self._on_shutdown,
+        )
 
         self.serial_to_user = {}
         self._next_serial = 1
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index c269bcf4a4..4bc9eb7313 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -80,10 +80,10 @@ ALLOWED_ATTRS = {
 
 
 class Mailer(object):
-    def __init__(self, hs, app_name, notif_template_html, notif_template_text):
+    def __init__(self, hs, app_name, template_html, template_text):
         self.hs = hs
-        self.notif_template_html = notif_template_html
-        self.notif_template_text = notif_template_text
+        self.template_html = template_html
+        self.template_text = template_text
 
         self.sendmail = self.hs.get_sendmail()
         self.store = self.hs.get_datastore()
@@ -94,21 +94,48 @@ class Mailer(object):
         logger.info("Created Mailer for app_name %s" % app_name)
 
     @defer.inlineCallbacks
-    def send_notification_mail(self, app_id, user_id, email_address,
-                               push_actions, reason):
-        try:
-            from_string = self.hs.config.email_notif_from % {
-                "app": self.app_name
-            }
-        except TypeError:
-            from_string = self.hs.config.email_notif_from
+    def send_password_reset_mail(
+        self,
+        email_address,
+        token,
+        client_secret,
+        sid,
+    ):
+        """Send an email with a password reset link to a user
+
+        Args:
+            email_address (str): Email address we're sending the password
+                reset to
+            token (str): Unique token generated by the server to verify
+                password reset email was received
+            client_secret (str): Unique token generated by the client to
+                group together multiple email sending attempts
+            sid (str): The generated session ID
+        """
+        if email.utils.parseaddr(email_address)[1] == '':
+            raise RuntimeError("Invalid 'to' email address")
+
+        link = (
+            self.hs.config.public_baseurl +
+            "_synapse/password_reset/email/submit_token"
+            "?token=%s&client_secret=%s&sid=%s" %
+            (token, client_secret, sid)
+        )
 
-        raw_from = email.utils.parseaddr(from_string)[1]
-        raw_to = email.utils.parseaddr(email_address)[1]
+        template_vars = {
+            "link": link,
+        }
 
-        if raw_to == '':
-            raise RuntimeError("Invalid 'to' address")
+        yield self.send_email(
+            email_address,
+            "[%s] Password Reset Email" % self.hs.config.server_name,
+            template_vars,
+        )
 
+    @defer.inlineCallbacks
+    def send_notification_mail(self, app_id, user_id, email_address,
+                               push_actions, reason):
+        """Send email regarding a user's room notifications"""
         rooms_in_order = deduped_ordered_list(
             [pa['room_id'] for pa in push_actions]
         )
@@ -176,14 +203,36 @@ class Mailer(object):
             "reason": reason,
         }
 
-        html_text = self.notif_template_html.render(**template_vars)
+        yield self.send_email(
+            email_address,
+            "[%s] %s" % (self.app_name, summary_text),
+            template_vars,
+        )
+
+    @defer.inlineCallbacks
+    def send_email(self, email_address, subject, template_vars):
+        """Send an email with the given information and template text"""
+        try:
+            from_string = self.hs.config.email_notif_from % {
+                "app": self.app_name
+            }
+        except TypeError:
+            from_string = self.hs.config.email_notif_from
+
+        raw_from = email.utils.parseaddr(from_string)[1]
+        raw_to = email.utils.parseaddr(email_address)[1]
+
+        if raw_to == '':
+            raise RuntimeError("Invalid 'to' address")
+
+        html_text = self.template_html.render(**template_vars)
         html_part = MIMEText(html_text, "html", "utf8")
 
-        plain_text = self.notif_template_text.render(**template_vars)
+        plain_text = self.template_text.render(**template_vars)
         text_part = MIMEText(plain_text, "plain", "utf8")
 
         multipart_msg = MIMEMultipart('alternative')
-        multipart_msg['Subject'] = "[%s] %s" % (self.app_name, summary_text)
+        multipart_msg['Subject'] = subject
         multipart_msg['From'] = from_string
         multipart_msg['To'] = email_address
         multipart_msg['Date'] = email.utils.formatdate()
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 14bc7823cf..aff85daeb5 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -70,8 +70,8 @@ class PusherFactory(object):
             mailer = Mailer(
                 hs=self.hs,
                 app_name=app_name,
-                notif_template_html=self.notif_template_html,
-                notif_template_text=self.notif_template_text,
+                template_html=self.notif_template_html,
+                template_text=self.notif_template_text,
             )
             self.mailers[app_name] = mailer
         return EmailPusher(self.hs, pusherdict, mailer)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index f64baa4d58..c78f2cb15e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -77,7 +77,7 @@ REQUIREMENTS = [
 ]
 
 CONDITIONAL_REQUIREMENTS = {
-    "email.enable_notifs": ["Jinja2>=2.9", "bleach>=1.4.2"],
+    "email": ["Jinja2>=2.9", "bleach>=1.4.2"],
     "matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"],
 
     # we use execute_batch, which arrived in psycopg 2.7.
diff --git a/synapse/res/templates/password_reset.html b/synapse/res/templates/password_reset.html
new file mode 100644
index 0000000000..4fa7b36734
--- /dev/null
+++ b/synapse/res/templates/password_reset.html
@@ -0,0 +1,9 @@
+<html>
+<body>
+    <p>A password reset request has been received for your Matrix account. If this was you, please click the link below to confirm resetting your password:</p>
+
+    <a href="{{ link }}">{{ link }}</a>
+
+    <p>If this was not you, please disregard this email and contact your server administrator. Thank you.</p>
+</body>
+</html>
diff --git a/synapse/res/templates/password_reset.txt b/synapse/res/templates/password_reset.txt
new file mode 100644
index 0000000000..f0deff59a7
--- /dev/null
+++ b/synapse/res/templates/password_reset.txt
@@ -0,0 +1,7 @@
+A password reset request has been received for your Matrix account. If this
+was you, please click the link below to confirm resetting your password:
+
+{{ link }}
+
+If this was not you, please disregard this email and contact your server
+administrator. Thank you.
diff --git a/synapse/res/templates/password_reset_failure.html b/synapse/res/templates/password_reset_failure.html
new file mode 100644
index 0000000000..0b132cf8db
--- /dev/null
+++ b/synapse/res/templates/password_reset_failure.html
@@ -0,0 +1,6 @@
+<html>
+<head></head>
+<body>
+<p>{{ failure_reason }}. Your password has not been reset.</p>
+</body>
+</html>
diff --git a/synapse/res/templates/password_reset_success.html b/synapse/res/templates/password_reset_success.html
new file mode 100644
index 0000000000..7b6fa5e6f0
--- /dev/null
+++ b/synapse/res/templates/password_reset_success.html
@@ -0,0 +1,6 @@
+<html>
+<head></head>
+<body>
+<p>Your password was successfully reset. You may now close this window.</p>
+</body>
+</html>
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index ca35dc3c83..e4c63b69b9 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -15,19 +15,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import re
 
 from six.moves import http_client
 
+import jinja2
+
 from twisted.internet import defer
 
 from synapse.api.constants import LoginType
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
+from synapse.http.server import finish_request
 from synapse.http.servlet import (
     RestServlet,
     assert_params_in_dict,
     parse_json_object_from_request,
+    parse_string,
 )
 from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.stringutils import random_string
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_patterns, interactive_auth_handler
@@ -41,17 +47,42 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
     def __init__(self, hs):
         super(EmailPasswordRequestTokenRestServlet, self).__init__()
         self.hs = hs
+        self.datastore = hs.get_datastore()
+        self.config = hs.config
         self.identity_handler = hs.get_handlers().identity_handler
 
+        if self.config.email_password_reset_behaviour == "local":
+            from synapse.push.mailer import Mailer, load_jinja2_templates
+            templates = load_jinja2_templates(
+                config=hs.config,
+                template_html_name=hs.config.email_password_reset_template_html,
+                template_text_name=hs.config.email_password_reset_template_text,
+            )
+            self.mailer = Mailer(
+                hs=self.hs,
+                app_name=self.config.email_app_name,
+                template_html=templates[0],
+                template_text=templates[1],
+            )
+
     @defer.inlineCallbacks
     def on_POST(self, request):
+        if self.config.email_password_reset_behaviour == "off":
+            raise SynapseError(400, "Password resets have been disabled on this server")
+
         body = parse_json_object_from_request(request)
 
         assert_params_in_dict(body, [
-            'id_server', 'client_secret', 'email', 'send_attempt'
+            'client_secret', 'email', 'send_attempt'
         ])
 
-        if not check_3pid_allowed(self.hs, "email", body['email']):
+        # Extract params from body
+        client_secret = body["client_secret"]
+        email = body["email"]
+        send_attempt = body["send_attempt"]
+        next_link = body.get("next_link")  # Optional param
+
+        if not check_3pid_allowed(self.hs, "email", email):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized on this server",
@@ -59,15 +90,100 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
             )
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
-            'email', body['email']
+            'email', email,
         )
 
         if existingUid is None:
             raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND)
 
-        ret = yield self.identity_handler.requestEmailToken(**body)
+        if self.config.email_password_reset_behaviour == "remote":
+            if 'id_server' not in body:
+                raise SynapseError(400, "Missing 'id_server' param in body")
+
+            # Have the identity server handle the password reset flow
+            ret = yield self.identity_handler.requestEmailToken(
+                body["id_server"], email, client_secret, send_attempt, next_link,
+            )
+        else:
+            # Send password reset emails from Synapse
+            sid = yield self.send_password_reset(
+                email, client_secret, send_attempt, next_link,
+            )
+
+            # Wrap the session id in a JSON object
+            ret = {"sid": sid}
+
         defer.returnValue((200, ret))
 
+    @defer.inlineCallbacks
+    def send_password_reset(
+        self,
+        email,
+        client_secret,
+        send_attempt,
+        next_link=None,
+    ):
+        """Send a password reset email
+
+        Args:
+            email (str): The user's email address
+            client_secret (str): The provided client secret
+            send_attempt (int): Which send attempt this is
+
+        Returns:
+            The new session_id upon success
+
+        Raises:
+            SynapseError is an error occurred when sending the email
+        """
+        # Check that this email/client_secret/send_attempt combo is new or
+        # greater than what we've seen previously
+        session = yield self.datastore.get_threepid_validation_session(
+            "email", client_secret, address=email, validated=False,
+        )
+
+        # Check to see if a session already exists and that it is not yet
+        # marked as validated
+        if session and session.get("validated_at") is None:
+            session_id = session['session_id']
+            last_send_attempt = session['last_send_attempt']
+
+            # Check that the send_attempt is higher than previous attempts
+            if send_attempt <= last_send_attempt:
+                # If not, just return a success without sending an email
+                defer.returnValue(session_id)
+        else:
+            # An non-validated session does not exist yet.
+            # Generate a session id
+            session_id = random_string(16)
+
+        # Generate a new validation token
+        token = random_string(32)
+
+        # Send the mail with the link containing the token, client_secret
+        # and session_id
+        try:
+            yield self.mailer.send_password_reset_mail(
+                email, token, client_secret, session_id,
+            )
+        except Exception:
+            logger.exception(
+                "Error sending a password reset email to %s", email,
+            )
+            raise SynapseError(
+                500, "An error was encountered when sending the password reset email"
+            )
+
+        token_expires = (self.hs.clock.time_msec() +
+                         self.config.email_validation_token_lifetime)
+
+        yield self.datastore.start_or_continue_validation_session(
+            "email", email, session_id, client_secret, send_attempt,
+            next_link, token, token_expires,
+        )
+
+        defer.returnValue(session_id)
+
 
 class MsisdnPasswordRequestTokenRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/password/msisdn/requestToken$")
@@ -80,6 +196,9 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
+        if not self.config.email_password_reset_behaviour == "off":
+            raise SynapseError(400, "Password resets have been disabled on this server")
+
         body = parse_json_object_from_request(request)
 
         assert_params_in_dict(body, [
@@ -107,6 +226,118 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class PasswordResetSubmitTokenServlet(RestServlet):
+    """Handles 3PID validation token submission"""
+    PATTERNS = [
+        re.compile("^/_synapse/password_reset/(?P<medium>[^/]*)/submit_token/*$"),
+    ]
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(PasswordResetSubmitTokenServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.config = hs.config
+        self.clock = hs.get_clock()
+        self.datastore = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, medium):
+        if medium != "email":
+            raise SynapseError(
+                400,
+                "This medium is currently not supported for password resets",
+            )
+
+        sid = parse_string(request, "sid")
+        client_secret = parse_string(request, "client_secret")
+        token = parse_string(request, "token")
+
+        # Attempt to validate a 3PID sesssion
+        try:
+            # Mark the session as valid
+            next_link = yield self.datastore.validate_threepid_session(
+                sid,
+                client_secret,
+                token,
+                self.clock.time_msec(),
+            )
+
+            # Perform a 302 redirect if next_link is set
+            if next_link:
+                if next_link.startswith("file:///"):
+                    logger.warn(
+                        "Not redirecting to next_link as it is a local file: address"
+                    )
+                else:
+                    request.setResponseCode(302)
+                    request.setHeader("Location", next_link)
+                    finish_request(request)
+                    defer.returnValue(None)
+
+            # Otherwise show the success template
+            html = self.config.email_password_reset_success_html_content
+            request.setResponseCode(200)
+        except ThreepidValidationError as e:
+            # Show a failure page with a reason
+            html = self.load_jinja2_template(
+                self.config.email_template_dir,
+                self.config.email_password_reset_failure_template,
+                template_vars={
+                    "failure_reason": e.msg,
+                }
+            )
+            request.setResponseCode(e.code)
+
+        request.write(html.encode('utf-8'))
+        finish_request(request)
+        defer.returnValue(None)
+
+    def load_jinja2_template(self, template_dir, template_filename, template_vars):
+        """Loads a jinja2 template with variables to insert
+
+        Args:
+            template_dir (str): The directory where templates are stored
+            template_filename (str): The name of the template in the template_dir
+            template_vars (Dict): Dictionary of keys in the template
+                alongside their values to insert
+
+        Returns:
+            str containing the contents of the rendered template
+        """
+        loader = jinja2.FileSystemLoader(template_dir)
+        env = jinja2.Environment(loader=loader)
+
+        template = env.get_template(template_filename)
+        return template.render(**template_vars)
+
+    @defer.inlineCallbacks
+    def on_POST(self, request, medium):
+        if medium != "email":
+            raise SynapseError(
+                400,
+                "This medium is currently not supported for password resets",
+            )
+
+        body = parse_json_object_from_request(request)
+        assert_params_in_dict(body, [
+            'sid', 'client_secret', 'token',
+        ])
+
+        valid, _ = yield self.datastore.validate_threepid_validation_token(
+            body['sid'],
+            body['client_secret'],
+            body['token'],
+            self.clock.time_msec(),
+        )
+        response_code = 200 if valid else 400
+
+        defer.returnValue((response_code, {"success": valid}))
+
+
 class PasswordRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/password$")
 
@@ -144,6 +375,7 @@ class PasswordRestServlet(RestServlet):
             result, params, _ = yield self.auth_handler.check_auth(
                 [[LoginType.EMAIL_IDENTITY], [LoginType.MSISDN]],
                 body, self.hs.get_ip_from_request(request),
+                password_servlet=True,
             )
 
             if LoginType.EMAIL_IDENTITY in result:
@@ -417,6 +649,7 @@ class WhoamiRestServlet(RestServlet):
 def register_servlets(hs, http_server):
     EmailPasswordRequestTokenRestServlet(hs).register(http_server)
     MsisdnPasswordRequestTokenRestServlet(hs).register(http_server)
+    PasswordResetSubmitTokenServlet(hs).register(http_server)
     PasswordRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
     EmailThreepidRequestTokenRestServlet(hs).register(http_server)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 52891bb9eb..ae891aa332 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -588,6 +588,10 @@ class SQLBaseStore(object):
         Args:
             table : string giving the table name
             values : dict of new column names and values for them
+            or_ignore : bool stating whether an exception should be raised
+                when a conflicting row already exists. If True, False will be
+                returned by the function instead
+            desc : string giving a description of the transaction
 
         Returns:
             bool: Whether the row was inserted or not. Only useful when
@@ -1228,8 +1232,8 @@ class SQLBaseStore(object):
         )
 
         txn.execute(select_sql, list(keyvalues.values()))
-
         row = txn.fetchone()
+
         if not row:
             if allow_none:
                 return None
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c1711bc8bd..23a4baa484 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 54
+SCHEMA_VERSION = 55
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 4cf159ba81..9b41cbd757 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -17,17 +17,20 @@
 
 import re
 
+from six import iterkeys
 from six.moves import range
 
 from twisted.internet import defer
 
 from synapse.api.constants import UserTypes
-from synapse.api.errors import Codes, StoreError
+from synapse.api.errors import Codes, StoreError, ThreepidValidationError
 from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.types import UserID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
+THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
@@ -422,7 +425,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def get_user_id_by_threepid(self, medium, address):
+    def get_user_id_by_threepid(self, medium, address, require_verified=False):
         """Returns user id from threepid
 
         Args:
@@ -595,6 +598,11 @@ class RegistrationStore(
             "user_threepids_grandfather", self._bg_user_threepids_grandfather,
         )
 
+        # Create a background job for culling expired 3PID validity tokens
+        hs.get_clock().looping_call(
+            self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
+        )
+
     @defer.inlineCallbacks
     def add_access_token_to_user(self, user_id, token, device_id=None):
         """Adds an access token for the given user.
@@ -963,7 +971,6 @@ class RegistrationStore(
         We do this by grandfathering in existing user threepids assuming that
         they used one of the server configured trusted identity servers.
         """
-
         id_servers = set(self.config.trusted_third_party_id_servers)
 
         def _bg_user_threepids_grandfather_txn(txn):
@@ -984,3 +991,280 @@ class RegistrationStore(
         yield self._end_background_update("user_threepids_grandfather")
 
         defer.returnValue(1)
+
+    def get_threepid_validation_session(
+        self,
+        medium,
+        client_secret,
+        address=None,
+        sid=None,
+        validated=None,
+    ):
+        """Gets a session_id and last_send_attempt (if available) for a
+        client_secret/medium/(address|session_id) combo
+
+        Args:
+            medium (str|None): The medium of the 3PID
+            address (str|None): The address of the 3PID
+            sid (str|None): The ID of the validation session
+            client_secret (str|None): A unique string provided by the client to
+                help identify this validation attempt
+            validated (bool|None): Whether sessions should be filtered by
+                whether they have been validated already or not. None to
+                perform no filtering
+
+        Returns:
+            deferred {str, int}|None: A dict containing the
+                latest session_id and send_attempt count for this 3PID.
+                Otherwise None if there hasn't been a previous attempt
+        """
+        keyvalues = {
+            "medium": medium,
+            "client_secret": client_secret,
+        }
+        if address:
+            keyvalues["address"] = address
+        if sid:
+            keyvalues["session_id"] = sid
+
+        assert(address or sid)
+
+        def get_threepid_validation_session_txn(txn):
+            sql = """
+                SELECT address, session_id, medium, client_secret,
+                last_send_attempt, validated_at
+                FROM threepid_validation_session WHERE %s
+                """ % (" AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),)
+
+            if validated is not None:
+                sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
+
+            sql += " LIMIT 1"
+
+            txn.execute(sql, list(keyvalues.values()))
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+
+            return rows[0]
+
+        return self.runInteraction(
+            "get_threepid_validation_session",
+            get_threepid_validation_session_txn,
+        )
+
+    def validate_threepid_session(
+        self,
+        session_id,
+        client_secret,
+        token,
+        current_ts,
+    ):
+        """Attempt to validate a threepid session using a token
+
+        Args:
+            session_id (str): The id of a validation session
+            client_secret (str): A unique string provided by the client to
+                help identify this validation attempt
+            token (str): A validation token
+            current_ts (int): The current unix time in milliseconds. Used for
+                checking token expiry status
+
+        Returns:
+            deferred str|None: A str representing a link to redirect the user
+            to if there is one.
+        """
+        # Insert everything into a transaction in order to run atomically
+        def validate_threepid_session_txn(txn):
+            row = self._simple_select_one_txn(
+                txn,
+                table="threepid_validation_session",
+                keyvalues={"session_id": session_id},
+                retcols=["client_secret", "validated_at"],
+                allow_none=True,
+            )
+
+            if not row:
+                raise ThreepidValidationError(400, "Unknown session_id")
+            retrieved_client_secret = row["client_secret"]
+            validated_at = row["validated_at"]
+
+            if retrieved_client_secret != client_secret:
+                raise ThreepidValidationError(
+                    400, "This client_secret does not match the provided session_id",
+                )
+
+            row = self._simple_select_one_txn(
+                txn,
+                table="threepid_validation_token",
+                keyvalues={"session_id": session_id, "token": token},
+                retcols=["expires", "next_link"],
+                allow_none=True,
+            )
+
+            if not row:
+                raise ThreepidValidationError(
+                    400, "Validation token not found or has expired",
+                )
+            expires = row["expires"]
+            next_link = row["next_link"]
+
+            # If the session is already validated, no need to revalidate
+            if validated_at:
+                return next_link
+
+            if expires <= current_ts:
+                raise ThreepidValidationError(
+                    400, "This token has expired. Please request a new one",
+                )
+
+            # Looks good. Validate the session
+            self._simple_update_txn(
+                txn,
+                table="threepid_validation_session",
+                keyvalues={"session_id": session_id},
+                updatevalues={"validated_at": self.clock.time_msec()},
+            )
+
+            return next_link
+
+        # Return next_link if it exists
+        return self.runInteraction(
+            "validate_threepid_session_txn",
+            validate_threepid_session_txn,
+        )
+
+    def upsert_threepid_validation_session(
+        self,
+        medium,
+        address,
+        client_secret,
+        send_attempt,
+        session_id,
+        validated_at=None,
+    ):
+        """Upsert a threepid validation session
+        Args:
+            medium (str): The medium of the 3PID
+            address (str): The address of the 3PID
+            client_secret (str): A unique string provided by the client to
+                help identify this validation attempt
+            send_attempt (int): The latest send_attempt on this session
+            session_id (str): The id of this validation session
+            validated_at (int|None): The unix timestamp in milliseconds of
+                when the session was marked as valid
+        """
+        insertion_values = {
+            "medium": medium,
+            "address": address,
+            "client_secret": client_secret,
+        }
+
+        if validated_at:
+            insertion_values["validated_at"] = validated_at
+
+        return self._simple_upsert(
+            table="threepid_validation_session",
+            keyvalues={"session_id": session_id},
+            values={"last_send_attempt": send_attempt},
+            insertion_values=insertion_values,
+            desc="upsert_threepid_validation_session",
+        )
+
+    def start_or_continue_validation_session(
+        self,
+        medium,
+        address,
+        session_id,
+        client_secret,
+        send_attempt,
+        next_link,
+        token,
+        token_expires,
+    ):
+        """Creates a new threepid validation session if it does not already
+        exist and associates a new validation token with it
+
+        Args:
+            medium (str): The medium of the 3PID
+            address (str): The address of the 3PID
+            session_id (str): The id of this validation session
+            client_secret (str): A unique string provided by the client to
+                help identify this validation attempt
+            send_attempt (int): The latest send_attempt on this session
+            next_link (str|None): The link to redirect the user to upon
+                successful validation
+            token (str): The validation token
+            token_expires (int): The timestamp for which after the token
+                will no longer be valid
+        """
+        def start_or_continue_validation_session_txn(txn):
+            # Create or update a validation session
+            self._simple_upsert_txn(
+                txn,
+                table="threepid_validation_session",
+                keyvalues={"session_id": session_id},
+                values={"last_send_attempt": send_attempt},
+                insertion_values={
+                    "medium": medium,
+                    "address": address,
+                    "client_secret": client_secret,
+                },
+            )
+
+            # Create a new validation token with this session ID
+            self._simple_insert_txn(
+                txn,
+                table="threepid_validation_token",
+                values={
+                    "session_id": session_id,
+                    "token": token,
+                    "next_link": next_link,
+                    "expires": token_expires,
+                },
+            )
+
+        return self.runInteraction(
+            "start_or_continue_validation_session",
+            start_or_continue_validation_session_txn,
+        )
+
+    def cull_expired_threepid_validation_tokens(self):
+        """Remove threepid validation tokens with expiry dates that have passed"""
+        def cull_expired_threepid_validation_tokens_txn(txn, ts):
+            sql = """
+            DELETE FROM threepid_validation_token WHERE
+            expires < ?
+            """
+            return txn.execute(sql, (ts,))
+
+        return self.runInteraction(
+            "cull_expired_threepid_validation_tokens",
+            cull_expired_threepid_validation_tokens_txn,
+            self.clock.time_msec(),
+        )
+
+    def delete_threepid_session(self, session_id):
+        """Removes a threepid validation session from the database. This can
+        be done after validation has been performed and whatever action was
+        waiting on it has been carried out
+
+        Args:
+            session_id (str): The ID of the session to delete
+        """
+        def delete_threepid_session_txn(txn):
+            self._simple_delete_txn(
+                txn,
+                table="threepid_validation_token",
+                keyvalues={"session_id": session_id},
+            )
+            self._simple_delete_txn(
+                txn,
+                table="threepid_validation_session",
+                keyvalues={"session_id": session_id},
+            )
+
+        return self.runInteraction(
+            "delete_threepid_session",
+            delete_threepid_session_txn,
+        )
diff --git a/synapse/storage/schema/delta/55/track_threepid_validations.sql b/synapse/storage/schema/delta/55/track_threepid_validations.sql
new file mode 100644
index 0000000000..a8eced2e0a
--- /dev/null
+++ b/synapse/storage/schema/delta/55/track_threepid_validations.sql
@@ -0,0 +1,31 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+CREATE TABLE IF NOT EXISTS threepid_validation_session (
+    session_id TEXT PRIMARY KEY,
+    medium TEXT NOT NULL,
+    address TEXT NOT NULL,
+    client_secret TEXT NOT NULL,
+    last_send_attempt BIGINT NOT NULL,
+    validated_at BIGINT
+);
+
+CREATE TABLE IF NOT EXISTS threepid_validation_token (
+    token TEXT PRIMARY KEY,
+    session_id TEXT NOT NULL,
+    next_link TEXT,
+    expires BIGINT NOT NULL
+);
+
+CREATE INDEX threepid_validation_token_session_id ON threepid_validation_token(session_id);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 529ad4ea79..6f7f65d96b 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -592,8 +592,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     def get_max_topological_token(self, room_id, stream_key):
+        """Get the max topological token in a room before the given stream
+        ordering.
+
+        Args:
+            room_id (str)
+            stream_key (int)
+
+        Returns:
+            Deferred[int]
+        """
         sql = (
-            "SELECT max(topological_ordering) FROM events"
+            "SELECT coalesce(max(topological_ordering), 0) FROM events"
             " WHERE room_id = ? AND stream_ordering < ?"
         )
         return self._execute(
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 18121f4f6c..4b1901ce31 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -19,7 +19,7 @@ from mock import Mock
 import canonicaljson
 import signedjson.key
 import signedjson.sign
-from signedjson.key import get_verify_key
+from signedjson.key import encode_verify_key_base64, get_verify_key
 
 from twisted.internet import defer
 
@@ -40,7 +40,7 @@ class MockPerspectiveServer(object):
 
     def get_verify_keys(self):
         vk = signedjson.key.get_verify_key(self.key)
-        return {"%s:%s" % (vk.alg, vk.version): vk}
+        return {"%s:%s" % (vk.alg, vk.version): encode_verify_key_base64(vk)}
 
     def get_signed_key(self, server_name, verify_key):
         key_id = "%s:%s" % (verify_key.alg, verify_key.version)
@@ -48,9 +48,7 @@ class MockPerspectiveServer(object):
             "server_name": server_name,
             "old_verify_keys": {},
             "valid_until_ts": time.time() * 1000 + 3600,
-            "verify_keys": {
-                key_id: {"key": signedjson.key.encode_verify_key_base64(verify_key)}
-            },
+            "verify_keys": {key_id: {"key": encode_verify_key_base64(verify_key)}},
         }
         self.sign_response(res)
         return res
@@ -63,10 +61,18 @@ class KeyringTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
         self.mock_perspective_server = MockPerspectiveServer()
         self.http_client = Mock()
-        hs = self.setup_test_homeserver(handlers=None, http_client=self.http_client)
-        keys = self.mock_perspective_server.get_verify_keys()
-        hs.config.perspectives = {self.mock_perspective_server.server_name: keys}
-        return hs
+
+        config = self.default_config()
+        config["trusted_key_servers"] = [
+            {
+                "server_name": self.mock_perspective_server.server_name,
+                "verify_keys": self.mock_perspective_server.get_verify_keys(),
+            }
+        ]
+
+        return self.setup_test_homeserver(
+            handlers=None, http_client=self.http_client, config=config
+        )
 
     def check_context(self, _, expected):
         self.assertEquals(
@@ -371,10 +377,18 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
         self.mock_perspective_server = MockPerspectiveServer()
         self.http_client = Mock()
-        hs = self.setup_test_homeserver(handlers=None, http_client=self.http_client)
-        keys = self.mock_perspective_server.get_verify_keys()
-        hs.config.perspectives = {self.mock_perspective_server.server_name: keys}
-        return hs
+
+        config = self.default_config()
+        config["trusted_key_servers"] = [
+            {
+                "server_name": self.mock_perspective_server.server_name,
+                "verify_keys": self.mock_perspective_server.get_verify_keys(),
+            }
+        ]
+
+        return self.setup_test_homeserver(
+            handlers=None, http_client=self.http_client, config=config
+        )
 
     def test_get_keys_from_perspectives(self):
         # arbitrarily advance the clock a bit
@@ -439,8 +453,7 @@ class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
         self.assertEqual(res["ts_valid_until_ms"], VALID_UNTIL_TS)
 
         self.assertEqual(
-            bytes(res["key_json"]),
-            canonicaljson.encode_canonical_json(response),
+            bytes(res["key_json"]), canonicaljson.encode_canonical_json(response)
         )
 
     def test_invalid_perspectives_responses(self):
diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py
index 4153da4da7..05880a1048 100644
--- a/tests/http/federation/test_matrix_federation_agent.py
+++ b/tests/http/federation/test_matrix_federation_agent.py
@@ -57,6 +57,7 @@ class MatrixFederationAgentTests(TestCase):
         # present will not be trusted. We should do better here, though.
         config_dict = default_config("test", parse=False)
         config_dict["federation_verify_certificates"] = False
+        config_dict["trusted_key_servers"] = []
         config = HomeServerConfig()
         config.parse_config_dict(config_dict)
 
diff --git a/tests/utils.py b/tests/utils.py
index a4c430148f..f8c7ad2604 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -132,7 +132,6 @@ def default_config(name, parse=False):
         "password_providers": [],
         "worker_replication_url": "",
         "worker_app": None,
-        "email_enable_notifs": False,
         "block_non_admin_invites": False,
         "federation_domain_whitelist": None,
         "filter_timeline_limit": 5000,