diff options
author | Erik Johnston <erik@matrix.org> | 2016-06-20 14:18:04 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-06-20 14:18:04 +0100 |
commit | bc72d381b2d5322982204bb214453e2af56f70d5 (patch) | |
tree | 4f6932f9a32e5e25b80e8202be75d04e50a9686e /synapse | |
parent | point to the CAPTCHA docs (diff) | |
parent | Bump version and changelog (diff) | |
download | synapse-bc72d381b2d5322982204bb214453e2af56f70d5.tar.xz |
Merge branch 'release-v0.16.1' of github.com:matrix-org/synapse v0.16.1
Diffstat (limited to 'synapse')
33 files changed, 545 insertions, 504 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index dc211e9637..3cd79b1247 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.16.0" +__version__ = "0.16.1" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 22e1721fc4..40ffd9bf0d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -266,10 +266,9 @@ def setup(config_options): HomeServer """ try: - config = HomeServerConfig.load_config( + config = HomeServerConfig.load_or_generate_config( "Synapse Homeserver", config_options, - generate_section="Homeserver" ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4ec23d84c1..4f1d18ab5f 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -18,10 +18,8 @@ import synapse from synapse.server import HomeServer from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.emailconfig import EmailConfig -from synapse.config.key import KeyConfig +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore @@ -43,98 +41,13 @@ from twisted.web.resource import Resource from daemonize import Daemonize -import gc import sys import logging +import gc logger = logging.getLogger("synapse.app.pusher") -class SlaveConfig(DatabaseConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.start_pushers = True - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.public_baseurl = config["public_baseurl"] - - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None - - # some things used by the auth handler but not actually used in the - # pusher codebase - self.bcrypt_rounds = None - self.ldap_enabled = None - self.ldap_server = None - self.ldap_port = None - self.ldap_tls = None - self.ldap_search_base = None - self.ldap_search_property = None - self.ldap_email_property = None - self.ldap_full_name_property = None - - # We would otherwise try to use the registration shared secret as the - # macaroon shared secret if there was no macaroon_shared_secret, but - # that means pulling in RegistrationConfig too. We don't need to be - # backwards compaitible in the pusher codebase so just make people set - # macaroon_shared_secret. We set this to None to prevent it referencing - # an undefined key. - self.registration_shared_secret = None - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("pusher.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: [] - # Enable a ssh manhole listener on the pusher. - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the pusher. - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - - """ % locals() - - -class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig): - pass - - class PusherSlaveStore( SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore @@ -199,7 +112,7 @@ class PusherServer(HomeServer): def remove_pusher(self, app_id, push_key, user_id): http_client = self.get_simple_http_client() - replication_url = self.config.replication_url + replication_url = self.config.worker_replication_url url = replication_url + "/remove_pushers" return http_client.post_json_get_json(url, { "remove": [{ @@ -232,8 +145,8 @@ class PusherServer(HomeServer): ) logger.info("Synapse pusher now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -253,7 +166,7 @@ class PusherServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.config.worker_replication_url pusher_pool = self.get_pusherpool() clock = self.get_clock() @@ -329,19 +242,30 @@ class PusherServer(HomeServer): yield sleep(30) -def setup(config_options): +def start(config_options): try: - config = PusherSlaveConfig.load_config( + config = HomeServerConfig.load_config( "Synapse pusher", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + assert config.worker_app == "synapse.app.pusher" + + setup_logging(config.worker_log_config, config.worker_log_file) - config.setup_logging() + if config.start_pushers: + sys.stderr.write( + "\nThe pushers must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``start_pushers: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.start_pushers = True database_engine = create_engine(config.database_config) @@ -354,11 +278,15 @@ def setup(config_options): ) ps.setup() - ps.start_listening() - - change_resource_limit(ps.config.soft_file_limit) - if ps.config.gc_thresholds: - gc.set_threshold(*ps.config.gc_thresholds) + ps.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() def start(): ps.replicate() @@ -367,30 +295,20 @@ def setup(config_options): reactor.callWhenRunning(start) - return ps + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-pusher", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ps = setup(sys.argv[1:]) - - if ps.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ps.config.soft_file_limit) - if ps.config.gc_thresholds: - gc.set_threshold(*ps.config.gc_thresholds) - reactor.run() - - daemon = Daemonize( - app="synapse-pusher", - pid=ps.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + ps = start(sys.argv[1:]) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 297e199453..8cf5bbbb6d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -18,9 +18,8 @@ import synapse from synapse.api.constants import EventTypes, PresenceState from synapse.config._base import ConfigError -from synapse.config.database import DatabaseConfig -from synapse.config.logger import LoggingConfig -from synapse.config.appservice import AppServiceConfig +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.events import FrozenEvent from synapse.handlers.presence import PresenceHandler from synapse.http.site import SynapseSite @@ -63,70 +62,6 @@ import ujson as json logger = logging.getLogger("synapse.app.synchrotron") -class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): - def read_config(self, config): - self.replication_url = config["replication_url"] - self.server_name = config["server_name"] - self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( - "use_insecure_ssl_client_just_for_testing_do_not_use", False - ) - self.user_agent_suffix = None - self.listeners = config["listeners"] - self.soft_file_limit = config.get("soft_file_limit") - self.daemonize = config.get("daemonize") - self.pid_file = self.abspath(config.get("pid_file")) - self.macaroon_secret_key = config["macaroon_secret_key"] - self.expire_access_token = config.get("expire_access_token", False) - - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None - - def default_config(self, server_name, **kwargs): - pid_file = self.abspath("synchroton.pid") - return """\ - # Slave configuration - - # The replication listener on the synapse to talk to. - #replication_url: https://localhost:{replication_port}/_synapse/replication - - server_name: "%(server_name)s" - - listeners: - # Enable a /sync listener on the synchrontron - #- type: http - # port: {http_port} - # bind_address: "" - # Enable a ssh manhole listener on the synchrotron - # - type: manhole - # port: {manhole_port} - # bind_address: 127.0.0.1 - # Enable a metric listener on the synchrotron - # - type: http - # port: {metrics_port} - # bind_address: 127.0.0.1 - # resources: - # - names: ["metrics"] - # compress: False - - report_stats: False - - daemonize: False - - pid_file: %(pid_file)s - """ % locals() - - class SynchrotronSlavedStore( SlavedPushRuleStore, SlavedEventStore, @@ -163,7 +98,7 @@ class SynchrotronPresence(object): self.http_client = hs.get_simple_http_client() self.store = hs.get_datastore() self.user_to_num_current_syncs = {} - self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users" self.clock = hs.get_clock() active_presence = self.store.take_presence_startup_info() @@ -350,8 +285,8 @@ class SynchrotronServer(HomeServer): ) logger.info("Synapse synchrotron now listening on port %d", port) - def start_listening(self): - for listener in self.config.listeners: + def start_listening(self, listeners): + for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": @@ -371,7 +306,7 @@ class SynchrotronServer(HomeServer): def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() - replication_url = self.config.replication_url + replication_url = self.config.worker_replication_url clock = self.get_clock() notifier = self.get_notifier() presence_handler = self.get_presence_handler() @@ -470,19 +405,18 @@ class SynchrotronServer(HomeServer): return SynchrotronTyping(self) -def setup(config_options): +def start(config_options): try: - config = SynchrotronConfig.load_config( + config = HomeServerConfig.load_config( "Synapse synchrotron", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) - if not config: - sys.exit(0) + assert config.worker_app == "synapse.app.synchrotron" - config.setup_logging() + setup_logging(config.worker_log_config, config.worker_log_file) database_engine = create_engine(config.database_config) @@ -496,11 +430,15 @@ def setup(config_options): ) ss.setup() - ss.start_listening() - - change_resource_limit(ss.config.soft_file_limit) - if ss.config.gc_thresholds: - ss.set_threshold(*ss.config.gc_thresholds) + ss.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() def start(): ss.get_datastore().start_profiling() @@ -508,30 +446,20 @@ def setup(config_options): reactor.callWhenRunning(start) - return ss + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-synchrotron", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() if __name__ == '__main__': with LoggingContext("main"): - ss = setup(sys.argv[1:]) - - if ss.config.daemonize: - def run(): - with LoggingContext("run"): - change_resource_limit(ss.config.soft_file_limit) - if ss.config.gc_thresholds: - gc.set_threshold(*ss.config.gc_thresholds) - reactor.run() - - daemon = Daemonize( - app="synapse-synchrotron", - pid=ss.config.pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - reactor.run() + start(sys.argv[1:]) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 7449f36491..af9f17bf7b 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -157,9 +157,40 @@ class Config(object): return default_config, config @classmethod - def load_config(cls, description, argv, generate_section=None): + def load_config(cls, description, argv): + config_parser = argparse.ArgumentParser( + description=description, + ) + config_parser.add_argument( + "-c", "--config-path", + action="append", + metavar="CONFIG_FILE", + help="Specify config file. Can be given multiple times and" + " may specify directories containing *.yaml files." + ) + + config_parser.add_argument( + "--keys-directory", + metavar="DIRECTORY", + help="Where files such as certs and signing keys are stored when" + " their location is given explicitly in the config." + " Defaults to the directory containing the last config file", + ) + + config_args = config_parser.parse_args(argv) + + config_files = find_config_files(search_paths=config_args.config_path) + obj = cls() + obj.read_config_files( + config_files, + keys_directory=config_args.keys_directory, + generate_keys=False, + ) + return obj + @classmethod + def load_or_generate_config(cls, description, argv): config_parser = argparse.ArgumentParser(add_help=False) config_parser.add_argument( "-c", "--config-path", @@ -176,7 +207,7 @@ class Config(object): config_parser.add_argument( "--report-stats", action="store", - help="Stuff", + help="Whether the generated config reports anonymized usage statistics", choices=["yes", "no"] ) config_parser.add_argument( @@ -197,36 +228,11 @@ class Config(object): ) config_args, remaining_args = config_parser.parse_known_args(argv) + config_files = find_config_files(search_paths=config_args.config_path) + generate_keys = config_args.generate_keys - config_files = [] - if config_args.config_path: - for config_path in config_args.config_path: - if os.path.isdir(config_path): - # We accept specifying directories as config paths, we search - # inside that directory for all files matching *.yaml, and then - # we apply them in *sorted* order. - files = [] - for entry in os.listdir(config_path): - entry_path = os.path.join(config_path, entry) - if not os.path.isfile(entry_path): - print ( - "Found subdirectory in config directory: %r. IGNORING." - ) % (entry_path, ) - continue - - if not entry.endswith(".yaml"): - print ( - "Found file in config directory that does not" - " end in '.yaml': %r. IGNORING." - ) % (entry_path, ) - continue - - files.append(entry_path) - - config_files.extend(sorted(files)) - else: - config_files.append(config_path) + obj = cls() if config_args.generate_config: if config_args.report_stats is None: @@ -299,28 +305,43 @@ class Config(object): " -c CONFIG-FILE\"" ) - if config_args.keys_directory: - config_dir_path = config_args.keys_directory - else: - config_dir_path = os.path.dirname(config_args.config_path[-1]) - config_dir_path = os.path.abspath(config_dir_path) + obj.read_config_files( + config_files, + keys_directory=config_args.keys_directory, + generate_keys=generate_keys, + ) + + if generate_keys: + return None + + obj.invoke_all("read_arguments", args) + + return obj + + def read_config_files(self, config_files, keys_directory=None, + generate_keys=False): + if not keys_directory: + keys_directory = os.path.dirname(config_files[-1]) + + config_dir_path = os.path.abspath(keys_directory) specified_config = {} for config_file in config_files: - yaml_config = cls.read_config_file(config_file) + yaml_config = self.read_config_file(config_file) specified_config.update(yaml_config) if "server_name" not in specified_config: raise ConfigError(MISSING_SERVER_NAME) server_name = specified_config["server_name"] - _, config = obj.generate_config( + _, config = self.generate_config( config_dir_path=config_dir_path, server_name=server_name, is_generating_file=False, ) config.pop("log_config") config.update(specified_config) + if "report_stats" not in config: raise ConfigError( MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" + @@ -328,11 +349,51 @@ class Config(object): ) if generate_keys: - obj.invoke_all("generate_files", config) + self.invoke_all("generate_files", config) return - obj.invoke_all("read_config", config) - - obj.invoke_all("read_arguments", args) - - return obj + self.invoke_all("read_config", config) + + +def find_config_files(search_paths): + """Finds config files using a list of search paths. If a path is a file + then that file path is added to the list. If a search path is a directory + then all the "*.yaml" files in that directory are added to the list in + sorted order. + + Args: + search_paths(list(str)): A list of paths to search. + + Returns: + list(str): A list of file paths. + """ + + config_files = [] + if search_paths: + for config_path in search_paths: + if os.path.isdir(config_path): + # We accept specifying directories as config paths, we search + # inside that directory for all files matching *.yaml, and then + # we apply them in *sorted* order. + files = [] + for entry in os.listdir(config_path): + entry_path = os.path.join(config_path, entry) + if not os.path.isfile(entry_path): + print ( + "Found subdirectory in config directory: %r. IGNORING." + ) % (entry_path, ) + continue + + if not entry.endswith(".yaml"): + print ( + "Found file in config directory that does not" + " end in '.yaml': %r. IGNORING." + ) % (entry_path, ) + continue + + files.append(entry_path) + + config_files.extend(sorted(files)) + else: + config_files.append(config_path) + return config_files diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index fc2445484c..79b0534b3b 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -32,13 +32,15 @@ from .password import PasswordConfig from .jwt import JWTConfig from .ldap import LDAPConfig from .emailconfig import EmailConfig +from .workers import WorkerConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, VoipConfig, RegistrationConfig, MetricsConfig, ApiConfig, AppServiceConfig, KeyConfig, SAML2Config, CasConfig, - JWTConfig, LDAPConfig, PasswordConfig, EmailConfig,): + JWTConfig, LDAPConfig, PasswordConfig, EmailConfig, + WorkerConfig,): pass diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 5047db898f..dc68683fbc 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -126,54 +126,58 @@ class LoggingConfig(Config): ) def setup_logging(self): - log_format = ( - "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" - " - %(message)s" - ) - if self.log_config is None: - - level = logging.INFO - level_for_storage = logging.INFO - if self.verbosity: - level = logging.DEBUG - if self.verbosity > 1: - level_for_storage = logging.DEBUG - - # FIXME: we need a logging.WARN for a -q quiet option - logger = logging.getLogger('') - logger.setLevel(level) - - logging.getLogger('synapse.storage').setLevel(level_for_storage) - - formatter = logging.Formatter(log_format) - if self.log_file: - # TODO: Customisable file size / backup count - handler = logging.handlers.RotatingFileHandler( - self.log_file, maxBytes=(1000 * 1000 * 100), backupCount=3 - ) - - def sighup(signum, stack): - logger.info("Closing log file due to SIGHUP") - handler.doRollover() - logger.info("Opened new log file due to SIGHUP") - - # TODO(paul): obviously this is a terrible mechanism for - # stealing SIGHUP, because it means no other part of synapse - # can use it instead. If we want to catch SIGHUP anywhere - # else as well, I'd suggest we find a nicer way to broadcast - # it around. - if getattr(signal, "SIGHUP"): - signal.signal(signal.SIGHUP, sighup) - else: - handler = logging.StreamHandler() - handler.setFormatter(formatter) - - handler.addFilter(LoggingContextFilter(request="")) - - logger.addHandler(handler) + setup_logging(self.log_config, self.log_file, self.verbosity) + + +def setup_logging(log_config=None, log_file=None, verbosity=None): + log_format = ( + "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" + " - %(message)s" + ) + if log_config is None: + + level = logging.INFO + level_for_storage = logging.INFO + if verbosity: + level = logging.DEBUG + if verbosity > 1: + level_for_storage = logging.DEBUG + + # FIXME: we need a logging.WARN for a -q quiet option + logger = logging.getLogger('') + logger.setLevel(level) + + logging.getLogger('synapse.storage').setLevel(level_for_storage) + + formatter = logging.Formatter(log_format) + if log_file: + # TODO: Customisable file size / backup count + handler = logging.handlers.RotatingFileHandler( + log_file, maxBytes=(1000 * 1000 * 100), backupCount=3 + ) + + def sighup(signum, stack): + logger.info("Closing log file due to SIGHUP") + handler.doRollover() + logger.info("Opened new log file due to SIGHUP") + + # TODO(paul): obviously this is a terrible mechanism for + # stealing SIGHUP, because it means no other part of synapse + # can use it instead. If we want to catch SIGHUP anywhere + # else as well, I'd suggest we find a nicer way to broadcast + # it around. + if getattr(signal, "SIGHUP"): + signal.signal(signal.SIGHUP, sighup) else: - with open(self.log_config, 'r') as f: - logging.config.dictConfig(yaml.load(f)) + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + handler.addFilter(LoggingContextFilter(request="")) + + logger.addHandler(handler) + else: + with open(log_config, 'r') as f: + logging.config.dictConfig(yaml.load(f)) - observer = PythonLoggingObserver() - observer.start() + observer = PythonLoggingObserver() + observer.start() diff --git a/synapse/config/server.py b/synapse/config/server.py index 44b8d422e0..7840dc3ad6 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -27,7 +27,7 @@ class ServerConfig(Config): self.daemonize = config.get("daemonize") self.print_pidfile = config.get("print_pidfile") self.user_agent_suffix = config.get("user_agent_suffix") - self.use_frozen_dicts = config.get("use_frozen_dicts", True) + self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") self.secondary_directory_servers = config.get("secondary_directory_servers", []) @@ -38,19 +38,7 @@ class ServerConfig(Config): self.listeners = config.get("listeners", []) - thresholds = config.get("gc_thresholds", None) - if thresholds is not None: - try: - assert len(thresholds) == 3 - self.gc_thresholds = ( - int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), - ) - except: - raise ConfigError( - "Value of `gc_threshold` must be a list of three integers if set" - ) - else: - self.gc_thresholds = None + self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None)) bind_port = config.get("bind_port") if bind_port: @@ -264,3 +252,20 @@ class ServerConfig(Config): type=int, help="Turn on the twisted telnet manhole" " service on the given port.") + + +def read_gc_thresholds(thresholds): + """Reads the three integer thresholds for garbage collection. Ensures that + the thresholds are integers if thresholds are supplied. + """ + if thresholds is None: + return None + try: + assert len(thresholds) == 3 + return ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_threshold` must be a list of three integers if set" + ) diff --git a/synapse/config/workers.py b/synapse/config/workers.py new file mode 100644 index 0000000000..904789d155 --- /dev/null +++ b/synapse/config/workers.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class WorkerConfig(Config): + """The workers are processes run separately to the main synapse process. + They have their own pid_file and listener configuration. They use the + replication_url to talk to the main synapse process.""" + + def read_config(self, config): + self.worker_app = config.get("worker_app") + self.worker_listeners = config.get("worker_listeners") + self.worker_daemonize = config.get("worker_daemonize") + self.worker_pid_file = config.get("worker_pid_file") + self.worker_log_file = config.get("worker_log_file") + self.worker_log_config = config.get("worker_log_config") + self.worker_replication_url = config.get("worker_replication_url") diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index a0b7cb7963..da2f5e8cfd 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -31,6 +31,9 @@ logger = logging.getLogger(__name__) class FederationBase(object): + def __init__(self, hs): + pass + @defer.inlineCallbacks def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False, include_none=False): diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d835c1b038..b06387051c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -52,6 +52,8 @@ sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) class FederationClient(FederationBase): + def __init__(self, hs): + super(FederationClient, self).__init__(hs) def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index f1d231b9d8..2a589524a4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Transaction, Edu +from synapse.util.async import Linearizer from synapse.util.logutils import log_function from synapse.events import FrozenEvent import synapse.metrics @@ -44,6 +45,11 @@ received_queries_counter = metrics.register_counter("received_queries", labels=[ class FederationServer(FederationBase): + def __init__(self, hs): + super(FederationServer, self).__init__(hs) + + self._room_pdu_linearizer = Linearizer() + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -187,13 +193,16 @@ class FederationServer(FederationBase): ) for event in auth_chain: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) else: raise NotImplementedError("Specify an event") @@ -377,10 +386,20 @@ class FederationServer(FederationBase): @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): + logger.info( + "on_get_missing_events: earliest_events: %r, latest_events: %r," + " limit: %d, min_depth: %d", + earliest_events, latest_events, limit, min_depth + ) missing_events = yield self.handler.on_get_missing_events( origin, room_id, earliest_events, latest_events, limit, min_depth ) + if len(missing_events) < 5: + logger.info("Returning %d events: %r", len(missing_events), missing_events) + else: + logger.info("Returning %d events", len(missing_events)) + time_now = self._clock.time_msec() defer.returnValue({ @@ -481,42 +500,59 @@ class FederationServer(FederationBase): pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: if get_missing and prevs - seen: - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if prevs - seen: + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) prevs = {e_id for e_id, _ in pdu.prev_events} seen = set(have_seen.keys()) if prevs - seen: + logger.info( + "Still missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) fetch_state = True if fetch_state: diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 3e062a5eab..ea66a5dcbc 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer): self.hs = hs + super(ReplicationLayer, self).__init__(hs) + def __str__(self): return "<ReplicationLayer(%s)>" % self.server_name diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a1a334955f..8a1965f45a 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -37,7 +37,7 @@ class TransportLayerServer(JsonResource): self.hs = hs self.clock = hs.get_clock() - super(TransportLayerServer, self).__init__(hs) + super(TransportLayerServer, self).__init__(hs, canonical_json=False) self.authenticator = Authenticator(hs) self.ratelimiter = FederationRateLimiter( @@ -528,15 +528,10 @@ class PublicRoomList(BaseFederationServlet): PATH = "/publicRooms" @defer.inlineCallbacks - def on_GET(self, request): + def on_GET(self, origin, content, query): data = yield self.room_list_handler.get_local_public_room_list() defer.returnValue((200, data)) - # Avoid doing remote HS authorization checks which are done by default by - # BaseFederationServlet. - def _wrap(self, code): - return code - SERVLET_CLASSES = ( FederationSendServlet, diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 200793b5ed..b38f81e999 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -626,6 +626,6 @@ class AuthHandler(BaseHandler): Whether self.hash(password) == stored_hash (bool). """ if stored_hash: - return bcrypt.hashpw(password, stored_hash) == stored_hash + return bcrypt.hashpw(password, stored_hash.encode('utf-8')) == stored_hash else: return False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ff83c608e7..6c0bc7eafa 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -345,19 +345,21 @@ class FederationHandler(BaseHandler): ) missing_auth = required_auth - set(auth_events) - results = yield defer.gatherResults( - [ - self.replication_layer.get_pdu( - [dest], - event_id, - outlier=True, - timeout=10000, - ) - for event_id in missing_auth - ], - consumeErrors=True - ).addErrback(unwrapFirstError) - auth_events.update({a.event_id: a for a in results}) + if missing_auth: + logger.info("Missing auth for backfill: %r", missing_auth) + results = yield defer.gatherResults( + [ + self.replication_layer.get_pdu( + [dest], + event_id, + outlier=True, + timeout=10000, + ) + for event_id in missing_auth + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + auth_events.update({a.event_id: a for a in results}) ev_infos = [] for a in auth_events.values(): @@ -399,7 +401,7 @@ class FederationHandler(BaseHandler): # previous to work out the state. # TODO: We can probably do something more clever here. yield self._handle_new_event( - dest, event + dest, event, backfilled=True, ) defer.returnValue(events) @@ -1016,13 +1018,16 @@ class FederationHandler(BaseHandler): res = results.values() for event in res: - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) ) - ) defer.returnValue(res) else: diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index e37409170d..711a6a567f 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -36,13 +36,6 @@ class ProfileHandler(BaseHandler): "profile", self.on_profile_query ) - distributor = hs.get_distributor() - - distributor.observe("registered_user", self.registered_user) - - def registered_user(self, user): - return self.store.create_profile(user.localpart) - @defer.inlineCallbacks def get_displayname(self, target_user): if self.hs.is_mine(target_user): diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index bbc07b045e..0b7517221d 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -23,7 +23,6 @@ from synapse.api.errors import ( from ._base import BaseHandler from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient -from synapse.util.distributor import registered_user import logging import urllib @@ -37,8 +36,6 @@ class RegistrationHandler(BaseHandler): super(RegistrationHandler, self).__init__(hs) self.auth = hs.get_auth() - self.distributor = hs.get_distributor() - self.distributor.declare("registered_user") self.captcha_client = CaptchaServerHttpClient(hs) self._next_generated_user_id = None @@ -140,9 +137,11 @@ class RegistrationHandler(BaseHandler): password_hash=password_hash, was_guest=was_guest, make_guest=make_guest, + create_profile_with_localpart=( + # If the user was a guest then they already have a profile + None if was_guest else user.localpart + ), ) - - yield registered_user(self.distributor, user) else: # autogen a sequential user ID attempts = 0 @@ -160,7 +159,8 @@ class RegistrationHandler(BaseHandler): user_id=user_id, token=token, password_hash=password_hash, - make_guest=make_guest + make_guest=make_guest, + create_profile_with_localpart=user.localpart, ) except SynapseError: # if user id is taken, just generate another @@ -168,7 +168,6 @@ class RegistrationHandler(BaseHandler): user_id = None token = None attempts += 1 - yield registered_user(self.distributor, user) # We used to generate default identicons here, but nowadays # we want clients to generate their own as part of their branding @@ -201,8 +200,8 @@ class RegistrationHandler(BaseHandler): token=token, password_hash="", appservice_id=service_id, + create_profile_with_localpart=user.localpart, ) - yield registered_user(self.distributor, user) defer.returnValue((user_id, token)) @defer.inlineCallbacks @@ -248,9 +247,9 @@ class RegistrationHandler(BaseHandler): yield self.store.register( user_id=user_id, token=token, - password_hash=None + password_hash=None, + create_profile_with_localpart=user.localpart, ) - yield registered_user(self.distributor, user) except Exception as e: yield self.store.add_access_token_to_user(user_id, token) # Ignore Registration errors @@ -388,17 +387,16 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() - auth_handler = self.hs.get_handlers().auth_handler - token = auth_handler.generate_short_term_login_token(user_id, duration_seconds) + token = self.auth_handler().generate_short_term_login_token( + user_id, duration_seconds) if need_register: yield self.store.register( user_id=user_id, token=token, - password_hash=None + password_hash=None, + create_profile_with_localpart=user.localpart, ) - - yield registered_user(self.distributor, user) else: yield self.store.user_delete_access_tokens(user_id=user_id) yield self.store.add_access_token_to_user(user_id=user_id, token=token) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9fd34588dd..ae44c7a556 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -20,7 +20,7 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken from synapse.api.constants import ( - EventTypes, JoinRules, RoomCreationPreset, + EventTypes, JoinRules, RoomCreationPreset, Membership, ) from synapse.api.errors import AuthError, StoreError, SynapseError from synapse.util import stringutils @@ -367,14 +367,10 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def handle_room(room_id): - # We pull each bit of state out indvidually to avoid pulling the - # full state into memory. Due to how the caching works this should - # be fairly quick, even if not originally in the cache. - def get_state(etype, state_key): - return self.state_handler.get_current_state(room_id, etype, state_key) + current_state = yield self.state_handler.get_current_state(room_id) # Double check that this is actually a public room. - join_rules_event = yield get_state(EventTypes.JoinRules, "") + join_rules_event = current_state.get((EventTypes.JoinRules, "")) if join_rules_event: join_rule = join_rules_event.content.get("join_rule", None) if join_rule and join_rule != JoinRules.PUBLIC: @@ -382,47 +378,51 @@ class RoomListHandler(BaseHandler): result = {"room_id": room_id} - joined_users = yield self.store.get_users_in_room(room_id) - if len(joined_users) == 0: + num_joined_users = len([ + 1 for _, event in current_state.items() + if event.type == EventTypes.Member + and event.membership == Membership.JOIN + ]) + if num_joined_users == 0: return - result["num_joined_members"] = len(joined_users) + result["num_joined_members"] = num_joined_users aliases = yield self.store.get_aliases_for_room(room_id) if aliases: result["aliases"] = aliases - name_event = yield get_state(EventTypes.Name, "") + name_event = yield current_state.get((EventTypes.Name, "")) if name_event: name = name_event.content.get("name", None) if name: result["name"] = name - topic_event = yield get_state(EventTypes.Topic, "") + topic_event = current_state.get((EventTypes.Topic, "")) if topic_event: topic = topic_event.content.get("topic", None) if topic: result["topic"] = topic - canonical_event = yield get_state(EventTypes.CanonicalAlias, "") + canonical_event = current_state.get((EventTypes.CanonicalAlias, "")) if canonical_event: canonical_alias = canonical_event.content.get("alias", None) if canonical_alias: result["canonical_alias"] = canonical_alias - visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "") + visibility_event = current_state.get((EventTypes.RoomHistoryVisibility, "")) visibility = None if visibility_event: visibility = visibility_event.content.get("history_visibility", None) result["world_readable"] = visibility == "world_readable" - guest_event = yield get_state(EventTypes.GuestAccess, "") + guest_event = current_state.get((EventTypes.GuestAccess, "")) guest = None if guest_event: guest = guest_event.content.get("guest_access", None) result["guest_can_join"] = guest == "can_join" - avatar_event = yield get_state("m.room.avatar", "") + avatar_event = current_state.get(("m.room.avatar", "")) if avatar_event: avatar_url = avatar_event.content.get("url", None) if avatar_url: diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 861b8f7989..5589296c09 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -221,6 +221,9 @@ class TypingHandler(object): def get_all_typing_updates(self, last_id, current_id): # TODO: Work out a way to do this without scanning the entire state. + if last_id == current_id: + return [] + rows = [] for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: diff --git a/synapse/http/client.py b/synapse/http/client.py index c7fa692435..3ec9bc7faf 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -24,12 +24,13 @@ from synapse.http.endpoint import SpiderEndpoint from canonicaljson import encode_canonical_json -from twisted.internet import defer, reactor, ssl, protocol +from twisted.internet import defer, reactor, ssl, protocol, task from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint from twisted.web.client import ( BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent, - readBody, FileBodyProducer, PartialDownloadError, + readBody, PartialDownloadError, ) +from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone @@ -468,3 +469,26 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory): def creatorForNetloc(self, hostname, port): return self + + +class FileBodyProducer(TwistedFileBodyProducer): + """Workaround for https://twistedmatrix.com/trac/ticket/8473 + + We override the pauseProducing and resumeProducing methods in twisted's + FileBodyProducer so that they do not raise exceptions if the task has + already completed. + """ + + def pauseProducing(self): + try: + super(FileBodyProducer, self).pauseProducing() + except task.TaskDone: + # task has already completed + pass + + def resumeProducing(self): + try: + super(FileBodyProducer, self).resumeProducing() + except task.NotPaused: + # task was not paused (probably because it had already completed) + pass diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index e5c3929cd7..1028731bc9 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -273,16 +273,16 @@ class Mailer(object): sender_state_event = room_state[("m.room.member", event.sender)] sender_name = name_from_member_event(sender_state_event) - sender_avatar_url = None - if "avatar_url" in sender_state_event.content: - sender_avatar_url = sender_state_event.content["avatar_url"] + sender_avatar_url = sender_state_event.content.get("avatar_url") # 'hash' for deterministically picking default images: use # sender_hash % the number of default images to choose from sender_hash = string_ordinal_total(event.sender) + msgtype = event.content.get("msgtype") + ret = { - "msgtype": event.content["msgtype"], + "msgtype": msgtype, "is_historical": event.event_id != notif['event_id'], "id": event.event_id, "ts": event.origin_server_ts, @@ -291,9 +291,9 @@ class Mailer(object): "sender_hash": sender_hash, } - if event.content["msgtype"] == "m.text": + if msgtype == "m.text": self.add_text_message_vars(ret, event) - elif event.content["msgtype"] == "m.image": + elif msgtype == "m.image": self.add_image_message_vars(ret, event) if "body" in event.content: @@ -302,16 +302,17 @@ class Mailer(object): return ret def add_text_message_vars(self, messagevars, event): - if "format" in event.content: - msgformat = event.content["format"] - else: - msgformat = None + msgformat = event.content.get("format") + messagevars["format"] = msgformat - if msgformat == "org.matrix.custom.html": - messagevars["body_text_html"] = safe_markup(event.content["formatted_body"]) - else: - messagevars["body_text_html"] = safe_text(event.content["body"]) + formatted_body = event.content.get("formatted_body") + body = event.content.get("body") + + if msgformat == "org.matrix.custom.html" and formatted_body: + messagevars["body_text_html"] = safe_markup(formatted_body) + elif body: + messagevars["body_text_html"] = safe_text(body) return messagevars diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index d1afa0f0d5..498bb9e18a 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -45,30 +45,27 @@ class EventStreamRestServlet(ClientV1RestServlet): raise SynapseError(400, "Guest users must specify room_id param") if "room_id" in request.args: room_id = request.args["room_id"][0] - try: - handler = self.handlers.event_stream_handler - pagin_config = PaginationConfig.from_request(request) - timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS - if "timeout" in request.args: - try: - timeout = int(request.args["timeout"][0]) - except ValueError: - raise SynapseError(400, "timeout must be in milliseconds.") - - as_client_event = "raw" not in request.args - - chunk = yield handler.get_stream( - requester.user.to_string(), - pagin_config, - timeout=timeout, - as_client_event=as_client_event, - affect_presence=(not is_guest), - room_id=room_id, - is_guest=is_guest, - ) - except: - logger.exception("Event stream failed") - raise + + handler = self.handlers.event_stream_handler + pagin_config = PaginationConfig.from_request(request) + timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS + if "timeout" in request.args: + try: + timeout = int(request.args["timeout"][0]) + except ValueError: + raise SynapseError(400, "timeout must be in milliseconds.") + + as_client_event = "raw" not in request.args + + chunk = yield handler.get_stream( + requester.user.to_string(), + pagin_config, + timeout=timeout, + as_client_event=as_client_event, + affect_presence=(not is_guest), + room_id=room_id, + is_guest=is_guest, + ) defer.returnValue((200, chunk)) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index db52a1fc39..86fbe2747d 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -72,8 +72,6 @@ class RoomCreateRestServlet(ClientV1RestServlet): def get_room_config(self, request): user_supplied_config = parse_json_object_from_request(request) - # default visibility - user_supplied_config.setdefault("visibility", "public") return user_supplied_config def on_OPTIONS(self, request): @@ -279,6 +277,13 @@ class PublicRoomListRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request): + try: + yield self.auth.get_user_by_req(request) + except AuthError: + # This endpoint isn't authed, but its useful to know who's hitting + # it if they *do* supply an access token + pass + handler = self.hs.get_room_list_handler() data = yield handler.get_aggregated_public_room_list() diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index d96bf9afe2..2468c3ac42 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -26,6 +26,7 @@ from .thumbnailer import Thumbnailer from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.util.stringutils import random_string +from synapse.api.errors import SynapseError from twisted.internet import defer, threads @@ -134,10 +135,15 @@ class MediaRepository(object): request_path = "/".join(( "/_matrix/media/v1/download", server_name, media_id, )) - length, headers = yield self.client.get_file( - server_name, request_path, output_stream=f, - max_size=self.max_upload_size, - ) + try: + length, headers = yield self.client.get_file( + server_name, request_path, output_stream=f, + max_size=self.max_upload_size, + ) + except Exception as e: + logger.warn("Failed to fetch remoted media %r", e) + raise SynapseError(502, "Failed to fetch remoted media") + media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 37dd1de899..74c64f1371 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -252,7 +252,8 @@ class PreviewUrlResource(Resource): og = {} for tag in tree.xpath("//*/meta[starts-with(@property, 'og:')]"): - og[tag.attrib['property']] = tag.attrib['content'] + if 'content' in tag.attrib: + og[tag.attrib['property']] = tag.attrib['content'] # TODO: grab article: meta tags too, e.g.: @@ -279,7 +280,7 @@ class PreviewUrlResource(Resource): # TODO: consider inlined CSS styles as well as width & height attribs images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]") images = sorted(images, key=lambda i: ( - -1 * int(i.attrib['width']) * int(i.attrib['height']) + -1 * float(i.attrib['width']) * float(i.attrib['height']) )) if not images: images = tree.xpath("//img[@src]") @@ -287,9 +288,9 @@ class PreviewUrlResource(Resource): og['og:image'] = images[0].attrib['src'] # pre-cache the image for posterity - # FIXME: it might be cleaner to use the same flow as the main /preview_url request - # itself and benefit from the same caching etc. But for now we just rely on the - # caching on the master request to speed things up. + # FIXME: it might be cleaner to use the same flow as the main /preview_url + # request itself and benefit from the same caching etc. But for now we + # just rely on the caching on the master request to speed things up. if 'og:image' in og and og['og:image']: image_info = yield self._download_url( self._rebase_url(og['og:image'], media_info['uri']), requester.user diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index ec7e8d40d2..3fa226e92d 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -138,6 +138,9 @@ class AccountDataStore(SQLBaseStore): A deferred pair of lists of tuples of stream_id int, user_id string, room_id string, type string, and content string. """ + if last_room_id == current_id and last_global_id == current_id: + return defer.succeed(([], [])) + def get_updated_account_data_txn(txn): sql = ( "SELECT stream_id, user_id, account_data_type, content" diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 3fab57a7e8..d03f7c541e 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -118,6 +118,9 @@ class PresenceStore(SQLBaseStore): ) def get_all_presence_updates(self, last_id, current_id): + if last_id == current_id: + return defer.succeed([]) + def get_all_presence_updates_txn(txn): sql = ( "SELECT stream_id, user_id, state, last_active_ts," diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 786d6f6d67..8183b7f1b0 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -421,6 +421,9 @@ class PushRuleStore(SQLBaseStore): def get_all_push_rule_updates(self, last_id, current_id, limit): """Get all the push rules changes that have happend on the server""" + if last_id == current_id: + return defer.succeed([]) + def get_all_push_rule_updates_txn(txn): sql = ( "SELECT stream_id, event_stream_ordering, user_id, rule_id," diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index bda84a744a..3de9e0f709 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -76,7 +76,8 @@ class RegistrationStore(SQLBaseStore): @defer.inlineCallbacks def register(self, user_id, token, password_hash, - was_guest=False, make_guest=False, appservice_id=None): + was_guest=False, make_guest=False, appservice_id=None, + create_profile_with_localpart=None): """Attempts to register an account. Args: @@ -88,6 +89,8 @@ class RegistrationStore(SQLBaseStore): make_guest (boolean): True if the the new user should be guest, false to add a regular user account. appservice_id (str): The ID of the appservice registering the user. + create_profile_with_localpart (str): Optionally create a profile for + the given localpart. Raises: StoreError if the user_id could not be registered. """ @@ -99,7 +102,8 @@ class RegistrationStore(SQLBaseStore): password_hash, was_guest, make_guest, - appservice_id + appservice_id, + create_profile_with_localpart, ) self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) @@ -112,7 +116,8 @@ class RegistrationStore(SQLBaseStore): password_hash, was_guest, make_guest, - appservice_id + appservice_id, + create_profile_with_localpart, ): now = int(self.clock.time()) @@ -157,6 +162,12 @@ class RegistrationStore(SQLBaseStore): (next_id, user_id, token,) ) + if create_profile_with_localpart: + txn.execute( + "INSERT INTO profiles(user_id) VALUES (?)", + (create_profile_with_localpart,) + ) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py index 9da23f34cb..5a2c1aa59b 100644 --- a/synapse/storage/tags.py +++ b/synapse/storage/tags.py @@ -68,6 +68,9 @@ class TagsStore(SQLBaseStore): A deferred list of tuples of stream_id int, user_id string, room_id string, tag string and content string. """ + if last_id == current_id: + defer.returnValue([]) + def get_all_updated_tags_txn(txn): sql = ( "SELECT stream_id, user_id, room_id" diff --git a/synapse/types.py b/synapse/types.py index 7b6ae44bdd..f639651a73 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -22,7 +22,10 @@ Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) def get_domain_from_id(string): - return string.split(":", 1)[1] + try: + return string.split(":", 1)[1] + except IndexError: + raise SynapseError(400, "Invalid ID: %r", string) class DomainSpecificString( diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index d7cccc06b1..e68f94ce77 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -27,10 +27,6 @@ import logging logger = logging.getLogger(__name__) -def registered_user(distributor, user): - return distributor.fire("registered_user", user) - - def user_left_room(distributor, user, room_id): return preserve_context_over_fn( distributor.fire, |