#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import synapse from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string from synapse.api.urls import FEDERATION_PREFIX from synapse.federation.transport.server import TransportLayerServer from synapse.crypto import context_factory from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize import sys import logging import gc logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( SlavedEventStore, SlavedKeyStore, BaseSlavedStore, ): pass class FederationReaderServer(HomeServer): def get_db_conn(self, run_new_connection=True): # Any param beginning with cp_ is a parameter for adbapi, and should # not be passed to the database engine. db_params = { k: v for k, v in self.db_config.get("args", {}).items() if not k.startswith("cp_") } db_conn = self.database_engine.module.connect(**db_params) if run_new_connection: self.database_engine.on_new_connection(db_conn) return db_conn def setup(self): logger.info("Setting up.") self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) logger.info("Finished setting up.") def _listen_http(self, listener_config): port = listener_config["port"] bind_address = listener_config.get("bind_address", "") site_tag = listener_config.get("tag", port) resources = {} for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(self) elif name == "federation": resources.update({ FEDERATION_PREFIX: TransportLayerServer(self), }) root_resource = create_resource_tree(resources, Resource()) reactor.listenTCP( port, SynapseSite( "synapse.access.http.%s" % (site_tag,), site_tag, listener_config, root_resource, ), interface=bind_address ) logger.info("Synapse federation reader now listening on port %d", port) def start_listening(self, listeners): for listener in listeners: if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": reactor.listenTCP( listener["port"], manhole( username="matrix", password="rabbithole", globals={"hs": self}, ), interface=listener.get("bind_address", '127.0.0.1') ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @defer.inlineCallbacks def replicate(self): http_client = self.get_simple_http_client() store = self.get_datastore() replication_url = self.config.worker_replication_url while True: try: args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(5) def start(config_options): try: config = HomeServerConfig.load_config( "Synapse federation reader", config_options ) except ConfigError as e: sys.stderr.write("\n" + e.message + "\n") sys.exit(1) assert config.worker_app == "synapse.app.federation_reader" setup_logging(config.worker_log_config, config.worker_log_file) database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) ss = FederationReaderServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, version_string=get_version_string("Synapse", synapse), database_engine=database_engine, ) ss.setup() ss.get_handlers() ss.start_listening(config.worker_listeners) def run(): with LoggingContext("run"): logger.info("Running") change_resource_limit(config.soft_file_limit) if config.gc_thresholds: gc.set_threshold(*config.gc_thresholds) reactor.run() def start(): ss.get_datastore().start_profiling() ss.replicate() reactor.callWhenRunning(start) if config.worker_daemonize: daemon = Daemonize( app="synapse-federation-reader", pid=config.worker_pid_file, action=run, auto_close_fds=False, verbose=True, logger=logger, ) daemon.start() else: run() if __name__ == '__main__': with LoggingContext("main"): start(sys.argv[1:])