diff options
author | Daniel Wagner-Hall <daniel@matrix.org> | 2015-08-18 14:43:44 +0100 |
---|---|---|
committer | Daniel Wagner-Hall <daniel@matrix.org> | 2015-08-18 14:43:44 +0100 |
commit | 1469141023fb01b99e1d1bbb699a4dee803149aa (patch) | |
tree | f6a3afbafa8a1cb8267a754a5f6e4105c34838d4 /synapse | |
parent | Remove accidentally added file (diff) | |
parent | Typo (diff) | |
download | synapse-1469141023fb01b99e1d1bbb699a4dee803149aa.tar.xz |
Merge branch 'develop' into auth
Diffstat (limited to 'synapse')
31 files changed, 1236 insertions, 448 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7736d14fb5..f5e346cdbc 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,7 +23,7 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError -from synapse.util.expiringcache import ExpiringCache +from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent import synapse.metrics @@ -134,6 +134,36 @@ class FederationClient(FederationBase): destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail ) + @log_function + def query_client_keys(self, destination, content): + """Query device keys for a device hosted on a remote server. + + Args: + destination (str): Domain name of the remote homeserver + content (dict): The query content. + + Returns: + a Deferred which will eventually yield a JSON object from the + response + """ + sent_queries_counter.inc("client_device_keys") + return self.transport_layer.query_client_keys(destination, content) + + @log_function + def claim_client_keys(self, destination, content): + """Claims one-time keys for a device hosted on a remote server. + + Args: + destination (str): Domain name of the remote homeserver + content (dict): The query content. + + Returns: + a Deferred which will eventually yield a JSON object from the + response + """ + sent_queries_counter.inc("client_one_time_keys") + return self.transport_layer.claim_client_keys(destination, content) + @defer.inlineCallbacks @log_function def backfill(self, dest, context, limit, extremities): diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index cd79e23f4b..725c6f3fa5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,6 +27,7 @@ from synapse.api.errors import FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature +import simplejson as json import logging @@ -314,6 +315,48 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function + def on_query_client_keys(self, origin, content): + query = [] + for user_id, device_ids in content.get("device_keys", {}).items(): + if not device_ids: + query.append((user_id, None)) + else: + for device_id in device_ids: + query.append((user_id, device_id)) + + results = yield self.store.get_e2e_device_keys(query) + + json_result = {} + for user_id, device_keys in results.items(): + for device_id, json_bytes in device_keys.items(): + json_result.setdefault(user_id, {})[device_id] = json.loads( + json_bytes + ) + + defer.returnValue({"device_keys": json_result}) + + @defer.inlineCallbacks + @log_function + def on_claim_client_keys(self, origin, content): + query = [] + for user_id, device_keys in content.get("one_time_keys", {}).items(): + for device_id, algorithm in device_keys.items(): + query.append((user_id, device_id, algorithm)) + + results = yield self.store.claim_e2e_one_time_keys(query) + + json_result = {} + for user_id, device_keys in results.items(): + for device_id, keys in device_keys.items(): + for key_id, json_bytes in keys.items(): + json_result.setdefault(user_id, {})[device_id] = { + key_id: json.loads(json_bytes) + } + + defer.returnValue({"one_time_keys": json_result}) + + @defer.inlineCallbacks + @log_function def on_get_missing_events(self, origin, room_id, earliest_events, latest_events, limit, min_depth): missing_events = yield self.handler.on_get_missing_events( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 610a4c3163..ced703364b 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -224,6 +224,76 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function + def query_client_keys(self, destination, query_content): + """Query the device keys for a list of user ids hosted on a remote + server. + + Request: + { + "device_keys": { + "<user_id>": ["<device_id>"] + } } + + Response: + { + "device_keys": { + "<user_id>": { + "<device_id>": {...} + } } } + + Args: + destination(str): The server to query. + query_content(dict): The user ids to query. + Returns: + A dict containg the device keys. + """ + path = PREFIX + "/user/keys/query" + + content = yield self.client.post_json( + destination=destination, + path=path, + data=query_content, + ) + defer.returnValue(content) + + @defer.inlineCallbacks + @log_function + def claim_client_keys(self, destination, query_content): + """Claim one-time keys for a list of devices hosted on a remote server. + + Request: + { + "one_time_keys": { + "<user_id>": { + "<device_id>": "<algorithm>" + } } } + + Response: + { + "device_keys": { + "<user_id>": { + "<device_id>": { + "<algorithm>:<key_id>": "<key_base64>" + } } } } + + Args: + destination(str): The server to query. + query_content(dict): The user ids to query. + Returns: + A dict containg the one-time keys. + """ + + path = PREFIX + "/user/keys/claim" + + content = yield self.client.post_json( + destination=destination, + path=path, + data=query_content, + ) + defer.returnValue(content) + + @defer.inlineCallbacks + @log_function def get_missing_events(self, destination, room_id, earliest_events, latest_events, limit, min_depth): path = PREFIX + "/get_missing_events/%s" % (room_id,) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index bad93c6b2f..36f250e1a3 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -325,6 +325,24 @@ class FederationInviteServlet(BaseFederationServlet): defer.returnValue((200, content)) +class FederationClientKeysQueryServlet(BaseFederationServlet): + PATH = "/user/keys/query" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query): + response = yield self.handler.on_query_client_keys(origin, content) + defer.returnValue((200, response)) + + +class FederationClientKeysClaimServlet(BaseFederationServlet): + PATH = "/user/keys/claim" + + @defer.inlineCallbacks + def on_POST(self, origin, content, query): + response = yield self.handler.on_claim_client_keys(origin, content) + defer.returnValue((200, response)) + + class FederationQueryAuthServlet(BaseFederationServlet): PATH = "/query_auth/([^/]*)/([^/]*)" @@ -373,4 +391,6 @@ SERVLET_CLASSES = ( FederationQueryAuthServlet, FederationGetMissingEventsServlet, FederationEventAuthServlet, + FederationClientKeysQueryServlet, + FederationClientKeysClaimServlet, ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f7155fd8d3..1e3dccf5a8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -229,15 +229,15 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): - states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + event_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, None), + ) ) - events_and_states = zip(events, states) - - def redact_disallowed(event_and_state): - event, state = event_and_state - + def redact_disallowed(event, state): if not state: return event @@ -271,11 +271,10 @@ class FederationHandler(BaseHandler): return event - res = map(redact_disallowed, events_and_states) - - logger.info("_filter_events_for_server %r", res) - - defer.returnValue(res) + defer.returnValue([ + redact_disallowed(e, event_to_state[e.event_id]) + for e in events + ]) @log_function @defer.inlineCallbacks @@ -503,7 +502,7 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) states = yield defer.gatherResults([ - self.state_handler.resolve_state_groups([e]) + self.state_handler.resolve_state_groups(room_id, [e]) for e in event_ids ]) states = dict(zip(event_ids, [s[1] for s in states])) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9d6d4f0978..f12465fa2c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -137,15 +137,15 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): - states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + event_id_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) ) - events_and_states = zip(events, states) - - def allowed(event_and_state): - event, state = event_and_state - + def allowed(event, state): if event.type == EventTypes.RoomHistoryVisibility: return True @@ -175,10 +175,10 @@ class MessageHandler(BaseHandler): return True - events_and_states = filter(allowed, events_and_states) defer.returnValue([ - ev - for ev, _ in events_and_states + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) ]) @defer.inlineCallbacks @@ -401,10 +401,14 @@ class MessageHandler(BaseHandler): except: logger.exception("Failed to get snapshot") - yield defer.gatherResults( - [handle_room(e) for e in room_list], - consumeErrors=True - ).addErrback(unwrapFirstError) + # Only do N rooms at once + n = 5 + d_list = [handle_room(e) for e in room_list] + for i in range(0, len(d_list), n): + yield defer.gatherResults( + d_list[i:i + n], + consumeErrors=True + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, @@ -456,20 +460,14 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def get_presence(): - presence_defs = yield defer.DeferredList( - [ - presence_handler.get_state( - target_user=UserID.from_string(m.user_id), - auth_user=auth_user, - as_event=True, - check_auth=False, - ) - for m in room_members - ], - consumeErrors=True, + states = yield presence_handler.get_states( + target_users=[UserID.from_string(m.user_id) for m in room_members], + auth_user=auth_user, + as_event=True, + check_auth=False, ) - defer.returnValue([p for success, p in presence_defs if success]) + defer.returnValue(states.values()) receipts_handler = self.hs.get_handlers().receipts_handler diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 341a516da2..e91e81831e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -192,6 +192,20 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_state(self, target_user, auth_user, as_event=False, check_auth=True): + """Get the current presence state of the given user. + + Args: + target_user (UserID): The user whose presence we want + auth_user (UserID): The user requesting the presence, used for + checking if said user is allowed to see the persence of the + `target_user` + as_event (bool): Format the return as an event or not? + check_auth (bool): Perform the auth checks or not? + + Returns: + dict: The presence state of the `target_user`, whose format depends + on the `as_event` argument. + """ if self.hs.is_mine(target_user): if check_auth: visible = yield self.is_presence_visible( @@ -233,6 +247,81 @@ class PresenceHandler(BaseHandler): defer.returnValue(state) @defer.inlineCallbacks + def get_states(self, target_users, auth_user, as_event=False, check_auth=True): + """A batched version of the `get_state` method that accepts a list of + `target_users` + + Args: + target_users (list): The list of UserID's whose presence we want + auth_user (UserID): The user requesting the presence, used for + checking if said user is allowed to see the persence of the + `target_users` + as_event (bool): Format the return as an event or not? + check_auth (bool): Perform the auth checks or not? + + Returns: + dict: A mapping from user -> presence_state + """ + local_users, remote_users = partitionbool( + target_users, + lambda u: self.hs.is_mine(u) + ) + + if check_auth: + for user in local_users: + visible = yield self.is_presence_visible( + observer_user=auth_user, + observed_user=user + ) + + if not visible: + raise SynapseError(404, "Presence information not visible") + + results = {} + if local_users: + for user in local_users: + if user in self._user_cachemap: + results[user] = self._user_cachemap[user].get_state() + + local_to_user = {u.localpart: u for u in local_users} + + states = yield self.store.get_presence_states( + [u.localpart for u in local_users if u not in results] + ) + + for local_part, state in states.items(): + if state is None: + continue + res = {"presence": state["state"]} + if "status_msg" in state and state["status_msg"]: + res["status_msg"] = state["status_msg"] + results[local_to_user[local_part]] = res + + for user in remote_users: + # TODO(paul): Have remote server send us permissions set + results[user] = self._get_or_offline_usercache(user).get_state() + + for state in results.values(): + if "last_active" in state: + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + if as_event: + for user, state in results.items(): + content = state + content["user_id"] = user.to_string() + + if "last_active" in content: + content["last_active_ago"] = int( + self._clock.time_msec() - content.pop("last_active") + ) + + results[user] = {"type": "m.presence", "content": content} + + defer.returnValue(results) + + @defer.inlineCallbacks @log_function def set_state(self, target_user, auth_user, state): # return diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6cff6230c1..7206ae23d7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -294,15 +294,15 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def _filter_events_for_client(self, user_id, room_id, events): - states = yield self.store.get_state_for_events( - room_id, [e.event_id for e in events], + event_id_to_state = yield self.store.get_state_for_events( + room_id, frozenset(e.event_id for e in events), + types=( + (EventTypes.RoomHistoryVisibility, ""), + (EventTypes.Member, user_id), + ) ) - events_and_states = zip(events, states) - - def allowed(event_and_state): - event, state = event_and_state - + def allowed(event, state): if event.type == EventTypes.RoomHistoryVisibility: return True @@ -331,10 +331,11 @@ class SyncHandler(BaseHandler): return membership == Membership.INVITE return True - events_and_states = filter(allowed, events_and_states) + defer.returnValue([ - ev - for ev, _ in events_and_states + event + for event in events + if allowed(event, event_id_to_state[event.event_id]) ]) @defer.inlineCallbacks diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index ed47e701e7..854e17a473 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -16,7 +16,7 @@ from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError -from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool +from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone @@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter( ) -class MatrixFederationHttpAgent(_AgentBase): - - def __init__(self, reactor, pool=None): - _AgentBase.__init__(self, reactor, pool) - - def request(self, destination, endpoint, method, path, params, query, - headers, body_producer): - - outgoing_requests_counter.inc(method) - - host = b"" - port = 0 - fragment = b"" - - parsed_URI = _URI(b"http", destination, host, port, path, params, - query, fragment) - - # Set the connection pool key to be the destination. - key = destination - - d = self._requestWithEndpoint(key, endpoint, method, parsed_URI, - headers, body_producer, - parsed_URI.originForm) - - def _cb(response): - incoming_responses_counter.inc(method, response.code) - return response - - def _eb(failure): - incoming_responses_counter.inc(method, "ERR") - return failure +class MatrixFederationEndpointFactory(object): + def __init__(self, hs): + self.tls_context_factory = hs.tls_context_factory - d.addCallbacks(_cb, _eb) + def endpointForURI(self, uri): + destination = uri.netloc - return d + return matrix_federation_endpoint( + reactor, destination, timeout=10, + ssl_context_factory=self.tls_context_factory + ) class MatrixFederationHttpClient(object): @@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object): self.server_name = hs.hostname pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 10 - self.agent = MatrixFederationHttpAgent(reactor, pool=pool) + self.agent = Agent.usingEndpointFactory( + reactor, MatrixFederationEndpointFactory(hs), pool=pool + ) self.clock = hs.get_clock() self.version_string = hs.version_string - self._next_id = 1 + def _create_url(self, destination, path_bytes, param_bytes, query_bytes): + return urlparse.urlunparse( + ("matrix", destination, path_bytes, param_bytes, query_bytes, "") + ) + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"Host"] = [destination] - url_bytes = urlparse.urlunparse( - ("", "", path_bytes, param_bytes, query_bytes, "",) + url_bytes = self._create_url( + destination, path_bytes, param_bytes, query_bytes ) txn_id = "%s-O-%s" % (method, self._next_id) @@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object): # (once we have reliable transactions in place) retries_left = 5 - endpoint = preserve_context_over_fn( - self._getEndpoint, reactor, destination + http_url_bytes = urlparse.urlunparse( + ("", "", path_bytes, param_bytes, query_bytes, "") ) log_result = None @@ -148,17 +130,14 @@ class MatrixFederationHttpClient(object): while True: producer = None if body_callback: - producer = body_callback(method, url_bytes, headers_dict) + producer = body_callback(method, http_url_bytes, headers_dict) try: def send_request(): - request_deferred = self.agent.request( - destination, - endpoint, + request_deferred = preserve_context_over_fn( + self.agent.request, method, - path_bytes, - param_bytes, - query_bytes, + url_bytes, Headers(headers_dict), producer ) @@ -452,12 +431,6 @@ class MatrixFederationHttpClient(object): defer.returnValue((length, headers)) - def _getEndpoint(self, reactor, destination): - return matrix_federation_endpoint( - reactor, destination, timeout=10, - ssl_context_factory=self.hs.tls_context_factory - ) - class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 9233ea3da9..0be9772991 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,8 +18,12 @@ from __future__ import absolute_import import logging from resource import getrusage, getpagesize, RUSAGE_SELF +import functools import os import stat +import time + +from twisted.internet import reactor from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric @@ -144,3 +148,28 @@ def _process_fds(): return counts get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) + +reactor_metrics = get_metrics_for("reactor") +tick_time = reactor_metrics.register_distribution("tick_time") +pending_calls_metric = reactor_metrics.register_distribution("pending_calls") + + +def runUntilCurrentTimer(func): + + @functools.wraps(func) + def f(*args, **kwargs): + pending_calls = len(reactor.getDelayedCalls()) + start = time.time() * 1000 + ret = func(*args, **kwargs) + end = time.time() * 1000 + tick_time.inc_by(end - start) + pending_calls_metric.inc_by(pending_calls) + return ret + + return f + + +if hasattr(reactor, "runUntilCurrent"): + # runUntilCurrent is called when we have pending calls. It is called once + # per iteratation after fd polling. + reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index b6e00c27b5..94d7784aee 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) REQUIREMENTS = { "syutil>=0.0.7": ["syutil>=0.0.7"], - "Twisted==14.0.2": ["twisted==14.0.2"], + "Twisted>=15.1.0": ["twisted>=15.1.0"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], "pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyyaml": ["yaml"], diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 5f3a6207b5..718928eedd 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.http.servlet import RestServlet +from synapse.types import UserID from syutil.jsonutil import encode_canonical_json from ._base import client_v2_pattern @@ -164,45 +165,63 @@ class KeyQueryServlet(RestServlet): super(KeyQueryServlet, self).__init__() self.store = hs.get_datastore() self.auth = hs.get_auth() + self.federation = hs.get_replication_layer() + self.is_mine = hs.is_mine @defer.inlineCallbacks def on_POST(self, request, user_id, device_id): - logger.debug("onPOST") yield self.auth.get_user_by_req(request) try: body = json.loads(request.content.read()) except: raise SynapseError(400, "Invalid key JSON") - query = [] - for user_id, device_ids in body.get("device_keys", {}).items(): - if not device_ids: - query.append((user_id, None)) - else: - for device_id in device_ids: - query.append((user_id, device_id)) - results = yield self.store.get_e2e_device_keys(query) - defer.returnValue(self.json_result(request, results)) + result = yield self.handle_request(body) + defer.returnValue(result) @defer.inlineCallbacks def on_GET(self, request, user_id, device_id): auth_user, client_info = yield self.auth.get_user_by_req(request) auth_user_id = auth_user.to_string() - if not user_id: - user_id = auth_user_id - if not device_id: - device_id = None - # Returns a map of user_id->device_id->json_bytes. - results = yield self.store.get_e2e_device_keys([(user_id, device_id)]) - defer.returnValue(self.json_result(request, results)) - - def json_result(self, request, results): + user_id = user_id if user_id else auth_user_id + device_ids = [device_id] if device_id else [] + result = yield self.handle_request( + {"device_keys": {user_id: device_ids}} + ) + defer.returnValue(result) + + @defer.inlineCallbacks + def handle_request(self, body): + local_query = [] + remote_queries = {} + for user_id, device_ids in body.get("device_keys", {}).items(): + user = UserID.from_string(user_id) + if self.is_mine(user): + if not device_ids: + local_query.append((user_id, None)) + else: + for device_id in device_ids: + local_query.append((user_id, device_id)) + else: + remote_queries.setdefault(user.domain, {})[user_id] = list( + device_ids + ) + results = yield self.store.get_e2e_device_keys(local_query) + json_result = {} for user_id, device_keys in results.items(): for device_id, json_bytes in device_keys.items(): json_result.setdefault(user_id, {})[device_id] = json.loads( json_bytes ) - return (200, {"device_keys": json_result}) + + for destination, device_keys in remote_queries.items(): + remote_result = yield self.federation.query_client_keys( + destination, {"device_keys": device_keys} + ) + for user_id, keys in remote_result["device_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys + defer.returnValue((200, {"device_keys": json_result})) class OneTimeKeyServlet(RestServlet): @@ -236,14 +255,16 @@ class OneTimeKeyServlet(RestServlet): self.store = hs.get_datastore() self.auth = hs.get_auth() self.clock = hs.get_clock() + self.federation = hs.get_replication_layer() + self.is_mine = hs.is_mine @defer.inlineCallbacks def on_GET(self, request, user_id, device_id, algorithm): yield self.auth.get_user_by_req(request) - results = yield self.store.claim_e2e_one_time_keys( - [(user_id, device_id, algorithm)] + result = yield self.handle_request( + {"one_time_keys": {user_id: {device_id: algorithm}}} ) - defer.returnValue(self.json_result(request, results)) + defer.returnValue(result) @defer.inlineCallbacks def on_POST(self, request, user_id, device_id, algorithm): @@ -252,14 +273,24 @@ class OneTimeKeyServlet(RestServlet): body = json.loads(request.content.read()) except: raise SynapseError(400, "Invalid key JSON") - query = [] + result = yield self.handle_request(body) + defer.returnValue(result) + + @defer.inlineCallbacks + def handle_request(self, body): + local_query = [] + remote_queries = {} for user_id, device_keys in body.get("one_time_keys", {}).items(): - for device_id, algorithm in device_keys.items(): - query.append((user_id, device_id, algorithm)) - results = yield self.store.claim_e2e_one_time_keys(query) - defer.returnValue(self.json_result(request, results)) + user = UserID.from_string(user_id) + if self.is_mine(user): + for device_id, algorithm in device_keys.items(): + local_query.append((user_id, device_id, algorithm)) + else: + remote_queries.setdefault(user.domain, {})[user_id] = ( + device_keys + ) + results = yield self.store.claim_e2e_one_time_keys(local_query) - def json_result(self, request, results): json_result = {} for user_id, device_keys in results.items(): for device_id, keys in device_keys.items(): @@ -267,7 +298,16 @@ class OneTimeKeyServlet(RestServlet): json_result.setdefault(user_id, {})[device_id] = { key_id: json.loads(json_bytes) } - return (200, {"one_time_keys": json_result}) + + for destination, device_keys in remote_queries.items(): + remote_result = yield self.federation.claim_client_keys( + destination, {"one_time_keys": device_keys} + ) + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys + + defer.returnValue((200, {"one_time_keys": json_result})) def register_servlets(hs, http_server): diff --git a/synapse/state.py b/synapse/state.py index 80da90a72c..1fe4d066bd 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor -from synapse.util.expiringcache import ExpiringCache +from synapse.util.caches.expiringcache import ExpiringCache from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.auth import AuthEventTypes @@ -96,7 +96,7 @@ class StateHandler(object): cache.ts = self.clock.time_msec() state = cache.state else: - res = yield self.resolve_state_groups(event_ids) + res = yield self.resolve_state_groups(room_id, event_ids) state = res[1] if event_type: @@ -155,13 +155,13 @@ class StateHandler(object): if event.is_state(): ret = yield self.resolve_state_groups( - [e for e, _ in event.prev_events], + event.room_id, [e for e, _ in event.prev_events], event_type=event.type, state_key=event.state_key, ) else: ret = yield self.resolve_state_groups( - [e for e, _ in event.prev_events], + event.room_id, [e for e, _ in event.prev_events], ) group, curr_state, prev_state = ret @@ -180,7 +180,7 @@ class StateHandler(object): @defer.inlineCallbacks @log_function - def resolve_state_groups(self, event_ids, event_type=None, state_key=""): + def resolve_state_groups(self, room_id, event_ids, event_type=None, state_key=""): """ Given a list of event_ids this method fetches the state at each event, resolves conflicts between them and returns them. @@ -205,7 +205,7 @@ class StateHandler(object): ) state_groups = yield self.store.get_state_groups( - event_ids + room_id, event_ids ) logger.debug( diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 73eea157a4..1444767a52 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,25 +15,22 @@ import logging from synapse.api.errors import StoreError -from synapse.util.async import ObservableDeferred from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext -from synapse.util.lrucache import LruCache +from synapse.util.caches.dictionary_cache import DictionaryCache +from synapse.util.caches.descriptors import Cache import synapse.metrics from util.id_generators import IdGenerator, StreamIdGenerator from twisted.internet import defer -from collections import namedtuple, OrderedDict +from collections import namedtuple -import functools -import inspect import sys import time import threading -DEBUG_CACHES = False logger = logging.getLogger(__name__) @@ -49,208 +46,6 @@ sql_scheduling_timer = metrics.register_distribution("schedule_time") sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) -caches_by_name = {} -cache_counter = metrics.register_cache( - "cache", - lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, - labels=["name"], -) - - -_CacheSentinel = object() - - -class Cache(object): - - def __init__(self, name, max_entries=1000, keylen=1, lru=True): - if lru: - self.cache = LruCache(max_size=max_entries) - self.max_entries = None - else: - self.cache = OrderedDict() - self.max_entries = max_entries - - self.name = name - self.keylen = keylen - self.sequence = 0 - self.thread = None - caches_by_name[name] = self.cache - - def check_thread(self): - expected_thread = self.thread - if expected_thread is None: - self.thread = threading.current_thread() - else: - if expected_thread is not threading.current_thread(): - raise ValueError( - "Cache objects can only be accessed from the main thread" - ) - - def get(self, key, default=_CacheSentinel): - val = self.cache.get(key, _CacheSentinel) - if val is not _CacheSentinel: - cache_counter.inc_hits(self.name) - return val - - cache_counter.inc_misses(self.name) - - if default is _CacheSentinel: - raise KeyError() - else: - return default - - def update(self, sequence, key, value): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - self.prefill(key, value) - - def prefill(self, key, value): - if self.max_entries is not None: - while len(self.cache) >= self.max_entries: - self.cache.popitem(last=False) - - self.cache[key] = value - - def invalidate(self, key): - self.check_thread() - if not isinstance(key, tuple): - raise TypeError( - "The cache key must be a tuple not %r" % (type(key),) - ) - - # Increment the sequence number so that any SELECT statements that - # raced with the INSERT don't update the cache (SYN-369) - self.sequence += 1 - self.cache.pop(key, None) - - def invalidate_all(self): - self.check_thread() - self.sequence += 1 - self.cache.clear() - - -class CacheDescriptor(object): - """ A method decorator that applies a memoizing cache around the function. - - This caches deferreds, rather than the results themselves. Deferreds that - fail are removed from the cache. - - The function is presumed to take zero or more arguments, which are used in - a tuple as the key for the cache. Hits are served directly from the cache; - misses use the function body to generate the value. - - The wrapped function has an additional member, a callable called - "invalidate". This can be used to remove individual entries from the cache. - - The wrapped function has another additional callable, called "prefill", - which can be used to insert values into the cache specifically, without - calling the calculation function. - """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=True, - inlineCallbacks=False): - self.orig = orig - - if inlineCallbacks: - self.function_to_call = defer.inlineCallbacks(orig) - else: - self.function_to_call = orig - - self.max_entries = max_entries - self.num_args = num_args - self.lru = lru - - self.arg_names = inspect.getargspec(orig).args[1:num_args+1] - - if len(self.arg_names) < self.num_args: - raise Exception( - "Not enough explicit positional arguments to key off of for %r." - " (@cached cannot key off of *args or **kwars)" - % (orig.__name__,) - ) - - self.cache = Cache( - name=self.orig.__name__, - max_entries=self.max_entries, - keylen=self.num_args, - lru=self.lru, - ) - - def __get__(self, obj, objtype=None): - - @functools.wraps(self.orig) - def wrapped(*args, **kwargs): - arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) - cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) - try: - cached_result_d = self.cache.get(cache_key) - - observer = cached_result_d.observe() - if DEBUG_CACHES: - @defer.inlineCallbacks - def check_result(cached_result): - actual_result = yield self.function_to_call(obj, *args, **kwargs) - if actual_result != cached_result: - logger.error( - "Stale cache entry %s%r: cached: %r, actual %r", - self.orig.__name__, cache_key, - cached_result, actual_result, - ) - raise ValueError("Stale cache entry") - defer.returnValue(cached_result) - observer.addCallback(check_result) - - return observer - except KeyError: - # Get the sequence number of the cache before reading from the - # database so that we can tell if the cache is invalidated - # while the SELECT is executing (SYN-369) - sequence = self.cache.sequence - - ret = defer.maybeDeferred( - self.function_to_call, - obj, *args, **kwargs - ) - - def onErr(f): - self.cache.invalidate(cache_key) - return f - - ret.addErrback(onErr) - - ret = ObservableDeferred(ret, consumeErrors=True) - self.cache.update(sequence, cache_key, ret) - - return ret.observe() - - wrapped.invalidate = self.cache.invalidate - wrapped.invalidate_all = self.cache.invalidate_all - wrapped.prefill = self.cache.prefill - - obj.__dict__[self.orig.__name__] = wrapped - - return wrapped - - -def cached(max_entries=1000, num_args=1, lru=True): - return lambda orig: CacheDescriptor( - orig, - max_entries=max_entries, - num_args=num_args, - lru=lru - ) - - -def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False): - return lambda orig: CacheDescriptor( - orig, - max_entries=max_entries, - num_args=num_args, - lru=lru, - inlineCallbacks=True, - ) - class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object @@ -372,6 +167,8 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000) + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index f3947bbe89..d92028ea43 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from synapse.api.errors import SynapseError diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 910b6598a7..25cc84eb95 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -15,7 +15,8 @@ from twisted.internet import defer -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from syutil.base64util import encode_base64 import logging diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 49b8e37cfd..ffd6daa880 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from _base import SQLBaseStore, cachedInlineCallbacks +from _base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 576cf670cc..9b136f3119 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -13,19 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached, cachedList from twisted.internet import defer class PresenceStore(SQLBaseStore): def create_presence(self, user_localpart): - return self._simple_insert( + res = self._simple_insert( table="presence", values={"user_id": user_localpart}, desc="create_presence", ) + self.get_presence_state.invalidate((user_localpart,)) + return res + def has_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -35,6 +39,7 @@ class PresenceStore(SQLBaseStore): desc="has_presence_state", ) + @cached() def get_presence_state(self, user_localpart): return self._simple_select_one( table="presence", @@ -43,8 +48,27 @@ class PresenceStore(SQLBaseStore): desc="get_presence_state", ) + @cachedList(get_presence_state.cache, list_name="user_localparts") + def get_presence_states(self, user_localparts): + def f(txn): + results = {} + for user_localpart in user_localparts: + res = self._simple_select_one_txn( + txn, + table="presence", + keyvalues={"user_id": user_localpart}, + retcols=["state", "status_msg", "mtime"], + allow_none=True, + ) + if res: + results[user_localpart] = res + + return results + + return self.runInteraction("get_presence_states", f) + def set_presence_state(self, user_localpart, new_state): - return self._simple_update_one( + res = self._simple_update_one( table="presence", keyvalues={"user_id": user_localpart}, updatevalues={"state": new_state["state"], @@ -53,6 +77,9 @@ class PresenceStore(SQLBaseStore): desc="set_presence_state", ) + self.get_presence_state.invalidate((user_localpart,)) + return res + def allow_presence_visible(self, observed_localpart, observer_userid): return self._simple_insert( table="presence_allow_inbound", diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 9b88ca7b39..5305b7e122 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cachedInlineCallbacks +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer import logging diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index b79d6683ca..cac1a5657e 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cachedInlineCallbacks +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from twisted.internet import defer diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d2d5b07cb3..bf803f2c6e 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -17,7 +17,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError, Codes -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached class RegistrationStore(SQLBaseStore): diff --git a/synapse/storage/room.py b/synapse/storage/room.py index dd5bc2c8fb..5e07b7e0e5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -17,7 +17,8 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from ._base import SQLBaseStore, cachedInlineCallbacks +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks import collections import logging diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 9f14f38f24..8eee2dfbcc 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -17,7 +17,8 @@ from twisted.internet import defer from collections import namedtuple -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from synapse.api.constants import Membership from synapse.types import UserID diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7ce51b9bdc..ab3ad5a076 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cachedInlineCallbacks +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import ( + cached, cachedInlineCallbacks, cachedList +) from twisted.internet import defer @@ -44,59 +47,25 @@ class StateStore(SQLBaseStore): """ @defer.inlineCallbacks - def get_state_groups(self, event_ids): + def get_state_groups(self, room_id, event_ids): """ Get the state groups for the given list of event_ids The return value is a dict mapping group names to lists of events. """ + if not event_ids: + defer.returnValue({}) - def f(txn): - groups = set() - for event_id in event_ids: - group = self._simple_select_one_onecol_txn( - txn, - table="event_to_state_groups", - keyvalues={"event_id": event_id}, - retcol="state_group", - allow_none=True, - ) - if group: - groups.add(group) - - res = {} - for group in groups: - state_ids = self._simple_select_onecol_txn( - txn, - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", - ) - - res[group] = state_ids - - return res - - states = yield self.runInteraction( - "get_state_groups", - f, + event_to_groups = yield self._get_state_group_for_events( + room_id, event_ids, ) - state_list = yield defer.gatherResults( - [ - self._fetch_events_for_group(group, vals) - for group, vals in states.items() - ], - consumeErrors=True, - ) + groups = set(event_to_groups.values()) + group_to_state = yield self._get_state_for_groups(groups) - defer.returnValue(dict(state_list)) - - def _fetch_events_for_group(self, key, events): - return self._get_events( - events, get_prev_content=False - ).addCallback( - lambda evs: (key, evs) - ) + defer.returnValue({ + group: state_map.values() + for group, state_map in group_to_state.items() + }) def _store_state_groups_txn(self, txn, event, context): return self._store_mult_state_groups_txn(txn, [(event, context)]) @@ -204,64 +173,251 @@ class StateStore(SQLBaseStore): events = yield self._get_events(event_ids, get_prev_content=False) defer.returnValue(events) + def _get_state_groups_from_groups(self, groups_and_types): + """Returns dictionary state_group -> state event ids + + Args: + groups_and_types (list): list of 2-tuple (`group`, `types`) + """ + def f(txn): + results = {} + for group, types in groups_and_types: + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + + sql = ( + "SELECT event_id FROM state_groups_state WHERE" + " state_group = ? %s" + ) % (where_clause,) + + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + + results[group] = [r[0] for r in txn.fetchall()] + + return results + + return self.runInteraction( + "_get_state_groups_from_groups", + f, + ) + @defer.inlineCallbacks - def get_state_for_events(self, room_id, event_ids): + def get_state_for_events(self, room_id, event_ids, types): + """Given a list of event_ids and type tuples, return a list of state + dicts for each event. The state dicts will only have the type/state_keys + that are in the `types` list. + + Args: + room_id (str) + event_ids (list) + types (list): List of (type, state_key) tuples which are used to + filter the state fetched. `state_key` may be None, which matches + any `state_key` + + Returns: + deferred: A list of dicts corresponding to the event_ids given. + The dicts are mappings from (type, state_key) -> state_events + """ + event_to_groups = yield self._get_state_group_for_events( + room_id, event_ids, + ) + + groups = set(event_to_groups.values()) + group_to_state = yield self._get_state_for_groups(groups, types) + + event_to_state = { + event_id: group_to_state[group] + for event_id, group in event_to_groups.items() + } + + defer.returnValue({event: event_to_state[event] for event in event_ids}) + + @cached(num_args=2, lru=True, max_entries=10000) + def _get_state_group_for_event(self, room_id, event_id): + return self._simple_select_one_onecol( + table="event_to_state_groups", + keyvalues={ + "event_id": event_id, + }, + retcol="state_group", + allow_none=True, + desc="_get_state_group_for_event", + ) + + @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", + num_args=2) + def _get_state_group_for_events(self, room_id, event_ids): + """Returns mapping event_id -> state_group + """ def f(txn): - groups = set() - event_to_group = {} + results = {} for event_id in event_ids: - # TODO: Remove this loop. - group = self._simple_select_one_onecol_txn( + results[event_id] = self._simple_select_one_onecol_txn( txn, table="event_to_state_groups", - keyvalues={"event_id": event_id}, + keyvalues={ + "event_id": event_id, + }, retcol="state_group", allow_none=True, ) - if group: - event_to_group[event_id] = group - groups.add(group) - group_to_state_ids = {} - for group in groups: - state_ids = self._simple_select_onecol_txn( - txn, - table="state_groups_state", - keyvalues={"state_group": group}, - retcol="event_id", + return results + + return self.runInteraction("_get_state_group_for_events", f) + + def _get_some_state_from_cache(self, group, types): + """Checks if group is in cache. See `_get_state_for_groups` + + Returns 3-tuple (`state_dict`, `missing_types`, `got_all`). + `missing_types` is the list of types that aren't in the cache for that + group. `got_all` is a bool indicating if we successfully retrieved all + requests state from the cache, if False we need to query the DB for the + missing state. + + Args: + group: The state group to lookup + types (list): List of 2-tuples of the form (`type`, `state_key`), + where a `state_key` of `None` matches all state_keys for the + `type`. + """ + is_all, state_dict = self._state_group_cache.get(group) + + type_to_key = {} + missing_types = set() + for typ, state_key in types: + if state_key is None: + type_to_key[typ] = None + missing_types.add((typ, state_key)) + else: + if type_to_key.get(typ, object()) is not None: + type_to_key.setdefault(typ, set()).add(state_key) + + if (typ, state_key) not in state_dict: + missing_types.add((typ, state_key)) + + sentinel = object() + + def include(typ, state_key): + valid_state_keys = type_to_key.get(typ, sentinel) + if valid_state_keys is sentinel: + return False + if valid_state_keys is None: + return True + if state_key in valid_state_keys: + return True + return False + + got_all = not (missing_types or types is None) + + return { + k: v for k, v in state_dict.items() + if include(k[0], k[1]) + }, missing_types, got_all + + def _get_all_state_from_cache(self, group): + """Checks if group is in cache. See `_get_state_for_groups` + + Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool + indicating if we successfully retrieved all requests state from the + cache, if False we need to query the DB for the missing state. + + Args: + group: The state group to lookup + """ + is_all, state_dict = self._state_group_cache.get(group) + return state_dict, is_all + + @defer.inlineCallbacks + def _get_state_for_groups(self, groups, types=None): + """Given list of groups returns dict of group -> list of state events + with matching types. `types` is a list of `(type, state_key)`, where + a `state_key` of None matches all state_keys. If `types` is None then + all events are returned. + """ + results = {} + missing_groups_and_types = [] + if types is not None: + for group in set(groups): + state_dict, missing_types, got_all = self._get_some_state_from_cache( + group, types + ) + results[group] = state_dict + + if not got_all: + missing_groups_and_types.append((group, missing_types)) + else: + for group in set(groups): + state_dict, got_all = self._get_all_state_from_cache( + group ) + results[group] = state_dict - group_to_state_ids[group] = state_ids + if not got_all: + missing_groups_and_types.append((group, None)) - return event_to_group, group_to_state_ids + if not missing_groups_and_types: + defer.returnValue({ + group: { + type_tuple: event + for type_tuple, event in state.items() + if event + } + for group, state in results.items() + }) - res = yield self.runInteraction( - "annotate_events_with_state_groups", - f, - ) + # Okay, so we have some missing_types, lets fetch them. + cache_seq_num = self._state_group_cache.sequence - event_to_group, group_to_state_ids = res + group_state_dict = yield self._get_state_groups_from_groups( + missing_groups_and_types + ) - state_list = yield defer.gatherResults( - [ - self._fetch_events_for_group(group, vals) - for group, vals in group_to_state_ids.items() - ], - consumeErrors=True, + state_events = yield self._get_events( + [e_id for l in group_state_dict.values() for e_id in l], + get_prev_content=False ) - state_dict = { - group: { - (ev.type, ev.state_key): ev - for ev in state + state_events = {e.event_id: e for e in state_events} + + # Now we want to update the cache with all the things we fetched + # from the database. + for group, state_ids in group_state_dict.items(): + if types: + # We delibrately put key -> None mappings into the cache to + # cache absence of the key, on the assumption that if we've + # explicitly asked for some types then we will probably ask + # for them again. + state_dict = {key: None for key in types} + state_dict.update(results[group]) + else: + state_dict = results[group] + + for event_id in state_ids: + state_event = state_events[event_id] + state_dict[(state_event.type, state_event.state_key)] = state_event + + self._state_group_cache.update( + cache_seq_num, + key=group, + value=state_dict, + full=(types is None), + ) + + # We replace here to remove all the entries with None values. + results[group] = { + key: value for key, value in state_dict.items() if value } - for group, state in state_list - } - defer.returnValue([ - state_dict.get(event_to_group.get(event, None), None) - for event in event_ids - ]) + defer.returnValue(results) def _make_group_id(clock): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index af45fc5619..d7fe423f5a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -36,6 +36,7 @@ what sort order was used: from twisted.internet import defer from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logutils import log_function @@ -299,9 +300,8 @@ class StreamStore(SQLBaseStore): defer.returnValue((events, token)) - @defer.inlineCallbacks - def get_recent_events_for_room(self, room_id, limit, end_token, - with_feedback=False, from_token=None): + @cachedInlineCallbacks(num_args=4) + def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): # TODO (erikj): Handle compressed feedback end_token = RoomStreamToken.parse_stream_token(end_token) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 624da4a9dc..c8c7e6591a 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore, cached +from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached from collections import namedtuple diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py new file mode 100644 index 0000000000..da0e06a468 --- /dev/null +++ b/synapse/util/caches/__init__.py @@ -0,0 +1,27 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse.metrics + +DEBUG_CACHES = False + +metrics = synapse.metrics.get_metrics_for("synapse.util.caches") + +caches_by_name = {} +cache_counter = metrics.register_cache( + "cache", + lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, + labels=["name"], +) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py new file mode 100644 index 0000000000..362944bc51 --- /dev/null +++ b/synapse/util/caches/descriptors.py @@ -0,0 +1,377 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from synapse.util.async import ObservableDeferred +from synapse.util import unwrapFirstError +from synapse.util.caches.lrucache import LruCache + +from . import caches_by_name, DEBUG_CACHES, cache_counter + +from twisted.internet import defer + +from collections import OrderedDict + +import functools +import inspect +import threading + +logger = logging.getLogger(__name__) + + +_CacheSentinel = object() + + +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1, lru=True): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries + + self.name = name + self.keylen = keylen + self.sequence = 0 + self.thread = None + caches_by_name[name] = self.cache + + def check_thread(self): + expected_thread = self.thread + if expected_thread is None: + self.thread = threading.current_thread() + else: + if expected_thread is not threading.current_thread(): + raise ValueError( + "Cache objects can only be accessed from the main thread" + ) + + def get(self, key, default=_CacheSentinel): + val = self.cache.get(key, _CacheSentinel) + if val is not _CacheSentinel: + cache_counter.inc_hits(self.name) + return val + + cache_counter.inc_misses(self.name) + + if default is _CacheSentinel: + raise KeyError() + else: + return default + + def update(self, sequence, key, value): + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + self.prefill(key, value) + + def prefill(self, key, value): + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) + + self.cache[key] = value + + def invalidate(self, key): + self.check_thread() + if not isinstance(key, tuple): + raise TypeError( + "The cache key must be a tuple not %r" % (type(key),) + ) + + # Increment the sequence number so that any SELECT statements that + # raced with the INSERT don't update the cache (SYN-369) + self.sequence += 1 + self.cache.pop(key, None) + + def invalidate_all(self): + self.check_thread() + self.sequence += 1 + self.cache.clear() + + +class CacheDescriptor(object): + """ A method decorator that applies a memoizing cache around the function. + + This caches deferreds, rather than the results themselves. Deferreds that + fail are removed from the cache. + + The function is presumed to take zero or more arguments, which are used in + a tuple as the key for the cache. Hits are served directly from the cache; + misses use the function body to generate the value. + + The wrapped function has an additional member, a callable called + "invalidate". This can be used to remove individual entries from the cache. + + The wrapped function has another additional callable, called "prefill", + which can be used to insert values into the cache specifically, without + calling the calculation function. + """ + def __init__(self, orig, max_entries=1000, num_args=1, lru=True, + inlineCallbacks=False): + self.orig = orig + + if inlineCallbacks: + self.function_to_call = defer.inlineCallbacks(orig) + else: + self.function_to_call = orig + + self.max_entries = max_entries + self.num_args = num_args + self.lru = lru + + self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + + if len(self.arg_names) < self.num_args: + raise Exception( + "Not enough explicit positional arguments to key off of for %r." + " (@cached cannot key off of *args or **kwars)" + % (orig.__name__,) + ) + + self.cache = Cache( + name=self.orig.__name__, + max_entries=self.max_entries, + keylen=self.num_args, + lru=self.lru, + ) + + def __get__(self, obj, objtype=None): + + @functools.wraps(self.orig) + def wrapped(*args, **kwargs): + arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) + cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) + try: + cached_result_d = self.cache.get(cache_key) + + observer = cached_result_d.observe() + if DEBUG_CACHES: + @defer.inlineCallbacks + def check_result(cached_result): + actual_result = yield self.function_to_call(obj, *args, **kwargs) + if actual_result != cached_result: + logger.error( + "Stale cache entry %s%r: cached: %r, actual %r", + self.orig.__name__, cache_key, + cached_result, actual_result, + ) + raise ValueError("Stale cache entry") + defer.returnValue(cached_result) + observer.addCallback(check_result) + + return observer + except KeyError: + # Get the sequence number of the cache before reading from the + # database so that we can tell if the cache is invalidated + # while the SELECT is executing (SYN-369) + sequence = self.cache.sequence + + ret = defer.maybeDeferred( + self.function_to_call, + obj, *args, **kwargs + ) + + def onErr(f): + self.cache.invalidate(cache_key) + return f + + ret.addErrback(onErr) + + ret = ObservableDeferred(ret, consumeErrors=True) + self.cache.update(sequence, cache_key, ret) + + return ret.observe() + + wrapped.invalidate = self.cache.invalidate + wrapped.invalidate_all = self.cache.invalidate_all + wrapped.prefill = self.cache.prefill + + obj.__dict__[self.orig.__name__] = wrapped + + return wrapped + + +class CacheListDescriptor(object): + """Wraps an existing cache to support bulk fetching of keys. + + Given a list of keys it looks in the cache to find any hits, then passes + the list of missing keys to the wrapped fucntion. + """ + + def __init__(self, orig, cache, list_name, num_args=1, inlineCallbacks=False): + """ + Args: + orig (function) + cache (Cache) + list_name (str): Name of the argument which is the bulk lookup list + num_args (int) + inlineCallbacks (bool): Whether orig is a generator that should + be wrapped by defer.inlineCallbacks + """ + self.orig = orig + + if inlineCallbacks: + self.function_to_call = defer.inlineCallbacks(orig) + else: + self.function_to_call = orig + + self.num_args = num_args + self.list_name = list_name + + self.arg_names = inspect.getargspec(orig).args[1:num_args+1] + self.list_pos = self.arg_names.index(self.list_name) + + self.cache = cache + + self.sentinel = object() + + if len(self.arg_names) < self.num_args: + raise Exception( + "Not enough explicit positional arguments to key off of for %r." + " (@cached cannot key off of *args or **kwars)" + % (orig.__name__,) + ) + + if self.list_name not in self.arg_names: + raise Exception( + "Couldn't see arguments %r for %r." + % (self.list_name, cache.name,) + ) + + def __get__(self, obj, objtype=None): + + @functools.wraps(self.orig) + def wrapped(*args, **kwargs): + arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) + keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] + list_args = arg_dict[self.list_name] + + # cached is a dict arg -> deferred, where deferred results in a + # 2-tuple (`arg`, `result`) + cached = {} + missing = [] + for arg in list_args: + key = list(keyargs) + key[self.list_pos] = arg + + try: + res = self.cache.get(tuple(key)).observe() + res.addCallback(lambda r, arg: (arg, r), arg) + cached[arg] = res + except KeyError: + missing.append(arg) + + if missing: + sequence = self.cache.sequence + args_to_call = dict(arg_dict) + args_to_call[self.list_name] = missing + + ret_d = defer.maybeDeferred( + self.function_to_call, + **args_to_call + ) + + ret_d = ObservableDeferred(ret_d) + + # We need to create deferreds for each arg in the list so that + # we can insert the new deferred into the cache. + for arg in missing: + observer = ret_d.observe() + observer.addCallback(lambda r, arg: r.get(arg, None), arg) + + observer = ObservableDeferred(observer) + + key = list(keyargs) + key[self.list_pos] = arg + self.cache.update(sequence, tuple(key), observer) + + def invalidate(f, key): + self.cache.invalidate(key) + return f + observer.addErrback(invalidate, tuple(key)) + + res = observer.observe() + res.addCallback(lambda r, arg: (arg, r), arg) + + cached[arg] = res + + return defer.gatherResults( + cached.values(), + consumeErrors=True, + ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)) + + obj.__dict__[self.orig.__name__] = wrapped + + return wrapped + + +def cached(max_entries=1000, num_args=1, lru=True): + return lambda orig: CacheDescriptor( + orig, + max_entries=max_entries, + num_args=num_args, + lru=lru + ) + + +def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False): + return lambda orig: CacheDescriptor( + orig, + max_entries=max_entries, + num_args=num_args, + lru=lru, + inlineCallbacks=True, + ) + + +def cachedList(cache, list_name, num_args=1, inlineCallbacks=False): + """Creates a descriptor that wraps a function in a `CacheListDescriptor`. + + Used to do batch lookups for an already created cache. A single argument + is specified as a list that is iterated through to lookup keys in the + original cache. A new list consisting of the keys that weren't in the cache + get passed to the original function, the result of which is stored in the + cache. + + Args: + cache (Cache): The underlying cache to use. + list_name (str): The name of the argument that is the list to use to + do batch lookups in the cache. + num_args (int): Number of arguments to use as the key in the cache. + inlineCallbacks (bool): Should the function be wrapped in an + `defer.inlineCallbacks`? + + Example: + + class Example(object): + @cached(num_args=2) + def do_something(self, first_arg): + ... + + @cachedList(do_something.cache, list_name="second_args", num_args=2) + def batch_do_something(self, first_arg, second_args): + ... + """ + return lambda orig: CacheListDescriptor( + orig, + cache=cache, + list_name=list_name, + num_args=num_args, + inlineCallbacks=inlineCallbacks, + ) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py new file mode 100644 index 0000000000..e69adf62fe --- /dev/null +++ b/synapse/util/caches/dictionary_cache.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.util.caches.lrucache import LruCache +from collections import namedtuple +from . import caches_by_name, cache_counter +import threading +import logging + + +logger = logging.getLogger(__name__) + + +DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value")) + + +class DictionaryCache(object): + """Caches key -> dictionary lookups, supporting caching partial dicts, i.e. + fetching a subset of dictionary keys for a particular key. + """ + + def __init__(self, name, max_entries=1000): + self.cache = LruCache(max_size=max_entries) + + self.name = name + self.sequence = 0 + self.thread = None + # caches_by_name[name] = self.cache + + class Sentinel(object): + __slots__ = [] + + self.sentinel = Sentinel() + caches_by_name[name] = self.cache + + def check_thread(self): + expected_thread = self.thread + if expected_thread is None: + self.thread = threading.current_thread() + else: + if expected_thread is not threading.current_thread(): + raise ValueError( + "Cache objects can only be accessed from the main thread" + ) + + def get(self, key, dict_keys=None): + entry = self.cache.get(key, self.sentinel) + if entry is not self.sentinel: + cache_counter.inc_hits(self.name) + + if dict_keys is None: + return DictionaryEntry(entry.full, dict(entry.value)) + else: + return DictionaryEntry(entry.full, { + k: entry.value[k] + for k in dict_keys + if k in entry.value + }) + + cache_counter.inc_misses(self.name) + return DictionaryEntry(False, {}) + + def invalidate(self, key): + self.check_thread() + + # Increment the sequence number so that any SELECT statements that + # raced with the INSERT don't update the cache (SYN-369) + self.sequence += 1 + self.cache.pop(key, None) + + def invalidate_all(self): + self.check_thread() + self.sequence += 1 + self.cache.clear() + + def update(self, sequence, key, value, full=False): + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + if full: + self._insert(key, value) + else: + self._update_or_insert(key, value) + + def _update_or_insert(self, key, value): + entry = self.cache.setdefault(key, DictionaryEntry(False, {})) + entry.value.update(value) + + def _insert(self, key, value): + self.cache[key] = DictionaryEntry(True, value) diff --git a/synapse/util/expiringcache.py b/synapse/util/caches/expiringcache.py index 06d1eea01b..06d1eea01b 100644 --- a/synapse/util/expiringcache.py +++ b/synapse/util/caches/expiringcache.py diff --git a/synapse/util/lrucache.py b/synapse/util/caches/lrucache.py index cacd7e45fa..cacd7e45fa 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/caches/lrucache.py |