summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-06-24 14:15:13 +0100
committerGitHub <noreply@github.com>2024-06-24 14:15:13 +0100
commitcf711ac03cd88b70568b3ac9df4aed4de5b33523 (patch)
tree5903fca3c4fa9ae66c23bafcb4746a8c6365d890 /synapse/replication
parentTidy up integer parsing (#17339) (diff)
downloadsynapse-cf711ac03cd88b70568b3ac9df4aed4de5b33523.tar.xz
Reduce device lists replication traffic. (#17333)
Reduce the replication traffic of device lists, by not sending every
destination that needs to be sent the device list update over
replication. Instead a "hosts to send to have been calculated"
notification over replication, and then federation senders read the
destinations from the DB.

For non federation senders this should heavily reduce the impact of a
user in many large rooms changing a device.
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/client.py19
-rw-r--r--synapse/replication/tcp/streams/_base.py12
2 files changed, 20 insertions, 11 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2d6d49eed7..3dddbb70b4 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -114,13 +114,19 @@ class ReplicationDataHandler:
         """
         all_room_ids: Set[str] = set()
         if stream_name == DeviceListsStream.NAME:
-            if any(row.entity.startswith("@") and not row.is_signature for row in rows):
+            if any(not row.is_signature and not row.hosts_calculated for row in rows):
                 prev_token = self.store.get_device_stream_token()
                 all_room_ids = await self.store.get_all_device_list_changes(
                     prev_token, token
                 )
                 self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
 
+            # If we're sending federation we need to update the device lists
+            # outbound pokes stream change cache with updated hosts.
+            if self.send_handler and any(row.hosts_calculated for row in rows):
+                hosts = await self.store.get_destinations_for_device(token)
+                self.store.device_lists_outbound_pokes_have_changed(hosts, token)
+
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
         # NOTE: this must be called after process_replication_rows to ensure any
         # cache invalidations are first handled before any stream ID advances.
@@ -433,12 +439,11 @@ class FederationSenderHandler:
             # The entities are either user IDs (starting with '@') whose devices
             # have changed, or remote servers that we need to tell about
             # changes.
-            hosts = {
-                row.entity
-                for row in rows
-                if not row.entity.startswith("@") and not row.is_signature
-            }
-            await self.federation_sender.send_device_messages(hosts, immediate=False)
+            if any(row.hosts_calculated for row in rows):
+                hosts = await self.store.get_destinations_for_device(token)
+                await self.federation_sender.send_device_messages(
+                    hosts, immediate=False
+                )
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 661206c841..d021904de7 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -549,10 +549,14 @@ class DeviceListsStream(_StreamFromIdGen):
 
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class DeviceListsStreamRow:
-        entity: str
+        user_id: str
         # Indicates that a user has signed their own device with their user-signing key
         is_signature: bool
 
+        # Indicates if this is a notification that we've calculated the hosts we
+        # need to send the update to.
+        hosts_calculated: bool
+
     NAME = "device_lists"
     ROW_TYPE = DeviceListsStreamRow
 
@@ -594,13 +598,13 @@ class DeviceListsStream(_StreamFromIdGen):
             upper_limit_token = min(upper_limit_token, signatures_to_token)
 
         device_updates = [
-            (stream_id, (entity, False))
-            for stream_id, (entity,) in device_updates
+            (stream_id, (entity, False, hosts))
+            for stream_id, (entity, hosts) in device_updates
             if stream_id <= upper_limit_token
         ]
 
         signatures_updates = [
-            (stream_id, (entity, True))
+            (stream_id, (entity, True, False))
             for stream_id, (entity,) in signatures_updates
             if stream_id <= upper_limit_token
         ]