diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ae1d9337ad..b9d3b7fbc6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
@@ -921,12 +920,8 @@ class DeviceListWorkerUpdater:
def __init__(self, hs: "HomeServer"):
from synapse.replication.http.devices import (
ReplicationMultiUserDevicesResyncRestServlet,
- ReplicationUserDevicesResyncRestServlet,
)
- self._user_device_resync_client = (
- ReplicationUserDevicesResyncRestServlet.make_client(hs)
- )
self._multi_user_device_resync_client = (
ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
@@ -948,37 +943,7 @@ class DeviceListWorkerUpdater:
# Shortcut empty requests
return {}
- try:
- return await self._multi_user_device_resync_client(user_ids=user_ids)
- except SynapseError as err:
- if not (
- err.code == HTTPStatus.NOT_FOUND and err.errcode == Codes.UNRECOGNIZED
- ):
- raise
-
- # Fall back to single requests
- result: Dict[str, Optional[JsonDict]] = {}
- for user_id in user_ids:
- result[user_id] = await self._user_device_resync_client(user_id=user_id)
- return result
-
- async def user_device_resync(
- self, user_id: str, mark_failed_as_stale: bool = True
- ) -> Optional[JsonDict]:
- """Fetches all devices for a user and updates the device cache with them.
-
- Args:
- user_id: The user's id whose device_list will be updated.
- mark_failed_as_stale: Whether to mark the user's device list as stale
- if the attempt to resync failed.
- Returns:
- A dict with device info as under the "devices" in the result of this
- request:
- https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
- None when we weren't able to fetch the device info for some reason,
- e.g. due to a connection problem.
- """
- return (await self.multi_user_device_resync([user_id]))[user_id]
+ return await self._multi_user_device_resync_client(user_ids=user_ids)
class DeviceListUpdater(DeviceListWorkerUpdater):
@@ -1131,7 +1096,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
)
if resync:
- await self.user_device_resync(user_id)
+ await self.multi_user_device_resync([user_id])
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
@@ -1198,10 +1163,9 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
for user_id in need_resync:
try:
# Try to resync the current user's devices list.
- result = await self.user_device_resync(
- user_id=user_id,
- mark_failed_as_stale=False,
- )
+ result = (await self.multi_user_device_resync([user_id], False))[
+ user_id
+ ]
# user_device_resync only returns a result if it managed to
# successfully resync and update the database. Updating the table
@@ -1260,18 +1224,6 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
return result
- async def user_device_resync(
- self, user_id: str, mark_failed_as_stale: bool = True
- ) -> Optional[JsonDict]:
- result, failed = await self._user_device_resync_returning_failed(user_id)
-
- if failed and mark_failed_as_stale:
- # Mark the remote user's device list as stale so we know we need to retry
- # it later.
- await self.store.mark_remote_users_device_caches_as_stale((user_id,))
-
- return result
-
async def _user_device_resync_returning_failed(
self, user_id: str
) -> Tuple[Optional[JsonDict], bool]:
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 00c403db49..3caf9b31cc 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -25,7 +25,9 @@ from synapse.logging.opentracing import (
log_kv,
set_tag,
)
-from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
+from synapse.replication.http.devices import (
+ ReplicationMultiUserDevicesResyncRestServlet,
+)
from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
from synapse.util import json_encoder
from synapse.util.stringutils import random_string
@@ -71,12 +73,12 @@ class DeviceMessageHandler:
# sync. We do all device list resyncing on the master instance, so if
# we're on a worker we hit the device resync replication API.
if hs.config.worker.worker_app is None:
- self._user_device_resync = (
- hs.get_device_handler().device_list_updater.user_device_resync
+ self._multi_user_device_resync = (
+ hs.get_device_handler().device_list_updater.multi_user_device_resync
)
else:
- self._user_device_resync = (
- ReplicationUserDevicesResyncRestServlet.make_client(hs)
+ self._multi_user_device_resync = (
+ ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
# a rate limiter for room key requests. The keys are
@@ -198,7 +200,7 @@ class DeviceMessageHandler:
await self.store.mark_remote_users_device_caches_as_stale((sender_user_id,))
# Immediately attempt a resync in the background
- run_in_background(self._user_device_resync, user_id=sender_user_id)
+ run_in_background(self._multi_user_device_resync, user_ids=[sender_user_id])
async def send_device_message(
self,
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 8d5be81a92..06609fab93 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -70,7 +70,9 @@ from synapse.logging.opentracing import (
trace,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
+from synapse.replication.http.devices import (
+ ReplicationMultiUserDevicesResyncRestServlet,
+)
from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
)
@@ -167,8 +169,8 @@ class FederationEventHandler:
self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
if hs.config.worker.worker_app:
- self._user_device_resync = (
- ReplicationUserDevicesResyncRestServlet.make_client(hs)
+ self._multi_user_device_resync = (
+ ReplicationMultiUserDevicesResyncRestServlet.make_client(hs)
)
else:
self._device_list_updater = hs.get_device_handler().device_list_updater
@@ -1487,9 +1489,11 @@ class FederationEventHandler:
# Immediately attempt a resync in the background
if self._config.worker.worker_app:
- await self._user_device_resync(user_id=sender)
+ await self._multi_user_device_resync(user_ids=[sender])
else:
- await self._device_list_updater.user_device_resync(sender)
+ await self._device_list_updater.multi_user_device_resync(
+ user_ids=[sender]
+ )
except Exception:
logger.exception("Failed to resync device for %s", sender)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index cc3929dcf5..f874f072f9 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -28,62 +28,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
- """Ask master to resync the device list for a user by contacting their
- server.
-
- This must happen on master so that the results can be correctly cached in
- the database and streamed to workers.
-
- Request format:
-
- POST /_synapse/replication/user_device_resync/:user_id
-
- {}
-
- Response is equivalent to ` /_matrix/federation/v1/user/devices/:user_id`
- response, e.g.:
-
- {
- "user_id": "@alice:example.org",
- "devices": [
- {
- "device_id": "JLAFKJWSCS",
- "keys": { ... },
- "device_display_name": "Alice's Mobile Phone"
- }
- ]
- }
- """
-
- NAME = "user_device_resync"
- PATH_ARGS = ("user_id",)
- CACHE = False
-
- def __init__(self, hs: "HomeServer"):
- super().__init__(hs)
-
- from synapse.handlers.device import DeviceHandler
-
- handler = hs.get_device_handler()
- assert isinstance(handler, DeviceHandler)
- self.device_list_updater = handler.device_list_updater
-
- self.store = hs.get_datastores().main
- self.clock = hs.get_clock()
-
- @staticmethod
- async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override]
- return {}
-
- async def _handle_request( # type: ignore[override]
- self, request: Request, content: JsonDict, user_id: str
- ) -> Tuple[int, Optional[JsonDict]]:
- user_devices = await self.device_list_updater.user_device_resync(user_id)
-
- return 200, user_devices
-
-
class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint):
"""Ask master to resync the device list for multiple users from the same
remote server by contacting their server.
@@ -216,6 +160,5 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
- ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationMultiUserDevicesResyncRestServlet(hs).register(http_server)
ReplicationUploadKeysForUserRestServlet(hs).register(http_server)
|