summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/7453.bugfix1
-rw-r--r--synapse/handlers/device.py80
-rw-r--r--synapse/storage/data_stores/main/devices.py34
-rw-r--r--tests/test_federation.py63
4 files changed, 158 insertions, 20 deletions
diff --git a/changelog.d/7453.bugfix b/changelog.d/7453.bugfix
new file mode 100644
index 0000000000..629a3d61e2
--- /dev/null
+++ b/changelog.d/7453.bugfix
@@ -0,0 +1 @@
+Fix a bug that would cause Synapse not to resync out-of-sync device lists.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9bd941b5a0..29a19b4572 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,6 +29,7 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import RoomStreamToken, get_domain_from_id
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
@@ -535,6 +536,15 @@ class DeviceListUpdater(object):
             iterable=True,
         )
 
+        # Attempt to resync out of sync device lists every 30s.
+        self._resync_retry_in_progress = False
+        self.clock.looping_call(
+            run_as_background_process,
+            30 * 1000,
+            func=self._maybe_retry_device_resync,
+            desc="_maybe_retry_device_resync",
+        )
+
     @trace
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
@@ -679,11 +689,50 @@ class DeviceListUpdater(object):
         return False
 
     @defer.inlineCallbacks
-    def user_device_resync(self, user_id):
+    def _maybe_retry_device_resync(self):
+        """Retry to resync device lists that are out of sync, except if another retry is
+        in progress.
+        """
+        if self._resync_retry_in_progress:
+            return
+
+        try:
+            # Prevent another call of this function to retry resyncing device lists so
+            # we don't send too many requests.
+            self._resync_retry_in_progress = True
+            # Get all of the users that need resyncing.
+            need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
+            # Iterate over the set of user IDs.
+            for user_id in need_resync:
+                # Try to resync the current user's devices list. Exception handling
+                # isn't necessary here, since user_device_resync catches all instances
+                # of "Exception" that might be raised from the federation request. This
+                # means that if an exception is raised by this function, it must be
+                # because of a database issue, which means _maybe_retry_device_resync
+                # probably won't be able to go much further anyway.
+                result = yield self.user_device_resync(
+                    user_id=user_id, mark_failed_as_stale=False,
+                )
+                # user_device_resync only returns a result if it managed to successfully
+                # resync and update the database. Updating the table of users requiring
+                # resync isn't necessary here as user_device_resync already does it
+                # (through self.store.update_remote_device_list_cache).
+                if result:
+                    logger.debug(
+                        "Successfully resynced the device list for %s" % user_id,
+                    )
+        finally:
+            # Allow future calls to retry resyncinc out of sync device lists.
+            self._resync_retry_in_progress = False
+
+    @defer.inlineCallbacks
+    def user_device_resync(self, user_id, mark_failed_as_stale=True):
         """Fetches all devices for a user and updates the device cache with them.
 
         Args:
             user_id (str): The user's id whose device_list will be updated.
+            mark_failed_as_stale (bool): Whether to mark the user's device list as stale
+                if the attempt to resync failed.
         Returns:
             Deferred[dict]: a dict with device info as under the "devices" in the result of this
             request:
@@ -694,10 +743,23 @@ class DeviceListUpdater(object):
         origin = get_domain_from_id(user_id)
         try:
             result = yield self.federation.query_user_devices(origin, user_id)
-        except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
-            # TODO: Remember that we are now out of sync and try again
-            # later
-            logger.warning("Failed to handle device list update for %s", user_id)
+        except NotRetryingDestination:
+            if mark_failed_as_stale:
+                # Mark the remote user's device list as stale so we know we need to retry
+                # it later.
+                yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
+            return
+        except (RequestSendFailed, HttpResponseException) as e:
+            logger.warning(
+                "Failed to handle device list update for %s: %s", user_id, e,
+            )
+
+            if mark_failed_as_stale:
+                # Mark the remote user's device list as stale so we know we need to retry
+                # it later.
+                yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
             # We abort on exceptions rather than accepting the update
             # as otherwise synapse will 'forget' that its device list
             # is out of date. If we bail then we will retry the resync
@@ -711,13 +773,17 @@ class DeviceListUpdater(object):
             logger.info(e)
             return
         except Exception as e:
-            # TODO: Remember that we are now out of sync and try again
-            # later
             set_tag("error", True)
             log_kv(
                 {"message": "Exception raised by federation request", "exception": e}
             )
             logger.exception("Failed to handle device list update for %s", user_id)
+
+            if mark_failed_as_stale:
+                # Mark the remote user's device list as stale so we know we need to retry
+                # it later.
+                yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
             return
         log_kv({"result": result})
         stream_id = result["stream_id"]
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index fe6d6ecfe0..0e8378714a 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List, Tuple
+from typing import List, Optional, Set, Tuple
 
 from six import iteritems
 
@@ -649,21 +649,31 @@ class DeviceWorkerStore(SQLBaseStore):
         return results
 
     @defer.inlineCallbacks
-    def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]):
+    def get_user_ids_requiring_device_list_resync(
+        self, user_ids: Optional[Collection[str]] = None,
+    ) -> Set[str]:
         """Given a list of remote users return the list of users that we
