diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py
index f9eef1b78e..b58f04d00d 100644
--- a/synapse/storage/data_stores/main/account_data.py
+++ b/synapse/storage/data_stores/main/account_data.py
@@ -297,7 +297,13 @@ class AccountDataWorkerStore(SQLBaseStore):
class AccountDataStore(AccountDataWorkerStore):
def __init__(self, database: Database, db_conn, hs):
self._account_data_id_gen = StreamIdGenerator(
- db_conn, "account_data_max_stream_id", "stream_id"
+ db_conn,
+ "account_data_max_stream_id",
+ "stream_id",
+ extra_tables=[
+ ("room_account_data", "stream_id"),
+ ("room_tags_revisions", "stream_id"),
+ ],
)
super(AccountDataStore, self).__init__(database, db_conn, hs)
@@ -387,6 +393,10 @@ class AccountDataStore(AccountDataWorkerStore):
# doesn't sound any worse than the whole update getting lost,
# which is what would happen if we combined the two into one
# transaction.
+ #
+ # Note: This is only here for backwards compat to allow admins to
+ # roll back to a previous Synapse version. Next time we update the
+ # database version we can remove this table.
yield self._update_max_stream_id(next_id)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -405,6 +415,10 @@ class AccountDataStore(AccountDataWorkerStore):
next_id(int): The the revision to advance to.
"""
+ # Note: This is only here for backwards compat to allow admins to
+ # roll back to a previous Synapse version. Next time we update the
+ # database version we can remove this table.
+
def _update(txn):
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 1310d39069..e459cf49a0 100644
--- a/synapse/storage/data_stores/main/monthly_active_users.py
+++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import List
from twisted.internet import defer
@@ -77,20 +78,19 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
return self.db.runInteraction("count_users_by_service", _count_users_by_service)
- @defer.inlineCallbacks
- def get_registered_reserved_users(self):
- """Of the reserved threepids defined in config, which are associated
- with registered users?
+ async def get_registered_reserved_users(self) -> List[str]:
+ """Of the reserved threepids defined in config, retrieve those that are associated
+ with registered users
Returns:
- Defered[list]: Real reserved users
+ User IDs of actual users that are reserved
"""
users = []
for tp in self.hs.config.mau_limits_reserved_threepids[
: self.hs.config.max_mau_value
]:
- user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+ user_id = await self.hs.get_datastore().get_user_id_by_threepid(
tp["medium"], tp["address"]
)
if user_id:
@@ -171,13 +171,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
else:
logger.warning("mau limit reserved threepid %s not found in db" % tp)
- @defer.inlineCallbacks
- def reap_monthly_active_users(self):
+ async def reap_monthly_active_users(self):
"""Cleans out monthly active user table to ensure that no stale
entries exist.
-
- Returns:
- Deferred[]
"""
def _reap_users(txn, reserved_users):
@@ -249,8 +245,8 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
)
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
- reserved_users = yield self.get_registered_reserved_users()
- yield self.db.runInteraction(
+ reserved_users = await self.get_registered_reserved_users()
+ await self.db.runInteraction(
"reap_monthly_active_users", _reap_users, reserved_users
)
@@ -261,6 +257,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
Args:
user_id (str): user to add/update
+
+ Returns:
+ Deferred
"""
# Support user never to be included in MAU stats. Note I can't easily call this
# from upsert_monthly_active_user_txn because then I need a _txn form of
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0d932a0672..cebdcd409f 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -391,7 +391,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
(user_id, room_id, receipt_type),
)
- self.db.simple_delete_txn(
+ self.db.simple_upsert_txn(
txn,
table="receipts_linearized",
keyvalues={
@@ -399,19 +399,14 @@ class ReceiptsStore(ReceiptsWorkerStore):
"receipt_type": receipt_type,
"user_id": user_id,
},
- )
-
- self.db.simple_insert_txn(
- txn,
- table="receipts_linearized",
values={
"stream_id": stream_id,
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
"event_id": event_id,
"data": json.dumps(data),
},
+ # receipts_linearized has a unique constraint on
+ # (user_id, room_id, receipt_type), so no need to lock
+ lock=False,
)
if receipt_type == "m.read" and stream_ordering is not None:
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 1f1a7b4e36..ab70776977 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -17,7 +17,7 @@
import logging
import re
-from typing import List
+from typing import List, Optional
from six import iterkeys
@@ -423,7 +423,7 @@ class RegistrationWorkerStore(SQLBaseStore):
)
return res
- @cachedInlineCallbacks()
+ @cached()
def is_support_user(self, user_id):
"""Determines if the user is of type UserTypes.SUPPORT
@@ -433,10 +433,9 @@ class RegistrationWorkerStore(SQLBaseStore):
Returns:
Deferred[bool]: True if user is of type UserTypes.SUPPORT
"""
- res = yield self.db.runInteraction(
+ return self.db.runInteraction(
"is_support_user", self.is_support_user_txn, user_id
)
- return res
def is_real_user_txn(self, txn, user_id):
res = self.db.simple_select_one_onecol_txn(
@@ -597,18 +596,17 @@ class RegistrationWorkerStore(SQLBaseStore):
)
)
- @defer.inlineCallbacks
- def get_user_id_by_threepid(self, medium, address):
+ async def get_user_id_by_threepid(self, medium: str, address: str) -> Optional[str]:
"""Returns user id from threepid
Args:
- medium (str): threepid medium e.g. email
- address (str): threepid address e.g. me@example.com
+ medium: threepid medium e.g. email
+ address: threepid address e.g. me@example.com
Returns:
- Deferred[str|None]: user id or None if no user id/threepid mapping exists
+ The user ID or None if no user id/threepid mapping exists
"""
- user_id = yield self.db.runInteraction(
+ user_id = await self.db.runInteraction(
"get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address
)
return user_id
@@ -1074,7 +1072,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
Args:
user_id (str): The desired user ID to register.
- password_hash (str): Optional. The password hash for this user.
+ password_hash (str|None): Optional. The password hash for this user.
was_guest (bool): Optional. Whether this is a guest account being
upgraded to a non-guest account.
make_guest (boolean): True if the the new user should be guest,
@@ -1088,6 +1086,9 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
Raises:
StoreError if the user_id could not be registered.
+
+ Returns:
+ Deferred
"""
return self.db.runInteraction(
"register_user",
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 2aa1bafd48..4219018302 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -233,6 +233,9 @@ class TagsStore(TagsWorkerStore):
self._account_data_stream_cache.entity_has_changed, user_id, next_id
)
+ # Note: This is only here for backwards compat to allow admins to
+ # roll back to a previous Synapse version. Next time we update the
+ # database version we can remove this table.
update_max_id_sql = (
"UPDATE account_data_max_stream_id"
" SET stream_id = ?"
diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index e8edaf9f7b..ff000bc9ec 100644
--- a/synapse/storage/data_stores/state/bg_updates.py
+++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -109,20 +109,20 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
SELECT prev_state_group FROM state_group_edges e, state s
WHERE s.state_group = e.state_group
)
- SELECT DISTINCT type, state_key, last_value(event_id) OVER (
- PARTITION BY type, state_key ORDER BY state_group ASC
- ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
- ) AS event_id FROM state_groups_state
+ SELECT DISTINCT ON (type, state_key)
+ type, state_key, event_id
+ FROM state_groups_state
WHERE state_group IN (
SELECT state_group FROM state
- )
+ ) %s
+ ORDER BY type, state_key, state_group DESC
"""
for group in groups:
args = [group]
args.extend(where_args)
- txn.execute(sql + where_clause, args)
+ txn.execute(sql % (where_clause,), args)
for row in txn:
typ, state_key, event_id = row
key = (typ, state_key)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 9afc145340..9cc3b51fe6 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -33,6 +33,7 @@ logger = logging.getLogger(__name__)
# schema files, so the users will be informed on server restarts.
# XXX: If you're about to bump this to 59 (or higher) please create an update
# that drops the unused `cache_invalidation_stream` table, as per #7436!
+# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
SCHEMA_VERSION = 58
dir_path = os.path.abspath(os.path.dirname(__file__))
@@ -366,9 +367,8 @@ def _upgrade_existing_database(
if duplicates:
# We don't support using the same file name in the same delta version.
raise PrepareDatabaseException(
- "Found multiple delta files with the same name in v%d: %s",
- v,
- duplicates,
+ "Found multiple delta files with the same name in v%d: %s"
+ % (v, duplicates,)
)
# We sort to ensure that we apply the delta files in a consistent
|