diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 744e98c6d0..3e7425d4a6 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -344,18 +344,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# Note that this is more efficient than just dropping `device_id` from the query,
# since device_inbox has an index on `(user_id, device_id, stream_id)`
if not device_ids_to_query:
- user_device_dicts = self.db_pool.simple_select_many_txn(
- txn,
- table="devices",
- column="user_id",
- iterable=user_ids_to_query,
- keyvalues={"hidden": False},
- retcols=("device_id",),
+ user_device_dicts = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="devices",
+ column="user_id",
+ iterable=user_ids_to_query,
+ keyvalues={"hidden": False},
+ retcols=("device_id",),
+ ),
)
- device_ids_to_query.update(
- {row["device_id"] for row in user_device_dicts}
- )
+ device_ids_to_query.update({row[0] for row in user_device_dicts})
if not device_ids_to_query:
# We've ended up with no devices to query.
@@ -449,7 +450,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
- limit: int,
+ limit: Optional[int] = None,
) -> int:
"""
Args:
@@ -477,17 +478,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
- ROW_ID_NAME = self.database_engine.row_id_name
-
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
+ limit_statement = "" if limit is None else f"LIMIT {limit}"
sql = f"""
- DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
- SELECT {ROW_ID_NAME} FROM device_inbox
- WHERE user_id = ? AND device_id = ? AND stream_id <= ?
- LIMIT {limit}
+ DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
+ SELECT MAX(stream_id) FROM (
+ SELECT stream_id FROM device_inbox
+ WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+ ORDER BY stream_id
+ {limit_statement}
+ ) AS q1
)
"""
- txn.execute(sql, (user_id, device_id, up_to_stream_id))
+ txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
return txn.rowcount
count = await self.db_pool.runInteraction(
@@ -845,20 +848,21 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# We exclude hidden devices (such as cross-signing keys) here as they are
# not expected to receive to-device messages.
- rows = self.db_pool.simple_select_many_txn(
- txn,
- table="devices",
- keyvalues={"user_id": user_id, "hidden": False},
- column="device_id",
- iterable=devices,
- retcols=("device_id",),
+ rows = cast(
+ List[Tuple[str]],
+ self.db_pool.simple_select_many_txn(
+ txn,
+ table="devices",
+ keyvalues={"user_id": user_id, "hidden": False},
+ column="device_id",
+ iterable=devices,
+ retcols=("device_id",),
+ ),
)
- for row in rows:
+ for (device_id,) in rows:
# Only insert into the local inbox if the device exists on
# this server
- device_id = row["device_id"]
-
with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
|