summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/synctl.py178
-rw-r--r--synapse/storage/engines/__init__.py2
-rw-r--r--synapse/storage/engines/postgres.py13
-rw-r--r--synapse/storage/engines/sqlite3.py2
4 files changed, 165 insertions, 30 deletions
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 39f4bf6e53..bb41962d47 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -14,11 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import sys
+import argparse
+import collections
+import glob
 import os
 import os.path
-import subprocess
 import signal
+import subprocess
+import sys
 import yaml
 
 SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"]
@@ -28,60 +31,181 @@ RED = "\x1b[1;31m"
 NORMAL = "\x1b[m"
 
 
+def write(message, colour=NORMAL, stream=sys.stdout):
+    if colour == NORMAL:
+        stream.write(message + "\n")
+    else:
+        stream.write(colour + message + NORMAL + "\n")
+
+
 def start(configfile):
-    print ("Starting ...")
+    write("Starting ...")
     args = SYNAPSE
     args.extend(["--daemonize", "-c", configfile])
 
     try:
         subprocess.check_call(args)
-        print (GREEN + "started" + NORMAL)
+        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
+    except subprocess.CalledProcessError as e:
+        write(
+            "error starting (exit code: %d); see above for logs" % e.returncode,
+            colour=RED,
+        )
+
+
+def start_worker(app, configfile, worker_configfile):
+    args = [
+        "python", "-B",
+        "-m", app,
+        "-c", configfile,
+        "-c", worker_configfile
+    ]
+
+    try:
+        subprocess.check_call(args)
+        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
     except subprocess.CalledProcessError as e:
-        print (
-            RED +
-            "error starting (exit code: %d); see above for logs" % e.returncode +
-            NORMAL
+        write(
+            "error starting %s(%r) (exit code: %d); see above for logs" % (
+                app, worker_configfile, e.returncode,
+            ),
+            colour=RED,
         )
 
 
-def stop(pidfile):
+def stop(pidfile, app):
     if os.path.exists(pidfile):
         pid = int(open(pidfile).read())
         os.kill(pid, signal.SIGTERM)
-        print (GREEN + "stopped" + NORMAL)
+        write("stopped %s" % (app,), colour=GREEN)
+
+
+Worker = collections.namedtuple("Worker", [
+    "app", "configfile", "pidfile", "cache_factor"
+])
 
 
 def main():
-    configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml"
+
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "action",
+        choices=["start", "stop", "restart"],
+        help="whether to start, stop or restart the synapse",
+    )
+    parser.add_argument(
+        "configfile",
+        nargs="?",
+        default="homeserver.yaml",
+        help="the homeserver config file, defaults to homserver.yaml",
+    )
+    parser.add_argument(
+        "-w", "--worker",
+        metavar="WORKERCONFIG",
+        help="start or stop a single worker",
+    )
+    parser.add_argument(
+        "-a", "--all-processes",
+        metavar="WORKERCONFIGDIR",
+        help="start or stop all the workers in the given directory"
+             " and the main synapse process",
+    )
+
+    options = parser.parse_args()
+
+    if options.worker and options.all_processes:
+        write(
+            'Cannot use "--worker" with "--all-processes"',
+            stream=sys.stderr
+        )
+        sys.exit(1)
+
+    configfile = options.configfile
 
     if not os.path.exists(configfile):
-        sys.stderr.write(
+        write(
             "No config file found\n"
             "To generate a config file, run '%s -c %s --generate-config"
             " --server-name=<server name>'\n" % (
-                " ".join(SYNAPSE), configfile
-            )
+                " ".join(SYNAPSE), options.configfile
+            ),
+            stream=sys.stderr,
         )
         sys.exit(1)
 
-    config = yaml.load(open(configfile))
+    with open(configfile) as stream:
+        config = yaml.load(stream)
+
     pidfile = config["pid_file"]
