summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/voip.py4
-rw-r--r--synapse/federation/transaction_queue.py25
-rw-r--r--synapse/handlers/federation.py43
-rw-r--r--synapse/http/client.py6
-rw-r--r--synapse/http/request_metrics.py9
-rw-r--r--synapse/metrics/background_process_metrics.py179
-rw-r--r--synapse/rest/__init__.py43
-rw-r--r--synapse/rest/client/v1/room.py5
-rw-r--r--synapse/rest/client/v1_only/__init__.py3
-rw-r--r--synapse/rest/client/v1_only/base.py39
-rw-r--r--synapse/rest/client/v1_only/register.py (renamed from synapse/rest/client/v1/register.py)7
-rw-r--r--synapse/storage/background_updates.py10
-rw-r--r--synapse/storage/client_ips.py15
-rw-r--r--synapse/storage/events.py10
-rw-r--r--synapse/storage/events_worker.py10
-rw-r--r--synapse/util/caches/expiringcache.py6
-rw-r--r--synapse/util/distributor.py48
17 files changed, 343 insertions, 119 deletions
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a4e16fa96..d07bd24ffd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -30,10 +30,10 @@ class VoipConfig(Config):
         ## Turn ##
 
         # The public URIs of the TURN server to give to clients
-        turn_uris: []
+        #turn_uris: []
 
         # The shared secret used to compute passwords for the TURN server
-        turn_shared_secret: "YOUR_SHARED_SECRET"
+        #turn_shared_secret: "YOUR_SHARED_SECRET"
 
         # The Username and password if the TURN server needs them and
         # does not use a token
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5a956ecfb3..6996d6b695 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
     sent_edus_counter,
     sent_transactions_counter,
 )
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 
@@ -165,10 +166,11 @@ class TransactionQueue(object):
         if self._is_processing:
             return
 
-        # fire off a processing loop in the background. It's likely it will
-        # outlast the current request, so run it in the sentinel logcontext.
-        with PreserveLoggingContext():
-            self._process_event_queue_loop()
+        # fire off a processing loop in the background
+        run_as_background_process(
+            "process_event_queue_for_federation",
+            self._process_event_queue_loop,
+        )
 
     @defer.inlineCallbacks
     def _process_event_queue_loop(self):
@@ -432,14 +434,11 @@ class TransactionQueue(object):
 
         logger.debug("TX [%s] Starting transaction loop", destination)
 
-        # Drop the logcontext before starting the transaction. It doesn't
-        # really make sense to log all the outbound transactions against
-        # whatever path led us to this point: that's pretty arbitrary really.
-        #
-        # (this also means we can fire off _perform_transaction without
-        # yielding)
-        with logcontext.PreserveLoggingContext():
-            self._transaction_transmission_loop(destination)
+        run_as_background_process(
+            "federation_transaction_transmission_loop",
+            self._transaction_transmission_loop,
+            destination,
+        )
 
     @defer.inlineCallbacks
     def _transaction_transmission_loop(self, destination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 20fb46fc89..65f6041b10 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -906,16 +906,6 @@ class FederationHandler(BaseHandler):
             [auth_id for auth_id, _ in event.auth_events],
             include_given=True
         )
-
-        for event in auth:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         defer.returnValue([e for e in auth])
 
     @log_function
@@ -1371,18 +1361,6 @@ class FederationHandler(BaseHandler):
                     del results[(event.type, event.state_key)]
 
             res = list(results.values())
-            for event in res:
-                # We sign these again because there was a bug where we
-                # incorrectly signed things the first time round
-                if self.is_mine_id(event.event_id):
-                    event.signatures.update(
-                        compute_event_signature(
-                            event,
-                            self.hs.hostname,
-                            self.hs.config.signing_key[0]
-                        )
-                    )
-
             defer.returnValue(res)
         else:
             defer.returnValue([])
@@ -1454,18 +1432,6 @@ class FederationHandler(BaseHandler):
         )
 
         if event:
