diff options
author | Erik Johnston <erikj@element.io> | 2024-06-24 14:15:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-24 14:15:13 +0100 |
commit | cf711ac03cd88b70568b3ac9df4aed4de5b33523 (patch) | |
tree | 5903fca3c4fa9ae66c23bafcb4746a8c6365d890 /synapse/replication/tcp/client.py | |
parent | Tidy up integer parsing (#17339) (diff) | |
download | synapse-cf711ac03cd88b70568b3ac9df4aed4de5b33523.tar.xz |
Reduce device lists replication traffic. (#17333)
Reduce the replication traffic of device lists, by not sending every destination that needs to be sent the device list update over replication. Instead a "hosts to send to have been calculated" notification over replication, and then federation senders read the destinations from the DB. For non federation senders this should heavily reduce the impact of a user in many large rooms changing a device.
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r-- | synapse/replication/tcp/client.py | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2d6d49eed7..3dddbb70b4 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -114,13 +114,19 @@ class ReplicationDataHandler: """ all_room_ids: Set[str] = set() if stream_name == DeviceListsStream.NAME: - if any(row.entity.startswith("@") and not row.is_signature for row in rows): + if any(not row.is_signature and not row.hosts_calculated for row in rows): prev_token = self.store.get_device_stream_token() all_room_ids = await self.store.get_all_device_list_changes( prev_token, token ) self.store.device_lists_in_rooms_have_changed(all_room_ids, token) + # If we're sending federation we need to update the device lists + # outbound pokes stream change cache with updated hosts. + if self.send_handler and any(row.hosts_calculated for row in rows): + hosts = await self.store.get_destinations_for_device(token) + self.store.device_lists_outbound_pokes_have_changed(hosts, token) + self.store.process_replication_rows(stream_name, instance_name, token, rows) # NOTE: this must be called after process_replication_rows to ensure any # cache invalidations are first handled before any stream ID advances. @@ -433,12 +439,11 @@ class FederationSenderHandler: # The entities are either user IDs (starting with '@') whose devices # have changed, or remote servers that we need to tell about # changes. - hosts = { - row.entity - for row in rows - if not row.entity.startswith("@") and not row.is_signature - } - await self.federation_sender.send_device_messages(hosts, immediate=False) + if any(row.hosts_calculated for row in rows): + hosts = await self.store.get_destinations_for_device(token) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local |