summary refs log tree commit diff
path: root/synapse/storage/client_ips.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2019-01-25 11:09:53 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2019-01-25 11:09:53 +0000
commit821b65aeb51e744055e0eaaf9b528587fa494b20 (patch)
treec4231f142f51d319c4b86c2dcc0e4fa691a3bed0 /synapse/storage/client_ips.py
parentMove tag and direct state copying into separate function (diff)
parentMerge pull request #4415 from matrix-org/anoa/full_search_upgraded_rooms (diff)
downloadsynapse-821b65aeb51e744055e0eaaf9b528587fa494b20.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/dm_room_upgrade
Diffstat (limited to 'synapse/storage/client_ips.py')
-rw-r--r--synapse/storage/client_ips.py70
1 files changed, 43 insertions, 27 deletions
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index 5d548f250a..091d7116c5 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -110,8 +110,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
 
     @defer.inlineCallbacks
     def _remove_user_ip_dupes(self, progress, batch_size):
+        # This works function works by scanning the user_ips table in batches
+        # based on `last_seen`. For each row in a batch it searches the rest of
+        # the table to see if there are any duplicates, if there are then they
+        # are removed and replaced with a suitable row.
 
-        last_seen_progress = progress.get("last_seen", 0)
+        # Fetch the start of the batch
+        begin_last_seen = progress.get("last_seen", 0)
 
         def get_last_seen(txn):
             txn.execute(
@@ -122,29 +127,28 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 LIMIT 1
                 OFFSET ?
                 """,
-                (last_seen_progress, batch_size)
+                (begin_last_seen, batch_size)
             )
-            results = txn.fetchone()
-            return results
-
-        # Get a last seen that's sufficiently far away enough from the last one
-        last_seen = yield self.runInteraction(
+            row = txn.fetchone()
+            if row:
+                return row[0]
+            else:
+                return None
+
+        # Get a last seen that has roughly `batch_size` since `begin_last_seen`
+        end_last_seen = yield self.runInteraction(
             "user_ips_dups_get_last_seen", get_last_seen
         )
 
-        if not last_seen:
-            # If we get a None then we're reaching the end and just need to
-            # delete the last batch.
-            last = True
+        # If it returns None, then we're processing the last batch
+        last = end_last_seen is None
 
-            # We fake not having an upper bound by using a future date, by
-            # just multiplying the current time by two....
-            last_seen = int(self.clock.time_msec()) * 2
-        else:
-            last = False
-            last_seen = last_seen[0]
+        logger.info(
+            "Scanning for duplicate 'user_ips' rows in range: %s <= last_seen < %s",
+            begin_last_seen, end_last_seen,
+        )
 
-        def remove(txn, last_seen_progress, last_seen):
+        def remove(txn):
             # This works by looking at all entries in the given time span, and
             # then for each (user_id, access_token, ip) tuple in that range
             # checking for any duplicates in the rest of the table (via a join).
@@ -153,6 +157,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             # all other duplicates.
             # It is efficient due to the existence of (user_id, access_token,
             # ip) and (last_seen) indices.
+
+            # Define the search space, which requires handling the last batch in
+            # a different way
+            if last:
+                clause = "? <= last_seen"
+                args = (begin_last_seen,)
+            else:
+                clause = "? <= last_seen AND last_seen < ?"
+                args = (begin_last_seen, end_last_seen)
+
             txn.execute(
                 """
                 SELECT user_id, access_token, ip,
@@ -160,13 +174,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 FROM (
                     SELECT user_id, access_token, ip
                     FROM user_ips
-                    WHERE ? <= last_seen AND last_seen < ?
-                    ORDER BY last_seen
+                    WHERE {}
                 ) c
                 INNER JOIN user_ips USING (user_id, access_token, ip)
                 GROUP BY user_id, access_token, ip
-                HAVING count(*) > 1""",
-                (last_seen_progress, last_seen)
+                HAVING count(*) > 1
+                """.format(clause),
+                args
             )
             res = txn.fetchall()
 
@@ -194,12 +208,11 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
                 )
 
             self._background_update_progress_txn(
-                txn, "user_ips_remove_dupes", {"last_seen": last_seen}
+                txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
             )
 
-        yield self.runInteraction(
-            "user_ips_dups_remove", remove, last_seen_progress, last_seen
-        )
+        yield self.runInteraction("user_ips_dups_remove", remove)
+
         if last:
             yield self._end_background_update("user_ips_remove_dupes")
 
@@ -244,7 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         )
 
     def _update_client_ips_batch_txn(self, txn, to_update):
-        self.database_engine.lock_table(txn, "user_ips")
+        if "user_ips" in self._unsafe_to_upsert_tables or (
+            not self.database_engine.can_native_upsert
+        ):
+            self.database_engine.lock_table(txn, "user_ips")
 
         for entry in iteritems(to_update):
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry