summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/6089.misc1
-rw-r--r--synapse/storage/client_ips.py124
-rw-r--r--synapse/storage/schema/delta/56/devices_last_seen.sql24
-rw-r--r--tests/storage/test_client_ips.py80
4 files changed, 179 insertions, 50 deletions
diff --git a/changelog.d/6089.misc b/changelog.d/6089.misc
new file mode 100644
index 0000000000..fa3c197c54
--- /dev/null
+++ b/changelog.d/6089.misc
@@ -0,0 +1 @@
+Move last seen info into devices table.
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 6db8c54077..8996689744 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -85,6 +85,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             "user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
         )
 
+        # Update the last seen info in devices.
+        self.register_background_update_handler(
+            "devices_last_seen", self._devices_last_seen_update
+        )
+
         # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
         self._batch_row_update = {}
 
@@ -354,6 +359,21 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                     },
                     lock=False,
                 )
+
+                # Technically an access token might not be associated with
+                # a device so we need to check.
+                if device_id:
+                    self._simple_upsert_txn(
+                        txn,
+                        table="devices",
+                        keyvalues={"user_id": user_id, "device_id": device_id},
+                        values={
+                            "user_agent": user_agent,
+                            "last_seen": last_seen,
+                            "ip": ip,
+                        },
+                        lock=False,
+                    )
             except Exception as e:
                 # Failed to upsert, log and continue
                 logger.error("Failed to insert client IP %r: %r", entry, e)
@@ -372,19 +392,14 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             keys giving the column names
         """
 
-        res = yield self.runInteraction(
-            "get_last_client_ip_by_device",
-            self._get_last_client_ip_by_device_txn,
-            user_id,
-            device_id,
-            retcols=(
-                "user_id",
-                "access_token",
-                "ip",
-                "user_agent",
-                "device_id",
-                "last_seen",
-            ),
+        keyvalues = {"user_id": user_id}
+        if device_id is not None:
+            keyvalues["device_id"] = device_id
+
+        res = yield self._simple_select_list(
+            table="devices",
+            keyvalues=keyvalues,
+            retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
         )
 
         ret = {(d["user_id"], d["device_id"]): d for d in res}
@@ -403,42 +418,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                     }
         return ret
 
-    @classmethod
-    def _get_last_client_ip_by_device_txn(cls, txn, user_id, device_id, retcols):
-        where_clauses = []
-        bindings = []
-        if device_id is None:
-            where_clauses.append("user_id = ?")
-            bindings.extend((user_id,))
-        else:
-            where_clauses.append("(user_id = ? AND device_id = ?)")
-            bindings.extend((user_id, device_id))
-
-        if not where_clauses:
-            return []
-
-        inner_select = (
-            "SELECT MAX(last_seen) mls, user_id, device_id FROM user_ips "
-            "WHERE %(where)s "
-            "GROUP BY user_id, device_id"
-        ) % {"where": " OR ".join(where_clauses)}
-
-        sql = (
-            "SELECT %(retcols)s FROM user_ips "
-            "JOIN (%(inner_select)s) ips ON"
-            "    user_ips.last_seen = ips.mls AND"
-            "    user_ips.user_id = ips.user_id AND"
-            "    (user_ips.device_id = ips.device_id OR"
-            "         (user_ips.device_id IS NULL AND ips.device_id IS NULL)"
-            "    )"
-        ) % {
-            "retcols": ",".join("user_ips." + c for c in retcols),
-            "inner_select": inner_select,
-        }
-
-        txn.execute(sql, bindings)
-        return cls.cursor_to_dict(txn)
-
     @defer.inlineCallbacks
     def get_user_ip_and_agents(self, user):
         user_id = user.to_string()
@@ -470,3 +449,50 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             }
             for (access_token, ip), (user_agent, last_seen) in iteritems(results)
         )
+
+    @defer.inlineCallbacks
+    def _devices_last_seen_update(self, progress, batch_size):
+        """Background update to insert last seen info into devices table
+        """
+
+        last_user_id = progress.get("last_user_id", "")
+        last_device_id = progress.get("last_device_id", "")
+
+        def _devices_last_seen_update_txn(txn):
+            sql = """
+                SELECT u.last_seen, u.ip, u.user_agent, user_id, device_id FROM devices
+                INNER JOIN user_ips AS u USING (user_id, device_id)
+                WHERE user_id > ? OR (user_id = ? AND device_id > ?)
+                ORDER BY user_id ASC, device_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_user_id, last_user_id, last_device_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            sql = """
+                UPDATE devices
+                SET last_seen = ?, ip = ?, user_agent = ?
+                WHERE user_id = ? AND device_id = ?
+            """
+            txn.execute_batch(sql, rows)
+
+            _, _, _, user_id, device_id = rows[-1]
+            self._background_update_progress_txn(
+                txn,
+                "devices_last_seen",
+                {"last_user_id": user_id, "last_device_id": device_id},
+            )
+
+            return len(rows)
+
+        updated = yield self.runInteraction(
+            "_devices_last_seen_update", _devices_last_seen_update_txn
+        )
+
+        if not updated:
+            yield self._end_background_update("devices_last_seen")
+
+        return updated
diff --git a/synapse/storage/schema/delta/56/devices_last_seen.sql b/synapse/storage/schema/delta/56/devices_last_seen.sql
new file mode 100644
index 0000000000..dfa902d0ba
--- /dev/null
+++ b/synapse/storage/schema/delta/56/devices_last_seen.sql
@@ -0,0 +1,24 @@
+/* Copyright 2019 Matrix.org Foundation CIC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Track last seen information for a device in the devices table, rather
+-- than relying on it being in the user_ips table (which we want to be able
+-- to purge old entries from)
+ALTER TABLE devices ADD COLUMN last_seen BIGINT;
+ALTER TABLE devices ADD COLUMN ip TEXT;
+ALTER TABLE devices ADD COLUMN user_agent TEXT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('devices_last_seen', '{}');
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index 09305c3bf1..76fe65b59e 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -55,7 +55,6 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
             {
                 "user_id": user_id,
                 "device_id": "device_id",
-                "access_token": "access_token",
                 "ip": "ip",
                 "user_agent": "user_agent",
                 "last_seen": 12345678000,
@@ -201,6 +200,85 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
         active = self.get_success(self.store.user_last_seen_monthly_active(user_id))
         self.assertTrue(active)
 
+    def test_devices_last_seen_bg_update(self):
+        # First make sure we have completed all updates.
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+        # Insert a user IP
+        user_id = "@user:id"
+        self.get_success(
+            self.store.insert_client_ip(
+                user_id, "access_token", "ip", "user_agent", "device_id"
+            )
+        )
+
+        # Force persisting to disk
+        self.reactor.advance(200)
+
+        # But clear the associated entry in devices table
+        self.get_success(
+            self.store._simple_update(
+                table="devices",
+                keyvalues={"user_id": user_id, "device_id": "device_id"},
+                updatevalues={"last_seen": None, "ip": None, "user_agent": None},
+                desc="test_devices_last_seen_bg_update",
+            )
+        )
+
+        # We should now get nulls when querying
+        result = self.get_success(
+            self.store.get_last_client_ip_by_device(user_id, "device_id")
+        )
+
+        r = result[(user_id, "device_id")]
+        self.assertDictContainsSubset(
+            {
+                "user_id": user_id,
+                "device_id": "device_id",
+                "ip": None,
+                "user_agent": None,
+                "last_seen": None,
+            },
+            r,
+        )
+
+        # Register the background update to run again.
+        self.get_success(
+            self.store._simple_insert(
+                table="background_updates",
+                values={
+                    "update_name": "devices_last_seen",
+                    "progress_json": "{}",
+                    "depends_on": None,
+                },
+            )
+        )
+
+        # ... and tell the DataStore that it hasn't finished all updates yet
+        self.store._all_done = False
+
+        # Now let's actually drive the updates to completion
+        while not self.get_success(self.store.has_completed_background_updates()):
+            self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+        # We should now get the correct result again
+        result = self.get_success(
+            self.store.get_last_client_ip_by_device(user_id, "device_id")
+        )
+
+        r = result[(user_id, "device_id")]
+        self.assertDictContainsSubset(
+            {
+                "user_id": user_id,
+                "device_id": "device_id",
+                "ip": "ip",
+                "user_agent": "user_agent",
+                "last_seen": 0,
+            },
+            r,
+        )
+
 
 class ClientIpAuthTestCase(unittest.HomeserverTestCase):