diff --git a/docs/admin_api/media_admin_api.md b/docs/admin_api/media_admin_api.md
new file mode 100644
index 0000000000..abdbc1ea86
--- /dev/null
+++ b/docs/admin_api/media_admin_api.md
@@ -0,0 +1,23 @@
+# List all media in a room
+
+This API gets a list of known media in a room.
+
+The API is:
+```
+GET /_matrix/client/r0/admin/room/<room_id>/media
+```
+including an `access_token` of a server admin.
+
+It returns a JSON body like the following:
+```
+{
+ "local": [
+ "mxc://localhost/xwvutsrqponmlkjihgfedcba",
+ "mxc://localhost/abcdefghijklmnopqrstuvwx"
+ ],
+ "remote": [
+ "mxc://matrix.org/xwvutsrqponmlkjihgfedcba",
+ "mxc://matrix.org/abcdefghijklmnopqrstuvwx"
+ ]
+}
+```
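+
+As a usage sketch (the homeserver address, room ID and access token below are
+made-up values; substitute your own), the endpoint can be queried from Python
+with the `requests` library:
+
+```
+import requests
+from urllib.parse import quote
+
+# Hypothetical values - replace with your homeserver, room ID and admin token.
+homeserver = "http://localhost:8008"
+room_id = "!someroom:localhost"
+access_token = "ADMIN_ACCESS_TOKEN"
+
+resp = requests.get(
+    "%s/_matrix/client/r0/admin/room/%s/media"
+    % (homeserver, quote(room_id, safe="")),
+    params={"access_token": access_token},
+)
+resp.raise_for_status()
+media = resp.json()
+print("local media:", media["local"])
+print("remote media:", media["remote"])
+```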
diff --git a/scripts/move_remote_media_to_new_store.py b/scripts/move_remote_media_to_new_store.py
new file mode 100755
index 0000000000..7914ead889
--- /dev/null
+++ b/scripts/move_remote_media_to_new_store.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Moves a list of remote media from one media store to another.
+
+The input should be a list of media files to be moved, one per line. Each line
+should be formatted::
+
+ <origin server>|<file id>
+
+This can be extracted from postgres with::
+
+ psql --tuples-only -A -c "select media_origin, filesystem_id from
+ matrix.remote_media_cache where ..."
+
+To use, pipe the above into::
+
+    PYTHONPATH=. ./scripts/move_remote_media_to_new_store.py <source repo> <dest repo>
+"""
+
+from __future__ import print_function
+
+import argparse
+import logging
+import os
+import shutil
+import sys
+
+from synapse.rest.media.v1.filepath import MediaFilePaths
+
+logger = logging.getLogger()
+
+
+def main(src_repo, dest_repo):
+ src_paths = MediaFilePaths(src_repo)
+ dest_paths = MediaFilePaths(dest_repo)
+ for line in sys.stdin:
+ line = line.strip()
+ parts = line.split('|')
+ if len(parts) != 2:
+ print("Unable to parse input line %s" % line, file=sys.stderr)
+            sys.exit(1)
+
+ move_media(parts[0], parts[1], src_paths, dest_paths)
+
+
+def move_media(origin_server, file_id, src_paths, dest_paths):
+ """Move the given file, and any thumbnails, to the dest repo
+
+ Args:
+ origin_server (str):
+ file_id (str):
+ src_paths (MediaFilePaths):
+ dest_paths (MediaFilePaths):
+ """
+ logger.info("%s/%s", origin_server, file_id)
+
+ # check that the original exists
+ original_file = src_paths.remote_media_filepath(origin_server, file_id)
+ if not os.path.exists(original_file):
+ logger.warn(
+ "Original for %s/%s (%s) does not exist",
+ origin_server, file_id, original_file,
+ )
+ else:
+ mkdir_and_move(
+ original_file,
+ dest_paths.remote_media_filepath(origin_server, file_id),
+ )
+
+ # now look for thumbnails
+ original_thumb_dir = src_paths.remote_media_thumbnail_dir(
+ origin_server, file_id,
+ )
+ if not os.path.exists(original_thumb_dir):
+ return
+
+ mkdir_and_move(
+ original_thumb_dir,
+ dest_paths.remote_media_thumbnail_dir(origin_server, file_id)
+ )
+
+
+def mkdir_and_move(original_file, dest_file):
+ dirname = os.path.dirname(dest_file)
+ if not os.path.exists(dirname):
+ logger.debug("mkdir %s", dirname)
+ os.makedirs(dirname)
+ logger.debug("mv %s %s", original_file, dest_file)
+ shutil.move(original_file, dest_file)
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ parser.add_argument(
+ "-v", action='store_true', help='enable debug logging')
+ parser.add_argument(
+ "src_repo",
+ help="Path to source content repo",
+ )
+ parser.add_argument(
+ "dest_repo",
+ help="Path to source content repo",
+ )
+ args = parser.parse_args()
+
+ logging_config = {
+ "level": logging.DEBUG if args.v else logging.INFO,
+ "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"
+ }
+ logging.basicConfig(**logging_config)
+
+ main(args.src_repo, args.dest_repo)
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 79b35b3e7c..aa15f73f36 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -46,6 +46,7 @@ class Codes(object):
THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED"
THREEPID_IN_USE = "M_THREEPID_IN_USE"
THREEPID_NOT_FOUND = "M_THREEPID_NOT_FOUND"
+ THREEPID_DENIED = "M_THREEPID_DENIED"
INVALID_USERNAME = "M_INVALID_USERNAME"
SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED"
@@ -140,6 +141,32 @@ class RegistrationError(SynapseError):
pass
+class FederationDeniedError(SynapseError):
+ """An error raised when the server tries to federate with a server which
+ is not on its federation whitelist.
+
+ Attributes:
+ destination (str): The destination which has been denied
+ """
+
+ def __init__(self, destination):
+ """Raised by federation client or server to indicate that we are
+ are deliberately not attempting to contact a given server because it is
+ not on our federation whitelist.
+
+ Args:
+ destination (str): the domain in question
+ """
+
+ self.destination = destination
+
+ super(FederationDeniedError, self).__init__(
+ code=403,
+ msg="Federation denied with %s." % (self.destination,),
+ errcode=Codes.FORBIDDEN,
+ )
+
+
class InteractiveAuthIncompleteError(Exception):
"""An error raised when UI auth is not yet complete
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 7d0c2879ae..c6fe4516d1 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -49,19 +49,6 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index dc3f6efd43..3b3352798d 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -64,19 +64,6 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index a072291e1f..4de43c41f0 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -58,19 +58,6 @@ class FederationReaderSlavedStore(
class FederationReaderServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 09e9488f06..f760826d27 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -76,19 +76,6 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index ae531c0aa4..e32ee8fe93 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -118,19 +118,6 @@ class FrontendProxySlavedStore(
class FrontendProxyServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 92ab3b311b..cb82a415a6 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -266,19 +266,6 @@ class SynapseHomeServer(HomeServer):
except IncorrectDatabaseSetup as e:
quit_with_error(e.message)
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(config_options):
"""
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index eab1597aaa..1ed1ca8772 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -60,19 +60,6 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 7fbbb0b0e1..32ccea3f13 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -81,19 +81,6 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 0abba3016e..f87531f1b6 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -246,19 +246,6 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index a48c4a2ae6..494ccb702c 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -92,19 +92,6 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index ef917fc9f2..336959094b 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -31,6 +31,8 @@ class RegistrationConfig(Config):
strtobool(str(config["disable_registration"]))
)
+ self.registrations_require_3pid = config.get("registrations_require_3pid", [])
+ self.allowed_local_3pids = config.get("allowed_local_3pids", [])
self.registration_shared_secret = config.get("registration_shared_secret")
self.bcrypt_rounds = config.get("bcrypt_rounds", 12)
@@ -52,6 +54,23 @@ class RegistrationConfig(Config):
# Enable registration for new users.
enable_registration: False
+ # The user must provide all of the below types of 3PID when registering.
+ #
+ # registrations_require_3pid:
+ # - email
+ # - msisdn
+
+ # Mandate that users are only allowed to associate certain formats of
+ # 3PIDs with accounts on this server.
+ #
+ # allowed_local_3pids:
+ # - medium: email
+ # pattern: ".*@matrix\\.org"
+ # - medium: email
+ # pattern: ".*@vector\\.im"
+ # - medium: msisdn
+ # pattern: "\\+44"
+
# If set, allows registration by anyone who also has the shared
# secret, even if registration is otherwise disabled.
registration_shared_secret: "%(registration_shared_secret)s"
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 6baa474931..25ea77738a 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -16,6 +16,8 @@
from ._base import Config, ConfigError
from collections import namedtuple
+from synapse.util.module_loader import load_module
+
MISSING_NETADDR = (
"Missing netaddr library. This is required for URL preview API."
@@ -36,6 +38,14 @@ ThumbnailRequirement = namedtuple(
"ThumbnailRequirement", ["width", "height", "method", "media_type"]
)
+MediaStorageProviderConfig = namedtuple(
+ "MediaStorageProviderConfig", (
+ "store_local", # Whether to store newly uploaded local files
+ "store_remote", # Whether to store newly downloaded remote files
+ "store_synchronous", # Whether to wait for successful storage for local uploads
+ ),
+)
+
def parse_thumbnail_requirements(thumbnail_sizes):
""" Takes a list of dictionaries with "width", "height", and "method" keys
@@ -73,16 +83,61 @@ class ContentRepositoryConfig(Config):
self.media_store_path = self.ensure_directory(config["media_store_path"])
- self.backup_media_store_path = config.get("backup_media_store_path")
- if self.backup_media_store_path:
- self.backup_media_store_path = self.ensure_directory(
- self.backup_media_store_path
- )
+ backup_media_store_path = config.get("backup_media_store_path")
- self.synchronous_backup_media_store = config.get(
+ synchronous_backup_media_store = config.get(
"synchronous_backup_media_store", False
)
+ storage_providers = config.get("media_storage_providers", [])
+
+ if backup_media_store_path:
+ if storage_providers:
+ raise ConfigError(
+ "Cannot use both 'backup_media_store_path' and 'storage_providers'"
+ )
+
+ storage_providers = [{
+ "module": "file_system",
+ "store_local": True,
+ "store_synchronous": synchronous_backup_media_store,
+ "store_remote": True,
+ "config": {
+ "directory": backup_media_store_path,
+ }
+ }]
+
+    # This is a list of config entries that can be used to create the
+    # storage providers. The entries are tuples of (Class, class_config,
+    # MediaStorageProviderConfig), where Class is the class of the
+    # provider, class_config is the config to pass to it, and
+    # MediaStorageProviderConfig holds the options for
+    # StorageProviderWrapper.
+ #
+ # We don't create the storage providers here as not all workers need
+ # them to be started.
+ self.media_storage_providers = []
+
+ for provider_config in storage_providers:
+ # We special case the module "file_system" so as not to need to
+ # expose FileStorageProviderBackend
+ if provider_config["module"] == "file_system":
+ provider_config["module"] = (
+ "synapse.rest.media.v1.storage_provider"
+ ".FileStorageProviderBackend"
+ )
+
+ provider_class, parsed_config = load_module(provider_config)
+
+ wrapper_config = MediaStorageProviderConfig(
+ provider_config.get("store_local", False),
+ provider_config.get("store_remote", False),
+ provider_config.get("store_synchronous", False),
+ )
+
+ self.media_storage_providers.append(
+ (provider_class, parsed_config, wrapper_config,)
+ )
+
self.uploads_path = self.ensure_directory(config["uploads_path"])
self.dynamic_thumbnails = config["dynamic_thumbnails"]
self.thumbnail_requirements = parse_thumbnail_requirements(
@@ -127,13 +182,19 @@ class ContentRepositoryConfig(Config):
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"
- # A secondary directory where uploaded images and attachments are
- # stored as a backup.
- # backup_media_store_path: "%(media_store)s"
-
- # Whether to wait for successful write to backup media store before
- # returning successfully.
- # synchronous_backup_media_store: false
+ # Media storage providers allow media to be stored in different
+ # locations.
+ # media_storage_providers:
+ # - module: file_system
+ # # Whether to write new local files.
+ # store_local: false
+ # # Whether to write new remote media
+ # store_remote: false
+ # # Whether to block upload requests waiting for write to this
+ # # provider to complete
+ # store_synchronous: false
+ # config:
+ # directory: /mnt/some/other/directory
# Directory where in-progress uploads are stored.
uploads_path: "%(uploads_path)s"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 436dd8a6fe..8f0b6d1f28 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -55,6 +55,17 @@ class ServerConfig(Config):
"block_non_admin_invites", False,
)
+ # FIXME: federation_domain_whitelist needs sytests
+ self.federation_domain_whitelist = None
+ federation_domain_whitelist = config.get(
+ "federation_domain_whitelist", None
+ )
+ # turn the whitelist into a hash for speed of lookup
+ if federation_domain_whitelist is not None:
+ self.federation_domain_whitelist = {}
+ for domain in federation_domain_whitelist:
+ self.federation_domain_whitelist[domain] = True
+
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
self.public_baseurl += '/'
@@ -210,6 +221,17 @@ class ServerConfig(Config):
# (except those sent by local server admins). The default is False.
# block_non_admin_invites: True
+ # Restrict federation to the following whitelist of domains.
+ # N.B. we recommend also firewalling your federation listener to limit
+ # inbound federation traffic as early as possible, rather than relying
+ # purely on this application-layer restriction. If not specified, the
+ # default is to whitelist everything.
+ #
+ # federation_domain_whitelist:
+ # - lon.example.com
+ # - nyc.example.com
+ # - syd.example.com
+
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b1fe03f702..813907f7f2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -23,7 +23,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.errors import (
- CodeMessageException, HttpResponseException, SynapseError,
+ CodeMessageException, HttpResponseException, SynapseError, FederationDeniedError
)
from synapse.events import builder
from synapse.federation.federation_base import (
@@ -266,6 +266,9 @@ class FederationClient(FederationBase):
except NotRetryingDestination as e:
logger.info(e.message)
continue
+ except FederationDeniedError as e:
+ logger.info(e.message)
+ continue
except Exception as e:
pdu_attempts[destination] = now
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d39f46583..a141ec9953 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction, Edu
-from synapse.api.errors import HttpResponseException
+from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -490,6 +490,8 @@ class TransactionQueue(object):
(e.retry_last_ts + e.retry_interval) / 1000.0
),
)
+ except FederationDeniedError as e:
+ logger.info(e)
except Exception as e:
logger.warn(
"TX [%s] Failed to send transaction: %s",
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 1f3ce238f6..5488e82985 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -212,6 +212,9 @@ class TransportLayerClient(object):
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if the remote destination
+ is not in our federation whitelist
"""
valid_memberships = {Membership.JOIN, Membership.LEAVE}
if membership not in valid_memberships:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2b02b021ec..06c16ba4fa 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, SynapseError, FederationDeniedError
from synapse.http.server import JsonResource
from synapse.http.servlet import (
parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
@@ -81,6 +81,7 @@ class Authenticator(object):
self.keyring = hs.get_keyring()
self.server_name = hs.hostname
self.store = hs.get_datastore()
+ self.federation_domain_whitelist = hs.config.federation_domain_whitelist
# A method just so we can pass 'self' as the authenticator to the Servlets
@defer.inlineCallbacks
@@ -92,6 +93,12 @@ class Authenticator(object):
"signatures": {},
}
+ if (
+ self.federation_domain_whitelist is not None and
+ self.server_name not in self.federation_domain_whitelist
+ ):
+ raise FederationDeniedError(self.server_name)
+
if content is not None:
json_request["content"] = content
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 2152efc692..0e83453851 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,6 +14,7 @@
# limitations under the License.
from synapse.api import errors
from synapse.api.constants import EventTypes
+from synapse.api.errors import FederationDeniedError
from synapse.util import stringutils
from synapse.util.async import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
@@ -513,6 +514,9 @@ class DeviceListEduUpdater(object):
# This makes it more likely that the device lists will
# eventually become consistent.
return
+ except FederationDeniedError as e:
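+                # The remote server is not on our federation whitelist,
+                # so there is no point retrying later.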
+ logger.info(e)
+ return
except Exception:
# TODO: Remember that we are now out of sync and try again
# later
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index f7fad15c62..d996aa90bb 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -17,7 +17,8 @@ import logging
from twisted.internet import defer
-from synapse.types import get_domain_from_id
+from synapse.api.errors import SynapseError
+from synapse.types import get_domain_from_id, UserID
from synapse.util.stringutils import random_string
@@ -33,7 +34,7 @@ class DeviceMessageHandler(object):
"""
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
- self.is_mine_id = hs.is_mine_id
+ self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()
hs.get_replication_layer().register_edu_handler(
@@ -52,6 +53,12 @@ class DeviceMessageHandler(object):
message_type = content["type"]
message_id = content["message_id"]
for user_id, by_device in content["messages"].items():
+ # we use UserID.from_string to catch invalid user ids
+ if not self.is_mine(UserID.from_string(user_id)):
+ logger.warning("Request for keys for non-local user %s",
+ user_id)
+ raise SynapseError(400, "Not a user here")
+
messages_by_device = {
device_id: {
"content": message_content,
@@ -77,7 +84,8 @@ class DeviceMessageHandler(object):
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
- if self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if self.is_mine(UserID.from_string(user_id)):
messages_by_device = {
device_id: {
"content": message_content,
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 668a90e495..9aa95f89e6 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -19,8 +19,10 @@ import logging
from canonicaljson import encode_canonical_json
from twisted.internet import defer
-from synapse.api.errors import SynapseError, CodeMessageException
-from synapse.types import get_domain_from_id
+from synapse.api.errors import (
+ SynapseError, CodeMessageException, FederationDeniedError,
+)
+from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
@@ -32,7 +34,7 @@ class E2eKeysHandler(object):
self.store = hs.get_datastore()
self.federation = hs.get_replication_layer()
self.device_handler = hs.get_device_handler()
- self.is_mine_id = hs.is_mine_id
+ self.is_mine = hs.is_mine
self.clock = hs.get_clock()
# doesn't really work as part of the generic query API, because the
@@ -70,7 +72,8 @@ class E2eKeysHandler(object):
remote_queries = {}
for user_id, device_ids in device_keys_query.items():
- if self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if self.is_mine(UserID.from_string(user_id)):
local_query[user_id] = device_ids
else:
remote_queries[user_id] = device_ids
@@ -139,6 +142,10 @@ class E2eKeysHandler(object):
failures[destination] = {
"status": 503, "message": "Not ready for retry",
}
+ except FederationDeniedError as e:
+ failures[destination] = {
+ "status": 403, "message": "Federation Denied",
+ }
except Exception as e:
# include ConnectionRefused and other errors
failures[destination] = {
@@ -170,7 +177,8 @@ class E2eKeysHandler(object):
result_dict = {}
for user_id, device_ids in query.items():
- if not self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s",
user_id)
raise SynapseError(400, "Not a user here")
@@ -213,7 +221,8 @@ class E2eKeysHandler(object):
remote_queries = {}
for user_id, device_keys in query.get("one_time_keys", {}).items():
- if self.is_mine_id(user_id):
+ # we use UserID.from_string to catch invalid user ids
+ if self.is_mine(UserID.from_string(user_id)):
for device_id, algorithm in device_keys.items():
local_query.append((user_id, device_id, algorithm))
else:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac70730885..8ee9434c9b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -22,6 +22,7 @@ from ._base import BaseHandler
from synapse.api.errors import (
AuthError, FederationError, StoreError, CodeMessageException, SynapseError,
+ FederationDeniedError,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
@@ -782,6 +783,9 @@ class FederationHandler(BaseHandler):
except NotRetryingDestination as e:
logger.info(e.message)
continue
+ except FederationDeniedError as e:
+ logger.info(e)
+ continue
except Exception as e:
logger.exception(
"Failed to backfill from %s because %s",
@@ -804,13 +808,12 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
+ resolve = logcontext.preserve_fn(
+ self.state_handler.resolve_state_groups_for_events
+ )
states = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.preserve_fn(self.state_handler.resolve_state_groups)(
- room_id, [e]
- )
- for e in event_ids
- ], consumeErrors=True,
+ [resolve(room_id, [e]) for e in event_ids],
+ consumeErrors=True,
))
states = dict(zip(event_ids, [s.state for s in states]))
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 5b808beac1..9021d4d57f 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient
from synapse import types
from synapse.types import UserID
from synapse.util.async import run_on_reactor
+from synapse.util.threepids import check_3pid_allowed
from ._base import BaseHandler
logger = logging.getLogger(__name__)
@@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler):
"""
for c in threepidCreds:
- logger.info("validating theeepidcred sid %s on id server %s",
+ logger.info("validating threepidcred sid %s on id server %s",
c['sid'], c['idServer'])
try:
identity_handler = self.hs.get_handlers().identity_handler
@@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler):
logger.info("got threepid with medium '%s' and address '%s'",
threepid['medium'], threepid['address'])
+ if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']):
+ raise RegistrationError(
+ 403, "Third party identifier is not allowed"
+ )
+
@defer.inlineCallbacks
def bind_emails(self, user_id, threepidCreds):
"""Links emails with a user ID and informs an identity server.
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index bb40075387..dfa09141ed 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -203,7 +203,8 @@ class RoomListHandler(BaseHandler):
if limit:
step = limit + 1
else:
- step = len(rooms_to_scan)
+ # step cannot be zero
+ step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
chunk = []
for i in xrange(0, len(rooms_to_scan), step):
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 4abb479ae3..f3e4973c2e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -18,6 +18,7 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
+from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util import logcontext
import synapse.metrics
@@ -30,6 +31,7 @@ from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web.client import (
BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent,
readBody, PartialDownloadError,
+ HTTPConnectionPool,
)
from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
from twisted.web.http import PotentialDataLoss
@@ -64,13 +66,23 @@ class SimpleHttpClient(object):
"""
def __init__(self, hs):
self.hs = hs
+
+ pool = HTTPConnectionPool(reactor)
+
+ # the pusher makes lots of concurrent SSL connections to sygnal, and
+ # tends to do so in batches, so we need to allow the pool to keep lots
+ # of idle connections around.
+ pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
+ pool.cachedConnectionTimeout = 2 * 60
+
# The default context factory in Twisted 14.0.0 (which we require) is
# BrowserLikePolicyForHTTPS which will do regular cert validation
# 'like a browser'
self.agent = Agent(
reactor,
connectTimeout=15,
- contextFactory=hs.get_http_client_context_factory()
+ contextFactory=hs.get_http_client_context_factory(),
+ pool=pool,
)
self.user_agent = hs.version_string
self.clock = hs.get_clock()
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index e2b99ef3bd..87639b9151 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -357,8 +357,7 @@ def _get_hosts_for_srv_record(dns_client, host):
def eb(res, record_type):
if res.check(DNSNameError):
return []
- logger.warn("Error looking up %s for %s: %s",
- record_type, host, res, res.value)
+ logger.warn("Error looking up %s for %s: %s", record_type, host, res)
return res
# no logcontexts here, so we can safely fire these off and gatherResults
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 833496b72d..9145405cb0 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -27,7 +27,7 @@ import synapse.metrics
from canonicaljson import encode_canonical_json
from synapse.api.errors import (
- SynapseError, Codes, HttpResponseException,
+ SynapseError, Codes, HttpResponseException, FederationDeniedError,
)
from signedjson.sign import sign_json
@@ -123,11 +123,22 @@ class MatrixFederationHttpClient(object):
Fails with ``HTTPRequestException``: if we get an HTTP response
code >= 300.
+
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
+
(May also fail with plenty of other Exceptions for things like DNS
failures, connection failures, SSL failures.)
"""
+ if (
+ self.hs.config.federation_domain_whitelist and
+ destination not in self.hs.config.federation_domain_whitelist
+ ):
+ raise FederationDeniedError(destination)
+
limiter = yield synapse.util.retryutils.get_retry_limiter(
destination,
self.clock,
@@ -308,6 +319,9 @@ class MatrixFederationHttpClient(object):
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
if not json_data_callback:
@@ -368,6 +382,9 @@ class MatrixFederationHttpClient(object):
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
def body_callback(method, url_bytes, headers_dict):
@@ -422,6 +439,9 @@ class MatrixFederationHttpClient(object):
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
logger.debug("get_json args: %s", args)
@@ -475,6 +495,9 @@ class MatrixFederationHttpClient(object):
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
response = yield self._request(
@@ -518,6 +541,9 @@ class MatrixFederationHttpClient(object):
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
encoded_args = {}
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 269b65ca41..165c684d0d 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -93,6 +93,8 @@ response_db_txn_count = metrics.register_counter(
),
)
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
response_db_txn_duration = metrics.register_counter(
"response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
@@ -100,6 +102,10 @@ response_db_txn_duration = metrics.register_counter(
),
)
+# seconds spent waiting for a db connection, when processing this request
+response_db_sched_duration = metrics.register_counter(
+ "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
+)
_next_request_id = 0
@@ -316,15 +322,6 @@ class JsonResource(HttpServer, resource.Resource):
def _send_response(self, request, code, response_json_object,
response_code_message=None):
- # could alternatively use request.notifyFinish() and flip a flag when
- # the Deferred fires, but since the flag is RIGHT THERE it seems like
- # a waste.
- if request._disconnected:
- logger.warn(
- "Not sending response to request %s, already disconnected.",
- request)
- return
-
outgoing_responses_counter.inc(request.method, str(code))
# TODO: Only enable CORS for the requests that need it.
@@ -377,7 +374,10 @@ class RequestMetrics(object):
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
- context.db_txn_duration, request.method, self.name, tag
+ context.db_txn_duration_ms / 1000., request.method, self.name, tag
+ )
+ response_db_sched_duration.inc_by(
+ context.db_sched_duration_ms / 1000., request.method, self.name, tag
)
@@ -400,6 +400,15 @@ class RootRedirect(resource.Resource):
def respond_with_json(request, code, json_object, send_cors=False,
response_code_message=None, pretty_print=False,
version_string="", canonical_json=True):
+ # could alternatively use request.notifyFinish() and flip a flag when
+ # the Deferred fires, but since the flag is RIGHT THERE it seems like
+ # a waste.
+ if request._disconnected:
+ logger.warn(
+ "Not sending response to request %s, already disconnected.",
+ request)
+ return
+
if pretty_print:
json_bytes = encode_pretty_printed_json(json_object) + "\n"
else:
diff --git a/synapse/http/site.py b/synapse/http/site.py
index cd1492b1c3..e422c8dfae 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -66,14 +66,15 @@ class SynapseRequest(Request):
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
db_txn_count = context.db_txn_count
- db_txn_duration = context.db_txn_duration
+ db_txn_duration_ms = context.db_txn_duration_ms
+ db_sched_duration_ms = context.db_sched_duration_ms
except Exception:
ru_utime, ru_stime = (0, 0)
- db_txn_count, db_txn_duration = (0, 0)
+            db_txn_count, db_txn_duration_ms = (0, 0)
+            db_sched_duration_ms = 0
self.site.access_logger.info(
"%s - %s - {%s}"
- " Processed request: %dms (%dms, %dms) (%dms/%d)"
+ " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
@@ -81,7 +82,8 @@ class SynapseRequest(Request):
int(time.time() * 1000) - self.start_time,
int(ru_utime * 1000),
int(ru_stime * 1000),
- int(db_txn_duration * 1000),
+ db_sched_duration_ms,
+ db_txn_duration_ms,
int(db_txn_count),
self.sentLength,
self.code,
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 2265e6e8d6..e0cfb7d08f 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -146,10 +146,15 @@ def runUntilCurrentTimer(func):
num_pending += 1
num_pending += len(reactor.threadCallQueue)
-
start = time.time() * 1000
ret = func(*args, **kwargs)
end = time.time() * 1000
+
+ # record the amount of wallclock time spent running pending calls.
+ # This is a proxy for the actual amount of time between reactor polls,
+ # since about 25% of time is actually spent running things triggered by
+ # I/O events, but that is harder to capture without rewriting half the
+ # reactor.
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index f480aae614..1e783e5ff4 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -15,6 +15,9 @@
from itertools import chain
+import logging
+
+logger = logging.getLogger(__name__)
def flatten(items):
@@ -153,7 +156,11 @@ class CallbackMetric(BaseMetric):
self.callback = callback
def render(self):
- value = self.callback()
+ try:
+ value = self.callback()
+ except Exception:
+ logger.exception("Failed to render %s", self.name)
+ return ["# FAILED to render " + self.name]
if self.is_scalar():
return list(self._render_for_labels([], value))
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index c16f61452c..2cbac571b8 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,21 +13,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-from synapse.push import PusherConfigException
+import logging
from twisted.internet import defer, reactor
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-import logging
import push_rule_evaluator
import push_tools
-
+import synapse
+from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+http_push_processed_counter = metrics.register_counter(
+ "http_pushes_processed",
+)
+
+http_push_failed_counter = metrics.register_counter(
+ "http_pushes_failed",
+)
+
class HttpPusher(object):
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
@@ -152,9 +161,16 @@ class HttpPusher(object):
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
+ logger.info(
+ "Processing %i unprocessed push actions for %s starting at "
+ "stream_ordering %s",
+ len(unprocessed), self.name, self.last_stream_ordering,
+ )
+
for push_action in unprocessed:
processed = yield self._process_one(push_action)
if processed:
+ http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
yield self.store.update_pusher_last_stream_ordering_and_success(
@@ -169,6 +185,7 @@ class HttpPusher(object):
self.failing_since
)
else:
+ http_push_failed_counter.inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
@@ -316,7 +333,10 @@ class HttpPusher(object):
try:
resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
except Exception:
- logger.warn("Failed to push %s ", self.url)
+ logger.warn(
+ "Failed to push event %s to %s",
+ event.event_id, self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
@@ -325,7 +345,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
- logger.info("Sending updated badge count %d to %r", badge, self.user_id)
+ logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
'id': '',
@@ -347,7 +367,10 @@ class HttpPusher(object):
try:
resp = yield self.http_client.post_json_get_json(self.url, d)
except Exception:
- logger.exception("Failed to push %s ", self.url)
+ logger.warn(
+ "Failed to send badge count to %s",
+ self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 5022808ea9..0615e5d807 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -289,6 +289,27 @@ class QuarantineMediaInRoom(ClientV1RestServlet):
defer.returnValue((200, {"num_quarantined": num_quarantined}))
+class ListMediaInRoom(ClientV1RestServlet):
+ """Lists all of the media in a given room.
+ """
+ PATTERNS = client_path_patterns("/admin/room/(?P<room_id>[^/]+)/media")
+
+ def __init__(self, hs):
+ super(ListMediaInRoom, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+ is_admin = yield self.auth.is_server_admin(requester.user)
+ if not is_admin:
+ raise AuthError(403, "You are not a server admin")
+
+ local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
+
+ defer.returnValue((200, {"local": local_mxcs, "remote": remote_mxcs}))
+
+
class ResetPasswordRestServlet(ClientV1RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
@@ -487,3 +508,4 @@ def register_servlets(hs, http_server):
SearchUsersRestServlet(hs).register(http_server)
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
+ ListMediaInRoom(hs).register(http_server)
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py
index 32ed1d3ab2..5c5fa8f7ab 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1/register.py
@@ -70,10 +70,15 @@ class RegisterRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers()
def on_GET(self, request):
+
+ require_email = 'email' in self.hs.config.registrations_require_3pid
+ require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid
+
+ flows = []
if self.hs.config.enable_registration_captcha:
- return (
- 200,
- {"flows": [
+ # only support the email-only flow if we don't require MSISDN 3PIDs
+ if not require_msisdn:
+ flows.extend([
{
"type": LoginType.RECAPTCHA,
"stages": [
@@ -82,27 +87,34 @@ class RegisterRestServlet(ClientV1RestServlet):
LoginType.PASSWORD
]
},
+ ])
+ # only support 3PIDless registration if no 3PIDs are required
+ if not require_email and not require_msisdn:
+ flows.extend([
{
"type": LoginType.RECAPTCHA,
"stages": [LoginType.RECAPTCHA, LoginType.PASSWORD]
}
- ]}
- )
+ ])
else:
- return (
- 200,
- {"flows": [
+ # only support the email-only flow if we don't require MSISDN 3PIDs
+            if not require_msisdn:
+ flows.extend([
{
"type": LoginType.EMAIL_IDENTITY,
"stages": [
LoginType.EMAIL_IDENTITY, LoginType.PASSWORD
]
- },
+ }
+ ])
+ # only support 3PIDless registration if no 3PIDs are required
+ if not require_email and not require_msisdn:
+ flows.extend([
{
"type": LoginType.PASSWORD
}
- ]}
- )
+ ])
+ return (200, {"flows": flows})
@defer.inlineCallbacks
def on_POST(self, request):
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 682a0af9fc..867ec8602c 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -195,15 +195,20 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
content = parse_json_object_from_request(request)
+ event_dict = {
+ "type": event_type,
+ "content": content,
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ }
+
+ if 'ts' in request.args and requester.app_service:
+ event_dict['origin_server_ts'] = parse_integer(request, "ts", 0)
+
msg_handler = self.handlers.message_handler
event = yield msg_handler.create_and_send_nonmember_event(
requester,
- {
- "type": event_type,
- "content": content,
- "room_id": room_id,
- "sender": requester.user.to_string(),
- },
+ event_dict,
txn_id=txn_id,
)
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 385a3ad2ec..30523995af 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -26,6 +26,7 @@ from synapse.http.servlet import (
)
from synapse.util.async import run_on_reactor
from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.threepids import check_3pid_allowed
from ._base import client_v2_patterns, interactive_auth_handler
logger = logging.getLogger(__name__)
@@ -47,6 +48,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
'id_server', 'client_secret', 'email', 'send_attempt'
])
+ if not check_3pid_allowed(self.hs, "email", body['email']):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ )
+
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
'email', body['email']
)
@@ -78,6 +84,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet):
msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+ if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ )
+
existingUid = yield self.datastore.get_user_id_by_threepid(
'msisdn', msisdn
)
@@ -217,6 +228,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
if absent:
raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+ if not check_3pid_allowed(self.hs, "email", body['email']):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ )
+
existingUid = yield self.datastore.get_user_id_by_threepid(
'email', body['email']
)
@@ -255,6 +271,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+ if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ )
+
existingUid = yield self.datastore.get_user_id_by_threepid(
'msisdn', msisdn
)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index e9d88a8895..c6f4680a76 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -26,6 +26,7 @@ from synapse.http.servlet import (
RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string
)
from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.threepids import check_3pid_allowed
from ._base import client_v2_patterns, interactive_auth_handler
@@ -70,6 +71,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
'id_server', 'client_secret', 'email', 'send_attempt'
])
+ if not check_3pid_allowed(self.hs, "email", body['email']):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ )
+
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
'email', body['email']
)
@@ -105,6 +111,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+ if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED,
+ )
+
existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
'msisdn', msisdn
)
@@ -305,31 +316,67 @@ class RegisterRestServlet(RestServlet):
if 'x_show_msisdn' in body and body['x_show_msisdn']:
show_msisdn = True
+ # FIXME: need a better error than "no auth flow found" for scenarios
+ # where we required 3PID for registration but the user didn't give one
+ require_email = 'email' in self.hs.config.registrations_require_3pid
+ require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid
+
+ flows = []
if self.hs.config.enable_registration_captcha:
- flows = [
- [LoginType.RECAPTCHA],
- [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
- ]
+ # only support 3PIDless registration if no 3PIDs are required
+ if not require_email and not require_msisdn:
+ flows.extend([[LoginType.RECAPTCHA]])
+ # only support the email-only flow if we don't require MSISDN 3PIDs
+ if not require_msisdn:
+ flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]])
+
if show_msisdn:
+ # only support the MSISDN-only flow if we don't require email 3PIDs
+ if not require_email:
+ flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]])
+ # always let users provide both MSISDN & email
flows.extend([
- [LoginType.MSISDN, LoginType.RECAPTCHA],
[LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
])
else:
- flows = [
- [LoginType.DUMMY],
- [LoginType.EMAIL_IDENTITY],
- ]
+ # only support 3PIDless registration if no 3PIDs are required
+ if not require_email and not require_msisdn:
+ flows.extend([[LoginType.DUMMY]])
+ # only support the email-only flow if we don't require MSISDN 3PIDs
+ if not require_msisdn:
+ flows.extend([[LoginType.EMAIL_IDENTITY]])
+
if show_msisdn:
+ # only support the MSISDN-only flow if we don't require email 3PIDs
+                if not require_email:
+ flows.extend([[LoginType.MSISDN]])
+ # always let users provide both MSISDN & email
flows.extend([
- [LoginType.MSISDN],
- [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
+ [LoginType.MSISDN, LoginType.EMAIL_IDENTITY]
])
auth_result, params, session_id = yield self.auth_handler.check_auth(
flows, body, self.hs.get_ip_from_request(request)
)
+ # Check that we're not trying to register a denied 3pid.
+ #
+ # the user-facing checks will probably already have happened in
+ # /register/email/requestToken when we requested a 3pid, but that's not
+ # guaranteed.
+
+ if auth_result:
+ for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]:
+ if login_type in auth_result:
+ medium = auth_result[login_type]['medium']
+ address = auth_result[login_type]['address']
+
+ if not check_3pid_allowed(self.hs, medium, address):
+ raise SynapseError(
+ 403, "Third party identifier is not allowed",
+ Codes.THREEPID_DENIED,
+ )
+
if registered_user_id is not None:
logger.info(
"Already registered user ID %r for this session",
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index cc2842aa72..17e6079cba 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -93,6 +93,7 @@ class RemoteKey(Resource):
self.store = hs.get_datastore()
self.version_string = hs.version_string
self.clock = hs.get_clock()
+ self.federation_domain_whitelist = hs.config.federation_domain_whitelist
def render_GET(self, request):
self.async_render_GET(request)
@@ -137,6 +138,13 @@ class RemoteKey(Resource):
logger.info("Handling query for keys %r", query)
store_queries = []
for server_name, key_ids in query.items():
+ if (
+ self.federation_domain_whitelist is not None and
+ server_name not in self.federation_domain_whitelist
+ ):
+ logger.debug("Federation denied with %s", server_name)
+ continue
+
if not key_ids:
key_ids = (None,)
for key_id in key_ids:
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 578d7f07c5..bb79599379 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -27,15 +27,14 @@ from .identicon_resource import IdenticonResource
from .preview_url_resource import PreviewUrlResource
from .filepath import MediaFilePaths
from .thumbnailer import Thumbnailer
-from .storage_provider import (
- StorageProviderWrapper, FileStorageProviderBackend,
-)
+from .storage_provider import StorageProviderWrapper
from .media_storage import MediaStorage
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.util.stringutils import random_string
-from synapse.api.errors import SynapseError, HttpResponseException, \
- NotFoundError
+from synapse.api.errors import (
+ SynapseError, HttpResponseException, NotFoundError, FederationDeniedError,
+)
from synapse.util.async import Linearizer
from synapse.util.stringutils import is_ascii
@@ -53,7 +52,7 @@ import urlparse
logger = logging.getLogger(__name__)
-UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000
+UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
class MediaRepository(object):
@@ -75,22 +74,21 @@ class MediaRepository(object):
self.remote_media_linearizer = Linearizer(name="media_remote")
self.recently_accessed_remotes = set()
+ self.recently_accessed_locals = set()
+
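+ # if set, requests for media from servers not on the whitelist are
+ # rejected with a FederationDeniedError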
+ self.federation_domain_whitelist = hs.config.federation_domain_whitelist
# List of StorageProviders where we should search for media and
# potentially upload to.
storage_providers = []
- # TODO: Move this into config and allow other storage providers to be
- # defined.
- if hs.config.backup_media_store_path:
- backend = FileStorageProviderBackend(
- self.primary_base_path, hs.config.backup_media_store_path,
- )
+ for clz, provider_config, wrapper_config in hs.config.media_storage_providers:
+ backend = clz(hs, provider_config)
provider = StorageProviderWrapper(
backend,
- store=True,
- store_synchronous=hs.config.synchronous_backup_media_store,
- store_remote=True,
+ store_local=wrapper_config.store_local,
+ store_remote=wrapper_config.store_remote,
+ store_synchronous=wrapper_config.store_synchronous,
)
storage_providers.append(provider)
@@ -99,19 +97,34 @@ class MediaRepository(object):
)
self.clock.looping_call(
- self._update_recently_accessed_remotes,
- UPDATE_RECENTLY_ACCESSED_REMOTES_TS
+ self._update_recently_accessed,
+ UPDATE_RECENTLY_ACCESSED_TS,
)
@defer.inlineCallbacks
- def _update_recently_accessed_remotes(self):
- media = self.recently_accessed_remotes
+ def _update_recently_accessed(self):
+ remote_media = self.recently_accessed_remotes
self.recently_accessed_remotes = set()
+ local_media = self.recently_accessed_locals
+ self.recently_accessed_locals = set()
+
yield self.store.update_cached_last_access_time(
- media, self.clock.time_msec()
+ local_media, remote_media, self.clock.time_msec()
)
+ def mark_recently_accessed(self, server_name, media_id):
+ """Mark the given media as recently accessed.
+
+ Args:
+ server_name (str|None): Origin server of media, or None if local
+ media_id (str): The media ID of the content
+ """
+ if server_name:
+ self.recently_accessed_remotes.add((server_name, media_id))
+ else:
+ self.recently_accessed_locals.add(media_id)
+
@defer.inlineCallbacks
def create_content(self, media_type, upload_name, content, content_length,
auth_user):
@@ -173,6 +186,8 @@ class MediaRepository(object):
respond_404(request)
return
+ self.mark_recently_accessed(None, media_id)
+
media_type = media_info["media_type"]
media_length = media_info["media_length"]
upload_name = name if name else media_info["upload_name"]
@@ -204,7 +219,13 @@ class MediaRepository(object):
Deferred: Resolves once a response has successfully been written
to request
"""
- self.recently_accessed_remotes.add((server_name, media_id))
+ if (
+ self.federation_domain_whitelist is not None and
+ server_name not in self.federation_domain_whitelist
+ ):
+ raise FederationDeniedError(server_name)
+
+ self.mark_recently_accessed(server_name, media_id)
# We linearize here to ensure that we don't try and download remote
# media multiple times concurrently
@@ -238,6 +259,12 @@ class MediaRepository(object):
Returns:
Deferred[dict]: The media_info of the file
"""
+ if (
+ self.federation_domain_whitelist is not None and
+ server_name not in self.federation_domain_whitelist
+ ):
+ raise FederationDeniedError(server_name)
+
# We linearize here to ensure that we don't try and download remote
# media multiple times concurrently
key = (server_name, media_id)
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 5c3e4e5a65..e8e8b3986d 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -197,6 +197,14 @@ class MediaStorage(object):
str
"""
if file_info.url_cache:
+ if file_info.thumbnail:
+ return self.filepaths.url_cache_thumbnail_rel(
+ media_id=file_info.file_id,
+ width=file_info.thumbnail_width,
+ height=file_info.thumbnail_height,
+ content_type=file_info.thumbnail_type,
+ method=file_info.thumbnail_method,
+ )
return self.filepaths.url_cache_filepath_rel(file_info.file_id)
if file_info.server_name:
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 981f01e417..31fe7aa75c 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -12,6 +12,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import cgi
+import datetime
+import errno
+import fnmatch
+import itertools
+import logging
+import os
+import re
+import shutil
+import sys
+import traceback
+import ujson as json
+import urlparse
from twisted.web.server import NOT_DONE_YET
from twisted.internet import defer
@@ -33,18 +46,6 @@ from synapse.http.server import (
from synapse.util.async import ObservableDeferred
from synapse.util.stringutils import is_ascii
-import os
-import re
-import fnmatch
-import cgi
-import ujson as json
-import urlparse
-import itertools
-import datetime
-import errno
-import shutil
-
-import logging
logger = logging.getLogger(__name__)
@@ -286,17 +287,28 @@ class PreviewUrlResource(Resource):
url_cache=True,
)
- try:
- with self.media_storage.store_into_file(file_info) as (f, fname, finish):
+ with self.media_storage.store_into_file(file_info) as (f, fname, finish):
+ try:
logger.debug("Trying to get url '%s'" % url)
length, headers, uri, code = yield self.client.get_file(
url, output_stream=f, max_size=self.max_spider_size,
)
+ except Exception as e:
# FIXME: pass through 404s and other error messages nicely
+ logger.warn("Error downloading %s: %r", url, e)
+ raise SynapseError(
+ 500, "Failed to download content: %s" % (
+ traceback.format_exception_only(sys.exc_type, e),
+ ),
+ Codes.UNKNOWN,
+ )
+ yield finish()
- yield finish()
-
- media_type = headers["Content-Type"][0]
+ try:
+ if "Content-Type" in headers:
+ media_type = headers["Content-Type"][0]
+ else:
+ media_type = "application/octet-stream"
time_now_ms = self.clock.time_msec()
content_disposition = headers.get("Content-Disposition", None)
@@ -336,10 +348,11 @@ class PreviewUrlResource(Resource):
)
except Exception as e:
- raise SynapseError(
- 500, ("Failed to download content: %s" % e),
- Codes.UNKNOWN
- )
+ logger.error("Error handling downloaded %s: %r", url, e)
+ # TODO: we really ought to delete the downloaded file in this
+ # case, since we won't have recorded it in the db, and will
+ # therefore not expire it.
+ raise
defer.returnValue({
"media_type": media_type,
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 2ad602e101..c188192f2b 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -17,6 +17,7 @@ from twisted.internet import defer, threads
from .media_storage import FileResponder
+from synapse.config._base import Config
from synapse.util.logcontext import preserve_fn
import logging
@@ -64,19 +65,19 @@ class StorageProviderWrapper(StorageProvider):
Args:
backend (StorageProvider)
- store (bool): Whether to store new files or not.
+ store_local (bool): Whether to store new local files or not.
store_synchronous (bool): Whether to wait for file to be successfully
uploaded, or to do the upload in the background.
store_remote (bool): Whether remote media should be uploaded
"""
- def __init__(self, backend, store, store_synchronous, store_remote):
+ def __init__(self, backend, store_local, store_synchronous, store_remote):
self.backend = backend
- self.store = store
+ self.store_local = store_local
self.store_synchronous = store_synchronous
self.store_remote = store_remote
def store_file(self, path, file_info):
- if not self.store:
+ if not file_info.server_name and not self.store_local:
return defer.succeed(None)
if file_info.server_name and not self.store_remote:
@@ -97,13 +98,13 @@ class FileStorageProviderBackend(StorageProvider):
"""A storage provider that stores files in a directory on a filesystem.
Args:
- cache_directory (str): Base path of the local media repository
- base_directory (str): Base path to store new files
+ hs (HomeServer)
+ config: The config returned by `parse_config`.
"""
- def __init__(self, cache_directory, base_directory):
- self.cache_directory = cache_directory
- self.base_directory = base_directory
+ def __init__(self, hs, config):
+ self.cache_directory = hs.config.media_store_path
+ self.base_directory = config
def store_file(self, path, file_info):
"""See StorageProvider.store_file"""
@@ -125,3 +126,15 @@ class FileStorageProviderBackend(StorageProvider):
backup_fname = os.path.join(self.base_directory, path)
if os.path.isfile(backup_fname):
return FileResponder(open(backup_fname, "rb"))
+
+ @staticmethod
+ def parse_config(config):
+ """Called on startup to parse config supplied. This should parse
+ the config and raise if there is a problem.
+
+ The returned value is passed into the constructor.
+
+ In this case we only care about a single param, the directory, so let's
+ just pull that out.
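+
+ For this provider the config is simply the base directory to store
+ files in, e.g.:
+
+ directory: /mnt/backup_media_store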
+ """
+ return Config.ensure_directory(config["directory"])
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index 49e4af514c..58ada49711 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -67,6 +67,7 @@ class ThumbnailResource(Resource):
yield self._respond_local_thumbnail(
request, media_id, width, height, method, m_type
)
+ self.media_repo.mark_recently_accessed(None, media_id)
else:
if self.dynamic_thumbnails:
yield self._select_or_generate_remote_thumbnail(
@@ -78,6 +79,7 @@ class ThumbnailResource(Resource):
request, server_name, media_id,
width, height, method, m_type
)
+ self.media_repo.mark_recently_accessed(server_name, media_id)
@defer.inlineCallbacks
def _respond_local_thumbnail(self, request, media_id, width, height,
diff --git a/synapse/server.py b/synapse/server.py
index 99693071b6..3173aed1d0 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -66,7 +66,7 @@ from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
)
-from synapse.state import StateHandler
+from synapse.state import StateHandler, StateResolutionHandler
from synapse.storage import DataStore
from synapse.streams.events import EventSources
from synapse.util import Clock
@@ -102,6 +102,7 @@ class HomeServer(object):
'v1auth',
'auth',
'state_handler',
+ 'state_resolution_handler',
'presence_handler',
'sync_handler',
'typing_handler',
@@ -224,6 +225,9 @@ class HomeServer(object):
def build_state_handler(self):
return StateHandler(self)
+ def build_state_resolution_handler(self):
+ return StateResolutionHandler(self)
+
def build_presence_handler(self):
return PresenceHandler(self)
@@ -307,6 +311,23 @@ class HomeServer(object):
**self.db_config.get("args", {})
)
+ def get_db_conn(self, run_new_connection=True):
+ """Makes a new connection to the database, skipping the db pool
+
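+ Args:
+ run_new_connection (bool): whether to run the database engine's
+ on_new_connection hook on the new connection
+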
+ Returns:
+ Connection: a connection object implementing the PEP-249 spec
+ """
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
def build_media_repository_resource(self):
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 41416ef252..c3a9a3847b 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -34,6 +34,9 @@ class HomeServer(object):
def get_state_handler(self) -> synapse.state.StateHandler:
pass
+ def get_state_resolution_handler(self) -> synapse.state.StateResolutionHandler:
+ pass
+
def get_deactivate_account_handler(self) -> synapse.handlers.deactivate_account.DeactivateAccountHandler:
pass
diff --git a/synapse/state.py b/synapse/state.py
index 9e624b4937..273f9911ca 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -58,7 +58,11 @@ class _StateCacheEntry(object):
__slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"]
def __init__(self, state, state_group, prev_group=None, delta_ids=None):
+ # dict[(str, str), str] map from (type, state_key) to event_id
self.state = frozendict(state)
+
+ # the ID of a state group if one and only one is involved;
+ # otherwise, None.
self.state_group = state_group
self.prev_group = prev_group
@@ -81,31 +85,19 @@ class _StateCacheEntry(object):
class StateHandler(object):
- """ Responsible for doing state conflict resolution.
+ """Fetches bits of state from the stores, and does state resolution
+ where necessary
"""
def __init__(self, hs):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
self.hs = hs
-
- # dict of set of event_ids -> _StateCacheEntry.
- self._state_cache = None
- self.resolve_linearizer = Linearizer(name="state_resolve_lock")
+ self._state_resolution_handler = hs.get_state_resolution_handler()
def start_caching(self):
- logger.debug("start_caching")
-
- self._state_cache = ExpiringCache(
- cache_name="state_cache",
- clock=self.clock,
- max_len=SIZE_OF_CACHE,
- expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
- iterable=True,
- reset_expiry_on_get=True,
- )
-
- self._state_cache.start()
+ # TODO: remove this shim
+ self._state_resolution_handler.start_caching()
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key="",
@@ -127,7 +119,7 @@ class StateHandler(object):
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
logger.debug("calling resolve_state_groups from get_current_state")
- ret = yield self.resolve_state_groups(room_id, latest_event_ids)
+ ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
state = ret.state
if event_type:
@@ -146,19 +138,27 @@ class StateHandler(object):
defer.returnValue(state)
@defer.inlineCallbacks
- def get_current_state_ids(self, room_id, event_type=None, state_key="",
- latest_event_ids=None):
+ def get_current_state_ids(self, room_id, latest_event_ids=None):
+ """Get the current state, or the state at a set of events, for a room
+
+ Args:
+ room_id (str):
+
+ latest_event_ids (iterable[str]|None): if given, the forward
+ extremities to resolve. If None, we look them up from the
+ database (via a cache)
+
+ Returns:
+ Deferred[dict[(str, str), str]]: the state dict, mapping from
+ (event_type, state_key) -> event_id
+ """
if not latest_event_ids:
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
logger.debug("calling resolve_state_groups from get_current_state_ids")
- ret = yield self.resolve_state_groups(room_id, latest_event_ids)
+ ret = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
state = ret.state
- if event_type:
- defer.returnValue(state.get((event_type, state_key)))
- return
-
defer.returnValue(state)
@defer.inlineCallbacks
@@ -166,7 +166,7 @@ class StateHandler(object):
if not latest_event_ids:
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
logger.debug("calling resolve_state_groups from get_current_user_in_room")
- entry = yield self.resolve_state_groups(room_id, latest_event_ids)
+ entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
joined_users = yield self.store.get_joined_users_from_state(room_id, entry)
defer.returnValue(joined_users)
@@ -175,7 +175,7 @@ class StateHandler(object):
if not latest_event_ids:
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
logger.debug("calling resolve_state_groups from get_current_hosts_in_room")
- entry = yield self.resolve_state_groups(room_id, latest_event_ids)
+ entry = yield self.resolve_state_groups_for_events(room_id, latest_event_ids)
joined_hosts = yield self.store.get_joined_hosts(room_id, entry)
defer.returnValue(joined_hosts)
@@ -233,7 +233,7 @@ class StateHandler(object):
defer.returnValue(context)
logger.debug("calling resolve_state_groups from compute_event_context")
- entry = yield self.resolve_state_groups(
+ entry = yield self.resolve_state_groups_for_events(
event.room_id, [e for e, _ in event.prev_events],
)
@@ -275,16 +275,16 @@ class StateHandler(object):
defer.returnValue(context)
@defer.inlineCallbacks
- @log_function
- def resolve_state_groups(self, room_id, event_ids):
+ def resolve_state_groups_for_events(self, room_id, event_ids):
""" Given a list of event_ids this method fetches the state at each
event, resolves conflicts between them and returns them.
+ Args:
+ room_id (str):
+ event_ids (list[str]):
+
Returns:
- a Deferred tuple of (`state_group`, `state`, `prev_state`).
- `state_group` is the name of a state group if one and only one is
- involved. `state` is a map from (type, state_key) to event, and
- `prev_state` is a list of event ids.
+ Deferred[_StateCacheEntry]: resolved state
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
@@ -295,13 +295,7 @@ class StateHandler(object):
room_id, event_ids
)
- logger.debug(
- "resolve_state_groups state_groups %s",
- state_groups_ids.keys()
- )
-
- group_names = frozenset(state_groups_ids.keys())
- if len(group_names) == 1:
+ if len(state_groups_ids) == 1:
name, state_list = state_groups_ids.items().pop()
prev_group, delta_ids = yield self.store.get_state_group_delta(name)
@@ -313,6 +307,92 @@ class StateHandler(object):
delta_ids=delta_ids,
))
+ result = yield self._state_resolution_handler.resolve_state_groups(
+ room_id, state_groups_ids, self._state_map_factory,
+ )
+ defer.returnValue(result)
+
+ def _state_map_factory(self, ev_ids):
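+ """Fetches the given events from the datastore.
+
+ Used as the state_map_factory callback when calling
+ StateResolutionHandler.resolve_state_groups.
+
+ Returns:
+ Deferred[dict[str, EventBase]]: map from event_id to event
+ """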
+ return self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
+ )
+
+ def resolve_events(self, state_sets, event):
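+ """Resolves conflicts between a set of state events.
+
+ Args:
+ state_sets (list[list[EventBase]]): the state events in each of the
+ groups to resolve
+ event (EventBase): the event whose state is being resolved (only
+ used for logging)
+
+ Returns:
+ dict[(str, str), EventBase]: a map from (type, state_key) to event
+ """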
+ logger.info(
+ "Resolving state for %s with %d groups", event.room_id, len(state_sets)
+ )
+ state_set_ids = [{
+ (ev.type, ev.state_key): ev.event_id
+ for ev in st
+ } for st in state_sets]
+
+ state_map = {
+ ev.event_id: ev
+ for st in state_sets
+ for ev in st
+ }
+
+ with Measure(self.clock, "state._resolve_events"):
+ new_state = resolve_events_with_state_map(state_set_ids, state_map)
+
+ new_state = {
+ key: state_map[ev_id] for key, ev_id in new_state.items()
+ }
+
+ return new_state
+
+
+class StateResolutionHandler(object):
+ """Responsible for doing state conflict resolution.
+
+ Note that the storage layer depends on this handler, so all functions must
+ be storage-independent.
+ """
+ def __init__(self, hs):
+ self.clock = hs.get_clock()
+
+ # dict of set of event_ids -> _StateCacheEntry.
+ self._state_cache = None
+ self.resolve_linearizer = Linearizer(name="state_resolve_lock")
+
+ def start_caching(self):
+ logger.debug("start_caching")
+
+ self._state_cache = ExpiringCache(
+ cache_name="state_cache",
+ clock=self.clock,
+ max_len=SIZE_OF_CACHE,
+ expiry_ms=EVICTION_TIMEOUT_SECONDS * 1000,
+ iterable=True,
+ reset_expiry_on_get=True,
+ )
+
+ self._state_cache.start()
+
+ @defer.inlineCallbacks
+ @log_function
+ def resolve_state_groups(self, room_id, state_groups_ids, state_map_factory):
+ """Resolves conflicts between a set of state groups
+
+ Always generates a new state group (unless we hit the cache), so should
+ not be called for a single state group
+
+ Args:
+ room_id (str): room we are resolving for (used for logging)
+ state_groups_ids (dict[int, dict[(str, str), str]]):
+ map from state group id to the state in that state group
+ (where 'state' is a map from state key to event id)
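+ state_map_factory (func): called with a list of event ids that are
+ needed, and should return a Deferred of a dict of event_id to event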
+
+ Returns:
+ Deferred[_StateCacheEntry]: resolved state
+ """
+ logger.debug(
+ "resolve_state_groups state_groups %s",
+ state_groups_ids.keys()
+ )
+
+ group_names = frozenset(state_groups_ids.keys())
+
with (yield self.resolve_linearizer.queue(group_names)):
if self._state_cache is not None:
cache = self._state_cache.get(group_names, None)
@@ -341,17 +421,19 @@ class StateHandler(object):
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
- new_state = yield resolve_events(
+ new_state = yield resolve_events_with_factory(
state_groups_ids.values(),
- state_map_factory=lambda ev_ids: self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False,
- ),
+ state_map_factory=state_map_factory,
)
else:
new_state = {
key: e_ids.pop() for key, e_ids in state.items()
}
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
state_group = None
new_state_event_ids = frozenset(new_state.values())
for sg, events in state_groups_ids.items():
@@ -388,30 +470,6 @@ class StateHandler(object):
defer.returnValue(cache)
- def resolve_events(self, state_sets, event):
- logger.info(
- "Resolving state for %s with %d groups", event.room_id, len(state_sets)
- )
- state_set_ids = [{
- (ev.type, ev.state_key): ev.event_id
- for ev in st
- } for st in state_sets]
-
- state_map = {
- ev.event_id: ev
- for st in state_sets
- for ev in st
- }
-
- with Measure(self.clock, "state._resolve_events"):
- new_state = resolve_events(state_set_ids, state_map)
-
- new_state = {
- key: state_map[ev_id] for key, ev_id in new_state.items()
- }
-
- return new_state
-
def _ordered_events(events):
def key_func(e):
@@ -420,19 +478,17 @@ def _ordered_events(events):
return sorted(events, key=key_func)
-def resolve_events(state_sets, state_map_factory):
+def resolve_events_with_state_map(state_sets, state_map):
"""
Args:
state_sets(list): List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.
- state_map_factory(dict|callable): If callable, then will be called
- with a list of event_ids that are needed, and should return with
- a Deferred of dict of event_id to event. Otherwise, should be
- a dict from event_id to event of all events in state_sets.
+ state_map(dict): a dict from event_id to event, for all events in
+ state_sets.
Returns
- dict[(str, str), synapse.events.FrozenEvent] is a map from
- (type, state_key) to event.
+ dict[(str, str), str]:
+ a map from (type, state_key) to event_id.
"""
if len(state_sets) == 1:
return state_sets[0]
@@ -441,13 +497,6 @@ def resolve_events(state_sets, state_map_factory):
state_sets,
)
- if callable(state_map_factory):
- return _resolve_with_state_fac(
- unconflicted_state, conflicted_state, state_map_factory
- )
-
- state_map = state_map_factory
-
auth_events = _create_auth_events_from_maps(
unconflicted_state, conflicted_state, state_map
)
@@ -461,6 +510,21 @@ def _seperate(state_sets):
"""Takes the state_sets and figures out which keys are conflicted and
which aren't. i.e., which have multiple different event_ids associated
with them in different state sets.
+
+ Args:
+ state_sets(list[dict[(str, str), str]]):
+ List of dicts of (type, state_key) -> event_id, which are the
+ different state groups to resolve.
+
+ Returns:
+ (dict[(str, str), str], dict[(str, str), set[str]]):
+ A tuple of (unconflicted_state, conflicted_state), where:
+
+ unconflicted_state is a dict mapping (type, state_key)->event_id
+ for unconflicted state keys.
+
+ conflicted_state is a dict mapping (type, state_key) to a set of
+ event ids for conflicted state keys.
"""
unconflicted_state = dict(state_sets[0])
conflicted_state = {}
@@ -491,8 +555,26 @@ def _seperate(state_sets):
@defer.inlineCallbacks
-def _resolve_with_state_fac(unconflicted_state, conflicted_state,
- state_map_factory):
+def resolve_events_with_factory(state_sets, state_map_factory):
+ """
+ Args:
+ state_sets(list): List of dicts of (type, state_key) -> event_id,
+ which are the different state groups to resolve.
+ state_map_factory(func): will be called
+ with a list of event_ids that are needed, and should return with
+ a Deferred of dict of event_id to event.
+
+ Returns
+ Deferred[dict[(str, str), str]]:
+ a map from (type, state_key) to event_id.
+ """
+ if len(state_sets) == 1:
+ defer.returnValue(state_sets[0])
+
+ unconflicted_state, conflicted_state = _seperate(
+ state_sets,
+ )
+
needed_events = set(
event_id
for event_ids in conflicted_state.itervalues()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index b971f0cb18..68125006eb 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -291,33 +291,33 @@ class SQLBaseStore(object):
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
- """Wraps the .runInteraction() method on the underlying db_pool."""
- current_context = LoggingContext.current_context()
+ """Starts a transaction on the database and runs a given function
- start_time = time.time() * 1000
+ Arguments:
+ desc (str): description of the transaction, for logging and metrics
+ func (func): callback function, which will be called with a
+ database transaction (twisted.enterprise.adbapi.Transaction) as
+ its first argument, followed by `args` and `kwargs`.
+
+ args (list): positional args to pass to `func`
+ kwargs (dict): named args to pass to `func`
+
+ Returns:
+ Deferred: The result of func
+ """
+ current_context = LoggingContext.current_context()
after_callbacks = []
final_callbacks = []
def inner_func(conn, *args, **kwargs):
- with LoggingContext("runInteraction") as context:
- sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
-
- if self.database_engine.is_connection_closed(conn):
- logger.debug("Reconnecting closed database connection")
- conn.reconnect()
-
- current_context.copy_to(context)
- return self._new_transaction(
- conn, desc, after_callbacks, final_callbacks, current_context,
- func, *args, **kwargs
- )
+ return self._new_transaction(
+ conn, desc, after_callbacks, final_callbacks, current_context,
+ func, *args, **kwargs
+ )
try:
- with PreserveLoggingContext():
- result = yield self._db_pool.runWithConnection(
- inner_func, *args, **kwargs
- )
+ result = yield self.runWithConnection(inner_func, *args, **kwargs)
for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
@@ -329,14 +329,27 @@ class SQLBaseStore(object):
@defer.inlineCallbacks
def runWithConnection(self, func, *args, **kwargs):
- """Wraps the .runInteraction() method on the underlying db_pool."""
+ """Wraps the .runWithConnection() method on the underlying db_pool.
+
+ Arguments:
+ func (func): callback function, which will be called with a
+ database connection (twisted.enterprise.adbapi.Connection) as
+ its first argument, followed by `args` and `kwargs`.
+ args (list): positional args to pass to `func`
+ kwargs (dict): named args to pass to `func`
+
+ Returns:
+ Deferred: The result of func
+ """
current_context = LoggingContext.current_context()
start_time = time.time() * 1000
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection") as context:
- sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
+ sched_duration_ms = time.time() * 1000 - start_time
+ sql_scheduling_timer.inc_by(sched_duration_ms)
+ current_context.add_database_scheduled(sched_duration_ms)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ad1d782705..dd28c2efe3 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,7 +27,7 @@ from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
-from synapse.state import resolve_events
+from synapse.state import resolve_events_with_factory
from synapse.util.caches.descriptors import cached
from synapse.types import get_domain_from_id
@@ -110,7 +110,7 @@ class _EventPeristenceQueue(object):
end_item.events_and_contexts.extend(events_and_contexts)
return end_item.deferred.observe()
- deferred = ObservableDeferred(defer.Deferred())
+ deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True)
queue.append(self._EventPersistQueueItem(
events_and_contexts=events_and_contexts,
@@ -152,8 +152,8 @@ class _EventPeristenceQueue(object):
try:
ret = yield per_item_callback(item)
item.deferred.callback(ret)
- except Exception as e:
- item.deferred.errback(e)
+ except Exception:
+ item.deferred.errback()
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
@@ -386,11 +386,18 @@ class EventsStore(SQLBaseStore):
if all_single_prev_not_state:
continue
- state = yield self._calculate_state_delta(
- room_id, ev_ctx_rm, new_latest_event_ids
+ logger.info(
+ "Calculating state delta for room %s", room_id,
)
- if state:
- current_state_for_room[room_id] = state
+ current_state = yield self._get_new_state_after_events(
+ ev_ctx_rm, new_latest_event_ids,
+ )
+ if current_state is not None:
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ if delta is not None:
+ current_state_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -467,20 +474,22 @@ class EventsStore(SQLBaseStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids):
- """Calculate the new state deltas for a room.
+ def _get_new_state_after_events(self, events_context, new_latest_event_ids):
+ """Calculate the current state dict after adding some new events to
+ a room
- Assumes that we are only persisting events for one room at a time.
+ Args:
+ events_context (list[(EventBase, EventContext)]):
+ events and contexts which are being added to the room
+
+ new_latest_event_ids (iterable[str]):
+ the new forward extremities for the room.
Returns:
- 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert. `new_state` is the full set of state.
- May return None if there are no changes to be applied.
+ Deferred[dict[(str,str), str]|None]:
+ None if there are no changes to the room state, or
+ a dict of (type, state_key) -> event_id.
"""
- # Now we need to work out the different state sets for
- # each state extremities
state_sets = []
state_groups = set()
missing_event_ids = []
@@ -523,18 +532,23 @@ class EventsStore(SQLBaseStore):
state_sets.extend(group_to_state.itervalues())
if not new_latest_event_ids:
- current_state = {}
+ defer.returnValue({})
elif was_updated:
if len(state_sets) == 1:
# If there is only one state set, then we know what the current
# state is.
- current_state = state_sets[0]
+ defer.returnValue(state_sets[0])
else:
# We work out the current state by passing the state sets to the
# state resolution algorithm. It may ask for some events, including
# the events we have yet to persist, so we need a slightly more
# complicated event lookup function than simply looking the events
# up in the db.
+
+ logger.info(
+ "Resolving state with %i state sets", len(state_sets),
+ )
+
events_map = {ev.event_id: ev for ev, _ in events_context}
@defer.inlineCallbacks
@@ -557,13 +571,26 @@ class EventsStore(SQLBaseStore):
to_return.update(evs)
defer.returnValue(to_return)
- current_state = yield resolve_events(
+ current_state = yield resolve_events_with_factory(
state_sets,
state_map_factory=get_events,
)
+ defer.returnValue(current_state)
else:
return
+ @defer.inlineCallbacks
+ def _calculate_state_delta(self, room_id, current_state):
+ """Calculate the new state deltas for a room.
+
+ Assumes that we are only persisting events for one room at a time.
+
+ Returns:
+ 3-tuple (to_delete, to_insert, new_state), where each is a state dict,
+ i.e. (type, state_key) -> event_id. `to_delete` are the entries to
+ first be deleted from current_state_events, `to_insert` are entries
+ to insert. `new_state` is the full set of state.
+ """
existing_state = yield self.get_current_state_ids(room_id)
existing_events = set(existing_state.itervalues())
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index 6ebc372498..e6cdbb0545 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -173,7 +173,14 @@ class MediaRepositoryStore(BackgroundUpdateStore):
desc="store_cached_remote_media",
)
- def update_cached_last_access_time(self, origin_id_tuples, time_ts):
+ def update_cached_last_access_time(self, local_media, remote_media, time_ms):
+ """Updates the last access time of the given media
+
+ Args:
+ local_media (iterable[str]): Set of media_ids
+ remote_media (iterable[(str, str)]): Set of (server_name, media_id)
+ time_ms: Current time in milliseconds
+ """
def update_cache_txn(txn):
sql = (
"UPDATE remote_media_cache SET last_access_ts = ?"
@@ -181,8 +188,18 @@ class MediaRepositoryStore(BackgroundUpdateStore):
)
txn.executemany(sql, (
- (time_ts, media_origin, media_id)
- for media_origin, media_id in origin_id_tuples
+ (time_ms, media_origin, media_id)
+ for media_origin, media_id in remote_media
+ ))
+
+ sql = (
+ "UPDATE local_media_repository SET last_access_ts = ?"
+ " WHERE media_id = ?"
+ )
+
+ txn.executemany(sql, (
+ (time_ms, media_id)
+ for media_id in local_media
))
return self.runInteraction("update_cached_last_access_time", update_cache_txn)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index d1691bbac2..c845a0cec5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 46
+SCHEMA_VERSION = 47
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 23688430b7..cf2c4dae39 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -533,73 +533,114 @@ class RoomStore(SQLBaseStore):
)
self.is_room_blocked.invalidate((room_id,))
+ def get_media_mxcs_in_room(self, room_id):
+ """Retrieves all the local and remote media MXC URIs in a given room
+
+ Args:
+ room_id (str)
+
+ Returns:
+ A tuple of two lists, the MXC URIs of the local media and of the
+ remote media in the room.
+ """
+ def _get_media_mxcs_in_room_txn(txn):
+ local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
+ local_media_mxcs = []
+ remote_media_mxcs = []
+
+ # Convert the IDs to MXC URIs
+ for media_id in local_mxcs:
+ local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+ for hostname, media_id in remote_mxcs:
+ remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
+
+ return local_media_mxcs, remote_media_mxcs
+ return self.runInteraction("get_media_ids_in_room", _get_media_mxcs_in_room_txn)
+
def quarantine_media_ids_in_room(self, room_id, quarantined_by):
"""For a room loops through all events with media and quarantines
the associated media
"""
- def _get_media_ids_in_room(txn):
- mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+ def _quarantine_media_in_room_txn(txn):
+ local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
+ total_media_quarantined = 0
- next_token = self.get_current_events_token() + 1
+ # Now update all the tables to set the quarantined_by flag
- total_media_quarantined = 0
+ txn.executemany("""
+ UPDATE local_media_repository
+ SET quarantined_by = ?
+ WHERE media_id = ?
+ """, ((quarantined_by, media_id) for media_id in local_mxcs))
- while next_token:
- sql = """
- SELECT stream_ordering, content FROM events
- WHERE room_id = ?
- AND stream_ordering < ?
- AND contains_url = ? AND outlier = ?
- ORDER BY stream_ordering DESC
- LIMIT ?
+ txn.executemany(
"""
- txn.execute(sql, (room_id, next_token, True, False, 100))
-
- next_token = None
- local_media_mxcs = []
- remote_media_mxcs = []
- for stream_ordering, content_json in txn:
- next_token = stream_ordering
- content = json.loads(content_json)
-
- content_url = content.get("url")
- thumbnail_url = content.get("info", {}).get("thumbnail_url")
-
- for url in (content_url, thumbnail_url):
- if not url:
- continue
- matches = mxc_re.match(url)
- if matches:
- hostname = matches.group(1)
- media_id = matches.group(2)
- if hostname == self.hostname:
- local_media_mxcs.append(media_id)
- else:
- remote_media_mxcs.append((hostname, media_id))
-
- # Now update all the tables to set the quarantined_by flag
-
- txn.executemany("""
- UPDATE local_media_repository
+ UPDATE remote_media_cache
SET quarantined_by = ?
- WHERE media_id = ?
- """, ((quarantined_by, media_id) for media_id in local_media_mxcs))
-
- txn.executemany(
- """
- UPDATE remote_media_cache
- SET quarantined_by = ?
- WHERE media_origin AND media_id = ?
- """,
- (
- (quarantined_by, origin, media_id)
- for origin, media_id in remote_media_mxcs
- )
+ WHERE media_origin = ? AND media_id = ?
+ """,
+ (
+ (quarantined_by, origin, media_id)
+ for origin, media_id in remote_mxcs
)
+ )
- total_media_quarantined += len(local_media_mxcs)
- total_media_quarantined += len(remote_media_mxcs)
+ total_media_quarantined += len(local_mxcs)
+ total_media_quarantined += len(remote_mxcs)
return total_media_quarantined
- return self.runInteraction("get_media_ids_in_room", _get_media_ids_in_room)
+ return self.runInteraction(
+ "quarantine_media_in_room",
+ _quarantine_media_in_room_txn,
+ )
+
+ def _get_media_mxcs_in_room_txn(self, txn, room_id):
+ """Retrieves all the local and remote media MXC URIs in a given room
+
+ Args:
+ txn (cursor)
+ room_id (str)
+
+ Returns:
+ A tuple of two lists: the media IDs of the local media, and
+ (hostname, media ID) tuples for the remote media.
+ """
+ mxc_re = re.compile("^mxc://([^/]+)/([^/#?]+)")
+
+ next_token = self.get_current_events_token() + 1
+ local_media_mxcs = []
+ remote_media_mxcs = []
+
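+ # Walk backwards through the room's events, picking out any mxc:// URIs
+ # referenced in event content or thumbnails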
+ while next_token:
+ sql = """
+ SELECT stream_ordering, content FROM events
+ WHERE room_id = ?
+ AND stream_ordering < ?
+ AND contains_url = ? AND outlier = ?
+ ORDER BY stream_ordering DESC
+ LIMIT ?
+ """
+ txn.execute(sql, (room_id, next_token, True, False, 100))
+
+ next_token = None
+ for stream_ordering, content_json in txn:
+ next_token = stream_ordering
+ content = json.loads(content_json)
+
+ content_url = content.get("url")
+ thumbnail_url = content.get("info", {}).get("thumbnail_url")
+
+ for url in (content_url, thumbnail_url):
+ if not url:
+ continue
+ matches = mxc_re.match(url)
+ if matches:
+ hostname = matches.group(1)
+ media_id = matches.group(2)
+ if hostname == self.hostname:
+ local_media_mxcs.append(media_id)
+ else:
+ remote_media_mxcs.append((hostname, media_id))
+
+ return local_media_mxcs, remote_media_mxcs
diff --git a/synapse/storage/schema/delta/47/last_access_media.sql b/synapse/storage/schema/delta/47/last_access_media.sql
new file mode 100644
index 0000000000..f505fb22b5
--- /dev/null
+++ b/synapse/storage/schema/delta/47/last_access_media.sql
@@ -0,0 +1,16 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE local_media_repository ADD COLUMN last_access_ts BIGINT;
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index c9bff408ef..dfdcbb3181 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -641,8 +641,12 @@ class UserDirectoryStore(SQLBaseStore):
"""
if self.hs.config.user_directory_search_all_users:
- join_clause = ""
- where_clause = "?<>''" # naughty hack to keep the same number of binds
+ # make s.user_id null to keep the ordering algorithm happy
+ join_clause = """
+ CROSS JOIN (SELECT NULL as user_id) AS s
+ """
+ join_args = ()
+ where_clause = "1=1"
else:
join_clause = """
LEFT JOIN users_in_public_rooms AS p USING (user_id)
@@ -651,6 +655,7 @@ class UserDirectoryStore(SQLBaseStore):
WHERE user_id = ? AND share_private
) AS s USING (user_id)
"""
+ join_args = (user_id,)
where_clause = "(s.user_id IS NOT NULL OR p.user_id IS NOT NULL)"
if isinstance(self.database_engine, PostgresEngine):
@@ -692,7 +697,7 @@ class UserDirectoryStore(SQLBaseStore):
avatar_url IS NULL
LIMIT ?
""" % (join_clause, where_clause)
- args = (user_id, full_query, exact_query, prefix_query, limit + 1,)
+ args = join_args + (full_query, exact_query, prefix_query, limit + 1,)
elif isinstance(self.database_engine, Sqlite3Engine):
search_query = _parse_query_sqlite(search_term)
@@ -710,7 +715,7 @@ class UserDirectoryStore(SQLBaseStore):
avatar_url IS NULL
LIMIT ?
""" % (join_clause, where_clause)
- args = (user_id, search_query, limit + 1)
+ args = join_args + (search_query, limit + 1)
else:
# This should be unreachable.
raise Exception("Unrecognized database engine")
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 48c9f6802d..94fa7cac98 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -52,13 +52,17 @@ except Exception:
class LoggingContext(object):
"""Additional context for log formatting. Contexts are scoped within a
"with" block.
+
Args:
name (str): Name for the context for debugging.
"""
__slots__ = [
- "previous_context", "name", "usage_start", "usage_end", "main_thread",
- "__dict__", "tag", "alive",
+ "previous_context", "name", "ru_stime", "ru_utime",
+ "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+ "usage_start", "usage_end",
+ "main_thread", "alive",
+ "request", "tag",
]
thread_local = threading.local()
@@ -83,6 +87,9 @@ class LoggingContext(object):
def add_database_transaction(self, duration_ms):
pass
+ def add_database_scheduled(self, sched_ms):
+ pass
+
def __nonzero__(self):
return False
@@ -94,9 +101,17 @@ class LoggingContext(object):
self.ru_stime = 0.
self.ru_utime = 0.
self.db_txn_count = 0
- self.db_txn_duration = 0.
+
+ # ms spent waiting for db txns, excluding scheduling time
+ self.db_txn_duration_ms = 0
+
+ # ms spent waiting for db txns to be scheduled
+ self.db_sched_duration_ms = 0
+
self.usage_start = None
+ self.usage_end = None
self.main_thread = threading.current_thread()
+ self.request = None
self.tag = ""
self.alive = True
@@ -105,7 +120,11 @@ class LoggingContext(object):
@classmethod
def current_context(cls):
- """Get the current logging context from thread local storage"""
+ """Get the current logging context from thread local storage
+
+ Returns:
+ LoggingContext: the current logging context
+ """
return getattr(cls.thread_local, "current_context", cls.sentinel)
@classmethod
@@ -155,11 +174,13 @@ class LoggingContext(object):
self.alive = False
def copy_to(self, record):
- """Copy fields from this context to the record"""
- for key, value in self.__dict__.items():
- setattr(record, key, value)
+ """Copy logging fields from this context to a log record or
+ another LoggingContext
+ """
- record.ru_utime, record.ru_stime = self.get_resource_usage()
+ # 'request' is the only field we currently use in the logger, so that's
+ # all we need to copy
+ record.request = self.request
def start(self):
if threading.current_thread() is not self.main_thread:
@@ -194,7 +215,16 @@ class LoggingContext(object):
def add_database_transaction(self, duration_ms):
self.db_txn_count += 1
- self.db_txn_duration += duration_ms / 1000.
+ self.db_txn_duration_ms += duration_ms
+
+ def add_database_scheduled(self, sched_ms):
+ """Record a use of the database pool
+
+ Args:
+ sched_ms (int): number of milliseconds it took us to get a
+ connection
+ """
+ self.db_sched_duration_ms += sched_ms
class LoggingContextFilter(logging.Filter):
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 8d22ff3068..e4b5687a4b 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -28,7 +28,7 @@ logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
# total number of times we have hit this block
-response_count = metrics.register_counter(
+block_counter = metrics.register_counter(
"block_count",
labels=["block_name"],
alternative_names=(
@@ -72,13 +72,19 @@ block_db_txn_count = metrics.register_counter(
),
)
+# seconds spent waiting for db txns, excluding scheduling time, in this block
block_db_txn_duration = metrics.register_counter(
"block_db_txn_duration_seconds", labels=["block_name"],
alternative_names=(
- metrics.name_prefix + "_block_db_txn_count:total",
+ metrics.name_prefix + "_block_db_txn_duration:total",
),
)
+# seconds spent waiting for a db connection, in this block
+block_db_sched_duration = metrics.register_counter(
+ "block_db_sched_duration_seconds", labels=["block_name"],
+)
+
def measure_func(name):
def wrapper(func):
@@ -95,7 +101,9 @@ def measure_func(name):
class Measure(object):
__slots__ = [
"clock", "name", "start_context", "start", "new_context", "ru_utime",
- "ru_stime", "db_txn_count", "db_txn_duration", "created_context"
+ "ru_stime",
+ "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
+ "created_context",
]
def __init__(self, clock, name):
@@ -115,13 +123,16 @@ class Measure(object):
self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
self.db_txn_count = self.start_context.db_txn_count
- self.db_txn_duration = self.start_context.db_txn_duration
+ self.db_txn_duration_ms = self.start_context.db_txn_duration_ms
+ self.db_sched_duration_ms = self.start_context.db_sched_duration_ms
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return
duration = self.clock.time_msec() - self.start
+
+ block_counter.inc(self.name)
block_timer.inc_by(duration, self.name)
context = LoggingContext.current_context()
@@ -145,7 +156,12 @@ class Measure(object):
context.db_txn_count - self.db_txn_count, self.name
)
block_db_txn_duration.inc_by(
- context.db_txn_duration - self.db_txn_duration, self.name
+ (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.,
+ self.name
+ )
+ block_db_sched_duration.inc_by(
+ (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.,
+ self.name
)
if self.created_context:
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 1adedbb361..47b0bb5eb3 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -26,6 +26,18 @@ logger = logging.getLogger(__name__)
class NotRetryingDestination(Exception):
def __init__(self, retry_last_ts, retry_interval, destination):
+ """Raised by the limiter (and federation client) to indicate that we are
+ deliberately not attempting to contact a given server.
+
+ Args:
+ retry_last_ts (int): the unix ts in milliseconds of our last attempt
+ to contact the server. 0 indicates that the last attempt was
+ successful or that we've never actually attempted to connect.
+ retry_interval (int): the time in milliseconds to wait until the next
+ attempt.
+ destination (str): the domain in question
+ """
+
msg = "Not retrying server %s." % (destination,)
super(NotRetryingDestination, self).__init__(msg)
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
new file mode 100644
index 0000000000..75efa0117b
--- /dev/null
+++ b/synapse/util/threepids.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def check_3pid_allowed(hs, medium, address):
+ """Checks whether a given format of 3PID is allowed to be used on this HS
+
+ Args:
+ hs (synapse.server.HomeServer): server
+ medium (str): 3pid medium - e.g. email, msisdn
+ address (str): address within that medium (e.g. "wotan@matrix.org")
+ msisdns need to first have been canonicalised
+ Returns:
+ bool: whether the 3PID medium/address is allowed to be added to this HS
+ """
+
+ if hs.config.allowed_local_3pids:
+ for constraint in hs.config.allowed_local_3pids:
+ logger.debug(
+ "Checking 3PID %s (%s) against %s (%s)",
+ address, medium, constraint['pattern'], constraint['medium'],
+ )
+ if (
+ medium == constraint['medium'] and
+ re.match(constraint['pattern'], address)
+ ):
+ return True
+ else:
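+ # no whitelist configured, so any 3pid is allowed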
+ return True
+
+ return False
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 570312da84..d4ec02ffc2 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -68,7 +68,7 @@ class KeyringTestCase(unittest.TestCase):
def check_context(self, _, expected):
self.assertEquals(
- getattr(LoggingContext.current_context(), "test_key", None),
+ getattr(LoggingContext.current_context(), "request", None),
expected
)
@@ -82,7 +82,7 @@ class KeyringTestCase(unittest.TestCase):
lookup_2_deferred = defer.Deferred()
with LoggingContext("one") as context_one:
- context_one.test_key = "one"
+ context_one.request = "one"
wait_1_deferred = kr.wait_for_previous_lookups(
["server1"],
@@ -96,7 +96,7 @@ class KeyringTestCase(unittest.TestCase):
wait_1_deferred.addBoth(self.check_context, "one")
with LoggingContext("two") as context_two:
- context_two.test_key = "two"
+ context_two.request = "two"
# set off another wait. It should block because the first lookup
# hasn't yet completed.
@@ -137,7 +137,7 @@ class KeyringTestCase(unittest.TestCase):
@defer.inlineCallbacks
def get_perspectives(**kwargs):
self.assertEquals(
- LoggingContext.current_context().test_key, "11",
+ LoggingContext.current_context().request, "11",
)
with logcontext.PreserveLoggingContext():
yield persp_deferred
@@ -145,7 +145,7 @@ class KeyringTestCase(unittest.TestCase):
self.http_client.post_json.side_effect = get_perspectives
with LoggingContext("11") as context_11:
- context_11.test_key = "11"
+ context_11.request = "11"
# start off a first set of lookups
res_deferreds = kr.verify_json_objects_for_server(
@@ -167,13 +167,13 @@ class KeyringTestCase(unittest.TestCase):
# wait a tick for it to send the request to the perspectives server
# (it first tries the datastore)
- yield async.sleep(0.005)
+ yield async.sleep(1) # XXX find out why this takes so long!
self.http_client.post_json.assert_called_once()
self.assertIs(LoggingContext.current_context(), context_11)
context_12 = LoggingContext("12")
- context_12.test_key = "12"
+ context_12.request = "12"
with logcontext.PreserveLoggingContext(context_12):
# a second request for a server with outstanding requests
# should block rather than start a second call
@@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
res_deferreds_2 = kr.verify_json_objects_for_server(
[("server10", json1)],
)
- yield async.sleep(0.005)
+ yield async.sleep(1)
self.http_client.post_json.assert_not_called()
res_deferreds_2[0].addBoth(self.check_context, None)
@@ -211,7 +211,7 @@ class KeyringTestCase(unittest.TestCase):
sentinel_context = LoggingContext.current_context()
with LoggingContext("one") as context_one:
- context_one.test_key = "one"
+ context_one.request = "one"
defer = kr.verify_json_for_server("server9", {})
try:
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 19f5ed6bce..d92bf240b1 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -143,7 +143,6 @@ class E2eKeysHandlerTestCase(unittest.TestCase):
except errors.SynapseError:
pass
- @unittest.DEBUG
@defer.inlineCallbacks
def test_claim_one_time_key(self):
local_user = "@boris:" + self.hs.hostname
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 81063f19a1..74f104e3b8 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,6 +15,8 @@
from twisted.internet import defer, reactor
from tests import unittest
+import tempfile
+
from mock import Mock, NonCallableMock
from tests.utils import setup_test_homeserver
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -41,7 +43,9 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
self.event_id = 0
server_factory = ReplicationStreamProtocolFactory(self.hs)
- listener = reactor.listenUNIX("\0xxx", server_factory)
+ # XXX: mktemp is unsafe and should never be used. but we're just a test.
+ path = tempfile.mktemp(prefix="base_slaved_store_test_case_socket")
+ listener = reactor.listenUNIX(path, server_factory)
self.addCleanup(listener.stopListening)
self.streamer = server_factory.streamer
@@ -49,7 +53,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)
- client_connector = reactor.connectUNIX("\0xxx", client_factory)
+ client_connector = reactor.connectUNIX(path, client_factory)
self.addCleanup(client_factory.stopTrying)
self.addCleanup(client_connector.disconnect)
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index 096f771bea..8aba456510 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -49,6 +49,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.hs.get_auth_handler = Mock(return_value=self.auth_handler)
self.hs.get_device_handler = Mock(return_value=self.device_handler)
self.hs.config.enable_registration = True
+ self.hs.config.registrations_require_3pid = []
self.hs.config.auto_join_rooms = []
# init the thing we're testing
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
new file mode 100644
index 0000000000..0891308f25
--- /dev/null
+++ b/tests/storage/test_user_directory.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.storage import UserDirectoryStore
+from synapse.storage.roommember import ProfileInfo
+from tests import unittest
+from tests.utils import setup_test_homeserver
+
+ALICE = "@alice:a"
+BOB = "@bob:b"
+BOBBY = "@bobby:a"
+
+
+class UserDirectoryStoreTestCase(unittest.TestCase):
+ @defer.inlineCallbacks
+ def setUp(self):
+ self.hs = yield setup_test_homeserver()
+ self.store = UserDirectoryStore(None, self.hs)
+
+ # alice and bob are both in !room_id. bobby is not but shares
+ # a homeserver with alice.
+ yield self.store.add_profiles_to_user_dir(
+ "!room:id",
+ {
+ ALICE: ProfileInfo(None, "alice"),
+ BOB: ProfileInfo(None, "bob"),
+ BOBBY: ProfileInfo(None, "bobby")
+ },
+ )
+ yield self.store.add_users_to_public_room(
+ "!room:id",
+ [ALICE, BOB],
+ )
+ yield self.store.add_users_who_share_room(
+ "!room:id",
+ False,
+ (
+ (ALICE, BOB),
+ (BOB, ALICE),
+ ),
+ )
+
+ @defer.inlineCallbacks
+ def test_search_user_dir(self):
+ # normally when alice searches the directory she should just find
+ # bob because bobby doesn't share a room with her.
+ r = yield self.store.search_user_dir(ALICE, "bob", 10)
+ self.assertFalse(r["limited"])
+ self.assertEqual(1, len(r["results"]))
+ self.assertDictEqual(r["results"][0], {
+ "user_id": BOB,
+ "display_name": "bob",
+ "avatar_url": None,
+ })
+
+ @defer.inlineCallbacks
+ def test_search_user_dir_all_users(self):
+ self.hs.config.user_directory_search_all_users = True
+ try:
+ r = yield self.store.search_user_dir(ALICE, "bob", 10)
+ self.assertFalse(r["limited"])
+ self.assertEqual(2, len(r["results"]))
+ self.assertDictEqual(r["results"][0], {
+ "user_id": BOB,
+ "display_name": "bob",
+ "avatar_url": None,
+ })
+ self.assertDictEqual(r["results"][1], {
+ "user_id": BOBBY,
+ "display_name": "bobby",
+ "avatar_url": None,
+ })
+ finally:
+ self.hs.config.user_directory_search_all_users = False
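
From the assertions above, `search_user_dir` returns a dict with `limited` and `results` keys, each result carrying `user_id`, `display_name` and `avatar_url`. A short sketch of consuming that shape, assuming a store set up as in `setUp` (the `print_matches` helper is illustrative):

```
from twisted.internet import defer


@defer.inlineCallbacks
def print_matches(store, searcher, term, limit=10):
    # shape: {"limited": bool, "results": [{"user_id", "display_name", "avatar_url"}, ...]}
    r = yield store.search_user_dir(searcher, term, limit)
    for entry in r["results"]:
        print("%s (%s)" % (entry["user_id"], entry["display_name"]))
    if r["limited"]:
        print("more than %d matches; raise the limit to see the rest" % limit)
```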
diff --git a/tests/test_state.py b/tests/test_state.py
index feb84f3d48..d16e1b3b8b 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -19,7 +19,7 @@ from twisted.internet import defer
from synapse.events import FrozenEvent
from synapse.api.auth import Auth
from synapse.api.constants import EventTypes, Membership
-from synapse.state import StateHandler
+from synapse.state import StateHandler, StateResolutionHandler
from .utils import MockClock
@@ -148,11 +148,13 @@ class StateTestCase(unittest.TestCase):
)
hs = Mock(spec_set=[
"get_datastore", "get_auth", "get_state_handler", "get_clock",
+ "get_state_resolution_handler",
])
hs.get_datastore.return_value = self.store
hs.get_state_handler.return_value = None
hs.get_clock.return_value = MockClock()
hs.get_auth.return_value = Auth(hs)
+ hs.get_state_resolution_handler = lambda: StateResolutionHandler(hs)
self.store.get_next_state_group.side_effect = Mock
self.store.get_state_group_delta.return_value = (None, None)
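
`StateHandler` now delegates conflict resolution to a separate `StateResolutionHandler`, so the mocked homeserver has to expose `get_state_resolution_handler` as well. A condensed sketch of that wiring, using only names visible in the hunk; `make_state_handler` is an illustrative helper, and constructing the handler as `StateHandler(hs)` is an assumption rather than something shown above:

```
from mock import Mock

from synapse.state import StateHandler, StateResolutionHandler


def make_state_handler(store, clock, auth):
    hs = Mock(spec_set=[
        "get_datastore", "get_auth", "get_state_handler", "get_clock",
        "get_state_resolution_handler",
    ])
    hs.get_datastore.return_value = store
    hs.get_state_handler.return_value = None
    hs.get_clock.return_value = clock
    hs.get_auth.return_value = auth
    # built lazily so it picks up the mocked datastore and clock above
    hs.get_state_resolution_handler = lambda: StateResolutionHandler(hs)
    return StateHandler(hs)
```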
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index e2f7765f49..4850722bc5 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -12,12 +12,12 @@ class LoggingContextTestCase(unittest.TestCase):
def _check_test_key(self, value):
self.assertEquals(
- LoggingContext.current_context().test_key, value
+ LoggingContext.current_context().request, value
)
def test_with_context(self):
with LoggingContext() as context_one:
- context_one.test_key = "test"
+ context_one.request = "test"
self._check_test_key("test")
@defer.inlineCallbacks
@@ -25,14 +25,14 @@ class LoggingContextTestCase(unittest.TestCase):
@defer.inlineCallbacks
def competing_callback():
with LoggingContext() as competing_context:
- competing_context.test_key = "competing"
+ competing_context.request = "competing"
yield sleep(0)
self._check_test_key("competing")
reactor.callLater(0, competing_callback)
with LoggingContext() as context_one:
- context_one.test_key = "one"
+ context_one.request = "one"
yield sleep(0)
self._check_test_key("one")
@@ -43,14 +43,14 @@ class LoggingContextTestCase(unittest.TestCase):
@defer.inlineCallbacks
def cb():
- context_one.test_key = "one"
+ context_one.request = "one"
yield function()
self._check_test_key("one")
callback_completed[0] = True
with LoggingContext() as context_one:
- context_one.test_key = "one"
+ context_one.request = "one"
# fire off function, but don't wait on it.
logcontext.preserve_fn(cb)()
@@ -107,7 +107,7 @@ class LoggingContextTestCase(unittest.TestCase):
sentinel_context = LoggingContext.current_context()
with LoggingContext() as context_one:
- context_one.test_key = "one"
+ context_one.request = "one"
d1 = logcontext.make_deferred_yieldable(blocking_function())
# make sure that the context was reset by make_deferred_yieldable
@@ -124,7 +124,7 @@ class LoggingContextTestCase(unittest.TestCase):
argument isn't actually a deferred"""
with LoggingContext() as context_one:
- context_one.test_key = "one"
+ context_one.request = "one"
d1 = logcontext.make_deferred_yieldable("bum")
self._check_test_key("one")
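
These hunks rename the per-context attribute the tests poke from the ad-hoc `test_key` to `request`, the attribute that `LoggingContext` actually carries for log lines. A minimal sketch of the pattern being asserted, using only names that appear in the hunks:

```
from synapse.util.logcontext import LoggingContext

with LoggingContext() as context_one:
    context_one.request = "one"
    # while the block is active, the current context carries our request id
    assert LoggingContext.current_context().request == "one"
```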
diff --git a/tests/utils.py b/tests/utils.py
index 44e5f75093..8efd3a3475 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -13,27 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.http.server import HttpServer
-from synapse.api.errors import cs_error, CodeMessageException, StoreError
-from synapse.api.constants import EventTypes
-from synapse.storage.prepare_database import prepare_database
-from synapse.storage.engines import create_engine
-from synapse.server import HomeServer
-from synapse.federation.transport import server
-from synapse.util.ratelimitutils import FederationRateLimiter
-
-from synapse.util.logcontext import LoggingContext
-
-from twisted.internet import defer, reactor
-from twisted.enterprise.adbapi import ConnectionPool
-
-from collections import namedtuple
-from mock import patch, Mock
import hashlib
+from inspect import getcallargs
import urllib
import urlparse
-from inspect import getcallargs
+from mock import Mock, patch
+from twisted.internet import defer, reactor
+
+from synapse.api.errors import CodeMessageException, cs_error
+from synapse.federation.transport import server
+from synapse.http.server import HttpServer
+from synapse.server import HomeServer
+from synapse.storage import PostgresEngine
+from synapse.storage.engines import create_engine
+from synapse.storage.prepare_database import prepare_database
+from synapse.util.logcontext import LoggingContext
+from synapse.util.ratelimitutils import FederationRateLimiter
+
+# set this to True to run the tests against postgres instead of sqlite.
+# It requires you to have a local postgres database called synapse_test, within
+# which ALL TABLES WILL BE DROPPED
+USE_POSTGRES_FOR_TESTS = False
@defer.inlineCallbacks
@@ -57,36 +58,70 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
config.worker_app = None
config.email_enable_notifs = False
config.block_non_admin_invites = False
+ config.federation_domain_whitelist = None
+ config.user_directory_search_all_users = False
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
config.update_user_directory = False
config.use_frozen_dicts = True
- config.database_config = {"name": "sqlite3"}
config.ldap_enabled = False
if "clock" not in kargs:
kargs["clock"] = MockClock()
+ if USE_POSTGRES_FOR_TESTS:
+ config.database_config = {
+ "name": "psycopg2",
+ "args": {
+ "database": "synapse_test",
+ "cp_min": 1,
+ "cp_max": 5,
+ },
+ }
+ else:
+ config.database_config = {
+ "name": "sqlite3",
+ "args": {
+ "database": ":memory:",
+ "cp_min": 1,
+ "cp_max": 1,
+ },
+ }
+
+ db_engine = create_engine(config.database_config)
+
+ # we need to configure the connection pool to run the on_new_connection
+ # function, so that we can test code that uses custom sqlite functions
+ # (like rank).
+ config.database_config["args"]["cp_openfun"] = db_engine.on_new_connection
+
if datastore is None:
- db_pool = SQLiteMemoryDbPool()
- yield db_pool.prepare()
hs = HomeServer(
- name, db_pool=db_pool, config=config,
+ name, config=config,
+ db_config=config.database_config,
version_string="Synapse/tests",
- database_engine=create_engine(config.database_config),
- get_db_conn=db_pool.get_db_conn,
+ database_engine=db_engine,
room_list_handler=object(),
tls_server_context_factory=Mock(),
**kargs
)
+ db_conn = hs.get_db_conn()
+ # make sure that the database is empty
+ if isinstance(db_engine, PostgresEngine):
+ cur = db_conn.cursor()
+ cur.execute("SELECT tablename FROM pg_tables where schemaname='public'")
+ rows = cur.fetchall()
+ for r in rows:
+ cur.execute("DROP TABLE %s CASCADE" % r[0])
+ yield prepare_database(db_conn, db_engine, config)
hs.setup()
else:
hs = HomeServer(
name, db_pool=None, datastore=datastore, config=config,
version_string="Synapse/tests",
- database_engine=create_engine(config.database_config),
+ database_engine=db_engine,
room_list_handler=object(),
tls_server_context_factory=Mock(),
**kargs
@@ -305,168 +340,6 @@ class MockClock(object):
return d
-class SQLiteMemoryDbPool(ConnectionPool, object):
- def __init__(self):
- super(SQLiteMemoryDbPool, self).__init__(
- "sqlite3", ":memory:",
- cp_min=1,
- cp_max=1,
- )
-
- self.config = Mock()
- self.config.password_providers = []
- self.config.database_config = {"name": "sqlite3"}
-
- def prepare(self):
- engine = self.create_engine()
- return self.runWithConnection(
- lambda conn: prepare_database(conn, engine, self.config)
- )
-
- def get_db_conn(self):
- conn = self.connect()
- engine = self.create_engine()
- prepare_database(conn, engine, self.config)
- return conn
-
- def create_engine(self):
- return create_engine(self.config.database_config)
-
-
-class MemoryDataStore(object):
-
- Room = namedtuple(
- "Room",
- ["room_id", "is_public", "creator"]
- )
-
- def __init__(self):
- self.tokens_to_users = {}
- self.paths_to_content = {}
-
- self.members = {}
- self.rooms = {}
-
- self.current_state = {}
- self.events = []
-
- class Snapshot(namedtuple("Snapshot", "room_id user_id membership_state")):
- def fill_out_prev_events(self, event):
- pass
-
- def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
- return self.Snapshot(
- room_id, user_id, self.get_room_member(user_id, room_id)
- )
-
- def register(self, user_id, token, password_hash):
- if user_id in self.tokens_to_users.values():
- raise StoreError(400, "User in use.")
- self.tokens_to_users[token] = user_id
-
- def get_user_by_access_token(self, token):
- try:
- return {
- "name": self.tokens_to_users[token],
- }
- except Exception:
- raise StoreError(400, "User does not exist.")
-
- def get_room(self, room_id):
- try:
- return self.rooms[room_id]
- except Exception:
- return None
-
- def store_room(self, room_id, room_creator_user_id, is_public):
- if room_id in self.rooms:
- raise StoreError(409, "Conflicting room!")
-
- room = MemoryDataStore.Room(
- room_id=room_id,
- is_public=is_public,
- creator=room_creator_user_id
- )
- self.rooms[room_id] = room
-
- def get_room_member(self, user_id, room_id):
- return self.members.get(room_id, {}).get(user_id)
-
- def get_room_members(self, room_id, membership=None):
- if membership:
- return [
- v for k, v in self.members.get(room_id, {}).items()
- if v.membership == membership
- ]
- else:
- return self.members.get(room_id, {}).values()
-
- def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
- return [
- m[user_id] for m in self.members.values()
- if user_id in m and m[user_id].membership in membership_list
- ]
-
- def get_room_events_stream(self, user_id=None, from_key=None, to_key=None,
- limit=0, with_feedback=False):
- return ([], from_key) # TODO
-
- def get_joined_hosts_for_room(self, room_id):
- return defer.succeed([])
-
- def persist_event(self, event):
- if event.type == EventTypes.Member:
- room_id = event.room_id
- user = event.state_key
- self.members.setdefault(room_id, {})[user] = event
-
- if hasattr(event, "state_key"):
- key = (event.room_id, event.type, event.state_key)
- self.current_state[key] = event
-
- self.events.append(event)
-
- def get_current_state(self, room_id, event_type=None, state_key=""):
- if event_type:
- key = (room_id, event_type, state_key)
- if self.current_state.get(key):
- return [self.current_state.get(key)]
- return None
- else:
- return [
- e for e in self.current_state
- if e[0] == room_id
- ]
-
- def set_presence_state(self, user_localpart, state):
- return defer.succeed({"state": 0})
-
- def get_presence_list(self, user_localpart, accepted):
- return []
-
- def get_room_events_max_id(self):
- return "s0" # TODO (erikj)
-
- def get_send_event_level(self, room_id):
- return defer.succeed(0)
-
- def get_power_level(self, room_id, user_id):
- return defer.succeed(0)
-
- def get_add_state_level(self, room_id):
- return defer.succeed(0)
-
- def get_room_join_rule(self, room_id):
- # TODO (erikj): This should be configurable
- return defer.succeed("invite")
-
- def get_ops_levels(self, room_id):
- return defer.succeed((5, 5, 5))
-
- def insert_client_ip(self, user, access_token, ip, user_agent):
- return defer.succeed(None)
-
-
def _format_call(args, kwargs):
return ", ".join(
["%r" % (a) for a in args] +
|
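
The remainder of the `tests/utils.py` change drops the old in-memory helpers (`SQLiteMemoryDbPool`, `MemoryDataStore`) in favour of a real `database_config` that can target either sqlite or postgres, with the engine's `on_new_connection` hook wired in via `cp_openfun` so custom sqlite functions such as `rank` work under test. A condensed sketch of that selection logic, assuming only names used in the hunks (the `make_test_db_config` wrapper is illustrative):

```
from synapse.storage.engines import create_engine

USE_POSTGRES_FOR_TESTS = False


def make_test_db_config():
    if USE_POSTGRES_FOR_TESTS:
        # requires a local "synapse_test" database; all its tables get dropped
        database_config = {
            "name": "psycopg2",
            "args": {"database": "synapse_test", "cp_min": 1, "cp_max": 5},
        }
    else:
        database_config = {
            "name": "sqlite3",
            "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
        }

    engine = create_engine(database_config)
    # run on_new_connection for every pooled connection so sqlite-specific
    # functions (like rank) are registered before the tests use them
    database_config["args"]["cp_openfun"] = engine.on_new_connection
    return database_config, engine
```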