summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-17 09:29:58 +0000
committerGitHub <noreply@github.com>2023-01-17 09:29:58 +0000
commit2b084c5b710d9630178484e6ade597ca7fa814b6 (patch)
treed80e5964b12d2e3710dfe49186fb0229c4afc1dc /synapse/storage/databases/main
parentAdd parameter to control whether we do a partial state join (#14843) (diff)
downloadsynapse-2b084c5b710d9630178484e6ade597ca7fa814b6.tar.xz
Merge device list replication streams (#14833)
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/devices.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b067664473..cd186c8472 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -38,7 +38,7 @@ from synapse.logging.opentracing import (
     whitelisted_homeserver,
 )
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
+from synapse.replication.tcp.streams._base import DeviceListsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -163,9 +163,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
             self._invalidate_caches_for_devices(token, rows)
-        elif stream_name == UserSignatureStream.NAME:
-            for row in rows:
-                self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
+
         return super().process_replication_rows(stream_name, instance_name, token, rows)
 
     def process_replication_position(
@@ -173,14 +171,17 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
             self._device_list_id_gen.advance(instance_name, token)
-        elif stream_name == UserSignatureStream.NAME:
-            self._device_list_id_gen.advance(instance_name, token)
+
         super().process_replication_position(stream_name, instance_name, token)
 
     def _invalidate_caches_for_devices(
         self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
     ) -> None:
         for row in rows:
+            if row.is_signature:
+                self._user_signature_stream_cache.entity_has_changed(row.entity, token)
+                continue
+
             # The entities are either user IDs (starting with '@') whose devices
             # have changed, or remote servers that we need to tell about
             # changes.