diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 34 | ||||
-rw-r--r-- | synapse/handlers/device.py | 29 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 86 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 125 | ||||
-rw-r--r-- | synapse/handlers/identity.py | 11 | ||||
-rw-r--r-- | synapse/handlers/message.py | 37 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 218 | ||||
-rw-r--r-- | synapse/handlers/profile.py | 2 | ||||
-rw-r--r-- | synapse/handlers/read_marker.py | 64 | ||||
-rw-r--r-- | synapse/handlers/register.py | 7 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 27 | ||||
-rw-r--r-- | synapse/handlers/typing.py | 7 |
13 files changed, 450 insertions, 199 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index e83adc8339..faa5609c0c 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -53,7 +53,20 @@ class BaseHandler(object): self.event_builder_factory = hs.get_event_builder_factory() - def ratelimit(self, requester): + @defer.inlineCallbacks + def ratelimit(self, requester, update=True): + """Ratelimits requests. + + Args: + requester (Requester) + update (bool): Whether to record that a request is being processed. + Set to False when doing multiple checks for one request (e.g. + to check up front if we would reject the request), and set to + True for the last call for a given request. + + Raises: + LimitExceededError if the request should be ratelimited + """ time_now = self.clock.time() user_id = requester.user.to_string() @@ -67,10 +80,25 @@ class BaseHandler(object): if requester.app_service and not requester.app_service.is_rate_limited(): return + # Check if there is a per user override in the DB. + override = yield self.store.get_ratelimit_for_user(user_id) + if override: + # If overriden with a null Hz then ratelimiting has been entirely + # disabled for the user + if not override.messages_per_second: + return + + messages_per_second = override.messages_per_second + burst_count = override.burst_count + else: + messages_per_second = self.hs.config.rc_messages_per_second + burst_count = self.hs.config.rc_message_burst_count + allowed, time_allowed = self.ratelimiter.send_message( user_id, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, + msg_rate_hz=messages_per_second, + burst_count=burst_count, + update=update, ) if not allowed: raise LimitExceededError( diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c22f65ce5d..982cda3edf 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -17,6 +17,7 @@ from synapse.api.constants import EventTypes from synapse.util import stringutils from synapse.util.async import Linearizer from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer @@ -425,12 +426,38 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + # Given a list of updates we check if we need to resync. This + # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) - result = yield self.federation.query_user_devices(origin, user_id) + try: + result = yield self.federation.query_user_devices(origin, user_id) + except NotRetryingDestination: + # TODO: Remember that we are now out of sync and try again + # later + logger.warn( + "Failed to handle device list update for %s," + " we're not retrying the remote", + user_id, + ) + # We abort on exceptions rather than accepting the update + # as otherwise synapse will 'forget' that its device list + # is out of date. If we bail then we will retry the resync + # next time we get a device list update for this user_id. + # This makes it more likely that the device lists will + # eventually become consistent. + return + except Exception: + # TODO: Remember that we are now out of sync and try again + # later + logger.exception( + "Failed to handle device list update for %s", user_id + ) + return + stream_id = result["stream_id"] devices = result["devices"] yield self.store.update_remote_device_list_cache( diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index c2b38d72a9..668a90e495 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException from synapse.types import get_domain_from_id -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.util.logcontext import preserve_fn, make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -145,7 +145,7 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(do_remote_query)(destination) for destination in remote_queries_not_in_cache ])) @@ -257,11 +257,21 @@ class E2eKeysHandler(object): "status": 503, "message": e.message } - yield preserve_context_over_deferred(defer.gatherResults([ + yield make_deferred_yieldable(defer.gatherResults([ preserve_fn(claim_client_keys)(destination) for destination in remote_queries ])) + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({ "one_time_keys": json_result, "failures": failures @@ -288,19 +298,8 @@ class E2eKeysHandler(object): one_time_keys = keys.get("one_time_keys", None) if one_time_keys: - logger.info( - "Adding %d one_time_keys for device %r for user %r at %d", - len(one_time_keys), device_id, user_id, time_now - ) - key_list = [] - for key_id, key_json in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append(( - algorithm, key_id, encode_canonical_json(key_json) - )) - - yield self.store.add_e2e_one_time_keys( - user_id, device_id, time_now, key_list + yield self._upload_one_time_keys_for_user( + user_id, device_id, time_now, one_time_keys, ) # the device should have been registered already, but it may have been @@ -313,3 +312,58 @@ class E2eKeysHandler(object): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue({"one_time_key_counts": result}) + + @defer.inlineCallbacks + def _upload_one_time_keys_for_user(self, user_id, device_id, time_now, + one_time_keys): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), device_id, user_id, time_now, + ) + + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, key_obj + )) + + # First we check if we have already persisted any of the keys. + existing_key_map = yield self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) + + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ("One time key %s:%s already exists. " + "Old key: %s; new key: %r") % + (algorithm, key_id, ex_json, key) + ) + else: + new_keys.append((algorithm, key_id, encode_canonical_json(key))) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) + + +def _one_time_keys_match(old_key_json, new_key): + old_key = json.loads(old_key_json) + + # if either is a string rather than an object, they must match exactly + if not isinstance(old_key, dict) or not isinstance(new_key, dict): + return old_key == new_key + + # otherwise, we strip off the 'signatures' if any, because it's legitimate + # for different upload attempts to have different signatures. + old_key.pop("signatures", None) + new_key_copy = dict(new_key) + new_key_copy.pop("signatures", None) + + return old_key == new_key_copy diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 53f9296399..52d97dfbf3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -28,7 +28,7 @@ from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.events.validator import EventValidator from synapse.util import unwrapFirstError from synapse.util.logcontext import ( - PreserveLoggingContext, preserve_fn, preserve_context_over_deferred + preserve_fn, preserve_context_over_deferred ) from synapse.util.metrics import measure_func from synapse.util.logutils import log_function @@ -172,8 +172,22 @@ class FederationHandler(BaseHandler): origin, pdu, prevs, min_depth ) - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + # Update the set of things we've seen after trying to + # fetch the missing stuff + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.iterkeys()) + + if not prevs - seen: + logger.info( + "Found all missing prev events for %s", pdu.event_id + ) + elif prevs - seen: + logger.info( + "Not fetching %d missing events for room %r,event %s: %r...", + len(prevs - seen), pdu.room_id, pdu.event_id, + list(prevs - seen)[:5], + ) + if prevs - seen: logger.info( "Still missing %d events for room %r: %r...", @@ -208,19 +222,15 @@ class FederationHandler(BaseHandler): Args: origin (str): Origin of the pdu. Will be called to get the missing events pdu: received pdu - prevs (str[]): List of event ids which we are missing + prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. - - Returns: - Deferred<dict(str, str?)>: updated have_seen dictionary """ # We recalculate seen, since it may have changed. have_seen = yield self.store.have_events(prevs) seen = set(have_seen.keys()) if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) + return latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id @@ -232,8 +242,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + "Missing %d events for room %r pdu %s: %r...", + len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] ) # XXX: we set timeout to 10s to help workaround @@ -265,22 +275,23 @@ class FederationHandler(BaseHandler): timeout=10000, ) + logger.info( + "Got %d events: %r...", + len(missing_events), [e.event_id for e in missing_events[:5]] + ) + # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for e in missing_events: + logger.info("Handling found event %s", e.event_id) yield self.on_receive_pdu( origin, e, get_missing=False ) - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - defer.returnValue(have_seen) - @log_function @defer.inlineCallbacks def _process_received_pdu(self, origin, pdu, state, auth_chain): @@ -369,13 +380,6 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - # if we're receiving valid events from an origin, - # it's probably a good idea to mark it as not in retry-state - # for sending (although this is a bit of a leap) - retry_timings = yield self.store.get_destination_retry_timings(origin) - if retry_timings and retry_timings["retry_last_ts"]: - self.store.set_destination_retry_timings(origin, 0, 0) - room = yield self.store.get_room(event.room_id) if not room: @@ -394,11 +398,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) if event.type == EventTypes.Member: if event.membership == Membership.JOIN: @@ -916,11 +919,10 @@ class FederationHandler(BaseHandler): origin, auth_chain, state, event ) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[joinee] - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[joinee] + ) logger.debug("Finished joining %s to %s", joinee, room_id) finally: @@ -1035,10 +1037,9 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, extra_users=extra_users + ) if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: @@ -1084,29 +1085,22 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=[target_user], - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=[target_user], + ) defer.returnValue(event) @defer.inlineCallbacks def do_remotely_reject_invite(self, target_hosts, room_id, user_id): - try: - origin, event = yield self._make_and_verify_event( - target_hosts, - room_id, - user_id, - "leave" - ) - event = self._sign_event(event) - except SynapseError: - raise - except CodeMessageException as e: - logger.warn("Failed to reject invite: %s", e) - raise SynapseError(500, "Failed to reject invite") + origin, event = yield self._make_and_verify_event( + target_hosts, + room_id, + user_id, + "leave" + ) + event = self._sign_event(event) # Try the host that we succesfully called /make_leave/ on first for # the /send_leave/ request. @@ -1116,16 +1110,10 @@ class FederationHandler(BaseHandler): except ValueError: pass - try: - yield self.replication_layer.send_leave( - target_hosts, - event - ) - except SynapseError: - raise - except CodeMessageException as e: - logger.warn("Failed to reject invite: %s", e) - raise SynapseError(500, "Failed to reject invite") + yield self.replication_layer.send_leave( + target_hosts, + event + ) context = yield self.state_handler.compute_event_context(event) @@ -1246,10 +1234,9 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - with PreserveLoggingContext(): - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, extra_users=extra_users - ) + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, extra_users=extra_users + ) defer.returnValue(None) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6a53c5eb47..9efcdff1d6 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -18,7 +18,7 @@ from twisted.internet import defer from synapse.api.errors import ( - CodeMessageException + MatrixCodeMessageException, CodeMessageException ) from ._base import BaseHandler from synapse.util.async import run_on_reactor @@ -90,6 +90,9 @@ class IdentityHandler(BaseHandler): ), {'sid': creds['sid'], 'client_secret': client_secret} ) + except MatrixCodeMessageException as e: + logger.info("getValidated3pid failed with Matrix error: %r", e) + raise SynapseError(e.code, e.msg, e.errcode) except CodeMessageException as e: data = json.loads(e.msg) @@ -159,6 +162,9 @@ class IdentityHandler(BaseHandler): params ) defer.returnValue(data) + except MatrixCodeMessageException as e: + logger.info("Proxied requestToken failed with Matrix error: %r", e) + raise SynapseError(e.code, e.msg, e.errcode) except CodeMessageException as e: logger.info("Proxied requestToken failed: %r", e) raise e @@ -193,6 +199,9 @@ class IdentityHandler(BaseHandler): params ) defer.returnValue(data) + except MatrixCodeMessageException as e: + logger.info("Proxied requestToken failed with Matrix error: %r", e) + raise SynapseError(e.code, e.msg, e.errcode) except CodeMessageException as e: logger.info("Proxied requestToken failed: %r", e) raise e diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7a498af5a2..196925edad 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership -from synapse.api.errors import AuthError, Codes, SynapseError, LimitExceededError +from synapse.api.errors import AuthError, Codes, SynapseError from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator @@ -175,7 +175,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): + def create_event(self, requester, event_dict, token_id=None, txn_id=None, + prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -185,6 +186,7 @@ class MessageHandler(BaseHandler): Adds display names to Join membership events. Args: + requester event_dict (dict): An entire event token_id (str) txn_id (str) @@ -226,6 +228,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + requester=requester, prev_event_ids=prev_event_ids, ) @@ -251,17 +254,7 @@ class MessageHandler(BaseHandler): # We check here if we are currently being rate limited, so that we # don't do unnecessary work. We check again just before we actually # send the event. - time_now = self.clock.time() - allowed, time_allowed = self.ratelimiter.send_message( - event.sender, time_now, - msg_rate_hz=self.hs.config.rc_messages_per_second, - burst_count=self.hs.config.rc_message_burst_count, - update=False, - ) - if not allowed: - raise LimitExceededError( - retry_after_ms=int(1000 * (time_allowed - time_now)), - ) + yield self.ratelimit(requester, update=False) user = UserID.from_string(event.sender) @@ -319,6 +312,7 @@ class MessageHandler(BaseHandler): See self.create_event and self.send_nonmember_event. """ event, context = yield self.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id @@ -416,7 +410,7 @@ class MessageHandler(BaseHandler): @measure_func("_create_new_client_event") @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): + def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -456,6 +450,8 @@ class MessageHandler(BaseHandler): state_handler = self.state_handler context = yield state_handler.compute_event_context(builder) + if requester: + context.app_service = requester.app_service if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( @@ -493,7 +489,7 @@ class MessageHandler(BaseHandler): # We now need to go and hit out to wherever we need to hit out to. if ratelimit: - self.ratelimit(requester) + yield self.ratelimit(requester) try: yield self.auth.check_from_context(event, context) @@ -531,9 +527,9 @@ class MessageHandler(BaseHandler): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.items() + for k, e_id in context.current_state_ids.iteritems() if k[0] in self.hs.config.room_invite_state_types - or k[0] == EventTypes.Member and k[1] == event.sender + or k == (EventTypes.Member, event.sender) ] state_to_include = yield self.store.get_events(state_to_include_ids) @@ -545,7 +541,7 @@ class MessageHandler(BaseHandler): "content": e.content, "sender": e.sender, } - for e in state_to_include.values() + for e in state_to_include.itervalues() ] invitee = UserID.from_string(event.state_key) @@ -612,12 +608,9 @@ class MessageHandler(BaseHandler): @defer.inlineCallbacks def _notify(): yield run_on_reactor() - yield self.notifier.on_new_room_event( + self.notifier.on_new_room_event( event, event_stream_id, max_stream_id, extra_users=extra_users ) preserve_fn(_notify)() - - # If invite, remove room_state from unsigned before sending. - event.unsigned.pop("invite_room_state", None) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1ede117c79..c7c0b0a1e2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -30,6 +30,7 @@ from synapse.api.constants import PresenceState from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.async import Linearizer from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -187,6 +188,7 @@ class PresenceHandler(object): # process_id to millisecond timestamp last updated. self.external_process_to_current_syncs = {} self.external_process_last_updated_ms = {} + self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to @@ -316,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -509,6 +507,73 @@ class PresenceHandler(object): self.external_process_to_current_syncs[process_id] = syncing_user_ids @defer.inlineCallbacks + def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): + """Update the syncing users for an external process as a delta. + + Args: + process_id (str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + user_id (str): The user who has started or stopped syncing + is_syncing (bool): Whether or not the user is now syncing + sync_time_msec(int): Time in ms when the user was last syncing + """ + with (yield self.external_sync_linearizer.queue(process_id)): + prev_state = yield self.current_state_for_user(user_id) + + process_presence = self.external_process_to_current_syncs.setdefault( + process_id, set() + ) + + updates = [] + if is_syncing and user_id not in process_presence: + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=sync_time_msec, + )) + process_presence.add(user_id) + elif user_id in process_presence: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=sync_time_msec, + )) + + if not is_syncing: + process_presence.discard(user_id) + + if updates: + yield self._update_states(updates) + + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + + @defer.inlineCallbacks + def update_external_syncs_clear(self, process_id): + """Marks all users that had been marked as syncing by a given process + as offline. + + Used when the process has stopped/disappeared. + """ + with (yield self.external_sync_linearizer.queue(process_id)): + process_presence = self.external_process_to_current_syncs.pop( + process_id, set() + ) + prev_states = yield self.current_state_for_users(process_presence) + time_now_ms = self.clock.time_msec() + + yield self._update_states([ + prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + ) + for prev_state in prev_states.itervalues() + ]) + self.external_process_last_updated_ms.pop(process_id, None) + + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. """ @@ -527,14 +592,14 @@ class PresenceHandler(object): for user_id in user_ids } - missing = [user_id for user_id, state in states.items() if not state] + missing = [user_id for user_id, state in states.iteritems() if not state] if missing: # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) states.update(res) - missing = [user_id for user_id, state in states.items() if not state] + missing = [user_id for user_id, state in states.iteritems() if not state] if missing: new = { user_id: UserPresenceState.default(user_id) @@ -546,88 +611,39 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): - """Given a list of states return which entities (rooms, users, servers) - are interested in the given states. - - Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, - with each item being a dict of `entity_name` -> `[UserPresenceState]` - """ - room_ids_to_states = {} - users_to_states = {} - for state in states: - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield self.store.get_presence_list_observers_accepted(state.user_id) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - # Always notify self - users_to_states.setdefault(state.user_id, []).append(state) - - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - - # TODO: de-dup hosts_to_states, as a single host might have multiple - # of same presence - - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) - - @defer.inlineCallbacks def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to interested remote servers """ stream_id, max_token = yield self.store.update_presence(states) - parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + parties = yield get_interested_parties(self.store, states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): - parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + parties = yield get_interested_parties(self.store, [state]) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` + states (list(UserPresenceState)) """ - for host, states in hosts_to_states.items(): - self.federation.send_presence(host, states) + self.federation.send_presence(states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -764,18 +780,17 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) - hosts = set(get_domain_from_id(u) for u in user_ids) - self._push_to_remotes({host: (state,) for host in hosts}) + self._push_to_remotes([state]) else: + user_ids = yield self.store.get_users_in_room(room_id) user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes({user.domain: states.values()}) + self._push_to_remotes(states.values()) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): @@ -1275,3 +1290,66 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): persist_and_notify = True return new_state, persist_and_notify, federation_ping + + +@defer.inlineCallbacks +def get_interested_parties(store, states): + """Given a list of states return which entities (rooms, users) + are interested in the given states. + + Args: + states (list(UserPresenceState)) + + Returns: + 2-tuple: `(room_ids_to_states, users_to_states)`, + with each item being a dict of `entity_name` -> `[UserPresenceState]` + """ + room_ids_to_states = {} + users_to_states = {} + for state in states: + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted(state.user_id) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + # Always notify self + users_to_states.setdefault(state.user_id, []).append(state) + + defer.returnValue((room_ids_to_states, users_to_states)) + + +@defer.inlineCallbacks +def get_interested_remotes(store, states, state_handler): + """Given a list of presence states figure out which remote servers + should be sent which. + + All the presence states should be for local users only. + + Args: + store (DataStore) + states (list(UserPresenceState)) + + Returns: + Deferred list of ([destinations], [UserPresenceState]), where for + each row the list of UserPresenceState should be sent to each + destination + """ + hosts_and_states = [] + + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states, users_to_states = yield get_interested_parties(store, states) + + for room_id, states in room_ids_to_states.iteritems(): + hosts = yield state_handler.get_current_hosts_in_room(room_id) + hosts_and_states.append((hosts, states)) + + for user_id, states in users_to_states.iteritems(): + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], states)) + + defer.returnValue(hosts_and_states) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 9bf638f818..7abee98dea 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler): if not self.hs.is_mine(user): return - self.ratelimit(requester) + yield self.ratelimit(requester) room_ids = yield self.store.get_rooms_for_user( user.to_string(), diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py new file mode 100644 index 0000000000..b5b0303d54 --- /dev/null +++ b/synapse/handlers/read_marker.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.async import Linearizer + +import logging +logger = logging.getLogger(__name__) + + +class ReadMarkerHandler(BaseHandler): + def __init__(self, hs): + super(ReadMarkerHandler, self).__init__(hs) + self.server_name = hs.config.server_name + self.store = hs.get_datastore() + self.read_marker_linearizer = Linearizer(name="read_marker") + self.notifier = hs.get_notifier() + + @defer.inlineCallbacks + def received_client_read_marker(self, room_id, user_id, event_id): + """Updates the read marker for a given user in a given room if the event ID given + is ahead in the stream relative to the current read marker. + + This uses a notifier to indicate that account data should be sent down /sync if + the read marker has changed. + """ + + with (yield self.read_marker_linearizer.queue((room_id, user_id))): + account_data = yield self.store.get_account_data_for_room(user_id, room_id) + + existing_read_marker = account_data.get("m.fully_read", None) + + should_update = True + + if existing_read_marker: + # Only update if the new marker is ahead in the stream + should_update = yield self.store.is_event_after( + event_id, + existing_read_marker['event_id'] + ) + + if should_update: + content = { + "event_id": event_id + } + max_id = yield self.store.add_account_data_to_room( + user_id, room_id, "m.fully_read", content + ) + self.notifier.on_new_event("account_data_key", max_id, users=[user_id]) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 03c6a85fc6..ee3a2269a8 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -54,6 +54,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if not localpart: + raise SynapseError( + 400, + "User ID cannot be empty", + Codes.INVALID_USERNAME + ) + if localpart[0] == '_': raise SynapseError( 400, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 99cb7db0db..d2a0d6520a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -75,7 +75,7 @@ class RoomCreationHandler(BaseHandler): """ user_id = requester.user.to_string() - self.ratelimit(requester) + yield self.ratelimit(requester) if "room_alias_name" in config: for wchar in string.whitespace: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 2052d6d05f..1ca88517a2 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler): content["kind"] = "guest" event, context = yield msg_handler.create_event( + requester, { "type": EventTypes.Member, "content": content, @@ -139,13 +140,6 @@ class RoomMemberHandler(BaseHandler): ) yield user_joined_room(self.distributor, user, room_id) - def reject_remote_invite(self, user_id, room_id, remote_room_hosts): - return self.hs.get_handlers().federation_handler.do_remotely_reject_invite( - remote_room_hosts, - room_id, - user_id - ) - @defer.inlineCallbacks def update_membership( self, @@ -286,13 +280,21 @@ class RoomMemberHandler(BaseHandler): else: # send the rejection to the inviter's HS. remote_room_hosts = remote_room_hosts + [inviter.domain] - + fed_handler = self.hs.get_handlers().federation_handler try: - ret = yield self.reject_remote_invite( - target.to_string(), room_id, remote_room_hosts + ret = yield fed_handler.do_remotely_reject_invite( + remote_room_hosts, + room_id, + target.to_string(), ) defer.returnValue(ret) - except SynapseError as e: + except Exception as e: + # if we were unable to reject the exception, just mark + # it as rejected on our end and plough ahead. + # + # The 'except' clause is very broad, but we need to + # capture everything from DNS failures upwards + # logger.warn("Failed to reject invite: %s", e) yield self.store.locally_reject_invite( @@ -737,10 +739,11 @@ class RoomMemberHandler(BaseHandler): if len(current_state_ids) == 1 and create_event_id: defer.returnValue(self.hs.is_mine_id(create_event_id)) - for (etype, state_key), event_id in current_state_ids.items(): + for etype, state_key in current_state_ids: if etype != EventTypes.Member or not self.hs.is_mine_id(state_key): continue + event_id = current_state_ids[(etype, state_key)] event = yield self.store.get_event(event_id, allow_none=True) if not event: continue diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0eea7f8f9c..3b7818af5c 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id import logging from collections import namedtuple -import ujson as json logger = logging.getLogger(__name__) @@ -288,11 +287,13 @@ class TypingHandler(object): for room_id, serial in self._room_serials.items(): if last_id < serial and serial <= current_id: typing = self._room_typing[room_id] - typing_bytes = json.dumps(list(typing), ensure_ascii=False) - rows.append((serial, room_id, typing_bytes)) + rows.append((serial, room_id, list(typing))) rows.sort() return rows + def get_current_token(self): + return self._latest_room_serial + class TypingNotificationEventSource(object): def __init__(self, hs): |