diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 32 | ||||
-rw-r--r-- | synapse/util/async.py | 47 | ||||
-rw-r--r-- | synapse/util/caches/__init__.py | 6 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 22 | ||||
-rw-r--r-- | synapse/util/caches/dictionary_cache.py | 34 | ||||
-rw-r--r-- | synapse/util/caches/expiringcache.py | 11 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 20 | ||||
-rw-r--r-- | synapse/util/distributor.py | 48 | ||||
-rw-r--r-- | synapse/util/file_consumer.py | 20 | ||||
-rw-r--r-- | synapse/util/frozenutils.py | 6 | ||||
-rw-r--r-- | synapse/util/httpresourcetree.py | 4 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 176 | ||||
-rw-r--r-- | synapse/util/logformatter.py | 3 | ||||
-rw-r--r-- | synapse/util/logutils.py | 8 | ||||
-rw-r--r-- | synapse/util/manhole.py | 6 | ||||
-rw-r--r-- | synapse/util/metrics.py | 33 | ||||
-rw-r--r-- | synapse/util/msisdn.py | 1 | ||||
-rw-r--r-- | synapse/util/ratelimitutils.py | 47 | ||||
-rw-r--r-- | synapse/util/retryutils.py | 9 | ||||
-rw-r--r-- | synapse/util/rlimit.py | 3 | ||||
-rw-r--r-- | synapse/util/stringutils.py | 1 | ||||
-rw-r--r-- | synapse/util/versionstring.py | 4 |
23 files changed, 332 insertions, 211 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fc11e26623..680ea928c7 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext +import logging +from itertools import islice -from twisted.internet import defer, reactor, task +import attr -import time -import logging +from twisted.internet import defer, task -from itertools import islice +from synapse.util.logcontext import PreserveLoggingContext logger = logging.getLogger(__name__) @@ -31,16 +31,27 @@ def unwrapFirstError(failure): return failure.value.subFailure +@attr.s class Clock(object): - """A small utility that obtains current time-of-day so that time may be - mocked during unit-tests. + """ + A Clock wraps a Twisted reactor and provides utilities on top of it. - TODO(paul): Also move the sleep() functionality into it + Args: + reactor: The Twisted reactor to use. """ + _reactor = attr.ib() + + @defer.inlineCallbacks + def sleep(self, seconds): + d = defer.Deferred() + with PreserveLoggingContext(): + self._reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def time(self): """Returns the current system time in seconds since epoch.""" - return time.time() + return self._reactor.seconds() def time_msec(self): """Returns the current system time in miliseconds since epoch.""" @@ -56,6 +67,7 @@ class Clock(object): msec(float): How long to wait between calls in milliseconds. """ call = task.LoopingCall(f) + call.clock = self._reactor call.start(msec / 1000.0, now=False) return call @@ -73,7 +85,7 @@ class Clock(object): callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer, ignore_errs=False): try: diff --git a/synapse/util/async.py b/synapse/util/async.py index 9dd4e6b5bc..5d0fb39130 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -13,41 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +from contextlib import contextmanager -from twisted.internet import defer, reactor +from six.moves import range + +from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure +from synapse.util import Clock, logcontext, unwrapFirstError + from .logcontext import ( - PreserveLoggingContext, make_deferred_yieldable, run_in_background + PreserveLoggingContext, + make_deferred_yieldable, + run_in_background, ) -from synapse.util import logcontext, unwrapFirstError - -from contextlib import contextmanager - -import logging - -from six.moves import range logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def sleep(seconds): - d = defer.Deferred() - with PreserveLoggingContext(): - reactor.callLater(seconds, d.callback, seconds) - res = yield d - defer.returnValue(res) - - -def run_on_reactor(): - """ This will cause the rest of the function to be invoked upon the next - iteration of the main loop - """ - return sleep(0) - - class ObservableDeferred(object): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original @@ -180,13 +165,18 @@ class Linearizer(object): # do some work. """ - def __init__(self, name=None): + def __init__(self, name=None, clock=None): if name is None: self.name = id(self) else: self.name = name self.key_to_defer = {} + if not clock: + from twisted.internet import reactor + clock = Clock(reactor) + self._clock = clock + @defer.inlineCallbacks def queue(self, key): # If there is already a deferred in the queue, we pull it out so that @@ -227,7 +217,7 @@ class Linearizer(object): # the context manager, but it needs to happen while we hold the # lock, and the context manager's exit code must be synchronous, # so actually this is the only sensible place. - yield run_on_reactor() + yield self._clock.sleep(0) else: logger.info("Acquired uncontended linearizer lock %r for key %r", @@ -404,7 +394,7 @@ class DeferredTimeoutError(Exception): """ -def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): +def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): """ Add a timeout to a deferred by scheduling it to be cancelled after timeout seconds. @@ -419,6 +409,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None): Args: deferred (defer.Deferred): deferred to be timed out timeout (Number): seconds to time out after + reactor (twisted.internet.reactor): the Twisted reactor to use on_timeout_cancel (callable): A callable which is called immediately after the deferred times out, and not if this deferred is diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 900575eb3c..7b065b195e 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily - import os -from six.moves import intern import six +from six.moves import intern + +from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 65a1042de1..f8a07df6b8 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -13,10 +13,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import functools +import inspect import logging +import threading +from collections import namedtuple +import six +from six import itervalues, string_types + +from twisted.internet import defer + +from synapse.util import logcontext, unwrapFirstError from synapse.util.async import ObservableDeferred -from synapse.util import unwrapFirstError, logcontext from synapse.util.caches import get_cache_factor_for from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry @@ -24,17 +33,6 @@ from synapse.util.stringutils import to_ascii from . import register_cache -from twisted.internet import defer -from collections import namedtuple - -import functools -import inspect -import threading - -from six import string_types, itervalues -import six - - logger = logging.getLogger(__name__) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index bdc21e348f..6c0b5a4094 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -13,12 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches.lrucache import LruCache -from collections import namedtuple -from . import register_cache -import threading import logging +import threading +from collections import namedtuple +from synapse.util.caches.lrucache import LruCache + +from . import register_cache logger = logging.getLogger(__name__) @@ -107,29 +108,28 @@ class DictionaryCache(object): self.sequence += 1 self.cache.clear() - def update(self, sequence, key, value, full=False, known_absent=None): + def update(self, sequence, key, value, fetched_keys=None): """Updates the entry in the cache Args: sequence - key - value (dict): The value to update the cache with. - full (bool): Whether the given value is the full dict, or just a - partial subset there of. If not full then any existing entries - for the key will be updated. - known_absent (set): Set of keys that we know don't exist in the full - dict. + key (K) + value (dict[X,Y]): The value to update the cache with. + fetched_keys (None|set[X]): All of the dictionary keys which were + fetched from the database. + + If None, this is the complete value for key K. Otherwise, it + is used to infer a list of keys which we know don't exist in + the full dict. """ self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) - if known_absent is None: - known_absent = set() - if full: - self._insert(key, value, known_absent) + if fetched_keys is None: + self._insert(key, value, set()) else: - self._update_or_insert(key, value, known_absent) + self._update_or_insert(key, value, fetched_keys) def _update_or_insert(self, key, value, known_absent): # We pop and reinsert as we need to tell the cache the size may have diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index ff04c91955..465adc54a8 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import register_cache - -from collections import OrderedDict import logging +from collections import OrderedDict +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.caches import register_cache logger = logging.getLogger(__name__) @@ -64,7 +64,10 @@ class ExpiringCache(object): return def f(): - self._prune_cache() + run_as_background_process( + "prune_cache_%s" % self._cache_name, + self._prune_cache, + ) self._clock.looping_call(f, self._expiry_ms / 2) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 1c5a982094..b684f24e7b 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -14,8 +14,8 @@ # limitations under the License. -from functools import wraps import threading +from functools import wraps from synapse.util.caches.treecache import TreeCache diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 817118e30f..f2bde74dc5 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,12 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util import caches - +import logging from sortedcontainers import SortedDict -import logging +from synapse.util import caches logger = logging.getLogger(__name__) @@ -75,13 +74,13 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - not_known_entities = set(entities) - set(self._entity_to_key) + changed_entities = { + self._cache[k] for k in self._cache.islice( + start=self._cache.bisect_right(stream_pos), + ) + } - result = ( - set(self._cache.values()[self._cache.bisect_right(stream_pos) :]) - .intersection(entities) - .union(not_known_entities) - ) + result = changed_entities.intersection(entities) self.metrics.inc_hits() else: @@ -113,7 +112,8 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos >= self._earliest_known_stream_pos: - return self._cache.values()[self._cache.bisect_right(stream_pos) :] + return [self._cache[k] for k in self._cache.islice( + start=self._cache.bisect_right(stream_pos))] else: return None diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 734331caaa..194da87639 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -17,20 +17,18 @@ import logging from twisted.internet import defer -from synapse.util import unwrapFirstError -from synapse.util.logcontext import PreserveLoggingContext +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) def user_left_room(distributor, user, room_id): - with PreserveLoggingContext(): - distributor.fire("user_left_room", user=user, room_id=room_id) + distributor.fire("user_left_room", user=user, room_id=room_id) def user_joined_room(distributor, user, room_id): - with PreserveLoggingContext(): - distributor.fire("user_joined_room", user=user, room_id=room_id) + distributor.fire("user_joined_room", user=user, room_id=room_id) class Distributor(object): @@ -44,9 +42,7 @@ class Distributor(object): model will do for today. """ - def __init__(self, suppress_failures=True): - self.suppress_failures = suppress_failures - + def __init__(self): self.signals = {} self.pre_registration = {} @@ -56,7 +52,6 @@ class Distributor(object): self.signals[name] = Signal( name, - suppress_failures=self.suppress_failures, ) if name in self.pre_registration: @@ -75,10 +70,18 @@ class Distributor(object): self.pre_registration[name].append(observer) def fire(self, name, *args, **kwargs): + """Dispatches the given signal to the registered observers. + + Runs the observers as a background process. Does not return a deferred. + """ if name not in self.signals: raise KeyError("%r does not have a signal named %s" % (self, name)) - return self.signals[name].fire(*args, **kwargs) + run_as_background_process( + name, + self.signals[name].fire, + *args, **kwargs + ) class Signal(object): @@ -91,9 +94,8 @@ class Signal(object): method into all of the observers. """ - def __init__(self, name, suppress_failures): + def __init__(self, name): self.name = name - self.suppress_failures = suppress_failures self.observers = [] def observe(self, observer): @@ -103,7 +105,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -121,22 +122,17 @@ class Signal(object): failure.type, failure.value, failure.getTracebackObject())) - if not self.suppress_failures: - return failure return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) - with PreserveLoggingContext(): - deferreds = [ - do(observer) - for observer in self.observers - ] - - res = yield defer.gatherResults( - deferreds, consumeErrors=True - ).addErrback(unwrapFirstError) + deferreds = [ + run_in_background(do, o) + for o in self.observers + ] - defer.returnValue(res) + return make_deferred_yieldable(defer.gatherResults( + deferreds, consumeErrors=True, + )) def __repr__(self): return "<Signal name=%r>" % (self.name,) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 3380970e4e..629ed44149 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import threads, reactor +from six.moves import queue -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from twisted.internet import threads -from six.moves import queue +from synapse.util.logcontext import make_deferred_yieldable, run_in_background class BackgroundFileConsumer(object): @@ -27,6 +27,7 @@ class BackgroundFileConsumer(object): Args: file_obj (file): The file like object to write to. Closed when finished. + reactor (twisted.internet.reactor): the Twisted reactor to use """ # For PushProducers pause if we have this many unwritten slices @@ -34,9 +35,11 @@ class BackgroundFileConsumer(object): # And resume once the size of the queue is less than this _RESUME_ON_QUEUE_SIZE = 2 - def __init__(self, file_obj): + def __init__(self, file_obj, reactor): self._file_obj = file_obj + self._reactor = reactor + # Producer we're registered with self._producer = None @@ -71,7 +74,10 @@ class BackgroundFileConsumer(object): self._producer = producer self.streaming = streaming self._finished_deferred = run_in_background( - threads.deferToThread, self._writer + threads.deferToThreadPool, + self._reactor, + self._reactor.getThreadPool(), + self._writer, ) if not streaming: self._producer.resumeProducing() @@ -109,7 +115,7 @@ class BackgroundFileConsumer(object): # producer. if self._producer and self._paused_producer: if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: - reactor.callFromThread(self._resume_paused_producer) + self._reactor.callFromThread(self._resume_paused_producer) bytes = self._bytes_queue.get() @@ -121,7 +127,7 @@ class BackgroundFileConsumer(object): # If its a pull producer then we need to explicitly ask for # more stuff. if not self.streaming and self._producer: - reactor.callFromThread(self._producer.resumeProducing) + self._reactor.callFromThread(self._producer.resumeProducing) except Exception as e: self._write_exception = e raise diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 15f0a7ba9e..581c6052ac 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -13,11 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from frozendict import frozendict -import simplejson as json - from six import string_types +from canonicaljson import json +from frozendict import frozendict + def freeze(o): if isinstance(o, dict): diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py index e9f0f292ee..2d7ddc1cbe 100644 --- a/synapse/util/httpresourcetree.py +++ b/synapse/util/httpresourcetree.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.web.resource import NoResource - import logging +from twisted.web.resource import NoResource + logger = logging.getLogger(__name__) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a58c723403..f6c7175f74 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -22,10 +22,10 @@ them. See doc/log_contexts.rst for details on how this works. """ -from twisted.internet import defer - -import threading import logging +import threading + +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -49,17 +49,107 @@ except Exception: return None +class ContextResourceUsage(object): + """Object for tracking the resources used by a log context + + Attributes: + ru_utime (float): user CPU time (in seconds) + ru_stime (float): system CPU time (in seconds) + db_txn_count (int): number of database transactions done + db_sched_duration_sec (float): amount of time spent waiting for a + database connection + db_txn_duration_sec (float): amount of time spent doing database + transactions (excluding scheduling time) + evt_db_fetch_count (int): number of events requested from the database + """ + + __slots__ = [ + "ru_stime", "ru_utime", + "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", + "evt_db_fetch_count", + ] + + def __init__(self, copy_from=None): + """Create a new ContextResourceUsage + + Args: + copy_from (ContextResourceUsage|None): if not None, an object to + copy stats from + """ + if copy_from is None: + self.reset() + else: + self.ru_utime = copy_from.ru_utime + self.ru_stime = copy_from.ru_stime + self.db_txn_count = copy_from.db_txn_count + + self.db_txn_duration_sec = copy_from.db_txn_duration_sec + self.db_sched_duration_sec = copy_from.db_sched_duration_sec + self.evt_db_fetch_count = copy_from.evt_db_fetch_count + + def copy(self): + return ContextResourceUsage(copy_from=self) + + def reset(self): + self.ru_stime = 0. + self.ru_utime = 0. + self.db_txn_count = 0 + + self.db_txn_duration_sec = 0 + self.db_sched_duration_sec = 0 + self.evt_db_fetch_count = 0 + + def __iadd__(self, other): + """Add another ContextResourceUsage's stats to this one's. + + Args: + other (ContextResourceUsage): the other resource usage object + """ + self.ru_utime += other.ru_utime + self.ru_stime += other.ru_stime + self.db_txn_count += other.db_txn_count + self.db_txn_duration_sec += other.db_txn_duration_sec + self.db_sched_duration_sec += other.db_sched_duration_sec + self.evt_db_fetch_count += other.evt_db_fetch_count + return self + + def __isub__(self, other): + self.ru_utime -= other.ru_utime + self.ru_stime -= other.ru_stime + self.db_txn_count -= other.db_txn_count + self.db_txn_duration_sec -= other.db_txn_duration_sec + self.db_sched_duration_sec -= other.db_sched_duration_sec + self.evt_db_fetch_count -= other.evt_db_fetch_count + return self + + def __add__(self, other): + res = ContextResourceUsage(copy_from=self) + res += other + return res + + def __sub__(self, other): + res = ContextResourceUsage(copy_from=self) + res -= other + return res + + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a "with" block. + If a parent is given when creating a new context, then: + - logging fields are copied from the parent to the new context on entry + - when the new context exits, the cpu usage stats are copied from the + child to the parent + Args: name (str): Name for the context for debugging. + parent_context (LoggingContext|None): The parent of the new context """ __slots__ = [ - "previous_context", "name", "ru_stime", "ru_utime", - "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", + "previous_context", "name", "parent_context", + "_resource_usage", "usage_start", "main_thread", "alive", "request", "tag", @@ -90,24 +180,21 @@ class LoggingContext(object): def add_database_scheduled(self, sched_sec): pass + def record_event_fetch(self, event_count): + pass + def __nonzero__(self): return False __bool__ = __nonzero__ # python3 sentinel = Sentinel() - def __init__(self, name=None): + def __init__(self, name=None, parent_context=None): self.previous_context = LoggingContext.current_context() self.name = name - self.ru_stime = 0. - self.ru_utime = 0. - self.db_txn_count = 0 - - # sec spent waiting for db txns, excluding scheduling time - self.db_txn_duration_sec = 0 - # sec spent waiting for db txns to be scheduled - self.db_sched_duration_sec = 0 + # track the resources used by this context so far + self._resource_usage = ContextResourceUsage() # If alive has the thread resource usage when the logcontext last # became active. @@ -118,6 +205,8 @@ class LoggingContext(object): self.tag = "" self.alive = True + self.parent_context = parent_context + def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -155,6 +244,10 @@ class LoggingContext(object): self.previous_context, old_context ) self.alive = True + + if self.parent_context is not None: + self.parent_context.copy_to(self) + return self def __exit__(self, type, value, traceback): @@ -176,6 +269,13 @@ class LoggingContext(object): self.previous_context = None self.alive = False + # if we have a parent, pass our CPU usage stats on + if self.parent_context is not None: + self.parent_context._resource_usage += self._resource_usage + + # reset them in case we get entered again + self._resource_usage.reset() + def copy_to(self, record): """Copy logging fields from this context to a log record or another LoggingContext @@ -200,39 +300,43 @@ class LoggingContext(object): logger.warning("Stopped logcontext %s on different thread", self) return - # When we stop, let's record the resource used since we started - if self.usage_start: - usage_end = get_thread_resource_usage() + # When we stop, let's record the cpu used since we started + if not self.usage_start: + logger.warning( + "Called stop on logcontext %s without calling start", self, + ) + return + + usage_end = get_thread_resource_usage() - self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime - self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime + self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime + self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime - self.usage_start = None - else: - logger.warning("Called stop on logcontext %s without calling start", self) + self.usage_start = None def get_resource_usage(self): - """Get CPU time used by this logcontext so far. + """Get resources used by this logcontext so far. Returns: - tuple[float, float]: The user and system CPU usage in seconds + ContextResourceUsage: a *copy* of the object tracking resource + usage so far """ - ru_utime = self.ru_utime - ru_stime = self.ru_stime + # we always return a copy, for consistency + res = self._resource_usage.copy() # If we are on the correct thread and we're currently running then we # can include resource usage so far. is_main_thread = threading.current_thread() is self.main_thread if self.alive and self.usage_start and is_main_thread: current = get_thread_resource_usage() - ru_utime += current.ru_utime - self.usage_start.ru_utime - ru_stime += current.ru_stime - self.usage_start.ru_stime + res.ru_utime += current.ru_utime - self.usage_start.ru_utime + res.ru_stime += current.ru_stime - self.usage_start.ru_stime - return ru_utime, ru_stime + return res def add_database_transaction(self, duration_sec): - self.db_txn_count += 1 - self.db_txn_duration_sec += duration_sec + self._resource_usage.db_txn_count += 1 + self._resource_usage.db_txn_duration_sec += duration_sec def add_database_scheduled(self, sched_sec): """Record a use of the database pool @@ -241,7 +345,15 @@ class LoggingContext(object): sched_sec (float): number of seconds it took us to get a connection """ - self.db_sched_duration_sec += sched_sec + self._resource_usage.db_sched_duration_sec += sched_sec + + def record_event_fetch(self, event_count): + """Record a number of events being fetched from the db + + Args: + event_count (int): number of events being fetched + """ + self._resource_usage.evt_db_fetch_count += event_count class LoggingContextFilter(logging.Filter): diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py index 3e42868ea9..a46bc47ce3 100644 --- a/synapse/util/logformatter.py +++ b/synapse/util/logformatter.py @@ -14,10 +14,11 @@ # limitations under the License. -from six import StringIO import logging import traceback +from six import StringIO + class LogFormatter(logging.Formatter): """Log formatter which gives more detail for exceptions diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index 03249c5dc8..62a00189cc 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -14,13 +14,11 @@ # limitations under the License. -from inspect import getcallargs -from functools import wraps - -import logging import inspect +import logging import time - +from functools import wraps +from inspect import getcallargs _TIME_FUNC_ID = 0 diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 97e0f00b67..14be3c7396 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.conch.manhole import ColoredManhole -from twisted.conch.insults import insults from twisted.conch import manhole_ssh -from twisted.cred import checkers, portal +from twisted.conch.insults import insults +from twisted.conch.manhole import ColoredManhole from twisted.conch.ssh.keys import Key +from twisted.cred import checkers, portal PUBLIC_KEY = ( "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 1ba7d65c7c..6ba7107896 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging +from functools import wraps from prometheus_client import Counter -from synapse.util.logcontext import LoggingContext -from functools import wraps -import logging +from twisted.internet import defer +from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) @@ -60,10 +60,9 @@ def measure_func(name): class Measure(object): __slots__ = [ - "clock", "name", "start_context", "start", "new_context", "ru_utime", - "ru_stime", - "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec", + "clock", "name", "start_context", "start", "created_context", + "start_usage", ] def __init__(self, clock, name): @@ -81,10 +80,7 @@ class Measure(object): self.start_context.__enter__() self.created_context = True - self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() - self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration_sec = self.start_context.db_txn_duration_sec - self.db_sched_duration_sec = self.start_context.db_sched_duration_sec + self.start_usage = self.start_context.get_resource_usage() def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: @@ -108,15 +104,12 @@ class Measure(object): logger.warn("Expected context. (%r)", self.name) return - ru_utime, ru_stime = context.get_resource_usage() - - block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime) - block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime) - block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count) - block_db_txn_duration.labels(self.name).inc( - context.db_txn_duration_sec - self.db_txn_duration_sec) - block_db_sched_duration.labels(self.name).inc( - context.db_sched_duration_sec - self.db_sched_duration_sec) + usage = context.get_resource_usage() - self.start_usage + block_ru_utime.labels(self.name).inc(usage.ru_utime) + block_ru_stime.labels(self.name).inc(usage.ru_stime) + block_db_txn_count.labels(self.name).inc(usage.db_txn_count) + block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) + block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py index 607161e7f0..a6c30e5265 100644 --- a/synapse/util/msisdn.py +++ b/synapse/util/msisdn.py @@ -14,6 +14,7 @@ # limitations under the License. import phonenumbers + from synapse.api.errors import SynapseError diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 0ab63c3d7d..7deb38f2a7 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -13,21 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections +import contextlib +import logging + from twisted.internet import defer from synapse.api.errors import LimitExceededError - -from synapse.util.async import sleep from synapse.util.logcontext import ( - run_in_background, make_deferred_yieldable, PreserveLoggingContext, + make_deferred_yieldable, + run_in_background, ) -import collections -import contextlib -import logging - - logger = logging.getLogger(__name__) @@ -94,13 +92,22 @@ class _PerHostRatelimiter(object): self.window_size = window_size self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec + self.sleep_sec = sleep_msec / 1000.0 self.reject_limit = reject_limit self.concurrent_requests = concurrent_requests + # request_id objects for requests which have been slept self.sleeping_requests = set() + + # map from request_id object to Deferred for requests which are ready + # for processing but have been queued self.ready_request_queue = collections.OrderedDict() + + # request id objects for requests which are in progress self.current_processing = set() + + # times at which we have recently (within the last window_size ms) + # received requests. self.request_times = [] @contextlib.contextmanager @@ -119,11 +126,15 @@ class _PerHostRatelimiter(object): def _on_enter(self, request_id): time_now = self.clock.time_msec() + + # remove any entries from request_times which aren't within the window self.request_times[:] = [ r for r in self.request_times if time_now - r < self.window_size ] + # reject the request if we already have too many queued up (either + # sleeping or in the ready queue). queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) if queue_size > self.reject_limit: raise LimitExceededError( @@ -136,9 +147,13 @@ class _PerHostRatelimiter(object): def queue_request(): if len(self.current_processing) > self.concurrent_requests: - logger.debug("Ratelimit [%s]: Queue req", id(request_id)) queue_defer = defer.Deferred() self.ready_request_queue[request_id] = queue_defer + logger.info( + "Ratelimiter: queueing request (queue now %i items)", + len(self.ready_request_queue), + ) + return queue_defer else: return defer.succeed(None) @@ -150,10 +165,9 @@ class _PerHostRatelimiter(object): if len(self.request_times) > self.sleep_limit: logger.debug( - "Ratelimit [%s]: sleeping req", - id(request_id), + "Ratelimiter: sleeping request for %f sec", self.sleep_sec, ) - ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0) + ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) self.sleeping_requests.add(request_id) @@ -202,11 +216,8 @@ class _PerHostRatelimiter(object): ) self.current_processing.discard(request_id) try: - request_id, deferred = self.ready_request_queue.popitem() - - # XXX: why do we do the following? the on_start callback above will - # do it for us. - self.current_processing.add(request_id) + # start processing the next item on the queue. + _, deferred = self.ready_request_queue.popitem(last=False) with PreserveLoggingContext(): deferred.callback(None) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 4e93f69d3a..8a3a06fd74 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -12,14 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.logcontext -from twisted.internet import defer - -from synapse.api.errors import CodeMessageException - import logging import random +from twisted.internet import defer + +import synapse.util.logcontext +from synapse.api.errors import CodeMessageException logger = logging.getLogger(__name__) diff --git a/synapse/util/rlimit.py b/synapse/util/rlimit.py index f4a9abf83f..6c0f2bb0cf 100644 --- a/synapse/util/rlimit.py +++ b/synapse/util/rlimit.py @@ -13,9 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import resource import logging - +import resource logger = logging.getLogger("synapse.app.homeserver") diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index b98b9dc6e4..43d9db67ec 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -15,6 +15,7 @@ import random import string + from six.moves import range _string_with_symbols = ( diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index 52086df465..1fbcd41115 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -14,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import subprocess -import os import logging +import os +import subprocess logger = logging.getLogger(__name__) |