diff --git a/changelog.d/12365.feature b/changelog.d/12365.feature
new file mode 100644
index 0000000000..642dea966c
--- /dev/null
+++ b/changelog.d/12365.feature
@@ -0,0 +1 @@
+Enable processing of device list updates asynchronously.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 415279d269..d771045b52 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -680,14 +680,6 @@ class ServerConfig(Config):
config.get("use_account_validity_in_account_status") or False
)
- # This is a temporary option that enables fully using the new
- # `device_lists_changes_in_room` without the backwards compat code. This
- # is primarily for testing. If enabled the server should *not* be
- # downgraded, as it may lead to missing device list updates.
- self.use_new_device_lists_changes_in_room = (
- config.get("use_new_device_lists_changes_in_room") or False
- )
-
self.rooms_to_exclude_from_sync: List[str] = (
config.get("exclude_rooms_from_sync") or []
)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ffa28b2a30..958599e7b8 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -291,12 +291,6 @@ class DeviceHandler(DeviceWorkerHandler):
# On start up check if there are any updates pending.
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
- # Used to decide if we calculate outbound pokes up front or not. By
- # default we do to allow safely downgrading Synapse.
- self.use_new_device_lists_changes_in_room = (
- hs.config.server.use_new_device_lists_changes_in_room
- )
-
def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
@@ -490,23 +484,9 @@ class DeviceHandler(DeviceWorkerHandler):
room_ids = await self.store.get_rooms_for_user(user_id)
- hosts: Optional[Set[str]] = None
- if not self.use_new_device_lists_changes_in_room:
- hosts = set()
-
- if self.hs.is_mine_id(user_id):
- for room_id in room_ids:
- joined_users = await self.store.get_users_in_room(room_id)
- hosts.update(get_domain_from_id(u) for u in joined_users)
-
- set_tag("target_hosts", hosts)
-
- hosts.discard(self.server_name)
-
position = await self.store.add_device_change_to_streams(
user_id,
device_ids,
- hosts=hosts,
room_ids=room_ids,
)
@@ -528,14 +508,6 @@ class DeviceHandler(DeviceWorkerHandler):
# We may need to do some processing asynchronously.
self._handle_new_device_update_async()
- if hosts:
- logger.info(
- "Sending device list update notif for %r to: %r", user_id, hosts
- )
- for host in hosts:
- self.federation_sender.send_device_messages(host, immediate=False)
- log_kv({"message": "sent device update to host", "host": host})
-
async def notify_user_signature_update(
self, from_user_id: str, user_ids: List[str]
) -> None:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index dc8009b23d..74e4e2122a 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self,
user_id: str,
device_ids: Collection[str],
- hosts: Optional[Collection[str]],
room_ids: Collection[str],
) -> Optional[int]:
"""Persist that a user's devices have been updated, and which hosts
@@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
user_id: The ID of the user whose device changed.
device_ids: The IDs of any changed devices. If empty, this function will
return None.
- hosts: The remote destinations that should be notified of the change. If
- None then the set of hosts have *not* been calculated, and will be
- calculated later by a background task.
room_ids: The rooms that the user is in
Returns:
@@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
context = get_active_span_text_map()
- def add_device_changes_txn(
- txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
- ):
+ def add_device_changes_txn(txn, stream_ids):
self._add_device_change_to_stream_txn(
txn,
user_id,
device_ids,
- stream_ids_for_device_change,
+ stream_ids,
)
self._add_device_outbound_room_poke_txn(
@@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
user_id,
device_ids,
room_ids,
- stream_ids_for_device_change,
- context,
- hosts_have_been_calculated=hosts is not None,
- )
-
- # If the set of hosts to send to has not been calculated yet (and so
- # `hosts` is None) or there are no `hosts` to send to, then skip
- # trying to persist them to the DB.
- if not hosts:
- return
-
- self._add_device_outbound_poke_to_stream_txn(
- txn,
- user_id,
- device_ids,
- hosts,
- stream_ids_for_outbound_pokes,
+ stream_ids,
context,
)
- # `device_lists_stream` wants a stream ID per device update.
- num_stream_ids = len(device_ids)
-
- if hosts:
- # `device_lists_outbound_pokes` wants a different stream ID for
- # each row, which is a row per host per device update.
- num_stream_ids += len(hosts) * len(device_ids)
-
- async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
- stream_ids_for_device_change = stream_ids[: len(device_ids)]
- stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]
-
+ async with self._device_list_id_gen.get_next_mult(
+ len(device_ids)
+ ) as stream_ids:
await self.db_pool.runInteraction(
"add_device_change_to_stream",
add_device_changes_txn,
- stream_ids_for_device_change,
- stream_ids_for_outbound_pokes,
+ stream_ids,
)
return stream_ids[-1]
@@ -1752,19 +1720,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
room_ids: Collection[str],
stream_ids: List[str],
context: Dict[str, str],
- hosts_have_been_calculated: bool,
) -> None:
- """Record the user in the room has updated their device.
-
- Args:
- hosts_have_been_calculated: True if `device_lists_outbound_pokes`
- has been updated already with the updates.
- """
-
- # We only need to convert to outbound pokes if they are our user.
- converted_to_destinations = (
- hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
- )
+ """Record the user in the room has updated their device."""
encoded_context = json_encoder.encode(context)
@@ -1789,7 +1746,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
device_id,
room_id,
stream_id,
- converted_to_destinations,
+ False,
encoded_context,
)
for room_id in room_ids
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 151f2aa9bb..871d4ace12 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -66,9 +66,9 @@ Changes in SCHEMA_VERSION = 69:
SCHEMA_COMPAT_VERSION = (
- # we now have `state_key` columns in both `events` and `state_events`, so
- # now incompatible with synapses wth SCHEMA_VERSION < 66.
- 66
+ # We now assume that `device_lists_changes_in_room` has been filled out for
+ # recent device_list_updates.
+ 69
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index a6e91956af..63ea4f9ee4 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -14,7 +14,6 @@
from typing import Optional
from unittest.mock import Mock
-from parameterized import parameterized_class
from signedjson import key, sign
from signedjson.types import BaseKey, SigningKey
@@ -155,12 +154,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
)
-@parameterized_class(
- [
- {"enable_room_poke_code_path": False},
- {"enable_room_poke_code_path": True},
- ]
-)
class FederationSenderDevicesTestCases(HomeserverTestCase):
servlets = [
admin.register_servlets,
@@ -175,7 +168,6 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
def default_config(self):
c = super().default_config()
c["send_federation"] = True
- c["use_new_device_lists_changes_in_room"] = self.enable_room_poke_code_path
return c
def prepare(self, reactor, clock, hs):
diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py
index d1227dd4ac..5491fbf6da 100644
--- a/tests/storage/test_devices.py
+++ b/tests/storage/test_devices.py
@@ -21,6 +21,29 @@ class DeviceStoreTestCase(HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.store = hs.get_datastores().main
+ def add_device_change(self, user_id, device_ids, host):
+ """Add a device list change for the given device to
+ `device_lists_outbound_pokes` table.
+ """
+
+ for device_id in device_ids:
+ stream_id = self.get_success(
+ self.store.add_device_change_to_streams(
+ "user_id", [device_id], ["!some:room"]
+ )
+ )
+
+ self.get_success(
+ self.store.add_device_list_outbound_pokes(
+ user_id=user_id,
+ device_id=device_id,
+ room_id="!some:room",
+ stream_id=stream_id,
+ hosts=[host],
+ context={},
+ )
+ )
+
def test_store_new_device(self):
self.get_success(
self.store.store_device("user_id", "device_id", "display_name")
@@ -95,11 +118,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
device_ids = ["device_id1", "device_id2"]
# Add two device updates with sequential `stream_id`s
- self.get_success(
- self.store.add_device_change_to_streams(
- "user_id", device_ids, ["somehost"], ["!some:room"]
- )
- )
+ self.add_device_change("user_id", device_ids, "somehost")
# Get all device updates ever meant for this remote
now_stream_id, device_updates = self.get_success(
@@ -123,11 +142,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
"device_id4",
"device_id5",
]
- self.get_success(
- self.store.add_device_change_to_streams(
- "user_id", device_ids, ["somehost"], ["!some:room"]
- )
- )
+ self.add_device_change("user_id", device_ids, "somehost")
# Get device updates meant for this remote
next_stream_id, device_updates = self.get_success(
@@ -147,11 +162,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
# Add some more device updates to ensure it still resumes properly
device_ids = ["device_id6", "device_id7"]
- self.get_success(
- self.store.add_device_change_to_streams(
- "user_id", device_ids, ["somehost"], ["!some:room"]
- )
- )
+ self.add_device_change("user_id", device_ids, "somehost")
# Get the next batch of device updates
next_stream_id, device_updates = self.get_success(
@@ -224,11 +235,7 @@ class DeviceStoreTestCase(HomeserverTestCase):
"fakeSelfSigning",
]
- self.get_success(
- self.store.add_device_change_to_streams(
- "@user_id:test", device_ids, ["somehost"], ["!some:room"]
- )
- )
+ self.add_device_change("@user_id:test", device_ids, "somehost")
# Get device updates meant for this remote
next_stream_id, device_updates = self.get_success(
|