summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-01-07 18:06:52 +0000
committerGitHub <noreply@github.com>2021-01-07 18:06:52 +0000
commite34df813ce96168d6bd0ee6c2888122a127c1773 (patch)
tree0b8171fa4c893004515251bead5c0b347cdeec26
parentSome cleanups to device inbox store. (#9041) (diff)
downloadsynapse-e34df813ce96168d6bd0ee6c2888122a127c1773.tar.xz
Ensure that remote users' device list resyncing always happens on master (#9043)
Currently `DeviceMessageHandler` only ever exists on master, but that is about to change.
Diffstat (limited to '')
-rw-r--r--changelog.d/9043.feature1
-rw-r--r--synapse/handlers/devicemessage.py17
2 files changed, 14 insertions, 4 deletions
diff --git a/changelog.d/9043.feature b/changelog.d/9043.feature
new file mode 100644
index 0000000000..4ec319f1f2
--- /dev/null
+++ b/changelog.d/9043.feature
@@ -0,0 +1 @@
+Add experimental support for handling and persistence of to-device messages to happen on worker processes.
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 9cac5a8463..eb10d2b4bd 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -24,6 +24,7 @@ from synapse.logging.opentracing import (
     set_tag,
     start_active_span,
 )
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util import json_encoder
 from synapse.util.stringutils import random_string
@@ -50,7 +51,17 @@ class DeviceMessageHandler:
             "m.direct_to_device", self.on_direct_to_device_edu
         )
 
-        self._device_list_updater = hs.get_device_handler().device_list_updater
+        # The handler to call when we think a user's device list might be out of
+        # sync. We do all device list resyncing on the master instance, so if
+        # we're on a worker we hit the device resync replication API.
+        if hs.config.worker.worker_app is None:
+            self._user_device_resync = (
+                hs.get_device_handler().device_list_updater.user_device_resync
+            )
+        else:
+            self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
 
     async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
         local_messages = {}
@@ -138,9 +149,7 @@ class DeviceMessageHandler:
             await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
 
             # Immediately attempt a resync in the background
-            run_in_background(
-                self._device_list_updater.user_device_resync, sender_user_id
-            )
+            run_in_background(self._user_device_resync, sender_user_id)
 
     async def send_device_message(
         self,