diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a4e16fa96..d07bd24ffd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -30,10 +30,10 @@ class VoipConfig(Config):
## Turn ##
# The public URIs of the TURN server to give to clients
- turn_uris: []
+ #turn_uris: []
# The shared secret used to compute passwords for the TURN server
- turn_shared_secret: "YOUR_SHARED_SECRET"
+ #turn_shared_secret: "YOUR_SHARED_SECRET"
# The Username and password if the TURN server needs them and
# does not use a token
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 979545d613..fd491b9359 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
)
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -165,10 +166,11 @@ class TransactionQueue(object):
if self._is_processing:
return
- # fire off a processing loop in the background. It's likely it will
- # outlast the current request, so run it in the sentinel logcontext.
- with PreserveLoggingContext():
- self._process_event_queue_loop()
+ # fire off a processing loop in the background
+ run_as_background_process(
+ "process_transaction_queue",
+ self._process_event_queue_loop,
+ )
@defer.inlineCallbacks
def _process_event_queue_loop(self):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 20fb46fc89..65f6041b10 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -906,16 +906,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -1371,18 +1361,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1454,18 +1432,6 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
@@ -1730,15 +1696,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index f24b4b949c..588e280571 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+ "synapse_http_server_response_time_seconds", "sec",
+ ["method", "servlet", "tag", "code"],
)
response_ru_utime = Counter(
@@ -171,11 +172,13 @@ class RequestMetrics(object):
)
return
- outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+ response_code = str(request.code)
+
+ outgoing_responses_counter.labels(request.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag).observe(
+ response_timer.labels(request.method, self.name, tag, response_code).observe(
time_sec - self.start
)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..9d820e44a6
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+ "synapse_background_process_start_count",
+ "Number of background processes started",
+ ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the CustomCollector,
+# which ensures that we can update them before they are collected.
+#
+_background_process_ru_utime = Counter(
+ "synapse_background_process_ru_utime_seconds",
+ "User CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_ru_stime = Counter(
+ "synapse_background_process_ru_stime_seconds",
+ "System CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_count = Counter(
+ "synapse_background_process_db_txn_count",
+ "Number of database transactions done by background processes",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_duration = Counter(
+ "synapse_background_process_db_txn_duration_seconds",
+ ("Seconds spent by background processes waiting for database "
+ "transactions, excluding scheduling time"),
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_sched_duration = Counter(
+ "synapse_background_process_db_sched_duration_seconds",
+ "Seconds spent by background processes waiting for database connections",
+ ["name"],
+ registry=None,
+)
+
+# map from description to a counter, so that we can name our logcontexts
+# incrementally. (It actually duplicates _background_process_start_count, but
+# it's much simpler to do so than to try to combine them.)
+_background_process_counts = dict() # type: dict[str, int]
+
+# map from description to the currently running background processes.
+#
+# it's kept as a dict of sets rather than a big set so that we can keep track
+# of process descriptions that no longer have any active processes.
+_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
+
+
+class _Collector(object):
+ """A custom metrics collector for the background process metrics.
+
+ Ensures that all of the metrics are up-to-date with any in-flight processes
+ before they are returned.
+ """
+ def collect(self):
+ background_process_in_flight_count = GaugeMetricFamily(
+ "synapse_background_process_in_flight_count",
+ "Number of background processes in flight",
+ labels=["name"],
+ )
+
+ for desc, processes in six.iteritems(_background_processes):
+ background_process_in_flight_count.add_metric(
+ (desc,), len(processes),
+ )
+ for process in processes:
+ process.update_metrics()
+
+ yield background_process_in_flight_count
+
+ # now we need to run collect() over each of the static Counters, and
+ # yield each metric they return.
+ for m in (
+ _background_process_ru_utime,
+ _background_process_ru_stime,
+ _background_process_db_txn_count,
+ _background_process_db_txn_duration,
+ _background_process_db_sched_duration,
+ ):
+ for r in m.collect():
+ yield r
+
+
+REGISTRY.register(_Collector())
+
+
+class _BackgroundProcess(object):
+ def __init__(self, desc, ctx):
+ self.desc = desc
+ self._context = ctx
+ self._reported_stats = None
+
+ def update_metrics(self):
+ """Updates the metrics with values from this process."""
+ new_stats = self._context.get_resource_usage()
+ if self._reported_stats is None:
+ diff = new_stats
+ else:
+ diff = new_stats - self._reported_stats
+ self._reported_stats = new_stats
+
+ _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
+ _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
+ _background_process_db_txn_count.labels(self.desc).inc(
+ diff.db_txn_count,
+ )
+ _background_process_db_txn_duration.labels(self.desc).inc(
+ diff.db_txn_duration_sec,
+ )
+ _background_process_db_sched_duration.labels(self.desc).inc(
+ diff.db_sched_duration_sec,
+ )
+
+
+def run_as_background_process(desc, func, *args, **kwargs):
+ """Run the given function in its own logcontext, with resource metrics
+
+ This should be used to wrap processes which are fired off to run in the
+ background, instead of being associated with a particular request.
+
+ Args:
+ desc (str): a description for this background process type
+ func: a function, which may return a Deferred
+ args: positional args for func
+ kwargs: keyword args for func
+
+ Returns: None
+ """
+ @defer.inlineCallbacks
+ def run():
+ count = _background_process_counts.get(desc, 0)
+ _background_process_counts[desc] = count + 1
+ _background_process_start_count.labels(desc).inc()
+
+ with LoggingContext(desc) as context:
+ context.request = "%s-%i" % (desc, count)
+ proc = _BackgroundProcess(desc, context)
+ _background_processes.setdefault(desc, set()).add(proc)
+ try:
+ yield func(*args, **kwargs)
+ finally:
+ proc.update_metrics()
+ _background_processes[desc].remove(proc)
+
+ with PreserveLoggingContext():
+ run()