summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-06-25 09:57:34 +0100
committerGitHub <noreply@github.com>2024-06-25 09:57:34 +0100
commita98cb87bee18c9028d03676ce544860239e1ff34 (patch)
treef36b5fcd516a41173ba3c7c9fff3d01205495c1c /synapse/replication
parentAdd `is_invite` filtering to Sliding Sync `/sync` (#17335) (diff)
downloadsynapse-a98cb87bee18c9028d03676ce544860239e1ff34.tar.xz
Revert "Reduce device lists replication traffic." (#17360)
Reverts element-hq/synapse#17333

It looks like master was still sending out replication RDATA with the
old format... somehow
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/client.py19
-rw-r--r--synapse/replication/tcp/streams/_base.py12
2 files changed, 11 insertions, 20 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3dddbb70b4..2d6d49eed7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -114,19 +114,13 @@ class ReplicationDataHandler:
         """
         all_room_ids: Set[str] = set()
         if stream_name == DeviceListsStream.NAME:
-            if any(not row.is_signature and not row.hosts_calculated for row in rows):
+            if any(row.entity.startswith("@") and not row.is_signature for row in rows):
                 prev_token = self.store.get_device_stream_token()
                 all_room_ids = await self.store.get_all_device_list_changes(
                     prev_token, token
                 )
                 self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
 
-            # If we're sending federation we need to update the device lists
-            # outbound pokes stream change cache with updated hosts.
-            if self.send_handler and any(row.hosts_calculated for row in rows):
-                hosts = await self.store.get_destinations_for_device(token)
-                self.store.device_lists_outbound_pokes_have_changed(hosts, token)
-
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
         # NOTE: this must be called after process_replication_rows to ensure any
         # cache invalidations are first handled before any stream ID advances.
@@ -439,11 +433,12 @@ class FederationSenderHandler:
             # The entities are either user IDs (starting with '@') whose devices
             # have changed, or remote servers that we need to tell about
             # changes.
-            if any(row.hosts_calculated for row in rows):
-                hosts = await self.store.get_destinations_for_device(token)
-                await self.federation_sender.send_device_messages(
-                    hosts, immediate=False
-                )
+            hosts = {
+                row.entity
+                for row in rows
+                if not row.entity.startswith("@") and not row.is_signature
+            }
+            await self.federation_sender.send_device_messages(hosts, immediate=False)
 
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d021904de7..661206c841 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -549,14 +549,10 @@ class DeviceListsStream(_StreamFromIdGen):
 
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class DeviceListsStreamRow:
-        user_id: str
+        entity: str
         # Indicates that a user has signed their own device with their user-signing key
         is_signature: bool
 
-        # Indicates if this is a notification that we've calculated the hosts we
-        # need to send the update to.
-        hosts_calculated: bool
-
     NAME = "device_lists"
     ROW_TYPE = DeviceListsStreamRow
 
@@ -598,13 +594,13 @@ class DeviceListsStream(_StreamFromIdGen):
             upper_limit_token = min(upper_limit_token, signatures_to_token)
 
         device_updates = [
-            (stream_id, (entity, False, hosts))
-            for stream_id, (entity, hosts) in device_updates
+            (stream_id, (entity, False))
+            for stream_id, (entity,) in device_updates
             if stream_id <= upper_limit_token
         ]
 
         signatures_updates = [
-            (stream_id, (entity, True, False))
+            (stream_id, (entity, True))
             for stream_id, (entity,) in signatures_updates
             if stream_id <= upper_limit_token
         ]