From 28989cb301fecf5a669a634c09bc2b73f97fec5d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 27 May 2022 17:47:32 +0200 Subject: Add a background job to automatically delete stale devices (#12855) Co-authored-by: Patrick Cloke --- changelog.d/12855.feature | 1 + docs/usage/configuration/config_documentation.md | 12 ++ synapse/config/server.py | 11 ++ synapse/handlers/device.py | 30 +++- synapse/storage/databases/main/devices.py | 39 +++++ tests/rest/client/test_device_lists.py | 159 ------------------ tests/rest/client/test_devices.py | 202 +++++++++++++++++++++++ 7 files changed, 294 insertions(+), 160 deletions(-) create mode 100644 changelog.d/12855.feature delete mode 100644 tests/rest/client/test_device_lists.py create mode 100644 tests/rest/client/test_devices.py diff --git a/changelog.d/12855.feature b/changelog.d/12855.feature new file mode 100644 index 0000000000..915f008ec6 --- /dev/null +++ b/changelog.d/12855.feature @@ -0,0 +1 @@ +Add a configurable background job to delete stale devices. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index b71b09ba96..88b9e5744d 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -575,6 +575,18 @@ Example configuration: dummy_events_threshold: 5 ``` --- +Config option `delete_stale_devices_after` + +An optional duration. If set, Synapse will run a daily background task to log out and +delete any device that hasn't been accessed for more than the specified amount of time. + +Defaults to no duration, which means devices are never pruned. + +Example configuration: +```yaml +delete_stale_devices_after: 1y +``` + ## Homeserver blocking ## Useful options for Synapse admins. diff --git a/synapse/config/server.py b/synapse/config/server.py index f73d5e1f66..657322cb1f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -679,6 +679,17 @@ class ServerConfig(Config): config.get("exclude_rooms_from_sync") or [] ) + delete_stale_devices_after: Optional[str] = ( + config.get("delete_stale_devices_after") or None + ) + + if delete_stale_devices_after is not None: + self.delete_stale_devices_after: Optional[int] = self.parse_duration( + delete_stale_devices_after + ) + else: + self.delete_stale_devices_after = None + def has_tls_listener(self) -> bool: return any(listener.tls for listener in self.listeners) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 438a549339..2a56473dc6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -61,6 +61,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) MAX_DEVICE_DISPLAY_NAME_LEN = 100 +DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 class DeviceWorkerHandler: @@ -295,6 +296,19 @@ class DeviceHandler(DeviceWorkerHandler): # On start up check if there are any updates pending. hs.get_reactor().callWhenRunning(self._handle_new_device_update_async) + self._delete_stale_devices_after = hs.config.server.delete_stale_devices_after + + # Ideally we would run this on a worker and condition this on the + # "run_background_tasks_on" setting, but this would mean making the notification + # of device list changes over federation work on workers, which is nontrivial. 
+ if self._delete_stale_devices_after is not None: + self.clock.looping_call( + run_as_background_process, + DELETE_STALE_DEVICES_INTERVAL_MS, + "delete_stale_devices", + self._delete_stale_devices, + ) + def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -370,6 +384,19 @@ class DeviceHandler(DeviceWorkerHandler): raise errors.StoreError(500, "Couldn't generate a device ID.") + async def _delete_stale_devices(self) -> None: + """Background task that deletes devices which haven't been accessed for more than + a configured time period. + """ + # We should only be running this job if the config option is defined. + assert self._delete_stale_devices_after is not None + now_ms = self.clock.time_msec() + since_ms = now_ms - self._delete_stale_devices_after + devices = await self.store.get_local_devices_not_accessed_since(since_ms) + + for user_id, user_devices in devices.items(): + await self.delete_devices(user_id, user_devices) + @trace async def delete_device(self, user_id: str, device_id: str) -> None: """Delete the given device @@ -692,7 +719,8 @@ class DeviceHandler(DeviceWorkerHandler): ) # TODO: when called, this isn't in a logging context. # This leads to log spam, sentry event spam, and massive - # memory usage. See #12552. + # memory usage. + # See https://github.com/matrix-org/synapse/issues/12552. # log_kv( # {"message": "sent device update to host", "host": host} # ) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index dd43bae784..d900064c07 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1154,6 +1154,45 @@ class DeviceWorkerStore(SQLBaseStore): _prune_txn, ) + async def get_local_devices_not_accessed_since( + self, since_ms: int + ) -> Dict[str, List[str]]: + """Retrieves local devices that haven't been accessed since a given date. + + Args: + since_ms: the timestamp to select on, every device with a last access date + from before that time is returned. + + Returns: + A dictionary with an entry for each user with at least one device matching + the request, which value is a list of the device ID(s) for the corresponding + device(s). + """ + + def get_devices_not_accessed_since_txn( + txn: LoggingTransaction, + ) -> List[Dict[str, str]]: + sql = """ + SELECT user_id, device_id + FROM devices WHERE last_seen < ? AND hidden = FALSE + """ + txn.execute(sql, (since_ms,)) + return self.db_pool.cursor_to_dict(txn) + + rows = await self.db_pool.runInteraction( + "get_devices_not_accessed_since", + get_devices_not_accessed_since_txn, + ) + + devices: Dict[str, List[str]] = {} + for row in rows: + # Remote devices are never stale from our point of view. + if self.hs.is_mine_id(row["user_id"]): + user_devices = devices.setdefault(row["user_id"], []) + user_devices.append(row["device_id"]) + + return devices + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__( diff --git a/tests/rest/client/test_device_lists.py b/tests/rest/client/test_device_lists.py deleted file mode 100644 index a8af4e2435..0000000000 --- a/tests/rest/client/test_device_lists.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright 2022 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from http import HTTPStatus - -from synapse.rest import admin, devices, room, sync -from synapse.rest.client import account, login, register - -from tests import unittest - - -class DeviceListsTestCase(unittest.HomeserverTestCase): - """Tests regarding device list changes.""" - - servlets = [ - admin.register_servlets_for_client_rest_resource, - login.register_servlets, - register.register_servlets, - account.register_servlets, - room.register_servlets, - sync.register_servlets, - devices.register_servlets, - ] - - def test_receiving_local_device_list_changes(self) -> None: - """Tests that a local users that share a room receive each other's device list - changes. - """ - # Register two users - test_device_id = "TESTDEVICE" - alice_user_id = self.register_user("alice", "correcthorse") - alice_access_token = self.login( - alice_user_id, "correcthorse", device_id=test_device_id - ) - - bob_user_id = self.register_user("bob", "ponyponypony") - bob_access_token = self.login(bob_user_id, "ponyponypony") - - # Create a room for them to coexist peacefully in - new_room_id = self.helper.create_room_as( - alice_user_id, is_public=True, tok=alice_access_token - ) - self.assertIsNotNone(new_room_id) - - # Have Bob join the room - self.helper.invite( - new_room_id, alice_user_id, bob_user_id, tok=alice_access_token - ) - self.helper.join(new_room_id, bob_user_id, tok=bob_access_token) - - # Now have Bob initiate an initial sync (in order to get a since token) - channel = self.make_request( - "GET", - "/sync", - access_token=bob_access_token, - ) - self.assertEqual(channel.code, 200, channel.json_body) - next_batch_token = channel.json_body["next_batch"] - - # ...and then an incremental sync. This should block until the sync stream is woken up, - # which we hope will happen as a result of Alice updating their device list. - bob_sync_channel = self.make_request( - "GET", - f"/sync?since={next_batch_token}&timeout=30000", - access_token=bob_access_token, - # Start the request, then continue on. - await_result=False, - ) - - # Have alice update their device list - channel = self.make_request( - "PUT", - f"/devices/{test_device_id}", - { - "display_name": "New Device Name", - }, - access_token=alice_access_token, - ) - self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) - - # Check that bob's incremental sync contains the updated device list. - # If not, the client would only receive the device list update on the - # *next* sync. - bob_sync_channel.await_result() - self.assertEqual(bob_sync_channel.code, 200, bob_sync_channel.json_body) - - changed_device_lists = bob_sync_channel.json_body.get("device_lists", {}).get( - "changed", [] - ) - self.assertIn(alice_user_id, changed_device_lists, bob_sync_channel.json_body) - - def test_not_receiving_local_device_list_changes(self) -> None: - """Tests a local users DO NOT receive device updates from each other if they do not - share a room. 
-        """
-        # Register two users
-        test_device_id = "TESTDEVICE"
-        alice_user_id = self.register_user("alice", "correcthorse")
-        alice_access_token = self.login(
-            alice_user_id, "correcthorse", device_id=test_device_id
-        )
-
-        bob_user_id = self.register_user("bob", "ponyponypony")
-        bob_access_token = self.login(bob_user_id, "ponyponypony")
-
-        # These users do not share a room. They are lonely.
-
-        # Have Bob initiate an initial sync (in order to get a since token)
-        channel = self.make_request(
-            "GET",
-            "/sync",
-            access_token=bob_access_token,
-        )
-        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
-        next_batch_token = channel.json_body["next_batch"]
-
-        # ...and then an incremental sync. This should block until the sync stream is woken up,
-        # which we hope will happen as a result of Alice updating their device list.
-        bob_sync_channel = self.make_request(
-            "GET",
-            f"/sync?since={next_batch_token}&timeout=1000",
-            access_token=bob_access_token,
-            # Start the request, then continue on.
-            await_result=False,
-        )
-
-        # Have alice update their device list
-        channel = self.make_request(
-            "PUT",
-            f"/devices/{test_device_id}",
-            {
-                "display_name": "New Device Name",
-            },
-            access_token=alice_access_token,
-        )
-        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
-
-        # Check that bob's incremental sync does not contain the updated device list.
-        bob_sync_channel.await_result()
-        self.assertEqual(
-            bob_sync_channel.code, HTTPStatus.OK, bob_sync_channel.json_body
-        )
-
-        changed_device_lists = bob_sync_channel.json_body.get("device_lists", {}).get(
-            "changed", []
-        )
-        self.assertNotIn(
-            alice_user_id, changed_device_lists, bob_sync_channel.json_body
-        )
diff --git a/tests/rest/client/test_devices.py b/tests/rest/client/test_devices.py
new file mode 100644
index 0000000000..aa98222434
--- /dev/null
+++ b/tests/rest/client/test_devices.py
@@ -0,0 +1,202 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from http import HTTPStatus
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.errors import NotFoundError
+from synapse.rest import admin, devices, room, sync
+from synapse.rest.client import account, login, register
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests import unittest
+
+
+class DeviceListsTestCase(unittest.HomeserverTestCase):
+    """Tests regarding device list changes."""
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        login.register_servlets,
+        register.register_servlets,
+        account.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+        devices.register_servlets,
+    ]
+
+    def test_receiving_local_device_list_changes(self) -> None:
+        """Tests that local users who share a room receive each other's device list
+        changes.
+        """
+        # Register two users
+        test_device_id = "TESTDEVICE"
+        alice_user_id = self.register_user("alice", "correcthorse")
+        alice_access_token = self.login(
+            alice_user_id, "correcthorse", device_id=test_device_id
+        )
+
+        bob_user_id = self.register_user("bob", "ponyponypony")
+        bob_access_token = self.login(bob_user_id, "ponyponypony")
+
+        # Create a room for them to coexist peacefully in
+        new_room_id = self.helper.create_room_as(
+            alice_user_id, is_public=True, tok=alice_access_token
+        )
+        self.assertIsNotNone(new_room_id)
+
+        # Have Bob join the room
+        self.helper.invite(
+            new_room_id, alice_user_id, bob_user_id, tok=alice_access_token
+        )
+        self.helper.join(new_room_id, bob_user_id, tok=bob_access_token)
+
+        # Now have Bob initiate an initial sync (in order to get a since token)
+        channel = self.make_request(
+            "GET",
+            "/sync",
+            access_token=bob_access_token,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        next_batch_token = channel.json_body["next_batch"]
+
+        # ...and then an incremental sync. This should block until the sync stream is woken up,
+        # which we hope will happen as a result of Alice updating their device list.
+        bob_sync_channel = self.make_request(
+            "GET",
+            f"/sync?since={next_batch_token}&timeout=30000",
+            access_token=bob_access_token,
+            # Start the request, then continue on.
+            await_result=False,
+        )
+
+        # Have alice update their device list
+        channel = self.make_request(
+            "PUT",
+            f"/devices/{test_device_id}",
+            {
+                "display_name": "New Device Name",
+            },
+            access_token=alice_access_token,
+        )
+        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+        # Check that bob's incremental sync contains the updated device list.
+        # If not, the client would only receive the device list update on the
+        # *next* sync.
+        bob_sync_channel.await_result()
+        self.assertEqual(bob_sync_channel.code, 200, bob_sync_channel.json_body)
+
+        changed_device_lists = bob_sync_channel.json_body.get("device_lists", {}).get(
+            "changed", []
+        )
+        self.assertIn(alice_user_id, changed_device_lists, bob_sync_channel.json_body)
+
+    def test_not_receiving_local_device_list_changes(self) -> None:
+        """Tests that local users DO NOT receive device updates from each other if they do not
+        share a room.
+        """
+        # Register two users
+        test_device_id = "TESTDEVICE"
+        alice_user_id = self.register_user("alice", "correcthorse")
+        alice_access_token = self.login(
+            alice_user_id, "correcthorse", device_id=test_device_id
+        )
+
+        bob_user_id = self.register_user("bob", "ponyponypony")
+        bob_access_token = self.login(bob_user_id, "ponyponypony")
+
+        # These users do not share a room. They are lonely.
+
+        # Have Bob initiate an initial sync (in order to get a since token)
+        channel = self.make_request(
+            "GET",
+            "/sync",
+            access_token=bob_access_token,
+        )
+        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+        next_batch_token = channel.json_body["next_batch"]
+
+        # ...and then an incremental sync. This should block until the sync stream is woken up,
+        # which we hope will happen as a result of Alice updating their device list.
+        bob_sync_channel = self.make_request(
+            "GET",
+            f"/sync?since={next_batch_token}&timeout=1000",
+            access_token=bob_access_token,
+            # Start the request, then continue on.
+            await_result=False,
+        )
+
+        # Have alice update their device list
+        channel = self.make_request(
+            "PUT",
+            f"/devices/{test_device_id}",
+            {
+                "display_name": "New Device Name",
+            },
+            access_token=alice_access_token,
+        )
+        self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+        # Check that bob's incremental sync does not contain the updated device list.
+        bob_sync_channel.await_result()
+        self.assertEqual(
+            bob_sync_channel.code, HTTPStatus.OK, bob_sync_channel.json_body
+        )
+
+        changed_device_lists = bob_sync_channel.json_body.get("device_lists", {}).get(
+            "changed", []
+        )
+        self.assertNotIn(
+            alice_user_id, changed_device_lists, bob_sync_channel.json_body
+        )
+
+
+class DevicesTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        sync.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.handler = hs.get_device_handler()
+
+    @unittest.override_config({"delete_stale_devices_after": 72000000})
+    def test_delete_stale_devices(self) -> None:
+        """Tests that stale devices are automatically removed after a set period of
+        inactivity.
+        The configuration is set to delete devices that haven't been used in the past 20h.
+        """
+        # Register a user and create 2 devices for them.
+        user_id = self.register_user("user", "password")
+        tok1 = self.login("user", "password", device_id="abc")
+        tok2 = self.login("user", "password", device_id="def")
+
+        # Sync them so they have a last_seen value.
+        self.make_request("GET", "/sync", access_token=tok1)
+        self.make_request("GET", "/sync", access_token=tok2)
+
+        # Advance half a day and sync again with one of the devices, so that the next
+        # time the background job runs we don't delete this device (since it will look
+        # for devices that haven't been used for more than the configured 20 hours).
+        self.reactor.advance(43200)
+        self.make_request("GET", "/sync", access_token=tok1)
+
+        # Advance another half a day, and check that the device that has synced still
+        # exists but the one that hasn't has been removed.
+        self.reactor.advance(43200)
+        self.get_success(self.handler.get_device(user_id, "abc"))
+        self.get_failure(self.handler.get_device(user_id, "def"), NotFoundError)
--
cgit 1.4.1
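For reference, a minimal, self-contained sketch (not part of the patch) of the pruning arithmetic that `test_delete_stale_devices` exercises. The only values taken from the patch are the 72000000 ms (20 hour) `delete_stale_devices_after` setting and the daily `DELETE_STALE_DEVICES_INTERVAL_MS`; the `is_stale` helper is purely illustrative and not a Synapse API.

```python
# Illustrative only: mirrors the cutoff computed in _delete_stale_devices
# (since_ms = now_ms - delete_stale_devices_after).
DELETE_STALE_DEVICES_AFTER_MS = 72_000_000  # 20 hours, as in the test's config
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000  # the background job runs daily


def is_stale(last_seen_ms: int, now_ms: int) -> bool:
    """A device is pruned if it was last seen before now - delete_stale_devices_after."""
    return last_seen_ms < now_ms - DELETE_STALE_DEVICES_AFTER_MS


# First run of the daily job, 24h after the test starts:
now_ms = DELETE_STALE_DEVICES_INTERVAL_MS
print(is_stale(last_seen_ms=12 * 60 * 60 * 1000, now_ms=now_ms))  # False: "abc" last synced 12h ago
print(is_stale(last_seen_ms=0, now_ms=now_ms))                    # True: "def" last synced 24h ago
```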