diff options
author | Erik Johnston <erik@matrix.org> | 2022-04-12 16:50:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-12 16:50:40 +0100 |
commit | aa2811026402394b4013033f075d8f509cdc1257 (patch) | |
tree | f635e1a630735ed5e875be51c1a279f1d1bd7b2b /synapse/storage | |
parent | Remove the unstable event field for `/send_join` per MSC3083. (#12395) (diff) | |
download | synapse-aa2811026402394b4013033f075d8f509cdc1257.tar.xz |
Process device list updates asynchronously (#12365)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/devices.py | 61 | ||||
-rw-r--r-- | synapse/storage/schema/__init__.py | 6 |
2 files changed, 12 insertions, 55 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index dc8009b23d..74e4e2122a 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1582,7 +1582,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self, user_id: str, device_ids: Collection[str], - hosts: Optional[Collection[str]], room_ids: Collection[str], ) -> Optional[int]: """Persist that a user's devices have been updated, and which hosts @@ -1592,9 +1591,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id: The ID of the user whose device changed. device_ids: The IDs of any changed devices. If empty, this function will return None. - hosts: The remote destinations that should be notified of the change. If - None then the set of hosts have *not* been calculated, and will be - calculated later by a background task. room_ids: The rooms that the user is in Returns: @@ -1606,14 +1602,12 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): context = get_active_span_text_map() - def add_device_changes_txn( - txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes - ): + def add_device_changes_txn(txn, stream_ids): self._add_device_change_to_stream_txn( txn, user_id, device_ids, - stream_ids_for_device_change, + stream_ids, ) self._add_device_outbound_room_poke_txn( @@ -1621,43 +1615,17 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): user_id, device_ids, room_ids, - stream_ids_for_device_change, - context, - hosts_have_been_calculated=hosts is not None, - ) - - # If the set of hosts to send to has not been calculated yet (and so - # `hosts` is None) or there are no `hosts` to send to, then skip - # trying to persist them to the DB. - if not hosts: - return - - self._add_device_outbound_poke_to_stream_txn( - txn, - user_id, - device_ids, - hosts, - stream_ids_for_outbound_pokes, + stream_ids, context, ) - # `device_lists_stream` wants a stream ID per device update. - num_stream_ids = len(device_ids) - - if hosts: - # `device_lists_outbound_pokes` wants a different stream ID for - # each row, which is a row per host per device update. - num_stream_ids += len(hosts) * len(device_ids) - - async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids: - stream_ids_for_device_change = stream_ids[: len(device_ids)] - stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :] - + async with self._device_list_id_gen.get_next_mult( + len(device_ids) + ) as stream_ids: await self.db_pool.runInteraction( "add_device_change_to_stream", add_device_changes_txn, - stream_ids_for_device_change, - stream_ids_for_outbound_pokes, + stream_ids, ) return stream_ids[-1] @@ -1752,19 +1720,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): room_ids: Collection[str], stream_ids: List[str], context: Dict[str, str], - hosts_have_been_calculated: bool, ) -> None: - """Record the user in the room has updated their device. - - Args: - hosts_have_been_calculated: True if `device_lists_outbound_pokes` - has been updated already with the updates. - """ - - # We only need to convert to outbound pokes if they are our user. - converted_to_destinations = ( - hosts_have_been_calculated or not self.hs.is_mine_id(user_id) - ) + """Record the user in the room has updated their device.""" encoded_context = json_encoder.encode(context) @@ -1789,7 +1746,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): device_id, room_id, stream_id, - converted_to_destinations, + False, encoded_context, ) for room_id in room_ids diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 151f2aa9bb..871d4ace12 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -66,9 +66,9 @@ Changes in SCHEMA_VERSION = 69: SCHEMA_COMPAT_VERSION = ( - # we now have `state_key` columns in both `events` and `state_events`, so - # now incompatible with synapses wth SCHEMA_VERSION < 66. - 66 + # We now assume that `device_lists_changes_in_room` has been filled out for + # recent device_list_updates. + 69 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat |