diff options
-rw-r--r-- | .travis.yml | 10 | ||||
-rw-r--r-- | README.rst | 5 | ||||
-rw-r--r-- | changelog.d/4342.misc | 1 | ||||
-rw-r--r-- | changelog.d/4368.misc | 1 | ||||
-rw-r--r-- | changelog.d/4369.bugfix | 1 | ||||
-rw-r--r-- | changelog.d/4370.misc | 1 | ||||
-rw-r--r-- | changelog.d/4377.misc | 1 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 141 | ||||
-rw-r--r-- | synapse/storage/_base.py | 10 | ||||
-rw-r--r-- | synapse/storage/client_ips.py | 138 | ||||
-rw-r--r-- | synapse/storage/schema/delta/53/user_ips_index.sql | 26 | ||||
-rw-r--r-- | tests/storage/test_client_ips.py | 71 |
12 files changed, 330 insertions, 76 deletions
diff --git a/.travis.yml b/.travis.yml index 84d5efff9b..3cab77ce4d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,9 @@ cache: # - $HOME/.cache/pip/wheels +addons: + postgresql: "9.4" + # don't clone the whole repo history, one commit will do git: depth: 1 @@ -68,6 +71,13 @@ matrix: install: - pip install tox + + # if we don't have python3.6 in this environment, travis unhelpfully gives us + # a `python3.6` on our path which does nothing but spit out a warning. Tox + # tries to run it (even if we're not running a py36 env), so the build logs + # then have warnings which look like errors. To reduce the noise, remove the + # non-functional python3.6. + - ( ! command -v python3.6 || python3.6 --version ) &>/dev/null || rm -f $(command -v python3.6) script: - tox -e $TOX_ENV diff --git a/README.rst b/README.rst index 4901e0464e..8bff55e78e 100644 --- a/README.rst +++ b/README.rst @@ -184,7 +184,7 @@ Configuring Synapse Before you can start Synapse, you will need to generate a configuration file. To do this, run (in your virtualenv, as before):: - cd ~/.synapse + cd ~/synapse python -m synapse.app.homeserver \ --server-name my.domain.name \ --config-path homeserver.yaml \ @@ -796,8 +796,7 @@ A manual password reset can be done via direct database access as follows. First calculate the hash of the new password:: - $ source ~/.synapse/bin/activate - $ ./scripts/hash_password + $ ~/synapse/env/bin/hash_password Password: Confirm password: $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx diff --git a/changelog.d/4342.misc b/changelog.d/4342.misc new file mode 100644 index 0000000000..a4fda47c20 --- /dev/null +++ b/changelog.d/4342.misc @@ -0,0 +1 @@ +Update README to use the new virtualenv everywhere \ No newline at end of file diff --git a/changelog.d/4368.misc b/changelog.d/4368.misc new file mode 100644 index 0000000000..020dacb547 --- /dev/null +++ b/changelog.d/4368.misc @@ -0,0 +1 @@ +Add better logging for unexpected errors while sending transactions diff --git a/changelog.d/4369.bugfix b/changelog.d/4369.bugfix new file mode 100644 index 0000000000..a78d557932 --- /dev/null +++ b/changelog.d/4369.bugfix @@ -0,0 +1 @@ +Prevent users with access tokens predating the introduction of device IDs from creating spurious entries in the user_ips table. diff --git a/changelog.d/4370.misc b/changelog.d/4370.misc new file mode 100644 index 0000000000..047061ed3c --- /dev/null +++ b/changelog.d/4370.misc @@ -0,0 +1 @@ +Apply a unique index to the user_ips table, preventing duplicates. diff --git a/changelog.d/4377.misc b/changelog.d/4377.misc new file mode 100644 index 0000000000..06273023fc --- /dev/null +++ b/changelog.d/4377.misc @@ -0,0 +1 @@ +Silence travis-ci build warnings by removing non-functional python3.6 \ No newline at end of file diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index be4076fc6a..f2a42f97a6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -229,19 +229,18 @@ class MatrixFederationHttpClient(object): backoff_on_404 (bool): Back off if we get a 404 Returns: - Deferred: resolves with the http response object on success. - - Fails with ``HttpResponseException``: if we get an HTTP response - code >= 300 (except 429). - - Fails with ``NotRetryingDestination`` if we are not yet ready - to retry this server. - - Fails with ``FederationDeniedError`` if this destination - is not on our federation whitelist - - Fails with ``RequestSendFailed`` if there were problems connecting to - the remote, due to e.g. DNS failures, connection timeouts etc. + Deferred[twisted.web.client.Response]: resolves with the HTTP + response object on success. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. """ if timeout: _sec_timeout = timeout / 1000 @@ -516,17 +515,18 @@ class MatrixFederationHttpClient(object): requests) Returns: - Deferred: Succeeds when we get a 2xx HTTP response. The result - will be the decoded JSON body. - - Fails with ``HttpResponseException`` if we get an HTTP response - code >= 300. - - Fails with ``NotRetryingDestination`` if we are not yet ready - to retry this server. - - Fails with ``FederationDeniedError`` if this destination - is not on our federation whitelist + Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + result will be the decoded JSON body. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( @@ -570,17 +570,18 @@ class MatrixFederationHttpClient(object): try the request anyway. args (dict): query params Returns: - Deferred: Succeeds when we get a 2xx HTTP response. The result - will be the decoded JSON body. - - Fails with ``HttpResponseException`` if we get an HTTP response - code >= 300. - - Fails with ``NotRetryingDestination`` if we are not yet ready - to retry this server. - - Fails with ``FederationDeniedError`` if this destination - is not on our federation whitelist + Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + result will be the decoded JSON body. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( @@ -625,17 +626,18 @@ class MatrixFederationHttpClient(object): ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. Returns: - Deferred: Succeeds when we get a 2xx HTTP response. The result - will be the decoded JSON body. - - Fails with ``HttpResponseException`` if we get an HTTP response - code >= 300. - - Fails with ``NotRetryingDestination`` if we are not yet ready - to retry this server. - - Fails with ``FederationDeniedError`` if this destination - is not on our federation whitelist + Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + result will be the decoded JSON body. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. """ logger.debug("get_json args: %s", args) @@ -676,17 +678,18 @@ class MatrixFederationHttpClient(object): ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. Returns: - Deferred: Succeeds when we get a 2xx HTTP response. The result - will be the decoded JSON body. - - Fails with ``HttpResponseException`` if we get an HTTP response - code >= 300. - - Fails with ``NotRetryingDestination`` if we are not yet ready - to retry this server. - - Fails with ``FederationDeniedError`` if this destination - is not on our federation whitelist + Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + result will be the decoded JSON body. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( method="DELETE", @@ -719,18 +722,20 @@ class MatrixFederationHttpClient(object): args (dict): Optional dictionary used to create the query string. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. - Returns: - Deferred: resolves with an (int,dict) tuple of the file length and - a dict of the response headers. - - Fails with ``HttpResponseException`` if we get an HTTP response code - >= 300 - Fails with ``NotRetryingDestination`` if we are not yet ready - to retry this server. - - Fails with ``FederationDeniedError`` if this destination - is not on our federation whitelist + Returns: + Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of + the file length and a dict of the response headers. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. """ request = MatrixFederationRequest( method="GET", diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 1d3069b143..865b5e915a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -547,11 +547,19 @@ class SQLBaseStore(object): if lock: self.database_engine.lock_table(txn, table) + def _getwhere(key): + # If the value we're passing in is None (aka NULL), we need to use + # IS, not =, as NULL = NULL equals NULL (False). + if keyvalues[key] is None: + return "%s IS ?" % (key,) + else: + return "%s = ?" % (key,) + # First try to update. sql = "UPDATE %s SET %s WHERE %s" % ( table, ", ".join("%s = ?" % (k,) for k in values), - " AND ".join("%s = ?" % (k,) for k in keyvalues) + " AND ".join(_getwhere(k) for k in keyvalues) ) sqlargs = list(values.values()) + list(keyvalues.values()) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 9ad17b7c25..5d548f250a 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -65,7 +65,27 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["last_seen"], ) - # (user_id, access_token, ip) -> (user_agent, device_id, last_seen) + self.register_background_update_handler( + "user_ips_remove_dupes", + self._remove_user_ip_dupes, + ) + + # Register a unique index + self.register_background_index_update( + "user_ips_device_unique_index", + index_name="user_ips_user_token_ip_unique_index", + table="user_ips", + columns=["user_id", "access_token", "ip"], + unique=True, + ) + + # Drop the old non-unique index + self.register_background_update_handler( + "user_ips_drop_nonunique_index", + self._remove_user_ip_nonunique, + ) + + # (user_id, access_token, ip,) -> (user_agent, device_id, last_seen) self._batch_row_update = {} self._client_ip_looper = self._clock.looping_call( @@ -76,6 +96,116 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): ) @defer.inlineCallbacks + def _remove_user_ip_nonunique(self, progress, batch_size): + def f(conn): + txn = conn.cursor() + txn.execute( + "DROP INDEX IF EXISTS user_ips_user_ip" + ) + txn.close() + + yield self.runWithConnection(f) + yield self._end_background_update("user_ips_drop_nonunique_index") + defer.returnValue(1) + + @defer.inlineCallbacks + def _remove_user_ip_dupes(self, progress, batch_size): + + last_seen_progress = progress.get("last_seen", 0) + + def get_last_seen(txn): + txn.execute( + """ + SELECT last_seen FROM user_ips + WHERE last_seen > ? + ORDER BY last_seen + LIMIT 1 + OFFSET ? + """, + (last_seen_progress, batch_size) + ) + results = txn.fetchone() + return results + + # Get a last seen that's sufficiently far away enough from the last one + last_seen = yield self.runInteraction( + "user_ips_dups_get_last_seen", get_last_seen + ) + + if not last_seen: + # If we get a None then we're reaching the end and just need to + # delete the last batch. + last = True + + # We fake not having an upper bound by using a future date, by + # just multiplying the current time by two.... + last_seen = int(self.clock.time_msec()) * 2 + else: + last = False + last_seen = last_seen[0] + + def remove(txn, last_seen_progress, last_seen): + # This works by looking at all entries in the given time span, and + # then for each (user_id, access_token, ip) tuple in that range + # checking for any duplicates in the rest of the table (via a join). + # It then only returns entries which have duplicates, and the max + # last_seen across all duplicates, which can the be used to delete + # all other duplicates. + # It is efficient due to the existence of (user_id, access_token, + # ip) and (last_seen) indices. + txn.execute( + """ + SELECT user_id, access_token, ip, + MAX(device_id), MAX(user_agent), MAX(last_seen) + FROM ( + SELECT user_id, access_token, ip + FROM user_ips + WHERE ? <= last_seen AND last_seen < ? + ORDER BY last_seen + ) c + INNER JOIN user_ips USING (user_id, access_token, ip) + GROUP BY user_id, access_token, ip + HAVING count(*) > 1""", + (last_seen_progress, last_seen) + ) + res = txn.fetchall() + + # We've got some duplicates + for i in res: + user_id, access_token, ip, device_id, user_agent, last_seen = i + + # Drop all the duplicates + txn.execute( + """ + DELETE FROM user_ips + WHERE user_id = ? AND access_token = ? AND ip = ? + """, + (user_id, access_token, ip) + ) + + # Add in one to be the last_seen + txn.execute( + """ + INSERT INTO user_ips + (user_id, access_token, ip, device_id, user_agent, last_seen) + VALUES (?, ?, ?, ?, ?, ?) + """, + (user_id, access_token, ip, device_id, user_agent, last_seen) + ) + + self._background_update_progress_txn( + txn, "user_ips_remove_dupes", {"last_seen": last_seen} + ) + + yield self.runInteraction( + "user_ips_dups_remove", remove, last_seen_progress, last_seen + ) + if last: + yield self._end_background_update("user_ips_remove_dupes") + + defer.returnValue(batch_size) + + @defer.inlineCallbacks def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now=None): if not now: @@ -127,10 +257,10 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): "user_id": user_id, "access_token": access_token, "ip": ip, - "user_agent": user_agent, - "device_id": device_id, }, values={ + "user_agent": user_agent, + "device_id": device_id, "last_seen": last_seen, }, lock=False, @@ -227,7 +357,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): results = {} for key in self._batch_row_update: - uid, access_token, ip = key + uid, access_token, ip, = key if uid == user_id: user_agent, _, last_seen = self._batch_row_update[key] results[(access_token, ip)] = (user_agent, last_seen) diff --git a/synapse/storage/schema/delta/53/user_ips_index.sql b/synapse/storage/schema/delta/53/user_ips_index.sql new file mode 100644 index 0000000000..4ca346c111 --- /dev/null +++ b/synapse/storage/schema/delta/53/user_ips_index.sql @@ -0,0 +1,26 @@ +/* Copyright 2018 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- delete duplicates +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_remove_dupes', '{}'); + +-- add a new unique index to user_ips table +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('user_ips_device_unique_index', '{}', 'user_ips_remove_dupes'); + +-- drop the old original index +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('user_ips_drop_nonunique_index', '{}', 'user_ips_device_unique_index'); \ No newline at end of file diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py index 4577e9422b..858efe4992 100644 --- a/tests/storage/test_client_ips.py +++ b/tests/storage/test_client_ips.py @@ -62,6 +62,77 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase): r, ) + def test_insert_new_client_ip_none_device_id(self): + """ + An insert with a device ID of NULL will not create a new entry, but + update an existing entry in the user_ips table. + """ + self.reactor.advance(12345678) + + user_id = "@user:id" + + # Add & trigger the storage loop + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", None + ) + ) + self.reactor.advance(200) + self.pump(0) + + result = self.get_success( + self.store._simple_select_list( + table="user_ips", + keyvalues={"user_id": user_id}, + retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"], + desc="get_user_ip_and_agents", + ) + ) + + self.assertEqual( + result, + [ + { + 'access_token': 'access_token', + 'ip': 'ip', + 'user_agent': 'user_agent', + 'device_id': None, + 'last_seen': 12345678000, + } + ], + ) + + # Add another & trigger the storage loop + self.get_success( + self.store.insert_client_ip( + user_id, "access_token", "ip", "user_agent", None + ) + ) + self.reactor.advance(10) + self.pump(0) + + result = self.get_success( + self.store._simple_select_list( + table="user_ips", + keyvalues={"user_id": user_id}, + retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"], + desc="get_user_ip_and_agents", + ) + ) + # Only one result, has been upserted. + self.assertEqual( + result, + [ + { + 'access_token': 'access_token', + 'ip': 'ip', + 'user_agent': 'user_agent', + 'device_id': None, + 'last_seen': 12345878000, + } + ], + ) + def test_disabled_monthly_active_user(self): self.hs.config.limit_usage_by_mau = False self.hs.config.max_mau_value = 50 |