diff options
73 files changed, 1134 insertions, 157 deletions
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 3d75853aa7..f9de78a460 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -59,9 +59,10 @@ To create a changelog entry, make a new file in the ``changelog.d`` file named in the format of ``PRnumber.type``. The type can be one of ``feature``, ``bugfix``, ``removal`` (also used for deprecations), or ``misc`` (for internal-only changes). The content of -the file is your changelog entry, which can contain RestructuredText -formatting. A note of contributors is welcomed in changelogs for -non-misc changes (the content of misc changes is not displayed). +the file is your changelog entry, which can contain Markdown +formatting. Adding credits to the changelog is encouraged, we value +your contributions and would like to have you shouted out in the +release notes! For example, a fix in PR #1234 would have its changelog entry in ``changelog.d/1234.bugfix``, and contain content like "The security levels of diff --git a/README.rst b/README.rst index 4c5971d043..d6f34ba9d1 100644 --- a/README.rst +++ b/README.rst @@ -167,11 +167,6 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate a synapse server in a single Docker image, at https://hub.docker.com/r/avhost/docker-matrix/tags/ -Also, Martin Giess has created an auto-deployment process with vagrant/ansible, -tested with VirtualBox/AWS/DigitalOcean - see -https://github.com/EMnify/matrix-synapse-auto-deploy -for details. - Configuring synapse ------------------- diff --git a/changelog.d/3378.misc b/changelog.d/3378.misc new file mode 100644 index 0000000000..8f88f88e69 --- /dev/null +++ b/changelog.d/3378.misc @@ -0,0 +1 @@ +Removed the link to the unmaintained matrix-synapse-auto-deploy project from the readme. diff --git a/changelog.d/3680.feature b/changelog.d/3680.feature new file mode 100644 index 0000000000..4edaaf76a8 --- /dev/null +++ b/changelog.d/3680.feature @@ -0,0 +1 @@ +Server notices for resource limit blocking diff --git a/changelog.d/3725.misc b/changelog.d/3725.misc new file mode 100644 index 0000000000..91ab9d7137 --- /dev/null +++ b/changelog.d/3725.misc @@ -0,0 +1 @@ +The synapse.storage module has been ported to Python 3. diff --git a/changelog.d/3730.misc b/changelog.d/3730.misc new file mode 100644 index 0000000000..b1ea84f732 --- /dev/null +++ b/changelog.d/3730.misc @@ -0,0 +1 @@ +The CONTRIBUTING guidelines have been updated to mention our use of Markdown and that .misc files have content. diff --git a/changelog.d/3734.misc b/changelog.d/3734.misc new file mode 100644 index 0000000000..4f6e4b3848 --- /dev/null +++ b/changelog.d/3734.misc @@ -0,0 +1 @@ +Reference the need for an HTTP replication port when using the federation_reader worker diff --git a/changelog.d/3746.misc b/changelog.d/3746.misc new file mode 100644 index 0000000000..fc00ee773a --- /dev/null +++ b/changelog.d/3746.misc @@ -0,0 +1 @@ +Fix MAU cache invalidation due to missing yield diff --git a/changelog.d/3747.bugfix b/changelog.d/3747.bugfix new file mode 100644 index 0000000000..c41e2a1213 --- /dev/null +++ b/changelog.d/3747.bugfix @@ -0,0 +1 @@ +Fix bug where we resent "limit exceeded" server notices repeatedly diff --git a/changelog.d/3749.feature b/changelog.d/3749.feature new file mode 100644 index 0000000000..9f8837b106 --- /dev/null +++ b/changelog.d/3749.feature @@ -0,0 +1 @@ +Add mau_trial_days config param, so that users only get counted as MAU after N days. diff --git a/changelog.d/3751.feature b/changelog.d/3751.feature new file mode 100644 index 0000000000..dc9742b15b --- /dev/null +++ b/changelog.d/3751.feature @@ -0,0 +1 @@ +Require twisted 17.1 or later (fixes [#3741](https://github.com/matrix-org/synapse/issues/3741)). diff --git a/changelog.d/3753.bugfix b/changelog.d/3753.bugfix new file mode 100644 index 0000000000..b4301267df --- /dev/null +++ b/changelog.d/3753.bugfix @@ -0,0 +1 @@ +Fix bug where we broke sync when using limit_usage_by_mau but hadn't configured server notices diff --git a/changelog.d/3754.bugfix b/changelog.d/3754.bugfix new file mode 100644 index 0000000000..6e3ec80194 --- /dev/null +++ b/changelog.d/3754.bugfix @@ -0,0 +1 @@ +Fix 'federation_domain_whitelist' such that an empty list correctly blocks all outbound federation traffic diff --git a/changelog.d/3755.bugfix b/changelog.d/3755.bugfix new file mode 100644 index 0000000000..6a1f83f0ce --- /dev/null +++ b/changelog.d/3755.bugfix @@ -0,0 +1 @@ +Fix tagging of server notice rooms diff --git a/changelog.d/3756.bugfix b/changelog.d/3756.bugfix new file mode 100644 index 0000000000..6a1f83f0ce --- /dev/null +++ b/changelog.d/3756.bugfix @@ -0,0 +1 @@ +Fix tagging of server notice rooms diff --git a/changelog.d/3758.bugfix b/changelog.d/3758.bugfix new file mode 100644 index 0000000000..862739bfe8 --- /dev/null +++ b/changelog.d/3758.bugfix @@ -0,0 +1 @@ +Fix 'admin_uri' config variable and error parameter to be 'admin_contact' to match the spec. diff --git a/changelog.d/3760.bugfix b/changelog.d/3760.bugfix new file mode 100644 index 0000000000..ce61fb8a2b --- /dev/null +++ b/changelog.d/3760.bugfix @@ -0,0 +1 @@ +Don't return non-LL-member state in incremental sync state blocks diff --git a/changelog.d/3764.misc b/changelog.d/3764.misc new file mode 100644 index 0000000000..f3614f1982 --- /dev/null +++ b/changelog.d/3764.misc @@ -0,0 +1 @@ +Make sure that we close db connections opened during init \ No newline at end of file diff --git a/changelog.d/3768.bugfix b/changelog.d/3768.bugfix new file mode 100644 index 0000000000..a039a7fa68 --- /dev/null +++ b/changelog.d/3768.bugfix @@ -0,0 +1 @@ +Fix bug in sending presence over federation diff --git a/changelog.d/3777.bugfix b/changelog.d/3777.bugfix new file mode 100644 index 0000000000..46efc543a7 --- /dev/null +++ b/changelog.d/3777.bugfix @@ -0,0 +1 @@ +Fix bug where preserved threepid user comes to sign up and server is mau blocked diff --git a/docs/workers.rst b/docs/workers.rst index 81146a211f..101e950020 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -74,7 +74,7 @@ replication endpoints that it's talking to on the main synapse process. ``worker_replication_port`` should point to the TCP replication listener port and ``worker_replication_http_port`` should point to the HTTP replication port. -Currently, only the ``event_creator`` worker requires specifying +Currently, the ``event_creator`` and ``federation_reader`` workers require specifying ``worker_replication_http_port``. For instance:: diff --git a/jenkins/prepare_synapse.sh b/jenkins/prepare_synapse.sh index a30179f2aa..d95ca846c4 100755 --- a/jenkins/prepare_synapse.sh +++ b/jenkins/prepare_synapse.sh @@ -31,5 +31,5 @@ $TOX_BIN/pip install 'setuptools>=18.5' $TOX_BIN/pip install 'pip>=10' { python synapse/python_dependencies.py - echo lxml psycopg2 + echo lxml } | xargs $TOX_BIN/pip install diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 6502a6be7b..34382e4e3c 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -26,6 +26,7 @@ import synapse.types from synapse import event_auth from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.api.errors import AuthError, Codes, ResourceLimitError +from synapse.config.server import is_threepid_reserved from synapse.types import UserID from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache from synapse.util.caches.lrucache import LruCache @@ -775,34 +776,56 @@ class Auth(object): ) @defer.inlineCallbacks - def check_auth_blocking(self, user_id=None): + def check_auth_blocking(self, user_id=None, threepid=None): """Checks if the user should be rejected for some external reason, such as monthly active user limiting or global disable flag Args: user_id(str|None): If present, checks for presence against existing MAU cohort + + threepid(dict|None): If present, checks for presence against configured + reserved threepid. Used in cases where the user is trying register + with a MAU blocked server, normally they would be rejected but their + threepid is on the reserved list. user_id and + threepid should never be set at the same time. """ + + # Never fail an auth check for the server notices users + # This can be a problem where event creation is prohibited due to blocking + if user_id == self.hs.config.server_notices_mxid: + return + if self.hs.config.hs_disabled: raise ResourceLimitError( 403, self.hs.config.hs_disabled_message, - errcode=Codes.RESOURCE_LIMIT_EXCEED, - admin_uri=self.hs.config.admin_uri, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, + admin_contact=self.hs.config.admin_contact, limit_type=self.hs.config.hs_disabled_limit_type ) if self.hs.config.limit_usage_by_mau is True: - # If the user is already part of the MAU cohort + assert not (user_id and threepid) + + # If the user is already part of the MAU cohort or a trial user if user_id: timestamp = yield self.store.user_last_seen_monthly_active(user_id) if timestamp: return + + is_trial = yield self.store.is_trial_user(user_id) + if is_trial: + return + elif threepid: + # If the user does not exist yet, but is signing up with a + # reserved threepid then pass auth check + if is_threepid_reserved(self.hs.config, threepid): + return # Else if there is no room in the MAU bucket, bail current_mau = yield self.store.get_monthly_active_count() if current_mau >= self.hs.config.max_mau_value: raise ResourceLimitError( 403, "Monthly Active User Limit Exceeded", - - admin_uri=self.hs.config.admin_uri, - errcode=Codes.RESOURCE_LIMIT_EXCEED, + admin_contact=self.hs.config.admin_contact, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, limit_type="monthly_active_user" ) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 912bf024bf..c2630c4c64 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -78,6 +78,7 @@ class EventTypes(object): Name = "m.room.name" ServerACL = "m.room.server_acl" + Pinned = "m.room.pinned_events" class RejectedReason(object): @@ -108,3 +109,6 @@ DEFAULT_ROOM_VERSION = RoomVersions.V1 # vdh-test-version is a placeholder to get room versioning support working and tested # until we have a working v2. KNOWN_ROOM_VERSIONS = {RoomVersions.V1, RoomVersions.VDH_TEST} + +ServerNoticeMsgType = "m.server_notice" +ServerNoticeLimitReached = "m.server_notice.usage_limit_reached" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index e26001ab12..2e7f98404d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -56,7 +56,7 @@ class Codes(object): SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN" CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM" - RESOURCE_LIMIT_EXCEED = "M_RESOURCE_LIMIT_EXCEED" + RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED" UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION" INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION" @@ -238,11 +238,11 @@ class ResourceLimitError(SynapseError): """ def __init__( self, code, msg, - errcode=Codes.RESOURCE_LIMIT_EXCEED, - admin_uri=None, + errcode=Codes.RESOURCE_LIMIT_EXCEEDED, + admin_contact=None, limit_type=None, ): - self.admin_uri = admin_uri + self.admin_contact = admin_contact self.limit_type = limit_type super(ResourceLimitError, self).__init__(code, msg, errcode=errcode) @@ -250,7 +250,7 @@ class ResourceLimitError(SynapseError): return cs_error( self.msg, self.errcode, - admin_uri=self.admin_uri, + admin_contact=self.admin_contact, limit_type=self.limit_type ) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 3348a8ec6d..86b5067400 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -51,10 +51,7 @@ class AppserviceSlaveStore( class AppserviceServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = AppserviceSlaveStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = AppserviceSlaveStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index ab79a45646..ce2b113dbb 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -74,10 +74,7 @@ class ClientReaderSlavedStore( class ClientReaderServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = ClientReaderSlavedStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index a34c89fa99..f98e456ea0 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -90,10 +90,7 @@ class EventCreatorSlavedStore( class EventCreatorServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = EventCreatorSlavedStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7d8105778d..60f5973505 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -72,10 +72,7 @@ class FederationReaderSlavedStore( class FederationReaderServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = FederationReaderSlavedStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index d59007099b..60dd09aac3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -78,10 +78,7 @@ class FederationSenderSlaveStore( class FederationSenderServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = FederationSenderSlaveStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 8d484c1cd4..8c0b9c67b0 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -148,10 +148,7 @@ class FrontendProxySlavedStore( class FrontendProxyServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = FrontendProxySlavedStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 005921dcf7..3eb5b663de 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -62,7 +62,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer -from synapse.storage import are_all_users_on_domain +from synapse.storage import DataStore, are_all_users_on_domain from synapse.storage.engines import IncorrectDatabaseSetup, create_engine from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.util.caches import CACHE_SIZE_FACTOR @@ -111,6 +111,8 @@ def build_resource_for_web_client(hs): class SynapseHomeServer(HomeServer): + DATASTORE_CLASS = DataStore + def _listener_http(self, config, listener_config): port = listener_config["port"] bind_addresses = listener_config["bind_addresses"] @@ -356,13 +358,13 @@ def setup(config_options): logger.info("Preparing database: %s...", config.database_config['name']) try: - db_conn = hs.get_db_conn(run_new_connection=False) - prepare_database(db_conn, database_engine, config=config) - database_engine.on_new_connection(db_conn) + with hs.get_db_conn(run_new_connection=False) as db_conn: + prepare_database(db_conn, database_engine, config=config) + database_engine.on_new_connection(db_conn) - hs.run_startup_checks(db_conn, database_engine) + hs.run_startup_checks(db_conn, database_engine) - db_conn.commit() + db_conn.commit() except UpgradeDatabaseException: sys.stderr.write( "\nFailed to upgrade database.\n" diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index fd1f6cbf7e..e3dbb3b4e6 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -60,10 +60,7 @@ class MediaRepositorySlavedStore( class MediaRepositoryServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = MediaRepositorySlavedStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index a4fc7e91fa..244c604de9 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -78,10 +78,7 @@ class PusherSlaveStore( class PusherServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = PusherSlaveStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = PusherSlaveStore def remove_pusher(self, app_id, push_key, user_id): self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 27e1998660..6662340797 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -249,10 +249,7 @@ class SynchrotronApplicationService(object): class SynchrotronServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = SynchrotronSlavedStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 1388a42b59..96ffcaf073 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -94,10 +94,7 @@ class UserDirectorySlaveStore( class UserDirectoryServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = UserDirectorySlaveStore def _listen_http(self, listener_config): port = listener_config["port"] diff --git a/synapse/config/server.py b/synapse/config/server.py index 68a612e594..c1c7c0105e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -77,10 +77,15 @@ class ServerConfig(Config): self.max_mau_value = config.get( "max_mau_value", 0, ) + self.mau_limits_reserved_threepids = config.get( "mau_limit_reserved_threepids", [] ) + self.mau_trial_days = config.get( + "mau_trial_days", 0, + ) + # Options to disable HS self.hs_disabled = config.get("hs_disabled", False) self.hs_disabled_message = config.get("hs_disabled_message", "") @@ -88,7 +93,7 @@ class ServerConfig(Config): # Admin uri to direct users at should their instance become blocked # due to resource constraints - self.admin_uri = config.get("admin_uri", None) + self.admin_contact = config.get("admin_contact", None) # FIXME: federation_domain_whitelist needs sytests self.federation_domain_whitelist = None @@ -352,7 +357,7 @@ class ServerConfig(Config): # Homeserver blocking # # How to reach the server admin, used in ResourceLimitError - # admin_uri: 'mailto:admin@server.com' + # admin_contact: 'mailto:admin@server.com' # # Global block config # @@ -365,6 +370,7 @@ class ServerConfig(Config): # Enables monthly active user checking # limit_usage_by_mau: False # max_mau_value: 50 + # mau_trial_days: 2 # # Sometimes the server admin will want to ensure certain accounts are # never blocked by mau checking. These accounts are specified here. @@ -398,6 +404,23 @@ class ServerConfig(Config): " service on the given port.") +def is_threepid_reserved(config, threepid): + """Check the threepid against the reserved threepid config + Args: + config(ServerConfig) - to access server config attributes + threepid(dict) - The threepid to test for + + Returns: + boolean Is the threepid undertest reserved_user + """ + + for tp in config.mau_limits_reserved_threepids: + if (threepid['medium'] == tp['medium'] + and threepid['address'] == tp['address']): + return True + return False + + def read_gc_thresholds(thresholds): """Reads the three integer thresholds for garbage collection. Ensures that the thresholds are integers if thresholds are supplied. diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 0bb468385d..6f5995735a 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -32,7 +32,7 @@ Events are replicated via a separate events stream. import logging from collections import namedtuple -from six import iteritems, itervalues +from six import iteritems from sortedcontainers import SortedDict @@ -117,7 +117,7 @@ class FederationRemoteSendQueue(object): user_ids = set( user_id - for uids in itervalues(self.presence_changed) + for uids in self.presence_changed.values() for user_id in uids ) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index f03ee1476b..1e53f2c635 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -125,6 +125,7 @@ class RegistrationHandler(BaseHandler): guest_access_token=None, make_guest=False, admin=False, + threepid=None, ): """Registers a new client on the server. @@ -145,7 +146,7 @@ class RegistrationHandler(BaseHandler): RegistrationError if there was a problem registering. """ - yield self.auth.check_auth_blocking() + yield self.auth.check_auth_blocking(threepid=threepid) password_hash = None if password: password_hash = yield self.auth_handler().hash(password) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 648debc8aa..ef20c2296c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -745,9 +745,16 @@ class SyncHandler(object): state_ids = {} if lazy_load_members: if types: + # We're returning an incremental sync, with no "gap" since + # the previous sync, so normally there would be no state to return + # But we're lazy-loading, so the client might need some more + # member events to understand the events in this timeline. + # So we fish out all the member events corresponding to the + # timeline here, and then dedupe any redundant ones below. + state_ids = yield self.store.get_state_ids_for_event( batch.events[0].event_id, types=types, - filtered_types=filtered_types, + filtered_types=None, # we only want members! ) if lazy_load_members and not include_redundant_members: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 44b61e70a4..b34bb8e31a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -133,7 +133,7 @@ class MatrixFederationHttpClient(object): failures, connection failures, SSL failures.) """ if ( - self.hs.config.federation_domain_whitelist and + self.hs.config.federation_domain_whitelist is not None and destination not in self.hs.config.federation_domain_whitelist ): raise FederationDeniedError(destination) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 987eec3ef2..942d7c721f 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -39,7 +39,7 @@ REQUIREMENTS = { "signedjson>=1.0.0": ["signedjson>=1.0.0"], "pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], - "Twisted>=16.0.0": ["twisted>=16.0.0"], + "Twisted>=17.1.0": ["twisted>=17.1.0"], # We use crypto.get_elliptic_curve which is only supported in >=0.15 "pyopenssl>=0.15": ["OpenSSL>=0.15"], @@ -78,6 +78,9 @@ CONDITIONAL_REQUIREMENTS = { "affinity": { "affinity": ["affinity"], }, + "postgres": { + "psycopg2>=2.6": ["psycopg2"] + } } diff --git a/synapse/rest/client/v1_only/register.py b/synapse/rest/client/v1_only/register.py index 5e99cffbcb..dadb376b02 100644 --- a/synapse/rest/client/v1_only/register.py +++ b/synapse/rest/client/v1_only/register.py @@ -23,6 +23,7 @@ from twisted.internet import defer import synapse.util.stringutils as stringutils from synapse.api.constants import LoginType from synapse.api.errors import Codes, SynapseError +from synapse.config.server import is_threepid_reserved from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request from synapse.rest.client.v1.base import ClientV1RestServlet from synapse.types import create_requester @@ -281,12 +282,20 @@ class RegisterRestServlet(ClientV1RestServlet): register_json["user"].encode("utf-8") if "user" in register_json else None ) + threepid = None + if session.get(LoginType.EMAIL_IDENTITY): + threepid = session["threepidCreds"] handler = self.handlers.registration_handler (user_id, token) = yield handler.register( localpart=desired_user_id, - password=password + password=password, + threepid=threepid, ) + # Necessary due to auth checks prior to the threepid being + # written to the db + if is_threepid_reserved(self.hs.config, threepid): + yield self.store.upsert_monthly_active_user(user_id) if session[LoginType.EMAIL_IDENTITY]: logger.debug("Binding emails %s to %s" % ( diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 2f64155d13..2fb4d43ccb 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -26,6 +26,7 @@ import synapse import synapse.types from synapse.api.constants import LoginType from synapse.api.errors import Codes, SynapseError, UnrecognizedRequestError +from synapse.config.server import is_threepid_reserved from synapse.http.servlet import ( RestServlet, assert_params_in_dict, @@ -395,12 +396,21 @@ class RegisterRestServlet(RestServlet): if desired_username is not None: desired_username = desired_username.lower() + threepid = None + if auth_result: + threepid = auth_result.get(LoginType.EMAIL_IDENTITY) + (registered_user_id, _) = yield self.registration_handler.register( localpart=desired_username, password=new_password, guest_access_token=guest_access_token, generate_token=False, + threepid=threepid, ) + # Necessary due to auth checks prior to the threepid being + # written to the db + if is_threepid_reserved(self.hs.config, threepid): + yield self.store.upsert_monthly_active_user(registered_user_id) # remember that we've now registered that user account, and with # what user ID (since the user may not have specified) diff --git a/synapse/server.py b/synapse/server.py index a6fbc6ec0c..938a05f9dc 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -19,6 +19,7 @@ # partial one for unit test mocking. # Imports required for the default HomeServer() implementation +import abc import logging from twisted.enterprise import adbapi @@ -81,7 +82,6 @@ from synapse.server_notices.server_notices_manager import ServerNoticesManager from synapse.server_notices.server_notices_sender import ServerNoticesSender from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender from synapse.state import StateHandler, StateResolutionHandler -from synapse.storage import DataStore from synapse.streams.events import EventSources from synapse.util import Clock from synapse.util.distributor import Distributor @@ -111,6 +111,8 @@ class HomeServer(object): config (synapse.config.homeserver.HomeserverConfig): """ + __metaclass__ = abc.ABCMeta + DEPENDENCIES = [ 'http_client', 'db_pool', @@ -172,6 +174,11 @@ class HomeServer(object): 'room_context_handler', ] + # This is overridden in derived application classes + # (such as synapse.app.homeserver.SynapseHomeServer) and gives the class to be + # instantiated during setup() for future return by get_datastore() + DATASTORE_CLASS = abc.abstractproperty() + def __init__(self, hostname, reactor=None, **kwargs): """ Args: @@ -188,13 +195,16 @@ class HomeServer(object): self.distributor = Distributor() self.ratelimiter = Ratelimiter() + self.datastore = None + # Other kwargs are explicit dependencies for depname in kwargs: setattr(self, depname, kwargs[depname]) def setup(self): logger.info("Setting up.") - self.datastore = DataStore(self.get_db_conn(), self) + with self.get_db_conn() as conn: + self.datastore = self.DATASTORE_CLASS(conn, self) logger.info("Finished setting up.") def get_reactor(self): diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py new file mode 100644 index 0000000000..af15cba0ee --- /dev/null +++ b/synapse/server_notices/resource_limits_server_notices.py @@ -0,0 +1,203 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.constants import ( + EventTypes, + ServerNoticeLimitReached, + ServerNoticeMsgType, +) +from synapse.api.errors import AuthError, ResourceLimitError, SynapseError +from synapse.server_notices.server_notices_manager import SERVER_NOTICE_ROOM_TAG + +logger = logging.getLogger(__name__) + + +class ResourceLimitsServerNotices(object): + """ Keeps track of whether the server has reached it's resource limit and + ensures that the client is kept up to date. + """ + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ + self._server_notices_manager = hs.get_server_notices_manager() + self._store = hs.get_datastore() + self._auth = hs.get_auth() + self._config = hs.config + self._resouce_limited = False + self._message_handler = hs.get_message_handler() + self._state = hs.get_state_handler() + + self._notifier = hs.get_notifier() + + @defer.inlineCallbacks + def maybe_send_server_notice_to_user(self, user_id): + """Check if we need to send a notice to this user, this will be true in + two cases. + 1. The server has reached its limit does not reflect this + 2. The room state indicates that the server has reached its limit when + actually the server is fine + + Args: + user_id (str): user to check + + Returns: + Deferred + """ + if self._config.hs_disabled is True: + return + + if self._config.limit_usage_by_mau is False: + return + + if not self._server_notices_manager.is_enabled(): + # Don't try and send server notices unles they've been enabled + return + + timestamp = yield self._store.user_last_seen_monthly_active(user_id) + if timestamp is None: + # This user will be blocked from receiving the notice anyway. + # In practice, not sure we can ever get here + return + + # Determine current state of room + + room_id = yield self._server_notices_manager.get_notice_room_for_user(user_id) + + if not room_id: + logger.warn("Failed to get server notices room") + return + + yield self._check_and_set_tags(user_id, room_id) + currently_blocked, ref_events = yield self._is_room_currently_blocked(room_id) + + try: + # Normally should always pass in user_id if you have it, but in + # this case are checking what would happen to other users if they + # were to arrive. + try: + yield self._auth.check_auth_blocking() + is_auth_blocking = False + except ResourceLimitError as e: + is_auth_blocking = True + event_content = e.msg + event_limit_type = e.limit_type + + if currently_blocked and not is_auth_blocking: + # Room is notifying of a block, when it ought not to be. + # Remove block notification + content = { + "pinned": ref_events + } + yield self._server_notices_manager.send_notice( + user_id, content, EventTypes.Pinned, '', + ) + + elif not currently_blocked and is_auth_blocking: + # Room is not notifying of a block, when it ought to be. + # Add block notification + content = { + 'body': event_content, + 'msgtype': ServerNoticeMsgType, + 'server_notice_type': ServerNoticeLimitReached, + 'admin_contact': self._config.admin_contact, + 'limit_type': event_limit_type + } + event = yield self._server_notices_manager.send_notice( + user_id, content, EventTypes.Message, + ) + + content = { + "pinned": [ + event.event_id, + ] + } + yield self._server_notices_manager.send_notice( + user_id, content, EventTypes.Pinned, '', + ) + + except SynapseError as e: + logger.error("Error sending resource limits server notice: %s", e) + + @defer.inlineCallbacks + def _check_and_set_tags(self, user_id, room_id): + """ + Since server notices rooms were originally not with tags, + important to check that tags have been set correctly + Args: + user_id(str): the user in question + room_id(str): the server notices room for that user + """ + tags = yield self._store.get_tags_for_room(user_id, room_id) + need_to_set_tag = True + if tags: + if SERVER_NOTICE_ROOM_TAG in tags: + # tag already present, nothing to do here + need_to_set_tag = False + if need_to_set_tag: + max_id = yield self._store.add_tag_to_room( + user_id, room_id, SERVER_NOTICE_ROOM_TAG, {} + ) + self._notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + + @defer.inlineCallbacks + def _is_room_currently_blocked(self, room_id): + """ + Determines if the room is currently blocked + + Args: + room_id(str): The room id of the server notices room + + Returns: + + bool: Is the room currently blocked + list: The list of pinned events that are unrelated to limit blocking + This list can be used as a convenience in the case where the block + is to be lifted and the remaining pinned event references need to be + preserved + """ + currently_blocked = False + pinned_state_event = None + try: + pinned_state_event = yield self._state.get_current_state( + room_id, event_type=EventTypes.Pinned + ) + except AuthError: + # The user has yet to join the server notices room + pass + + referenced_events = [] + if pinned_state_event is not None: + referenced_events = list(pinned_state_event.content.get('pinned', [])) + + events = yield self._store.get_events(referenced_events) + for event_id, event in iteritems(events): + if event.type != EventTypes.Message: + continue + if event.content.get("msgtype") == ServerNoticeMsgType: + currently_blocked = True + # remove event in case we need to disable blocking later on. + if event_id in referenced_events: + referenced_events.remove(event.event_id) + + defer.returnValue((currently_blocked, referenced_events)) diff --git a/synapse/server_notices/server_notices_manager.py b/synapse/server_notices/server_notices_manager.py index a26deace53..c5cc6d728e 100644 --- a/synapse/server_notices/server_notices_manager.py +++ b/synapse/server_notices/server_notices_manager.py @@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks logger = logging.getLogger(__name__) +SERVER_NOTICE_ROOM_TAG = "m.server_notice" + class ServerNoticesManager(object): def __init__(self, hs): @@ -37,6 +39,8 @@ class ServerNoticesManager(object): self._event_creation_handler = hs.get_event_creation_handler() self._is_mine_id = hs.is_mine_id + self._notifier = hs.get_notifier() + def is_enabled(self): """Checks if server notices are enabled on this server. @@ -46,7 +50,10 @@ class ServerNoticesManager(object): return self._config.server_notices_mxid is not None @defer.inlineCallbacks - def send_notice(self, user_id, event_content): + def send_notice( + self, user_id, event_content, + type=EventTypes.Message, state_key=None + ): """Send a notice to the given user Creates the server notices room, if none exists. @@ -54,9 +61,11 @@ class ServerNoticesManager(object): Args: user_id (str): mxid of user to send event to. event_content (dict): content of event to send + type(EventTypes): type of event + is_state_event(bool): Is the event a state event Returns: - Deferred[None] + Deferred[FrozenEvent] """ room_id = yield self.get_notice_room_for_user(user_id) @@ -65,15 +74,20 @@ class ServerNoticesManager(object): logger.info("Sending server notice to %s", user_id) - yield self._event_creation_handler.create_and_send_nonmember_event( - requester, { - "type": EventTypes.Message, - "room_id": room_id, - "sender": system_mxid, - "content": event_content, - }, - ratelimit=False, + event_dict = { + "type": type, + "room_id": room_id, + "sender": system_mxid, + "content": event_content, + } + + if state_key is not None: + event_dict['state_key'] = state_key + + res = yield self._event_creation_handler.create_and_send_nonmember_event( + requester, event_dict, ratelimit=False, ) + defer.returnValue(res) @cachedInlineCallbacks() def get_notice_room_for_user(self, user_id): @@ -142,5 +156,12 @@ class ServerNoticesManager(object): ) room_id = info['room_id'] + max_id = yield self._store.add_tag_to_room( + user_id, room_id, SERVER_NOTICE_ROOM_TAG, {}, + ) + self._notifier.on_new_event( + "account_data_key", max_id, users=[user_id] + ) + logger.info("Created server notices room %s for %s", room_id, user_id) defer.returnValue(room_id) diff --git a/synapse/server_notices/server_notices_sender.py b/synapse/server_notices/server_notices_sender.py index 5d23965f34..6121b2f267 100644 --- a/synapse/server_notices/server_notices_sender.py +++ b/synapse/server_notices/server_notices_sender.py @@ -12,7 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + from synapse.server_notices.consent_server_notices import ConsentServerNotices +from synapse.server_notices.resource_limits_server_notices import ( + ResourceLimitsServerNotices, +) class ServerNoticesSender(object): @@ -25,34 +30,34 @@ class ServerNoticesSender(object): Args: hs (synapse.server.HomeServer): """ - # todo: it would be nice to make this more dynamic - self._consent_server_notices = ConsentServerNotices(hs) + self._server_notices = ( + ConsentServerNotices(hs), + ResourceLimitsServerNotices(hs) + ) + @defer.inlineCallbacks def on_user_syncing(self, user_id): """Called when the user performs a sync operation. Args: user_id (str): mxid of user who synced - - Returns: - Deferred """ - return self._consent_server_notices.maybe_send_server_notice_to_user( - user_id, - ) + for sn in self._server_notices: + yield sn.maybe_send_server_notice_to_user( + user_id, + ) + @defer.inlineCallbacks def on_user_ip(self, user_id): """Called on the master when a worker process saw a client request. Args: user_id (str): mxid - - Returns: - Deferred """ # The synchrotrons use a stubbed version of ServerNoticesSender, so # we check for notices to send to the user in on_user_ip as well as # in on_user_syncing - return self._consent_server_notices.maybe_send_server_notice_to_user( - user_id, - ) + for sn in self._server_notices: + yield sn.maybe_send_server_notice_to_user( + user_id, + ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 08dffd774f..be61147b9b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,9 +17,10 @@ import sys import threading import time -from six import iteritems, iterkeys, itervalues +from six import PY2, iteritems, iterkeys, itervalues from six.moves import intern, range +from canonicaljson import json from prometheus_client import Histogram from twisted.internet import defer @@ -1216,3 +1217,32 @@ class _RollbackButIsFineException(Exception): something went wrong. """ pass + + +def db_to_json(db_content): + """ + Take some data from a database row and return a JSON-decoded object. + + Args: + db_content (memoryview|buffer|bytes|bytearray|unicode) + """ + # psycopg2 on Python 3 returns memoryview objects, which we need to + # cast to bytes to decode + if isinstance(db_content, memoryview): + db_content = db_content.tobytes() + + # psycopg2 on Python 2 returns buffer objects, which we need to cast to + # bytes to decode + if PY2 and isinstance(db_content, buffer): + db_content = bytes(db_content) + + # Decode it to a Unicode string before feeding it to json.loads, so we + # consistenty get a Unicode-containing object out. + if isinstance(db_content, (bytes, bytearray)): + db_content = db_content.decode('utf8') + + try: + return json.loads(db_content) + except Exception: + logging.warning("Tried to decode '%r' as JSON and failed", db_content) + raise diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 73646da025..e06b0bc56d 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -169,7 +169,7 @@ class DeviceInboxStore(BackgroundUpdateStore): local_by_user_then_device = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} - devices = messages_by_device.keys() + devices = list(messages_by_device.keys()) if len(devices) == 1 and devices[0] == "*": # Handle wildcard device_ids. sql = ( diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index c0943ecf91..d10ff9e4b9 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -24,7 +24,7 @@ from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList -from ._base import Cache, SQLBaseStore +from ._base import Cache, SQLBaseStore, db_to_json logger = logging.getLogger(__name__) @@ -411,7 +411,7 @@ class DeviceStore(SQLBaseStore): if device is not None: key_json = device.get("key_json", None) if key_json: - result["keys"] = json.loads(key_json) + result["keys"] = db_to_json(key_json) device_display_name = device.get("device_display_name", None) if device_display_name: result["device_display_name"] = device_display_name @@ -466,7 +466,7 @@ class DeviceStore(SQLBaseStore): retcol="content", desc="_get_cached_user_device", ) - defer.returnValue(json.loads(content)) + defer.returnValue(db_to_json(content)) @cachedInlineCallbacks() def _get_cached_devices_for_user(self, user_id): @@ -479,7 +479,7 @@ class DeviceStore(SQLBaseStore): desc="_get_cached_devices_for_user", ) defer.returnValue({ - device["device_id"]: json.loads(device["content"]) + device["device_id"]: db_to_json(device["content"]) for device in devices }) @@ -511,7 +511,7 @@ class DeviceStore(SQLBaseStore): key_json = device.get("key_json", None) if key_json: - result["keys"] = json.loads(key_json) + result["keys"] = db_to_json(key_json) device_display_name = device.get("device_display_name", None) if device_display_name: result["device_display_name"] = device_display_name diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 523b4360c3..1f1721e820 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -14,13 +14,13 @@ # limitations under the License. from six import iteritems -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.util.caches.descriptors import cached -from ._base import SQLBaseStore +from ._base import SQLBaseStore, db_to_json class EndToEndKeyStore(SQLBaseStore): @@ -90,7 +90,7 @@ class EndToEndKeyStore(SQLBaseStore): for user_id, device_keys in iteritems(results): for device_id, device_info in iteritems(device_keys): - device_info["keys"] = json.loads(device_info.pop("key_json")) + device_info["keys"] = db_to_json(device_info.pop("key_json")) defer.returnValue(results) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 8a0386c1a4..42225f8a2a 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -41,13 +41,18 @@ class PostgresEngine(object): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) + + # Set the bytea output to escape, vs the default of hex + cursor = db_conn.cursor() + cursor.execute("SET bytea_output TO escape") + # Asynchronous commit, don't wait for the server to call fsync before # ending the transaction. # https://www.postgresql.org/docs/current/static/wal-async-commit.html if not self.synchronous_commit: - cursor = db_conn.cursor() cursor.execute("SET synchronous_commit TO OFF") - cursor.close() + + cursor.close() def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f39c8c8461..8bf87f38f7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -19,7 +19,7 @@ import logging from collections import OrderedDict, deque, namedtuple from functools import wraps -from six import iteritems +from six import iteritems, text_type from six.moves import range from canonicaljson import json @@ -1220,7 +1220,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore "sender": event.sender, "contains_url": ( "url" in event.content - and isinstance(event.content["url"], basestring) + and isinstance(event.content["url"], text_type) ), } for event, _ in events_and_contexts @@ -1529,7 +1529,7 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore contains_url = "url" in content if contains_url: - contains_url &= isinstance(content["url"], basestring) + contains_url &= isinstance(content["url"], text_type) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. @@ -1910,9 +1910,9 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore (room_id,) ) rows = txn.fetchall() - max_depth = max(row[0] for row in rows) + max_depth = max(row[1] for row in rows) - if max_depth <= token.topological: + if max_depth < token.topological: # We need to ensure we don't delete all the events from the database # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 59822178ff..a8326f5296 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import itertools import logging from collections import namedtuple @@ -265,7 +266,7 @@ class EventsWorkerStore(SQLBaseStore): """ with Measure(self._clock, "_fetch_event_list"): try: - event_id_lists = zip(*event_list)[0] + event_id_lists = list(zip(*event_list))[0] event_ids = [ item for sublist in event_id_lists for item in sublist ] @@ -299,14 +300,14 @@ class EventsWorkerStore(SQLBaseStore): logger.exception("do_fetch") # We only want to resolve deferreds from the main thread - def fire(evs): + def fire(evs, exc): for _, d in evs: if not d.called: with PreserveLoggingContext(): - d.errback(e) + d.errback(exc) with PreserveLoggingContext(): - self.hs.get_reactor().callFromThread(fire, event_list) + self.hs.get_reactor().callFromThread(fire, event_list, e) @defer.inlineCallbacks def _enqueue_events(self, events, check_redacted=True, allow_rejected=False): diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 2d5896c5b4..6ddcc909bf 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.api.errors import Codes, SynapseError from synapse.util.caches.descriptors import cachedInlineCallbacks -from ._base import SQLBaseStore +from ._base import SQLBaseStore, db_to_json class FilteringStore(SQLBaseStore): @@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore): desc="get_user_filter", ) - defer.returnValue(json.loads(bytes(def_json).decode("utf-8"))) + defer.returnValue(db_to_json(def_json)) def add_user_filter(self, user_localpart, user_filter): def_json = encode_canonical_json(user_filter) diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 06f9a75a97..c7899d7fd2 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -36,7 +36,6 @@ class MonthlyActiveUsersStore(SQLBaseStore): @defer.inlineCallbacks def initialise_reserved_users(self, threepids): - # TODO Why can't I do this in init? store = self.hs.get_datastore() reserved_user_list = [] @@ -147,6 +146,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): return count return self.runInteraction("count_users", _count_users) + @defer.inlineCallbacks def upsert_monthly_active_user(self, user_id): """ Updates or inserts monthly active user member @@ -155,7 +155,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): Deferred[bool]: True if a new entry was created, False if an existing one was updated. """ - is_insert = self._simple_upsert( + is_insert = yield self._simple_upsert( desc="upsert_monthly_active_user", table="monthly_active_users", keyvalues={ @@ -200,6 +200,11 @@ class MonthlyActiveUsersStore(SQLBaseStore): user_id(str): the user_id to query """ if self.hs.config.limit_usage_by_mau: + is_trial = yield self.is_trial_user(user_id) + if is_trial: + # we don't track trial users in the MAU table. + return + last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id) now = self.hs.get_clock().time_msec() diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8443bd4c1b..c7987bfcdd 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -15,7 +15,8 @@ # limitations under the License. import logging -import types + +import six from canonicaljson import encode_canonical_json, json @@ -27,6 +28,11 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) +if six.PY2: + db_binary_type = buffer +else: + db_binary_type = memoryview + class PusherWorkerStore(SQLBaseStore): def _decode_pushers_rows(self, rows): @@ -34,18 +40,18 @@ class PusherWorkerStore(SQLBaseStore): dataJson = r['data'] r['data'] = None try: - if isinstance(dataJson, types.BufferType): + if isinstance(dataJson, db_binary_type): dataJson = str(dataJson).decode("UTF8") r['data'] = json.loads(dataJson) except Exception as e: logger.warn( "Invalid JSON in data for pusher %d: %s, %s", - r['id'], dataJson, e.message, + r['id'], dataJson, e.args[0], ) pass - if isinstance(r['pushkey'], types.BufferType): + if isinstance(r['pushkey'], db_binary_type): r['pushkey'] = str(r['pushkey']).decode("UTF8") return rows diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 07333f777d..26b429e307 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -26,6 +26,11 @@ from synapse.util.caches.descriptors import cached, cachedInlineCallbacks class RegistrationWorkerStore(SQLBaseStore): + def __init__(self, db_conn, hs): + super(RegistrationWorkerStore, self).__init__(db_conn, hs) + + self.config = hs.config + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( @@ -36,12 +41,33 @@ class RegistrationWorkerStore(SQLBaseStore): retcols=[ "name", "password_hash", "is_guest", "consent_version", "consent_server_notice_sent", - "appservice_id", + "appservice_id", "creation_ts", ], allow_none=True, desc="get_user_by_id", ) + @defer.inlineCallbacks + def is_trial_user(self, user_id): + """Checks if user is in the "trial" period, i.e. within the first + N days of registration defined by `mau_trial_days` config + + Args: + user_id (str) + + Returns: + Deferred[bool] + """ + + info = yield self.get_user_by_id(user_id) + if not info: + defer.returnValue(False) + + now = self.clock.time_msec() + trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000 + is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms + defer.returnValue(is_trial) + @cached() def get_user_by_access_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 428e7fa36e..0c42bd3322 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -18,14 +18,14 @@ from collections import namedtuple import six -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches.descriptors import cached -from ._base import SQLBaseStore +from ._base import SQLBaseStore, db_to_json # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview @@ -95,7 +95,8 @@ class TransactionStore(SQLBaseStore): ) if result and result["response_code"]: - return result["response_code"], json.loads(str(result["response_json"])) + return result["response_code"], db_to_json(result["response_json"]) + else: return None diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index 022d81ce3e..f65a27e5f1 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -457,8 +457,8 @@ class AuthTestCase(unittest.TestCase): with self.assertRaises(ResourceLimitError) as e: yield self.auth.check_auth_blocking() - self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri) - self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) + self.assertEquals(e.exception.admin_contact, self.hs.config.admin_contact) + self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) self.assertEquals(e.exception.code, 403) # Ensure does not throw an error @@ -468,11 +468,36 @@ class AuthTestCase(unittest.TestCase): yield self.auth.check_auth_blocking() @defer.inlineCallbacks + def test_reserved_threepid(self): + self.hs.config.limit_usage_by_mau = True + self.hs.config.max_mau_value = 1 + threepid = {'medium': 'email', 'address': 'reserved@server.com'} + unknown_threepid = {'medium': 'email', 'address': 'unreserved@server.com'} + self.hs.config.mau_limits_reserved_threepids = [threepid] + + yield self.store.register(user_id='user1', token="123", password_hash=None) + with self.assertRaises(ResourceLimitError): + yield self.auth.check_auth_blocking() + + with self.assertRaises(ResourceLimitError): + yield self.auth.check_auth_blocking(threepid=unknown_threepid) + + yield self.auth.check_auth_blocking(threepid=threepid) + + @defer.inlineCallbacks def test_hs_disabled(self): self.hs.config.hs_disabled = True self.hs.config.hs_disabled_message = "Reason for being disabled" with self.assertRaises(ResourceLimitError) as e: yield self.auth.check_auth_blocking() - self.assertEquals(e.exception.admin_uri, self.hs.config.admin_uri) - self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) + self.assertEquals(e.exception.admin_contact, self.hs.config.admin_contact) + self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) self.assertEquals(e.exception.code, 403) + + @defer.inlineCallbacks + def test_server_notices_mxid_special_cased(self): + self.hs.config.hs_disabled = True + user = "@user:server" + self.hs.config.server_notices_mxid = user + self.hs.config.hs_disabled_message = "Reason for being disabled" + yield self.auth.check_auth_blocking(user) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index a01ab471f5..31f54bbd7d 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -51,7 +51,7 @@ class SyncTestCase(tests.unittest.TestCase): self.hs.config.hs_disabled = True with self.assertRaises(ResourceLimitError) as e: yield self.sync_handler.wait_for_sync_for_user(sync_config) - self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) + self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) self.hs.config.hs_disabled = False @@ -59,7 +59,7 @@ class SyncTestCase(tests.unittest.TestCase): with self.assertRaises(ResourceLimitError) as e: yield self.sync_handler.wait_for_sync_for_user(sync_config) - self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEED) + self.assertEquals(e.exception.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) def _generate_sync_config(self, user_id): return SyncConfig( diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 40dc4ea256..530dc8ba6d 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -240,7 +240,6 @@ class RestHelper(object): self.assertEquals(200, code) defer.returnValue(response) - @defer.inlineCallbacks def send(self, room_id, body=None, txn_id=None, tok=None, expect_code=200): if txn_id is None: txn_id = "m%s" % (str(time.time())) @@ -248,9 +247,16 @@ class RestHelper(object): body = "body_text_here" path = "/_matrix/client/r0/rooms/%s/send/m.room.message/%s" % (room_id, txn_id) - content = '{"msgtype":"m.text","body":"%s"}' % body + content = {"msgtype": "m.text", "body": body} if tok: path = path + "?access_token=%s" % tok - (code, response) = yield self.mock_resource.trigger("PUT", path, content) - self.assertEquals(expect_code, code, msg=str(response)) + request, channel = make_request("PUT", path, json.dumps(content).encode('utf8')) + render(request, self.resource, self.hs.get_reactor()) + + assert int(channel.result["code"]) == expect_code, ( + "Expected: %d, got: %d, resp: %r" + % (expect_code, int(channel.result["code"]), channel.result["body"]) + ) + + return channel.json_body diff --git a/tests/server.py b/tests/server.py index c63b2c3100..7dbdb7f8ea 100644 --- a/tests/server.py +++ b/tests/server.py @@ -5,7 +5,7 @@ from six import text_type import attr -from twisted.internet import threads +from twisted.internet import address, threads from twisted.internet.defer import Deferred from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock @@ -63,7 +63,9 @@ class FakeChannel(object): self.result["done"] = True def getPeer(self): - return None + # We give an address so that getClientIP returns a non null entry, + # causing us to record the MAU + return address.IPv4Address(b"TCP", "127.0.0.1", 3423) def getHost(self): return None @@ -91,7 +93,7 @@ class FakeSite: return FakeLogger() -def make_request(method, path, content=b""): +def make_request(method, path, content=b"", access_token=None): """ Make a web request using the given method and path, feed it the content, and return the Request and the Channel underneath. @@ -116,6 +118,11 @@ def make_request(method, path, content=b""): req = SynapseRequest(site, channel) req.process = lambda: b"" req.content = BytesIO(content) + + if access_token: + req.requestHeaders.addRawHeader(b"Authorization", b"Bearer " + access_token) + + req.requestHeaders.addRawHeader(b"X-Forwarded-For", b"127.0.0.1") req.requestReceived(method, path, b"1.1") return req, channel diff --git a/tests/server_notices/__init__.py b/tests/server_notices/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/server_notices/__init__.py diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py new file mode 100644 index 0000000000..5cc7fff39b --- /dev/null +++ b/tests/server_notices/test_resource_limits_server_notices.py @@ -0,0 +1,213 @@ +from mock import Mock + +from twisted.internet import defer + +from synapse.api.constants import EventTypes, ServerNoticeMsgType +from synapse.api.errors import ResourceLimitError +from synapse.handlers.auth import AuthHandler +from synapse.server_notices.resource_limits_server_notices import ( + ResourceLimitsServerNotices, +) + +from tests import unittest +from tests.utils import setup_test_homeserver + + +class AuthHandlers(object): + def __init__(self, hs): + self.auth_handler = AuthHandler(hs) + + +class TestResourceLimitsServerNotices(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield setup_test_homeserver(self.addCleanup, handlers=None) + self.hs.handlers = AuthHandlers(self.hs) + self.auth_handler = self.hs.handlers.auth_handler + self.server_notices_sender = self.hs.get_server_notices_sender() + + # relying on [1] is far from ideal, but the only case where + # ResourceLimitsServerNotices class needs to be isolated is this test, + # general code should never have a reason to do so ... + self._rlsn = self.server_notices_sender._server_notices[1] + if not isinstance(self._rlsn, ResourceLimitsServerNotices): + raise Exception("Failed to find reference to ResourceLimitsServerNotices") + + self._rlsn._store.user_last_seen_monthly_active = Mock( + return_value=defer.succeed(1000) + ) + self._send_notice = self._rlsn._server_notices_manager.send_notice + self._rlsn._server_notices_manager.send_notice = Mock() + self._rlsn._state.get_current_state = Mock(return_value=defer.succeed(None)) + self._rlsn._store.get_events = Mock(return_value=defer.succeed({})) + + self._send_notice = self._rlsn._server_notices_manager.send_notice + + self.hs.config.limit_usage_by_mau = True + self.user_id = "@user_id:test" + + # self.server_notices_mxid = "@server:test" + # self.server_notices_mxid_display_name = None + # self.server_notices_mxid_avatar_url = None + # self.server_notices_room_name = "Server Notices" + + self._rlsn._server_notices_manager.get_notice_room_for_user = Mock( + returnValue="" + ) + self._rlsn._store.add_tag_to_room = Mock() + self._rlsn._store.get_tags_for_room = Mock(return_value={}) + self.hs.config.admin_contact = "mailto:user@test.com" + + @defer.inlineCallbacks + def test_maybe_send_server_notice_to_user_flag_off(self): + """Tests cases where the flags indicate nothing to do""" + # test hs disabled case + self.hs.config.hs_disabled = True + + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + self._send_notice.assert_not_called() + # Test when mau limiting disabled + self.hs.config.hs_disabled = False + self.hs.limit_usage_by_mau = False + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + self._send_notice.assert_not_called() + + @defer.inlineCallbacks + def test_maybe_send_server_notice_to_user_remove_blocked_notice(self): + """Test when user has blocked notice, but should have it removed""" + + self._rlsn._auth.check_auth_blocking = Mock() + mock_event = Mock( + type=EventTypes.Message, + content={"msgtype": ServerNoticeMsgType}, + ) + self._rlsn._store.get_events = Mock(return_value=defer.succeed( + {"123": mock_event} + )) + + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + # Would be better to check the content, but once == remove blocking event + self._send_notice.assert_called_once() + + @defer.inlineCallbacks + def test_maybe_send_server_notice_to_user_remove_blocked_notice_noop(self): + """Test when user has blocked notice, but notice ought to be there (NOOP)""" + self._rlsn._auth.check_auth_blocking = Mock( + side_effect=ResourceLimitError(403, 'foo') + ) + + mock_event = Mock( + type=EventTypes.Message, + content={"msgtype": ServerNoticeMsgType}, + ) + self._rlsn._store.get_events = Mock(return_value=defer.succeed( + {"123": mock_event} + )) + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + self._send_notice.assert_not_called() + + @defer.inlineCallbacks + def test_maybe_send_server_notice_to_user_add_blocked_notice(self): + """Test when user does not have blocked notice, but should have one""" + + self._rlsn._auth.check_auth_blocking = Mock( + side_effect=ResourceLimitError(403, 'foo') + ) + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + # Would be better to check contents, but 2 calls == set blocking event + self.assertTrue(self._send_notice.call_count == 2) + + @defer.inlineCallbacks + def test_maybe_send_server_notice_to_user_add_blocked_notice_noop(self): + """Test when user does not have blocked notice, nor should they (NOOP)""" + + self._rlsn._auth.check_auth_blocking = Mock() + + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + self._send_notice.assert_not_called() + + @defer.inlineCallbacks + def test_maybe_send_server_notice_to_user_not_in_mau_cohort(self): + + """Test when user is not part of the MAU cohort - this should not ever + happen - but ... + """ + + self._rlsn._auth.check_auth_blocking = Mock() + self._rlsn._store.user_last_seen_monthly_active = Mock( + return_value=defer.succeed(None) + ) + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + self._send_notice.assert_not_called() + + +class TestResourceLimitsServerNoticesWithRealRooms(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield setup_test_homeserver(self.addCleanup) + self.store = self.hs.get_datastore() + self.server_notices_sender = self.hs.get_server_notices_sender() + self.server_notices_manager = self.hs.get_server_notices_manager() + self.event_source = self.hs.get_event_sources() + + # relying on [1] is far from ideal, but the only case where + # ResourceLimitsServerNotices class needs to be isolated is this test, + # general code should never have a reason to do so ... + self._rlsn = self.server_notices_sender._server_notices[1] + if not isinstance(self._rlsn, ResourceLimitsServerNotices): + raise Exception("Failed to find reference to ResourceLimitsServerNotices") + + self.hs.config.limit_usage_by_mau = True + self.hs.config.hs_disabled = False + self.hs.config.max_mau_value = 5 + self.hs.config.server_notices_mxid = "@server:test" + self.hs.config.server_notices_mxid_display_name = None + self.hs.config.server_notices_mxid_avatar_url = None + self.hs.config.server_notices_room_name = "Test Server Notice Room" + + self.user_id = "@user_id:test" + + self.hs.config.admin_contact = "mailto:user@test.com" + + @defer.inlineCallbacks + def test_server_notice_only_sent_once(self): + self.store.get_monthly_active_count = Mock( + return_value=1000, + ) + + self.store.user_last_seen_monthly_active = Mock( + return_value=1000, + ) + + # Call the function multiple times to ensure we only send the notice once + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + yield self._rlsn.maybe_send_server_notice_to_user(self.user_id) + + # Now lets get the last load of messages in the service notice room and + # check that there is only one server notice + room_id = yield self.server_notices_manager.get_notice_room_for_user( + self.user_id, + ) + + token = yield self.event_source.get_current_token() + events, _ = yield self.store.get_recent_events_for_room( + room_id, limit=100, end_token=token.room_key, + ) + + count = 0 + for event in events: + if event.type != EventTypes.Message: + continue + if event.content.get("msgtype") != ServerNoticeMsgType: + continue + + count += 1 + + self.assertEqual(count, 1) diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 7cb5f0e4cf..829f47d2e8 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -20,11 +20,11 @@ from mock import Mock from twisted.internet import defer -from synapse.server import HomeServer from synapse.storage._base import SQLBaseStore from synapse.storage.engines import create_engine from tests import unittest +from tests.utils import TestHomeServer class SQLBaseStoreTestCase(unittest.TestCase): @@ -51,7 +51,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): config = Mock() config.event_cache_size = 1 config.database_config = {"name": "sqlite3"} - hs = HomeServer( + hs = TestHomeServer( "test", db_pool=self.db_pool, config=config, diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py new file mode 100644 index 0000000000..f671599cb8 --- /dev/null +++ b/tests/storage/test_purge.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.rest.client.v1 import room + +from tests.unittest import HomeserverTestCase + + +class PurgeTests(HomeserverTestCase): + + user_id = "@red:server" + servlets = [room.register_servlets] + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver("server", http_client=None) + return hs + + def prepare(self, reactor, clock, hs): + self.room_id = self.helper.create_room_as(self.user_id) + + def test_purge(self): + """ + Purging a room will delete everything before the topological point. + """ + # Send four messages to the room + first = self.helper.send(self.room_id, body="test1") + second = self.helper.send(self.room_id, body="test2") + third = self.helper.send(self.room_id, body="test3") + last = self.helper.send(self.room_id, body="test4") + + storage = self.hs.get_datastore() + + # Get the topological token + event = storage.get_topological_token_for_event(last["event_id"]) + self.pump() + event = self.successResultOf(event) + + # Purge everything before this topological token + purge = storage.purge_history(self.room_id, event, True) + self.pump() + self.assertEqual(self.successResultOf(purge), None) + + # Try and get the events + get_first = storage.get_event(first["event_id"]) + get_second = storage.get_event(second["event_id"]) + get_third = storage.get_event(third["event_id"]) + get_last = storage.get_event(last["event_id"]) + self.pump() + + # 1-3 should fail and last will succeed, meaning that 1-3 are deleted + # and last is not. + self.failureResultOf(get_first) + self.failureResultOf(get_second) + self.failureResultOf(get_third) + self.successResultOf(get_last) + + def test_purge_wont_delete_extrems(self): + """ + Purging a room will delete everything before the topological point. + """ + # Send four messages to the room + first = self.helper.send(self.room_id, body="test1") + second = self.helper.send(self.room_id, body="test2") + third = self.helper.send(self.room_id, body="test3") + last = self.helper.send(self.room_id, body="test4") + + storage = self.hs.get_datastore() + + # Set the topological token higher than it should be + event = storage.get_topological_token_for_event(last["event_id"]) + self.pump() + event = self.successResultOf(event) + event = "t{}-{}".format( + *list(map(lambda x: x + 1, map(int, event[1:].split("-")))) + ) + + # Purge everything before this topological token + purge = storage.purge_history(self.room_id, event, True) + self.pump() + f = self.failureResultOf(purge) + self.assertIn("greater than forward", f.value.args[0]) + + # Try and get the events + get_first = storage.get_event(first["event_id"]) + get_second = storage.get_event(second["event_id"]) + get_third = storage.get_event(third["event_id"]) + get_last = storage.get_event(last["event_id"]) + self.pump() + + # Nothing is deleted. + self.successResultOf(get_first) + self.successResultOf(get_second) + self.successResultOf(get_third) + self.successResultOf(get_last) diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index 4eda122edc..3dfb7b903a 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -46,6 +46,7 @@ class RegistrationStoreTestCase(unittest.TestCase): "consent_version": None, "consent_server_notice_sent": None, "appservice_id": None, + "creation_ts": 1000, }, (yield self.store.get_user_by_id(self.user_id)), ) diff --git a/tests/test_mau.py b/tests/test_mau.py new file mode 100644 index 0000000000..0732615447 --- /dev/null +++ b/tests/test_mau.py @@ -0,0 +1,217 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests REST events for /rooms paths.""" + +import json + +from mock import Mock, NonCallableMock + +from synapse.api.constants import LoginType +from synapse.api.errors import Codes, HttpResponseException, SynapseError +from synapse.http.server import JsonResource +from synapse.rest.client.v2_alpha import register, sync +from synapse.util import Clock + +from tests import unittest +from tests.server import ( + ThreadedMemoryReactorClock, + make_request, + render, + setup_test_homeserver, +) + + +class TestMauLimit(unittest.TestCase): + def setUp(self): + self.reactor = ThreadedMemoryReactorClock() + self.clock = Clock(self.reactor) + + self.hs = setup_test_homeserver( + self.addCleanup, + "red", + http_client=None, + clock=self.clock, + reactor=self.reactor, + federation_client=Mock(), + ratelimiter=NonCallableMock(spec_set=["send_message"]), + ) + + self.store = self.hs.get_datastore() + + self.hs.config.registrations_require_3pid = [] + self.hs.config.enable_registration_captcha = False + self.hs.config.recaptcha_public_key = [] + + self.hs.config.limit_usage_by_mau = True + self.hs.config.hs_disabled = False + self.hs.config.max_mau_value = 2 + self.hs.config.mau_trial_days = 0 + self.hs.config.server_notices_mxid = "@server:red" + self.hs.config.server_notices_mxid_display_name = None + self.hs.config.server_notices_mxid_avatar_url = None + self.hs.config.server_notices_room_name = "Test Server Notice Room" + + self.resource = JsonResource(self.hs) + register.register_servlets(self.hs, self.resource) + sync.register_servlets(self.hs, self.resource) + + def test_simple_deny_mau(self): + # Create and sync so that the MAU counts get updated + token1 = self.create_user("kermit1") + self.do_sync_for_user(token1) + token2 = self.create_user("kermit2") + self.do_sync_for_user(token2) + + # We've created and activated two users, we shouldn't be able to + # register new users + with self.assertRaises(SynapseError) as cm: + self.create_user("kermit3") + + e = cm.exception + self.assertEqual(e.code, 403) + self.assertEqual(e.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + + def test_allowed_after_a_month_mau(self): + # Create and sync so that the MAU counts get updated + token1 = self.create_user("kermit1") + self.do_sync_for_user(token1) + token2 = self.create_user("kermit2") + self.do_sync_for_user(token2) + + # Advance time by 31 days + self.reactor.advance(31 * 24 * 60 * 60) + + self.store.reap_monthly_active_users() + + self.reactor.advance(0) + + # We should be able to register more users + token3 = self.create_user("kermit3") + self.do_sync_for_user(token3) + + def test_trial_delay(self): + self.hs.config.mau_trial_days = 1 + + # We should be able to register more than the limit initially + token1 = self.create_user("kermit1") + self.do_sync_for_user(token1) + token2 = self.create_user("kermit2") + self.do_sync_for_user(token2) + token3 = self.create_user("kermit3") + self.do_sync_for_user(token3) + + # Advance time by 2 days + self.reactor.advance(2 * 24 * 60 * 60) + + # Two users should be able to sync + self.do_sync_for_user(token1) + self.do_sync_for_user(token2) + + # But the third should fail + with self.assertRaises(SynapseError) as cm: + self.do_sync_for_user(token3) + + e = cm.exception + self.assertEqual(e.code, 403) + self.assertEqual(e.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + + # And new registrations are now denied too + with self.assertRaises(SynapseError) as cm: + self.create_user("kermit4") + + e = cm.exception + self.assertEqual(e.code, 403) + self.assertEqual(e.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + + def test_trial_users_cant_come_back(self): + self.hs.config.mau_trial_days = 1 + + # We should be able to register more than the limit initially + token1 = self.create_user("kermit1") + self.do_sync_for_user(token1) + token2 = self.create_user("kermit2") + self.do_sync_for_user(token2) + token3 = self.create_user("kermit3") + self.do_sync_for_user(token3) + + # Advance time by 2 days + self.reactor.advance(2 * 24 * 60 * 60) + + # Two users should be able to sync + self.do_sync_for_user(token1) + self.do_sync_for_user(token2) + + # Advance by 2 months so everyone falls out of MAU + self.reactor.advance(60 * 24 * 60 * 60) + self.store.reap_monthly_active_users() + self.reactor.advance(0) + + # We can create as many new users as we want + token4 = self.create_user("kermit4") + self.do_sync_for_user(token4) + token5 = self.create_user("kermit5") + self.do_sync_for_user(token5) + token6 = self.create_user("kermit6") + self.do_sync_for_user(token6) + + # users 2 and 3 can come back to bring us back up to MAU limit + self.do_sync_for_user(token2) + self.do_sync_for_user(token3) + + # New trial users can still sync + self.do_sync_for_user(token4) + self.do_sync_for_user(token5) + self.do_sync_for_user(token6) + + # But old user cant + with self.assertRaises(SynapseError) as cm: + self.do_sync_for_user(token1) + + e = cm.exception + self.assertEqual(e.code, 403) + self.assertEqual(e.errcode, Codes.RESOURCE_LIMIT_EXCEEDED) + + def create_user(self, localpart): + request_data = json.dumps({ + "username": localpart, + "password": "monkey", + "auth": {"type": LoginType.DUMMY}, + }) + + request, channel = make_request(b"POST", b"/register", request_data) + render(request, self.resource, self.reactor) + + if channel.result["code"] != b"200": + raise HttpResponseException( + int(channel.result["code"]), + channel.result["reason"], + channel.result["body"], + ).to_synapse_error() + + access_token = channel.json_body["access_token"] + + return access_token + + def do_sync_for_user(self, token): + request, channel = make_request(b"GET", b"/sync", access_token=token) + render(request, self.resource, self.reactor) + + if channel.result["code"] != b"200": + raise HttpResponseException( + int(channel.result["code"]), + channel.result["reason"], + channel.result["body"], + ).to_synapse_error() diff --git a/tests/test_types.py b/tests/test_types.py index be072d402b..0f5c8bfaf9 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -14,12 +14,12 @@ # limitations under the License. from synapse.api.errors import SynapseError -from synapse.server import HomeServer from synapse.types import GroupID, RoomAlias, UserID from tests import unittest +from tests.utils import TestHomeServer -mock_homeserver = HomeServer(hostname="my.domain") +mock_homeserver = TestHomeServer(hostname="my.domain") class UserIDTestCase(unittest.TestCase): diff --git a/tests/unittest.py b/tests/unittest.py index d852e2465a..8b513bb32b 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -151,6 +151,7 @@ class HomeserverTestCase(TestCase): hijack_auth (bool): Whether to hijack auth to return the user specified in user_id. """ + servlets = [] hijack_auth = True @@ -279,3 +280,13 @@ class HomeserverTestCase(TestCase): kwargs = dict(kwargs) kwargs.update(self._hs_args) return setup_test_homeserver(self.addCleanup, *args, **kwargs) + + def pump(self): + """ + Pump the reactor enough that Deferreds will fire. + """ + self.reactor.pump([0.0] * 100) + + def get_success(self, d): + self.pump() + return self.successResultOf(d) diff --git a/tests/utils.py b/tests/utils.py index 9f7ff94575..63e30dc6c0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -26,10 +26,11 @@ from twisted.internet import defer, reactor from synapse.api.constants import EventTypes from synapse.api.errors import CodeMessageException, cs_error +from synapse.config.server import ServerConfig from synapse.federation.transport import server from synapse.http.server import HttpServer from synapse.server import HomeServer -from synapse.storage import PostgresEngine +from synapse.storage import DataStore, PostgresEngine from synapse.storage.engines import create_engine from synapse.storage.prepare_database import ( _get_or_create_schema_state, @@ -92,10 +93,14 @@ def setupdb(): atexit.register(_cleanup) +class TestHomeServer(HomeServer): + DATASTORE_CLASS = DataStore + + @defer.inlineCallbacks def setup_test_homeserver( cleanup_func, name="test", datastore=None, config=None, reactor=None, - homeserverToUse=HomeServer, **kargs + homeserverToUse=TestHomeServer, **kargs ): """ Setup a homeserver suitable for running tests against. Keyword arguments @@ -142,7 +147,9 @@ def setup_test_homeserver( config.hs_disabled_limit_type = "" config.max_mau_value = 50 config.mau_limits_reserved_threepids = [] - config.admin_uri = None + config.admin_contact = None + config.rc_messages_per_second = 10000 + config.rc_message_burst_count = 10000 # we need a sane default_room_version, otherwise attempts to create rooms will # fail. @@ -152,6 +159,11 @@ def setup_test_homeserver( # background, which upsets the test runner. config.update_user_directory = False + def is_threepid_reserved(threepid): + return ServerConfig.is_threepid_reserved(config, threepid) + + config.is_threepid_reserved.side_effect = is_threepid_reserved + config.use_frozen_dicts = True config.ldap_enabled = False |