-            if self.is_mine_id(event.event_id):
-                # FIXME: This is a temporary work around where we occasionally
-                # return events slightly differently than when they were
-                # originally signed
-                event.signatures.update(
-                    compute_event_signature(
-                        event,
-                        self.hs.hostname,
-                        self.hs.config.signing_key[0]
-                    )
-                )
-
             in_room = yield self.auth.check_host_in_room(
                 event.room_id,
                 origin
@@ -1730,15 +1696,6 @@ class FederationHandler(BaseHandler):
             local_auth_chain, remote_auth_chain
         )
 
-        for event in ret["auth_chain"]:
-            event.signatures.update(
-                compute_event_signature(
-                    event,
-                    self.hs.hostname,
-                    self.hs.config.signing_key[0]
-                )
-            )
-
         logger.debug("on_query_auth returning: %s", ret)
 
         defer.returnValue(ret)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6a0d75b2b..25b6307884 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
 from twisted.internet import defer, protocol, reactor, ssl, task
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
-from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
 from twisted.web.client import (
+    Agent,
+    BrowserLikeRedirectAgent,
+    ContentDecoderAgent,
+    FileBodyProducer as TwistedFileBodyProducer,
     GzipDecoder,
     HTTPConnectionPool,
     PartialDownloadError,
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index f24b4b949c..588e280571 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
 )
 
 response_timer = Histogram(
-    "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+    "synapse_http_server_response_time_seconds", "sec",
+    ["method", "servlet", "tag", "code"],
 )
 
 response_ru_utime = Counter(
@@ -171,11 +172,13 @@ class RequestMetrics(object):
                 )
                 return
 
-        outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+        response_code = str(request.code)
+
+        outgoing_responses_counter.labels(request.method, response_code).inc()
 
         response_count.labels(request.method, self.name, tag).inc()
 
