summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12365.feature1
-rw-r--r--synapse/config/server.py8
-rw-r--r--synapse/handlers/device.py28
-rw-r--r--synapse/storage/databases/main/devices.py61
-rw-r--r--synapse/storage/schema/__init__.py6
-rw-r--r--tests/federation/test_federation_sender.py8
-rw-r--r--tests/storage/test_devices.py47
7 files changed, 40 insertions, 119 deletions
diff --git a/changelog.d/12365.feature b/changelog.d/12365.feature
new file mode 100644
index 0000000000..642dea966c
--- /dev/null
+++ b/changelog.d/12365.feature
@@ -0,0 +1 @@
+Enable processing of device list updates asynchronously.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 415279d269..d771045b52 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -680,14 +680,6 @@ class ServerConfig(Config):
             config.get("use_account_validity_in_account_status") or False
         )
 
-        # This is a temporary option that enables fully using the new
-        # `device_lists_changes_in_room` without the backwards compat code. This
-        # is primarily for testing. If enabled the server should *not* be
-        # downgraded, as it may lead to missing device list updates.
-        self.use_new_device_lists_changes_in_room = (
-            config.get("use_new_device_lists_changes_in_room") or False
-        )
-
         self.rooms_to_exclude_from_sync: List[str] = (
             config.get("exclude_rooms_from_sync") or []
         )
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ffa28b2a30..958599e7b8 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -291,12 +291,6 @@ class DeviceHandler(DeviceWorkerHandler):
         # On start up check if there are any updates pending.
         hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
 
-        # Used to decide if we calculate outbound pokes up front or not. By
-        # default we do to allow safely downgrading Synapse.
-        self.use_new_device_lists_changes_in_room = (
-            hs.config.server.use_new_device_lists_changes_in_room
-        )
-
     def _check_device_name_length(self, name: Optional[str]) -> None:
         """
         Checks whether a device name is longer than the maximum allowed length.
@@ -490,23 +484,9 @@ class DeviceHandler(DeviceWorkerHandler):
 
         room_ids = await self.store.get_rooms_for_user(user_id)
 
-        hosts: Optional[Set[str]] = None
-        if not self.use_new_device_lists_changes_in_room:
-            hosts = set()
-
-            if self.hs.is_mine_id(user_id):
-                for room_id in room_ids:
-                    joined_users = await self.store.get_users_in_room(room_id)
-                    hosts.update(get_domain_from_id(u) for u in joined_users)
-
-                set_tag("target_hosts", hosts)
-
-                hosts.discard(self.server_name)
-
         position = await self.store.add_device_change_to_streams(
             user_id,
             device_ids,
-            hosts=hosts,
             room_ids=room_ids,
         )
 
@@ -528,14 +508,6 @@ class DeviceHandler(DeviceWorkerHandler):
         # We may need to do some processing asynchronously.
         self._handle_new_device_update_async()
 
-        if hosts:
-            logger.info(
-                "Sending device list update notif for %r to: %r", user_id, hosts
-            )
-            for host in hosts:
-                self.federation_sender.send_device_messages(host, immediate=False)
-                log_kv({"message": "sent device update to host", "host": host})
-
     async def notify_user_signature_update(
         self, from_user_id: str, user_ids: List[str]
     ) -> None:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index dc8009b23d..74e4e2122a 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         self,
         user_id: str,
         device_ids: Collection[str],
-        hosts: Optional[Collection[str]],
         room_ids: Collection[str],
     ) -> Optional[int]:
         """Persist that a user's devices have been updated, and which hosts
@@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             user_id: The ID of the user whose device changed.
             device_ids: The IDs of any changed devices. If empty, this function will
                 return None.
-            hosts: The remote destinations that should be notified of the change. If
-                None then the set of hosts have *not* been calculated, and will be
-                calculated later by a background task.
             room_ids: The rooms that the user is in
 
         Returns:
@@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         context = get_active_span_text_map()
 
-        def add_device_changes_txn(
-            txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
-        ):
+        def add_device_changes_txn(txn, stream_ids):
             self._add_device_change_to_stream_txn(
                 txn,
                 user_id,
                 device_ids,
-                stream_ids_for_device_change,
+                stream_ids,
             )
 
             self._add_device_outbound_room_poke_txn(
@@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 user_id,
                 device_ids,
                 room_ids,
-                stream_ids_for_device_change,
-                context,
-                hosts_have_been_calculated=hosts is not None,
-            )
-
-            # If the set of hosts to send to has not been calculated yet (and so
-            # `hosts` is None) or there are no `hosts` to send to, then skip
-            # trying to persist them to the DB.
-            if not hosts:
-                return
-
-            self._add_device_outbound_poke_to_stream_txn(
-                txn,
-                user_id,
-                device_ids,
-                hosts,
-                stream_ids_for_outbound_pokes,
+                stream_ids,
                 context,
             )
 
-        # `device_lists_stream` wants a stream ID per device update.
-        num_stream_ids = len(device_ids)
-
-        if hosts:
-            # `device_lists_outbound_pokes` wants a different stream ID for
-            # each row, which is a row per host per device update.
-            num_stream_ids += len(hosts) * len(device_ids)
-
-        async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
-            stream_ids_for_device_change = stream_ids[: len(device_ids)]
-            stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]
-
+        async with self._device_list_id_gen.get_next_mult(
+            len(device_ids)
+        ) as stream_ids:
             await self.db_pool.runInteraction(
                 "add_device_change_to_stream",
                 add_device_changes_txn,
-                stream_ids_for_device_change,
-                stream_ids_for_outbound_pokes,
+                stream_ids,
             )
 
         return stream_ids[-1]
