diff options
author | Erik Johnston <erik@matrix.org> | 2022-04-12 16:50:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-12 16:50:40 +0100 |
commit | aa2811026402394b4013033f075d8f509cdc1257 (patch) | |
tree | f635e1a630735ed5e875be51c1a279f1d1bd7b2b /synapse | |
parent | Remove the unstable event field for `/send_join` per MSC3083. (#12395) (diff) | |
download | synapse-aa2811026402394b4013033f075d8f509cdc1257.tar.xz |
Process device list updates asynchronously (#12365)
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/server.py | 8 | ||||
-rw-r--r-- | synapse/handlers/device.py | 28 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 61 | ||||
-rw-r--r-- | synapse/storage/schema/__init__.py | 6 |
4 files changed, 12 insertions, 91 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py index 415279d269..d771045b52 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -680,14 +680,6 @@ class ServerConfig(Config): config.get("use_account_validity_in_account_status") or False ) - # This is a temporary option that enables fully using the new - # `device_lists_changes_in_room` without the backwards compat code. This - # is primarily for testing. If enabled the server should *not* be - # downgraded, as it may lead to missing device list updates. - self.use_new_device_lists_changes_in_room = ( - config.get("use_new_device_lists_changes_in_room") or False - ) - self.rooms_to_exclude_from_sync: List[str] = ( config.get("exclude_rooms_from_sync") or [] ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ffa28b2a30..958599e7b8 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -291,12 +291,6 @@ class DeviceHandler(DeviceWorkerHandler): # On start up check if there are any updates pending. hs.get_reactor().callWhenRunning(self._handle_new_device_update_async) - # Used to decide if we calculate outbound pokes up front or not. By - # default we do to allow safely downgrading Synapse. - self.use_new_device_lists_changes_in_room = ( - hs.config.server.use_new_device_lists_changes_in_room - ) - def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -490,23 +484,9 @@ class DeviceHandler(DeviceWorkerHandler): room_ids = await self.store.get_rooms_for_user(user_id) - hosts: Optional[Set[str]] = None - if not self.use_new_device_lists_changes_in_room: - hosts = set() - - if self.hs.is_mine_id(user_id): - for room_id in room_ids: - joined_users = await self.store.get_users_in_room(room_id) - hosts.update(get_domain_from_id(u) for u in joined_users) - - set_tag("target_hosts", hosts) - - hosts.discard(self.server_name) - position = await self.store.add_device_change_to_streams( user_id, device_ids, - hosts=hosts, room_ids=room_ids, ) @@ -528,14 +508,6 @@ class DeviceHandler(DeviceWorkerHandler): # We may need to do some processing asynchronously. self._handle_new_device_update_async() - if hosts: - logger.info( - "Sending device list update notif for %r to: %r", user_id, hosts - ) - for host in hosts: - self.federation_sender.send_device_messages(host, immediate=False) - log_kv({"message": "sent device update to host", "host": host}) - async def notify_user_signature_update( self, from_user_id: str, user_ids: List[str] ) -> None: diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index dc8009b23d..74e4e2122a 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self, user_id: str, device_ids: Collection[str], - hosts: Optional[Collection[str]], room_ids: Collection[str], ) -> Optional[int]: """Persist that a user's devices have been updated, and which hosts @@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: The ID of the user whose device changed. device_ids: The IDs of any changed devices. If empty, this function will return None. - hosts: The remote destinations that should be notified of the change. If - None then the set of hosts have *not* been calculated, and will be - calculated later by a background task. room_ids: The rooms that the user is in Returns: @@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): context = get_active_span_text_map() - def add_device_changes_txn( - txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes - ): + def add_device_changes_txn(txn, stream_ids): self._add_device_change_to_stream_txn( txn, user_id, device_ids, - stream_ids_for_device_change, + stream_ids, ) self._add_device_outbound_room_poke_txn( @@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id, device_ids, room_ids, - stream_ids_for_device_change, - context, - hosts_have_been_calculated=hosts is not None, - ) - - # If the set of hosts to send to has not been calculated yet (and so - # `hosts` is None) or there are no `hosts` to send to, then skip - # trying to persist them to the DB. - if not hosts: - return - - self._add_device_outbound_poke_to_stream_txn( - txn, - user_id, - device_ids, - hosts, - stream_ids_for_outbound_pokes, + stream_ids, context, ) - # `device_lists_stream` wants a stream ID per device update. - num_stream_ids = len(device_ids) - - if hosts: - # `device_lists_outbound_pokes` wants a different stream ID for - # each row, which is a row per host per device update. - num_stream_ids += len(hosts) * len(device_ids) - - async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids: - stream_ids_for_device_change = stream_ids[: len(device_ids)] - stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :] - + async with self._device_list_id_gen.get_next_mult( + len(device_ids) + ) as stream_ids: await self.db_pool.runInteraction( "add_device_change_to_stream", add_device_changes_txn, - stream_ids_for_device_change, - stream_ids_for_outbound_pokes, + stream_ids, ) return stream_ids[-1] @@ -1752,19 +1720,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): room_ids: Collection[str], stream_ids: List[str], context: Dict[str, str], - hosts_have_been_calculated: bool, ) -> None: - """Record the user in the room has updated their device. - - Args: - hosts_have_been_calculated: True if `device_lists_outbound_pokes` - has been updated already with the updates. - """ - - # We only need to convert to outbound pokes if they are our user. - converted_to_destinations = ( - hosts_have_been_calculated or not self.hs.is_mine_id(user_id) - ) + """Record the user in the room has updated their device.""" encoded_context = json_encoder.encode(context) @@ -1789,7 +1746,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): device_id, room_id, stream_id, - converted_to_destinations, + False, encoded_context, ) for room_id in room_ids diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 151f2aa9bb..871d4ace12 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -66,9 +66,9 @@ Changes in SCHEMA_VERSION = 69: SCHEMA_COMPAT_VERSION = ( - # we now have `state_key` columns in both `events` and `state_events`, so - # now incompatible with synapses wth SCHEMA_VERSION < 66. - 66 + # We now assume that `device_lists_changes_in_room` has been filled out for + # recent device_list_updates. + 69 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat |