summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-08-02 15:21:37 +0100
committerGitHub <noreply@github.com>2016-08-02 15:21:37 +0100
commit7b0f6293f27c52194d86c3944a590d096e024f1b (patch)
treeebc1480decfa0f4063ec74628ae8e01bb84b5fc3
parentPrint authorization header for federation_client.py (diff)
parentCache federation state responses (diff)
downloadsynapse-7b0f6293f27c52194d86c3944a590d096e024f1b.tar.xz
Merge pull request #940 from matrix-org/erikj/fed_state_cache
Cache federation state responses
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_server.py66
-rw-r--r--synapse/handlers/federation.py7
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/sync.py2
-rw-r--r--synapse/util/caches/response_cache.py13
5 files changed, 60 insertions, 32 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 85f5e752fe..d15c7e1b40 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -21,10 +21,11 @@ from .units import Transaction, Edu
 
 from synapse.util.async import Linearizer
 from synapse.util.logutils import log_function
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.events import FrozenEvent
 import synapse.metrics
 
-from synapse.api.errors import FederationError, SynapseError
+from synapse.api.errors import AuthError, FederationError, SynapseError
 
 from synapse.crypto.event_signing import compute_event_signature
 
@@ -48,9 +49,15 @@ class FederationServer(FederationBase):
     def __init__(self, hs):
         super(FederationServer, self).__init__(hs)
 
+        self.auth = hs.get_auth()
+
         self._room_pdu_linearizer = Linearizer()
         self._server_linearizer = Linearizer()
 
+        # We cache responses to state queries, as they take a while and often
+        # come in waves.
+        self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
+
     def set_handler(self, handler):
         """Sets the handler that the replication layer will use to communicate
         receipt of new PDUs from other home servers. The required methods are
@@ -188,28 +195,45 @@ class FederationServer(FederationBase):
     @defer.inlineCallbacks
     @log_function
     def on_context_state_request(self, origin, room_id, event_id):
-        with (yield self._server_linearizer.queue((origin, room_id))):
-            if event_id:
-                pdus = yield self.handler.get_state_for_pdu(
-                    origin, room_id, event_id,
-                )
-                auth_chain = yield self.store.get_auth_chain(
-                    [pdu.event_id for pdu in pdus]
+        if not event_id:
+            raise NotImplementedError("Specify an event")
+
+        in_room = yield self.auth.check_host_in_room(room_id, origin)
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        result = self._state_resp_cache.get((room_id, event_id))
+        if not result:
+            with (yield self._server_linearizer.queue((origin, room_id))):
+                resp = yield self.response_cache.set(
+                    (room_id, event_id),
+                    self._on_context_state_request_compute(room_id, event_id)
                 )
+        else:
+            resp = yield result
 
-                for event in auth_chain:
-                    # We sign these again because there was a bug where we
-                    # incorrectly signed things the first time round
-                    if self.hs.is_mine_id(event.event_id):
-                        event.signatures.update(
-                            compute_event_signature(
-                                event,
-                                self.hs.hostname,
-                                self.hs.config.signing_key[0]
-                            )
-                        )
-            else:
-                raise NotImplementedError("Specify an event")
+        defer.returnValue((200, resp))
+
+    @defer.inlineCallbacks
+    def _on_context_state_request_compute(self, room_id, event_id):
+        pdus = yield self.handler.get_state_for_pdu(
+            room_id, event_id,
+        )
+        auth_chain = yield self.store.get_auth_chain(
+            [pdu.event_id for pdu in pdus]
+        )
+
+        for event in auth_chain:
+            # We sign these again because there was a bug where we
+            # incorrectly signed things the first time round
+            if self.hs.is_mine_id(event.event_id):
+                event.signatures.update(
+                    compute_event_signature(
+                        event,
+                        self.hs.hostname,
+                        self.hs.config.signing_key[0]
+                    )
+                )
 
         defer.returnValue((200, {
             "pdus": [pdu.get_pdu_json() for pdu in pdus],
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1323235b62..187bfc4315 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -991,14 +991,9 @@ class FederationHandler(BaseHandler):
         defer.returnValue(None)
 
     @defer.inlineCallbacks
-    def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
+    def get_state_for_pdu(self, room_id, event_id):
         yield run_on_reactor()
 
-        if do_auth:
-            in_room = yield self.auth.check_host_in_room(room_id, origin)
-            if not in_room:
-                raise AuthError(403, "Host not in room.")
-
         state_groups = yield self.store.get_state_groups(
             room_id, [event_id]
         )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ae44c7a556..bf6b1c1535 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler):
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache()
-        self.remote_list_request_cache = ResponseCache()
+        self.response_cache = ResponseCache(hs)
+        self.remote_list_request_cache = ResponseCache(hs)
         self.remote_list_cache = {}
         self.fetch_looping_call = hs.get_clock().looping_call(
             self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index be26a491ff..0ee4ebe504 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -138,7 +138,7 @@ class SyncHandler(object):
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache()
+        self.response_cache = ResponseCache(hs)
 
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 36686b479e..00af539880 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -24,9 +24,12 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self):
+    def __init__(self, hs, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
+        self.clock = hs.get_clock()
+        self.timeout_sec = timeout_ms / 1000.
+
     def get(self, key):
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -39,7 +42,13 @@ class ResponseCache(object):
         self.pending_result_cache[key] = result
 
         def remove(r):
-            self.pending_result_cache.pop(key, None)
+            if self.timeout_sec:
+                self.clock.call_later(
+                    self.timeout_sec,
+                    self.pending_result_cache.pop, key, None,
+                )
+            else:
+                self.pending_result_cache.pop(key, None)
             return r
 
         result.addBoth(remove)