diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/appservice/api.py | 4 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 8 | ||||
-rw-r--r-- | synapse/handlers/initial_sync.py | 10 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 4 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 50 |
7 files changed, 46 insertions, 34 deletions
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index c526c28b93..e8f0793795 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -14,7 +14,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Tuple from prometheus_client import Counter @@ -93,7 +93,7 @@ class ApplicationServiceApi(SimpleHttpClient): self.protocol_meta_cache = ResponseCache( hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS - ) + ) # type: ResponseCache[Tuple[str, str]] async def query_user(self, service, user_id): if service.url is None: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e8039e244c..23278e36b7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -116,7 +116,7 @@ class FederationServer(FederationBase): # We cache results for transaction with the same ID self._transaction_resp_cache = ResponseCache( hs, "fed_txn_handler", timeout_ms=30000 - ) + ) # type: ResponseCache[Tuple[str, str]] self.transaction_actions = TransactionActions(self.store) @@ -124,10 +124,12 @@ class FederationServer(FederationBase): # We cache responses to state queries, as they take a while and often # come in waves. - self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) + self._state_resp_cache = ResponseCache( + hs, "state_resp", timeout_ms=30000 + ) # type: ResponseCache[Tuple[str, str]] self._state_ids_resp_cache = ResponseCache( hs, "state_ids_resp", timeout_ms=30000 - ) + ) # type: ResponseCache[Tuple[str, str]] self._federation_metrics_domains = ( hs.get_config().federation.federation_metrics_domains diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 39a85801c1..98075f48d2 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -14,7 +14,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional, Tuple from twisted.internet import defer @@ -47,12 +47,14 @@ class InitialSyncHandler(BaseHandler): self.state = hs.get_state_handler() self.clock = hs.get_clock() self.validator = EventValidator() - self.snapshot_cache = ResponseCache(hs, "initial_sync_cache") + self.snapshot_cache = ResponseCache( + hs, "initial_sync_cache" + ) # type: ResponseCache[Tuple[str, Optional[StreamToken], Optional[StreamToken], str, Optional[int], bool, bool]] self._event_serializer = hs.get_event_client_serializer() self.storage = hs.get_storage() self.state_store = self.storage.state - def snapshot_all_rooms( + async def snapshot_all_rooms( self, user_id: str, pagin_config: PaginationConfig, @@ -84,7 +86,7 @@ class InitialSyncHandler(BaseHandler): include_archived, ) - return self.snapshot_cache.wrap( + return await self.snapshot_cache.wrap( key, self._snapshot_all_rooms, user_id, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 1d04d41e98..93ed51063a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -120,7 +120,7 @@ class RoomCreationHandler(BaseHandler): # subsequent requests self._upgrade_response_cache = ResponseCache( hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS - ) + ) # type: ResponseCache[Tuple[str, str]] self._server_notices_mxid = hs.config.server_notices_mxid self.third_party_event_rules = hs.get_third_party_event_rules() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6fb8332f93..a306631094 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -243,7 +243,9 @@ class SyncHandler: self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache(hs, "sync") + self.response_cache = ResponseCache( + hs, "sync" + ) # type: ResponseCache[Tuple[Any, ...]] self.state = hs.get_state_handler() self.auth = hs.get_auth() self.storage = hs.get_storage() diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 64edadb624..2b3972cb14 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -92,7 +92,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): if self.CACHE: self.response_cache = ResponseCache( hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000 - ) + ) # type: ResponseCache[str] # We reserve `instance_name` as a parameter to sending requests, so we # assert here that sub classes don't try and use the name. diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index df1a721add..32228f42ee 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar from twisted.internet import defer @@ -20,10 +21,15 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches import register_cache +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) +T = TypeVar("T") + -class ResponseCache: +class ResponseCache(Generic[T]): """ This caches a deferred response. Until the deferred completes it will be returned from the cache. This means that if the client retries the request @@ -31,8 +37,9 @@ class ResponseCache: used rather than trying to compute a new response. """ - def __init__(self, hs, name, timeout_ms=0): - self.pending_result_cache = {} # Requests that haven't finished yet. + def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0): + # Requests that haven't finished yet. + self.pending_result_cache = {} # type: Dict[T, ObservableDeferred] self.clock = hs.get_clock() self.timeout_sec = timeout_ms / 1000.0 @@ -40,13 +47,13 @@ class ResponseCache: self._name = name self._metrics = register_cache("response_cache", name, self, resizable=False) - def size(self): + def size(self) -> int: return len(self.pending_result_cache) - def __len__(self): + def __len__(self) -> int: return self.size() - def get(self, key): + def get(self, key: T) -> Optional[defer.Deferred]: """Look up the given key. Can return either a new Deferred (which also doesn't follow the synapse @@ -58,12 +65,11 @@ class ResponseCache: from an absent cache entry. Args: - key (hashable): + key: key to get/set in the cache Returns: - twisted.internet.defer.Deferred|None|E: None if there is no entry - for this key; otherwise either a deferred result or the result - itself. + None if there is no entry for this key; otherwise a deferred which + resolves to the result. """ result = self.pending_result_cache.get(key) if result is not None: @@ -73,7 +79,7 @@ class ResponseCache: self._metrics.inc_misses() return None - def set(self, key, deferred): + def set(self, key: T, deferred: defer.Deferred) -> defer.Deferred: """Set the entry for the given key to the given deferred. *deferred* should run its callbacks in the sentinel logcontext (ie, @@ -85,12 +91,11 @@ class ResponseCache: result. You will probably want to make_deferred_yieldable the result. Args: - key (hashable): - deferred (twisted.internet.defer.Deferred[T): + key: key to get/set in the cache + deferred: The deferred which resolves to the result. Returns: - twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual - result. + A new deferred which resolves to the actual result. """ result = ObservableDeferred(deferred, consumeErrors=True) self.pending_result_cache[key] = result @@ -107,7 +112,9 @@ class ResponseCache: result.addBoth(remove) return result.observe() - def wrap(self, key, callback, *args, **kwargs): + def wrap( + self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any + ) -> defer.Deferred: """Wrap together a *get* and *set* call, taking care of logcontexts First looks up the key in the cache, and if it is present makes it @@ -118,21 +125,20 @@ class ResponseCache: Example usage: - @defer.inlineCallbacks - def handle_request(request): + async def handle_request(request): # etc return result - result = yield response_cache.wrap( + result = await response_cache.wrap( key, handle_request, request, ) Args: - key (hashable): key to get/set in the cache + key: key to get/set in the cache - callback (callable): function to call if the key is not found in + callback: function to call if the key is not found in the cache *args: positional parameters to pass to the callback, if it is used @@ -140,7 +146,7 @@ class ResponseCache: **kwargs: named parameters to pass to the callback, if it is used Returns: - twisted.internet.defer.Deferred: yieldable result + Deferred which resolves to the result """ result = self.get(key) if not result: |