diff options
39 files changed, 668 insertions, 147 deletions
diff --git a/UPGRADE.rst b/UPGRADE.rst index 9f044719a0..6164df8833 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -28,6 +28,15 @@ running: git pull # Update the versions of synapse's python dependencies. python synapse/python_dependencies.py | xargs -n1 pip install --upgrade + +To check whether your update was sucessfull, run: + +.. code:: bash + + # replace your.server.domain with ther domain of your synaspe homeserver + curl https://<your.server.domain>/_matrix/federation/v1/version + +So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version. Upgrading to v0.15.0 diff --git a/synapse/config/server.py b/synapse/config/server.py index 25e6666238..3910b9dc31 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -35,6 +35,8 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + self.filter_timeline_limit = config.get("filter_timeline_limit", -1) + if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': self.public_baseurl += '/' @@ -161,6 +163,10 @@ class ServerConfig(Config): # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] + # Set the limit on the returned events in the timeline in the get + # and sync operations. The default value is -1, means no upper limit. + # filter_timeline_limit: 5000 + # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bc20b9c201..51e3fdea06 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -440,6 +440,16 @@ class FederationServer(FederationBase): key_id: json.loads(json_bytes) } + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({"one_time_keys": json_result}) @defer.inlineCallbacks diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 695f1a7375..a15198e05d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -285,7 +285,7 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ - hosts_and_states = yield get_interested_remotes(self.store, states) + hosts_and_states = yield get_interested_remotes(self.store, states, self.state) for destinations, states in hosts_and_states: for destination in destinations: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c840da834c..3d676e7d8b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -24,6 +24,7 @@ from synapse.http.servlet import ( ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string +from synapse.util.logcontext import preserve_fn from synapse.types import ThirdPartyInstanceID import functools @@ -79,6 +80,7 @@ class Authenticator(object): def __init__(self, hs): self.keyring = hs.get_keyring() self.server_name = hs.hostname + self.store = hs.get_datastore() # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks @@ -138,6 +140,13 @@ class Authenticator(object): logger.info("Request from %s", origin) request.authenticated_entity = origin + # If we get a valid signed request from the other side, its probably + # alive + retry_timings = yield self.store.get_destination_retry_timings(origin) + if retry_timings and retry_timings["retry_last_ts"]: + logger.info("Marking origin %r as up", origin) + preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) + defer.returnValue(origin) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e83adc8339..faa5609c0c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,7 +53,20 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() - def ratelimit(self, requester): + @defer.inlineCallbacks + def ratelimit(self, requester, update=True): + """Ratelimits requests. + + Args: + requester (Requester) + update (bool): Whether to record that a request is being processed. + Set to False when doing multiple checks for one request (e.g. + to check up front if we would reject the request), and set to + True for the last call for a given request. + + Raises: + LimitExceededError if the request should be ratelimited + """ time_now = self.clock.time() user_id = requester.user.to_string() @@ -67,10 +80,25 @@ class BaseHandler(object): if requester.app_service and not requester.app_service.is_rate_limited(): return + # Check if there is a per user override in the DB. + override = yield self.store.get_ratelimit_for_user(user_id) + if override: + # If overriden with a null Hz then ratelimiting has been entirely + # disabled for the user + if not override.messages_per_second: + return + + messages_per_second = override.messages_per_second + burst_count = override.burst_count + else: + messages_per_second = self.hs.config.rc_messages_per_second + burst_count = self.hs.config.rc_message_burst_count + allowed, time_allowed = self.ratelimiter.send_message( user_id, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, + msg_rate_hz=messages_per_second, + burst_count=burst_count, + update=update, ) if not allowed: raise LimitExceededError( diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c22f65ce5d..982cda3edf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -425,12 +426,38 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + # Given a list of updates we check if we need to resync. This + # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) - result = yield self.federation.query_user_devices(origin, user_id) + try: + result = yield self.federation.query_user_devices(origin, user_id) + except NotRetryingDestination: + # TODO: Remember that we are now out of sync and try again + # later + logger.warn( + "Failed to handle device list update for %s," + " we're not retrying the remote", + user_id, + ) + # We abort on exceptions rather than accepting the update + # as otherwise synapse will 'forget' that its device list + # is out of date. If we bail then we will retry the resync + # next time we get a device list update for this user_id. + # This makes it more likely that the device lists will + # eventually become consistent. + return + except Exception: + # TODO: Remember that we are now out of sync and try again + # later + logger.exception( + "Failed to handle device list update for %s", user_id + ) + return + stream_id = result["stream_id"] devices = result["devices"] yield self.store.update_remote_device_list_cache( diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index c2b38d72a9..668a90e495 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -145,7 +145,7 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) for destination in remote_queries_not_in_cache ])) @@ -257,11 +257,21 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) for destination in remote_queries ])) + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({ "one_time_keys": json_result, "failures": failures @@ -288,19 +298,8 @@ class E2eKeysHandler(object): one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list + yield self._upload_one_time_keys_for_user( + user_id, device_id, time_now, one_time_keys, ) # the device should have been registered already, but it may have been @@ -313,3 +312,58 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue({"one_time_key_counts": result}) + + @defer.inlineCallbacks + def _upload_one_time_keys_for_user(self, user_id, device_id, time_now, + one_time_keys): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), device_id, user_id, time_now, + ) + + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, key_obj + )) + + # First we check if we have already persisted any of the keys. + existing_key_map = yield self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) + + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ("One time key %s:%s already exists. " + "Old key: %s; new key: %r") % + (algorithm, key_id, ex_json, key) + ) + else: + new_keys.append((algorithm, key_id, encode_canonical_json(key))) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) + + +def _one_time_keys_match(old_key_json, new_key): + old_key = json.loads(old_key_json) + + # if either is a string rather than an object, they must match exactly + if not isinstance(old_key, dict) or not isinstance(new_key, dict): + return old_key == new_key + + # otherwise, we strip off the 'signatures' if any, because it's legitimate + # for different upload attempts to have different signatures. + old_key.pop("signatures", None) + new_key_copy = dict(new_key) + new_key_copy.pop("signatures", None) + + return old_key == new_key_copy diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2af9849ed0..52d97dfbf3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -380,13 +380,6 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - # if we're receiving valid events from an origin, - # it's probably a good idea to mark it as not in retry-state - # for sending (although this is a bit of a leap) - retry_timings = yield self.store.get_destination_retry_timings(origin) - if retry_timings and retry_timings["retry_last_ts"]: - self.store.set_destination_retry_timings(origin, 0, 0) - room = yield self.store.get_room(event.room_id) if not room: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 57265c6d7d..196925edad 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError +from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -254,17 +254,7 @@ class MessageHandler(BaseHandler): # We check here if we are currently being rate limited, so that we # don't do unnecessary work. We check again just before we actually # send the event. - time_now = self.clock.time() - allowed, time_allowed = self.ratelimiter.send_message( - event.sender, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, - update=False, - ) - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now)), - ) + yield self.ratelimit(requester, update=False) user = UserID.from_string(event.sender) @@ -499,7 +489,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - self.ratelimit(requester) + yield self.ratelimit(requester) try: yield self.auth.check_from_context(event, context) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f3707afcd0..c7c0b0a1e2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -780,12 +780,12 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) self._push_to_remotes([state]) else: + user_ids = yield self.store.get_users_in_room(room_id) user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) @@ -1322,7 +1322,7 @@ def get_interested_parties(store, states): @defer.inlineCallbacks -def get_interested_remotes(store, states): +def get_interested_remotes(store, states, state_handler): """Given a list of presence states figure out which remote servers should be sent which. @@ -1345,7 +1345,7 @@ def get_interested_remotes(store, states): room_ids_to_states, users_to_states = yield get_interested_parties(store, states) for room_id, states in room_ids_to_states.iteritems(): - hosts = yield store.get_hosts_in_room(room_id) + hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.iteritems(): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9bf638f818..7abee98dea 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): return - self.ratelimit(requester) + yield self.ratelimit(requester) room_ids = yield self.store.get_rooms_for_user( user.to_string(), diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 03c6a85fc6..ee3a2269a8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -54,6 +54,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if not localpart: + raise SynapseError( + 400, + "User ID cannot be empty", + Codes.INVALID_USERNAME + ) + if localpart[0] == '_': raise SynapseError( 400, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 99cb7db0db..d2a0d6520a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - self.ratelimit(requester) + yield self.ratelimit(requester) if "room_alias_name" in config: for wchar in string.whitespace: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ab87632d99..1ca88517a2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -739,10 +739,11 @@ class RoomMemberHandler(BaseHandler): if len(current_state_ids) == 1 and create_event_id: defer.returnValue(self.hs.is_mine_id(create_event_id)) - for (etype, state_key), event_id in current_state_ids.items(): + for etype, state_key in current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): continue + event_id = current_state_ids[(etype, state_key)] event = yield self.store.get_event(event_id, allow_none=True) if not event: continue diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index cb13874ccf..f943ff640f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -20,6 +20,7 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes +from synapse.visibility import filter_events_for_clients_context logger = logging.getLogger(__name__) @@ -66,6 +67,17 @@ class BulkPushRuleEvaluator: def action_for_event_by_user(self, event, context): actions_by_user = {} + # None of these users can be peeking since this list of users comes + # from the set of users in the room, so we know for sure they're all + # actually in the room. + user_tuples = [ + (u, False) for u in self.rules_by_user.keys() + ] + + filtered_by_user = yield filter_events_for_clients_context( + self.store, user_tuples, [event], {event.event_id: context} + ) + room_members = yield self.store.get_joined_users_from_context( event, context ) @@ -75,14 +87,6 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): - if event.sender == uid: - continue - - if not event.is_state(): - is_ignored = yield self.store.is_ignored_by(event.sender, uid) - if is_ignored: - continue - display_name = None profile_info = room_members.get(uid) if profile_info: @@ -94,6 +98,13 @@ class BulkPushRuleEvaluator: if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) + filtered = filtered_by_user[uid] + if len(filtered) == 0: + continue + + if filtered[0].sender == uid: + continue + for rule in rules: if 'enabled' in rule and not rule['enabled']: continue diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 20e765f48f..1f5bc24cc3 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -47,3 +47,13 @@ def client_v2_patterns(path_regex, releases=(0,), new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns + + +def set_timeline_upper_limit(filter_json, filter_timeline_limit): + if filter_timeline_limit < 0: + return # no upper limits + timeline = filter_json.get('room', {}).get('timeline', {}) + if 'limit' in timeline: + filter_json['room']['timeline']["limit"] = min( + filter_json['room']['timeline']['limit'], + filter_timeline_limit) diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index b4084fec62..d2b2fd66e6 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -20,6 +20,7 @@ from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID from ._base import client_v2_patterns +from ._base import set_timeline_upper_limit import logging @@ -85,6 +86,11 @@ class CreateFilterRestServlet(RestServlet): raise AuthError(403, "Can only create filters for local users") content = parse_json_object_from_request(request) + set_timeline_upper_limit( + content, + self.hs.config.filter_timeline_limit + ) + filter_id = yield self.filtering.add_user_filter( user_localpart=target_user.localpart, user_filter=content, diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 38a739f2f8..1421c18152 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -21,7 +21,7 @@ from synapse.api.auth import get_access_token_from_request, has_access_token from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError from synapse.http.servlet import ( - RestServlet, parse_json_object_from_request, assert_params_in_request + RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string ) from synapse.util.msisdn import phone_number_to_msisdn @@ -147,10 +147,9 @@ class UsernameAvailabilityRestServlet(RestServlet): with self.ratelimiter.ratelimit(ip) as wait_deferred: yield wait_deferred - body = parse_json_object_from_request(request) - assert_params_in_request(body, ['username']) + username = parse_string(request, "username", required=True) - yield self.registration_handler.check_username(body['username']) + yield self.registration_handler.check_username(username) defer.returnValue((200, {"available": True})) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index f30eab76fd..771e127ab9 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -28,6 +28,7 @@ from synapse.api.filtering import FilterCollection, DEFAULT_FILTER_COLLECTION from synapse.api.errors import SynapseError from synapse.api.constants import PresenceState from ._base import client_v2_patterns +from ._base import set_timeline_upper_limit import itertools import logging @@ -78,6 +79,7 @@ class SyncRestServlet(RestServlet): def __init__(self, hs): super(SyncRestServlet, self).__init__() + self.hs = hs self.auth = hs.get_auth() self.sync_handler = hs.get_sync_handler() self.clock = hs.get_clock() @@ -121,6 +123,8 @@ class SyncRestServlet(RestServlet): if filter_id.startswith('{'): try: filter_object = json.loads(filter_id) + set_timeline_upper_limit(filter_object, + self.hs.config.filter_timeline_limit) except: raise SynapseError(400, "Invalid filter JSON") self.filtering.check_valid_filter(filter_object) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index c43b185e08..caca96c222 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \ from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.retryutils import NotRetryingDestination import os import errno @@ -181,7 +182,8 @@ class MediaRepository(object): logger.exception("Failed to fetch remote media %s/%s", server_name, media_id) raise - + except NotRetryingDestination: + logger.warn("Not retrying destination %r", server_name) except Exception: logger.exception("Failed to fetch remote media %s/%s", server_name, media_id) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c659004e8d..58b73af7d2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ class LoggingTransaction(object): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args): + def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ - self.after_callbacks.append((callback, args)) + self.after_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ class SQLBaseStore(object): inner_func, *args, **kwargs ) finally: - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index ff14e54c11..aa84ffc2b0 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,16 +308,3 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) - - @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) - def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): - ignored_account_data = yield self.get_global_account_data_by_type_for_user( - "m.ignored_user_list", ignorer_user_id, - on_invalidate=cache_context.invalidate, - ) - if not ignored_account_data: - defer.returnValue(False) - - defer.returnValue( - ignored_user_id in ignored_account_data.get("ignored_users", {}) - ) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index d4cf0fc59b..7157fb1dfb 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_handlers[update_name] = update_handler def register_background_index_update(self, update_name, index_name, - table, columns, where_clause=None): + table, columns, where_clause=None, + unique=False, + psql_only=False): """Helper for store classes to do a background index addition To use: @@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore): index_name (str): name of index to add table (str): table to add index to columns (list[str]): columns/expressions to include in index + unique (bool): true to make a UNIQUE index + psql_only: true to only create this index on psql databases (useful + for virtual sqlite tables) """ def create_index_psql(conn): @@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore): c.execute(sql) sql = ( - "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s" + "CREATE %(unique)s INDEX CONCURRENTLY %(name)s" + " ON %(table)s" " (%(columns)s) %(where_clause)s" ) % { + "unique": "UNIQUE" if unique else "", "name": index_name, "table": table, "columns": ", ".join(columns), @@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore): # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.) sql = ( - "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s" + "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s" " (%(columns)s)" ) % { + "unique": "UNIQUE" if unique else "", "name": index_name, "table": table, "columns": ", ".join(columns), @@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore): if isinstance(self.database_engine, engines.PostgresEngine): runner = create_index_psql + elif psql_only: + runner = None else: runner = create_index_sqlite @defer.inlineCallbacks def updater(progress, batch_size): - logger.info("Adding index %s to %s", index_name, table) - yield self.runWithConnection(runner) + if runner is not None: + logger.info("Adding index %s to %s", index_name, table) + yield self.runWithConnection(runner) yield self._end_background_update(update_name) defer.returnValue(1) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index b01f0046e9..747d2df622 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -33,6 +33,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): self.client_ip_last_seen = Cache( name="client_ip_last_seen", keylen=4, + max_entries=5000, ) super(ClientIpStore, self).__init__(hs) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index c8d5f5ba8b..d9936c88bb 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -18,7 +18,7 @@ import ujson as json from twisted.internet import defer from synapse.api.errors import StoreError -from ._base import SQLBaseStore +from ._base import SQLBaseStore, Cache from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks @@ -29,6 +29,14 @@ class DeviceStore(SQLBaseStore): def __init__(self, hs): super(DeviceStore, self).__init__(hs) + # Map of (user_id, device_id) -> bool. If there is an entry that implies + # the device exists. + self.device_id_exists_cache = Cache( + name="device_id_exists", + keylen=2, + max_entries=10000, + ) + self._clock.looping_call( self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) @@ -54,6 +62,10 @@ class DeviceStore(SQLBaseStore): defer.Deferred: boolean whether the device was inserted or an existing device existed with that ID. """ + key = (user_id, device_id) + if self.device_id_exists_cache.get(key, None): + defer.returnValue(False) + try: inserted = yield self._simple_insert( "devices", @@ -65,6 +77,7 @@ class DeviceStore(SQLBaseStore): desc="store_device", or_ignore=True, ) + self.device_id_exists_cache.prefill(key, True) defer.returnValue(inserted) except Exception as e: logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" @@ -93,6 +106,7 @@ class DeviceStore(SQLBaseStore): desc="get_device", ) + @defer.inlineCallbacks def delete_device(self, user_id, device_id): """Delete a device. @@ -102,12 +116,15 @@ class DeviceStore(SQLBaseStore): Returns: defer.Deferred """ - return self._simple_delete_one( + yield self._simple_delete_one( table="devices", keyvalues={"user_id": user_id, "device_id": device_id}, desc="delete_device", ) + self.device_id_exists_cache.invalidate((user_id, device_id)) + + @defer.inlineCallbacks def delete_devices(self, user_id, device_ids): """Deletes several devices. @@ -117,13 +134,15 @@ class DeviceStore(SQLBaseStore): Returns: defer.Deferred """ - return self._simple_delete_many( + yield self._simple_delete_many( table="devices", column="device_id", iterable=device_ids, keyvalues={"user_id": user_id}, desc="delete_devices", ) + for device_id in device_ids: + self.device_id_exists_cache.invalidate((user_id, device_id)) def update_device(self, user_id, device_id, new_display_name=None): """Update a device. diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 7cbc1470fd..e00f31da2b 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.util.caches.descriptors import cached from canonicaljson import encode_canonical_json import ujson as json @@ -123,18 +123,24 @@ class EndToEndKeyStore(SQLBaseStore): return result @defer.inlineCallbacks - def add_e2e_one_time_keys(self, user_id, device_id, time_now, key_list): - """Insert some new one time keys for a device. + def get_e2e_one_time_keys(self, user_id, device_id, key_ids): + """Retrieve a number of one-time keys for a user - Checks if any of the keys are already inserted, if they are then check - if they match. If they don't then we raise an error. + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + key_ids(list[str]): list of key ids (excluding algorithm) to + retrieve + + Returns: + deferred resolving to Dict[(str, str), str]: map from (algorithm, + key_id) to json string for key """ - # First we check if we have already persisted any of the keys. rows = yield self._simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", - iterable=[key_id for _, key_id, _ in key_list], + iterable=key_ids, retcols=("algorithm", "key_id", "key_json",), keyvalues={ "user_id": user_id, @@ -143,20 +149,22 @@ class EndToEndKeyStore(SQLBaseStore): desc="add_e2e_one_time_keys_check", ) - existing_key_map = { + defer.returnValue({ (row["algorithm"], row["key_id"]): row["key_json"] for row in rows - } - - new_keys = [] # Keys that we need to insert - for algorithm, key_id, json_bytes in key_list: - ex_bytes = existing_key_map.get((algorithm, key_id), None) - if ex_bytes: - if json_bytes != ex_bytes: - raise SynapseError( - 400, "One time key with key_id %r already exists" % (key_id,) - ) - else: - new_keys.append((algorithm, key_id, json_bytes)) + }) + + @defer.inlineCallbacks + def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys): + """Insert some new one time keys for a device. Errors if any of the + keys already exist. + + Args: + user_id(str): id of user to get keys for + device_id(str): id of device to get keys for + time_now(long): insertion time to record (ms since epoch) + new_keys(iterable[(str, str, str)]: keys to add - each a tuple of + (algorithm, key_id, key json) + """ def _add_e2e_one_time_keys(txn): # We are protected from race between lookup and insertion due to @@ -177,10 +185,14 @@ class EndToEndKeyStore(SQLBaseStore): for algorithm, key_id, json_bytes in new_keys ], ) + txn.call_after( + self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) + ) yield self.runInteraction( "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys ) + @cached(max_entries=10000) def count_e2e_one_time_keys(self, user_id, device_id): """ Count the number of one time keys the server has for a device Returns: @@ -225,6 +237,9 @@ class EndToEndKeyStore(SQLBaseStore): ) for user_id, device_id, algorithm, key_id in delete: txn.execute(sql, (user_id, device_id, algorithm, key_id)) + txn.call_after( + self.count_e2e_one_time_keys.invalidate, (user_id, device_id,) + ) return result return self.runInteraction( "claim_e2e_one_time_keys", _claim_e2e_one_time_keys @@ -242,3 +257,4 @@ class EndToEndKeyStore(SQLBaseStore): keyvalues={"user_id": user_id, "device_id": device_id}, desc="delete_e2e_one_time_keys_by_device" ) + self.count_e2e_one_time_keys.invalidate((user_id, device_id,)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 98707d40ee..c4aeb48800 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore): where_clause="contains_url = true AND outlier = false", ) + # an event_id index on event_search is useful for the purge_history + # api. Plus it means we get to enforce some integrity with a UNIQUE + # clause + self.register_background_index_update( + "event_search_event_id_idx", + index_name="event_search_event_id_idx", + table="event_search", + columns=["event_id"], + unique=True, + psql_only=True, + ) + self._event_persist_queue = _EventPeristenceQueue() def persist_events(self, events_and_contexts, backfilled=False): @@ -387,6 +399,11 @@ class EventsStore(SQLBaseStore): event_counter.inc(event.type, origin_type, origin_entity) + for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state + ) + @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): """Calculates the new forward extremeties for a room given events to @@ -435,10 +452,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, i.e. - (type, state_key) -> event_id. `to_delete` are the entries to + 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. + to insert. `new_state` is the full set of state. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for @@ -545,7 +562,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert)) + defer.returnValue((to_delete, to_insert, current_state)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -698,7 +715,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple + to_delete, to_insert, _ = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], @@ -1343,11 +1360,26 @@ class EventsStore(SQLBaseStore): def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) - def _get_events_from_cache(self, events, allow_rejected): + def _get_events_from_cache(self, events, allow_rejected, update_metrics=True): + """Fetch events from the caches + + Args: + events (list(str)): list of event_ids to fetch + allow_rejected (bool): Whether to teturn events that were rejected + update_metrics (bool): Whether to update the cache hit ratio metrics + + Returns: + dict of event_id -> _EventCacheEntry for each event_id in cache. If + allow_rejected is `False` then there will still be an entry but it + will be `None` + """ event_map = {} for event_id in events: - ret = self._get_event_cache.get((event_id,), None) + ret = self._get_event_cache.get( + (event_id,), None, + update_metrics=update_metrics, + ) if not ret: continue @@ -2007,6 +2039,8 @@ class EventsStore(SQLBaseStore): 400, "topological_ordering is greater than forward extremeties" ) + logger.debug("[purge] looking for events to delete") + txn.execute( "SELECT event_id, state_key FROM events" " LEFT JOIN state_events USING (room_id, event_id)" @@ -2015,9 +2049,19 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + to_delete = [ + (event_id,) for event_id, state_key in event_rows + if state_key is None and not self.hs.is_mine_id(event_id) + ] + logger.info( + "[purge] found %i events before cutoff, of which %i are remote" + " non-state events to delete", len(event_rows), len(to_delete)) + for event_id, state_key in event_rows: txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + logger.debug("[purge] Finding new backward extremities") + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -2030,6 +2074,8 @@ class EventsStore(SQLBaseStore): ) new_backwards_extrems = txn.fetchall() + logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems) + txn.execute( "DELETE FROM event_backward_extremities WHERE room_id = ?", (room_id,) @@ -2044,6 +2090,8 @@ class EventsStore(SQLBaseStore): ] ) + logger.debug("[purge] finding redundant state groups") + # Get all state groups that are only referenced by events that are # to be deleted. txn.execute( @@ -2059,15 +2107,20 @@ class EventsStore(SQLBaseStore): ) state_rows = txn.fetchall() - state_groups_to_delete = [sg for sg, in state_rows] + logger.debug("[purge] found %i redundant state groups", len(state_rows)) + + # make a set of the redundant state groups, so that we can look them up + # efficiently + state_groups_to_delete = set([sg for sg, in state_rows]) # Now we get all the state groups that rely on these state groups - new_state_edges = [] - chunks = [ - state_groups_to_delete[i:i + 100] - for i in xrange(0, len(state_groups_to_delete), 100) - ] - for chunk in chunks: + logger.debug("[purge] finding state groups which depend on redundant" + " state groups") + remaining_state_groups = [] + for i in xrange(0, len(state_rows), 100): + chunk = [sg for sg, in state_rows[i:i + 100]] + # look for state groups whose prev_state_group is one we are about + # to delete rows = self._simple_select_many_txn( txn, table="state_group_edges", @@ -2076,21 +2129,28 @@ class EventsStore(SQLBaseStore): retcols=["state_group"], keyvalues={}, ) - new_state_edges.extend(row["state_group"] for row in rows) + remaining_state_groups.extend( + row["state_group"] for row in rows + + # exclude state groups we are about to delete: no point in + # updating them + if row["state_group"] not in state_groups_to_delete + ) - # Now we turn the state groups that reference to-be-deleted state groups - # to non delta versions. - for new_state_edge in new_state_edges: + # Now we turn the state groups that reference to-be-deleted state + # groups to non delta versions. + for sg in remaining_state_groups: + logger.debug("[purge] de-delta-ing remaining state group %s", sg) curr_state = self._get_state_groups_from_groups_txn( - txn, [new_state_edge], types=None + txn, [sg], types=None ) - curr_state = curr_state[new_state_edge] + curr_state = curr_state[sg] self._simple_delete_txn( txn, table="state_groups_state", keyvalues={ - "state_group": new_state_edge, + "state_group": sg, } ) @@ -2098,7 +2158,7 @@ class EventsStore(SQLBaseStore): txn, table="state_group_edges", keyvalues={ - "state_group": new_state_edge, + "state_group": sg, } ) @@ -2107,7 +2167,7 @@ class EventsStore(SQLBaseStore): table="state_groups_state", values=[ { - "state_group": new_state_edge, + "state_group": sg, "room_id": room_id, "type": key[0], "state_key": key[1], @@ -2117,6 +2177,7 @@ class EventsStore(SQLBaseStore): ], ) + logger.debug("[purge] removing redundant state groups") txn.executemany( "DELETE FROM state_groups_state WHERE state_group = ?", state_rows @@ -2125,22 +2186,21 @@ class EventsStore(SQLBaseStore): "DELETE FROM state_groups WHERE id = ?", state_rows ) + # Delete all non-state + logger.debug("[purge] removing events from event_to_state_groups") txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", [(event_id,) for event_id, _ in event_rows] ) + logger.debug("[purge] updating room_depth") txn.execute( "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", (topological_ordering, room_id,) ) # Delete all remote non-state events - to_delete = [ - (event_id,) for event_id, state_key in event_rows - if state_key is None and not self.hs.is_mine_id(event_id) - ] for table in ( "events", "event_json", @@ -2156,16 +2216,15 @@ class EventsStore(SQLBaseStore): "event_signatures", "rejections", ): + logger.debug("[purge] removing remote non-state events from %s", table) + txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), to_delete ) - txn.executemany( - "DELETE FROM events WHERE event_id = ?", - to_delete - ) # Mark all state and own events as outliers + logger.debug("[purge] marking remaining events as outliers") txn.executemany( "UPDATE events SET outlier = ?" " WHERE event_id = ?", @@ -2175,6 +2234,8 @@ class EventsStore(SQLBaseStore): ] ) + logger.info("[purge] done") + @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 353a135c4e..0a819d32c5 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList from synapse.push.baserules import list_with_base_rules +from synapse.api.constants import EventTypes from twisted.internet import defer import logging @@ -184,11 +185,23 @@ class PushRuleStore(SQLBaseStore): if uid in local_users_in_room: user_ids.add(uid) + forgotten = yield self.who_forgot_in_room( + event.room_id, on_invalidate=cache_context.invalidate, + ) + + for row in forgotten: + user_id = row["user_id"] + event_id = row["event_id"] + + mem_id = current_state_ids.get((EventTypes.Member, user_id), None) + if event_id == mem_id: + user_ids.discard(user_id) + rules_by_user = yield self.bulk_get_push_rules( user_ids, on_invalidate=cache_context.invalidate, ) - rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None} + rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} defer.returnValue(rules_by_user) @@ -398,8 +411,7 @@ class PushRuleStore(SQLBaseStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, - event_stream_ordering, + "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering ) @defer.inlineCallbacks diff --git a/synapse/storage/room.py b/synapse/storage/room.py index e4c56cc175..5d543652bb 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from ._base import SQLBaseStore from .engines import PostgresEngine, Sqlite3Engine @@ -33,6 +33,11 @@ OpsLevel = collections.namedtuple( ("ban_level", "kick_level", "redact_level",) ) +RatelimitOverride = collections.namedtuple( + "RatelimitOverride", + ("messages_per_second", "burst_count",) +) + class RoomStore(SQLBaseStore): @@ -473,3 +478,32 @@ class RoomStore(SQLBaseStore): return self.runInteraction( "get_all_new_public_rooms", get_all_new_public_rooms ) + + @cachedInlineCallbacks(max_entries=10000) + def get_ratelimit_for_user(self, user_id): + """Check if there are any overrides for ratelimiting for the given + user + + Args: + user_id (str) + + Returns: + RatelimitOverride if there is an override, else None. If the contents + of RatelimitOverride are None or 0 then ratelimitng has been + disabled for that user entirely. + """ + row = yield self._simple_select_one( + table="ratelimit_override", + keyvalues={"user_id": user_id}, + retcols=("messages_per_second", "burst_count"), + allow_none=True, + desc="get_ratelimit_for_user", + ) + + if row: + defer.returnValue(RatelimitOverride( + messages_per_second=row["messages_per_second"], + burst_count=row["burst_count"], + )) + else: + defer.returnValue(None) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ad3c9b06d9..404f3583eb 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -421,9 +421,13 @@ class RoomMemberStore(SQLBaseStore): # We check if we have any of the member event ids in the event cache # before we ask the DB + # We don't update the event cache hit ratio as it completely throws off + # the hit ratio counts. After all, we don't populate the cache if we + # miss it here event_map = self._get_events_from_cache( member_event_ids, allow_rejected=False, + update_metrics=False, ) missing_member_event_ids = [] @@ -530,7 +534,7 @@ class RoomMemberStore(SQLBaseStore): assert state_group is not None joined_hosts = set() - for (etype, state_key), event_id in current_state_ids.items(): + for etype, state_key in current_state_ids: if etype == EventTypes.Member: try: host = get_domain_from_id(state_key) @@ -541,6 +545,7 @@ class RoomMemberStore(SQLBaseStore): if host in joined_hosts: continue + event_id = current_state_ids[(etype, state_key)] event = yield self.get_event(event_id, allow_none=True) if event and event.content["membership"] == Membership.JOIN: joined_hosts.add(intern_string(host)) diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py index 784f3b348f..20ad8bd5a6 100644 --- a/synapse/storage/schema/delta/37/remove_auth_idx.py +++ b/synapse/storage/schema/delta/37/remove_auth_idx.py @@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref; -- and is used incredibly rarely. DROP INDEX IF EXISTS events_order_topo_stream_room; +-- an equivalent index to this actually gets re-created in delta 41, because it +-- turned out that deleting it wasn't a great plan :/. In any case, let's +-- delete it here, and delta 41 will create a new one with an added UNIQUE +-- constraint DROP INDEX IF EXISTS event_search_ev_idx; """ diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql new file mode 100644 index 0000000000..5d9cfecf36 --- /dev/null +++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql @@ -0,0 +1,17 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('event_search_event_id_idx', '{}'); diff --git a/synapse/storage/schema/delta/41/ratelimit.sql b/synapse/storage/schema/delta/41/ratelimit.sql new file mode 100644 index 0000000000..a194bf0238 --- /dev/null +++ b/synapse/storage/schema/delta/41/ratelimit.sql @@ -0,0 +1,22 @@ +/* Copyright 2017 Vector Creations Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE ratelimit_override ( + user_id TEXT NOT NULL, + messages_per_second BIGINT, + burst_count BIGINT +); + +CREATE UNIQUE INDEX ratelimit_override_idx ON ratelimit_override(user_id); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a16afa8df5..85acf2ad1e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,18 @@ class StateStore(SQLBaseStore): ], ) + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=dict(context.current_state_ids), + full=True, + ) + self._simple_insert_many_txn( txn, table="event_to_state_groups", diff --git a/synapse/types.py b/synapse/types.py index c87ed813b9..445bdcb4d7 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -56,10 +56,10 @@ def create_requester(user_id, access_token_id=None, is_guest=False, def get_domain_from_id(string): - try: - return string.split(":", 1)[1] - except IndexError: + idx = string.find(":") + if idx == -1: raise SynapseError(400, "Invalid ID: %r" % (string,)) + return string[idx + 1:] class DomainSpecificString( diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index aa182eeac7..48dcbafeef 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -96,7 +96,7 @@ class Cache(object): "Cache objects can only be accessed from the main thread" ) - def get(self, key, default=_CacheSentinel, callback=None): + def get(self, key, default=_CacheSentinel, callback=None, update_metrics=True): """Looks the key up in the caches. Args: @@ -104,6 +104,7 @@ class Cache(object): default: What is returned if key is not in the caches. If not specified then function throws KeyError instead callback(fn): Gets called when the entry in the cache is invalidated + update_metrics (bool): whether to update the cache hit rate metrics Returns: Either a Deferred or the raw result @@ -113,7 +114,8 @@ class Cache(object): if val is not _CacheSentinel: if val.sequence == self.sequence: val.callbacks.update(callbacks) - self.metrics.inc_hits() + if update_metrics: + self.metrics.inc_hits() return val.deferred val = self.cache.get(key, _CacheSentinel, callbacks=callbacks) @@ -121,7 +123,8 @@ class Cache(object): self.metrics.inc_hits() return val - self.metrics.inc_misses() + if update_metrics: + self.metrics.inc_misses() if default is _CacheSentinel: raise KeyError() diff --git a/synapse/visibility.py b/synapse/visibility.py index 5590b866ed..c4dd9ae2c7 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -189,6 +189,25 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks +def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): + user_ids = set(u[0] for u in user_tuples) + event_id_to_state = {} + for event_id, context in event_id_to_context.items(): + state = yield store.get_events([ + e_id + for key, e_id in context.current_state_ids.iteritems() + if key == (EventTypes.RoomHistoryVisibility, "") + or (key[0] == EventTypes.Member and key[1] in user_ids) + ]) + event_id_to_state[event_id] = state + + res = yield filter_events_for_clients( + store, user_tuples, events, event_id_to_state + ) + defer.returnValue(res) + + +@defer.inlineCallbacks def filter_events_for_client(store, user_id, events, is_peeking=False): """ Check which events a user is allowed to see diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 878a54dc34..19f5ed6bce 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -14,6 +14,7 @@ # limitations under the License. import mock +from synapse.api import errors from twisted.internet import defer import synapse.api.errors @@ -44,3 +45,134 @@ class E2eKeysHandlerTestCase(unittest.TestCase): local_user = "@boris:" + self.hs.hostname res = yield self.handler.query_local_devices({local_user: None}) self.assertDictEqual(res, {local_user: {}}) + + @defer.inlineCallbacks + def test_reupload_one_time_keys(self): + """we should be able to re-upload the same keys""" + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + # we should be able to change the signature without a problem + keys["alg2:k2"]["signatures"]["k1"] = "sig2" + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + @defer.inlineCallbacks + def test_change_one_time_keys(self): + """attempts to change one-time-keys should be rejected""" + + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + "alg2:k2": { + "key": "key2", + "signatures": {"k1": "sig1"} + }, + "alg2:k3": { + "key": "key3", + }, + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1, "alg2": 2} + }) + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg1:k1": "key2"}}, + ) + self.fail("No error when changing string key") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": {"alg2:k3": "key2"}}, + ) + self.fail("No error when replacing dict key with string") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": {"alg1:k1": {"key": "key"}} + }, + ) + self.fail("No error when replacing string key with dict") + except errors.SynapseError: + pass + + try: + yield self.handler.upload_keys_for_user( + local_user, device_id, { + "one_time_keys": { + "alg2:k2": { + "key": "key3", + "signatures": {"k1": "sig1"}, + } + }, + }, + ) + self.fail("No error when replacing dict key") + except errors.SynapseError: + pass + + @unittest.DEBUG + @defer.inlineCallbacks + def test_claim_one_time_key(self): + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1} + }) + + res2 = yield self.handler.claim_one_time_keys({ + "one_time_keys": { + local_user: { + device_id: "alg1" + } + } + }, timeout=None) + self.assertEqual(res2, { + "failures": {}, + "one_time_keys": { + local_user: { + device_id: { + "alg1:k1": "key1" + } + } + } + }) |