diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index bd56ba2515..c8d5f5ba8b 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -108,6 +108,23 @@ class DeviceStore(SQLBaseStore):
desc="delete_device",
)
+ def delete_devices(self, user_id, device_ids):
+ """Deletes several devices.
+
+ Args:
+ user_id (str): The ID of the user which owns the devices
+ device_ids (list): The IDs of the devices to delete
+ Returns:
+ defer.Deferred
+ """
+ return self._simple_delete_many(
+ table="devices",
+ column="device_id",
+ iterable=device_ids,
+ keyvalues={"user_id": user_id},
+ desc="delete_devices",
+ )
+
def update_device(self, user_id, device_id, new_display_name=None):
"""Update a device.
@@ -291,7 +308,7 @@ class DeviceStore(SQLBaseStore):
"""Get stream of updates to send to remote servers
Returns:
- (now_stream_id, [ { updates }, .. ])
+ (int, list[dict]): current stream id and list of updates
"""
now_stream_id = self._device_list_id_gen.get_current_token()
@@ -312,17 +329,20 @@ class DeviceStore(SQLBaseStore):
SELECT user_id, device_id, max(stream_id) FROM device_lists_outbound_pokes
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
GROUP BY user_id, device_id
+ LIMIT 20
"""
txn.execute(
sql, (destination, from_stream_id, now_stream_id, False)
)
- rows = txn.fetchall()
- if not rows:
+ # maps (user_id, device_id) -> stream_id
+ query_map = {(r[0], r[1]): r[2] for r in txn}
+ if not query_map:
return (now_stream_id, [])
- # maps (user_id, device_id) -> stream_id
- query_map = {(r[0], r[1]): r[2] for r in rows}
+ if len(query_map) >= 20:
+ now_stream_id = max(stream_id for stream_id in query_map.itervalues())
+
devices = self._get_e2e_device_keys_txn(
txn, query_map.keys(), include_all_devices=True
)
@@ -513,7 +533,7 @@ class DeviceStore(SQLBaseStore):
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
defer.returnValue(set(row[0] for row in rows))
- def get_all_device_list_changes_for_remotes(self, from_key):
+ def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
combined list of changes to devices, and which destinations need to be
poked. `destination` may be None if no destinations need to be poked.
@@ -521,11 +541,11 @@ class DeviceStore(SQLBaseStore):
sql = """
SELECT stream_id, user_id, destination FROM device_lists_stream
LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
- WHERE stream_id > ?
+ WHERE ? < stream_id AND stream_id <= ?
"""
return self._execute(
"get_all_device_list_changes_for_remotes", None,
- sql, from_key,
+ sql, from_key, to_key
)
@defer.inlineCallbacks
|