diff --git a/synapse/__init__.py b/synapse/__init__.py
index d0e8d7c21b..0c01546789 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,6 +17,13 @@
""" This is a reference implementation of a Matrix home server.
"""
+import sys
+
+# Check that we're not running on an unsupported Python version.
+if sys.version_info < (3, 5):
+ print("Synapse requires Python 3.5 or above.")
+ sys.exit(1)
+
try:
from twisted.internet import protocol
from twisted.internet.protocol import Factory
@@ -27,4 +34,4 @@ try:
except ImportError:
pass
-__version__ = "0.99.5.2"
+__version__ = "1.0.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 0c6c93a87b..79e2808dc5 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -184,11 +184,22 @@ class Auth(object):
return event_auth.get_public_keys(invite_event)
@defer.inlineCallbacks
- def get_user_by_req(self, request, allow_guest=False, rights="access"):
+ def get_user_by_req(
+ self,
+ request,
+ allow_guest=False,
+ rights="access",
+ allow_expired=False,
+ ):
""" Get a registered user's ID.
Args:
request - An HTTP request with an access_token query parameter.
+ allow_expired - Whether to allow the request through even if the account
+ is expired. If true, Synapse will still require an access token to be
+ provided, but won't check whether the account it belongs to has expired.
+ This works because /login issues access tokens regardless of account
+ expiration.
Returns:
defer.Deferred: resolves to a ``synapse.types.Requester`` object
Raises:
@@ -229,7 +240,7 @@ class Auth(object):
is_guest = user_info["is_guest"]
# Deny the request if the user account has expired.
- if self._account_validity.enabled:
+ if self._account_validity.enabled and not allow_expired:
user_id = user.to_string()
expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
if expiration_ts is not None and self.clock.time_msec() >= expiration_ts:
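A minimal caller-side sketch of the new flag (hypothetical servlet method; self.auth wired up as elsewhere in Synapse). The access token is still validated, but the account_validity expiry check is skipped:

    @defer.inlineCallbacks
    def on_POST(self, request):
        # Let an expired account through; the token itself is still checked.
        requester = yield self.auth.get_user_by_req(request, allow_expired=True)
        user_id = requester.user.to_string()
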
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index df524a23dd..b27b12e73d 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -176,7 +176,6 @@ class SynapseHomeServer(HomeServer):
resources.update({
"/_matrix/client/api/v1": client_resource,
- "/_synapse/password_reset": client_resource,
"/_matrix/client/r0": client_resource,
"/_matrix/client/unstable": client_resource,
"/_matrix/client/v2_alpha": client_resource,
@@ -541,6 +540,7 @@ def run(hs):
stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
+ stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index ae04252906..86018dfcce 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -19,15 +19,12 @@ from __future__ import print_function
# This file can't be called email.py because if it is, we cannot:
import email.utils
-import logging
import os
import pkg_resources
from ._base import Config, ConfigError
-logger = logging.getLogger(__name__)
-
class EmailConfig(Config):
def read_config(self, config):
@@ -85,10 +82,12 @@ class EmailConfig(Config):
self.email_password_reset_behaviour = (
"remote" if email_trust_identity_server_for_password_resets else "local"
)
+ self.password_resets_were_disabled_due_to_email_config = False
if self.email_password_reset_behaviour == "local" and email_config == {}:
- logger.warn(
- "User password resets have been disabled due to lack of email config"
- )
+ # We cannot warn the user here that this has happened;
+ # instead, do so when a user attempts to reset their password.
+ self.password_resets_were_disabled_due_to_email_config = True
+
self.email_password_reset_behaviour = "off"
# Get lifetime of a validation token in milliseconds
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 5c4fc8ff21..acadef4fd3 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -38,6 +38,7 @@ from .server import ServerConfig
from .server_notices_config import ServerNoticesConfig
from .spam_checker import SpamCheckerConfig
from .stats import StatsConfig
+from .third_party_event_rules import ThirdPartyRulesConfig
from .tls import TlsConfig
from .user_directory import UserDirectoryConfig
from .voip import VoipConfig
@@ -73,5 +74,6 @@ class HomeServerConfig(
StatsConfig,
ServerNoticesConfig,
RoomDirectoryConfig,
+ ThirdPartyRulesConfig,
):
pass
diff --git a/synapse/config/key.py b/synapse/config/key.py
index aba7092ccd..424875feae 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -41,6 +41,15 @@ validation or TLS certificate validation. This is likely to be very insecure. If
you are *sure* you want to do this, set 'accept_keys_insecurely' on the
keyserver configuration."""
+RELYING_ON_MATRIX_KEY_ERROR = """\
+Your server is configured to accept key server responses without TLS certificate
+validation, and which are only signed by the old (possibly compromised)
+matrix.org signing key 'ed25519:auto'. This likely isn't what you want to do,
+and you should enable 'federation_verify_certificates' in your configuration.
+
+If you are *sure* you want to do this, set 'accept_keys_insecurely' on the
+trusted_key_server configuration."""
+
logger = logging.getLogger(__name__)
@@ -340,10 +349,20 @@ def _parse_key_servers(key_servers, federation_verify_certificates):
result.verify_keys[key_id] = verify_key
if (
- not verify_keys
- and not server.get("accept_keys_insecurely")
- and not federation_verify_certificates
+ not federation_verify_certificates and
+ not server.get("accept_keys_insecurely")
):
- raise ConfigError(INSECURE_NOTARY_ERROR)
+ _assert_keyserver_has_verify_keys(result)
yield result
+
+
+def _assert_keyserver_has_verify_keys(trusted_key_server):
+ if not trusted_key_server.verify_keys:
+ raise ConfigError(INSECURE_NOTARY_ERROR)
+
+ # also check that they are not blindly checking the old matrix.org key
+ if trusted_key_server.server_name == "matrix.org" and any(
+ key_id == "ed25519:auto" for key_id in trusted_key_server.verify_keys
+ ):
+ raise ConfigError(RELYING_ON_MATRIX_KEY_ERROR)
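For illustration, a hedged sketch of the two failure modes enforced above, using a stand-in for the parsed key server object (attribute names taken from the code: server_name and verify_keys):

    class _FakeKeyServer(object):
        def __init__(self, server_name, verify_keys):
            self.server_name = server_name
            self.verify_keys = verify_keys

    # No verify keys and no TLS validation: raises ConfigError(INSECURE_NOTARY_ERROR).
    _assert_keyserver_has_verify_keys(_FakeKeyServer("example.com", {}))

    # Only the old matrix.org key: raises ConfigError(RELYING_ON_MATRIX_KEY_ERROR).
    _assert_keyserver_has_verify_keys(
        _FakeKeyServer("matrix.org", {"ed25519:auto": object()})
    )
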
diff --git a/synapse/config/third_party_event_rules.py b/synapse/config/third_party_event_rules.py
new file mode 100644
index 0000000000..a89dd5f98a
--- /dev/null
+++ b/synapse/config/third_party_event_rules.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.module_loader import load_module
+
+from ._base import Config
+
+
+class ThirdPartyRulesConfig(Config):
+ def read_config(self, config):
+ self.third_party_event_rules = None
+
+ provider = config.get("third_party_event_rules", None)
+ if provider is not None:
+ self.third_party_event_rules = load_module(provider)
+
+ def default_config(self, **kwargs):
+ return """\
+ # Server admins can define a Python module that implements extra rules for
+ # allowing or denying incoming events. In order to work, this module needs to
+ # override the methods defined in synapse/events/third_party_rules.py.
+ #
+ # This feature is designed to be used in closed federations only, where each
+ # participating server enforces the same rules.
+ #
+ #third_party_event_rules:
+ # module: "my_custom_project.SuperRulesSet"
+ # config:
+ # example_option: 'things'
+ """
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 59ea087e66..2bc5cc3807 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -15,10 +15,13 @@
import logging
+import idna
+from service_identity import VerificationError
+from service_identity.pyopenssl import verify_hostname, verify_ip_address
from zope.interface import implementer
from OpenSSL import SSL, crypto
-from twisted.internet._sslverify import ClientTLSOptions, _defaultCurveName
+from twisted.internet._sslverify import _defaultCurveName
from twisted.internet.abstract import isIPAddress, isIPv6Address
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
from twisted.internet.ssl import CertificateOptions, ContextFactory, platformTrust
@@ -56,79 +59,19 @@ class ServerContextFactory(ContextFactory):
return self._context
-def _idnaBytes(text):
- """
- Convert some text typed by a human into some ASCII bytes. This is a
- copy of twisted.internet._idna._idnaBytes. For documentation, see the
- twisted documentation.
- """
- try:
- import idna
- except ImportError:
- return text.encode("idna")
- else:
- return idna.encode(text)
-
-
-def _tolerateErrors(wrapped):
- """
- Wrap up an info_callback for pyOpenSSL so that if something goes wrong
- the error is immediately logged and the connection is dropped if possible.
- This is a copy of twisted.internet._sslverify._tolerateErrors. For
- documentation, see the twisted documentation.
- """
-
- def infoCallback(connection, where, ret):
- try:
- return wrapped(connection, where, ret)
- except: # noqa: E722, taken from the twisted implementation
- f = Failure()
- logger.exception("Error during info_callback")
- connection.get_app_data().failVerification(f)
-
- return infoCallback
+class ClientTLSOptionsFactory(object):
+ """Factory for Twisted SSLClientConnectionCreators that are used to make connections
+ to remote servers for federation.
+ Uses one of two OpenSSL context objects for all connections, depending on whether
+ we should do SSL certificate verification.
-@implementer(IOpenSSLClientConnectionCreator)
-class ClientTLSOptionsNoVerify(object):
- """
- Client creator for TLS without certificate identity verification. This is a
- copy of twisted.internet._sslverify.ClientTLSOptions with the identity
- verification left out. For documentation, see the twisted documentation.
+ get_options decides whether we should do SSL certificate verification and
+ constructs an SSLClientConnectionCreator factory accordingly.
"""
- def __init__(self, hostname, ctx):
- self._ctx = ctx
-
- if isIPAddress(hostname) or isIPv6Address(hostname):
- self._hostnameBytes = hostname.encode('ascii')
- self._sendSNI = False
- else:
- self._hostnameBytes = _idnaBytes(hostname)
- self._sendSNI = True
-
- ctx.set_info_callback(_tolerateErrors(self._identityVerifyingInfoCallback))
-
- def clientConnectionForTLS(self, tlsProtocol):
- context = self._ctx
- connection = SSL.Connection(context, None)
- connection.set_app_data(tlsProtocol)
- return connection
-
- def _identityVerifyingInfoCallback(self, connection, where, ret):
- # Literal IPv4 and IPv6 addresses are not permitted
- # as host names according to the RFCs
- if where & SSL.SSL_CB_HANDSHAKE_START and self._sendSNI:
- connection.set_tlsext_host_name(self._hostnameBytes)
-
-
-class ClientTLSOptionsFactory(object):
- """Factory for Twisted ClientTLSOptions that are used to make connections
- to remote servers for federation."""
-
def __init__(self, config):
self._config = config
- self._options_noverify = CertificateOptions()
# Check if we're using a custom list of a CA certificates
trust_root = config.federation_ca_trust_root
@@ -136,11 +79,13 @@ class ClientTLSOptionsFactory(object):
# Use CA root certs provided by OpenSSL
trust_root = platformTrust()
- self._options_verify = CertificateOptions(trustRoot=trust_root)
+ self._verify_ssl_context = CertificateOptions(trustRoot=trust_root).getContext()
+ self._verify_ssl_context.set_info_callback(self._context_info_cb)
- def get_options(self, host):
- # Use _makeContext so that we get a fresh OpenSSL CTX each time.
+ self._no_verify_ssl_context = CertificateOptions().getContext()
+ self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
+ def get_options(self, host):
# Check if certificate verification has been enabled
should_verify = self._config.federation_verify_certificates
@@ -151,6 +96,93 @@ class ClientTLSOptionsFactory(object):
should_verify = False
break
- if should_verify:
- return ClientTLSOptions(host, self._options_verify._makeContext())
- return ClientTLSOptionsNoVerify(host, self._options_noverify._makeContext())
+ ssl_context = (
+ self._verify_ssl_context if should_verify else self._no_verify_ssl_context
+ )
+
+ return SSLClientConnectionCreator(host, ssl_context, should_verify)
+
+ @staticmethod
+ def _context_info_cb(ssl_connection, where, ret):
+ """The 'information callback' for our openssl context object."""
+ # we assume that the app_data on the connection object has been set to
+ # a TLSMemoryBIOProtocol object. (This is done by SSLClientConnectionCreator)
+ tls_protocol = ssl_connection.get_app_data()
+ try:
+ # ... we further assume that SSLClientConnectionCreator has set the
+ # '_synapse_tls_verifier' attribute to a ConnectionVerifier object.
+ tls_protocol._synapse_tls_verifier.verify_context_info_cb(
+ ssl_connection, where
+ )
+ except: # noqa: E722, taken from the twisted implementation
+ logger.exception("Error during info_callback")
+ f = Failure()
+ tls_protocol.failVerification(f)
+
+
+@implementer(IOpenSSLClientConnectionCreator)
+class SSLClientConnectionCreator(object):
+ """Creates openssl connection objects for client connections.
+
+ Replaces twisted.internet.ssl.ClientTLSOptions
+ """
+
+ def __init__(self, hostname, ctx, verify_certs):
+ self._ctx = ctx
+ self._verifier = ConnectionVerifier(hostname, verify_certs)
+
+ def clientConnectionForTLS(self, tls_protocol):
+ context = self._ctx
+ connection = SSL.Connection(context, None)
+
+ # as per twisted.internet.ssl.ClientTLSOptions, we set the application
+ # data to our TLSMemoryBIOProtocol...
+ connection.set_app_data(tls_protocol)
+
+ # ... and we also gut-wrench a '_synapse_tls_verifier' attribute into the
+ # tls_protocol so that the SSL context's info callback has something to
+ # call to do the cert verification.
+ setattr(tls_protocol, "_synapse_tls_verifier", self._verifier)
+ return connection
+
+
+class ConnectionVerifier(object):
+ """Set the SNI, and do cert verification
+
+ This is a thing which is attached to the TLSMemoryBIOProtocol, and is called by
+ the ssl context's info callback.
+ """
+
+ # This code is based on twisted.internet.ssl.ClientTLSOptions.
+
+ def __init__(self, hostname, verify_certs):
+ self._verify_certs = verify_certs
+
+ if isIPAddress(hostname) or isIPv6Address(hostname):
+ self._hostnameBytes = hostname.encode("ascii")
+ self._is_ip_address = True
+ else:
+ # twisted's ClientTLSOptions falls back to the stdlib impl here if
+ # idna is not installed, but points out that lacks support for
+ # IDNA2008 (http://bugs.python.org/issue17305).
+ #
+ # We can rely on having idna.
+ self._hostnameBytes = idna.encode(hostname)
+ self._is_ip_address = False
+
+ self._hostnameASCII = self._hostnameBytes.decode("ascii")
+
+ def verify_context_info_cb(self, ssl_connection, where):
+ if where & SSL.SSL_CB_HANDSHAKE_START and not self._is_ip_address:
+ ssl_connection.set_tlsext_host_name(self._hostnameBytes)
+
+ if where & SSL.SSL_CB_HANDSHAKE_DONE and self._verify_certs:
+ try:
+ if self._is_ip_address:
+ verify_ip_address(ssl_connection, self._hostnameASCII)
+ else:
+ verify_hostname(ssl_connection, self._hostnameASCII)
+ except VerificationError:
+ f = Failure()
+ tls_protocol = ssl_connection.get_app_data()
+ tls_protocol.failVerification(f)
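A small standalone sketch of the hostname handling in ConnectionVerifier: IP literals are kept as ASCII and get no SNI, while DNS names are IDNA-encoded and used for both SNI and certificate verification (hostnames here are examples):

    import idna
    from twisted.internet.abstract import isIPAddress, isIPv6Address

    for hostname in [u"192.168.1.1", u"::1", u"matrix.org", u"münchen.example"]:
        if isIPAddress(hostname) or isIPv6Address(hostname):
            print("%s -> IP literal, no SNI" % (hostname,))
        else:
            print("%s -> SNI bytes %r" % (hostname, idna.encode(hostname)))
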
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 96964b0d50..6f603f1961 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -750,13 +750,6 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
verify_signed_json(response, perspective_name, perspective_keys[key_id])
verified = True
- if perspective_name == "matrix.org" and key_id == "ed25519:auto":
- logger.warning(
- "Trusting trusted_key_server responses signed by the "
- "compromised matrix.org signing key 'ed25519:auto'. "
- "This is a placebo."
- )
-
if not verified:
raise KeyLookupError(
"Response not signed with a known key: signed with: %r, known keys: %r"
diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
new file mode 100644
index 0000000000..9f98d51523
--- /dev/null
+++ b/synapse/events/third_party_rules.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+
+class ThirdPartyEventRules(object):
+ """Allows server admins to provide a Python module implementing an extra set of rules
+ to apply when processing events.
+
+ This is designed to help admins of closed federations with enforcing custom
+ behaviours.
+ """
+
+ def __init__(self, hs):
+ self.third_party_rules = None
+
+ self.store = hs.get_datastore()
+
+ module = None
+ config = None
+ if hs.config.third_party_event_rules:
+ module, config = hs.config.third_party_event_rules
+
+ if module is not None:
+ self.third_party_rules = module(config=config)
+
+ @defer.inlineCallbacks
+ def check_event_allowed(self, event, context):
+ """Check if a provided event should be allowed in the given context.
+
+ Args:
+ event (synapse.events.EventBase): The event to be checked.
+ context (synapse.events.snapshot.EventContext): The context of the event.
+
+ Returns:
+ defer.Deferred[bool]: True if the event should be allowed, False otherwise.
+ """
+ if self.third_party_rules is None:
+ defer.returnValue(True)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ # Retrieve the state events from the database.
+ state_events = {}
+ for key, event_id in prev_state_ids.items():
+ state_events[key] = yield self.store.get_event(event_id, allow_none=True)
+
+ ret = yield self.third_party_rules.check_event_allowed(event, state_events)
+ defer.returnValue(ret)
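For reference, the state_events dict handed to the module maps (event type, state key) pairs to full events; a hedged sketch of its shape (the event variables are hypothetical):

    state_events = {
        ("m.room.create", ""): create_event,
        ("m.room.member", "@alice:example.com"): alice_membership_event,
        # values may be None if an event could not be loaded (allow_none=True)
    }
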
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 564c57203d..22a2735405 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -189,11 +189,21 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
- device_message_edus, device_stream_id, dev_list_id = (
- # We have to keep 2 free slots for presence and rr_edus
- yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2)
+ # We have to keep 2 free slots for presence and rr_edus
+ limit = MAX_EDUS_PER_TRANSACTION - 2
+
+ device_update_edus, dev_list_id = (
+ yield self._get_device_update_edus(limit)
+ )
+
+ limit -= len(device_update_edus)
+
+ to_device_edus, device_stream_id = (
+ yield self._get_to_device_message_edus(limit)
)
+ pending_edus = device_update_edus + to_device_edus
+
# BEGIN CRITICAL SECTION
#
# In order to avoid a race condition, we need to make sure that
@@ -208,10 +218,6 @@ class PerDestinationQueue(object):
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
- pending_edus = []
-
- # We can only include at most 100 EDUs per transactions
- # rr_edus and pending_presence take at most one slot each
pending_edus.extend(self._get_rr_edus(force_flush=False))
pending_presence = self._pending_presence
self._pending_presence = {}
@@ -232,7 +238,6 @@ class PerDestinationQueue(object):
)
)
- pending_edus.extend(device_message_edus)
pending_edus.extend(
self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
)
@@ -272,10 +277,13 @@ class PerDestinationQueue(object):
sent_edus_by_type.labels(edu.edu_type).inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
- if device_message_edus:
+ if to_device_edus:
yield self._store.delete_device_msgs_for_remote(
self._destination, device_stream_id
)
+
+ # also mark the device updates as sent
+ if device_update_edus:
logger.info(
"Marking as sent %r %r", self._destination, dev_list_id
)
@@ -347,7 +355,7 @@ class PerDestinationQueue(object):
return pending_edus
@defer.inlineCallbacks
- def _get_new_device_messages(self, limit):
+ def _get_device_update_edus(self, limit):
last_device_list = self._last_device_list_stream_id
# Retrieve list of new device updates to send to the destination
@@ -366,15 +374,19 @@ class PerDestinationQueue(object):
assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs"
+ defer.returnValue((edus, now_stream_id))
+
+ @defer.inlineCallbacks
+ def _get_to_device_message_edus(self, limit):
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
contents, stream_id = yield self._store.get_new_device_msgs_for_remote(
self._destination,
last_device_stream_id,
to_device_stream_id,
- limit - len(edus),
+ limit,
)
- edus.extend(
+ edus = [
Edu(
origin=self._server_name,
destination=self._destination,
@@ -382,6 +394,6 @@ class PerDestinationQueue(object):
content=content,
)
for content in contents
- )
+ ]
- defer.returnValue((edus, stream_id, now_stream_id))
+ defer.returnValue((edus, stream_id))
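A worked example of the EDU budgeting above, assuming MAX_EDUS_PER_TRANSACTION is 100 (per the removed comment, transactions carry at most 100 EDUs):

    limit = 100 - 2   # 98: two slots reserved for receipts (rr_edus) and presence
    # Suppose _get_device_update_edus returns 40 device-list EDUs:
    limit -= 40       # 58 slots left for to-device messages
    # _get_to_device_message_edus is then capped at 58. If device updates had
    # filled all 98 slots, the new limit <= 0 guard in
    # synapse/storage/deviceinbox.py (below) returns no messages.
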
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index e5dda1975f..469ab8ac7b 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -132,9 +132,10 @@ class GroupAttestionRenewer(object):
self.is_mine_id = hs.is_mine_id
self.attestations = hs.get_groups_attestation_signing()
- self._renew_attestations_loop = self.clock.looping_call(
- self._start_renew_attestations, 30 * 60 * 1000,
- )
+ if not hs.config.worker_app:
+ self._renew_attestations_loop = self.clock.looping_call(
+ self._start_renew_attestations, 30 * 60 * 1000,
+ )
@defer.inlineCallbacks
def on_renew_attestation(self, group_id, user_id, content):
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 261446517d..5e0b92eb1c 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -110,6 +110,9 @@ class AccountValidityHandler(object):
# Stop right here if the user doesn't have at least one email address.
# In this case, they will have to ask their server admin to renew their
# account manually.
+ # We don't need to do a specific check to make sure the account isn't
+ # deactivated, as a deactivated account isn't supposed to have any
+ # email address attached to it.
if not addresses:
return
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 7f8ddc99c6..a0cf37a9f9 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -479,6 +479,7 @@ class AuthHandler(BaseHandler):
medium,
threepid_creds["client_secret"],
sid=threepid_creds["sid"],
+ validated=True,
)
threepid = {
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 6a91f7698e..7378b56c1d 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -42,6 +43,8 @@ class DeactivateAccountHandler(BaseHandler):
# it left off (if it has work left to do).
hs.get_reactor().callWhenRunning(self._start_user_parting)
+ self._account_validity_enabled = hs.config.account_validity.enabled
+
@defer.inlineCallbacks
def deactivate_account(self, user_id, erase_data, id_server=None):
"""Deactivate a user's account
@@ -114,6 +117,13 @@ class DeactivateAccountHandler(BaseHandler):
# parts users from rooms (if it isn't already running)
self._start_user_parting()
+ # Remove all information on the user from the account_validity table.
+ if self._account_validity_enabled:
+ yield self.store.delete_account_validity_for_user(user_id)
+
+ # Mark the user as deactivated.
+ yield self.store.set_user_deactivated_status(user_id, True)
+
defer.returnValue(identity_server_supports_unbinding)
def _start_user_parting(self):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac5ca79143..93e064cda3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -33,6 +34,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.api.errors import (
AuthError,
CodeMessageException,
+ Codes,
FederationDeniedError,
FederationError,
RequestSendFailed,
@@ -127,6 +129,8 @@ class FederationHandler(BaseHandler):
self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
+ self.third_party_event_rules = hs.get_third_party_event_rules()
+
@defer.inlineCallbacks
def on_receive_pdu(
self, origin, pdu, sent_to_us_directly=False,
@@ -1258,6 +1262,15 @@ class FederationHandler(BaseHandler):
logger.warn("Failed to create join %r because %s", event, e)
raise e
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ logger.info("Creation of join %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_join_request`
yield self.auth.check_from_context(
@@ -1300,6 +1313,15 @@ class FederationHandler(BaseHandler):
origin, event
)
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ logger.info("Sending of join %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
logger.debug(
"on_send_join_request: After _handle_new_event: %s, sigs: %s",
event.event_id,
@@ -1458,6 +1480,15 @@ class FederationHandler(BaseHandler):
builder=builder,
)
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ logger.warning("Creation of leave %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
try:
# The remote hasn't signed it yet, obviously. We'll do the full checks
# when we get the event back in `on_send_leave_request`
@@ -1484,10 +1515,19 @@ class FederationHandler(BaseHandler):
event.internal_metadata.outlier = False
- yield self._handle_new_event(
+ context = yield self._handle_new_event(
origin, event
)
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ logger.info("Sending of leave %s forbidden by third-party rules", event)
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
logger.debug(
"on_send_leave_request: After _handle_new_event: %s, sigs: %s",
event.event_id,
@@ -2550,6 +2590,18 @@ class FederationHandler(BaseHandler):
builder=builder
)
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ logger.info(
+ "Creation of threepid invite %s forbidden by third-party rules",
+ event,
+ )
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
event, context = yield self.add_display_name_to_third_party_invite(
room_version, event_dict, event, context
)
@@ -2598,6 +2650,18 @@ class FederationHandler(BaseHandler):
builder=builder,
)
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ logger.warning(
+ "Exchange of threepid invite %s forbidden by third-party rules",
+ event,
+ )
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
event, context = yield self.add_display_name_to_third_party_invite(
room_version, event_dict, event, context
)
@@ -2613,12 +2677,6 @@ class FederationHandler(BaseHandler):
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
- # XXX we send the invite here, but send_membership_event also sends it,
- # so we end up making two requests. I think this is redundant.
- returned_invite = yield self.send_invite(origin, event)
- # TODO: Make sure the signatures actually are correct.
- event.signatures.update(returned_invite.signatures)
-
member_handler = self.hs.get_room_member_handler()
yield member_handler.send_membership_event(None, event, context)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b02469ceb..11650dc80c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
-# Copyright 2014 - 2016 OpenMarket Ltd
-# Copyright 2017 - 2018 New Vector Ltd
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -248,6 +249,7 @@ class EventCreationHandler(object):
self.action_generator = hs.get_action_generator()
self.spam_checker = hs.get_spam_checker()
+ self.third_party_event_rules = hs.get_third_party_event_rules()
self._block_events_without_consent_error = (
self.config.block_events_without_consent_error
@@ -658,6 +660,14 @@ class EventCreationHandler(object):
else:
room_version = yield self.store.get_room_version(event.room_id)
+ event_allowed = yield self.third_party_event_rules.check_event_allowed(
+ event, context,
+ )
+ if not event_allowed:
+ raise SynapseError(
+ 403, "This event is not allowed in this context", Codes.FORBIDDEN,
+ )
+
try:
yield self.auth.check_from_context(room_version, event, context)
except AuthError as err:
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 77fe68818b..5c073fff07 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -17,7 +17,7 @@
import logging
from io import BytesIO
-from six import text_type
+from six import raise_from, text_type
from six.moves import urllib
import treq
@@ -542,10 +542,15 @@ class SimpleHttpClient(object):
length = yield make_deferred_yieldable(
_readBodyToFile(response, output_stream, max_size)
)
+ except SynapseError:
+ # This can happen e.g. because the body is too large.
+ raise
except Exception as e:
- logger.exception("Failed to download body")
- raise SynapseError(
- 502, ("Failed to download remote body: %s" % e), Codes.UNKNOWN
+ raise_from(
+ SynapseError(
+ 502, ("Failed to download remote body: %s" % e),
+ ),
+ e
)
defer.returnValue(
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..8aee14a8a8 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
import attr
from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
from twisted.internet import reactor
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy(object):
-
@staticmethod
def collect():
for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
try:
calls = self.caller()
except Exception:
- logger.exception(
- "Exception running callback for LaterGauge(%s)",
- self.name,
- )
+ logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
@@ -116,9 +112,7 @@ class InFlightGauge(object):
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
- "_MetricsEntry",
- attrs={x: attr.ib(0) for x in sub_metrics},
- slots=True,
+ "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)
# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
Note: may be called by a separate thread.
"""
- in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+ in_flight = GaugeMetricFamily(
+ self.name + "_total", self.desc, labels=self.labels
+ )
metrics_by_key = {}
@@ -179,7 +175,9 @@ class InFlightGauge(object):
yield in_flight
for name in self.sub_metrics:
- gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+ gauge = GaugeMetricFamily(
+ "_".join([self.name, name]), "", labels=self.labels
+ )
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
@@ -193,12 +191,74 @@ class InFlightGauge(object):
all_gauges[self.name] = self
+@attr.s(hash=True)
+class BucketCollector(object):
+ """
+ Like a Histogram, but allows buckets to be point-in-time instead of
+ incrementally added to.
+
+ Args:
+ name (str): Base name of metric to be exported to Prometheus.
+ data_collector (callable -> dict): A synchronous callable that
+ returns a dict mapping bucket to number of items in the
+ bucket. If these buckets are not the same as the buckets
+ given to this class, they will be remapped into them.
+ buckets (list[float]): List of floats/ints of the buckets to
+ give to Prometheus. +Inf is ignored, if given.
+
+ """
+
+ name = attr.ib()
+ data_collector = attr.ib()
+ buckets = attr.ib()
+
+ def collect(self):
+
+ # Fetch the data -- this must be synchronous!
+ data = self.data_collector()
+
+ buckets = {}
+
+ res = []
+ for x in data.keys():
+ for i, bound in enumerate(self.buckets):
+ if x <= bound:
+ buckets[bound] = buckets.get(bound, 0) + data[x]
+
+ for i in self.buckets:
+ res.append([str(i), buckets.get(i, 0)])
+
+ res.append(["+Inf", sum(data.values())])
+
+ metric = HistogramMetricFamily(
+ self.name,
+ "",
+ buckets=res,
+ sum_value=sum([x * y for x, y in data.items()]),
+ )
+ yield metric
+
+ def __attrs_post_init__(self):
+ self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+ if self.buckets != sorted(self.buckets):
+ raise ValueError("Buckets not sorted")
+
+ self.buckets = tuple(self.buckets)
+
+ if self.name in all_gauges.keys():
+ logger.warning("%s already registered, reregistering" % (self.name,))
+ REGISTRY.unregister(all_gauges.pop(self.name))
+
+ REGISTRY.register(self)
+ all_gauges[self.name] = self
+
+
#
# Detailed CPU metrics
#
class CPUMetrics(object):
-
def __init__(self):
ticks_per_sec = 100
try:
@@ -237,13 +297,28 @@ gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
- buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
- 5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+ buckets=[
+ 0.0025,
+ 0.005,
+ 0.01,
+ 0.025,
+ 0.05,
+ 0.10,
+ 0.25,
+ 0.50,
+ 1.00,
+ 2.50,
+ 5.00,
+ 7.50,
+ 15.00,
+ 30.00,
+ 45.00,
+ 60.00,
+ ],
)
class GCCounts(object):
-
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -279,9 +354,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter(
- "synapse_event_processing_loop_count",
- "Event processing loop iterations",
- ["name"],
+ "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)
event_processing_loop_room_count = Counter(
@@ -311,7 +384,6 @@ last_ticked = time.time()
class ReactorLastSeenMetric(object):
-
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -325,7 +397,6 @@ REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
-
@functools.wraps(func)
def f(*args, **kwargs):
now = reactor.seconds()
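A hedged usage sketch of the new BucketCollector (numbers invented): the data callback returns a point-in-time {value: count} mapping, instantiation registers the collector with the global REGISTRY via __attrs_post_init__, and collect() folds the data into cumulative Prometheus buckets (a value of 4 below contributes to every bucket with a bound >= 4):

    counts = {1: 10, 4: 3, 250: 1}   # e.g. extremity-count -> number of rooms

    BucketCollector(
        "example_forward_extremities",
        lambda: counts,
        buckets=[1, 2, 5, 10, 50, 100, 500, "+Inf"],
    )
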
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 4bc9eb7313..099f9545ab 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -117,7 +117,7 @@ class Mailer(object):
link = (
self.hs.config.public_baseurl +
- "_synapse/password_reset/email/submit_token"
+ "_matrix/client/unstable/password_reset/email/submit_token"
"?token=%s&client_secret=%s&sid=%s" %
(token, client_secret, sid)
)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c78f2cb15e..11ace2bfb1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -44,7 +44,10 @@ REQUIREMENTS = [
"canonicaljson>=1.1.3",
"signedjson>=1.0.0",
"pynacl>=1.2.1",
- "service_identity>=16.0.0",
+ "idna>=2.5",
+
+ # validating SSL certs for IP addresses requires service_identity 18.1.
+ "service_identity>=18.1.0",
# our logcontext handling relies on the ability to cancel inlineCallbacks
# (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
@@ -62,7 +65,7 @@ REQUIREMENTS = [
"sortedcontainers>=1.4.4",
"psutil>=2.0.0",
"pymacaroons>=0.13.0",
- "msgpack>=0.5.0",
+ "msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"six>=1.10",
# prometheus_client 0.4.0 changed the format of counter metrics
@@ -77,7 +80,7 @@ REQUIREMENTS = [
]
CONDITIONAL_REQUIREMENTS = {
- "email": ["Jinja2>=2.9", "bleach>=1.4.2"],
+ "email": ["Jinja2>=2.9", "bleach>=1.4.3"],
"matrix-synapse-ldap3": ["matrix-synapse-ldap3>=0.1"],
# we use execute_batch, which arrived in psycopg 2.7.
diff --git a/synapse/res/templates/password_reset_success.html b/synapse/res/templates/password_reset_success.html
index 7b6fa5e6f0..7324d66d1e 100644
--- a/synapse/res/templates/password_reset_success.html
+++ b/synapse/res/templates/password_reset_success.html
@@ -1,6 +1,6 @@
<html>
<head></head>
<body>
-<p>Your password was successfully reset. You may now close this window.</p>
+<p>Your email has now been validated; please return to your client to reset your password. You may now close this window.</p>
</body>
</html>
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index e4c63b69b9..ab75f6c2b2 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import re
from six.moves import http_client
@@ -68,7 +67,13 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
if self.config.email_password_reset_behaviour == "off":
- raise SynapseError(400, "Password resets have been disabled on this server")
+ if self.config.password_resets_were_disabled_due_to_email_config:
+ logger.warn(
+ "User password resets have been disabled due to lack of email config"
+ )
+ raise SynapseError(
+ 400, "Email-based password resets have been disabled on this server",
+ )
body = parse_json_object_from_request(request)
@@ -196,9 +201,6 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
@defer.inlineCallbacks
def on_POST(self, request):
- if not self.config.email_password_reset_behaviour == "off":
- raise SynapseError(400, "Password resets have been disabled on this server")
-
body = parse_json_object_from_request(request)
assert_params_in_dict(body, [
@@ -228,9 +230,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
class PasswordResetSubmitTokenServlet(RestServlet):
"""Handles 3PID validation token submission"""
- PATTERNS = [
- re.compile("^/_synapse/password_reset/(?P<medium>[^/]*)/submit_token/*$"),
- ]
+ PATTERNS = client_patterns(
+ "/password_reset/(?P<medium>[^/]*)/submit_token/*$",
+ releases=(),
+ unstable=True,
+ )
def __init__(self, hs):
"""
@@ -251,6 +255,14 @@ class PasswordResetSubmitTokenServlet(RestServlet):
400,
"This medium is currently not supported for password resets",
)
+ if self.config.email_password_reset_behaviour == "off":
+ if self.config.password_resets_were_disabled_due_to_email_config:
+ logger.warn(
+ "User password resets have been disabled due to lack of email config"
+ )
+ raise SynapseError(
+ 400, "Email-based password resets have been disabled on this server",
+ )
sid = parse_string(request, "sid")
client_secret = parse_string(request, "client_secret")
diff --git a/synapse/rest/client/v2_alpha/account_validity.py b/synapse/rest/client/v2_alpha/account_validity.py
index 55c4ed5660..63bdc33564 100644
--- a/synapse/rest/client/v2_alpha/account_validity.py
+++ b/synapse/rest/client/v2_alpha/account_validity.py
@@ -79,7 +79,7 @@ class AccountValiditySendMailServlet(RestServlet):
if not self.account_validity.renew_by_email_enabled:
raise AuthError(403, "Account renewal via email is disabled on this server.")
- requester = yield self.auth.get_user_by_req(request)
+ requester = yield self.auth.get_user_by_req(request, allow_expired=True)
user_id = requester.user.to_string()
yield self.account_activity_handler.send_renewal_email_to_user(user_id)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 8569677355..a4929dd5db 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -386,8 +386,10 @@ class MediaRepository(object):
raise SynapseError(502, "Failed to fetch remote media")
except SynapseError:
- logger.exception("Failed to fetch remote media %s/%s",
- server_name, media_id)
+ logger.warn(
+ "Failed to fetch remote media %s/%s",
+ server_name, media_id,
+ )
raise
except NotRetryingDestination:
logger.warn("Not retrying destination %r", server_name)
diff --git a/synapse/server.py b/synapse/server.py
index 9229a68a8d..a54e023cc9 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017-2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -35,6 +37,7 @@ from synapse.crypto import context_factory
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker
+from synapse.events.third_party_rules import ThirdPartyEventRules
from synapse.events.utils import EventClientSerializer
from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import (
@@ -178,6 +181,7 @@ class HomeServer(object):
'groups_attestation_renewer',
'secrets',
'spam_checker',
+ 'third_party_event_rules',
'room_member_handler',
'federation_registry',
'server_notices_manager',
@@ -483,6 +487,9 @@ class HomeServer(object):
def build_spam_checker(self):
return SpamChecker(self)
+ def build_third_party_event_rules(self):
+ return ThirdPartyEventRules(self)
+
def build_room_member_handler(self):
if self.config.worker_app:
return RoomMemberWorkerHandler(self)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 71316f7d09..0ca6f6121f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -279,23 +279,37 @@ class DataStore(
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
+ return self.runInteraction("count_daily_users", self._count_users, yesterday,)
- def _count_users(txn):
- yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
-
- sql = """
- SELECT COALESCE(count(*), 0) FROM (
- SELECT user_id FROM user_ips
- WHERE last_seen > ?
- GROUP BY user_id
- ) u
- """
-
- txn.execute(sql, (yesterday,))
- count, = txn.fetchone()
- return count
+ def count_monthly_users(self):
+ """
+ Counts the number of users who used this homeserver in the last 30 days.
+ Note this method is intended for phonehome metrics only and is different
+ from the mau figure in synapse.storage.monthly_active_users which,
+ amongst other things, includes a 3 day grace period before a user counts.
+ """
+ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
+ return self.runInteraction(
+ "count_monthly_users",
+ self._count_users,
+ thirty_days_ago,
+ )
- return self.runInteraction("count_users", _count_users)
+ def _count_users(self, txn, time_from):
+ """
+ Returns number of users seen in the past time_from period
+ """
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT user_id FROM user_ips
+ WHERE last_seen > ?
+ GROUP BY user_id
+ ) u
+ """
+ txn.execute(sql, (time_from,))
+ count, = txn.fetchone()
+ return count
def count_r30_users(self):
"""
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ae891aa332..941c07fce5 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -299,12 +299,12 @@ class SQLBaseStore(object):
def select_users_with_no_expiration_date_txn(txn):
"""Retrieves the list of registered users with no expiration date from the
- database.
+ database, filtering out deactivated users.
"""
sql = (
"SELECT users.name FROM users"
" LEFT JOIN account_validity ON (users.name = account_validity.user_id)"
- " WHERE account_validity.user_id is NULL;"
+ " WHERE account_validity.user_id is NULL AND users.deactivated = 0;"
)
txn.execute(sql, [])
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 9b0a99cb49..4ea0deea4f 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -138,6 +138,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
if not has_changed or last_stream_id == current_stream_id:
return defer.succeed(([], current_stream_id))
+ if limit <= 0:
+ # This can happen if we run out of room for EDUs in the transaction.
+ return defer.succeed(([], last_stream_id))
+
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..f631fb1733 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,39 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ # Counter of number of extremities to count
+ self._current_forward_extremities_amount = c_counter()
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ select count(*) c from event_forward_extremities
+ group by room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -568,17 +595,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +661,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
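The Counter built in _read_forward_extremities above maps "number of forward extremities" to "number of rooms with that many"; a worked example of the conversion from query rows:

    from collections import Counter as c_counter

    res = [(3,), (3,), (7,)]             # per-room counts from the GROUP BY query
    c_counter(list(x[0] for x in res))   # -> Counter({3: 2, 7: 1})
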
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 5300720dbb..e3655ad8d7 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -80,6 +80,14 @@ class KeyStore(SQLBaseStore):
for row in txn:
server_name, key_id, key_bytes, ts_valid_until_ms = row
+
+ if ts_valid_until_ms is None:
+ # Old keys may be stored with a ts_valid_until_ms of null,
+ # in which case we treat this as if it was set to `0`, i.e.
+ # it won't match key requests that define a minimum
+ # `ts_valid_until_ms`.
+ ts_valid_until_ms = 0
+
res = FetchKeyResult(
verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
valid_until_ts=ts_valid_until_ms,
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 9b41cbd757..d36917e4d6 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import re
from six import iterkeys
@@ -31,6 +32,8 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
+logger = logging.getLogger(__name__)
+
class RegistrationWorkerStore(SQLBaseStore):
def __init__(self, db_conn, hs):
@@ -249,6 +252,20 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
+ def delete_account_validity_for_user(self, user_id):
+ """Deletes the entry for the given user in the account validity table, removing
+ their expiration date and renewal token.
+
+ Args:
+ user_id (str): ID of the user to remove from the account validity table.
+ """
+ yield self._simple_delete_one(
+ table="account_validity",
+ keyvalues={"user_id": user_id},
+ desc="delete_account_validity_for_user",
+ )
+
+ @defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
table="users",
@@ -598,12 +615,78 @@ class RegistrationStore(
"user_threepids_grandfather", self._bg_user_threepids_grandfather,
)
+ self.register_background_update_handler(
+ "users_set_deactivated_flag", self._backgroud_update_set_deactivated_flag,
+ )
+
# Create a background job for culling expired 3PID validity tokens
hs.get_clock().looping_call(
self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS,
)
@defer.inlineCallbacks
+ def _background_update_set_deactivated_flag(self, progress, batch_size):
+ """Scans for users that should be marked as deactivated (no password hash,
+ no access tokens and no threepids) and sets the 'deactivated' flag to 1
+ for each of them.
+ """
+
+ last_user = progress.get("user_id", "")
+
+ def _background_update_set_deactivated_flag_txn(txn):
+ txn.execute(
+ """
+ SELECT
+ users.name,
+ COUNT(access_tokens.token) AS count_tokens,
+ COUNT(user_threepids.address) AS count_threepids
+ FROM users
+ LEFT JOIN access_tokens ON (access_tokens.user_id = users.name)
+ LEFT JOIN user_threepids ON (user_threepids.user_id = users.name)
+ WHERE (users.password_hash IS NULL OR users.password_hash = '')
+ AND (users.appservice_id IS NULL OR users.appservice_id = '')
+ AND users.is_guest = 0
+ AND users.name > ?
+ GROUP BY users.name
+ ORDER BY users.name ASC
+ LIMIT ?;
+ """,
+ (last_user, batch_size),
+ )
+
+ rows = self.cursor_to_dict(txn)
+
+ if not rows:
+ return True
+
+ rows_processed_nb = 0
+
+ for user in rows:
+ if not user["count_tokens"] and not user["count_threepids"]:
+ self.set_user_deactivated_status_txn(txn, user["name"], True)
+ rows_processed_nb += 1
+
+ logger.info("Marked %d rows as deactivated", rows_processed_nb)
+
+ self._background_update_progress_txn(
+ txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
+ )
+
+ if batch_size > len(rows):
+ return True
+ else:
+ return False
+
+ end = yield self.runInteraction(
+ "users_set_deactivated_flag",
+ _background_update_set_deactivated_flag_txn,
+ )
+
+ if end:
+ yield self._end_background_update("users_set_deactivated_flag")
+
+ defer.returnValue(batch_size)
+
+ @defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -998,7 +1081,7 @@ class RegistrationStore(
client_secret,
address=None,
sid=None,
- validated=None,
+ validated=True,
):
"""Gets a session_id and last_send_attempt (if available) for a
client_secret/medium/(address|session_id) combo
@@ -1268,3 +1351,50 @@ class RegistrationStore(
"delete_threepid_session",
delete_threepid_session_txn,
)
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,),
+ )
+
+ @defer.inlineCallbacks
+ def set_user_deactivated_status(self, user_id, deactivated):
+ """Set the `deactivated` property for the provided user to the provided value.
+
+ Args:
+ user_id (str): The ID of the user to set the status for.
+ deactivated (bool): The value to set for `deactivated`.
+ """
+
+ yield self.runInteraction(
+ "set_user_deactivated_status",
+ self.set_user_deactivated_status_txn,
+ user_id, deactivated,
+ )
+
+ @cachedInlineCallbacks()
+ def get_user_deactivated_status(self, user_id):
+ """Retrieve the value for the `deactivated` property for the provided user.
+
+ Args:
+ user_id (str): The ID of the user to retrieve the status for.
+
+ Returns:
+ defer.Deferred(bool): The requested value.
+ """
+
+ res = yield self._simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="deactivated",
+ desc="get_user_deactivated_status",
+ )
+
+ # Convert the integer into a boolean.
+ defer.returnValue(res == 1)
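A hedged sketch (hypothetical helper name) tying the new storage methods to the deactivation flow in synapse/handlers/deactivate_account.py above:

    @defer.inlineCallbacks
    def _mark_deactivated(self, user_id):
        # Drop the account_validity row so no further renewal emails are sent...
        yield self.store.delete_account_validity_for_user(user_id)
        # ...then flip the (cached) deactivated flag on the users table.
        yield self.store.set_user_deactivated_status(user_id, True)
        is_deactivated = yield self.store.get_user_deactivated_status(user_id)
        assert is_deactivated
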
diff --git a/synapse/storage/schema/delta/55/users_alter_deactivated.sql b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
new file mode 100644
index 0000000000..dabdde489b
--- /dev/null
+++ b/synapse/storage/schema/delta/55/users_alter_deactivated.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD deactivated SMALLINT DEFAULT 0 NOT NULL;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('users_set_deactivated_flag', '{}');
|