summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/caches/response_cache.py88
-rw-r--r--synapse/util/file_consumer.py4
2 files changed, 76 insertions, 16 deletions
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..7f79333e96 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -31,6 +37,7 @@ class ResponseCache(object):
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
         self._metrics = cache_metrics.register_cache(
             "response_cache",
             size_callback=lambda: self.size(),
@@ -43,15 +50,21 @@ class ResponseCache(object):
     def get(self, key):
         """Look up the given key.
 
-        Returns a deferred which doesn't follow the synapse logcontext rules,
-        so you'll probably want to make_deferred_yieldable it.
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
 
         Args:
-            key (str):
+            key (hashable):
 
         Returns:
-            twisted.internet.defer.Deferred|None: None if there is no entry
-            for this key; otherwise a deferred result.
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
         """
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -68,19 +81,17 @@ class ResponseCache(object):
         you should wrap normal synapse deferreds with
         logcontext.run_in_background).
 
-        Returns a new Deferred which also doesn't follow the synapse logcontext
-        rules, so you will want to make_deferred_yieldable it
-
-        (TODO: before using this more widely, it might make sense to refactor
-        it and get() so that they do the necessary wrapping rather than having
-        to do it everywhere ResponseCache is used.)
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
 
         Args:
-            key (str):
-            deferred (twisted.internet.defer.Deferred):
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T):
 
         Returns:
-            twisted.internet.defer.Deferred
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
         """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
@@ -97,3 +108,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named paramters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 90a2608d6f..3c8a165331 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
 
 from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 
-import Queue
+from six.moves import queue
 
 
 class BackgroundFileConsumer(object):
@@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
 
         # Queue of slices of bytes to be written. When producer calls
         # unregister a final None is sent.
-        self._bytes_queue = Queue.Queue()
+        self._bytes_queue = queue.Queue()
 
         # Deferred that is resolved when finished writing
         self._finished_deferred = None