summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-04-08 15:29:13 +0100
committerGitHub <noreply@github.com>2022-04-08 15:29:13 +0100
commite630722f11821b4d7610bceae256674114f9803b (patch)
tree9bd1903a583b6b5d59793da500b44a917dfcde14 /synapse/storage/databases
parentMake `synapse._scripts` pass typechecks (#12421) (diff)
downloadsynapse-e630722f11821b4d7610bceae256674114f9803b.tar.xz
Optimise `_update_client_ips_batch_txn` to batch together database operations. (#12252)
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/client_ips.py66
1 files changed, 34 insertions, 32 deletions
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 8480ea4e1c..0df160d2b0 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -616,9 +616,10 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
         to_update = self._batch_row_update
         self._batch_row_update = {}
 
-        await self.db_pool.runInteraction(
-            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
-        )
+        if to_update:
+            await self.db_pool.runInteraction(
+                "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+            )
 
     def _update_client_ips_batch_txn(
         self,
@@ -629,42 +630,43 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
             self._update_on_this_worker
         ), "This worker is not designated to update client IPs"
 
-        if "user_ips" in self.db_pool._unsafe_to_upsert_tables or (
-            not self.database_engine.can_native_upsert
-        ):
-            self.database_engine.lock_table(txn, "user_ips")
+        # Keys and values for the `user_ips` upsert.
+        user_ips_keys = []
+        user_ips_values = []
+
+        # Keys and values for the `devices` update.
+        devices_keys = []
+        devices_values = []
 
         for entry in to_update.items():
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
-
-            self.db_pool.simple_upsert_txn(
-                txn,
-                table="user_ips",
-                keyvalues={"user_id": user_id, "access_token": access_token, "ip": ip},
-                values={
-                    "user_agent": user_agent,
-                    "device_id": device_id,
-                    "last_seen": last_seen,
-                },
-                lock=False,
-            )
+            user_ips_keys.append((user_id, access_token, ip))
+            user_ips_values.append((user_agent, device_id, last_seen))
 
             # Technically an access token might not be associated with
             # a device so we need to check.
             if device_id:
-                # this is always an update rather than an upsert: the row should
-                # already exist, and if it doesn't, that may be because it has been
-                # deleted, and we don't want to re-create it.
-                self.db_pool.simple_update_txn(
-                    txn,
-                    table="devices",
-                    keyvalues={"user_id": user_id, "device_id": device_id},
-                    updatevalues={
-                        "user_agent": user_agent,
-                        "last_seen": last_seen,
-                        "ip": ip,
-                    },
-                )
+                devices_keys.append((user_id, device_id))
+                devices_values.append((user_agent, last_seen, ip))
+
+        self.db_pool.simple_upsert_many_txn(
+            txn,
+            table="user_ips",
+            key_names=("user_id", "access_token", "ip"),
+            key_values=user_ips_keys,
+            value_names=("user_agent", "device_id", "last_seen"),
+            value_values=user_ips_values,
+        )
+
+        if devices_values:
+            self.db_pool.simple_update_many_txn(
+                txn,
+                table="devices",
+                key_names=("user_id", "device_id"),
+                key_values=devices_keys,
+                value_names=("user_agent", "last_seen", "ip"),
+                value_values=devices_values,
+            )
 
     async def get_last_client_ip_by_device(
         self, user_id: str, device_id: Optional[str]