@@ -1752,19 +1720,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         room_ids: Collection[str],
         stream_ids: List[str],
         context: Dict[str, str],
-        hosts_have_been_calculated: bool,
     ) -> None:
-        """Record the user in the room has updated their device.
-
-        Args:
-            hosts_have_been_calculated: True if `device_lists_outbound_pokes`
-                has been updated already with the updates.
-        """
-
-        # We only need to convert to outbound pokes if they are our user.
-        converted_to_destinations = (
-            hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
-        )
+        """Record the user in the room has updated their device."""
 
         encoded_context = json_encoder.encode(context)
 
@@ -1789,7 +1746,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                     device_id,
                     room_id,
                     stream_id,
-                    converted_to_destinations,
+                    False,
                     encoded_context,
                 )
                 for room_id in room_ids
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 151f2aa9bb..871d4ace12 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -66,9 +66,9 @@ Changes in SCHEMA_VERSION = 69:
 
 
 SCHEMA_COMPAT_VERSION = (
-    # we now have `state_key` columns in both `events` and `state_events`, so
-    # now incompatible with synapses wth SCHEMA_VERSION < 66.
-    66
+    # We now assume that `device_lists_changes_in_room` has been filled out for
+    # recent device_list_updates.
+    69
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index a6e91956af..63ea4f9ee4 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -14,7 +14,6 @@
 from typing import Optional
 from unittest.mock import Mock
 
-from parameterized import parameterized_class
 from signedjson import key, sign
 from signedjson.types import BaseKey, SigningKey
 
@@ -155,12 +154,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
         )
 
 
-@parameterized_class(
-    [
-        {"enable_room_poke_code_path": False},
-        {"enable_room_poke_code_path": True},
-    ]
-)
 class FederationSenderDevicesTestCases(HomeserverTestCase):
     servlets = [
         admin.register_servlets,
@@ -175,7 +168,6 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
     def default_config(self):
         c = super().default_config()
         c["send_federation"] = True
-        c["use_new_device_lists_changes_in_room"] = self.enable_room_poke_code_path
         return c
 
     def prepare(self, reactor, clock, hs):
diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py
index d1227dd4ac..5491fbf6da 100644
--- a/tests/storage/test_devices.py
+++ b/tests/storage/test_devices.py
@@ -21,6 +21,29 @@ class DeviceStoreTestCase(HomeserverTestCase):
     def prepare(self, reactor, clock, hs):
         self.store = hs.get_datastores().main
 
+    def add_device_change(self, user_id, device_ids, host):
+        """Add a device list change for the given device to
+        `device_lists_outbound_pokes` table.
+        """
+
+        for device_id in device_ids:
+            stream_id = self.get_success(
+                self.store.add_device_change_to_streams(
+                    "user_id", [device_id], ["!some:room"]
+                )
+            )
+
+            self.get_success(
+                self.store.add_device_list_outbound_pokes(
+                    user_id=user_id,
+                    device_id=device_id,
+                    room_id="!some:room",
+                    stream_id=stream_id,
+                    hosts=[host],
+                    context={},
+                )
+            )
+
     def test_store_new_device(self):
         self.get_success(
             self.store.store_device("user_id", "device_id", "display_name")
@@ -95,11 +118,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
         device_ids = ["device_id1", "device_id2"]
 
         # Add two device updates with sequential `stream_id`s
-        self.get_success(
-            self.store.add_device_change_to_streams(
-                "user_id", device_ids, ["somehost"], ["!some:room"]
-            )
-        )
+        self.add_device_change("user_id", device_ids, "somehost")
 
         # Get all device updates ever meant for this remote
         now_stream_id, device_updates = self.get_success(
@@ -123,11 +142,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
             "device_id4",
             "device_id5",
         ]
-        self.get_success(
-            self.store.add_device_change_to_streams(
-                "user_id", device_ids, ["somehost"], ["!some:room"]
-            )
-        )
+        self.add_device_change("user_id", device_ids, "somehost")
 
         # Get device updates meant for this remote
         next_stream_id, device_updates = self.get_success(
@@ -147,11 +162,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
 
         # Add some more device updates to ensure it still resumes properly
         device_ids = ["device_id6", "device_id7"]
-        self.get_success(
-            self.store.add_device_change_to_streams(
-                "user_id", device_ids, ["somehost"], ["!some:room"]
-            )
-        )
+        self.add_device_change("user_id", device_ids, "somehost")
 
         # Get the next batch of device updates
         next_stream_id, device_updates = self.get_success(
@@ -224,11 +235,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
             "fakeSelfSigning",
         ]
 
-        self.get_success(
-            self.store.add_device_change_to_streams(
-                "@user_id:test", device_ids, ["somehost"], ["!some:room"]
-            )
-        )
+        self.add_device_change("@user_id:test", device_ids, "somehost")
 
         # Get device updates meant for this remote
         next_stream_id, device_updates = self.get_success(