diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/_base.py | 8 | ||||
-rw-r--r-- | synapse/handlers/auth.py | 97 | ||||
-rw-r--r-- | synapse/handlers/device.py | 201 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 52 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 7 | ||||
-rw-r--r-- | synapse/handlers/message.py | 10 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 48 | ||||
-rw-r--r-- | synapse/handlers/register.py | 10 | ||||
-rw-r--r-- | synapse/handlers/room.py | 1 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 7 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 22 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 34 |
12 files changed, 383 insertions, 114 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 90f96209f8..e83adc8339 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -88,9 +88,13 @@ class BaseHandler(object): current_state = yield self.store.get_events( context.current_state_ids.values() ) - current_state = current_state.values() else: - current_state = yield self.store.get_current_state(event.room_id) + current_state = yield self.state_handler.get_current_state( + event.room_id + ) + + current_state = current_state.values() + logger.info("maybe_kick_guest_users %r", current_state) yield self.kick_guest_users(current_state) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 3b146f09d6..fffba34383 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -65,6 +65,7 @@ class AuthHandler(BaseHandler): self.hs = hs # FIXME better possibility to access registrationHandler later? self.device_handler = hs.get_device_handler() + self.macaroon_gen = hs.get_macaroon_generator() @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): @@ -529,37 +530,11 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def issue_access_token(self, user_id, device_id=None): - access_token = self.generate_access_token(user_id) + access_token = self.macaroon_gen.generate_access_token(user_id) yield self.store.add_access_token_to_user(user_id, access_token, device_id) defer.returnValue(access_token) - def generate_access_token(self, user_id, extra_caveats=None): - extra_caveats = extra_caveats or [] - macaroon = self._generate_base_macaroon(user_id) - macaroon.add_first_party_caveat("type = access") - # Include a nonce, to make sure that each login gets a different - # access token. - macaroon.add_first_party_caveat("nonce = %s" % ( - stringutils.random_string_with_symbols(16), - )) - for caveat in extra_caveats: - macaroon.add_first_party_caveat(caveat) - return macaroon.serialize() - - def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): - macaroon = self._generate_base_macaroon(user_id) - macaroon.add_first_party_caveat("type = login") - now = self.hs.get_clock().time_msec() - expiry = now + duration_in_ms - macaroon.add_first_party_caveat("time < %d" % (expiry,)) - return macaroon.serialize() - - def generate_delete_pusher_token(self, user_id): - macaroon = self._generate_base_macaroon(user_id) - macaroon.add_first_party_caveat("type = delete_pusher") - return macaroon.serialize() - def validate_short_term_login_token_and_get_user_id(self, login_token): auth_api = self.hs.get_auth() try: @@ -570,15 +545,6 @@ class AuthHandler(BaseHandler): except Exception: raise AuthError(403, "Invalid token", errcode=Codes.FORBIDDEN) - def _generate_base_macaroon(self, user_id): - macaroon = pymacaroons.Macaroon( - location=self.hs.config.server_name, - identifier="key", - key=self.hs.config.macaroon_secret_key) - macaroon.add_first_party_caveat("gen = 1") - macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) - return macaroon - @defer.inlineCallbacks def set_password(self, user_id, newpassword, requester=None): password_hash = self.hash(newpassword) @@ -607,7 +573,7 @@ class AuthHandler(BaseHandler): # types (mediums) of threepid. For now, we still use the existing # infrastructure, but this is the start of synapse gaining knowledge # of specific types of threepid (and fixes the fact that checking - # for the presenc eof an email address during password reset was + # for the presence of an email address during password reset was # case sensitive). if medium == 'email': address = address.lower() @@ -617,6 +583,17 @@ class AuthHandler(BaseHandler): self.hs.get_clock().time_msec() ) + @defer.inlineCallbacks + def delete_threepid(self, user_id, medium, address): + # 'Canonicalise' email addresses as per above + if medium == 'email': + address = address.lower() + + ret = yield self.store.user_delete_threepid( + user_id, medium, address, + ) + defer.returnValue(ret) + def _save_session(self, session): # TODO: Persistent storage logger.debug("Saving session %s", session) @@ -656,12 +633,54 @@ class AuthHandler(BaseHandler): Whether self.hash(password) == stored_hash (bool). """ if stored_hash: - return bcrypt.hashpw(password + self.hs.config.password_pepper, - stored_hash.encode('utf-8')) == stored_hash + return bcrypt.hashpw(password.encode('utf8') + self.hs.config.password_pepper, + stored_hash.encode('utf8')) == stored_hash else: return False +class MacaroonGeneartor(object): + def __init__(self, hs): + self.clock = hs.get_clock() + self.server_name = hs.config.server_name + self.macaroon_secret_key = hs.config.macaroon_secret_key + + def generate_access_token(self, user_id, extra_caveats=None): + extra_caveats = extra_caveats or [] + macaroon = self._generate_base_macaroon(user_id) + macaroon.add_first_party_caveat("type = access") + # Include a nonce, to make sure that each login gets a different + # access token. + macaroon.add_first_party_caveat("nonce = %s" % ( + stringutils.random_string_with_symbols(16), + )) + for caveat in extra_caveats: + macaroon.add_first_party_caveat(caveat) + return macaroon.serialize() + + def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)): + macaroon = self._generate_base_macaroon(user_id) + macaroon.add_first_party_caveat("type = login") + now = self.clock.time_msec() + expiry = now + duration_in_ms + macaroon.add_first_party_caveat("time < %d" % (expiry,)) + return macaroon.serialize() + + def generate_delete_pusher_token(self, user_id): + macaroon = self._generate_base_macaroon(user_id) + macaroon.add_first_party_caveat("type = delete_pusher") + return macaroon.serialize() + + def _generate_base_macaroon(self, user_id): + macaroon = pymacaroons.Macaroon( + location=self.server_name, + identifier="key", + key=self.macaroon_secret_key) + macaroon.add_first_party_caveat("gen = 1") + macaroon.add_first_party_caveat("user_id = %s" % (user_id,)) + return macaroon + + class _AccountHandler(object): """A proxy object that gets passed to password auth providers so they can register new users etc if necessary. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index aa68755936..8cb47ac417 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,7 +14,11 @@ # limitations under the License. from synapse.api import errors +from synapse.api.constants import EventTypes from synapse.util import stringutils +from synapse.util.async import Linearizer +from synapse.util.metrics import measure_func +from synapse.types import get_domain_from_id, RoomStreamToken from twisted.internet import defer from ._base import BaseHandler @@ -27,6 +31,21 @@ class DeviceHandler(BaseHandler): def __init__(self, hs): super(DeviceHandler, self).__init__(hs) + self.hs = hs + self.state = hs.get_state_handler() + self.federation_sender = hs.get_federation_sender() + self.federation = hs.get_replication_layer() + self._remote_edue_linearizer = Linearizer(name="remote_device_list") + + self.federation.register_edu_handler( + "m.device_list_update", self._incoming_device_list_update, + ) + self.federation.register_query_handler( + "user_devices", self.on_federation_query_user_devices, + ) + + hs.get_distributor().observe("user_left_room", self.user_left_room) + @defer.inlineCallbacks def check_device_registered(self, user_id, device_id, initial_device_display_name=None): @@ -45,29 +64,29 @@ class DeviceHandler(BaseHandler): str: device id (generated if none was supplied) """ if device_id is not None: - yield self.store.store_device( + new_device = yield self.store.store_device( user_id=user_id, device_id=device_id, initial_device_display_name=initial_device_display_name, - ignore_if_known=True, ) + if new_device: + yield self.notify_device_update(user_id, [device_id]) defer.returnValue(device_id) # if the device id is not specified, we'll autogen one, but loop a few # times in case of a clash. attempts = 0 while attempts < 5: - try: - device_id = stringutils.random_string(10).upper() - yield self.store.store_device( - user_id=user_id, - device_id=device_id, - initial_device_display_name=initial_device_display_name, - ignore_if_known=False, - ) + device_id = stringutils.random_string(10).upper() + new_device = yield self.store.store_device( + user_id=user_id, + device_id=device_id, + initial_device_display_name=initial_device_display_name, + ) + if new_device: + yield self.notify_device_update(user_id, [device_id]) defer.returnValue(device_id) - except errors.StoreError: - attempts += 1 + attempts += 1 raise errors.StoreError(500, "Couldn't generate a device ID.") @@ -147,6 +166,8 @@ class DeviceHandler(BaseHandler): user_id=user_id, device_id=device_id ) + yield self.notify_device_update(user_id, [device_id]) + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device @@ -166,12 +187,168 @@ class DeviceHandler(BaseHandler): device_id, new_display_name=content.get("display_name") ) + yield self.notify_device_update(user_id, [device_id]) except errors.StoreError, e: if e.code == 404: raise errors.NotFoundError() else: raise + @measure_func("notify_device_update") + @defer.inlineCallbacks + def notify_device_update(self, user_id, device_ids): + """Notify that a user's device(s) has changed. Pokes the notifier, and + remote servers if the user is local. + """ + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + hosts = set() + if self.hs.is_mine_id(user_id): + hosts.update(get_domain_from_id(u) for u in users_who_share_room) + hosts.discard(self.server_name) + + position = yield self.store.add_device_change_to_streams( + user_id, device_ids, list(hosts) + ) + + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = [r.room_id for r in rooms] + + yield self.notifier.on_new_event( + "device_list_key", position, rooms=room_ids, + ) + + if hosts: + logger.info("Sending device list update notif to: %r", hosts) + for host in hosts: + self.federation_sender.send_device_messages(host) + + @measure_func("device.get_user_ids_changed") + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_token): + """Get list of users that have had the devices updated, or have newly + joined a room, that `user_id` may be interested in. + + Args: + user_id (str) + from_token (StreamToken) + """ + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + # First we check if any devices have changed + changed = yield self.store.get_user_whose_devices_changed( + from_token.device_list_key + ) + + # Then work out if any users have since joined + rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + + possibly_changed = set(changed) + for room_id in rooms_changed: + # Fetch the current state at the time. + stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key) + + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + except: + prev_state_ids = {} + + current_state_ids = yield self.state.get_current_state_ids(room_id) + + # If there has been any change in membership, include them in the + # possibly changed list. We'll check if they are joined below, + # and we're not toooo worried about spuriously adding users. + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype == EventTypes.Member: + prev_event_id = prev_state_ids.get(key, None) + if not prev_event_id or prev_event_id != event_id: + possibly_changed.add(state_key) + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + defer.returnValue(users_who_share_room & possibly_changed) + + @measure_func("_incoming_device_list_update") + @defer.inlineCallbacks + def _incoming_device_list_update(self, origin, edu_content): + user_id = edu_content["user_id"] + device_id = edu_content["device_id"] + stream_id = edu_content["stream_id"] + prev_ids = edu_content.get("prev_id", []) + + if get_domain_from_id(user_id) != origin: + # TODO: Raise? + logger.warning("Got device list update edu for %r from %r", user_id, origin) + return + + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We don't share any rooms with this user. Ignore update, as we + # probably won't get any further updates. + return + + with (yield self._remote_edue_linearizer.queue(user_id)): + # If the prev id matches whats in our cache table, then we don't need + # to resync the users device list, otherwise we do. + resync = True + if len(prev_ids) == 1: + extremity = yield self.store.get_device_list_last_stream_id_for_remote( + user_id + ) + logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids) + if str(extremity) == str(prev_ids[0]): + resync = False + + if resync: + # Fetch all devices for the user. + result = yield self.federation.query_user_devices(origin, user_id) + stream_id = result["stream_id"] + devices = result["devices"] + yield self.store.update_remote_device_list_cache( + user_id, devices, stream_id, + ) + device_ids = [device["device_id"] for device in devices] + yield self.notify_device_update(user_id, device_ids) + else: + # Simply update the single device, since we know that is the only + # change (becuase of the single prev_id matching the current cache) + content = dict(edu_content) + for key in ("user_id", "device_id", "stream_id", "prev_ids"): + content.pop(key, None) + yield self.store.update_remote_device_list_cache_entry( + user_id, device_id, content, stream_id, + ) + yield self.notify_device_update(user_id, [device_id]) + + @defer.inlineCallbacks + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + defer.returnValue({ + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + }) + + @defer.inlineCallbacks + def user_left_room(self, user, room_id): + user_id = user.to_string() + rooms = yield self.store.get_rooms_for_user(user_id) + if not rooms: + # We no longer share rooms with this user, so we'll no longer + # receive device updates. Mark this in DB. + yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id) + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index b63a660c06..e40495d1ab 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -73,10 +73,9 @@ class E2eKeysHandler(object): if self.is_mine_id(user_id): local_query[user_id] = device_ids else: - domain = get_domain_from_id(user_id) - remote_queries.setdefault(domain, {})[user_id] = device_ids + remote_queries[user_id] = device_ids - # do the queries + # Firt get local devices. failures = {} results = {} if local_query: @@ -85,9 +84,42 @@ class E2eKeysHandler(object): if user_id in local_query: results[user_id] = keys + # Now attempt to get any remote devices from our local cache. + remote_queries_not_in_cache = {} + if remote_queries: + query_list = [] + for user_id, device_ids in remote_queries.iteritems(): + if device_ids: + query_list.extend((user_id, device_id) for device_id in device_ids) + else: + query_list.append((user_id, None)) + + user_ids_not_in_cache, remote_results = ( + yield self.store.get_user_devices_from_cache( + query_list + ) + ) + for user_id, devices in remote_results.iteritems(): + user_devices = results.setdefault(user_id, {}) + for device_id, device in devices.iteritems(): + keys = device.get("keys", None) + device_display_name = device.get("device_display_name", None) + if keys: + result = dict(keys) + unsigned = result.setdefault("unsigned", {}) + if device_display_name: + unsigned["device_display_name"] = device_display_name + user_devices[device_id] = result + + for user_id in user_ids_not_in_cache: + domain = get_domain_from_id(user_id) + r = remote_queries_not_in_cache.setdefault(domain, {}) + r[user_id] = remote_queries[user_id] + + # Now fetch any devices that we don't have in our cache @defer.inlineCallbacks def do_remote_query(destination): - destination_query = remote_queries[destination] + destination_query = remote_queries_not_in_cache[destination] try: limiter = yield get_retry_limiter( destination, self.clock, self.store @@ -119,7 +151,7 @@ class E2eKeysHandler(object): yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(do_remote_query)(destination) - for destination in remote_queries + for destination in remote_queries_not_in_cache ])) defer.returnValue({ @@ -162,7 +194,7 @@ class E2eKeysHandler(object): # "unsigned" section for user_id, device_keys in results.items(): for device_id, device_info in device_keys.items(): - r = json.loads(device_info["key_json"]) + r = dict(device_info["keys"]) r["unsigned"] = {} display_name = device_info["device_display_name"] if display_name is not None: @@ -255,10 +287,12 @@ class E2eKeysHandler(object): device_id, user_id, time_now ) # TODO: Sign the JSON with the server key - yield self.store.set_e2e_device_keys( - user_id, device_id, time_now, - encode_canonical_json(device_keys) + changed = yield self.store.set_e2e_device_keys( + user_id, device_id, time_now, device_keys, ) + if changed: + # Only notify about device updates *if* the keys actually changed + yield self.device_handler.notify_device_update(user_id, [device_id]) one_time_keys = keys.get("one_time_keys", None) if one_time_keys: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1021bcc405..996bfd0e23 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -591,12 +591,12 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) - logger.info("calling resolve_state_groups in _maybe_backfill") + logger.debug("calling resolve_state_groups in _maybe_backfill") states = yield preserve_context_over_deferred(defer.gatherResults([ preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e]) for e in event_ids ])) - states = dict(zip(event_ids, [s[1] for s in states])) + states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( [e_id for ids in states.values() for e_id in ids], @@ -1319,7 +1319,6 @@ class FederationHandler(BaseHandler): event_stream_id, max_stream_id = yield self.store.persist_event( event, new_event_context, - current_state=state, ) defer.returnValue((event_stream_id, max_stream_id)) @@ -1530,7 +1529,7 @@ class FederationHandler(BaseHandler): (d.type, d.state_key): d for d in different_events if d }) - new_state, prev_state = self.state_handler.resolve_events( + new_state = self.state_handler.resolve_events( [local_view.values(), remote_view.values()], event ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7a57a69bd3..7a498af5a2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -208,8 +208,10 @@ class MessageHandler(BaseHandler): content = builder.content try: - content["displayname"] = yield profile.get_displayname(target) - content["avatar_url"] = yield profile.get_avatar_url(target) + if "displayname" not in content: + content["displayname"] = yield profile.get_displayname(target) + if "avatar_url" not in content: + content["avatar_url"] = yield profile.get_avatar_url(target) except Exception as e: logger.info( "Failed to get profile information for %r: %s", @@ -279,7 +281,9 @@ class MessageHandler(BaseHandler): if event.type == EventTypes.Message: presence = self.hs.get_presence_handler() - yield presence.bump_presence_active_time(user) + # We don't want to block sending messages on any presence code. This + # matters as sometimes presence code can take a while. + preserve_fn(presence.bump_presence_active_time)(user) @defer.inlineCallbacks def deduplicate_state_event(self, event, context): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1b89dc6274..fdfce2a88c 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -574,7 +574,7 @@ class PresenceHandler(object): if not local_states: continue - users = yield self.state.get_current_user_in_room(room_id) + users = yield self.store.get_users_in_room(room_id) hosts = set(get_domain_from_id(u) for u in users) for host in hosts: @@ -766,7 +766,7 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.state.get_current_user_in_room(room_id) + user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) @@ -1011,7 +1011,7 @@ class PresenceEventSource(object): @defer.inlineCallbacks @log_function def get_new_events(self, user, from_key, room_ids=None, include_offline=True, - **kwargs): + explicit_room_id=None, **kwargs): # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1028,22 +1028,24 @@ class PresenceEventSource(object): user_id = user.to_string() if from_key is not None: from_key = int(from_key) - room_ids = room_ids or [] presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache - if not room_ids: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(e.room_id for e in rooms) - else: - room_ids = set(room_ids) - max_token = self.store.get_current_presence_token() plist = yield self.store.get_presence_list_accepted(user.localpart) - friends = set(row["observed_user_id"] for row in plist) - friends.add(user_id) # So that we receive our own presence + users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in.add(user_id) # So that we receive our own presence + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + users_interested_in.update(users_who_share_room) + + if explicit_room_id: + user_ids = yield self.store.get_users_in_room(explicit_room_id) + users_interested_in.update(user_ids) user_ids_changed = set() changed = None @@ -1055,35 +1057,19 @@ class PresenceEventSource(object): # work out if we share a room or they're in our presence list get_updates_counter.inc("stream") for other_user_id in changed: - if other_user_id in friends: + if other_user_id in users_interested_in: user_ids_changed.add(other_user_id) - continue - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) - continue else: # Too many possible updates. Find all users we can see and check # if any of them have changed. get_updates_counter.inc("full") - user_ids_to_check = set() - for room_id in room_ids: - users = yield self.state.get_current_user_in_room(room_id) - user_ids_to_check.update(users) - - user_ids_to_check.update(friends) - - # Always include yourself. Only really matters for when the user is - # not in any rooms, but still. - user_ids_to_check.add(user_id) - if from_key: user_ids_changed = stream_change_cache.get_entities_changed( - user_ids_to_check, from_key, + users_interested_in, from_key, ) else: - user_ids_changed = user_ids_to_check + user_ids_changed = users_interested_in updates = yield presence.current_state_for_users(user_ids_changed) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 286f0cef0a..03c6a85fc6 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -40,6 +40,8 @@ class RegistrationHandler(BaseHandler): self._next_generated_user_id = None + self.macaroon_gen = hs.get_macaroon_generator() + @defer.inlineCallbacks def check_username(self, localpart, guest_access_token=None, assigned_user_id=None): @@ -143,7 +145,7 @@ class RegistrationHandler(BaseHandler): token = None if generate_token: - token = self.auth_handler().generate_access_token(user_id) + token = self.macaroon_gen.generate_access_token(user_id) yield self.store.register( user_id=user_id, token=token, @@ -167,7 +169,7 @@ class RegistrationHandler(BaseHandler): user_id = user.to_string() yield self.check_user_id_not_appservice_exclusive(user_id) if generate_token: - token = self.auth_handler().generate_access_token(user_id) + token = self.macaroon_gen.generate_access_token(user_id) try: yield self.store.register( user_id=user_id, @@ -254,7 +256,7 @@ class RegistrationHandler(BaseHandler): user_id = user.to_string() yield self.check_user_id_not_appservice_exclusive(user_id) - token = self.auth_handler().generate_access_token(user_id) + token = self.macaroon_gen.generate_access_token(user_id) try: yield self.store.register( user_id=user_id, @@ -399,7 +401,7 @@ class RegistrationHandler(BaseHandler): user = UserID(localpart, self.hs.hostname) user_id = user.to_string() - token = self.auth_handler().generate_access_token(user_id) + token = self.macaroon_gen.generate_access_token(user_id) if need_register: yield self.store.register( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5f18007e90..7e7671c9a2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -437,6 +437,7 @@ class RoomEventSource(object): limit, room_ids, is_guest, + explicit_room_id=None, ): # We just ignore the key for now. diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 667223df0c..19eebbd43f 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -62,17 +62,18 @@ class RoomListHandler(BaseHandler): appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. """ - if search_filter or (network_tuple and network_tuple.appservice_id is not None): + if search_filter: # We explicitly don't bother caching searches or requests for # appservice specific lists. return self._get_public_room_list( limit, since_token, search_filter, network_tuple=network_tuple, ) - result = self.response_cache.get((limit, since_token)) + key = (limit, since_token, network_tuple) + result = self.response_cache.get(key) if not result: result = self.response_cache.set( - (limit, since_token), + key, self._get_public_room_list( limit, since_token, network_tuple=network_tuple ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 2f8782e522..b2806555cf 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -45,7 +45,7 @@ class RoomMemberHandler(BaseHandler): def __init__(self, hs): super(RoomMemberHandler, self).__init__(hs) - self.member_linearizer = Linearizer() + self.member_linearizer = Linearizer(name="member") self.clock = hs.get_clock() @@ -89,7 +89,7 @@ class RoomMemberHandler(BaseHandler): duplicate = yield msg_handler.deduplicate_state_event(event, context) if duplicate is not None: # Discard the new event since this membership change is a no-op. - return + defer.returnValue(duplicate) yield msg_handler.handle_new_client_event( requester, @@ -120,6 +120,8 @@ class RoomMemberHandler(BaseHandler): if prev_member_event.membership == Membership.JOIN: user_left_room(self.distributor, target, room_id) + defer.returnValue(event) + @defer.inlineCallbacks def remote_join(self, remote_room_hosts, room_id, user, content): if len(remote_room_hosts) == 0: @@ -187,6 +189,7 @@ class RoomMemberHandler(BaseHandler): ratelimit=True, content=None, ): + content_specified = bool(content) if content is None: content = {} @@ -229,6 +232,13 @@ class RoomMemberHandler(BaseHandler): errcode=Codes.BAD_STATE ) + if old_state: + same_content = content == old_state.content + same_membership = old_membership == effective_membership_state + same_sender = requester.user.to_string() == old_state.sender + if same_sender and same_membership and same_content: + defer.returnValue(old_state) + is_host_in_room = yield self._is_host_in_room(current_state_ids) if effective_membership_state == Membership.JOIN: @@ -247,8 +257,9 @@ class RoomMemberHandler(BaseHandler): content["membership"] = Membership.JOIN profile = self.hs.get_handlers().profile_handler - content["displayname"] = yield profile.get_displayname(target) - content["avatar_url"] = yield profile.get_avatar_url(target) + if not content_specified: + content["displayname"] = yield profile.get_displayname(target) + content["avatar_url"] = yield profile.get_avatar_url(target) if requester.is_guest: content["kind"] = "guest" @@ -290,7 +301,7 @@ class RoomMemberHandler(BaseHandler): defer.returnValue({}) - yield self._local_membership_update( + res = yield self._local_membership_update( requester=requester, target=target, room_id=room_id, @@ -300,6 +311,7 @@ class RoomMemberHandler(BaseHandler): prev_event_ids=latest_event_ids, content=content, ) + defer.returnValue(res) @defer.inlineCallbacks def send_membership_event( diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c880f61685..d7dcd1ce5b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,7 +16,7 @@ from synapse.api.constants import Membership, EventTypes from synapse.util.async import concurrently_execute from synapse.util.logcontext import LoggingContext -from synapse.util.metrics import Measure +from synapse.util.metrics import Measure, measure_func from synapse.util.caches.response_cache import ResponseCache from synapse.push.clientformat import format_push_rules_for_user from synapse.visibility import filter_events_for_client @@ -115,6 +115,7 @@ class SyncResult(collections.namedtuple("SyncResult", [ "invited", # InvitedSyncResult for each invited room. "archived", # ArchivedSyncResult for each archived room. "to_device", # List of direct messages for the device. + "device_lists", # List of user_ids whose devices have chanegd ])): __slots__ = [] @@ -129,7 +130,8 @@ class SyncResult(collections.namedtuple("SyncResult", [ self.invited or self.archived or self.account_data or - self.to_device + self.to_device or + self.device_lists ) @@ -544,6 +546,10 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) + device_lists = yield self._generate_sync_entry_for_device_list( + sync_result_builder + ) + defer.returnValue(SyncResult( presence=sync_result_builder.presence, account_data=sync_result_builder.account_data, @@ -551,9 +557,33 @@ class SyncHandler(object): invited=sync_result_builder.invited, archived=sync_result_builder.archived, to_device=sync_result_builder.to_device, + device_lists=device_lists, next_batch=sync_result_builder.now_token, )) + @measure_func("_generate_sync_entry_for_device_list") + @defer.inlineCallbacks + def _generate_sync_entry_for_device_list(self, sync_result_builder): + user_id = sync_result_builder.sync_config.user.to_string() + since_token = sync_result_builder.since_token + + if since_token and since_token.device_list_key: + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed( + since_token.device_list_key + ) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + else: + defer.returnValue([]) + @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): """Generates the portion of the sync response. Populates |