diff options
31 files changed, 351 insertions, 98 deletions
diff --git a/README.rst b/README.rst index 2471619706..9c0f9c09c8 100644 --- a/README.rst +++ b/README.rst @@ -908,7 +908,7 @@ to install using pip and a virtualenv:: virtualenv -p python2.7 env source env/bin/activate - python synapse/python_dependencies.py | xargs pip install + python -m synapse.python_dependencies | xargs pip install pip install lxml mock This will run a process of downloading and installing all the needed @@ -963,5 +963,13 @@ variable. The default is 0.5, which can be decreased to reduce RAM usage in memory constrained enviroments, or increased if performance starts to degrade. +Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant +improvement in overall amount, and especially in terms of giving back RAM +to the OS. To use it, the library must simply be put in the LD_PRELOAD +environment variable when launching Synapse. On Debian, this can be done +by installing the ``libjemalloc1`` package and adding this line to +``/etc/default/matrix-synapse``:: + + LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1 .. _`key_management`: https://matrix.org/docs/spec/server_server/unstable.html#retrieving-server-keys diff --git a/changelog.d/3860.misc b/changelog.d/3860.misc new file mode 100644 index 0000000000..364239d3e3 --- /dev/null +++ b/changelog.d/3860.misc @@ -0,0 +1 @@ +Fix typo in replication stream exception. diff --git a/changelog.d/3871.misc b/changelog.d/3871.misc new file mode 100644 index 0000000000..dd9510ceb6 --- /dev/null +++ b/changelog.d/3871.misc @@ -0,0 +1 @@ +Add in flight real time metrics for Measure blocks diff --git a/changelog.d/3872.misc b/changelog.d/3872.misc new file mode 100644 index 0000000000..b450c506d8 --- /dev/null +++ b/changelog.d/3872.misc @@ -0,0 +1 @@ +Disable buffering and automatic retrying in treq requests to prevent timeouts. \ No newline at end of file diff --git a/changelog.d/3874.bugfix b/changelog.d/3874.bugfix new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/changelog.d/3874.bugfix diff --git a/changelog.d/3875.bugfix b/changelog.d/3875.bugfix new file mode 100644 index 0000000000..2d2147dd4b --- /dev/null +++ b/changelog.d/3875.bugfix @@ -0,0 +1 @@ +Mitigate outbound federation randomly becoming wedged diff --git a/changelog.d/3877.misc b/changelog.d/3877.misc new file mode 100644 index 0000000000..a80fec4bd8 --- /dev/null +++ b/changelog.d/3877.misc @@ -0,0 +1 @@ +mention jemalloc in the README diff --git a/changelog.d/3879.bugfix b/changelog.d/3879.bugfix new file mode 100644 index 0000000000..82cb2a8ebc --- /dev/null +++ b/changelog.d/3879.bugfix @@ -0,0 +1 @@ +Don't ratelimit autojoins diff --git a/changelog.d/3883.feature b/changelog.d/3883.feature new file mode 100644 index 0000000000..c11e5c5309 --- /dev/null +++ b/changelog.d/3883.feature @@ -0,0 +1 @@ +Adding the ability to change MAX_UPLOAD_SIZE for the docker container variables. \ No newline at end of file diff --git a/changelog.d/3888.misc b/changelog.d/3888.misc new file mode 100644 index 0000000000..a10ede547e --- /dev/null +++ b/changelog.d/3888.misc @@ -0,0 +1 @@ +Remove unmaintained "nuke-room-from-db.sh" script diff --git a/changelog.d/3889.bugfix b/changelog.d/3889.bugfix new file mode 100644 index 0000000000..e976425987 --- /dev/null +++ b/changelog.d/3889.bugfix @@ -0,0 +1 @@ +Fix 500 error when deleting unknown room alias diff --git a/changelog.d/3892.bugfix b/changelog.d/3892.bugfix new file mode 100644 index 0000000000..8b30afab04 --- /dev/null +++ b/changelog.d/3892.bugfix @@ -0,0 +1 @@ +Fix some b'abcd' noise in logs and metrics diff --git a/changelog.d/3895.bugfix b/changelog.d/3895.bugfix new file mode 100644 index 0000000000..8b30afab04 --- /dev/null +++ b/changelog.d/3895.bugfix @@ -0,0 +1 @@ +Fix some b'abcd' noise in logs and metrics diff --git a/changelog.d/3897.misc b/changelog.d/3897.misc new file mode 100644 index 0000000000..87e7ac796e --- /dev/null +++ b/changelog.d/3897.misc @@ -0,0 +1 @@ +Fix typo in README, synaspse -> synapse \ No newline at end of file diff --git a/docker/README.md b/docker/README.md index 038c78f7c0..3c00d1e948 100644 --- a/docker/README.md +++ b/docker/README.md @@ -88,6 +88,7 @@ variables are available for configuration: * ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN uris to enable TURN for this homeserver. * ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required. +* ``SYNAPSE_MAX_UPLOAD_SIZE``, set this variable to change the max upload size [default `10M`]. Shared secrets, that will be initialized to random values if not set: diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index 6bc25bb45f..cfe88788f2 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -85,7 +85,7 @@ federation_rc_concurrent: 3 media_store_path: "/data/media" uploads_path: "/data/uploads" -max_upload_size: "10M" +max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}" max_image_pixels: "32M" dynamic_thumbnails: false diff --git a/scripts-dev/nuke-room-from-db.sh b/scripts-dev/nuke-room-from-db.sh deleted file mode 100755 index c62928afdb..0000000000 --- a/scripts-dev/nuke-room-from-db.sh +++ /dev/null @@ -1,57 +0,0 @@ -#!/bin/bash - -## CAUTION: -## This script will remove (hopefully) all trace of the given room ID from -## your homeserver.db - -## Do not run it lightly. - -set -e - -if [ "$1" == "-h" ] || [ "$1" == "" ]; then - echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run" - echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db" - echo "or" - echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse" - exit -fi - -ROOMID="$1" - -cat <<EOF -DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID'; -DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID'; -DELETE FROM event_edges WHERE room_id = '$ROOMID'; -DELETE FROM room_depth WHERE room_id = '$ROOMID'; -DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID'; -DELETE FROM events WHERE room_id = '$ROOMID'; -DELETE FROM event_json WHERE room_id = '$ROOMID'; -DELETE FROM state_events WHERE room_id = '$ROOMID'; -DELETE FROM current_state_events WHERE room_id = '$ROOMID'; -DELETE FROM room_memberships WHERE room_id = '$ROOMID'; -DELETE FROM feedback WHERE room_id = '$ROOMID'; -DELETE FROM topics WHERE room_id = '$ROOMID'; -DELETE FROM room_names WHERE room_id = '$ROOMID'; -DELETE FROM rooms WHERE room_id = '$ROOMID'; -DELETE FROM room_hosts WHERE room_id = '$ROOMID'; -DELETE FROM room_aliases WHERE room_id = '$ROOMID'; -DELETE FROM state_groups WHERE room_id = '$ROOMID'; -DELETE FROM state_groups_state WHERE room_id = '$ROOMID'; -DELETE FROM receipts_graph WHERE room_id = '$ROOMID'; -DELETE FROM receipts_linearized WHERE room_id = '$ROOMID'; -DELETE FROM event_search WHERE room_id = '$ROOMID'; -DELETE FROM guest_access WHERE room_id = '$ROOMID'; -DELETE FROM history_visibility WHERE room_id = '$ROOMID'; -DELETE FROM room_tags WHERE room_id = '$ROOMID'; -DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID'; -DELETE FROM room_account_data WHERE room_id = '$ROOMID'; -DELETE FROM event_push_actions WHERE room_id = '$ROOMID'; -DELETE FROM local_invites WHERE room_id = '$ROOMID'; -DELETE FROM pusher_throttle WHERE room_id = '$ROOMID'; -DELETE FROM event_reports WHERE room_id = '$ROOMID'; -DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID'; -DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID'; -DELETE FROM event_auth WHERE room_id = '$ROOMID'; -DELETE FROM appservice_room_list WHERE room_id = '$ROOMID'; -VACUUM; -EOF diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index ef866da1b6..18741c5fac 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -20,7 +20,14 @@ import string from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError +from synapse.api.errors import ( + AuthError, + CodeMessageException, + Codes, + NotFoundError, + StoreError, + SynapseError, +) from synapse.types import RoomAlias, UserID, get_domain_from_id from ._base import BaseHandler @@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler): def delete_association(self, requester, user_id, room_alias): # association deletion for human users - can_delete = yield self._user_can_delete_alias(room_alias, user_id) + try: + can_delete = yield self._user_can_delete_alias(room_alias, user_id) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown room alias") + raise + if not can_delete: raise AuthError( 403, "You don't have permission to delete the alias.", @@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler): def _user_can_delete_alias(self, alias, user_id): creator = yield self.store.get_room_alias_creator(alias.to_string()) - if creator and creator == user_id: + if creator is not None and creator == user_id: defer.returnValue(True) is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 5170d093e3..a155b6e938 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -269,14 +269,7 @@ class PaginationHandler(object): if state_ids: state = yield self.store.get_events(list(state_ids.values())) - - if state: - state = yield filter_events_for_client( - self.store, - user_id, - state.values(), - is_peeking=(member_event_id is None), - ) + state = state.values() time_now = self.clock.time_msec() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1e53f2c635..da914c46ff 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler): room_id=room_id, remote_room_hosts=remote_room_hosts, action="join", + ratelimit=False, ) diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 58ef8d3ce4..a3f9e4f67c 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout): return value -ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') +ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') def redact_uri(uri): """Strips access tokens from the uri replaces with <redacted>""" return ACCESS_TOKEN_RE.sub( - br'\1<redacted>\3', + r'\1<redacted>\3', uri ) diff --git a/synapse/http/client.py b/synapse/http/client.py index d60f87b04e..ec339a92ad 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -93,7 +93,7 @@ class SimpleHttpClient(object): outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) - logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii'))) + logger.info("Sending request %s %s", method, redact_uri(uri)) try: request_deferred = treq.request( @@ -108,14 +108,14 @@ class SimpleHttpClient(object): incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", - method, redact_uri(uri.encode('ascii')), response.code + method, redact_uri(uri), response.code ) defer.returnValue(response) except Exception as e: incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", - method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0] + method, redact_uri(uri), type(e).__name__, e.args[0] ) raise diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c49dbacd93..083484a687 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -42,7 +42,9 @@ from synapse.api.errors import ( ) from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext +from synapse.util.async_helpers import timeout_no_seriously from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") @@ -91,6 +93,7 @@ class MatrixFederationHttpClient(object): self.server_name = hs.hostname reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) + pool.retryAutomatically = False pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 self.agent = Agent.usingEndpointFactory( @@ -221,14 +224,30 @@ class MatrixFederationHttpClient(object): headers=Headers(headers_dict), data=data, agent=self.agent, - reactor=self.hs.get_reactor() + reactor=self.hs.get_reactor(), + unbuffered=True ) request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - response = yield make_deferred_yieldable( + + # Sometimes the timeout above doesn't work, so lets hack yet + # another layer of timeouts in in the vain hope that at some + # point the world made sense and this really really really + # should work. + request_deferred = timeout_no_seriously( request_deferred, + timeout=_sec_timeout * 2, + reactor=self.hs.get_reactor(), ) - log_result = "%d %s" % (response.code, response.phrase,) + with Measure(self.clock, "outbound_request"): + response = yield make_deferred_yieldable( + request_deferred, + ) + + log_result = "%d %s" % ( + response.code, + response.phrase.decode('ascii', errors='replace'), + ) break except Exception as e: if not retry_on_dns_fail and isinstance(e, DNSLookupError): diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 72c2654678..fedb4e6b18 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -162,7 +162,7 @@ class RequestMetrics(object): with _in_flight_requests_lock: _in_flight_requests.add(self) - def stop(self, time_sec, request): + def stop(self, time_sec, response_code, sent_bytes): with _in_flight_requests_lock: _in_flight_requests.discard(self) @@ -179,35 +179,35 @@ class RequestMetrics(object): ) return - response_code = str(request.code) + response_code = str(response_code) - outgoing_responses_counter.labels(request.method, response_code).inc() + outgoing_responses_counter.labels(self.method, response_code).inc() - response_count.labels(request.method, self.name, tag).inc() + response_count.labels(self.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag, response_code).observe( + response_timer.labels(self.method, self.name, tag, response_code).observe( time_sec - self.start ) resource_usage = context.get_resource_usage() - response_ru_utime.labels(request.method, self.name, tag).inc( + response_ru_utime.labels(self.method, self.name, tag).inc( resource_usage.ru_utime, ) - response_ru_stime.labels(request.method, self.name, tag).inc( + response_ru_stime.labels(self.method, self.name, tag).inc( resource_usage.ru_stime, ) - response_db_txn_count.labels(request.method, self.name, tag).inc( + response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count ) - response_db_txn_duration.labels(request.method, self.name, tag).inc( + response_db_txn_duration.labels(self.method, self.name, tag).inc( resource_usage.db_txn_duration_sec ) - response_db_sched_duration.labels(request.method, self.name, tag).inc( + response_db_sched_duration.labels(self.method, self.name, tag).inc( resource_usage.db_sched_duration_sec ) - response_size.labels(request.method, self.name, tag).inc(request.sentLength) + response_size.labels(self.method, self.name, tag).inc(sent_bytes) # We always call this at the end to ensure that we update the metrics # regardless of whether a call to /metrics while the request was in diff --git a/synapse/http/site.py b/synapse/http/site.py index f0828c6542..9579e8cd0d 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -82,10 +82,13 @@ class SynapseRequest(Request): ) def get_request_id(self): - return "%s-%i" % (self.method, self.request_seq) + return "%s-%i" % (self.method.decode('ascii'), self.request_seq) def get_redacted_uri(self): - return redact_uri(self.uri) + uri = self.uri + if isinstance(uri, bytes): + uri = self.uri.decode('ascii') + return redact_uri(uri) def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] @@ -116,7 +119,7 @@ class SynapseRequest(Request): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.labels(self.method, + requests_counter.labels(self.method.decode('ascii'), self.request_metrics.name).inc() @contextlib.contextmanager @@ -277,15 +280,15 @@ class SynapseRequest(Request): int(usage.db_txn_count), self.sentLength, code, - self.method, + self.method.decode('ascii'), self.get_redacted_uri(), - self.clientproto, + self.clientproto.decode('ascii', errors='replace'), user_agent, usage.evt_db_fetch_count, ) try: - self.request_metrics.stop(self.finish_time, self) + self.request_metrics.stop(self.finish_time, self.code, self.sentLength) except Exception as e: logger.warn("Failed to stop metrics: %r", e) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 550f8443f7..59900aa5d1 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,8 +18,11 @@ import gc import logging import os import platform +import threading import time +import six + import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import REGISTRY, GaugeMetricFamily @@ -68,7 +71,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in calls.items(): + for k, v in six.iteritems(calls): g.add_metric(k, v) else: g.add_metric([], calls) @@ -87,6 +90,109 @@ class LaterGauge(object): all_gauges[self.name] = self +class InFlightGauge(object): + """Tracks number of things (e.g. requests, Measure blocks, etc) in flight + at any given time. + + Each InFlightGauge will create a metric called `<name>_total` that counts + the number of in flight blocks, as well as a metrics for each item in the + given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the + callbacks. + + Args: + name (str) + desc (str) + labels (list[str]) + sub_metrics (list[str]): A list of sub metrics that the callbacks + will update. + """ + + def __init__(self, name, desc, labels, sub_metrics): + self.name = name + self.desc = desc + self.labels = labels + self.sub_metrics = sub_metrics + + # Create a class which have the sub_metrics values as attributes, which + # default to 0 on initialization. Used to pass to registered callbacks. + self._metrics_class = attr.make_class( + "_MetricsEntry", + attrs={x: attr.ib(0) for x in sub_metrics}, + slots=True, + ) + + # Counts number of in flight blocks for a given set of label values + self._registrations = {} + + # Protects access to _registrations + self._lock = threading.Lock() + + self._register_with_collector() + + def register(self, key, callback): + """Registers that we've entered a new block with labels `key`. + + `callback` gets called each time the metrics are collected. The same + value must also be given to `unregister`. + + `callback` gets called with an object that has an attribute per + sub_metric, which should be updated with the necessary values. Note that + the metrics object is shared between all callbacks registered with the + same key. + + Note that `callback` may be called on a separate thread. + """ + with self._lock: + self._registrations.setdefault(key, set()).add(callback) + + def unregister(self, key, callback): + """Registers that we've exited a block with labels `key`. + """ + + with self._lock: + self._registrations.setdefault(key, set()).discard(callback) + + def collect(self): + """Called by prometheus client when it reads metrics. + + Note: may be called by a separate thread. + """ + in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels) + + metrics_by_key = {} + + # We copy so that we don't mutate the list while iterating + with self._lock: + keys = list(self._registrations) + + for key in keys: + with self._lock: + callbacks = set(self._registrations[key]) + + in_flight.add_metric(key, len(callbacks)) + + metrics = self._metrics_class() + metrics_by_key[key] = metrics + for callback in callbacks: + callback(metrics) + + yield in_flight + + for name in self.sub_metrics: + gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels) + for key, metrics in six.iteritems(metrics_by_key): + gauge.add_metric(key, getattr(metrics, name)) + yield gauge + + def _register_with_collector(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) + + REGISTRY.register(self) + all_gauges[self.name] = self + + # # Detailed CPU metrics # diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 55fe701c5c..c1e626be3f 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -196,7 +196,7 @@ class Stream(object): ) if len(rows) >= MAX_EVENTS_BEHIND: - raise Exception("stream %s has fallen behined" % (self.NAME)) + raise Exception("stream %s has fallen behind" % (self.NAME)) else: rows = yield self.update_function( from_token, current_token, diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 808194236a..cfb687cb53 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore): }, retcol="creator", desc="get_room_alias_creator", - allow_none=True ) @cached(max_entries=5000) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 9b3f2f4b96..40c2946129 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -438,3 +438,55 @@ def _cancelled_to_timed_out_error(value, timeout): value.trap(CancelledError) raise DeferredTimeoutError(timeout, "Deferred") return value + + +def timeout_no_seriously(deferred, timeout, reactor): + """The in build twisted deferred addTimeout (and the method above) + completely fail to time things out under some unknown circumstances. + + Lets try a different way of timing things out and maybe that will make + things work?! + + TODO: Kill this with fire. + """ + + new_d = defer.Deferred() + + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + + if not new_d.called: + new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + return _cancelled_to_timed_out_error(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + def success_cb(val): + if not new_d.called: + new_d.callback(val) + + def failure_cb(val): + if not new_d.called: + new_d.errback(val) + + deferred.addCallbacks(success_cb, failure_cb) + + return new_d diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 97f1267380..4b4ac5f6c7 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -20,6 +20,7 @@ from prometheus_client import Counter from twisted.internet import defer +from synapse.metrics import InFlightGauge from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) @@ -45,6 +46,13 @@ block_db_txn_duration = Counter( block_db_sched_duration = Counter( "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) +# Tracks the number of blocks currently active +in_flight = InFlightGauge( + "synapse_util_metrics_block_in_flight", "", + labels=["block_name"], + sub_metrics=["real_time_max", "real_time_sum"], +) + def measure_func(name): def wrapper(func): @@ -82,10 +90,14 @@ class Measure(object): self.start_usage = self.start_context.get_resource_usage() + in_flight.register((self.name,), self._update_in_flight) + def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: return + in_flight.unregister((self.name,), self._update_in_flight) + duration = self.clock.time() - self.start block_counter.labels(self.name).inc() @@ -120,3 +132,13 @@ class Measure(object): if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) + + def _update_in_flight(self, metrics): + """Gets called when processing in flight metrics + """ + duration = self.clock.time() - self.start + + metrics.real_time_max = max(metrics.real_time_max, duration) + metrics.real_time_sum += duration + + # TODO: Add other in flight metrics. diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 0000000000..17897711a1 --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from synapse.metrics import InFlightGauge + +from tests import unittest + + +class TestMauLimit(unittest.TestCase): + def test_basic(self): + gauge = InFlightGauge( + "test1", "", + labels=["test_label"], + sub_metrics=["foo", "bar"], + ) + + def handle1(metrics): + metrics.foo += 2 + metrics.bar = max(metrics.bar, 5) + + def handle2(metrics): + metrics.foo += 3 + metrics.bar = max(metrics.bar, 7) + + gauge.register(("key1",), handle1) + + self.assert_dict({ + "test1_total": {("key1",): 1}, + "test1_foo": {("key1",): 2}, + "test1_bar": {("key1",): 5}, + }, self.get_metrics_from_gauge(gauge)) + + gauge.unregister(("key1",), handle1) + + self.assert_dict({ + "test1_total": {("key1",): 0}, + "test1_foo": {("key1",): 0}, + "test1_bar": {("key1",): 0}, + }, self.get_metrics_from_gauge(gauge)) + + gauge.register(("key1",), handle1) + gauge.register(("key2",), handle2) + + self.assert_dict({ + "test1_total": {("key1",): 1, ("key2",): 1}, + "test1_foo": {("key1",): 2, ("key2",): 3}, + "test1_bar": {("key1",): 5, ("key2",): 7}, + }, self.get_metrics_from_gauge(gauge)) + + gauge.unregister(("key2",), handle2) + gauge.register(("key1",), handle2) + + self.assert_dict({ + "test1_total": {("key1",): 2, ("key2",): 0}, + "test1_foo": {("key1",): 5, ("key2",): 0}, + "test1_bar": {("key1",): 7, ("key2",): 0}, + }, self.get_metrics_from_gauge(gauge)) + + def get_metrics_from_gauge(self, gauge): + results = {} + + for r in gauge.collect(): + results[r.name] = { + tuple(labels[x] for x in gauge.labels): value + for _, labels, value in r.samples + } + + return results |