diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/tcp/client.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 5e5387fdcb..2d6d49eed7 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -112,6 +112,15 @@ class ReplicationDataHandler: token: stream token for this batch of rows rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ + all_room_ids: Set[str] = set() + if stream_name == DeviceListsStream.NAME: + if any(row.entity.startswith("@") and not row.is_signature for row in rows): + prev_token = self.store.get_device_stream_token() + all_room_ids = await self.store.get_all_device_list_changes( + prev_token, token + ) + self.store.device_lists_in_rooms_have_changed(all_room_ids, token) + self.store.process_replication_rows(stream_name, instance_name, token, rows) # NOTE: this must be called after process_replication_rows to ensure any # cache invalidations are first handled before any stream ID advances. @@ -146,12 +155,6 @@ class ReplicationDataHandler: StreamKeyType.TO_DEVICE, token, users=entities ) elif stream_name == DeviceListsStream.NAME: - all_room_ids: Set[str] = set() - for row in rows: - if row.entity.startswith("@") and not row.is_signature: - room_ids = await self.store.get_rooms_for_user(row.entity) - all_room_ids.update(room_ids) - # `all_room_ids` can be large, so let's wake up those streams in batches for batched_room_ids in batch_iter(all_room_ids, 100): self.notifier.on_new_event( |