diff options
Diffstat (limited to 'synapse/storage')
43 files changed, 508 insertions, 507 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c02248cfe9..9205e550bb 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -262,7 +262,7 @@ class SQLBaseStore(object): If the background updates have not completed, wait 15 sec and check again. """ - updates = yield self._simple_select_list( + updates = yield self.simple_select_list( "background_updates", keyvalues=None, retcols=["update_name"], @@ -307,7 +307,7 @@ class SQLBaseStore(object): self._clock.looping_call(loop, 10000) - def _new_transaction( + def new_transaction( self, conn, desc, after_callbacks, exception_callbacks, func, *args, **kwargs ): start = monotonic_time() @@ -444,7 +444,7 @@ class SQLBaseStore(object): try: result = yield self.runWithConnection( - self._new_transaction, + self.new_transaction, desc, after_callbacks, exception_callbacks, @@ -516,7 +516,7 @@ class SQLBaseStore(object): results = list(dict(zip(col_headers, row)) for row in cursor) return results - def _execute(self, desc, decoder, query, *args): + def execute(self, desc, decoder, query, *args): """Runs a single query for a result set. Args: @@ -541,7 +541,7 @@ class SQLBaseStore(object): # no complex WHERE clauses, just a dict of values for columns. @defer.inlineCallbacks - def _simple_insert(self, table, values, or_ignore=False, desc="_simple_insert"): + def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"): """Executes an INSERT query on the named table. Args: @@ -557,7 +557,7 @@ class SQLBaseStore(object): `or_ignore` is True """ try: - yield self.runInteraction(desc, self._simple_insert_txn, table, values) + yield self.runInteraction(desc, self.simple_insert_txn, table, values) except self.database_engine.module.IntegrityError: # We have to do or_ignore flag at this layer, since we can't reuse # a cursor after we receive an error from the db. @@ -567,7 +567,7 @@ class SQLBaseStore(object): return True @staticmethod - def _simple_insert_txn(txn, table, values): + def simple_insert_txn(txn, table, values): keys, vals = zip(*values.items()) sql = "INSERT INTO %s (%s) VALUES(%s)" % ( @@ -578,11 +578,11 @@ class SQLBaseStore(object): txn.execute(sql, vals) - def _simple_insert_many(self, table, values, desc): - return self.runInteraction(desc, self._simple_insert_many_txn, table, values) + def simple_insert_many(self, table, values, desc): + return self.runInteraction(desc, self.simple_insert_many_txn, table, values) @staticmethod - def _simple_insert_many_txn(txn, table, values): + def simple_insert_many_txn(txn, table, values): if not values: return @@ -611,13 +611,13 @@ class SQLBaseStore(object): txn.executemany(sql, vals) @defer.inlineCallbacks - def _simple_upsert( + def simple_upsert( self, table, keyvalues, values, insertion_values={}, - desc="_simple_upsert", + desc="simple_upsert", lock=True, ): """ @@ -649,7 +649,7 @@ class SQLBaseStore(object): try: result = yield self.runInteraction( desc, - self._simple_upsert_txn, + self.simple_upsert_txn, table, keyvalues, values, @@ -669,7 +669,7 @@ class SQLBaseStore(object): "IntegrityError when upserting into %s; retrying: %s", table, e ) - def _simple_upsert_txn( + def simple_upsert_txn( self, txn, table, keyvalues, values, insertion_values={}, lock=True ): """ @@ -693,11 +693,11 @@ class SQLBaseStore(object): self.database_engine.can_native_upsert and table not in self._unsafe_to_upsert_tables ): - return self._simple_upsert_txn_native_upsert( + return self.simple_upsert_txn_native_upsert( txn, table, keyvalues, values, insertion_values=insertion_values ) else: - return self._simple_upsert_txn_emulated( + return self.simple_upsert_txn_emulated( txn, table, keyvalues, @@ -706,7 +706,7 @@ class SQLBaseStore(object): lock=lock, ) - def _simple_upsert_txn_emulated( + def simple_upsert_txn_emulated( self, txn, table, keyvalues, values, insertion_values={}, lock=True ): """ @@ -775,7 +775,7 @@ class SQLBaseStore(object): # successfully inserted return True - def _simple_upsert_txn_native_upsert( + def simple_upsert_txn_native_upsert( self, txn, table, keyvalues, values, insertion_values={} ): """ @@ -809,7 +809,7 @@ class SQLBaseStore(object): ) txn.execute(sql, list(allvalues.values())) - def _simple_upsert_many_txn( + def simple_upsert_many_txn( self, txn, table, key_names, key_values, value_names, value_values ): """ @@ -829,15 +829,15 @@ class SQLBaseStore(object): self.database_engine.can_native_upsert and table not in self._unsafe_to_upsert_tables ): - return self._simple_upsert_many_txn_native_upsert( + return self.simple_upsert_many_txn_native_upsert( txn, table, key_names, key_values, value_names, value_values ) else: - return self._simple_upsert_many_txn_emulated( + return self.simple_upsert_many_txn_emulated( txn, table, key_names, key_values, value_names, value_values ) - def _simple_upsert_many_txn_emulated( + def simple_upsert_many_txn_emulated( self, txn, table, key_names, key_values, value_names, value_values ): """ @@ -862,9 +862,9 @@ class SQLBaseStore(object): _keys = {x: y for x, y in zip(key_names, keyv)} _vals = {x: y for x, y in zip(value_names, valv)} - self._simple_upsert_txn_emulated(txn, table, _keys, _vals) + self.simple_upsert_txn_emulated(txn, table, _keys, _vals) - def _simple_upsert_many_txn_native_upsert( + def simple_upsert_many_txn_native_upsert( self, txn, table, key_names, key_values, value_names, value_values ): """ @@ -909,8 +909,8 @@ class SQLBaseStore(object): return txn.execute_batch(sql, args) - def _simple_select_one( - self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one" + def simple_select_one( + self, table, keyvalues, retcols, allow_none=False, desc="simple_select_one" ): """Executes a SELECT query on the named table, which is expected to return a single row, returning multiple columns from it. @@ -924,16 +924,16 @@ class SQLBaseStore(object): statement returns no rows """ return self.runInteraction( - desc, self._simple_select_one_txn, table, keyvalues, retcols, allow_none + desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none ) - def _simple_select_one_onecol( + def simple_select_one_onecol( self, table, keyvalues, retcol, allow_none=False, - desc="_simple_select_one_onecol", + desc="simple_select_one_onecol", ): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it. @@ -945,7 +945,7 @@ class SQLBaseStore(object): """ return self.runInteraction( desc, - self._simple_select_one_onecol_txn, + self.simple_select_one_onecol_txn, table, keyvalues, retcol, @@ -953,10 +953,10 @@ class SQLBaseStore(object): ) @classmethod - def _simple_select_one_onecol_txn( + def simple_select_one_onecol_txn( cls, txn, table, keyvalues, retcol, allow_none=False ): - ret = cls._simple_select_onecol_txn( + ret = cls.simple_select_onecol_txn( txn, table=table, keyvalues=keyvalues, retcol=retcol ) @@ -969,7 +969,7 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") @staticmethod - def _simple_select_onecol_txn(txn, table, keyvalues, retcol): + def simple_select_onecol_txn(txn, table, keyvalues, retcol): sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table} if keyvalues: @@ -980,8 +980,8 @@ class SQLBaseStore(object): return [r[0] for r in txn] - def _simple_select_onecol( - self, table, keyvalues, retcol, desc="_simple_select_onecol" + def simple_select_onecol( + self, table, keyvalues, retcol, desc="simple_select_onecol" ): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -995,12 +995,10 @@ class SQLBaseStore(object): Deferred: Results in a list """ return self.runInteraction( - desc, self._simple_select_onecol_txn, table, keyvalues, retcol + desc, self.simple_select_onecol_txn, table, keyvalues, retcol ) - def _simple_select_list( - self, table, keyvalues, retcols, desc="_simple_select_list" - ): + def simple_select_list(self, table, keyvalues, retcols, desc="simple_select_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -1014,11 +1012,11 @@ class SQLBaseStore(object): defer.Deferred: resolves to list[dict[str, Any]] """ return self.runInteraction( - desc, self._simple_select_list_txn, table, keyvalues, retcols + desc, self.simple_select_list_txn, table, keyvalues, retcols ) @classmethod - def _simple_select_list_txn(cls, txn, table, keyvalues, retcols): + def simple_select_list_txn(cls, txn, table, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -1044,14 +1042,14 @@ class SQLBaseStore(object): return cls.cursor_to_dict(txn) @defer.inlineCallbacks - def _simple_select_many_batch( + def simple_select_many_batch( self, table, column, iterable, retcols, keyvalues={}, - desc="_simple_select_many_batch", + desc="simple_select_many_batch", batch_size=100, ): """Executes a SELECT query on the named table, which may return zero or @@ -1080,7 +1078,7 @@ class SQLBaseStore(object): for chunk in chunks: rows = yield self.runInteraction( desc, - self._simple_select_many_txn, + self.simple_select_many_txn, table, column, chunk, @@ -1093,7 +1091,7 @@ class SQLBaseStore(object): return results @classmethod - def _simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols): + def simple_select_many_txn(cls, txn, table, column, iterable, keyvalues, retcols): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -1126,13 +1124,13 @@ class SQLBaseStore(object): txn.execute(sql, values) return cls.cursor_to_dict(txn) - def _simple_update(self, table, keyvalues, updatevalues, desc): + def simple_update(self, table, keyvalues, updatevalues, desc): return self.runInteraction( - desc, self._simple_update_txn, table, keyvalues, updatevalues + desc, self.simple_update_txn, table, keyvalues, updatevalues ) @staticmethod - def _simple_update_txn(txn, table, keyvalues, updatevalues): + def simple_update_txn(txn, table, keyvalues, updatevalues): if keyvalues: where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) else: @@ -1148,8 +1146,8 @@ class SQLBaseStore(object): return txn.rowcount - def _simple_update_one( - self, table, keyvalues, updatevalues, desc="_simple_update_one" + def simple_update_one( + self, table, keyvalues, updatevalues, desc="simple_update_one" ): """Executes an UPDATE query on the named table, setting new values for columns in a row matching the key values. @@ -1169,12 +1167,12 @@ class SQLBaseStore(object): the update column in the 'keyvalues' dict as well. """ return self.runInteraction( - desc, self._simple_update_one_txn, table, keyvalues, updatevalues + desc, self.simple_update_one_txn, table, keyvalues, updatevalues ) @classmethod - def _simple_update_one_txn(cls, txn, table, keyvalues, updatevalues): - rowcount = cls._simple_update_txn(txn, table, keyvalues, updatevalues) + def simple_update_one_txn(cls, txn, table, keyvalues, updatevalues): + rowcount = cls.simple_update_txn(txn, table, keyvalues, updatevalues) if rowcount == 0: raise StoreError(404, "No row found (%s)" % (table,)) @@ -1182,7 +1180,7 @@ class SQLBaseStore(object): raise StoreError(500, "More than one row matched (%s)" % (table,)) @staticmethod - def _simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False): + def simple_select_one_txn(txn, table, keyvalues, retcols, allow_none=False): select_sql = "SELECT %s FROM %s WHERE %s" % ( ", ".join(retcols), table, @@ -1201,7 +1199,7 @@ class SQLBaseStore(object): return dict(zip(retcols, row)) - def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"): + def simple_delete_one(self, table, keyvalues, desc="simple_delete_one"): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -1209,10 +1207,10 @@ class SQLBaseStore(object): table : string giving the table name keyvalues : dict of column names and values to select the row with """ - return self.runInteraction(desc, self._simple_delete_one_txn, table, keyvalues) + return self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues) @staticmethod - def _simple_delete_one_txn(txn, table, keyvalues): + def simple_delete_one_txn(txn, table, keyvalues): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -1231,11 +1229,11 @@ class SQLBaseStore(object): if txn.rowcount > 1: raise StoreError(500, "More than one row matched (%s)" % (table,)) - def _simple_delete(self, table, keyvalues, desc): - return self.runInteraction(desc, self._simple_delete_txn, table, keyvalues) + def simple_delete(self, table, keyvalues, desc): + return self.runInteraction(desc, self.simple_delete_txn, table, keyvalues) @staticmethod - def _simple_delete_txn(txn, table, keyvalues): + def simple_delete_txn(txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( table, " AND ".join("%s = ?" % (k,) for k in keyvalues), @@ -1244,13 +1242,13 @@ class SQLBaseStore(object): txn.execute(sql, list(keyvalues.values())) return txn.rowcount - def _simple_delete_many(self, table, column, iterable, keyvalues, desc): + def simple_delete_many(self, table, column, iterable, keyvalues, desc): return self.runInteraction( - desc, self._simple_delete_many_txn, table, column, iterable, keyvalues + desc, self.simple_delete_many_txn, table, column, iterable, keyvalues ) @staticmethod - def _simple_delete_many_txn(txn, table, column, iterable, keyvalues): + def simple_delete_many_txn(txn, table, column, iterable, keyvalues): """Executes a DELETE query on the named table. Filters rows by if value of `column` is in `iterable`. @@ -1283,7 +1281,7 @@ class SQLBaseStore(object): return txn.rowcount - def _get_cache_dict( + def get_cache_dict( self, db_conn, table, entity_column, stream_column, max_value, limit=100000 ): # Fetch a mapping of room_id -> max stream position for "recent" rooms. @@ -1349,7 +1347,7 @@ class SQLBaseStore(object): # which is fine. pass - def _simple_select_list_paginate( + def simple_select_list_paginate( self, table, keyvalues, @@ -1358,7 +1356,7 @@ class SQLBaseStore(object): limit, retcols, order_direction="ASC", - desc="_simple_select_list_paginate", + desc="simple_select_list_paginate", ): """ Executes a SELECT query on the named table with start and limit, @@ -1380,7 +1378,7 @@ class SQLBaseStore(object): """ return self.runInteraction( desc, - self._simple_select_list_paginate_txn, + self.simple_select_list_paginate_txn, table, keyvalues, orderby, @@ -1391,7 +1389,7 @@ class SQLBaseStore(object): ) @classmethod - def _simple_select_list_paginate_txn( + def simple_select_list_paginate_txn( cls, txn, table, @@ -1452,9 +1450,7 @@ class SQLBaseStore(object): txn.execute(sql_count) return txn.fetchone()[0] - def _simple_search_list( - self, table, term, col, retcols, desc="_simple_search_list" - ): + def simple_search_list(self, table, term, col, retcols, desc="simple_search_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -1469,11 +1465,11 @@ class SQLBaseStore(object): """ return self.runInteraction( - desc, self._simple_search_list_txn, table, term, col, retcols + desc, self.simple_search_list_txn, table, term, col, retcols ) @classmethod - def _simple_search_list_txn(cls, txn, table, term, col, retcols): + def simple_search_list_txn(cls, txn, table, term, col, retcols): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -1496,14 +1492,6 @@ class SQLBaseStore(object): return cls.cursor_to_dict(txn) - @property - def database_engine_name(self): - return self.database_engine.module.__name__ - - def get_server_version(self): - """Returns a string describing the server version number""" - return self.database_engine.server_version - class _RollbackButIsFineException(Exception): """ This exception is used to rollback a transaction without implying diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 37d469ffd7..06955a0537 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -139,7 +139,7 @@ class BackgroundUpdateStore(SQLBaseStore): # otherwise, check if there are updates to be run. This is important, # as we may be running on a worker which doesn't perform the bg updates # itself, but still wants to wait for them to happen. - updates = yield self._simple_select_onecol( + updates = yield self.simple_select_onecol( "background_updates", keyvalues=None, retcol="1", @@ -161,7 +161,7 @@ class BackgroundUpdateStore(SQLBaseStore): if update_name in self._background_update_queue: return False - update_exists = await self._simple_select_one_onecol( + update_exists = await self.simple_select_one_onecol( "background_updates", keyvalues={"update_name": update_name}, retcol="1", @@ -184,7 +184,7 @@ class BackgroundUpdateStore(SQLBaseStore): no more work to do. """ if not self._background_update_queue: - updates = yield self._simple_select_list( + updates = yield self.simple_select_list( "background_updates", keyvalues=None, retcols=("update_name", "depends_on"), @@ -226,7 +226,7 @@ class BackgroundUpdateStore(SQLBaseStore): else: batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE - progress_json = yield self._simple_select_one_onecol( + progress_json = yield self.simple_select_one_onecol( "background_updates", keyvalues={"update_name": update_name}, retcol="progress_json", @@ -413,7 +413,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue = [] progress_json = json.dumps(progress) - return self._simple_insert( + return self.simple_insert( "background_updates", {"update_name": update_name, "progress_json": progress_json}, ) @@ -429,7 +429,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue = [ name for name in self._background_update_queue if name != update_name ] - return self._simple_delete_one( + return self.simple_delete_one( "background_updates", keyvalues={"update_name": update_name} ) @@ -444,7 +444,7 @@ class BackgroundUpdateStore(SQLBaseStore): progress_json = json.dumps(progress) - self._simple_update_one_txn( + self.simple_update_one_txn( txn, "background_updates", keyvalues={"update_name": update_name}, diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index 474924c68f..2a5b33dda1 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -173,7 +173,7 @@ class DataStore( self._presence_on_startup = self._get_active_presence(db_conn) - presence_cache_prefill, min_presence_val = self._get_cache_dict( + presence_cache_prefill, min_presence_val = self.get_cache_dict( db_conn, "presence_stream", entity_column="user_id", @@ -187,7 +187,7 @@ class DataStore( ) max_device_inbox_id = self._device_inbox_id_gen.get_current_token() - device_inbox_prefill, min_device_inbox_id = self._get_cache_dict( + device_inbox_prefill, min_device_inbox_id = self.get_cache_dict( db_conn, "device_inbox", entity_column="user_id", @@ -202,7 +202,7 @@ class DataStore( ) # The federation outbox and the local device inbox uses the same # stream_id generator. - device_outbox_prefill, min_device_outbox_id = self._get_cache_dict( + device_outbox_prefill, min_device_outbox_id = self.get_cache_dict( db_conn, "device_federation_outbox", entity_column="destination", @@ -228,7 +228,7 @@ class DataStore( ) events_max = self._stream_id_gen.get_current_token() - curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + curr_state_delta_prefill, min_curr_state_delta_id = self.get_cache_dict( db_conn, "current_state_delta_stream", entity_column="room_id", @@ -242,7 +242,7 @@ class DataStore( prefilled_cache=curr_state_delta_prefill, ) - _group_updates_prefill, min_group_updates_id = self._get_cache_dict( + _group_updates_prefill, min_group_updates_id = self.get_cache_dict( db_conn, "local_group_updates", entity_column="user_id", @@ -482,7 +482,7 @@ class DataStore( Returns: defer.Deferred: resolves to list[dict[str, Any]] """ - return self._simple_select_list( + return self.simple_select_list( table="users", keyvalues={}, retcols=["name", "password_hash", "is_guest", "admin", "user_type"], @@ -504,7 +504,7 @@ class DataStore( """ users = yield self.runInteraction( "get_users_paginate", - self._simple_select_list_paginate_txn, + self.simple_select_list_paginate_txn, table="users", keyvalues={"is_guest": False}, orderby=order, @@ -526,7 +526,7 @@ class DataStore( Returns: defer.Deferred: resolves to list[dict[str, Any]] """ - return self._simple_search_list( + return self.simple_search_list( table="users", term=term, col="name", diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index 22093484ed..b0d22faf3f 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -67,7 +67,7 @@ class AccountDataWorkerStore(SQLBaseStore): """ def get_account_data_for_user_txn(txn): - rows = self._simple_select_list_txn( + rows = self.simple_select_list_txn( txn, "account_data", {"user_id": user_id}, @@ -78,7 +78,7 @@ class AccountDataWorkerStore(SQLBaseStore): row["account_data_type"]: json.loads(row["content"]) for row in rows } - rows = self._simple_select_list_txn( + rows = self.simple_select_list_txn( txn, "room_account_data", {"user_id": user_id}, @@ -102,7 +102,7 @@ class AccountDataWorkerStore(SQLBaseStore): Returns: Deferred: A dict """ - result = yield self._simple_select_one_onecol( + result = yield self.simple_select_one_onecol( table="account_data", keyvalues={"user_id": user_id, "account_data_type": data_type}, retcol="content", @@ -127,7 +127,7 @@ class AccountDataWorkerStore(SQLBaseStore): """ def get_account_data_for_room_txn(txn): - rows = self._simple_select_list_txn( + rows = self.simple_select_list_txn( txn, "room_account_data", {"user_id": user_id, "room_id": room_id}, @@ -156,7 +156,7 @@ class AccountDataWorkerStore(SQLBaseStore): """ def get_account_data_for_room_and_type_txn(txn): - content_json = self._simple_select_one_onecol_txn( + content_json = self.simple_select_one_onecol_txn( txn, table="room_account_data", keyvalues={ @@ -300,9 +300,9 @@ class AccountDataStore(AccountDataWorkerStore): with self._account_data_id_gen.get_next() as next_id: # no need to lock here as room_account_data has a unique constraint - # on (user_id, room_id, account_data_type) so _simple_upsert will + # on (user_id, room_id, account_data_type) so simple_upsert will # retry if there is a conflict. - yield self._simple_upsert( + yield self.simple_upsert( desc="add_room_account_data", table="room_account_data", keyvalues={ @@ -346,9 +346,9 @@ class AccountDataStore(AccountDataWorkerStore): with self._account_data_id_gen.get_next() as next_id: # no need to lock here as account_data has a unique constraint on - # (user_id, account_data_type) so _simple_upsert will retry if + # (user_id, account_data_type) so simple_upsert will retry if # there is a conflict. - yield self._simple_upsert( + yield self.simple_upsert( desc="add_user_account_data", table="account_data", keyvalues={"user_id": user_id, "account_data_type": account_data_type}, diff --git a/synapse/storage/data_stores/main/appservice.py b/synapse/storage/data_stores/main/appservice.py index 81babf2029..6b82fd392a 100644 --- a/synapse/storage/data_stores/main/appservice.py +++ b/synapse/storage/data_stores/main/appservice.py @@ -133,7 +133,7 @@ class ApplicationServiceTransactionWorkerStore( A Deferred which resolves to a list of ApplicationServices, which may be empty. """ - results = yield self._simple_select_list( + results = yield self.simple_select_list( "application_services_state", dict(state=state), ["as_id"] ) # NB: This assumes this class is linked with ApplicationServiceStore @@ -155,7 +155,7 @@ class ApplicationServiceTransactionWorkerStore( Returns: A Deferred which resolves to ApplicationServiceState. """ - result = yield self._simple_select_one( + result = yield self.simple_select_one( "application_services_state", dict(as_id=service.id), ["state"], @@ -175,7 +175,7 @@ class ApplicationServiceTransactionWorkerStore( Returns: A Deferred which resolves when the state was set successfully. """ - return self._simple_upsert( + return self.simple_upsert( "application_services_state", dict(as_id=service.id), dict(state=state) ) @@ -249,7 +249,7 @@ class ApplicationServiceTransactionWorkerStore( ) # Set current txn_id for AS to 'txn_id' - self._simple_upsert_txn( + self.simple_upsert_txn( txn, "application_services_state", dict(as_id=service.id), @@ -257,7 +257,7 @@ class ApplicationServiceTransactionWorkerStore( ) # Delete txn - self._simple_delete_txn( + self.simple_delete_txn( txn, "application_services_txns", dict(txn_id=txn_id, as_id=service.id) ) diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index 258c08722a..de3256049d 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -95,7 +95,7 @@ class CacheInvalidationStore(SQLBaseStore): txn.call_after(ctx.__exit__, None, None, None) txn.call_after(self.hs.get_notifier().on_new_replication_data) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="cache_invalidation_stream", values={ diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index cae93b0e22..66522a04b7 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -431,7 +431,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry try: - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="user_ips", keyvalues={ @@ -450,7 +450,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): # Technically an access token might not be associated with # a device so we need to check. if device_id: - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="devices", keyvalues={"user_id": user_id, "device_id": device_id}, @@ -483,7 +483,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): if device_id is not None: keyvalues["device_id"] = device_id - res = yield self._simple_select_list( + res = yield self.simple_select_list( table="devices", keyvalues=keyvalues, retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), @@ -516,7 +516,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): user_agent, _, last_seen = self._batch_row_update[key] results[(access_token, ip)] = (user_agent, last_seen) - rows = yield self._simple_select_list( + rows = yield self.simple_select_list( table="user_ips", keyvalues={"user_id": user_id}, retcols=["access_token", "ip", "user_agent", "last_seen"], diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index a23744f11c..206d39134d 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -314,7 +314,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) # Check if we've already inserted a matching message_id for that # origin. This can happen if the origin doesn't receive our # acknowledgement from the first time we received the message. - already_inserted = self._simple_select_one_txn( + already_inserted = self.simple_select_one_txn( txn, table="device_federation_inbox", keyvalues={"origin": origin, "message_id": message_id}, @@ -326,7 +326,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) # Add an entry for this message_id so that we know we've processed # it. - self._simple_insert_txn( + self.simple_insert_txn( txn, table="device_federation_inbox", values={ diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index a3ad23e783..727c582121 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -61,7 +61,7 @@ class DeviceWorkerStore(SQLBaseStore): Raises: StoreError: if the device is not found """ - return self._simple_select_one( + return self.simple_select_one( table="devices", keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, retcols=("user_id", "device_id", "display_name"), @@ -80,7 +80,7 @@ class DeviceWorkerStore(SQLBaseStore): containing "device_id", "user_id" and "display_name" for each device. """ - devices = yield self._simple_select_list( + devices = yield self.simple_select_list( table="devices", keyvalues={"user_id": user_id, "hidden": False}, retcols=("user_id", "device_id", "display_name"), @@ -414,7 +414,7 @@ class DeviceWorkerStore(SQLBaseStore): from_user_id, stream_id, ) - self._simple_insert_txn( + self.simple_insert_txn( txn, "user_signature_stream", values={ @@ -466,7 +466,7 @@ class DeviceWorkerStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2, tree=True) def _get_cached_user_device(self, user_id, device_id): - content = yield self._simple_select_one_onecol( + content = yield self.simple_select_one_onecol( table="device_lists_remote_cache", keyvalues={"user_id": user_id, "device_id": device_id}, retcol="content", @@ -476,7 +476,7 @@ class DeviceWorkerStore(SQLBaseStore): @cachedInlineCallbacks() def _get_cached_devices_for_user(self, user_id): - devices = yield self._simple_select_list( + devices = yield self.simple_select_list( table="device_lists_remote_cache", keyvalues={"user_id": user_id}, retcols=("device_id", "content"), @@ -584,7 +584,7 @@ class DeviceWorkerStore(SQLBaseStore): SELECT DISTINCT user_ids FROM user_signature_stream WHERE from_user_id = ? AND stream_id > ? """ - rows = yield self._execute( + rows = yield self.execute( "get_users_whose_signatures_changed", None, sql, user_id, from_key ) return set(user for row in rows for user in json.loads(row[0])) @@ -605,7 +605,7 @@ class DeviceWorkerStore(SQLBaseStore): WHERE ? < stream_id AND stream_id <= ? GROUP BY user_id, destination """ - return self._execute( + return self.execute( "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key ) @@ -614,7 +614,7 @@ class DeviceWorkerStore(SQLBaseStore): """Get the last stream_id we got for a user. May be None if we haven't got any information for them. """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, retcol="stream_id", @@ -628,7 +628,7 @@ class DeviceWorkerStore(SQLBaseStore): inlineCallbacks=True, ) def get_device_list_last_stream_id_for_remotes(self, user_ids): - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="device_lists_remote_extremeties", column="user_id", iterable=user_ids, @@ -722,7 +722,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): return False try: - inserted = yield self._simple_insert( + inserted = yield self.simple_insert( "devices", values={ "user_id": user_id, @@ -736,7 +736,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): if not inserted: # if the device already exists, check if it's a real device, or # if the device ID is reserved by something else - hidden = yield self._simple_select_one_onecol( + hidden = yield self.simple_select_one_onecol( "devices", keyvalues={"user_id": user_id, "device_id": device_id}, retcol="hidden", @@ -771,7 +771,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): Returns: defer.Deferred """ - yield self._simple_delete_one( + yield self.simple_delete_one( table="devices", keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, desc="delete_device", @@ -789,7 +789,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): Returns: defer.Deferred """ - yield self._simple_delete_many( + yield self.simple_delete_many( table="devices", column="device_id", iterable=device_ids, @@ -818,7 +818,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): updates["display_name"] = new_display_name if not updates: return defer.succeed(None) - return self._simple_update_one( + return self.simple_update_one( table="devices", keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, updatevalues=updates, @@ -829,7 +829,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def mark_remote_user_device_list_as_unsubscribed(self, user_id): """Mark that we no longer track device lists for remote user. """ - yield self._simple_delete( + yield self.simple_delete( table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, desc="mark_remote_user_device_list_as_unsubscribed", @@ -866,7 +866,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self, txn, user_id, device_id, content, stream_id ): if content.get("deleted"): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id, "device_id": device_id}, @@ -874,7 +874,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): txn.call_after(self.device_id_exists_cache.invalidate, (user_id, device_id)) else: - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id, "device_id": device_id}, @@ -890,7 +890,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) ) - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, @@ -923,11 +923,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) def _update_remote_device_list_cache_txn(self, txn, user_id, devices, stream_id): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id} ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="device_lists_remote_cache", values=[ @@ -946,7 +946,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,) ) - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, @@ -995,7 +995,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): [(user_id, device_id, stream_id) for device_id in device_ids], ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="device_lists_stream", values=[ @@ -1006,7 +1006,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): context = get_active_span_text_map() - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="device_lists_outbound_pokes", values=[ diff --git a/synapse/storage/data_stores/main/directory.py b/synapse/storage/data_stores/main/directory.py index 297966d9f4..d332f8a409 100644 --- a/synapse/storage/data_stores/main/directory.py +++ b/synapse/storage/data_stores/main/directory.py @@ -36,7 +36,7 @@ class DirectoryWorkerStore(SQLBaseStore): Deferred: results in namedtuple with keys "room_id" and "servers" or None if no association can be found """ - room_id = yield self._simple_select_one_onecol( + room_id = yield self.simple_select_one_onecol( "room_aliases", {"room_alias": room_alias.to_string()}, "room_id", @@ -47,7 +47,7 @@ class DirectoryWorkerStore(SQLBaseStore): if not room_id: return None - servers = yield self._simple_select_onecol( + servers = yield self.simple_select_onecol( "room_alias_servers", {"room_alias": room_alias.to_string()}, "server", @@ -60,7 +60,7 @@ class DirectoryWorkerStore(SQLBaseStore): return RoomAliasMapping(room_id, room_alias.to_string(), servers) def get_room_alias_creator(self, room_alias): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="room_aliases", keyvalues={"room_alias": room_alias}, retcol="creator", @@ -69,7 +69,7 @@ class DirectoryWorkerStore(SQLBaseStore): @cached(max_entries=5000) def get_aliases_for_room(self, room_id): - return self._simple_select_onecol( + return self.simple_select_onecol( "room_aliases", {"room_id": room_id}, "room_alias", @@ -93,7 +93,7 @@ class DirectoryStore(DirectoryWorkerStore): """ def alias_txn(txn): - self._simple_insert_txn( + self.simple_insert_txn( txn, "room_aliases", { @@ -103,7 +103,7 @@ class DirectoryStore(DirectoryWorkerStore): }, ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="room_alias_servers", values=[ diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py index 113224fd7c..df89eda337 100644 --- a/synapse/storage/data_stores/main/e2e_room_keys.py +++ b/synapse/storage/data_stores/main/e2e_room_keys.py @@ -38,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): StoreError """ - yield self._simple_update_one( + yield self.simple_update_one( table="e2e_room_keys", keyvalues={ "user_id": user_id, @@ -89,7 +89,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): } ) - yield self._simple_insert_many( + yield self.simple_insert_many( table="e2e_room_keys", values=values, desc="add_e2e_room_keys" ) @@ -125,7 +125,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): if session_id: keyvalues["session_id"] = session_id - rows = yield self._simple_select_list( + rows = yield self.simple_select_list( table="e2e_room_keys", keyvalues=keyvalues, retcols=( @@ -234,7 +234,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): version (str): the version ID of the backup we're querying about """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="e2e_room_keys", keyvalues={"user_id": user_id, "version": version}, retcol="COUNT(*)", @@ -267,7 +267,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): if session_id: keyvalues["session_id"] = session_id - yield self._simple_delete( + yield self.simple_delete( table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys" ) @@ -312,7 +312,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): # it isn't there. raise StoreError(404, "No row found") - result = self._simple_select_one_txn( + result = self.simple_select_one_txn( txn, table="e2e_room_keys_versions", keyvalues={"user_id": user_id, "version": this_version, "deleted": 0}, @@ -352,7 +352,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): new_version = str(int(current_version) + 1) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="e2e_room_keys_versions", values={ @@ -391,7 +391,7 @@ class EndToEndRoomKeyStore(SQLBaseStore): updatevalues["etag"] = version_etag if updatevalues: - return self._simple_update( + return self.simple_update( table="e2e_room_keys_versions", keyvalues={"user_id": user_id, "version": version}, updatevalues=updatevalues, @@ -420,13 +420,13 @@ class EndToEndRoomKeyStore(SQLBaseStore): else: this_version = version - self._simple_delete_txn( + self.simple_delete_txn( txn, table="e2e_room_keys", keyvalues={"user_id": user_id, "version": this_version}, ) - return self._simple_update_one_txn( + return self.simple_update_one_txn( txn, table="e2e_room_keys_versions", keyvalues={"user_id": user_id, "version": this_version}, diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 643327b57b..08bcdc4725 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -186,7 +186,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): key_id) to json string for key """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="e2e_one_time_keys_json", column="key_id", iterable=key_ids, @@ -219,7 +219,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): # a unique constraint. If there is a race of two calls to # `add_e2e_one_time_keys` then they'll conflict and we will only # insert one set. - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="e2e_one_time_keys_json", values=[ @@ -350,7 +350,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): WHERE ? < stream_id AND stream_id <= ? GROUP BY user_id """ - return self._execute( + return self.execute( "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key ) @@ -367,7 +367,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): set_tag("time_now", time_now) set_tag("device_keys", device_keys) - old_key_json = self._simple_select_one_onecol_txn( + old_key_json = self.simple_select_one_onecol_txn( txn, table="e2e_device_keys_json", keyvalues={"user_id": user_id, "device_id": device_id}, @@ -383,7 +383,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): log_kv({"Message": "Device key already stored."}) return False - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="e2e_device_keys_json", keyvalues={"user_id": user_id, "device_id": device_id}, @@ -442,12 +442,12 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): "user_id": user_id, } ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="e2e_device_keys_json", keyvalues={"user_id": user_id, "device_id": device_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="e2e_one_time_keys_json", keyvalues={"user_id": user_id, "device_id": device_id}, @@ -492,7 +492,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): # The "keys" property must only have one entry, which will be the public # key, so we just grab the first value in there pubkey = next(iter(key["keys"].values())) - self._simple_insert_txn( + self.simple_insert_txn( txn, "devices", values={ @@ -505,7 +505,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): # and finally, store the key itself with self._cross_signing_id_gen.get_next() as stream_id: - self._simple_insert_txn( + self.simple_insert_txn( txn, "e2e_cross_signing_keys", values={ @@ -539,7 +539,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): user_id (str): the user who made the signatures signatures (iterable[SignatureListItem]): signatures to add """ - return self._simple_insert_many( + return self.simple_insert_many( "e2e_cross_signing_signatures", [ { diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 90bef0cd2c..051ac7a8cb 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -126,7 +126,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas Returns Deferred[int] """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="events", column="event_id", iterable=event_ids, @@ -140,7 +140,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return max(row["depth"] for row in rows) def _get_oldest_events_in_room_txn(self, txn, room_id): - return self._simple_select_onecol_txn( + return self.simple_select_onecol_txn( txn, table="event_backward_extremities", keyvalues={"room_id": room_id}, @@ -235,7 +235,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas @cached(max_entries=5000, iterable=True) def get_latest_event_ids_in_room(self, room_id): - return self._simple_select_onecol( + return self.simple_select_onecol( table="event_forward_extremities", keyvalues={"room_id": room_id}, retcol="event_id", @@ -271,7 +271,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) def _get_min_depth_interaction(self, txn, room_id): - min_depth = self._simple_select_one_onecol_txn( + min_depth = self.simple_select_one_onecol_txn( txn, table="room_depth", keyvalues={"room_id": room_id}, @@ -383,7 +383,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas queue = PriorityQueue() for event_id in event_list: - depth = self._simple_select_one_onecol_txn( + depth = self.simple_select_one_onecol_txn( txn, table="events", keyvalues={"event_id": event_id, "room_id": room_id}, @@ -468,7 +468,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas Returns: Deferred[list[str]] """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="event_edges", column="prev_event_id", iterable=event_ids, @@ -508,7 +508,7 @@ class EventFederationStore(EventFederationWorkerStore): if min_depth and depth >= min_depth: return - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="room_depth", keyvalues={"room_id": room_id}, @@ -520,7 +520,7 @@ class EventFederationStore(EventFederationWorkerStore): For the given event, update the event edges table and forward and backward extremities tables. """ - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="event_edges", values=[ diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 04ce21ac66..0a37847cfd 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -441,7 +441,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ) def _add_push_actions_to_staging_txn(txn): - # We don't use _simple_insert_many here to avoid the overhead + # We don't use simple_insert_many here to avoid the overhead # of generating lists of dicts. sql = """ @@ -472,7 +472,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): """ try: - res = yield self._simple_delete( + res = yield self.simple_delete( table="event_push_actions_staging", keyvalues={"event_id": event_id}, desc="remove_push_actions_from_staging", @@ -677,7 +677,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) for event, _ in events_and_contexts: - user_ids = self._simple_select_onecol_txn( + user_ids = self.simple_select_onecol_txn( txn, table="event_push_actions_staging", keyvalues={"event_id": event.event_id}, @@ -844,7 +844,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): the archiving process has caught up or not. """ - old_rotate_stream_ordering = self._simple_select_one_onecol_txn( + old_rotate_stream_ordering = self.simple_select_one_onecol_txn( txn, table="event_push_summary_stream_ordering", keyvalues={}, @@ -880,7 +880,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): return caught_up def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering): - old_rotate_stream_ordering = self._simple_select_one_onecol_txn( + old_rotate_stream_ordering = self.simple_select_one_onecol_txn( txn, table="event_push_summary_stream_ordering", keyvalues={}, @@ -912,7 +912,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # If the `old.user_id` above is NULL then we know there isn't already an # entry in the table, so we simply insert it. Otherwise we update the # existing table. - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="event_push_summary", values=[ diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 79c91fe284..98ae69e996 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -432,7 +432,7 @@ class EventsStore( # event's auth chain, but its easier for now just to store them (and # it doesn't take much storage compared to storing the entire event # anyway). - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="event_auth", values=[ @@ -580,12 +580,12 @@ class EventsStore( self, txn, new_forward_extremities, max_stream_order ): for room_id, new_extrem in iteritems(new_forward_extremities): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) txn.call_after(self.get_latest_event_ids_in_room.invalidate, (room_id,)) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="event_forward_extremities", values=[ @@ -598,7 +598,7 @@ class EventsStore( # new stream_ordering to new forward extremeties in the room. # This allows us to later efficiently look up the forward extremeties # for a room before a given stream_ordering - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="stream_ordering_to_exterm", values=[ @@ -722,7 +722,7 @@ class EventsStore( # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering state_group_id = context.state_group - self._simple_insert_txn( + self.simple_insert_txn( txn, table="ex_outlier_stream", values={ @@ -794,7 +794,7 @@ class EventsStore( d.pop("redacted_because", None) return d - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="event_json", values=[ @@ -811,7 +811,7 @@ class EventsStore( ], ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="events", values=[ @@ -841,7 +841,7 @@ class EventsStore( # If we're persisting an unredacted event we go and ensure # that we mark any redactions that reference this event as # requiring censoring. - self._simple_update_txn( + self.simple_update_txn( txn, table="redactions", keyvalues={"redacts": event.event_id}, @@ -983,7 +983,7 @@ class EventsStore( state_values.append(vals) - self._simple_insert_many_txn(txn, table="state_events", values=state_values) + self.simple_insert_many_txn(txn, table="state_events", values=state_values) # Prefill the event cache self._add_to_cache(txn, events_and_contexts) @@ -1032,7 +1032,7 @@ class EventsStore( # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="redactions", values={ @@ -1077,9 +1077,7 @@ class EventsStore( LIMIT ? """ - rows = yield self._execute( - "_censor_redactions_fetch", None, sql, before_ts, 100 - ) + rows = yield self.execute("_censor_redactions_fetch", None, sql, before_ts, 100) updates = [] @@ -1111,7 +1109,7 @@ class EventsStore( if pruned_json: self._censor_event_txn(txn, event_id, pruned_json) - self._simple_update_one_txn( + self.simple_update_one_txn( txn, table="redactions", keyvalues={"event_id": redaction_id}, @@ -1129,7 +1127,7 @@ class EventsStore( event_id (str): The ID of the event to censor. pruned_json (str): The pruned JSON """ - self._simple_update_one_txn( + self.simple_update_one_txn( txn, table="event_json", keyvalues={"event_id": event_id}, @@ -1780,7 +1778,7 @@ class EventsStore( "[purge] found %i state groups to delete", len(state_groups_to_delete) ) - rows = self._simple_select_many_txn( + rows = self.simple_select_many_txn( txn, table="state_group_edges", column="prev_state_group", @@ -1807,15 +1805,15 @@ class EventsStore( curr_state = self._get_state_groups_from_groups_txn(txn, [sg]) curr_state = curr_state[sg] - self._simple_delete_txn( + self.simple_delete_txn( txn, table="state_groups_state", keyvalues={"state_group": sg} ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="state_group_edges", keyvalues={"state_group": sg} ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="state_groups_state", values=[ @@ -1852,7 +1850,7 @@ class EventsStore( state group. """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="state_group_edges", column="prev_state_group", iterable=state_groups, @@ -1882,7 +1880,7 @@ class EventsStore( # first we have to delete the state groups states logger.info("[purge] removing %s from state_groups_state", room_id) - self._simple_delete_many_txn( + self.simple_delete_many_txn( txn, table="state_groups_state", column="state_group", @@ -1893,7 +1891,7 @@ class EventsStore( # ... and the state group edges logger.info("[purge] removing %s from state_group_edges", room_id) - self._simple_delete_many_txn( + self.simple_delete_many_txn( txn, table="state_group_edges", column="state_group", @@ -1904,7 +1902,7 @@ class EventsStore( # ... and the state groups logger.info("[purge] removing %s from state_groups", room_id) - self._simple_delete_many_txn( + self.simple_delete_many_txn( txn, table="state_groups", column="id", @@ -1921,7 +1919,7 @@ class EventsStore( @cachedInlineCallbacks(max_entries=5000) def _get_event_ordering(self, event_id): - res = yield self._simple_select_one( + res = yield self.simple_select_one( table="events", retcols=["topological_ordering", "stream_ordering"], keyvalues={"event_id": event_id}, @@ -1962,7 +1960,7 @@ class EventsStore( room_id (str): The ID of the room the event was sent to. topological_ordering (int): The position of the event in the room's topology. """ - return self._simple_insert_many_txn( + return self.simple_insert_many_txn( txn=txn, table="event_labels", values=[ @@ -1984,7 +1982,7 @@ class EventsStore( event_id (str): The event ID the expiry timestamp is associated with. expiry_ts (int): The timestamp at which to expire (delete) the event. """ - return self._simple_insert_txn( + return self.simple_insert_txn( txn=txn, table="event_expiry", values={"event_id": event_id, "expiry_ts": expiry_ts}, @@ -2043,7 +2041,7 @@ class EventsStore( txn (LoggingTransaction): The transaction to use to perform the deletion. event_id (str): The event ID to delete the associated expiry timestamp of. """ - return self._simple_delete_txn( + return self.simple_delete_txn( txn=txn, table="event_expiry", keyvalues={"event_id": event_id} ) diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index aa87f9abc5..37dfc8c871 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -189,7 +189,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)] for chunk in chunks: - ev_rows = self._simple_select_many_txn( + ev_rows = self.simple_select_many_txn( txn, table="event_json", column="event_id", @@ -366,7 +366,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): to_delete.intersection_update(original_set) - deleted = self._simple_delete_many_txn( + deleted = self.simple_delete_many_txn( txn=txn, table="event_forward_extremities", column="event_id", @@ -382,7 +382,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): if deleted: # We now need to invalidate the caches of these rooms - rows = self._simple_select_many_txn( + rows = self.simple_select_many_txn( txn, table="events", column="event_id", @@ -396,7 +396,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): self.get_latest_event_ids_in_room.invalidate, (room_id,) ) - self._simple_delete_many_txn( + self.simple_delete_many_txn( txn=txn, table="_extremities_to_check", column="event_id", @@ -533,7 +533,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): try: event_json = json.loads(event_json_raw) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn=txn, table="event_labels", values=[ diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index eaddca65b7..6a08a746b6 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -78,7 +78,7 @@ class EventsWorkerStore(SQLBaseStore): Deferred[int|None]: Timestamp in milliseconds, or None for events that were persisted before received_ts was implemented. """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="events", keyvalues={"event_id": event_id}, retcol="received_ts", @@ -452,7 +452,7 @@ class EventsWorkerStore(SQLBaseStore): event_id for events, _ in event_list for event_id in events ) - row_dict = self._new_transaction( + row_dict = self.new_transaction( conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch ) @@ -745,7 +745,7 @@ class EventsWorkerStore(SQLBaseStore): """Given a list of event ids, check if we have already processed and stored them as non outliers. """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="events", retcols=("event_id",), column="event_id", diff --git a/synapse/storage/data_stores/main/filtering.py b/synapse/storage/data_stores/main/filtering.py index f05ace299a..17ef7b9354 100644 --- a/synapse/storage/data_stores/main/filtering.py +++ b/synapse/storage/data_stores/main/filtering.py @@ -30,7 +30,7 @@ class FilteringStore(SQLBaseStore): except ValueError: raise SynapseError(400, "Invalid filter ID", Codes.INVALID_PARAM) - def_json = yield self._simple_select_one_onecol( + def_json = yield self.simple_select_one_onecol( table="user_filters", keyvalues={"user_id": user_localpart, "filter_id": filter_id}, retcol="filter_json", diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index 5ded539af8..9e1d12bcb7 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -35,7 +35,7 @@ class GroupServerStore(SQLBaseStore): * "invite" * "open" """ - return self._simple_update_one( + return self.simple_update_one( table="groups", keyvalues={"group_id": group_id}, updatevalues={"join_policy": join_policy}, @@ -43,7 +43,7 @@ class GroupServerStore(SQLBaseStore): ) def get_group(self, group_id): - return self._simple_select_one( + return self.simple_select_one( table="groups", keyvalues={"group_id": group_id}, retcols=( @@ -65,7 +65,7 @@ class GroupServerStore(SQLBaseStore): if not include_private: keyvalues["is_public"] = True - return self._simple_select_list( + return self.simple_select_list( table="group_users", keyvalues=keyvalues, retcols=("user_id", "is_public", "is_admin"), @@ -75,7 +75,7 @@ class GroupServerStore(SQLBaseStore): def get_invited_users_in_group(self, group_id): # TODO: Pagination - return self._simple_select_onecol( + return self.simple_select_onecol( table="group_invites", keyvalues={"group_id": group_id}, retcol="user_id", @@ -89,7 +89,7 @@ class GroupServerStore(SQLBaseStore): if not include_private: keyvalues["is_public"] = True - return self._simple_select_list( + return self.simple_select_list( table="group_rooms", keyvalues=keyvalues, retcols=("room_id", "is_public"), @@ -180,7 +180,7 @@ class GroupServerStore(SQLBaseStore): an order of 1 will put the room first. Otherwise, the room gets added to the end. """ - room_in_group = self._simple_select_one_onecol_txn( + room_in_group = self.simple_select_one_onecol_txn( txn, table="group_rooms", keyvalues={"group_id": group_id, "room_id": room_id}, @@ -193,7 +193,7 @@ class GroupServerStore(SQLBaseStore): if category_id is None: category_id = _DEFAULT_CATEGORY_ID else: - cat_exists = self._simple_select_one_onecol_txn( + cat_exists = self.simple_select_one_onecol_txn( txn, table="group_room_categories", keyvalues={"group_id": group_id, "category_id": category_id}, @@ -204,7 +204,7 @@ class GroupServerStore(SQLBaseStore): raise SynapseError(400, "Category doesn't exist") # TODO: Check category is part of summary already - cat_exists = self._simple_select_one_onecol_txn( + cat_exists = self.simple_select_one_onecol_txn( txn, table="group_summary_room_categories", keyvalues={"group_id": group_id, "category_id": category_id}, @@ -224,7 +224,7 @@ class GroupServerStore(SQLBaseStore): (group_id, category_id, group_id, category_id), ) - existing = self._simple_select_one_txn( + existing = self.simple_select_one_txn( txn, table="group_summary_rooms", keyvalues={ @@ -257,7 +257,7 @@ class GroupServerStore(SQLBaseStore): to_update["room_order"] = order if is_public is not None: to_update["is_public"] = is_public - self._simple_update_txn( + self.simple_update_txn( txn, table="group_summary_rooms", keyvalues={ @@ -271,7 +271,7 @@ class GroupServerStore(SQLBaseStore): if is_public is None: is_public = True - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_summary_rooms", values={ @@ -287,7 +287,7 @@ class GroupServerStore(SQLBaseStore): if category_id is None: category_id = _DEFAULT_CATEGORY_ID - return self._simple_delete( + return self.simple_delete( table="group_summary_rooms", keyvalues={ "group_id": group_id, @@ -299,7 +299,7 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_group_categories(self, group_id): - rows = yield self._simple_select_list( + rows = yield self.simple_select_list( table="group_room_categories", keyvalues={"group_id": group_id}, retcols=("category_id", "is_public", "profile"), @@ -316,7 +316,7 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_group_category(self, group_id, category_id): - category = yield self._simple_select_one( + category = yield self.simple_select_one( table="group_room_categories", keyvalues={"group_id": group_id, "category_id": category_id}, retcols=("is_public", "profile"), @@ -343,7 +343,7 @@ class GroupServerStore(SQLBaseStore): else: update_values["is_public"] = is_public - return self._simple_upsert( + return self.simple_upsert( table="group_room_categories", keyvalues={"group_id": group_id, "category_id": category_id}, values=update_values, @@ -352,7 +352,7 @@ class GroupServerStore(SQLBaseStore): ) def remove_group_category(self, group_id, category_id): - return self._simple_delete( + return self.simple_delete( table="group_room_categories", keyvalues={"group_id": group_id, "category_id": category_id}, desc="remove_group_category", @@ -360,7 +360,7 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_group_roles(self, group_id): - rows = yield self._simple_select_list( + rows = yield self.simple_select_list( table="group_roles", keyvalues={"group_id": group_id}, retcols=("role_id", "is_public", "profile"), @@ -377,7 +377,7 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def get_group_role(self, group_id, role_id): - role = yield self._simple_select_one( + role = yield self.simple_select_one( table="group_roles", keyvalues={"group_id": group_id, "role_id": role_id}, retcols=("is_public", "profile"), @@ -404,7 +404,7 @@ class GroupServerStore(SQLBaseStore): else: update_values["is_public"] = is_public - return self._simple_upsert( + return self.simple_upsert( table="group_roles", keyvalues={"group_id": group_id, "role_id": role_id}, values=update_values, @@ -413,7 +413,7 @@ class GroupServerStore(SQLBaseStore): ) def remove_group_role(self, group_id, role_id): - return self._simple_delete( + return self.simple_delete( table="group_roles", keyvalues={"group_id": group_id, "role_id": role_id}, desc="remove_group_role", @@ -444,7 +444,7 @@ class GroupServerStore(SQLBaseStore): an order of 1 will put the user first. Otherwise, the user gets added to the end. """ - user_in_group = self._simple_select_one_onecol_txn( + user_in_group = self.simple_select_one_onecol_txn( txn, table="group_users", keyvalues={"group_id": group_id, "user_id": user_id}, @@ -457,7 +457,7 @@ class GroupServerStore(SQLBaseStore): if role_id is None: role_id = _DEFAULT_ROLE_ID else: - role_exists = self._simple_select_one_onecol_txn( + role_exists = self.simple_select_one_onecol_txn( txn, table="group_roles", keyvalues={"group_id": group_id, "role_id": role_id}, @@ -468,7 +468,7 @@ class GroupServerStore(SQLBaseStore): raise SynapseError(400, "Role doesn't exist") # TODO: Check role is part of the summary already - role_exists = self._simple_select_one_onecol_txn( + role_exists = self.simple_select_one_onecol_txn( txn, table="group_summary_roles", keyvalues={"group_id": group_id, "role_id": role_id}, @@ -488,7 +488,7 @@ class GroupServerStore(SQLBaseStore): (group_id, role_id, group_id, role_id), ) - existing = self._simple_select_one_txn( + existing = self.simple_select_one_txn( txn, table="group_summary_users", keyvalues={"group_id": group_id, "user_id": user_id, "role_id": role_id}, @@ -517,7 +517,7 @@ class GroupServerStore(SQLBaseStore): to_update["user_order"] = order if is_public is not None: to_update["is_public"] = is_public - self._simple_update_txn( + self.simple_update_txn( txn, table="group_summary_users", keyvalues={ @@ -531,7 +531,7 @@ class GroupServerStore(SQLBaseStore): if is_public is None: is_public = True - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_summary_users", values={ @@ -547,7 +547,7 @@ class GroupServerStore(SQLBaseStore): if role_id is None: role_id = _DEFAULT_ROLE_ID - return self._simple_delete( + return self.simple_delete( table="group_summary_users", keyvalues={"group_id": group_id, "role_id": role_id, "user_id": user_id}, desc="remove_user_from_summary", @@ -561,7 +561,7 @@ class GroupServerStore(SQLBaseStore): Deferred[list[str]]: A twisted.Deferred containing a list of group ids containing this room """ - return self._simple_select_onecol( + return self.simple_select_onecol( table="group_rooms", keyvalues={"room_id": room_id}, retcol="group_id", @@ -630,7 +630,7 @@ class GroupServerStore(SQLBaseStore): ) def is_user_in_group(self, user_id, group_id): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="group_users", keyvalues={"group_id": group_id, "user_id": user_id}, retcol="user_id", @@ -639,7 +639,7 @@ class GroupServerStore(SQLBaseStore): ).addCallback(lambda r: bool(r)) def is_user_admin_in_group(self, group_id, user_id): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="group_users", keyvalues={"group_id": group_id, "user_id": user_id}, retcol="is_admin", @@ -650,7 +650,7 @@ class GroupServerStore(SQLBaseStore): def add_group_invite(self, group_id, user_id): """Record that the group server has invited a user """ - return self._simple_insert( + return self.simple_insert( table="group_invites", values={"group_id": group_id, "user_id": user_id}, desc="add_group_invite", @@ -659,7 +659,7 @@ class GroupServerStore(SQLBaseStore): def is_user_invited_to_local_group(self, group_id, user_id): """Has the group server invited a user? """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="group_invites", keyvalues={"group_id": group_id, "user_id": user_id}, retcol="user_id", @@ -682,7 +682,7 @@ class GroupServerStore(SQLBaseStore): """ def _get_users_membership_in_group_txn(txn): - row = self._simple_select_one_txn( + row = self.simple_select_one_txn( txn, table="group_users", keyvalues={"group_id": group_id, "user_id": user_id}, @@ -697,7 +697,7 @@ class GroupServerStore(SQLBaseStore): "is_privileged": row["is_admin"], } - row = self._simple_select_one_onecol_txn( + row = self.simple_select_one_onecol_txn( txn, table="group_invites", keyvalues={"group_id": group_id, "user_id": user_id}, @@ -738,7 +738,7 @@ class GroupServerStore(SQLBaseStore): """ def _add_user_to_group_txn(txn): - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_users", values={ @@ -749,14 +749,14 @@ class GroupServerStore(SQLBaseStore): }, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_invites", keyvalues={"group_id": group_id, "user_id": user_id}, ) if local_attestation: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_attestations_renewals", values={ @@ -766,7 +766,7 @@ class GroupServerStore(SQLBaseStore): }, ) if remote_attestation: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_attestations_remote", values={ @@ -781,27 +781,27 @@ class GroupServerStore(SQLBaseStore): def remove_user_from_group(self, group_id, user_id): def _remove_user_from_group_txn(txn): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_users", keyvalues={"group_id": group_id, "user_id": user_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_invites", keyvalues={"group_id": group_id, "user_id": user_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_attestations_renewals", keyvalues={"group_id": group_id, "user_id": user_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_attestations_remote", keyvalues={"group_id": group_id, "user_id": user_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_summary_users", keyvalues={"group_id": group_id, "user_id": user_id}, @@ -812,14 +812,14 @@ class GroupServerStore(SQLBaseStore): ) def add_room_to_group(self, group_id, room_id, is_public): - return self._simple_insert( + return self.simple_insert( table="group_rooms", values={"group_id": group_id, "room_id": room_id, "is_public": is_public}, desc="add_room_to_group", ) def update_room_in_group_visibility(self, group_id, room_id, is_public): - return self._simple_update( + return self.simple_update( table="group_rooms", keyvalues={"group_id": group_id, "room_id": room_id}, updatevalues={"is_public": is_public}, @@ -828,13 +828,13 @@ class GroupServerStore(SQLBaseStore): def remove_room_from_group(self, group_id, room_id): def _remove_room_from_group_txn(txn): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_rooms", keyvalues={"group_id": group_id, "room_id": room_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_summary_rooms", keyvalues={"group_id": group_id, "room_id": room_id}, @@ -847,7 +847,7 @@ class GroupServerStore(SQLBaseStore): def get_publicised_groups_for_user(self, user_id): """Get all groups a user is publicising """ - return self._simple_select_onecol( + return self.simple_select_onecol( table="local_group_membership", keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True}, retcol="group_id", @@ -857,7 +857,7 @@ class GroupServerStore(SQLBaseStore): def update_group_publicity(self, group_id, user_id, publicise): """Update whether the user is publicising their membership of the group """ - return self._simple_update_one( + return self.simple_update_one( table="local_group_membership", keyvalues={"group_id": group_id, "user_id": user_id}, updatevalues={"is_publicised": publicise}, @@ -893,12 +893,12 @@ class GroupServerStore(SQLBaseStore): def _register_user_group_membership_txn(txn, next_id): # TODO: Upsert? - self._simple_delete_txn( + self.simple_delete_txn( txn, table="local_group_membership", keyvalues={"group_id": group_id, "user_id": user_id}, ) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="local_group_membership", values={ @@ -911,7 +911,7 @@ class GroupServerStore(SQLBaseStore): }, ) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="local_group_updates", values={ @@ -930,7 +930,7 @@ class GroupServerStore(SQLBaseStore): if membership == "join": if local_attestation: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_attestations_renewals", values={ @@ -940,7 +940,7 @@ class GroupServerStore(SQLBaseStore): }, ) if remote_attestation: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="group_attestations_remote", values={ @@ -951,12 +951,12 @@ class GroupServerStore(SQLBaseStore): }, ) else: - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_attestations_renewals", keyvalues={"group_id": group_id, "user_id": user_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="group_attestations_remote", keyvalues={"group_id": group_id, "user_id": user_id}, @@ -976,7 +976,7 @@ class GroupServerStore(SQLBaseStore): def create_group( self, group_id, user_id, name, avatar_url, short_description, long_description ): - yield self._simple_insert( + yield self.simple_insert( table="groups", values={ "group_id": group_id, @@ -991,7 +991,7 @@ class GroupServerStore(SQLBaseStore): @defer.inlineCallbacks def update_group_profile(self, group_id, profile): - yield self._simple_update_one( + yield self.simple_update_one( table="groups", keyvalues={"group_id": group_id}, updatevalues=profile, @@ -1017,7 +1017,7 @@ class GroupServerStore(SQLBaseStore): def update_attestation_renewal(self, group_id, user_id, attestation): """Update an attestation that we have renewed """ - return self._simple_update_one( + return self.simple_update_one( table="group_attestations_renewals", keyvalues={"group_id": group_id, "user_id": user_id}, updatevalues={"valid_until_ms": attestation["valid_until_ms"]}, @@ -1027,7 +1027,7 @@ class GroupServerStore(SQLBaseStore): def update_remote_attestion(self, group_id, user_id, attestation): """Update an attestation that a remote has renewed """ - return self._simple_update_one( + return self.simple_update_one( table="group_attestations_remote", keyvalues={"group_id": group_id, "user_id": user_id}, updatevalues={ @@ -1046,7 +1046,7 @@ class GroupServerStore(SQLBaseStore): group_id (str) user_id (str) """ - return self._simple_delete( + return self.simple_delete( table="group_attestations_renewals", keyvalues={"group_id": group_id, "user_id": user_id}, desc="remove_attestation_renewal", @@ -1057,7 +1057,7 @@ class GroupServerStore(SQLBaseStore): """Get the attestation that proves the remote agrees that the user is in the group. """ - row = yield self._simple_select_one( + row = yield self.simple_select_one( table="group_attestations_remote", keyvalues={"group_id": group_id, "user_id": user_id}, retcols=("valid_until_ms", "attestation_json"), @@ -1072,7 +1072,7 @@ class GroupServerStore(SQLBaseStore): return None def get_joined_groups(self, user_id): - return self._simple_select_onecol( + return self.simple_select_onecol( table="local_group_membership", keyvalues={"user_id": user_id, "membership": "join"}, retcol="group_id", @@ -1188,7 +1188,7 @@ class GroupServerStore(SQLBaseStore): ] for table in tables: - self._simple_delete_txn( + self.simple_delete_txn( txn, table=table, keyvalues={"group_id": group_id} ) diff --git a/synapse/storage/data_stores/main/keys.py b/synapse/storage/data_stores/main/keys.py index ebc7db3ed6..c7150432b3 100644 --- a/synapse/storage/data_stores/main/keys.py +++ b/synapse/storage/data_stores/main/keys.py @@ -129,7 +129,7 @@ class KeyStore(SQLBaseStore): return self.runInteraction( "store_server_verify_keys", - self._simple_upsert_many_txn, + self.simple_upsert_many_txn, table="server_signature_keys", key_names=("server_name", "key_id"), key_values=key_values, @@ -157,7 +157,7 @@ class KeyStore(SQLBaseStore): ts_valid_until_ms (int): The time when this json stops being valid. key_json (bytes): The encoded JSON. """ - return self._simple_upsert( + return self.simple_upsert( table="server_keys_json", keyvalues={ "server_name": server_name, @@ -196,7 +196,7 @@ class KeyStore(SQLBaseStore): keyvalues["key_id"] = key_id if from_server is not None: keyvalues["from_server"] = from_server - rows = self._simple_select_list_txn( + rows = self.simple_select_list_txn( txn, "server_keys_json", keyvalues=keyvalues, diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py index 0f2887bdce..0cb9446f96 100644 --- a/synapse/storage/data_stores/main/media_repository.py +++ b/synapse/storage/data_stores/main/media_repository.py @@ -39,7 +39,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): Returns: None if the media_id doesn't exist. """ - return self._simple_select_one( + return self.simple_select_one( "local_media_repository", {"media_id": media_id}, ( @@ -64,7 +64,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): user_id, url_cache=None, ): - return self._simple_insert( + return self.simple_insert( "local_media_repository", { "media_id": media_id, @@ -129,7 +129,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): def store_url_cache( self, url, response_code, etag, expires_ts, og, media_id, download_ts ): - return self._simple_insert( + return self.simple_insert( "local_media_repository_url_cache", { "url": url, @@ -144,7 +144,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ) def get_local_media_thumbnails(self, media_id): - return self._simple_select_list( + return self.simple_select_list( "local_media_repository_thumbnails", {"media_id": media_id}, ( @@ -166,7 +166,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): thumbnail_method, thumbnail_length, ): - return self._simple_insert( + return self.simple_insert( "local_media_repository_thumbnails", { "media_id": media_id, @@ -180,7 +180,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ) def get_cached_remote_media(self, origin, media_id): - return self._simple_select_one( + return self.simple_select_one( "remote_media_cache", {"media_origin": origin, "media_id": media_id}, ( @@ -205,7 +205,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): upload_name, filesystem_id, ): - return self._simple_insert( + return self.simple_insert( "remote_media_cache", { "media_origin": origin, @@ -253,7 +253,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): return self.runInteraction("update_cached_last_access_time", update_cache_txn) def get_remote_media_thumbnails(self, origin, media_id): - return self._simple_select_list( + return self.simple_select_list( "remote_media_cache_thumbnails", {"media_origin": origin, "media_id": media_id}, ( @@ -278,7 +278,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): thumbnail_method, thumbnail_length, ): - return self._simple_insert( + return self.simple_insert( "remote_media_cache_thumbnails", { "media_origin": origin, @@ -300,18 +300,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): " WHERE last_access_ts < ?" ) - return self._execute( + return self.execute( "get_remote_media_before", self.cursor_to_dict, sql, before_ts ) def delete_remote_media(self, media_origin, media_id): def delete_remote_media_txn(txn): - self._simple_delete_txn( + self.simple_delete_txn( txn, "remote_media_cache", keyvalues={"media_origin": media_origin, "media_id": media_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, "remote_media_cache_thumbnails", keyvalues={"media_origin": media_origin, "media_id": media_id}, diff --git a/synapse/storage/data_stores/main/monthly_active_users.py b/synapse/storage/data_stores/main/monthly_active_users.py index b41c3d317a..b8fc28f97b 100644 --- a/synapse/storage/data_stores/main/monthly_active_users.py +++ b/synapse/storage/data_stores/main/monthly_active_users.py @@ -32,7 +32,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): self._clock = hs.get_clock() self.hs = hs # Do not add more reserved users than the total allowable number - self._new_transaction( + self.new_transaction( dbconn, "initialise_mau_threepids", [], @@ -261,7 +261,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): # never be a big table and alternative approaches (batching multiple # upserts into a single txn) introduced a lot of extra complexity. # See https://github.com/matrix-org/synapse/issues/3854 for more - is_insert = self._simple_upsert_txn( + is_insert = self.simple_upsert_txn( txn, table="monthly_active_users", keyvalues={"user_id": user_id}, @@ -281,7 +281,7 @@ class MonthlyActiveUsersStore(SQLBaseStore): """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="monthly_active_users", keyvalues={"user_id": user_id}, retcol="timestamp", diff --git a/synapse/storage/data_stores/main/openid.py b/synapse/storage/data_stores/main/openid.py index 79b40044d9..650e49750e 100644 --- a/synapse/storage/data_stores/main/openid.py +++ b/synapse/storage/data_stores/main/openid.py @@ -3,7 +3,7 @@ from synapse.storage._base import SQLBaseStore class OpenIdStore(SQLBaseStore): def insert_open_id_token(self, token, ts_valid_until_ms, user_id): - return self._simple_insert( + return self.simple_insert( table="open_id_tokens", values={ "token": token, diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py index 523ed6575e..a5e121efd1 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py @@ -46,7 +46,7 @@ class PresenceStore(SQLBaseStore): txn.call_after(self._get_presence_for_user.invalidate, (state.user_id,)) # Actually insert new rows - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="presence_stream", values=[ @@ -103,7 +103,7 @@ class PresenceStore(SQLBaseStore): inlineCallbacks=True, ) def get_presence_for_users(self, user_ids): - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="presence_stream", column="user_id", iterable=user_ids, @@ -129,7 +129,7 @@ class PresenceStore(SQLBaseStore): return self._presence_id_gen.get_current_token() def allow_presence_visible(self, observed_localpart, observer_userid): - return self._simple_insert( + return self.simple_insert( table="presence_allow_inbound", values={ "observed_user_id": observed_localpart, @@ -140,7 +140,7 @@ class PresenceStore(SQLBaseStore): ) def disallow_presence_visible(self, observed_localpart, observer_userid): - return self._simple_delete_one( + return self.simple_delete_one( table="presence_allow_inbound", keyvalues={ "observed_user_id": observed_localpart, diff --git a/synapse/storage/data_stores/main/profile.py b/synapse/storage/data_stores/main/profile.py index e4e8a1c1d6..c8b5b60301 100644 --- a/synapse/storage/data_stores/main/profile.py +++ b/synapse/storage/data_stores/main/profile.py @@ -24,7 +24,7 @@ class ProfileWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_profileinfo(self, user_localpart): try: - profile = yield self._simple_select_one( + profile = yield self.simple_select_one( table="profiles", keyvalues={"user_id": user_localpart}, retcols=("displayname", "avatar_url"), @@ -42,7 +42,7 @@ class ProfileWorkerStore(SQLBaseStore): ) def get_profile_displayname(self, user_localpart): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", @@ -50,7 +50,7 @@ class ProfileWorkerStore(SQLBaseStore): ) def get_profile_avatar_url(self, user_localpart): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="profiles", keyvalues={"user_id": user_localpart}, retcol="avatar_url", @@ -58,7 +58,7 @@ class ProfileWorkerStore(SQLBaseStore): ) def get_from_remote_profile_cache(self, user_id): - return self._simple_select_one( + return self.simple_select_one( table="remote_profile_cache", keyvalues={"user_id": user_id}, retcols=("displayname", "avatar_url"), @@ -67,12 +67,12 @@ class ProfileWorkerStore(SQLBaseStore): ) def create_profile(self, user_localpart): - return self._simple_insert( + return self.simple_insert( table="profiles", values={"user_id": user_localpart}, desc="create_profile" ) def set_profile_displayname(self, user_localpart, new_displayname): - return self._simple_update_one( + return self.simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"displayname": new_displayname}, @@ -80,7 +80,7 @@ class ProfileWorkerStore(SQLBaseStore): ) def set_profile_avatar_url(self, user_localpart, new_avatar_url): - return self._simple_update_one( + return self.simple_update_one( table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"avatar_url": new_avatar_url}, @@ -95,7 +95,7 @@ class ProfileStore(ProfileWorkerStore): This should only be called when `is_subscribed_remote_profile_for_user` would return true for the user. """ - return self._simple_upsert( + return self.simple_upsert( table="remote_profile_cache", keyvalues={"user_id": user_id}, values={ @@ -107,7 +107,7 @@ class ProfileStore(ProfileWorkerStore): ) def update_remote_profile_cache(self, user_id, displayname, avatar_url): - return self._simple_update( + return self.simple_update( table="remote_profile_cache", keyvalues={"user_id": user_id}, values={ @@ -125,7 +125,7 @@ class ProfileStore(ProfileWorkerStore): """ subscribed = yield self.is_subscribed_remote_profile_for_user(user_id) if not subscribed: - yield self._simple_delete( + yield self.simple_delete( table="remote_profile_cache", keyvalues={"user_id": user_id}, desc="delete_remote_profile_cache", @@ -155,7 +155,7 @@ class ProfileStore(ProfileWorkerStore): def is_subscribed_remote_profile_for_user(self, user_id): """Check whether we are interested in a remote user's profile. """ - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="group_users", keyvalues={"user_id": user_id}, retcol="user_id", @@ -166,7 +166,7 @@ class ProfileStore(ProfileWorkerStore): if res: return True - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="group_invites", keyvalues={"user_id": user_id}, retcol="user_id", diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index b520062d84..75bd499bcd 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -75,7 +75,7 @@ class PushRulesWorkerStore( def __init__(self, db_conn, hs): super(PushRulesWorkerStore, self).__init__(db_conn, hs) - push_rules_prefill, push_rules_id = self._get_cache_dict( + push_rules_prefill, push_rules_id = self.get_cache_dict( db_conn, "push_rules_stream", entity_column="user_id", @@ -100,7 +100,7 @@ class PushRulesWorkerStore( @cachedInlineCallbacks(max_entries=5000) def get_push_rules_for_user(self, user_id): - rows = yield self._simple_select_list( + rows = yield self.simple_select_list( table="push_rules", keyvalues={"user_name": user_id}, retcols=( @@ -124,7 +124,7 @@ class PushRulesWorkerStore( @cachedInlineCallbacks(max_entries=5000) def get_push_rules_enabled_for_user(self, user_id): - results = yield self._simple_select_list( + results = yield self.simple_select_list( table="push_rules_enable", keyvalues={"user_name": user_id}, retcols=("user_name", "rule_id", "enabled"), @@ -162,7 +162,7 @@ class PushRulesWorkerStore( results = {user_id: [] for user_id in user_ids} - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="push_rules", column="user_name", iterable=user_ids, @@ -320,7 +320,7 @@ class PushRulesWorkerStore( results = {user_id: {} for user_id in user_ids} - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="push_rules_enable", column="user_name", iterable=user_ids, @@ -395,7 +395,7 @@ class PushRuleStore(PushRulesWorkerStore): relative_to_rule = before or after - res = self._simple_select_one_txn( + res = self.simple_select_one_txn( txn, table="push_rules", keyvalues={"user_name": user_id, "rule_id": relative_to_rule}, @@ -499,7 +499,7 @@ class PushRuleStore(PushRulesWorkerStore): actions_json, update_stream=True, ): - """Specialised version of _simple_upsert_txn that picks a push_rule_id + """Specialised version of simple_upsert_txn that picks a push_rule_id using the _push_rule_id_gen if it needs to insert the rule. It assumes that the "push_rules" table is locked""" @@ -518,7 +518,7 @@ class PushRuleStore(PushRulesWorkerStore): # We didn't update a row with the given rule_id so insert one push_rule_id = self._push_rule_id_gen.get_next() - self._simple_insert_txn( + self.simple_insert_txn( txn, table="push_rules", values={ @@ -561,7 +561,7 @@ class PushRuleStore(PushRulesWorkerStore): """ def delete_push_rule_txn(txn, stream_id, event_stream_ordering): - self._simple_delete_one_txn( + self.simple_delete_one_txn( txn, "push_rules", {"user_name": user_id, "rule_id": rule_id} ) @@ -596,7 +596,7 @@ class PushRuleStore(PushRulesWorkerStore): self, txn, stream_id, event_stream_ordering, user_id, rule_id, enabled ): new_id = self._push_rules_enable_id_gen.get_next() - self._simple_upsert_txn( + self.simple_upsert_txn( txn, "push_rules_enable", {"user_name": user_id, "rule_id": rule_id}, @@ -636,7 +636,7 @@ class PushRuleStore(PushRulesWorkerStore): update_stream=False, ) else: - self._simple_update_one_txn( + self.simple_update_one_txn( txn, "push_rules", {"user_name": user_id, "rule_id": rule_id}, @@ -675,7 +675,7 @@ class PushRuleStore(PushRulesWorkerStore): if data is not None: values.update(data) - self._simple_insert_txn(txn, "push_rules_stream", values=values) + self.simple_insert_txn(txn, "push_rules_stream", values=values) txn.call_after(self.get_push_rules_for_user.invalidate, (user_id,)) txn.call_after(self.get_push_rules_enabled_for_user.invalidate, (user_id,)) diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py index d76861cdc0..d5a169872b 100644 --- a/synapse/storage/data_stores/main/pusher.py +++ b/synapse/storage/data_stores/main/pusher.py @@ -59,7 +59,7 @@ class PusherWorkerStore(SQLBaseStore): @defer.inlineCallbacks def user_has_pusher(self, user_id): - ret = yield self._simple_select_one_onecol( + ret = yield self.simple_select_one_onecol( "pushers", {"user_name": user_id}, "id", allow_none=True ) return ret is not None @@ -72,7 +72,7 @@ class PusherWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_pushers_by(self, keyvalues): - ret = yield self._simple_select_list( + ret = yield self.simple_select_list( "pushers", keyvalues, [ @@ -193,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore): inlineCallbacks=True, ) def get_if_users_have_pushers(self, user_ids): - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="pushers", column="user_name", iterable=user_ids, @@ -229,8 +229,8 @@ class PusherStore(PusherWorkerStore): ): with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on - # (app_id, pushkey, user_name) so _simple_upsert will retry - yield self._simple_upsert( + # (app_id, pushkey, user_name) so simple_upsert will retry + yield self.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ @@ -269,7 +269,7 @@ class PusherStore(PusherWorkerStore): txn, self.get_if_user_has_pusher, (user_id,) ) - self._simple_delete_one_txn( + self.simple_delete_one_txn( txn, "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, @@ -278,7 +278,7 @@ class PusherStore(PusherWorkerStore): # it's possible for us to end up with duplicate rows for # (app_id, pushkey, user_id) at different stream_ids, but that # doesn't really matter. - self._simple_insert_txn( + self.simple_insert_txn( txn, table="deleted_pushers", values={ @@ -296,7 +296,7 @@ class PusherStore(PusherWorkerStore): def update_pusher_last_stream_ordering( self, app_id, pushkey, user_id, last_stream_ordering ): - yield self._simple_update_one( + yield self.simple_update_one( "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, {"last_stream_ordering": last_stream_ordering}, @@ -319,7 +319,7 @@ class PusherStore(PusherWorkerStore): Returns: Deferred[bool]: True if the pusher still exists; False if it has been deleted. """ - updated = yield self._simple_update( + updated = yield self.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={ @@ -333,7 +333,7 @@ class PusherStore(PusherWorkerStore): @defer.inlineCallbacks def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self._simple_update( + yield self.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={"failing_since": failing_since}, @@ -342,7 +342,7 @@ class PusherStore(PusherWorkerStore): @defer.inlineCallbacks def get_throttle_params_by_room(self, pusher_id): - res = yield self._simple_select_list( + res = yield self.simple_select_list( "pusher_throttle", {"pusher": pusher_id}, ["room_id", "last_sent_ts", "throttle_ms"], @@ -361,8 +361,8 @@ class PusherStore(PusherWorkerStore): @defer.inlineCallbacks def set_throttle_params(self, pusher_id, room_id, params): # no need to lock because `pusher_throttle` has a primary key on - # (pusher, room_id) so _simple_upsert will retry - yield self._simple_upsert( + # (pusher, room_id) so simple_upsert will retry + yield self.simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, params, diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index 8b17334ff4..380f388e30 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -61,7 +61,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cached(num_args=2) def get_receipts_for_room(self, room_id, receipt_type): - return self._simple_select_list( + return self.simple_select_list( table="receipts_linearized", keyvalues={"room_id": room_id, "receipt_type": receipt_type}, retcols=("user_id", "event_id"), @@ -70,7 +70,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cached(num_args=3) def get_last_receipt_event_id_for_user(self, user_id, room_id, receipt_type): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="receipts_linearized", keyvalues={ "room_id": room_id, @@ -84,7 +84,7 @@ class ReceiptsWorkerStore(SQLBaseStore): @cachedInlineCallbacks(num_args=2) def get_receipts_for_user(self, user_id, receipt_type): - rows = yield self._simple_select_list( + rows = yield self.simple_select_list( table="receipts_linearized", keyvalues={"user_id": user_id, "receipt_type": receipt_type}, retcols=("room_id", "event_id"), @@ -335,7 +335,7 @@ class ReceiptsStore(ReceiptsWorkerStore): otherwise, the rx timestamp of the event that the RR corresponds to (or 0 if the event is unknown) """ - res = self._simple_select_one_txn( + res = self.simple_select_one_txn( txn, table="events", retcols=["stream_ordering", "received_ts"], @@ -388,7 +388,7 @@ class ReceiptsStore(ReceiptsWorkerStore): (user_id, room_id, receipt_type), ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="receipts_linearized", keyvalues={ @@ -398,7 +398,7 @@ class ReceiptsStore(ReceiptsWorkerStore): }, ) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="receipts_linearized", values={ @@ -514,7 +514,7 @@ class ReceiptsStore(ReceiptsWorkerStore): self._get_linearized_receipts_for_room.invalidate_many, (room_id,) ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="receipts_graph", keyvalues={ @@ -523,7 +523,7 @@ class ReceiptsStore(ReceiptsWorkerStore): "user_id": user_id, }, ) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="receipts_graph", values={ diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index 653c9318cb..debc6706f5 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -45,7 +45,7 @@ class RegistrationWorkerStore(SQLBaseStore): @cached() def get_user_by_id(self, user_id): - return self._simple_select_one( + return self.simple_select_one( table="users", keyvalues={"name": user_id}, retcols=[ @@ -109,7 +109,7 @@ class RegistrationWorkerStore(SQLBaseStore): otherwise int representation of the timestamp (as a number of milliseconds since epoch). """ - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="account_validity", keyvalues={"user_id": user_id}, retcol="expiration_ts_ms", @@ -137,7 +137,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ def set_account_validity_for_user_txn(txn): - self._simple_update_txn( + self.simple_update_txn( txn=txn, table="account_validity", keyvalues={"user_id": user_id}, @@ -167,7 +167,7 @@ class RegistrationWorkerStore(SQLBaseStore): Raises: StoreError: The provided token is already set for another user. """ - yield self._simple_update_one( + yield self.simple_update_one( table="account_validity", keyvalues={"user_id": user_id}, updatevalues={"renewal_token": renewal_token}, @@ -184,7 +184,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: defer.Deferred[str]: The ID of the user to which the token belongs. """ - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="account_validity", keyvalues={"renewal_token": renewal_token}, retcol="user_id", @@ -203,7 +203,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: defer.Deferred[str]: The renewal token associated with this user ID. """ - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="account_validity", keyvalues={"user_id": user_id}, retcol="renewal_token", @@ -250,7 +250,7 @@ class RegistrationWorkerStore(SQLBaseStore): email_sent (bool): Flag which indicates whether a renewal email has been sent to this user. """ - yield self._simple_update_one( + yield self.simple_update_one( table="account_validity", keyvalues={"user_id": user_id}, updatevalues={"email_sent": email_sent}, @@ -265,7 +265,7 @@ class RegistrationWorkerStore(SQLBaseStore): Args: user_id (str): ID of the user to remove from the account validity table. """ - yield self._simple_delete_one( + yield self.simple_delete_one( table="account_validity", keyvalues={"user_id": user_id}, desc="delete_account_validity_for_user", @@ -281,7 +281,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns (bool): true iff the user is a server admin, false otherwise. """ - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="users", keyvalues={"name": user.to_string()}, retcol="admin", @@ -299,7 +299,7 @@ class RegistrationWorkerStore(SQLBaseStore): admin (bool): true iff the user is to be a server admin, false otherwise. """ - return self._simple_update_one( + return self.simple_update_one( table="users", keyvalues={"name": user.to_string()}, updatevalues={"admin": 1 if admin else 0}, @@ -351,7 +351,7 @@ class RegistrationWorkerStore(SQLBaseStore): return res def is_real_user_txn(self, txn, user_id): - res = self._simple_select_one_onecol_txn( + res = self.simple_select_one_onecol_txn( txn=txn, table="users", keyvalues={"name": user_id}, @@ -361,7 +361,7 @@ class RegistrationWorkerStore(SQLBaseStore): return res is None def is_support_user_txn(self, txn, user_id): - res = self._simple_select_one_onecol_txn( + res = self.simple_select_one_onecol_txn( txn=txn, table="users", keyvalues={"name": user_id}, @@ -394,7 +394,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: str|None: the mxid of the user, or None if they are not known """ - return await self._simple_select_one_onecol( + return await self.simple_select_one_onecol( table="user_external_ids", keyvalues={"auth_provider": auth_provider, "external_id": external_id}, retcol="user_id", @@ -536,7 +536,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: str|None: user id or None if no user id/threepid mapping exists """ - ret = self._simple_select_one_txn( + ret = self.simple_select_one_txn( txn, "user_threepids", {"medium": medium, "address": address}, @@ -549,7 +549,7 @@ class RegistrationWorkerStore(SQLBaseStore): @defer.inlineCallbacks def user_add_threepid(self, user_id, medium, address, validated_at, added_at): - yield self._simple_upsert( + yield self.simple_upsert( "user_threepids", {"medium": medium, "address": address}, {"user_id": user_id, "validated_at": validated_at, "added_at": added_at}, @@ -557,7 +557,7 @@ class RegistrationWorkerStore(SQLBaseStore): @defer.inlineCallbacks def user_get_threepids(self, user_id): - ret = yield self._simple_select_list( + ret = yield self.simple_select_list( "user_threepids", {"user_id": user_id}, ["medium", "address", "validated_at", "added_at"], @@ -566,7 +566,7 @@ class RegistrationWorkerStore(SQLBaseStore): return ret def user_delete_threepid(self, user_id, medium, address): - return self._simple_delete( + return self.simple_delete( "user_threepids", keyvalues={"user_id": user_id, "medium": medium, "address": address}, desc="user_delete_threepid", @@ -579,7 +579,7 @@ class RegistrationWorkerStore(SQLBaseStore): user_id: The user id to delete all threepids of """ - return self._simple_delete( + return self.simple_delete( "user_threepids", keyvalues={"user_id": user_id}, desc="user_delete_threepids", @@ -601,7 +601,7 @@ class RegistrationWorkerStore(SQLBaseStore): """ # We need to use an upsert, in case they user had already bound the # threepid - return self._simple_upsert( + return self.simple_upsert( table="user_threepid_id_server", keyvalues={ "user_id": user_id, @@ -627,7 +627,7 @@ class RegistrationWorkerStore(SQLBaseStore): medium (str): The medium of the threepid (e.g "email") address (str): The address of the threepid (e.g "bob@example.com") """ - return self._simple_select_list( + return self.simple_select_list( table="user_threepid_id_server", keyvalues={"user_id": user_id}, retcols=["medium", "address"], @@ -648,7 +648,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: Deferred """ - return self._simple_delete( + return self.simple_delete( table="user_threepid_id_server", keyvalues={ "user_id": user_id, @@ -671,7 +671,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns: Deferred[list[str]]: Resolves to a list of identity servers """ - return self._simple_select_onecol( + return self.simple_select_onecol( table="user_threepid_id_server", keyvalues={"user_id": user_id, "medium": medium, "address": address}, retcol="id_server", @@ -689,7 +689,7 @@ class RegistrationWorkerStore(SQLBaseStore): defer.Deferred(bool): The requested value. """ - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="users", keyvalues={"name": user_id}, retcol="deactivated", @@ -776,12 +776,12 @@ class RegistrationWorkerStore(SQLBaseStore): """ def delete_threepid_session_txn(txn): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="threepid_validation_token", keyvalues={"session_id": session_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="threepid_validation_session", keyvalues={"session_id": session_id}, @@ -961,7 +961,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): """ next_id = self._access_tokens_id_gen.get_next() - yield self._simple_insert( + yield self.simple_insert( "access_tokens", { "id": next_id, @@ -1037,7 +1037,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): # Ensure that the guest user actually exists # ``allow_none=False`` makes this raise an exception # if the row isn't in the database. - self._simple_select_one_txn( + self.simple_select_one_txn( txn, "users", keyvalues={"name": user_id, "is_guest": 1}, @@ -1045,7 +1045,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): allow_none=False, ) - self._simple_update_one_txn( + self.simple_update_one_txn( txn, "users", keyvalues={"name": user_id, "is_guest": 1}, @@ -1059,7 +1059,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): }, ) else: - self._simple_insert_txn( + self.simple_insert_txn( txn, "users", values={ @@ -1114,7 +1114,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): external_id: id on that system user_id: complete mxid that it is mapped to """ - return self._simple_insert( + return self.simple_insert( table="user_external_ids", values={ "auth_provider": auth_provider, @@ -1132,7 +1132,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): """ def user_set_password_hash_txn(txn): - self._simple_update_one_txn( + self.simple_update_one_txn( txn, "users", {"name": user_id}, {"password_hash": password_hash} ) self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) @@ -1152,7 +1152,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): """ def f(txn): - self._simple_update_one_txn( + self.simple_update_one_txn( txn, table="users", keyvalues={"name": user_id}, @@ -1176,7 +1176,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): """ def f(txn): - self._simple_update_one_txn( + self.simple_update_one_txn( txn, table="users", keyvalues={"name": user_id}, @@ -1234,7 +1234,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): def delete_access_token(self, access_token): def f(txn): - self._simple_delete_one_txn( + self.simple_delete_one_txn( txn, table="access_tokens", keyvalues={"token": access_token} ) @@ -1246,7 +1246,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): @cachedInlineCallbacks() def is_guest(self, user_id): - res = yield self._simple_select_one_onecol( + res = yield self.simple_select_one_onecol( table="users", keyvalues={"name": user_id}, retcol="is_guest", @@ -1261,7 +1261,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): Adds a user to the table of users who need to be parted from all the rooms they're in """ - return self._simple_insert( + return self.simple_insert( "users_pending_deactivation", values={"user_id": user_id}, desc="add_user_pending_deactivation", @@ -1274,7 +1274,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): """ # XXX: This should be simple_delete_one but we failed to put a unique index on # the table, so somehow duplicate entries have ended up in it. - return self._simple_delete( + return self.simple_delete( "users_pending_deactivation", keyvalues={"user_id": user_id}, desc="del_user_pending_deactivation", @@ -1285,7 +1285,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): Gets one user from the table of users waiting to be parted from all the rooms they're in. """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( "users_pending_deactivation", keyvalues={}, retcol="user_id", @@ -1315,7 +1315,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): # Insert everything into a transaction in order to run atomically def validate_threepid_session_txn(txn): - row = self._simple_select_one_txn( + row = self.simple_select_one_txn( txn, table="threepid_validation_session", keyvalues={"session_id": session_id}, @@ -1333,7 +1333,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): 400, "This client_secret does not match the provided session_id" ) - row = self._simple_select_one_txn( + row = self.simple_select_one_txn( txn, table="threepid_validation_token", keyvalues={"session_id": session_id, "token": token}, @@ -1358,7 +1358,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): ) # Looks good. Validate the session - self._simple_update_txn( + self.simple_update_txn( txn, table="threepid_validation_session", keyvalues={"session_id": session_id}, @@ -1401,7 +1401,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): if validated_at: insertion_values["validated_at"] = validated_at - return self._simple_upsert( + return self.simple_upsert( table="threepid_validation_session", keyvalues={"session_id": session_id}, values={"last_send_attempt": send_attempt}, @@ -1439,7 +1439,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): def start_or_continue_validation_session_txn(txn): # Create or update a validation session - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="threepid_validation_session", keyvalues={"session_id": session_id}, @@ -1452,7 +1452,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): ) # Create a new validation token with this session ID - self._simple_insert_txn( + self.simple_insert_txn( txn, table="threepid_validation_token", values={ @@ -1501,7 +1501,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): ) def set_user_deactivated_status_txn(self, txn, user_id, deactivated): - self._simple_update_one_txn( + self.simple_update_one_txn( txn=txn, table="users", keyvalues={"name": user_id}, @@ -1560,7 +1560,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): expiration_ts, ) - self._simple_upsert_txn( + self.simple_upsert_txn( txn, "account_validity", keyvalues={"user_id": user_id}, diff --git a/synapse/storage/data_stores/main/rejections.py b/synapse/storage/data_stores/main/rejections.py index 7d5de0ea2e..f81f9279a1 100644 --- a/synapse/storage/data_stores/main/rejections.py +++ b/synapse/storage/data_stores/main/rejections.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) class RejectionsStore(SQLBaseStore): def _store_rejections_txn(self, txn, event_id, reason): - self._simple_insert_txn( + self.simple_insert_txn( txn, table="rejections", values={ @@ -33,7 +33,7 @@ class RejectionsStore(SQLBaseStore): ) def get_rejection_reason(self, event_id): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="rejections", retcol="reason", keyvalues={"event_id": event_id}, diff --git a/synapse/storage/data_stores/main/relations.py b/synapse/storage/data_stores/main/relations.py index 858f65582b..aa5e10538b 100644 --- a/synapse/storage/data_stores/main/relations.py +++ b/synapse/storage/data_stores/main/relations.py @@ -352,7 +352,7 @@ class RelationsStore(RelationsWorkerStore): aggregation_key = relation.get("key") - self._simple_insert_txn( + self.simple_insert_txn( txn, table="event_relations", values={ @@ -380,6 +380,6 @@ class RelationsStore(RelationsWorkerStore): redacted_event_id (str): The event that was redacted. """ - self._simple_delete_txn( + self.simple_delete_txn( txn, table="event_relations", keyvalues={"event_id": redacted_event_id} ) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index dd1f42c23a..f309e3640c 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -54,7 +54,7 @@ class RoomWorkerStore(SQLBaseStore): Returns: A dict containing the room information, or None if the room is unknown. """ - return self._simple_select_one( + return self.simple_select_one( table="rooms", keyvalues={"room_id": room_id}, retcols=("room_id", "is_public", "creator"), @@ -63,7 +63,7 @@ class RoomWorkerStore(SQLBaseStore): ) def get_public_room_ids(self): - return self._simple_select_onecol( + return self.simple_select_onecol( table="rooms", keyvalues={"is_public": True}, retcol="room_id", @@ -267,7 +267,7 @@ class RoomWorkerStore(SQLBaseStore): @cached(max_entries=10000) def is_room_blocked(self, room_id): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="blocked_rooms", keyvalues={"room_id": room_id}, retcol="1", @@ -288,7 +288,7 @@ class RoomWorkerStore(SQLBaseStore): of RatelimitOverride are None or 0 then ratelimitng has been disabled for that user entirely. """ - row = yield self._simple_select_one( + row = yield self.simple_select_one( table="ratelimit_override", keyvalues={"user_id": user_id}, retcols=("messages_per_second", "burst_count"), @@ -408,7 +408,7 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore): ev = json.loads(row["json"]) retention_policy = json.dumps(ev["content"]) - self._simple_insert_txn( + self.simple_insert_txn( txn=txn, table="room_retention", values={ @@ -461,7 +461,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): try: def store_room_txn(txn, next_id): - self._simple_insert_txn( + self.simple_insert_txn( txn, "rooms", { @@ -471,7 +471,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): }, ) if is_public: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="public_room_list_stream", values={ @@ -490,14 +490,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): def set_room_is_public_txn(txn, next_id): - self._simple_update_one_txn( + self.simple_update_one_txn( txn, table="rooms", keyvalues={"room_id": room_id}, updatevalues={"is_public": is_public}, ) - entries = self._simple_select_list_txn( + entries = self.simple_select_list_txn( txn, table="public_room_list_stream", keyvalues={ @@ -515,7 +515,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): add_to_stream = bool(entries[-1]["visibility"]) != is_public if add_to_stream: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="public_room_list_stream", values={ @@ -555,7 +555,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def set_room_is_public_appservice_txn(txn, next_id): if is_public: try: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="appservice_room_list", values={ @@ -568,7 +568,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): # We've already inserted, nothing to do. return else: - self._simple_delete_txn( + self.simple_delete_txn( txn, table="appservice_room_list", keyvalues={ @@ -578,7 +578,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): }, ) - entries = self._simple_select_list_txn( + entries = self.simple_select_list_txn( txn, table="public_room_list_stream", keyvalues={ @@ -596,7 +596,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): add_to_stream = bool(entries[-1]["visibility"]) != is_public if add_to_stream: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="public_room_list_stream", values={ @@ -660,7 +660,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): # Ignore the event if one of the value isn't an integer. return - self._simple_insert_txn( + self.simple_insert_txn( txn=txn, table="room_retention", values={ @@ -679,7 +679,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): self, room_id, event_id, user_id, reason, content, received_ts ): next_id = self._event_reports_id_gen.get_next() - return self._simple_insert( + return self.simple_insert( table="event_reports", values={ "id": next_id, @@ -725,7 +725,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): Returns: Deferred """ - yield self._simple_upsert( + yield self.simple_upsert( table="blocked_rooms", keyvalues={"room_id": room_id}, values={}, diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index 2af24a20b7..fe2428a281 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from typing import Iterable, List from six import iteritems, itervalues @@ -127,7 +128,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): membership column is up to date """ - pending_update = self._simple_select_one_txn( + pending_update = self.simple_select_one_txn( txn, table="background_updates", keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME}, @@ -602,7 +603,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): to `user_id` and ProfileInfo (or None if not join event). """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="room_memberships", column="event_id", iterable=event_ids, @@ -642,7 +643,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # the returned user actually has the correct domain. like_clause = "%:" + host - rows = yield self._execute("is_host_joined", None, sql, room_id, like_clause) + rows = yield self.execute("is_host_joined", None, sql, room_id, like_clause) if not rows: return False @@ -682,7 +683,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): # the returned user actually has the correct domain. like_clause = "%:" + host - rows = yield self._execute("was_host_joined", None, sql, room_id, like_clause) + rows = yield self.execute("was_host_joined", None, sql, room_id, like_clause) if not rows: return False @@ -804,7 +805,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): Deferred[set[str]]: Set of room IDs. """ - room_ids = yield self._simple_select_onecol( + room_ids = yield self.simple_select_onecol( table="room_memberships", keyvalues={"membership": Membership.JOIN, "user_id": user_id}, retcol="room_id", @@ -813,6 +814,22 @@ class RoomMemberWorkerStore(EventsWorkerStore): return set(room_ids) + def get_membership_from_event_ids( + self, member_event_ids: Iterable[str] + ) -> List[dict]: + """Get user_id and membership of a set of event IDs. + """ + + return self.simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=member_event_ids, + retcols=("user_id", "membership", "event_id"), + keyvalues={}, + batch_size=500, + desc="get_membership_from_event_ids", + ) + class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore): def __init__(self, db_conn, hs): @@ -973,7 +990,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): def _store_room_members_txn(self, txn, events, backfilled): """Store a room member in the database. """ - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="room_memberships", values=[ @@ -1011,7 +1028,7 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): is_mine = self.hs.is_mine_id(event.state_key) if is_new_state and is_mine: if event.membership == Membership.INVITE: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="local_invites", values={ diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py index d1d7c6863d..f735cf095c 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py @@ -441,7 +441,7 @@ class SearchStore(SearchBackgroundUpdateStore): # entire table from the database. sql += " ORDER BY rank DESC LIMIT 500" - results = yield self._execute("search_msgs", self.cursor_to_dict, sql, *args) + results = yield self.execute("search_msgs", self.cursor_to_dict, sql, *args) results = list(filter(lambda row: row["room_id"] in room_ids, results)) @@ -455,7 +455,7 @@ class SearchStore(SearchBackgroundUpdateStore): count_sql += " GROUP BY room_id" - count_results = yield self._execute( + count_results = yield self.execute( "search_rooms_count", self.cursor_to_dict, count_sql, *count_args ) @@ -586,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore): args.append(limit) - results = yield self._execute("search_rooms", self.cursor_to_dict, sql, *args) + results = yield self.execute("search_rooms", self.cursor_to_dict, sql, *args) results = list(filter(lambda row: row["room_id"] in room_ids, results)) @@ -600,7 +600,7 @@ class SearchStore(SearchBackgroundUpdateStore): count_sql += " GROUP BY room_id" - count_results = yield self._execute( + count_results = yield self.execute( "search_rooms_count", self.cursor_to_dict, count_sql, *count_args ) diff --git a/synapse/storage/data_stores/main/signatures.py b/synapse/storage/data_stores/main/signatures.py index 556191b76f..f3da29ce14 100644 --- a/synapse/storage/data_stores/main/signatures.py +++ b/synapse/storage/data_stores/main/signatures.py @@ -98,4 +98,4 @@ class SignatureStore(SignatureWorkerStore): } ) - self._simple_insert_many_txn(txn, table="event_reference_hashes", values=vals) + self.simple_insert_many_txn(txn, table="event_reference_hashes", values=vals) diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 6a90daea31..2b33ec1a35 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -89,7 +89,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): count = 0 while next_group: - next_group = self._simple_select_one_onecol_txn( + next_group = self.simple_select_one_onecol_txn( txn, table="state_group_edges", keyvalues={"state_group": next_group}, @@ -192,7 +192,7 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): ): break - next_group = self._simple_select_one_onecol_txn( + next_group = self.simple_select_one_onecol_txn( txn, table="state_group_edges", keyvalues={"state_group": next_group}, @@ -431,7 +431,7 @@ class StateGroupWorkerStore( """ def _get_state_group_delta_txn(txn): - prev_group = self._simple_select_one_onecol_txn( + prev_group = self.simple_select_one_onecol_txn( txn, table="state_group_edges", keyvalues={"state_group": state_group}, @@ -442,7 +442,7 @@ class StateGroupWorkerStore( if not prev_group: return _GetStateGroupDelta(None, None) - delta_ids = self._simple_select_list_txn( + delta_ids = self.simple_select_list_txn( txn, table="state_groups_state", keyvalues={"state_group": state_group}, @@ -644,7 +644,7 @@ class StateGroupWorkerStore( @cached(max_entries=50000) def _get_state_group_for_event(self, event_id): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="event_to_state_groups", keyvalues={"event_id": event_id}, retcol="state_group", @@ -661,7 +661,7 @@ class StateGroupWorkerStore( def _get_state_group_for_events(self, event_ids): """Returns mapping event_id -> state_group """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="event_to_state_groups", column="event_id", iterable=event_ids, @@ -902,7 +902,7 @@ class StateGroupWorkerStore( state_group = self.database_engine.get_next_state_group_id(txn) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="state_groups", values={"id": state_group, "room_id": room_id, "event_id": event_id}, @@ -911,7 +911,7 @@ class StateGroupWorkerStore( # We persist as a delta if we can, while also ensuring the chain # of deltas isn't tooo long, as otherwise read performance degrades. if prev_group: - is_in_db = self._simple_select_one_onecol_txn( + is_in_db = self.simple_select_one_onecol_txn( txn, table="state_groups", keyvalues={"id": prev_group}, @@ -926,13 +926,13 @@ class StateGroupWorkerStore( potential_hops = self._count_state_group_hops_txn(txn, prev_group) if prev_group and potential_hops < MAX_STATE_DELTA_HOPS: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="state_group_edges", values={"state_group": state_group, "prev_state_group": prev_group}, ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="state_groups_state", values=[ @@ -947,7 +947,7 @@ class StateGroupWorkerStore( ], ) else: - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="state_groups_state", values=[ @@ -1007,7 +1007,7 @@ class StateGroupWorkerStore( referenced. """ - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="event_to_state_groups", column="state_group", iterable=state_groups, @@ -1065,7 +1065,7 @@ class StateBackgroundUpdateStore( batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR)) if max_group is None: - rows = yield self._execute( + rows = yield self.execute( "_background_deduplicate_state", None, "SELECT coalesce(max(id), 0) FROM state_groups", @@ -1135,13 +1135,13 @@ class StateBackgroundUpdateStore( if prev_state.get(key, None) != value } - self._simple_delete_txn( + self.simple_delete_txn( txn, table="state_group_edges", keyvalues={"state_group": state_group}, ) - self._simple_insert_txn( + self.simple_insert_txn( txn, table="state_group_edges", values={ @@ -1150,13 +1150,13 @@ class StateBackgroundUpdateStore( }, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="state_groups_state", keyvalues={"state_group": state_group}, ) - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="state_groups_state", values=[ @@ -1263,7 +1263,7 @@ class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore): state_groups[event.event_id] = context.state_group - self._simple_insert_many_txn( + self.simple_insert_many_txn( txn, table="event_to_state_groups", values=[ diff --git a/synapse/storage/data_stores/main/state_deltas.py b/synapse/storage/data_stores/main/state_deltas.py index 28f33ec18f..03b908026b 100644 --- a/synapse/storage/data_stores/main/state_deltas.py +++ b/synapse/storage/data_stores/main/state_deltas.py @@ -105,7 +105,7 @@ class StateDeltasStore(SQLBaseStore): ) def _get_max_stream_id_in_current_state_deltas_txn(self, txn): - return self._simple_select_one_onecol_txn( + return self.simple_select_one_onecol_txn( txn, table="current_state_delta_stream", keyvalues={}, diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py index 45b3de7d56..3aeba859fd 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py @@ -186,7 +186,7 @@ class StatsStore(StateDeltasStore): """ Returns the stats processor positions. """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="stats_incremental_position", keyvalues={}, retcol="stream_id", @@ -215,7 +215,7 @@ class StatsStore(StateDeltasStore): if field and "\0" in field: fields[col] = None - return self._simple_upsert( + return self.simple_upsert( table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, @@ -257,7 +257,7 @@ class StatsStore(StateDeltasStore): ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] ) - slice_list = self._simple_select_list_paginate_txn( + slice_list = self.simple_select_list_paginate_txn( txn, table + "_historical", {id_col: stats_id}, @@ -282,7 +282,7 @@ class StatsStore(StateDeltasStore): "name", "topic", "canonical_alias", "avatar", "join_rules", "history_visibility" """ - return self._simple_select_one( + return self.simple_select_one( "room_stats_state", {"room_id": room_id}, retcols=( @@ -308,7 +308,7 @@ class StatsStore(StateDeltasStore): """ table, id_col = TYPE_TO_TABLE[stats_type] - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( "%s_current" % (table,), keyvalues={id_col: id}, retcol="completed_delta_stream_id", @@ -344,7 +344,7 @@ class StatsStore(StateDeltasStore): complete_with_stream_id=stream_id, ) - self._simple_update_one_txn( + self.simple_update_one_txn( txn, table="stats_incremental_position", keyvalues={}, @@ -517,17 +517,17 @@ class StatsStore(StateDeltasStore): else: self.database_engine.lock_table(txn, table) retcols = list(chain(absolutes.keys(), additive_relatives.keys())) - current_row = self._simple_select_one_txn( + current_row = self.simple_select_one_txn( txn, table, keyvalues, retcols, allow_none=True ) if current_row is None: merged_dict = {**keyvalues, **absolutes, **additive_relatives} - self._simple_insert_txn(txn, table, merged_dict) + self.simple_insert_txn(txn, table, merged_dict) else: for (key, val) in additive_relatives.items(): current_row[key] += val current_row.update(absolutes) - self._simple_update_one_txn(txn, table, keyvalues, current_row) + self.simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( self, @@ -614,11 +614,11 @@ class StatsStore(StateDeltasStore): txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, into_table) - src_row = self._simple_select_one_txn( + src_row = self.simple_select_one_txn( txn, src_table, keyvalues, copy_columns ) all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues} - dest_current_row = self._simple_select_one_txn( + dest_current_row = self.simple_select_one_txn( txn, into_table, keyvalues=all_dest_keyvalues, @@ -634,11 +634,11 @@ class StatsStore(StateDeltasStore): **src_row, **additive_relatives, } - self._simple_insert_txn(txn, into_table, merged_dict) + self.simple_insert_txn(txn, into_table, merged_dict) else: for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val - self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row) + self.simple_update_txn(txn, into_table, all_dest_keyvalues, src_row) def get_changes_room_total_events_and_bytes(self, min_pos, max_pos): """Fetches the counts of events in the given range of stream IDs. @@ -735,7 +735,7 @@ class StatsStore(StateDeltasStore): def _fetch_current_state_stats(txn): pos = self.get_room_max_stream_ordering() - rows = self._simple_select_many_txn( + rows = self.simple_select_many_txn( txn, table="current_state_events", column="type", diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index 21a410afd0..60487c4559 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -255,7 +255,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): super(StreamWorkerStore, self).__init__(db_conn, hs) events_max = self.get_room_max_stream_ordering() - event_cache_prefill, min_event_val = self._get_cache_dict( + event_cache_prefill, min_event_val = self.get_cache_dict( db_conn, "events", entity_column="room_id", @@ -576,7 +576,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: A deferred "s%d" stream token. """ - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering" ).addCallback(lambda row: "s%d" % (row,)) @@ -589,7 +589,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: A deferred "t%d-%d" topological token. """ - return self._simple_select_one( + return self.simple_select_one( table="events", keyvalues={"event_id": event_id}, retcols=("stream_ordering", "topological_ordering"), @@ -613,7 +613,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "SELECT coalesce(max(topological_ordering), 0) FROM events" " WHERE room_id = ? AND stream_ordering < ?" ) - return self._execute( + return self.execute( "get_max_topological_token", None, sql, room_id, stream_key ).addCallback(lambda r: r[0][0] if r else 0) @@ -709,7 +709,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): dict """ - results = self._simple_select_one_txn( + results = self.simple_select_one_txn( txn, "events", keyvalues={"event_id": event_id, "room_id": room_id}, @@ -797,7 +797,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return upper_bound, events def get_federation_out_pos(self, typ): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="federation_stream_position", retcol="stream_id", keyvalues={"type": typ}, @@ -805,7 +805,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) def update_federation_out_pos(self, typ, stream_id): - return self._simple_update_one( + return self.simple_update_one( table="federation_stream_position", keyvalues={"type": typ}, updatevalues={"stream_id": stream_id}, diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index aa24339717..85012403be 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -41,7 +41,7 @@ class TagsWorkerStore(AccountDataWorkerStore): tag strings to tag content. """ - deferred = self._simple_select_list( + deferred = self.simple_select_list( "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"] ) @@ -153,7 +153,7 @@ class TagsWorkerStore(AccountDataWorkerStore): Returns: A deferred list of string tags. """ - return self._simple_select_list( + return self.simple_select_list( table="room_tags", keyvalues={"user_id": user_id, "room_id": room_id}, retcols=("tag", "content"), @@ -178,7 +178,7 @@ class TagsStore(TagsWorkerStore): content_json = json.dumps(content) def add_tag_txn(txn, next_id): - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="room_tags", keyvalues={"user_id": user_id, "room_id": room_id, "tag": tag}, diff --git a/synapse/storage/data_stores/main/transactions.py b/synapse/storage/data_stores/main/transactions.py index 01b1be5e14..c162f3ea16 100644 --- a/synapse/storage/data_stores/main/transactions.py +++ b/synapse/storage/data_stores/main/transactions.py @@ -85,7 +85,7 @@ class TransactionStore(SQLBaseStore): ) def _get_received_txn_response(self, txn, transaction_id, origin): - result = self._simple_select_one_txn( + result = self.simple_select_one_txn( txn, table="received_transactions", keyvalues={"transaction_id": transaction_id, "origin": origin}, @@ -119,7 +119,7 @@ class TransactionStore(SQLBaseStore): response_json (str) """ - return self._simple_insert( + return self.simple_insert( table="received_transactions", values={ "transaction_id": transaction_id, @@ -160,7 +160,7 @@ class TransactionStore(SQLBaseStore): return result def _get_destination_retry_timings(self, txn, destination): - result = self._simple_select_one_txn( + result = self.simple_select_one_txn( txn, table="destinations", keyvalues={"destination": destination}, @@ -227,7 +227,7 @@ class TransactionStore(SQLBaseStore): # We need to be careful here as the data may have changed from under us # due to a worker setting the timings. - prev_row = self._simple_select_one_txn( + prev_row = self.simple_select_one_txn( txn, table="destinations", keyvalues={"destination": destination}, @@ -236,7 +236,7 @@ class TransactionStore(SQLBaseStore): ) if not prev_row: - self._simple_insert_txn( + self.simple_insert_txn( txn, table="destinations", values={ @@ -247,7 +247,7 @@ class TransactionStore(SQLBaseStore): }, ) elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: - self._simple_update_one_txn( + self.simple_update_one_txn( txn, "destinations", keyvalues={"destination": destination}, diff --git a/synapse/storage/data_stores/main/user_directory.py b/synapse/storage/data_stores/main/user_directory.py index 652abe0e6a..1a85aabbfb 100644 --- a/synapse/storage/data_stores/main/user_directory.py +++ b/synapse/storage/data_stores/main/user_directory.py @@ -85,7 +85,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore """ txn.execute(sql) rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] - self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) + self.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) del rooms # If search all users is on, get all the users we want to add. @@ -100,13 +100,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore txn.execute("SELECT name FROM users") users = [{"user_id": x[0]} for x in txn.fetchall()] - self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) + self.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) new_pos = yield self.get_max_stream_id_in_current_state_deltas() yield self.runInteraction( "populate_user_directory_temp_build", _make_staging_area ) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) + yield self.simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) yield self._end_background_update("populate_user_directory_createtables") return 1 @@ -116,7 +116,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore """ Update the user directory stream position, then clean up the old tables. """ - position = yield self._simple_select_one_onecol( + position = yield self.simple_select_one_onecol( TEMP_TABLE + "_position", None, "position" ) yield self.update_user_directory_stream_pos(position) @@ -243,7 +243,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore to_insert.clear() # We've finished a room. Delete it from the table. - yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id}) + yield self.simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id}) # Update the remaining counter. progress["remaining"] -= 1 yield self.runInteraction( @@ -312,7 +312,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore ) # We've finished processing a user. Delete it from the table. - yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id}) + yield self.simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id}) # Update the remaining counter. progress["remaining"] -= 1 yield self.runInteraction( @@ -361,7 +361,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore """ def _update_profile_in_user_dir_txn(txn): - new_entry = self._simple_upsert_txn( + new_entry = self.simple_upsert_txn( txn, table="user_directory", keyvalues={"user_id": user_id}, @@ -435,7 +435,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore ) elif isinstance(self.database_engine, Sqlite3Engine): value = "%s %s" % (user_id, display_name) if display_name else user_id - self._simple_upsert_txn( + self.simple_upsert_txn( txn, table="user_directory_search", keyvalues={"user_id": user_id}, @@ -462,7 +462,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore """ def _add_users_who_share_room_txn(txn): - self._simple_upsert_many_txn( + self.simple_upsert_many_txn( txn, table="users_who_share_private_rooms", key_names=["user_id", "other_user_id", "room_id"], @@ -489,7 +489,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore def _add_users_in_public_rooms_txn(txn): - self._simple_upsert_many_txn( + self.simple_upsert_many_txn( txn, table="users_in_public_rooms", key_names=["user_id", "room_id"], @@ -519,7 +519,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore @cached() def get_user_in_directory(self, user_id): - return self._simple_select_one( + return self.simple_select_one( table="user_directory", keyvalues={"user_id": user_id}, retcols=("display_name", "avatar_url"), @@ -528,7 +528,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore ) def update_user_directory_stream_pos(self, stream_id): - return self._simple_update_one( + return self.simple_update_one( table="user_directory_stream_pos", keyvalues={}, updatevalues={"stream_id": stream_id}, @@ -547,21 +547,21 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): def remove_from_user_dir(self, user_id): def _remove_from_user_dir_txn(txn): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="user_directory", keyvalues={"user_id": user_id} ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="user_directory_search", keyvalues={"user_id": user_id} ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="users_who_share_private_rooms", keyvalues={"user_id": user_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="users_who_share_private_rooms", keyvalues={"other_user_id": user_id}, @@ -575,14 +575,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): """Get all user_ids that are in the room directory because they're in the given room_id """ - user_ids_share_pub = yield self._simple_select_onecol( + user_ids_share_pub = yield self.simple_select_onecol( table="users_in_public_rooms", keyvalues={"room_id": room_id}, retcol="user_id", desc="get_users_in_dir_due_to_room", ) - user_ids_share_priv = yield self._simple_select_onecol( + user_ids_share_priv = yield self.simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"room_id": room_id}, retcol="other_user_id", @@ -605,17 +605,17 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): """ def _remove_user_who_share_room_txn(txn): - self._simple_delete_txn( + self.simple_delete_txn( txn, table="users_who_share_private_rooms", keyvalues={"user_id": user_id, "room_id": room_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="users_who_share_private_rooms", keyvalues={"other_user_id": user_id, "room_id": room_id}, ) - self._simple_delete_txn( + self.simple_delete_txn( txn, table="users_in_public_rooms", keyvalues={"user_id": user_id, "room_id": room_id}, @@ -636,14 +636,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): Returns: list: user_id """ - rows = yield self._simple_select_onecol( + rows = yield self.simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"user_id": user_id}, retcol="room_id", desc="get_rooms_user_is_in", ) - pub_rows = yield self._simple_select_onecol( + pub_rows = yield self.simple_select_onecol( table="users_in_public_rooms", keyvalues={"user_id": user_id}, retcol="room_id", @@ -674,14 +674,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): ) f2 USING (room_id) """ - rows = yield self._execute( + rows = yield self.execute( "get_rooms_in_common_for_users", None, sql, user_id, other_user_id ) return [room_id for room_id, in rows] def get_user_directory_stream_pos(self): - return self._simple_select_one_onecol( + return self.simple_select_one_onecol( table="user_directory_stream_pos", keyvalues={}, retcol="stream_id", @@ -786,9 +786,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): # This should be unreachable. raise Exception("Unrecognized database engine") - results = yield self._execute( - "search_user_dir", self.cursor_to_dict, sql, *args - ) + results = yield self.execute("search_user_dir", self.cursor_to_dict, sql, *args) limited = len(results) > limit diff --git a/synapse/storage/data_stores/main/user_erasure_store.py b/synapse/storage/data_stores/main/user_erasure_store.py index aa4f0da5f0..37860af070 100644 --- a/synapse/storage/data_stores/main/user_erasure_store.py +++ b/synapse/storage/data_stores/main/user_erasure_store.py @@ -31,7 +31,7 @@ class UserErasureWorkerStore(SQLBaseStore): Returns: Deferred[bool]: True if the user has requested erasure """ - return self._simple_select_onecol( + return self.simple_select_onecol( table="erased_users", keyvalues={"user_id": user_id}, retcol="1", @@ -56,7 +56,7 @@ class UserErasureWorkerStore(SQLBaseStore): # iterate it multiple times, and (b) avoiding duplicates. user_ids = tuple(set(user_ids)) - rows = yield self._simple_select_many_batch( + rows = yield self.simple_select_many_batch( table="erased_users", column="user_id", iterable=user_ids, |