diff options
author | Erik Johnston <erikj@element.io> | 2024-06-25 10:34:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-25 10:34:34 +0100 |
commit | 554a92601a4bf61f9076adfffb613a2c19871446 (patch) | |
tree | 49f58868fb911592b76dc55251c9fb267d999589 /synapse/replication/tcp | |
parent | Revert "Reduce device lists replication traffic." (#17360) (diff) | |
download | synapse-554a92601a4bf61f9076adfffb613a2c19871446.tar.xz |
Reintroduce "Reduce device lists replication traffic."" (#17361)
Reintroduces https://github.com/element-hq/synapse/pull/17333 Turns out the reason for revert was down two master instances running
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/client.py | 19 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 12 |
2 files changed, 20 insertions, 11 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 2d6d49eed7..3dddbb70b4 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -114,13 +114,19 @@ class ReplicationDataHandler: """ all_room_ids: Set[str] = set() if stream_name == DeviceListsStream.NAME: - if any(row.entity.startswith("@") and not row.is_signature for row in rows): + if any(not row.is_signature and not row.hosts_calculated for row in rows): prev_token = self.store.get_device_stream_token() all_room_ids = await self.store.get_all_device_list_changes( prev_token, token ) self.store.device_lists_in_rooms_have_changed(all_room_ids, token) + # If we're sending federation we need to update the device lists + # outbound pokes stream change cache with updated hosts. + if self.send_handler and any(row.hosts_calculated for row in rows): + hosts = await self.store.get_destinations_for_device(token) + self.store.device_lists_outbound_pokes_have_changed(hosts, token) + self.store.process_replication_rows(stream_name, instance_name, token, rows) # NOTE: this must be called after process_replication_rows to ensure any # cache invalidations are first handled before any stream ID advances. @@ -433,12 +439,11 @@ class FederationSenderHandler: # The entities are either user IDs (starting with '@') whose devices # have changed, or remote servers that we need to tell about # changes. - hosts = { - row.entity - for row in rows - if not row.entity.startswith("@") and not row.is_signature - } - await self.federation_sender.send_device_messages(hosts, immediate=False) + if any(row.hosts_calculated for row in rows): + hosts = await self.store.get_destinations_for_device(token) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 661206c841..d021904de7 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -549,10 +549,14 @@ class DeviceListsStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) class DeviceListsStreamRow: - entity: str + user_id: str # Indicates that a user has signed their own device with their user-signing key is_signature: bool + # Indicates if this is a notification that we've calculated the hosts we + # need to send the update to. + hosts_calculated: bool + NAME = "device_lists" ROW_TYPE = DeviceListsStreamRow @@ -594,13 +598,13 @@ class DeviceListsStream(_StreamFromIdGen): upper_limit_token = min(upper_limit_token, signatures_to_token) device_updates = [ - (stream_id, (entity, False)) - for stream_id, (entity,) in device_updates + (stream_id, (entity, False, hosts)) + for stream_id, (entity, hosts) in device_updates if stream_id <= upper_limit_token ] signatures_updates = [ - (stream_id, (entity, True)) + (stream_id, (entity, True, False)) for stream_id, (entity,) in signatures_updates if stream_id <= upper_limit_token ] |