diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/appservice.py | 1 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 216 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 1 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 1 | ||||
-rw-r--r-- | synapse/app/pusher.py | 1 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 4 |
6 files changed, 224 insertions, 0 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 57587aed25..4e62a84b28 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -187,6 +187,7 @@ def start(config_options): def start(): ps.replicate() ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py new file mode 100644 index 0000000000..9fccc73db3 --- /dev/null +++ b/synapse/app/client_reader.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.rest.client.v1.room import PublicRoomListRestServlet +from synapse.server import HomeServer +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.crypto import context_factory + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.client_reader") + + +class ClientReaderSlavedStore( + SlavedEventStore, + SlavedKeyStore, + RoomStore, + DirectoryStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStore because the constructor is different +): + pass + + +class ClientReaderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + PublicRoomListRestServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse client reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(5) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse client reader", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.client_reader" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = ClientReaderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-client-reader", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7355499ae2..1f5ae1937e 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -182,6 +182,7 @@ def start(config_options): reactor.run() def start(): + ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() ss.replicate() diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 9d4c4a0750..6e5ec01c6c 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -188,6 +188,7 @@ def start(config_options): reactor.run() def start(): + ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() ss.replicate() diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8d755a4b33..d59f4a571c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -276,6 +276,7 @@ def start(config_options): ps.replicate() ps.get_pusherpool().start() ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 07d3d047c6..46d390fd0f 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -242,6 +242,9 @@ class SynchrotronTyping(object): self._room_typing = {} def stream_positions(self): + # We must update this typing token from the response of the previous + # sync. In particular, the stream id may "reset" back to zero/a low + # value which we *must* use for the next replication request. return {"typing": self._latest_room_serial} def process_replication(self, result): @@ -462,6 +465,7 @@ def start(config_options): def start(): ss.get_datastore().start_profiling() ss.replicate() + ss.get_state_handler().start_caching() reactor.callWhenRunning(start) |