summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/client.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 5e5387fdcb..2d6d49eed7 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -112,6 +112,15 @@ class ReplicationDataHandler:
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
+        all_room_ids: Set[str] = set()
+        if stream_name == DeviceListsStream.NAME:
+            if any(row.entity.startswith("@") and not row.is_signature for row in rows):
+                prev_token = self.store.get_device_stream_token()
+                all_room_ids = await self.store.get_all_device_list_changes(
+                    prev_token, token
+                )
+                self.store.device_lists_in_rooms_have_changed(all_room_ids, token)
+
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
         # NOTE: this must be called after process_replication_rows to ensure any
         # cache invalidations are first handled before any stream ID advances.
@@ -146,12 +155,6 @@ class ReplicationDataHandler:
                     StreamKeyType.TO_DEVICE, token, users=entities
                 )
         elif stream_name == DeviceListsStream.NAME:
-            all_room_ids: Set[str] = set()
-            for row in rows:
-                if row.entity.startswith("@") and not row.is_signature:
-                    room_ids = await self.store.get_rooms_for_user(row.entity)
-                    all_room_ids.update(room_ids)
-
             # `all_room_ids` can be large, so let's wake up those streams in batches
             for batched_room_ids in batch_iter(all_room_ids, 100):
                 self.notifier.on_new_event(