summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py9
-rw-r--r--synapse/api/auth.py15
-rwxr-xr-xsynapse/app/homeserver.py2
-rw-r--r--synapse/config/emailconfig.py11
-rw-r--r--synapse/config/key.py27
-rw-r--r--synapse/crypto/context_factory.py180
-rw-r--r--synapse/crypto/keyring.py7
-rw-r--r--synapse/federation/sender/per_destination_queue.py40
-rw-r--r--synapse/handlers/auth.py1
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/metrics/__init__.py112
-rw-r--r--synapse/push/mailer.py2
-rw-r--r--synapse/python_dependencies.py9
-rw-r--r--synapse/res/templates/password_reset_success.html2
-rw-r--r--synapse/rest/client/v2_alpha/account.py28
-rw-r--r--synapse/rest/client/v2_alpha/account_validity.py2
-rw-r--r--synapse/storage/__init__.py44
-rw-r--r--synapse/storage/deviceinbox.py4
-rw-r--r--synapse/storage/events.py44
-rw-r--r--synapse/storage/keys.py8
-rw-r--r--synapse/storage/registration.py116
-rw-r--r--synapse/storage/schema/delta/55/users_alter_deactivated.sql19
22 files changed, 514 insertions, 172 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 77a4cfc3a5..0c01546789 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,6 +17,13 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
+import sys
+
+# Check that we're not running on an unsupported Python version.
+if sys.version_info < (3, 5):
+    print("Synapse requires Python 3.5 or above.")
+    sys.exit(1)
+
 try:
     from twisted.internet import protocol
     from twisted.internet.protocol import Factory
@@ -27,4 +34,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.0.0rc1"
+__version__ = "1.0.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 0c6c93a87b..79e2808dc5 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -184,11 +184,22 @@ class Auth(object):
         return event_auth.get_public_keys(invite_event)
 
     @defer.inlineCallbacks
-    def get_user_by_req(self, request, allow_guest=False, rights="access"):
+    def get_user_by_req(
+        self,
+        request,
+        allow_guest=False,
+        rights="access",
+        allow_expired=False,
+    ):
         """ Get a registered user's ID.
 
         Args:
             request - An HTTP request with an access_token query parameter.
+            allow_expired - Whether to allow the request through even if the account is
+                expired. If true, Synapse will still require an access token to be
+                provided but won't check if the account it belongs to has expired. This
+                works thanks to /login delivering access tokens regardless of accounts'
+                expiration.
         Returns:
             defer.Deferred: resolves to a ``synapse.types.Requester`` object
         Raises:
@@ -229,7 +240,7 @@ class Auth(object):
             is_guest = user_info["is_guest"]
 
             # Deny the request if the user account has expired.
-            if self._account_validity.enabled:
+            if self._account_validity.enabled and not allow_expired:
                 user_id = user.to_string()
                 expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
                 if expiration_ts is not None and self.clock.time_msec() >= expiration_ts:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index df524a23dd..b27b12e73d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -176,7 +176,6 @@ class SynapseHomeServer(HomeServer):
 
             resources.update({
                 "/_matrix/client/api/v1": client_resource,
-                "/_synapse/password_reset": client_resource,
                 "/_matrix/client/r0": client_resource,
                 "/_matrix/client/unstable": client_resource,
                 "/_matrix/client/v2_alpha": client_resource,
@@ -541,6 +540,7 @@ def run(hs):
         stats["total_room_count"] = room_count
 
         stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
+        stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
         stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index ae04252906..86018dfcce 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -19,15 +19,12 @@ from __future__ import print_function
 
 # This file can't be called email.py because if it is, we cannot:
 import email.utils
-import logging
 import os
 
 import pkg_resources
 
 from ._base import Config, ConfigError
 
-logger = logging.getLogger(__name__)
-
 
 class EmailConfig(Config):
     def read_config(self, config):
@@ -85,10 +82,12 @@ class EmailConfig(Config):
         self.email_password_reset_behaviour = (
             "remote" if email_trust_identity_server_for_password_resets else "local"
         )
+        self.password_resets_were_disabled_due_to_email_config = False
         if self.email_password_reset_behaviour == "local" and email_config == {}:
-            logger.warn(
-                "User password resets have been disabled due to lack of email config"
-            )
+            # We cannot warn the user this has happened here
+            # Instead do so when a user attempts to reset their password
+            self.password_resets_were_disabled_due_to_email_config = True
+
             self.email_password_reset_behaviour = "off"
 
         # Get lifetime of a validation token in milliseconds
diff --git a/synapse/config/key.py b/synapse/config/key.py
index aba7092ccd..424875feae 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -41,6 +41,15 @@ validation or TLS certificate validation. This is likely to be very insecure. If
 you are *sure* you want to do this, set 'accept_keys_insecurely' on the
 keyserver configuration."""
 
