diff options
author | Travis Ralston <travpc@gmail.com> | 2018-02-01 18:05:47 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-01 18:05:47 -0700 |
commit | 6e87b34f7b7c595d4e5b96e4a0281b8327e5b8b7 (patch) | |
tree | 35701fcc0456593d736a3405f72a5dfd624b1cad | |
parent | pep8 (diff) | |
parent | Merge pull request #2837 from matrix-org/rav/fix_quarantine_media (diff) | |
download | synapse-6e87b34f7b7c595d4e5b96e4a0281b8327e5b8b7.tar.xz |
Merge branch 'develop' into travis/admin-list-media
47 files changed, 716 insertions, 386 deletions
diff --git a/scripts/move_remote_media_to_new_store.py b/scripts/move_remote_media_to_new_store.py new file mode 100755 index 0000000000..7914ead889 --- /dev/null +++ b/scripts/move_remote_media_to_new_store.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Moves a list of remote media from one media store to another. + +The input should be a list of media files to be moved, one per line. Each line +should be formatted:: + + <origin server>|<file id> + +This can be extracted from postgres with:: + + psql --tuples-only -A -c "select media_origin, filesystem_id from + matrix.remote_media_cache where ..." + +To use, pipe the above into:: + + PYTHON_PATH=. ./scripts/move_remote_media_to_new_store.py <source repo> <dest repo> +""" + +from __future__ import print_function + +import argparse +import logging + +import sys + +import os + +import shutil + +from synapse.rest.media.v1.filepath import MediaFilePaths + +logger = logging.getLogger() + + +def main(src_repo, dest_repo): + src_paths = MediaFilePaths(src_repo) + dest_paths = MediaFilePaths(dest_repo) + for line in sys.stdin: + line = line.strip() + parts = line.split('|') + if len(parts) != 2: + print("Unable to parse input line %s" % line, file=sys.stderr) + exit(1) + + move_media(parts[0], parts[1], src_paths, dest_paths) + + +def move_media(origin_server, file_id, src_paths, dest_paths): + """Move the given file, and any thumbnails, to the dest repo + + Args: + origin_server (str): + file_id (str): + src_paths (MediaFilePaths): + dest_paths (MediaFilePaths): + """ + logger.info("%s/%s", origin_server, file_id) + + # check that the original exists + original_file = src_paths.remote_media_filepath(origin_server, file_id) + if not os.path.exists(original_file): + logger.warn( + "Original for %s/%s (%s) does not exist", + origin_server, file_id, original_file, + ) + else: + mkdir_and_move( + original_file, + dest_paths.remote_media_filepath(origin_server, file_id), + ) + + # now look for thumbnails + original_thumb_dir = src_paths.remote_media_thumbnail_dir( + origin_server, file_id, + ) + if not os.path.exists(original_thumb_dir): + return + + mkdir_and_move( + original_thumb_dir, + dest_paths.remote_media_thumbnail_dir(origin_server, file_id) + ) + + +def mkdir_and_move(original_file, dest_file): + dirname = os.path.dirname(dest_file) + if not os.path.exists(dirname): + logger.debug("mkdir %s", dirname) + os.makedirs(dirname) + logger.debug("mv %s %s", original_file, dest_file) + shutil.move(original_file, dest_file) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class = argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-v", action='store_true', help='enable debug logging') + parser.add_argument( + "src_repo", + help="Path to source content repo", + ) + parser.add_argument( + "dest_repo", + help="Path to source content repo", + ) + args = parser.parse_args() + + logging_config = { + "level": logging.DEBUG if args.v else logging.INFO, + "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s" + } + logging.basicConfig(**logging_config) + + main(args.src_repo, args.dest_repo) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 79b35b3e7c..aa15f73f36 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -46,6 +46,7 @@ class Codes(object): THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED" THREEPID_IN_USE = "M_THREEPID_IN_USE" THREEPID_NOT_FOUND = "M_THREEPID_NOT_FOUND" + THREEPID_DENIED = "M_THREEPID_DENIED" INVALID_USERNAME = "M_INVALID_USERNAME" SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" @@ -140,6 +141,32 @@ class RegistrationError(SynapseError): pass +class FederationDeniedError(SynapseError): + """An error raised when the server tries to federate with a server which + is not on its federation whitelist. + + Attributes: + destination (str): The destination which has been denied + """ + + def __init__(self, destination): + """Raised by federation client or server to indicate that we are + are deliberately not attempting to contact a given server because it is + not on our federation whitelist. + + Args: + destination (str): the domain in question + """ + + self.destination = destination + + super(FederationDeniedError, self).__init__( + code=403, + msg="Federation denied with %s." % (self.destination,), + errcode=Codes.FORBIDDEN, + ) + + class InteractiveAuthIncompleteError(Exception): """An error raised when UI auth is not yet complete diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 7d0c2879ae..c6fe4516d1 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -49,19 +49,6 @@ class AppserviceSlaveStore( class AppserviceServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = AppserviceSlaveStore(self.get_db_conn(), self) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index dc3f6efd43..3b3352798d 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -64,19 +64,6 @@ class ClientReaderSlavedStore( class ClientReaderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index a072291e1f..4de43c41f0 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -58,19 +58,6 @@ class FederationReaderSlavedStore( class FederationReaderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 09e9488f06..f760826d27 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -76,19 +76,6 @@ class FederationSenderSlaveStore( class FederationSenderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index ae531c0aa4..e32ee8fe93 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -118,19 +118,6 @@ class FrontendProxySlavedStore( class FrontendProxyServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 92ab3b311b..cb82a415a6 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -266,19 +266,6 @@ class SynapseHomeServer(HomeServer): except IncorrectDatabaseSetup as e: quit_with_error(e.message) - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(config_options): """ diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index eab1597aaa..1ed1ca8772 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -60,19 +60,6 @@ class MediaRepositorySlavedStore( class MediaRepositoryServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 7fbbb0b0e1..32ccea3f13 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -81,19 +81,6 @@ class PusherSlaveStore( class PusherServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = PusherSlaveStore(self.get_db_conn(), self) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 0abba3016e..f87531f1b6 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -246,19 +246,6 @@ class SynchrotronApplicationService(object): class SynchrotronServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index a48c4a2ae6..494ccb702c 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -92,19 +92,6 @@ class UserDirectorySlaveStore( class UserDirectoryServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index ef917fc9f2..336959094b 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -31,6 +31,8 @@ class RegistrationConfig(Config): strtobool(str(config["disable_registration"])) ) + self.registrations_require_3pid = config.get("registrations_require_3pid", []) + self.allowed_local_3pids = config.get("allowed_local_3pids", []) self.registration_shared_secret = config.get("registration_shared_secret") self.bcrypt_rounds = config.get("bcrypt_rounds", 12) @@ -52,6 +54,23 @@ class RegistrationConfig(Config): # Enable registration for new users. enable_registration: False + # The user must provide all of the below types of 3PID when registering. + # + # registrations_require_3pid: + # - email + # - msisdn + + # Mandate that users are only allowed to associate certain formats of + # 3PIDs with accounts on this server. + # + # allowed_local_3pids: + # - medium: email + # pattern: ".*@matrix\\.org" + # - medium: email + # pattern: ".*@vector\\.im" + # - medium: msisdn + # pattern: "\\+44" + # If set, allows registration by anyone who also has the shared # secret, even if registration is otherwise disabled. registration_shared_secret: "%(registration_shared_secret)s" diff --git a/synapse/config/server.py b/synapse/config/server.py index 436dd8a6fe..8f0b6d1f28 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -55,6 +55,17 @@ class ServerConfig(Config): "block_non_admin_invites", False, ) + # FIXME: federation_domain_whitelist needs sytests + self.federation_domain_whitelist = None + federation_domain_whitelist = config.get( + "federation_domain_whitelist", None + ) + # turn the whitelist into a hash for speed of lookup + if federation_domain_whitelist is not None: + self.federation_domain_whitelist = {} + for domain in federation_domain_whitelist: + self.federation_domain_whitelist[domain] = True + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' @@ -210,6 +221,17 @@ class ServerConfig(Config): # (except those sent by local server admins). The default is False. # block_non_admin_invites: True + # Restrict federation to the following whitelist of domains. + # N.B. we recommend also firewalling your federation listener to limit + # inbound federation traffic as early as possible, rather than relying + # purely on this application-layer restriction. If not specified, the + # default is to whitelist everything. + # + # federation_domain_whitelist: + # - lon.example.com + # - nyc.example.com + # - syd.example.com + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b1fe03f702..813907f7f2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.errors import ( - CodeMessageException, HttpResponseException, SynapseError, + CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError ) from synapse.events import builder from synapse.federation.federation_base import ( @@ -266,6 +266,9 @@ class FederationClient(FederationBase): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e.message) + continue except Exception as e: pdu_attempts[destination] = now diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d39f46583..a141ec9953 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,7 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction, Edu -from synapse.api.errors import HttpResponseException +from synapse.api.errors import HttpResponseException, FederationDeniedError from synapse.util import logcontext, PreserveLoggingContext from synapse.util.async import run_on_reactor from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter @@ -490,6 +490,8 @@ class TransactionQueue(object): (e.retry_last_ts + e.retry_interval) / 1000.0 ), ) + except FederationDeniedError as e: + logger.info(e) except Exception as e: logger.warn( "TX [%s] Failed to send transaction: %s", diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1f3ce238f6..5488e82985 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -212,6 +212,9 @@ class TransportLayerClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if the remote destination + is not in our federation whitelist """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2b02b021ec..06c16ba4fa 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, FederationDeniedError from synapse.http.server import JsonResource from synapse.http.servlet import ( parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, @@ -81,6 +81,7 @@ class Authenticator(object): self.keyring = hs.get_keyring() self.server_name = hs.hostname self.store = hs.get_datastore() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks @@ -92,6 +93,12 @@ class Authenticator(object): "signatures": {}, } + if ( + self.federation_domain_whitelist is not None and + self.server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(self.server_name) + if content is not None: json_request["content"] = content diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 2152efc692..0e83453851 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api import errors from synapse.api.constants import EventTypes +from synapse.api.errors import FederationDeniedError from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache @@ -513,6 +514,9 @@ class DeviceListEduUpdater(object): # This makes it more likely that the device lists will # eventually become consistent. return + except FederationDeniedError as e: + logger.info(e) + return except Exception: # TODO: Remember that we are now out of sync and try again # later diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5af8abf66b..9aa95f89e6 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -19,7 +19,9 @@ import logging from canonicaljson import encode_canonical_json from twisted.internet import defer -from synapse.api.errors import SynapseError, CodeMessageException +from synapse.api.errors import ( + SynapseError, CodeMessageException, FederationDeniedError, +) from synapse.types import get_domain_from_id, UserID from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination @@ -140,6 +142,10 @@ class E2eKeysHandler(object): failures[destination] = { "status": 503, "message": "Not ready for retry", } + except FederationDeniedError as e: + failures[destination] = { + "status": 403, "message": "Federation Denied", + } except Exception as e: # include ConnectionRefused and other errors failures[destination] = { diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ac70730885..677532c87b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -22,6 +22,7 @@ from ._base import BaseHandler from synapse.api.errors import ( AuthError, FederationError, StoreError, CodeMessageException, SynapseError, + FederationDeniedError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator @@ -782,6 +783,9 @@ class FederationHandler(BaseHandler): except NotRetryingDestination as e: logger.info(e.message) continue + except FederationDeniedError as e: + logger.info(e) + continue except Exception as e: logger.exception( "Failed to backfill from %s because %s", diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5b808beac1..9021d4d57f 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient from synapse import types from synapse.types import UserID from synapse.util.async import run_on_reactor +from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler): """ for c in threepidCreds: - logger.info("validating theeepidcred sid %s on id server %s", + logger.info("validating threepidcred sid %s on id server %s", c['sid'], c['idServer']) try: identity_handler = self.hs.get_handlers().identity_handler @@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler): logger.info("got threepid with medium '%s' and address '%s'", threepid['medium'], threepid['address']) + if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']): + raise RegistrationError( + 403, "Third party identifier is not allowed" + ) + @defer.inlineCallbacks def bind_emails(self, user_id, threepidCreds): """Links emails with a user ID and informs an identity server. diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index bb40075387..dfa09141ed 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler): if limit: step = limit + 1 else: - step = len(rooms_to_scan) + # step cannot be zero + step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 chunk = [] for i in xrange(0, len(rooms_to_scan), step): diff --git a/synapse/http/client.py b/synapse/http/client.py index 4abb479ae3..f3e4973c2e 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -18,6 +18,7 @@ from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) +from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable from synapse.util import logcontext import synapse.metrics @@ -30,6 +31,7 @@ from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.web.client import ( BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent, readBody, PartialDownloadError, + HTTPConnectionPool, ) from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer from twisted.web.http import PotentialDataLoss @@ -64,13 +66,23 @@ class SimpleHttpClient(object): """ def __init__(self, hs): self.hs = hs + + pool = HTTPConnectionPool(reactor) + + # the pusher makes lots of concurrent SSL connections to sygnal, and + # tends to do so in batches, so we need to allow the pool to keep lots + # of idle connections around. + pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5)) + pool.cachedConnectionTimeout = 2 * 60 + # The default context factory in Twisted 14.0.0 (which we require) is # BrowserLikePolicyForHTTPS which will do regular cert validation # 'like a browser' self.agent = Agent( reactor, connectTimeout=15, - contextFactory=hs.get_http_client_context_factory() + contextFactory=hs.get_http_client_context_factory(), + pool=pool, ) self.user_agent = hs.version_string self.clock = hs.get_clock() diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index e2b99ef3bd..87639b9151 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -357,8 +357,7 @@ def _get_hosts_for_srv_record(dns_client, host): def eb(res, record_type): if res.check(DNSNameError): return [] - logger.warn("Error looking up %s for %s: %s", - record_type, host, res, res.value) + logger.warn("Error looking up %s for %s: %s", record_type, host, res) return res # no logcontexts here, so we can safely fire these off and gatherResults diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 833496b72d..9145405cb0 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,7 @@ import synapse.metrics from canonicaljson import encode_canonical_json from synapse.api.errors import ( - SynapseError, Codes, HttpResponseException, + SynapseError, Codes, HttpResponseException, FederationDeniedError, ) from signedjson.sign import sign_json @@ -123,11 +123,22 @@ class MatrixFederationHttpClient(object): Fails with ``HTTPRequestException``: if we get an HTTP response code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist + (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if ( + self.hs.config.federation_domain_whitelist and + destination not in self.hs.config.federation_domain_whitelist + ): + raise FederationDeniedError(destination) + limiter = yield synapse.util.retryutils.get_retry_limiter( destination, self.clock, @@ -308,6 +319,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ if not json_data_callback: @@ -368,6 +382,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ def body_callback(method, url_bytes, headers_dict): @@ -422,6 +439,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ logger.debug("get_json args: %s", args) @@ -475,6 +495,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ response = yield self._request( @@ -518,6 +541,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ encoded_args = {} diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2265e6e8d6..e0cfb7d08f 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -146,10 +146,15 @@ def runUntilCurrentTimer(func): num_pending += 1 num_pending += len(reactor.threadCallQueue) - start = time.time() * 1000 ret = func(*args, **kwargs) end = time.time() * 1000 + + # record the amount of wallclock time spent running pending calls. + # This is a proxy for the actual amount of time between reactor polls, + # since about 25% of time is actually spent running things triggered by + # I/O events, but that is harder to capture without rewriting half the + # reactor. tick_time.inc_by(end - start) pending_calls_metric.inc_by(num_pending) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index c16f61452c..2cbac571b8 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -13,21 +13,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from synapse.push import PusherConfigException +import logging from twisted.internet import defer, reactor from twisted.internet.error import AlreadyCalled, AlreadyCancelled -import logging import push_rule_evaluator import push_tools - +import synapse +from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +http_push_processed_counter = metrics.register_counter( + "http_pushes_processed", +) + +http_push_failed_counter = metrics.register_counter( + "http_pushes_failed", +) + class HttpPusher(object): INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes @@ -152,9 +161,16 @@ class HttpPusher(object): self.user_id, self.last_stream_ordering, self.max_stream_ordering ) + logger.info( + "Processing %i unprocessed push actions for %s starting at " + "stream_ordering %s", + len(unprocessed), self.name, self.last_stream_ordering, + ) + for push_action in unprocessed: processed = yield self._process_one(push_action) if processed: + http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] yield self.store.update_pusher_last_stream_ordering_and_success( @@ -169,6 +185,7 @@ class HttpPusher(object): self.failing_since ) else: + http_push_failed_counter.inc() if not self.failing_since: self.failing_since = self.clock.time_msec() yield self.store.update_pusher_failing_since( @@ -316,7 +333,10 @@ class HttpPusher(object): try: resp = yield self.http_client.post_json_get_json(self.url, notification_dict) except Exception: - logger.warn("Failed to push %s ", self.url) + logger.warn( + "Failed to push event %s to %s", + event.event_id, self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: @@ -325,7 +345,7 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): - logger.info("Sending updated badge count %d to %r", badge, self.user_id) + logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { 'id': '', @@ -347,7 +367,10 @@ class HttpPusher(object): try: resp = yield self.http_client.post_json_get_json(self.url, d) except Exception: - logger.exception("Failed to push %s ", self.url) + logger.warn( + "Failed to send badge count to %s", + self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 32ed1d3ab2..5c5fa8f7ab 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -70,10 +70,15 @@ class RegisterRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() def on_GET(self, request): + + require_email = 'email' in self.hs.config.registrations_require_3pid + require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid + + flows = [] if self.hs.config.enable_registration_captcha: - return ( - 200, - {"flows": [ + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: + flows.extend([ { "type": LoginType.RECAPTCHA, "stages": [ @@ -82,27 +87,34 @@ class RegisterRestServlet(ClientV1RestServlet): LoginType.PASSWORD ] }, + ]) + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([ { "type": LoginType.RECAPTCHA, "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD] } - ]} - ) + ]) else: - return ( - 200, - {"flows": [ + # only support the email-only flow if we don't require MSISDN 3PIDs + if require_email or not require_msisdn: + flows.extend([ { "type": LoginType.EMAIL_IDENTITY, "stages": [ LoginType.EMAIL_IDENTITY, LoginType.PASSWORD ] - }, + } + ]) + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([ { "type": LoginType.PASSWORD } - ]} - ) + ]) + return (200, {"flows": flows}) @defer.inlineCallbacks def on_POST(self, request): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 682a0af9fc..867ec8602c 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -195,15 +195,20 @@ class RoomSendEventRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) + event_dict = { + "type": event_type, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + } + + if 'ts' in request.args and requester.app_service: + event_dict['origin_server_ts'] = parse_integer(request, "ts", 0) + msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( requester, - { - "type": event_type, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - }, + event_dict, txn_id=txn_id, ) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 385a3ad2ec..30523995af 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -26,6 +26,7 @@ from synapse.http.servlet import ( ) from synapse.util.async import run_on_reactor from synapse.util.msisdn import phone_number_to_msisdn +from synapse.util.threepids import check_3pid_allowed from ._base import client_v2_patterns, interactive_auth_handler logger = logging.getLogger(__name__) @@ -47,6 +48,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): 'id_server', 'client_secret', 'email', 'send_attempt' ]) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] ) @@ -78,6 +84,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn ) @@ -217,6 +228,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): if absent: raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.datastore.get_user_id_by_threepid( 'email', body['email'] ) @@ -255,6 +271,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index e9d88a8895..c6f4680a76 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -26,6 +26,7 @@ from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string ) from synapse.util.msisdn import phone_number_to_msisdn +from synapse.util.threepids import check_3pid_allowed from ._base import client_v2_patterns, interactive_auth_handler @@ -70,6 +71,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): 'id_server', 'client_secret', 'email', 'send_attempt' ]) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] ) @@ -105,6 +111,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'msisdn', msisdn ) @@ -305,31 +316,67 @@ class RegisterRestServlet(RestServlet): if 'x_show_msisdn' in body and body['x_show_msisdn']: show_msisdn = True + # FIXME: need a better error than "no auth flow found" for scenarios + # where we required 3PID for registration but the user didn't give one + require_email = 'email' in self.hs.config.registrations_require_3pid + require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid + + flows = [] if self.hs.config.enable_registration_captcha: - flows = [ - [LoginType.RECAPTCHA], - [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], - ] + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([[LoginType.RECAPTCHA]]) + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: + flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]]) + if show_msisdn: + # only support the MSISDN-only flow if we don't require email 3PIDs + if not require_email: + flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]]) + # always let users provide both MSISDN & email flows.extend([ - [LoginType.MSISDN, LoginType.RECAPTCHA], [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], ]) else: - flows = [ - [LoginType.DUMMY], - [LoginType.EMAIL_IDENTITY], - ] + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([[LoginType.DUMMY]]) + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: + flows.extend([[LoginType.EMAIL_IDENTITY]]) + if show_msisdn: + # only support the MSISDN-only flow if we don't require email 3PIDs + if not require_email or require_msisdn: + flows.extend([[LoginType.MSISDN]]) + # always let users provide both MSISDN & email flows.extend([ - [LoginType.MSISDN], - [LoginType.MSISDN, LoginType.EMAIL_IDENTITY], + [LoginType.MSISDN, LoginType.EMAIL_IDENTITY] ]) auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) + # Check that we're not trying to register a denied 3pid. + # + # the user-facing checks will probably already have happened in + # /register/email/requestToken when we requested a 3pid, but that's not + # guaranteed. + + if auth_result: + for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]: + if login_type in auth_result: + medium = auth_result[login_type]['medium'] + address = auth_result[login_type]['address'] + + if not check_3pid_allowed(self.hs, medium, address): + raise SynapseError( + 403, "Third party identifier is not allowed", + Codes.THREEPID_DENIED, + ) + if registered_user_id is not None: logger.info( "Already registered user ID %r for this session", diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index cc2842aa72..17e6079cba 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -93,6 +93,7 @@ class RemoteKey(Resource): self.store = hs.get_datastore() self.version_string = hs.version_string self.clock = hs.get_clock() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist def render_GET(self, request): self.async_render_GET(request) @@ -137,6 +138,13 @@ class RemoteKey(Resource): logger.info("Handling query for keys %r", query) store_queries = [] for server_name, key_ids in query.items(): + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + logger.debug("Federation denied with %s", server_name) + continue + if not key_ids: key_ids = (None,) for key_id in key_ids: diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 4f56bcf577..485db8577a 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -32,8 +32,9 @@ from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.util.stringutils import random_string -from synapse.api.errors import SynapseError, HttpResponseException, \ - NotFoundError +from synapse.api.errors import ( + SynapseError, HttpResponseException, NotFoundError, FederationDeniedError, +) from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii @@ -75,6 +76,8 @@ class MediaRepository(object): self.recently_accessed_remotes = set() self.recently_accessed_locals = set() + self.federation_domain_whitelist = hs.config.federation_domain_whitelist + # List of StorageProviders where we should search for media and # potentially upload to. storage_providers = [] @@ -216,6 +219,12 @@ class MediaRepository(object): Deferred: Resolves once a response has successfully been written to request """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + self.mark_recently_accessed(server_name, media_id) # We linearize here to ensure that we don't try and download remote @@ -250,6 +259,12 @@ class MediaRepository(object): Returns: Deferred[dict]: The media_info of the file """ + if ( + self.federation_domain_whitelist is not None and + server_name not in self.federation_domain_whitelist + ): + raise FederationDeniedError(server_name) + # We linearize here to ensure that we don't try and download remote # media multiple times concurrently key = (server_name, media_id) diff --git a/synapse/server.py b/synapse/server.py index 99693071b6..ff8a8fbc46 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -307,6 +307,23 @@ class HomeServer(object): **self.db_config.get("args", {}) ) + def get_db_conn(self, run_new_connection=True): + """Makes a new connection to the database, skipping the db pool + + Returns: + Connection: a connection object implementing the PEP-249 spec + """ + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + def build_media_repository_resource(self): # build the media repo resource. This indirects through the HomeServer # to ensure that we only have a single instance of diff --git a/synapse/state.py b/synapse/state.py index 1f9abf9d3d..4c8247e7c2 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -146,8 +146,20 @@ class StateHandler(object): defer.returnValue(state) @defer.inlineCallbacks - def get_current_state_ids(self, room_id, event_type=None, state_key="", - latest_event_ids=None): + def get_current_state_ids(self, room_id, latest_event_ids=None): + """Get the current state, or the state at a set of events, for a room + + Args: + room_id (str): + + latest_event_ids (iterable[str]|None): if given, the forward + extremities to resolve. If None, we look them up from the + database (via a cache) + + Returns: + Deferred[dict[(str, str), str)]]: the state dict, mapping from + (event_type, state_key) -> event_id + """ if not latest_event_ids: latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) @@ -155,10 +167,6 @@ class StateHandler(object): ret = yield self.resolve_state_groups(room_id, latest_event_ids) state = ret.state - if event_type: - defer.returnValue(state.get((event_type, state_key))) - return - defer.returnValue(state) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7a9cd3ec90..33fccfa7a8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -110,7 +110,7 @@ class _EventPeristenceQueue(object): end_item.events_and_contexts.extend(events_and_contexts) return end_item.deferred.observe() - deferred = ObservableDeferred(defer.Deferred()) + deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) queue.append(self._EventPersistQueueItem( events_and_contexts=events_and_contexts, @@ -152,8 +152,8 @@ class _EventPeristenceQueue(object): try: ret = yield per_item_callback(item) item.deferred.callback(ret) - except Exception as e: - item.deferred.errback(e) + except Exception: + item.deferred.errback() finally: queue = self._event_persist_queues.pop(room_id, None) if queue: diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 961ad5abca..cf2c4dae39 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -577,7 +577,7 @@ class RoomStore(SQLBaseStore): """ UPDATE remote_media_cache SET quarantined_by = ? - WHERE media_origin AND media_id = ? + WHERE media_origin = ? AND media_id = ? """, ( (quarantined_by, origin, media_id) diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py index f150ef0103..dfdcbb3181 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py @@ -641,13 +641,12 @@ class UserDirectoryStore(SQLBaseStore): """ if self.hs.config.user_directory_search_all_users: - # dummy to keep the number of binds & aliases the same + # make s.user_id null to keep the ordering algorithm happy join_clause = """ - LEFT JOIN ( - SELECT NULL as user_id WHERE NULL = ? - ) AS s USING (user_id)" + CROSS JOIN (SELECT NULL as user_id) AS s """ - where_clause = "" + join_args = () + where_clause = "1=1" else: join_clause = """ LEFT JOIN users_in_public_rooms AS p USING (user_id) @@ -656,6 +655,7 @@ class UserDirectoryStore(SQLBaseStore): WHERE user_id = ? AND share_private ) AS s USING (user_id) """ + join_args = (user_id,) where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)" if isinstance(self.database_engine, PostgresEngine): @@ -697,7 +697,7 @@ class UserDirectoryStore(SQLBaseStore): avatar_url IS NULL LIMIT ? """ % (join_clause, where_clause) - args = (user_id, full_query, exact_query, prefix_query, limit + 1,) + args = join_args + (full_query, exact_query, prefix_query, limit + 1,) elif isinstance(self.database_engine, Sqlite3Engine): search_query = _parse_query_sqlite(search_term) @@ -715,7 +715,7 @@ class UserDirectoryStore(SQLBaseStore): avatar_url IS NULL LIMIT ? """ % (join_clause, where_clause) - args = (user_id, search_query, limit + 1) + args = join_args + (search_query, limit + 1) else: # This should be unreachable. raise Exception("Unrecognized database engine") diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 1adedbb361..47b0bb5eb3 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -26,6 +26,18 @@ logger = logging.getLogger(__name__) class NotRetryingDestination(Exception): def __init__(self, retry_last_ts, retry_interval, destination): + """Raised by the limiter (and federation client) to indicate that we are + are deliberately not attempting to contact a given server. + + Args: + retry_last_ts (int): the unix ts in milliseconds of our last attempt + to contact the server. 0 indicates that the last attempt was + successful or that we've never actually attempted to connect. + retry_interval (int): the time in milliseconds to wait until the next + attempt. + destination (str): the domain in question + """ + msg = "Not retrying server %s." % (destination,) super(NotRetryingDestination, self).__init__(msg) diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py new file mode 100644 index 0000000000..75efa0117b --- /dev/null +++ b/synapse/util/threepids.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +logger = logging.getLogger(__name__) + + +def check_3pid_allowed(hs, medium, address): + """Checks whether a given format of 3PID is allowed to be used on this HS + + Args: + hs (synapse.server.HomeServer): server + medium (str): 3pid medium - e.g. email, msisdn + address (str): address within that medium (e.g. "wotan@matrix.org") + msisdns need to first have been canonicalised + Returns: + bool: whether the 3PID medium/address is allowed to be added to this HS + """ + + if hs.config.allowed_local_3pids: + for constraint in hs.config.allowed_local_3pids: + logger.debug( + "Checking 3PID %s (%s) against %s (%s)", + address, medium, constraint['pattern'], constraint['medium'], + ) + if ( + medium == constraint['medium'] and + re.match(constraint['pattern'], address) + ): + return True + else: + return True + + return False diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index c899fecf5d..d4ec02ffc2 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -167,7 +167,7 @@ class KeyringTestCase(unittest.TestCase): # wait a tick for it to send the request to the perspectives server # (it first tries the datastore) - yield async.sleep(0.005) + yield async.sleep(1) # XXX find out why this takes so long! self.http_client.post_json.assert_called_once() self.assertIs(LoggingContext.current_context(), context_11) @@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase): res_deferreds_2 = kr.verify_json_objects_for_server( [("server10", json1)], ) - yield async.sleep(0.005) + yield async.sleep(01) self.http_client.post_json.assert_not_called() res_deferreds_2[0].addBoth(self.check_context, None) diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 19f5ed6bce..d92bf240b1 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -143,7 +143,6 @@ class E2eKeysHandlerTestCase(unittest.TestCase): except errors.SynapseError: pass - @unittest.DEBUG @defer.inlineCallbacks def test_claim_one_time_key(self): local_user = "@boris:" + self.hs.hostname diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 81063f19a1..74f104e3b8 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,6 +15,8 @@ from twisted.internet import defer, reactor from tests import unittest +import tempfile + from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory @@ -41,7 +43,9 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.event_id = 0 server_factory = ReplicationStreamProtocolFactory(self.hs) - listener = reactor.listenUNIX("\0xxx", server_factory) + # XXX: mktemp is unsafe and should never be used. but we're just a test. + path = tempfile.mktemp(prefix="base_slaved_store_test_case_socket") + listener = reactor.listenUNIX(path, server_factory) self.addCleanup(listener.stopListening) self.streamer = server_factory.streamer @@ -49,7 +53,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): client_factory = ReplicationClientFactory( self.hs, "client_name", self.replication_handler ) - client_connector = reactor.connectUNIX("\0xxx", client_factory) + client_connector = reactor.connectUNIX(path, client_factory) self.addCleanup(client_factory.stopTrying) self.addCleanup(client_connector.disconnect) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 096f771bea..8aba456510 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -49,6 +49,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.hs.get_auth_handler = Mock(return_value=self.auth_handler) self.hs.get_device_handler = Mock(return_value=self.device_handler) self.hs.config.enable_registration = True + self.hs.config.registrations_require_3pid = [] self.hs.config.auto_join_rooms = [] # init the thing we're testing diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py new file mode 100644 index 0000000000..0891308f25 --- /dev/null +++ b/tests/storage/test_user_directory.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.storage import UserDirectoryStore +from synapse.storage.roommember import ProfileInfo +from tests import unittest +from tests.utils import setup_test_homeserver + +ALICE = "@alice:a" +BOB = "@bob:b" +BOBBY = "@bobby:a" + + +class UserDirectoryStoreTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield setup_test_homeserver() + self.store = UserDirectoryStore(None, self.hs) + + # alice and bob are both in !room_id. bobby is not but shares + # a homeserver with alice. + yield self.store.add_profiles_to_user_dir( + "!room:id", + { + ALICE: ProfileInfo(None, "alice"), + BOB: ProfileInfo(None, "bob"), + BOBBY: ProfileInfo(None, "bobby") + }, + ) + yield self.store.add_users_to_public_room( + "!room:id", + [ALICE, BOB], + ) + yield self.store.add_users_who_share_room( + "!room:id", + False, + ( + (ALICE, BOB), + (BOB, ALICE), + ), + ) + + @defer.inlineCallbacks + def test_search_user_dir(self): + # normally when alice searches the directory she should just find + # bob because bobby doesn't share a room with her. + r = yield self.store.search_user_dir(ALICE, "bob", 10) + self.assertFalse(r["limited"]) + self.assertEqual(1, len(r["results"])) + self.assertDictEqual(r["results"][0], { + "user_id": BOB, + "display_name": "bob", + "avatar_url": None, + }) + + @defer.inlineCallbacks + def test_search_user_dir_all_users(self): + self.hs.config.user_directory_search_all_users = True + try: + r = yield self.store.search_user_dir(ALICE, "bob", 10) + self.assertFalse(r["limited"]) + self.assertEqual(2, len(r["results"])) + self.assertDictEqual(r["results"][0], { + "user_id": BOB, + "display_name": "bob", + "avatar_url": None, + }) + self.assertDictEqual(r["results"][1], { + "user_id": BOBBY, + "display_name": "bobby", + "avatar_url": None, + }) + finally: + self.hs.config.user_directory_search_all_users = False diff --git a/tests/utils.py b/tests/utils.py index 44e5f75093..8efd3a3475 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -13,27 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import HttpServer -from synapse.api.errors import cs_error, CodeMessageException, StoreError -from synapse.api.constants import EventTypes -from synapse.storage.prepare_database import prepare_database -from synapse.storage.engines import create_engine -from synapse.server import HomeServer -from synapse.federation.transport import server -from synapse.util.ratelimitutils import FederationRateLimiter - -from synapse.util.logcontext import LoggingContext - -from twisted.internet import defer, reactor -from twisted.enterprise.adbapi import ConnectionPool - -from collections import namedtuple -from mock import patch, Mock import hashlib +from inspect import getcallargs import urllib import urlparse -from inspect import getcallargs +from mock import Mock, patch +from twisted.internet import defer, reactor + +from synapse.api.errors import CodeMessageException, cs_error +from synapse.federation.transport import server +from synapse.http.server import HttpServer +from synapse.server import HomeServer +from synapse.storage import PostgresEngine +from synapse.storage.engines import create_engine +from synapse.storage.prepare_database import prepare_database +from synapse.util.logcontext import LoggingContext +from synapse.util.ratelimitutils import FederationRateLimiter + +# set this to True to run the tests against postgres instead of sqlite. +# It requires you to have a local postgres database called synapse_test, within +# which ALL TABLES WILL BE DROPPED +USE_POSTGRES_FOR_TESTS = False @defer.inlineCallbacks @@ -57,36 +58,70 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs): config.worker_app = None config.email_enable_notifs = False config.block_non_admin_invites = False + config.federation_domain_whitelist = None + config.user_directory_search_all_users = False # disable user directory updates, because they get done in the # background, which upsets the test runner. config.update_user_directory = False config.use_frozen_dicts = True - config.database_config = {"name": "sqlite3"} config.ldap_enabled = False if "clock" not in kargs: kargs["clock"] = MockClock() + if USE_POSTGRES_FOR_TESTS: + config.database_config = { + "name": "psycopg2", + "args": { + "database": "synapse_test", + "cp_min": 1, + "cp_max": 5, + }, + } + else: + config.database_config = { + "name": "sqlite3", + "args": { + "database": ":memory:", + "cp_min": 1, + "cp_max": 1, + }, + } + + db_engine = create_engine(config.database_config) + + # we need to configure the connection pool to run the on_new_connection + # function, so that we can test code that uses custom sqlite functions + # (like rank). + config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection + if datastore is None: - db_pool = SQLiteMemoryDbPool() - yield db_pool.prepare() hs = HomeServer( - name, db_pool=db_pool, config=config, + name, config=config, + db_config=config.database_config, version_string="Synapse/tests", - database_engine=create_engine(config.database_config), - get_db_conn=db_pool.get_db_conn, + database_engine=db_engine, room_list_handler=object(), tls_server_context_factory=Mock(), **kargs ) + db_conn = hs.get_db_conn() + # make sure that the database is empty + if isinstance(db_engine, PostgresEngine): + cur = db_conn.cursor() + cur.execute("SELECT tablename FROM pg_tables where schemaname='public'") + rows = cur.fetchall() + for r in rows: + cur.execute("DROP TABLE %s CASCADE" % r[0]) + yield prepare_database(db_conn, db_engine, config) hs.setup() else: hs = HomeServer( name, db_pool=None, datastore=datastore, config=config, version_string="Synapse/tests", - database_engine=create_engine(config.database_config), + database_engine=db_engine, room_list_handler=object(), tls_server_context_factory=Mock(), **kargs @@ -305,168 +340,6 @@ class MockClock(object): return d -class SQLiteMemoryDbPool(ConnectionPool, object): - def __init__(self): - super(SQLiteMemoryDbPool, self).__init__( - "sqlite3", ":memory:", - cp_min=1, - cp_max=1, - ) - - self.config = Mock() - self.config.password_providers = [] - self.config.database_config = {"name": "sqlite3"} - - def prepare(self): - engine = self.create_engine() - return self.runWithConnection( - lambda conn: prepare_database(conn, engine, self.config) - ) - - def get_db_conn(self): - conn = self.connect() - engine = self.create_engine() - prepare_database(conn, engine, self.config) - return conn - - def create_engine(self): - return create_engine(self.config.database_config) - - -class MemoryDataStore(object): - - Room = namedtuple( - "Room", - ["room_id", "is_public", "creator"] - ) - - def __init__(self): - self.tokens_to_users = {} - self.paths_to_content = {} - - self.members = {} - self.rooms = {} - - self.current_state = {} - self.events = [] - - class Snapshot(namedtuple("Snapshot", "room_id user_id membership_state")): - def fill_out_prev_events(self, event): - pass - - def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): - return self.Snapshot( - room_id, user_id, self.get_room_member(user_id, room_id) - ) - - def register(self, user_id, token, password_hash): - if user_id in self.tokens_to_users.values(): - raise StoreError(400, "User in use.") - self.tokens_to_users[token] = user_id - - def get_user_by_access_token(self, token): - try: - return { - "name": self.tokens_to_users[token], - } - except Exception: - raise StoreError(400, "User does not exist.") - - def get_room(self, room_id): - try: - return self.rooms[room_id] - except Exception: - return None - - def store_room(self, room_id, room_creator_user_id, is_public): - if room_id in self.rooms: - raise StoreError(409, "Conflicting room!") - - room = MemoryDataStore.Room( - room_id=room_id, - is_public=is_public, - creator=room_creator_user_id - ) - self.rooms[room_id] = room - - def get_room_member(self, user_id, room_id): - return self.members.get(room_id, {}).get(user_id) - - def get_room_members(self, room_id, membership=None): - if membership: - return [ - v for k, v in self.members.get(room_id, {}).items() - if v.membership == membership - ] - else: - return self.members.get(room_id, {}).values() - - def get_rooms_for_user_where_membership_is(self, user_id, membership_list): - return [ - m[user_id] for m in self.members.values() - if user_id in m and m[user_id].membership in membership_list - ] - - def get_room_events_stream(self, user_id=None, from_key=None, to_key=None, - limit=0, with_feedback=False): - return ([], from_key) # TODO - - def get_joined_hosts_for_room(self, room_id): - return defer.succeed([]) - - def persist_event(self, event): - if event.type == EventTypes.Member: - room_id = event.room_id - user = event.state_key - self.members.setdefault(room_id, {})[user] = event - - if hasattr(event, "state_key"): - key = (event.room_id, event.type, event.state_key) - self.current_state[key] = event - - self.events.append(event) - - def get_current_state(self, room_id, event_type=None, state_key=""): - if event_type: - key = (room_id, event_type, state_key) - if self.current_state.get(key): - return [self.current_state.get(key)] - return None - else: - return [ - e for e in self.current_state - if e[0] == room_id - ] - - def set_presence_state(self, user_localpart, state): - return defer.succeed({"state": 0}) - - def get_presence_list(self, user_localpart, accepted): - return [] - - def get_room_events_max_id(self): - return "s0" # TODO (erikj) - - def get_send_event_level(self, room_id): - return defer.succeed(0) - - def get_power_level(self, room_id, user_id): - return defer.succeed(0) - - def get_add_state_level(self, room_id): - return defer.succeed(0) - - def get_room_join_rule(self, room_id): - # TODO (erikj): This should be configurable - return defer.succeed("invite") - - def get_ops_levels(self, room_id): - return defer.succeed((5, 5, 5)) - - def insert_client_ip(self, user, access_token, ip, user_agent): - return defer.succeed(None) - - def _format_call(args, kwargs): return ", ".join( ["%r" % (a) for a in args] + |