diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index fc11e26623..680ea928c7 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.logcontext import PreserveLoggingContext
+import logging
+from itertools import islice
-from twisted.internet import defer, reactor, task
+import attr
-import time
-import logging
+from twisted.internet import defer, task
-from itertools import islice
+from synapse.util.logcontext import PreserveLoggingContext
logger = logging.getLogger(__name__)
@@ -31,16 +31,27 @@ def unwrapFirstError(failure):
return failure.value.subFailure
+@attr.s
class Clock(object):
- """A small utility that obtains current time-of-day so that time may be
- mocked during unit-tests.
+ """
+ A Clock wraps a Twisted reactor and provides utilities on top of it.
- TODO(paul): Also move the sleep() functionality into it
+ Args:
+ reactor: The Twisted reactor to use.
"""
+ _reactor = attr.ib()
+
+ @defer.inlineCallbacks
+ def sleep(self, seconds):
+ d = defer.Deferred()
+ with PreserveLoggingContext():
+ self._reactor.callLater(seconds, d.callback, seconds)
+ res = yield d
+ defer.returnValue(res)
def time(self):
"""Returns the current system time in seconds since epoch."""
- return time.time()
+ return self._reactor.seconds()
def time_msec(self):
"""Returns the current system time in miliseconds since epoch."""
@@ -56,6 +67,7 @@ class Clock(object):
msec(float): How long to wait between calls in milliseconds.
"""
call = task.LoopingCall(f)
+ call.clock = self._reactor
call.start(msec / 1000.0, now=False)
return call
@@ -73,7 +85,7 @@ class Clock(object):
callback(*args, **kwargs)
with PreserveLoggingContext():
- return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+ return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer, ignore_errs=False):
try:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..5d0fb39130 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -13,41 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+from contextlib import contextmanager
-from twisted.internet import defer, reactor
+from six.moves import range
+
+from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.python import failure
+from synapse.util import Clock, logcontext, unwrapFirstError
+
from .logcontext import (
- PreserveLoggingContext, make_deferred_yieldable, run_in_background
+ PreserveLoggingContext,
+ make_deferred_yieldable,
+ run_in_background,
)
-from synapse.util import logcontext, unwrapFirstError
-
-from contextlib import contextmanager
-
-import logging
-
-from six.moves import range
logger = logging.getLogger(__name__)
-@defer.inlineCallbacks
-def sleep(seconds):
- d = defer.Deferred()
- with PreserveLoggingContext():
- reactor.callLater(seconds, d.callback, seconds)
- res = yield d
- defer.returnValue(res)
-
-
-def run_on_reactor():
- """ This will cause the rest of the function to be invoked upon the next
- iteration of the main loop
- """
- return sleep(0)
-
-
class ObservableDeferred(object):
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
@@ -180,13 +165,18 @@ class Linearizer(object):
# do some work.
"""
- def __init__(self, name=None):
+ def __init__(self, name=None, clock=None):
if name is None:
self.name = id(self)
else:
self.name = name
self.key_to_defer = {}
+ if not clock:
+ from twisted.internet import reactor
+ clock = Clock(reactor)
+ self._clock = clock
+
@defer.inlineCallbacks
def queue(self, key):
# If there is already a deferred in the queue, we pull it out so that
@@ -227,7 +217,7 @@ class Linearizer(object):
# the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place.
- yield run_on_reactor()
+ yield self._clock.sleep(0)
else:
logger.info("Acquired uncontended linearizer lock %r for key %r",
@@ -404,7 +394,7 @@ class DeferredTimeoutError(Exception):
"""
-def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
"""
Add a timeout to a deferred by scheduling it to be cancelled after
timeout seconds.
@@ -419,6 +409,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
Args:
deferred (defer.Deferred): deferred to be timed out
timeout (Number): seconds to time out after
+ reactor (twisted.internet.reactor): the Twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 900575eb3c..7b065b195e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,12 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
-
import os
-from six.moves import intern
import six
+from six.moves import intern
+
+from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 65a1042de1..f8a07df6b8 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -13,10 +13,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import functools
+import inspect
import logging
+import threading
+from collections import namedtuple
+import six
+from six import itervalues, string_types
+
+from twisted.internet import defer
+
+from synapse.util import logcontext, unwrapFirstError
from synapse.util.async import ObservableDeferred
-from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -24,17 +33,6 @@ from synapse.util.stringutils import to_ascii
from . import register_cache
-from twisted.internet import defer
-from collections import namedtuple
-
-import functools
-import inspect
-import threading
-
-from six import string_types, itervalues
-import six
-
-
logger = logging.getLogger(__name__)
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index bdc21e348f..6c0b5a4094 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -13,12 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches.lrucache import LruCache
-from collections import namedtuple
-from . import register_cache
-import threading
import logging
+import threading
+from collections import namedtuple
+from synapse.util.caches.lrucache import LruCache
+
+from . import register_cache
logger = logging.getLogger(__name__)
@@ -107,29 +108,28 @@ class DictionaryCache(object):
self.sequence += 1
self.cache.clear()
- def update(self, sequence, key, value, full=False, known_absent=None):
+ def update(self, sequence, key, value, fetched_keys=None):
"""Updates the entry in the cache
Args:
sequence
- key
- value (dict): The value to update the cache with.
- full (bool): Whether the given value is the full dict, or just a
- partial subset there of. If not full then any existing entries
- for the key will be updated.
- known_absent (set): Set of keys that we know don't exist in the full
- dict.
+ key (K)
+ value (dict[X,Y]): The value to update the cache with.
+ fetched_keys (None|set[X]): All of the dictionary keys which were
+ fetched from the database.
+
+ If None, this is the complete value for key K. Otherwise, it
+ is used to infer a list of keys which we know don't exist in
+ the full dict.
"""
self.check_thread()
if self.sequence == sequence:
# Only update the cache if the caches sequence number matches the
# number that the cache had before the SELECT was started (SYN-369)
- if known_absent is None:
- known_absent = set()
- if full:
- self._insert(key, value, known_absent)
+ if fetched_keys is None:
+ self._insert(key, value, set())
else:
- self._update_or_insert(key, value, known_absent)
+ self._update_or_insert(key, value, fetched_keys)
def _update_or_insert(self, key, value, known_absent):
# We pop and reinsert as we need to tell the cache the size may have
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ff04c91955..465adc54a8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import register_cache
-
-from collections import OrderedDict
import logging
+from collections import OrderedDict
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
@@ -64,7 +64,10 @@ class ExpiringCache(object):
return
def f():
- self._prune_cache()
+ run_as_background_process(
+ "prune_cache_%s" % self._cache_name,
+ self._prune_cache,
+ )
self._clock.looping_call(f, self._expiry_ms / 2)
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 1c5a982094..b684f24e7b 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -14,8 +14,8 @@
# limitations under the License.
-from functools import wraps
import threading
+from functools import wraps
from synapse.util.caches.treecache import TreeCache
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 817118e30f..f2bde74dc5 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,12 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util import caches
-
+import logging
from sortedcontainers import SortedDict
-import logging
+from synapse.util import caches
logger = logging.getLogger(__name__)
@@ -75,13 +74,13 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- not_known_entities = set(entities) - set(self._entity_to_key)
+ changed_entities = {
+ self._cache[k] for k in self._cache.islice(
+ start=self._cache.bisect_right(stream_pos),
+ )
+ }
- result = (
- set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
- .intersection(entities)
- .union(not_known_entities)
- )
+ result = changed_entities.intersection(entities)
self.metrics.inc_hits()
else:
@@ -113,7 +112,8 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- return self._cache.values()[self._cache.bisect_right(stream_pos) :]
+ return [self._cache[k] for k in self._cache.islice(
+ start=self._cache.bisect_right(stream_pos))]
else:
return None
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
from twisted.internet import defer
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_left_room", user=user, room_id=room_id)
+ distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_joined_room", user=user, room_id=room_id)
+ distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
model will do for today.
"""
- def __init__(self, suppress_failures=True):
- self.suppress_failures = suppress_failures
-
+ def __init__(self):
self.signals = {}
self.pre_registration = {}
@@ -56,7 +52,6 @@ class Distributor(object):
self.signals[name] = Signal(
name,
- suppress_failures=self.suppress_failures,
)
if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
self.pre_registration[name].append(observer)
def fire(self, name, *args, **kwargs):
+ """Dispatches the given signal to the registered observers.
+
+ Runs the observers as a background process. Does not return a deferred.
+ """
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))
- return self.signals[name].fire(*args, **kwargs)
+ run_as_background_process(
+ name,
+ self.signals[name].fire,
+ *args, **kwargs
+ )
class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers.
"""
- def __init__(self, name, suppress_failures):
+ def __init__(self, name):
self.name = name
- self.suppress_failures = suppress_failures
self.observers = []
def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
- @defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
failure.type,
failure.value,
failure.getTracebackObject()))
- if not self.suppress_failures:
- return failure
return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
- with PreserveLoggingContext():
- deferreds = [
- do(observer)
- for observer in self.observers
- ]
-
- res = yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ deferreds = [
+ run_in_background(do, o)
+ for o in self.observers
+ ]
- defer.returnValue(res)
+ return make_deferred_yieldable(defer.gatherResults(
+ deferreds, consumeErrors=True,
+ ))
def __repr__(self):
return "<Signal name=%r>" % (self.name,)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 3380970e4e..629ed44149 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import threads, reactor
+from six.moves import queue
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from twisted.internet import threads
-from six.moves import queue
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
class BackgroundFileConsumer(object):
@@ -27,6 +27,7 @@ class BackgroundFileConsumer(object):
Args:
file_obj (file): The file like object to write to. Closed when
finished.
+ reactor (twisted.internet.reactor): the Twisted reactor to use
"""
# For PushProducers pause if we have this many unwritten slices
@@ -34,9 +35,11 @@ class BackgroundFileConsumer(object):
# And resume once the size of the queue is less than this
_RESUME_ON_QUEUE_SIZE = 2
- def __init__(self, file_obj):
+ def __init__(self, file_obj, reactor):
self._file_obj = file_obj
+ self._reactor = reactor
+
# Producer we're registered with
self._producer = None
@@ -71,7 +74,10 @@ class BackgroundFileConsumer(object):
self._producer = producer
self.streaming = streaming
self._finished_deferred = run_in_background(
- threads.deferToThread, self._writer
+ threads.deferToThreadPool,
+ self._reactor,
+ self._reactor.getThreadPool(),
+ self._writer,
)
if not streaming:
self._producer.resumeProducing()
@@ -109,7 +115,7 @@ class BackgroundFileConsumer(object):
# producer.
if self._producer and self._paused_producer:
if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
- reactor.callFromThread(self._resume_paused_producer)
+ self._reactor.callFromThread(self._resume_paused_producer)
bytes = self._bytes_queue.get()
@@ -121,7 +127,7 @@ class BackgroundFileConsumer(object):
# If its a pull producer then we need to explicitly ask for
# more stuff.
if not self.streaming and self._producer:
- reactor.callFromThread(self._producer.resumeProducing)
+ self._reactor.callFromThread(self._producer.resumeProducing)
except Exception as e:
self._write_exception = e
raise
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 15f0a7ba9e..581c6052ac 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from frozendict import frozendict
-import simplejson as json
-
from six import string_types
+from canonicaljson import json
+from frozendict import frozendict
+
def freeze(o):
if isinstance(o, dict):
diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py
index e9f0f292ee..2d7ddc1cbe 100644
--- a/synapse/util/httpresourcetree.py
+++ b/synapse/util/httpresourcetree.py
@@ -12,10 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.web.resource import NoResource
-
import logging
+from twisted.web.resource import NoResource
+
logger = logging.getLogger(__name__)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a58c723403..f6c7175f74 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -22,10 +22,10 @@ them.
See doc/log_contexts.rst for details on how this works.
"""
-from twisted.internet import defer
-
-import threading
import logging
+import threading
+
+from twisted.internet import defer
logger = logging.getLogger(__name__)
@@ -49,17 +49,107 @@ except Exception:
return None
+class ContextResourceUsage(object):
+ """Object for tracking the resources used by a log context
+
+ Attributes:
+ ru_utime (float): user CPU time (in seconds)
+ ru_stime (float): system CPU time (in seconds)
+ db_txn_count (int): number of database transactions done
+ db_sched_duration_sec (float): amount of time spent waiting for a
+ database connection
+ db_txn_duration_sec (float): amount of time spent doing database
+ transactions (excluding scheduling time)
+ evt_db_fetch_count (int): number of events requested from the database
+ """
+
+ __slots__ = [
+ "ru_stime", "ru_utime",
+ "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+ "evt_db_fetch_count",
+ ]
+
+ def __init__(self, copy_from=None):
+ """Create a new ContextResourceUsage
+
+ Args:
+ copy_from (ContextResourceUsage|None): if not None, an object to
+ copy stats from
+ """
+ if copy_from is None:
+ self.reset()
+ else:
+ self.ru_utime = copy_from.ru_utime
+ self.ru_stime = copy_from.ru_stime
+ self.db_txn_count = copy_from.db_txn_count
+
+ self.db_txn_duration_sec = copy_from.db_txn_duration_sec
+ self.db_sched_duration_sec = copy_from.db_sched_duration_sec
+ self.evt_db_fetch_count = copy_from.evt_db_fetch_count
+
+ def copy(self):
+ return ContextResourceUsage(copy_from=self)
+
+ def reset(self):
+ self.ru_stime = 0.
+ self.ru_utime = 0.
+ self.db_txn_count = 0
+
+ self.db_txn_duration_sec = 0
+ self.db_sched_duration_sec = 0
+ self.evt_db_fetch_count = 0
+
+ def __iadd__(self, other):
+ """Add another ContextResourceUsage's stats to this one's.
+
+ Args:
+ other (ContextResourceUsage): the other resource usage object
+ """
+ self.ru_utime += other.ru_utime
+ self.ru_stime += other.ru_stime
+ self.db_txn_count += other.db_txn_count
+ self.db_txn_duration_sec += other.db_txn_duration_sec
+ self.db_sched_duration_sec += other.db_sched_duration_sec
+ self.evt_db_fetch_count += other.evt_db_fetch_count
+ return self
+
+ def __isub__(self, other):
+ self.ru_utime -= other.ru_utime
+ self.ru_stime -= other.ru_stime
+ self.db_txn_count -= other.db_txn_count
+ self.db_txn_duration_sec -= other.db_txn_duration_sec
+ self.db_sched_duration_sec -= other.db_sched_duration_sec
+ self.evt_db_fetch_count -= other.evt_db_fetch_count
+ return self
+
+ def __add__(self, other):
+ res = ContextResourceUsage(copy_from=self)
+ res += other
+ return res
+
+ def __sub__(self, other):
+ res = ContextResourceUsage(copy_from=self)
+ res -= other
+ return res
+
+
class LoggingContext(object):
"""Additional context for log formatting. Contexts are scoped within a
"with" block.
+ If a parent is given when creating a new context, then:
+ - logging fields are copied from the parent to the new context on entry
+ - when the new context exits, the cpu usage stats are copied from the
+ child to the parent
+
Args:
name (str): Name for the context for debugging.
+ parent_context (LoggingContext|None): The parent of the new context
"""
__slots__ = [
- "previous_context", "name", "ru_stime", "ru_utime",
- "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+ "previous_context", "name", "parent_context",
+ "_resource_usage",
"usage_start",
"main_thread", "alive",
"request", "tag",
@@ -90,24 +180,21 @@ class LoggingContext(object):
def add_database_scheduled(self, sched_sec):
pass
+ def record_event_fetch(self, event_count):
+ pass
+
def __nonzero__(self):
return False
__bool__ = __nonzero__ # python3
sentinel = Sentinel()
- def __init__(self, name=None):
+ def __init__(self, name=None, parent_context=None):
self.previous_context = LoggingContext.current_context()
self.name = name
- self.ru_stime = 0.
- self.ru_utime = 0.
- self.db_txn_count = 0
-
- # sec spent waiting for db txns, excluding scheduling time
- self.db_txn_duration_sec = 0
- # sec spent waiting for db txns to be scheduled
- self.db_sched_duration_sec = 0
+ # track the resources used by this context so far
+ self._resource_usage = ContextResourceUsage()
# If alive has the thread resource usage when the logcontext last
# became active.
@@ -118,6 +205,8 @@ class LoggingContext(object):
self.tag = ""
self.alive = True
+ self.parent_context = parent_context
+
def __str__(self):
return "%s@%x" % (self.name, id(self))
@@ -155,6 +244,10 @@ class LoggingContext(object):
self.previous_context, old_context
)
self.alive = True
+
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
return self
def __exit__(self, type, value, traceback):
@@ -176,6 +269,13 @@ class LoggingContext(object):
self.previous_context = None
self.alive = False
+ # if we have a parent, pass our CPU usage stats on
+ if self.parent_context is not None:
+ self.parent_context._resource_usage += self._resource_usage
+
+ # reset them in case we get entered again
+ self._resource_usage.reset()
+
def copy_to(self, record):
"""Copy logging fields from this context to a log record or
another LoggingContext
@@ -200,39 +300,43 @@ class LoggingContext(object):
logger.warning("Stopped logcontext %s on different thread", self)
return
- # When we stop, let's record the resource used since we started
- if self.usage_start:
- usage_end = get_thread_resource_usage()
+ # When we stop, let's record the cpu used since we started
+ if not self.usage_start:
+ logger.warning(
+ "Called stop on logcontext %s without calling start", self,
+ )
+ return
+
+ usage_end = get_thread_resource_usage()
- self.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
- self.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
+ self._resource_usage.ru_utime += usage_end.ru_utime - self.usage_start.ru_utime
+ self._resource_usage.ru_stime += usage_end.ru_stime - self.usage_start.ru_stime
- self.usage_start = None
- else:
- logger.warning("Called stop on logcontext %s without calling start", self)
+ self.usage_start = None
def get_resource_usage(self):
- """Get CPU time used by this logcontext so far.
+ """Get resources used by this logcontext so far.
Returns:
- tuple[float, float]: The user and system CPU usage in seconds
+ ContextResourceUsage: a *copy* of the object tracking resource
+ usage so far
"""
- ru_utime = self.ru_utime
- ru_stime = self.ru_stime
+ # we always return a copy, for consistency
+ res = self._resource_usage.copy()
# If we are on the correct thread and we're currently running then we
# can include resource usage so far.
is_main_thread = threading.current_thread() is self.main_thread
if self.alive and self.usage_start and is_main_thread:
current = get_thread_resource_usage()
- ru_utime += current.ru_utime - self.usage_start.ru_utime
- ru_stime += current.ru_stime - self.usage_start.ru_stime
+ res.ru_utime += current.ru_utime - self.usage_start.ru_utime
+ res.ru_stime += current.ru_stime - self.usage_start.ru_stime
- return ru_utime, ru_stime
+ return res
def add_database_transaction(self, duration_sec):
- self.db_txn_count += 1
- self.db_txn_duration_sec += duration_sec
+ self._resource_usage.db_txn_count += 1
+ self._resource_usage.db_txn_duration_sec += duration_sec
def add_database_scheduled(self, sched_sec):
"""Record a use of the database pool
@@ -241,7 +345,15 @@ class LoggingContext(object):
sched_sec (float): number of seconds it took us to get a
connection
"""
- self.db_sched_duration_sec += sched_sec
+ self._resource_usage.db_sched_duration_sec += sched_sec
+
+ def record_event_fetch(self, event_count):
+ """Record a number of events being fetched from the db
+
+ Args:
+ event_count (int): number of events being fetched
+ """
+ self._resource_usage.evt_db_fetch_count += event_count
class LoggingContextFilter(logging.Filter):
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
index 3e42868ea9..a46bc47ce3 100644
--- a/synapse/util/logformatter.py
+++ b/synapse/util/logformatter.py
@@ -14,10 +14,11 @@
# limitations under the License.
-from six import StringIO
import logging
import traceback
+from six import StringIO
+
class LogFormatter(logging.Formatter):
"""Log formatter which gives more detail for exceptions
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 03249c5dc8..62a00189cc 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -14,13 +14,11 @@
# limitations under the License.
-from inspect import getcallargs
-from functools import wraps
-
-import logging
import inspect
+import logging
import time
-
+from functools import wraps
+from inspect import getcallargs
_TIME_FUNC_ID = 0
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 97e0f00b67..14be3c7396 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.conch.manhole import ColoredManhole
-from twisted.conch.insults import insults
from twisted.conch import manhole_ssh
-from twisted.cred import checkers, portal
+from twisted.conch.insults import insults
+from twisted.conch.manhole import ColoredManhole
from twisted.conch.ssh.keys import Key
+from twisted.cred import checkers, portal
PUBLIC_KEY = (
"ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az"
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 1ba7d65c7c..6ba7107896 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
+from functools import wraps
from prometheus_client import Counter
-from synapse.util.logcontext import LoggingContext
-from functools import wraps
-import logging
+from twisted.internet import defer
+from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
@@ -60,10 +60,9 @@ def measure_func(name):
class Measure(object):
__slots__ = [
- "clock", "name", "start_context", "start", "new_context", "ru_utime",
- "ru_stime",
- "db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
+ "clock", "name", "start_context", "start",
"created_context",
+ "start_usage",
]
def __init__(self, clock, name):
@@ -81,10 +80,7 @@ class Measure(object):
self.start_context.__enter__()
self.created_context = True
- self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
- self.db_txn_count = self.start_context.db_txn_count
- self.db_txn_duration_sec = self.start_context.db_txn_duration_sec
- self.db_sched_duration_sec = self.start_context.db_sched_duration_sec
+ self.start_usage = self.start_context.get_resource_usage()
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
@@ -108,15 +104,12 @@ class Measure(object):
logger.warn("Expected context. (%r)", self.name)
return
- ru_utime, ru_stime = context.get_resource_usage()
-
- block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime)
- block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime)
- block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count)
- block_db_txn_duration.labels(self.name).inc(
- context.db_txn_duration_sec - self.db_txn_duration_sec)
- block_db_sched_duration.labels(self.name).inc(
- context.db_sched_duration_sec - self.db_sched_duration_sec)
+ usage = context.get_resource_usage() - self.start_usage
+ block_ru_utime.labels(self.name).inc(usage.ru_utime)
+ block_ru_stime.labels(self.name).inc(usage.ru_stime)
+ block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
+ block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
+ block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py
index 607161e7f0..a6c30e5265 100644
--- a/synapse/util/msisdn.py
+++ b/synapse/util/msisdn.py
@@ -14,6 +14,7 @@
# limitations under the License.
import phonenumbers
+
from synapse.api.errors import SynapseError
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 0ab63c3d7d..7deb38f2a7 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -13,21 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import collections
+import contextlib
+import logging
+
from twisted.internet import defer
from synapse.api.errors import LimitExceededError
-
-from synapse.util.async import sleep
from synapse.util.logcontext import (
- run_in_background, make_deferred_yieldable,
PreserveLoggingContext,
+ make_deferred_yieldable,
+ run_in_background,
)
-import collections
-import contextlib
-import logging
-
-
logger = logging.getLogger(__name__)
@@ -94,13 +92,22 @@ class _PerHostRatelimiter(object):
self.window_size = window_size
self.sleep_limit = sleep_limit
- self.sleep_msec = sleep_msec
+ self.sleep_sec = sleep_msec / 1000.0
self.reject_limit = reject_limit
self.concurrent_requests = concurrent_requests
+ # request_id objects for requests which have been slept
self.sleeping_requests = set()
+
+ # map from request_id object to Deferred for requests which are ready
+ # for processing but have been queued
self.ready_request_queue = collections.OrderedDict()
+
+ # request id objects for requests which are in progress
self.current_processing = set()
+
+ # times at which we have recently (within the last window_size ms)
+ # received requests.
self.request_times = []
@contextlib.contextmanager
@@ -119,11 +126,15 @@ class _PerHostRatelimiter(object):
def _on_enter(self, request_id):
time_now = self.clock.time_msec()
+
+ # remove any entries from request_times which aren't within the window
self.request_times[:] = [
r for r in self.request_times
if time_now - r < self.window_size
]
+ # reject the request if we already have too many queued up (either
+ # sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
raise LimitExceededError(
@@ -136,9 +147,13 @@ class _PerHostRatelimiter(object):
def queue_request():
if len(self.current_processing) > self.concurrent_requests:
- logger.debug("Ratelimit [%s]: Queue req", id(request_id))
queue_defer = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer
+ logger.info(
+ "Ratelimiter: queueing request (queue now %i items)",
+ len(self.ready_request_queue),
+ )
+
return queue_defer
else:
return defer.succeed(None)
@@ -150,10 +165,9 @@ class _PerHostRatelimiter(object):
if len(self.request_times) > self.sleep_limit:
logger.debug(
- "Ratelimit [%s]: sleeping req",
- id(request_id),
+ "Ratelimiter: sleeping request for %f sec", self.sleep_sec,
)
- ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
+ ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
self.sleeping_requests.add(request_id)
@@ -202,11 +216,8 @@ class _PerHostRatelimiter(object):
)
self.current_processing.discard(request_id)
try:
- request_id, deferred = self.ready_request_queue.popitem()
-
- # XXX: why do we do the following? the on_start callback above will
- # do it for us.
- self.current_processing.add(request_id)
+ # start processing the next item on the queue.
+ _, deferred = self.ready_request_queue.popitem(last=False)
with PreserveLoggingContext():
deferred.callback(None)
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4e93f69d3a..8a3a06fd74 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -12,14 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.util.logcontext
-from twisted.internet import defer
-
-from synapse.api.errors import CodeMessageException
-
import logging
import random
+from twisted.internet import defer
+
+import synapse.util.logcontext
+from synapse.api.errors import CodeMessageException
logger = logging.getLogger(__name__)
diff --git a/synapse/util/rlimit.py b/synapse/util/rlimit.py
index f4a9abf83f..6c0f2bb0cf 100644
--- a/synapse/util/rlimit.py
+++ b/synapse/util/rlimit.py
@@ -13,9 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import resource
import logging
-
+import resource
logger = logging.getLogger("synapse.app.homeserver")
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index b98b9dc6e4..43d9db67ec 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -15,6 +15,7 @@
import random
import string
+
from six.moves import range
_string_with_symbols = (
diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py
index 52086df465..1fbcd41115 100644
--- a/synapse/util/versionstring.py
+++ b/synapse/util/versionstring.py
@@ -14,9 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import subprocess
-import os
import logging
+import os
+import subprocess
logger = logging.getLogger(__name__)
|