diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 271cdf923c..744e98c6d0 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
- self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ up_to_stream_id: int,
+ limit: int,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
+ limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
+ ROW_ID_NAME = self.database_engine.row_id_name
+
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- sql = (
- "DELETE FROM device_inbox"
- " WHERE user_id = ? AND device_id = ?"
- " AND stream_id <= ?"
- )
+ sql = f"""
+ DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+ SELECT {ROW_ID_NAME} FROM device_inbox
+ WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+ LIMIT {limit}
+ )
+ """
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": f"deleted {count} messages for device", "count": count})
+ # In this case we don't know if we hit the limit or the delete is complete
+ # so let's not update the cache.
+ if count == limit:
+ return count
+
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index fa69a4a298..7208fc8b33 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1768,14 +1768,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self.db_pool.simple_delete_many_txn(
txn,
- table="device_inbox",
- column="device_id",
- values=device_ids,
- keyvalues={"user_id": user_id},
- )
-
- self.db_pool.simple_delete_many_txn(
- txn,
table="device_auth_providers",
column="device_id",
values=device_ids,
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5ee5c7ad9f..e4d10ff250 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
receipts."""
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
- if isinstance(self.database_engine, PostgresEngine):
- ROW_ID_NAME = "ctid"
- else:
- ROW_ID_NAME = "rowid"
-
+ ROW_ID_NAME = self.database_engine.row_id_name
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# The following query takes less than a minute on matrix.org.
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 0b5b3bf03e..b1a2418cbd 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""Gets a string giving the server version. For example: '3.22.0'"""
...
+ @property
+ @abc.abstractmethod
+ def row_id_name(self) -> str:
+ """Gets the literal name representing a row id for this engine."""
+ ...
+
@abc.abstractmethod
def in_transaction(self, conn: ConnectionType) -> bool:
"""Whether the connection is currently in a transaction."""
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 05a72dc554..6309363217 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -211,6 +211,10 @@ class PostgresEngine(
else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
+ @property
+ def row_id_name(self) -> str:
+ return "ctid"
+
def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index ca8c59297c..802069e1e1 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
"""Gets a string giving the server version. For example: '3.22.0'."""
return "%i.%i.%i" % sqlite3.sqlite_version_info
+ @property
+ def row_id_name(self) -> str:
+ return "rowid"
+
def in_transaction(self, conn: sqlite3.Connection) -> bool:
return conn.in_transaction
diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
index ad2da4c8af..622686d28f 100644
--- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
@@ -14,7 +14,7 @@
from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements
FIX_INDEXES = """
@@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
- rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
+ rowid = database_engine.row_id_name
# remove duplicates from group_users & group_invites tables
cur.execute(
|