Diffstat (limited to 'synapse/app')
-rwxr-xr-x | synapse/app/homeserver.py | 358
-rw-r--r-- | synapse/app/pusher.py | 396
-rw-r--r-- | synapse/app/synchrotron.py | 537
-rwxr-xr-x | synapse/app/synctl.py | 4
4 files changed, 971 insertions, 324 deletions
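A note on the manhole change in the first diff below: homeserver.py's inline Twisted Conch boilerplate is replaced by a shared synapse.util.manhole.manhole helper, which the new pusher and synchrotron workers reuse. A minimal sketch of such a helper, reconstructed from the code this diff removes (the real synapse/util/manhole.py may differ):

from twisted.conch import manhole_ssh
from twisted.conch.insults import insults
from twisted.conch.manhole import ColoredManhole
from twisted.cred import checkers, portal


def manhole(username, password, globals):
    """Build a factory for an ssh listener with password auth.

    Clients that connect land in a coloured python shell with the
    supplied globals in scope. Pass the returned factory to
    reactor.listenTCP, as the listeners in the diffs below do.
    """
    checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
        **{username: password}
    )

    rlm = manhole_ssh.TerminalRealm()
    rlm.chainedProtocolFactory = lambda: insults.ServerProtocol(
        ColoredManhole,
        dict(globals, __name__="__console__"),
    )

    return manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))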
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fcdc8e6e10..22e1721fc4 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,14 +16,10 @@ import synapse -import contextlib +import gc import logging import os -import re -import resource -import subprocess import sys -import time from synapse.config._base import ConfigError from synapse.python_dependencies import ( @@ -33,22 +29,15 @@ from synapse.python_dependencies import ( from synapse.rest import ClientRestResource from synapse.storage.engines import create_engine, IncorrectDatabaseSetup from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.server import HomeServer - -from twisted.conch.manhole import ColoredManhole -from twisted.conch.insults import insults -from twisted.conch import manhole_ssh -from twisted.cred import checkers, portal - - from twisted.internet import reactor, task, defer from twisted.application import service from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File -from twisted.web.server import Site, GzipEncoderFactory, Request +from twisted.web.server import GzipEncoderFactory from synapse.http.server import RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -66,6 +55,13 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.federation.transport.server import TransportLayerServer +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.manhole import manhole + +from synapse.http.site import SynapseSite + from synapse import events from daemonize import Daemonize @@ -73,9 +69,6 @@ from daemonize import Daemonize logger = logging.getLogger("synapse.app.homeserver") -ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') - - def gz_wrap(r): return EncodingResourceWrapper(r, [GzipEncoderFactory()]) @@ -173,7 +166,12 @@ class SynapseHomeServer(HomeServer): if name == "replication": resources[REPLICATION_PREFIX] = ReplicationResource(self) - root_resource = create_resource_tree(resources) + if WEB_CLIENT_PREFIX in resources: + root_resource = RootRedirect(WEB_CLIENT_PREFIX) + else: + root_resource = Resource() + + root_resource = create_resource_tree(resources, root_resource) if tls: reactor.listenSSL( port, @@ -206,24 +204,13 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( - matrix="rabbithole" - ) - - rlm = manhole_ssh.TerminalRealm() - rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( - ColoredManhole, - { - "__name__": "__console__", - "hs": self, - } - ) - - f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - reactor.listenTCP( listener["port"], - f, + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), interface=listener.get("bind_address", '127.0.0.1') ) else: @@ -245,7 +232,7 @@ class SynapseHomeServer(HomeServer): except IncorrectDatabaseSetup as e: 
quit_with_error(e.message) - def get_db_conn(self): + def get_db_conn(self, run_new_connection=True): # Any param beginning with cp_ is a parameter for adbapi, and should # not be passed to the database engine. db_params = { @@ -254,7 +241,8 @@ class SynapseHomeServer(HomeServer): } db_conn = self.database_engine.module.connect(**db_params) - self.database_engine.on_new_connection(db_conn) + if run_new_connection: + self.database_engine.on_new_connection(db_conn) return db_conn @@ -268,86 +256,6 @@ def quit_with_error(error_string): sys.exit(1) -def get_version_string(): - try: - null = open(os.devnull, 'w') - cwd = os.path.dirname(os.path.abspath(__file__)) - try: - git_branch = subprocess.check_output( - ['git', 'rev-parse', '--abbrev-ref', 'HEAD'], - stderr=null, - cwd=cwd, - ).strip() - git_branch = "b=" + git_branch - except subprocess.CalledProcessError: - git_branch = "" - - try: - git_tag = subprocess.check_output( - ['git', 'describe', '--exact-match'], - stderr=null, - cwd=cwd, - ).strip() - git_tag = "t=" + git_tag - except subprocess.CalledProcessError: - git_tag = "" - - try: - git_commit = subprocess.check_output( - ['git', 'rev-parse', '--short', 'HEAD'], - stderr=null, - cwd=cwd, - ).strip() - except subprocess.CalledProcessError: - git_commit = "" - - try: - dirty_string = "-this_is_a_dirty_checkout" - is_dirty = subprocess.check_output( - ['git', 'describe', '--dirty=' + dirty_string], - stderr=null, - cwd=cwd, - ).strip().endswith(dirty_string) - - git_dirty = "dirty" if is_dirty else "" - except subprocess.CalledProcessError: - git_dirty = "" - - if git_branch or git_tag or git_commit or git_dirty: - git_version = ",".join( - s for s in - (git_branch, git_tag, git_commit, git_dirty,) - if s - ) - - return ( - "Synapse/%s (%s)" % ( - synapse.__version__, git_version, - ) - ).encode("ascii") - except Exception as e: - logger.info("Failed to check for git repository: %s", e) - - return ("Synapse/%s" % (synapse.__version__,)).encode("ascii") - - -def change_resource_limit(soft_file_no): - try: - soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) - - if not soft_file_no: - soft_file_no = hard - - resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard)) - logger.info("Set file limit to: %d", soft_file_no) - - resource.setrlimit( - resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) - ) - except (ValueError, resource.error) as e: - logger.warn("Failed to set file or core limit: %s", e) - - def setup(config_options): """ Args: @@ -377,7 +285,7 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = get_version_string() + version_string = get_version_string("Synapse", synapse) logger.info("Server hostname: %s", config.server_name) logger.info("Server version: %s", version_string) @@ -386,7 +294,7 @@ def setup(config_options): tls_server_context_factory = context_factory.ServerContextFactory(config) - database_engine = create_engine(config) + database_engine = create_engine(config.database_config) config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection hs = SynapseHomeServer( @@ -402,8 +310,10 @@ def setup(config_options): logger.info("Preparing database: %s...", config.database_config['name']) try: - db_conn = hs.get_db_conn() - database_engine.prepare_database(db_conn) + db_conn = hs.get_db_conn(run_new_connection=False) + prepare_database(db_conn, database_engine, config=config) + database_engine.on_new_connection(db_conn) + 
hs.run_startup_checks(db_conn, database_engine) db_conn.commit() @@ -442,215 +352,13 @@ class SynapseService(service.Service): def startService(self): hs = setup(self.config) change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) def stopService(self): return self._port.stopListening() -class SynapseRequest(Request): - def __init__(self, site, *args, **kw): - Request.__init__(self, *args, **kw) - self.site = site - self.authenticated_entity = None - self.start_time = 0 - - def __repr__(self): - # We overwrite this so that we don't log ``access_token`` - return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( - self.__class__.__name__, - id(self), - self.method, - self.get_redacted_uri(), - self.clientproto, - self.site.site_tag, - ) - - def get_redacted_uri(self): - return ACCESS_TOKEN_RE.sub( - r'\1<redacted>\3', - self.uri - ) - - def get_user_agent(self): - return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] - - def started_processing(self): - self.site.access_logger.info( - "%s - %s - Received request: %s %s", - self.getClientIP(), - self.site.site_tag, - self.method, - self.get_redacted_uri() - ) - self.start_time = int(time.time() * 1000) - - def finished_processing(self): - - try: - context = LoggingContext.current_context() - ru_utime, ru_stime = context.get_resource_usage() - db_txn_count = context.db_txn_count - db_txn_duration = context.db_txn_duration - except: - ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration = (0, 0) - - self.site.access_logger.info( - "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms) (%dms/%d)" - " %sB %s \"%s %s %s\" \"%s\"", - self.getClientIP(), - self.site.site_tag, - self.authenticated_entity, - int(time.time() * 1000) - self.start_time, - int(ru_utime * 1000), - int(ru_stime * 1000), - int(db_txn_duration * 1000), - int(db_txn_count), - self.sentLength, - self.code, - self.method, - self.get_redacted_uri(), - self.clientproto, - self.get_user_agent(), - ) - - @contextlib.contextmanager - def processing(self): - self.started_processing() - yield - self.finished_processing() - - -class XForwardedForRequest(SynapseRequest): - def __init__(self, *args, **kw): - SynapseRequest.__init__(self, *args, **kw) - - """ - Add a layer on top of another request that only uses the value of an - X-Forwarded-For header as the result of C{getClientIP}. - """ - def getClientIP(self): - """ - @return: The client address (the first address) in the value of the - I{X-Forwarded-For header}. If the header is not present, return - C{b"-"}. 
- """ - return self.requestHeaders.getRawHeaders( - b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() - - -class SynapseRequestFactory(object): - def __init__(self, site, x_forwarded_for): - self.site = site - self.x_forwarded_for = x_forwarded_for - - def __call__(self, *args, **kwargs): - if self.x_forwarded_for: - return XForwardedForRequest(self.site, *args, **kwargs) - else: - return SynapseRequest(self.site, *args, **kwargs) - - -class SynapseSite(Site): - """ - Subclass of a twisted http Site that does access logging with python's - standard logging - """ - def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): - Site.__init__(self, resource, *args, **kwargs) - - self.site_tag = site_tag - - proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(self, proxied) - self.access_logger = logging.getLogger(logger_name) - - def log(self, request): - pass - - -def create_resource_tree(desired_tree, redirect_root_to_web_client=True): - """Create the resource tree for this Home Server. - - This in unduly complicated because Twisted does not support putting - child resources more than 1 level deep at a time. - - Args: - web_client (bool): True to enable the web client. - redirect_root_to_web_client (bool): True to redirect '/' to the - location of the web client. This does nothing if web_client is not - True. - """ - if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree: - root_resource = RootRedirect(WEB_CLIENT_PREFIX) - else: - root_resource = Resource() - - # ideally we'd just use getChild and putChild but getChild doesn't work - # unless you give it a Request object IN ADDITION to the name :/ So - # instead, we'll store a copy of this mapping so we can actually add - # extra resources to existing nodes. See self._resource_id for the key. - resource_mappings = {} - for full_path, res in desired_tree.items(): - logger.info("Attaching %s to path %s", res, full_path) - last_resource = root_resource - for path_seg in full_path.split('/')[1:-1]: - if path_seg not in last_resource.listNames(): - # resource doesn't exist, so make a "dummy resource" - child_resource = Resource() - last_resource.putChild(path_seg, child_resource) - res_id = _resource_id(last_resource, path_seg) - resource_mappings[res_id] = child_resource - last_resource = child_resource - else: - # we have an existing Resource, use that instead. - res_id = _resource_id(last_resource, path_seg) - last_resource = resource_mappings[res_id] - - # =========================== - # now attach the actual desired resource - last_path_seg = full_path.split('/')[-1] - - # if there is already a resource here, thieve its children and - # replace it - res_id = _resource_id(last_resource, last_path_seg) - if res_id in resource_mappings: - # there is a dummy resource at this path already, which needs - # to be replaced with the desired resource. 
- existing_dummy_resource = resource_mappings[res_id] - for child_name in existing_dummy_resource.listNames(): - child_res_id = _resource_id( - existing_dummy_resource, child_name - ) - child_resource = resource_mappings[child_res_id] - # steal the children - res.putChild(child_name, child_resource) - - # finally, insert the desired resource in the right place - last_resource.putChild(last_path_seg, res) - res_id = _resource_id(last_resource, last_path_seg) - resource_mappings[res_id] = res - - return root_resource - - -def _resource_id(resource, path_seg): - """Construct an arbitrary resource ID so you can retrieve the mapping - later. - - If you want to represent resource A putChild resource B with path C, - the mapping should looks like _resource_id(A,C) = B. - - Args: - resource (Resource): The *parent* Resourceb - path_seg (str): The name of the child Resource to be attached. - Returns: - str: A unique string which can be a key to the child Resource. - """ - return "%s-%s" % (resource, path_seg) - - def run(hs): PROFILE_SYNAPSE = False if PROFILE_SYNAPSE: @@ -717,6 +425,8 @@ def run(hs): # sys.settrace(logcontext_tracer) with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) + if hs.config.gc_thresholds: + gc.set_threshold(*hs.config.gc_thresholds) reactor.run() if hs.config.daemonize: diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py new file mode 100644 index 0000000000..4ec23d84c1 --- /dev/null +++ b/synapse/app/pusher.py @@ -0,0 +1,396 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
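The PusherSlaveStore defined a little further down borrows individual methods from the full DataStore by unwrapping them with __func__ and rebinding them on the slaved class. A toy illustration of that Python 2 idiom, with hypothetical class names:

class FullStore(object):
    def get_profile_displayname(self, user_id):
        return "display name for %s" % (user_id,)


class SlavedStore(object):
    # Python 2: FullStore.get_profile_displayname is an unbound method;
    # .__func__ recovers the plain function so it can be installed here
    # without inheriting the rest of FullStore.
    get_profile_displayname = (
        FullStore.get_profile_displayname.__func__
    )


print(SlavedStore().get_profile_displayname("@user:example.com"))
# prints: display name for @user:example.com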
+ +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.emailconfig import EmailConfig +from synapse.config.key import KeyConfig +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.storage.roommember import RoomMemberStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.storage.engines import create_engine +from synapse.storage import DataStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import gc +import sys +import logging + +logger = logging.getLogger("synapse.app.pusher") + + +class SlaveConfig(DatabaseConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.start_pushers = True + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.public_baseurl = config["public_baseurl"] + + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_thresholds` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + + # some things used by the auth handler but not actually used in the + # pusher codebase + self.bcrypt_rounds = None + self.ldap_enabled = None + self.ldap_server = None + self.ldap_port = None + self.ldap_tls = None + self.ldap_search_base = None + self.ldap_search_property = None + self.ldap_email_property = None + self.ldap_full_name_property = None + + # We would otherwise try to use the registration shared secret as the + # macaroon shared secret if there was no macaroon_shared_secret, but + # that means pulling in RegistrationConfig too. We don't need to be + # backwards compatible in the pusher codebase so just make people set + # macaroon_shared_secret. We set this to None to prevent it referencing + # an undefined key. + self.registration_shared_secret = None + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("pusher.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to. + #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: [] + # Enable an ssh manhole listener on the pusher.
+ # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the pusher. + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + + """ % locals() + + +class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig): + pass + + +class PusherSlaveStore( + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, + SlavedAccountDataStore +): + update_pusher_last_stream_ordering_and_success = ( + DataStore.update_pusher_last_stream_ordering_and_success.__func__ + ) + + update_pusher_failing_since = ( + DataStore.update_pusher_failing_since.__func__ + ) + + update_pusher_last_stream_ordering = ( + DataStore.update_pusher_last_stream_ordering.__func__ + ) + + get_throttle_params_by_room = ( + DataStore.get_throttle_params_by_room.__func__ + ) + + set_throttle_params = ( + DataStore.set_throttle_params.__func__ + ) + + get_time_of_last_push_action_before = ( + DataStore.get_time_of_last_push_action_before.__func__ + ) + + get_profile_displayname = ( + DataStore.get_profile_displayname.__func__ + ) + + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every hour. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + + +class PusherServer(HomeServer): + + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine.
+ db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = PusherSlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def remove_pusher(self, app_id, push_key, user_id): + http_client = self.get_simple_http_client() + replication_url = self.config.replication_url + url = replication_url + "/remove_pushers" + return http_client.post_json_get_json(url, { + "remove": [{ + "app_id": app_id, + "push_key": push_key, + "user_id": user_id, + }] + }) + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse pusher now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + pusher_pool = self.get_pusherpool() + clock = self.get_clock() + + def stop_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + pushers_for_user = pusher_pool.pushers.get(user_id, {}) + pusher = pushers_for_user.pop(key, None) + if pusher is None: + return + logger.info("Stopping pusher %r / %r", user_id, key) + pusher.on_stop() + + def start_pusher(user_id, app_id, pushkey): + key = "%s:%s" % (app_id, pushkey) + logger.info("Starting pusher %r / %r", user_id, key) + return pusher_pool._refresh_pusher(app_id, pushkey, user_id) + + @defer.inlineCallbacks + def poke_pushers(results): + pushers_rows = set( + map(tuple, results.get("pushers", {}).get("rows", [])) + ) + deleted_pushers_rows = set( + map(tuple, results.get("deleted_pushers", {}).get("rows", [])) + ) + for row in sorted(pushers_rows | deleted_pushers_rows): + if row in deleted_pushers_rows: + user_id, app_id, pushkey = row[1:4] + stop_pusher(user_id, app_id, pushkey) + elif row in pushers_rows: + user_id = row[1] + app_id = row[5] + pushkey = row[8] + yield start_pusher(user_id, app_id, pushkey) + + stream = results.get("events") + if stream: + min_stream_id = stream["rows"][0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_notifications)( + min_stream_id, max_stream_id + ) + + stream = results.get("receipts") + if stream: + rows = stream["rows"] + affected_room_ids = set(row[1] for row in rows) + min_stream_id = rows[0][0] + max_stream_id = stream["position"] + preserve_fn(pusher_pool.on_new_receipts)( + min_stream_id, 
max_stream_id, affected_room_ids + ) + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + poke_pushers(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(30) + + +def setup(config_options): + try: + config = PusherSlaveConfig.load_config( + "Synapse pusher", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ps = PusherServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening() + + change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) + + def start(): + ps.replicate() + ps.get_pusherpool().start() + ps.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + return ps + + +if __name__ == '__main__': + with LoggingContext("main"): + ps = setup(sys.argv[1:]) + + if ps.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ps.config.soft_file_limit) + if ps.config.gc_thresholds: + gc.set_threshold(*ps.config.gc_thresholds) + reactor.run() + + daemon = Daemonize( + app="synapse-pusher", + pid=ps.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py new file mode 100644 index 0000000000..297e199453 --- /dev/null +++ b/synapse/app/synchrotron.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
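The synchrotron below drives the same replication pattern as the pusher's replicate() above: long-poll the master's replication resource with the current stream positions, apply the result, and go round again. The schematic shape of both loops, with on_result standing in (hypothetically) for poke_pushers or notify:

import logging

from twisted.internet import defer

from synapse.util.async import sleep

logger = logging.getLogger(__name__)


@defer.inlineCallbacks
def replication_loop(store, http_client, replication_url, on_result):
    while True:
        try:
            # One token per stream this worker follows; the master holds
            # the request open for up to 30s until something moves.
            args = store.stream_positions()
            args["timeout"] = 30000
            result = yield http_client.get_json(replication_url, args=args)
            # Advance the slaved stores (cache invalidation, new tokens),
            # then let the worker react, e.g. poke pushers or the notifier.
            yield store.process_replication(result)
            on_result(result)
        except Exception:
            logger.exception("Error replicating from %r", replication_url)
            yield sleep(30)  # back off before retrying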
+ +import synapse + +from synapse.api.constants import EventTypes, PresenceState +from synapse.config._base import ConfigError +from synapse.config.database import DatabaseConfig +from synapse.config.logger import LoggingConfig +from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent +from synapse.handlers.presence import PresenceHandler +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.rest.client.v2_alpha import sync +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.engines import create_engine +from synapse.storage.presence import PresenceStore, UserPresenceState +from synapse.storage.roommember import RoomMemberStore +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.stringutils import random_string +from synapse.util.versionstring import get_version_string + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import contextlib +import gc +import ujson as json + +logger = logging.getLogger("synapse.app.synchrotron") + + +class SynchrotronConfig(DatabaseConfig, LoggingConfig, AppServiceConfig): + def read_config(self, config): + self.replication_url = config["replication_url"] + self.server_name = config["server_name"] + self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get( + "use_insecure_ssl_client_just_for_testing_do_not_use", False + ) + self.user_agent_suffix = None + self.listeners = config["listeners"] + self.soft_file_limit = config.get("soft_file_limit") + self.daemonize = config.get("daemonize") + self.pid_file = self.abspath(config.get("pid_file")) + self.macaroon_secret_key = config["macaroon_secret_key"] + self.expire_access_token = config.get("expire_access_token", False) + + thresholds = config.get("gc_thresholds", None) + if thresholds is not None: + try: + assert len(thresholds) == 3 + self.gc_thresholds = ( + int(thresholds[0]), int(thresholds[1]), int(thresholds[2]), + ) + except: + raise ConfigError( + "Value of `gc_thresholds` must be a list of three integers if set" + ) + else: + self.gc_thresholds = None + + def default_config(self, server_name, **kwargs): + pid_file = self.abspath("synchrotron.pid") + return """\ + # Slave configuration + + # The replication listener on the synapse to talk to.
+ #replication_url: https://localhost:{replication_port}/_synapse/replication + + server_name: "%(server_name)s" + + listeners: + # Enable a /sync listener on the synchrotron + #- type: http + # port: {http_port} + # bind_address: "" + # Enable an ssh manhole listener on the synchrotron + # - type: manhole + # port: {manhole_port} + # bind_address: 127.0.0.1 + # Enable a metric listener on the synchrotron + # - type: http + # port: {metrics_port} + # bind_address: 127.0.0.1 + # resources: + # - names: ["metrics"] + # compress: False + + report_stats: False + + daemonize: False + + pid_file: %(pid_file)s + """ % locals() + + +class SynchrotronSlavedStore( + SlavedPushRuleStore, + SlavedEventStore, + SlavedReceiptsStore, + SlavedAccountDataStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + SlavedFilteringStore, + SlavedPresenceStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStore because the constructor is different +): + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every hour. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + +UPDATE_SYNCING_USERS_MS = 10 * 1000 + + +class SynchrotronPresence(object): + def __init__(self, hs): + self.http_client = hs.get_simple_http_client() + self.store = hs.get_datastore() + self.user_to_num_current_syncs = {} + self.syncing_users_url = hs.config.replication_url + "/syncing_users" + self.clock = hs.get_clock() + + active_presence = self.store.take_presence_startup_info() + self.user_to_current_state = { + state.user_id: state + for state in active_presence + } + + self.process_id = random_string(16) + logger.info("Presence process_id is %r", self.process_id) + + self._sending_sync = False + self._need_to_send_sync = False + self.clock.looping_call( + self._send_syncing_users_regularly, + UPDATE_SYNCING_USERS_MS, + ) + + reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown) + + def set_state(self, user, state): + # TODO: How's this supposed to work? + pass + + get_states = PresenceHandler.get_states.__func__ + current_state_for_users = PresenceHandler.current_state_for_users.__func__ + + @defer.inlineCallbacks + def user_syncing(self, user_id, affect_presence): + if affect_presence: + curr_sync = self.user_to_num_current_syncs.get(user_id, 0) + self.user_to_num_current_syncs[user_id] = curr_sync + 1 + prev_states = yield self.current_state_for_users([user_id]) + if prev_states[user_id].state == PresenceState.OFFLINE: + # TODO: Don't block the sync request on this HTTP hit. + yield self._send_syncing_users_now() + + def _end(): + # We check that the user_id is in user_to_num_current_syncs because + # user_to_num_current_syncs may have been cleared if we are + # shutting down.
+ if affect_presence and user_id in self.user_to_num_current_syncs: + self.user_to_num_current_syncs[user_id] -= 1 + + @contextlib.contextmanager + def _user_syncing(): + try: + yield + finally: + _end() + + defer.returnValue(_user_syncing()) + + @defer.inlineCallbacks + def _on_shutdown(self): + # When the synchrotron is shutdown tell the master to clear the in + # progress syncs for this process + self.user_to_num_current_syncs.clear() + yield self._send_syncing_users_now() + + def _send_syncing_users_regularly(self): + # Only send an update if we aren't in the middle of sending one. + if not self._sending_sync: + preserve_fn(self._send_syncing_users_now)() + + @defer.inlineCallbacks + def _send_syncing_users_now(self): + if self._sending_sync: + # We don't want to race with sending another update. + # Instead we wait for that update to finish and send another + # update afterwards. + self._need_to_send_sync = True + return + + # Flag that we are sending an update. + self._sending_sync = True + + yield self.http_client.post_json_get_json(self.syncing_users_url, { + "process_id": self.process_id, + "syncing_users": [ + user_id for user_id, count in self.user_to_num_current_syncs.items() + if count > 0 + ], + }) + + # Unset the flag as we are no longer sending an update. + self._sending_sync = False + if self._need_to_send_sync: + # If something happened while we were sending the update then + # we might need to send another update. + # TODO: Check if the update that was sent matches the current state + # as we only need to send an update if they are different. + self._need_to_send_sync = False + yield self._send_syncing_users_now() + + def process_replication(self, result): + stream = result.get("presence", {"rows": []}) + for row in stream["rows"]: + ( + position, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) = row + self.user_to_current_state[user_id] = UserPresenceState( + user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, status_msg, + currently_active + ) + + +class SynchrotronTyping(object): + def __init__(self, hs): + self._latest_room_serial = 0 + self._room_serials = {} + self._room_typing = {} + + def stream_positions(self): + return {"typing": self._latest_room_serial} + + def process_replication(self, result): + stream = result.get("typing") + if stream: + self._latest_room_serial = int(stream["position"]) + + for row in stream["rows"]: + position, room_id, typing_json = row + typing = json.loads(typing_json) + self._room_serials[room_id] = position + self._room_typing[room_id] = typing + + +class SynchrotronApplicationService(object): + def notify_interested_services(self, event): + pass + + +class SynchrotronServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. 
+ db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + sync.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse synchrotron now listening on port %d", port) + + def start_listening(self): + for listener in self.config.listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.replication_url + clock = self.get_clock() + notifier = self.get_notifier() + presence_handler = self.get_presence_handler() + typing_handler = self.get_typing_handler() + + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + store.get_presence_list_accepted.invalidate_all() + + def notify_from_stream( + result, stream_name, stream_key, room=None, user=None + ): + stream = result.get(stream_name) + if stream: + position_index = stream["field_names"].index("position") + if room: + room_index = stream["field_names"].index(room) + if user: + user_index = stream["field_names"].index(user) + + users = () + rooms = () + for row in stream["rows"]: + position = row[position_index] + + if user: + users = (row[user_index],) + + if room: + rooms = (row[room_index],) + + notifier.on_new_event( + stream_key, position, users=users, rooms=rooms + ) + + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + + notify_from_stream( + result, "push_rules", "push_rules_key", user="user_id" + ) + notify_from_stream( + result, "user_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "room_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, 
"tag_account_data", "account_data_key", user="user_id" + ) + notify_from_stream( + result, "receipts", "receipt_key", room="room_id" + ) + notify_from_stream( + result, "typing", "typing_key", room="room_id" + ) + + next_expire_broken_caches_ms = 0 + while True: + try: + args = store.stream_positions() + args.update(typing_handler.stream_positions()) + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) + yield store.process_replication(result) + typing_handler.process_replication(result) + presence_handler.process_replication(result) + notify(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(5) + + def build_presence_handler(self): + return SynchrotronPresence(self) + + def build_typing_handler(self): + return SynchrotronTyping(self) + + +def setup(config_options): + try: + config = SynchrotronConfig.load_config( + "Synapse synchrotron", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + if not config: + sys.exit(0) + + config.setup_logging() + + database_engine = create_engine(config.database_config) + + ss = SynchrotronServer( + config.server_name, + db_config=config.database_config, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + application_service_handler=SynchrotronApplicationService(), + ) + + ss.setup() + ss.start_listening() + + change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + ss.set_threshold(*ss.config.gc_thresholds) + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + return ss + + +if __name__ == '__main__': + with LoggingContext("main"): + ss = setup(sys.argv[1:]) + + if ss.config.daemonize: + def run(): + with LoggingContext("run"): + change_resource_limit(ss.config.soft_file_limit) + if ss.config.gc_thresholds: + gc.set_threshold(*ss.config.gc_thresholds) + reactor.run() + + daemon = Daemonize( + app="synapse-synchrotron", + pid=ss.config.pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + + daemon.start() + else: + reactor.run() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index ab3a31d7b7..39f4bf6e53 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -66,6 +66,10 @@ def main(): config = yaml.load(open(configfile)) pidfile = config["pid_file"] + cache_factor = config.get("synctl_cache_factor", None) + + if cache_factor: + os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) action = sys.argv[1] if sys.argv[1:] else "usage" if action == "start": |