-        should resync the device lists for.
+        should resync the device lists for. If None is given instead of a list,
+        return every user that we should resync the device lists for.
 
         Returns:
-            Deferred[Set[str]]
+            The IDs of users whose device lists need resync.
         """
-
-        rows = yield self.db.simple_select_many_batch(
-            table="device_lists_remote_resync",
-            column="user_id",
-            iterable=user_ids,
-            retcols=("user_id",),
-            desc="get_user_ids_requiring_device_list_resync",
-        )
+        if user_ids:
+            rows = yield self.db.simple_select_many_batch(
+                table="device_lists_remote_resync",
+                column="user_id",
+                iterable=user_ids,
+                retcols=("user_id",),
+                desc="get_user_ids_requiring_device_list_resync_with_iterable",
+            )
+        else:
+            rows = yield self.db.simple_select_list(
+                table="device_lists_remote_resync",
+                keyvalues=None,
+                retcols=("user_id",),
+                desc="get_user_ids_requiring_device_list_resync",
+            )
 
         return {row["user_id"] for row in rows}
 
diff --git a/tests/test_federation.py b/tests/test_federation.py
index f297de95f1..13ff14863e 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -6,12 +6,13 @@ from synapse.events import make_event_from_dict
 from synapse.logging.context import LoggingContext
 from synapse.types import Requester, UserID
 from synapse.util import Clock
+from synapse.util.retryutils import NotRetryingDestination
 
 from tests import unittest
 from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver
 
 
-class MessageAcceptTests(unittest.TestCase):
+class MessageAcceptTests(unittest.HomeserverTestCase):
     def setUp(self):
 
         self.http_client = Mock()
@@ -145,3 +146,63 @@ class MessageAcceptTests(unittest.TestCase):
         # Make sure the invalid event isn't there
         extrem = maybeDeferred(self.store.get_latest_event_ids_in_room, self.room_id)
         self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv")
+
+    def test_retry_device_list_resync(self):
+        """Tests that device lists are marked as stale if they couldn't be synced, and
+        that stale device lists are retried periodically.
+        """
+        remote_user_id = "@john:test_remote"
+        remote_origin = "test_remote"
+
+        # Track the number of attempts to resync the user's device list.
+        self.resync_attempts = 0
+
+        # When this function is called, increment the number of resync attempts (only if
+        # we're querying devices for the right user ID), then raise a
+        # NotRetryingDestination error to fail the resync gracefully.
+        def query_user_devices(destination, user_id):
+            if user_id == remote_user_id:
+                self.resync_attempts += 1
+
+            raise NotRetryingDestination(0, 0, destination)
+
+        # Register the mock on the federation client.
+        federation_client = self.homeserver.get_federation_client()
+        federation_client.query_user_devices = Mock(side_effect=query_user_devices)
+
+        # Register a mock on the store so that the incoming update doesn't fail because
+        # we don't share a room with the user.
+        store = self.homeserver.get_datastore()
+        store.get_rooms_for_user = Mock(return_value=["!someroom:test"])
+
+        # Manually inject a fake device list update. We need this update to include at
+        # least one prev_id so that the user's device list will need to be retried.
+        device_list_updater = self.homeserver.get_device_handler().device_list_updater
+        self.get_success(
+            device_list_updater.incoming_device_list_update(
+                origin=remote_origin,
+                edu_content={
+                    "deleted": False,
+                    "device_display_name": "Mobile",
+                    "device_id": "QBUAZIFURK",
+                    "prev_id": [5],
+                    "stream_id": 6,
+                    "user_id": remote_user_id,
+                },
+            )
+        )
+
+        # Check that there was one resync attempt.
+        self.assertEqual(self.resync_attempts, 1)
+
+        # Check that the resync attempt failed and caused the user's device list to be
+        # marked as stale.
+        need_resync = self.get_success(
+            store.get_user_ids_requiring_device_list_resync()
+        )
+        self.assertIn(remote_user_id, need_resync)
+
+        # Check that waiting for 30 seconds caused Synapse to retry resyncing the device
+        # list.
+        self.reactor.advance(30)
+        self.assertEqual(self.resync_attempts, 2)