diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 34 | ||||
-rw-r--r-- | synapse/handlers/device.py | 29 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 86 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 7 | ||||
-rw-r--r-- | synapse/handlers/message.py | 16 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 6 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 2 | ||||
-rw-r--r-- | synapse/handlers/register.py | 7 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 3 |
10 files changed, 146 insertions, 46 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e83adc8339..faa5609c0c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,7 +53,20 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() - def ratelimit(self, requester): + @defer.inlineCallbacks + def ratelimit(self, requester, update=True): + """Ratelimits requests. + + Args: + requester (Requester) + update (bool): Whether to record that a request is being processed. + Set to False when doing multiple checks for one request (e.g. + to check up front if we would reject the request), and set to + True for the last call for a given request. + + Raises: + LimitExceededError if the request should be ratelimited + """ time_now = self.clock.time() user_id = requester.user.to_string() @@ -67,10 +80,25 @@ class BaseHandler(object): if requester.app_service and not requester.app_service.is_rate_limited(): return + # Check if there is a per user override in the DB. + override = yield self.store.get_ratelimit_for_user(user_id) + if override: + # If overriden with a null Hz then ratelimiting has been entirely + # disabled for the user + if not override.messages_per_second: + return + + messages_per_second = override.messages_per_second + burst_count = override.burst_count + else: + messages_per_second = self.hs.config.rc_messages_per_second + burst_count = self.hs.config.rc_message_burst_count + allowed, time_allowed = self.ratelimiter.send_message( user_id, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, + msg_rate_hz=messages_per_second, + burst_count=burst_count, + update=update, ) if not allowed: raise LimitExceededError( diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c22f65ce5d..982cda3edf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -425,12 +426,38 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + # Given a list of updates we check if we need to resync. This + # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) - result = yield self.federation.query_user_devices(origin, user_id) + try: + result = yield self.federation.query_user_devices(origin, user_id) + except NotRetryingDestination: + # TODO: Remember that we are now out of sync and try again + # later + logger.warn( + "Failed to handle device list update for %s," + " we're not retrying the remote", + user_id, + ) + # We abort on exceptions rather than accepting the update + # as otherwise synapse will 'forget' that its device list + # is out of date. If we bail then we will retry the resync + # next time we get a device list update for this user_id. + # This makes it more likely that the device lists will + # eventually become consistent. + return + except Exception: + # TODO: Remember that we are now out of sync and try again + # later + logger.exception( + "Failed to handle device list update for %s", user_id + ) + return + stream_id = result["stream_id"] devices = result["devices"] yield self.store.update_remote_device_list_cache( diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index c2b38d72a9..668a90e495 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -145,7 +145,7 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) for destination in remote_queries_not_in_cache ])) @@ -257,11 +257,21 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) for destination in remote_queries ])) + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({ "one_time_keys": json_result, "failures": failures @@ -288,19 +298,8 @@ class E2eKeysHandler(object): one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list + yield self._upload_one_time_keys_for_user( + user_id, device_id, time_now, one_time_keys, ) # the device should have been registered already, but it may have been @@ -313,3 +312,58 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue({"one_time_key_counts": result}) + + @defer.inlineCallbacks + def _upload_one_time_keys_for_user(self, user_id, device_id, time_now, + one_time_keys): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), device_id, user_id, time_now, + ) + + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, key_obj + )) + + # First we check if we have already persisted any of the keys. + existing_key_map = yield self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) + + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ("One time key %s:%s already exists. " + "Old key: %s; new key: %r") % + (algorithm, key_id, ex_json, key) + ) + else: + new_keys.append((algorithm, key_id, encode_canonical_json(key))) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) + + +def _one_time_keys_match(old_key_json, new_key): + old_key = json.loads(old_key_json) + + # if either is a string rather than an object, they must match exactly + if not isinstance(old_key, dict) or not isinstance(new_key, dict): + return old_key == new_key + + # otherwise, we strip off the 'signatures' if any, because it's legitimate + # for different upload attempts to have different signatures. + old_key.pop("signatures", None) + new_key_copy = dict(new_key) + new_key_copy.pop("signatures", None) + + return old_key == new_key_copy diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2af9849ed0..52d97dfbf3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -380,13 +380,6 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - # if we're receiving valid events from an origin, - # it's probably a good idea to mark it as not in retry-state - # for sending (although this is a bit of a leap) - retry_timings = yield self.store.get_destination_retry_timings(origin) - if retry_timings and retry_timings["retry_last_ts"]: - self.store.set_destination_retry_timings(origin, 0, 0) - room = yield self.store.get_room(event.room_id) if not room: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 57265c6d7d..196925edad 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError +from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -254,17 +254,7 @@ class MessageHandler(BaseHandler): # We check here if we are currently being rate limited, so that we # don't do unnecessary work. We check again just before we actually # send the event. - time_now = self.clock.time() - allowed, time_allowed = self.ratelimiter.send_message( - event.sender, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, - update=False, - ) - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now)), - ) + yield self.ratelimit(requester, update=False) user = UserID.from_string(event.sender) @@ -499,7 +489,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - self.ratelimit(requester) + yield self.ratelimit(requester) try: yield self.auth.check_from_context(event, context) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f3707afcd0..c7c0b0a1e2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -780,12 +780,12 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) self._push_to_remotes([state]) else: + user_ids = yield self.store.get_users_in_room(room_id) user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) @@ -1322,7 +1322,7 @@ def get_interested_parties(store, states): @defer.inlineCallbacks -def get_interested_remotes(store, states): +def get_interested_remotes(store, states, state_handler): """Given a list of presence states figure out which remote servers should be sent which. @@ -1345,7 +1345,7 @@ def get_interested_remotes(store, states): room_ids_to_states, users_to_states = yield get_interested_parties(store, states) for room_id, states in room_ids_to_states.iteritems(): - hosts = yield store.get_hosts_in_room(room_id) + hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.iteritems(): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9bf638f818..7abee98dea 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): return - self.ratelimit(requester) + yield self.ratelimit(requester) room_ids = yield self.store.get_rooms_for_user( user.to_string(), diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 03c6a85fc6..ee3a2269a8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -54,6 +54,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if not localpart: + raise SynapseError( + 400, + "User ID cannot be empty", + Codes.INVALID_USERNAME + ) + if localpart[0] == '_': raise SynapseError( 400, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 99cb7db0db..d2a0d6520a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - self.ratelimit(requester) + yield self.ratelimit(requester) if "room_alias_name" in config: for wchar in string.whitespace: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ab87632d99..1ca88517a2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -739,10 +739,11 @@ class RoomMemberHandler(BaseHandler): if len(current_state_ids) == 1 and create_event_id: defer.returnValue(self.hs.is_mine_id(create_event_id)) - for (etype, state_key), event_id in current_state_ids.items(): + for etype, state_key in current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): continue + event_id = current_state_ids[(etype, state_key)] event = yield self.store.get_event(event_id, allow_none=True) if not event: continue |