diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 271cdf923c..744e98c6d0 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
- self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ up_to_stream_id: int,
+ limit: int,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
+ limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
+ ROW_ID_NAME = self.database_engine.row_id_name
+
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- sql = (
- "DELETE FROM device_inbox"
- " WHERE user_id = ? AND device_id = ?"
- " AND stream_id <= ?"
- )
+ sql = f"""
+ DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+ SELECT {ROW_ID_NAME} FROM device_inbox
+ WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+ LIMIT {limit}
+ )
+ """
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": f"deleted {count} messages for device", "count": count})
+ # In this case we don't know if we hit the limit or the delete is complete
+ # so let's not update the cache.
+ if count == limit:
+ return count
+
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
|