diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 50d71f5ebc..216a5925fc 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -14,8 +14,7 @@
# limitations under the License.
import logging
-
-from twisted.internet import defer
+from typing import Dict, Optional, Tuple
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
@@ -82,21 +81,19 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
"devices_last_seen", self._devices_last_seen_update
)
- @defer.inlineCallbacks
- def _remove_user_ip_nonunique(self, progress, batch_size):
+ async def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
txn = conn.cursor()
txn.execute("DROP INDEX IF EXISTS user_ips_user_ip")
txn.close()
- yield self.db_pool.runWithConnection(f)
- yield self.db_pool.updates._end_background_update(
+ await self.db_pool.runWithConnection(f)
+ await self.db_pool.updates._end_background_update(
"user_ips_drop_nonunique_index"
)
return 1
- @defer.inlineCallbacks
- def _analyze_user_ip(self, progress, batch_size):
+ async def _analyze_user_ip(self, progress, batch_size):
# Background update to analyze user_ips table before we run the
# deduplication background update. The table may not have been analyzed
# for ages due to the table locks.
@@ -106,14 +103,13 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
def user_ips_analyze(txn):
txn.execute("ANALYZE user_ips")
- yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
+ await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
- yield self.db_pool.updates._end_background_update("user_ips_analyze")
+ await self.db_pool.updates._end_background_update("user_ips_analyze")
return 1
- @defer.inlineCallbacks
- def _remove_user_ip_dupes(self, progress, batch_size):
+ async def _remove_user_ip_dupes(self, progress, batch_size):
# This works function works by scanning the user_ips table in batches
# based on `last_seen`. For each row in a batch it searches the rest of
# the table to see if there are any duplicates, if there are then they
@@ -140,7 +136,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return None
# Get a last seen that has roughly `batch_size` since `begin_last_seen`
- end_last_seen = yield self.db_pool.runInteraction(
+ end_last_seen = await self.db_pool.runInteraction(
"user_ips_dups_get_last_seen", get_last_seen
)
@@ -275,15 +271,14 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
)
- yield self.db_pool.runInteraction("user_ips_dups_remove", remove)
+ await self.db_pool.runInteraction("user_ips_dups_remove", remove)
if last:
- yield self.db_pool.updates._end_background_update("user_ips_remove_dupes")
+ await self.db_pool.updates._end_background_update("user_ips_remove_dupes")
return batch_size
- @defer.inlineCallbacks
- def _devices_last_seen_update(self, progress, batch_size):
+ async def _devices_last_seen_update(self, progress, batch_size):
"""Background update to insert last seen info into devices table
"""
@@ -346,12 +341,12 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
return len(rows)
- updated = yield self.db_pool.runInteraction(
+ updated = await self.db_pool.runInteraction(
"_devices_last_seen_update", _devices_last_seen_update_txn
)
if not updated:
- yield self.db_pool.updates._end_background_update("devices_last_seen")
+ await self.db_pool.updates._end_background_update("devices_last_seen")
return updated
@@ -460,25 +455,25 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
# Failed to upsert, log and continue
logger.error("Failed to insert client IP %r: %r", entry, e)
- @defer.inlineCallbacks
- def get_last_client_ip_by_device(self, user_id, device_id):
+ async def get_last_client_ip_by_device(
+ self, user_id: str, device_id: Optional[str]
+ ) -> Dict[Tuple[str, str], dict]:
"""For each device_id listed, give the user_ip it was last seen on
Args:
- user_id (str)
- device_id (str): If None fetches all devices for the user
+ user_id: The user to fetch devices for.
+ device_id: If None fetches all devices for the user
Returns:
- defer.Deferred: resolves to a dict, where the keys
- are (user_id, device_id) tuples. The values are also dicts, with
- keys giving the column names
+ A dictionary mapping a tuple of (user_id, device_id) to dicts, with
+ keys giving the column names from the devices table.
"""
keyvalues = {"user_id": user_id}
if device_id is not None:
keyvalues["device_id"] = device_id
- res = yield self.db_pool.simple_select_list(
+ res = await self.db_pool.simple_select_list(
table="devices",
keyvalues=keyvalues,
retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
@@ -500,8 +495,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
}
return ret
- @defer.inlineCallbacks
- def get_user_ip_and_agents(self, user):
+ async def get_user_ip_and_agents(self, user):
user_id = user.to_string()
results = {}
@@ -511,7 +505,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
user_agent, _, last_seen = self._batch_row_update[key]
results[(access_token, ip)] = (user_agent, last_seen)
- rows = yield self.db_pool.simple_select_list(
+ rows = await self.db_pool.simple_select_list(
table="user_ips",
keyvalues={"user_id": user_id},
retcols=["access_token", "ip", "user_agent", "last_seen"],
|