diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9bd941b5a0..29a19b4572 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,6 +29,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
@@ -535,6 +536,15 @@ class DeviceListUpdater(object):
iterable=True,
)
+ # Attempt to resync out of sync device lists every 30s.
+ self._resync_retry_in_progress = False
+ self.clock.looping_call(
+ run_as_background_process,
+ 30 * 1000,
+ func=self._maybe_retry_device_resync,
+ desc="_maybe_retry_device_resync",
+ )
+
@trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
@@ -679,11 +689,50 @@ class DeviceListUpdater(object):
return False
@defer.inlineCallbacks
- def user_device_resync(self, user_id):
+    def _maybe_retry_device_resync(self):
+        """Retry resyncing the device lists that are out of sync, unless another
+        retry pass is already in progress (in which case this call does nothing).
+        """
+        if self._resync_retry_in_progress:
+            return
+
+        try:
+            # Prevent another call of this function to retry resyncing device lists so
+            # we don't send too many requests.
+            self._resync_retry_in_progress = True
+            # Get all of the users that need resyncing.
+            need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
+            # Failures are not re-marked as stale here (mark_failed_as_stale=False).
+            for user_id in need_resync:
+                # Try to resync the current user's devices list. Exception handling
+                # isn't necessary here, since user_device_resync catches all instances
+                # of "Exception" that might be raised from the federation request. This
+                # means that if an exception is raised by this function, it must be
+                # because of a database issue, which means _maybe_retry_device_resync
+                # probably won't be able to go much further anyway.
+                result = yield self.user_device_resync(
+                    user_id=user_id, mark_failed_as_stale=False,
+                )
+                # user_device_resync only returns a result if it managed to successfully
+                # resync and update the database. Updating the table of users requiring
+                # resync isn't necessary here as user_device_resync already does it
+                # (through self.store.update_remote_device_list_cache).
+                if result:
+                    logger.debug(
+                        "Successfully resynced the device list for %s", user_id,
+                    )
+        finally:
+            # Allow future calls to retry resyncing out of sync device lists.
+            self._resync_retry_in_progress = False
+
+ @defer.inlineCallbacks
+ def user_device_resync(self, user_id, mark_failed_as_stale=True):
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id (str): The user's id whose device_list will be updated.
+ mark_failed_as_stale (bool): Whether to mark the user's device list as stale
+ if the attempt to resync failed.
Returns:
Deferred[dict]: a dict with device info as under the "devices" in the result of this
request:
@@ -694,10 +743,23 @@ class DeviceListUpdater(object):
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
- except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
- # TODO: Remember that we are now out of sync and try again
- # later
- logger.warning("Failed to handle device list update for %s", user_id)
+ except NotRetryingDestination:
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
+ return
+ except (RequestSendFailed, HttpResponseException) as e:
+ logger.warning(
+ "Failed to handle device list update for %s: %s", user_id, e,
+ )
+
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
@@ -711,13 +773,17 @@ class DeviceListUpdater(object):
logger.info(e)
return
except Exception as e:
- # TODO: Remember that we are now out of sync and try again
- # later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
+
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
return
log_kv({"result": result})
stream_id = result["stream_id"]
|