diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 3dddbb70b4..2d6d49eed7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -114,19 +114,13 @@ class ReplicationDataHandler:
"""
all_room_ids: Set[str] = set()
if stream_name == DeviceListsStream.NAME:
- if any(not row.is_signature and not row.hosts_calculated for row in rows):
+ if any(row.entity.startswith("@") and not row.is_signature for row in rows):
prev_token = self.store.get_device_stream_token()
all_room_ids = await self.store.get_all_device_list_changes(
prev_token, token
)
self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
- # If we're sending federation we need to update the device lists
- # outbound pokes stream change cache with updated hosts.
- if self.send_handler and any(row.hosts_calculated for row in rows):
- hosts = await self.store.get_destinations_for_device(token)
- self.store.device_lists_outbound_pokes_have_changed(hosts, token)
-
self.store.process_replication_rows(stream_name, instance_name, token, rows)
# NOTE: this must be called after process_replication_rows to ensure any
# cache invalidations are first handled before any stream ID advances.
@@ -439,11 +433,12 @@ class FederationSenderHandler:
# The entities are either user IDs (starting with '@') whose devices
# have changed, or remote servers that we need to tell about
# changes.
- if any(row.hosts_calculated for row in rows):
- hosts = await self.store.get_destinations_for_device(token)
- await self.federation_sender.send_device_messages(
- hosts, immediate=False
- )
+ hosts = {
+ row.entity
+ for row in rows
+ if not row.entity.startswith("@") and not row.is_signature
+ }
+ await self.federation_sender.send_device_messages(hosts, immediate=False)
elif stream_name == ToDeviceStream.NAME:
# The to_device stream includes stuff to be pushed to both local
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index d021904de7..661206c841 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -549,14 +549,10 @@ class DeviceListsStream(_StreamFromIdGen):
@attr.s(slots=True, frozen=True, auto_attribs=True)
class DeviceListsStreamRow:
- user_id: str
+ entity: str
# Indicates that a user has signed their own device with their user-signing key
is_signature: bool
- # Indicates if this is a notification that we've calculated the hosts we
- # need to send the update to.
- hosts_calculated: bool
-
NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow
@@ -598,13 +594,13 @@ class DeviceListsStream(_StreamFromIdGen):
upper_limit_token = min(upper_limit_token, signatures_to_token)
device_updates = [
- (stream_id, (entity, False, hosts))
- for stream_id, (entity, hosts) in device_updates
+ (stream_id, (entity, False))
+ for stream_id, (entity,) in device_updates
if stream_id <= upper_limit_token
]
signatures_updates = [
- (stream_id, (entity, True, False))
+ (stream_id, (entity, True))
for stream_id, (entity,) in signatures_updates
if stream_id <= upper_limit_token
]
|