-        response_timer.labels(request.method, self.name, tag).observe(
+        response_timer.labels(request.method, self.name, tag, response_code).observe(
             time_sec - self.start
         )
 
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..9d820e44a6
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+    "synapse_background_process_start_count",
+    "Number of background processes started",
+    ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the CustomCollector,
+# which ensures that we can update them before they are collected.
+#
+_background_process_ru_utime = Counter(
+    "synapse_background_process_ru_utime_seconds",
+    "User CPU time used by background processes, in seconds",
+    ["name"],
+    registry=None,
+)
+
+_background_process_ru_stime = Counter(
+    "synapse_background_process_ru_stime_seconds",
+    "System CPU time used by background processes, in seconds",
+    ["name"],
+    registry=None,
+)
+
+_background_process_db_txn_count = Counter(
+    "synapse_background_process_db_txn_count",
+    "Number of database transactions done by background processes",
+    ["name"],
+    registry=None,
+)
+
+_background_process_db_txn_duration = Counter(
+    "synapse_background_process_db_txn_duration_seconds",
+    ("Seconds spent by background processes waiting for database "
+     "transactions, excluding scheduling time"),
+    ["name"],
+    registry=None,
+)
+
+_background_process_db_sched_duration = Counter(
+    "synapse_background_process_db_sched_duration_seconds",
+    "Seconds spent by background processes waiting for database connections",
+    ["name"],
+    registry=None,
+)
+
+# map from description to a counter, so that we can name our logcontexts
+# incrementally. (It actually duplicates _background_process_start_count, but
+# it's much simpler to do so than to try to combine them.)
+_background_process_counts = dict()  # type: dict[str, int]
+
+# map from description to the currently running background processes.
+#
+# it's kept as a dict of sets rather than a big set so that we can keep track
+# of process descriptions that no longer have any active processes.
+_background_processes = dict()  # type: dict[str, set[_BackgroundProcess]]
+
+
+class _Collector(object):
+    """A custom metrics collector for the background process metrics.
+
+    Ensures that all of the metrics are up-to-date with any in-flight processes
+    before they are returned.
+    """
+    def collect(self):
+        background_process_in_flight_count = GaugeMetricFamily(
+            "synapse_background_process_in_flight_count",
+            "Number of background processes in flight",
+            labels=["name"],
+        )
+
+        for desc, processes in six.iteritems(_background_processes):
+            background_process_in_flight_count.add_metric(
+                (desc,), len(processes),
+            )
+            for process in processes:
+                process.update_metrics()
+
+        yield background_process_in_flight_count
+
+        # now we need to run collect() over each of the static Counters, and
+        # yield each metric they return.
+        for m in (
+                _background_process_ru_utime,
+                _background_process_ru_stime,
+                _background_process_db_txn_count,
+                _background_process_db_txn_duration,
+                _background_process_db_sched_duration,
+        ):
+            for r in m.collect():
+                yield r
+
+
+REGISTRY.register(_Collector())
+
+
+class _BackgroundProcess(object):
+    def __init__(self, desc, ctx):
+        self.desc = desc
+        self._context = ctx
+        self._reported_stats = None
+
+    def update_metrics(self):
+        """Updates the metrics with values from this process."""
+        new_stats = self._context.get_resource_usage()
+        if self._reported_stats is None:
+            diff = new_stats
+        else:
+            diff = new_stats - self._reported_stats
+        self._reported_stats = new_stats
+
+        _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
+        _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
+        _background_process_db_txn_count.labels(self.desc).inc(
+            diff.db_txn_count,
+        )
+        _background_process_db_txn_duration.labels(self.desc).inc(
+            diff.db_txn_duration_sec,
+        )
+        _background_process_db_sched_duration.labels(self.desc).inc(
+            diff.db_sched_duration_sec,
+        )
+
+
+def run_as_background_process(desc, func, *args, **kwargs):
+    """Run the given function in its own logcontext, with resource metrics
+
+    This should be used to wrap processes which are fired off to run in the
+    background, instead of being associated with a particular request.
+
+    Args:
+        desc (str): a description for this background process type
+        func: a function, which may return a Deferred
+        args: positional args for func
+        kwargs: keyword args for func
+
+    Returns: None
+    """
+    @defer.inlineCallbacks
+    def run():
+        count = _background_process_counts.get(desc, 0)
+        _background_process_counts[desc] = count + 1
+        _background_process_start_count.labels(desc).inc()
+
+        with LoggingContext(desc) as context:
+            context.request = "%s-%i" % (desc, count)
+            proc = _BackgroundProcess(desc, context)
+            _background_processes.setdefault(desc, set()).add(proc)
+            try:
+                yield func(*args, **kwargs)
+            finally:
+                proc.update_metrics()
+                _background_processes[desc].remove(proc)
+
+    with PreserveLoggingContext():
+        run()
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 75c2a4ec8e..3418f06fd6 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,13 +14,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from six import PY3
+
 from synapse.http.server import JsonResource
 from synapse.rest.client import versions
-from synapse.rest.client.v1 import admin, directory, events, initial_sync
-from synapse.rest.client.v1 import login as v1_login
-from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher
-from synapse.rest.client.v1 import register as v1_register
-from synapse.rest.client.v1 import room, voip
+from synapse.rest.client.v1 import (
+    admin,
+    directory,
+    events,
+    initial_sync,
+    login as v1_login,
+    logout,
+    presence,
+    profile,
+    push_rule,
+    pusher,
+    room,
+    voip,
+)
 from synapse.rest.client.v2_alpha import (
     account,
     account_data,
@@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
     user_directory,
 )
 
+if not PY3:
+    from synapse.rest.client.v1_only import (
+        register as v1_register,
+    )
+
 
 class ClientRestResource(JsonResource):
     """A resource for version 1 of the matrix client API."""
@@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
     def register_servlets(client_resource, hs):
         versions.register_servlets(client_resource)
 
-        # "v1"
-        room.register_servlets(hs, client_resource)
+        if not PY3:
+            # "v1" (Python 2 only)
+            v1_register.register_servlets(hs, client_resource)
+
+        # Deprecated in r0
+        initial_sync.register_servlets(hs, client_resource)
+        room.register_deprecated_servlets(hs, client_resource)
+
+        # Partially deprecated in r0
         events.register_servlets(hs, client_resource)
-        v1_register.register_servlets(hs, client_resource)
+
+        # "v1" + "r0"
+        room.register_servlets(hs, client_resource)
         v1_login.register_servlets(hs, client_resource)
         profile.register_servlets(hs, client_resource)
         presence.register_servlets(hs, client_resource)
-        initial_sync.register_servlets(hs, client_resource)
         directory.register_servlets(hs, client_resource)
         voip.register_servlets(hs, client_resource)
         admin.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3d62447854..b9512a2b61 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -832,10 +832,13 @@ def register_servlets(hs, http_server):
     RoomSendEventRestServlet(hs).register(http_server)
     PublicRoomListRestServlet(hs).register(http_server)
     RoomStateRestServlet(hs).register(http_server)
-    RoomInitialSyncRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
     RoomTypingRestServlet(hs).register(http_server)
     SearchRestServlet(hs).register(http_server)
     JoinedRoomsRestServlet(hs).register(http_server)
     RoomEventServlet(hs).register(http_server)
     RoomEventContextServlet(hs).register(http_server)
+
+
+def register_deprecated_servlets(hs, http_server):
+    RoomInitialSyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py
new file mode 100644
index 0000000000..936f902ace
--- /dev/null
+++ b/synapse/rest/client/v1_only/__init__.py
@@ -0,0 +1,3 @@
+"""
+REST APIs that are only used in v1 (the legacy API).
+"""
diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py
new file mode 100644
index 0000000000..9d4db7437c
--- /dev/null
+++ b/synapse/rest/client/v1_only/base.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module contains base REST classes for constructing client v1 servlets.
+"""
+
+import re
+
+from synapse.api.urls import CLIENT_PREFIX
+
+
+def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
+    """Creates a regex compiled client path with the correct client path
+    prefix.
+
+    Args:
+        path_regex (str): The regex string to match. This should NOT have a ^
+        as this will be prefixed.
+    Returns:
+        list of SRE_Pattern
+    """
+    patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
+    if include_in_unstable:
+        unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
+        patterns.append(re.compile("^" + unstable_prefix + path_regex))
+    return patterns
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py
index 25a143af8d..3439c3c6d4 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
+from synapse.rest.client.v1.base import ClientV1RestServlet
 from synapse.types import create_requester
 
-from .base import ClientV1RestServlet, client_path_patterns
+from .base import v1_only_client_path_patterns
 
 logger = logging.getLogger(__name__)
 
@@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
     handler doesn't have a concept of multi-stages or sessions.
     """
 
-    PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False)
+    PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
 
     def __init__(self, hs):
         """
@@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
     """Handles user creation via a server-to-server interface
     """
 
-    PATTERNS = client_path_patterns("/createUser$", releases=())
+    PATTERNS = v1_only_client_path_patterns("/createUser$")
 
     def __init__(self, hs):
         super(CreateUserRestServlet, self).__init__(hs)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index dc9eca7d15..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -19,6 +19,8 @@ from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
+
 from . import engines
 from ._base import SQLBaseStore
 
@@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_handlers = {}
         self._all_done = False
 
-    @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        logger.info("Starting background schema updates")
+        run_as_background_process(
+            "background_updates", self._run_background_updates,
+        )
 
+    @defer.inlineCallbacks
+    def _run_background_updates(self):
+        logger.info("Starting background schema updates")
         while True:
             yield self.hs.get_clock().sleep(
                 self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b78eda3413..77ae10da3d 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,6 +19,7 @@ from six import iteritems
 
 from twisted.internet import defer
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import CACHE_SIZE_FACTOR
 
 from . import background_updates
@@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         self._batch_row_update[key] = (user_agent, device_id, now)
 
     def _update_client_ips_batch(self):
-        to_update = self._batch_row_update
-        self._batch_row_update = {}
-        return self.runInteraction(
-            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        def update():
+            to_update = self._batch_row_update
+            self._batch_row_update = {}
+            return self.runInteraction(
+                "_update_client_ips_batch", self._update_client_ips_batch_txn,
+                to_update,
+            )
+
+        run_as_background_process(
+            "update_client_ips", update,
         )
 
     def _update_client_ips_batch_txn(self, txn, to_update):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..4ff0fdc4ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError
 # these are only included to make the type annotations work
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.events_worker import EventsWorkerStore
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 
@@ -155,11 +156,8 @@ class _EventPeristenceQueue(object):
                     self._event_persist_queues[room_id] = queue
                 self._currently_persisting_rooms.discard(room_id)
 
-        # set handle_queue_loop off on the background. We don't want to
-        # attribute work done in it to the current request, so we drop the
-        # logcontext altogether.
-        with PreserveLoggingContext():
-            handle_queue_loop()
+        # set handle_queue_loop off in the background
+        run_as_background_process("persist_events", handle_queue_loop)
 
     def _get_drainining_queue(self, room_id):
         queue = self._event_persist_queues.setdefault(room_id, deque())
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 67433606c6..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -25,6 +25,7 @@ from synapse.events import EventBase  # noqa: F401
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.logcontext import (
     LoggingContext,
     PreserveLoggingContext,
@@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
                 should_start = False
 
         if should_start:
-            with PreserveLoggingContext():
-                self.runWithConnection(
-                    self._do_fetch
-                )
+            run_as_background_process(
+                "fetch_events",
+                self.runWithConnection,
+                self._do_fetch,
+            )
 
         logger.debug("Loading %d events", len(events))
         with PreserveLoggingContext():
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 4abca91f6d..465adc54a8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,7 @@
 import logging
 from collections import OrderedDict
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
 
 logger = logging.getLogger(__name__)
@@ -63,7 +64,10 @@ class ExpiringCache(object):
             return
 
         def f():
-            self._prune_cache()
+            run_as_background_process(
+                "prune_cache_%s" % self._cache_name,
+                self._prune_cache,
+            )
 
         self._clock.looping_call(f, self._expiry_ms / 2)
 
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
 
 from twisted.internet import defer
 
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
 
 def user_left_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_left_room", user=user, room_id=room_id)
+    distributor.fire("user_left_room", user=user, room_id=room_id)
 
 
 def user_joined_room(distributor, user, room_id):
-    with PreserveLoggingContext():
-        distributor.fire("user_joined_room", user=user, room_id=room_id)
+    distributor.fire("user_joined_room", user=user, room_id=room_id)
 
 
 class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
       model will do for today.
     """
 
