diff options
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r-- | synapse/handlers/device.py | 112 |
1 files changed, 49 insertions, 63 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index b398848079..f59d0479b5 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -58,9 +58,7 @@ class DeviceWorkerHandler(BaseHandler): device_map = yield self.store.get_devices_by_user(user_id) - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id=None - ) + ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None) devices = list(device_map.values()) for device in devices: @@ -85,9 +83,7 @@ class DeviceWorkerHandler(BaseHandler): device = yield self.store.get_device(user_id, device_id) except errors.StoreError: raise errors.NotFoundError - ips = yield self.store.get_last_client_ip_by_device( - user_id, device_id, - ) + ips = yield self.store.get_last_client_ip_by_device(user_id, device_id) _update_device_from_client_ips(device, ips) defer.returnValue(device) @@ -114,13 +110,11 @@ class DeviceWorkerHandler(BaseHandler): rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) member_events = yield self.store.get_membership_changes_for_user( - user_id, from_token.room_key, now_room_key, + user_id, from_token.room_key, now_room_key ) rooms_changed.update(event.room_id for event in member_events) - stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key - ).stream + stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key).stream possibly_changed = set(changed) possibly_left = set() @@ -206,10 +200,9 @@ class DeviceWorkerHandler(BaseHandler): possibly_joined = [] possibly_left = [] - defer.returnValue({ - "changed": list(possibly_joined), - "left": list(possibly_left), - }) + defer.returnValue( + {"changed": list(possibly_joined), "left": list(possibly_left)} + ) class DeviceHandler(DeviceWorkerHandler): @@ -223,17 +216,18 @@ class DeviceHandler(DeviceWorkerHandler): federation_registry = hs.get_federation_registry() federation_registry.register_edu_handler( - "m.device_list_update", self._edu_updater.incoming_device_list_update, + "m.device_list_update", self._edu_updater.incoming_device_list_update ) federation_registry.register_query_handler( - "user_devices", self.on_federation_query_user_devices, + "user_devices", self.on_federation_query_user_devices ) hs.get_distributor().observe("user_left_room", self.user_left_room) @defer.inlineCallbacks - def check_device_registered(self, user_id, device_id, - initial_device_display_name=None): + def check_device_registered( + self, user_id, device_id, initial_device_display_name=None + ): """ If the given device has not been registered, register it with the supplied display name. @@ -297,12 +291,10 @@ class DeviceHandler(DeviceWorkerHandler): raise yield self._auth_handler.delete_access_tokens_for_user( - user_id, device_id=device_id, + user_id, device_id=device_id ) - yield self.store.delete_e2e_keys_by_device( - user_id=user_id, device_id=device_id - ) + yield self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id) yield self.notify_device_update(user_id, [device_id]) @@ -349,7 +341,7 @@ class DeviceHandler(DeviceWorkerHandler): # considered as part of a critical path. for device_id in device_ids: yield self._auth_handler.delete_access_tokens_for_user( - user_id, device_id=device_id, + user_id, device_id=device_id ) yield self.store.delete_e2e_keys_by_device( user_id=user_id, device_id=device_id @@ -372,9 +364,7 @@ class DeviceHandler(DeviceWorkerHandler): try: yield self.store.update_device( - user_id, - device_id, - new_display_name=content.get("display_name") + user_id, device_id, new_display_name=content.get("display_name") ) yield self.notify_device_update(user_id, [device_id]) except errors.StoreError as e: @@ -404,29 +394,26 @@ class DeviceHandler(DeviceWorkerHandler): for device_id in device_ids: logger.debug( - "Notifying about update %r/%r, ID: %r", user_id, device_id, - position, + "Notifying about update %r/%r, ID: %r", user_id, device_id, position ) room_ids = yield self.store.get_rooms_for_user(user_id) - yield self.notifier.on_new_event( - "device_list_key", position, rooms=room_ids, - ) + yield self.notifier.on_new_event("device_list_key", position, rooms=room_ids) if hosts: - logger.info("Sending device list update notif for %r to: %r", user_id, hosts) + logger.info( + "Sending device list update notif for %r to: %r", user_id, hosts + ) for host in hosts: self.federation_sender.send_device_messages(host) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - defer.returnValue({ - "user_id": user_id, - "stream_id": stream_id, - "devices": devices, - }) + defer.returnValue( + {"user_id": user_id, "stream_id": stream_id, "devices": devices} + ) @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -440,10 +427,7 @@ class DeviceHandler(DeviceWorkerHandler): def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) - device.update({ - "last_seen_ts": ip.get("last_seen"), - "last_seen_ip": ip.get("ip"), - }) + device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")}) class DeviceListEduUpdater(object): @@ -481,13 +465,15 @@ class DeviceListEduUpdater(object): device_id = edu_content.pop("device_id") stream_id = str(edu_content.pop("stream_id")) # They may come as ints prev_ids = edu_content.pop("prev_id", []) - prev_ids = [str(p) for p in prev_ids] # They may come as ints + prev_ids = [str(p) for p in prev_ids] # They may come as ints if get_domain_from_id(user_id) != origin: # TODO: Raise? logger.warning( "Got device list update edu for %r/%r from %r", - user_id, device_id, origin, + user_id, + device_id, + origin, ) return @@ -497,13 +483,12 @@ class DeviceListEduUpdater(object): # probably won't get any further updates. logger.warning( "Got device list update edu for %r/%r, but don't share a room", - user_id, device_id, + user_id, + device_id, ) return - logger.debug( - "Received device list update for %r/%r", user_id, device_id, - ) + logger.debug("Received device list update for %r/%r", user_id, device_id) self._pending_updates.setdefault(user_id, []).append( (device_id, stream_id, prev_ids, edu_content) @@ -525,7 +510,10 @@ class DeviceListEduUpdater(object): for device_id, stream_id, prev_ids, content in pending_updates: logger.debug( "Handling update %r/%r, ID: %r, prev: %r ", - user_id, device_id, stream_id, prev_ids, + user_id, + device_id, + stream_id, + prev_ids, ) # Given a list of updates we check if we need to resync. This @@ -540,13 +528,13 @@ class DeviceListEduUpdater(object): try: result = yield self.federation.query_user_devices(origin, user_id) except ( - NotRetryingDestination, RequestSendFailed, HttpResponseException, + NotRetryingDestination, + RequestSendFailed, + HttpResponseException, ): # TODO: Remember that we are now out of sync and try again # later - logger.warn( - "Failed to handle device list update for %s", user_id, - ) + logger.warn("Failed to handle device list update for %s", user_id) # We abort on exceptions rather than accepting the update # as otherwise synapse will 'forget' that its device list # is out of date. If we bail then we will retry the resync @@ -582,18 +570,21 @@ class DeviceListEduUpdater(object): if len(devices) > 1000: logger.warn( "Ignoring device list snapshot for %s as it has >1K devs (%d)", - user_id, len(devices) + user_id, + len(devices), ) devices = [] for device in devices: logger.debug( "Handling resync update %r/%r, ID: %r", - user_id, device["device_id"], stream_id, + user_id, + device["device_id"], + stream_id, ) yield self.store.update_remote_device_list_cache( - user_id, devices, stream_id, + user_id, devices, stream_id ) device_ids = [device["device_id"] for device in devices] yield self.device_handler.notify_device_update(user_id, device_ids) @@ -606,7 +597,7 @@ class DeviceListEduUpdater(object): # change (because of the single prev_id matching the current cache) for device_id, stream_id, prev_ids, content in pending_updates: yield self.store.update_remote_device_list_cache_entry( - user_id, device_id, content, stream_id, + user_id, device_id, content, stream_id ) yield self.device_handler.notify_device_update( @@ -624,14 +615,9 @@ class DeviceListEduUpdater(object): """ seen_updates = self._seen_updates.get(user_id, set()) - extremity = yield self.store.get_device_list_last_stream_id_for_remote( - user_id - ) + extremity = yield self.store.get_device_list_last_stream_id_for_remote(user_id) - logger.debug( - "Current extremity for %r: %r", - user_id, extremity, - ) + logger.debug("Current extremity for %r: %r", user_id, extremity) stream_id_in_updates = set() # stream_ids in updates list for _, stream_id, prev_ids, _ in updates: |