+RELYING_ON_MATRIX_KEY_ERROR = """\
+Your server is configured to accept key server responses without TLS certificate
+validation, and which are only signed by the old (possibly compromised)
+matrix.org signing key 'ed25519:auto'. This likely isn't what you want to do,
+and you should enable 'federation_verify_certificates' in your configuration.
+
+If you are *sure* you want to do this, set 'accept_keys_insecurely' on the
+trusted_key_server configuration."""
+
 
 logger = logging.getLogger(__name__)
 
@@ -340,10 +349,20 @@ def _parse_key_servers(key_servers, federation_verify_certificates):
                 result.verify_keys[key_id] = verify_key
 
         if (
-            not verify_keys
-            and not server.get("accept_keys_insecurely")
-            and not federation_verify_certificates
+            not federation_verify_certificates and
+            not server.get("accept_keys_insecurely")
         ):
-            raise ConfigError(INSECURE_NOTARY_ERROR)
+            _assert_keyserver_has_verify_keys(result)
 
         yield result
+
+
+def _assert_keyserver_has_verify_keys(trusted_key_server):
+    if not trusted_key_server.verify_keys:
+        raise ConfigError(INSECURE_NOTARY_ERROR)
+
+    # also check that they are not blindly checking the old matrix.org key
+    if trusted_key_server.server_name == "matrix.org" and any(
+        key_id == "ed25519:auto" for key_id in trusted_key_server.verify_keys
+    ):
+        raise ConfigError(RELYING_ON_MATRIX_KEY_ERROR)
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 59ea087e66..2bc5cc3807 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -15,10 +15,13 @@
 
 import logging
 
+import idna
+from service_identity import VerificationError
+from service_identity.pyopenssl import verify_hostname, verify_ip_address
 from zope.interface import implementer
 
 from OpenSSL import SSL, crypto
-from twisted.internet._sslverify import ClientTLSOptions, _defaultCurveName
+from twisted.internet._sslverify import _defaultCurveName
 from twisted.internet.abstract import isIPAddress, isIPv6Address
 from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
 from twisted.internet.ssl import CertificateOptions, ContextFactory, platformTrust
@@ -56,79 +59,19 @@ class ServerContextFactory(ContextFactory):
         return self._context
 
 
