diff options
author | Erik Johnston <erikj@jki.re> | 2016-08-02 15:21:37 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-02 15:21:37 +0100 |
commit | 7b0f6293f27c52194d86c3944a590d096e024f1b (patch) | |
tree | ebc1480decfa0f4063ec74628ae8e01bb84b5fc3 | |
parent | Print authorization header for federation_client.py (diff) | |
parent | Cache federation state responses (diff) | |
download | synapse-7b0f6293f27c52194d86c3944a590d096e024f1b.tar.xz |
Merge pull request #940 from matrix-org/erikj/fed_state_cache
Cache federation state responses
Diffstat (limited to '')
-rw-r--r-- | synapse/federation/federation_server.py | 66 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 7 | ||||
-rw-r--r-- | synapse/handlers/room.py | 4 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 13 |
5 files changed, 60 insertions, 32 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752fe..d15c7e1b40 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,10 +21,11 @@ from .units import Transaction, Edu from synapse.util.async import Linearizer from synapse.util.logutils import log_function +from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent import synapse.metrics -from synapse.api.errors import FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature @@ -48,9 +49,15 @@ class FederationServer(FederationBase): def __init__(self, hs): super(FederationServer, self).__init__(hs) + self.auth = hs.get_auth() + self._room_pdu_linearizer = Linearizer() self._server_linearizer = Linearizer() + # We cache responses to state queries, as they take a while and often + # come in waves. + self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -188,28 +195,45 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - with (yield self._server_linearizer.queue((origin, room_id))): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] + if not event_id: + raise NotImplementedError("Specify an event") + + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + result = self._state_resp_cache.get((room_id, event_id)) + if not result: + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self.response_cache.set( + (room_id, event_id), + self._on_context_state_request_compute(room_id, event_id) ) + else: + resp = yield result - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - else: - raise NotImplementedError("Specify an event") + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def _on_context_state_request_compute(self, room_id, event_id): + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1323235b62..187bfc4315 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -991,14 +991,9 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): + def get_state_for_pdu(self, room_id, event_id): yield run_on_reactor() - if do_auth: - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae44c7a556..bf6b1c1535 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler): class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache() - self.remote_list_request_cache = ResponseCache() + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) self.remote_list_cache = {} self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index be26a491ff..0ee4ebe504 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -138,7 +138,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache() + self.response_cache = ResponseCache(hs) def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 36686b479e..00af539880 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -24,9 +24,12 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self): + def __init__(self, hs, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. + self.clock = hs.get_clock() + self.timeout_sec = timeout_ms / 1000. + def get(self, key): result = self.pending_result_cache.get(key) if result is not None: @@ -39,7 +42,13 @@ class ResponseCache(object): self.pending_result_cache[key] = result def remove(r): - self.pending_result_cache.pop(key, None) + if self.timeout_sec: + self.clock.call_later( + self.timeout_sec, + self.pending_result_cache.pop, key, None, + ) + else: + self.pending_result_cache.pop(key, None) return r result.addBoth(remove) |