diff options
Diffstat (limited to 'synapse')
68 files changed, 1136 insertions, 347 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 2d52d26af5..56df3f5ac6 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -17,6 +17,7 @@ """ This is a reference implementation of a Matrix home server. """ +import os import sys # Check that we're not running on an unsupported Python version. @@ -36,3 +37,10 @@ except ImportError: pass __version__ = "1.4.0" + +if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): + # We import here so that we don't have to install a bunch of deps when + # running the packaging tox test. + from synapse.util.patch_inline_callbacks import do_patch + + do_patch() diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 774326dff9..eb54f56853 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -605,13 +605,13 @@ def run(hs): @defer.inlineCallbacks def generate_monthly_active_users(): current_mau_count = 0 - reserved_count = 0 + reserved_users = () store = hs.get_datastore() if hs.config.limit_usage_by_mau or hs.config.mau_stats_only: current_mau_count = yield store.get_monthly_active_count() - reserved_count = yield store.get_registered_reserved_users_count() + reserved_users = yield store.get_registered_reserved_users() current_mau_gauge.set(float(current_mau_count)) - registered_reserved_users_mau_gauge.set(float(reserved_count)) + registered_reserved_users_mau_gauge.set(float(len(reserved_users))) max_mau_gauge.set(float(hs.config.max_mau_value)) def start_generate_monthly_active_users(): diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 31f6530978..08619404bb 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -18,7 +18,9 @@ import argparse import errno import os +from collections import OrderedDict from textwrap import dedent +from typing import Any, MutableMapping, Optional from six import integer_types @@ -51,7 +53,56 @@ Missing mandatory `server_name` config option. """ +def path_exists(file_path): + """Check if a file exists + + Unlike os.path.exists, this throws an exception if there is an error + checking if the file exists (for example, if there is a perms error on + the parent dir). + + Returns: + bool: True if the file exists; False if not. + """ + try: + os.stat(file_path) + return True + except OSError as e: + if e.errno != errno.ENOENT: + raise e + return False + + class Config(object): + """ + A configuration section, containing configuration keys and values. + + Attributes: + section (str): The section title of this config object, such as + "tls" or "logger". This is used to refer to it on the root + logger (for example, `config.tls.some_option`). Must be + defined in subclasses. + """ + + section = None + + def __init__(self, root_config=None): + self.root = root_config + + def __getattr__(self, item: str) -> Any: + """ + Try and fetch a configuration option that does not exist on this class. + + This is so that existing configs that rely on `self.value`, where value + is actually from a different config section, continue to work. + """ + if item in ["generate_config_section", "read_config"]: + raise AttributeError(item) + + if self.root is None: + raise AttributeError(item) + else: + return self.root._get_unclassed_config(self.section, item) + @staticmethod def parse_size(value): if isinstance(value, integer_types): @@ -88,22 +139,7 @@ class Config(object): @classmethod def path_exists(cls, file_path): - """Check if a file exists - - Unlike os.path.exists, this throws an exception if there is an error - checking if the file exists (for example, if there is a perms error on - the parent dir). - - Returns: - bool: True if the file exists; False if not. - """ - try: - os.stat(file_path) - return True - except OSError as e: - if e.errno != errno.ENOENT: - raise e - return False + return path_exists(file_path) @classmethod def check_file(cls, file_path, config_name): @@ -136,42 +172,106 @@ class Config(object): with open(file_path) as file_stream: return file_stream.read() - def invoke_all(self, name, *args, **kargs): - """Invoke all instance methods with the given name and arguments in the - class's MRO. + +class RootConfig(object): + """ + Holder of an application's configuration. + + What configuration this object holds is defined by `config_classes`, a list + of Config classes that will be instantiated and given the contents of a + configuration file to read. They can then be accessed on this class by their + section name, defined in the Config or dynamically set to be the name of the + class, lower-cased and with "Config" removed. + """ + + config_classes = [] + + def __init__(self): + self._configs = OrderedDict() + + for config_class in self.config_classes: + if config_class.section is None: + raise ValueError("%r requires a section name" % (config_class,)) + + try: + conf = config_class(self) + except Exception as e: + raise Exception("Failed making %s: %r" % (config_class.section, e)) + self._configs[config_class.section] = conf + + def __getattr__(self, item: str) -> Any: + """ + Redirect lookups on this object either to config objects, or values on + config objects, so that `config.tls.blah` works, as well as legacy uses + of things like `config.server_name`. It will first look up the config + section name, and then values on those config classes. + """ + if item in self._configs.keys(): + return self._configs[item] + + return self._get_unclassed_config(None, item) + + def _get_unclassed_config(self, asking_section: Optional[str], item: str): + """ + Fetch a config value from one of the instantiated config classes that + has not been fetched directly. + + Args: + asking_section: If this check is coming from a Config child, which + one? This section will not be asked if it has the value. + item: The configuration value key. + + Raises: + AttributeError if no config classes have the config key. The body + will contain what sections were checked. + """ + for key, val in self._configs.items(): + if key == asking_section: + continue + + if item in dir(val): + return getattr(val, item) + + raise AttributeError(item, "not found in %s" % (list(self._configs.keys()),)) + + def invoke_all(self, func_name: str, *args, **kwargs) -> MutableMapping[str, Any]: + """ + Invoke a function on all instantiated config objects this RootConfig is + configured to use. Args: - name (str): Name of function to invoke + func_name: Name of function to invoke *args **kwargs - Returns: - list: The list of the return values from each method called + ordered dictionary of config section name and the result of the + function from it. """ - results = [] - for cls in type(self).mro(): - if name in cls.__dict__: - results.append(getattr(cls, name)(self, *args, **kargs)) - return results + res = OrderedDict() + + for name, config in self._configs.items(): + if hasattr(config, func_name): + res[name] = getattr(config, func_name)(*args, **kwargs) + + return res @classmethod - def invoke_all_static(cls, name, *args, **kargs): - """Invoke all static methods with the given name and arguments in the - class's MRO. + def invoke_all_static(cls, func_name: str, *args, **kwargs): + """ + Invoke a static function on config objects this RootConfig is + configured to use. Args: - name (str): Name of function to invoke + func_name: Name of function to invoke *args **kwargs - Returns: - list: The list of the return values from each method called + ordered dictionary of config section name and the result of the + function from it. """ - results = [] - for c in cls.mro(): - if name in c.__dict__: - results.append(getattr(c, name)(*args, **kargs)) - return results + for config in cls.config_classes: + if hasattr(config, func_name): + getattr(config, func_name)(*args, **kwargs) def generate_config( self, @@ -187,7 +287,8 @@ class Config(object): tls_private_key_path=None, acme_domain=None, ): - """Build a default configuration file + """ + Build a default configuration file This is used when the user explicitly asks us to generate a config file (eg with --generate_config). @@ -242,6 +343,7 @@ class Config(object): Returns: str: the yaml config file """ + return "\n\n".join( dedent(conf) for conf in self.invoke_all( @@ -257,7 +359,7 @@ class Config(object): tls_certificate_path=tls_certificate_path, tls_private_key_path=tls_private_key_path, acme_domain=acme_domain, - ) + ).values() ) @classmethod @@ -444,7 +546,7 @@ class Config(object): ) (config_path,) = config_files - if not cls.path_exists(config_path): + if not path_exists(config_path): print("Generating config file %s" % (config_path,)) if config_args.data_directory: @@ -469,7 +571,7 @@ class Config(object): open_private_ports=config_args.open_private_ports, ) - if not cls.path_exists(config_dir_path): + if not path_exists(config_dir_path): os.makedirs(config_dir_path) with open(config_path, "w") as config_file: config_file.write("# vim:ft=yaml\n\n") @@ -518,7 +620,7 @@ class Config(object): return obj - def parse_config_dict(self, config_dict, config_dir_path, data_dir_path): + def parse_config_dict(self, config_dict, config_dir_path=None, data_dir_path=None): """Read the information from the config dict into this Config object. Args: @@ -607,3 +709,6 @@ def find_config_files(search_paths): else: config_files.append(config_path) return config_files + + +__all__ = ["Config", "RootConfig"] diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi new file mode 100644 index 0000000000..86bc965ee4 --- /dev/null +++ b/synapse/config/_base.pyi @@ -0,0 +1,135 @@ +from typing import Any, List, Optional + +from synapse.config import ( + api, + appservice, + captcha, + cas, + consent_config, + database, + emailconfig, + groups, + jwt_config, + key, + logger, + metrics, + password, + password_auth_providers, + push, + ratelimiting, + registration, + repository, + room_directory, + saml2_config, + server, + server_notices_config, + spam_checker, + stats, + third_party_event_rules, + tls, + tracer, + user_directory, + voip, + workers, +) + +class ConfigError(Exception): ... + +MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS: str +MISSING_REPORT_STATS_SPIEL: str +MISSING_SERVER_NAME: str + +def path_exists(file_path: str): ... + +class RootConfig: + server: server.ServerConfig + tls: tls.TlsConfig + database: database.DatabaseConfig + logging: logger.LoggingConfig + ratelimit: ratelimiting.RatelimitConfig + media: repository.ContentRepositoryConfig + captcha: captcha.CaptchaConfig + voip: voip.VoipConfig + registration: registration.RegistrationConfig + metrics: metrics.MetricsConfig + api: api.ApiConfig + appservice: appservice.AppServiceConfig + key: key.KeyConfig + saml2: saml2_config.SAML2Config + cas: cas.CasConfig + jwt: jwt_config.JWTConfig + password: password.PasswordConfig + email: emailconfig.EmailConfig + worker: workers.WorkerConfig + authproviders: password_auth_providers.PasswordAuthProviderConfig + push: push.PushConfig + spamchecker: spam_checker.SpamCheckerConfig + groups: groups.GroupsConfig + userdirectory: user_directory.UserDirectoryConfig + consent: consent_config.ConsentConfig + stats: stats.StatsConfig + servernotices: server_notices_config.ServerNoticesConfig + roomdirectory: room_directory.RoomDirectoryConfig + thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig + tracer: tracer.TracerConfig + + config_classes: List = ... + def __init__(self) -> None: ... + def invoke_all(self, func_name: str, *args: Any, **kwargs: Any): ... + @classmethod + def invoke_all_static(cls, func_name: str, *args: Any, **kwargs: Any) -> None: ... + def __getattr__(self, item: str): ... + def parse_config_dict( + self, + config_dict: Any, + config_dir_path: Optional[Any] = ..., + data_dir_path: Optional[Any] = ..., + ) -> None: ... + read_config: Any = ... + def generate_config( + self, + config_dir_path: str, + data_dir_path: str, + server_name: str, + generate_secrets: bool = ..., + report_stats: Optional[str] = ..., + open_private_ports: bool = ..., + listeners: Optional[Any] = ..., + database_conf: Optional[Any] = ..., + tls_certificate_path: Optional[str] = ..., + tls_private_key_path: Optional[str] = ..., + acme_domain: Optional[str] = ..., + ): ... + @classmethod + def load_or_generate_config(cls, description: Any, argv: Any): ... + @classmethod + def load_config(cls, description: Any, argv: Any): ... + @classmethod + def add_arguments_to_parser(cls, config_parser: Any) -> None: ... + @classmethod + def load_config_with_parser(cls, parser: Any, argv: Any): ... + def generate_missing_files( + self, config_dict: dict, config_dir_path: str + ) -> None: ... + +class Config: + root: RootConfig + def __init__(self, root_config: Optional[RootConfig] = ...) -> None: ... + def __getattr__(self, item: str, from_root: bool = ...): ... + @staticmethod + def parse_size(value: Any): ... + @staticmethod + def parse_duration(value: Any): ... + @staticmethod + def abspath(file_path: Optional[str]): ... + @classmethod + def path_exists(cls, file_path: str): ... + @classmethod + def check_file(cls, file_path: str, config_name: str): ... + @classmethod + def ensure_directory(cls, dir_path: str): ... + @classmethod + def read_file(cls, file_path: str, config_name: str): ... + +def read_config_files(config_files: List[str]): ... +def find_config_files(search_paths: List[str]): ... diff --git a/synapse/config/api.py b/synapse/config/api.py index dddea79a8a..74cd53a8ed 100644 --- a/synapse/config/api.py +++ b/synapse/config/api.py @@ -18,6 +18,8 @@ from ._base import Config class ApiConfig(Config): + section = "api" + def read_config(self, config, **kwargs): self.room_invite_state_types = config.get( "room_invite_state_types", diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index 28d36b1bc3..9b4682222d 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -30,6 +30,8 @@ logger = logging.getLogger(__name__) class AppServiceConfig(Config): + section = "appservice" + def read_config(self, config, **kwargs): self.app_service_config_files = config.get("app_service_config_files", []) self.notify_appservices = config.get("notify_appservices", True) diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py index 8dac8152cf..44bd5c6799 100644 --- a/synapse/config/captcha.py +++ b/synapse/config/captcha.py @@ -16,6 +16,8 @@ from ._base import Config class CaptchaConfig(Config): + section = "captcha" + def read_config(self, config, **kwargs): self.recaptcha_private_key = config.get("recaptcha_private_key") self.recaptcha_public_key = config.get("recaptcha_public_key") diff --git a/synapse/config/cas.py b/synapse/config/cas.py index ebe34d933b..4526c1a67b 100644 --- a/synapse/config/cas.py +++ b/synapse/config/cas.py @@ -22,17 +22,21 @@ class CasConfig(Config): cas_server_url: URL of CAS server """ + section = "cas" + def read_config(self, config, **kwargs): cas_config = config.get("cas_config", None) if cas_config: self.cas_enabled = cas_config.get("enabled", True) self.cas_server_url = cas_config["server_url"] self.cas_service_url = cas_config["service_url"] + self.cas_displayname_attribute = cas_config.get("displayname_attribute") self.cas_required_attributes = cas_config.get("required_attributes", {}) else: self.cas_enabled = False self.cas_server_url = None self.cas_service_url = None + self.cas_displayname_attribute = None self.cas_required_attributes = {} def generate_config_section(self, config_dir_path, server_name, **kwargs): @@ -43,6 +47,7 @@ class CasConfig(Config): # enabled: true # server_url: "https://cas-server.com" # service_url: "https://homeserver.domain.com:8448" + # #displayname_attribute: name # #required_attributes: # # name: value """ diff --git a/synapse/config/consent_config.py b/synapse/config/consent_config.py index 48976e17b1..62c4c44d60 100644 --- a/synapse/config/consent_config.py +++ b/synapse/config/consent_config.py @@ -73,6 +73,9 @@ DEFAULT_CONFIG = """\ class ConsentConfig(Config): + + section = "consent" + def __init__(self, *args): super(ConsentConfig, self).__init__(*args) diff --git a/synapse/config/database.py b/synapse/config/database.py index 118aafbd4a..0e2509f0b1 100644 --- a/synapse/config/database.py +++ b/synapse/config/database.py @@ -21,6 +21,8 @@ from ._base import Config class DatabaseConfig(Config): + section = "database" + def read_config(self, config, **kwargs): self.event_cache_size = self.parse_size(config.get("event_cache_size", "10K")) diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index d9b43de660..658897a77e 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -28,6 +28,8 @@ from ._base import Config, ConfigError class EmailConfig(Config): + section = "email" + def read_config(self, config, **kwargs): # TODO: We should separate better the email configuration from the notification # and account validity config. diff --git a/synapse/config/groups.py b/synapse/config/groups.py index 2a522b5f44..d6862d9a64 100644 --- a/synapse/config/groups.py +++ b/synapse/config/groups.py @@ -17,6 +17,8 @@ from ._base import Config class GroupsConfig(Config): + section = "groups" + def read_config(self, config, **kwargs): self.enable_group_creation = config.get("enable_group_creation", False) self.group_creation_prefix = config.get("group_creation_prefix", "") diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 72acad4f18..6e348671c7 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from ._base import RootConfig from .api import ApiConfig from .appservice import AppServiceConfig from .captcha import CaptchaConfig @@ -46,36 +47,37 @@ from .voip import VoipConfig from .workers import WorkerConfig -class HomeServerConfig( - ServerConfig, - TlsConfig, - DatabaseConfig, - LoggingConfig, - RatelimitConfig, - ContentRepositoryConfig, - CaptchaConfig, - VoipConfig, - RegistrationConfig, - MetricsConfig, - ApiConfig, - AppServiceConfig, - KeyConfig, - SAML2Config, - CasConfig, - JWTConfig, - PasswordConfig, - EmailConfig, - WorkerConfig, - PasswordAuthProviderConfig, - PushConfig, - SpamCheckerConfig, - GroupsConfig, - UserDirectoryConfig, - ConsentConfig, - StatsConfig, - ServerNoticesConfig, - RoomDirectoryConfig, - ThirdPartyRulesConfig, - TracerConfig, -): - pass +class HomeServerConfig(RootConfig): + + config_classes = [ + ServerConfig, + TlsConfig, + DatabaseConfig, + LoggingConfig, + RatelimitConfig, + ContentRepositoryConfig, + CaptchaConfig, + VoipConfig, + RegistrationConfig, + MetricsConfig, + ApiConfig, + AppServiceConfig, + KeyConfig, + SAML2Config, + CasConfig, + JWTConfig, + PasswordConfig, + EmailConfig, + WorkerConfig, + PasswordAuthProviderConfig, + PushConfig, + SpamCheckerConfig, + GroupsConfig, + UserDirectoryConfig, + ConsentConfig, + StatsConfig, + ServerNoticesConfig, + RoomDirectoryConfig, + ThirdPartyRulesConfig, + TracerConfig, + ] diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py index 36d87cef03..a568726985 100644 --- a/synapse/config/jwt_config.py +++ b/synapse/config/jwt_config.py @@ -23,6 +23,8 @@ MISSING_JWT = """Missing jwt library. This is required for jwt login. class JWTConfig(Config): + section = "jwt" + def read_config(self, config, **kwargs): jwt_config = config.get("jwt_config", None) if jwt_config: diff --git a/synapse/config/key.py b/synapse/config/key.py index f039f96e9c..ec5d430afb 100644 --- a/synapse/config/key.py +++ b/synapse/config/key.py @@ -92,6 +92,8 @@ class TrustedKeyServer(object): class KeyConfig(Config): + section = "key" + def read_config(self, config, config_dir_path, **kwargs): # the signing key can be specified inline or in a separate file if "signing_key" in config: diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 767ecfdf09..d609ec111b 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -84,6 +84,8 @@ root: class LoggingConfig(Config): + section = "logging" + def read_config(self, config, **kwargs): self.log_config = self.abspath(config.get("log_config")) self.no_redirect_stdio = config.get("no_redirect_stdio", False) diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py index ec35a6b868..282a43bddb 100644 --- a/synapse/config/metrics.py +++ b/synapse/config/metrics.py @@ -34,6 +34,8 @@ class MetricsFlags(object): class MetricsConfig(Config): + section = "metrics" + def read_config(self, config, **kwargs): self.enable_metrics = config.get("enable_metrics", False) self.report_stats = config.get("report_stats", None) diff --git a/synapse/config/password.py b/synapse/config/password.py index d5b5953f2f..2a634ac751 100644 --- a/synapse/config/password.py +++ b/synapse/config/password.py @@ -20,6 +20,8 @@ class PasswordConfig(Config): """Password login configuration """ + section = "password" + def read_config(self, config, **kwargs): password_config = config.get("password_config", {}) if password_config is None: diff --git a/synapse/config/password_auth_providers.py b/synapse/config/password_auth_providers.py index c50e244394..9746bbc681 100644 --- a/synapse/config/password_auth_providers.py +++ b/synapse/config/password_auth_providers.py @@ -23,6 +23,8 @@ LDAP_PROVIDER = "ldap_auth_provider.LdapAuthProvider" class PasswordAuthProviderConfig(Config): + section = "authproviders" + def read_config(self, config, **kwargs): self.password_providers = [] # type: List[Any] providers = [] diff --git a/synapse/config/push.py b/synapse/config/push.py index 1b932722a5..0910958649 100644 --- a/synapse/config/push.py +++ b/synapse/config/push.py @@ -18,6 +18,8 @@ from ._base import Config class PushConfig(Config): + section = "push" + def read_config(self, config, **kwargs): push_config = config.get("push", {}) self.push_include_content = push_config.get("include_content", True) diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 587e2862b7..947f653e03 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -36,6 +36,8 @@ class FederationRateLimitConfig(object): class RatelimitConfig(Config): + section = "ratelimiting" + def read_config(self, config, **kwargs): # Load the new-style messages config if it exists. Otherwise fall back diff --git a/synapse/config/registration.py b/synapse/config/registration.py index bef89e2bf4..b3e3e6dda2 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -24,6 +24,8 @@ from synapse.util.stringutils import random_string_with_symbols class AccountValidityConfig(Config): + section = "accountvalidity" + def __init__(self, config, synapse_config): self.enabled = config.get("enabled", False) self.renew_by_email_enabled = "renew_at" in config @@ -77,6 +79,8 @@ class AccountValidityConfig(Config): class RegistrationConfig(Config): + section = "registration" + def read_config(self, config, **kwargs): self.enable_registration = bool( strtobool(str(config.get("enable_registration", False))) diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 14740891f3..d0205e14b9 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -78,6 +78,8 @@ def parse_thumbnail_requirements(thumbnail_sizes): class ContentRepositoryConfig(Config): + section = "media" + def read_config(self, config, **kwargs): # Only enable the media repo if either the media repo is enabled or the diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py index a92693017b..7c9f05bde4 100644 --- a/synapse/config/room_directory.py +++ b/synapse/config/room_directory.py @@ -19,6 +19,8 @@ from ._base import Config, ConfigError class RoomDirectoryConfig(Config): + section = "roomdirectory" + def read_config(self, config, **kwargs): self.enable_room_list_search = config.get("enable_room_list_search", True) diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py index ab34b41ca8..c407e13680 100644 --- a/synapse/config/saml2_config.py +++ b/synapse/config/saml2_config.py @@ -55,6 +55,8 @@ def _dict_merge(merge_dict, into_dict): class SAML2Config(Config): + section = "saml2" + def read_config(self, config, **kwargs): self.saml2_enabled = False diff --git a/synapse/config/server.py b/synapse/config/server.py index 709bd387e5..afc4d6a4ab 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -58,6 +58,8 @@ on how to configure the new listener. class ServerConfig(Config): + section = "server" + def read_config(self, config, **kwargs): self.server_name = config["server_name"] self.server_context = config.get("server_context", None) diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices_config.py index 6d4285ef93..6ea2ea8869 100644 --- a/synapse/config/server_notices_config.py +++ b/synapse/config/server_notices_config.py @@ -59,6 +59,8 @@ class ServerNoticesConfig(Config): None if server notices are not enabled. """ + section = "servernotices" + def __init__(self, *args): super(ServerNoticesConfig, self).__init__(*args) self.server_notices_mxid = None diff --git a/synapse/config/spam_checker.py b/synapse/config/spam_checker.py index e40797ab50..36e0ddab5c 100644 --- a/synapse/config/spam_checker.py +++ b/synapse/config/spam_checker.py @@ -19,6 +19,8 @@ from ._base import Config class SpamCheckerConfig(Config): + section = "spamchecker" + def read_config(self, config, **kwargs): self.spam_checker = None diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b18ddbd1fa..62485189ea 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -25,6 +25,8 @@ class StatsConfig(Config): Configuration for the behaviour of synapse's stats engine """ + section = "stats" + def read_config(self, config, **kwargs): self.stats_enabled = True self.stats_bucket_size = 86400 * 1000 diff --git a/synapse/config/third_party_event_rules.py b/synapse/config/third_party_event_rules.py index b3431441b9..10a99c792e 100644 --- a/synapse/config/third_party_event_rules.py +++ b/synapse/config/third_party_event_rules.py @@ -19,6 +19,8 @@ from ._base import Config class ThirdPartyRulesConfig(Config): + section = "thirdpartyrules" + def read_config(self, config, **kwargs): self.third_party_event_rules = None diff --git a/synapse/config/tls.py b/synapse/config/tls.py index fc47ba3e9a..f06341eb67 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -18,6 +18,7 @@ import os import warnings from datetime import datetime from hashlib import sha256 +from typing import List import six @@ -33,7 +34,9 @@ logger = logging.getLogger(__name__) class TlsConfig(Config): - def read_config(self, config, config_dir_path, **kwargs): + section = "tls" + + def read_config(self, config: dict, config_dir_path: str, **kwargs): acme_config = config.get("acme", None) if acme_config is None: @@ -57,7 +60,7 @@ class TlsConfig(Config): self.tls_certificate_file = self.abspath(config.get("tls_certificate_path")) self.tls_private_key_file = self.abspath(config.get("tls_private_key_path")) - if self.has_tls_listener(): + if self.root.server.has_tls_listener(): if not self.tls_certificate_file: raise ConfigError( "tls_certificate_path must be specified if TLS-enabled listeners are " @@ -108,7 +111,7 @@ class TlsConfig(Config): ) # Support globs (*) in whitelist values - self.federation_certificate_verification_whitelist = [] + self.federation_certificate_verification_whitelist = [] # type: List[str] for entry in fed_whitelist_entries: try: entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii")) diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py index 85d99a3166..8be1346113 100644 --- a/synapse/config/tracer.py +++ b/synapse/config/tracer.py @@ -19,6 +19,8 @@ from ._base import Config, ConfigError class TracerConfig(Config): + section = "tracing" + def read_config(self, config, **kwargs): opentracing_config = config.get("opentracing") if opentracing_config is None: diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py index f6313e17d4..c8d19c5d6b 100644 --- a/synapse/config/user_directory.py +++ b/synapse/config/user_directory.py @@ -21,6 +21,8 @@ class UserDirectoryConfig(Config): Configuration for the behaviour of the /user_directory API """ + section = "userdirectory" + def read_config(self, config, **kwargs): self.user_directory_search_enabled = True self.user_directory_search_all_users = False diff --git a/synapse/config/voip.py b/synapse/config/voip.py index 2ca0e1cf70..a68a3068aa 100644 --- a/synapse/config/voip.py +++ b/synapse/config/voip.py @@ -16,6 +16,8 @@ from ._base import Config class VoipConfig(Config): + section = "voip" + def read_config(self, config, **kwargs): self.turn_uris = config.get("turn_uris", []) self.turn_shared_secret = config.get("turn_shared_secret") diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 1ec4998625..fef72ed974 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -21,6 +21,8 @@ class WorkerConfig(Config): They have their own pid_file and listener configuration. They use the replication_url to talk to the main synapse process.""" + section = "worker" + def read_config(self, config, **kwargs): self.worker_app = config.get("worker_app") diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index d50e691436..8f10b6adbb 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2018 New Vector Ltd +# Copyright 2019 Michael Telatynski <7t3chguy@gmail.com> # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,16 +21,16 @@ from six import string_types from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.api.errors import Codes, SynapseError from synapse.types import GroupID, RoomID, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute logger = logging.getLogger(__name__) -# TODO: Allow users to "knock" or simpkly join depending on rules +# TODO: Allow users to "knock" or simply join depending on rules # TODO: Federation admin APIs -# TODO: is_priveged flag to users and is_public to users and rooms +# TODO: is_privileged flag to users and is_public to users and rooms # TODO: Audit log for admins (profile updates, membership changes, users who tried # to join but were rejected, etc) # TODO: Flairs @@ -590,7 +591,18 @@ class GroupsServerHandler(object): ) # TODO: Check if user knocked - # TODO: Check if user is already invited + + invited_users = yield self.store.get_invited_users_in_group(group_id) + if user_id in invited_users: + raise SynapseError( + 400, "User already invited to group", errcode=Codes.BAD_STATE + ) + + user_results = yield self.store.get_users_in_group( + group_id, include_private=True + ) + if user_id in [user_result["user_id"] for user_result in user_results]: + raise SynapseError(400, "User already in group") content = { "profile": {"name": group["name"], "avatar_url": group["avatar_url"]}, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 053cf66b28..2a5f1a007d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -803,17 +803,25 @@ class PresenceHandler(object): # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "presence_delta"): - deltas = yield self.store.get_current_state_deltas(self._event_pos) - if not deltas: + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self._event_pos == room_max_stream_ordering: return + logger.debug( + "Processing presence stats %s->%s", + self._event_pos, + room_max_stream_ordering, + ) + max_pos, deltas = yield self.store.get_current_state_deltas( + self._event_pos, room_max_stream_ordering + ) yield self._handle_state_delta(deltas) - self._event_pos = deltas[-1]["stream_id"] + self._event_pos = max_pos # Expose current event processing position to prometheus synapse.metrics.event_processing_positions.labels("presence").set( - self._event_pos + max_pos ) @defer.inlineCallbacks diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 970be3c846..2816bd8f87 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -28,6 +28,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.http.endpoint import parse_and_validate_server_name from synapse.storage.state import StateFilter from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID from synapse.util import stringutils @@ -554,7 +555,8 @@ class RoomCreationHandler(BaseHandler): invite_list = config.get("invite", []) for i in invite_list: try: - UserID.from_string(i) + uid = UserID.from_string(i) + parse_and_validate_server_name(uid.domain) except Exception: raise SynapseError(400, "Invalid user_id: %s" % (i,)) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 95a244d86c..380e2fad5e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -203,23 +203,11 @@ class RoomMemberHandler(object): prev_member_event = yield self.store.get_event(prev_member_event_id) newly_joined = prev_member_event.membership != Membership.JOIN if newly_joined: - yield self._user_joined_room(target, room_id) - - # Copy over direct message status and room tags if this is a join - # on an upgraded room - - # Check if this is an upgraded room - predecessor = yield self.store.get_room_predecessor(room_id) - - if predecessor: - # It is an upgraded room. Copy over old tags - self.copy_room_tags_and_direct_to_room( - predecessor["room_id"], room_id, user_id - ) - # Copy over push rules - yield self.store.copy_push_rules_from_room_to_room_for_user( - predecessor["room_id"], room_id, user_id + # Copy over user state if we're joining an upgraded room + yield self.copy_user_state_if_room_upgrade( + room_id, requester.user.to_string() ) + yield self._user_joined_room(target, room_id) elif event.membership == Membership.LEAVE: if prev_member_event_id: prev_member_event = yield self.store.get_event(prev_member_event_id) @@ -463,10 +451,16 @@ class RoomMemberHandler(object): if requester.is_guest: content["kind"] = "guest" - ret = yield self._remote_join( + remote_join_response = yield self._remote_join( requester, remote_room_hosts, room_id, target, content ) - return ret + + # Copy over user state if this is a join on an remote upgraded room + yield self.copy_user_state_if_room_upgrade( + room_id, requester.user.to_string() + ) + + return remote_join_response elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: @@ -504,6 +498,38 @@ class RoomMemberHandler(object): return res @defer.inlineCallbacks + def copy_user_state_if_room_upgrade(self, new_room_id, user_id): + """Copy user-specific information when they join a new room if that new room is the + result of a room upgrade + + Args: + new_room_id (str): The ID of the room the user is joining + user_id (str): The ID of the user + + Returns: + Deferred + """ + # Check if the new room is an upgraded room + predecessor = yield self.store.get_room_predecessor(new_room_id) + if not predecessor: + return + + logger.debug( + "Found predecessor for %s: %s. Copying over room tags and push " "rules", + new_room_id, + predecessor, + ) + + # It is an upgraded room. Copy over old tags + yield self.copy_room_tags_and_direct_to_room( + predecessor["room_id"], new_room_id, user_id + ) + # Copy over push rules + yield self.store.copy_push_rules_from_room_to_room_for_user( + predecessor["room_id"], new_room_id, user_id + ) + + @defer.inlineCallbacks def send_membership_event(self, requester, event, context, ratelimit=True): """ Change the membership status of a user in a room. diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index c62b113115..466daf9202 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -87,21 +87,23 @@ class StatsHandler(StateDeltasHandler): # Be sure to read the max stream_ordering *before* checking if there are any outstanding # deltas, since there is otherwise a chance that we could miss updates which arrive # after we check the deltas. - room_max_stream_ordering = yield self.store.get_room_max_stream_ordering() + room_max_stream_ordering = self.store.get_room_max_stream_ordering() if self.pos == room_max_stream_ordering: break - deltas = yield self.store.get_current_state_deltas(self.pos) + logger.debug( + "Processing room stats %s->%s", self.pos, room_max_stream_ordering + ) + max_pos, deltas = yield self.store.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) if deltas: logger.debug("Handling %d state deltas", len(deltas)) room_deltas, user_deltas = yield self._handle_deltas(deltas) - - max_pos = deltas[-1]["stream_id"] else: room_deltas = {} user_deltas = {} - max_pos = room_max_stream_ordering # Then count deltas for total_events and total_event_bytes. room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes( diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index e53669e40d..624f05ab5b 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler): # Loop round handling deltas until we're up to date while True: with Measure(self.clock, "user_dir_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) - if not deltas: + room_max_stream_ordering = self.store.get_room_max_stream_ordering() + if self.pos == room_max_stream_ordering: return + logger.debug( + "Processing user stats %s->%s", self.pos, room_max_stream_ordering + ) + max_pos, deltas = yield self.store.get_current_state_deltas( + self.pos, room_max_stream_ordering + ) + logger.info("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) - self.pos = deltas[-1]["stream_id"] + self.pos = max_pos # Expose current event processing position to prometheus synapse.metrics.event_processing_positions.labels("user_dir").set( - self.pos + max_pos ) - yield self.store.update_user_directory_stream_pos(self.pos) + yield self.store.update_user_directory_stream_pos(max_pos) @defer.inlineCallbacks def _handle_deltas(self, deltas): diff --git a/synapse/http/server.py b/synapse/http/server.py index cb9158fe1b..2ccb210fd6 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -388,7 +388,7 @@ class DirectServeResource(resource.Resource): if not callback: return super().render(request) - resp = callback(request) + resp = trace_servlet(self.__class__.__name__)(callback)(request) # If it's a coroutine, turn it into a Deferred if isinstance(resp, types.CoroutineType): diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index cd1ff6a518..0638cec429 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -169,6 +169,7 @@ import contextlib import inspect import logging import re +import types from functools import wraps from typing import Dict @@ -778,8 +779,7 @@ def trace_servlet(servlet_name, extract_context=False): return func @wraps(func) - @defer.inlineCallbacks - def _trace_servlet_inner(request, *args, **kwargs): + async def _trace_servlet_inner(request, *args, **kwargs): request_tags = { "request_id": request.get_request_id(), tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, @@ -796,8 +796,14 @@ def trace_servlet(servlet_name, extract_context=False): scope = start_active_span(servlet_name, tags=request_tags) with scope: - result = yield defer.maybeDeferred(func, request, *args, **kwargs) - return result + result = func(request, *args, **kwargs) + + if not isinstance(result, (types.CoroutineType, defer.Deferred)): + # Some servlets aren't async and just return results + # directly, so we handle that here. + return result + + return await result return _trace_servlet_inner diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 9cddbc752a..8414af08cb 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -377,6 +377,7 @@ class CasTicketServlet(RestServlet): super(CasTicketServlet, self).__init__() self.cas_server_url = hs.config.cas_server_url self.cas_service_url = hs.config.cas_service_url + self.cas_displayname_attribute = hs.config.cas_displayname_attribute self.cas_required_attributes = hs.config.cas_required_attributes self._sso_auth_handler = SSOAuthHandler(hs) self._http_client = hs.get_simple_http_client() @@ -400,6 +401,7 @@ class CasTicketServlet(RestServlet): def handle_cas_response(self, request, cas_response_body, client_redirect_url): user, attributes = self.parse_cas_response(cas_response_body) + displayname = attributes.pop(self.cas_displayname_attribute, None) for required_attribute, required_value in self.cas_required_attributes.items(): # If required attribute was not in CAS Response - Forbidden @@ -414,7 +416,7 @@ class CasTicketServlet(RestServlet): raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED) return self._sso_auth_handler.on_successful_auth( - user, request, client_redirect_url + user, request, client_redirect_url, displayname ) def parse_cas_response(self, cas_response_body): diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index c6ddf24c8d..17a8bc7366 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer -from synapse.api.errors import AuthError, Codes, StoreError, SynapseError +from synapse.api.errors import AuthError, NotFoundError, StoreError, SynapseError from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID @@ -52,13 +52,15 @@ class GetFilterRestServlet(RestServlet): raise SynapseError(400, "Invalid filter_id") try: - filter = yield self.filtering.get_user_filter( + filter_collection = yield self.filtering.get_user_filter( user_localpart=target_user.localpart, filter_id=filter_id ) + except StoreError as e: + if e.code != 404: + raise + raise NotFoundError("No such filter") - return 200, filter.get_filter_json() - except (KeyError, StoreError): - raise SynapseError(400, "No such filter", errcode=Codes.NOT_FOUND) + return 200, filter_collection.get_filter_json() class CreateFilterRestServlet(RestServlet): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index c98c5a3802..a883c8adda 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -21,7 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.constants import PresenceState -from synapse.api.errors import SynapseError +from synapse.api.errors import Codes, StoreError, SynapseError from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection from synapse.events.utils import ( format_event_for_client_v2_without_room_id, @@ -119,25 +119,32 @@ class SyncRestServlet(RestServlet): request_key = (user, timeout, since, filter_id, full_state, device_id) - if filter_id: - if filter_id.startswith("{"): - try: - filter_object = json.loads(filter_id) - set_timeline_upper_limit( - filter_object, self.hs.config.filter_timeline_limit - ) - except Exception: - raise SynapseError(400, "Invalid filter JSON") - self.filtering.check_valid_filter(filter_object) - filter = FilterCollection(filter_object) - else: - filter = yield self.filtering.get_user_filter(user.localpart, filter_id) + if filter_id is None: + filter_collection = DEFAULT_FILTER_COLLECTION + elif filter_id.startswith("{"): + try: + filter_object = json.loads(filter_id) + set_timeline_upper_limit( + filter_object, self.hs.config.filter_timeline_limit + ) + except Exception: + raise SynapseError(400, "Invalid filter JSON") + self.filtering.check_valid_filter(filter_object) + filter_collection = FilterCollection(filter_object) else: - filter = DEFAULT_FILTER_COLLECTION + try: + filter_collection = yield self.filtering.get_user_filter( + user.localpart, filter_id + ) + except StoreError as err: + if err.code != 404: + raise + # fix up the description and errcode to be more useful + raise SynapseError(400, "No such filter", errcode=Codes.INVALID_PARAM) sync_config = SyncConfig( user=user, - filter_collection=filter, + filter_collection=filter_collection, is_guest=requester.is_guest, request_key=request_key, device_id=device_id, @@ -171,7 +178,7 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() response_content = yield self.encode_response( - time_now, sync_result, requester.access_token_id, filter + time_now, sync_result, requester.access_token_id, filter_collection ) return 200, response_content diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 7a56cd4b6c..0c68c3aad5 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -270,7 +270,7 @@ class PreviewUrlResource(DirectServeResource): logger.debug("Calculated OG for %s as %s" % (url, og)) - jsonog = json.dumps(og).encode("utf8") + jsonog = json.dumps(og) # store OG in history-aware DB cache yield self.store.store_url_cache( @@ -283,7 +283,7 @@ class PreviewUrlResource(DirectServeResource): media_info["created_ts"], ) - return jsonog + return jsonog.encode("utf8") @defer.inlineCallbacks def _download_url(self, url, user): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index abe16334ec..f5906fcd54 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -20,6 +20,7 @@ import random import sys import threading import time +from typing import Iterable, Tuple from six import PY2, iteritems, iterkeys, itervalues from six.moves import builtins, intern, range @@ -30,7 +31,7 @@ from prometheus_client import Histogram from twisted.internet import defer from synapse.api.errors import StoreError -from synapse.logging.context import LoggingContext, PreserveLoggingContext +from synapse.logging.context import LoggingContext, make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id @@ -550,8 +551,9 @@ class SQLBaseStore(object): return func(conn, *args, **kwargs) - with PreserveLoggingContext(): - result = yield self._db_pool.runWithConnection(inner_func, *args, **kwargs) + result = yield make_deferred_yieldable( + self._db_pool.runWithConnection(inner_func, *args, **kwargs) + ) return result @@ -1162,19 +1164,18 @@ class SQLBaseStore(object): if not iterable: return [] - sql = "SELECT %s FROM %s" % (", ".join(retcols), table) - - clauses = [] - values = [] - clauses.append("%s IN (%s)" % (column, ",".join("?" for _ in iterable))) - values.extend(iterable) + clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) + clauses = [clause] for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) - if clauses: - sql = "%s WHERE %s" % (sql, " AND ".join(clauses)) + sql = "SELECT %s FROM %s WHERE %s" % ( + ", ".join(retcols), + table, + " AND ".join(clauses), + ) txn.execute(sql, values) return cls.cursor_to_dict(txn) @@ -1323,10 +1324,8 @@ class SQLBaseStore(object): sql = "DELETE FROM %s" % table - clauses = [] - values = [] - clauses.append("%s IN (%s)" % (column, ",".join("?" for _ in iterable))) - values.extend(iterable) + clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable) + clauses = [clause] for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) @@ -1693,3 +1692,30 @@ def db_to_json(db_content): except Exception: logging.warning("Tried to decode '%r' as JSON and failed", db_content) raise + + +def make_in_list_sql_clause( + database_engine, column: str, iterable: Iterable +) -> Tuple[str, Iterable]: + """Returns an SQL clause that checks the given column is in the iterable. + + On SQLite this expands to `column IN (?, ?, ...)`, whereas on Postgres + it expands to `column = ANY(?)`. While both DBs support the `IN` form, + using the `ANY` form on postgres means that it views queries with + different length iterables as the same, helping the query stats. + + Args: + database_engine + column: Name of the column + iterable: The values to check the column against. + + Returns: + A tuple of SQL query and the args + """ + + if database_engine.supports_using_any_list: + # This should hopefully be faster, but also makes postgres query + # stats easier to understand. + return "%s = ANY(?)" % (column,), [list(iterable)] + else: + return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index 70bc2bb2cc..f04aad0743 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -20,7 +20,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.logging.opentracing import log_kv, set_tag, trace -from synapse.storage._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util.caches.expiringcache import ExpiringCache @@ -378,15 +378,15 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) else: if not devices: continue - sql = ( - "SELECT device_id FROM devices" - " WHERE user_id = ? AND device_id IN (" - + ",".join("?" * len(devices)) - + ")" + + clause, args = make_in_list_sql_clause( + txn.database_engine, "device_id", devices ) + sql = "SELECT device_id FROM devices WHERE user_id = ? AND " + clause + # TODO: Maybe this needs to be done in batches if there are # too many local devices for a given user. - txn.execute(sql, [user_id] + devices) + txn.execute(sql, [user_id] + list(args)) for row in txn: # Only insert into the local inbox if the device exists on # this server diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 111bfb3d64..ac5239e50a 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -28,7 +28,12 @@ from synapse.logging.opentracing import ( whitelisted_homeserver, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import Cache, SQLBaseStore, db_to_json +from synapse.storage._base import ( + Cache, + SQLBaseStore, + db_to_json, + make_in_list_sql_clause, +) from synapse.storage.background_updates import BackgroundUpdateStore from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList @@ -448,11 +453,14 @@ class DeviceWorkerStore(SQLBaseStore): sql = """ SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ? - AND user_id IN (%s) + AND """ for chunk in batch_iter(to_check, 100): - txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk) + clause, args = make_in_list_sql_clause( + txn.database_engine, "user_id", chunk + ) + txn.execute(sql + clause, (from_key,) + tuple(args)) changes.update(user_id for user_id, in txn) return changes diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 601617b21e..b7c4eda338 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -22,6 +22,13 @@ class PostgresEngine(object): def __init__(self, database_module, database_config): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) + + # Disables passing `bytes` to txn.execute, c.f. #6186. If you do + # actually want to use bytes than wrap it in `bytearray`. + def _disable_bytes_adapter(_): + raise Exception("Passing bytes to DB is disabled.") + + self.module.extensions.register_adapter(bytes, _disable_bytes_adapter) self.synchronous_commit = database_config.get("synchronous_commit", True) self._version = None # unknown as yet @@ -79,6 +86,12 @@ class PostgresEngine(object): """ return True + @property + def supports_using_any_list(self): + """Do we support using `a = ANY(?)` and passing a list + """ + return True + def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): # https://www.postgresql.org/docs/current/static/errcodes-appendix.html diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ac92109366..ddad17dc5a 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -46,6 +46,12 @@ class Sqlite3Engine(object): """ return self.module.sqlite_version_info >= (3, 15, 0) + @property + def supports_using_any_list(self): + """Do we support using `a = ANY(?)` and passing a list + """ + return False + def check_database(self, txn): pass diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index f5e8c39262..47cc10d32a 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -25,7 +25,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import SQLBaseStore +from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.events_worker import EventsWorkerStore from synapse.storage.signatures import SignatureWorkerStore from synapse.util.caches.descriptors import cached @@ -68,7 +68,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas else: results = set() - base_sql = "SELECT auth_id FROM event_auth WHERE event_id IN (%s)" + base_sql = "SELECT auth_id FROM event_auth WHERE " front = set(event_ids) while front: @@ -76,7 +76,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas front_list = list(front) chunks = [front_list[x : x + 100] for x in range(0, len(front), 100)] for chunk in chunks: - txn.execute(base_sql % (",".join(["?"] * len(chunk)),), chunk) + clause, args = make_in_list_sql_clause( + txn.database_engine, "event_id", chunk + ) + txn.execute(base_sql + clause, list(args)) new_front.update([r[0] for r in txn]) new_front -= results diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bb6ff0595a..ee49ef235d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -39,6 +39,7 @@ from synapse.logging.utils import log_function from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore +from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore @@ -641,14 +642,16 @@ class EventsStore( LEFT JOIN rejections USING (event_id) LEFT JOIN event_json USING (event_id) WHERE - prev_event_id IN (%s) - AND NOT events.outlier + NOT events.outlier AND rejections.event_id IS NULL - """ % ( - ",".join("?" for _ in batch), + AND + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "prev_event_id", batch ) - txn.execute(sql, batch) + txn.execute(sql + clause, args) results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed")) for chunk in batch_iter(event_ids, 100): @@ -695,13 +698,15 @@ class EventsStore( LEFT JOIN rejections USING (event_id) LEFT JOIN event_json USING (event_id) WHERE - event_id IN (%s) - AND NOT events.outlier - """ % ( - ",".join("?" for _ in to_recursively_check), + NOT events.outlier + AND + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "event_id", to_recursively_check ) - txn.execute(sql, to_recursively_check) + txn.execute(sql + clause, args) to_recursively_check = [] for event_id, prev_event_id, metadata, rejected in txn: @@ -1543,10 +1548,14 @@ class EventsStore( " FROM events as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"] * len(ev_map)),) + " WHERE " + ) + + clause, args = make_in_list_sql_clause( + self.database_engine, "e.event_id", list(ev_map) + ) - txn.execute(sql, list(ev_map)) + txn.execute(sql + clause, args) rows = self.cursor_to_dict(txn) for row in rows: event = ev_map[row["event_id"]] @@ -2249,11 +2258,12 @@ class EventsStore( sql = """ SELECT DISTINCT state_group FROM event_to_state_groups LEFT JOIN events_to_purge AS ep USING (event_id) - WHERE state_group IN (%s) AND ep.event_id IS NULL - """ % ( - ",".join("?" for _ in current_search), + WHERE ep.event_id IS NULL AND + """ + clause, args = make_in_list_sql_clause( + txn.database_engine, "state_group", current_search ) - txn.execute(sql, list(current_search)) + txn.execute(sql + clause, list(args)) referenced = set(sg for sg, in txn) referenced_groups |= referenced diff --git a/synapse/storage/events_bg_updates.py b/synapse/storage/events_bg_updates.py index 5717baf48c..31ea6f917f 100644 --- a/synapse/storage/events_bg_updates.py +++ b/synapse/storage/events_bg_updates.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore logger = logging.getLogger(__name__) @@ -71,6 +72,19 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): "redactions_received_ts", self._redactions_received_ts ) + # This index gets deleted in `event_fix_redactions_bytes` update + self.register_background_index_update( + "event_fix_redactions_bytes_create_index", + index_name="redactions_censored_redacts", + table="redactions", + columns=["redacts"], + where_clause="have_censored", + ) + + self.register_background_update_handler( + "event_fix_redactions_bytes", self._event_fix_redactions_bytes + ) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -312,12 +326,13 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): INNER JOIN event_json USING (event_id) LEFT JOIN rejections USING (event_id) WHERE - prev_event_id IN (%s) - AND NOT events.outlier - """ % ( - ",".join("?" for _ in to_check), + NOT events.outlier + AND + """ + clause, args = make_in_list_sql_clause( + self.database_engine, "prev_event_id", to_check ) - txn.execute(sql, to_check) + txn.execute(sql + clause, list(args)) for prev_event_id, event_id, metadata, rejected in txn: if event_id in graph: @@ -458,3 +473,33 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): yield self._end_background_update("redactions_received_ts") return count + + @defer.inlineCallbacks + def _event_fix_redactions_bytes(self, progress, batch_size): + """Undoes hex encoded censored redacted event JSON. + """ + + def _event_fix_redactions_bytes_txn(txn): + # This update is quite fast due to new index. + txn.execute( + """ + UPDATE event_json + SET + json = convert_from(json::bytea, 'utf8') + FROM redactions + WHERE + redactions.have_censored + AND event_json.event_id = redactions.redacts + AND json NOT LIKE '{%'; + """ + ) + + txn.execute("DROP INDEX redactions_censored_redacts") + + yield self.runInteraction( + "_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn + ) + + yield self._end_background_update("event_fix_redactions_bytes") + + return 1 diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 57ce0304e9..4c4b76bd93 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -31,12 +31,11 @@ from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.types import get_domain_from_id from synapse.util import batch_iter from synapse.util.metrics import Measure -from ._base import SQLBaseStore - logger = logging.getLogger(__name__) @@ -623,10 +622,14 @@ class EventsWorkerStore(SQLBaseStore): " rej.reason " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"] * len(evs)),) + " WHERE " + ) - txn.execute(sql, evs) + clause, args = make_in_list_sql_clause( + txn.database_engine, "e.event_id", evs + ) + + txn.execute(sql + clause, args) for row in txn: event_id = row[0] @@ -640,11 +643,11 @@ class EventsWorkerStore(SQLBaseStore): } # check for redactions - redactions_sql = ( - "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)" - ) % (",".join(["?"] * len(evs)),) + redactions_sql = "SELECT event_id, redacts FROM redactions WHERE " + + clause, args = make_in_list_sql_clause(txn.database_engine, "redacts", evs) - txn.execute(redactions_sql, evs) + txn.execute(redactions_sql + clause, args) for (redacter, redacted) in txn: d = event_dict.get(redacted) @@ -753,10 +756,11 @@ class EventsWorkerStore(SQLBaseStore): results = set() def have_seen_events_txn(txn, chunk): - sql = "SELECT event_id FROM events as e WHERE e.event_id IN (%s)" % ( - ",".join("?" * len(chunk)), + sql = "SELECT event_id FROM events as e WHERE " + clause, args = make_in_list_sql_clause( + txn.database_engine, "e.event_id", chunk ) - txn.execute(sql, chunk) + txn.execute(sql + clause, args) for (event_id,) in txn: results.add(event_id) diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 23b48f6cea..7c2a7da836 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -51,7 +51,7 @@ class FilteringStore(SQLBaseStore): "SELECT filter_id FROM user_filters " "WHERE user_id = ? AND filter_json = ?" ) - txn.execute(sql, (user_localpart, def_json)) + txn.execute(sql, (user_localpart, bytearray(def_json))) filter_id_response = txn.fetchone() if filter_id_response is not None: return filter_id_response[0] @@ -68,7 +68,7 @@ class FilteringStore(SQLBaseStore): "INSERT INTO user_filters (user_id, filter_id, filter_json)" "VALUES(?, ?, ?)" ) - txn.execute(sql, (user_localpart, filter_id, def_json)) + txn.execute(sql, (user_localpart, filter_id, bytearray(def_json))) return filter_id diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py index 752e9788a2..3803604be7 100644 --- a/synapse/storage/monthly_active_users.py +++ b/synapse/storage/monthly_active_users.py @@ -32,7 +32,6 @@ class MonthlyActiveUsersStore(SQLBaseStore): super(MonthlyActiveUsersStore, self).__init__(None, hs) self._clock = hs.get_clock() self.hs = hs - self.reserved_users = () # Do not add more reserved users than the total allowable number self._new_transaction( dbconn, @@ -51,7 +50,6 @@ class MonthlyActiveUsersStore(SQLBaseStore): txn (cursor): threepids (list[dict]): List of threepid dicts to reserve """ - reserved_user_list = [] for tp in threepids: user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"]) @@ -60,10 +58,8 @@ class MonthlyActiveUsersStore(SQLBaseStore): is_support = self.is_support_user_txn(txn, user_id) if not is_support: self.upsert_monthly_active_user_txn(txn, user_id) - reserved_user_list.append(user_id) else: logger.warning("mau limit reserved threepid %s not found in db" % tp) - self.reserved_users = tuple(reserved_user_list) @defer.inlineCallbacks def reap_monthly_active_users(self): @@ -74,8 +70,11 @@ class MonthlyActiveUsersStore(SQLBaseStore): Deferred[] """ - def _reap_users(txn): - # Purge stale users + def _reap_users(txn, reserved_users): + """ + Args: + reserved_users (tuple): reserved users to preserve + """ thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30) query_args = [thirty_days_ago] @@ -83,20 +82,19 @@ class MonthlyActiveUsersStore(SQLBaseStore): # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres # when len(reserved_users) == 0. Works fine on sqlite. - if len(self.reserved_users) > 0: + if len(reserved_users) > 0: # questionmarks is a hack to overcome sqlite not supporting # tuples in 'WHERE IN %s' - questionmarks = "?" * len(self.reserved_users) + question_marks = ",".join("?" * len(reserved_users)) - query_args.extend(self.reserved_users) - sql = base_sql + """ AND user_id NOT IN ({})""".format( - ",".join(questionmarks) - ) + query_args.extend(reserved_users) + sql = base_sql + " AND user_id NOT IN ({})".format(question_marks) else: sql = base_sql txn.execute(sql, query_args) + max_mau_value = self.hs.config.max_mau_value if self.hs.config.limit_usage_by_mau: # If MAU user count still exceeds the MAU threshold, then delete on # a least recently active basis. @@ -106,31 +104,52 @@ class MonthlyActiveUsersStore(SQLBaseStore): # While Postgres does not require 'LIMIT', but also does not support # negative LIMIT values. So there is no way to write it that both can # support - safe_guard = self.hs.config.max_mau_value - len(self.reserved_users) - # Must be greater than zero for postgres - safe_guard = safe_guard if safe_guard > 0 else 0 - query_args = [safe_guard] - - base_sql = """ - DELETE FROM monthly_active_users - WHERE user_id NOT IN ( - SELECT user_id FROM monthly_active_users - ORDER BY timestamp DESC - LIMIT ? + if len(reserved_users) == 0: + sql = """ + DELETE FROM monthly_active_users + WHERE user_id NOT IN ( + SELECT user_id FROM monthly_active_users + ORDER BY timestamp DESC + LIMIT ? ) - """ + """ + txn.execute(sql, (max_mau_value,)) # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres # when len(reserved_users) == 0. Works fine on sqlite. - if len(self.reserved_users) > 0: - query_args.extend(self.reserved_users) - sql = base_sql + """ AND user_id NOT IN ({})""".format( - ",".join(questionmarks) - ) else: - sql = base_sql - txn.execute(sql, query_args) + # Must be >= 0 for postgres + num_of_non_reserved_users_to_remove = max( + max_mau_value - len(reserved_users), 0 + ) + + # It is important to filter reserved users twice to guard + # against the case where the reserved user is present in the + # SELECT, meaning that a legitmate mau is deleted. + sql = """ + DELETE FROM monthly_active_users + WHERE user_id NOT IN ( + SELECT user_id FROM monthly_active_users + WHERE user_id NOT IN ({}) + ORDER BY timestamp DESC + LIMIT ? + ) + AND user_id NOT IN ({}) + """.format( + question_marks, question_marks + ) + + query_args = [ + *reserved_users, + num_of_non_reserved_users_to_remove, + *reserved_users, + ] - yield self.runInteraction("reap_monthly_active_users", _reap_users) + txn.execute(sql, query_args) + + reserved_users = yield self.get_registered_reserved_users() + yield self.runInteraction( + "reap_monthly_active_users", _reap_users, reserved_users + ) # It seems poor to invalidate the whole cache, Postgres supports # 'Returning' which would allow me to invalidate only the # specific users, but sqlite has no way to do this and instead @@ -159,21 +178,25 @@ class MonthlyActiveUsersStore(SQLBaseStore): return self.runInteraction("count_users", _count_users) @defer.inlineCallbacks - def get_registered_reserved_users_count(self): - """Of the reserved threepids defined in config, how many are associated + def get_registered_reserved_users(self): + """Of the reserved threepids defined in config, which are associated with registered users? Returns: - Defered[int]: Number of real reserved users + Defered[list]: Real reserved users """ - count = 0 - for tp in self.hs.config.mau_limits_reserved_threepids: + users = [] + + for tp in self.hs.config.mau_limits_reserved_threepids[ + : self.hs.config.max_mau_value + ]: user_id = yield self.hs.get_datastore().get_user_id_by_threepid( tp["medium"], tp["address"] ) if user_id: - count = count + 1 - return count + users.append(user_id) + + return users @defer.inlineCallbacks def upsert_monthly_active_user(self, user_id): diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 5db6f2d84a..3a641f538b 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -18,11 +18,10 @@ from collections import namedtuple from twisted.internet import defer from synapse.api.constants import PresenceState +from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.util import batch_iter from synapse.util.caches.descriptors import cached, cachedList -from ._base import SQLBaseStore - class UserPresenceState( namedtuple( @@ -119,14 +118,13 @@ class PresenceStore(SQLBaseStore): ) # Delete old rows to stop database from getting really big - sql = ( - "DELETE FROM presence_stream WHERE" " stream_id < ?" " AND user_id IN (%s)" - ) + sql = "DELETE FROM presence_stream WHERE stream_id < ? AND " for states in batch_iter(presence_states, 50): - args = [stream_id] - args.extend(s.user_id for s in states) - txn.execute(sql % (",".join("?" for _ in states),), args) + clause, args = make_in_list_sql_clause( + self.database_engine, "user_id", [s.user_id for s in states] + ) + txn.execute(sql + clause, [stream_id] + list(args)) def get_all_presence_updates(self, last_id, current_id): if last_id == current_id: diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 3e0e834a62..b12e80440a 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -241,7 +241,7 @@ class PusherStore(PusherWorkerStore): "device_display_name": device_display_name, "ts": pushkey_ts, "lang": lang, - "data": encode_canonical_json(data), + "data": bytearray(encode_canonical_json(data)), "last_stream_ordering": last_stream_ordering, "profile_tag": profile_tag, "id": stream_id, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 290ddb30e8..0c24430f28 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -21,12 +21,11 @@ from canonicaljson import json from twisted.internet import defer +from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause +from synapse.storage.util.id_generators import StreamIdGenerator from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache -from ._base import SQLBaseStore -from .util.id_generators import StreamIdGenerator - logger = logging.getLogger(__name__) @@ -217,24 +216,26 @@ class ReceiptsWorkerStore(SQLBaseStore): def f(txn): if from_key: - sql = ( - "SELECT * FROM receipts_linearized WHERE" - " room_id IN (%s) AND stream_id > ? AND stream_id <= ?" - ) % (",".join(["?"] * len(room_ids))) - args = list(room_ids) - args.extend([from_key, to_key]) + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id > ? AND stream_id <= ? AND + """ + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) - txn.execute(sql, args) + txn.execute(sql + clause, [from_key, to_key] + list(args)) else: - sql = ( - "SELECT * FROM receipts_linearized WHERE" - " room_id IN (%s) AND stream_id <= ?" - ) % (",".join(["?"] * len(room_ids))) + sql = """ + SELECT * FROM receipts_linearized WHERE + stream_id <= ? AND + """ - args = list(room_ids) - args.append(to_key) + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) - txn.execute(sql, args) + txn.execute(sql + clause, [to_key] + list(args)) return self.cursor_to_dict(txn) @@ -433,13 +434,19 @@ class ReceiptsStore(ReceiptsWorkerStore): # we need to points in graph -> linearized form. # TODO: Make this better. def graph_to_linear(txn): - query = ( - "SELECT event_id WHERE room_id = ? AND stream_ordering IN (" - " SELECT max(stream_ordering) WHERE event_id IN (%s)" - ")" - ) % (",".join(["?"] * len(event_ids))) + clause, args = make_in_list_sql_clause( + self.database_engine, "event_id", event_ids + ) + + sql = """ + SELECT event_id WHERE room_id = ? AND stream_ordering IN ( + SELECT max(stream_ordering) WHERE %s + ) + """ % ( + clause, + ) - txn.execute(query, [room_id] + event_ids) + txn.execute(sql, [room_id] + list(args)) rows = txn.fetchall() if rows: return rows[0][0] diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 4e606a8380..ff63487823 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -26,7 +26,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.storage._base import LoggingTransaction +from synapse.storage._base import LoggingTransaction, make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import Sqlite3Engine from synapse.storage.events_worker import EventsWorkerStore @@ -372,6 +372,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): results = [] if membership_list: if self._current_state_events_membership_up_to_date: + clause, args = make_in_list_sql_clause( + self.database_engine, "c.membership", membership_list + ) sql = """ SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering FROM current_state_events AS c @@ -379,11 +382,14 @@ class RoomMemberWorkerStore(EventsWorkerStore): WHERE c.type = 'm.room.member' AND state_key = ? - AND c.membership IN (%s) + AND %s """ % ( - ",".join("?" * len(membership_list)) + clause, ) else: + clause, args = make_in_list_sql_clause( + self.database_engine, "m.membership", membership_list + ) sql = """ SELECT room_id, e.sender, m.membership, event_id, e.stream_ordering FROM current_state_events AS c @@ -392,12 +398,12 @@ class RoomMemberWorkerStore(EventsWorkerStore): WHERE c.type = 'm.room.member' AND state_key = ? - AND m.membership IN (%s) + AND %s """ % ( - ",".join("?" * len(membership_list)) + clause, ) - txn.execute(sql, (user_id, *membership_list)) + txn.execute(sql, (user_id, *args)) results = [RoomsForUser(**r) for r in self.cursor_to_dict(txn)] if do_invite: diff --git a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres index f7bcc5e2f2..67471f3ef5 100644 --- a/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres +++ b/synapse/storage/schema/delta/56/redaction_censor3_fix_update.sql.postgres @@ -15,12 +15,11 @@ -- There was a bug where we may have updated censored redactions as bytes, --- which can (somehow) cause json to be inserted hex encoded. This goes and --- undoes any such hex encoded JSON. -UPDATE event_json SET json = convert_from(json::bytea, 'utf8') -WHERE event_id IN ( - SELECT event_json.event_id - FROM event_json - INNER JOIN redactions ON (event_json.event_id = redacts) - WHERE have_censored AND json NOT LIKE '{%' -); +-- which can (somehow) cause json to be inserted hex encoded. These updates go +-- and undoes any such hex encoded JSON. + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_fix_redactions_bytes_create_index', '{}'); + +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ('event_fix_redactions_bytes', '{}', 'event_fix_redactions_bytes_create_index'); diff --git a/synapse/storage/schema/delta/56/unique_user_filter_index.py b/synapse/storage/schema/delta/56/unique_user_filter_index.py index 60031f23ca..1de8b54961 100644 --- a/synapse/storage/schema/delta/56/unique_user_filter_index.py +++ b/synapse/storage/schema/delta/56/unique_user_filter_index.py @@ -5,42 +5,48 @@ from synapse.storage.engines import PostgresEngine logger = logging.getLogger(__name__) +""" +This migration updates the user_filters table as follows: + + - drops any (user_id, filter_id) duplicates + - makes the columns NON-NULLable + - turns the index into a UNIQUE index +""" + + def run_upgrade(cur, database_engine, *args, **kwargs): + pass + + +def run_create(cur, database_engine, *args, **kwargs): if isinstance(database_engine, PostgresEngine): select_clause = """ - CREATE TEMPORARY TABLE user_filters_migration AS SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json - FROM user_filters; + FROM user_filters """ else: select_clause = """ - CREATE TEMPORARY TABLE user_filters_migration AS - SELECT * FROM user_filters GROUP BY user_id, filter_id; + SELECT * FROM user_filters GROUP BY user_id, filter_id """ - sql = ( - """ - BEGIN; - %s - DROP INDEX user_filters_by_user_id_filter_id; - DELETE FROM user_filters; - ALTER TABLE user_filters - ALTER COLUMN user_id SET NOT NULL, - ALTER COLUMN filter_id SET NOT NULL, - ALTER COLUMN filter_json SET NOT NULL; - INSERT INTO user_filters(user_id, filter_id, filter_json) - SELECT * FROM user_filters_migration; - DROP TABLE user_filters_migration; - CREATE UNIQUE INDEX user_filters_by_user_id_filter_id_unique - ON user_filters(user_id, filter_id); - END; - """ - % select_clause + sql = """ + DROP TABLE IF EXISTS user_filters_migration; + DROP INDEX IF EXISTS user_filters_unique; + CREATE TABLE user_filters_migration ( + user_id TEXT NOT NULL, + filter_id BIGINT NOT NULL, + filter_json BYTEA NOT NULL + ); + INSERT INTO user_filters_migration (user_id, filter_id, filter_json) + %s; + CREATE UNIQUE INDEX user_filters_unique ON user_filters_migration + (user_id, filter_id); + DROP TABLE user_filters; + ALTER TABLE user_filters_migration RENAME TO user_filters; + """ % ( + select_clause, ) + if isinstance(database_engine, PostgresEngine): cur.execute(sql) else: cur.executescript(sql) - - -def run_create(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 6ba4190f1a..7695bf09fc 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -24,6 +24,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.storage._base import make_in_list_sql_clause from synapse.storage.engines import PostgresEngine, Sqlite3Engine from .background_updates import BackgroundUpdateStore @@ -385,8 +386,10 @@ class SearchStore(SearchBackgroundUpdateStore): # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. if len(room_ids) < 500: - clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)) - args.extend(room_ids) + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + clauses = [clause] local_clauses = [] for key in keys: @@ -492,8 +495,10 @@ class SearchStore(SearchBackgroundUpdateStore): # Make sure we don't explode because the person is in too many rooms. # We filter the results below regardless. if len(room_ids) < 500: - clauses.append("room_id IN (%s)" % (",".join(["?"] * len(room_ids)),)) - args.extend(room_ids) + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + clauses = [clause] local_clauses = [] for key in keys: diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py index 5fdb442104..28f33ec18f 100644 --- a/synapse/storage/state_deltas.py +++ b/synapse/storage/state_deltas.py @@ -21,7 +21,7 @@ logger = logging.getLogger(__name__) class StateDeltasStore(SQLBaseStore): - def get_current_state_deltas(self, prev_stream_id): + def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int): """Fetch a list of room state changes since the given stream id Each entry in the result contains the following fields: @@ -36,15 +36,27 @@ class StateDeltasStore(SQLBaseStore): Args: prev_stream_id (int): point to get changes since (exclusive) + max_stream_id (int): the point that we know has been correctly persisted + - ie, an upper limit to return changes from. Returns: - Deferred[list[dict]]: results + Deferred[tuple[int, list[dict]]: A tuple consisting of: + - the stream id which these results go up to + - list of current_state_delta_stream rows. If it is empty, we are + up to date. """ prev_stream_id = int(prev_stream_id) + + # check we're not going backwards + assert prev_stream_id <= max_stream_id + if not self._curr_state_delta_stream_cache.has_any_entity_changed( prev_stream_id ): - return [] + # if the CSDs haven't changed between prev_stream_id and now, we + # know for certain that they haven't changed between prev_stream_id and + # max_stream_id. + return max_stream_id, [] def get_current_state_deltas_txn(txn): # First we calculate the max stream id that will give us less than @@ -54,21 +66,29 @@ class StateDeltasStore(SQLBaseStore): sql = """ SELECT stream_id, count(*) FROM current_state_delta_stream - WHERE stream_id > ? + WHERE stream_id > ? AND stream_id <= ? GROUP BY stream_id ORDER BY stream_id ASC LIMIT 100 """ - txn.execute(sql, (prev_stream_id,)) + txn.execute(sql, (prev_stream_id, max_stream_id)) total = 0 - max_stream_id = prev_stream_id - for max_stream_id, count in txn: + + for stream_id, count in txn: total += count if total > 100: # We arbitarily limit to 100 entries to ensure we don't # select toooo many. + logger.debug( + "Clipping current_state_delta_stream rows to stream_id %i", + stream_id, + ) + clipped_stream_id = stream_id break + else: + # if there's no problem, we may as well go right up to the max_stream_id + clipped_stream_id = max_stream_id # Now actually get the deltas sql = """ @@ -77,8 +97,8 @@ class StateDeltasStore(SQLBaseStore): WHERE ? < stream_id AND stream_id <= ? ORDER BY stream_id ASC """ - txn.execute(sql, (prev_stream_id, max_stream_id)) - return self.cursor_to_dict(txn) + txn.execute(sql, (prev_stream_id, clipped_stream_id)) + return clipped_stream_id, self.cursor_to_dict(txn) return self.runInteraction( "get_current_state_deltas", get_current_state_deltas_txn diff --git a/synapse/storage/user_erasure_store.py b/synapse/storage/user_erasure_store.py index 05cabc2282..aa4f0da5f0 100644 --- a/synapse/storage/user_erasure_store.py +++ b/synapse/storage/user_erasure_store.py @@ -56,15 +56,15 @@ class UserErasureWorkerStore(SQLBaseStore): # iterate it multiple times, and (b) avoiding duplicates. user_ids = tuple(set(user_ids)) - def _get_erased_users(txn): - txn.execute( - "SELECT user_id FROM erased_users WHERE user_id IN (%s)" - % (",".join("?" * len(user_ids))), - user_ids, - ) - return set(r[0] for r in txn) - - erased_users = yield self.runInteraction("are_users_erased", _get_erased_users) + rows = yield self._simple_select_many_batch( + table="erased_users", + column="user_id", + iterable=user_ids, + retcols=("user_id",), + desc="are_users_erased", + ) + erased_users = set(row["user_id"] for row in rows) + res = dict((u, u in erased_users) for u in user_ids) return res diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py new file mode 100644 index 0000000000..3925927f9f --- /dev/null +++ b/synapse/util/patch_inline_callbacks.py @@ -0,0 +1,219 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import functools +import sys +from typing import Any, Callable, List + +from twisted.internet import defer +from twisted.internet.defer import Deferred +from twisted.python.failure import Failure + +# Tracks if we've already patched inlineCallbacks +_already_patched = False + + +def do_patch(): + """ + Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit + """ + + from synapse.logging.context import LoggingContext + + global _already_patched + + orig_inline_callbacks = defer.inlineCallbacks + if _already_patched: + return + + def new_inline_callbacks(f): + @functools.wraps(f) + def wrapped(*args, **kwargs): + start_context = LoggingContext.current_context() + changes = [] # type: List[str] + orig = orig_inline_callbacks(_check_yield_points(f, changes)) + + try: + res = orig(*args, **kwargs) + except Exception: + if LoggingContext.current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + + err = "%s changed context from %s to %s on exception" % ( + f, + start_context, + LoggingContext.current_context(), + ) + print(err, file=sys.stderr) + raise Exception(err) + raise + + if not isinstance(res, Deferred) or res.called: + if LoggingContext.current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + + err = "Completed %s changed context from %s to %s" % ( + f, + start_context, + LoggingContext.current_context(), + ) + # print the error to stderr because otherwise all we + # see in travis-ci is the 500 error + print(err, file=sys.stderr) + raise Exception(err) + return res + + if LoggingContext.current_context() != LoggingContext.sentinel: + err = ( + "%s returned incomplete deferred in non-sentinel context " + "%s (start was %s)" + ) % (f, LoggingContext.current_context(), start_context) + print(err, file=sys.stderr) + raise Exception(err) + + def check_ctx(r): + if LoggingContext.current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + err = "%s completion of %s changed context from %s to %s" % ( + "Failure" if isinstance(r, Failure) else "Success", + f, + start_context, + LoggingContext.current_context(), + ) + print(err, file=sys.stderr) + raise Exception(err) + return r + + res.addBoth(check_ctx) + return res + + return wrapped + + defer.inlineCallbacks = new_inline_callbacks + _already_patched = True + + +def _check_yield_points(f: Callable, changes: List[str]): + """Wraps a generator that is about to be passed to defer.inlineCallbacks + checking that after every yield the log contexts are correct. + + It's perfectly valid for log contexts to change within a function, e.g. due + to new Measure blocks, so such changes are added to the given `changes` + list instead of triggering an exception. + + Args: + f: generator function to wrap + changes: A list of strings detailing how the contexts + changed within a function. + + Returns: + function + """ + + from synapse.logging.context import LoggingContext + + @functools.wraps(f) + def check_yield_points_inner(*args, **kwargs): + gen = f(*args, **kwargs) + + last_yield_line_no = gen.gi_frame.f_lineno + result = None # type: Any + while True: + expected_context = LoggingContext.current_context() + + try: + isFailure = isinstance(result, Failure) + if isFailure: + d = result.throwExceptionIntoGenerator(gen) + else: + d = gen.send(result) + except (StopIteration, defer._DefGen_Return) as e: + if LoggingContext.current_context() != expected_context: + # This happens when the context is lost sometime *after* the + # final yield and returning. E.g. we forgot to yield on a + # function that returns a deferred. + # + # We don't raise here as it's perfectly valid for contexts to + # change in a function, as long as it sets the correct context + # on resolving (which is checked separately). + err = ( + "Function %r returned and changed context from %s to %s," + " in %s between %d and end of func" + % ( + f.__qualname__, + expected_context, + LoggingContext.current_context(), + f.__code__.co_filename, + last_yield_line_no, + ) + ) + changes.append(err) + return getattr(e, "value", None) + + frame = gen.gi_frame + + if isinstance(d, defer.Deferred) and not d.called: + # This happens if we yield on a deferred that doesn't follow + # the log context rules without wrapping in a `make_deferred_yieldable`. + # We raise here as this should never happen. + if LoggingContext.current_context() is not LoggingContext.sentinel: + err = ( + "%s yielded with context %s rather than sentinel," + " yielded on line %d in %s" + % ( + frame.f_code.co_name, + LoggingContext.current_context(), + frame.f_lineno, + frame.f_code.co_filename, + ) + ) + raise Exception(err) + + try: + result = yield d + except Exception as e: + result = Failure(e) + + if LoggingContext.current_context() != expected_context: + + # This happens because the context is lost sometime *after* the + # previous yield and *after* the current yield. E.g. the + # deferred we waited on didn't follow the rules, or we forgot to + # yield on a function between the two yield points. + # + # We don't raise here as its perfectly valid for contexts to + # change in a function, as long as it sets the correct context + # on resolving (which is checked separately). + err = ( + "%s changed context from %s to %s, happened between lines %d and %d in %s" + % ( + frame.f_code.co_name, + expected_context, + LoggingContext.current_context(), + last_yield_line_no, + frame.f_lineno, + frame.f_code.co_filename, + ) + ) + changes.append(err) + + last_yield_line_no = frame.f_lineno + + return check_yield_points_inner |