From e17a11066192354f6c6144135a14e7abe524f44c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Jan 2020 14:43:21 +0000 Subject: Detect unknown remote devices and mark cache as stale (#6776) We just mark the fact that the cache may be stale in the database for now. --- synapse/handlers/devicemessage.py | 57 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) (limited to 'synapse/handlers/devicemessage.py') diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 73b9e120f5..5c5fe77be2 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Any, Dict from canonicaljson import json @@ -65,6 +66,9 @@ class DeviceMessageHandler(object): logger.warning("Request for keys for non-local user %s", user_id) raise SynapseError(400, "Not a user here") + if not by_device: + continue + messages_by_device = { device_id: { "content": message_content, @@ -73,8 +77,11 @@ class DeviceMessageHandler(object): } for device_id, message_content in by_device.items() } - if messages_by_device: - local_messages[user_id] = messages_by_device + local_messages[user_id] = messages_by_device + + yield self._check_for_unknown_devices( + message_type, sender_user_id, by_device + ) stream_id = yield self.store.add_messages_from_remote_to_device_inbox( origin, message_id, local_messages @@ -84,6 +91,52 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) + @defer.inlineCallbacks + def _check_for_unknown_devices( + self, + message_type: str, + sender_user_id: str, + by_device: Dict[str, Dict[str, Any]], + ): + """Checks inbound device messages for unkown remote devices, and if + found marks the remote cache for the user as stale. + """ + + if message_type != "m.room_key_request": + return + + # Get the sending device IDs + requesting_device_ids = set() + for message_content in by_device.values(): + device_id = message_content.get("requesting_device_id") + requesting_device_ids.add(device_id) + + # Check if we are tracking the devices of the remote user. + room_ids = yield self.store.get_rooms_for_user(sender_user_id) + if not room_ids: + logger.info( + "Received device message from remote device we don't" + " share a room with: %s %s", + sender_user_id, + requesting_device_ids, + ) + return + + # If we are tracking check that we know about the sending + # devices. + cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id) + + unknown_devices = requesting_device_ids - set(cached_devices) + if unknown_devices: + logger.info( + "Received device message from remote device not in our cache: %s %s", + sender_user_id, + unknown_devices, + ) + yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id) + # TODO: Poke something to start trying to refetch user's + # keys. + @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): set_tag("number_of_messages", len(messages)) -- cgit 1.5.1 From b660327056cdced860d532ab2404a26946da7ef5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Jan 2020 17:06:38 +0000 Subject: Resync remote device list when detected as stale. (#6786) --- changelog.d/6786.misc | 1 + synapse/handlers/devicemessage.py | 10 ++++++++-- synapse/handlers/federation.py | 18 ++++++++++++++++-- tests/handlers/test_typing.py | 6 +++--- 4 files changed, 28 insertions(+), 7 deletions(-) create mode 100644 changelog.d/6786.misc (limited to 'synapse/handlers/devicemessage.py') diff --git a/changelog.d/6786.misc b/changelog.d/6786.misc new file mode 100644 index 0000000000..94c692e53a --- /dev/null +++ b/changelog.d/6786.misc @@ -0,0 +1 @@ +Attempt to resync remote users' devices when detected as stale. diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 5c5fe77be2..05c4b3eec0 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( get_active_span_text_map, log_kv, @@ -48,6 +49,8 @@ class DeviceMessageHandler(object): "m.direct_to_device", self.on_direct_to_device_edu ) + self._device_list_updater = hs.get_device_handler().device_list_updater + @defer.inlineCallbacks def on_direct_to_device_edu(self, origin, content): local_messages = {} @@ -134,8 +137,11 @@ class DeviceMessageHandler(object): unknown_devices, ) yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id) - # TODO: Poke something to start trying to refetch user's - # keys. + + # Immediately attempt a resync in the background + run_in_background( + self._device_list_updater.user_device_resync, sender_user_id + ) @defer.inlineCallbacks def send_device_message(self, sender_user_id, message_type, messages): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a67020a259..ca484e5458 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -57,6 +57,7 @@ from synapse.logging.context import ( run_in_background, ) from synapse.logging.utils import log_function +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, ReplicationFederationSendEventsRestServlet, @@ -156,6 +157,13 @@ class FederationHandler(BaseHandler): hs ) + if hs.config.worker_app: + self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client( + hs + ) + else: + self._device_list_updater = hs.get_device_handler().device_list_updater + # When joining a room we need to queue any events for that room up self.room_queues = {} self._room_pdu_linearizer = Linearizer("fed_room_pdu") @@ -759,8 +767,14 @@ class FederationHandler(BaseHandler): await self.store.mark_remote_user_device_cache_as_stale( event.sender ) - # TODO: Poke something to start trying to refetch user's - # keys. + + # Immediately attempt a resync in the background + if self.config.worker_app: + return run_in_background(self._user_device_resync, event.sender) + else: + return run_in_background( + self._device_list_updater.user_device_resync, event.sender + ) @log_function async def backfill(self, dest, room_id, limit, extremities): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 596ddc6970..68b9847bd2 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -81,6 +81,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ] ) + # the tests assume that we are starting at unix time 1000 + reactor.pump((1000,)) + hs = self.setup_test_homeserver( notifier=Mock(), http_client=mock_federation_client, keyring=mock_keyring ) @@ -90,9 +93,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): return hs def prepare(self, reactor, clock, hs): - # the tests assume that we are starting at unix time 1000 - reactor.pump((1000,)) - mock_notifier = hs.get_notifier() self.on_new_event = mock_notifier.on_new_event -- cgit 1.5.1