-def _idnaBytes(text):
-    """
-    Convert some text typed by a human into some ASCII bytes. This is a
-    copy of twisted.internet._idna._idnaBytes. For documentation, see the
-    twisted documentation.
-    """
-    try:
-        import idna
-    except ImportError:
-        return text.encode("idna")
-    else:
-        return idna.encode(text)
-
-
-def _tolerateErrors(wrapped):
-    """
-    Wrap up an info_callback for pyOpenSSL so that if something goes wrong
-    the error is immediately logged and the connection is dropped if possible.
-    This is a copy of twisted.internet._sslverify._tolerateErrors. For
-    documentation, see the twisted documentation.
-    """
-
-    def infoCallback(connection, where, ret):
-        try:
-            return wrapped(connection, where, ret)
-        except:  # noqa: E722, taken from the twisted implementation
-            f = Failure()
-            logger.exception("Error during info_callback")
-            connection.get_app_data().failVerification(f)
-
-    return infoCallback
+class ClientTLSOptionsFactory(object):
+    """Factory for Twisted SSLClientConnectionCreators that are used to make connections
+    to remote servers for federation.
 
+    Uses one of two OpenSSL context objects for all connections, depending on whether
+    we should do SSL certificate verification.
 
-@implementer(IOpenSSLClientConnectionCreator)
-class ClientTLSOptionsNoVerify(object):
-    """
-    Client creator for TLS without certificate identity verification. This is a
-    copy of twisted.internet._sslverify.ClientTLSOptions with the identity
-    verification left out. For documentation, see the twisted documentation.
+    get_options decides whether we should do SSL certificate verification and
+    constructs an SSLClientConnectionCreator factory accordingly.
     """
 
-    def __init__(self, hostname, ctx):
-        self._ctx = ctx
-
-        if isIPAddress(hostname) or isIPv6Address(hostname):
-            self._hostnameBytes = hostname.encode('ascii')
-            self._sendSNI = False
-        else:
-            self._hostnameBytes = _idnaBytes(hostname)
-            self._sendSNI = True
-
-        ctx.set_info_callback(_tolerateErrors(self._identityVerifyingInfoCallback))
-
-    def clientConnectionForTLS(self, tlsProtocol):
-        context = self._ctx
-        connection = SSL.Connection(context, None)
-        connection.set_app_data(tlsProtocol)
-        return connection
-
-    def _identityVerifyingInfoCallback(self, connection, where, ret):
-        # Literal IPv4 and IPv6 addresses are not permitted
-        # as host names according to the RFCs
-        if where & SSL.SSL_CB_HANDSHAKE_START and self._sendSNI:
-            connection.set_tlsext_host_name(self._hostnameBytes)
-
-
-class ClientTLSOptionsFactory(object):
-    """Factory for Twisted ClientTLSOptions that are used to make connections
-    to remote servers for federation."""
-
     def __init__(self, config):
         self._config = config
-        self._options_noverify = CertificateOptions()
 
         # Check if we're using a custom list of a CA certificates
         trust_root = config.federation_ca_trust_root
@@ -136,11 +79,13 @@ class ClientTLSOptionsFactory(object):
             # Use CA root certs provided by OpenSSL
             trust_root = platformTrust()
 
-        self._options_verify = CertificateOptions(trustRoot=trust_root)
+        self._verify_ssl_context = CertificateOptions(trustRoot=trust_root).getContext()
+        self._verify_ssl_context.set_info_callback(self._context_info_cb)
 
-    def get_options(self, host):
-        # Use _makeContext so that we get a fresh OpenSSL CTX each time.
+        self._no_verify_ssl_context = CertificateOptions().getContext()
+        self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
 
+    def get_options(self, host):
         # Check if certificate verification has been enabled
         should_verify = self._config.federation_verify_certificates
 
@@ -151,6 +96,93 @@ class ClientTLSOptionsFactory(object):
                     should_verify = False
                     break
 
-        if should_verify:
-            return ClientTLSOptions(host, self._options_verify._makeContext())
-        return ClientTLSOptionsNoVerify(host, self._options_noverify._makeContext())
+        ssl_context = (
+            self._verify_ssl_context if should_verify else self._no_verify_ssl_context
+        )
+
+        return SSLClientConnectionCreator(host, ssl_context, should_verify)
+
+    @staticmethod
+    def _context_info_cb(ssl_connection, where, ret):
+        """The 'information callback' for our openssl context object."""
+        # we assume that the app_data on the connection object has been set to
+        # a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
+        tls_protocol = ssl_connection.get_app_data()
+        try:
+            # ... we further assume that SSLClientConnectionCreator has set the
+            # '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
+            tls_protocol._synapse_tls_verifier.verify_context_info_cb(
+                ssl_connection, where
+            )
+        except:  # noqa: E722, taken from the twisted implementation
+            logger.exception("Error during info_callback")
+            f = Failure()
+            tls_protocol.failVerification(f)
+
+
+@implementer(IOpenSSLClientConnectionCreator)
+class SSLClientConnectionCreator(object):
+    """Creates openssl connection objects for client connections.
+
+    Replaces twisted.internet.ssl.ClientTLSOptions
+    """
+
+    def __init__(self, hostname, ctx, verify_certs):
+        self._ctx = ctx
+        self._verifier = ConnectionVerifier(hostname, verify_certs)
+
+    def clientConnectionForTLS(self, tls_protocol):
+        context = self._ctx
+        connection = SSL.Connection(context, None)
+
+        # as per twisted.internet.ssl.ClientTLSOptions, we set the application
+        # data to our TLSMemoryBIOProtocol...
+        connection.set_app_data(tls_protocol)
+
+        # ... and we also gut-wrench a '_synapse_tls_verifier' attribute into the
+        # tls_protocol so that the SSL context's info callback has something to
+        # call to do the cert verification.
+        setattr(tls_protocol, "_synapse_tls_verifier", self._verifier)
+        return connection
+
+
+class ConnectionVerifier(object):
+    """Set the SNI, and do cert verification
+
+    This is a thing which is attached to the TLSMemoryBIOProtocol, and is called by
+    the ssl context's info callback.
+    """
+
+    # This code is based on twisted.internet.ssl.ClientTLSOptions.
+
+    def __init__(self, hostname, verify_certs):
+        self._verify_certs = verify_certs
+
+        if isIPAddress(hostname) or isIPv6Address(hostname):
+            self._hostnameBytes = hostname.encode("ascii")
+            self._is_ip_address = True
+        else:
+            # twisted's ClientTLSOptions falls back to the stdlib impl here if
+            # idna is not installed, but points out that lacks support for
+            # IDNA2008 (http://bugs.python.org/issue17305).
+            #
+            # We can rely on having idna.
+            self._hostnameBytes = idna.encode(hostname)
+            self._is_ip_address = False
+
+        self._hostnameASCII = self._hostnameBytes.decode("ascii")
+
+    def verify_context_info_cb(self, ssl_connection, where):
+        if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
+            ssl_connection.set_tlsext_host_name(self._hostnameBytes)
+
+        if where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs:
+            try:
+                if self._is_ip_address:
+                    verify_ip_address(ssl_connection, self._hostnameASCII)
+                else:
+                    verify_hostname(ssl_connection, self._hostnameASCII)
+            except VerificationError:
+                f = Failure()
+                tls_protocol = ssl_connection.get_app_data()
+                tls_protocol.failVerification(f)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 96964b0d50..6f603f1961 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -750,13 +750,6 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
                 verify_signed_json(response, perspective_name, perspective_keys[key_id])
                 verified = True
 
-                if perspective_name == "matrix.org" and key_id == "ed25519:auto":
-                    logger.warning(
-                        "Trusting trusted_key_server responses signed by the "
-                        "compromised matrix.org signing key 'ed25519:auto'. "
-                        "This is a placebo."
-                    )
-
         if not verified:
             raise KeyLookupError(
                 "Response not signed with a known key: signed with: %r, known keys: %r"
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 564c57203d..22a2735405 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -189,11 +189,21 @@ class PerDestinationQueue(object):
 
             pending_pdus = []
             while True:
-                device_message_edus, device_stream_id, dev_list_id = (
-                    # We have to keep 2 free slots for presence and rr_edus
-                    yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
+                # We have to keep 2 free slots for presence and rr_edus
+                limit = MAX_EDUS_PER_TRANSACTION - 2
+
+                device_update_edus, dev_list_id = (
+                    yield self._get_device_update_edus(limit)
+                )
+
+                limit -= len(device_update_edus)
+
+                to_device_edus, device_stream_id = (
+                    yield self._get_to_device_message_edus(limit)
                 )
 
+                pending_edus = device_update_edus + to_device_edus
+
                 # BEGIN CRITICAL SECTION
                 #
                 # In order to avoid a race condition, we need to make sure that
@@ -208,10 +218,6 @@ class PerDestinationQueue(object):
                 # We can only include at most 50 PDUs per transactions
                 pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
 
-                pending_edus = []
-
-                # We can only include at most 100 EDUs per transactions
-                # rr_edus and pending_presence take at most one slot each
                 pending_edus.extend(self._get_rr_edus(force_flush=False))
                 pending_presence = self._pending_presence
                 self._pending_presence = {}
@@ -232,7 +238,6 @@ class PerDestinationQueue(object):
                         )
                     )
 
-                pending_edus.extend(device_message_edus)
                 pending_edus.extend(
                     self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
                 )
@@ -272,10 +277,13 @@ class PerDestinationQueue(object):
                         sent_edus_by_type.labels(edu.edu_type).inc()
                     # Remove the acknowledged device messages from the database
                     # Only bother if we actually sent some device messages
-                    if device_message_edus:
+                    if to_device_edus:
                         yield self._store.delete_device_msgs_for_remote(
                             self._destination, device_stream_id
                         )
+
+                    # also mark the device updates as sent
+                    if device_update_edus:
                         logger.info(
                             "Marking as sent %r %r", self._destination, dev_list_id
                         )
@@ -347,7 +355,7 @@ class PerDestinationQueue(object):
         return pending_edus
 
     @defer.inlineCallbacks
-    def _get_new_device_messages(self, limit):
+    def _get_device_update_edus(self, limit):
         last_device_list = self._last_device_list_stream_id
 
         # Retrieve list of new device updates to send to the destination
@@ -366,15 +374,19 @@ class PerDestinationQueue(object):
 
         assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
 
+        defer.returnValue((edus, now_stream_id))
+
+    @defer.inlineCallbacks
+    def _get_to_device_message_edus(self, limit):
         last_device_stream_id = self._last_device_stream_id
         to_device_stream_id = self._store.get_to_device_stream_token()
         contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
             self._destination,
             last_device_stream_id,
             to_device_stream_id,
-            limit - len(edus),
+            limit,
         )
-        edus.extend(
+        edus = [
             Edu(
                 origin=self._server_name,
                 destination=self._destination,
@@ -382,6 +394,6 @@ class PerDestinationQueue(object):
                 content=content,
             )
             for content in contents
-        )
+        ]
 
-        defer.returnValue((edus, stream_id, now_stream_id))
+        defer.returnValue((edus, stream_id))
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7f8ddc99c6..a0cf37a9f9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -479,6 +479,7 @@ class AuthHandler(BaseHandler):
                 medium,
                 threepid_creds["client_secret"],
                 sid=threepid_creds["sid"],
+                validated=True,
             )
 
             threepid = {
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 6a91f7698e..b29089d82c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -114,6 +115,9 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        # Mark the user as deactivated.
+        yield self.store.set_user_deactivated_status(user_id, True)
+
         defer.returnValue(identity_server_supports_unbinding)
 
     def _start_user_parting(self):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..0d3ae1a43d 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
 
 import attr
 from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
 
 from twisted.internet import reactor
 
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
 
 class RegistryProxy(object):
-
     @staticmethod
     def collect():
         for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
         try:
             calls = self.caller()
         except Exception:
-            logger.exception(
-                "Exception running callback for LaterGauge(%s)",
-                self.name,
-            )
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
             yield g
             return
 
@@ -116,9 +112,7 @@ class InFlightGauge(object):
         # Create a class which have the sub_metrics values as attributes, which
         # default to 0 on initialization. Used to pass to registered callbacks.
         self._metrics_class = attr.make_class(
-            "_MetricsEntry",
-            attrs={x: attr.ib(0) for x in sub_metrics},
-            slots=True,
+            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
         )
 
         # Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
 
         Note: may be called by a separate thread.
         """
