diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/file_consumer.py | 139 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 48 | ||||
-rw-r--r-- | synapse/util/metrics.py | 26 |
3 files changed, 199 insertions, 14 deletions
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py new file mode 100644 index 0000000000..90a2608d6f --- /dev/null +++ b/synapse/util/file_consumer.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import threads, reactor + +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn + +import Queue + + +class BackgroundFileConsumer(object): + """A consumer that writes to a file like object. Supports both push + and pull producers + + Args: + file_obj (file): The file like object to write to. Closed when + finished. + """ + + # For PushProducers pause if we have this many unwritten slices + _PAUSE_ON_QUEUE_SIZE = 5 + # And resume once the size of the queue is less than this + _RESUME_ON_QUEUE_SIZE = 2 + + def __init__(self, file_obj): + self._file_obj = file_obj + + # Producer we're registered with + self._producer = None + + # True if PushProducer, false if PullProducer + self.streaming = False + + # For PushProducers, indicates whether we've paused the producer and + # need to call resumeProducing before we get more data. + self._paused_producer = False + + # Queue of slices of bytes to be written. When producer calls + # unregister a final None is sent. + self._bytes_queue = Queue.Queue() + + # Deferred that is resolved when finished writing + self._finished_deferred = None + + # If the _writer thread throws an exception it gets stored here. + self._write_exception = None + + def registerProducer(self, producer, streaming): + """Part of IConsumer interface + + Args: + producer (IProducer) + streaming (bool): True if push based producer, False if pull + based. + """ + if self._producer: + raise Exception("registerProducer called twice") + + self._producer = producer + self.streaming = streaming + self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + if not streaming: + self._producer.resumeProducing() + + def unregisterProducer(self): + """Part of IProducer interface + """ + self._producer = None + if not self._finished_deferred.called: + self._bytes_queue.put_nowait(None) + + def write(self, bytes): + """Part of IProducer interface + """ + if self._write_exception: + raise self._write_exception + + if self._finished_deferred.called: + raise Exception("consumer has closed") + + self._bytes_queue.put_nowait(bytes) + + # If this is a PushProducer and the queue is getting behind + # then we pause the producer. + if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE: + self._paused_producer = True + self._producer.pauseProducing() + + def _writer(self): + """This is run in a background thread to write to the file. + """ + try: + while self._producer or not self._bytes_queue.empty(): + # If we've paused the producer check if we should resume the + # producer. + if self._producer and self._paused_producer: + if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: + reactor.callFromThread(self._resume_paused_producer) + + bytes = self._bytes_queue.get() + + # If we get a None (or empty list) then that's a signal used + # to indicate we should check if we should stop. + if bytes: + self._file_obj.write(bytes) + + # If its a pull producer then we need to explicitly ask for + # more stuff. + if not self.streaming and self._producer: + reactor.callFromThread(self._producer.resumeProducing) + except Exception as e: + self._write_exception = e + raise + finally: + self._file_obj.close() + + def wait(self): + """Returns a deferred that resolves when finished writing to file + """ + return make_deferred_yieldable(self._finished_deferred) + + def _resume_paused_producer(self): + """Gets called if we should resume producing after being paused + """ + if self._paused_producer and self._producer: + self._paused_producer = False + self._producer.resumeProducing() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 48c9f6802d..94fa7cac98 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -52,13 +52,17 @@ except Exception: class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a "with" block. + Args: name (str): Name for the context for debugging. """ __slots__ = [ - "previous_context", "name", "usage_start", "usage_end", "main_thread", - "__dict__", "tag", "alive", + "previous_context", "name", "ru_stime", "ru_utime", + "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "usage_start", "usage_end", + "main_thread", "alive", + "request", "tag", ] thread_local = threading.local() @@ -83,6 +87,9 @@ class LoggingContext(object): def add_database_transaction(self, duration_ms): pass + def add_database_scheduled(self, sched_ms): + pass + def __nonzero__(self): return False @@ -94,9 +101,17 @@ class LoggingContext(object): self.ru_stime = 0. self.ru_utime = 0. self.db_txn_count = 0 - self.db_txn_duration = 0. + + # ms spent waiting for db txns, excluding scheduling time + self.db_txn_duration_ms = 0 + + # ms spent waiting for db txns to be scheduled + self.db_sched_duration_ms = 0 + self.usage_start = None + self.usage_end = None self.main_thread = threading.current_thread() + self.request = None self.tag = "" self.alive = True @@ -105,7 +120,11 @@ class LoggingContext(object): @classmethod def current_context(cls): - """Get the current logging context from thread local storage""" + """Get the current logging context from thread local storage + + Returns: + LoggingContext: the current logging context + """ return getattr(cls.thread_local, "current_context", cls.sentinel) @classmethod @@ -155,11 +174,13 @@ class LoggingContext(object): self.alive = False def copy_to(self, record): - """Copy fields from this context to the record""" - for key, value in self.__dict__.items(): - setattr(record, key, value) + """Copy logging fields from this context to a log record or + another LoggingContext + """ - record.ru_utime, record.ru_stime = self.get_resource_usage() + # 'request' is the only field we currently use in the logger, so that's + # all we need to copy + record.request = self.request def start(self): if threading.current_thread() is not self.main_thread: @@ -194,7 +215,16 @@ class LoggingContext(object): def add_database_transaction(self, duration_ms): self.db_txn_count += 1 - self.db_txn_duration += duration_ms / 1000. + self.db_txn_duration_ms += duration_ms + + def add_database_scheduled(self, sched_ms): + """Record a use of the database pool + + Args: + sched_ms (int): number of milliseconds it took us to get a + connection + """ + self.db_sched_duration_ms += sched_ms class LoggingContextFilter(logging.Filter): diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 8d22ff3068..e4b5687a4b 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) # total number of times we have hit this block -response_count = metrics.register_counter( +block_counter = metrics.register_counter( "block_count", labels=["block_name"], alternative_names=( @@ -72,13 +72,19 @@ block_db_txn_count = metrics.register_counter( ), ) +# seconds spent waiting for db txns, excluding scheduling time, in this block block_db_txn_duration = metrics.register_counter( "block_db_txn_duration_seconds", labels=["block_name"], alternative_names=( - metrics.name_prefix + "_block_db_txn_count:total", + metrics.name_prefix + "_block_db_txn_duration:total", ), ) +# seconds spent waiting for a db connection, in this block +block_db_sched_duration = metrics.register_counter( + "block_db_sched_duration_seconds", labels=["block_name"], +) + def measure_func(name): def wrapper(func): @@ -95,7 +101,9 @@ def measure_func(name): class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", - "ru_stime", "db_txn_count", "db_txn_duration", "created_context" + "ru_stime", + "db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms", + "created_context", ] def __init__(self, clock, name): @@ -115,13 +123,16 @@ class Measure(object): self.ru_utime, self.ru_stime = self.start_context.get_resource_usage() self.db_txn_count = self.start_context.db_txn_count - self.db_txn_duration = self.start_context.db_txn_duration + self.db_txn_duration_ms = self.start_context.db_txn_duration_ms + self.db_sched_duration_ms = self.start_context.db_sched_duration_ms def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: return duration = self.clock.time_msec() - self.start + + block_counter.inc(self.name) block_timer.inc_by(duration, self.name) context = LoggingContext.current_context() @@ -145,7 +156,12 @@ class Measure(object): context.db_txn_count - self.db_txn_count, self.name ) block_db_txn_duration.inc_by( - context.db_txn_duration - self.db_txn_duration, self.name + (context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000., + self.name + ) + block_db_sched_duration.inc_by( + (context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000., + self.name ) if self.created_context: |