summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-04-12 16:50:40 +0100
committerGitHub <noreply@github.com>2022-04-12 16:50:40 +0100
commitaa2811026402394b4013033f075d8f509cdc1257 (patch)
treef635e1a630735ed5e875be51c1a279f1d1bd7b2b /synapse/storage/databases
parentRemove the unstable event field for `/send_join` per MSC3083. (#12395) (diff)
downloadsynapse-aa2811026402394b4013033f075d8f509cdc1257.tar.xz
Process device list updates asynchronously (#12365)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/devices.py61
1 files changed, 9 insertions, 52 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index dc8009b23d..74e4e2122a 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         self,
         user_id: str,
         device_ids: Collection[str],
-        hosts: Optional[Collection[str]],
         room_ids: Collection[str],
     ) -> Optional[int]:
         """Persist that a user's devices have been updated, and which hosts
@@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             user_id: The ID of the user whose device changed.
             device_ids: The IDs of any changed devices. If empty, this function will
                 return None.
-            hosts: The remote destinations that should be notified of the change. If
-                None then the set of hosts have *not* been calculated, and will be
-                calculated later by a background task.
             room_ids: The rooms that the user is in
 
         Returns:
@@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         context = get_active_span_text_map()
 
-        def add_device_changes_txn(
-            txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
-        ):
+        def add_device_changes_txn(txn, stream_ids):
             self._add_device_change_to_stream_txn(
                 txn,
                 user_id,
                 device_ids,
-                stream_ids_for_device_change,
+                stream_ids,
             )
 
             self._add_device_outbound_room_poke_txn(
@@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 user_id,
                 device_ids,
                 room_ids,
-                stream_ids_for_device_change,
-                context,
-                hosts_have_been_calculated=hosts is not None,
-            )
-
-            # If the set of hosts to send to has not been calculated yet (and so
-            # `hosts` is None) or there are no `hosts` to send to, then skip
-            # trying to persist them to the DB.
-            if not hosts:
-                return
-
-            self._add_device_outbound_poke_to_stream_txn(
-                txn,
-                user_id,
-                device_ids,
-                hosts,
-                stream_ids_for_outbound_pokes,
+                stream_ids,
                 context,
             )
 
-        # `device_lists_stream` wants a stream ID per device update.
-        num_stream_ids = len(device_ids)
-
-        if hosts:
-            # `device_lists_outbound_pokes` wants a different stream ID for
-            # each row, which is a row per host per device update.
-            num_stream_ids += len(hosts) * len(device_ids)
-
-        async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
-            stream_ids_for_device_change = stream_ids[: len(device_ids)]
-            stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]
-
+        async with self._device_list_id_gen.get_next_mult(
+            len(device_ids)
+        ) as stream_ids:
             await self.db_pool.runInteraction(
                 "add_device_change_to_stream",
                 add_device_changes_txn,
-                stream_ids_for_device_change,
-                stream_ids_for_outbound_pokes,
+                stream_ids,
             )
 
         return stream_ids[-1]
@@ -1752,19 +1720,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         room_ids: Collection[str],
         stream_ids: List[str],
         context: Dict[str, str],
-        hosts_have_been_calculated: bool,
     ) -> None:
-        """Record the user in the room has updated their device.
-
-        Args:
-            hosts_have_been_calculated: True if `device_lists_outbound_pokes`
-                has been updated already with the updates.
-        """
-
-        # We only need to convert to outbound pokes if they are our user.
-        converted_to_destinations = (
-            hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
-        )
+        """Record the user in the room has updated their device."""
 
         encoded_context = json_encoder.encode(context)
 
@@ -1789,7 +1746,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                     device_id,
                     room_id,
                     stream_id,
-                    converted_to_destinations,
+                    False,
                     encoded_context,
                 )
                 for room_id in room_ids