diff options
author | Erik Johnston <erikj@element.io> | 2024-06-25 10:34:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-25 10:34:34 +0100 |
commit | 554a92601a4bf61f9076adfffb613a2c19871446 (patch) | |
tree | 49f58868fb911592b76dc55251c9fb267d999589 /synapse/replication/tcp/client.py | |
parent | Revert "Reduce device lists replication traffic." (#17360) (diff) | |
download | synapse-554a92601a4bf61f9076adfffb613a2c19871446.tar.xz |
Reintroduce "Reduce device lists replication traffic."" (#17361)
Reintroduces https://github.com/element-hq/synapse/pull/17333 Turns out the reason for revert was down two master instances running
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r-- | synapse/replication/tcp/client.py | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2d6d49eed7..3dddbb70b4 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -114,13 +114,19 @@ class ReplicationDataHandler: """ all_room_ids: Set[str] = set() if stream_name == DeviceListsStream.NAME: - if any(row.entity.startswith("@") and not row.is_signature for row in rows): + if any(not row.is_signature and not row.hosts_calculated for row in rows): prev_token = self.store.get_device_stream_token() all_room_ids = await self.store.get_all_device_list_changes( prev_token, token ) self.store.device_lists_in_rooms_have_changed(all_room_ids, token) + # If we're sending federation we need to update the device lists + # outbound pokes stream change cache with updated hosts. + if self.send_handler and any(row.hosts_calculated for row in rows): + hosts = await self.store.get_destinations_for_device(token) + self.store.device_lists_outbound_pokes_have_changed(hosts, token) + self.store.process_replication_rows(stream_name, instance_name, token, rows) # NOTE: this must be called after process_replication_rows to ensure any # cache invalidations are first handled before any stream ID advances. @@ -433,12 +439,11 @@ class FederationSenderHandler: # The entities are either user IDs (starting with '@') whose devices # have changed, or remote servers that we need to tell about # changes. - hosts = { - row.entity - for row in rows - if not row.entity.startswith("@") and not row.is_signature - } - await self.federation_sender.send_device_messages(hosts, immediate=False) + if any(row.hosts_calculated for row in rows): + hosts = await self.store.get_destinations_for_device(token) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local |