summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2018-04-20 11:31:17 +0100
committerGitHub <noreply@github.com>2018-04-20 11:31:17 +0100
commit11a67b7c9db0da43eb5d9a8df4a0410cbf031d6a (patch)
tree5c4f2338436ddfc28b4293bf658921519a6168dc /synapse
parentMerge pull request #3117 from matrix-org/rav/refactor_have_events (diff)
parentReinstate linearizer for federation_server.on_context_state_request (diff)
downloadsynapse-11a67b7c9db0da43eb5d9a8df4a0410cbf031d6a.tar.xz
Merge pull request #3093 from matrix-org/rav/response_cache_wrap
Refactor ResponseCache usage
Diffstat (limited to 'synapse')
-rw-r--r--synapse/appservice/api.py8
-rw-r--r--synapse/federation/federation_server.py22
-rw-r--r--synapse/handlers/room_list.py38
-rw-r--r--synapse/handlers/sync.py16
-rw-r--r--synapse/replication/http/send_event.py18
-rw-r--r--synapse/util/caches/response_cache.py88
6 files changed, 111 insertions, 79 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 11e9c37c63..00efff1464 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.events.utils import serialize_event
-from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
                 defer.returnValue(None)
 
         key = (service.id, protocol)
-        result = self.protocol_meta_cache.get(key)
-        if not result:
-            result = self.protocol_meta_cache.set(
-                key, preserve_fn(_get)()
-            )
-        return make_deferred_yieldable(result)
+        return self.protocol_meta_cache.wrap(key, _get)
 
     @defer.inlineCallbacks
     def push_bulk(self, service, events, txn_id=None):
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e4ce037acf..12843fe179 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -30,7 +30,6 @@ import synapse.metrics
 from synapse.types import get_domain_from_id
 from synapse.util import async
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.logutils import log_function
 
 # when processing incoming transactions, we try to handle multiple rooms in
@@ -212,16 +211,17 @@ class FederationServer(FederationBase):
         if not in_room:
             raise AuthError(403, "Host not in room.")
 
-        result = self._state_resp_cache.get((room_id, event_id))
-        if not result:
-            with (yield self._server_linearizer.queue((origin, room_id))):
-                d = self._state_resp_cache.set(
-                    (room_id, event_id),
-                    preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
-                )
-                resp = yield make_deferred_yieldable(d)
-        else:
-            resp = yield make_deferred_yieldable(result)
+        # we grab the linearizer to protect ourselves from servers which hammer
+        # us. In theory we might already have the response to this query
+        # in the cache so we could return it without waiting for the linearizer
+        # - but that's non-trivial to get right, and anyway somewhat defeats
+        # the point of the linearizer.
+        with (yield self._server_linearizer.queue((origin, room_id))):
+            resp = yield self._state_resp_cache.wrap(
+                (room_id, event_id),
+                self._on_context_state_request_compute,
+                room_id, event_id,
+            )
 
         defer.returnValue((200, resp))
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 8028d793c2..add3f9b009 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -20,7 +20,6 @@ from ._base import BaseHandler
 from synapse.api.constants import (
     EventTypes, JoinRules,
 )
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
@@ -78,18 +77,11 @@ class RoomListHandler(BaseHandler):
             )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            logger.info("No cached result, calculating one.")