-    cache_factor = config.get("synctl_cache_factor", None)
+    cache_factor = config.get("synctl_cache_factor")
+    start_stop_synapse = True
 
     if cache_factor:
         os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
 
-    action = sys.argv[1] if sys.argv[1:] else "usage"
-    if action == "start":
-        start(configfile)
-    elif action == "stop":
-        stop(pidfile)
-    elif action == "restart":
-        stop(pidfile)
-        start(configfile)
-    else:
-        sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],))
-        sys.exit(1)
+    worker_configfiles = []
+    if options.worker:
+        start_stop_synapse = False
+        worker_configfile = options.worker
+        if not os.path.exists(worker_configfile):
+            write(
+                "No worker config found at %r" % (worker_configfile,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.append(worker_configfile)
+
+    if options.all_processes:
+        worker_configdir = options.all_processes
+        if not os.path.isdir(worker_configdir):
+            write(
+                "No worker config directory found at %r" % (worker_configdir,),
+                stream=sys.stderr,
+            )
+            sys.exit(1)
+        worker_configfiles.extend(sorted(glob.glob(
+            os.path.join(worker_configdir, "*.yaml")
+        )))
+
+    workers = []
+    for worker_configfile in worker_configfiles:
+        with open(worker_configfile) as stream:
+            worker_config = yaml.load(stream)
+        worker_app = worker_config["worker_app"]
+        worker_pidfile = worker_config["worker_pid_file"]
+        worker_daemonize = worker_config["worker_daemonize"]
+        assert worker_daemonize  # TODO print something more user friendly
+        worker_cache_factor = worker_config.get("synctl_cache_factor")
+        workers.append(Worker(
+            worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+        ))
+
+    action = options.action
+
+    if action == "stop" or action == "restart":
+        for worker in workers:
+            stop(worker.pidfile, worker.app)
+
+        if start_stop_synapse:
+            stop(pidfile, "synapse.app.homeserver")
+
+        # TODO: Wait for synapse to actually shutdown before starting it again
+
+    if action == "start" or action == "restart":
+        if start_stop_synapse:
+            start(configfile)
+
+        for worker in workers:
+            if worker.cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+            start_worker(worker.app, configfile, worker.configfile)
+
+            if cache_factor:
+                os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+            else:
+                os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
 
 
 if __name__ == "__main__":
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 7bb5de1fe7..338b495611 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -32,7 +32,7 @@ def create_engine(database_config):
 
     if engine_class:
         module = importlib.import_module(name)
-        return engine_class(module)
+        return engine_class(module, database_config)
 
     raise RuntimeError(
         "Unsupported database engine '%s'" % (name,)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index c2290943b4..a6ae79dfad 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -19,9 +19,10 @@ from ._base import IncorrectDatabaseSetup
 class PostgresEngine(object):
     single_threaded = False
 
-    def __init__(self, database_module):
+    def __init__(self, database_module, database_config):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
+        self.synchronous_commit = database_config.get("synchronous_commit", True)
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -40,9 +41,19 @@ class PostgresEngine(object):
         db_conn.set_isolation_level(
             self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
         )
+        # Asynchronous commit, don't wait for the server to call fsync before
+        # ending the transaction.
+        # https://www.postgresql.org/docs/current/static/wal-async-commit.html
+        if not self.synchronous_commit:
+            cursor = db_conn.cursor()
+            cursor.execute("SET synchronous_commit TO OFF")
+            cursor.close()
 
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
+            # https://www.postgresql.org/docs/current/static/errcodes-appendix.html
+            # "40001" serialization_failure
+            # "40P01" deadlock_detected
             return error.pgcode in ["40001", "40P01"]
         return False
 
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 14203aa500..755c9a1f07 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -21,7 +21,7 @@ import struct
 class Sqlite3Engine(object):
     single_threaded = True
 
-    def __init__(self, database_module):
+    def __init__(self, database_module, database_config):
         self.module = database_module
 
     def check_database(self, txn):