summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-04-05 14:26:41 +0100
committerGitHub <noreply@github.com>2022-04-05 14:26:41 +0100
commit66053b6bfbc27a6e281655ebca8f2abbee730135 (patch)
treea81f601ee8d5819ba09b44c00dced2795331a2fc /synapse/storage/databases/main
parentAdd type hints to some tests files (#12371) (diff)
downloadsynapse-66053b6bfbc27a6e281655ebca8f2abbee730135.tar.xz
Prefill more stream change caches. (#12372)
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/__init__.py21
-rw-r--r--synapse/storage/databases/main/devices.py50
-rw-r--r--synapse/storage/databases/main/receipts.py13
3 files changed, 62 insertions, 22 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index d4a38daa9a..951031af50 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -183,27 +183,6 @@ class DataStore(
 
         super().__init__(database, db_conn, hs)
 
-        device_list_max = self._device_list_id_gen.get_current_token()
-        device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
-            db_conn,
-            "device_lists_stream",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=device_list_max,
-            limit=1000,
-        )
-        self._device_list_stream_cache = StreamChangeCache(
-            "DeviceListStreamChangeCache",
-            min_device_list_id,
-            prefilled_cache=device_list_prefill,
-        )
-        self._user_signature_stream_cache = StreamChangeCache(
-            "UserSignatureStreamChangeCache", device_list_max
-        )
-        self._device_list_federation_stream_cache = StreamChangeCache(
-            "DeviceListFederationStreamChangeCache", device_list_max
-        )
-
         events_max = self._stream_id_gen.get_current_token()
         curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
             db_conn,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 07eea4b3d2..dc8009b23d 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -46,6 +46,7 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
 from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr
 
@@ -71,6 +72,55 @@ class DeviceWorkerStore(SQLBaseStore):
     ):
         super().__init__(database, db_conn, hs)
 
+        device_list_max = self._device_list_id_gen.get_current_token()
+        device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_lists_stream",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=10000,
+        )
+        self._device_list_stream_cache = StreamChangeCache(
+            "DeviceListStreamChangeCache",
+            min_device_list_id,
+            prefilled_cache=device_list_prefill,
+        )
+
+        (
+            user_signature_stream_prefill,
+            user_signature_stream_list_id,
+        ) = self.db_pool.get_cache_dict(
+            db_conn,
+            "user_signature_stream",
+            entity_column="from_user_id",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=1000,
+        )
+        self._user_signature_stream_cache = StreamChangeCache(
+            "UserSignatureStreamChangeCache",
+            user_signature_stream_list_id,
+            prefilled_cache=user_signature_stream_prefill,
+        )
+
+        (
+            device_list_federation_prefill,
+            device_list_federation_list_id,
+        ) = self.db_pool.get_cache_dict(
+            db_conn,
+            "device_lists_outbound_pokes",
+            entity_column="destination",
+            stream_column="stream_id",
+            max_value=device_list_max,
+            limit=10000,
+        )
+        self._device_list_federation_stream_cache = StreamChangeCache(
+            "DeviceListFederationStreamChangeCache",
+            device_list_federation_list_id,
+            prefilled_cache=device_list_federation_prefill,
+        )
+
         if hs.config.worker.run_background_tasks:
             self._clock.looping_call(
                 self._prune_old_outbound_device_pokes, 60 * 60 * 1000
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index e6f97aeece..332e901dda 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -98,8 +98,19 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
         super().__init__(database, db_conn, hs)
 
+        max_receipts_stream_id = self.get_max_receipt_stream_id()
+        receipts_stream_prefill, min_receipts_stream_id = self.db_pool.get_cache_dict(
+            db_conn,
+            "receipts_linearized",
+            entity_column="room_id",
+            stream_column="stream_id",
+            max_value=max_receipts_stream_id,
+            limit=10000,
+        )
         self._receipts_stream_cache = StreamChangeCache(
-            "ReceiptsRoomChangeCache", self.get_max_receipt_stream_id()
+            "ReceiptsRoomChangeCache",
+            min_receipts_stream_id,
+            prefilled_cache=receipts_stream_prefill,
         )
 
     def get_max_receipt_stream_id(self) -> int: