From a7bdf98d01d2225a479753a85ba81adf02b16a32 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2020 21:38:57 +0100 Subject: Rename database classes to make some sense (#8033) --- synapse/storage/databases/main/pusher.py | 356 +++++++++++++++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 synapse/storage/databases/main/pusher.py (limited to 'synapse/storage/databases/main/pusher.py') diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py new file mode 100644 index 0000000000..b5200fbe79 --- /dev/null +++ b/synapse/storage/databases/main/pusher.py @@ -0,0 +1,356 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Iterable, Iterator, List, Tuple + +from canonicaljson import encode_canonical_json + +from twisted.internet import defer + +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList + +logger = logging.getLogger(__name__) + + +class PusherWorkerStore(SQLBaseStore): + def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[dict]: + """JSON-decode the data in the rows returned from the `pushers` table + + Drops any rows whose data cannot be decoded + """ + for r in rows: + dataJson = r["data"] + try: + r["data"] = db_to_json(dataJson) + except Exception as e: + logger.warning( + "Invalid JSON in data for pusher %d: %s, %s", + r["id"], + dataJson, + e.args[0], + ) + continue + + yield r + + @defer.inlineCallbacks + def user_has_pusher(self, user_id): + ret = yield self.db_pool.simple_select_one_onecol( + "pushers", {"user_name": user_id}, "id", allow_none=True + ) + return ret is not None + + def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey): + return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey}) + + def get_pushers_by_user_id(self, user_id): + return self.get_pushers_by({"user_name": user_id}) + + @defer.inlineCallbacks + def get_pushers_by(self, keyvalues): + ret = yield self.db_pool.simple_select_list( + "pushers", + keyvalues, + [ + "id", + "user_name", + "access_token", + "profile_tag", + "kind", + "app_id", + "app_display_name", + "device_display_name", + "pushkey", + "ts", + "lang", + "data", + "last_stream_ordering", + "last_success", + "failing_since", + ], + desc="get_pushers_by", + ) + return self._decode_pushers_rows(ret) + + @defer.inlineCallbacks + def get_all_pushers(self): + def get_pushers(txn): + txn.execute("SELECT * FROM pushers") + rows = self.db_pool.cursor_to_dict(txn) + + return self._decode_pushers_rows(rows) + + rows = yield self.db_pool.runInteraction("get_all_pushers", get_pushers) + return rows + + async def get_all_updated_pushers_rows( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for pushers replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_all_updated_pushers_rows_txn(txn): + sql = """ + SELECT id, user_name, app_id, pushkey + FROM pushers + WHERE ? < id AND id <= ? + ORDER BY id ASC LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + updates = [ + (stream_id, (user_name, app_id, pushkey, False)) + for stream_id, user_name, app_id, pushkey in txn + ] + + sql = """ + SELECT stream_id, user_id, app_id, pushkey + FROM deleted_pushers + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + updates.extend( + (stream_id, (user_name, app_id, pushkey, True)) + for stream_id, user_name, app_id, pushkey in txn + ) + + updates.sort() # Sort so that they're ordered by stream id + + limited = False + upper_bound = current_id + if len(updates) >= limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited + + return await self.db_pool.runInteraction( + "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn + ) + + @cachedInlineCallbacks(num_args=1, max_entries=15000) + def get_if_user_has_pusher(self, user_id): + # This only exists for the cachedList decorator + raise NotImplementedError() + + @cachedList( + cached_method_name="get_if_user_has_pusher", + list_name="user_ids", + num_args=1, + inlineCallbacks=True, + ) + def get_if_users_have_pushers(self, user_ids): + rows = yield self.db_pool.simple_select_many_batch( + table="pushers", + column="user_name", + iterable=user_ids, + retcols=["user_name"], + desc="get_if_users_have_pushers", + ) + + result = {user_id: False for user_id in user_ids} + result.update({r["user_name"]: True for r in rows}) + + return result + + @defer.inlineCallbacks + def update_pusher_last_stream_ordering( + self, app_id, pushkey, user_id, last_stream_ordering + ): + yield self.db_pool.simple_update_one( + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + {"last_stream_ordering": last_stream_ordering}, + desc="update_pusher_last_stream_ordering", + ) + + @defer.inlineCallbacks + def update_pusher_last_stream_ordering_and_success( + self, app_id, pushkey, user_id, last_stream_ordering, last_success + ): + """Update the last stream ordering position we've processed up to for + the given pusher. + + Args: + app_id (str) + pushkey (str) + last_stream_ordering (int) + last_success (int) + + Returns: + Deferred[bool]: True if the pusher still exists; False if it has been deleted. + """ + updated = yield self.db_pool.simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={ + "last_stream_ordering": last_stream_ordering, + "last_success": last_success, + }, + desc="update_pusher_last_stream_ordering_and_success", + ) + + return bool(updated) + + @defer.inlineCallbacks + def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): + yield self.db_pool.simple_update( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + updatevalues={"failing_since": failing_since}, + desc="update_pusher_failing_since", + ) + + @defer.inlineCallbacks + def get_throttle_params_by_room(self, pusher_id): + res = yield self.db_pool.simple_select_list( + "pusher_throttle", + {"pusher": pusher_id}, + ["room_id", "last_sent_ts", "throttle_ms"], + desc="get_throttle_params_by_room", + ) + + params_by_room = {} + for row in res: + params_by_room[row["room_id"]] = { + "last_sent_ts": row["last_sent_ts"], + "throttle_ms": row["throttle_ms"], + } + + return params_by_room + + @defer.inlineCallbacks + def set_throttle_params(self, pusher_id, room_id, params): + # no need to lock because `pusher_throttle` has a primary key on + # (pusher, room_id) so simple_upsert will retry + yield self.db_pool.simple_upsert( + "pusher_throttle", + {"pusher": pusher_id, "room_id": room_id}, + params, + desc="set_throttle_params", + lock=False, + ) + + +class PusherStore(PusherWorkerStore): + def get_pushers_stream_token(self): + return self._pushers_id_gen.get_current_token() + + @defer.inlineCallbacks + def add_pusher( + self, + user_id, + access_token, + kind, + app_id, + app_display_name, + device_display_name, + pushkey, + pushkey_ts, + lang, + data, + last_stream_ordering, + profile_tag="", + ): + with self._pushers_id_gen.get_next() as stream_id: + # no need to lock because `pushers` has a unique key on + # (app_id, pushkey, user_name) so simple_upsert will retry + yield self.db_pool.simple_upsert( + table="pushers", + keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + values={ + "access_token": access_token, + "kind": kind, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "ts": pushkey_ts, + "lang": lang, + "data": bytearray(encode_canonical_json(data)), + "last_stream_ordering": last_stream_ordering, + "profile_tag": profile_tag, + "id": stream_id, + }, + desc="add_pusher", + lock=False, + ) + + user_has_pusher = self.get_if_user_has_pusher.cache.get( + (user_id,), None, update_metrics=False + ) + + if user_has_pusher is not True: + # invalidate, since we the user might not have had a pusher before + yield self.db_pool.runInteraction( + "add_pusher", + self._invalidate_cache_and_stream, + self.get_if_user_has_pusher, + (user_id,), + ) + + @defer.inlineCallbacks + def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): + def delete_pusher_txn(txn, stream_id): + self._invalidate_cache_and_stream( + txn, self.get_if_user_has_pusher, (user_id,) + ) + + self.db_pool.simple_delete_one_txn( + txn, + "pushers", + {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, + ) + + # it's possible for us to end up with duplicate rows for + # (app_id, pushkey, user_id) at different stream_ids, but that + # doesn't really matter. + self.db_pool.simple_insert_txn( + txn, + table="deleted_pushers", + values={ + "stream_id": stream_id, + "app_id": app_id, + "pushkey": pushkey, + "user_id": user_id, + }, + ) + + with self._pushers_id_gen.get_next() as stream_id: + yield self.db_pool.runInteraction( + "delete_pusher", delete_pusher_txn, stream_id + ) -- cgit 1.5.1 From 894dae74fe8e79911c3c001c8b84620ef3985bf6 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 07:24:26 -0400 Subject: Convert misc database code to async (#8087) --- changelog.d/8087.misc | 1 + synapse/storage/background_updates.py | 14 +++++--------- synapse/storage/databases/main/devices.py | 5 ++--- synapse/storage/databases/main/event_push_actions.py | 9 ++++----- synapse/storage/databases/main/presence.py | 9 +++------ synapse/storage/databases/main/push_rule.py | 16 ++++++---------- synapse/storage/databases/main/pusher.py | 9 +++------ synapse/storage/databases/main/receipts.py | 5 ++--- synapse/storage/databases/main/roommember.py | 17 ++++++----------- synapse/storage/databases/main/state.py | 5 ++--- synapse/storage/databases/main/user_erasure_store.py | 13 +++++-------- 11 files changed, 39 insertions(+), 64 deletions(-) create mode 100644 changelog.d/8087.misc (limited to 'synapse/storage/databases/main/pusher.py') diff --git a/changelog.d/8087.misc b/changelog.d/8087.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8087.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index f43463df53..90a1f9e8b1 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -18,8 +18,6 @@ from typing import Optional from canonicaljson import json -from twisted.internet import defer - from synapse.metrics.background_process_metrics import run_as_background_process from . import engines @@ -308,9 +306,8 @@ class BackgroundUpdater(object): update_name (str): Name of update """ - @defer.inlineCallbacks - def noop_update(progress, batch_size): - yield self._end_background_update(update_name) + async def noop_update(progress, batch_size): + await self._end_background_update(update_name) return 1 self.register_background_update_handler(update_name, noop_update) @@ -409,12 +406,11 @@ class BackgroundUpdater(object): else: runner = create_index_sqlite - @defer.inlineCallbacks - def updater(progress, batch_size): + async def updater(progress, batch_size): if runner is not None: logger.info("Adding index %s to %s", index_name, table) - yield self.db_pool.runWithConnection(runner) - yield self._end_background_update(update_name) + await self.db_pool.runWithConnection(runner) + await self._end_background_update(update_name) return 1 self.register_background_update_handler(update_name, updater) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 2b33060480..9a786e2929 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -671,10 +671,9 @@ class DeviceWorkerStore(SQLBaseStore): @cachedList( cached_method_name="get_device_list_last_stream_id_for_remote", list_name="user_ids", - inlineCallbacks=True, ) - def get_device_list_last_stream_id_for_remotes(self, user_ids: str): - rows = yield self.db_pool.simple_select_many_batch( + async def get_device_list_last_stream_id_for_remotes(self, user_ids: str): + rows = await self.db_pool.simple_select_many_batch( table="device_lists_remote_extremeties", column="user_id", iterable=user_ids, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 7c246d3e4c..e8834b2162 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -21,7 +21,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool from synapse.util import json_encoder -from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -86,18 +86,17 @@ class EventPushActionsWorkerStore(SQLBaseStore): self._rotate_delay = 3 self._rotate_count = 10000 - @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000) - def get_unread_event_push_actions_by_room_for_user( + @cached(num_args=3, tree=True, max_entries=5000) + async def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): - ret = yield self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "get_unread_event_push_actions_by_room", self._get_unread_counts_by_receipt_txn, room_id, user_id, last_read_event_id, ) - return ret def _get_unread_counts_by_receipt_txn( self, txn, room_id, user_id, last_read_event_id diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 59ba12820a..fd213d2dfd 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -130,13 +130,10 @@ class PresenceStore(SQLBaseStore): raise NotImplementedError() @cachedList( - cached_method_name="_get_presence_for_user", - list_name="user_ids", - num_args=1, - inlineCallbacks=True, + cached_method_name="_get_presence_for_user", list_name="user_ids", num_args=1, ) - def get_presence_for_users(self, user_ids): - rows = yield self.db_pool.simple_select_many_batch( + async def get_presence_for_users(self, user_ids): + rows = await self.db_pool.simple_select_many_batch( table="presence_stream", column="user_id", iterable=user_ids, diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 6562db5c2b..6aa5802977 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -170,18 +170,15 @@ class PushRulesWorkerStore( ) @cachedList( - cached_method_name="get_push_rules_for_user", - list_name="user_ids", - num_args=1, - inlineCallbacks=True, + cached_method_name="get_push_rules_for_user", list_name="user_ids", num_args=1, ) - def bulk_get_push_rules(self, user_ids): + async def bulk_get_push_rules(self, user_ids): if not user_ids: return {} results = {user_id: [] for user_id in user_ids} - rows = yield self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="push_rules", column="user_name", iterable=user_ids, @@ -194,7 +191,7 @@ class PushRulesWorkerStore( for row in rows: results.setdefault(row["user_name"], []).append(row) - enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) + enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids) for user_id, rules in results.items(): use_new_defaults = user_id in self._users_new_default_push_rules @@ -260,15 +257,14 @@ class PushRulesWorkerStore( cached_method_name="get_push_rules_enabled_for_user", list_name="user_ids", num_args=1, - inlineCallbacks=True, ) - def bulk_get_push_rules_enabled(self, user_ids): + async def bulk_get_push_rules_enabled(self, user_ids): if not user_ids: return {} results = {user_id: {} for user_id in user_ids} - rows = yield self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="push_rules_enable", column="user_name", iterable=user_ids, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index b5200fbe79..8b793d1487 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -170,13 +170,10 @@ class PusherWorkerStore(SQLBaseStore): raise NotImplementedError() @cachedList( - cached_method_name="get_if_user_has_pusher", - list_name="user_ids", - num_args=1, - inlineCallbacks=True, + cached_method_name="get_if_user_has_pusher", list_name="user_ids", num_args=1, ) - def get_if_users_have_pushers(self, user_ids): - rows = yield self.db_pool.simple_select_many_batch( + async def get_if_users_have_pushers(self, user_ids): + rows = await self.db_pool.simple_select_many_batch( table="pushers", column="user_name", iterable=user_ids, diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 1920a8a152..579b7bb17b 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -212,9 +212,8 @@ class ReceiptsWorkerStore(SQLBaseStore): cached_method_name="_get_linearized_receipts_for_room", list_name="room_ids", num_args=3, - inlineCallbacks=True, ) - def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): + async def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): if not room_ids: return {} @@ -243,7 +242,7 @@ class ReceiptsWorkerStore(SQLBaseStore): return self.db_pool.cursor_to_dict(txn) - txn_results = yield self.db_pool.runInteraction( + txn_results = await self.db_pool.runInteraction( "_get_linearized_receipts_for_rooms", f ) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index b2fcfc9bfe..1cc8c08ed0 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -17,8 +17,6 @@ import logging from typing import TYPE_CHECKING, Awaitable, Iterable, List, Optional, Set -from twisted.internet import defer - from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext @@ -92,8 +90,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): lambda: self._known_servers_count, ) - @defer.inlineCallbacks - def _count_known_servers(self): + async def _count_known_servers(self): """ Count the servers that this server knows about. @@ -121,7 +118,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(query) return list(txn)[0][0] - count = yield self.db_pool.runInteraction("get_known_servers", _transact) + count = await self.db_pool.runInteraction("get_known_servers", _transact) # We always know about ourselves, even if we have nothing in # room_memberships (for example, the server is new). @@ -589,11 +586,9 @@ class RoomMemberWorkerStore(EventsWorkerStore): raise NotImplementedError() @cachedList( - cached_method_name="_get_joined_profile_from_event_id", - list_name="event_ids", - inlineCallbacks=True, + cached_method_name="_get_joined_profile_from_event_id", list_name="event_ids", ) - def _get_joined_profiles_from_event_ids(self, event_ids: Iterable[str]): + async def _get_joined_profiles_from_event_ids(self, event_ids: Iterable[str]): """For given set of member event_ids check if they point to a join event and if so return the associated user and profile info. @@ -601,11 +596,11 @@ class RoomMemberWorkerStore(EventsWorkerStore): event_ids: The member event IDs to lookup Returns: - Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID + dict[str, Tuple[str, ProfileInfo]|None]: Map from event ID to `user_id` and ProfileInfo (or None if not join event). """ - rows = yield self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="room_memberships", column="event_id", iterable=event_ids, diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 96e0378e50..991233a9bc 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -273,12 +273,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): cached_method_name="_get_state_group_for_event", list_name="event_ids", num_args=1, - inlineCallbacks=True, ) - def _get_state_group_for_events(self, event_ids): + async def _get_state_group_for_events(self, event_ids): """Returns mapping event_id -> state_group """ - rows = yield self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="event_to_state_groups", column="event_id", iterable=event_ids, diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py index ab6cb2c1f6..da23fe7355 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py @@ -38,10 +38,8 @@ class UserErasureWorkerStore(SQLBaseStore): desc="is_user_erased", ).addCallback(operator.truth) - @cachedList( - cached_method_name="is_user_erased", list_name="user_ids", inlineCallbacks=True - ) - def are_users_erased(self, user_ids): + @cachedList(cached_method_name="is_user_erased", list_name="user_ids") + async def are_users_erased(self, user_ids): """ Checks which users in a list have requested erasure @@ -49,14 +47,14 @@ class UserErasureWorkerStore(SQLBaseStore): user_ids (iterable[str]): full user id to check Returns: - Deferred[dict[str, bool]]: + dict[str, bool]: for each user, whether the user has requested erasure. """ # this serves the dual purpose of (a) making sure we can do len and # iterate it multiple times, and (b) avoiding duplicates. user_ids = tuple(set(user_ids)) - rows = yield self.db_pool.simple_select_many_batch( + rows = await self.db_pool.simple_select_many_batch( table="erased_users", column="user_id", iterable=user_ids, @@ -65,8 +63,7 @@ class UserErasureWorkerStore(SQLBaseStore): ) erased_users = {row["user_id"] for row in rows} - res = {u: u in erased_users for u in user_ids} - return res + return {u: u in erased_users for u in user_ids} class UserErasureStore(UserErasureWorkerStore): -- cgit 1.5.1 From b069b78bb4fc9ce005cc84099e208497a0789ddc Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 10:30:16 -0400 Subject: Convert pusher databases to async/await. (#8075) --- changelog.d/8075.misc | 1 + synapse/rest/client/v1/push_rule.py | 9 +-- synapse/storage/databases/main/push_rule.py | 80 ++++++++++++------------ synapse/storage/databases/main/pusher.py | 95 ++++++++++++++--------------- 4 files changed, 90 insertions(+), 95 deletions(-) create mode 100644 changelog.d/8075.misc (limited to 'synapse/storage/databases/main/pusher.py') diff --git a/changelog.d/8075.misc b/changelog.d/8075.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8075.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 00831879f3..e2df638cc5 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from synapse.api.errors import ( NotFoundError, StoreError, @@ -163,7 +162,7 @@ class PushRuleRestServlet(RestServlet): stream_id, _ = self.store.get_push_rules_stream_token() self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) - def set_rule_attr(self, user_id, spec, val): + async def set_rule_attr(self, user_id, spec, val): if spec["attr"] == "enabled": if isinstance(val, dict) and "enabled" in val: val = val["enabled"] @@ -173,7 +172,9 @@ class PushRuleRestServlet(RestServlet): # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) - return self.store.set_push_rule_enabled(user_id, namespaced_rule_id, val) + return await self.store.set_push_rule_enabled( + user_id, namespaced_rule_id, val + ) elif spec["attr"] == "actions": actions = val.get("actions") _check_actions(actions) @@ -188,7 +189,7 @@ class PushRuleRestServlet(RestServlet): if namespaced_rule_id not in rule_ids: raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) - return self.store.set_push_rule_actions( + return await self.store.set_push_rule_actions( user_id, namespaced_rule_id, actions, is_default_rule ) else: diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 6aa5802977..c2289a9557 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -32,7 +32,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.util.id_generators import ChainedIdGenerator from synapse.util import json_encoder -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -115,9 +115,9 @@ class PushRulesWorkerStore( """ raise NotImplementedError() - @cachedInlineCallbacks(max_entries=5000) - def get_push_rules_for_user(self, user_id): - rows = yield self.db_pool.simple_select_list( + @cached(max_entries=5000) + async def get_push_rules_for_user(self, user_id): + rows = await self.db_pool.simple_select_list( table="push_rules", keyvalues={"user_name": user_id}, retcols=( @@ -133,17 +133,15 @@ class PushRulesWorkerStore( rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) - enabled_map = yield self.get_push_rules_enabled_for_user(user_id) + enabled_map = await self.get_push_rules_enabled_for_user(user_id) use_new_defaults = user_id in self._users_new_default_push_rules - rules = _load_rules(rows, enabled_map, use_new_defaults) + return _load_rules(rows, enabled_map, use_new_defaults) - return rules - - @cachedInlineCallbacks(max_entries=5000) - def get_push_rules_enabled_for_user(self, user_id): - results = yield self.db_pool.simple_select_list( + @cached(max_entries=5000) + async def get_push_rules_enabled_for_user(self, user_id): + results = await self.db_pool.simple_select_list( table="push_rules_enable", keyvalues={"user_name": user_id}, retcols=("user_name", "rule_id", "enabled"), @@ -202,14 +200,15 @@ class PushRulesWorkerStore( return results - @defer.inlineCallbacks - def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): + async def copy_push_rule_from_room_to_room( + self, new_room_id: str, user_id: str, rule: dict + ) -> None: """Copy a single push rule from one room to another for a specific user. Args: - new_room_id (str): ID of the new room. - user_id (str): ID of user the push rule belongs to. - rule (Dict): A push rule. + new_room_id: ID of the new room. + user_id : ID of user the push rule belongs to. + rule: A push rule. """ # Create new rule id rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) @@ -221,7 +220,7 @@ class PushRulesWorkerStore( condition["pattern"] = new_room_id # Add the rule for the new room - yield self.add_push_rule( + await self.add_push_rule( user_id=user_id, rule_id=new_rule_id, priority_class=rule["priority_class"], @@ -229,20 +228,19 @@ class PushRulesWorkerStore( actions=rule["actions"], ) - @defer.inlineCallbacks - def copy_push_rules_from_room_to_room_for_user( - self, old_room_id, new_room_id, user_id - ): + async def copy_push_rules_from_room_to_room_for_user( + self, old_room_id: str, new_room_id: str, user_id: str + ) -> None: """Copy all of the push rules from one room to another for a specific user. Args: - old_room_id (str): ID of the old room. - new_room_id (str): ID of the new room. - user_id (str): ID of user to copy push rules for. + old_room_id: ID of the old room. + new_room_id: ID of the new room. + user_id: ID of user to copy push rules for. """ # Retrieve push rules for this user - user_push_rules = yield self.get_push_rules_for_user(user_id) + user_push_rules = await self.get_push_rules_for_user(user_id) # Get rules relating to the old room and copy them to the new room for rule in user_push_rules: @@ -251,7 +249,7 @@ class PushRulesWorkerStore( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions ): - yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) + await self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) @cachedList( cached_method_name="get_push_rules_enabled_for_user", @@ -328,8 +326,7 @@ class PushRulesWorkerStore( class PushRuleStore(PushRulesWorkerStore): - @defer.inlineCallbacks - def add_push_rule( + async def add_push_rule( self, user_id, rule_id, @@ -338,13 +335,13 @@ class PushRuleStore(PushRulesWorkerStore): actions, before=None, after=None, - ): + ) -> None: conditions_json = json_encoder.encode(conditions) actions_json = json_encoder.encode(actions) with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids if before or after: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_add_push_rule_relative_txn", self._add_push_rule_relative_txn, stream_id, @@ -358,7 +355,7 @@ class PushRuleStore(PushRulesWorkerStore): after, ) else: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_add_push_rule_highest_priority_txn", self._add_push_rule_highest_priority_txn, stream_id, @@ -542,16 +539,15 @@ class PushRuleStore(PushRulesWorkerStore): }, ) - @defer.inlineCallbacks - def delete_push_rule(self, user_id, rule_id): + async def delete_push_rule(self, user_id: str, rule_id: str) -> None: """ Delete a push rule. Args specify the row to be deleted and can be any of the columns in the push_rule table, but below are the standard ones Args: - user_id (str): The matrix ID of the push rule owner - rule_id (str): The rule_id of the rule to be deleted + user_id: The matrix ID of the push rule owner + rule_id: The rule_id of the rule to be deleted """ def delete_push_rule_txn(txn, stream_id, event_stream_ordering): @@ -565,18 +561,17 @@ class PushRuleStore(PushRulesWorkerStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering, ) - @defer.inlineCallbacks - def set_push_rule_enabled(self, user_id, rule_id, enabled): + async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None: with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, stream_id, @@ -607,8 +602,9 @@ class PushRuleStore(PushRulesWorkerStore): op="ENABLE" if enabled else "DISABLE", ) - @defer.inlineCallbacks - def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): + async def set_push_rule_actions( + self, user_id, rule_id, actions, is_default_rule + ) -> None: actions_json = json_encoder.encode(actions) def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): @@ -649,7 +645,7 @@ class PushRuleStore(PushRulesWorkerStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "set_push_rule_actions", set_push_rule_actions_txn, stream_id, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 8b793d1487..1126fd0751 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -19,10 +19,8 @@ from typing import Iterable, Iterator, List, Tuple from canonicaljson import encode_canonical_json -from twisted.internet import defer - from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList logger = logging.getLogger(__name__) @@ -34,23 +32,22 @@ class PusherWorkerStore(SQLBaseStore): Drops any rows whose data cannot be decoded """ for r in rows: - dataJson = r["data"] + data_json = r["data"] try: - r["data"] = db_to_json(dataJson) + r["data"] = db_to_json(data_json) except Exception as e: logger.warning( "Invalid JSON in data for pusher %d: %s, %s", r["id"], - dataJson, + data_json, e.args[0], ) continue yield r - @defer.inlineCallbacks - def user_has_pusher(self, user_id): - ret = yield self.db_pool.simple_select_one_onecol( + async def user_has_pusher(self, user_id): + ret = await self.db_pool.simple_select_one_onecol( "pushers", {"user_name": user_id}, "id", allow_none=True ) return ret is not None @@ -61,9 +58,8 @@ class PusherWorkerStore(SQLBaseStore): def get_pushers_by_user_id(self, user_id): return self.get_pushers_by({"user_name": user_id}) - @defer.inlineCallbacks - def get_pushers_by(self, keyvalues): - ret = yield self.db_pool.simple_select_list( + async def get_pushers_by(self, keyvalues): + ret = await self.db_pool.simple_select_list( "pushers", keyvalues, [ @@ -87,16 +83,14 @@ class PusherWorkerStore(SQLBaseStore): ) return self._decode_pushers_rows(ret) - @defer.inlineCallbacks - def get_all_pushers(self): + async def get_all_pushers(self): def get_pushers(txn): txn.execute("SELECT * FROM pushers") rows = self.db_pool.cursor_to_dict(txn) return self._decode_pushers_rows(rows) - rows = yield self.db_pool.runInteraction("get_all_pushers", get_pushers) - return rows + return await self.db_pool.runInteraction("get_all_pushers", get_pushers) async def get_all_updated_pushers_rows( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -164,8 +158,8 @@ class PusherWorkerStore(SQLBaseStore): "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) - @cachedInlineCallbacks(num_args=1, max_entries=15000) - def get_if_user_has_pusher(self, user_id): + @cached(num_args=1, max_entries=15000) + async def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator raise NotImplementedError() @@ -186,34 +180,38 @@ class PusherWorkerStore(SQLBaseStore): return result - @defer.inlineCallbacks - def update_pusher_last_stream_ordering( + async def update_pusher_last_stream_ordering( self, app_id, pushkey, user_id, last_stream_ordering - ): - yield self.db_pool.simple_update_one( + ) -> None: + await self.db_pool.simple_update_one( "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, {"last_stream_ordering": last_stream_ordering}, desc="update_pusher_last_stream_ordering", ) - @defer.inlineCallbacks - def update_pusher_last_stream_ordering_and_success( - self, app_id, pushkey, user_id, last_stream_ordering, last_success - ): + async def update_pusher_last_stream_ordering_and_success( + self, + app_id: str, + pushkey: str, + user_id: str, + last_stream_ordering: int, + last_success: int, + ) -> bool: """Update the last stream ordering position we've processed up to for the given pusher. Args: - app_id (str) - pushkey (str) - last_stream_ordering (int) - last_success (int) + app_id + pushkey + user_id + last_stream_ordering + last_success Returns: - Deferred[bool]: True if the pusher still exists; False if it has been deleted. + True if the pusher still exists; False if it has been deleted. """ - updated = yield self.db_pool.simple_update( + updated = await self.db_pool.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={ @@ -225,18 +223,18 @@ class PusherWorkerStore(SQLBaseStore): return bool(updated) - @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self.db_pool.simple_update( + async def update_pusher_failing_since( + self, app_id, pushkey, user_id, failing_since + ) -> None: + await self.db_pool.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={"failing_since": failing_since}, desc="update_pusher_failing_since", ) - @defer.inlineCallbacks - def get_throttle_params_by_room(self, pusher_id): - res = yield self.db_pool.simple_select_list( + async def get_throttle_params_by_room(self, pusher_id): + res = await self.db_pool.simple_select_list( "pusher_throttle", {"pusher": pusher_id}, ["room_id", "last_sent_ts", "throttle_ms"], @@ -252,11 +250,10 @@ class PusherWorkerStore(SQLBaseStore): return params_by_room - @defer.inlineCallbacks - def set_throttle_params(self, pusher_id, room_id, params): + async def set_throttle_params(self, pusher_id, room_id, params) -> None: # no need to lock because `pusher_throttle` has a primary key on # (pusher, room_id) so simple_upsert will retry - yield self.db_pool.simple_upsert( + await self.db_pool.simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, params, @@ -269,8 +266,7 @@ class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self): return self._pushers_id_gen.get_current_token() - @defer.inlineCallbacks - def add_pusher( + async def add_pusher( self, user_id, access_token, @@ -284,11 +280,11 @@ class PusherStore(PusherWorkerStore): data, last_stream_ordering, profile_tag="", - ): + ) -> None: with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so simple_upsert will retry - yield self.db_pool.simple_upsert( + await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ @@ -313,15 +309,16 @@ class PusherStore(PusherWorkerStore): if user_has_pusher is not True: # invalidate, since we the user might not have had a pusher before - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "add_pusher", self._invalidate_cache_and_stream, self.get_if_user_has_pusher, (user_id,), ) - @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): + async def delete_pusher_by_app_id_pushkey_user_id( + self, app_id, pushkey, user_id + ) -> None: def delete_pusher_txn(txn, stream_id): self._invalidate_cache_and_stream( txn, self.get_if_user_has_pusher, (user_id,) @@ -348,6 +345,6 @@ class PusherStore(PusherWorkerStore): ) with self._pushers_id_gen.get_next() as stream_id: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_pusher", delete_pusher_txn, stream_id ) -- cgit 1.5.1 From 2231dffee6788836c86e868dd29574970b13dd18 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 25 Aug 2020 15:10:08 +0100 Subject: Make StreamIdGen `get_next` and `get_next_mult` async (#8161) This is mainly so that `StreamIdGenerator` and `MultiWriterIdGenerator` will have the same interface, allowing them to be used interchangeably. --- changelog.d/8161.misc | 1 + synapse/storage/databases/main/account_data.py | 4 +-- synapse/storage/databases/main/deviceinbox.py | 4 +-- synapse/storage/databases/main/devices.py | 8 +++-- synapse/storage/databases/main/end_to_end_keys.py | 43 ++++++++++++----------- synapse/storage/databases/main/events.py | 4 +-- synapse/storage/databases/main/group_server.py | 2 +- synapse/storage/databases/main/presence.py | 2 +- synapse/storage/databases/main/push_rule.py | 8 ++--- synapse/storage/databases/main/pusher.py | 4 +-- synapse/storage/databases/main/receipts.py | 3 +- synapse/storage/databases/main/room.py | 6 ++-- synapse/storage/databases/main/tags.py | 4 +-- synapse/storage/util/id_generators.py | 10 +++--- 14 files changed, 54 insertions(+), 49 deletions(-) create mode 100644 changelog.d/8161.misc (limited to 'synapse/storage/databases/main/pusher.py') diff --git a/changelog.d/8161.misc b/changelog.d/8161.misc new file mode 100644 index 0000000000..89ff274de3 --- /dev/null +++ b/changelog.d/8161.misc @@ -0,0 +1 @@ +Refactor `StreamIdGenerator` and `MultiWriterIdGenerator` to have the same interface. diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 82aac2bbf3..04042a2c98 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -336,7 +336,7 @@ class AccountDataStore(AccountDataWorkerStore): """ content_json = json_encoder.encode(content) - with self._account_data_id_gen.get_next() as next_id: + with await self._account_data_id_gen.get_next() as next_id: # no need to lock here as room_account_data has a unique constraint # on (user_id, room_id, account_data_type) so simple_upsert will # retry if there is a conflict. @@ -384,7 +384,7 @@ class AccountDataStore(AccountDataWorkerStore): """ content_json = json_encoder.encode(content) - with self._account_data_id_gen.get_next() as next_id: + with await self._account_data_id_gen.get_next() as next_id: # no need to lock here as account_data has a unique constraint on # (user_id, account_data_type) so simple_upsert will retry if # there is a conflict. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 1f6e995c4f..bb85637a95 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -362,7 +362,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) rows.append((destination, stream_id, now_ms, edu_json)) txn.executemany(sql, rows) - with self._device_inbox_id_gen.get_next() as stream_id: + with await self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() await self.db_pool.runInteraction( "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id @@ -411,7 +411,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) txn, stream_id, local_messages_by_user_then_device ) - with self._device_inbox_id_gen.get_next() as stream_id: + with await self._device_inbox_id_gen.get_next() as stream_id: now_ms = self.clock.time_msec() await self.db_pool.runInteraction( "add_messages_from_remote_to_device_inbox", diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9a786e2929..03b45dbc4d 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -380,7 +380,7 @@ class DeviceWorkerStore(SQLBaseStore): THe new stream ID. """ - with self._device_list_id_gen.get_next() as stream_id: + with await self._device_list_id_gen.get_next() as stream_id: await self.db_pool.runInteraction( "add_user_sig_change_to_streams", self._add_user_signature_change_txn, @@ -1146,7 +1146,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): if not device_ids: return - with self._device_list_id_gen.get_next_mult(len(device_ids)) as stream_ids: + with await self._device_list_id_gen.get_next_mult( + len(device_ids) + ) as stream_ids: await self.db_pool.runInteraction( "add_device_change_to_stream", self._add_device_change_to_stream_txn, @@ -1159,7 +1161,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): return stream_ids[-1] context = get_active_span_text_map() - with self._device_list_id_gen.get_next_mult( + with await self._device_list_id_gen.get_next_mult( len(hosts) * len(device_ids) ) as stream_ids: await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index f93e0d320d..385868bdab 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -648,7 +648,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): "delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn ) - def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key): + def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key, stream_id): """Set a user's cross-signing key. Args: @@ -658,6 +658,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): for a master key, 'self_signing' for a self-signing key, or 'user_signing' for a user-signing key key (dict): the key data + stream_id (int) """ # the 'key' dict will look something like: # { @@ -695,23 +696,22 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): ) # and finally, store the key itself - with self._cross_signing_id_gen.get_next() as stream_id: - self.db_pool.simple_insert_txn( - txn, - "e2e_cross_signing_keys", - values={ - "user_id": user_id, - "keytype": key_type, - "keydata": json_encoder.encode(key), - "stream_id": stream_id, - }, - ) + self.db_pool.simple_insert_txn( + txn, + "e2e_cross_signing_keys", + values={ + "user_id": user_id, + "keytype": key_type, + "keydata": json_encoder.encode(key), + "stream_id": stream_id, + }, + ) self._invalidate_cache_and_stream( txn, self._get_bare_e2e_cross_signing_keys, (user_id,) ) - def set_e2e_cross_signing_key(self, user_id, key_type, key): + async def set_e2e_cross_signing_key(self, user_id, key_type, key): """Set a user's cross-signing key. Args: @@ -719,13 +719,16 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): key_type (str): the type of cross-signing key to set key (dict): the key data """ - return self.db_pool.runInteraction( - "add_e2e_cross_signing_key", - self._set_e2e_cross_signing_key_txn, - user_id, - key_type, - key, - ) + + with await self._cross_signing_id_gen.get_next() as stream_id: + return await self.db_pool.runInteraction( + "add_e2e_cross_signing_key", + self._set_e2e_cross_signing_key_txn, + user_id, + key_type, + key, + stream_id, + ) def store_e2e_cross_signing_signatures(self, user_id, signatures): """Stores cross-signing signatures. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b90e6de2d5..6313b41eef 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -153,11 +153,11 @@ class PersistEventsStore: # Note: Multiple instances of this function cannot be in flight at # the same time for the same room. if backfilled: - stream_ordering_manager = self._backfill_id_gen.get_next_mult( + stream_ordering_manager = await self._backfill_id_gen.get_next_mult( len(events_and_contexts) ) else: - stream_ordering_manager = self._stream_id_gen.get_next_mult( + stream_ordering_manager = await self._stream_id_gen.get_next_mult( len(events_and_contexts) ) diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index 0e3b8739c6..a488e0924b 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -1182,7 +1182,7 @@ class GroupServerStore(GroupServerWorkerStore): return next_id - with self._group_updates_id_gen.get_next() as next_id: + with await self._group_updates_id_gen.get_next() as next_id: res = await self.db_pool.runInteraction( "register_user_group_membership", _register_user_group_membership_txn, diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 4e3ec02d14..c9f655dfb7 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -23,7 +23,7 @@ from synapse.util.iterutils import batch_iter class PresenceStore(SQLBaseStore): async def update_presence(self, presence_states): - stream_ordering_manager = self._presence_id_gen.get_next_mult( + stream_ordering_manager = await self._presence_id_gen.get_next_mult( len(presence_states) ) diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index a585e54812..2fb5b02d7d 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -338,7 +338,7 @@ class PushRuleStore(PushRulesWorkerStore): ) -> None: conditions_json = json_encoder.encode(conditions) actions_json = json_encoder.encode(actions) - with self._push_rules_stream_id_gen.get_next() as stream_id: + with await self._push_rules_stream_id_gen.get_next() as stream_id: event_stream_ordering = self._stream_id_gen.get_current_token() if before or after: @@ -560,7 +560,7 @@ class PushRuleStore(PushRulesWorkerStore): txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE" ) - with self._push_rules_stream_id_gen.get_next() as stream_id: + with await self._push_rules_stream_id_gen.get_next() as stream_id: event_stream_ordering = self._stream_id_gen.get_current_token() await self.db_pool.runInteraction( @@ -571,7 +571,7 @@ class PushRuleStore(PushRulesWorkerStore): ) async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None: - with self._push_rules_stream_id_gen.get_next() as stream_id: + with await self._push_rules_stream_id_gen.get_next() as stream_id: event_stream_ordering = self._stream_id_gen.get_current_token() await self.db_pool.runInteraction( @@ -646,7 +646,7 @@ class PushRuleStore(PushRulesWorkerStore): data={"actions": actions_json}, ) - with self._push_rules_stream_id_gen.get_next() as stream_id: + with await self._push_rules_stream_id_gen.get_next() as stream_id: event_stream_ordering = self._stream_id_gen.get_current_token() await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 1126fd0751..c388468273 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -281,7 +281,7 @@ class PusherStore(PusherWorkerStore): last_stream_ordering, profile_tag="", ) -> None: - with self._pushers_id_gen.get_next() as stream_id: + with await self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so simple_upsert will retry await self.db_pool.simple_upsert( @@ -344,7 +344,7 @@ class PusherStore(PusherWorkerStore): }, ) - with self._pushers_id_gen.get_next() as stream_id: + with await self._pushers_id_gen.get_next() as stream_id: await self.db_pool.runInteraction( "delete_pusher", delete_pusher_txn, stream_id ) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 19ad1c056f..6821476ee0 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -520,8 +520,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "insert_receipt_conv", graph_to_linear ) - stream_id_manager = self._receipts_id_gen.get_next() - with stream_id_manager as stream_id: + with await self._receipts_id_gen.get_next() as stream_id: event_ts = await self.db_pool.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 7d3ac47261..b3772be2b2 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1129,7 +1129,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): }, ) - with self._public_room_id_gen.get_next() as next_id: + with await self._public_room_id_gen.get_next() as next_id: await self.db_pool.runInteraction( "store_room_txn", store_room_txn, next_id ) @@ -1196,7 +1196,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): }, ) - with self._public_room_id_gen.get_next() as next_id: + with await self._public_room_id_gen.get_next() as next_id: await self.db_pool.runInteraction( "set_room_is_public", set_room_is_public_txn, next_id ) @@ -1276,7 +1276,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): }, ) - with self._public_room_id_gen.get_next() as next_id: + with await self._public_room_id_gen.get_next() as next_id: await self.db_pool.runInteraction( "set_room_is_public_appservice", set_room_is_public_appservice_txn, diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index ade7abc927..0c34bbf21a 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -210,7 +210,7 @@ class TagsStore(TagsWorkerStore): ) self._update_revision_txn(txn, user_id, room_id, next_id) - with self._account_data_id_gen.get_next() as next_id: + with await self._account_data_id_gen.get_next() as next_id: await self.db_pool.runInteraction("add_tag", add_tag_txn, next_id) self.get_tags_for_user.invalidate((user_id,)) @@ -232,7 +232,7 @@ class TagsStore(TagsWorkerStore): txn.execute(sql, (user_id, room_id, tag)) self._update_revision_txn(txn, user_id, room_id, next_id) - with self._account_data_id_gen.get_next() as next_id: + with await self._account_data_id_gen.get_next() as next_id: await self.db_pool.runInteraction("remove_tag", remove_tag_txn, next_id) self.get_tags_for_user.invalidate((user_id,)) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 0bf772d4d1..ddb5c8c60c 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -80,7 +80,7 @@ class StreamIdGenerator(object): upwards, -1 to grow downwards. Usage: - with stream_id_gen.get_next() as stream_id: + with await stream_id_gen.get_next() as stream_id: # ... persist event ... """ @@ -95,10 +95,10 @@ class StreamIdGenerator(object): ) self._unfinished_ids = deque() # type: Deque[int] - def get_next(self): + async def get_next(self): """ Usage: - with stream_id_gen.get_next() as stream_id: + with await stream_id_gen.get_next() as stream_id: # ... persist event ... """ with self._lock: @@ -117,10 +117,10 @@ class StreamIdGenerator(object): return manager() - def get_next_mult(self, n): + async def get_next_mult(self, n): """ Usage: - with stream_id_gen.get_next(n) as stream_ids: + with await stream_id_gen.get_next(n) as stream_ids: # ... persist events ... """ with self._lock: -- cgit 1.5.1