diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index ba257d34e6..3dddbb70b4 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -55,6 +55,7 @@ from synapse.replication.tcp.streams.partial_state import (
 )
 from synapse.types import PersistedEventPosition, ReadReceipt, StreamKeyType, UserID
 from synapse.util.async_helpers import Linearizer, timeout_deferred
+from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure

 if TYPE_CHECKING:
@@ -111,6 +112,21 @@ class ReplicationDataHandler:
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
+        all_room_ids: Set[str] = set()
+        if stream_name == DeviceListsStream.NAME:
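+            # Work out which rooms have had device list changes between the
+            # previous stream position and this token, flag those rooms as
+            # changed in the store, and keep the room IDs so the notifier can
+            # be woken for them below.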
+            if any(not row.is_signature and not row.hosts_calculated for row in rows):
+                prev_token = self.store.get_device_stream_token()
+                all_room_ids = await self.store.get_all_device_list_changes(
+                    prev_token, token
+                )
+                self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
+
+            # If we're sending federation we need to update the device lists
+            # outbound pokes stream change cache with updated hosts.
+            if self.send_handler and any(row.hosts_calculated for row in rows):
+                hosts = await self.store.get_destinations_for_device(token)
+                self.store.device_lists_outbound_pokes_have_changed(hosts, token)
+
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
         # NOTE: this must be called after process_replication_rows to ensure any
         # cache invalidations are first handled before any stream ID advances.
@@ -145,14 +161,14 @@ class ReplicationDataHandler:
                     StreamKeyType.TO_DEVICE, token, users=entities
                 )
         elif stream_name == DeviceListsStream.NAME:
-            all_room_ids: Set[str] = set()
-            for row in rows:
-                if row.entity.startswith("@") and not row.is_signature:
-                    room_ids = await self.store.get_rooms_for_user(row.entity)
-                    all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                StreamKeyType.DEVICE_LIST, token, rooms=all_room_ids
-            )
+            # `all_room_ids` can be large, so let's wake up those streams in batches
+            for batched_room_ids in batch_iter(all_room_ids, 100):
+                self.notifier.on_new_event(
+                    StreamKeyType.DEVICE_LIST, token, rooms=batched_room_ids
+                )
+
+                # Yield to reactor so that we don't block.
+                await self._clock.sleep(0)
         elif stream_name == PushersStream.NAME:
             for row in rows:
                 if row.deleted:
@@ -423,12 +439,11 @@ class FederationSenderHandler:
             # The entities are either user IDs (starting with '@') whose devices
             # have changed, or remote servers that we need to tell about
             # changes.
-            hosts = {
-                row.entity
-                for row in rows
-                if not row.entity.startswith("@") and not row.is_signature
-            }
-            await self.federation_sender.send_device_messages(hosts, immediate=False)
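+            # If the hosts have already been calculated for these rows, look up
+            # the destinations recorded for this token and poke federation,
+            # rather than deriving the hosts from the rows themselves.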
+            if any(row.hosts_calculated for row in rows):
+                hosts = await self.store.get_destinations_for_device(token)
+                await self.federation_sender.send_device_messages(
+                    hosts, immediate=False
+                )
         elif stream_name == ToDeviceStream.NAME:
             # The to_device stream includes stuff to be pushed to both local