summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-05-03 00:26:33 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-05-03 00:26:33 +0100
commit50d5a97c1b3eb07deaa59f1d75f105415df0ba0d (patch)
treec8f980ae0a8c2e4d34d52e2ee7fb09383867a396 /synapse/util
parentMerge pull request #3166 from matrix-org/dbkr/postgres_doesnt_have_ifnull (diff)
parentmissing word :| (diff)
downloadsynapse-50d5a97c1b3eb07deaa59f1d75f105415df0ba0d.tar.xz
Merge branch 'master' into dinsic
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/caches/response_cache.py88
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/file_consumer.py4
-rw-r--r--synapse/util/logcontext.py1
4 files changed, 79 insertions, 18 deletions
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..7f79333e96 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -31,6 +37,7 @@ class ResponseCache(object):
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
         self._metrics = cache_metrics.register_cache(
             "response_cache",
             size_callback=lambda: self.size(),
@@ -43,15 +50,21 @@ class ResponseCache(object):
     def get(self, key):
         """Look up the given key.
 
-        Returns a deferred which doesn't follow the synapse logcontext rules,
-        so you'll probably want to make_deferred_yieldable it.
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
 
         Args:
-            key (str):
+            key (hashable):
 
         Returns:
-            twisted.internet.defer.Deferred|None: None if there is no entry
-            for this key; otherwise a deferred result.
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
         """
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -68,19 +81,17 @@ class ResponseCache(object):
         you should wrap normal synapse deferreds with
         logcontext.run_in_background).
 
-        Returns a new Deferred which also doesn't follow the synapse logcontext
-        rules, so you will want to make_deferred_yieldable it
-
-        (TODO: before using this more widely, it might make sense to refactor
-        it and get() so that they do the necessary wrapping rather than having
-        to do it everywhere ResponseCache is used.)
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
 
         Args:
-            key (str):
-            deferred (twisted.internet.defer.Deferred):
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T):
 
         Returns:
-            twisted.internet.defer.Deferred
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
         """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
@@ -97,3 +108,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named paramters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2ff46090a6..941d873ab8 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -16,7 +16,7 @@
 from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
 
 
-from sortedcontainers import SortedDict
+from blist import sorteddict
 import logging
 
 
@@ -35,7 +35,7 @@ class StreamChangeCache(object):
     def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
         self._max_size = int(max_size * CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
-        self._cache = SortedDict()
+        self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
         self.metrics = register_cache(self.name, self._cache)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d6f..3c8a165331 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
 
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 
-import Queue
+from six.moves import queue
 
 
 class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
 
         # Queue of slices of bytes to be written. When producer calls
         # unregister a final None is sent.
-        self._bytes_queue = Queue.Queue()
+        self._bytes_queue = queue.Queue()
 
         # Deferred that is resolved when finished writing
         self._finished_deferred = None
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index d660ec785b..d59adc236e 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -92,6 +92,7 @@ class LoggingContext(object):
 
         def __nonzero__(self):
             return False
+        __bool__ = __nonzero__  # python3
 
     sentinel = Sentinel()