diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 3 | ||||
-rw-r--r-- | synapse/util/caches/snapshot_cache.py | 93 | ||||
-rw-r--r-- | synapse/util/debug.py | 3 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 117 |
4 files changed, 198 insertions, 18 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index d69c7cb991..2170746025 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -64,8 +64,7 @@ class Clock(object): current_context = LoggingContext.current_context() def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context + with PreserveLoggingContext(current_context): callback(*args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py new file mode 100644 index 0000000000..09f00afbc5 --- /dev/null +++ b/synapse/util/caches/snapshot_cache.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.async import ObservableDeferred + + +class SnapshotCache(object): + """Cache for snapshots like the response of /initialSync. + The response of initialSync only has to be a recent snapshot of the + server state. It shouldn't matter to clients if it is a few minutes out + of date. + + This caches a deferred response. Until the deferred completes it will be + returned from the cache. This means that if the client retries the request + while the response is still being computed, that original response will be + used rather than trying to compute a new response. + + Once the deferred completes it will removed from the cache after 5 minutes. + We delay removing it from the cache because a client retrying its request + could race with us finishing computing the response. + + Rather than tracking precisely how long something has been in the cache we + keep two generations of completed responses. Every 5 minutes discard the + old generation, move the new generation to the old generation, and set the + new generation to be empty. This means that a result will be in the cache + somewhere between 5 and 10 minutes. + """ + + DURATION_MS = 5 * 60 * 1000 # Cache results for 5 minutes. + + def __init__(self): + self.pending_result_cache = {} # Request that haven't finished yet. + self.prev_result_cache = {} # The older requests that have finished. + self.next_result_cache = {} # The newer requests that have finished. + self.time_last_rotated_ms = 0 + + def rotate(self, time_now_ms): + # Rotate once if the cache duration has passed since the last rotation. + if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: + self.prev_result_cache = self.next_result_cache + self.next_result_cache = {} + self.time_last_rotated_ms += self.DURATION_MS + + # Rotate again if the cache duration has passed twice since the last + # rotation. + if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: + self.prev_result_cache = self.next_result_cache + self.next_result_cache = {} + self.time_last_rotated_ms = time_now_ms + + def get(self, time_now_ms, key): + self.rotate(time_now_ms) + # This cache is intended to deduplicate requests, so we expect it to be + # missed most of the time. So we just lookup the key in all of the + # dictionaries rather than trying to short circuit the lookup if the + # key is found. + result = self.prev_result_cache.get(key) + result = self.next_result_cache.get(key, result) + result = self.pending_result_cache.get(key, result) + if result is not None: + return result.observe() + else: + return None + + def set(self, time_now_ms, key, deferred): + self.rotate(time_now_ms) + + result = ObservableDeferred(deferred) + + self.pending_result_cache[key] = result + + def shuffle_along(r): + # When the deferred completes we shuffle it along to the first + # generation of the result cache. So that it will eventually + # expire from the rotation of that cache. + self.next_result_cache[key] = result + self.pending_result_cache.pop(key, None) + + result.observe().addBoth(shuffle_along) + + return result.observe() diff --git a/synapse/util/debug.py b/synapse/util/debug.py index f6a5a841a4..b2bee7958f 100644 --- a/synapse/util/debug.py +++ b/synapse/util/debug.py @@ -30,8 +30,7 @@ def debug_deferreds(): context = LoggingContext.current_context() def restore_context_callback(x): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = context + with PreserveLoggingContext(context): return fn(x) return restore_context_callback diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7e6062c1b8..d528ced55a 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -19,6 +19,25 @@ import logging logger = logging.getLogger(__name__) +try: + import resource + + # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined + # to be 1 on linux so we hard code it. + RUSAGE_THREAD = 1 + + # If the system doesn't support RUSAGE_THREAD then this should throw an + # exception. + resource.getrusage(RUSAGE_THREAD) + + def get_thread_resource_usage(): + return resource.getrusage(RUSAGE_THREAD) +except: + # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we + # won't track resource usage by returning None. + def get_thread_resource_usage(): + return None + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a @@ -27,7 +46,9 @@ class LoggingContext(object): name (str): Name for the context for debugging. """ - __slots__ = ["parent_context", "name", "__dict__"] + __slots__ = [ + "parent_context", "name", "usage_start", "usage_end", "main_thread", "__dict__" + ] thread_local = threading.local() @@ -42,11 +63,26 @@ class LoggingContext(object): def copy_to(self, record): pass + def start(self): + pass + + def stop(self): + pass + + def add_database_transaction(self, duration_ms): + pass + sentinel = Sentinel() def __init__(self, name=None): self.parent_context = None self.name = name + self.ru_stime = 0. + self.ru_utime = 0. + self.db_txn_count = 0 + self.db_txn_duration = 0. + self.usage_start = None + self.main_thread = threading.current_thread() def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -56,12 +92,26 @@ class LoggingContext(object): """Get the current logging context from thread local storage""" return getattr(cls.thread_local, "current_context", cls.sentinel) + @classmethod + def set_current_context(cls, context): + """Set the current logging context in thread local storage + Args: + context(LoggingContext): The context to activate. + Returns: + The context that was previously active + """ + current = cls.current_context() + if current is not context: + current.stop() + cls.thread_local.current_context = context + context.start() + return current + def __enter__(self): """Enters this logging context into thread local storage""" if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") - self.parent_context = self.current_context() - self.thread_local.current_context = self + self.parent_context = self.set_current_context(self) return self def __exit__(self, type, value, traceback): @@ -70,16 +120,16 @@ class LoggingContext(object): Returns: None to avoid suppressing any exeptions that were thrown. """ - if self.thread_local.current_context is not self: - if self.thread_local.current_context is self.sentinel: + current = self.set_current_context(self.parent_context) + if current is not self: + if current is self.sentinel: logger.debug("Expected logging context %s has been lost", self) else: logger.warn( "Current logging context %s is not expected context %s", - self.thread_local.current_context, + current, self ) - self.thread_local.current_context = self.parent_context self.parent_context = None def __getattr__(self, name): @@ -93,6 +143,43 @@ class LoggingContext(object): for key, value in self.__dict__.items(): setattr(record, key, value) + record.ru_utime, record.ru_stime = self.get_resource_usage() + + def start(self): + if threading.current_thread() is not self.main_thread: + return + + if self.usage_start and self.usage_end: + self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime + self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime + self.usage_start = None + self.usage_end = None + + if not self.usage_start: + self.usage_start = get_thread_resource_usage() + + def stop(self): + if threading.current_thread() is not self.main_thread: + return + + if self.usage_start: + self.usage_end = get_thread_resource_usage() + + def get_resource_usage(self): + ru_utime = self.ru_utime + ru_stime = self.ru_stime + + if self.usage_start and threading.current_thread() is self.main_thread: + current = get_thread_resource_usage() + ru_utime += current.ru_utime - self.usage_start.ru_utime + ru_stime += current.ru_stime - self.usage_start.ru_stime + + return ru_utime, ru_stime + + def add_database_transaction(self, duration_ms): + self.db_txn_count += 1 + self.db_txn_duration += duration_ms / 1000. + class LoggingContextFilter(logging.Filter): """Logging filter that adds values from the current logging context to each @@ -121,17 +208,20 @@ class PreserveLoggingContext(object): exited. Used to restore the context after a function using @defer.inlineCallbacks is resumed by a callback from the reactor.""" - __slots__ = ["current_context"] + __slots__ = ["current_context", "new_context"] + + def __init__(self, new_context=LoggingContext.sentinel): + self.new_context = new_context def __enter__(self): """Captures the current logging context""" - self.current_context = LoggingContext.current_context() - LoggingContext.thread_local.current_context = LoggingContext.sentinel + self.current_context = LoggingContext.set_current_context( + self.new_context + ) def __exit__(self, type, value, traceback): """Restores the current logging context""" - LoggingContext.thread_local.current_context = self.current_context - + LoggingContext.set_current_context(self.current_context) if self.current_context is not LoggingContext.sentinel: if self.current_context.parent_context is None: logger.warn( @@ -164,8 +254,7 @@ class _PreservingContextDeferred(defer.Deferred): def _wrap_callback(self, f): def g(res, *args, **kwargs): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = self._log_context + with PreserveLoggingContext(self._log_context): res = f(res, *args, **kwargs) return res return g |