diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/directory.py | 19 | ||||
-rw-r--r-- | synapse/handlers/pagination.py | 9 | ||||
-rw-r--r-- | synapse/handlers/register.py | 1 | ||||
-rw-r--r-- | synapse/http/__init__.py | 4 | ||||
-rw-r--r-- | synapse/http/client.py | 6 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 25 | ||||
-rw-r--r-- | synapse/http/request_metrics.py | 22 | ||||
-rw-r--r-- | synapse/http/site.py | 15 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 108 | ||||
-rw-r--r-- | synapse/replication/tcp/streams.py | 2 | ||||
-rw-r--r-- | synapse/storage/directory.py | 1 | ||||
-rw-r--r-- | synapse/util/async_helpers.py | 52 | ||||
-rw-r--r-- | synapse/util/metrics.py | 22 |
13 files changed, 247 insertions, 39 deletions
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index ef866da1b6..18741c5fac 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -20,7 +20,14 @@ import string from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError +from synapse.api.errors import ( + AuthError, + CodeMessageException, + Codes, + NotFoundError, + StoreError, + SynapseError, +) from synapse.types import RoomAlias, UserID, get_domain_from_id from ._base import BaseHandler @@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler): def delete_association(self, requester, user_id, room_alias): # association deletion for human users - can_delete = yield self._user_can_delete_alias(room_alias, user_id) + try: + can_delete = yield self._user_can_delete_alias(room_alias, user_id) + except StoreError as e: + if e.code == 404: + raise NotFoundError("Unknown room alias") + raise + if not can_delete: raise AuthError( 403, "You don't have permission to delete the alias.", @@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler): def _user_can_delete_alias(self, alias, user_id): creator = yield self.store.get_room_alias_creator(alias.to_string()) - if creator and creator == user_id: + if creator is not None and creator == user_id: defer.returnValue(True) is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id)) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 5170d093e3..a155b6e938 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -269,14 +269,7 @@ class PaginationHandler(object): if state_ids: state = yield self.store.get_events(list(state_ids.values())) - - if state: - state = yield filter_events_for_client( - self.store, - user_id, - state.values(), - is_peeking=(member_event_id is None), - ) + state = state.values() time_now = self.clock.time_msec() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 1e53f2c635..da914c46ff 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler): room_id=room_id, remote_room_hosts=remote_room_hosts, action="join", + ratelimit=False, ) diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 58ef8d3ce4..a3f9e4f67c 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout): return value -ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') +ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') def redact_uri(uri): """Strips access tokens from the uri replaces with <redacted>""" return ACCESS_TOKEN_RE.sub( - br'\1<redacted>\3', + r'\1<redacted>\3', uri ) diff --git a/synapse/http/client.py b/synapse/http/client.py index d60f87b04e..ec339a92ad 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -93,7 +93,7 @@ class SimpleHttpClient(object): outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) - logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii'))) + logger.info("Sending request %s %s", method, redact_uri(uri)) try: request_deferred = treq.request( @@ -108,14 +108,14 @@ class SimpleHttpClient(object): incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", - method, redact_uri(uri.encode('ascii')), response.code + method, redact_uri(uri), response.code ) defer.returnValue(response) except Exception as e: incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", - method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0] + method, redact_uri(uri), type(e).__name__, e.args[0] ) raise diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c49dbacd93..083484a687 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -42,7 +42,9 @@ from synapse.api.errors import ( ) from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext +from synapse.util.async_helpers import timeout_no_seriously from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") @@ -91,6 +93,7 @@ class MatrixFederationHttpClient(object): self.server_name = hs.hostname reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) + pool.retryAutomatically = False pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 self.agent = Agent.usingEndpointFactory( @@ -221,14 +224,30 @@ class MatrixFederationHttpClient(object): headers=Headers(headers_dict), data=data, agent=self.agent, - reactor=self.hs.get_reactor() + reactor=self.hs.get_reactor(), + unbuffered=True ) request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - response = yield make_deferred_yieldable( + + # Sometimes the timeout above doesn't work, so lets hack yet + # another layer of timeouts in in the vain hope that at some + # point the world made sense and this really really really + # should work. + request_deferred = timeout_no_seriously( request_deferred, + timeout=_sec_timeout * 2, + reactor=self.hs.get_reactor(), ) - log_result = "%d %s" % (response.code, response.phrase,) + with Measure(self.clock, "outbound_request"): + response = yield make_deferred_yieldable( + request_deferred, + ) + + log_result = "%d %s" % ( + response.code, + response.phrase.decode('ascii', errors='replace'), + ) break except Exception as e: if not retry_on_dns_fail and isinstance(e, DNSLookupError): diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 72c2654678..fedb4e6b18 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -162,7 +162,7 @@ class RequestMetrics(object): with _in_flight_requests_lock: _in_flight_requests.add(self) - def stop(self, time_sec, request): + def stop(self, time_sec, response_code, sent_bytes): with _in_flight_requests_lock: _in_flight_requests.discard(self) @@ -179,35 +179,35 @@ class RequestMetrics(object): ) return - response_code = str(request.code) + response_code = str(response_code) - outgoing_responses_counter.labels(request.method, response_code).inc() + outgoing_responses_counter.labels(self.method, response_code).inc() - response_count.labels(request.method, self.name, tag).inc() + response_count.labels(self.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag, response_code).observe( + response_timer.labels(self.method, self.name, tag, response_code).observe( time_sec - self.start ) resource_usage = context.get_resource_usage() - response_ru_utime.labels(request.method, self.name, tag).inc( + response_ru_utime.labels(self.method, self.name, tag).inc( resource_usage.ru_utime, ) - response_ru_stime.labels(request.method, self.name, tag).inc( + response_ru_stime.labels(self.method, self.name, tag).inc( resource_usage.ru_stime, ) - response_db_txn_count.labels(request.method, self.name, tag).inc( + response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count ) - response_db_txn_duration.labels(request.method, self.name, tag).inc( + response_db_txn_duration.labels(self.method, self.name, tag).inc( resource_usage.db_txn_duration_sec ) - response_db_sched_duration.labels(request.method, self.name, tag).inc( + response_db_sched_duration.labels(self.method, self.name, tag).inc( resource_usage.db_sched_duration_sec ) - response_size.labels(request.method, self.name, tag).inc(request.sentLength) + response_size.labels(self.method, self.name, tag).inc(sent_bytes) # We always call this at the end to ensure that we update the metrics # regardless of whether a call to /metrics while the request was in diff --git a/synapse/http/site.py b/synapse/http/site.py index f0828c6542..9579e8cd0d 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -82,10 +82,13 @@ class SynapseRequest(Request): ) def get_request_id(self): - return "%s-%i" % (self.method, self.request_seq) + return "%s-%i" % (self.method.decode('ascii'), self.request_seq) def get_redacted_uri(self): - return redact_uri(self.uri) + uri = self.uri + if isinstance(uri, bytes): + uri = self.uri.decode('ascii') + return redact_uri(uri) def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] @@ -116,7 +119,7 @@ class SynapseRequest(Request): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.labels(self.method, + requests_counter.labels(self.method.decode('ascii'), self.request_metrics.name).inc() @contextlib.contextmanager @@ -277,15 +280,15 @@ class SynapseRequest(Request): int(usage.db_txn_count), self.sentLength, code, - self.method, + self.method.decode('ascii'), self.get_redacted_uri(), - self.clientproto, + self.clientproto.decode('ascii', errors='replace'), user_agent, usage.evt_db_fetch_count, ) try: - self.request_metrics.stop(self.finish_time, self) + self.request_metrics.stop(self.finish_time, self.code, self.sentLength) except Exception as e: logger.warn("Failed to stop metrics: %r", e) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 550f8443f7..59900aa5d1 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,8 +18,11 @@ import gc import logging import os import platform +import threading import time +import six + import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import REGISTRY, GaugeMetricFamily @@ -68,7 +71,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in calls.items(): + for k, v in six.iteritems(calls): g.add_metric(k, v) else: g.add_metric([], calls) @@ -87,6 +90,109 @@ class LaterGauge(object): all_gauges[self.name] = self +class InFlightGauge(object): + """Tracks number of things (e.g. requests, Measure blocks, etc) in flight + at any given time. + + Each InFlightGauge will create a metric called `<name>_total` that counts + the number of in flight blocks, as well as a metrics for each item in the + given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the + callbacks. + + Args: + name (str) + desc (str) + labels (list[str]) + sub_metrics (list[str]): A list of sub metrics that the callbacks + will update. + """ + + def __init__(self, name, desc, labels, sub_metrics): + self.name = name + self.desc = desc + self.labels = labels + self.sub_metrics = sub_metrics + + # Create a class which have the sub_metrics values as attributes, which + # default to 0 on initialization. Used to pass to registered callbacks. + self._metrics_class = attr.make_class( + "_MetricsEntry", + attrs={x: attr.ib(0) for x in sub_metrics}, + slots=True, + ) + + # Counts number of in flight blocks for a given set of label values + self._registrations = {} + + # Protects access to _registrations + self._lock = threading.Lock() + + self._register_with_collector() + + def register(self, key, callback): + """Registers that we've entered a new block with labels `key`. + + `callback` gets called each time the metrics are collected. The same + value must also be given to `unregister`. + + `callback` gets called with an object that has an attribute per + sub_metric, which should be updated with the necessary values. Note that + the metrics object is shared between all callbacks registered with the + same key. + + Note that `callback` may be called on a separate thread. + """ + with self._lock: + self._registrations.setdefault(key, set()).add(callback) + + def unregister(self, key, callback): + """Registers that we've exited a block with labels `key`. + """ + + with self._lock: + self._registrations.setdefault(key, set()).discard(callback) + + def collect(self): + """Called by prometheus client when it reads metrics. + + Note: may be called by a separate thread. + """ + in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels) + + metrics_by_key = {} + + # We copy so that we don't mutate the list while iterating + with self._lock: + keys = list(self._registrations) + + for key in keys: + with self._lock: + callbacks = set(self._registrations[key]) + + in_flight.add_metric(key, len(callbacks)) + + metrics = self._metrics_class() + metrics_by_key[key] = metrics + for callback in callbacks: + callback(metrics) + + yield in_flight + + for name in self.sub_metrics: + gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels) + for key, metrics in six.iteritems(metrics_by_key): + gauge.add_metric(key, getattr(metrics, name)) + yield gauge + + def _register_with_collector(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) + + REGISTRY.register(self) + all_gauges[self.name] = self + + # # Detailed CPU metrics # diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 55fe701c5c..c1e626be3f 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -196,7 +196,7 @@ class Stream(object): ) if len(rows) >= MAX_EVENTS_BEHIND: - raise Exception("stream %s has fallen behined" % (self.NAME)) + raise Exception("stream %s has fallen behind" % (self.NAME)) else: rows = yield self.update_function( from_token, current_token, diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 808194236a..cfb687cb53 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore): }, retcol="creator", desc="get_room_alias_creator", - allow_none=True ) @cached(max_entries=5000) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 9b3f2f4b96..40c2946129 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -438,3 +438,55 @@ def _cancelled_to_timed_out_error(value, timeout): value.trap(CancelledError) raise DeferredTimeoutError(timeout, "Deferred") return value + + +def timeout_no_seriously(deferred, timeout, reactor): + """The in build twisted deferred addTimeout (and the method above) + completely fail to time things out under some unknown circumstances. + + Lets try a different way of timing things out and maybe that will make + things work?! + + TODO: Kill this with fire. + """ + + new_d = defer.Deferred() + + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + + if not new_d.called: + new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + + deferred.cancel() + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + return _cancelled_to_timed_out_error(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + def success_cb(val): + if not new_d.called: + new_d.callback(val) + + def failure_cb(val): + if not new_d.called: + new_d.errback(val) + + deferred.addCallbacks(success_cb, failure_cb) + + return new_d diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 97f1267380..4b4ac5f6c7 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -20,6 +20,7 @@ from prometheus_client import Counter from twisted.internet import defer +from synapse.metrics import InFlightGauge from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) @@ -45,6 +46,13 @@ block_db_txn_duration = Counter( block_db_sched_duration = Counter( "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) +# Tracks the number of blocks currently active +in_flight = InFlightGauge( + "synapse_util_metrics_block_in_flight", "", + labels=["block_name"], + sub_metrics=["real_time_max", "real_time_sum"], +) + def measure_func(name): def wrapper(func): @@ -82,10 +90,14 @@ class Measure(object): self.start_usage = self.start_context.get_resource_usage() + in_flight.register((self.name,), self._update_in_flight) + def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: return + in_flight.unregister((self.name,), self._update_in_flight) + duration = self.clock.time() - self.start block_counter.labels(self.name).inc() @@ -120,3 +132,13 @@ class Measure(object): if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) + + def _update_in_flight(self, metrics): + """Gets called when processing in flight metrics + """ + duration = self.clock.time() - self.start + + metrics.real_time_max = max(metrics.real_time_max, duration) + metrics.real_time_sum += duration + + # TODO: Add other in flight metrics. |