-            result = self.response_cache.set(
-                key,
-                preserve_fn(self._get_public_room_list)(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        else:
-            logger.info("Using cached deferred result.")
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
@@ -423,18 +415,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 06d17ab20c..c6946831ab 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,7 +15,7 @@
 
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
-from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
@@ -180,15 +180,11 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        result = self.response_cache.get(sync_config.request_key)
-        if not result:
-            result = self.response_cache.set(
-                sync_config.request_key,
-                preserve_fn(self._wait_for_sync_for_user)(
-                    sync_config, since_token, timeout, full_state
-                )
-            )
-        return make_deferred_yieldable(result)
+        return self.response_cache.wrap(
+            sync_config.request_key,
+            self._wait_for_sync_for_user,
+            sync_config, since_token, timeout, full_state,
+        )
 
     @defer.inlineCallbacks
     def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c6a6551d24..a9baa2c1c3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
 
@@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet):
         self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
 
     def on_PUT(self, request, event_id):
-        result = self.response_cache.get(event_id)
-        if not result:
-            result = self.response_cache.set(
-                event_id,
-                self._handle_request(request)
-            )
-        else:
-            logger.warn("Returning cached response")
-        return make_deferred_yieldable(result)
-
-    @preserve_fn
+        return self.response_cache.wrap(
+            event_id,
+            self._handle_request,
+            request
+        )
+
     @defer.inlineCallbacks
     def _handle_request(self, request):
         with Measure(self.clock, "repl_send_event_parse"):
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 066fa423fd..7f79333e96 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,9 +12,15 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
+from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
 from synapse.util.caches import metrics as cache_metrics
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
+logger = logging.getLogger(__name__)
 
 
 class ResponseCache(object):
@@ -31,6 +37,7 @@ class ResponseCache(object):
         self.clock = hs.get_clock()
         self.timeout_sec = timeout_ms / 1000.
 
+        self._name = name
         self._metrics = cache_metrics.register_cache(
             "response_cache",
             size_callback=lambda: self.size(),
@@ -43,15 +50,21 @@ class ResponseCache(object):
     def get(self, key):
         """Look up the given key.
 
-        Returns a deferred which doesn't follow the synapse logcontext rules,
-        so you'll probably want to make_deferred_yieldable it.
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if the request has completed, the actual
+        result. You will probably want to make_deferred_yieldable the result.
+
+        If there is no entry for the key, returns None. It is worth noting that
+        this means there is no way to distinguish a completed result of None
+        from an absent cache entry.
 
         Args:
-            key (str):
+            key (hashable):
 
         Returns:
-            twisted.internet.defer.Deferred|None: None if there is no entry
-            for this key; otherwise a deferred result.
+            twisted.internet.defer.Deferred|None|E: None if there is no entry
+            for this key; otherwise either a deferred result or the result
+            itself.
         """
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -68,19 +81,17 @@ class ResponseCache(object):
         you should wrap normal synapse deferreds with
         logcontext.run_in_background).
 
-        Returns a new Deferred which also doesn't follow the synapse logcontext
-        rules, so you will want to make_deferred_yieldable it
-
-        (TODO: before using this more widely, it might make sense to refactor
-        it and get() so that they do the necessary wrapping rather than having
-        to do it everywhere ResponseCache is used.)
+        Can return either a new Deferred (which also doesn't follow the synapse
+        logcontext rules), or, if *deferred* was already complete, the actual
+        result. You will probably want to make_deferred_yieldable the result.
 
         Args:
-            key (str):
-            deferred (twisted.internet.defer.Deferred):
+            key (hashable):
+            deferred (twisted.internet.defer.Deferred[T):
 
         Returns:
-            twisted.internet.defer.Deferred
+            twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
+                result.
         """
         result = ObservableDeferred(deferred, consumeErrors=True)
         self.pending_result_cache[key] = result
@@ -97,3 +108,52 @@ class ResponseCache(object):
 
         result.addBoth(remove)
         return result.observe()
+
+    def wrap(self, key, callback, *args, **kwargs):
+        """Wrap together a *get* and *set* call, taking care of logcontexts
+
+        First looks up the key in the cache, and if it is present makes it
+        follow the synapse logcontext rules and returns it.
+
+        Otherwise, makes a call to *callback(*args, **kwargs)*, which should
+        follow the synapse logcontext rules, and adds the result to the cache.
+
+        Example usage:
+
+            @defer.inlineCallbacks
+            def handle_request(request):
+                # etc
+                defer.returnValue(result)
+
+            result = yield response_cache.wrap(
+                key,
+                handle_request,
+                request,
+            )
+
+        Args:
+            key (hashable): key to get/set in the cache
+
+            callback (callable): function to call if the key is not found in
+                the cache
+
+            *args: positional parameters to pass to the callback, if it is used
+
+            **kwargs: named paramters to pass to the callback, if it is used
+
+        Returns:
+            twisted.internet.defer.Deferred: yieldable result
+        """
+        result = self.get(key)
+        if not result:
+            logger.info("[%s]: no cached result for [%s], calculating new one",
+                        self._name, key)
+            d = run_in_background(callback, *args, **kwargs)
+            result = self.set(key, d)
+        elif not isinstance(result, defer.Deferred) or result.called:
+            logger.info("[%s]: using completed cached result for [%s]",
+                        self._name, key)
+        else:
+            logger.info("[%s]: using incomplete cached result for [%s]",
+                        self._name, key)
+        return make_deferred_yieldable(result)