diff options
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r-- | synapse/handlers/device.py | 392 |
1 files changed, 219 insertions, 173 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c708c35d4d..b398848079 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -37,13 +37,185 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) -class DeviceHandler(BaseHandler): +class DeviceWorkerHandler(BaseHandler): def __init__(self, hs): - super(DeviceHandler, self).__init__(hs) + super(DeviceWorkerHandler, self).__init__(hs) self.hs = hs self.state = hs.get_state_handler() self._auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def get_devices_by_user(self, user_id): + """ + Retrieve the given user's devices + + Args: + user_id (str): + Returns: + defer.Deferred: list[dict[str, X]]: info on each device + """ + + device_map = yield self.store.get_devices_by_user(user_id) + + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id=None + ) + + devices = list(device_map.values()) + for device in devices: + _update_device_from_client_ips(device, ips) + + defer.returnValue(devices) + + @defer.inlineCallbacks + def get_device(self, user_id, device_id): + """ Retrieve the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: dict[str, X]: info on the device + Raises: + errors.NotFoundError: if the device was not found + """ + try: + device = yield self.store.get_device(user_id, device_id) + except errors.StoreError: + raise errors.NotFoundError + ips = yield self.store.get_last_client_ip_by_device( + user_id, device_id, + ) + _update_device_from_client_ips(device, ips) + defer.returnValue(device) + + @measure_func("device.get_user_ids_changed") + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_token): + """Get list of users that have had the devices updated, or have newly + joined a room, that `user_id` may be interested in. + + Args: + user_id (str) + from_token (StreamToken) + """ + now_room_key = yield self.store.get_room_events_max_id() + + room_ids = yield self.store.get_rooms_for_user(user_id) + + # First we check if any devices have changed + changed = yield self.store.get_user_whose_devices_changed( + from_token.device_list_key + ) + + # Then work out if any users have since joined + rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_room_key, + ) + rooms_changed.update(event.room_id for event in member_events) + + stream_ordering = RoomStreamToken.parse_stream_token( + from_token.room_key + ).stream + + possibly_changed = set(changed) + possibly_left = set() + for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + + # Fetch the current state at the time. + try: + event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_ordering=stream_ordering + ) + except errors.StoreError: + # we have purged the stream_ordering index since the stream + # ordering: treat it the same as a new room + event_ids = [] + + # special-case for an empty prev state: include all members + # in the changed list + if not event_ids: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + continue + + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + + # mapping from event_id -> state_dict + prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in itervalues(prev_state_ids): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + + # If there has been any change in membership, include them in the + # possibly changed list. We'll check if they are joined below, + # and we're not toooo worried about spuriously adding users. + for key, event_id in iteritems(current_state_ids): + etype, state_key = key + if etype != EventTypes.Member: + continue + + # check if this member has changed since any of the extremities + # at the stream_ordering, and add them to the list if so. + for state_dict in itervalues(prev_state_ids): + prev_event_id = state_dict.get(key, None) + if not prev_event_id or prev_event_id != event_id: + if state_key != user_id: + possibly_changed.add(state_key) + break + + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) + + +class DeviceHandler(DeviceWorkerHandler): + def __init__(self, hs): + super(DeviceHandler, self).__init__(hs) + self.federation_sender = hs.get_federation_sender() self._edu_updater = DeviceListEduUpdater(hs, self) @@ -104,52 +276,6 @@ class DeviceHandler(BaseHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") @defer.inlineCallbacks - def get_devices_by_user(self, user_id): - """ - Retrieve the given user's devices - - Args: - user_id (str): - Returns: - defer.Deferred: list[dict[str, X]]: info on each device - """ - - device_map = yield self.store.get_devices_by_user(user_id) - - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id=None - ) - - devices = list(device_map.values()) - for device in devices: - _update_device_from_client_ips(device, ips) - - defer.returnValue(devices) - - @defer.inlineCallbacks - def get_device(self, user_id, device_id): - """ Retrieve the given device - - Args: - user_id (str): - device_id (str): - - Returns: - defer.Deferred: dict[str, X]: info on the device - Raises: - errors.NotFoundError: if the device was not found - """ - try: - device = yield self.store.get_device(user_id, device_id) - except errors.StoreError: - raise errors.NotFoundError - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id, - ) - _update_device_from_client_ips(device, ips) - defer.returnValue(device) - - @defer.inlineCallbacks def delete_device(self, user_id, device_id): """ Delete the given device @@ -276,6 +402,12 @@ class DeviceHandler(BaseHandler): user_id, device_ids, list(hosts) ) + for device_id in device_ids: + logger.debug( + "Notifying about update %r/%r, ID: %r", user_id, device_id, + position, + ) + room_ids = yield self.store.get_rooms_for_user(user_id) yield self.notifier.on_new_event( @@ -283,130 +415,10 @@ class DeviceHandler(BaseHandler): ) if hosts: - logger.info("Sending device list update notif to: %r", hosts) + logger.info("Sending device list update notif for %r to: %r", user_id, hosts) for host in hosts: self.federation_sender.send_device_messages(host) - @measure_func("device.get_user_ids_changed") - @defer.inlineCallbacks - def get_user_ids_changed(self, user_id, from_token): - """Get list of users that have had the devices updated, or have newly - joined a room, that `user_id` may be interested in. - - Args: - user_id (str) - from_token (StreamToken) - """ - now_token = yield self.hs.get_event_sources().get_current_token() - - room_ids = yield self.store.get_rooms_for_user(user_id) - - # First we check if any devices have changed - changed = yield self.store.get_user_whose_devices_changed( - from_token.device_list_key - ) - - # Then work out if any users have since joined - rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) - - member_events = yield self.store.get_membership_changes_for_user( - user_id, from_token.room_key, now_token.room_key - ) - rooms_changed.update(event.room_id for event in member_events) - - stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key - ).stream - - possibly_changed = set(changed) - possibly_left = set() - for room_id in rooms_changed: - current_state_ids = yield self.store.get_current_state_ids(room_id) - - # The user may have left the room - # TODO: Check if they actually did or if we were just invited. - if room_id not in room_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_left.add(state_key) - continue - - # Fetch the current state at the time. - try: - event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_ordering=stream_ordering - ) - except errors.StoreError: - # we have purged the stream_ordering index since the stream - # ordering: treat it the same as a new room - event_ids = [] - - # special-case for an empty prev state: include all members - # in the changed list - if not event_ids: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - continue - - current_member_id = current_state_ids.get((EventTypes.Member, user_id)) - if not current_member_id: - continue - - # mapping from event_id -> state_dict - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) - - # Check if we've joined the room? If so we just blindly add all the users to - # the "possibly changed" users. - for state_dict in itervalues(prev_state_ids): - member_event = state_dict.get((EventTypes.Member, user_id), None) - if not member_event or member_event != current_member_id: - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - possibly_changed.add(state_key) - break - - # If there has been any change in membership, include them in the - # possibly changed list. We'll check if they are joined below, - # and we're not toooo worried about spuriously adding users. - for key, event_id in iteritems(current_state_ids): - etype, state_key = key - if etype != EventTypes.Member: - continue - - # check if this member has changed since any of the extremities - # at the stream_ordering, and add them to the list if so. - for state_dict in itervalues(prev_state_ids): - prev_event_id = state_dict.get(key, None) - if not prev_event_id or prev_event_id != event_id: - if state_key != user_id: - possibly_changed.add(state_key) - break - - if possibly_changed or possibly_left: - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) - - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - possibly_joined = possibly_changed & users_who_share_room - possibly_left = (possibly_changed | possibly_left) - users_who_share_room - else: - possibly_joined = [] - possibly_left = [] - - defer.returnValue({ - "changed": list(possibly_joined), - "left": list(possibly_left), - }) - @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) @@ -473,15 +485,26 @@ class DeviceListEduUpdater(object): if get_domain_from_id(user_id) != origin: # TODO: Raise? - logger.warning("Got device list update edu for %r from %r", user_id, origin) + logger.warning( + "Got device list update edu for %r/%r from %r", + user_id, device_id, origin, + ) return room_ids = yield self.store.get_rooms_for_user(user_id) if not room_ids: # We don't share any rooms with this user. Ignore update, as we # probably won't get any further updates. + logger.warning( + "Got device list update edu for %r/%r, but don't share a room", + user_id, device_id, + ) return + logger.debug( + "Received device list update for %r/%r", user_id, device_id, + ) + self._pending_updates.setdefault(user_id, []).append( (device_id, stream_id, prev_ids, edu_content) ) @@ -499,10 +522,18 @@ class DeviceListEduUpdater(object): # This can happen since we batch updates return + for device_id, stream_id, prev_ids, content in pending_updates: + logger.debug( + "Handling update %r/%r, ID: %r, prev: %r ", + user_id, device_id, stream_id, prev_ids, + ) + # Given a list of updates we check if we need to resync. This # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) + logger.debug("Need to re-sync devices for %r? %r", user_id, resync) + if resync: # Fetch all devices for the user. origin = get_domain_from_id(user_id) @@ -555,11 +586,21 @@ class DeviceListEduUpdater(object): ) devices = [] + for device in devices: + logger.debug( + "Handling resync update %r/%r, ID: %r", + user_id, device["device_id"], stream_id, + ) + yield self.store.update_remote_device_list_cache( user_id, devices, stream_id, ) device_ids = [device["device_id"] for device in devices] yield self.device_handler.notify_device_update(user_id, device_ids) + + # We clobber the seen updates since we've re-synced from a given + # point. + self._seen_updates[user_id] = set([stream_id]) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) @@ -572,9 +613,9 @@ class DeviceListEduUpdater(object): user_id, [device_id for device_id, _, _, _ in pending_updates] ) - self._seen_updates.setdefault(user_id, set()).update( - stream_id for _, stream_id, _, _ in pending_updates - ) + self._seen_updates.setdefault(user_id, set()).update( + stream_id for _, stream_id, _, _ in pending_updates + ) @defer.inlineCallbacks def _need_to_do_resync(self, user_id, updates): @@ -587,6 +628,11 @@ class DeviceListEduUpdater(object): user_id ) + logger.debug( + "Current extremity for %r: %r", + user_id, extremity, + ) + stream_id_in_updates = set() # stream_ids in updates list for _, stream_id, prev_ids, _ in updates: if not prev_ids: |