-        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+        in_flight = GaugeMetricFamily(
+            self.name + "_total", self.desc, labels=self.labels
+        )
 
         metrics_by_key = {}
 
@@ -179,7 +175,9 @@ class InFlightGauge(object):
         yield in_flight
 
         for name in self.sub_metrics:
-            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+            gauge = GaugeMetricFamily(
+                "_".join([self.name, name]), "", labels=self.labels
+            )
             for key, metrics in six.iteritems(metrics_by_key):
                 gauge.add_metric(key, getattr(metrics, name))
             yield gauge
@@ -193,12 +191,75 @@ class InFlightGauge(object):
         all_gauges[self.name] = self
 
 
+@attr.s(hash=True)
+class BucketCollector(object):
+    """
+    Like a Histogram, but allows buckets to be point-in-time instead of
+    incrementally added to.
+
+    Args:
+        name (str): Base name of metric to be exported to Prometheus.
+        data_collector (callable -> dict): A synchronous callable that
+            returns a dict mapping bucket to number of items in the
+            bucket. If these buckets are not the same as the buckets
+            given to this class, they will be remapped into them.
+        buckets (list[float]): List of floats/ints of the buckets to
+            give to Prometheus. +Inf is ignored, if given.
+
+    """
+
+    name = attr.ib()
+    data_collector = attr.ib()
+    buckets = attr.ib()
+
+    def collect(self):
+
+        # Fetch the data -- this must be synchronous!
+        data = self.data_collector()
+
+        buckets = {}
+
+        res = []
+        for x in data.keys():
+            for i, bound in enumerate(self.buckets):
+                if x <= bound:
+                    buckets[bound] = buckets.get(bound, 0) + data[x]
+                    break
+
+        for i in self.buckets:
+            res.append([str(i), buckets.get(i, 0)])
+
+        res.append(["+Inf", sum(data.values())])
+
+        metric = HistogramMetricFamily(
+            self.name,
+            "",
+            buckets=res,
+            sum_value=sum([x * y for x, y in data.items()]),
+        )
+        yield metric
+
+    def __attrs_post_init__(self):
+        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+        if self.buckets != sorted(self.buckets):
+            raise ValueError("Buckets not sorted")
+
+        self.buckets = tuple(self.buckets)
+
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+
+
 #
 # Detailed CPU metrics
 #
 
