diff options
-rw-r--r-- | .travis.yml | 3 | ||||
-rw-r--r-- | changelog.d/4370.misc | 1 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 138 | ||||
-rw-r--r-- | synapse/storage/schema/delta/53/user_ips_index.sql | 26 |
4 files changed, 164 insertions, 4 deletions
diff --git a/.travis.yml b/.travis.yml index 84d5efff9b..728f0e248a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,9 @@ cache: # - $HOME/.cache/pip/wheels +addons: + postgresql: "9.4" + # don't clone the whole repo history, one commit will do git: depth: 1 diff --git a/changelog.d/4370.misc b/changelog.d/4370.misc new file mode 100644 index 0000000000..047061ed3c --- /dev/null +++ b/changelog.d/4370.misc @@ -0,0 +1 @@ +Apply a unique index to the user_ips table, preventing duplicates. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 9ad17b7c25..5d548f250a 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -65,7 +65,27 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["last_seen"], ) - # (user_id, access_token, ip) -> (user_agent, device_id, last_seen) + self.register_background_update_handler( + "user_ips_remove_dupes", + self._remove_user_ip_dupes, + ) + + # Register a unique index + self.register_background_index_update( + "user_ips_device_unique_index", + index_name="user_ips_user_token_ip_unique_index", + table="user_ips", + columns=["user_id", "access_token", "ip"], + unique=True, + ) + + # Drop the old non-unique index + self.register_background_update_handler( + "user_ips_drop_nonunique_index", + self._remove_user_ip_nonunique, + ) + + # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) self._batch_row_update = {} self._client_ip_looper = self._clock.looping_call( @@ -76,6 +96,116 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) @defer.inlineCallbacks + def _remove_user_ip_nonunique(self, progress, batch_size): + def f(conn): + txn = conn.cursor() + txn.execute( + "DROP INDEX IF EXISTS user_ips_user_ip" + ) + txn.close() + + yield self.runWithConnection(f) + yield self._end_background_update("user_ips_drop_nonunique_index") + defer.returnValue(1) + + @defer.inlineCallbacks + def _remove_user_ip_dupes(self, progress, batch_size): + + last_seen_progress = progress.get("last_seen", 0) + + def get_last_seen(txn): + txn.execute( + """ + SELECT last_seen FROM user_ips + WHERE last_seen > ? + ORDER BY last_seen + LIMIT 1 + OFFSET ? + """, + (last_seen_progress, batch_size) + ) + results = txn.fetchone() + return results + + # Get a last seen that's sufficiently far away enough from the last one + last_seen = yield self.runInteraction( + "user_ips_dups_get_last_seen", get_last_seen + ) + + if not last_seen: + # If we get a None then we're reaching the end and just need to + # delete the last batch. + last = True + + # We fake not having an upper bound by using a future date, by + # just multiplying the current time by two.... + last_seen = int(self.clock.time_msec()) * 2 + else: + last = False + last_seen = last_seen[0] + + def remove(txn, last_seen_progress, last_seen): + # This works by looking at all entries in the given time span, and + # then for each (user_id, access_token, ip) tuple in that range + # checking for any duplicates in the rest of the table (via a join). + # It then only returns entries which have duplicates, and the max + # last_seen across all duplicates, which can the be used to delete + # all other duplicates. + # It is efficient due to the existence of (user_id, access_token, + # ip) and (last_seen) indices. + txn.execute( + """ + SELECT user_id, access_token, ip, + MAX(device_id), MAX(user_agent), MAX(last_seen) + FROM ( + SELECT user_id, access_token, ip + FROM user_ips + WHERE ? <= last_seen AND last_seen < ? + ORDER BY last_seen + ) c + INNER JOIN user_ips USING (user_id, access_token, ip) + GROUP BY user_id, access_token, ip + HAVING count(*) > 1""", + (last_seen_progress, last_seen) + ) + res = txn.fetchall() + + # We've got some duplicates + for i in res: + user_id, access_token, ip, device_id, user_agent, last_seen = i + + # Drop all the duplicates + txn.execute( + """ + DELETE FROM user_ips + WHERE user_id = ? AND access_token = ? AND ip = ? + """, + (user_id, access_token, ip) + ) + + # Add in one to be the last_seen + txn.execute( + """ + INSERT INTO user_ips + (user_id, access_token, ip, device_id, user_agent, last_seen) + VALUES (?, ?, ?, ?, ?, ?) + """, + (user_id, access_token, ip, device_id, user_agent, last_seen) + ) + + self._background_update_progress_txn( + txn, "user_ips_remove_dupes", {"last_seen": last_seen} + ) + + yield self.runInteraction( + "user_ips_dups_remove", remove, last_seen_progress, last_seen + ) + if last: + yield self._end_background_update("user_ips_remove_dupes") + + defer.returnValue(batch_size) + + @defer.inlineCallbacks def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): if not now: @@ -127,10 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_id": user_id, "access_token": access_token, "ip": ip, - "user_agent": user_agent, - "device_id": device_id, }, values={ + "user_agent": user_agent, + "device_id": device_id, "last_seen": last_seen, }, lock=False, @@ -227,7 +357,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): results = {} for key in self._batch_row_update: - uid, access_token, ip = key + uid, access_token, ip, = key if uid == user_id: user_agent, _, last_seen = self._batch_row_update[key] results[(access_token, ip)] = (user_agent, last_seen) diff --git a/synapse/storage/schema/delta/53/user_ips_index.sql b/synapse/storage/schema/delta/53/user_ips_index.sql new file mode 100644 index 0000000000..4ca346c111 --- /dev/null +++ b/synapse/storage/schema/delta/53/user_ips_index.sql @@ -0,0 +1,26 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- delete duplicates +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_remove_dupes', '{}'); + +-- add a new unique index to user_ips table +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('user_ips_device_unique_index', '{}', 'user_ips_remove_dupes'); + +-- drop the old original index +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('user_ips_drop_nonunique_index', '{}', 'user_ips_device_unique_index'); \ No newline at end of file |