From 48a2526d629fc33207cf864de36f3cff706a1c4c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 3 Dec 2015 21:03:01 +0000 Subject: Track the cpu used in the main thread by each logging context --- synapse/util/__init__.py | 3 +- synapse/util/debug.py | 3 +- synapse/util/logcontext.py | 76 ++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 72 insertions(+), 10 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index d69c7cb991..2170746025 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -64,8 +64,7 @@ class Clock(object): current_context = LoggingContext.current_context() def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context + with PreserveLoggingContext(current_context): callback(*args, **kwargs) with PreserveLoggingContext(): diff --git a/synapse/util/debug.py b/synapse/util/debug.py index f6a5a841a4..b2bee7958f 100644 --- a/synapse/util/debug.py +++ b/synapse/util/debug.py @@ -30,8 +30,7 @@ def debug_deferreds(): context = LoggingContext.current_context() def restore_context_callback(x): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = context + with PreserveLoggingContext(context): return fn(x) return restore_context_callback diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 7e6062c1b8..6d7a6c3e2b 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -19,6 +19,16 @@ import logging logger = logging.getLogger(__name__) +try: + import resource + RUSAGE_THREAD = 1 + resource.getrusage(RUSAGE_THREAD) + def get_thread_resource_usage(): + return resource.getrusage(RUSAGE_THREAD) +except: + def get_thread_resource_usage(): + return None + class LoggingContext(object): """Additional context for log formatting. Contexts are scoped within a @@ -27,7 +37,9 @@ class LoggingContext(object): name (str): Name for the context for debugging. """ - __slots__ = ["parent_context", "name", "__dict__"] + __slots__ = [ + "parent_context", "name", "usage_start", "usage_end", "main_thread", "__dict__" + ] thread_local = threading.local() @@ -42,11 +54,21 @@ class LoggingContext(object): def copy_to(self, record): pass + def start(self): + pass + + def stop(self): + pass + sentinel = Sentinel() def __init__(self, name=None): self.parent_context = None self.name = name + self.ru_stime = 0. + self.ru_utime = 0. + self.usage_start = None + self.main_thread = threading.current_thread() def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -62,6 +84,7 @@ class LoggingContext(object): raise Exception("Attempt to enter logging context multiple times") self.parent_context = self.current_context() self.thread_local.current_context = self + self.start() return self def __exit__(self, type, value, traceback): @@ -80,6 +103,7 @@ class LoggingContext(object): self ) self.thread_local.current_context = self.parent_context + self.stop() self.parent_context = None def __getattr__(self, name): @@ -93,6 +117,40 @@ class LoggingContext(object): for key, value in self.__dict__.items(): setattr(record, key, value) + record.ru_utime, record.ru_stime = self.get_resource_usage() + + def start(self): + if threading.current_thread() is not self.main_thread: + return + + if self.usage_start and self.usage_end: + self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime + self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime + self.usage_start = None + self.usage_end = None + + if not self.usage_start: + self.usage_start = get_thread_resource_usage() + + def stop(self): + if threading.current_thread() is not self.main_thread: + return + + if self.usage_start: + self.usage_end = get_thread_resource_usage() + + def get_resource_usage(self): + ru_utime = self.ru_utime + ru_stime = self.ru_stime + + start = self.usage_start + if self.usage_start and threading.current_thread() is self.main_thread: + current = get_thread_resource_usage() + ru_utime += current.ru_utime - self.usage_start.ru_utime + ru_stime += current.ru_stime - self.usage_start.ru_stime + + return ru_utime, ru_stime + class LoggingContextFilter(logging.Filter): """Logging filter that adds values from the current logging context to each @@ -121,17 +179,24 @@ class PreserveLoggingContext(object): exited. Used to restore the context after a function using @defer.inlineCallbacks is resumed by a callback from the reactor.""" - __slots__ = ["current_context"] + __slots__ = ["current_context", "new_context"] + + def __init__(self, new_context=LoggingContext.sentinel): + self.new_context = new_context def __enter__(self): """Captures the current logging context""" self.current_context = LoggingContext.current_context() - LoggingContext.thread_local.current_context = LoggingContext.sentinel + if self.new_context is not self.current_context: + self.current_context.stop() + LoggingContext.thread_local.current_context = self.new_context def __exit__(self, type, value, traceback): """Restores the current logging context""" + context = LoggingContext.thread_local.current_context LoggingContext.thread_local.current_context = self.current_context - + if context is not self.current_context: + self.current_context.start() if self.current_context is not LoggingContext.sentinel: if self.current_context.parent_context is None: logger.warn( @@ -164,8 +229,7 @@ class _PreservingContextDeferred(defer.Deferred): def _wrap_callback(self, f): def g(res, *args, **kwargs): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = self._log_context + with PreserveLoggingContext(self._log_context): res = f(res, *args, **kwargs) return res return g -- cgit 1.5.1 From d6059bdd2ade632b3778d1f475a35ffd4a7242e9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 4 Dec 2015 11:34:05 +0000 Subject: Fix warnings --- synapse/app/homeserver.py | 12 +++++++++++- synapse/util/logcontext.py | 4 ++-- 2 files changed, 13 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index cd7a52ec07..58c679bbfd 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -499,13 +499,23 @@ class SynapseRequest(Request): self.start_time = int(time.time() * 1000) def finished_processing(self): + + try: + context = LoggingContext.current_context() + ru_utime, ru_stime = context.get_resource_usage() + except: + ru_utime, ru_stime = (0, 0) + self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + " Processed request: %dms (%dms, %dms)" + " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, self.authenticated_entity, int(time.time() * 1000) - self.start_time, + int(ru_utime * 1000), + int(ru_stime * 1000), self.sentLength, self.code, self.method, diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 6d7a6c3e2b..2633201528 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -23,6 +23,7 @@ try: import resource RUSAGE_THREAD = 1 resource.getrusage(RUSAGE_THREAD) + def get_thread_resource_usage(): return resource.getrusage(RUSAGE_THREAD) except: @@ -137,13 +138,12 @@ class LoggingContext(object): return if self.usage_start: - self.usage_end = get_thread_resource_usage() + self.usage_end = get_thread_resource_usage() def get_resource_usage(self): ru_utime = self.ru_utime ru_stime = self.ru_stime - start = self.usage_start if self.usage_start and threading.current_thread() is self.main_thread: current = get_thread_resource_usage() ru_utime += current.ru_utime - self.usage_start.ru_utime -- cgit 1.5.1 From 5231737369c6c5488cdfdcb76af8008fc8a2db07 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 4 Dec 2015 11:53:38 +0000 Subject: Add comments to explain why we are hardcoding RUSAGE_THREAD --- synapse/util/logcontext.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 2633201528..e4ce087afe 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -21,12 +21,20 @@ logger = logging.getLogger(__name__) try: import resource + + # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined + # to be 1 on linux so we hard code it. RUSAGE_THREAD = 1 + + # If the system doesn't support RUSAGE_THREAD then this should throw an + # exception. resource.getrusage(RUSAGE_THREAD) def get_thread_resource_usage(): return resource.getrusage(RUSAGE_THREAD) except: + # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we + # won't track resource usage by returning None. def get_thread_resource_usage(): return None -- cgit 1.5.1 From 3dd16308487d4b5f76d8b3f3e0bf5ce2a72aff22 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 7 Dec 2015 10:51:18 +0000 Subject: Add a setter for the current log context. Move the resource tracking inside that setter so that it is easier to make sure that the resource tracking isn't double counting the resource usage. --- synapse/util/logcontext.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index e4ce087afe..c20c89aa8f 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -87,13 +87,26 @@ class LoggingContext(object): """Get the current logging context from thread local storage""" return getattr(cls.thread_local, "current_context", cls.sentinel) + @classmethod + def set_current_context(cls, context): + """Set the current logging context in thread local storage + Args: + context(LoggingContext): The context to activate. + Returns: + The context that was previously active + """ + current = cls.current_context() + if current is not context: + current.stop() + cls.thread_local.current_context = context + context.start() + return current + def __enter__(self): """Enters this logging context into thread local storage""" if self.parent_context is not None: raise Exception("Attempt to enter logging context multiple times") - self.parent_context = self.current_context() - self.thread_local.current_context = self - self.start() + self.parent_context = self.set_current_context(self) return self def __exit__(self, type, value, traceback): @@ -102,17 +115,16 @@ class LoggingContext(object): Returns: None to avoid suppressing any exeptions that were thrown. """ - if self.thread_local.current_context is not self: - if self.thread_local.current_context is self.sentinel: + current = self.set_current_context(self.parent_context) + if current is not self: + if current is self.sentinel: logger.debug("Expected logging context %s has been lost", self) else: logger.warn( "Current logging context %s is not expected context %s", - self.thread_local.current_context, + current, self ) - self.thread_local.current_context = self.parent_context - self.stop() self.parent_context = None def __getattr__(self, name): @@ -194,17 +206,13 @@ class PreserveLoggingContext(object): def __enter__(self): """Captures the current logging context""" - self.current_context = LoggingContext.current_context() - if self.new_context is not self.current_context: - self.current_context.stop() - LoggingContext.thread_local.current_context = self.new_context + self.current_context = LoggingContext.set_current_context( + self.new_context + ) def __exit__(self, type, value, traceback): """Restores the current logging context""" - context = LoggingContext.thread_local.current_context - LoggingContext.thread_local.current_context = self.current_context - if context is not self.current_context: - self.current_context.start() + LoggingContext.set_current_context(self.current_context) if self.current_context is not LoggingContext.sentinel: if self.current_context.parent_context is None: logger.warn( -- cgit 1.5.1 From 6a5ff5f223c1b4311aa63574663c0335d0c6bd79 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 7 Dec 2015 17:56:11 +0000 Subject: Track the time spent in the database per request. and track the number of transactions that request started. --- synapse/app/homeserver.py | 7 ++++++- synapse/http/server.py | 15 +++++++++++++++ synapse/storage/_base.py | 9 +++++++-- synapse/storage/events.py | 2 +- synapse/util/logcontext.py | 9 +++++++++ 5 files changed, 38 insertions(+), 4 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 58c679bbfd..56bc52e9ca 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -503,12 +503,15 @@ class SynapseRequest(Request): try: context = LoggingContext.current_context() ru_utime, ru_stime = context.get_resource_usage() + db_txn_count = context.db_txn_count + db_txn_duration = context.db_txn_duration except: ru_utime, ru_stime = (0, 0) + db_txn_count, db_txn_duration = (0, 0) self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms)" + " Processed request: %dms (%dms, %dms) (%dms/%d)" " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, @@ -516,6 +519,8 @@ class SynapseRequest(Request): int(time.time() * 1000) - self.start_time, int(ru_utime * 1000), int(ru_stime * 1000), + int(db_txn_duration * 1000), + int(db_txn_count), self.sentLength, self.code, self.method, diff --git a/synapse/http/server.py b/synapse/http/server.py index 06fb53707b..c44bdfc888 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -61,6 +61,15 @@ response_ru_stime = metrics.register_distribution( "response_ru_stime", labels=["method", "servlet"] ) +response_db_txn_count = metrics.register_distribution( + "response_db_txn_count", labels=["method", "servlet"] +) + +response_db_txn_duration = metrics.register_distribution( + "response_db_txn_duration", labels=["method", "servlet"] +) + + _next_request_id = 0 @@ -235,6 +244,12 @@ class JsonResource(HttpServer, resource.Resource): response_ru_utime.inc_by(ru_utime, request.method, servlet_classname) response_ru_stime.inc_by(ru_stime, request.method, servlet_classname) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname + ) except: pass diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 218e708054..17a14e001c 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -214,7 +214,8 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - def _new_transaction(self, conn, desc, after_callbacks, func, *args, **kwargs): + def _new_transaction(self, conn, desc, after_callbacks, logging_context, + func, *args, **kwargs): start = time.time() * 1000 txn_id = self._TXN_ID @@ -277,6 +278,9 @@ class SQLBaseStore(object): end = time.time() * 1000 duration = end - start + if logging_context is not None: + logging_context.add_database_transaction(duration) + transaction_logger.debug("[TXN END] {%s} %f", name, duration) self._current_txn_total_time += duration @@ -302,7 +306,8 @@ class SQLBaseStore(object): current_context.copy_to(context) return self._new_transaction( - conn, desc, after_callbacks, func, *args, **kwargs + conn, desc, after_callbacks, current_context, + func, *args, **kwargs ) result = yield preserve_context_over_fn( diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7088f2709b..fc5725097c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -649,7 +649,7 @@ class EventsStore(SQLBaseStore): ] rows = self._new_transaction( - conn, "do_fetch", [], self._fetch_event_rows, event_ids + conn, "do_fetch", [], None, self._fetch_event_rows, event_ids ) row_dict = { diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index c20c89aa8f..d528ced55a 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -69,6 +69,9 @@ class LoggingContext(object): def stop(self): pass + def add_database_transaction(self, duration_ms): + pass + sentinel = Sentinel() def __init__(self, name=None): @@ -76,6 +79,8 @@ class LoggingContext(object): self.name = name self.ru_stime = 0. self.ru_utime = 0. + self.db_txn_count = 0 + self.db_txn_duration = 0. self.usage_start = None self.main_thread = threading.current_thread() @@ -171,6 +176,10 @@ class LoggingContext(object): return ru_utime, ru_stime + def add_database_transaction(self, duration_ms): + self.db_txn_count += 1 + self.db_txn_duration += duration_ms / 1000. + class LoggingContextFilter(logging.Filter): """Logging filter that adds values from the current logging context to each -- cgit 1.5.1 From 9ac417fa88906d70de6a7c6f94d40fe11fc6d2fa Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 22 Dec 2015 18:27:56 +0000 Subject: Add a cache for initialSync responses that expires after 5 minutes --- synapse/handlers/message.py | 24 +++++++++++- synapse/util/caches/snapshot_cache.py | 71 +++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 synapse/util/caches/snapshot_cache.py (limited to 'synapse/util') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ccdd3d8473..bef477b31e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,6 +22,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.types import UserID, RoomStreamToken, StreamToken from ._base import BaseHandler @@ -45,6 +46,7 @@ class MessageHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() + self.snapshot_cache = SnapshotCache() @defer.inlineCallbacks def get_message(self, msg_id=None, room_id=None, sender_id=None, @@ -326,9 +328,29 @@ class MessageHandler(BaseHandler): [serialize_event(c, now) for c in room_state.values()] ) - @defer.inlineCallbacks def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): + key = ( + user_id, + pagin_config.from_token, + pagin_config.to_token, + pagin_config.direction, + pagin_config.limit, + as_client_event, + include_archived, + ) + now_ms = self.clock.time_msec() + result = self.snapshot_cache.get(now_ms, key) + if result is not None: + return result + + return self.snapshot_cache.set(now_ms, key, self._snapshot_all_rooms( + user_id, pagin_config, as_client_event, include_archived + )) + + @defer.inlineCallbacks + def _snapshot_all_rooms(self, user_id=None, pagin_config=None, + as_client_event=True, include_archived=False): """Retrieve a snapshot of all rooms the user is invited or has joined. This snapshot may include messages for all rooms where the user is diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py new file mode 100644 index 0000000000..b19aca05ab --- /dev/null +++ b/synapse/util/caches/snapshot_cache.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.async import ObservableDeferred + + +class SnapshotCache(object): + + DURATION_MS = 5 * 60 * 1000 # Cache results for 2 minutes. + + def __init__(self): + self.pending_result_cache = {} # Request that haven't finished yet. + self.prev_result_cache = {} # The older requests that have finished. + self.next_result_cache = {} # The newer requests that have finished. + self.time_last_rotated_ms = 0 + + def rotate(self, time_now_ms): + # Rotate once if the cache duration has passed since the last rotation. + if time_now_ms - self.time_last_rotated_ms > self.DURATION_MS: + self.prev_result_cache = self.next_result_cache + self.next_result_cache = {} + self.time_last_rotated_ms += self.DURATION_MS + + # Rotate again if the cache duration has passed twice since the last + # rotation. + if time_now_ms - self.time_last_rotated_ms > self.DURATION_MS: + self.prev_result_cache = self.next_result_cache + self.next_result_cache = {} + self.time_last_rotated_ms = time_now_ms + + def get(self, time_now_ms, key): + self.rotate(time_now_ms) + # This cache is intended to deduplicate requests, so we expect it to be + # missed most of the time. So we just lookup the key in all of the + # dictionaries rather than trying to short circuit the lookup if the + # key is found. + result = self.prev_result_cache.get(key) + result = self.next_result_cache.get(key, result) + result = self.pending_result_cache.get(key, result) + if result is not None: + return result.observe() + + def set(self, time_now_ms, key, deferred): + self.rotate(time_now_ms) + + result = ObservableDeferred(deferred) + + self.pending_result_cache[key] = result + + def shuffle_along(r): + # When the deferred completes we shuffle it along to the first + # generation of the result cache. So that it will eventually + # expire from the rotation of that cache. + self.next_result_cache[key] = result + self.pending_result_cache.pop(key, None) + + result.observe().addBoth(shuffle_along) + + return result.observe() -- cgit 1.5.1 From 7fa71e32670aa0ed2b49d04fd3c66a72e8fbc1cf Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 23 Dec 2015 11:48:03 +0000 Subject: Add a unit test for the snapshot cache --- synapse/util/caches/snapshot_cache.py | 4 +-- tests/util/test_snapshot_cache.py | 60 +++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 tests/util/test_snapshot_cache.py (limited to 'synapse/util') diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py index b19aca05ab..8a7ca47a86 100644 --- a/synapse/util/caches/snapshot_cache.py +++ b/synapse/util/caches/snapshot_cache.py @@ -28,14 +28,14 @@ class SnapshotCache(object): def rotate(self, time_now_ms): # Rotate once if the cache duration has passed since the last rotation. - if time_now_ms - self.time_last_rotated_ms > self.DURATION_MS: + if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: self.prev_result_cache = self.next_result_cache self.next_result_cache = {} self.time_last_rotated_ms += self.DURATION_MS # Rotate again if the cache duration has passed twice since the last # rotation. - if time_now_ms - self.time_last_rotated_ms > self.DURATION_MS: + if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: self.prev_result_cache = self.next_result_cache self.next_result_cache = {} self.time_last_rotated_ms = time_now_ms diff --git a/tests/util/test_snapshot_cache.py b/tests/util/test_snapshot_cache.py new file mode 100644 index 0000000000..f58576c941 --- /dev/null +++ b/tests/util/test_snapshot_cache.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from .. import unittest + +from synapse.util.caches.snapshot_cache import SnapshotCache +from twisted.internet.defer import Deferred + +class SnapshotCacheTestCase(unittest.TestCase): + + def setUp(self): + self.cache = SnapshotCache() + self.cache.DURATION_MS = 1 + + def test_get_set(self): + # Check that getting a missing key returns None + self.assertEquals(self.cache.get(0, "key"), None) + + # Check that setting a key with a deferred returns + # a deferred that resolves when the initial deferred does + d = Deferred() + set_result = self.cache.set(0, "key", d) + self.assertIsNotNone(set_result) + self.assertFalse(set_result.called) + + # Check that getting the key before the deferred has resolved + # returns a deferred that resolves when the initial deferred does. + get_result_at_10 = self.cache.get(10, "key") + self.assertIsNotNone(get_result_at_10) + self.assertFalse(get_result_at_10.called) + + # Check that the returned deferreds resolve when the initial deferred + # does. + d.callback("v") + self.assertTrue(set_result.called) + self.assertTrue(get_result_at_10.called) + + # Check that getting the key after the deferred has resolved + # before the cache expires returns a resolved deferred. + get_result_at_11 = self.cache.get(11, "key") + self.assertIsNotNone(get_result_at_11) + self.assertTrue(get_result_at_11.called) + + # Check that getting the key after the deferred has resolved + # after the cache expires returns None + get_result_at_12 = self.cache.get(12, "key") + self.assertIsNone(get_result_at_12) -- cgit 1.5.1 From d12c00bdc311bd0685aa7e7e70f1aa7787317164 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 23 Dec 2015 15:18:11 +0000 Subject: Add some docstring explaining the snapshot cache does --- synapse/util/caches/snapshot_cache.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py index 8a7ca47a86..09f00afbc5 100644 --- a/synapse/util/caches/snapshot_cache.py +++ b/synapse/util/caches/snapshot_cache.py @@ -17,8 +17,28 @@ from synapse.util.async import ObservableDeferred class SnapshotCache(object): + """Cache for snapshots like the response of /initialSync. + The response of initialSync only has to be a recent snapshot of the + server state. It shouldn't matter to clients if it is a few minutes out + of date. - DURATION_MS = 5 * 60 * 1000 # Cache results for 2 minutes. + This caches a deferred response. Until the deferred completes it will be + returned from the cache. This means that if the client retries the request + while the response is still being computed, that original response will be + used rather than trying to compute a new response. + + Once the deferred completes it will removed from the cache after 5 minutes. + We delay removing it from the cache because a client retrying its request + could race with us finishing computing the response. + + Rather than tracking precisely how long something has been in the cache we + keep two generations of completed responses. Every 5 minutes discard the + old generation, move the new generation to the old generation, and set the + new generation to be empty. This means that a result will be in the cache + somewhere between 5 and 10 minutes. + """ + + DURATION_MS = 5 * 60 * 1000 # Cache results for 5 minutes. def __init__(self): self.pending_result_cache = {} # Request that haven't finished yet. @@ -51,6 +71,8 @@ class SnapshotCache(object): result = self.pending_result_cache.get(key, result) if result is not None: return result.observe() + else: + return None def set(self, time_now_ms, key, deferred): self.rotate(time_now_ms) -- cgit 1.5.1