diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index c342df2a8b..6e7f16f39c 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -33,7 +33,10 @@ from synapse.api.room_versions import (
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event
from synapse.logging.context import PreserveLoggingContext, current_context
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream
@@ -140,10 +143,7 @@ class EventsWorkerStore(SQLBaseStore):
if hs.config.run_background_tasks:
# We periodically clean out old transaction ID mappings
self._clock.looping_call(
- run_as_background_process,
- 5 * 60 * 1000,
- "_cleanup_old_transaction_ids",
- self._cleanup_old_transaction_ids,
+ self._cleanup_old_transaction_ids, 5 * 60 * 1000,
)
self._get_event_cache = LruCache(
@@ -1374,6 +1374,7 @@ class EventsWorkerStore(SQLBaseStore):
return mapping
+ @wrap_as_background_process("_cleanup_old_transaction_ids")
async def _cleanup_old_transaction_ids(self):
"""Cleans out transaction id mappings older than 24hrs.
"""
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 822bf51a80..320ca9413b 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -131,7 +131,7 @@ class ProfileWorkerStore(SQLBaseStore):
)
async def set_profile_displayname(
- self, user_localpart: str, new_displayname: str, batchnum: int
+ self, user_localpart: str, new_displayname: Optional[str], batchnum: int
) -> None:
# Invalidate the read cache for this user
self.get_profile_displayname.invalidate((user_localpart,))
@@ -266,7 +266,7 @@ class ProfileWorkerStore(SQLBaseStore):
async def get_remote_profile_cache_entries_that_expire(
self, last_checked: int
- ) -> Dict[str, str]:
+ ) -> List[Dict[str, str]]:
"""Get all users who haven't been checked since `last_checked`
"""
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index df8609b97b..7997242d90 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -303,7 +303,7 @@ class PusherStore(PusherWorkerStore):
lock=False,
)
- user_has_pusher = self.get_if_user_has_pusher.cache.get(
+ user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
(user_id,), None, update_metrics=False
)
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5cdf16521c..ca7917c989 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -25,7 +25,6 @@ from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator
from synapse.types import JsonDict
from synapse.util import json_encoder
-from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -413,18 +412,10 @@ class ReceiptsWorkerStore(SQLBaseStore, metaclass=abc.ABCMeta):
if receipt_type != "m.read":
return
- # Returns either an ObservableDeferred or the raw result
- res = self.get_users_with_read_receipts_in_room.cache.get(
+ res = self.get_users_with_read_receipts_in_room.cache.get_immediate(
room_id, None, update_metrics=False
)
- # first handle the ObservableDeferred case
- if isinstance(res, ObservableDeferred):
- if res.has_called():
- res = res.get_result()
- else:
- res = None
-
if res and user_id in res:
# We'd only be adding to the set, so no point invalidating if the
# user is already there
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 20fcdaa529..01d9dbb36f 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -20,7 +20,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -67,16 +70,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
):
self._known_servers_count = 1
self.hs.get_clock().looping_call(
- run_as_background_process,
- 60 * 1000,
- "_count_known_servers",
- self._count_known_servers,
+ self._count_known_servers, 60 * 1000,
)
self.hs.get_clock().call_later(
- 1000,
- run_as_background_process,
- "_count_known_servers",
- self._count_known_servers,
+ 1000, self._count_known_servers,
)
LaterGauge(
"synapse_federation_known_servers",
@@ -85,6 +82,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
lambda: self._known_servers_count,
)
+ @wrap_as_background_process("_count_known_servers")
async def _count_known_servers(self):
"""
Count the servers that this server knows about.
@@ -531,7 +529,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# If we do then we can reuse that result and simply update it with
# any membership changes in `delta_ids`
if context.prev_group and context.delta_ids:
- prev_res = self._get_joined_users_from_context.cache.get(
+ prev_res = self._get_joined_users_from_context.cache.get_immediate(
(room_id, context.prev_group), None
)
if prev_res and isinstance(prev_res, dict):
diff --git a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql b/synapse/storage/databases/main/schema/delta/58/21as_device_stream.sql
index 20f5a95a24..7b84a207fd 100644
--- a/synapse/storage/databases/main/schema/delta/59/19as_device_stream.sql
+++ b/synapse/storage/databases/main/schema/delta/58/21as_device_stream.sql
@@ -13,6 +13,5 @@
* limitations under the License.
*/
-ALTER TABLE application_services_state
- ADD COLUMN read_receipt_stream_id INT,
- ADD COLUMN presence_stream_id INT;
\ No newline at end of file
+ALTER TABLE application_services_state ADD COLUMN read_receipt_stream_id INT;
+ALTER TABLE application_services_state ADD COLUMN presence_stream_id INT;
\ No newline at end of file
diff --git a/synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql b/synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql
new file mode 100644
index 0000000000..01ea6eddcf
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/21drop_device_max_stream_id.sql
@@ -0,0 +1 @@
+DROP TABLE device_max_stream_id;
|