summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-22 09:57:06 +0100
committerErik Johnston <erik@matrix.org>2020-10-22 09:57:06 +0100
commitab4cd7f8026838d57ff36642abd1724fe766f2af (patch)
treedb4d66c27381903c3c0e75f16b372ac61a9b711e /synapse/storage
parentMerge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes (diff)
parentOptimise CacheDescriptor (#8594) (diff)
downloadsynapse-ab4cd7f8026838d57ff36642abd1724fe766f2af.tar.xz
Merge remote-tracking branch 'origin/release-v1.21.3' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py12
-rw-r--r--synapse/storage/database.py2
-rw-r--r--synapse/storage/databases/main/client_ips.py8
-rw-r--r--synapse/storage/databases/main/devices.py8
-rw-r--r--synapse/storage/databases/main/events.py4
-rw-r--r--synapse/storage/databases/main/events_worker.py24
-rw-r--r--synapse/storage/databases/main/profile.py6
-rw-r--r--synapse/storage/databases/main/pusher.py2
-rw-r--r--synapse/storage/databases/main/receipts.py11
-rw-r--r--synapse/storage/databases/main/roommember.py18
-rw-r--r--synapse/storage/databases/main/schema/delta/58/21as_device_stream.sql (renamed from synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql)5
-rw-r--r--synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql1
-rw-r--r--synapse/storage/types.py4
13 files changed, 47 insertions, 58 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ab49d227de..2b196ded1b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -76,14 +76,16 @@ class SQLBaseStore(metaclass=ABCMeta):
         """
 
         try:
-            if key is None:
-                getattr(self, cache_name).invalidate_all()
-            else:
-                getattr(self, cache_name).invalidate(tuple(key))
+            cache = getattr(self, cache_name)
         except AttributeError:
             # We probably haven't pulled in the cache in this worker,
             # which is fine.
-            pass
+            return
+
+        if key is None:
+            cache.invalidate_all()
+        else:
+            cache.invalidate(tuple(key))
 
 
 def db_to_json(db_content):
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 763722d6bc..0217e63108 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -160,7 +160,7 @@ class LoggingDatabaseConnection:
         self.conn.__enter__()
         return self
 
-    def __exit__(self, exc_type, exc_value, traceback) -> bool:
+    def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
         return self.conn.__exit__(exc_type, exc_value, traceback)
 
     # Proxy through any unknown lookups to the DB conn class.
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index ec8260e906..2408432738 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -19,7 +19,7 @@ from typing import Dict, Optional, Tuple
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
-from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.lrucache import LruCache
 
 logger = logging.getLogger(__name__)
 
@@ -410,8 +410,8 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore):
 class ClientIpStore(ClientIpWorkerStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
 
-        self.client_ip_last_seen = DeferredCache(
-            name="client_ip_last_seen", keylen=4, max_entries=50000
+        self.client_ip_last_seen = LruCache(
+            cache_name="client_ip_last_seen", keylen=4, max_size=50000
         )
 
         super().__init__(database, db_conn, hs)
@@ -442,7 +442,7 @@ class ClientIpStore(ClientIpWorkerStore):
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
             return
 
-        self.client_ip_last_seen.prefill(key, now)
+        self.client_ip_last_seen.set(key, now)
 
         self._batch_row_update[key] = (user_agent, device_id, now)
 
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e662a20d24..dfb4f87b8f 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -34,8 +34,8 @@ from synapse.storage.database import (
 )
 from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
 from synapse.util import json_decoder, json_encoder
-from synapse.util.caches.deferred_cache import DeferredCache
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr
 
@@ -1005,8 +1005,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
         # Map of (user_id, device_id) -> bool. If there is an entry that implies
         # the device exists.
-        self.device_id_exists_cache = DeferredCache(
-            name="device_id_exists", keylen=2, max_entries=10000
+        self.device_id_exists_cache = LruCache(
+            cache_name="device_id_exists", keylen=2, max_size=10000
         )
 
     async def store_device(
@@ -1052,7 +1052,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 )
                 if hidden:
                     raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
-            self.device_id_exists_cache.prefill(key, True)
+            self.device_id_exists_cache.set(key, True)
             return inserted
         except StoreError:
             raise
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ba3b1769b0..87808c1483 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1051,9 +1051,7 @@ class PersistEventsStore:
 
         def prefill():
             for cache_entry in to_prefill:
-                self.store._get_event_cache.prefill(
-                    (cache_entry[0].event_id,), cache_entry
-                )
+                self.store._get_event_cache.set((cache_entry[0].event_id,), cache_entry)
 
         txn.call_after(prefill)
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index ff150f0be7..6e7f16f39c 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -33,7 +33,10 @@ from synapse.api.room_versions import (
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
@@ -42,8 +45,8 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.types import Collection, get_domain_from_id
-from synapse.util.caches.deferred_cache import DeferredCache
 from synapse.util.caches.descriptors import cached
+from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -137,20 +140,16 @@ class EventsWorkerStore(SQLBaseStore):
                     db_conn, "events", "stream_ordering", step=-1
                 )
 
-        if not hs.config.worker.worker_app:
+        if hs.config.run_background_tasks:
             # We periodically clean out old transaction ID mappings
             self._clock.looping_call(
-                run_as_background_process,
-                5 * 60 * 1000,
-                "_cleanup_old_transaction_ids",
-                self._cleanup_old_transaction_ids,
+                self._cleanup_old_transaction_ids, 5 * 60 * 1000,
             )
 
-        self._get_event_cache = DeferredCache(
-            "*getEvent*",
+        self._get_event_cache = LruCache(
+            cache_name="*getEvent*",
             keylen=3,
-            max_entries=hs.config.caches.event_cache_size,
-            apply_cache_factor_from_config=False,
+            max_size=hs.config.caches.event_cache_size,
         )
 
         self._event_fetch_lock = threading.Condition()
@@ -749,7 +748,7 @@ class EventsWorkerStore(SQLBaseStore):
                 event=original_ev, redacted_event=redacted_event
             )
 
-            self._get_event_cache.prefill((event_id,), cache_entry)
+            self._get_event_cache.set((event_id,), cache_entry)
             result_map[event_id] = cache_entry
 
         return result_map
@@ -1375,6 +1374,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         return mapping
 
+    @wrap_as_background_process("_cleanup_old_transaction_ids")
     async def _cleanup_old_transaction_ids(self):
         """Cleans out transaction id mappings older than 24hrs.
         """
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 1681caa1f0..a6d1eb908a 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
 
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
@@ -72,7 +72,7 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     async def set_profile_displayname(
-        self, user_localpart: str, new_displayname: str
+        self, user_localpart: str, new_displayname: Optional[str]
     ) -> None:
         await self.db_pool.simple_update_one(
             table="profiles",
@@ -144,7 +144,7 @@ class ProfileWorkerStore(SQLBaseStore):
 
     async def get_remote_profile_cache_entries_that_expire(
         self, last_checked: int
-    ) -> Dict[str, str]:
+    ) -> List[Dict[str, str]]:
         """Get all users who haven't been checked since `last_checked`
         """
 
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index df8609b97b..7997242d90 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -303,7 +303,7 @@ class PusherStore(PusherWorkerStore):
                 lock=False,
             )
 
-            user_has_pusher = self.get_if_user_has_pusher.cache.get(
+            user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
                 (user_id,), None, update_metrics=False
             )
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5cdf16521c..ca7917c989 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -25,7 +25,6 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import JsonDict
 from synapse.util import json_encoder
-from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -413,18 +412,10 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
         if receipt_type != "m.read":
             return
 
-        # Returns either an ObservableDeferred or the raw result
-        res = self.get_users_with_read_receipts_in_room.cache.get(
+        res = self.get_users_with_read_receipts_in_room.cache.get_immediate(
             room_id, None, update_metrics=False
         )
 
-        # first handle the ObservableDeferred case
-        if isinstance(res, ObservableDeferred):
-            if res.has_called():
-                res = res.get_result()
-            else:
-                res = None
-
         if res and user_id in res:
             # We'd only be adding to the set, so no point invalidating if the
             # user is already there
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 20fcdaa529..01d9dbb36f 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -20,7 +20,10 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -67,16 +70,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         ):
             self._known_servers_count = 1
             self.hs.get_clock().looping_call(
-                run_as_background_process,
-                60 * 1000,
-                "_count_known_servers",
-                self._count_known_servers,
+                self._count_known_servers, 60 * 1000,
             )
             self.hs.get_clock().call_later(
-                1000,
-                run_as_background_process,
-                "_count_known_servers",
-                self._count_known_servers,
+                1000, self._count_known_servers,
             )
             LaterGauge(
                 "synapse_federation_known_servers",
@@ -85,6 +82,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 lambda: self._known_servers_count,
             )
 
+    @wrap_as_background_process("_count_known_servers")
     async def _count_known_servers(self):
         """
         Count the servers that this server knows about.
@@ -531,7 +529,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # If we do then we can reuse that result and simply update it with
             # any membership changes in `delta_ids`
             if context.prev_group and context.delta_ids:
-                prev_res = self._get_joined_users_from_context.cache.get(
+                prev_res = self._get_joined_users_from_context.cache.get_immediate(
                     (room_id, context.prev_group), None
                 )
                 if prev_res and isinstance(prev_res, dict):
diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/21as_device_stream.sql
index 20f5a95a24..7b84a207fd 100644
--- a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
+++ b/synapse/storage/databases/main/schema/delta/58/21as_device_stream.sql
@@ -13,6 +13,5 @@
  * limitations under the License.
  */
 
-ALTER TABLE application_services_state
-    ADD COLUMN read_receipt_stream_id INT,
-    ADD COLUMN presence_stream_id INT;
\ No newline at end of file
+ALTER TABLE application_services_state ADD COLUMN read_receipt_stream_id INT;
+ALTER TABLE application_services_state ADD COLUMN presence_stream_id INT;
\ No newline at end of file
diff --git a/synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql b/synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql
new file mode 100644
index 0000000000..01ea6eddcf
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql
@@ -0,0 +1 @@
+DROP TABLE device_max_stream_id;
diff --git a/synapse/storage/types.py b/synapse/storage/types.py
index 970bb1b9da..9cadcba18f 100644
--- a/synapse/storage/types.py
+++ b/synapse/storage/types.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Iterable, Iterator, List, Tuple
+from typing import Any, Iterable, Iterator, List, Optional, Tuple
 
 from typing_extensions import Protocol
 
@@ -65,5 +65,5 @@ class Connection(Protocol):
     def __enter__(self) -> "Connection":
         ...
 
-    def __exit__(self, exc_type, exc_value, traceback) -> bool:
+    def __exit__(self, exc_type, exc_value, traceback) -> Optional[bool]:
         ...