summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/account_data.py16
-rw-r--r--synapse/storage/data_stores/main/monthly_active_users.py25
-rw-r--r--synapse/storage/data_stores/main/receipts.py13
-rw-r--r--synapse/storage/data_stores/main/registration.py23
-rw-r--r--synapse/storage/data_stores/main/tags.py3
-rw-r--r--synapse/storage/data_stores/state/bg_updates.py12
-rw-r--r--synapse/storage/prepare_database.py6
7 files changed, 55 insertions, 43 deletions
diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py

index f9eef1b78e..b58f04d00d 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py
@@ -297,7 +297,13 @@ class AccountDataWorkerStore(SQLBaseStore): class AccountDataStore(AccountDataWorkerStore): def __init__(self, database: Database, db_conn, hs): self._account_data_id_gen = StreamIdGenerator( - db_conn, "account_data_max_stream_id", "stream_id" + db_conn, + "account_data_max_stream_id", + "stream_id", + extra_tables=[ + ("room_account_data", "stream_id"), + ("room_tags_revisions", "stream_id"), + ], ) super(AccountDataStore, self).__init__(database, db_conn, hs) @@ -387,6 +393,10 @@ class AccountDataStore(AccountDataWorkerStore): # doesn't sound any worse than the whole update getting lost, # which is what would happen if we combined the two into one # transaction. + # + # Note: This is only here for backwards compat to allow admins to + # roll back to a previous Synapse version. Next time we update the + # database version we can remove this table. yield self._update_max_stream_id(next_id) self._account_data_stream_cache.entity_has_changed(user_id, next_id) @@ -405,6 +415,10 @@ class AccountDataStore(AccountDataWorkerStore): next_id(int): The the revision to advance to. """ + # Note: This is only here for backwards compat to allow admins to + # roll back to a previous Synapse version. Next time we update the + # database version we can remove this table. + def _update(txn): update_max_id_sql = ( "UPDATE account_data_max_stream_id" diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py
index 1310d39069..e459cf49a0 100644 --- a/synapse/storage/data_stores/main/monthly_active_users.py +++ b/synapse/storage/data_stores/main/monthly_active_users.py
@@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List from twisted.internet import defer @@ -77,20 +78,19 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore): return self.db.runInteraction("count_users_by_service", _count_users_by_service) - @defer.inlineCallbacks - def get_registered_reserved_users(self): - """Of the reserved threepids defined in config, which are associated - with registered users? + async def get_registered_reserved_users(self) -> List[str]: + """Of the reserved threepids defined in config, retrieve those that are associated + with registered users Returns: - Defered[list]: Real reserved users + User IDs of actual users that are reserved """ users = [] for tp in self.hs.config.mau_limits_reserved_threepids[ : self.hs.config.max_mau_value ]: - user_id = yield self.hs.get_datastore().get_user_id_by_threepid( + user_id = await self.hs.get_datastore().get_user_id_by_threepid( tp["medium"], tp["address"] ) if user_id: @@ -171,13 +171,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): else: logger.warning("mau limit reserved threepid %s not found in db" % tp) - @defer.inlineCallbacks - def reap_monthly_active_users(self): + async def reap_monthly_active_users(self): """Cleans out monthly active user table to ensure that no stale entries exist. - - Returns: - Deferred[] """ def _reap_users(txn, reserved_users): @@ -249,8 +245,8 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): ) self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ()) - reserved_users = yield self.get_registered_reserved_users() - yield self.db.runInteraction( + reserved_users = await self.get_registered_reserved_users() + await self.db.runInteraction( "reap_monthly_active_users", _reap_users, reserved_users ) @@ -261,6 +257,9 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore): Args: user_id (str): user to add/update + + Returns: + Deferred """ # Support user never to be included in MAU stats. Note I can't easily call this # from upsert_monthly_active_user_txn because then I need a _txn form of diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0d932a0672..cebdcd409f 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py
@@ -391,7 +391,7 @@ class ReceiptsStore(ReceiptsWorkerStore): (user_id, room_id, receipt_type), ) - self.db.simple_delete_txn( + self.db.simple_upsert_txn( txn, table="receipts_linearized", keyvalues={ @@ -399,19 +399,14 @@ class ReceiptsStore(ReceiptsWorkerStore): "receipt_type": receipt_type, "user_id": user_id, }, - ) - - self.db.simple_insert_txn( - txn, - table="receipts_linearized", values={ "stream_id": stream_id, - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, "event_id": event_id, "data": json.dumps(data), }, + # receipts_linearized has a unique constraint on + # (user_id, room_id, receipt_type), so no need to lock + lock=False, ) if receipt_type == "m.read" and stream_ordering is not None: diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 1f1a7b4e36..ab70776977 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py
@@ -17,7 +17,7 @@ import logging import re -from typing import List +from typing import List, Optional from six import iterkeys @@ -423,7 +423,7 @@ class RegistrationWorkerStore(SQLBaseStore): ) return res - @cachedInlineCallbacks() + @cached() def is_support_user(self, user_id): """Determines if the user is of type UserTypes.SUPPORT @@ -433,10 +433,9 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: Deferred[bool]: True if user is of type UserTypes.SUPPORT """ - res = yield self.db.runInteraction( + return self.db.runInteraction( "is_support_user", self.is_support_user_txn, user_id ) - return res def is_real_user_txn(self, txn, user_id): res = self.db.simple_select_one_onecol_txn( @@ -597,18 +596,17 @@ class RegistrationWorkerStore(SQLBaseStore): ) ) - @defer.inlineCallbacks - def get_user_id_by_threepid(self, medium, address): + async def get_user_id_by_threepid(self, medium: str, address: str) -> Optional[str]: """Returns user id from threepid Args: - medium (str): threepid medium e.g. email - address (str): threepid address e.g. me@example.com + medium: threepid medium e.g. email + address: threepid address e.g. me@example.com Returns: - Deferred[str|None]: user id or None if no user id/threepid mapping exists + The user ID or None if no user id/threepid mapping exists """ - user_id = yield self.db.runInteraction( + user_id = await self.db.runInteraction( "get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address ) return user_id @@ -1074,7 +1072,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): Args: user_id (str): The desired user ID to register. - password_hash (str): Optional. The password hash for this user. + password_hash (str|None): Optional. The password hash for this user. was_guest (bool): Optional. Whether this is a guest account being upgraded to a non-guest account. make_guest (boolean): True if the the new user should be guest, @@ -1088,6 +1086,9 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): Raises: StoreError if the user_id could not be registered. + + Returns: + Deferred """ return self.db.runInteraction( "register_user", diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 2aa1bafd48..4219018302 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py
@@ -233,6 +233,9 @@ class TagsStore(TagsWorkerStore): self._account_data_stream_cache.entity_has_changed, user_id, next_id ) + # Note: This is only here for backwards compat to allow admins to + # roll back to a previous Synapse version. Next time we update the + # database version we can remove this table. update_max_id_sql = ( "UPDATE account_data_max_stream_id" " SET stream_id = ?" diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index e8edaf9f7b..ff000bc9ec 100644 --- a/synapse/storage/data_stores/state/bg_updates.py +++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -109,20 +109,20 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): SELECT prev_state_group FROM state_group_edges e, state s WHERE s.state_group = e.state_group ) - SELECT DISTINCT type, state_key, last_value(event_id) OVER ( - PARTITION BY type, state_key ORDER BY state_group ASC - ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING - ) AS event_id FROM state_groups_state + SELECT DISTINCT ON (type, state_key) + type, state_key, event_id + FROM state_groups_state WHERE state_group IN ( SELECT state_group FROM state - ) + ) %s + ORDER BY type, state_key, state_group DESC """ for group in groups: args = [group] args.extend(where_args) - txn.execute(sql + where_clause, args) + txn.execute(sql % (where_clause,), args) for row in txn: typ, state_key, event_id = row key = (typ, state_key) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 9afc145340..9cc3b51fe6 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py
@@ -33,6 +33,7 @@ logger = logging.getLogger(__name__) # schema files, so the users will be informed on server restarts. # XXX: If you're about to bump this to 59 (or higher) please create an update # that drops the unused `cache_invalidation_stream` table, as per #7436! +# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656! SCHEMA_VERSION = 58 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -366,9 +367,8 @@ def _upgrade_existing_database( if duplicates: # We don't support using the same file name in the same delta version. raise PrepareDatabaseException( - "Found multiple delta files with the same name in v%d: %s", - v, - duplicates, + "Found multiple delta files with the same name in v%d: %s" + % (v, duplicates,) ) # We sort to ensure that we apply the delta files in a consistent