summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorOlivier 'reivilibre <oliverw@matrix.org>2024-05-18 15:21:52 +0100
committerOlivier 'reivilibre <oliverw@matrix.org>2024-05-18 15:21:52 +0100
commit233e25e193ceee926d46914fe2ee7ce7a1abdf3a (patch)
tree4e2db513e1379c9a10a532625b7e481a957edf5c /synapse/replication/tcp/client.py
parentMerge branch 'develop' into matrix-org-hotfixes (diff)
parentNewsfile (diff)
downloadsynapse-233e25e193ceee926d46914fe2ee7ce7a1abdf3a.tar.xz
Merge branch 'erikj/device_list_sync_perf' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py14
1 files changed, 8 insertions, 6 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index 5e5387fdcb..cff88a87ec 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -112,6 +112,14 @@ class ReplicationDataHandler: token: stream token for this batch of rows rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ + all_room_ids: Set[str] = set() + if stream_name == DeviceListsStream.NAME: + prev_token = self.store.get_device_stream_token() + all_room_ids = await self.store.get_all_device_list_changes( + prev_token, token + ) + self.store.device_lists_in_rooms_have_changed(all_room_ids, token) + self.store.process_replication_rows(stream_name, instance_name, token, rows) # NOTE: this must be called after process_replication_rows to ensure any # cache invalidations are first handled before any stream ID advances. @@ -146,12 +154,6 @@ class ReplicationDataHandler: StreamKeyType.TO_DEVICE, token, users=entities ) elif stream_name == DeviceListsStream.NAME: - all_room_ids: Set[str] = set() - for row in rows: - if row.entity.startswith("@") and not row.is_signature: - room_ids = await self.store.get_rooms_for_user(row.entity) - all_room_ids.update(room_ids) - # `all_room_ids` can be large, so let's wake up those streams in batches for batched_room_ids in batch_iter(all_room_ids, 100): self.notifier.on_new_event(