diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 6db8c54077..539584288d 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,7 +19,7 @@ from six import iteritems
from twisted.internet import defer
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
from . import background_updates
@@ -42,6 +42,8 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
super(ClientIpStore, self).__init__(db_conn, hs)
+ self.user_ips_max_age = hs.config.user_ips_max_age
+
self.register_background_index_update(
"user_ips_device_index",
index_name="user_ips_device_id",
@@ -85,6 +87,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
)
+ # Update the last seen info in devices.
+ self.register_background_update_handler(
+ "devices_last_seen", self._devices_last_seen_update
+ )
+
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}
@@ -95,6 +102,9 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"before", "shutdown", self._update_client_ips_batch
)
+ if self.user_ips_max_age:
+ self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
+
@defer.inlineCallbacks
def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
@@ -314,20 +324,19 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
+ @wrap_as_background_process("update_client_ips")
def _update_client_ips_batch(self):
# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return
- def update():
- to_update = self._batch_row_update
- self._batch_row_update = {}
- return self.runInteraction(
- "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
- )
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
- return run_as_background_process("update_client_ips", update)
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+ )
def _update_client_ips_batch_txn(self, txn, to_update):
if "user_ips" in self._unsafe_to_upsert_tables or (
@@ -354,6 +363,21 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
},
lock=False,
)
+
+ # Technically an access token might not be associated with
+ # a device so we need to check.
+ if device_id:
+ self._simple_upsert_txn(
+ txn,
+ table="devices",
+ keyvalues={"user_id": user_id, "device_id": device_id},
+ values={
+ "user_agent": user_agent,
+ "last_seen": last_seen,
+ "ip": ip,
+ },
+ lock=False,
+ )
except Exception as e:
# Failed to upsert, log and continue
logger.error("Failed to insert client IP %r: %r", entry, e)
@@ -372,19 +396,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
keys giving the column names
"""
- res = yield self.runInteraction(
- "get_last_client_ip_by_device",
- self._get_last_client_ip_by_device_txn,
- user_id,
- device_id,
- retcols=(
- "user_id",
- "access_token",
- "ip",
- "user_agent",
- "device_id",
- "last_seen",
- ),
+ keyvalues = {"user_id": user_id}
+ if device_id is not None:
+ keyvalues["device_id"] = device_id
+
+ res = yield self._simple_select_list(
+ table="devices",
+ keyvalues=keyvalues,
+ retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
)
ret = {(d["user_id"], d["device_id"]): d for d in res}
@@ -403,42 +422,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
}
return ret
- @classmethod
- def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
- where_clauses = []
- bindings = []
- if device_id is None:
- where_clauses.append("user_id = ?")
- bindings.extend((user_id,))
- else:
- where_clauses.append("(user_id = ? AND device_id = ?)")
- bindings.extend((user_id, device_id))
-
- if not where_clauses:
- return []
-
- inner_select = (
- "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
- "WHERE %(where)s "
- "GROUP BY user_id, device_id"
- ) % {"where": " OR ".join(where_clauses)}
-
- sql = (
- "SELECT %(retcols)s FROM user_ips "
- "JOIN (%(inner_select)s) ips ON"
- " user_ips.last_seen = ips.mls AND"
- " user_ips.user_id = ips.user_id AND"
- " (user_ips.device_id = ips.device_id OR"
- " (user_ips.device_id IS NULL AND ips.device_id IS NULL)"
- " )"
- ) % {
- "retcols": ",".join("user_ips." + c for c in retcols),
- "inner_select": inner_select,
- }
-
- txn.execute(sql, bindings)
- return cls.cursor_to_dict(txn)
-
@defer.inlineCallbacks
def get_user_ip_and_agents(self, user):
user_id = user.to_string()
@@ -470,3 +453,92 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
}
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
)
+
+ @defer.inlineCallbacks
+ def _devices_last_seen_update(self, progress, batch_size):
+ """Background update to insert last seen info into devices table
+ """
+
+ last_user_id = progress.get("last_user_id", "")
+ last_device_id = progress.get("last_device_id", "")
+
+ def _devices_last_seen_update_txn(txn):
+ sql = """
+ SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices
+ INNER JOIN user_ips AS u USING (user_id, device_id)
+ WHERE user_id > ? OR (user_id = ? AND device_id > ?)
+ ORDER BY user_id ASC, device_id ASC
+ LIMIT ?
+ """
+ txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size))
+
+ rows = txn.fetchall()
+ if not rows:
+ return 0
+
+ sql = """
+ UPDATE devices
+ SET last_seen = ?, ip = ?, user_agent = ?
+ WHERE user_id = ? AND device_id = ?
+ """
+ txn.execute_batch(sql, rows)
+
+ _, _, _, user_id, device_id = rows[-1]
+ self._background_update_progress_txn(
+ txn,
+ "devices_last_seen",
+ {"last_user_id": user_id, "last_device_id": device_id},
+ )
+
+ return len(rows)
+
+ updated = yield self.runInteraction(
+ "_devices_last_seen_update", _devices_last_seen_update_txn
+ )
+
+ if not updated:
+ yield self._end_background_update("devices_last_seen")
+
+ return updated
+
+ @wrap_as_background_process("prune_old_user_ips")
+ async def _prune_old_user_ips(self):
+ """Removes entries in user IPs older than the configured period.
+ """
+
+ if self.user_ips_max_age is None:
+ # Nothing to do
+ return
+
+ if not await self.has_completed_background_update("devices_last_seen"):
+ # Only start pruning if we have finished populating the devices
+ # last seen info.
+ return
+
+ # We do a slightly funky SQL delete to ensure we don't try and delete
+ # too much at once (as the table may be very large from before we
+ # started pruning).
+ #
+ # This works by finding the max last_seen that is less than the given
+ # time, but has no more than N rows before it, deleting all rows with
+ # a lesser last_seen time. (We COALESCE so that the sub-SELECT always
+ # returns exactly one row).
+ sql = """
+ DELETE FROM user_ips
+ WHERE last_seen <= (
+ SELECT COALESCE(MAX(last_seen), -1)
+ FROM (
+ SELECT last_seen FROM user_ips
+ WHERE last_seen <= ?
+ ORDER BY last_seen ASC
+ LIMIT 5000
+ ) AS u
+ )
+ """
+
+ timestamp = self.clock.time_msec() - self.user_ips_max_age
+
+ def _prune_old_user_ips_txn(txn):
+ txn.execute(sql, (timestamp,))
+
+ await self.runInteraction("_prune_old_user_ips", _prune_old_user_ips_txn)
|