-class CPUMetrics(object):
 
+class CPUMetrics(object):
     def __init__(self):
         ticks_per_sec = 100
         try:
@@ -237,13 +298,28 @@ gc_time = Histogram(
     "python_gc_time",
     "Time taken to GC (sec)",
     ["gen"],
-    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
-             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+    buckets=[
+        0.0025,
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.10,
+        0.25,
+        0.50,
+        1.00,
+        2.50,
+        5.00,
+        7.50,
+        15.00,
+        30.00,
+        45.00,
+        60.00,
+    ],
 )
 
 
 class GCCounts(object):
-
     def collect(self):
         cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
         for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
 events_processed_counter = Counter("synapse_federation_client_events_processed", "")
 
 event_processing_loop_counter = Counter(
-    "synapse_event_processing_loop_count",
-    "Event processing loop iterations",
-    ["name"],
+    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
 )
 
 event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@ last_ticked = time.time()
 
 
 class ReactorLastSeenMetric(object):
-
     def collect(self):
         cm = GaugeMetricFamily(
             "python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ REGISTRY.register(ReactorLastSeenMetric())
 
 
 def runUntilCurrentTimer(func):
-
     @functools.wraps(func)
     def f(*args, **kwargs):
         now = reactor.seconds()
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 4bc9eb7313..099f9545ab 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -117,7 +117,7 @@ class Mailer(object):
 
         link = (
             self.hs.config.public_baseurl +
-            "_synapse/password_reset/email/submit_token"
+            "_matrix/client/unstable/password_reset/email/submit_token"
             "?token=%s&client_secret=%s&sid=%s" %
             (token, client_secret, sid)
         )
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c78f2cb15e..11ace2bfb1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -44,7 +44,10 @@ REQUIREMENTS = [
     "canonicaljson>=1.1.3",
     "signedjson>=1.0.0",
     "pynacl>=1.2.1",
-    "service_identity>=16.0.0",
+    "idna>=2.5",
+
+    # validating SSL certs for IP addresses requires service_identity 18.1.
+    "service_identity>=18.1.0",
 
     # our logcontext handling relies on the ability to cancel inlineCallbacks
     # (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
@@ -62,7 +65,7 @@ REQUIREMENTS = [
     "sortedcontainers>=1.4.4",
     "psutil>=2.0.0",
     "pymacaroons>=0.13.0",
-    "msgpack>=0.5.0",
+    "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
     "six>=1.10",
     # prometheus_client 0.4.0 changed the format of counter metrics
@@ -77,7 +80,7 @@ REQUIREMENTS = [
 ]
 
 CONDITIONAL_REQUIREMENTS = {
-    "email": ["Jinja2>=2.9", "bleach>=1.4.2"],
+    "email": ["Jinja2>=2.9", "bleach>=1.4.3"],
     "matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"],
 
     # we use execute_batch, which arrived in psycopg 2.7.
diff --git a/synapse/res/templates/password_reset_success.html b/synapse/res/templates/password_reset_success.html
index 7b6fa5e6f0..7324d66d1e 100644
--- a/synapse/res/templates/password_reset_success.html
+++ b/synapse/res/templates/password_reset_success.html
@@ -1,6 +1,6 @@
 <html>
 <head></head>
 <body>
-<p>Your password was successfully reset. You may now close this window.</p>
+<p>Your email has now been validated, please return to your client to reset your password. You may now close this window.</p>
 </body>
 </html>
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index e4c63b69b9..ab75f6c2b2 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-import re
 
 from six.moves import http_client
 
@@ -68,7 +67,13 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request):
         if self.config.email_password_reset_behaviour == "off":
-            raise SynapseError(400, "Password resets have been disabled on this server")
+            if self.config.password_resets_were_disabled_due_to_email_config:
+                logger.warn(
+                    "User password resets have been disabled due to lack of email config"
+                )
+            raise SynapseError(
+                400, "Email-based password resets have been disabled on this server",
+            )
 
         body = parse_json_object_from_request(request)
 
@@ -196,9 +201,6 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request):
-        if not self.config.email_password_reset_behaviour == "off":
-            raise SynapseError(400, "Password resets have been disabled on this server")
-
         body = parse_json_object_from_request(request)
 
         assert_params_in_dict(body, [
@@ -228,9 +230,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
 
 class PasswordResetSubmitTokenServlet(RestServlet):
     """Handles 3PID validation token submission"""
-    PATTERNS = [
-        re.compile("^/_synapse/password_reset/(?P<medium>[^/]*)/submit_token/*$"),
-    ]
+    PATTERNS = client_patterns(
+        "/password_reset/(?P<medium>[^/]*)/submit_token/*$",
+        releases=(),
+        unstable=True,
+    )
 
     def __init__(self, hs):
         """
@@ -251,6 +255,14 @@ class PasswordResetSubmitTokenServlet(RestServlet):
                 400,
                 "This medium is currently not supported for password resets",
             )
+        if self.config.email_password_reset_behaviour == "off":
+            if self.config.password_resets_were_disabled_due_to_email_config:
+                logger.warn(
+                    "User password resets have been disabled due to lack of email config"
+                )
+            raise SynapseError(
+                400, "Email-based password resets have been disabled on this server",
+            )
 
         sid = parse_string(request, "sid")
         client_secret = parse_string(request, "client_secret")
diff --git a/synapse/rest/client/v2_alpha/account_validity.py b/synapse/rest/client/v2_alpha/account_validity.py
index 55c4ed5660..63bdc33564 100644
--- a/synapse/rest/client/v2_alpha/account_validity.py
+++ b/synapse/rest/client/v2_alpha/account_validity.py
@@ -79,7 +79,7 @@ class AccountValiditySendMailServlet(RestServlet):
         if not self.account_validity.renew_by_email_enabled:
             raise AuthError(403, "Account renewal via email is disabled on this server.")
 
-        requester = yield self.auth.get_user_by_req(request)
+        requester = yield self.auth.get_user_by_req(request, allow_expired=True)
         user_id = requester.user.to_string()
         yield self.account_activity_handler.send_renewal_email_to_user(user_id)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..0ca6f6121f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,37 @@ class DataStore(
         """
         Counts the number of users who used this homeserver in the last 24 hours.
         """
+        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+        return self.runInteraction("count_daily_users", self._count_users, yesterday,)
 
-        def _count_users(txn):
-            yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
-            sql = """
-                SELECT COALESCE(count(*), 0) FROM (
-                    SELECT user_id FROM user_ips
-                    WHERE last_seen > ?
-                    GROUP BY user_id
-                ) u
-            """
-
-            txn.execute(sql, (yesterday,))
-            count, = txn.fetchone()
-            return count
+    def count_monthly_users(self):
+        """
+        Counts the number of users who used this homeserver in the last 30 days.
+        Note this method is intended for phonehome metrics only and is different
+        from the mau figure in synapse.storage.monthly_active_users which,
+        amongst other things, includes a 3 day grace period before a user counts.
+        """
+        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+        return self.runInteraction(
+            "count_monthly_users",
+            self._count_users,
+            thirty_days_ago,
+        )
 
-        return self.runInteraction("count_users", _count_users)
+    def _count_users(self, txn, time_from):
+        """
+        Returns number of users seen in the past time_from period
+        """
+        sql = """
+            SELECT COALESCE(count(*), 0) FROM (
+                SELECT user_id FROM user_ips
+                WHERE last_seen > ?
+                GROUP BY user_id
+            ) u
+        """
+        txn.execute(sql, (time_from,))
+        count, = txn.fetchone()
+        return count
 
     def count_r30_users(self):
         """
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 9b0a99cb49..4ea0deea4f 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -138,6 +138,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         if not has_changed or last_stream_id == current_stream_id:
             return defer.succeed(([], current_stream_id))
 
+        if limit <= 0:
+            # This can happen if we run out of room for EDUs in the transaction.
+            return defer.succeed(([], last_stream_id))
+
         def get_new_messages_for_remote_destination_txn(txn):
             sql = (
                 "SELECT stream_id, messages_json FROM device_federation_outbox"
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..1578403f79 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
 
 import itertools
 import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
 from functools import wraps
 
 from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
 
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+        # Collect metrics on the number of forward extremities that exist.
+        self._current_forward_extremities_amount = {}
+
+        BucketCollector(
+            "synapse_forward_extremities",
+            lambda: self._current_forward_extremities_amount,
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+        )
+
+        # Read the extrems every 60 minutes
+        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+    @defer.inlineCallbacks
+    def _read_forward_extremities(self):
+        def fetch(txn):
+            txn.execute(
+                """
+                select count(*) c from event_forward_extremities
+                group by room_id
+                """
+            )
+            return txn.fetchall()
+
+        res = yield self.runInteraction("read_forward_extremities", fetch)
+        self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
@@ -568,17 +594,11 @@ class EventsStore(
             )
 
             txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_events_which_are_prevs",
-                _get_events_which_are_prevs_txn,
-                chunk,
+                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
 
         defer.returnValue(results)
@@ -640,9 +660,7 @@ class EventsStore(
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_prevs_before_rejected",
-                _get_prevs_before_rejected_txn,
-                chunk,
+                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )
 
         defer.returnValue(existing_prevs)
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 5300720dbb..e3655ad8d7 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -80,6 +80,14 @@ class KeyStore(SQLBaseStore):
 
             for row in txn:
                 server_name, key_id, key_bytes, ts_valid_until_ms = row
+
+                if ts_valid_until_ms is None:
+                    # Old keys may be stored with a ts_valid_until_ms of null,
+                    # in which case we treat this as if it was set to `0`, i.e.
+                    # it won't match key requests that define a minimum
+                    # `ts_valid_until_ms`.
+                    ts_valid_until_ms = 0
+
                 res = FetchKeyResult(
                     verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
                     valid_until_ts=ts_valid_until_ms,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b41cbd757..4c5751b57f 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import re
 
 from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
 
+logger = logging.getLogger(__name__)
+
 
 class RegistrationWorkerStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
@@ -598,12 +601,76 @@ class RegistrationStore(
             "user_threepids_grandfather", self._bg_user_threepids_grandfather,
         )
 
+        self.register_background_update_handler(
+            "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag,
+        )
+
         # Create a background job for culling expired 3PID validity tokens
         hs.get_clock().looping_call(
             self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
         )
 
     @defer.inlineCallbacks
+    def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
+        """Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
+        for each of them.
+        """
+
+        last_user = progress.get("user_id", "")
+
+        def _backgroud_update_set_deactivated_flag_txn(txn):
+            txn.execute(
+                """
+                SELECT
+                    users.name,
+                    COUNT(access_tokens.token) AS count_tokens,
+                    COUNT(user_threepids.address) AS count_threepids
+                FROM users
+                    LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+                    LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+                WHERE password_hash IS NULL OR password_hash = ''
+                AND users.name > ?
+                GROUP BY users.name
+                ORDER BY users.name ASC
+                LIMIT ?;
+                """,
+                (last_user, batch_size),
+            )
+
+            rows = self.cursor_to_dict(txn)
+
+            if not rows:
+                return True
+
+            rows_processed_nb = 0
+
+            for user in rows:
+                if not user["count_tokens"] and not user["count_threepids"]:
+                    self.set_user_deactivated_status_txn(txn, user["user_id"], True)
+                    rows_processed_nb += 1
+
+            logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+            self._background_update_progress_txn(
+                txn, "users_set_deactivated_flag", {"user_id": rows[-1]["user_id"]}
+            )
+
+            if batch_size > len(rows):
+                return True
+            else:
+                return False
+
+        end = yield self.runInteraction(
+            "users_set_deactivated_flag",
+            _backgroud_update_set_deactivated_flag_txn,
+        )
+
+        if end:
+            yield self._end_background_update("users_set_deactivated_flag")
+
+        defer.returnValue(batch_size)
+
+    @defer.inlineCallbacks
     def add_access_token_to_user(self, user_id, token, device_id=None):
         """Adds an access token for the given user.
 
@@ -998,7 +1065,7 @@ class RegistrationStore(
         client_secret,
         address=None,
         sid=None,
-        validated=None,
+        validated=True,
     ):
         """Gets a session_id and last_send_attempt (if available) for a
         client_secret/medium/(address|session_id) combo
@@ -1268,3 +1335,50 @@ class RegistrationStore(
             "delete_threepid_session",
             delete_threepid_session_txn,
         )
+
+    def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+        self._simple_update_one_txn(
+            txn=txn,
+            table="users",
+            keyvalues={"name": user_id},
+            updatevalues={"deactivated": 1 if deactivated else 0},
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_deactivated_status, (user_id,),
+        )
+
+    @defer.inlineCallbacks
+    def set_user_deactivated_status(self, user_id, deactivated):
+        """Set the `deactivated` property for the provided user to the provided value.
+
+        Args:
+            user_id (str): The ID of the user to set the status for.
+            deactivated (bool): The value to set for `deactivated`.
+        """
+
+        yield self.runInteraction(
+            "set_user_deactivated_status",
+            self.set_user_deactivated_status_txn,
+            user_id, deactivated,
+        )
+
+    @cachedInlineCallbacks()
+    def get_user_deactivated_status(self, user_id):
+        """Retrieve the value for the `deactivated` property for the provided user.
+
+        Args:
+            user_id (str): The ID of the user to retrieve the status for.
+
+        Returns:
+            defer.Deferred(bool): The requested value.
+        """
+
+        res = yield self._simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user_id},
+            retcol="deactivated",
+            desc="get_user_deactivated_status",
+        )
+
+        # Convert the integer into a boolean.
+        defer.returnValue(res == 1)
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('users_set_deactivated_flag', '{}');