diff options
Diffstat (limited to 'synapse')
-rwxr-xr-x | synapse/app/synctl.py | 178 | ||||
-rw-r--r-- | synapse/storage/engines/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/engines/postgres.py | 13 | ||||
-rw-r--r-- | synapse/storage/engines/sqlite3.py | 2 |
4 files changed, 165 insertions, 30 deletions
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 39f4bf6e53..bb41962d47 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -14,11 +14,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import sys +import argparse +import collections +import glob import os import os.path -import subprocess import signal +import subprocess +import sys import yaml SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"] @@ -28,60 +31,181 @@ RED = "\x1b[1;31m" NORMAL = "\x1b[m" +def write(message, colour=NORMAL, stream=sys.stdout): + if colour == NORMAL: + stream.write(message + "\n") + else: + stream.write(colour + message + NORMAL + "\n") + + def start(configfile): - print ("Starting ...") + write("Starting ...") args = SYNAPSE args.extend(["--daemonize", "-c", configfile]) try: subprocess.check_call(args) - print (GREEN + "started" + NORMAL) + write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN) + except subprocess.CalledProcessError as e: + write( + "error starting (exit code: %d); see above for logs" % e.returncode, + colour=RED, + ) + + +def start_worker(app, configfile, worker_configfile): + args = [ + "python", "-B", + "-m", app, + "-c", configfile, + "-c", worker_configfile + ] + + try: + subprocess.check_call(args) + write("started %s(%r)" % (app, worker_configfile), colour=GREEN) except subprocess.CalledProcessError as e: - print ( - RED + - "error starting (exit code: %d); see above for logs" % e.returncode + - NORMAL + write( + "error starting %s(%r) (exit code: %d); see above for logs" % ( + app, worker_configfile, e.returncode, + ), + colour=RED, ) -def stop(pidfile): +def stop(pidfile, app): if os.path.exists(pidfile): pid = int(open(pidfile).read()) os.kill(pid, signal.SIGTERM) - print (GREEN + "stopped" + NORMAL) + write("stopped %s" % (app,), colour=GREEN) + + +Worker = collections.namedtuple("Worker", [ + "app", "configfile", "pidfile", "cache_factor" +]) def main(): - configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml" + + parser = argparse.ArgumentParser() + + parser.add_argument( + "action", + choices=["start", "stop", "restart"], + help="whether to start, stop or restart the synapse", + ) + parser.add_argument( + "configfile", + nargs="?", + default="homeserver.yaml", + help="the homeserver config file, defaults to homserver.yaml", + ) + parser.add_argument( + "-w", "--worker", + metavar="WORKERCONFIG", + help="start or stop a single worker", + ) + parser.add_argument( + "-a", "--all-processes", + metavar="WORKERCONFIGDIR", + help="start or stop all the workers in the given directory" + " and the main synapse process", + ) + + options = parser.parse_args() + + if options.worker and options.all_processes: + write( + 'Cannot use "--worker" with "--all-processes"', + stream=sys.stderr + ) + sys.exit(1) + + configfile = options.configfile if not os.path.exists(configfile): - sys.stderr.write( + write( "No config file found\n" "To generate a config file, run '%s -c %s --generate-config" " --server-name=<server name>'\n" % ( - " ".join(SYNAPSE), configfile - ) + " ".join(SYNAPSE), options.configfile + ), + stream=sys.stderr, ) sys.exit(1) - config = yaml.load(open(configfile)) + with open(configfile) as stream: + config = yaml.load(stream) + pidfile = config["pid_file"] - cache_factor = config.get("synctl_cache_factor", None) + cache_factor = config.get("synctl_cache_factor") + start_stop_synapse = True if cache_factor: os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) - action = sys.argv[1] if sys.argv[1:] else "usage" - if action == "start": - start(configfile) - elif action == "stop": - stop(pidfile) - elif action == "restart": - stop(pidfile) - start(configfile) - else: - sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],)) - sys.exit(1) + worker_configfiles = [] + if options.worker: + start_stop_synapse = False + worker_configfile = options.worker + if not os.path.exists(worker_configfile): + write( + "No worker config found at %r" % (worker_configfile,), + stream=sys.stderr, + ) + sys.exit(1) + worker_configfiles.append(worker_configfile) + + if options.all_processes: + worker_configdir = options.all_processes + if not os.path.isdir(worker_configdir): + write( + "No worker config directory found at %r" % (worker_configdir,), + stream=sys.stderr, + ) + sys.exit(1) + worker_configfiles.extend(sorted(glob.glob( + os.path.join(worker_configdir, "*.yaml") + ))) + + workers = [] + for worker_configfile in worker_configfiles: + with open(worker_configfile) as stream: + worker_config = yaml.load(stream) + worker_app = worker_config["worker_app"] + worker_pidfile = worker_config["worker_pid_file"] + worker_daemonize = worker_config["worker_daemonize"] + assert worker_daemonize # TODO print something more user friendly + worker_cache_factor = worker_config.get("synctl_cache_factor") + workers.append(Worker( + worker_app, worker_configfile, worker_pidfile, worker_cache_factor, + )) + + action = options.action + + if action == "stop" or action == "restart": + for worker in workers: + stop(worker.pidfile, worker.app) + + if start_stop_synapse: + stop(pidfile, "synapse.app.homeserver") + + # TODO: Wait for synapse to actually shutdown before starting it again + + if action == "start" or action == "restart": + if start_stop_synapse: + start(configfile) + + for worker in workers: + if worker.cache_factor: + os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor) + + start_worker(worker.app, configfile, worker.configfile) + + if cache_factor: + os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) + else: + os.environ.pop("SYNAPSE_CACHE_FACTOR", None) if __name__ == "__main__": diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py index 7bb5de1fe7..338b495611 100644 --- a/synapse/storage/engines/__init__.py +++ b/synapse/storage/engines/__init__.py @@ -32,7 +32,7 @@ def create_engine(database_config): if engine_class: module = importlib.import_module(name) - return engine_class(module) + return engine_class(module, database_config) raise RuntimeError( "Unsupported database engine '%s'" % (name,) diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index c2290943b4..a6ae79dfad 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -19,9 +19,10 @@ from ._base import IncorrectDatabaseSetup class PostgresEngine(object): single_threaded = False - def __init__(self, database_module): + def __init__(self, database_module, database_config): self.module = database_module self.module.extensions.register_type(self.module.extensions.UNICODE) + self.synchronous_commit = database_config.get("synchronous_commit", True) def check_database(self, txn): txn.execute("SHOW SERVER_ENCODING") @@ -40,9 +41,19 @@ class PostgresEngine(object): db_conn.set_isolation_level( self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ ) + # Asynchronous commit, don't wait for the server to call fsync before + # ending the transaction. + # https://www.postgresql.org/docs/current/static/wal-async-commit.html + if not self.synchronous_commit: + cursor = db_conn.cursor() + cursor.execute("SET synchronous_commit TO OFF") + cursor.close() def is_deadlock(self, error): if isinstance(error, self.module.DatabaseError): + # https://www.postgresql.org/docs/current/static/errcodes-appendix.html + # "40001" serialization_failure + # "40P01" deadlock_detected return error.pgcode in ["40001", "40P01"] return False diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py index 14203aa500..755c9a1f07 100644 --- a/synapse/storage/engines/sqlite3.py +++ b/synapse/storage/engines/sqlite3.py @@ -21,7 +21,7 @@ import struct class Sqlite3Engine(object): single_threaded = True - def __init__(self, database_module): + def __init__(self, database_module, database_config): self.module = database_module def check_database(self, txn): |