-    def __init__(self, suppress_failures=True):
-        self.suppress_failures = suppress_failures
-
+    def __init__(self):
         self.signals = {}
         self.pre_registration = {}
 
@@ -56,7 +52,6 @@ class Distributor(object):
 
         self.signals[name] = Signal(
             name,
-            suppress_failures=self.suppress_failures,
         )
 
         if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
             self.pre_registration[name].append(observer)
 
     def fire(self, name, *args, **kwargs):
+        """Dispatches the given signal to the registered observers.
+
+        Runs the observers as a background process. Does not return a deferred.
+        """
         if name not in self.signals:
             raise KeyError("%r does not have a signal named %s" % (self, name))
 
-        return self.signals[name].fire(*args, **kwargs)
+        run_as_background_process(
+            name,
+            self.signals[name].fire,
+            *args, **kwargs
+        )
 
 
 class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
     method into all of the observers.
     """
 
-    def __init__(self, name, suppress_failures):
+    def __init__(self, name):
         self.name = name
-        self.suppress_failures = suppress_failures
         self.observers = []
 
     def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
         Each observer callable may return a Deferred."""
         self.observers.append(observer)
 
-    @defer.inlineCallbacks
     def fire(self, *args, **kwargs):
         """Invokes every callable in the observer list, passing in the args and
         kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
                         failure.type,
                         failure.value,
                         failure.getTracebackObject()))
-                if not self.suppress_failures:
-                    return failure
 
             return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
 
-        with PreserveLoggingContext():
-            deferreds = [
-                do(observer)
-                for observer in self.observers
-            ]
-
-            res = yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        deferreds = [
+            run_in_background(do, o)
+            for o in self.observers
+        ]
 
-        defer.returnValue(res)
+        return make_deferred_yieldable(defer.gatherResults(
+            deferreds, consumeErrors=True,
+        ))
 
     def __repr__(self):
         return "<Signal name=%r>" % (self.name,)