diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c63256d3bd..b3f76428b6 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -17,6 +17,7 @@ import logging
import re
import attr
+from canonicaljson import json
from twisted.internet import defer, task
@@ -24,6 +25,9 @@ from synapse.logging import context
logger = logging.getLogger(__name__)
+# Create a custom encoder to reduce the whitespace produced by JSON encoding.
+json_encoder = json.JSONEncoder(separators=(",", ":"))
+
def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 9b09c08b89..c2d72a82cf 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -192,7 +192,7 @@ class Cache(object):
callbacks = [callback] if callback else []
self.check_thread()
observable = ObservableDeferred(value, consumeErrors=True)
- observer = defer.maybeDeferred(observable.observe)
+ observer = observable.observe()
entry = CacheEntry(deferred=observable, callbacks=callbacks)
existing_entry = self._pending_deferred_cache.pop(key, None)
diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py
new file mode 100644
index 0000000000..23393cf49b
--- /dev/null
+++ b/synapse/util/daemonize.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2012, 2013, 2014 Ilya Otyutskiy <ilya.otyutskiy@icloud.com>
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import atexit
+import fcntl
+import logging
+import os
+import signal
+import sys
+
+
+def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -> None:
+ """daemonize the current process
+
+ This calls fork(), and has the main process exit. When it returns we will be
+ running in the child process.
+ """
+
+ # If pidfile already exists, we should read pid from there; to overwrite it, if
+ # locking will fail, because locking attempt somehow purges the file contents.
+ if os.path.isfile(pid_file):
+ with open(pid_file, "r") as pid_fh:
+ old_pid = pid_fh.read()
+
+ # Create a lockfile so that only one instance of this daemon is running at any time.
+ try:
+ lock_fh = open(pid_file, "w")
+ except IOError:
+ print("Unable to create the pidfile.")
+ sys.exit(1)
+
+ try:
+ # Try to get an exclusive lock on the file. This will fail if another process
+ # has the file locked.
+ fcntl.flock(lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ print("Unable to lock on the pidfile.")
+ # We need to overwrite the pidfile if we got here.
+ #
+ # XXX better to avoid overwriting it, surely. this looks racey as the pid file
+ # could be created between us trying to read it and us trying to lock it.
+ with open(pid_file, "w") as pid_fh:
+ pid_fh.write(old_pid)
+ sys.exit(1)
+
+ # Fork, creating a new process for the child.
+ process_id = os.fork()
+
+ if process_id != 0:
+ # parent process: exit.
+
+ # we use os._exit to avoid running the atexit handlers. In particular, that
+ # means we don't flush the logs. This is important because if we are using
+ # a MemoryHandler, we could have logs buffered which are now buffered in both
+ # the main and the child process, so if we let the main process flush the logs,
+ # we'll get two copies.
+ os._exit(0)
+
+ # This is the child process. Continue.
+
+ # Stop listening for signals that the parent process receives.
+ # This is done by getting a new process id.
+ # setpgrp() is an alternative to setsid().
+ # setsid puts the process in a new parent group and detaches its controlling
+ # terminal.
+
+ os.setsid()
+
+ # point stdin, stdout, stderr at /dev/null
+ devnull = "/dev/null"
+ if hasattr(os, "devnull"):
+ # Python has set os.devnull on this system, use it instead as it might be
+ # different than /dev/null.
+ devnull = os.devnull
+
+ devnull_fd = os.open(devnull, os.O_RDWR)
+ os.dup2(devnull_fd, 0)
+ os.dup2(devnull_fd, 1)
+ os.dup2(devnull_fd, 2)
+ os.close(devnull_fd)
+
+ # now that we have redirected stderr to /dev/null, any uncaught exceptions will
+ # get sent to /dev/null, so make sure we log them.
+ #
+ # (we don't normally expect reactor.run to raise any exceptions, but this will
+ # also catch any other uncaught exceptions before we get that far.)
+
+ def excepthook(type_, value, traceback):
+ logger.critical("Unhanded exception", exc_info=(type_, value, traceback))
+
+ sys.excepthook = excepthook
+
+ # Set umask to default to safe file permissions when running as a root daemon. 027
+ # is an octal number which we are typing as 0o27 for Python3 compatibility.
+ os.umask(0o27)
+
+ # Change to a known directory. If this isn't done, starting a daemon in a
+ # subdirectory that needs to be deleted results in "directory busy" errors.
+ os.chdir(chdir)
+
+ try:
+ lock_fh.write("%s" % (os.getpid()))
+ lock_fh.flush()
+ except IOError:
+ logger.error("Unable to write pid to the pidfile.")
+ print("Unable to write pid to the pidfile.")
+ sys.exit(1)
+
+ # write a log line on SIGTERM.
+ def sigterm(signum, frame):
+ logger.warning("Caught signal %s. Stopping daemon." % signum)
+ sys.exit(0)
+
+ signal.signal(signal.SIGTERM, sigterm)
+
+ # Cleanup pid file at exit.
+ def exit():
+ logger.warning("Stopping daemon.")
+ os.remove(pid_file)
+ sys.exit(0)
+
+ atexit.register(exit)
+
+ logger.warning("Starting daemon.")
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index eab78dd256..0e445e01d7 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -63,5 +63,8 @@ def _handle_frozendict(obj):
)
-# A JSONEncoder which is capable of encoding frozendicts without barfing
-frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict)
+# A JSONEncoder which is capable of encoding frozendicts without barfing.
+# Additionally reduce the whitespace produced by JSON encoding.
+frozendict_json_encoder = json.JSONEncoder(
+ default=_handle_frozendict, separators=(",", ":"),
+)
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index ec61e14423..13775b43f9 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import inspect
import logging
from functools import wraps
+from typing import Any, Callable, Optional, TypeVar, cast
from prometheus_client import Counter
-from twisted.internet import defer
-
from synapse.logging.context import LoggingContext, current_context
from synapse.metrics import InFlightGauge
@@ -60,29 +58,37 @@ in_flight = InFlightGauge(
sub_metrics=["real_time_max", "real_time_sum"],
)
+T = TypeVar("T", bound=Callable[..., Any])
-def measure_func(name=None):
- def wrapper(func):
- block_name = func.__name__ if name is None else name
- if inspect.iscoroutinefunction(func):
+def measure_func(name: Optional[str] = None) -> Callable[[T], T]:
+ """
+ Used to decorate an async function with a `Measure` context manager.
+
+ Usage:
+
+ @measure_func()
+ async def foo(...):
+ ...
- @wraps(func)
- async def measured_func(self, *args, **kwargs):
- with Measure(self.clock, block_name):
- r = await func(self, *args, **kwargs)
- return r
+ Which is analogous to:
- else:
+ async def foo(...):
+ with Measure(...):
+ ...
+
+ """
+
+ def wrapper(func: T) -> T:
+ block_name = func.__name__ if name is None else name
- @wraps(func)
- @defer.inlineCallbacks
- def measured_func(self, *args, **kwargs):
- with Measure(self.clock, block_name):
- r = yield func(self, *args, **kwargs)
- return r
+ @wraps(func)
+ async def measured_func(self, *args, **kwargs):
+ with Measure(self.clock, block_name):
+ r = await func(self, *args, **kwargs)
+ return r
- return measured_func
+ return cast(T, measured_func)
return wrapper
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8794317caa..919988d3bc 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -15,8 +15,6 @@
import logging
import random
-from twisted.internet import defer
-
import synapse.logging.context
from synapse.api.errors import CodeMessageException
@@ -54,8 +52,7 @@ class NotRetryingDestination(Exception):
self.destination = destination
-@defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
+async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
"""For a given destination check if we have previously failed to
send a request there and are waiting before retrying the destination.
If we are not ready to retry the destination, this will raise a
@@ -73,9 +70,9 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
Example usage:
try:
- limiter = yield get_retry_limiter(destination, clock, store)
+ limiter = await get_retry_limiter(destination, clock, store)
with limiter:
- response = yield do_request()
+ response = await do_request()
except NotRetryingDestination:
# We aren't ready to retry that destination.
raise
@@ -83,7 +80,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
failure_ts = None
retry_last_ts, retry_interval = (0, 0)
- retry_timings = yield store.get_destination_retry_timings(destination)
+ retry_timings = await store.get_destination_retry_timings(destination)
if retry_timings:
failure_ts = retry_timings["failure_ts"]
@@ -222,10 +219,9 @@ class RetryDestinationLimiter(object):
if self.failure_ts is None:
self.failure_ts = retry_last_ts
- @defer.inlineCallbacks
- def store_retry_timings():
+ async def store_retry_timings():
try:
- yield self.store.set_destination_retry_timings(
+ await self.store.set_destination_retry_timings(
self.destination,
self.failure_ts,
retry_last_ts,
|