diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 28 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 8 | ||||
-rw-r--r-- | synapse/storage/databases/main/keys.py | 17 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 6 | ||||
-rw-r--r-- | synapse/storage/engines/_base.py | 6 | ||||
-rw-r--r-- | synapse/storage/engines/postgres.py | 4 | ||||
-rw-r--r-- | synapse/storage/engines/sqlite.py | 4 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/48/group_unique_indexes.py | 4 |
8 files changed, 49 insertions, 28 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index b471fcb064..744e98c6d0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): table="devices", column="user_id", iterable=user_ids_to_query, - keyvalues={"user_id": user_id, "hidden": False}, + keyvalues={"hidden": False}, retcols=("device_id",), ) @@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore): @trace async def delete_messages_for_device( - self, user_id: str, device_id: Optional[str], up_to_stream_id: int + self, + user_id: str, + device_id: Optional[str], + up_to_stream_id: int, + limit: int, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. + limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 + ROW_ID_NAME = self.database_engine.row_id_name + def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND stream_id <= ?" - ) + sql = f""" + DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( + SELECT {ROW_ID_NAME} FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + LIMIT {limit} + ) + """ txn.execute(sql, (user_id, device_id, up_to_stream_id)) return txn.rowcount @@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) + # In this case we don't know if we hit the limit or the delete is complete + # so let's not update the cache. + if count == limit: + return count + # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index e4162f846b..324fdfa892 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1766,14 +1766,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_delete_many_txn( txn, - table="device_inbox", - column="device_id", - values=device_ids, - keyvalues={"user_id": user_id}, - ) - - self.db_pool.simple_delete_many_txn( - txn, table="device_auth_providers", column="device_id", values=device_ids, diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index a3b4744855..57aa4921e1 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -221,12 +221,17 @@ class KeyStore(CacheInvalidationWorkerStore): """Processes a batch of keys to fetch, and adds the result to `keys`.""" # batch_iter always returns tuples so it's safe to do len(batch) - sql = """ - SELECT server_name, key_id, key_json, ts_valid_until_ms - FROM server_keys_json WHERE 1=0 - """ + " OR (server_name=? AND key_id=?)" * len( - batch - ) + where_clause = " OR (server_name=? AND key_id=?)" * len(batch) + + # `server_keys_json` can have multiple entries per server (one per + # remote server we fetched from, if using perspectives). Order by + # `ts_added_ms` so the most recently fetched one always wins. + sql = f""" + SELECT server_name, key_id, key_json, ts_valid_until_ms + FROM server_keys_json WHERE 1=0 + {where_clause} + ORDER BY ts_added_ms + """ txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5ee5c7ad9f..e4d10ff250 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: - if isinstance(self.database_engine, PostgresEngine): - ROW_ID_NAME = "ctid" - else: - ROW_ID_NAME = "rowid" - + ROW_ID_NAME = self.database_engine.row_id_name # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b5b3bf03e..b1a2418cbd 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM """Gets a string giving the server version. For example: '3.22.0'""" ... + @property + @abc.abstractmethod + def row_id_name(self) -> str: + """Gets the literal name representing a row id for this engine.""" + ... + @abc.abstractmethod def in_transaction(self, conn: ConnectionType) -> bool: """Whether the connection is currently in a transaction.""" diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 05a72dc554..6309363217 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -211,6 +211,10 @@ class PostgresEngine( else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + @property + def row_id_name(self) -> str: + return "ctid" + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ca8c59297c..802069e1e1 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]): """Gets a string giving the server version. For example: '3.22.0'.""" return "%i.%i.%i" % sqlite3.sqlite_version_info + @property + def row_id_name(self) -> str: + return "rowid" + def in_transaction(self, conn: sqlite3.Connection) -> bool: return conn.in_transaction diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index ad2da4c8af..622686d28f 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -14,7 +14,7 @@ from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + rowid = database_engine.row_id_name # remove duplicates from group_users & group_invites tables cur.execute( |