diff options
101 files changed, 2446 insertions, 808 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml index ec3848b048..5395028426 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,99 +23,106 @@ jobs: - run: docker push matrixdotorg/synapse:latest - run: docker push matrixdotorg/synapse:latest-py3 sytestpy2: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy2postgres: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy2merged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs - + path: /logs sytestpy2postgresmerged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3postgres: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3merged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3postgresmerged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs workflows: version: 2 diff --git a/.circleci/merge_base_branch.sh b/.circleci/merge_base_branch.sh index 6b0bf3aa48..b2c8c40f4c 100755 --- a/.circleci/merge_base_branch.sh +++ b/.circleci/merge_base_branch.sh @@ -16,7 +16,7 @@ then GITBASE="develop" else # Get the reference, using the GitHub API - GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'` + GITBASE=`wget -O- https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'` fi # Show what we are before @@ -31,4 +31,4 @@ git fetch -u origin $GITBASE git merge --no-edit origin/$GITBASE # Show what we are after. -git show -s \ No newline at end of file +git show -s diff --git a/.travis.yml b/.travis.yml index 2077f6af72..7ee1a278db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,7 @@ matrix: - python: 2.7 env: TOX_ENV=packaging - - python: 2.7 + - python: 3.6 env: TOX_ENV=pep8 - python: 2.7 diff --git a/CHANGES.md b/CHANGES.md index 048b9f95db..fb98c934c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,58 @@ +Synapse 0.33.7 (2018-10-18) +=========================== + +**Warning**: This release removes the example email notification templates from +`res/templates` (they are now internal to the python package). This should only +affect you if you (a) deploy your Synapse instance from a git checkout or a +github snapshot URL, and (b) have email notifications enabled. + +If you have email notifications enabled, you should ensure that +`email.template_dir` is either configured to point at a directory where you +have installed customised templates, or leave it unset to use the default +templates. + +Synapse 0.33.7rc2 (2018-10-17) +============================== + +Features +-------- + +- Ship the example email templates as part of the package ([\#4052](https://github.com/matrix-org/synapse/issues/4052)) + +Bugfixes +-------- + +- Fix bug which made get_missing_events return too few events ([\#4045](https://github.com/matrix-org/synapse/issues/4045)) + + +Synapse 0.33.7rc1 (2018-10-15) +============================== + +Features +-------- + +- Add support for end-to-end key backup (MSC1687) ([\#4019](https://github.com/matrix-org/synapse/issues/4019)) + + +Bugfixes +-------- + +- Fix bug in event persistence logic which caused 'NoneType is not iterable' ([\#3995](https://github.com/matrix-org/synapse/issues/3995)) +- Fix exception in background metrics collection ([\#3996](https://github.com/matrix-org/synapse/issues/3996)) +- Fix exception handling in fetching remote profiles ([\#3997](https://github.com/matrix-org/synapse/issues/3997)) +- Fix handling of rejected threepid invites ([\#3999](https://github.com/matrix-org/synapse/issues/3999)) +- Workers now start on Python 3. ([\#4027](https://github.com/matrix-org/synapse/issues/4027)) +- Synapse now starts on Python 3.7. ([\#4033](https://github.com/matrix-org/synapse/issues/4033)) + + +Internal Changes +---------------- + +- Log exceptions in looping calls ([\#4008](https://github.com/matrix-org/synapse/issues/4008)) +- Optimisation for serving federation requests ([\#4017](https://github.com/matrix-org/synapse/issues/4017)) +- Add metric to count number of non-empty sync responses ([\#4022](https://github.com/matrix-org/synapse/issues/4022)) + + Synapse 0.33.6 (2018-10-04) =========================== diff --git a/MANIFEST.in b/MANIFEST.in index c6a37ac685..25cdf0a61b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -12,12 +12,12 @@ recursive-include synapse/storage/schema *.sql recursive-include synapse/storage/schema *.py recursive-include docs * -recursive-include res * recursive-include scripts * recursive-include scripts-dev * recursive-include synapse *.pyi recursive-include tests *.py +recursive-include synapse/res * recursive-include synapse/static *.css recursive-include synapse/static *.gif recursive-include synapse/static *.html diff --git a/README.rst b/README.rst index e1ea351f84..456a3d9d43 100644 --- a/README.rst +++ b/README.rst @@ -174,6 +174,12 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate a synapse server in a single Docker image, at https://hub.docker.com/r/avhost/docker-matrix/tags/ +Slavi Pantaleev has created an Ansible playbook, +which installs the offical Docker image of Matrix Synapse +along with many other Matrix-related services (Postgres database, riot-web, coturn, mxisd, SSL support, etc.). +For more details, see +https://github.com/spantaleev/matrix-docker-ansible-deploy + Configuring Synapse ------------------- diff --git a/UPGRADE.rst b/UPGRADE.rst index 6cf3169f75..55c77eedde 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -48,6 +48,19 @@ returned by the Client-Server API: # configured on port 443. curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:" +Upgrading to v0.33.7 +==================== + +This release removes the example email notification templates from +``res/templates`` (they are now internal to the python package). This should +only affect you if you (a) deploy your Synapse instance from a git checkout or +a github snapshot URL, and (b) have email notifications enabled. + +If you have email notifications enabled, you should ensure that +``email.template_dir`` is either configured to point at a directory where you +have installed customised templates, or leave it unset to use the default +templates. + Upgrading to v0.27.3 ==================== diff --git a/changelog.d/3698.misc b/changelog.d/3698.misc new file mode 100644 index 0000000000..12537e76f2 --- /dev/null +++ b/changelog.d/3698.misc @@ -0,0 +1 @@ +Add information about the [matrix-docker-ansible-deploy](https://github.com/spantaleev/matrix-docker-ansible-deploy) playbook diff --git a/changelog.d/3995.bugfix b/changelog.d/3995.bugfix deleted file mode 100644 index 2adc36756b..0000000000 --- a/changelog.d/3995.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix bug in event persistence logic which caused 'NoneType is not iterable' \ No newline at end of file diff --git a/changelog.d/3996.bugfix b/changelog.d/3996.bugfix deleted file mode 100644 index a056485ea1..0000000000 --- a/changelog.d/3996.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix exception in background metrics collection diff --git a/changelog.d/3997.bugfix b/changelog.d/3997.bugfix deleted file mode 100644 index b060ee8c18..0000000000 --- a/changelog.d/3997.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix exception handling in fetching remote profiles diff --git a/changelog.d/3999.bugfix b/changelog.d/3999.bugfix deleted file mode 100644 index dc3b2caffa..0000000000 --- a/changelog.d/3999.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix handling of rejected threepid invites diff --git a/changelog.d/4008.misc b/changelog.d/4008.misc deleted file mode 100644 index 5730210054..0000000000 --- a/changelog.d/4008.misc +++ /dev/null @@ -1 +0,0 @@ -Log exceptions in looping calls diff --git a/changelog.d/4017.misc b/changelog.d/4017.misc deleted file mode 100644 index b1ceb06560..0000000000 --- a/changelog.d/4017.misc +++ /dev/null @@ -1 +0,0 @@ -Optimisation for serving federation requests \ No newline at end of file diff --git a/changelog.d/4022.misc b/changelog.d/4022.misc deleted file mode 100644 index 5b0e136795..0000000000 --- a/changelog.d/4022.misc +++ /dev/null @@ -1 +0,0 @@ -Add metric to count number of non-empty sync responses diff --git a/changelog.d/4027.bugfix b/changelog.d/4027.bugfix deleted file mode 100644 index c9bbff3f68..0000000000 --- a/changelog.d/4027.bugfix +++ /dev/null @@ -1 +0,0 @@ -Workers now start on Python 3. diff --git a/changelog.d/4031.misc b/changelog.d/4031.misc new file mode 100644 index 0000000000..60be8b59fd --- /dev/null +++ b/changelog.d/4031.misc @@ -0,0 +1 @@ +Various cleanups in the federation client code diff --git a/changelog.d/4041.misc b/changelog.d/4041.misc new file mode 100644 index 0000000000..8cce9daac9 --- /dev/null +++ b/changelog.d/4041.misc @@ -0,0 +1 @@ +Run the CircleCI builds in docker containers diff --git a/changelog.d/4046.bugfix b/changelog.d/4046.bugfix new file mode 100644 index 0000000000..5046dd1ce3 --- /dev/null +++ b/changelog.d/4046.bugfix @@ -0,0 +1 @@ +Fix issue where Python 3 users couldn't paginate /publicRooms diff --git a/changelog.d/4049.misc b/changelog.d/4049.misc new file mode 100644 index 0000000000..4370d9dfa6 --- /dev/null +++ b/changelog.d/4049.misc @@ -0,0 +1 @@ +Only colourise synctl output when attached to tty diff --git a/changelog.d/4050.bugfix b/changelog.d/4050.bugfix new file mode 100644 index 0000000000..3d1f6af847 --- /dev/null +++ b/changelog.d/4050.bugfix @@ -0,0 +1 @@ +Fix URL priewing to work in Python 3.7 diff --git a/changelog.d/4057.bugfix b/changelog.d/4057.bugfix new file mode 100644 index 0000000000..7577731255 --- /dev/null +++ b/changelog.d/4057.bugfix @@ -0,0 +1 @@ +synctl will use the right python executable to run worker processes \ No newline at end of file diff --git a/changelog.d/4060.bugfix b/changelog.d/4060.bugfix new file mode 100644 index 0000000000..78d69a8819 --- /dev/null +++ b/changelog.d/4060.bugfix @@ -0,0 +1 @@ +Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting. diff --git a/changelog.d/4061.bugfix b/changelog.d/4061.bugfix new file mode 100644 index 0000000000..94ffcf7a51 --- /dev/null +++ b/changelog.d/4061.bugfix @@ -0,0 +1 @@ +Fix some metrics being racy and causing exceptions when polled by Prometheus. diff --git a/changelog.d/4067.bugfix b/changelog.d/4067.bugfix new file mode 100644 index 0000000000..78d69a8819 --- /dev/null +++ b/changelog.d/4067.bugfix @@ -0,0 +1 @@ +Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting. diff --git a/changelog.d/4068.bugfix b/changelog.d/4068.bugfix new file mode 100644 index 0000000000..74bda7491f --- /dev/null +++ b/changelog.d/4068.bugfix @@ -0,0 +1 @@ +Fix bug which prevented email notifications from being sent unless an absolute path was given for `email_templates`. \ No newline at end of file diff --git a/changelog.d/4068.misc b/changelog.d/4068.misc new file mode 100644 index 0000000000..db6c4ade59 --- /dev/null +++ b/changelog.d/4068.misc @@ -0,0 +1 @@ +Make the Python scripts in the top-level scripts folders meet pep8 and pass flake8. diff --git a/changelog.d/4073.misc b/changelog.d/4073.misc new file mode 100644 index 0000000000..fc304bef06 --- /dev/null +++ b/changelog.d/4073.misc @@ -0,0 +1 @@ +Add psutil as an explicit dependency diff --git a/changelog.d/4074.bugfix b/changelog.d/4074.bugfix new file mode 100644 index 0000000000..b3b6b00243 --- /dev/null +++ b/changelog.d/4074.bugfix @@ -0,0 +1 @@ +Correctly account for cpu usage by background threads diff --git a/changelog.d/4076.misc b/changelog.d/4076.misc new file mode 100644 index 0000000000..9dd000decf --- /dev/null +++ b/changelog.d/4076.misc @@ -0,0 +1 @@ +Correctly manage logcontexts during startup to fix some "Unexpected logging context" warnings \ No newline at end of file diff --git a/changelog.d/4077.misc b/changelog.d/4077.misc new file mode 100644 index 0000000000..52ca4c1de2 --- /dev/null +++ b/changelog.d/4077.misc @@ -0,0 +1 @@ +Give some more things logcontexts diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index cfe88788f2..a38b929f50 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -211,7 +211,9 @@ email: require_transport_security: False notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}" app_name: Matrix - template_dir: res/templates + # if template_dir is unset, uses the example templates that are part of + # the Synapse distribution. + #template_dir: res/templates notif_template_html: notif_mail.html notif_template_text: notif_mail.txt notif_for_new_users: True diff --git a/scripts-dev/check_auth.py b/scripts-dev/check_auth.py index 4fa8792a5f..b3d11f49ec 100644 --- a/scripts-dev/check_auth.py +++ b/scripts-dev/check_auth.py @@ -1,21 +1,20 @@ -from synapse.events import FrozenEvent -from synapse.api.auth import Auth - -from mock import Mock +from __future__ import print_function import argparse import itertools import json import sys +from mock import Mock + +from synapse.api.auth import Auth +from synapse.events import FrozenEvent + def check_auth(auth, auth_chain, events): auth_chain.sort(key=lambda e: e.depth) - auth_map = { - e.event_id: e - for e in auth_chain - } + auth_map = {e.event_id: e for e in auth_chain} create_events = {} for e in auth_chain: @@ -25,31 +24,26 @@ def check_auth(auth, auth_chain, events): for e in itertools.chain(auth_chain, events): auth_events_list = [auth_map[i] for i, _ in e.auth_events] - auth_events = { - (e.type, e.state_key): e - for e in auth_events_list - } + auth_events = {(e.type, e.state_key): e for e in auth_events_list} auth_events[("m.room.create", "")] = create_events[e.room_id] try: auth.check(e, auth_events=auth_events) except Exception as ex: - print "Failed:", e.event_id, e.type, e.state_key - print "Auth_events:", auth_events - print ex - print json.dumps(e.get_dict(), sort_keys=True, indent=4) + print("Failed:", e.event_id, e.type, e.state_key) + print("Auth_events:", auth_events) + print(ex) + print(json.dumps(e.get_dict(), sort_keys=True, indent=4)) # raise - print "Success:", e.event_id, e.type, e.state_key + print("Success:", e.event_id, e.type, e.state_key) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( - 'json', - nargs='?', - type=argparse.FileType('r'), - default=sys.stdin, + 'json', nargs='?', type=argparse.FileType('r'), default=sys.stdin ) args = parser.parse_args() diff --git a/scripts-dev/check_event_hash.py b/scripts-dev/check_event_hash.py index 7ccae34d48..8535f99697 100644 --- a/scripts-dev/check_event_hash.py +++ b/scripts-dev/check_event_hash.py @@ -1,10 +1,15 @@ -from synapse.crypto.event_signing import * -from unpaddedbase64 import encode_base64 - import argparse import hashlib -import sys import json +import logging +import sys + +from unpaddedbase64 import encode_base64 + +from synapse.crypto.event_signing import ( + check_event_content_hash, + compute_event_reference_hash, +) class dictobj(dict): @@ -24,27 +29,26 @@ class dictobj(dict): def main(): parser = argparse.ArgumentParser() - parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), - default=sys.stdin) + parser.add_argument( + "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin + ) args = parser.parse_args() logging.basicConfig() event_json = dictobj(json.load(args.input_json)) - algorithms = { - "sha256": hashlib.sha256, - } + algorithms = {"sha256": hashlib.sha256} for alg_name in event_json.hashes: if check_event_content_hash(event_json, algorithms[alg_name]): - print "PASS content hash %s" % (alg_name,) + print("PASS content hash %s" % (alg_name,)) else: - print "FAIL content hash %s" % (alg_name,) + print("FAIL content hash %s" % (alg_name,)) for algorithm in algorithms.values(): name, h_bytes = compute_event_reference_hash(event_json, algorithm) - print "Reference hash %s: %s" % (name, encode_base64(h_bytes)) + print("Reference hash %s: %s" % (name, encode_base64(h_bytes))) -if __name__=="__main__": - main() +if __name__ == "__main__": + main() diff --git a/scripts-dev/check_signature.py b/scripts-dev/check_signature.py index 079577908a..612f17ca7f 100644 --- a/scripts-dev/check_signature.py +++ b/scripts-dev/check_signature.py @@ -1,15 +1,15 @@ -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes, write_signing_keys -from unpaddedbase64 import decode_base64 - -import urllib2 +import argparse import json +import logging import sys +import urllib2 + import dns.resolver -import pprint -import argparse -import logging +from signedjson.key import decode_verify_key_bytes, write_signing_keys +from signedjson.sign import verify_signed_json +from unpaddedbase64 import decode_base64 + def get_targets(server_name): if ":" in server_name: @@ -23,6 +23,7 @@ def get_targets(server_name): except dns.resolver.NXDOMAIN: yield (server_name, 8448) + def get_server_keys(server_name, target, port): url = "https://%s:%i/_matrix/key/v1" % (target, port) keys = json.load(urllib2.urlopen(url)) @@ -33,12 +34,14 @@ def get_server_keys(server_name, target, port): verify_keys[key_id] = verify_key return verify_keys + def main(): parser = argparse.ArgumentParser() parser.add_argument("signature_name") - parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), - default=sys.stdin) + parser.add_argument( + "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin + ) args = parser.parse_args() logging.basicConfig() @@ -48,24 +51,23 @@ def main(): for target, port in get_targets(server_name): try: keys = get_server_keys(server_name, target, port) - print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port) + print("Using keys from https://%s:%s/_matrix/key/v1" % (target, port)) write_signing_keys(sys.stdout, keys.values()) break - except: + except Exception: logging.exception("Error talking to %s:%s", target, port) json_to_check = json.load(args.input_json) - print "Checking JSON:" + print("Checking JSON:") for key_id in json_to_check["signatures"][args.signature_name]: try: key = keys[key_id] verify_signed_json(json_to_check, args.signature_name, key) - print "PASS %s" % (key_id,) - except: + print("PASS %s" % (key_id,)) + except Exception: logging.exception("Check for key %s failed" % (key_id,)) - print "FAIL %s" % (key_id,) + print("FAIL %s" % (key_id,)) if __name__ == '__main__': main() - diff --git a/scripts-dev/convert_server_keys.py b/scripts-dev/convert_server_keys.py index 151551f22c..dde8596697 100644 --- a/scripts-dev/convert_server_keys.py +++ b/scripts-dev/convert_server_keys.py @@ -1,13 +1,21 @@ -import psycopg2 -import yaml -import sys +import hashlib import json +import sys import time -import hashlib -from unpaddedbase64 import encode_base64 + +import six + +import psycopg2 +import yaml +from canonicaljson import encode_canonical_json from signedjson.key import read_signing_keys from signedjson.sign import sign_json -from canonicaljson import encode_canonical_json +from unpaddedbase64 import encode_base64 + +if six.PY2: + db_type = six.moves.builtins.buffer +else: + db_type = memoryview def select_v1_keys(connection): @@ -39,7 +47,9 @@ def select_v2_json(connection): cursor.close() results = {} for server_name, key_id, key_json in rows: - results.setdefault(server_name, {})[key_id] = json.loads(str(key_json).decode("utf-8")) + results.setdefault(server_name, {})[key_id] = json.loads( + str(key_json).decode("utf-8") + ) return results @@ -47,10 +57,7 @@ def convert_v1_to_v2(server_name, valid_until, keys, certificate): return { "old_verify_keys": {}, "server_name": server_name, - "verify_keys": { - key_id: {"key": key} - for key_id, key in keys.items() - }, + "verify_keys": {key_id: {"key": key} for key_id, key in keys.items()}, "valid_until_ts": valid_until, "tls_fingerprints": [fingerprint(certificate)], } @@ -65,7 +72,7 @@ def rows_v2(server, json): valid_until = json["valid_until_ts"] key_json = encode_canonical_json(json) for key_id in json["verify_keys"]: - yield (server, key_id, "-", valid_until, valid_until, buffer(key_json)) + yield (server, key_id, "-", valid_until, valid_until, db_type(key_json)) def main(): @@ -87,7 +94,7 @@ def main(): result = {} for server in keys: - if not server in json: + if server not in json: v2_json = convert_v1_to_v2( server, valid_until, keys[server], certificates[server] ) @@ -96,10 +103,7 @@ def main(): yaml.safe_dump(result, sys.stdout, default_flow_style=False) - rows = list( - row for server, json in result.items() - for row in rows_v2(server, json) - ) + rows = list(row for server, json in result.items() for row in rows_v2(server, json)) cursor = connection.cursor() cursor.executemany( @@ -107,7 +111,7 @@ def main(): " server_name, key_id, from_server," " ts_added_ms, ts_valid_until_ms, key_json" ") VALUES (%s, %s, %s, %s, %s, %s)", - rows + rows, ) connection.commit() diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py index 47dac7772d..1deb0fe2b7 100755 --- a/scripts-dev/definitions.py +++ b/scripts-dev/definitions.py @@ -1,8 +1,16 @@ #! /usr/bin/python +from __future__ import print_function + +import argparse import ast +import os +import re +import sys + import yaml + class DefinitionVisitor(ast.NodeVisitor): def __init__(self): super(DefinitionVisitor, self).__init__() @@ -42,15 +50,18 @@ def non_empty(defs): functions = {name: non_empty(f) for name, f in defs['def'].items()} classes = {name: non_empty(f) for name, f in defs['class'].items()} result = {} - if functions: result['def'] = functions - if classes: result['class'] = classes + if functions: + result['def'] = functions + if classes: + result['class'] = classes names = defs['names'] uses = [] for name in names.get('Load', ()): if name not in names.get('Param', ()) and name not in names.get('Store', ()): uses.append(name) uses.extend(defs['attrs']) - if uses: result['uses'] = uses + if uses: + result['uses'] = uses result['names'] = names result['attrs'] = defs['attrs'] return result @@ -95,7 +106,6 @@ def used_names(prefix, item, defs, names): if __name__ == '__main__': - import sys, os, argparse, re parser = argparse.ArgumentParser(description='Find definitions.') parser.add_argument( @@ -105,24 +115,28 @@ if __name__ == '__main__': "--ignore", action="append", metavar="REGEXP", help="Ignore a pattern" ) parser.add_argument( - "--pattern", action="append", metavar="REGEXP", - help="Search for a pattern" + "--pattern", action="append", metavar="REGEXP", help="Search for a pattern" ) parser.add_argument( - "directories", nargs='+', metavar="DIR", - help="Directories to search for definitions" + "directories", + nargs='+', + metavar="DIR", + help="Directories to search for definitions", ) parser.add_argument( - "--referrers", default=0, type=int, - help="Include referrers up to the given depth" + "--referrers", + default=0, + type=int, + help="Include referrers up to the given depth", ) parser.add_argument( - "--referred", default=0, type=int, - help="Include referred down to the given depth" + "--referred", + default=0, + type=int, + help="Include referred down to the given depth", ) parser.add_argument( - "--format", default="yaml", - help="Output format, one of 'yaml' or 'dot'" + "--format", default="yaml", help="Output format, one of 'yaml' or 'dot'" ) args = parser.parse_args() @@ -162,7 +176,7 @@ if __name__ == '__main__': for used_by in entry.get("used", ()): referrers.add(used_by) for name, definition in names.items(): - if not name in referrers: + if name not in referrers: continue if ignore and any(pattern.match(name) for pattern in ignore): continue @@ -176,7 +190,7 @@ if __name__ == '__main__': for uses in entry.get("uses", ()): referred.add(uses) for name, definition in names.items(): - if not name in referred: + if name not in referred: continue if ignore and any(pattern.match(name) for pattern in ignore): continue @@ -185,12 +199,12 @@ if __name__ == '__main__': if args.format == 'yaml': yaml.dump(result, sys.stdout, default_flow_style=False) elif args.format == 'dot': - print "digraph {" + print("digraph {") for name, entry in result.items(): - print name + print(name) for used_by in entry.get("used", ()): if used_by in result: - print used_by, "->", name - print "}" + print(used_by, "->", name) + print("}") else: raise ValueError("Unknown format %r" % (args.format)) diff --git a/scripts-dev/dump_macaroon.py b/scripts-dev/dump_macaroon.py index fcc5568835..22b30fa78e 100755 --- a/scripts-dev/dump_macaroon.py +++ b/scripts-dev/dump_macaroon.py @@ -1,8 +1,11 @@ #!/usr/bin/env python2 -import pymacaroons +from __future__ import print_function + import sys +import pymacaroons + if len(sys.argv) == 1: sys.stderr.write("usage: %s macaroon [key]\n" % (sys.argv[0],)) sys.exit(1) @@ -11,14 +14,14 @@ macaroon_string = sys.argv[1] key = sys.argv[2] if len(sys.argv) > 2 else None macaroon = pymacaroons.Macaroon.deserialize(macaroon_string) -print macaroon.inspect() +print(macaroon.inspect()) -print "" +print("") verifier = pymacaroons.Verifier() verifier.satisfy_general(lambda c: True) try: verifier.verify(macaroon, key) - print "Signature is correct" + print("Signature is correct") except Exception as e: - print str(e) + print(str(e)) diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index d2acc7654d..2566ce7cef 100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -18,21 +18,21 @@ from __future__ import print_function import argparse +import base64 +import json +import sys from urlparse import urlparse, urlunparse import nacl.signing -import json -import base64 import requests -import sys - -from requests.adapters import HTTPAdapter import srvlookup import yaml +from requests.adapters import HTTPAdapter # uncomment the following to enable debug logging of http requests -#from httplib import HTTPConnection -#HTTPConnection.debuglevel = 1 +# from httplib import HTTPConnection +# HTTPConnection.debuglevel = 1 + def encode_base64(input_bytes): """Encode bytes as a base64 string without any padding.""" @@ -58,15 +58,15 @@ def decode_base64(input_string): def encode_canonical_json(value): return json.dumps( - value, - # Encode code-points outside of ASCII as UTF-8 rather than \u escapes - ensure_ascii=False, - # Remove unecessary white space. - separators=(',',':'), - # Sort the keys of dictionaries. - sort_keys=True, - # Encode the resulting unicode as UTF-8 bytes. - ).encode("UTF-8") + value, + # Encode code-points outside of ASCII as UTF-8 rather than \u escapes + ensure_ascii=False, + # Remove unecessary white space. + separators=(',', ':'), + # Sort the keys of dictionaries. + sort_keys=True, + # Encode the resulting unicode as UTF-8 bytes. + ).encode("UTF-8") def sign_json(json_object, signing_key, signing_name): @@ -88,6 +88,7 @@ def sign_json(json_object, signing_key, signing_name): NACL_ED25519 = "ed25519" + def decode_signing_key_base64(algorithm, version, key_base64): """Decode a base64 encoded signing key Args: @@ -143,14 +144,12 @@ def request_json(method, origin_name, origin_key, destination, path, content): authorization_headers = [] for key, sig in signed_json["signatures"][origin_name].items(): - header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( - origin_name, key, sig, - ) + header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (origin_name, key, sig) authorization_headers.append(bytes(header)) - print ("Authorization: %s" % header, file=sys.stderr) + print("Authorization: %s" % header, file=sys.stderr) dest = "matrix://%s%s" % (destination, path) - print ("Requesting %s" % dest, file=sys.stderr) + print("Requesting %s" % dest, file=sys.stderr) s = requests.Session() s.mount("matrix://", MatrixConnectionAdapter()) @@ -158,10 +157,7 @@ def request_json(method, origin_name, origin_key, destination, path, content): result = s.request( method=method, url=dest, - headers={ - "Host": destination, - "Authorization": authorization_headers[0] - }, + headers={"Host": destination, "Authorization": authorization_headers[0]}, verify=False, data=content, ) @@ -171,50 +167,50 @@ def request_json(method, origin_name, origin_key, destination, path, content): def main(): parser = argparse.ArgumentParser( - description= - "Signs and sends a federation request to a matrix homeserver", + description="Signs and sends a federation request to a matrix homeserver" ) parser.add_argument( - "-N", "--server-name", + "-N", + "--server-name", help="Name to give as the local homeserver. If unspecified, will be " - "read from the config file.", + "read from the config file.", ) parser.add_argument( - "-k", "--signing-key-path", + "-k", + "--signing-key-path", help="Path to the file containing the private ed25519 key to sign the " - "request with.", + "request with.", ) parser.add_argument( - "-c", "--config", + "-c", + "--config", default="homeserver.yaml", help="Path to server config file. Ignored if --server-name and " - "--signing-key-path are both given.", + "--signing-key-path are both given.", ) parser.add_argument( - "-d", "--destination", + "-d", + "--destination", default="matrix.org", help="name of the remote homeserver. We will do SRV lookups and " - "connect appropriately.", + "connect appropriately.", ) parser.add_argument( - "-X", "--method", + "-X", + "--method", help="HTTP method to use for the request. Defaults to GET if --data is" - "unspecified, POST if it is." + "unspecified, POST if it is.", ) - parser.add_argument( - "--body", - help="Data to send as the body of the HTTP request" - ) + parser.add_argument("--body", help="Data to send as the body of the HTTP request") parser.add_argument( - "path", - help="request path. We will add '/_matrix/federation/v1/' to this." + "path", help="request path. We will add '/_matrix/federation/v1/' to this." ) args = parser.parse_args() @@ -227,13 +223,15 @@ def main(): result = request_json( args.method, - args.server_name, key, args.destination, + args.server_name, + key, + args.destination, "/_matrix/federation/v1/" + args.path, content=args.body, ) json.dump(result, sys.stdout) - print ("") + print("") def read_args_from_config(args): @@ -253,7 +251,7 @@ class MatrixConnectionAdapter(HTTPAdapter): return s, 8448 if ":" in s: - out = s.rsplit(":",1) + out = s.rsplit(":", 1) try: port = int(out[1]) except ValueError: @@ -263,7 +261,7 @@ class MatrixConnectionAdapter(HTTPAdapter): try: srv = srvlookup.lookup("matrix", "tcp", s)[0] return srv.host, srv.port - except: + except Exception: return s, 8448 def get_connection(self, url, proxies=None): @@ -272,10 +270,9 @@ class MatrixConnectionAdapter(HTTPAdapter): (host, port) = self.lookup(parsed.netloc) netloc = "%s:%d" % (host, port) print("Connecting to %s" % (netloc,), file=sys.stderr) - url = urlunparse(( - "https", netloc, parsed.path, parsed.params, parsed.query, - parsed.fragment, - )) + url = urlunparse( + ("https", netloc, parsed.path, parsed.params, parsed.query, parsed.fragment) + ) return super(MatrixConnectionAdapter, self).get_connection(url, proxies) diff --git a/scripts-dev/hash_history.py b/scripts-dev/hash_history.py index 616d6a10e7..514d80fa60 100644 --- a/scripts-dev/hash_history.py +++ b/scripts-dev/hash_history.py @@ -1,23 +1,31 @@ -from synapse.storage.pdu import PduStore -from synapse.storage.signatures import SignatureStore -from synapse.storage._base import SQLBaseStore -from synapse.federation.units import Pdu -from synapse.crypto.event_signing import ( - add_event_pdu_content_hash, compute_pdu_event_reference_hash -) -from synapse.api.events.utils import prune_pdu -from unpaddedbase64 import encode_base64, decode_base64 -from canonicaljson import encode_canonical_json +from __future__ import print_function + import sqlite3 import sys +from unpaddedbase64 import decode_base64, encode_base64 + +from synapse.crypto.event_signing import ( + add_event_pdu_content_hash, + compute_pdu_event_reference_hash, +) +from synapse.federation.units import Pdu +from synapse.storage._base import SQLBaseStore +from synapse.storage.pdu import PduStore +from synapse.storage.signatures import SignatureStore + + class Store(object): _get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"] _get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"] _get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"] - _get_pdu_origin_signatures_txn = SignatureStore.__dict__["_get_pdu_origin_signatures_txn"] + _get_pdu_origin_signatures_txn = SignatureStore.__dict__[ + "_get_pdu_origin_signatures_txn" + ] _store_pdu_content_hash_txn = SignatureStore.__dict__["_store_pdu_content_hash_txn"] - _store_pdu_reference_hash_txn = SignatureStore.__dict__["_store_pdu_reference_hash_txn"] + _store_pdu_reference_hash_txn = SignatureStore.__dict__[ + "_store_pdu_reference_hash_txn" + ] _store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"] _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] @@ -26,9 +34,7 @@ store = Store() def select_pdus(cursor): - cursor.execute( - "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC" - ) + cursor.execute("SELECT pdu_id, origin FROM pdus ORDER BY depth ASC") ids = cursor.fetchall() @@ -41,23 +47,30 @@ def select_pdus(cursor): for pdu in pdus: try: if pdu.prev_pdus: - print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus + print("PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus) for pdu_id, origin, hashes in pdu.prev_pdus: ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)] hashes[ref_alg] = encode_base64(ref_hsh) - store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh) - print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus + store._store_prev_pdu_hash_txn( + cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh + ) + print("SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus) pdu = add_event_pdu_content_hash(pdu) ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu) reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh) - store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh) + store._store_pdu_reference_hash_txn( + cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh + ) for alg, hsh_base64 in pdu.hashes.items(): - print alg, hsh_base64 - store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64)) + print(alg, hsh_base64) + store._store_pdu_content_hash_txn( + cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64) + ) + + except Exception: + print("FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus) - except: - print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus def main(): conn = sqlite3.connect(sys.argv[1]) @@ -65,5 +78,6 @@ def main(): select_pdus(cursor) conn.commit() -if __name__=='__main__': + +if __name__ == '__main__': main() diff --git a/scripts-dev/list_url_patterns.py b/scripts-dev/list_url_patterns.py index 58d40c4ff4..da027be26e 100755 --- a/scripts-dev/list_url_patterns.py +++ b/scripts-dev/list_url_patterns.py @@ -1,18 +1,17 @@ #! /usr/bin/python -import ast import argparse +import ast import os import sys + import yaml PATTERNS_V1 = [] PATTERNS_V2 = [] -RESULT = { - "v1": PATTERNS_V1, - "v2": PATTERNS_V2, -} +RESULT = {"v1": PATTERNS_V1, "v2": PATTERNS_V2} + class CallVisitor(ast.NodeVisitor): def visit_Call(self, node): @@ -21,7 +20,6 @@ class CallVisitor(ast.NodeVisitor): else: return - if name == "client_path_patterns": PATTERNS_V1.append(node.args[0].s) elif name == "client_v2_patterns": @@ -42,8 +40,10 @@ def find_patterns_in_file(filepath): parser = argparse.ArgumentParser(description='Find url patterns.') parser.add_argument( - "directories", nargs='+', metavar="DIR", - help="Directories to search for definitions" + "directories", + nargs='+', + metavar="DIR", + help="Directories to search for definitions", ) args = parser.parse_args() diff --git a/scripts-dev/tail-synapse.py b/scripts-dev/tail-synapse.py index 18be711e92..1a36b94038 100644 --- a/scripts-dev/tail-synapse.py +++ b/scripts-dev/tail-synapse.py @@ -1,8 +1,9 @@ -import requests import collections +import json import sys import time -import json + +import requests Entry = collections.namedtuple("Entry", "name position rows") @@ -30,11 +31,11 @@ def parse_response(content): def replicate(server, streams): - return parse_response(requests.get( - server + "/_synapse/replication", - verify=False, - params=streams - ).content) + return parse_response( + requests.get( + server + "/_synapse/replication", verify=False, params=streams + ).content + ) def main(): @@ -45,7 +46,7 @@ def main(): try: streams = { row.name: row.position - for row in replicate(server, {"streams":"-1"})["streams"].rows + for row in replicate(server, {"streams": "-1"})["streams"].rows } except requests.exceptions.ConnectionError as e: time.sleep(0.1) @@ -53,8 +54,8 @@ def main(): while True: try: results = replicate(server, streams) - except: - sys.stdout.write("connection_lost("+ repr(streams) + ")\n") + except Exception: + sys.stdout.write("connection_lost(" + repr(streams) + ")\n") break for update in results.values(): for row in update.rows: @@ -62,6 +63,5 @@ def main(): streams[update.name] = update.position - -if __name__=='__main__': +if __name__ == '__main__': main() diff --git a/scripts/hash_password b/scripts/hash_password index 215ab25cfe..a62bb5aa83 100755 --- a/scripts/hash_password +++ b/scripts/hash_password @@ -1,12 +1,10 @@ #!/usr/bin/env python import argparse - +import getpass import sys import bcrypt -import getpass - import yaml bcrypt_rounds=12 @@ -52,4 +50,3 @@ if __name__ == "__main__": password = prompt_for_pass() print bcrypt.hashpw(password + password_pepper, bcrypt.gensalt(bcrypt_rounds)) - diff --git a/scripts/move_remote_media_to_new_store.py b/scripts/move_remote_media_to_new_store.py index 7914ead889..e630936f78 100755 --- a/scripts/move_remote_media_to_new_store.py +++ b/scripts/move_remote_media_to_new_store.py @@ -36,12 +36,9 @@ from __future__ import print_function import argparse import logging - -import sys - import os - import shutil +import sys from synapse.rest.media.v1.filepath import MediaFilePaths @@ -77,24 +74,23 @@ def move_media(origin_server, file_id, src_paths, dest_paths): if not os.path.exists(original_file): logger.warn( "Original for %s/%s (%s) does not exist", - origin_server, file_id, original_file, + origin_server, + file_id, + original_file, ) else: mkdir_and_move( - original_file, - dest_paths.remote_media_filepath(origin_server, file_id), + original_file, dest_paths.remote_media_filepath(origin_server, file_id) ) # now look for thumbnails - original_thumb_dir = src_paths.remote_media_thumbnail_dir( - origin_server, file_id, - ) + original_thumb_dir = src_paths.remote_media_thumbnail_dir(origin_server, file_id) if not os.path.exists(original_thumb_dir): return mkdir_and_move( original_thumb_dir, - dest_paths.remote_media_thumbnail_dir(origin_server, file_id) + dest_paths.remote_media_thumbnail_dir(origin_server, file_id), ) @@ -109,24 +105,16 @@ def mkdir_and_move(original_file, dest_file): if __name__ == "__main__": parser = argparse.ArgumentParser( - description=__doc__, - formatter_class = argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument( - "-v", action='store_true', help='enable debug logging') - parser.add_argument( - "src_repo", - help="Path to source content repo", - ) - parser.add_argument( - "dest_repo", - help="Path to source content repo", + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) + parser.add_argument("-v", action='store_true', help='enable debug logging') + parser.add_argument("src_repo", help="Path to source content repo") + parser.add_argument("dest_repo", help="Path to source content repo") args = parser.parse_args() logging_config = { "level": logging.DEBUG if args.v else logging.INFO, - "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s" + "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s", } logging.basicConfig(**logging_config) diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user index 91bdb3a25b..89143c5d59 100755 --- a/scripts/register_new_matrix_user +++ b/scripts/register_new_matrix_user @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function import argparse import getpass @@ -22,19 +23,23 @@ import hmac import json import sys import urllib2 + +from six import input + import yaml def request_registration(user, password, server_location, shared_secret, admin=False): req = urllib2.Request( "%s/_matrix/client/r0/admin/register" % (server_location,), - headers={'Content-Type': 'application/json'} + headers={'Content-Type': 'application/json'}, ) try: if sys.version_info[:3] >= (2, 7, 9): # As of version 2.7.9, urllib2 now checks SSL certs import ssl + f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) else: f = urllib2.urlopen(req) @@ -42,18 +47,15 @@ def request_registration(user, password, server_location, shared_secret, admin=F f.close() nonce = json.loads(body)["nonce"] except urllib2.HTTPError as e: - print "ERROR! Received %d %s" % (e.code, e.reason,) + print("ERROR! Received %d %s" % (e.code, e.reason)) if 400 <= e.code < 500: if e.info().type == "application/json": resp = json.load(e) if "error" in resp: - print resp["error"] + print(resp["error"]) sys.exit(1) - mac = hmac.new( - key=shared_secret, - digestmod=hashlib.sha1, - ) + mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1) mac.update(nonce) mac.update("\x00") @@ -75,30 +77,31 @@ def request_registration(user, password, server_location, shared_secret, admin=F server_location = server_location.rstrip("/") - print "Sending registration request..." + print("Sending registration request...") req = urllib2.Request( "%s/_matrix/client/r0/admin/register" % (server_location,), data=json.dumps(data), - headers={'Content-Type': 'application/json'} + headers={'Content-Type': 'application/json'}, ) try: if sys.version_info[:3] >= (2, 7, 9): # As of version 2.7.9, urllib2 now checks SSL certs import ssl + f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) else: f = urllib2.urlopen(req) f.read() f.close() - print "Success." + print("Success.") except urllib2.HTTPError as e: - print "ERROR! Received %d %s" % (e.code, e.reason,) + print("ERROR! Received %d %s" % (e.code, e.reason)) if 400 <= e.code < 500: if e.info().type == "application/json": resp = json.load(e) if "error" in resp: - print resp["error"] + print(resp["error"]) sys.exit(1) @@ -106,35 +109,35 @@ def register_new_user(user, password, server_location, shared_secret, admin): if not user: try: default_user = getpass.getuser() - except: + except Exception: default_user = None if default_user: - user = raw_input("New user localpart [%s]: " % (default_user,)) + user = input("New user localpart [%s]: " % (default_user,)) if not user: user = default_user else: - user = raw_input("New user localpart: ") + user = input("New user localpart: ") if not user: - print "Invalid user name" + print("Invalid user name") sys.exit(1) if not password: password = getpass.getpass("Password: ") if not password: - print "Password cannot be blank." + print("Password cannot be blank.") sys.exit(1) confirm_password = getpass.getpass("Confirm password: ") if password != confirm_password: - print "Passwords do not match" + print("Passwords do not match") sys.exit(1) if admin is None: - admin = raw_input("Make admin [no]: ") + admin = input("Make admin [no]: ") if admin in ("y", "yes", "true"): admin = True else: @@ -146,42 +149,51 @@ def register_new_user(user, password, server_location, shared_secret, admin): if __name__ == "__main__": parser = argparse.ArgumentParser( description="Used to register new users with a given home server when" - " registration has been disabled. The home server must be" - " configured with the 'registration_shared_secret' option" - " set.", + " registration has been disabled. The home server must be" + " configured with the 'registration_shared_secret' option" + " set." ) parser.add_argument( - "-u", "--user", + "-u", + "--user", default=None, help="Local part of the new user. Will prompt if omitted.", ) parser.add_argument( - "-p", "--password", + "-p", + "--password", default=None, help="New password for user. Will prompt if omitted.", ) admin_group = parser.add_mutually_exclusive_group() admin_group.add_argument( - "-a", "--admin", + "-a", + "--admin", action="store_true", - help="Register new user as an admin. Will prompt if --no-admin is not set either.", + help=( + "Register new user as an admin. " + "Will prompt if --no-admin is not set either." + ), ) admin_group.add_argument( "--no-admin", action="store_true", - help="Register new user as a regular user. Will prompt if --admin is not set either.", + help=( + "Register new user as a regular user. " + "Will prompt if --admin is not set either." + ), ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( - "-c", "--config", + "-c", + "--config", type=argparse.FileType('r'), help="Path to server config file. Used to read in shared secret.", ) group.add_argument( - "-k", "--shared-secret", - help="Shared secret as defined in server config file.", + "-k", "--shared-secret", help="Shared secret as defined in server config file." ) parser.add_argument( @@ -189,7 +201,7 @@ if __name__ == "__main__": default="https://localhost:8448", nargs='?', help="URL to use to talk to the home server. Defaults to " - " 'https://localhost:8448'.", + " 'https://localhost:8448'.", ) args = parser.parse_args() @@ -198,7 +210,7 @@ if __name__ == "__main__": config = yaml.safe_load(args.config) secret = config.get("registration_shared_secret", None) if not secret: - print "No 'registration_shared_secret' defined in config." + print("No 'registration_shared_secret' defined in config.") sys.exit(1) else: secret = args.shared_secret diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index b9b828c154..2f6e69e552 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -15,23 +15,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor -from twisted.enterprise import adbapi - -from synapse.storage._base import LoggingTransaction, SQLBaseStore -from synapse.storage.engines import create_engine -from synapse.storage.prepare_database import prepare_database - import argparse import curses import logging import sys import time import traceback -import yaml from six import string_types +import yaml + +from twisted.enterprise import adbapi +from twisted.internet import defer, reactor + +from synapse.storage._base import LoggingTransaction, SQLBaseStore +from synapse.storage.engines import create_engine +from synapse.storage.prepare_database import prepare_database logger = logging.getLogger("synapse_port_db") @@ -105,6 +105,7 @@ class Store(object): *All* database interactions should go through this object. """ + def __init__(self, db_pool, engine): self.db_pool = db_pool self.database_engine = engine @@ -135,7 +136,8 @@ class Store(object): txn = conn.cursor() return func( LoggingTransaction(txn, desc, self.database_engine, [], []), - *args, **kwargs + *args, + **kwargs ) except self.database_engine.module.DatabaseError as e: if self.database_engine.is_deadlock(e): @@ -158,22 +160,20 @@ class Store(object): def r(txn): txn.execute(sql, args) return txn.fetchall() + return self.runInteraction("execute_sql", r) def insert_many_txn(self, txn, table, headers, rows): sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, ", ".join(k for k in headers), - ", ".join("%s" for _ in headers) + ", ".join("%s" for _ in headers), ) try: txn.executemany(sql, rows) - except: - logger.exception( - "Failed to insert: %s", - table, - ) + except Exception: + logger.exception("Failed to insert: %s", table) raise @@ -206,7 +206,7 @@ class Porter(object): "table_name": table, "forward_rowid": 1, "backward_rowid": 0, - } + }, ) forward_chunk = 1 @@ -221,10 +221,10 @@ class Porter(object): table, forward_chunk, backward_chunk ) else: + def delete_all(txn): txn.execute( - "DELETE FROM port_from_sqlite3 WHERE table_name = %s", - (table,) + "DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,) ) txn.execute("TRUNCATE %s CASCADE" % (table,)) @@ -232,11 +232,7 @@ class Porter(object): yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={ - "table_name": table, - "forward_rowid": 1, - "backward_rowid": 0, - } + values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0}, ) forward_chunk = 1 @@ -251,12 +247,16 @@ class Porter(object): ) @defer.inlineCallbacks - def handle_table(self, table, postgres_size, table_size, forward_chunk, - backward_chunk): + def handle_table( + self, table, postgres_size, table_size, forward_chunk, backward_chunk + ): logger.info( "Table %s: %i/%i (rows %i-%i) already ported", - table, postgres_size, table_size, - backward_chunk+1, forward_chunk-1, + table, + postgres_size, + table_size, + backward_chunk + 1, + forward_chunk - 1, ) if not table_size: @@ -271,7 +271,9 @@ class Porter(object): return if table in ( - "user_directory", "user_directory_search", "users_who_share_rooms", + "user_directory", + "user_directory_search", + "users_who_share_rooms", "users_in_pubic_room", ): # We don't port these tables, as they're a faff and we can regenreate @@ -283,37 +285,35 @@ class Porter(object): # We need to make sure there is a single row, `(X, null), as that is # what synapse expects to be there. yield self.postgres_store._simple_insert( - table=table, - values={"stream_id": None}, + table=table, values={"stream_id": None} ) self.progress.update(table, table_size) # Mark table as done return forward_select = ( - "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" - % (table,) + "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) ) backward_select = ( - "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" - % (table,) + "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" % (table,) ) do_forward = [True] do_backward = [True] while True: + def r(txn): forward_rows = [] backward_rows = [] if do_forward[0]: - txn.execute(forward_select, (forward_chunk, self.batch_size,)) + txn.execute(forward_select, (forward_chunk, self.batch_size)) forward_rows = txn.fetchall() if not forward_rows: do_forward[0] = False if do_backward[0]: - txn.execute(backward_select, (backward_chunk, self.batch_size,)) + txn.execute(backward_select, (backward_chunk, self.batch_size)) backward_rows = txn.fetchall() if not backward_rows: do_backward[0] = False @@ -325,9 +325,7 @@ class Porter(object): return headers, forward_rows, backward_rows - headers, frows, brows = yield self.sqlite_store.runInteraction( - "select", r - ) + headers, frows, brows = yield self.sqlite_store.runInteraction("select", r) if frows or brows: if frows: @@ -339,9 +337,7 @@ class Porter(object): rows = self._convert_rows(table, headers, rows) def insert(txn): - self.postgres_store.insert_many_txn( - txn, table, headers[1:], rows - ) + self.postgres_store.insert_many_txn(txn, table, headers[1:], rows) self.postgres_store._simple_update_one_txn( txn, @@ -362,8 +358,9 @@ class Porter(object): return @defer.inlineCallbacks - def handle_search_table(self, postgres_size, table_size, forward_chunk, - backward_chunk): + def handle_search_table( + self, postgres_size, table_size, forward_chunk, backward_chunk + ): select = ( "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering" " FROM event_search as es" @@ -373,8 +370,9 @@ class Porter(object): ) while True: + def r(txn): - txn.execute(select, (forward_chunk, self.batch_size,)) + txn.execute(select, (forward_chunk, self.batch_size)) rows = txn.fetchall() headers = [column[0] for column in txn.description] @@ -402,18 +400,21 @@ class Porter(object): else: rows_dict.append(d) - txn.executemany(sql, [ - ( - row["event_id"], - row["room_id"], - row["key"], - row["sender"], - row["value"], - row["origin_server_ts"], - row["stream_ordering"], - ) - for row in rows_dict - ]) + txn.executemany( + sql, + [ + ( + row["event_id"], + row["room_id"], + row["key"], + row["sender"], + row["value"], + row["origin_server_ts"], + row["stream_ordering"], + ) + for row in rows_dict + ], + ) self.postgres_store._simple_update_one_txn( txn, @@ -437,7 +438,8 @@ class Porter(object): def setup_db(self, db_config, database_engine): db_conn = database_engine.module.connect( **{ - k: v for k, v in db_config.get("args", {}).items() + k: v + for k, v in db_config.get("args", {}).items() if not k.startswith("cp_") } ) @@ -450,13 +452,11 @@ class Porter(object): def run(self): try: sqlite_db_pool = adbapi.ConnectionPool( - self.sqlite_config["name"], - **self.sqlite_config["args"] + self.sqlite_config["name"], **self.sqlite_config["args"] ) postgres_db_pool = adbapi.ConnectionPool( - self.postgres_config["name"], - **self.postgres_config["args"] + self.postgres_config["name"], **self.postgres_config["args"] ) sqlite_engine = create_engine(sqlite_config) @@ -465,9 +465,7 @@ class Porter(object): self.sqlite_store = Store(sqlite_db_pool, sqlite_engine) self.postgres_store = Store(postgres_db_pool, postgres_engine) - yield self.postgres_store.execute( - postgres_engine.check_database - ) + yield self.postgres_store.execute(postgres_engine.check_database) # Step 1. Set up databases. self.progress.set_state("Preparing SQLite3") @@ -477,6 +475,7 @@ class Porter(object): self.setup_db(postgres_config, postgres_engine) self.progress.set_state("Creating port tables") + def create_port_table(txn): txn.execute( "CREATE TABLE IF NOT EXISTS port_from_sqlite3 (" @@ -501,9 +500,7 @@ class Porter(object): ) try: - yield self.postgres_store.runInteraction( - "alter_table", alter_table - ) + yield self.postgres_store.runInteraction("alter_table", alter_table) except Exception as e: pass @@ -514,11 +511,7 @@ class Porter(object): # Step 2. Get tables. self.progress.set_state("Fetching tables") sqlite_tables = yield self.sqlite_store._simple_select_onecol( - table="sqlite_master", - keyvalues={ - "type": "table", - }, - retcol="name", + table="sqlite_master", keyvalues={"type": "table"}, retcol="name" ) postgres_tables = yield self.postgres_store._simple_select_onecol( @@ -545,18 +538,14 @@ class Porter(object): # Step 4. Do the copying. self.progress.set_state("Copying to postgres") yield defer.gatherResults( - [ - self.handle_table(*res) - for res in setup_res - ], - consumeErrors=True, + [self.handle_table(*res) for res in setup_res], consumeErrors=True ) # Step 5. Do final post-processing yield self._setup_state_group_id_seq() self.progress.done() - except: + except Exception: global end_error_exec_info end_error_exec_info = sys.exc_info() logger.exception("") @@ -566,9 +555,7 @@ class Porter(object): def _convert_rows(self, table, headers, rows): bool_col_names = BOOLEAN_COLUMNS.get(table, []) - bool_cols = [ - i for i, h in enumerate(headers) if h in bool_col_names - ] + bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names] class BadValueException(Exception): pass @@ -577,18 +564,21 @@ class Porter(object): if j in bool_cols: return bool(col) elif isinstance(col, string_types) and "\0" in col: - logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col) - raise BadValueException(); + logger.warn( + "DROPPING ROW: NUL value in table %s col %s: %r", + table, + headers[j], + col, + ) + raise BadValueException() return col outrows = [] for i, row in enumerate(rows): try: - outrows.append(tuple( - conv(j, col) - for j, col in enumerate(row) - if j > 0 - )) + outrows.append( + tuple(conv(j, col) for j, col in enumerate(row) if j > 0) + ) except BadValueException: pass @@ -616,9 +606,7 @@ class Porter(object): return headers, [r for r in rows if r[ts_ind] < yesterday] - headers, rows = yield self.sqlite_store.runInteraction( - "select", r, - ) + headers, rows = yield self.sqlite_store.runInteraction("select", r) rows = self._convert_rows("sent_transactions", headers, rows) @@ -639,7 +627,7 @@ class Porter(object): txn.execute( "SELECT rowid FROM sent_transactions WHERE ts >= ?" " ORDER BY rowid ASC LIMIT 1", - (yesterday,) + (yesterday,), ) rows = txn.fetchall() @@ -657,21 +645,17 @@ class Porter(object): "table_name": "sent_transactions", "forward_rowid": next_chunk, "backward_rowid": 0, - } + }, ) def get_sent_table_size(txn): txn.execute( - "SELECT count(*) FROM sent_transactions" - " WHERE ts >= ?", - (yesterday,) + "SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,) ) size, = txn.fetchone() return int(size) - remaining_count = yield self.sqlite_store.execute( - get_sent_table_size - ) + remaining_count = yield self.sqlite_store.execute(get_sent_table_size) total_count = remaining_count + inserted_rows @@ -680,13 +664,11 @@ class Porter(object): @defer.inlineCallbacks def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): frows = yield self.sqlite_store.execute_sql( - "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), - forward_chunk, + "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk ) brows = yield self.sqlite_store.execute_sql( - "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), - backward_chunk, + "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk ) defer.returnValue(frows[0][0] + brows[0][0]) @@ -694,7 +676,7 @@ class Porter(object): @defer.inlineCallbacks def _get_already_ported_count(self, table): rows = yield self.postgres_store.execute_sql( - "SELECT count(*) FROM %s" % (table,), + "SELECT count(*) FROM %s" % (table,) ) defer.returnValue(rows[0][0]) @@ -717,22 +699,21 @@ class Porter(object): def _setup_state_group_id_seq(self): def r(txn): txn.execute("SELECT MAX(id) FROM state_groups") - next_id = txn.fetchone()[0]+1 - txn.execute( - "ALTER SEQUENCE state_group_id_seq RESTART WITH %s", - (next_id,), - ) + next_id = txn.fetchone()[0] + 1 + txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,)) + return self.postgres_store.runInteraction("setup_state_group_id_seq", r) ############################################## -###### The following is simply UI stuff ###### +# The following is simply UI stuff ############################################## class Progress(object): """Used to report progress of the port """ + def __init__(self): self.tables = {} @@ -758,6 +739,7 @@ class Progress(object): class CursesProgress(Progress): """Reports progress to a curses window """ + def __init__(self, stdscr): self.stdscr = stdscr @@ -801,7 +783,7 @@ class CursesProgress(Progress): duration = int(now) - int(self.start_time) minutes, seconds = divmod(duration, 60) - duration_str = '%02dm %02ds' % (minutes, seconds,) + duration_str = '%02dm %02ds' % (minutes, seconds) if self.finished: status = "Time spent: %s (Done!)" % (duration_str,) @@ -814,16 +796,12 @@ class CursesProgress(Progress): est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60) else: est_remaining_str = "Unknown" - status = ( - "Time spent: %s (est. remaining: %s)" - % (duration_str, est_remaining_str,) + status = "Time spent: %s (est. remaining: %s)" % ( + duration_str, + est_remaining_str, ) - self.stdscr.addstr( - 0, 0, - status, - curses.A_BOLD, - ) + self.stdscr.addstr(0, 0, status, curses.A_BOLD) max_len = max([len(t) for t in self.tables.keys()]) @@ -831,9 +809,7 @@ class CursesProgress(Progress): middle_space = 1 items = self.tables.items() - items.sort( - key=lambda i: (i[1]["perc"], i[0]), - ) + items.sort(key=lambda i: (i[1]["perc"], i[0])) for i, (table, data) in enumerate(items): if i + 2 >= rows: @@ -844,9 +820,7 @@ class CursesProgress(Progress): color = curses.color_pair(2) if perc == 100 else curses.color_pair(1) self.stdscr.addstr( - i + 2, left_margin + max_len - len(table), - table, - curses.A_BOLD | color, + i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color ) size = 20 @@ -857,15 +831,13 @@ class CursesProgress(Progress): ) self.stdscr.addstr( - i + 2, left_margin + max_len + middle_space, + i + 2, + left_margin + max_len + middle_space, "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]), ) if self.finished: - self.stdscr.addstr( - rows - 1, 0, - "Press any key to exit...", - ) + self.stdscr.addstr(rows - 1, 0, "Press any key to exit...") self.stdscr.refresh() self.last_update = time.time() @@ -877,29 +849,25 @@ class CursesProgress(Progress): def set_state(self, state): self.stdscr.clear() - self.stdscr.addstr( - 0, 0, - state + "...", - curses.A_BOLD, - ) + self.stdscr.addstr(0, 0, state + "...", curses.A_BOLD) self.stdscr.refresh() class TerminalProgress(Progress): """Just prints progress to the terminal """ + def update(self, table, num_done): super(TerminalProgress, self).update(table, num_done) data = self.tables[table] - print "%s: %d%% (%d/%d)" % ( - table, data["perc"], - data["num_done"], data["total"], + print( + "%s: %d%% (%d/%d)" % (table, data["perc"], data["num_done"], data["total"]) ) def set_state(self, state): - print state + "..." + print(state + "...") ############################################## @@ -909,34 +877,38 @@ class TerminalProgress(Progress): if __name__ == "__main__": parser = argparse.ArgumentParser( description="A script to port an existing synapse SQLite database to" - " a new PostgreSQL database." + " a new PostgreSQL database." ) parser.add_argument("-v", action='store_true') parser.add_argument( - "--sqlite-database", required=True, + "--sqlite-database", + required=True, help="The snapshot of the SQLite database file. This must not be" - " currently used by a running synapse server" + " currently used by a running synapse server", ) parser.add_argument( - "--postgres-config", type=argparse.FileType('r'), required=True, - help="The database config file for the PostgreSQL database" + "--postgres-config", + type=argparse.FileType('r'), + required=True, + help="The database config file for the PostgreSQL database", ) parser.add_argument( - "--curses", action='store_true', - help="display a curses based progress UI" + "--curses", action='store_true', help="display a curses based progress UI" ) parser.add_argument( - "--batch-size", type=int, default=1000, + "--batch-size", + type=int, + default=1000, help="The number of rows to select from the SQLite table each" - " iteration [default=1000]", + " iteration [default=1000]", ) args = parser.parse_args() logging_config = { "level": logging.DEBUG if args.v else logging.INFO, - "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s" + "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s", } if args.curses: diff --git a/setup.py b/setup.py index b00c2af367..00b69c43f5 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,8 @@ #!/usr/bin/env python -# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2014-2017 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd +# Copyright 2017-2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -86,7 +88,7 @@ setup( name="matrix-synapse", version=version, packages=find_packages(exclude=["tests", "tests.*"]), - description="Reference Synapse Home Server", + description="Reference homeserver for the Matrix decentralised comms protocol", install_requires=dependencies['requirements'](include_conditional=True).keys(), dependency_links=dependencies["DEPENDENCY_LINKS"].values(), include_package_data=True, diff --git a/synapse/__init__.py b/synapse/__init__.py index 43c5821ade..1ddbbbebfb 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -27,4 +27,4 @@ try: except ImportError: pass -__version__ = "0.33.6" +__version__ = "0.33.7" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 2e7f98404d..48b903374d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -59,6 +59,7 @@ class Codes(object): RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED" UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION" INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION" + WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION" class CodeMessageException(RuntimeError): @@ -312,6 +313,20 @@ class LimitExceededError(SynapseError): ) +class RoomKeysVersionError(SynapseError): + """A client has tried to upload to a non-current version of the room_keys store + """ + def __init__(self, current_version): + """ + Args: + current_version (str): the current version of the store they should have used + """ + super(RoomKeysVersionError, self).__init__( + 403, "Wrong room_keys version", Codes.WRONG_ROOM_KEYS_VERSION + ) + self.current_version = current_version + + class IncompatibleRoomVersionError(SynapseError): """A server is trying to join a room whose version it does not support.""" diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index fc4b25de1c..f5c61dec5b 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet): "Authorization": auth_headers, } result = yield self.http_client.get_json( - self.main_uri + request.uri, + self.main_uri + request.uri.decode('ascii'), headers=headers, ) defer.returnValue((200, result)) @@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet): "Authorization": auth_headers, } result = yield self.http_client.post_json_get_json( - self.main_uri + request.uri, + self.main_uri + request.uri.decode('ascii'), body, headers=headers, ) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e3f0d99a3f..0b85b377e3 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,7 @@ import sys from six import iteritems +import psutil from prometheus_client import Gauge from twisted.application import service @@ -502,7 +503,6 @@ def run(hs): def performance_stats_init(): try: - import psutil process = psutil.Process() # Ensure we can fetch both, and make the initial request for cpu_percent # so the next request will use this as the initial point. @@ -510,12 +510,9 @@ def run(hs): process.cpu_percent(interval=None) logger.info("report_stats can use psutil") stats_process.append(process) - except (ImportError, AttributeError): - logger.warn( - "report_stats enabled but psutil is not installed or incorrect version." - " Disabling reporting of memory/cpu stats." - " Ensuring psutil is available will help matrix.org track performance" - " changes across releases." + except (AttributeError): + logger.warning( + "Unable to read memory/cpu stats. Disabling reporting." ) def generate_user_daily_visit_stats(): @@ -530,10 +527,13 @@ def run(hs): clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) # monthly active user limiting functionality - clock.looping_call( - hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60 - ) - hs.get_datastore().reap_monthly_active_users() + def reap_monthly_active_users(): + return run_as_background_process( + "reap_monthly_active_users", + hs.get_datastore().reap_monthly_active_users, + ) + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) + reap_monthly_active_users() @defer.inlineCallbacks def generate_monthly_active_users(): @@ -547,12 +547,23 @@ def run(hs): registered_reserved_users_mau_gauge.set(float(reserved_count)) max_mau_gauge.set(float(hs.config.max_mau_value)) - hs.get_datastore().initialise_reserved_users( - hs.config.mau_limits_reserved_threepids + def start_generate_monthly_active_users(): + return run_as_background_process( + "generate_monthly_active_users", + generate_monthly_active_users, + ) + + # XXX is this really supposed to be a background process? it looks + # like it needs to complete before some of the other stuff runs. + run_as_background_process( + "initialise_reserved_users", + hs.get_datastore().initialise_reserved_users, + hs.config.mau_limits_reserved_threepids, ) - generate_monthly_active_users() + + start_generate_monthly_active_users() if hs.config.limit_usage_by_mau: - clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings if hs.config.report_stats: @@ -568,7 +579,7 @@ def run(hs): clock.call_later(5 * 60, start_phone_stats_home) if hs.config.daemonize and hs.config.print_pidfile: - print (hs.config.pid_file) + print(hs.config.pid_file) _base.start_reactor( "synapse-homeserver", diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py index 8fccf573ee..79fe9c3dac 100644 --- a/synapse/config/__main__.py +++ b/synapse/config/__main__.py @@ -28,7 +28,7 @@ if __name__ == "__main__": sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) - print (getattr(config, key)) + print(getattr(config, key)) sys.exit(0) else: sys.stderr.write("Unknown command %r\n" % (action,)) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 3d2e90dd5b..14dae65ea0 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -106,10 +106,7 @@ class Config(object): @classmethod def check_file(cls, file_path, config_name): if file_path is None: - raise ConfigError( - "Missing config for %s." - % (config_name,) - ) + raise ConfigError("Missing config for %s." % (config_name,)) try: os.stat(file_path) except OSError as e: @@ -128,9 +125,7 @@ class Config(object): if e.errno != errno.EEXIST: raise if not os.path.isdir(dir_path): - raise ConfigError( - "%s is not a directory" % (dir_path,) - ) + raise ConfigError("%s is not a directory" % (dir_path,)) return dir_path @classmethod @@ -156,21 +151,20 @@ class Config(object): return results def generate_config( - self, - config_dir_path, - server_name, - is_generating_file, - report_stats=None, + self, config_dir_path, server_name, is_generating_file, report_stats=None ): default_config = "# vim:ft=yaml\n" - default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all( - "default_config", - config_dir_path=config_dir_path, - server_name=server_name, - is_generating_file=is_generating_file, - report_stats=report_stats, - )) + default_config += "\n\n".join( + dedent(conf) + for conf in self.invoke_all( + "default_config", + config_dir_path=config_dir_path, + server_name=server_name, + is_generating_file=is_generating_file, + report_stats=report_stats, + ) + ) config = yaml.load(default_config) @@ -178,23 +172,22 @@ class Config(object): @classmethod def load_config(cls, description, argv): - config_parser = argparse.ArgumentParser( - description=description, - ) + config_parser = argparse.ArgumentParser(description=description) config_parser.add_argument( - "-c", "--config-path", + "-c", + "--config-path", action="append", metavar="CONFIG_FILE", help="Specify config file. Can be given multiple times and" - " may specify directories containing *.yaml files." + " may specify directories containing *.yaml files.", ) config_parser.add_argument( "--keys-directory", metavar="DIRECTORY", help="Where files such as certs and signing keys are stored when" - " their location is given explicitly in the config." - " Defaults to the directory containing the last config file", + " their location is given explicitly in the config." + " Defaults to the directory containing the last config file", ) config_args = config_parser.parse_args(argv) @@ -203,9 +196,7 @@ class Config(object): obj = cls() obj.read_config_files( - config_files, - keys_directory=config_args.keys_directory, - generate_keys=False, + config_files, keys_directory=config_args.keys_directory, generate_keys=False ) return obj @@ -213,38 +204,38 @@ class Config(object): def load_or_generate_config(cls, description, argv): config_parser = argparse.ArgumentParser(add_help=False) config_parser.add_argument( - "-c", "--config-path", + "-c", + "--config-path", action="append", metavar="CONFIG_FILE", help="Specify config file. Can be given multiple times and" - " may specify directories containing *.yaml files." + " may specify directories containing *.yaml files.", ) config_parser.add_argument( "--generate-config", action="store_true", - help="Generate a config file for the server name" + help="Generate a config file for the server name", ) config_parser.add_argument( "--report-stats", action="store", help="Whether the generated config reports anonymized usage statistics", - choices=["yes", "no"] + choices=["yes", "no"], ) config_parser.add_argument( "--generate-keys", action="store_true", - help="Generate any missing key files then exit" + help="Generate any missing key files then exit", ) config_parser.add_argument( "--keys-directory", metavar="DIRECTORY", help="Used with 'generate-*' options to specify where files such as" - " certs and signing keys should be stored in, unless explicitly" - " specified in the config." + " certs and signing keys should be stored in, unless explicitly" + " specified in the config.", ) config_parser.add_argument( - "-H", "--server-name", - help="The server name to generate a config file for" + "-H", "--server-name", help="The server name to generate a config file for" ) config_args, remaining_args = config_parser.parse_known_args(argv) @@ -257,8 +248,8 @@ class Config(object): if config_args.generate_config: if config_args.report_stats is None: config_parser.error( - "Please specify either --report-stats=yes or --report-stats=no\n\n" + - MISSING_REPORT_STATS_SPIEL + "Please specify either --report-stats=yes or --report-stats=no\n\n" + + MISSING_REPORT_STATS_SPIEL ) if not config_files: config_parser.error( @@ -287,26 +278,32 @@ class Config(object): config_dir_path=config_dir_path, server_name=server_name, report_stats=(config_args.report_stats == "yes"), - is_generating_file=True + is_generating_file=True, ) obj.invoke_all("generate_files", config) config_file.write(config_str) - print(( - "A config file has been generated in %r for server name" - " %r with corresponding SSL keys and self-signed" - " certificates. Please review this file and customise it" - " to your needs." - ) % (config_path, server_name)) + print( + ( + "A config file has been generated in %r for server name" + " %r with corresponding SSL keys and self-signed" + " certificates. Please review this file and customise it" + " to your needs." + ) + % (config_path, server_name) + ) print( "If this server name is incorrect, you will need to" " regenerate the SSL certificates" ) return else: - print(( - "Config file %r already exists. Generating any missing key" - " files." - ) % (config_path,)) + print( + ( + "Config file %r already exists. Generating any missing key" + " files." + ) + % (config_path,) + ) generate_keys = True parser = argparse.ArgumentParser( @@ -338,8 +335,7 @@ class Config(object): return obj - def read_config_files(self, config_files, keys_directory=None, - generate_keys=False): + def read_config_files(self, config_files, keys_directory=None, generate_keys=False): if not keys_directory: keys_directory = os.path.dirname(config_files[-1]) @@ -364,8 +360,9 @@ class Config(object): if "report_stats" not in config: raise ConfigError( - MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" + - MISSING_REPORT_STATS_SPIEL + MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + + "\n" + + MISSING_REPORT_STATS_SPIEL ) if generate_keys: @@ -399,16 +396,16 @@ def find_config_files(search_paths): for entry in os.listdir(config_path): entry_path = os.path.join(config_path, entry) if not os.path.isfile(entry_path): - print ( - "Found subdirectory in config directory: %r. IGNORING." - ) % (entry_path, ) + err = "Found subdirectory in config directory: %r. IGNORING." + print(err % (entry_path,)) continue if not entry.endswith(".yaml"): - print ( - "Found file in config directory that does not" - " end in '.yaml': %r. IGNORING." - ) % (entry_path, ) + err = ( + "Found file in config directory that does not end in " + "'.yaml': %r. IGNORING." + ) + print(err % (entry_path,)) continue files.append(entry_path) diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index fe156b6930..93d70cff14 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -13,10 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function + # This file can't be called email.py because if it is, we cannot: import email.utils +import logging +import os + +import pkg_resources -from ._base import Config +from ._base import Config, ConfigError + +logger = logging.getLogger(__name__) class EmailConfig(Config): @@ -38,7 +46,6 @@ class EmailConfig(Config): "smtp_host", "smtp_port", "notif_from", - "template_dir", "notif_template_html", "notif_template_text", ] @@ -62,9 +69,26 @@ class EmailConfig(Config): self.email_smtp_host = email_config["smtp_host"] self.email_smtp_port = email_config["smtp_port"] self.email_notif_from = email_config["notif_from"] - self.email_template_dir = email_config["template_dir"] self.email_notif_template_html = email_config["notif_template_html"] self.email_notif_template_text = email_config["notif_template_text"] + + template_dir = email_config.get("template_dir") + # we need an absolute path, because we change directory after starting (and + # we don't yet know what auxilliary templates like mail.css we will need). + # (Note that loading as package_resources with jinja.PackageLoader doesn't + # work for the same reason.) + if not template_dir: + template_dir = pkg_resources.resource_filename( + 'synapse', 'res/templates' + ) + template_dir = os.path.abspath(template_dir) + + for f in self.email_notif_template_text, self.email_notif_template_html: + p = os.path.join(template_dir, f) + if not os.path.isfile(p): + raise ConfigError("Unable to find email template file %s" % (p, )) + self.email_template_dir = template_dir + self.email_notif_for_new_users = email_config.get( "notif_for_new_users", True ) @@ -113,7 +137,9 @@ class EmailConfig(Config): # require_transport_security: False # notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>" # app_name: Matrix - # template_dir: res/templates + # # if template_dir is unset, uses the example templates that are part of + # # the Synapse distribution. + # #template_dir: res/templates # notif_template_html: notif_mail.html # notif_template_text: notif_mail.txt # notif_for_new_users: True diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 819e8f7331..4efe95faa4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -507,19 +507,19 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): + latest_events, limit): with (yield self._server_linearizer.queue((origin, room_id))): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) logger.info( "on_get_missing_events: earliest_events: %r, latest_events: %r," - " limit: %d, min_depth: %d", - earliest_events, latest_events, limit, min_depth + " limit: %d", + earliest_events, latest_events, limit, ) missing_events = yield self.handler.on_get_missing_events( - origin, room_id, earliest_events, latest_events, limit, min_depth + origin, room_id, earliest_events, latest_events, limit, ) if len(missing_events) < 5: diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 98b5950800..3fdd63be95 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -633,14 +633,6 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) except HttpResponseException as e: code = e.code response = e.response @@ -657,19 +649,24 @@ class TransactionQueue(object): destination, txn_id, code ) - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( transaction, code, response ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) - if code != 200: + if code == 200: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, txn_id, e_id, r, + ) + else: for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination + logger.warn( + "TX [%s] {%s} Failed to send event %s", + destination, txn_id, p.event_id, ) success = False diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2ab973d6c8..edba5a9808 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -143,9 +143,17 @@ class TransportLayerClient(object): transaction (Transaction) Returns: - Deferred: Results of the deferred is a tuple in the form of - (response_code, response_body) where the response_body is a - python dict decoded from json + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. + + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ logger.debug( "send_data dest=%s, txid=%s", @@ -170,11 +178,6 @@ class TransportLayerClient(object): backoff_on_404=True, # If we get a 404 the other side has gone ) - logger.debug( - "send_data dest=%s, txid=%s, got response: 200", - transaction.destination, transaction.transaction_id, - ) - defer.returnValue(response) @defer.inlineCallbacks diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2f874b4838..7288d49074 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -560,7 +560,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): @defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): limit = int(content.get("limit", 10)) - min_depth = int(content.get("min_depth", 0)) earliest_events = content.get("earliest_events", []) latest_events = content.get("latest_events", []) @@ -569,7 +568,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): room_id=room_id, earliest_events=earliest_events, latest_events=latest_events, - min_depth=min_depth, limit=limit, ) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 2a5eab124f..329e3c7d71 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -22,7 +22,7 @@ import bcrypt import pymacaroons from canonicaljson import json -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.client import PartialDownloadError import synapse.util.stringutils as stringutils @@ -37,8 +37,8 @@ from synapse.api.errors import ( ) from synapse.module_api import ModuleApi from synapse.types import UserID +from synapse.util import logcontext from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable from ._base import BaseHandler @@ -884,11 +884,7 @@ class AuthHandler(BaseHandler): bcrypt.gensalt(self.bcrypt_rounds), ).decode('ascii') - return make_deferred_yieldable( - threads.deferToThreadPool( - self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash - ), - ) + return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -913,13 +909,7 @@ class AuthHandler(BaseHandler): if not isinstance(stored_hash, bytes): stored_hash = stored_hash.encode('ascii') - return make_deferred_yieldable( - threads.deferToThreadPool( - self.hs.get_reactor(), - self.hs.get_reactor().getThreadPool(), - _do_validate_hash, - ), - ) + return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash) else: return defer.succeed(False) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index b078df4a76..75fe50c42c 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -17,8 +17,8 @@ import logging from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, create_requester -from synapse.util.logcontext import run_in_background from ._base import BaseHandler @@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler): None """ if not self._user_parter_running: - run_in_background(self._user_parter_loop) + run_as_background_process("user_parter_loop", self._user_parter_loop) @defer.inlineCallbacks def _user_parter_loop(self): diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py new file mode 100644 index 0000000000..5edb3cfe04 --- /dev/null +++ b/synapse/handlers/e2e_room_keys.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- +# Copyright 2017, 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError +from synapse.util.async_helpers import Linearizer + +logger = logging.getLogger(__name__) + + +class E2eRoomKeysHandler(object): + """ + Implements an optional realtime backup mechanism for encrypted E2E megolm room keys. + This gives a way for users to store and recover their megolm keys if they lose all + their clients. It should also extend easily to future room key mechanisms. + The actual payload of the encrypted keys is completely opaque to the handler. + """ + + def __init__(self, hs): + self.store = hs.get_datastore() + + # Used to lock whenever a client is uploading key data. This prevents collisions + # between clients trying to upload the details of a new session, given all + # clients belonging to a user will receive and try to upload a new session at + # roughly the same time. Also used to lock out uploads when the key is being + # changed. + self._upload_linearizer = Linearizer("upload_room_keys_lock") + + @defer.inlineCallbacks + def get_room_keys(self, user_id, version, room_id=None, session_id=None): + """Bulk get the E2E room keys for a given backup, optionally filtered to a given + room, or a given session. + See EndToEndRoomKeyStore.get_e2e_room_keys for full details. + + Args: + user_id(str): the user whose keys we're getting + version(str): the version ID of the backup we're getting keys from + room_id(string): room ID to get keys for, for None to get keys for all rooms + session_id(string): session ID to get keys for, for None to get keys for all + sessions + Returns: + A deferred list of dicts giving the session_data and message metadata for + these room keys. + """ + + # we deliberately take the lock to get keys so that changing the version + # works atomically + with (yield self._upload_linearizer.queue(user_id)): + results = yield self.store.get_e2e_room_keys( + user_id, version, room_id, session_id + ) + + if results['rooms'] == {}: + raise SynapseError(404, "No room_keys found") + + defer.returnValue(results) + + @defer.inlineCallbacks + def delete_room_keys(self, user_id, version, room_id=None, session_id=None): + """Bulk delete the E2E room keys for a given backup, optionally filtered to a given + room or a given session. + See EndToEndRoomKeyStore.delete_e2e_room_keys for full details. + + Args: + user_id(str): the user whose backup we're deleting + version(str): the version ID of the backup we're deleting + room_id(string): room ID to delete keys for, for None to delete keys for all + rooms + session_id(string): session ID to delete keys for, for None to delete keys + for all sessions + Returns: + A deferred of the deletion transaction + """ + + # lock for consistency with uploading + with (yield self._upload_linearizer.queue(user_id)): + yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) + + @defer.inlineCallbacks + def upload_room_keys(self, user_id, version, room_keys): + """Bulk upload a list of room keys into a given backup version, asserting + that the given version is the current backup version. room_keys are merged + into the current backup as described in RoomKeysServlet.on_PUT(). + + Args: + user_id(str): the user whose backup we're setting + version(str): the version ID of the backup we're updating + room_keys(dict): a nested dict describing the room_keys we're setting: + + { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } + } + + Raises: + SynapseError: with code 404 if there are no versions defined + RoomKeysVersionError: if the uploaded version is not the current version + """ + + # TODO: Validate the JSON to make sure it has the right keys. + + # XXX: perhaps we should use a finer grained lock here? + with (yield self._upload_linearizer.queue(user_id)): + + # Check that the version we're trying to upload is the current version + try: + version_info = yield self.store.get_e2e_room_keys_version_info(user_id) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Version '%s' not found" % (version,)) + else: + raise + + if version_info['version'] != version: + # Check that the version we're trying to upload actually exists + try: + version_info = yield self.store.get_e2e_room_keys_version_info( + user_id, version, + ) + # if we get this far, the version must exist + raise RoomKeysVersionError(current_version=version_info['version']) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Version '%s' not found" % (version,)) + else: + raise + + # go through the room_keys. + # XXX: this should/could be done concurrently, given we're in a lock. + for room_id, room in iteritems(room_keys['rooms']): + for session_id, session in iteritems(room['sessions']): + yield self._upload_room_key( + user_id, version, room_id, session_id, session + ) + + @defer.inlineCallbacks + def _upload_room_key(self, user_id, version, room_id, session_id, room_key): + """Upload a given room_key for a given room and session into a given + version of the backup. Merges the key with any which might already exist. + + Args: + user_id(str): the user whose backup we're setting + version(str): the version ID of the backup we're updating + room_id(str): the ID of the room whose keys we're setting + session_id(str): the session whose room_key we're setting + room_key(dict): the room_key being set + """ + + # get the room_key for this particular row + current_room_key = None + try: + current_room_key = yield self.store.get_e2e_room_key( + user_id, version, room_id, session_id + ) + except StoreError as e: + if e.code == 404: + pass + else: + raise + + if self._should_replace_room_key(current_room_key, room_key): + yield self.store.set_e2e_room_key( + user_id, version, room_id, session_id, room_key + ) + + @staticmethod + def _should_replace_room_key(current_room_key, room_key): + """ + Determine whether to replace a given current_room_key (if any) + with a newly uploaded room_key backup + + Args: + current_room_key (dict): Optional, the current room_key dict if any + room_key (dict): The new room_key dict which may or may not be fit to + replace the current_room_key + + Returns: + True if current_room_key should be replaced by room_key in the backup + """ + + if current_room_key: + # spelt out with if/elifs rather than nested boolean expressions + # purely for legibility. + + if room_key['is_verified'] and not current_room_key['is_verified']: + return True + elif ( + room_key['first_message_index'] < + current_room_key['first_message_index'] + ): + return True + elif room_key['forwarded_count'] < current_room_key['forwarded_count']: + return True + else: + return False + return True + + @defer.inlineCallbacks + def create_version(self, user_id, version_info): + """Create a new backup version. This automatically becomes the new + backup version for the user's keys; previous backups will no longer be + writeable to. + + Args: + user_id(str): the user whose backup version we're creating + version_info(dict): metadata about the new version being created + + { + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + + Returns: + A deferred of a string that gives the new version number. + """ + + # TODO: Validate the JSON to make sure it has the right keys. + + # lock everyone out until we've switched version + with (yield self._upload_linearizer.queue(user_id)): + new_version = yield self.store.create_e2e_room_keys_version( + user_id, version_info + ) + defer.returnValue(new_version) + + @defer.inlineCallbacks + def get_version_info(self, user_id, version=None): + """Get the info about a given version of the user's backup + + Args: + user_id(str): the user whose current backup version we're querying + version(str): Optional; if None gives the most recent version + otherwise a historical one. + Raises: + StoreError: code 404 if the requested backup version doesn't exist + Returns: + A deferred of a info dict that gives the info about the new version. + + { + "version": "1234", + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + """ + + with (yield self._upload_linearizer.queue(user_id)): + res = yield self.store.get_e2e_room_keys_version_info(user_id, version) + defer.returnValue(res) + + @defer.inlineCallbacks + def delete_version(self, user_id, version=None): + """Deletes a given version of the user's e2e_room_keys backup + + Args: + user_id(str): the user whose current backup version we're deleting + version(str): the version id of the backup being deleted + Raises: + StoreError: code 404 if this backup version doesn't exist + """ + + with (yield self._upload_linearizer.queue(user_id)): + yield self.store.delete_e2e_room_keys_version(user_id, version) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 45d955e6f5..cab57a8849 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -309,8 +309,8 @@ class FederationHandler(BaseHandler): if sent_to_us_directly: logger.warn( - "[%s %s] Failed to fetch %d prev events: rejecting", - room_id, event_id, len(prevs - seen), + "[%s %s] Rejecting: failed to fetch %d prev events: %s", + room_id, event_id, len(prevs - seen), shortstr(prevs - seen) ) raise FederationError( "ERROR", @@ -452,8 +452,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "[%s %s]: Requesting %d prev_events: %s", - room_id, event_id, len(prevs - seen), shortstr(prevs - seen) + "[%s %s]: Requesting missing events between %s and %s", + room_id, event_id, shortstr(latest), event_id, ) # XXX: we set timeout to 10s to help workaround @@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): + latest_events, limit): in_room = yield self.auth.check_host_in_room( room_id, origin @@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Host not in room.") limit = min(limit, 20) - min_depth = max(min_depth, 0) missing_events = yield self.store.get_missing_events( room_id=room_id, earliest_events=earliest_events, latest_events=latest_events, limit=limit, - min_depth=min_depth, ) missing_events = yield filter_events_for_server( diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 38e1737ec9..dc88620885 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -16,7 +16,7 @@ import logging from collections import namedtuple -from six import iteritems +from six import PY3, iteritems from six.moves import range import msgpack @@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( @classmethod def from_token(cls, token): + if PY3: + # The argument raw=False is only available on new versions of + # msgpack, and only really needed on Python 3. Gate it behind + # a PY3 check to avoid causing issues on Debian-packaged versions. + decoded = msgpack.loads(decode_base64(token), raw=False) + else: + decoded = msgpack.loads(decode_base64(token)) return RoomListNextBatch(**{ cls.REVERSE_KEY_DICT[key]: val - for key, val in msgpack.loads(decode_base64(token)).items() + for key, val in decoded.items() }) def to_token(self): diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index d8413d6aa7..f11b430126 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -20,6 +20,7 @@ from six import iteritems from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo from synapse.types import get_localpart_from_id from synapse.util.metrics import Measure @@ -98,7 +99,6 @@ class UserDirectoryHandler(object): """ return self.store.search_user_dir(user_id, search_term, limit) - @defer.inlineCallbacks def notify_new_event(self): """Called when there may be more deltas to process """ @@ -108,11 +108,15 @@ class UserDirectoryHandler(object): if self._is_processing: return + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + self._is_processing = False + self._is_processing = True - try: - yield self._unsafe_process() - finally: - self._is_processing = False + run_as_background_process("user_directory.notify_new_event", process) @defer.inlineCallbacks def handle_local_profile_change(self, user_id, profile): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 1a79f6305b..24b6110c20 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object): ) self.clock = hs.get_clock() self._store = hs.get_datastore() - self.version_string = hs.version_string.encode('ascii') + self.version_string_bytes = hs.version_string.encode('ascii') self.default_timeout = 60 def schedule(x): @@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object): ignore_backoff=ignore_backoff, ) - method = request.method - destination = request.destination + method_bytes = request.method.encode("ascii") + destination_bytes = request.destination.encode("ascii") path_bytes = request.path.encode("ascii") if request.query: query_bytes = encode_query_args(request.query) @@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object): query_bytes = b"" headers_dict = { - "User-Agent": [self.version_string], - "Host": [request.destination], + b"User-Agent": [self.version_string_bytes], + b"Host": [destination_bytes], } with limiter: @@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url = urllib.parse.urlunparse(( - b"matrix", destination.encode("ascii"), + url_bytes = urllib.parse.urlunparse(( + b"matrix", destination_bytes, path_bytes, None, query_bytes, b"", - )).decode('ascii') + )) + url_str = url_bytes.decode('ascii') - http_url = urllib.parse.urlunparse(( + url_to_sign_bytes = urllib.parse.urlunparse(( b"", b"", path_bytes, None, query_bytes, b"", - )).decode('ascii') + )) while True: try: json = request.get_json() if json: - data = encode_canonical_json(json) - headers_dict["Content-Type"] = ["application/json"] + headers_dict[b"Content-Type"] = [b"application/json"] self.sign_request( - destination, method, http_url, headers_dict, json + destination_bytes, method_bytes, url_to_sign_bytes, + headers_dict, json, ) - else: - data = None - self.sign_request(destination, method, http_url, headers_dict) - - logger.info( - "{%s} [%s] Sending request: %s %s", - request.txn_id, destination, method, url - ) - - if data: + data = encode_canonical_json(json) producer = FileBodyProducer( BytesIO(data), - cooperator=self._cooperator + cooperator=self._cooperator, ) else: producer = None + self.sign_request( + destination_bytes, method_bytes, url_to_sign_bytes, + headers_dict, + ) - request_deferred = treq.request( - method, - url, + logger.info( + "{%s} [%s] Sending request: %s %s", + request.txn_id, request.destination, request.method, + url_str, + ) + + # we don't want all the fancy cookie and redirect handling that + # treq.request gives: just use the raw Agent. + request_deferred = self.agent.request( + method_bytes, + url_bytes, headers=Headers(headers_dict), - data=producer, - agent=self.agent, - reactor=self.hs.get_reactor(), - unbuffered=True + bodyProducer=producer, ) request_deferred = timeout_deferred( @@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object): logger.warn( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, - destination, - method, - url, + request.destination, + request.method, + url_str, _flatten_response_never_received(e), ) @@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object): logger.debug( "{%s} [%s] Waiting %ss before re-sending...", request.txn_id, - destination, + request.destination, delay, ) @@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object): logger.info( "{%s} [%s] Got response headers: %d %s", request.txn_id, - destination, + request.destination, response.code, response.phrase.decode('ascii', errors='replace'), ) @@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object): destination_is must be non-None. method (bytes): The HTTP method of the request url_bytes (bytes): The URI path of the request - headers_dict (dict): Dictionary of request headers to append to - content (bytes): The body of the request + headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to + append to + content (object): The body of the request destination_is (bytes): As 'destination', but if the destination is an identity server diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index fedb4e6b18..62045a918b 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -39,7 +39,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", + "synapse_http_server_response_time_seconds", + "sec", ["method", "servlet", "tag", "code"], ) @@ -79,15 +80,11 @@ response_size = Counter( # than when the response was written. in_flight_requests_ru_utime = Counter( - "synapse_http_server_in_flight_requests_ru_utime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"] ) in_flight_requests_ru_stime = Counter( - "synapse_http_server_in_flight_requests_ru_stime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"] ) in_flight_requests_db_txn_count = Counter( @@ -134,7 +131,7 @@ def _get_in_flight_counts(): # type counts = {} for rm in reqs: - key = (rm.method, rm.name,) + key = (rm.method, rm.name) counts[key] = counts.get(key, 0) + 1 return counts @@ -175,7 +172,8 @@ class RequestMetrics(object): if context != self.start_context: logger.warn( "Context have unexpectedly changed %r, %r", - context, self.start_context + context, + self.start_context, ) return @@ -192,10 +190,10 @@ class RequestMetrics(object): resource_usage = context.get_resource_usage() response_ru_utime.labels(self.method, self.name, tag).inc( - resource_usage.ru_utime, + resource_usage.ru_utime ) response_ru_stime.labels(self.method, self.name, tag).inc( - resource_usage.ru_stime, + resource_usage.ru_stime ) response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count @@ -222,8 +220,15 @@ class RequestMetrics(object): diff = new_stats - self._request_stats self._request_stats = new_stats - in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) - in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) + # max() is used since rapid use of ru_stime/ru_utime can end up with the + # count going backwards due to NTP, time smearing, fine-grained + # correction, or floating points. Who knows, really? + in_flight_requests_ru_utime.labels(self.method, self.name).inc( + max(diff.ru_utime, 0) + ) + in_flight_requests_ru_stime.labels(self.method, self.name).inc( + max(diff.ru_stime, 0) + ) in_flight_requests_db_txn_count.labels(self.method, self.name).inc( diff.db_txn_count diff --git a/synapse/notifier.py b/synapse/notifier.py index 340b16ce25..de02b1017e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -186,9 +186,9 @@ class Notifier(object): def count_listeners(): all_user_streams = set() - for x in self.room_to_user_streams.values(): + for x in list(self.room_to_user_streams.values()): all_user_streams |= x - for x in self.user_to_user_stream.values(): + for x in list(self.user_to_user_stream.values()): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) @@ -196,7 +196,7 @@ class Notifier(object): LaterGauge( "synapse_notifier_rooms", "", [], - lambda: count(bool, self.room_to_user_streams.values()), + lambda: count(bool, list(self.room_to_user_streams.values())), ) LaterGauge( "synapse_notifier_users", "", [], diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1a5a10d974..16fb5e8471 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -526,8 +526,7 @@ def load_jinja2_templates(config): Returns: (notif_template_html, notif_template_text) """ - logger.info("loading jinja2") - + logger.info("loading email templates from '%s'", config.email_template_dir) loader = jinja2.FileSystemLoader(config.email_template_dir) env = jinja2.Environment(loader=loader) env.filters["format_ts"] = format_ts_filter diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2947f37f1a..943876456b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -53,9 +53,10 @@ REQUIREMENTS = { "pillow>=3.1.2": ["PIL"], "pydenticon>=0.2": ["pydenticon"], "sortedcontainers>=1.4.4": ["sortedcontainers"], + "psutil>=2.0.0": ["psutil>=2.0.0"], "pysaml2>=3.0.0": ["saml2"], "pymacaroons-pynacl>=0.9.3": ["pymacaroons"], - "msgpack-python>=0.3.0": ["msgpack"], + "msgpack-python>=0.4.2": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], "six>=1.10": ["six"], @@ -79,9 +80,6 @@ CONDITIONAL_REQUIREMENTS = { "matrix-synapse-ldap3": { "matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"], }, - "psutil": { - "psutil>=2.0.0": ["psutil>=2.0.0"], - }, "postgres": { "psycopg2>=2.6": ["psycopg2"] } diff --git a/res/templates/mail-Vector.css b/synapse/res/templates/mail-Vector.css index 6a3e36eda1..6a3e36eda1 100644 --- a/res/templates/mail-Vector.css +++ b/synapse/res/templates/mail-Vector.css diff --git a/res/templates/mail.css b/synapse/res/templates/mail.css index 5ab3e1b06d..5ab3e1b06d 100644 --- a/res/templates/mail.css +++ b/synapse/res/templates/mail.css diff --git a/res/templates/notif.html b/synapse/res/templates/notif.html index 88b921ca9c..88b921ca9c 100644 --- a/res/templates/notif.html +++ b/synapse/res/templates/notif.html diff --git a/res/templates/notif.txt b/synapse/res/templates/notif.txt index a37bee9833..a37bee9833 100644 --- a/res/templates/notif.txt +++ b/synapse/res/templates/notif.txt diff --git a/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html index fcdb3109fe..fcdb3109fe 100644 --- a/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html diff --git a/res/templates/notif_mail.txt b/synapse/res/templates/notif_mail.txt index 24843042a5..24843042a5 100644 --- a/res/templates/notif_mail.txt +++ b/synapse/res/templates/notif_mail.txt diff --git a/res/templates/room.html b/synapse/res/templates/room.html index 723c222d25..723c222d25 100644 --- a/res/templates/room.html +++ b/synapse/res/templates/room.html diff --git a/res/templates/room.txt b/synapse/res/templates/room.txt index 84648c710e..84648c710e 100644 --- a/res/templates/room.txt +++ b/synapse/res/templates/room.txt diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 3418f06fd6..4856822a5d 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -46,6 +46,7 @@ from synapse.rest.client.v2_alpha import ( receipts, register, report_event, + room_keys, sendtodevice, sync, tags, @@ -102,6 +103,7 @@ class ClientRestResource(JsonResource): auth.register_servlets(hs, client_resource) receipts.register_servlets(hs, client_resource) read_marker.register_servlets(hs, client_resource) + room_keys.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py new file mode 100644 index 0000000000..45b5817d8b --- /dev/null +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -0,0 +1,372 @@ +# -*- coding: utf-8 -*- +# Copyright 2017, 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import Codes, SynapseError +from synapse.http.servlet import ( + RestServlet, + parse_json_object_from_request, + parse_string, +) + +from ._base import client_v2_patterns + +logger = logging.getLogger(__name__) + + +class RoomKeysServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/room_keys/keys(/(?P<room_id>[^/]+))?(/(?P<session_id>[^/]+))?$" + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(RoomKeysServlet, self).__init__() + self.auth = hs.get_auth() + self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, room_id, session_id): + """ + Uploads one or more encrypted E2E room keys for backup purposes. + room_id: the ID of the room the keys are for (optional) + session_id: the ID for the E2E room keys for the room (optional) + version: the version of the user's backup which this data is for. + the version must already have been created via the /room_keys/version API. + + Each session has: + * first_message_index: a numeric index indicating the oldest message + encrypted by this session. + * forwarded_count: how many times the uploading client claims this key + has been shared (forwarded) + * is_verified: whether the client that uploaded the keys claims they + were sent by a device which they've verified + * session_data: base64-encrypted data describing the session. + + Returns 200 OK on success with body {} + Returns 403 Forbidden if the version in question is not the most recently + created version (i.e. if this is an old client trying to write to a stale backup) + Returns 404 Not Found if the version in question doesn't exist + + The API is designed to be otherwise agnostic to the room_key encryption + algorithm being used. Sessions are merged with existing ones in the + backup using the heuristics: + * is_verified sessions always win over unverified sessions + * older first_message_index always win over newer sessions + * lower forwarded_count always wins over higher forwarded_count + + We trust the clients not to lie and corrupt their own backups. + It also means that if your access_token is stolen, the attacker could + delete your backup. + + POST /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1 + Content-Type: application/json + + { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + + Or... + + POST /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1 + Content-Type: application/json + + { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + + Or... + + POST /room_keys/keys?version=1 HTTP/1.1 + Content-Type: application/json + + { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + version = parse_string(request, "version") + + if session_id: + body = { + "sessions": { + session_id: body + } + } + + if room_id: + body = { + "rooms": { + room_id: body + } + } + + yield self.e2e_room_keys_handler.upload_room_keys( + user_id, version, body + ) + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_GET(self, request, room_id, session_id): + """ + Retrieves one or more encrypted E2E room keys for backup purposes. + Symmetric with the PUT version of the API. + + room_id: the ID of the room to retrieve the keys for (optional) + session_id: the ID for the E2E room keys to retrieve the keys for (optional) + version: the version of the user's backup which this data is for. + the version must already have been created via the /change_secret API. + + Returns as follows: + + GET /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1 + { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + + Or... + + GET /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1 + { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + + Or... + + GET /room_keys/keys?version=1 HTTP/1.1 + { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + version = parse_string(request, "version") + + room_keys = yield self.e2e_room_keys_handler.get_room_keys( + user_id, version, room_id, session_id + ) + + if session_id: + room_keys = room_keys['rooms'][room_id]['sessions'][session_id] + elif room_id: + room_keys = room_keys['rooms'][room_id] + + defer.returnValue((200, room_keys)) + + @defer.inlineCallbacks + def on_DELETE(self, request, room_id, session_id): + """ + Deletes one or more encrypted E2E room keys for a user for backup purposes. + + DELETE /room_keys/keys/!abc:matrix.org/c0ff33?version=1 + HTTP/1.1 200 OK + {} + + room_id: the ID of the room whose keys to delete (optional) + session_id: the ID for the E2E session to delete (optional) + version: the version of the user's backup which this data is for. + the version must already have been created via the /change_secret API. + """ + + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + version = parse_string(request, "version") + + yield self.e2e_room_keys_handler.delete_room_keys( + user_id, version, room_id, session_id + ) + defer.returnValue((200, {})) + + +class RoomKeysNewVersionServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/room_keys/version$" + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(RoomKeysNewVersionServlet, self).__init__() + self.auth = hs.get_auth() + self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + + @defer.inlineCallbacks + def on_POST(self, request): + """ + Create a new backup version for this user's room_keys with the given + info. The version is allocated by the server and returned to the user + in the response. This API is intended to be used whenever the user + changes the encryption key for their backups, ensuring that backups + encrypted with different keys don't collide. + + It takes out an exclusive lock on this user's room_key backups, to ensure + clients only upload to the current backup. + + The algorithm passed in the version info is a reverse-DNS namespaced + identifier to describe the format of the encrypted backupped keys. + + The auth_data is { user_id: "user_id", nonce: <random string> } + encrypted using the algorithm and current encryption key described above. + + POST /room_keys/version + Content-Type: application/json + { + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + + HTTP/1.1 200 OK + Content-Type: application/json + { + "version": 12345 + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + info = parse_json_object_from_request(request) + + new_version = yield self.e2e_room_keys_handler.create_version( + user_id, info + ) + defer.returnValue((200, {"version": new_version})) + + # we deliberately don't have a PUT /version, as these things really should + # be immutable to avoid people footgunning + + +class RoomKeysVersionServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/room_keys/version(/(?P<version>[^/]+))?$" + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(RoomKeysVersionServlet, self).__init__() + self.auth = hs.get_auth() + self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + + @defer.inlineCallbacks + def on_GET(self, request, version): + """ + Retrieve the version information about a given version of the user's + room_keys backup. If the version part is missing, returns info about the + most current backup version (if any) + + It takes out an exclusive lock on this user's room_key backups, to ensure + clients only upload to the current backup. + + Returns 404 if the given version does not exist. + + GET /room_keys/version/12345 HTTP/1.1 + { + "version": "12345", + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + + try: + info = yield self.e2e_room_keys_handler.get_version_info( + user_id, version + ) + except SynapseError as e: + if e.code == 404: + raise SynapseError(404, "No backup found", Codes.NOT_FOUND) + defer.returnValue((200, info)) + + @defer.inlineCallbacks + def on_DELETE(self, request, version): + """ + Delete the information about a given version of the user's + room_keys backup. If the version part is missing, deletes the most + current backup version (if any). Doesn't delete the actual room data. + + DELETE /room_keys/version/12345 HTTP/1.1 + HTTP/1.1 200 OK + {} + """ + if version is None: + raise SynapseError(400, "No version specified to delete", Codes.NOT_FOUND) + + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + + yield self.e2e_room_keys_handler.delete_version( + user_id, version + ) + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + RoomKeysServlet(hs).register(http_server) + RoomKeysVersionServlet(hs).register(http_server) + RoomKeysNewVersionServlet(hs).register(http_server) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index a828ff4438..08b1867fab 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse import twisted.internet.error import twisted.web.http -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.resource import Resource from synapse.api.errors import ( @@ -36,8 +36,8 @@ from synapse.api.errors import ( ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import logcontext from synapse.util.async_helpers import Linearizer -from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import is_ascii, random_string @@ -492,10 +492,11 @@ class MediaRepository(object): )) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - )) + ) if t_byte_source: try: @@ -534,10 +535,11 @@ class MediaRepository(object): )) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - )) + ) if t_byte_source: try: @@ -620,15 +622,17 @@ class MediaRepository(object): for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), thumbnailer.crop, t_width, t_height, t_type, - )) + ) elif t_method == "scale": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), thumbnailer.scale, t_width, t_height, t_type, - )) + ) else: logger.error("Unrecognized method: %r", t_method) continue diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index a6189224ee..896078fe76 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -21,9 +21,10 @@ import sys import six -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.protocols.basic import FileSender +from synapse.util import logcontext from synapse.util.file_consumer import BackgroundFileConsumer from synapse.util.logcontext import make_deferred_yieldable @@ -64,9 +65,10 @@ class MediaStorage(object): with self.store_into_file(file_info) as (f, fname, finish_cb): # Write to the main repository - yield make_deferred_yieldable(threads.deferToThread( + yield logcontext.defer_to_thread( + self.hs.get_reactor(), _write_file_synchronously, source, f, - )) + ) yield finish_cb() defer.returnValue(fname) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index af01040a38..8c892ff187 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -596,10 +596,13 @@ def _iterate_over_text(tree, *tags_to_ignore): # to be returned. elements = iter([tree]) while True: - el = next(elements) + el = next(elements, None) + if el is None: + return + if isinstance(el, string_types): yield el - elif el is not None and el.tag not in tags_to_ignore: + elif el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately # return it if the text exists. if el.text: diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 7b9f8b4d79..5aa03031f6 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -17,9 +17,10 @@ import logging import os import shutil -from twisted.internet import defer, threads +from twisted.internet import defer from synapse.config._base import Config +from synapse.util import logcontext from synapse.util.logcontext import run_in_background from .media_storage import FileResponder @@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider): if not os.path.exists(dirname): os.makedirs(dirname) - return threads.deferToThread( + return logcontext.defer_to_thread( + self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname, ) diff --git a/synapse/server.py b/synapse/server.py index 938a05f9dc..3e9d3d8256 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,6 +51,7 @@ from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.e2e_keys import E2eKeysHandler +from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler @@ -130,6 +131,7 @@ class HomeServer(object): 'auth_handler', 'device_handler', 'e2e_keys_handler', + 'e2e_room_keys_handler', 'event_handler', 'event_stream_handler', 'initial_sync_handler', @@ -299,6 +301,9 @@ class HomeServer(object): def build_e2e_keys_handler(self): return E2eKeysHandler(self) + def build_e2e_room_keys_handler(self): + return E2eRoomKeysHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 23b4a8d76d..53c685c173 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -30,6 +30,7 @@ from .appservice import ApplicationServiceStore, ApplicationServiceTransactionSt from .client_ips import ClientIpStore from .deviceinbox import DeviceInboxStore from .directory import DirectoryStore +from .e2e_room_keys import EndToEndRoomKeyStore from .end_to_end_keys import EndToEndKeyStore from .engines import PostgresEngine from .event_federation import EventFederationStore @@ -77,6 +78,7 @@ class DataStore(RoomMemberStore, RoomStore, ApplicationServiceTransactionStore, ReceiptsStore, EndToEndKeyStore, + EndToEndRoomKeyStore, SearchStore, TagsStore, AccountDataStore, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index be61147b9b..d9d0255d0b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,7 +18,7 @@ import threading import time from six import PY2, iteritems, iterkeys, itervalues -from six.moves import intern, range +from six.moves import builtins, intern, range from canonicaljson import json from prometheus_client import Histogram @@ -1233,7 +1233,7 @@ def db_to_json(db_content): # psycopg2 on Python 2 returns buffer objects, which we need to cast to # bytes to decode - if PY2 and isinstance(db_content, buffer): + if PY2 and isinstance(db_content, builtins.buffer): db_content = bytes(db_content) # Decode it to a Unicode string before feeding it to json.loads, so we diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py new file mode 100644 index 0000000000..f25ded2295 --- /dev/null +++ b/synapse/storage/e2e_room_keys.py @@ -0,0 +1,320 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from twisted.internet import defer + +from synapse.api.errors import StoreError + +from ._base import SQLBaseStore + + +class EndToEndRoomKeyStore(SQLBaseStore): + + @defer.inlineCallbacks + def get_e2e_room_key(self, user_id, version, room_id, session_id): + """Get the encrypted E2E room key for a given session from a given + backup version of room_keys. We only store the 'best' room key for a given + session at a given time, as determined by the handler. + + Args: + user_id(str): the user whose backup we're querying + version(str): the version ID of the backup for the set of keys we're querying + room_id(str): the ID of the room whose keys we're querying. + This is a bit redundant as it's implied by the session_id, but + we include for consistency with the rest of the API. + session_id(str): the session whose room_key we're querying. + + Returns: + A deferred dict giving the session_data and message metadata for + this room key. + """ + + row = yield self._simple_select_one( + table="e2e_room_keys", + keyvalues={ + "user_id": user_id, + "version": version, + "room_id": room_id, + "session_id": session_id, + }, + retcols=( + "first_message_index", + "forwarded_count", + "is_verified", + "session_data", + ), + desc="get_e2e_room_key", + ) + + row["session_data"] = json.loads(row["session_data"]) + + defer.returnValue(row) + + @defer.inlineCallbacks + def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): + """Replaces or inserts the encrypted E2E room key for a given session in + a given backup + + Args: + user_id(str): the user whose backup we're setting + version(str): the version ID of the backup we're updating + room_id(str): the ID of the room whose keys we're setting + session_id(str): the session whose room_key we're setting + room_key(dict): the room_key being set + Raises: + StoreError + """ + + yield self._simple_upsert( + table="e2e_room_keys", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "session_id": session_id, + }, + values={ + "version": version, + "first_message_index": room_key['first_message_index'], + "forwarded_count": room_key['forwarded_count'], + "is_verified": room_key['is_verified'], + "session_data": json.dumps(room_key['session_data']), + }, + lock=False, + ) + + @defer.inlineCallbacks + def get_e2e_room_keys( + self, user_id, version, room_id=None, session_id=None + ): + """Bulk get the E2E room keys for a given backup, optionally filtered to a given + room, or a given session. + + Args: + user_id(str): the user whose backup we're querying + version(str): the version ID of the backup for the set of keys we're querying + room_id(str): Optional. the ID of the room whose keys we're querying, if any. + If not specified, we return the keys for all the rooms in the backup. + session_id(str): Optional. the session whose room_key we're querying, if any. + If specified, we also require the room_id to be specified. + If not specified, we return all the keys in this version of + the backup (or for the specified room) + + Returns: + A deferred list of dicts giving the session_data and message metadata for + these room keys. + """ + + keyvalues = { + "user_id": user_id, + "version": version, + } + if room_id: + keyvalues['room_id'] = room_id + if session_id: + keyvalues['session_id'] = session_id + + rows = yield self._simple_select_list( + table="e2e_room_keys", + keyvalues=keyvalues, + retcols=( + "user_id", + "room_id", + "session_id", + "first_message_index", + "forwarded_count", + "is_verified", + "session_data", + ), + desc="get_e2e_room_keys", + ) + + sessions = {'rooms': {}} + for row in rows: + room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}}) + room_entry['sessions'][row['session_id']] = { + "first_message_index": row["first_message_index"], + "forwarded_count": row["forwarded_count"], + "is_verified": row["is_verified"], + "session_data": json.loads(row["session_data"]), + } + + defer.returnValue(sessions) + + @defer.inlineCallbacks + def delete_e2e_room_keys( + self, user_id, version, room_id=None, session_id=None + ): + """Bulk delete the E2E room keys for a given backup, optionally filtered to a given + room or a given session. + + Args: + user_id(str): the user whose backup we're deleting from + version(str): the version ID of the backup for the set of keys we're deleting + room_id(str): Optional. the ID of the room whose keys we're deleting, if any. + If not specified, we delete the keys for all the rooms in the backup. + session_id(str): Optional. the session whose room_key we're querying, if any. + If specified, we also require the room_id to be specified. + If not specified, we delete all the keys in this version of + the backup (or for the specified room) + + Returns: + A deferred of the deletion transaction + """ + + keyvalues = { + "user_id": user_id, + "version": version, + } + if room_id: + keyvalues['room_id'] = room_id + if session_id: + keyvalues['session_id'] = session_id + + yield self._simple_delete( + table="e2e_room_keys", + keyvalues=keyvalues, + desc="delete_e2e_room_keys", + ) + + @staticmethod + def _get_current_version(txn, user_id): + txn.execute( + "SELECT MAX(version) FROM e2e_room_keys_versions " + "WHERE user_id=? AND deleted=0", + (user_id,) + ) + row = txn.fetchone() + if not row: + raise StoreError(404, 'No current backup version') + return row[0] + + def get_e2e_room_keys_version_info(self, user_id, version=None): + """Get info metadata about a version of our room_keys backup. + + Args: + user_id(str): the user whose backup we're querying + version(str): Optional. the version ID of the backup we're querying about + If missing, we return the information about the current version. + Raises: + StoreError: with code 404 if there are no e2e_room_keys_versions present + Returns: + A deferred dict giving the info metadata for this backup version + """ + + def _get_e2e_room_keys_version_info_txn(txn): + if version is None: + this_version = self._get_current_version(txn, user_id) + else: + this_version = version + + result = self._simple_select_one_txn( + txn, + table="e2e_room_keys_versions", + keyvalues={ + "user_id": user_id, + "version": this_version, + "deleted": 0, + }, + retcols=( + "version", + "algorithm", + "auth_data", + ), + ) + result["auth_data"] = json.loads(result["auth_data"]) + return result + + return self.runInteraction( + "get_e2e_room_keys_version_info", + _get_e2e_room_keys_version_info_txn + ) + + def create_e2e_room_keys_version(self, user_id, info): + """Atomically creates a new version of this user's e2e_room_keys store + with the given version info. + + Args: + user_id(str): the user whose backup we're creating a version + info(dict): the info about the backup version to be created + + Returns: + A deferred string for the newly created version ID + """ + + def _create_e2e_room_keys_version_txn(txn): + txn.execute( + "SELECT MAX(version) FROM e2e_room_keys_versions WHERE user_id=?", + (user_id,) + ) + current_version = txn.fetchone()[0] + if current_version is None: + current_version = '0' + + new_version = str(int(current_version) + 1) + + self._simple_insert_txn( + txn, + table="e2e_room_keys_versions", + values={ + "user_id": user_id, + "version": new_version, + "algorithm": info["algorithm"], + "auth_data": json.dumps(info["auth_data"]), + }, + ) + + return new_version + + return self.runInteraction( + "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn + ) + + def delete_e2e_room_keys_version(self, user_id, version=None): + """Delete a given backup version of the user's room keys. + Doesn't delete their actual key data. + + Args: + user_id(str): the user whose backup version we're deleting + version(str): Optional. the version ID of the backup version we're deleting + If missing, we delete the current backup version info. + Raises: + StoreError: with code 404 if there are no e2e_room_keys_versions present, + or if the version requested doesn't exist. + """ + + def _delete_e2e_room_keys_version_txn(txn): + if version is None: + this_version = self._get_current_version(txn, user_id) + else: + this_version = version + + return self._simple_update_one_txn( + txn, + table="e2e_room_keys_versions", + keyvalues={ + "user_id": user_id, + "version": this_version, + }, + updatevalues={ + "deleted": 1, + } + ) + + return self.runInteraction( + "delete_e2e_room_keys_version", + _delete_e2e_room_keys_version_txn + ) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 24345b20a6..3faca2a042 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -376,33 +376,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, @defer.inlineCallbacks def get_missing_events(self, room_id, earliest_events, latest_events, - limit, min_depth): + limit): ids = yield self.runInteraction( "get_missing_events", self._get_missing_events, - room_id, earliest_events, latest_events, limit, min_depth + room_id, earliest_events, latest_events, limit, ) - events = yield self._get_events(ids) - - events = sorted( - [ev for ev in events if ev.depth >= min_depth], - key=lambda e: e.depth, - ) - - defer.returnValue(events[:limit]) + defer.returnValue(events) def _get_missing_events(self, txn, room_id, earliest_events, latest_events, - limit, min_depth): - - earliest_events = set(earliest_events) - front = set(latest_events) - earliest_events + limit): - event_results = set() + seen_events = set(earliest_events) + front = set(latest_events) - seen_events + event_results = [] query = ( "SELECT prev_event_id FROM event_edges " - "WHERE event_id = ? AND is_state = ? " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -411,18 +403,20 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, for event_id in front: txn.execute( query, - (event_id, False, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) - for e_id, in txn: - new_front.add(e_id) + new_results = set(t[0] for t in txn) - seen_events - new_front -= earliest_events - new_front -= event_results + new_front |= new_results + seen_events |= new_results + event_results.extend(new_results) front = new_front - event_results |= new_front + # we built the list working backwards from latest_events; we now need to + # reverse it so that the events are approximately chronological. + event_results.reverse() return event_results diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a1331c1a61..8af17921e3 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index c7987bfcdd..2743b52bad 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -29,7 +29,7 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/storage/schema/delta/51/e2e_room_keys.sql b/synapse/storage/schema/delta/51/e2e_room_keys.sql new file mode 100644 index 0000000000..c0e66a697d --- /dev/null +++ b/synapse/storage/schema/delta/51/e2e_room_keys.sql @@ -0,0 +1,39 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- users' optionally backed up encrypted e2e sessions +CREATE TABLE e2e_room_keys ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + session_id TEXT NOT NULL, + version TEXT NOT NULL, + first_message_index INT, + forwarded_count INT, + is_verified BOOLEAN, + session_data TEXT NOT NULL +); + +CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id); + +-- the metadata for each generation of encrypted e2e session backups +CREATE TABLE e2e_room_keys_versions ( + user_id TEXT NOT NULL, + version TEXT NOT NULL, + algorithm TEXT NOT NULL, + auth_data TEXT NOT NULL, + deleted SMALLINT DEFAULT 0 NOT NULL +); + +CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version); diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 5623391f6e..158e9dbe7b 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -27,7 +27,7 @@ from ._base import SQLBaseStore # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index a3032cdce9..d8bf953ec0 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index f2bde74dc5..625aedc940 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -15,6 +15,8 @@ import logging +from six import integer_types + from sortedcontainers import SortedDict from synapse.util import caches @@ -47,7 +49,7 @@ class StreamChangeCache(object): def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) is int or type(stream_pos) is long + assert type(stream_pos) in integer_types if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 89224b26cc..4c6e92beb8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works. import logging import threading -from twisted.internet import defer +from twisted.internet import defer, threads logger = logging.getLogger(__name__) @@ -562,58 +562,76 @@ def _set_context_cb(result, context): return result -# modules to ignore in `logcontext_tracer` -_to_ignore = [ - "synapse.util.logcontext", - "synapse.http.server", - "synapse.storage._base", - "synapse.util.async_helpers", -] +def defer_to_thread(reactor, f, *args, **kwargs): + """ + Calls the function `f` using a thread from the reactor's default threadpool and + returns the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked, and whose threadpool we should use for the + function. + + Normally this will be hs.get_reactor(). + + f (callable): The function to call. + args: positional arguments to pass to f. -def logcontext_tracer(frame, event, arg): - """A tracer that logs whenever a logcontext "unexpectedly" changes within - a function. Probably inaccurate. + kwargs: keyword arguments to pass to f. - Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. """ - if event == 'call': - name = frame.f_globals["__name__"] - if name.startswith("synapse"): - if name == "synapse.util.logcontext": - if frame.f_code.co_name in ["__enter__", "__exit__"]: - tracer = frame.f_back.f_trace - if tracer: - tracer.just_changed = True - - tracer = frame.f_trace - if tracer: - return tracer - - if not any(name.startswith(ig) for ig in _to_ignore): - return LineTracer() - - -class LineTracer(object): - __slots__ = ["context", "just_changed"] - - def __init__(self): - self.context = LoggingContext.current_context() - self.just_changed = False - - def __call__(self, frame, event, arg): - if event in 'line': - if self.just_changed: - self.context = LoggingContext.current_context() - self.just_changed = False - else: - c = LoggingContext.current_context() - if c != self.context: - logger.info( - "Context changed! %s -> %s, %s, %s", - self.context, c, - frame.f_code.co_filename, frame.f_lineno - ) - self.context = c + return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) - return self + +def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): + """ + A wrapper for twisted.internet.threads.deferToThreadpool, which handles + logcontexts correctly. + + Calls the function `f` using a thread from the given threadpool and returns + the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked. Normally this will be hs.get_reactor(). + + threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for + running `f`. Normally this will be hs.get_reactor().getThreadPool(). + + f (callable): The function to call. + + args: positional arguments to pass to f. + + kwargs: keyword arguments to pass to f. + + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. + """ + logcontext = LoggingContext.current_context() + + def g(): + with LoggingContext(parent_context=logcontext): + return f(*args, **kwargs) + + return make_deferred_yieldable( + threads.deferToThreadPool(reactor, threadpool, g) + ) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 8d0f2a8918..9cb7e9c9ab 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -70,6 +70,8 @@ def manhole(username, password, globals): Returns: twisted.internet.protocol.Factory: A factory to pass to ``listenTCP`` """ + if not isinstance(password, bytes): + password = password.encode('ascii') checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( **{username: password} @@ -82,7 +84,7 @@ def manhole(username, password, globals): ) factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) - factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY) return factory diff --git a/synapse/visibility.py b/synapse/visibility.py index c64ad2144c..43f48196be 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -219,7 +219,7 @@ def filter_events_for_server(store, server_name, events): # Whatever else we do, we need to check for senders which have requested # erasure of their data. erased_senders = yield store.are_users_erased( - e.sender for e in events, + (e.sender for e in events), ) def redact_disallowed(event, state): diff --git a/synctl b/synctl index 09b64459b1..7e79b05c39 100755 --- a/synctl +++ b/synctl @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -48,7 +49,16 @@ def pid_running(pid): def write(message, colour=NORMAL, stream=sys.stdout): - if colour == NORMAL: + # Lets check if we're writing to a TTY before colouring + should_colour = False + try: + should_colour = stream.isatty() + except AttributeError: + # Just in case `isatty` isn't defined on everything. The python + # docs are incredibly vague. + pass + + if not should_colour: stream.write(message + "\n") else: stream.write(colour + message + NORMAL + "\n") @@ -66,8 +76,7 @@ def start(configfile): try: subprocess.check_call(args) - write("started synapse.app.homeserver(%r)" % - (configfile,), colour=GREEN) + write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN) except subprocess.CalledProcessError as e: write( "error starting (exit code: %d); see above for logs" % e.returncode, @@ -76,21 +85,15 @@ def start(configfile): def start_worker(app, configfile, worker_configfile): - args = [ - "python", "-B", - "-m", app, - "-c", configfile, - "-c", worker_configfile - ] + args = [sys.executable, "-B", "-m", app, "-c", configfile, "-c", worker_configfile] try: subprocess.check_call(args) write("started %s(%r)" % (app, worker_configfile), colour=GREEN) except subprocess.CalledProcessError as e: write( - "error starting %s(%r) (exit code: %d); see above for logs" % ( - app, worker_configfile, e.returncode, - ), + "error starting %s(%r) (exit code: %d); see above for logs" + % (app, worker_configfile, e.returncode), colour=RED, ) @@ -110,9 +113,9 @@ def stop(pidfile, app): abort("Cannot stop %s: Unknown error" % (app,)) -Worker = collections.namedtuple("Worker", [ - "app", "configfile", "pidfile", "cache_factor", "cache_factors", -]) +Worker = collections.namedtuple( + "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"] +) def main(): @@ -131,24 +134,20 @@ def main(): help="the homeserver config file, defaults to homeserver.yaml", ) parser.add_argument( - "-w", "--worker", - metavar="WORKERCONFIG", - help="start or stop a single worker", + "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker" ) parser.add_argument( - "-a", "--all-processes", + "-a", + "--all-processes", metavar="WORKERCONFIGDIR", help="start or stop all the workers in the given directory" - " and the main synapse process", + " and the main synapse process", ) options = parser.parse_args() if options.worker and options.all_processes: - write( - 'Cannot use "--worker" with "--all-processes"', - stream=sys.stderr - ) + write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr) sys.exit(1) configfile = options.configfile @@ -157,9 +156,7 @@ def main(): write( "No config file found\n" "To generate a config file, run '%s -c %s --generate-config" - " --server-name=<server name>'\n" % ( - " ".join(SYNAPSE), options.configfile - ), + " --server-name=<server name>'\n" % (" ".join(SYNAPSE), options.configfile), stream=sys.stderr, ) sys.exit(1) @@ -184,8 +181,7 @@ def main(): worker_configfile = options.worker if not os.path.exists(worker_configfile): write( - "No worker config found at %r" % (worker_configfile,), - stream=sys.stderr, + "No worker config found at %r" % (worker_configfile,), stream=sys.stderr ) sys.exit(1) worker_configfiles.append(worker_configfile) @@ -201,9 +197,9 @@ def main(): stream=sys.stderr, ) sys.exit(1) - worker_configfiles.extend(sorted(glob.glob( - os.path.join(worker_configdir, "*.yaml") - ))) + worker_configfiles.extend( + sorted(glob.glob(os.path.join(worker_configdir, "*.yaml"))) + ) workers = [] for worker_configfile in worker_configfiles: @@ -213,14 +209,12 @@ def main(): if worker_app == "synapse.app.homeserver": # We need to special case all of this to pick up options that may # be set in the main config file or in this worker config file. - worker_pidfile = ( - worker_config.get("pid_file") - or pidfile + worker_pidfile = worker_config.get("pid_file") or pidfile + worker_cache_factor = ( + worker_config.get("synctl_cache_factor") or cache_factor ) - worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor worker_cache_factors = ( - worker_config.get("synctl_cache_factors") - or cache_factors + worker_config.get("synctl_cache_factors") or cache_factors ) daemonize = worker_config.get("daemonize") or config.get("daemonize") assert daemonize, "Main process must have daemonize set to true" @@ -229,19 +223,27 @@ def main(): for key in worker_config: if key == "worker_app": # But we allow worker_app continue - assert not key.startswith("worker_"), \ - "Main process cannot use worker_* config" + assert not key.startswith( + "worker_" + ), "Main process cannot use worker_* config" else: worker_pidfile = worker_config["worker_pid_file"] worker_daemonize = worker_config["worker_daemonize"] assert worker_daemonize, "In config %r: expected '%s' to be True" % ( - worker_configfile, "worker_daemonize") + worker_configfile, + "worker_daemonize", + ) worker_cache_factor = worker_config.get("synctl_cache_factor") worker_cache_factors = worker_config.get("synctl_cache_factors", {}) - workers.append(Worker( - worker_app, worker_configfile, worker_pidfile, worker_cache_factor, - worker_cache_factors, - )) + workers.append( + Worker( + worker_app, + worker_configfile, + worker_pidfile, + worker_cache_factor, + worker_cache_factors, + ) + ) action = options.action diff --git a/tests/handlers/test_e2e_room_keys.py b/tests/handlers/test_e2e_room_keys.py new file mode 100644 index 0000000000..9e08eac0a5 --- /dev/null +++ b/tests/handlers/test_e2e_room_keys.py @@ -0,0 +1,397 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy + +import mock + +from twisted.internet import defer + +import synapse.api.errors +import synapse.handlers.e2e_room_keys +import synapse.storage +from synapse.api import errors + +from tests import unittest, utils + +# sample room_key data for use in the tests +room_keys = { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": False, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } +} + + +class E2eRoomKeysHandlerTestCase(unittest.TestCase): + def __init__(self, *args, **kwargs): + super(E2eRoomKeysHandlerTestCase, self).__init__(*args, **kwargs) + self.hs = None # type: synapse.server.HomeServer + self.handler = None # type: synapse.handlers.e2e_keys.E2eRoomKeysHandler + + @defer.inlineCallbacks + def setUp(self): + self.hs = yield utils.setup_test_homeserver( + self.addCleanup, + handlers=None, + replication_layer=mock.Mock(), + ) + self.handler = synapse.handlers.e2e_room_keys.E2eRoomKeysHandler(self.hs) + self.local_user = "@boris:" + self.hs.hostname + + @defer.inlineCallbacks + def test_get_missing_current_version_info(self): + """Check that we get a 404 if we ask for info about the current version + if there is no version. + """ + res = None + try: + yield self.handler.get_version_info(self.local_user) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_get_missing_version_info(self): + """Check that we get a 404 if we ask for info about a specific version + if it doesn't exist. + """ + res = None + try: + yield self.handler.get_version_info(self.local_user, "bogus_version") + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_create_version(self): + """Check that we can create and then retrieve versions. + """ + res = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(res, "1") + + # check we can retrieve it as the current version + res = yield self.handler.get_version_info(self.local_user) + self.assertDictEqual(res, { + "version": "1", + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + + # check we can retrieve it as a specific version + res = yield self.handler.get_version_info(self.local_user, "1") + self.assertDictEqual(res, { + "version": "1", + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + + # upload a new one... + res = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "second_version_auth_data", + }) + self.assertEqual(res, "2") + + # check we can retrieve it as the current version + res = yield self.handler.get_version_info(self.local_user) + self.assertDictEqual(res, { + "version": "2", + "algorithm": "m.megolm_backup.v1", + "auth_data": "second_version_auth_data", + }) + + @defer.inlineCallbacks + def test_delete_missing_version(self): + """Check that we get a 404 on deleting nonexistent versions + """ + res = None + try: + yield self.handler.delete_version(self.local_user, "1") + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_delete_missing_current_version(self): + """Check that we get a 404 on deleting nonexistent current version + """ + res = None + try: + yield self.handler.delete_version(self.local_user) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_delete_version(self): + """Check that we can create and then delete versions. + """ + res = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(res, "1") + + # check we can delete it + yield self.handler.delete_version(self.local_user, "1") + + # check that it's gone + res = None + try: + yield self.handler.get_version_info(self.local_user, "1") + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_get_missing_room_keys(self): + """Check that we get a 404 on querying missing room_keys + """ + res = None + try: + yield self.handler.get_room_keys(self.local_user, "bogus_version") + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + # check we also get a 404 even if the version is valid + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(version, "1") + + res = None + try: + yield self.handler.get_room_keys(self.local_user, version) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + # TODO: test the locking semantics when uploading room_keys, + # although this is probably best done in sytest + + @defer.inlineCallbacks + def test_upload_room_keys_no_versions(self): + """Check that we get a 404 on uploading keys when no versions are defined + """ + res = None + try: + yield self.handler.upload_room_keys(self.local_user, "no_version", room_keys) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_upload_room_keys_bogus_version(self): + """Check that we get a 404 on uploading keys when an nonexistent version + is specified + """ + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(version, "1") + + res = None + try: + yield self.handler.upload_room_keys( + self.local_user, "bogus_version", room_keys + ) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + @defer.inlineCallbacks + def test_upload_room_keys_wrong_version(self): + """Check that we get a 403 on uploading keys for an old version + """ + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(version, "1") + + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "second_version_auth_data", + }) + self.assertEqual(version, "2") + + res = None + try: + yield self.handler.upload_room_keys(self.local_user, "1", room_keys) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 403) + + @defer.inlineCallbacks + def test_upload_room_keys_insert(self): + """Check that we can insert and retrieve keys for a session + """ + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(version, "1") + + yield self.handler.upload_room_keys(self.local_user, version, room_keys) + + res = yield self.handler.get_room_keys(self.local_user, version) + self.assertDictEqual(res, room_keys) + + # check getting room_keys for a given room + res = yield self.handler.get_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org" + ) + self.assertDictEqual(res, room_keys) + + # check getting room_keys for a given session_id + res = yield self.handler.get_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org", + session_id="c0ff33", + ) + self.assertDictEqual(res, room_keys) + + @defer.inlineCallbacks + def test_upload_room_keys_merge(self): + """Check that we can upload a new room_key for an existing session and + have it correctly merged""" + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(version, "1") + + yield self.handler.upload_room_keys(self.local_user, version, room_keys) + + new_room_keys = copy.deepcopy(room_keys) + new_room_key = new_room_keys['rooms']['!abc:matrix.org']['sessions']['c0ff33'] + + # test that increasing the message_index doesn't replace the existing session + new_room_key['first_message_index'] = 2 + new_room_key['session_data'] = 'new' + yield self.handler.upload_room_keys(self.local_user, version, new_room_keys) + + res = yield self.handler.get_room_keys(self.local_user, version) + self.assertEqual( + res['rooms']['!abc:matrix.org']['sessions']['c0ff33']['session_data'], + "SSBBTSBBIEZJU0gK" + ) + + # test that marking the session as verified however /does/ replace it + new_room_key['is_verified'] = True + yield self.handler.upload_room_keys(self.local_user, version, new_room_keys) + + res = yield self.handler.get_room_keys(self.local_user, version) + self.assertEqual( + res['rooms']['!abc:matrix.org']['sessions']['c0ff33']['session_data'], + "new" + ) + + # test that a session with a higher forwarded_count doesn't replace one + # with a lower forwarding count + new_room_key['forwarded_count'] = 2 + new_room_key['session_data'] = 'other' + yield self.handler.upload_room_keys(self.local_user, version, new_room_keys) + + res = yield self.handler.get_room_keys(self.local_user, version) + self.assertEqual( + res['rooms']['!abc:matrix.org']['sessions']['c0ff33']['session_data'], + "new" + ) + + # TODO: check edge cases as well as the common variations here + + @defer.inlineCallbacks + def test_delete_room_keys(self): + """Check that we can insert and delete keys for a session + """ + version = yield self.handler.create_version(self.local_user, { + "algorithm": "m.megolm_backup.v1", + "auth_data": "first_version_auth_data", + }) + self.assertEqual(version, "1") + + # check for bulk-delete + yield self.handler.upload_room_keys(self.local_user, version, room_keys) + yield self.handler.delete_room_keys(self.local_user, version) + res = None + try: + yield self.handler.get_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org", + session_id="c0ff33", + ) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + # check for bulk-delete per room + yield self.handler.upload_room_keys(self.local_user, version, room_keys) + yield self.handler.delete_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org", + ) + res = None + try: + yield self.handler.get_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org", + session_id="c0ff33", + ) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) + + # check for bulk-delete per session + yield self.handler.upload_room_keys(self.local_user, version, room_keys) + yield self.handler.delete_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org", + session_id="c0ff33", + ) + res = None + try: + yield self.handler.get_room_keys( + self.local_user, + version, + room_id="!abc:matrix.org", + session_id="c0ff33", + ) + except errors.SynapseError as e: + res = e.code + self.assertEqual(res, 404) diff --git a/tests/handlers/test_roomlist.py b/tests/handlers/test_roomlist.py new file mode 100644 index 0000000000..61eebb6985 --- /dev/null +++ b/tests/handlers/test_roomlist.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.handlers.room_list import RoomListNextBatch + +import tests.unittest +import tests.utils + + +class RoomListTestCase(tests.unittest.TestCase): + """ Tests RoomList's RoomListNextBatch. """ + + def setUp(self): + pass + + def test_check_read_batch_tokens(self): + batch_token = RoomListNextBatch( + stream_ordering="abcdef", + public_room_stream_id="123", + current_limit=20, + direction_is_forward=True, + ).to_token() + next_batch = RoomListNextBatch.from_token(batch_token) + self.assertEquals(next_batch.stream_ordering, "abcdef") + self.assertEquals(next_batch.public_room_stream_id, "123") + self.assertEquals(next_batch.current_limit, 20) + self.assertEquals(next_batch.direction_is_forward, True) diff --git a/tox.ini b/tox.ini index 87b5e4782d..04d2f721bf 100644 --- a/tox.ini +++ b/tox.ini @@ -108,10 +108,10 @@ commands = [testenv:pep8] skip_install = True -basepython = python2.7 +basepython = python3.6 deps = flake8 -commands = /bin/sh -c "flake8 synapse tests {env:PEP8SUFFIX:}" +commands = /bin/sh -c "flake8 synapse tests scripts scripts-dev scripts/register_new_matrix_user scripts/synapse_port_db synctl {env:PEP8SUFFIX:}" [testenv:check_isort] skip_install = True |