diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index ef866da1b6..18741c5fac 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -20,7 +20,14 @@ import string
from twisted.internet import defer
from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ CodeMessageException,
+ Codes,
+ NotFoundError,
+ StoreError,
+ SynapseError,
+)
from synapse.types import RoomAlias, UserID, get_domain_from_id
from ._base import BaseHandler
@@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler):
def delete_association(self, requester, user_id, room_alias):
# association deletion for human users
- can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+ try:
+ can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown room alias")
+ raise
+
if not can_delete:
raise AuthError(
403, "You don't have permission to delete the alias.",
@@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler):
def _user_can_delete_alias(self, alias, user_id):
creator = yield self.store.get_room_alias_creator(alias.to_string())
- if creator and creator == user_id:
+ if creator is not None and creator == user_id:
defer.returnValue(True)
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5170d093e3..a155b6e938 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -269,14 +269,7 @@ class PaginationHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
-
- if state:
- state = yield filter_events_for_client(
- self.store,
- user_id,
- state.values(),
- is_peeking=(member_event_id is None),
- )
+ state = state.values()
time_now = self.clock.time_msec()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 1e53f2c635..da914c46ff 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler):
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
+ ratelimit=False,
)
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 58ef8d3ce4..a3f9e4f67c 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
return value
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
- br'\1<redacted>\3',
+ r'\1<redacted>\3',
uri
)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d60f87b04e..ec339a92ad 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -93,7 +93,7 @@ class SimpleHttpClient(object):
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
- logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
+ logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = treq.request(
@@ -108,14 +108,14 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
- method, redact_uri(uri.encode('ascii')), response.code
+ method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
- method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
+ method, redact_uri(uri), type(e).__name__, e.args[0]
)
raise
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c49dbacd93..083484a687 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -42,7 +42,9 @@ from synapse.api.errors import (
)
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
+from synapse.util.async_helpers import timeout_no_seriously
from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
@@ -91,6 +93,7 @@ class MatrixFederationHttpClient(object):
self.server_name = hs.hostname
reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
+ pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
@@ -221,14 +224,30 @@ class MatrixFederationHttpClient(object):
headers=Headers(headers_dict),
data=data,
agent=self.agent,
- reactor=self.hs.get_reactor()
+ reactor=self.hs.get_reactor(),
+ unbuffered=True
)
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
- response = yield make_deferred_yieldable(
+
+ # Sometimes the timeout above doesn't work, so lets hack yet
+ # another layer of timeouts in in the vain hope that at some
+ # point the world made sense and this really really really
+ # should work.
+ request_deferred = timeout_no_seriously(
request_deferred,
+ timeout=_sec_timeout * 2,
+ reactor=self.hs.get_reactor(),
)
- log_result = "%d %s" % (response.code, response.phrase,)
+ with Measure(self.clock, "outbound_request"):
+ response = yield make_deferred_yieldable(
+ request_deferred,
+ )
+
+ log_result = "%d %s" % (
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 72c2654678..fedb4e6b18 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -162,7 +162,7 @@ class RequestMetrics(object):
with _in_flight_requests_lock:
_in_flight_requests.add(self)
- def stop(self, time_sec, request):
+ def stop(self, time_sec, response_code, sent_bytes):
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
@@ -179,35 +179,35 @@ class RequestMetrics(object):
)
return
- response_code = str(request.code)
+ response_code = str(response_code)
- outgoing_responses_counter.labels(request.method, response_code).inc()
+ outgoing_responses_counter.labels(self.method, response_code).inc()
- response_count.labels(request.method, self.name, tag).inc()
+ response_count.labels(self.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag, response_code).observe(
+ response_timer.labels(self.method, self.name, tag, response_code).observe(
time_sec - self.start
)
resource_usage = context.get_resource_usage()
- response_ru_utime.labels(request.method, self.name, tag).inc(
+ response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime,
)
- response_ru_stime.labels(request.method, self.name, tag).inc(
+ response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime,
)
- response_db_txn_count.labels(request.method, self.name, tag).inc(
+ response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
)
- response_db_txn_duration.labels(request.method, self.name, tag).inc(
+ response_db_txn_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_duration_sec
)
- response_db_sched_duration.labels(request.method, self.name, tag).inc(
+ response_db_sched_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_sched_duration_sec
)
- response_size.labels(request.method, self.name, tag).inc(request.sentLength)
+ response_size.labels(self.method, self.name, tag).inc(sent_bytes)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
diff --git a/synapse/http/site.py b/synapse/http/site.py
index f0828c6542..9579e8cd0d 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -82,10 +82,13 @@ class SynapseRequest(Request):
)
def get_request_id(self):
- return "%s-%i" % (self.method, self.request_seq)
+ return "%s-%i" % (self.method.decode('ascii'), self.request_seq)
def get_redacted_uri(self):
- return redact_uri(self.uri)
+ uri = self.uri
+ if isinstance(uri, bytes):
+ uri = self.uri.decode('ascii')
+ return redact_uri(uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -116,7 +119,7 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
- requests_counter.labels(self.method,
+ requests_counter.labels(self.method.decode('ascii'),
self.request_metrics.name).inc()
@contextlib.contextmanager
@@ -277,15 +280,15 @@ class SynapseRequest(Request):
int(usage.db_txn_count),
self.sentLength,
code,
- self.method,
+ self.method.decode('ascii'),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
user_agent,
usage.evt_db_fetch_count,
)
try:
- self.request_metrics.stop(self.finish_time, self)
+ self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 550f8443f7..59900aa5d1 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,8 +18,11 @@ import gc
import logging
import os
import platform
+import threading
import time
+import six
+
import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily
@@ -68,7 +71,7 @@ class LaterGauge(object):
return
if isinstance(calls, dict):
- for k, v in calls.items():
+ for k, v in six.iteritems(calls):
g.add_metric(k, v)
else:
g.add_metric([], calls)
@@ -87,6 +90,109 @@ class LaterGauge(object):
all_gauges[self.name] = self
+class InFlightGauge(object):
+ """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
+ at any given time.
+
+ Each InFlightGauge will create a metric called `<name>_total` that counts
+ the number of in flight blocks, as well as a metrics for each item in the
+ given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
+ callbacks.
+
+ Args:
+ name (str)
+ desc (str)
+ labels (list[str])
+ sub_metrics (list[str]): A list of sub metrics that the callbacks
+ will update.
+ """
+
+ def __init__(self, name, desc, labels, sub_metrics):
+ self.name = name
+ self.desc = desc
+ self.labels = labels
+ self.sub_metrics = sub_metrics
+
+ # Create a class which have the sub_metrics values as attributes, which
+ # default to 0 on initialization. Used to pass to registered callbacks.
+ self._metrics_class = attr.make_class(
+ "_MetricsEntry",
+ attrs={x: attr.ib(0) for x in sub_metrics},
+ slots=True,
+ )
+
+ # Counts number of in flight blocks for a given set of label values
+ self._registrations = {}
+
+ # Protects access to _registrations
+ self._lock = threading.Lock()
+
+ self._register_with_collector()
+
+ def register(self, key, callback):
+ """Registers that we've entered a new block with labels `key`.
+
+ `callback` gets called each time the metrics are collected. The same
+ value must also be given to `unregister`.
+
+ `callback` gets called with an object that has an attribute per
+ sub_metric, which should be updated with the necessary values. Note that
+ the metrics object is shared between all callbacks registered with the
+ same key.
+
+ Note that `callback` may be called on a separate thread.
+ """
+ with self._lock:
+ self._registrations.setdefault(key, set()).add(callback)
+
+ def unregister(self, key, callback):
+ """Registers that we've exited a block with labels `key`.
+ """
+
+ with self._lock:
+ self._registrations.setdefault(key, set()).discard(callback)
+
+ def collect(self):
+ """Called by prometheus client when it reads metrics.
+
+ Note: may be called by a separate thread.
+ """
+ in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+
+ metrics_by_key = {}
+
+ # We copy so that we don't mutate the list while iterating
+ with self._lock:
+ keys = list(self._registrations)
+
+ for key in keys:
+ with self._lock:
+ callbacks = set(self._registrations[key])
+
+ in_flight.add_metric(key, len(callbacks))
+
+ metrics = self._metrics_class()
+ metrics_by_key[key] = metrics
+ for callback in callbacks:
+ callback(metrics)
+
+ yield in_flight
+
+ for name in self.sub_metrics:
+ gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+ for key, metrics in six.iteritems(metrics_by_key):
+ gauge.add_metric(key, getattr(metrics, name))
+ yield gauge
+
+ def _register_with_collector(self):
+ if self.name in all_gauges.keys():
+ logger.warning("%s already registered, reregistering" % (self.name,))
+ REGISTRY.unregister(all_gauges.pop(self.name))
+
+ REGISTRY.register(self)
+ all_gauges[self.name] = self
+
+
#
# Detailed CPU metrics
#
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 55fe701c5c..c1e626be3f 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -196,7 +196,7 @@ class Stream(object):
)
if len(rows) >= MAX_EVENTS_BEHIND:
- raise Exception("stream %s has fallen behined" % (self.NAME))
+ raise Exception("stream %s has fallen behind" % (self.NAME))
else:
rows = yield self.update_function(
from_token, current_token,
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 808194236a..cfb687cb53 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore):
},
retcol="creator",
desc="get_room_alias_creator",
- allow_none=True
)
@cached(max_entries=5000)
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 9b3f2f4b96..40c2946129 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -438,3 +438,55 @@ def _cancelled_to_timed_out_error(value, timeout):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value
+
+
+def timeout_no_seriously(deferred, timeout, reactor):
+ """The in build twisted deferred addTimeout (and the method above)
+ completely fail to time things out under some unknown circumstances.
+
+ Lets try a different way of timing things out and maybe that will make
+ things work?!
+
+ TODO: Kill this with fire.
+ """
+
+ new_d = defer.Deferred()
+
+ timed_out = [False]
+
+ def time_it_out():
+ timed_out[0] = True
+
+ if not new_d.called:
+ new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+
+ deferred.cancel()
+
+ delayed_call = reactor.callLater(timeout, time_it_out)
+
+ def convert_cancelled(value):
+ if timed_out[0]:
+ return _cancelled_to_timed_out_error(value, timeout)
+ return value
+
+ deferred.addBoth(convert_cancelled)
+
+ def cancel_timeout(result):
+ # stop the pending call to cancel the deferred if it's been fired
+ if delayed_call.active():
+ delayed_call.cancel()
+ return result
+
+ deferred.addBoth(cancel_timeout)
+
+ def success_cb(val):
+ if not new_d.called:
+ new_d.callback(val)
+
+ def failure_cb(val):
+ if not new_d.called:
+ new_d.errback(val)
+
+ deferred.addCallbacks(success_cb, failure_cb)
+
+ return new_d
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 97f1267380..4b4ac5f6c7 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -20,6 +20,7 @@ from prometheus_client import Counter
from twisted.internet import defer
+from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
@@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
+# Tracks the number of blocks currently active
+in_flight = InFlightGauge(
+ "synapse_util_metrics_block_in_flight", "",
+ labels=["block_name"],
+ sub_metrics=["real_time_max", "real_time_sum"],
+)
+
def measure_func(name):
def wrapper(func):
@@ -82,10 +90,14 @@ class Measure(object):
self.start_usage = self.start_context.get_resource_usage()
+ in_flight.register((self.name,), self._update_in_flight)
+
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return
+ in_flight.unregister((self.name,), self._update_in_flight)
+
duration = self.clock.time() - self.start
block_counter.labels(self.name).inc()
@@ -120,3 +132,13 @@ class Measure(object):
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
+
+ def _update_in_flight(self, metrics):
+ """Gets called when processing in flight metrics
+ """
+ duration = self.clock.time() - self.start
+
+ metrics.real_time_max = max(metrics.real_time_max, duration)
+ metrics.real_time_sum += duration
+
+ # TODO: Add other in flight metrics.
|