diff options
-rw-r--r-- | changelog.d/6089.misc | 1 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 124 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/devices_last_seen.sql | 24 | ||||
-rw-r--r-- | tests/storage/test_client_ips.py | 80 |
4 files changed, 179 insertions, 50 deletions
diff --git a/changelog.d/6089.misc b/changelog.d/6089.misc new file mode 100644 index 0000000000..fa3c197c54 --- /dev/null +++ b/changelog.d/6089.misc @@ -0,0 +1 @@ +Move last seen info into devices table. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 6db8c54077..8996689744 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -85,6 +85,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_ips_drop_nonunique_index", self._remove_user_ip_nonunique ) + # Update the last seen info in devices. + self.register_background_update_handler( + "devices_last_seen", self._devices_last_seen_update + ) + # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) self._batch_row_update = {} @@ -354,6 +359,21 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): }, lock=False, ) + + # Technically an access token might not be associated with + # a device so we need to check. + if device_id: + self._simple_upsert_txn( + txn, + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + values={ + "user_agent": user_agent, + "last_seen": last_seen, + "ip": ip, + }, + lock=False, + ) except Exception as e: # Failed to upsert, log and continue logger.error("Failed to insert client IP %r: %r", entry, e) @@ -372,19 +392,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): keys giving the column names """ - res = yield self.runInteraction( - "get_last_client_ip_by_device", - self._get_last_client_ip_by_device_txn, - user_id, - device_id, - retcols=( - "user_id", - "access_token", - "ip", - "user_agent", - "device_id", - "last_seen", - ), + keyvalues = {"user_id": user_id} + if device_id is not None: + keyvalues["device_id"] = device_id + + res = yield self._simple_select_list( + table="devices", + keyvalues=keyvalues, + retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), ) ret = {(d["user_id"], d["device_id"]): d for d in res} @@ -403,42 +418,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): } return ret - @classmethod - def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols): - where_clauses = [] - bindings = [] - if device_id is None: - where_clauses.append("user_id = ?") - bindings.extend((user_id,)) - else: - where_clauses.append("(user_id = ? AND device_id = ?)") - bindings.extend((user_id, device_id)) - - if not where_clauses: - return [] - - inner_select = ( - "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips " - "WHERE %(where)s " - "GROUP BY user_id, device_id" - ) % {"where": " OR ".join(where_clauses)} - - sql = ( - "SELECT %(retcols)s FROM user_ips " - "JOIN (%(inner_select)s) ips ON" - " user_ips.last_seen = ips.mls AND" - " user_ips.user_id = ips.user_id AND" - " (user_ips.device_id = ips.device_id OR" - " (user_ips.device_id IS NULL AND ips.device_id IS NULL)" - " )" - ) % { - "retcols": ",".join("user_ips." + c for c in retcols), - "inner_select": inner_select, - } - - txn.execute(sql, bindings) - return cls.cursor_to_dict(txn) - @defer.inlineCallbacks def get_user_ip_and_agents(self, user): user_id = user.to_string() @@ -470,3 +449,50 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): } for (access_token, ip), (user_agent, last_seen) in iteritems(results) ) + + @defer.inlineCallbacks + def _devices_last_seen_update(self, progress, batch_size): + """Background update to insert last seen info into devices table + """ + + last_user_id = progress.get("last_user_id", "") + last_device_id = progress.get("last_device_id", "") + + def _devices_last_seen_update_txn(txn): + sql = """ + SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices + INNER JOIN user_ips AS u USING (user_id, device_id) + WHERE user_id > ? OR (user_id = ? AND device_id > ?) + ORDER BY user_id ASC, device_id ASC + LIMIT ? + """ + txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + sql = """ + UPDATE devices + SET last_seen = ?, ip = ?, user_agent = ? + WHERE user_id = ? AND device_id = ? + """ + txn.execute_batch(sql, rows) + + _, _, _, user_id, device_id = rows[-1] + self._background_update_progress_txn( + txn, + "devices_last_seen", + {"last_user_id": user_id, "last_device_id": device_id}, + ) + + return len(rows) + + updated = yield self.runInteraction( + "_devices_last_seen_update", _devices_last_seen_update_txn + ) + + if not updated: + yield self._end_background_update("devices_last_seen") + + return updated diff --git a/synapse/storage/schema/delta/56/devices_last_seen.sql b/synapse/storage/schema/delta/56/devices_last_seen.sql new file mode 100644 index 0000000000..dfa902d0ba --- /dev/null +++ b/synapse/storage/schema/delta/56/devices_last_seen.sql @@ -0,0 +1,24 @@ +/* Copyright 2019 Matrix.org Foundation CIC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Track last seen information for a device in the devices table, rather +-- than relying on it being in the user_ips table (which we want to be able +-- to purge old entries from) +ALTER TABLE devices ADD COLUMN last_seen BIGINT; +ALTER TABLE devices ADD COLUMN ip TEXT; +ALTER TABLE devices ADD COLUMN user_agent TEXT; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('devices_last_seen', '{}'); diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index 09305c3bf1..76fe65b59e 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -55,7 +55,6 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): { "user_id": user_id, "device_id": "device_id", - "access_token": "access_token", "ip": "ip", "user_agent": "user_agent", "last_seen": 12345678000, @@ -201,6 +200,85 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): active = self.get_success(self.store.user_last_seen_monthly_active(user_id)) self.assertTrue(active) + def test_devices_last_seen_bg_update(self): + # First make sure we have completed all updates. + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + # Insert a user IP + user_id = "@user:id" + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", "device_id" + ) + ) + + # Force persisting to disk + self.reactor.advance(200) + + # But clear the associated entry in devices table + self.get_success( + self.store._simple_update( + table="devices", + keyvalues={"user_id": user_id, "device_id": "device_id"}, + updatevalues={"last_seen": None, "ip": None, "user_agent": None}, + desc="test_devices_last_seen_bg_update", + ) + ) + + # We should now get nulls when querying + result = self.get_success( + self.store.get_last_client_ip_by_device(user_id, "device_id") + ) + + r = result[(user_id, "device_id")] + self.assertDictContainsSubset( + { + "user_id": user_id, + "device_id": "device_id", + "ip": None, + "user_agent": None, + "last_seen": None, + }, + r, + ) + + # Register the background update to run again. + self.get_success( + self.store._simple_insert( + table="background_updates", + values={ + "update_name": "devices_last_seen", + "progress_json": "{}", + "depends_on": None, + }, + ) + ) + + # ... and tell the DataStore that it hasn't finished all updates yet + self.store._all_done = False + + # Now let's actually drive the updates to completion + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + # We should now get the correct result again + result = self.get_success( + self.store.get_last_client_ip_by_device(user_id, "device_id") + ) + + r = result[(user_id, "device_id")] + self.assertDictContainsSubset( + { + "user_id": user_id, + "device_id": "device_id", + "ip": "ip", + "user_agent": "user_agent", + "last_seen": 0, + }, + r, + ) + class ClientIpAuthTestCase(unittest.HomeserverTestCase): |