diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a4e16fa96..d07bd24ffd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -30,10 +30,10 @@ class VoipConfig(Config):
## Turn ##
# The public URIs of the TURN server to give to clients
- turn_uris: []
+ #turn_uris: []
# The shared secret used to compute passwords for the TURN server
- turn_shared_secret: "YOUR_SHARED_SECRET"
+ #turn_shared_secret: "YOUR_SHARED_SECRET"
# The Username and password if the TURN server needs them and
# does not use a token
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5a956ecfb3..6996d6b695 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
)
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -165,10 +166,11 @@ class TransactionQueue(object):
if self._is_processing:
return
- # fire off a processing loop in the background. It's likely it will
- # outlast the current request, so run it in the sentinel logcontext.
- with PreserveLoggingContext():
- self._process_event_queue_loop()
+ # fire off a processing loop in the background
+ run_as_background_process(
+ "process_event_queue_for_federation",
+ self._process_event_queue_loop,
+ )
@defer.inlineCallbacks
def _process_event_queue_loop(self):
@@ -432,14 +434,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Starting transaction loop", destination)
- # Drop the logcontext before starting the transaction. It doesn't
- # really make sense to log all the outbound transactions against
- # whatever path led us to this point: that's pretty arbitrary really.
- #
- # (this also means we can fire off _perform_transaction without
- # yielding)
- with logcontext.PreserveLoggingContext():
- self._transaction_transmission_loop(destination)
+ run_as_background_process(
+ "federation_transaction_transmission_loop",
+ self._transaction_transmission_loop,
+ destination,
+ )
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 20fb46fc89..65f6041b10 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -906,16 +906,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -1371,18 +1361,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1454,18 +1432,6 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
@@ -1730,15 +1696,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6a0d75b2b..25b6307884 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
-from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
from twisted.web.client import (
+ Agent,
+ BrowserLikeRedirectAgent,
+ ContentDecoderAgent,
+ FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder,
HTTPConnectionPool,
PartialDownloadError,
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index f24b4b949c..588e280571 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+ "synapse_http_server_response_time_seconds", "sec",
+ ["method", "servlet", "tag", "code"],
)
response_ru_utime = Counter(
@@ -171,11 +172,13 @@ class RequestMetrics(object):
)
return
- outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+ response_code = str(request.code)
+
+ outgoing_responses_counter.labels(request.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag).observe(
+ response_timer.labels(request.method, self.name, tag, response_code).observe(
time_sec - self.start
)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..9d820e44a6
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+ "synapse_background_process_start_count",
+ "Number of background processes started",
+ ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the CustomCollector,
+# which ensures that we can update them before they are collected.
+#
+_background_process_ru_utime = Counter(
+ "synapse_background_process_ru_utime_seconds",
+ "User CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_ru_stime = Counter(
+ "synapse_background_process_ru_stime_seconds",
+ "System CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_count = Counter(
+ "synapse_background_process_db_txn_count",
+ "Number of database transactions done by background processes",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_duration = Counter(
+ "synapse_background_process_db_txn_duration_seconds",
+ ("Seconds spent by background processes waiting for database "
+ "transactions, excluding scheduling time"),
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_sched_duration = Counter(
+ "synapse_background_process_db_sched_duration_seconds",
+ "Seconds spent by background processes waiting for database connections",
+ ["name"],
+ registry=None,
+)
+
+# map from description to a counter, so that we can name our logcontexts
+# incrementally. (It actually duplicates _background_process_start_count, but
+# it's much simpler to do so than to try to combine them.)
+_background_process_counts = dict() # type: dict[str, int]
+
+# map from description to the currently running background processes.
+#
+# it's kept as a dict of sets rather than a big set so that we can keep track
+# of process descriptions that no longer have any active processes.
+_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
+
+
+class _Collector(object):
+ """A custom metrics collector for the background process metrics.
+
+ Ensures that all of the metrics are up-to-date with any in-flight processes
+ before they are returned.
+ """
+ def collect(self):
+ background_process_in_flight_count = GaugeMetricFamily(
+ "synapse_background_process_in_flight_count",
+ "Number of background processes in flight",
+ labels=["name"],
+ )
+
+ for desc, processes in six.iteritems(_background_processes):
+ background_process_in_flight_count.add_metric(
+ (desc,), len(processes),
+ )
+ for process in processes:
+ process.update_metrics()
+
+ yield background_process_in_flight_count
+
+ # now we need to run collect() over each of the static Counters, and
+ # yield each metric they return.
+ for m in (
+ _background_process_ru_utime,
+ _background_process_ru_stime,
+ _background_process_db_txn_count,
+ _background_process_db_txn_duration,
+ _background_process_db_sched_duration,
+ ):
+ for r in m.collect():
+ yield r
+
+
+REGISTRY.register(_Collector())
+
+
+class _BackgroundProcess(object):
+ def __init__(self, desc, ctx):
+ self.desc = desc
+ self._context = ctx
+ self._reported_stats = None
+
+ def update_metrics(self):
+ """Updates the metrics with values from this process."""
+ new_stats = self._context.get_resource_usage()
+ if self._reported_stats is None:
+ diff = new_stats
+ else:
+ diff = new_stats - self._reported_stats
+ self._reported_stats = new_stats
+
+ _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
+ _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
+ _background_process_db_txn_count.labels(self.desc).inc(
+ diff.db_txn_count,
+ )
+ _background_process_db_txn_duration.labels(self.desc).inc(
+ diff.db_txn_duration_sec,
+ )
+ _background_process_db_sched_duration.labels(self.desc).inc(
+ diff.db_sched_duration_sec,
+ )
+
+
+def run_as_background_process(desc, func, *args, **kwargs):
+ """Run the given function in its own logcontext, with resource metrics
+
+ This should be used to wrap processes which are fired off to run in the
+ background, instead of being associated with a particular request.
+
+ Args:
+ desc (str): a description for this background process type
+ func: a function, which may return a Deferred
+ args: positional args for func
+ kwargs: keyword args for func
+
+ Returns: None
+ """
+ @defer.inlineCallbacks
+ def run():
+ count = _background_process_counts.get(desc, 0)
+ _background_process_counts[desc] = count + 1
+ _background_process_start_count.labels(desc).inc()
+
+ with LoggingContext(desc) as context:
+ context.request = "%s-%i" % (desc, count)
+ proc = _BackgroundProcess(desc, context)
+ _background_processes.setdefault(desc, set()).add(proc)
+ try:
+ yield func(*args, **kwargs)
+ finally:
+ proc.update_metrics()
+ _background_processes[desc].remove(proc)
+
+ with PreserveLoggingContext():
+ run()
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 75c2a4ec8e..3418f06fd6 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,13 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import PY3
+
from synapse.http.server import JsonResource
from synapse.rest.client import versions
-from synapse.rest.client.v1 import admin, directory, events, initial_sync
-from synapse.rest.client.v1 import login as v1_login
-from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher
-from synapse.rest.client.v1 import register as v1_register
-from synapse.rest.client.v1 import room, voip
+from synapse.rest.client.v1 import (
+ admin,
+ directory,
+ events,
+ initial_sync,
+ login as v1_login,
+ logout,
+ presence,
+ profile,
+ push_rule,
+ pusher,
+ room,
+ voip,
+)
from synapse.rest.client.v2_alpha import (
account,
account_data,
@@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
user_directory,
)
+if not PY3:
+ from synapse.rest.client.v1_only import (
+ register as v1_register,
+ )
+
class ClientRestResource(JsonResource):
"""A resource for version 1 of the matrix client API."""
@@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
def register_servlets(client_resource, hs):
versions.register_servlets(client_resource)
- # "v1"
- room.register_servlets(hs, client_resource)
+ if not PY3:
+ # "v1" (Python 2 only)
+ v1_register.register_servlets(hs, client_resource)
+
+ # Deprecated in r0
+ initial_sync.register_servlets(hs, client_resource)
+ room.register_deprecated_servlets(hs, client_resource)
+
+ # Partially deprecated in r0
events.register_servlets(hs, client_resource)
- v1_register.register_servlets(hs, client_resource)
+
+ # "v1" + "r0"
+ room.register_servlets(hs, client_resource)
v1_login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
- initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3d62447854..b9512a2b61 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -832,10 +832,13 @@ def register_servlets(hs, http_server):
RoomSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
- RoomInitialSyncRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
+
+
+def register_deprecated_servlets(hs, http_server):
+ RoomInitialSyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py
new file mode 100644
index 0000000000..936f902ace
--- /dev/null
+++ b/synapse/rest/client/v1_only/__init__.py
@@ -0,0 +1,3 @@
+"""
+REST APIs that are only used in v1 (the legacy API).
+"""
diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py
new file mode 100644
index 0000000000..9d4db7437c
--- /dev/null
+++ b/synapse/rest/client/v1_only/base.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module contains base REST classes for constructing client v1 servlets.
+"""
+
+import re
+
+from synapse.api.urls import CLIENT_PREFIX
+
+
+def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
+ """Creates a regex compiled client path with the correct client path
+ prefix.
+
+ Args:
+ path_regex (str): The regex string to match. This should NOT have a ^
+ as this will be prefixed.
+ Returns:
+ list of SRE_Pattern
+ """
+ patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
+ if include_in_unstable:
+ unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
+ patterns.append(re.compile("^" + unstable_prefix + path_regex))
+ return patterns
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py
index 25a143af8d..3439c3c6d4 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
+from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester
-from .base import ClientV1RestServlet, client_path_patterns
+from .base import v1_only_client_path_patterns
logger = logging.getLogger(__name__)
@@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
handler doesn't have a concept of multi-stages or sessions.
"""
- PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False)
+ PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
def __init__(self, hs):
"""
@@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
"""Handles user creation via a server-to-server interface
"""
- PATTERNS = client_path_patterns("/createUser$", releases=())
+ PATTERNS = v1_only_client_path_patterns("/createUser$")
def __init__(self, hs):
super(CreateUserRestServlet, self).__init__(hs)
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index dc9eca7d15..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -19,6 +19,8 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
+
from . import engines
from ._base import SQLBaseStore
@@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers = {}
self._all_done = False
- @defer.inlineCallbacks
def start_doing_background_updates(self):
- logger.info("Starting background schema updates")
+ run_as_background_process(
+ "background_updates", self._run_background_updates,
+ )
+ @defer.inlineCallbacks
+ def _run_background_updates(self):
+ logger.info("Starting background schema updates")
while True:
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b78eda3413..77ae10da3d 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
from . import background_updates
@@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
- to_update = self._batch_row_update
- self._batch_row_update = {}
- return self.runInteraction(
- "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+ def update():
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn,
+ to_update,
+ )
+
+ run_as_background_process(
+ "update_client_ips", update,
)
def _update_client_ips_batch_txn(self, txn, to_update):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..4ff0fdc4ab 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,12 +33,13 @@ from synapse.api.errors import SynapseError
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
+from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -155,11 +156,8 @@ class _EventPeristenceQueue(object):
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 67433606c6..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
@@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
- with PreserveLoggingContext():
- self.runWithConnection(
- self._do_fetch
- )
+ run_as_background_process(
+ "fetch_events",
+ self.runWithConnection,
+ self._do_fetch,
+ )
logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 4abca91f6d..465adc54a8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,7 @@
import logging
from collections import OrderedDict
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
@@ -63,7 +64,10 @@ class ExpiringCache(object):
return
def f():
- self._prune_cache()
+ run_as_background_process(
+ "prune_cache_%s" % self._cache_name,
+ self._prune_cache,
+ )
self._clock.looping_call(f, self._expiry_ms / 2)
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
from twisted.internet import defer
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_left_room", user=user, room_id=room_id)
+ distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_joined_room", user=user, room_id=room_id)
+ distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
model will do for today.
"""
- def __init__(self, suppress_failures=True):
- self.suppress_failures = suppress_failures
-
+ def __init__(self):
self.signals = {}
self.pre_registration = {}
@@ -56,7 +52,6 @@ class Distributor(object):
self.signals[name] = Signal(
name,
- suppress_failures=self.suppress_failures,
)
if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
self.pre_registration[name].append(observer)
def fire(self, name, *args, **kwargs):
+ """Dispatches the given signal to the registered observers.
+
+ Runs the observers as a background process. Does not return a deferred.
+ """
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))
- return self.signals[name].fire(*args, **kwargs)
+ run_as_background_process(
+ name,
+ self.signals[name].fire,
+ *args, **kwargs
+ )
class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers.
"""
- def __init__(self, name, suppress_failures):
+ def __init__(self, name):
self.name = name
- self.suppress_failures = suppress_failures
self.observers = []
def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
- @defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
failure.type,
failure.value,
failure.getTracebackObject()))
- if not self.suppress_failures:
- return failure
return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
- with PreserveLoggingContext():
- deferreds = [
- do(observer)
- for observer in self.observers
- ]
-
- res = yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ deferreds = [
+ run_in_background(do, o)
+ for o in self.observers
+ ]
- defer.returnValue(res)
+ return make_deferred_yieldable(defer.gatherResults(
+ deferreds, consumeErrors=True,
+ ))
def __repr__(self):
return "<Signal name=%r>" % (self.name,)
|