diff options
102 files changed, 1159 insertions, 960 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml index ec3848b048..5395028426 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -23,99 +23,106 @@ jobs: - run: docker push matrixdotorg/synapse:latest - run: docker push matrixdotorg/synapse:latest-py3 sytestpy2: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy2postgres: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy2merged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs - + path: /logs sytestpy2postgresmerged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy2 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy2 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3postgres: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3merged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3 + - run: /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs sytestpy3postgresmerged: - machine: true + docker: + - image: matrixdotorg/sytest-synapsepy3 + working_directory: /src steps: - checkout - run: bash .circleci/merge_base_branch.sh - - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3 + - run: POSTGRES=1 /synapse_sytest.sh - store_artifacts: - path: ~/project/logs + path: /logs destination: logs - store_test_results: - path: logs + path: /logs workflows: version: 2 diff --git a/.circleci/merge_base_branch.sh b/.circleci/merge_base_branch.sh index 6b0bf3aa48..b2c8c40f4c 100755 --- a/.circleci/merge_base_branch.sh +++ b/.circleci/merge_base_branch.sh @@ -16,7 +16,7 @@ then GITBASE="develop" else # Get the reference, using the GitHub API - GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'` + GITBASE=`wget -O- https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'` fi # Show what we are before @@ -31,4 +31,4 @@ git fetch -u origin $GITBASE git merge --no-edit origin/$GITBASE # Show what we are after. -git show -s \ No newline at end of file +git show -s diff --git a/.travis.yml b/.travis.yml index 2077f6af72..7ee1a278db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,7 @@ matrix: - python: 2.7 env: TOX_ENV=packaging - - python: 2.7 + - python: 3.6 env: TOX_ENV=pep8 - python: 2.7 diff --git a/CHANGES.md b/CHANGES.md index 048b9f95db..fb98c934c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,58 @@ +Synapse 0.33.7 (2018-10-18) +=========================== + +**Warning**: This release removes the example email notification templates from +`res/templates` (they are now internal to the python package). This should only +affect you if you (a) deploy your Synapse instance from a git checkout or a +github snapshot URL, and (b) have email notifications enabled. + +If you have email notifications enabled, you should ensure that +`email.template_dir` is either configured to point at a directory where you +have installed customised templates, or leave it unset to use the default +templates. + +Synapse 0.33.7rc2 (2018-10-17) +============================== + +Features +-------- + +- Ship the example email templates as part of the package ([\#4052](https://github.com/matrix-org/synapse/issues/4052)) + +Bugfixes +-------- + +- Fix bug which made get_missing_events return too few events ([\#4045](https://github.com/matrix-org/synapse/issues/4045)) + + +Synapse 0.33.7rc1 (2018-10-15) +============================== + +Features +-------- + +- Add support for end-to-end key backup (MSC1687) ([\#4019](https://github.com/matrix-org/synapse/issues/4019)) + + +Bugfixes +-------- + +- Fix bug in event persistence logic which caused 'NoneType is not iterable' ([\#3995](https://github.com/matrix-org/synapse/issues/3995)) +- Fix exception in background metrics collection ([\#3996](https://github.com/matrix-org/synapse/issues/3996)) +- Fix exception handling in fetching remote profiles ([\#3997](https://github.com/matrix-org/synapse/issues/3997)) +- Fix handling of rejected threepid invites ([\#3999](https://github.com/matrix-org/synapse/issues/3999)) +- Workers now start on Python 3. ([\#4027](https://github.com/matrix-org/synapse/issues/4027)) +- Synapse now starts on Python 3.7. ([\#4033](https://github.com/matrix-org/synapse/issues/4033)) + + +Internal Changes +---------------- + +- Log exceptions in looping calls ([\#4008](https://github.com/matrix-org/synapse/issues/4008)) +- Optimisation for serving federation requests ([\#4017](https://github.com/matrix-org/synapse/issues/4017)) +- Add metric to count number of non-empty sync responses ([\#4022](https://github.com/matrix-org/synapse/issues/4022)) + + Synapse 0.33.6 (2018-10-04) =========================== diff --git a/MANIFEST.in b/MANIFEST.in index c6a37ac685..25cdf0a61b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -12,12 +12,12 @@ recursive-include synapse/storage/schema *.sql recursive-include synapse/storage/schema *.py recursive-include docs * -recursive-include res * recursive-include scripts * recursive-include scripts-dev * recursive-include synapse *.pyi recursive-include tests *.py +recursive-include synapse/res * recursive-include synapse/static *.css recursive-include synapse/static *.gif recursive-include synapse/static *.html diff --git a/README.rst b/README.rst index e1ea351f84..456a3d9d43 100644 --- a/README.rst +++ b/README.rst @@ -174,6 +174,12 @@ Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate a synapse server in a single Docker image, at https://hub.docker.com/r/avhost/docker-matrix/tags/ +Slavi Pantaleev has created an Ansible playbook, +which installs the offical Docker image of Matrix Synapse +along with many other Matrix-related services (Postgres database, riot-web, coturn, mxisd, SSL support, etc.). +For more details, see +https://github.com/spantaleev/matrix-docker-ansible-deploy + Configuring Synapse ------------------- diff --git a/UPGRADE.rst b/UPGRADE.rst index 6cf3169f75..55c77eedde 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -48,6 +48,19 @@ returned by the Client-Server API: # configured on port 443. curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:" +Upgrading to v0.33.7 +==================== + +This release removes the example email notification templates from +``res/templates`` (they are now internal to the python package). This should +only affect you if you (a) deploy your Synapse instance from a git checkout or +a github snapshot URL, and (b) have email notifications enabled. + +If you have email notifications enabled, you should ensure that +``email.template_dir`` is either configured to point at a directory where you +have installed customised templates, or leave it unset to use the default +templates. + Upgrading to v0.27.3 ==================== diff --git a/changelog.d/3698.misc b/changelog.d/3698.misc new file mode 100644 index 0000000000..12537e76f2 --- /dev/null +++ b/changelog.d/3698.misc @@ -0,0 +1 @@ +Add information about the [matrix-docker-ansible-deploy](https://github.com/spantaleev/matrix-docker-ansible-deploy) playbook diff --git a/changelog.d/3969.bugfix b/changelog.d/3969.bugfix new file mode 100644 index 0000000000..ca2759e91e --- /dev/null +++ b/changelog.d/3969.bugfix @@ -0,0 +1 @@ +Fix HTTP error response codes for federated group requests. diff --git a/changelog.d/3995.bugfix b/changelog.d/3995.bugfix deleted file mode 100644 index 2adc36756b..0000000000 --- a/changelog.d/3995.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix bug in event persistence logic which caused 'NoneType is not iterable' \ No newline at end of file diff --git a/changelog.d/3996.bugfix b/changelog.d/3996.bugfix deleted file mode 100644 index a056485ea1..0000000000 --- a/changelog.d/3996.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix exception in background metrics collection diff --git a/changelog.d/3997.bugfix b/changelog.d/3997.bugfix deleted file mode 100644 index b060ee8c18..0000000000 --- a/changelog.d/3997.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix exception handling in fetching remote profiles diff --git a/changelog.d/3999.bugfix b/changelog.d/3999.bugfix deleted file mode 100644 index dc3b2caffa..0000000000 --- a/changelog.d/3999.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix handling of rejected threepid invites diff --git a/changelog.d/4008.misc b/changelog.d/4008.misc deleted file mode 100644 index 5730210054..0000000000 --- a/changelog.d/4008.misc +++ /dev/null @@ -1 +0,0 @@ -Log exceptions in looping calls diff --git a/changelog.d/4017.misc b/changelog.d/4017.misc deleted file mode 100644 index b1ceb06560..0000000000 --- a/changelog.d/4017.misc +++ /dev/null @@ -1 +0,0 @@ -Optimisation for serving federation requests \ No newline at end of file diff --git a/changelog.d/4019.feature b/changelog.d/4019.feature deleted file mode 100644 index 49e066d269..0000000000 --- a/changelog.d/4019.feature +++ /dev/null @@ -1 +0,0 @@ -Add support for end-to-end key backup (MSC1687) diff --git a/changelog.d/4022.misc b/changelog.d/4022.misc deleted file mode 100644 index 5b0e136795..0000000000 --- a/changelog.d/4022.misc +++ /dev/null @@ -1 +0,0 @@ -Add metric to count number of non-empty sync responses diff --git a/changelog.d/4027.bugfix b/changelog.d/4027.bugfix deleted file mode 100644 index c9bbff3f68..0000000000 --- a/changelog.d/4027.bugfix +++ /dev/null @@ -1 +0,0 @@ -Workers now start on Python 3. diff --git a/changelog.d/4033.bugfix b/changelog.d/4033.bugfix deleted file mode 100644 index 4e9e38cdcf..0000000000 --- a/changelog.d/4033.bugfix +++ /dev/null @@ -1,2 +0,0 @@ -Synapse now starts on Python 3.7. -_All_ workers now start on Python 3. diff --git a/changelog.d/4041.misc b/changelog.d/4041.misc new file mode 100644 index 0000000000..8cce9daac9 --- /dev/null +++ b/changelog.d/4041.misc @@ -0,0 +1 @@ +Run the CircleCI builds in docker containers diff --git a/changelog.d/4046.bugfix b/changelog.d/4046.bugfix new file mode 100644 index 0000000000..5046dd1ce3 --- /dev/null +++ b/changelog.d/4046.bugfix @@ -0,0 +1 @@ +Fix issue where Python 3 users couldn't paginate /publicRooms diff --git a/changelog.d/4049.misc b/changelog.d/4049.misc new file mode 100644 index 0000000000..4370d9dfa6 --- /dev/null +++ b/changelog.d/4049.misc @@ -0,0 +1 @@ +Only colourise synctl output when attached to tty diff --git a/changelog.d/4050.bugfix b/changelog.d/4050.bugfix new file mode 100644 index 0000000000..3d1f6af847 --- /dev/null +++ b/changelog.d/4050.bugfix @@ -0,0 +1 @@ +Fix URL priewing to work in Python 3.7 diff --git a/changelog.d/4057.bugfix b/changelog.d/4057.bugfix new file mode 100644 index 0000000000..7577731255 --- /dev/null +++ b/changelog.d/4057.bugfix @@ -0,0 +1 @@ +synctl will use the right python executable to run worker processes \ No newline at end of file diff --git a/changelog.d/4060.bugfix b/changelog.d/4060.bugfix new file mode 100644 index 0000000000..78d69a8819 --- /dev/null +++ b/changelog.d/4060.bugfix @@ -0,0 +1 @@ +Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting. diff --git a/changelog.d/4061.bugfix b/changelog.d/4061.bugfix new file mode 100644 index 0000000000..94ffcf7a51 --- /dev/null +++ b/changelog.d/4061.bugfix @@ -0,0 +1 @@ +Fix some metrics being racy and causing exceptions when polled by Prometheus. diff --git a/changelog.d/4063.misc b/changelog.d/4063.misc new file mode 100644 index 0000000000..677fcb90ad --- /dev/null +++ b/changelog.d/4063.misc @@ -0,0 +1 @@ +Refactor room alias creation code diff --git a/changelog.d/4067.bugfix b/changelog.d/4067.bugfix new file mode 100644 index 0000000000..78d69a8819 --- /dev/null +++ b/changelog.d/4067.bugfix @@ -0,0 +1 @@ +Manhole now works again on Python 3, instead of failing with a "couldn't match all kex parts" when connecting. diff --git a/changelog.d/4068.bugfix b/changelog.d/4068.bugfix new file mode 100644 index 0000000000..74bda7491f --- /dev/null +++ b/changelog.d/4068.bugfix @@ -0,0 +1 @@ +Fix bug which prevented email notifications from being sent unless an absolute path was given for `email_templates`. \ No newline at end of file diff --git a/changelog.d/4068.misc b/changelog.d/4068.misc new file mode 100644 index 0000000000..db6c4ade59 --- /dev/null +++ b/changelog.d/4068.misc @@ -0,0 +1 @@ +Make the Python scripts in the top-level scripts folders meet pep8 and pass flake8. diff --git a/changelog.d/4073.misc b/changelog.d/4073.misc new file mode 100644 index 0000000000..fc304bef06 --- /dev/null +++ b/changelog.d/4073.misc @@ -0,0 +1 @@ +Add psutil as an explicit dependency diff --git a/changelog.d/4074.bugfix b/changelog.d/4074.bugfix new file mode 100644 index 0000000000..b3b6b00243 --- /dev/null +++ b/changelog.d/4074.bugfix @@ -0,0 +1 @@ +Correctly account for cpu usage by background threads diff --git a/changelog.d/4075.misc b/changelog.d/4075.misc new file mode 100644 index 0000000000..d08b8cc271 --- /dev/null +++ b/changelog.d/4075.misc @@ -0,0 +1 @@ +Clean up threading and logcontexts in pushers \ No newline at end of file diff --git a/changelog.d/4076.misc b/changelog.d/4076.misc new file mode 100644 index 0000000000..9dd000decf --- /dev/null +++ b/changelog.d/4076.misc @@ -0,0 +1 @@ +Correctly manage logcontexts during startup to fix some "Unexpected logging context" warnings \ No newline at end of file diff --git a/changelog.d/4077.misc b/changelog.d/4077.misc new file mode 100644 index 0000000000..52ca4c1de2 --- /dev/null +++ b/changelog.d/4077.misc @@ -0,0 +1 @@ +Give some more things logcontexts diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml index cfe88788f2..a38b929f50 100644 --- a/docker/conf/homeserver.yaml +++ b/docker/conf/homeserver.yaml @@ -211,7 +211,9 @@ email: require_transport_security: False notif_from: "{{ SYNAPSE_SMTP_FROM or "hostmaster@" + SYNAPSE_SERVER_NAME }}" app_name: Matrix - template_dir: res/templates + # if template_dir is unset, uses the example templates that are part of + # the Synapse distribution. + #template_dir: res/templates notif_template_html: notif_mail.html notif_template_text: notif_mail.txt notif_for_new_users: True diff --git a/scripts-dev/check_auth.py b/scripts-dev/check_auth.py index 4fa8792a5f..b3d11f49ec 100644 --- a/scripts-dev/check_auth.py +++ b/scripts-dev/check_auth.py @@ -1,21 +1,20 @@ -from synapse.events import FrozenEvent -from synapse.api.auth import Auth - -from mock import Mock +from __future__ import print_function import argparse import itertools import json import sys +from mock import Mock + +from synapse.api.auth import Auth +from synapse.events import FrozenEvent + def check_auth(auth, auth_chain, events): auth_chain.sort(key=lambda e: e.depth) - auth_map = { - e.event_id: e - for e in auth_chain - } + auth_map = {e.event_id: e for e in auth_chain} create_events = {} for e in auth_chain: @@ -25,31 +24,26 @@ def check_auth(auth, auth_chain, events): for e in itertools.chain(auth_chain, events): auth_events_list = [auth_map[i] for i, _ in e.auth_events] - auth_events = { - (e.type, e.state_key): e - for e in auth_events_list - } + auth_events = {(e.type, e.state_key): e for e in auth_events_list} auth_events[("m.room.create", "")] = create_events[e.room_id] try: auth.check(e, auth_events=auth_events) except Exception as ex: - print "Failed:", e.event_id, e.type, e.state_key - print "Auth_events:", auth_events - print ex - print json.dumps(e.get_dict(), sort_keys=True, indent=4) + print("Failed:", e.event_id, e.type, e.state_key) + print("Auth_events:", auth_events) + print(ex) + print(json.dumps(e.get_dict(), sort_keys=True, indent=4)) # raise - print "Success:", e.event_id, e.type, e.state_key + print("Success:", e.event_id, e.type, e.state_key) + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( - 'json', - nargs='?', - type=argparse.FileType('r'), - default=sys.stdin, + 'json', nargs='?', type=argparse.FileType('r'), default=sys.stdin ) args = parser.parse_args() diff --git a/scripts-dev/check_event_hash.py b/scripts-dev/check_event_hash.py index 7ccae34d48..8535f99697 100644 --- a/scripts-dev/check_event_hash.py +++ b/scripts-dev/check_event_hash.py @@ -1,10 +1,15 @@ -from synapse.crypto.event_signing import * -from unpaddedbase64 import encode_base64 - import argparse import hashlib -import sys import json +import logging +import sys + +from unpaddedbase64 import encode_base64 + +from synapse.crypto.event_signing import ( + check_event_content_hash, + compute_event_reference_hash, +) class dictobj(dict): @@ -24,27 +29,26 @@ class dictobj(dict): def main(): parser = argparse.ArgumentParser() - parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), - default=sys.stdin) + parser.add_argument( + "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin + ) args = parser.parse_args() logging.basicConfig() event_json = dictobj(json.load(args.input_json)) - algorithms = { - "sha256": hashlib.sha256, - } + algorithms = {"sha256": hashlib.sha256} for alg_name in event_json.hashes: if check_event_content_hash(event_json, algorithms[alg_name]): - print "PASS content hash %s" % (alg_name,) + print("PASS content hash %s" % (alg_name,)) else: - print "FAIL content hash %s" % (alg_name,) + print("FAIL content hash %s" % (alg_name,)) for algorithm in algorithms.values(): name, h_bytes = compute_event_reference_hash(event_json, algorithm) - print "Reference hash %s: %s" % (name, encode_base64(h_bytes)) + print("Reference hash %s: %s" % (name, encode_base64(h_bytes))) -if __name__=="__main__": - main() +if __name__ == "__main__": + main() diff --git a/scripts-dev/check_signature.py b/scripts-dev/check_signature.py index 079577908a..612f17ca7f 100644 --- a/scripts-dev/check_signature.py +++ b/scripts-dev/check_signature.py @@ -1,15 +1,15 @@ -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes, write_signing_keys -from unpaddedbase64 import decode_base64 - -import urllib2 +import argparse import json +import logging import sys +import urllib2 + import dns.resolver -import pprint -import argparse -import logging +from signedjson.key import decode_verify_key_bytes, write_signing_keys +from signedjson.sign import verify_signed_json +from unpaddedbase64 import decode_base64 + def get_targets(server_name): if ":" in server_name: @@ -23,6 +23,7 @@ def get_targets(server_name): except dns.resolver.NXDOMAIN: yield (server_name, 8448) + def get_server_keys(server_name, target, port): url = "https://%s:%i/_matrix/key/v1" % (target, port) keys = json.load(urllib2.urlopen(url)) @@ -33,12 +34,14 @@ def get_server_keys(server_name, target, port): verify_keys[key_id] = verify_key return verify_keys + def main(): parser = argparse.ArgumentParser() parser.add_argument("signature_name") - parser.add_argument("input_json", nargs="?", type=argparse.FileType('r'), - default=sys.stdin) + parser.add_argument( + "input_json", nargs="?", type=argparse.FileType('r'), default=sys.stdin + ) args = parser.parse_args() logging.basicConfig() @@ -48,24 +51,23 @@ def main(): for target, port in get_targets(server_name): try: keys = get_server_keys(server_name, target, port) - print "Using keys from https://%s:%s/_matrix/key/v1" % (target, port) + print("Using keys from https://%s:%s/_matrix/key/v1" % (target, port)) write_signing_keys(sys.stdout, keys.values()) break - except: + except Exception: logging.exception("Error talking to %s:%s", target, port) json_to_check = json.load(args.input_json) - print "Checking JSON:" + print("Checking JSON:") for key_id in json_to_check["signatures"][args.signature_name]: try: key = keys[key_id] verify_signed_json(json_to_check, args.signature_name, key) - print "PASS %s" % (key_id,) - except: + print("PASS %s" % (key_id,)) + except Exception: logging.exception("Check for key %s failed" % (key_id,)) - print "FAIL %s" % (key_id,) + print("FAIL %s" % (key_id,)) if __name__ == '__main__': main() - diff --git a/scripts-dev/convert_server_keys.py b/scripts-dev/convert_server_keys.py index 151551f22c..dde8596697 100644 --- a/scripts-dev/convert_server_keys.py +++ b/scripts-dev/convert_server_keys.py @@ -1,13 +1,21 @@ -import psycopg2 -import yaml -import sys +import hashlib import json +import sys import time -import hashlib -from unpaddedbase64 import encode_base64 + +import six + +import psycopg2 +import yaml +from canonicaljson import encode_canonical_json from signedjson.key import read_signing_keys from signedjson.sign import sign_json -from canonicaljson import encode_canonical_json +from unpaddedbase64 import encode_base64 + +if six.PY2: + db_type = six.moves.builtins.buffer +else: + db_type = memoryview def select_v1_keys(connection): @@ -39,7 +47,9 @@ def select_v2_json(connection): cursor.close() results = {} for server_name, key_id, key_json in rows: - results.setdefault(server_name, {})[key_id] = json.loads(str(key_json).decode("utf-8")) + results.setdefault(server_name, {})[key_id] = json.loads( + str(key_json).decode("utf-8") + ) return results @@ -47,10 +57,7 @@ def convert_v1_to_v2(server_name, valid_until, keys, certificate): return { "old_verify_keys": {}, "server_name": server_name, - "verify_keys": { - key_id: {"key": key} - for key_id, key in keys.items() - }, + "verify_keys": {key_id: {"key": key} for key_id, key in keys.items()}, "valid_until_ts": valid_until, "tls_fingerprints": [fingerprint(certificate)], } @@ -65,7 +72,7 @@ def rows_v2(server, json): valid_until = json["valid_until_ts"] key_json = encode_canonical_json(json) for key_id in json["verify_keys"]: - yield (server, key_id, "-", valid_until, valid_until, buffer(key_json)) + yield (server, key_id, "-", valid_until, valid_until, db_type(key_json)) def main(): @@ -87,7 +94,7 @@ def main(): result = {} for server in keys: - if not server in json: + if server not in json: v2_json = convert_v1_to_v2( server, valid_until, keys[server], certificates[server] ) @@ -96,10 +103,7 @@ def main(): yaml.safe_dump(result, sys.stdout, default_flow_style=False) - rows = list( - row for server, json in result.items() - for row in rows_v2(server, json) - ) + rows = list(row for server, json in result.items() for row in rows_v2(server, json)) cursor = connection.cursor() cursor.executemany( @@ -107,7 +111,7 @@ def main(): " server_name, key_id, from_server," " ts_added_ms, ts_valid_until_ms, key_json" ") VALUES (%s, %s, %s, %s, %s, %s)", - rows + rows, ) connection.commit() diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py index 47dac7772d..1deb0fe2b7 100755 --- a/scripts-dev/definitions.py +++ b/scripts-dev/definitions.py @@ -1,8 +1,16 @@ #! /usr/bin/python +from __future__ import print_function + +import argparse import ast +import os +import re +import sys + import yaml + class DefinitionVisitor(ast.NodeVisitor): def __init__(self): super(DefinitionVisitor, self).__init__() @@ -42,15 +50,18 @@ def non_empty(defs): functions = {name: non_empty(f) for name, f in defs['def'].items()} classes = {name: non_empty(f) for name, f in defs['class'].items()} result = {} - if functions: result['def'] = functions - if classes: result['class'] = classes + if functions: + result['def'] = functions + if classes: + result['class'] = classes names = defs['names'] uses = [] for name in names.get('Load', ()): if name not in names.get('Param', ()) and name not in names.get('Store', ()): uses.append(name) uses.extend(defs['attrs']) - if uses: result['uses'] = uses + if uses: + result['uses'] = uses result['names'] = names result['attrs'] = defs['attrs'] return result @@ -95,7 +106,6 @@ def used_names(prefix, item, defs, names): if __name__ == '__main__': - import sys, os, argparse, re parser = argparse.ArgumentParser(description='Find definitions.') parser.add_argument( @@ -105,24 +115,28 @@ if __name__ == '__main__': "--ignore", action="append", metavar="REGEXP", help="Ignore a pattern" ) parser.add_argument( - "--pattern", action="append", metavar="REGEXP", - help="Search for a pattern" + "--pattern", action="append", metavar="REGEXP", help="Search for a pattern" ) parser.add_argument( - "directories", nargs='+', metavar="DIR", - help="Directories to search for definitions" + "directories", + nargs='+', + metavar="DIR", + help="Directories to search for definitions", ) parser.add_argument( - "--referrers", default=0, type=int, - help="Include referrers up to the given depth" + "--referrers", + default=0, + type=int, + help="Include referrers up to the given depth", ) parser.add_argument( - "--referred", default=0, type=int, - help="Include referred down to the given depth" + "--referred", + default=0, + type=int, + help="Include referred down to the given depth", ) parser.add_argument( - "--format", default="yaml", - help="Output format, one of 'yaml' or 'dot'" + "--format", default="yaml", help="Output format, one of 'yaml' or 'dot'" ) args = parser.parse_args() @@ -162,7 +176,7 @@ if __name__ == '__main__': for used_by in entry.get("used", ()): referrers.add(used_by) for name, definition in names.items(): - if not name in referrers: + if name not in referrers: continue if ignore and any(pattern.match(name) for pattern in ignore): continue @@ -176,7 +190,7 @@ if __name__ == '__main__': for uses in entry.get("uses", ()): referred.add(uses) for name, definition in names.items(): - if not name in referred: + if name not in referred: continue if ignore and any(pattern.match(name) for pattern in ignore): continue @@ -185,12 +199,12 @@ if __name__ == '__main__': if args.format == 'yaml': yaml.dump(result, sys.stdout, default_flow_style=False) elif args.format == 'dot': - print "digraph {" + print("digraph {") for name, entry in result.items(): - print name + print(name) for used_by in entry.get("used", ()): if used_by in result: - print used_by, "->", name - print "}" + print(used_by, "->", name) + print("}") else: raise ValueError("Unknown format %r" % (args.format)) diff --git a/scripts-dev/dump_macaroon.py b/scripts-dev/dump_macaroon.py index fcc5568835..22b30fa78e 100755 --- a/scripts-dev/dump_macaroon.py +++ b/scripts-dev/dump_macaroon.py @@ -1,8 +1,11 @@ #!/usr/bin/env python2 -import pymacaroons +from __future__ import print_function + import sys +import pymacaroons + if len(sys.argv) == 1: sys.stderr.write("usage: %s macaroon [key]\n" % (sys.argv[0],)) sys.exit(1) @@ -11,14 +14,14 @@ macaroon_string = sys.argv[1] key = sys.argv[2] if len(sys.argv) > 2 else None macaroon = pymacaroons.Macaroon.deserialize(macaroon_string) -print macaroon.inspect() +print(macaroon.inspect()) -print "" +print("") verifier = pymacaroons.Verifier() verifier.satisfy_general(lambda c: True) try: verifier.verify(macaroon, key) - print "Signature is correct" + print("Signature is correct") except Exception as e: - print str(e) + print(str(e)) diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index d2acc7654d..2566ce7cef 100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -18,21 +18,21 @@ from __future__ import print_function import argparse +import base64 +import json +import sys from urlparse import urlparse, urlunparse import nacl.signing -import json -import base64 import requests -import sys - -from requests.adapters import HTTPAdapter import srvlookup import yaml +from requests.adapters import HTTPAdapter # uncomment the following to enable debug logging of http requests -#from httplib import HTTPConnection -#HTTPConnection.debuglevel = 1 +# from httplib import HTTPConnection +# HTTPConnection.debuglevel = 1 + def encode_base64(input_bytes): """Encode bytes as a base64 string without any padding.""" @@ -58,15 +58,15 @@ def decode_base64(input_string): def encode_canonical_json(value): return json.dumps( - value, - # Encode code-points outside of ASCII as UTF-8 rather than \u escapes - ensure_ascii=False, - # Remove unecessary white space. - separators=(',',':'), - # Sort the keys of dictionaries. - sort_keys=True, - # Encode the resulting unicode as UTF-8 bytes. - ).encode("UTF-8") + value, + # Encode code-points outside of ASCII as UTF-8 rather than \u escapes + ensure_ascii=False, + # Remove unecessary white space. + separators=(',', ':'), + # Sort the keys of dictionaries. + sort_keys=True, + # Encode the resulting unicode as UTF-8 bytes. + ).encode("UTF-8") def sign_json(json_object, signing_key, signing_name): @@ -88,6 +88,7 @@ def sign_json(json_object, signing_key, signing_name): NACL_ED25519 = "ed25519" + def decode_signing_key_base64(algorithm, version, key_base64): """Decode a base64 encoded signing key Args: @@ -143,14 +144,12 @@ def request_json(method, origin_name, origin_key, destination, path, content): authorization_headers = [] for key, sig in signed_json["signatures"][origin_name].items(): - header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( - origin_name, key, sig, - ) + header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (origin_name, key, sig) authorization_headers.append(bytes(header)) - print ("Authorization: %s" % header, file=sys.stderr) + print("Authorization: %s" % header, file=sys.stderr) dest = "matrix://%s%s" % (destination, path) - print ("Requesting %s" % dest, file=sys.stderr) + print("Requesting %s" % dest, file=sys.stderr) s = requests.Session() s.mount("matrix://", MatrixConnectionAdapter()) @@ -158,10 +157,7 @@ def request_json(method, origin_name, origin_key, destination, path, content): result = s.request( method=method, url=dest, - headers={ - "Host": destination, - "Authorization": authorization_headers[0] - }, + headers={"Host": destination, "Authorization": authorization_headers[0]}, verify=False, data=content, ) @@ -171,50 +167,50 @@ def request_json(method, origin_name, origin_key, destination, path, content): def main(): parser = argparse.ArgumentParser( - description= - "Signs and sends a federation request to a matrix homeserver", + description="Signs and sends a federation request to a matrix homeserver" ) parser.add_argument( - "-N", "--server-name", + "-N", + "--server-name", help="Name to give as the local homeserver. If unspecified, will be " - "read from the config file.", + "read from the config file.", ) parser.add_argument( - "-k", "--signing-key-path", + "-k", + "--signing-key-path", help="Path to the file containing the private ed25519 key to sign the " - "request with.", + "request with.", ) parser.add_argument( - "-c", "--config", + "-c", + "--config", default="homeserver.yaml", help="Path to server config file. Ignored if --server-name and " - "--signing-key-path are both given.", + "--signing-key-path are both given.", ) parser.add_argument( - "-d", "--destination", + "-d", + "--destination", default="matrix.org", help="name of the remote homeserver. We will do SRV lookups and " - "connect appropriately.", + "connect appropriately.", ) parser.add_argument( - "-X", "--method", + "-X", + "--method", help="HTTP method to use for the request. Defaults to GET if --data is" - "unspecified, POST if it is." + "unspecified, POST if it is.", ) - parser.add_argument( - "--body", - help="Data to send as the body of the HTTP request" - ) + parser.add_argument("--body", help="Data to send as the body of the HTTP request") parser.add_argument( - "path", - help="request path. We will add '/_matrix/federation/v1/' to this." + "path", help="request path. We will add '/_matrix/federation/v1/' to this." ) args = parser.parse_args() @@ -227,13 +223,15 @@ def main(): result = request_json( args.method, - args.server_name, key, args.destination, + args.server_name, + key, + args.destination, "/_matrix/federation/v1/" + args.path, content=args.body, ) json.dump(result, sys.stdout) - print ("") + print("") def read_args_from_config(args): @@ -253,7 +251,7 @@ class MatrixConnectionAdapter(HTTPAdapter): return s, 8448 if ":" in s: - out = s.rsplit(":",1) + out = s.rsplit(":", 1) try: port = int(out[1]) except ValueError: @@ -263,7 +261,7 @@ class MatrixConnectionAdapter(HTTPAdapter): try: srv = srvlookup.lookup("matrix", "tcp", s)[0] return srv.host, srv.port - except: + except Exception: return s, 8448 def get_connection(self, url, proxies=None): @@ -272,10 +270,9 @@ class MatrixConnectionAdapter(HTTPAdapter): (host, port) = self.lookup(parsed.netloc) netloc = "%s:%d" % (host, port) print("Connecting to %s" % (netloc,), file=sys.stderr) - url = urlunparse(( - "https", netloc, parsed.path, parsed.params, parsed.query, - parsed.fragment, - )) + url = urlunparse( + ("https", netloc, parsed.path, parsed.params, parsed.query, parsed.fragment) + ) return super(MatrixConnectionAdapter, self).get_connection(url, proxies) diff --git a/scripts-dev/hash_history.py b/scripts-dev/hash_history.py index 616d6a10e7..514d80fa60 100644 --- a/scripts-dev/hash_history.py +++ b/scripts-dev/hash_history.py @@ -1,23 +1,31 @@ -from synapse.storage.pdu import PduStore -from synapse.storage.signatures import SignatureStore -from synapse.storage._base import SQLBaseStore -from synapse.federation.units import Pdu -from synapse.crypto.event_signing import ( - add_event_pdu_content_hash, compute_pdu_event_reference_hash -) -from synapse.api.events.utils import prune_pdu -from unpaddedbase64 import encode_base64, decode_base64 -from canonicaljson import encode_canonical_json +from __future__ import print_function + import sqlite3 import sys +from unpaddedbase64 import decode_base64, encode_base64 + +from synapse.crypto.event_signing import ( + add_event_pdu_content_hash, + compute_pdu_event_reference_hash, +) +from synapse.federation.units import Pdu +from synapse.storage._base import SQLBaseStore +from synapse.storage.pdu import PduStore +from synapse.storage.signatures import SignatureStore + + class Store(object): _get_pdu_tuples = PduStore.__dict__["_get_pdu_tuples"] _get_pdu_content_hashes_txn = SignatureStore.__dict__["_get_pdu_content_hashes_txn"] _get_prev_pdu_hashes_txn = SignatureStore.__dict__["_get_prev_pdu_hashes_txn"] - _get_pdu_origin_signatures_txn = SignatureStore.__dict__["_get_pdu_origin_signatures_txn"] + _get_pdu_origin_signatures_txn = SignatureStore.__dict__[ + "_get_pdu_origin_signatures_txn" + ] _store_pdu_content_hash_txn = SignatureStore.__dict__["_store_pdu_content_hash_txn"] - _store_pdu_reference_hash_txn = SignatureStore.__dict__["_store_pdu_reference_hash_txn"] + _store_pdu_reference_hash_txn = SignatureStore.__dict__[ + "_store_pdu_reference_hash_txn" + ] _store_prev_pdu_hash_txn = SignatureStore.__dict__["_store_prev_pdu_hash_txn"] _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] @@ -26,9 +34,7 @@ store = Store() def select_pdus(cursor): - cursor.execute( - "SELECT pdu_id, origin FROM pdus ORDER BY depth ASC" - ) + cursor.execute("SELECT pdu_id, origin FROM pdus ORDER BY depth ASC") ids = cursor.fetchall() @@ -41,23 +47,30 @@ def select_pdus(cursor): for pdu in pdus: try: if pdu.prev_pdus: - print "PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus + print("PROCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus) for pdu_id, origin, hashes in pdu.prev_pdus: ref_alg, ref_hsh = reference_hashes[(pdu_id, origin)] hashes[ref_alg] = encode_base64(ref_hsh) - store._store_prev_pdu_hash_txn(cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh) - print "SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus + store._store_prev_pdu_hash_txn( + cursor, pdu.pdu_id, pdu.origin, pdu_id, origin, ref_alg, ref_hsh + ) + print("SUCCESS", pdu.pdu_id, pdu.origin, pdu.prev_pdus) pdu = add_event_pdu_content_hash(pdu) ref_alg, ref_hsh = compute_pdu_event_reference_hash(pdu) reference_hashes[(pdu.pdu_id, pdu.origin)] = (ref_alg, ref_hsh) - store._store_pdu_reference_hash_txn(cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh) + store._store_pdu_reference_hash_txn( + cursor, pdu.pdu_id, pdu.origin, ref_alg, ref_hsh + ) for alg, hsh_base64 in pdu.hashes.items(): - print alg, hsh_base64 - store._store_pdu_content_hash_txn(cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64)) + print(alg, hsh_base64) + store._store_pdu_content_hash_txn( + cursor, pdu.pdu_id, pdu.origin, alg, decode_base64(hsh_base64) + ) + + except Exception: + print("FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus) - except: - print "FAILED_", pdu.pdu_id, pdu.origin, pdu.prev_pdus def main(): conn = sqlite3.connect(sys.argv[1]) @@ -65,5 +78,6 @@ def main(): select_pdus(cursor) conn.commit() -if __name__=='__main__': + +if __name__ == '__main__': main() diff --git a/scripts-dev/list_url_patterns.py b/scripts-dev/list_url_patterns.py index 58d40c4ff4..da027be26e 100755 --- a/scripts-dev/list_url_patterns.py +++ b/scripts-dev/list_url_patterns.py @@ -1,18 +1,17 @@ #! /usr/bin/python -import ast import argparse +import ast import os import sys + import yaml PATTERNS_V1 = [] PATTERNS_V2 = [] -RESULT = { - "v1": PATTERNS_V1, - "v2": PATTERNS_V2, -} +RESULT = {"v1": PATTERNS_V1, "v2": PATTERNS_V2} + class CallVisitor(ast.NodeVisitor): def visit_Call(self, node): @@ -21,7 +20,6 @@ class CallVisitor(ast.NodeVisitor): else: return - if name == "client_path_patterns": PATTERNS_V1.append(node.args[0].s) elif name == "client_v2_patterns": @@ -42,8 +40,10 @@ def find_patterns_in_file(filepath): parser = argparse.ArgumentParser(description='Find url patterns.') parser.add_argument( - "directories", nargs='+', metavar="DIR", - help="Directories to search for definitions" + "directories", + nargs='+', + metavar="DIR", + help="Directories to search for definitions", ) args = parser.parse_args() diff --git a/scripts-dev/tail-synapse.py b/scripts-dev/tail-synapse.py index 18be711e92..1a36b94038 100644 --- a/scripts-dev/tail-synapse.py +++ b/scripts-dev/tail-synapse.py @@ -1,8 +1,9 @@ -import requests import collections +import json import sys import time -import json + +import requests Entry = collections.namedtuple("Entry", "name position rows") @@ -30,11 +31,11 @@ def parse_response(content): def replicate(server, streams): - return parse_response(requests.get( - server + "/_synapse/replication", - verify=False, - params=streams - ).content) + return parse_response( + requests.get( + server + "/_synapse/replication", verify=False, params=streams + ).content + ) def main(): @@ -45,7 +46,7 @@ def main(): try: streams = { row.name: row.position - for row in replicate(server, {"streams":"-1"})["streams"].rows + for row in replicate(server, {"streams": "-1"})["streams"].rows } except requests.exceptions.ConnectionError as e: time.sleep(0.1) @@ -53,8 +54,8 @@ def main(): while True: try: results = replicate(server, streams) - except: - sys.stdout.write("connection_lost("+ repr(streams) + ")\n") + except Exception: + sys.stdout.write("connection_lost(" + repr(streams) + ")\n") break for update in results.values(): for row in update.rows: @@ -62,6 +63,5 @@ def main(): streams[update.name] = update.position - -if __name__=='__main__': +if __name__ == '__main__': main() diff --git a/scripts/hash_password b/scripts/hash_password index 215ab25cfe..a62bb5aa83 100755 --- a/scripts/hash_password +++ b/scripts/hash_password @@ -1,12 +1,10 @@ #!/usr/bin/env python import argparse - +import getpass import sys import bcrypt -import getpass - import yaml bcrypt_rounds=12 @@ -52,4 +50,3 @@ if __name__ == "__main__": password = prompt_for_pass() print bcrypt.hashpw(password + password_pepper, bcrypt.gensalt(bcrypt_rounds)) - diff --git a/scripts/move_remote_media_to_new_store.py b/scripts/move_remote_media_to_new_store.py index 7914ead889..e630936f78 100755 --- a/scripts/move_remote_media_to_new_store.py +++ b/scripts/move_remote_media_to_new_store.py @@ -36,12 +36,9 @@ from __future__ import print_function import argparse import logging - -import sys - import os - import shutil +import sys from synapse.rest.media.v1.filepath import MediaFilePaths @@ -77,24 +74,23 @@ def move_media(origin_server, file_id, src_paths, dest_paths): if not os.path.exists(original_file): logger.warn( "Original for %s/%s (%s) does not exist", - origin_server, file_id, original_file, + origin_server, + file_id, + original_file, ) else: mkdir_and_move( - original_file, - dest_paths.remote_media_filepath(origin_server, file_id), + original_file, dest_paths.remote_media_filepath(origin_server, file_id) ) # now look for thumbnails - original_thumb_dir = src_paths.remote_media_thumbnail_dir( - origin_server, file_id, - ) + original_thumb_dir = src_paths.remote_media_thumbnail_dir(origin_server, file_id) if not os.path.exists(original_thumb_dir): return mkdir_and_move( original_thumb_dir, - dest_paths.remote_media_thumbnail_dir(origin_server, file_id) + dest_paths.remote_media_thumbnail_dir(origin_server, file_id), ) @@ -109,24 +105,16 @@ def mkdir_and_move(original_file, dest_file): if __name__ == "__main__": parser = argparse.ArgumentParser( - description=__doc__, - formatter_class = argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument( - "-v", action='store_true', help='enable debug logging') - parser.add_argument( - "src_repo", - help="Path to source content repo", - ) - parser.add_argument( - "dest_repo", - help="Path to source content repo", + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) + parser.add_argument("-v", action='store_true', help='enable debug logging') + parser.add_argument("src_repo", help="Path to source content repo") + parser.add_argument("dest_repo", help="Path to source content repo") args = parser.parse_args() logging_config = { "level": logging.DEBUG if args.v else logging.INFO, - "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s" + "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s", } logging.basicConfig(**logging_config) diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user index 91bdb3a25b..89143c5d59 100755 --- a/scripts/register_new_matrix_user +++ b/scripts/register_new_matrix_user @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function import argparse import getpass @@ -22,19 +23,23 @@ import hmac import json import sys import urllib2 + +from six import input + import yaml def request_registration(user, password, server_location, shared_secret, admin=False): req = urllib2.Request( "%s/_matrix/client/r0/admin/register" % (server_location,), - headers={'Content-Type': 'application/json'} + headers={'Content-Type': 'application/json'}, ) try: if sys.version_info[:3] >= (2, 7, 9): # As of version 2.7.9, urllib2 now checks SSL certs import ssl + f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) else: f = urllib2.urlopen(req) @@ -42,18 +47,15 @@ def request_registration(user, password, server_location, shared_secret, admin=F f.close() nonce = json.loads(body)["nonce"] except urllib2.HTTPError as e: - print "ERROR! Received %d %s" % (e.code, e.reason,) + print("ERROR! Received %d %s" % (e.code, e.reason)) if 400 <= e.code < 500: if e.info().type == "application/json": resp = json.load(e) if "error" in resp: - print resp["error"] + print(resp["error"]) sys.exit(1) - mac = hmac.new( - key=shared_secret, - digestmod=hashlib.sha1, - ) + mac = hmac.new(key=shared_secret, digestmod=hashlib.sha1) mac.update(nonce) mac.update("\x00") @@ -75,30 +77,31 @@ def request_registration(user, password, server_location, shared_secret, admin=F server_location = server_location.rstrip("/") - print "Sending registration request..." + print("Sending registration request...") req = urllib2.Request( "%s/_matrix/client/r0/admin/register" % (server_location,), data=json.dumps(data), - headers={'Content-Type': 'application/json'} + headers={'Content-Type': 'application/json'}, ) try: if sys.version_info[:3] >= (2, 7, 9): # As of version 2.7.9, urllib2 now checks SSL certs import ssl + f = urllib2.urlopen(req, context=ssl.SSLContext(ssl.PROTOCOL_SSLv23)) else: f = urllib2.urlopen(req) f.read() f.close() - print "Success." + print("Success.") except urllib2.HTTPError as e: - print "ERROR! Received %d %s" % (e.code, e.reason,) + print("ERROR! Received %d %s" % (e.code, e.reason)) if 400 <= e.code < 500: if e.info().type == "application/json": resp = json.load(e) if "error" in resp: - print resp["error"] + print(resp["error"]) sys.exit(1) @@ -106,35 +109,35 @@ def register_new_user(user, password, server_location, shared_secret, admin): if not user: try: default_user = getpass.getuser() - except: + except Exception: default_user = None if default_user: - user = raw_input("New user localpart [%s]: " % (default_user,)) + user = input("New user localpart [%s]: " % (default_user,)) if not user: user = default_user else: - user = raw_input("New user localpart: ") + user = input("New user localpart: ") if not user: - print "Invalid user name" + print("Invalid user name") sys.exit(1) if not password: password = getpass.getpass("Password: ") if not password: - print "Password cannot be blank." + print("Password cannot be blank.") sys.exit(1) confirm_password = getpass.getpass("Confirm password: ") if password != confirm_password: - print "Passwords do not match" + print("Passwords do not match") sys.exit(1) if admin is None: - admin = raw_input("Make admin [no]: ") + admin = input("Make admin [no]: ") if admin in ("y", "yes", "true"): admin = True else: @@ -146,42 +149,51 @@ def register_new_user(user, password, server_location, shared_secret, admin): if __name__ == "__main__": parser = argparse.ArgumentParser( description="Used to register new users with a given home server when" - " registration has been disabled. The home server must be" - " configured with the 'registration_shared_secret' option" - " set.", + " registration has been disabled. The home server must be" + " configured with the 'registration_shared_secret' option" + " set." ) parser.add_argument( - "-u", "--user", + "-u", + "--user", default=None, help="Local part of the new user. Will prompt if omitted.", ) parser.add_argument( - "-p", "--password", + "-p", + "--password", default=None, help="New password for user. Will prompt if omitted.", ) admin_group = parser.add_mutually_exclusive_group() admin_group.add_argument( - "-a", "--admin", + "-a", + "--admin", action="store_true", - help="Register new user as an admin. Will prompt if --no-admin is not set either.", + help=( + "Register new user as an admin. " + "Will prompt if --no-admin is not set either." + ), ) admin_group.add_argument( "--no-admin", action="store_true", - help="Register new user as a regular user. Will prompt if --admin is not set either.", + help=( + "Register new user as a regular user. " + "Will prompt if --admin is not set either." + ), ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( - "-c", "--config", + "-c", + "--config", type=argparse.FileType('r'), help="Path to server config file. Used to read in shared secret.", ) group.add_argument( - "-k", "--shared-secret", - help="Shared secret as defined in server config file.", + "-k", "--shared-secret", help="Shared secret as defined in server config file." ) parser.add_argument( @@ -189,7 +201,7 @@ if __name__ == "__main__": default="https://localhost:8448", nargs='?', help="URL to use to talk to the home server. Defaults to " - " 'https://localhost:8448'.", + " 'https://localhost:8448'.", ) args = parser.parse_args() @@ -198,7 +210,7 @@ if __name__ == "__main__": config = yaml.safe_load(args.config) secret = config.get("registration_shared_secret", None) if not secret: - print "No 'registration_shared_secret' defined in config." + print("No 'registration_shared_secret' defined in config.") sys.exit(1) else: secret = args.shared_secret diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index b9b828c154..2f6e69e552 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -15,23 +15,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor -from twisted.enterprise import adbapi - -from synapse.storage._base import LoggingTransaction, SQLBaseStore -from synapse.storage.engines import create_engine -from synapse.storage.prepare_database import prepare_database - import argparse import curses import logging import sys import time import traceback -import yaml from six import string_types +import yaml + +from twisted.enterprise import adbapi +from twisted.internet import defer, reactor + +from synapse.storage._base import LoggingTransaction, SQLBaseStore +from synapse.storage.engines import create_engine +from synapse.storage.prepare_database import prepare_database logger = logging.getLogger("synapse_port_db") @@ -105,6 +105,7 @@ class Store(object): *All* database interactions should go through this object. """ + def __init__(self, db_pool, engine): self.db_pool = db_pool self.database_engine = engine @@ -135,7 +136,8 @@ class Store(object): txn = conn.cursor() return func( LoggingTransaction(txn, desc, self.database_engine, [], []), - *args, **kwargs + *args, + **kwargs ) except self.database_engine.module.DatabaseError as e: if self.database_engine.is_deadlock(e): @@ -158,22 +160,20 @@ class Store(object): def r(txn): txn.execute(sql, args) return txn.fetchall() + return self.runInteraction("execute_sql", r) def insert_many_txn(self, txn, table, headers, rows): sql = "INSERT INTO %s (%s) VALUES (%s)" % ( table, ", ".join(k for k in headers), - ", ".join("%s" for _ in headers) + ", ".join("%s" for _ in headers), ) try: txn.executemany(sql, rows) - except: - logger.exception( - "Failed to insert: %s", - table, - ) + except Exception: + logger.exception("Failed to insert: %s", table) raise @@ -206,7 +206,7 @@ class Porter(object): "table_name": table, "forward_rowid": 1, "backward_rowid": 0, - } + }, ) forward_chunk = 1 @@ -221,10 +221,10 @@ class Porter(object): table, forward_chunk, backward_chunk ) else: + def delete_all(txn): txn.execute( - "DELETE FROM port_from_sqlite3 WHERE table_name = %s", - (table,) + "DELETE FROM port_from_sqlite3 WHERE table_name = %s", (table,) ) txn.execute("TRUNCATE %s CASCADE" % (table,)) @@ -232,11 +232,7 @@ class Porter(object): yield self.postgres_store._simple_insert( table="port_from_sqlite3", - values={ - "table_name": table, - "forward_rowid": 1, - "backward_rowid": 0, - } + values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0}, ) forward_chunk = 1 @@ -251,12 +247,16 @@ class Porter(object): ) @defer.inlineCallbacks - def handle_table(self, table, postgres_size, table_size, forward_chunk, - backward_chunk): + def handle_table( + self, table, postgres_size, table_size, forward_chunk, backward_chunk + ): logger.info( "Table %s: %i/%i (rows %i-%i) already ported", - table, postgres_size, table_size, - backward_chunk+1, forward_chunk-1, + table, + postgres_size, + table_size, + backward_chunk + 1, + forward_chunk - 1, ) if not table_size: @@ -271,7 +271,9 @@ class Porter(object): return if table in ( - "user_directory", "user_directory_search", "users_who_share_rooms", + "user_directory", + "user_directory_search", + "users_who_share_rooms", "users_in_pubic_room", ): # We don't port these tables, as they're a faff and we can regenreate @@ -283,37 +285,35 @@ class Porter(object): # We need to make sure there is a single row, `(X, null), as that is # what synapse expects to be there. yield self.postgres_store._simple_insert( - table=table, - values={"stream_id": None}, + table=table, values={"stream_id": None} ) self.progress.update(table, table_size) # Mark table as done return forward_select = ( - "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" - % (table,) + "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) ) backward_select = ( - "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" - % (table,) + "SELECT rowid, * FROM %s WHERE rowid <= ? ORDER BY rowid LIMIT ?" % (table,) ) do_forward = [True] do_backward = [True] while True: + def r(txn): forward_rows = [] backward_rows = [] if do_forward[0]: - txn.execute(forward_select, (forward_chunk, self.batch_size,)) + txn.execute(forward_select, (forward_chunk, self.batch_size)) forward_rows = txn.fetchall() if not forward_rows: do_forward[0] = False if do_backward[0]: - txn.execute(backward_select, (backward_chunk, self.batch_size,)) + txn.execute(backward_select, (backward_chunk, self.batch_size)) backward_rows = txn.fetchall() if not backward_rows: do_backward[0] = False @@ -325,9 +325,7 @@ class Porter(object): return headers, forward_rows, backward_rows - headers, frows, brows = yield self.sqlite_store.runInteraction( - "select", r - ) + headers, frows, brows = yield self.sqlite_store.runInteraction("select", r) if frows or brows: if frows: @@ -339,9 +337,7 @@ class Porter(object): rows = self._convert_rows(table, headers, rows) def insert(txn): - self.postgres_store.insert_many_txn( - txn, table, headers[1:], rows - ) + self.postgres_store.insert_many_txn(txn, table, headers[1:], rows) self.postgres_store._simple_update_one_txn( txn, @@ -362,8 +358,9 @@ class Porter(object): return @defer.inlineCallbacks - def handle_search_table(self, postgres_size, table_size, forward_chunk, - backward_chunk): + def handle_search_table( + self, postgres_size, table_size, forward_chunk, backward_chunk + ): select = ( "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering" " FROM event_search as es" @@ -373,8 +370,9 @@ class Porter(object): ) while True: + def r(txn): - txn.execute(select, (forward_chunk, self.batch_size,)) + txn.execute(select, (forward_chunk, self.batch_size)) rows = txn.fetchall() headers = [column[0] for column in txn.description] @@ -402,18 +400,21 @@ class Porter(object): else: rows_dict.append(d) - txn.executemany(sql, [ - ( - row["event_id"], - row["room_id"], - row["key"], - row["sender"], - row["value"], - row["origin_server_ts"], - row["stream_ordering"], - ) - for row in rows_dict - ]) + txn.executemany( + sql, + [ + ( + row["event_id"], + row["room_id"], + row["key"], + row["sender"], + row["value"], + row["origin_server_ts"], + row["stream_ordering"], + ) + for row in rows_dict + ], + ) self.postgres_store._simple_update_one_txn( txn, @@ -437,7 +438,8 @@ class Porter(object): def setup_db(self, db_config, database_engine): db_conn = database_engine.module.connect( **{ - k: v for k, v in db_config.get("args", {}).items() + k: v + for k, v in db_config.get("args", {}).items() if not k.startswith("cp_") } ) @@ -450,13 +452,11 @@ class Porter(object): def run(self): try: sqlite_db_pool = adbapi.ConnectionPool( - self.sqlite_config["name"], - **self.sqlite_config["args"] + self.sqlite_config["name"], **self.sqlite_config["args"] ) postgres_db_pool = adbapi.ConnectionPool( - self.postgres_config["name"], - **self.postgres_config["args"] + self.postgres_config["name"], **self.postgres_config["args"] ) sqlite_engine = create_engine(sqlite_config) @@ -465,9 +465,7 @@ class Porter(object): self.sqlite_store = Store(sqlite_db_pool, sqlite_engine) self.postgres_store = Store(postgres_db_pool, postgres_engine) - yield self.postgres_store.execute( - postgres_engine.check_database - ) + yield self.postgres_store.execute(postgres_engine.check_database) # Step 1. Set up databases. self.progress.set_state("Preparing SQLite3") @@ -477,6 +475,7 @@ class Porter(object): self.setup_db(postgres_config, postgres_engine) self.progress.set_state("Creating port tables") + def create_port_table(txn): txn.execute( "CREATE TABLE IF NOT EXISTS port_from_sqlite3 (" @@ -501,9 +500,7 @@ class Porter(object): ) try: - yield self.postgres_store.runInteraction( - "alter_table", alter_table - ) + yield self.postgres_store.runInteraction("alter_table", alter_table) except Exception as e: pass @@ -514,11 +511,7 @@ class Porter(object): # Step 2. Get tables. self.progress.set_state("Fetching tables") sqlite_tables = yield self.sqlite_store._simple_select_onecol( - table="sqlite_master", - keyvalues={ - "type": "table", - }, - retcol="name", + table="sqlite_master", keyvalues={"type": "table"}, retcol="name" ) postgres_tables = yield self.postgres_store._simple_select_onecol( @@ -545,18 +538,14 @@ class Porter(object): # Step 4. Do the copying. self.progress.set_state("Copying to postgres") yield defer.gatherResults( - [ - self.handle_table(*res) - for res in setup_res - ], - consumeErrors=True, + [self.handle_table(*res) for res in setup_res], consumeErrors=True ) # Step 5. Do final post-processing yield self._setup_state_group_id_seq() self.progress.done() - except: + except Exception: global end_error_exec_info end_error_exec_info = sys.exc_info() logger.exception("") @@ -566,9 +555,7 @@ class Porter(object): def _convert_rows(self, table, headers, rows): bool_col_names = BOOLEAN_COLUMNS.get(table, []) - bool_cols = [ - i for i, h in enumerate(headers) if h in bool_col_names - ] + bool_cols = [i for i, h in enumerate(headers) if h in bool_col_names] class BadValueException(Exception): pass @@ -577,18 +564,21 @@ class Porter(object): if j in bool_cols: return bool(col) elif isinstance(col, string_types) and "\0" in col: - logger.warn("DROPPING ROW: NUL value in table %s col %s: %r", table, headers[j], col) - raise BadValueException(); + logger.warn( + "DROPPING ROW: NUL value in table %s col %s: %r", + table, + headers[j], + col, + ) + raise BadValueException() return col outrows = [] for i, row in enumerate(rows): try: - outrows.append(tuple( - conv(j, col) - for j, col in enumerate(row) - if j > 0 - )) + outrows.append( + tuple(conv(j, col) for j, col in enumerate(row) if j > 0) + ) except BadValueException: pass @@ -616,9 +606,7 @@ class Porter(object): return headers, [r for r in rows if r[ts_ind] < yesterday] - headers, rows = yield self.sqlite_store.runInteraction( - "select", r, - ) + headers, rows = yield self.sqlite_store.runInteraction("select", r) rows = self._convert_rows("sent_transactions", headers, rows) @@ -639,7 +627,7 @@ class Porter(object): txn.execute( "SELECT rowid FROM sent_transactions WHERE ts >= ?" " ORDER BY rowid ASC LIMIT 1", - (yesterday,) + (yesterday,), ) rows = txn.fetchall() @@ -657,21 +645,17 @@ class Porter(object): "table_name": "sent_transactions", "forward_rowid": next_chunk, "backward_rowid": 0, - } + }, ) def get_sent_table_size(txn): txn.execute( - "SELECT count(*) FROM sent_transactions" - " WHERE ts >= ?", - (yesterday,) + "SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,) ) size, = txn.fetchone() return int(size) - remaining_count = yield self.sqlite_store.execute( - get_sent_table_size - ) + remaining_count = yield self.sqlite_store.execute(get_sent_table_size) total_count = remaining_count + inserted_rows @@ -680,13 +664,11 @@ class Porter(object): @defer.inlineCallbacks def _get_remaining_count_to_port(self, table, forward_chunk, backward_chunk): frows = yield self.sqlite_store.execute_sql( - "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), - forward_chunk, + "SELECT count(*) FROM %s WHERE rowid >= ?" % (table,), forward_chunk ) brows = yield self.sqlite_store.execute_sql( - "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), - backward_chunk, + "SELECT count(*) FROM %s WHERE rowid <= ?" % (table,), backward_chunk ) defer.returnValue(frows[0][0] + brows[0][0]) @@ -694,7 +676,7 @@ class Porter(object): @defer.inlineCallbacks def _get_already_ported_count(self, table): rows = yield self.postgres_store.execute_sql( - "SELECT count(*) FROM %s" % (table,), + "SELECT count(*) FROM %s" % (table,) ) defer.returnValue(rows[0][0]) @@ -717,22 +699,21 @@ class Porter(object): def _setup_state_group_id_seq(self): def r(txn): txn.execute("SELECT MAX(id) FROM state_groups") - next_id = txn.fetchone()[0]+1 - txn.execute( - "ALTER SEQUENCE state_group_id_seq RESTART WITH %s", - (next_id,), - ) + next_id = txn.fetchone()[0] + 1 + txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,)) + return self.postgres_store.runInteraction("setup_state_group_id_seq", r) ############################################## -###### The following is simply UI stuff ###### +# The following is simply UI stuff ############################################## class Progress(object): """Used to report progress of the port """ + def __init__(self): self.tables = {} @@ -758,6 +739,7 @@ class Progress(object): class CursesProgress(Progress): """Reports progress to a curses window """ + def __init__(self, stdscr): self.stdscr = stdscr @@ -801,7 +783,7 @@ class CursesProgress(Progress): duration = int(now) - int(self.start_time) minutes, seconds = divmod(duration, 60) - duration_str = '%02dm %02ds' % (minutes, seconds,) + duration_str = '%02dm %02ds' % (minutes, seconds) if self.finished: status = "Time spent: %s (Done!)" % (duration_str,) @@ -814,16 +796,12 @@ class CursesProgress(Progress): est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60) else: est_remaining_str = "Unknown" - status = ( - "Time spent: %s (est. remaining: %s)" - % (duration_str, est_remaining_str,) + status = "Time spent: %s (est. remaining: %s)" % ( + duration_str, + est_remaining_str, ) - self.stdscr.addstr( - 0, 0, - status, - curses.A_BOLD, - ) + self.stdscr.addstr(0, 0, status, curses.A_BOLD) max_len = max([len(t) for t in self.tables.keys()]) @@ -831,9 +809,7 @@ class CursesProgress(Progress): middle_space = 1 items = self.tables.items() - items.sort( - key=lambda i: (i[1]["perc"], i[0]), - ) + items.sort(key=lambda i: (i[1]["perc"], i[0])) for i, (table, data) in enumerate(items): if i + 2 >= rows: @@ -844,9 +820,7 @@ class CursesProgress(Progress): color = curses.color_pair(2) if perc == 100 else curses.color_pair(1) self.stdscr.addstr( - i + 2, left_margin + max_len - len(table), - table, - curses.A_BOLD | color, + i + 2, left_margin + max_len - len(table), table, curses.A_BOLD | color ) size = 20 @@ -857,15 +831,13 @@ class CursesProgress(Progress): ) self.stdscr.addstr( - i + 2, left_margin + max_len + middle_space, + i + 2, + left_margin + max_len + middle_space, "%s %3d%% (%d/%d)" % (progress, perc, data["num_done"], data["total"]), ) if self.finished: - self.stdscr.addstr( - rows - 1, 0, - "Press any key to exit...", - ) + self.stdscr.addstr(rows - 1, 0, "Press any key to exit...") self.stdscr.refresh() self.last_update = time.time() @@ -877,29 +849,25 @@ class CursesProgress(Progress): def set_state(self, state): self.stdscr.clear() - self.stdscr.addstr( - 0, 0, - state + "...", - curses.A_BOLD, - ) + self.stdscr.addstr(0, 0, state + "...", curses.A_BOLD) self.stdscr.refresh() class TerminalProgress(Progress): """Just prints progress to the terminal """ + def update(self, table, num_done): super(TerminalProgress, self).update(table, num_done) data = self.tables[table] - print "%s: %d%% (%d/%d)" % ( - table, data["perc"], - data["num_done"], data["total"], + print( + "%s: %d%% (%d/%d)" % (table, data["perc"], data["num_done"], data["total"]) ) def set_state(self, state): - print state + "..." + print(state + "...") ############################################## @@ -909,34 +877,38 @@ class TerminalProgress(Progress): if __name__ == "__main__": parser = argparse.ArgumentParser( description="A script to port an existing synapse SQLite database to" - " a new PostgreSQL database." + " a new PostgreSQL database." ) parser.add_argument("-v", action='store_true') parser.add_argument( - "--sqlite-database", required=True, + "--sqlite-database", + required=True, help="The snapshot of the SQLite database file. This must not be" - " currently used by a running synapse server" + " currently used by a running synapse server", ) parser.add_argument( - "--postgres-config", type=argparse.FileType('r'), required=True, - help="The database config file for the PostgreSQL database" + "--postgres-config", + type=argparse.FileType('r'), + required=True, + help="The database config file for the PostgreSQL database", ) parser.add_argument( - "--curses", action='store_true', - help="display a curses based progress UI" + "--curses", action='store_true', help="display a curses based progress UI" ) parser.add_argument( - "--batch-size", type=int, default=1000, + "--batch-size", + type=int, + default=1000, help="The number of rows to select from the SQLite table each" - " iteration [default=1000]", + " iteration [default=1000]", ) args = parser.parse_args() logging_config = { "level": logging.DEBUG if args.v else logging.INFO, - "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s" + "format": "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s", } if args.curses: diff --git a/setup.py b/setup.py index b00c2af367..00b69c43f5 100755 --- a/setup.py +++ b/setup.py @@ -1,6 +1,8 @@ #!/usr/bin/env python -# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2014-2017 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd +# Copyright 2017-2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -86,7 +88,7 @@ setup( name="matrix-synapse", version=version, packages=find_packages(exclude=["tests", "tests.*"]), - description="Reference Synapse Home Server", + description="Reference homeserver for the Matrix decentralised comms protocol", install_requires=dependencies['requirements'](include_conditional=True).keys(), dependency_links=dependencies["DEPENDENCY_LINKS"].values(), include_package_data=True, diff --git a/synapse/__init__.py b/synapse/__init__.py index 43c5821ade..1ddbbbebfb 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -27,4 +27,4 @@ try: except ImportError: pass -__version__ = "0.33.6" +__version__ = "0.33.7" diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e3f0d99a3f..0b85b377e3 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,7 @@ import sys from six import iteritems +import psutil from prometheus_client import Gauge from twisted.application import service @@ -502,7 +503,6 @@ def run(hs): def performance_stats_init(): try: - import psutil process = psutil.Process() # Ensure we can fetch both, and make the initial request for cpu_percent # so the next request will use this as the initial point. @@ -510,12 +510,9 @@ def run(hs): process.cpu_percent(interval=None) logger.info("report_stats can use psutil") stats_process.append(process) - except (ImportError, AttributeError): - logger.warn( - "report_stats enabled but psutil is not installed or incorrect version." - " Disabling reporting of memory/cpu stats." - " Ensuring psutil is available will help matrix.org track performance" - " changes across releases." + except (AttributeError): + logger.warning( + "Unable to read memory/cpu stats. Disabling reporting." ) def generate_user_daily_visit_stats(): @@ -530,10 +527,13 @@ def run(hs): clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) # monthly active user limiting functionality - clock.looping_call( - hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60 - ) - hs.get_datastore().reap_monthly_active_users() + def reap_monthly_active_users(): + return run_as_background_process( + "reap_monthly_active_users", + hs.get_datastore().reap_monthly_active_users, + ) + clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60) + reap_monthly_active_users() @defer.inlineCallbacks def generate_monthly_active_users(): @@ -547,12 +547,23 @@ def run(hs): registered_reserved_users_mau_gauge.set(float(reserved_count)) max_mau_gauge.set(float(hs.config.max_mau_value)) - hs.get_datastore().initialise_reserved_users( - hs.config.mau_limits_reserved_threepids + def start_generate_monthly_active_users(): + return run_as_background_process( + "generate_monthly_active_users", + generate_monthly_active_users, + ) + + # XXX is this really supposed to be a background process? it looks + # like it needs to complete before some of the other stuff runs. + run_as_background_process( + "initialise_reserved_users", + hs.get_datastore().initialise_reserved_users, + hs.config.mau_limits_reserved_threepids, ) - generate_monthly_active_users() + + start_generate_monthly_active_users() if hs.config.limit_usage_by_mau: - clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000) # End of monthly active user settings if hs.config.report_stats: @@ -568,7 +579,7 @@ def run(hs): clock.call_later(5 * 60, start_phone_stats_home) if hs.config.daemonize and hs.config.print_pidfile: - print (hs.config.pid_file) + print(hs.config.pid_file) _base.start_reactor( "synapse-homeserver", diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 0f9f8e19f6..83b0863f00 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler): else: yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - self.pusher_pool.on_new_notifications( + yield self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - self.pusher_pool.on_new_receipts( + yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) except Exception: @@ -183,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler): def start_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) logger.info("Starting pusher %r / %r", user_id, key) - return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id) + return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id) def start(config_options): diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py index 8fccf573ee..79fe9c3dac 100644 --- a/synapse/config/__main__.py +++ b/synapse/config/__main__.py @@ -28,7 +28,7 @@ if __name__ == "__main__": sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) - print (getattr(config, key)) + print(getattr(config, key)) sys.exit(0) else: sys.stderr.write("Unknown command %r\n" % (action,)) diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 3d2e90dd5b..14dae65ea0 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -106,10 +106,7 @@ class Config(object): @classmethod def check_file(cls, file_path, config_name): if file_path is None: - raise ConfigError( - "Missing config for %s." - % (config_name,) - ) + raise ConfigError("Missing config for %s." % (config_name,)) try: os.stat(file_path) except OSError as e: @@ -128,9 +125,7 @@ class Config(object): if e.errno != errno.EEXIST: raise if not os.path.isdir(dir_path): - raise ConfigError( - "%s is not a directory" % (dir_path,) - ) + raise ConfigError("%s is not a directory" % (dir_path,)) return dir_path @classmethod @@ -156,21 +151,20 @@ class Config(object): return results def generate_config( - self, - config_dir_path, - server_name, - is_generating_file, - report_stats=None, + self, config_dir_path, server_name, is_generating_file, report_stats=None ): default_config = "# vim:ft=yaml\n" - default_config += "\n\n".join(dedent(conf) for conf in self.invoke_all( - "default_config", - config_dir_path=config_dir_path, - server_name=server_name, - is_generating_file=is_generating_file, - report_stats=report_stats, - )) + default_config += "\n\n".join( + dedent(conf) + for conf in self.invoke_all( + "default_config", + config_dir_path=config_dir_path, + server_name=server_name, + is_generating_file=is_generating_file, + report_stats=report_stats, + ) + ) config = yaml.load(default_config) @@ -178,23 +172,22 @@ class Config(object): @classmethod def load_config(cls, description, argv): - config_parser = argparse.ArgumentParser( - description=description, - ) + config_parser = argparse.ArgumentParser(description=description) config_parser.add_argument( - "-c", "--config-path", + "-c", + "--config-path", action="append", metavar="CONFIG_FILE", help="Specify config file. Can be given multiple times and" - " may specify directories containing *.yaml files." + " may specify directories containing *.yaml files.", ) config_parser.add_argument( "--keys-directory", metavar="DIRECTORY", help="Where files such as certs and signing keys are stored when" - " their location is given explicitly in the config." - " Defaults to the directory containing the last config file", + " their location is given explicitly in the config." + " Defaults to the directory containing the last config file", ) config_args = config_parser.parse_args(argv) @@ -203,9 +196,7 @@ class Config(object): obj = cls() obj.read_config_files( - config_files, - keys_directory=config_args.keys_directory, - generate_keys=False, + config_files, keys_directory=config_args.keys_directory, generate_keys=False ) return obj @@ -213,38 +204,38 @@ class Config(object): def load_or_generate_config(cls, description, argv): config_parser = argparse.ArgumentParser(add_help=False) config_parser.add_argument( - "-c", "--config-path", + "-c", + "--config-path", action="append", metavar="CONFIG_FILE", help="Specify config file. Can be given multiple times and" - " may specify directories containing *.yaml files." + " may specify directories containing *.yaml files.", ) config_parser.add_argument( "--generate-config", action="store_true", - help="Generate a config file for the server name" + help="Generate a config file for the server name", ) config_parser.add_argument( "--report-stats", action="store", help="Whether the generated config reports anonymized usage statistics", - choices=["yes", "no"] + choices=["yes", "no"], ) config_parser.add_argument( "--generate-keys", action="store_true", - help="Generate any missing key files then exit" + help="Generate any missing key files then exit", ) config_parser.add_argument( "--keys-directory", metavar="DIRECTORY", help="Used with 'generate-*' options to specify where files such as" - " certs and signing keys should be stored in, unless explicitly" - " specified in the config." + " certs and signing keys should be stored in, unless explicitly" + " specified in the config.", ) config_parser.add_argument( - "-H", "--server-name", - help="The server name to generate a config file for" + "-H", "--server-name", help="The server name to generate a config file for" ) config_args, remaining_args = config_parser.parse_known_args(argv) @@ -257,8 +248,8 @@ class Config(object): if config_args.generate_config: if config_args.report_stats is None: config_parser.error( - "Please specify either --report-stats=yes or --report-stats=no\n\n" + - MISSING_REPORT_STATS_SPIEL + "Please specify either --report-stats=yes or --report-stats=no\n\n" + + MISSING_REPORT_STATS_SPIEL ) if not config_files: config_parser.error( @@ -287,26 +278,32 @@ class Config(object): config_dir_path=config_dir_path, server_name=server_name, report_stats=(config_args.report_stats == "yes"), - is_generating_file=True + is_generating_file=True, ) obj.invoke_all("generate_files", config) config_file.write(config_str) - print(( - "A config file has been generated in %r for server name" - " %r with corresponding SSL keys and self-signed" - " certificates. Please review this file and customise it" - " to your needs." - ) % (config_path, server_name)) + print( + ( + "A config file has been generated in %r for server name" + " %r with corresponding SSL keys and self-signed" + " certificates. Please review this file and customise it" + " to your needs." + ) + % (config_path, server_name) + ) print( "If this server name is incorrect, you will need to" " regenerate the SSL certificates" ) return else: - print(( - "Config file %r already exists. Generating any missing key" - " files." - ) % (config_path,)) + print( + ( + "Config file %r already exists. Generating any missing key" + " files." + ) + % (config_path,) + ) generate_keys = True parser = argparse.ArgumentParser( @@ -338,8 +335,7 @@ class Config(object): return obj - def read_config_files(self, config_files, keys_directory=None, - generate_keys=False): + def read_config_files(self, config_files, keys_directory=None, generate_keys=False): if not keys_directory: keys_directory = os.path.dirname(config_files[-1]) @@ -364,8 +360,9 @@ class Config(object): if "report_stats" not in config: raise ConfigError( - MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + "\n" + - MISSING_REPORT_STATS_SPIEL + MISSING_REPORT_STATS_CONFIG_INSTRUCTIONS + + "\n" + + MISSING_REPORT_STATS_SPIEL ) if generate_keys: @@ -399,16 +396,16 @@ def find_config_files(search_paths): for entry in os.listdir(config_path): entry_path = os.path.join(config_path, entry) if not os.path.isfile(entry_path): - print ( - "Found subdirectory in config directory: %r. IGNORING." - ) % (entry_path, ) + err = "Found subdirectory in config directory: %r. IGNORING." + print(err % (entry_path,)) continue if not entry.endswith(".yaml"): - print ( - "Found file in config directory that does not" - " end in '.yaml': %r. IGNORING." - ) % (entry_path, ) + err = ( + "Found file in config directory that does not end in " + "'.yaml': %r. IGNORING." + ) + print(err % (entry_path,)) continue files.append(entry_path) diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index fe156b6930..93d70cff14 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -13,10 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import print_function + # This file can't be called email.py because if it is, we cannot: import email.utils +import logging +import os + +import pkg_resources -from ._base import Config +from ._base import Config, ConfigError + +logger = logging.getLogger(__name__) class EmailConfig(Config): @@ -38,7 +46,6 @@ class EmailConfig(Config): "smtp_host", "smtp_port", "notif_from", - "template_dir", "notif_template_html", "notif_template_text", ] @@ -62,9 +69,26 @@ class EmailConfig(Config): self.email_smtp_host = email_config["smtp_host"] self.email_smtp_port = email_config["smtp_port"] self.email_notif_from = email_config["notif_from"] - self.email_template_dir = email_config["template_dir"] self.email_notif_template_html = email_config["notif_template_html"] self.email_notif_template_text = email_config["notif_template_text"] + + template_dir = email_config.get("template_dir") + # we need an absolute path, because we change directory after starting (and + # we don't yet know what auxilliary templates like mail.css we will need). + # (Note that loading as package_resources with jinja.PackageLoader doesn't + # work for the same reason.) + if not template_dir: + template_dir = pkg_resources.resource_filename( + 'synapse', 'res/templates' + ) + template_dir = os.path.abspath(template_dir) + + for f in self.email_notif_template_text, self.email_notif_template_html: + p = os.path.join(template_dir, f) + if not os.path.isfile(p): + raise ConfigError("Unable to find email template file %s" % (p, )) + self.email_template_dir = template_dir + self.email_notif_for_new_users = email_config.get( "notif_for_new_users", True ) @@ -113,7 +137,9 @@ class EmailConfig(Config): # require_transport_security: False # notif_from: "Your Friendly %(app)s Home Server <noreply@example.com>" # app_name: Matrix - # template_dir: res/templates + # # if template_dir is unset, uses the example templates that are part of + # # the Synapse distribution. + # #template_dir: res/templates # notif_template_html: notif_mail.html # notif_template_text: notif_mail.txt # notif_for_new_users: True diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 819e8f7331..4efe95faa4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -507,19 +507,19 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): + latest_events, limit): with (yield self._server_linearizer.queue((origin, room_id))): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) logger.info( "on_get_missing_events: earliest_events: %r, latest_events: %r," - " limit: %d, min_depth: %d", - earliest_events, latest_events, limit, min_depth + " limit: %d", + earliest_events, latest_events, limit, ) missing_events = yield self.handler.on_get_missing_events( - origin, room_id, earliest_events, latest_events, limit, min_depth + origin, room_id, earliest_events, latest_events, limit, ) if len(missing_events) < 5: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2f874b4838..7288d49074 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -560,7 +560,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): @defer.inlineCallbacks def on_POST(self, origin, content, query, room_id): limit = int(content.get("limit", 10)) - min_depth = int(content.get("min_depth", 0)) earliest_events = content.get("earliest_events", []) latest_events = content.get("latest_events", []) @@ -569,7 +568,6 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): room_id=room_id, earliest_events=earliest_events, latest_events=latest_events, - min_depth=min_depth, limit=limit, ) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 2a5eab124f..329e3c7d71 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -22,7 +22,7 @@ import bcrypt import pymacaroons from canonicaljson import json -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.client import PartialDownloadError import synapse.util.stringutils as stringutils @@ -37,8 +37,8 @@ from synapse.api.errors import ( ) from synapse.module_api import ModuleApi from synapse.types import UserID +from synapse.util import logcontext from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable from ._base import BaseHandler @@ -884,11 +884,7 @@ class AuthHandler(BaseHandler): bcrypt.gensalt(self.bcrypt_rounds), ).decode('ascii') - return make_deferred_yieldable( - threads.deferToThreadPool( - self.hs.get_reactor(), self.hs.get_reactor().getThreadPool(), _do_hash - ), - ) + return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -913,13 +909,7 @@ class AuthHandler(BaseHandler): if not isinstance(stored_hash, bytes): stored_hash = stored_hash.encode('ascii') - return make_deferred_yieldable( - threads.deferToThreadPool( - self.hs.get_reactor(), - self.hs.get_reactor().getThreadPool(), - _do_validate_hash, - ), - ) + return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash) else: return defer.succeed(False) diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index b078df4a76..75fe50c42c 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -17,8 +17,8 @@ import logging from twisted.internet import defer from synapse.api.errors import SynapseError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID, create_requester -from synapse.util.logcontext import run_in_background from ._base import BaseHandler @@ -121,7 +121,7 @@ class DeactivateAccountHandler(BaseHandler): None """ if not self._user_parter_running: - run_in_background(self._user_parter_loop) + run_as_background_process("user_parter_loop", self._user_parter_loop) @defer.inlineCallbacks def _user_parter_loop(self): diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 18741c5fac..02f12f6645 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -80,42 +80,60 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def create_association(self, user_id, room_alias, room_id, servers=None): - # association creation for human users - # TODO(erikj): Do user auth. + def create_association(self, requester, room_alias, room_id, servers=None, + send_event=True): + """Attempt to create a new alias - if not self.spam_checker.user_may_create_room_alias(user_id, room_alias): - raise SynapseError( - 403, "This user is not permitted to create this alias", - ) + Args: + requester (Requester) + room_alias (RoomAlias) + room_id (str) + servers (list[str]|None): List of servers that others servers + should try and join via + send_event (bool): Whether to send an updated m.room.aliases event - can_create = yield self.can_modify_alias( - room_alias, - user_id=user_id - ) - if not can_create: - raise SynapseError( - 400, "This alias is reserved by an application service.", - errcode=Codes.EXCLUSIVE - ) - yield self._create_association(room_alias, room_id, servers, creator=user_id) + Returns: + Deferred + """ - @defer.inlineCallbacks - def create_appservice_association(self, service, room_alias, room_id, - servers=None): - if not service.is_interested_in_alias(room_alias.to_string()): - raise SynapseError( - 400, "This application service has not reserved" - " this kind of alias.", errcode=Codes.EXCLUSIVE + user_id = requester.user.to_string() + + service = requester.app_service + if service: + if not service.is_interested_in_alias(room_alias.to_string()): + raise SynapseError( + 400, "This application service has not reserved" + " this kind of alias.", errcode=Codes.EXCLUSIVE + ) + else: + if not self.spam_checker.user_may_create_room_alias(user_id, room_alias): + raise AuthError( + 403, "This user is not permitted to create this alias", + ) + + can_create = yield self.can_modify_alias( + room_alias, + user_id=user_id ) + if not can_create: + raise AuthError( + 400, "This alias is reserved by an application service.", + errcode=Codes.EXCLUSIVE + ) - # association creation for app services - yield self._create_association(room_alias, room_id, servers) + yield self._create_association(room_alias, room_id, servers, creator=user_id) + if send_event: + yield self.send_room_alias_update_event( + requester, + room_id + ) @defer.inlineCallbacks - def delete_association(self, requester, user_id, room_alias): + def delete_association(self, requester, room_alias): # association deletion for human users + user_id = requester.user.to_string() + try: can_delete = yield self._user_can_delete_alias(room_alias, user_id) except StoreError as e: @@ -143,7 +161,6 @@ class DirectoryHandler(BaseHandler): try: yield self.send_room_alias_update_event( requester, - requester.user.to_string(), room_id ) @@ -261,7 +278,7 @@ class DirectoryHandler(BaseHandler): ) @defer.inlineCallbacks - def send_room_alias_update_event(self, requester, user_id, room_id): + def send_room_alias_update_event(self, requester, room_id): aliases = yield self.store.get_aliases_for_room(room_id) yield self.event_creation_handler.create_and_send_nonmember_event( @@ -270,7 +287,7 @@ class DirectoryHandler(BaseHandler): "type": EventTypes.Aliases, "state_key": self.hs.hostname, "room_id": room_id, - "sender": user_id, + "sender": requester.user.to_string(), "content": {"aliases": aliases}, }, ratelimit=False diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 1daa08df7e..cd5b9bbb19 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -309,8 +309,8 @@ class FederationHandler(BaseHandler): if sent_to_us_directly: logger.warn( - "[%s %s] Failed to fetch %d prev events: rejecting", - room_id, event_id, len(prevs - seen), + "[%s %s] Rejecting: failed to fetch %d prev events: %s", + room_id, event_id, len(prevs - seen), shortstr(prevs - seen) ) raise FederationError( "ERROR", @@ -452,8 +452,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "[%s %s]: Requesting %d prev_events: %s", - room_id, event_id, len(prevs - seen), shortstr(prevs - seen) + "[%s %s]: Requesting missing events between %s and %s", + room_id, event_id, shortstr(latest), event_id, ) # XXX: we set timeout to 10s to help workaround @@ -1852,7 +1852,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): + latest_events, limit): in_room = yield self.auth.check_host_in_room( room_id, origin @@ -1861,14 +1861,12 @@ class FederationHandler(BaseHandler): raise AuthError(403, "Host not in room.") limit = min(limit, 20) - min_depth = max(min_depth, 0) missing_events = yield self.store.get_missing_events( room_id=room_id, earliest_events=earliest_events, latest_events=latest_events, limit=limit, - min_depth=min_depth, ) missing_events = yield filter_events_for_server( @@ -2522,7 +2520,7 @@ class FederationHandler(BaseHandler): if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: - self._notify_persisted_event(event, max_stream_id) + yield self._notify_persisted_event(event, max_stream_id) def _notify_persisted_event(self, event, max_stream_id): """Checks to see if notifier/pushers should be notified about the @@ -2555,7 +2553,7 @@ class FederationHandler(BaseHandler): extra_users=extra_users ) - self.pusher_pool.on_new_notifications( + return self.pusher_pool.on_new_notifications( event_stream_id, max_stream_id, ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 53e5e2648b..173315af6c 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -20,7 +20,7 @@ from six import iteritems from twisted.internet import defer -from synapse.api.errors import SynapseError +from synapse.api.errors import HttpResponseException, SynapseError from synapse.types import get_domain_from_id logger = logging.getLogger(__name__) @@ -37,9 +37,23 @@ def _create_rerouter(func_name): ) else: destination = get_domain_from_id(group_id) - return getattr(self.transport_client, func_name)( + d = getattr(self.transport_client, func_name)( destination, group_id, *args, **kwargs ) + + # Capture errors returned by the remote homeserver and + # re-throw specific errors as SynapseErrors. This is so + # when the remote end responds with things like 403 Not + # In Group, we can communicate that to the client instead + # of a 500. + def h(failure): + failure.trap(HttpResponseException) + e = failure.value + if e.code == 403: + raise e.to_synapse_error() + return failure + d.addErrback(h) + return d return f diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 4954b23a0d..6c4fcfb10a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -779,7 +779,7 @@ class EventCreationHandler(object): event, context=context ) - self.pusher_pool.on_new_notifications( + yield self.pusher_pool.on_new_notifications( event_stream_id, max_stream_id, ) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a6f3181f09..4c2690ba26 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler): "receipt_key", max_batch_id, rooms=affected_room_ids ) # Note that the min here shouldn't be relied upon to be accurate. - self.hs.get_pusherpool().on_new_receipts( + yield self.hs.get_pusherpool().on_new_receipts( min_batch_id, max_batch_id, affected_room_ids, ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index c3f820b975..ab1571b27b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -190,10 +190,11 @@ class RoomCreationHandler(BaseHandler): if room_alias: directory_handler = self.hs.get_handlers().directory_handler yield directory_handler.create_association( - user_id=user_id, + requester=requester, room_id=room_id, room_alias=room_alias, servers=[self.hs.hostname], + send_event=False, ) preset_config = config.get( @@ -289,7 +290,7 @@ class RoomCreationHandler(BaseHandler): if room_alias: result["room_alias"] = room_alias.to_string() yield directory_handler.send_room_alias_update_event( - requester, user_id, room_id + requester, room_id ) defer.returnValue(result) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 38e1737ec9..dc88620885 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -16,7 +16,7 @@ import logging from collections import namedtuple -from six import iteritems +from six import PY3, iteritems from six.moves import range import msgpack @@ -444,9 +444,16 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( @classmethod def from_token(cls, token): + if PY3: + # The argument raw=False is only available on new versions of + # msgpack, and only really needed on Python 3. Gate it behind + # a PY3 check to avoid causing issues on Debian-packaged versions. + decoded = msgpack.loads(decode_base64(token), raw=False) + else: + decoded = msgpack.loads(decode_base64(token)) return RoomListNextBatch(**{ cls.REVERSE_KEY_DICT[key]: val - for key, val in msgpack.loads(decode_base64(token)).items() + for key, val in decoded.items() }) def to_token(self): diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index d8413d6aa7..f11b430126 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -20,6 +20,7 @@ from six import iteritems from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules, Membership +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo from synapse.types import get_localpart_from_id from synapse.util.metrics import Measure @@ -98,7 +99,6 @@ class UserDirectoryHandler(object): """ return self.store.search_user_dir(user_id, search_term, limit) - @defer.inlineCallbacks def notify_new_event(self): """Called when there may be more deltas to process """ @@ -108,11 +108,15 @@ class UserDirectoryHandler(object): if self._is_processing: return + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + self._is_processing = False + self._is_processing = True - try: - yield self._unsafe_process() - finally: - self._is_processing = False + run_as_background_process("user_directory.notify_new_event", process) @defer.inlineCallbacks def handle_local_profile_change(self, user_id, profile): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fcc02fc77d..24b6110c20 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object): Returns: Deferred: resolves with the http response object on success. - Fails with ``HTTPRequestException``: if we get an HTTP response + Fails with ``HttpResponseException``: if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -480,7 +480,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -534,7 +534,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -589,7 +589,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -640,7 +640,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -684,7 +684,7 @@ class MatrixFederationHttpClient(object): Deferred: resolves with an (int,dict) tuple of the file length and a dict of the response headers. - Fails with ``HTTPRequestException`` if we get an HTTP response code + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300 Fails with ``NotRetryingDestination`` if we are not yet ready diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index fedb4e6b18..62045a918b 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -39,7 +39,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", + "synapse_http_server_response_time_seconds", + "sec", ["method", "servlet", "tag", "code"], ) @@ -79,15 +80,11 @@ response_size = Counter( # than when the response was written. in_flight_requests_ru_utime = Counter( - "synapse_http_server_in_flight_requests_ru_utime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"] ) in_flight_requests_ru_stime = Counter( - "synapse_http_server_in_flight_requests_ru_stime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"] ) in_flight_requests_db_txn_count = Counter( @@ -134,7 +131,7 @@ def _get_in_flight_counts(): # type counts = {} for rm in reqs: - key = (rm.method, rm.name,) + key = (rm.method, rm.name) counts[key] = counts.get(key, 0) + 1 return counts @@ -175,7 +172,8 @@ class RequestMetrics(object): if context != self.start_context: logger.warn( "Context have unexpectedly changed %r, %r", - context, self.start_context + context, + self.start_context, ) return @@ -192,10 +190,10 @@ class RequestMetrics(object): resource_usage = context.get_resource_usage() response_ru_utime.labels(self.method, self.name, tag).inc( - resource_usage.ru_utime, + resource_usage.ru_utime ) response_ru_stime.labels(self.method, self.name, tag).inc( - resource_usage.ru_stime, + resource_usage.ru_stime ) response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count @@ -222,8 +220,15 @@ class RequestMetrics(object): diff = new_stats - self._request_stats self._request_stats = new_stats - in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) - in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) + # max() is used since rapid use of ru_stime/ru_utime can end up with the + # count going backwards due to NTP, time smearing, fine-grained + # correction, or floating points. Who knows, really? + in_flight_requests_ru_utime.labels(self.method, self.name).inc( + max(diff.ru_utime, 0) + ) + in_flight_requests_ru_stime.labels(self.method, self.name).inc( + max(diff.ru_stime, 0) + ) in_flight_requests_db_txn_count.labels(self.method, self.name).inc( diff.db_txn_count diff --git a/synapse/notifier.py b/synapse/notifier.py index 340b16ce25..de02b1017e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -186,9 +186,9 @@ class Notifier(object): def count_listeners(): all_user_streams = set() - for x in self.room_to_user_streams.values(): + for x in list(self.room_to_user_streams.values()): all_user_streams |= x - for x in self.user_to_user_stream.values(): + for x in list(self.user_to_user_stream.values()): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) @@ -196,7 +196,7 @@ class Notifier(object): LaterGauge( "synapse_notifier_rooms", "", [], - lambda: count(bool, self.room_to_user_streams.values()), + lambda: count(bool, list(self.room_to_user_streams.values())), ) LaterGauge( "synapse_notifier_users", "", [], diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index d746371420..f369124258 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,8 +18,7 @@ import logging from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled -from synapse.util.logcontext import LoggingContext -from synapse.util.metrics import Measure +from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) @@ -71,18 +70,11 @@ class EmailPusher(object): # See httppusher self.max_stream_ordering = None - self.processing = False + self._is_processing = False - @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - try: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() - except Exception: - logger.exception("Error starting email pusher") + self._start_processing() def on_stop(self): if self.timed_call: @@ -92,43 +84,52 @@ class EmailPusher(object): pass self.timed_call = None - @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) - yield self._process() + self._start_processing() def on_new_receipts(self, min_stream_id, max_stream_id): # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the # timer fire - return defer.succeed(None) + pass - @defer.inlineCallbacks def on_timer(self): self.timed_call = None - yield self._process() + self._start_processing() + + def _start_processing(self): + if self._is_processing: + return + + run_as_background_process("emailpush.process", self._process) @defer.inlineCallbacks def _process(self): - if self.processing: - return + # we should never get here if we are already processing + assert not self._is_processing + + try: + self._is_processing = True + + if self.throttle_params is None: + # this is our first loop: load up the throttle params + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) - with LoggingContext("emailpush._process"): - with Measure(self.clock, "emailpush._process"): + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering try: - self.processing = True - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering - try: - yield self._unsafe_process() - except Exception: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self.processing = False + yield self._unsafe_process() + except Exception: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self._is_processing = False @defer.inlineCallbacks def _unsafe_process(self): diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 48abd5e4d6..6bd703632d 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -22,9 +22,8 @@ from prometheus_client import Counter from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException -from synapse.util.logcontext import LoggingContext -from synapse.util.metrics import Measure from . import push_rule_evaluator, push_tools @@ -61,7 +60,7 @@ class HttpPusher(object): self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.failing_since = pusherdict['failing_since'] self.timed_call = None - self.processing = False + self._is_processing = False # This is the highest stream ordering we know it's safe to process. # When new events arrive, we'll be given a window of new events: we @@ -92,34 +91,27 @@ class HttpPusher(object): self.data_minus_url.update(self.data) del self.data_minus_url['url'] - @defer.inlineCallbacks def on_started(self): - try: - yield self._process() - except Exception: - logger.exception("Error starting http pusher") + self._start_processing() - @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0) - yield self._process() + self._start_processing() - @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id): # Note that the min here shouldn't be relied upon to be accurate. # We could check the receipts are actually m.read receipts here, # but currently that's the only type of receipt anyway... - with LoggingContext("push.on_new_receipts"): - with Measure(self.clock, "push.on_new_receipts"): - badge = yield push_tools.get_badge_count( - self.hs.get_datastore(), self.user_id - ) - yield self._send_badge(badge) + run_as_background_process("http_pusher.on_new_receipts", self._update_badge) @defer.inlineCallbacks + def _update_badge(self): + badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id) + yield self._send_badge(badge) + def on_timer(self): - yield self._process() + self._start_processing() def on_stop(self): if self.timed_call: @@ -129,27 +121,31 @@ class HttpPusher(object): pass self.timed_call = None + def _start_processing(self): + if self._is_processing: + return + + run_as_background_process("httppush.process", self._process) + @defer.inlineCallbacks def _process(self): - if self.processing: - return + # we should never get here if we are already processing + assert not self._is_processing - with LoggingContext("push._process"): - with Measure(self.clock, "push._process"): + try: + self._is_processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering try: - self.processing = True - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering - try: - yield self._unsafe_process() - except Exception: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self.processing = False + yield self._unsafe_process() + except Exception: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self._is_processing = False @defer.inlineCallbacks def _unsafe_process(self): diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 1a5a10d974..16fb5e8471 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -526,8 +526,7 @@ def load_jinja2_templates(config): Returns: (notif_template_html, notif_template_text) """ - logger.info("loading jinja2") - + logger.info("loading email templates from '%s'", config.email_template_dir) loader = jinja2.FileSystemLoader(config.email_template_dir) env = jinja2.Environment(loader=loader) env.filters["format_ts"] = format_ts_filter diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 9f7d5ef217..5a4e73ccd6 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -20,24 +20,39 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push.pusher import PusherFactory -from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) class PusherPool: + """ + The pusher pool. This is responsible for dispatching notifications of new events to + the http and email pushers. + + It provides three methods which are designed to be called by the rest of the + application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these + delegates to each of the relevant pushers. + + Note that it is expected that each pusher will have its own 'processing' loop which + will send out the notifications in the background, rather than blocking until the + notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and + Pusher.on_new_receipts are not expected to return deferreds. + """ def __init__(self, _hs): self.hs = _hs self.pusher_factory = PusherFactory(_hs) - self.start_pushers = _hs.config.start_pushers + self._should_start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() self.pushers = {} - @defer.inlineCallbacks def start(self): - pushers = yield self.store.get_all_pushers() - self._start_pushers(pushers) + """Starts the pushers off in a background process. + """ + if not self._should_start_pushers: + logger.info("Not starting pushers because they are disabled in the config") + return + run_as_background_process("start_pushers", self._start_pushers) @defer.inlineCallbacks def add_pusher(self, user_id, access_token, kind, app_id, @@ -86,7 +101,7 @@ class PusherPool: last_stream_ordering=last_stream_ordering, profile_tag=profile_tag, ) - yield self._refresh_pusher(app_id, pushkey, user_id) + yield self.start_pusher_by_id(app_id, pushkey, user_id) @defer.inlineCallbacks def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey, @@ -123,45 +138,23 @@ class PusherPool: p['app_id'], p['pushkey'], p['user_name'], ) - def on_new_notifications(self, min_stream_id, max_stream_id): - run_as_background_process( - "on_new_notifications", - self._on_new_notifications, min_stream_id, max_stream_id, - ) - @defer.inlineCallbacks - def _on_new_notifications(self, min_stream_id, max_stream_id): + def on_new_notifications(self, min_stream_id, max_stream_id): try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id ) - deferreds = [] - for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - deferreds.append( - run_in_background( - p.on_new_notifications, - min_stream_id, max_stream_id, - ) - ) - - yield make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True), - ) + p.on_new_notifications(min_stream_id, max_stream_id) + except Exception: logger.exception("Exception in pusher on_new_notifications") - def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): - run_as_background_process( - "on_new_receipts", - self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids, - ) - @defer.inlineCallbacks - def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive @@ -171,26 +164,20 @@ class PusherPool: # This returns a tuple, user_id is at index 3 users_affected = set([r[3] for r in updated_receipts]) - deferreds = [] - for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - deferreds.append( - run_in_background( - p.on_new_receipts, - min_stream_id, max_stream_id, - ) - ) - - yield make_deferred_yieldable( - defer.gatherResults(deferreds, consumeErrors=True), - ) + p.on_new_receipts(min_stream_id, max_stream_id) + except Exception: logger.exception("Exception in pusher on_new_receipts") @defer.inlineCallbacks - def _refresh_pusher(self, app_id, pushkey, user_id): + def start_pusher_by_id(self, app_id, pushkey, user_id): + """Look up the details for the given pusher, and start it""" + if not self._should_start_pushers: + return + resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( app_id, pushkey ) @@ -201,33 +188,49 @@ class PusherPool: p = r if p: + self._start_pusher(p) - self._start_pushers([p]) + @defer.inlineCallbacks + def _start_pushers(self): + """Start all the pushers - def _start_pushers(self, pushers): - if not self.start_pushers: - logger.info("Not starting pushers because they are disabled in the config") - return + Returns: + Deferred + """ + pushers = yield self.store.get_all_pushers() logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: - try: - p = self.pusher_factory.create_pusher(pusherdict) - except Exception: - logger.exception("Couldn't start a pusher: caught Exception") - continue - if p: - appid_pushkey = "%s:%s" % ( - pusherdict['app_id'], - pusherdict['pushkey'], - ) - byuser = self.pushers.setdefault(pusherdict['user_name'], {}) + self._start_pusher(pusherdict) + logger.info("Started pushers") - if appid_pushkey in byuser: - byuser[appid_pushkey].on_stop() - byuser[appid_pushkey] = p - run_in_background(p.on_started) + def _start_pusher(self, pusherdict): + """Start the given pusher - logger.info("Started pushers") + Args: + pusherdict (dict): + + Returns: + None + """ + try: + p = self.pusher_factory.create_pusher(pusherdict) + except Exception: + logger.exception("Couldn't start a pusher: caught Exception") + return + + if not p: + return + + appid_pushkey = "%s:%s" % ( + pusherdict['app_id'], + pusherdict['pushkey'], + ) + byuser = self.pushers.setdefault(pusherdict['user_name'], {}) + + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p + p.on_started() @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 2947f37f1a..943876456b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -53,9 +53,10 @@ REQUIREMENTS = { "pillow>=3.1.2": ["PIL"], "pydenticon>=0.2": ["pydenticon"], "sortedcontainers>=1.4.4": ["sortedcontainers"], + "psutil>=2.0.0": ["psutil>=2.0.0"], "pysaml2>=3.0.0": ["saml2"], "pymacaroons-pynacl>=0.9.3": ["pymacaroons"], - "msgpack-python>=0.3.0": ["msgpack"], + "msgpack-python>=0.4.2": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], "six>=1.10": ["six"], @@ -79,9 +80,6 @@ CONDITIONAL_REQUIREMENTS = { "matrix-synapse-ldap3": { "matrix-synapse-ldap3>=0.1": ["ldap_auth_provider"], }, - "psutil": { - "psutil>=2.0.0": ["psutil>=2.0.0"], - }, "postgres": { "psycopg2>=2.6": ["psycopg2"] } diff --git a/res/templates/mail-Vector.css b/synapse/res/templates/mail-Vector.css index 6a3e36eda1..6a3e36eda1 100644 --- a/res/templates/mail-Vector.css +++ b/synapse/res/templates/mail-Vector.css diff --git a/res/templates/mail.css b/synapse/res/templates/mail.css index 5ab3e1b06d..5ab3e1b06d 100644 --- a/res/templates/mail.css +++ b/synapse/res/templates/mail.css diff --git a/res/templates/notif.html b/synapse/res/templates/notif.html index 88b921ca9c..88b921ca9c 100644 --- a/res/templates/notif.html +++ b/synapse/res/templates/notif.html diff --git a/res/templates/notif.txt b/synapse/res/templates/notif.txt index a37bee9833..a37bee9833 100644 --- a/res/templates/notif.txt +++ b/synapse/res/templates/notif.txt diff --git a/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html index fcdb3109fe..fcdb3109fe 100644 --- a/res/templates/notif_mail.html +++ b/synapse/res/templates/notif_mail.html diff --git a/res/templates/notif_mail.txt b/synapse/res/templates/notif_mail.txt index 24843042a5..24843042a5 100644 --- a/res/templates/notif_mail.txt +++ b/synapse/res/templates/notif_mail.txt diff --git a/res/templates/room.html b/synapse/res/templates/room.html index 723c222d25..723c222d25 100644 --- a/res/templates/room.html +++ b/synapse/res/templates/room.html diff --git a/res/templates/room.txt b/synapse/res/templates/room.txt index 84648c710e..84648c710e 100644 --- a/res/templates/room.txt +++ b/synapse/res/templates/room.txt diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 97733f3026..0220acf644 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -74,38 +74,11 @@ class ClientDirectoryServer(ClientV1RestServlet): if room is None: raise SynapseError(400, "Room does not exist") - dir_handler = self.handlers.directory_handler + requester = yield self.auth.get_user_by_req(request) - try: - # try to auth as a user - requester = yield self.auth.get_user_by_req(request) - try: - user_id = requester.user.to_string() - yield dir_handler.create_association( - user_id, room_alias, room_id, servers - ) - yield dir_handler.send_room_alias_update_event( - requester, - user_id, - room_id - ) - except SynapseError as e: - raise e - except Exception: - logger.exception("Failed to create association") - raise - except AuthError: - # try to auth as an application service - service = yield self.auth.get_appservice_by_req(request) - yield dir_handler.create_appservice_association( - service, room_alias, room_id, servers - ) - logger.info( - "Application service at %s created alias %s pointing to %s", - service.url, - room_alias.to_string(), - room_id - ) + yield self.handlers.directory_handler.create_association( + requester, room_alias, room_id, servers + ) defer.returnValue((200, {})) @@ -135,7 +108,7 @@ class ClientDirectoryServer(ClientV1RestServlet): room_alias = RoomAlias.from_string(room_alias) yield dir_handler.delete_association( - requester, user.to_string(), room_alias + requester, room_alias ) logger.info( diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index a828ff4438..08b1867fab 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -25,7 +25,7 @@ from six.moves.urllib import parse as urlparse import twisted.internet.error import twisted.web.http -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.web.resource import Resource from synapse.api.errors import ( @@ -36,8 +36,8 @@ from synapse.api.errors import ( ) from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import logcontext from synapse.util.async_helpers import Linearizer -from synapse.util.logcontext import make_deferred_yieldable from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import is_ascii, random_string @@ -492,10 +492,11 @@ class MediaRepository(object): )) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - )) + ) if t_byte_source: try: @@ -534,10 +535,11 @@ class MediaRepository(object): )) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, t_width, t_height, t_method, t_type - )) + ) if t_byte_source: try: @@ -620,15 +622,17 @@ class MediaRepository(object): for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), thumbnailer.crop, t_width, t_height, t_type, - )) + ) elif t_method == "scale": - t_byte_source = yield make_deferred_yieldable(threads.deferToThread( + t_byte_source = yield logcontext.defer_to_thread( + self.hs.get_reactor(), thumbnailer.scale, t_width, t_height, t_type, - )) + ) else: logger.error("Unrecognized method: %r", t_method) continue diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index a6189224ee..896078fe76 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -21,9 +21,10 @@ import sys import six -from twisted.internet import defer, threads +from twisted.internet import defer from twisted.protocols.basic import FileSender +from synapse.util import logcontext from synapse.util.file_consumer import BackgroundFileConsumer from synapse.util.logcontext import make_deferred_yieldable @@ -64,9 +65,10 @@ class MediaStorage(object): with self.store_into_file(file_info) as (f, fname, finish_cb): # Write to the main repository - yield make_deferred_yieldable(threads.deferToThread( + yield logcontext.defer_to_thread( + self.hs.get_reactor(), _write_file_synchronously, source, f, - )) + ) yield finish_cb() defer.returnValue(fname) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index af01040a38..8c892ff187 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -596,10 +596,13 @@ def _iterate_over_text(tree, *tags_to_ignore): # to be returned. elements = iter([tree]) while True: - el = next(elements) + el = next(elements, None) + if el is None: + return + if isinstance(el, string_types): yield el - elif el is not None and el.tag not in tags_to_ignore: + elif el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately # return it if the text exists. if el.text: diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 7b9f8b4d79..5aa03031f6 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -17,9 +17,10 @@ import logging import os import shutil -from twisted.internet import defer, threads +from twisted.internet import defer from synapse.config._base import Config +from synapse.util import logcontext from synapse.util.logcontext import run_in_background from .media_storage import FileResponder @@ -120,7 +121,8 @@ class FileStorageProviderBackend(StorageProvider): if not os.path.exists(dirname): os.makedirs(dirname) - return threads.deferToThread( + return logcontext.defer_to_thread( + self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname, ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index be61147b9b..d9d0255d0b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,7 +18,7 @@ import threading import time from six import PY2, iteritems, iterkeys, itervalues -from six.moves import intern, range +from six.moves import builtins, intern, range from canonicaljson import json from prometheus_client import Histogram @@ -1233,7 +1233,7 @@ def db_to_json(db_content): # psycopg2 on Python 2 returns buffer objects, which we need to cast to # bytes to decode - if PY2 and isinstance(db_content, buffer): + if PY2 and isinstance(db_content, builtins.buffer): db_content = bytes(db_content) # Decode it to a Unicode string before feeding it to json.loads, so we diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 24345b20a6..3faca2a042 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -376,33 +376,25 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, @defer.inlineCallbacks def get_missing_events(self, room_id, earliest_events, latest_events, - limit, min_depth): + limit): ids = yield self.runInteraction( "get_missing_events", self._get_missing_events, - room_id, earliest_events, latest_events, limit, min_depth + room_id, earliest_events, latest_events, limit, ) - events = yield self._get_events(ids) - - events = sorted( - [ev for ev in events if ev.depth >= min_depth], - key=lambda e: e.depth, - ) - - defer.returnValue(events[:limit]) + defer.returnValue(events) def _get_missing_events(self, txn, room_id, earliest_events, latest_events, - limit, min_depth): - - earliest_events = set(earliest_events) - front = set(latest_events) - earliest_events + limit): - event_results = set() + seen_events = set(earliest_events) + front = set(latest_events) - seen_events + event_results = [] query = ( "SELECT prev_event_id FROM event_edges " - "WHERE event_id = ? AND is_state = ? " + "WHERE room_id = ? AND event_id = ? AND is_state = ? " "LIMIT ?" ) @@ -411,18 +403,20 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, for event_id in front: txn.execute( query, - (event_id, False, limit - len(event_results)) + (room_id, event_id, False, limit - len(event_results)) ) - for e_id, in txn: - new_front.add(e_id) + new_results = set(t[0] for t in txn) - seen_events - new_front -= earliest_events - new_front -= event_results + new_front |= new_results + seen_events |= new_results + event_results.extend(new_results) front = new_front - event_results |= new_front + # we built the list working backwards from latest_events; we now need to + # reverse it so that the events are approximately chronological. + event_results.reverse() return event_results diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a1331c1a61..8af17921e3 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -32,7 +32,7 @@ logger = logging.getLogger(__name__) # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index c7987bfcdd..2743b52bad 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -29,7 +29,7 @@ from ._base import SQLBaseStore logger = logging.getLogger(__name__) if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py index 5623391f6e..158e9dbe7b 100644 --- a/synapse/storage/signatures.py +++ b/synapse/storage/signatures.py @@ -27,7 +27,7 @@ from ._base import SQLBaseStore # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index a3032cdce9..d8bf953ec0 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -30,7 +30,7 @@ from ._base import SQLBaseStore, db_to_json # py2 sqlite has buffer hardcoded as only binary type, so we must use it, # despite being deprecated and removed in favor of memoryview if six.PY2: - db_binary_type = buffer + db_binary_type = six.moves.builtins.buffer else: db_binary_type = memoryview diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index f2bde74dc5..625aedc940 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -15,6 +15,8 @@ import logging +from six import integer_types + from sortedcontainers import SortedDict from synapse.util import caches @@ -47,7 +49,7 @@ class StreamChangeCache(object): def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) is int or type(stream_pos) is long + assert type(stream_pos) in integer_types if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 89224b26cc..4c6e92beb8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works. import logging import threading -from twisted.internet import defer +from twisted.internet import defer, threads logger = logging.getLogger(__name__) @@ -562,58 +562,76 @@ def _set_context_cb(result, context): return result -# modules to ignore in `logcontext_tracer` -_to_ignore = [ - "synapse.util.logcontext", - "synapse.http.server", - "synapse.storage._base", - "synapse.util.async_helpers", -] +def defer_to_thread(reactor, f, *args, **kwargs): + """ + Calls the function `f` using a thread from the reactor's default threadpool and + returns the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked, and whose threadpool we should use for the + function. + + Normally this will be hs.get_reactor(). + + f (callable): The function to call. + args: positional arguments to pass to f. -def logcontext_tracer(frame, event, arg): - """A tracer that logs whenever a logcontext "unexpectedly" changes within - a function. Probably inaccurate. + kwargs: keyword arguments to pass to f. - Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. """ - if event == 'call': - name = frame.f_globals["__name__"] - if name.startswith("synapse"): - if name == "synapse.util.logcontext": - if frame.f_code.co_name in ["__enter__", "__exit__"]: - tracer = frame.f_back.f_trace - if tracer: - tracer.just_changed = True - - tracer = frame.f_trace - if tracer: - return tracer - - if not any(name.startswith(ig) for ig in _to_ignore): - return LineTracer() - - -class LineTracer(object): - __slots__ = ["context", "just_changed"] - - def __init__(self): - self.context = LoggingContext.current_context() - self.just_changed = False - - def __call__(self, frame, event, arg): - if event in 'line': - if self.just_changed: - self.context = LoggingContext.current_context() - self.just_changed = False - else: - c = LoggingContext.current_context() - if c != self.context: - logger.info( - "Context changed! %s -> %s, %s, %s", - self.context, c, - frame.f_code.co_filename, frame.f_lineno - ) - self.context = c + return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) - return self + +def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): + """ + A wrapper for twisted.internet.threads.deferToThreadpool, which handles + logcontexts correctly. + + Calls the function `f` using a thread from the given threadpool and returns + the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked. Normally this will be hs.get_reactor(). + + threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for + running `f`. Normally this will be hs.get_reactor().getThreadPool(). + + f (callable): The function to call. + + args: positional arguments to pass to f. + + kwargs: keyword arguments to pass to f. + + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. + """ + logcontext = LoggingContext.current_context() + + def g(): + with LoggingContext(parent_context=logcontext): + return f(*args, **kwargs) + + return make_deferred_yieldable( + threads.deferToThreadPool(reactor, threadpool, g) + ) diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 8d0f2a8918..9cb7e9c9ab 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -70,6 +70,8 @@ def manhole(username, password, globals): Returns: twisted.internet.protocol.Factory: A factory to pass to ``listenTCP`` """ + if not isinstance(password, bytes): + password = password.encode('ascii') checker = checkers.InMemoryUsernamePasswordDatabaseDontUse( **{username: password} @@ -82,7 +84,7 @@ def manhole(username, password, globals): ) factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker])) - factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY) - factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY) + factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY) + factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY) return factory diff --git a/synctl b/synctl index 09b64459b1..7e79b05c39 100755 --- a/synctl +++ b/synctl @@ -1,6 +1,7 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -48,7 +49,16 @@ def pid_running(pid): def write(message, colour=NORMAL, stream=sys.stdout): - if colour == NORMAL: + # Lets check if we're writing to a TTY before colouring + should_colour = False + try: + should_colour = stream.isatty() + except AttributeError: + # Just in case `isatty` isn't defined on everything. The python + # docs are incredibly vague. + pass + + if not should_colour: stream.write(message + "\n") else: stream.write(colour + message + NORMAL + "\n") @@ -66,8 +76,7 @@ def start(configfile): try: subprocess.check_call(args) - write("started synapse.app.homeserver(%r)" % - (configfile,), colour=GREEN) + write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN) except subprocess.CalledProcessError as e: write( "error starting (exit code: %d); see above for logs" % e.returncode, @@ -76,21 +85,15 @@ def start(configfile): def start_worker(app, configfile, worker_configfile): - args = [ - "python", "-B", - "-m", app, - "-c", configfile, - "-c", worker_configfile - ] + args = [sys.executable, "-B", "-m", app, "-c", configfile, "-c", worker_configfile] try: subprocess.check_call(args) write("started %s(%r)" % (app, worker_configfile), colour=GREEN) except subprocess.CalledProcessError as e: write( - "error starting %s(%r) (exit code: %d); see above for logs" % ( - app, worker_configfile, e.returncode, - ), + "error starting %s(%r) (exit code: %d); see above for logs" + % (app, worker_configfile, e.returncode), colour=RED, ) @@ -110,9 +113,9 @@ def stop(pidfile, app): abort("Cannot stop %s: Unknown error" % (app,)) -Worker = collections.namedtuple("Worker", [ - "app", "configfile", "pidfile", "cache_factor", "cache_factors", -]) +Worker = collections.namedtuple( + "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"] +) def main(): @@ -131,24 +134,20 @@ def main(): help="the homeserver config file, defaults to homeserver.yaml", ) parser.add_argument( - "-w", "--worker", - metavar="WORKERCONFIG", - help="start or stop a single worker", + "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker" ) parser.add_argument( - "-a", "--all-processes", + "-a", + "--all-processes", metavar="WORKERCONFIGDIR", help="start or stop all the workers in the given directory" - " and the main synapse process", + " and the main synapse process", ) options = parser.parse_args() if options.worker and options.all_processes: - write( - 'Cannot use "--worker" with "--all-processes"', - stream=sys.stderr - ) + write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr) sys.exit(1) configfile = options.configfile @@ -157,9 +156,7 @@ def main(): write( "No config file found\n" "To generate a config file, run '%s -c %s --generate-config" - " --server-name=<server name>'\n" % ( - " ".join(SYNAPSE), options.configfile - ), + " --server-name=<server name>'\n" % (" ".join(SYNAPSE), options.configfile), stream=sys.stderr, ) sys.exit(1) @@ -184,8 +181,7 @@ def main(): worker_configfile = options.worker if not os.path.exists(worker_configfile): write( - "No worker config found at %r" % (worker_configfile,), - stream=sys.stderr, + "No worker config found at %r" % (worker_configfile,), stream=sys.stderr ) sys.exit(1) worker_configfiles.append(worker_configfile) @@ -201,9 +197,9 @@ def main(): stream=sys.stderr, ) sys.exit(1) - worker_configfiles.extend(sorted(glob.glob( - os.path.join(worker_configdir, "*.yaml") - ))) + worker_configfiles.extend( + sorted(glob.glob(os.path.join(worker_configdir, "*.yaml"))) + ) workers = [] for worker_configfile in worker_configfiles: @@ -213,14 +209,12 @@ def main(): if worker_app == "synapse.app.homeserver": # We need to special case all of this to pick up options that may # be set in the main config file or in this worker config file. - worker_pidfile = ( - worker_config.get("pid_file") - or pidfile + worker_pidfile = worker_config.get("pid_file") or pidfile + worker_cache_factor = ( + worker_config.get("synctl_cache_factor") or cache_factor ) - worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor worker_cache_factors = ( - worker_config.get("synctl_cache_factors") - or cache_factors + worker_config.get("synctl_cache_factors") or cache_factors ) daemonize = worker_config.get("daemonize") or config.get("daemonize") assert daemonize, "Main process must have daemonize set to true" @@ -229,19 +223,27 @@ def main(): for key in worker_config: if key == "worker_app": # But we allow worker_app continue - assert not key.startswith("worker_"), \ - "Main process cannot use worker_* config" + assert not key.startswith( + "worker_" + ), "Main process cannot use worker_* config" else: worker_pidfile = worker_config["worker_pid_file"] worker_daemonize = worker_config["worker_daemonize"] assert worker_daemonize, "In config %r: expected '%s' to be True" % ( - worker_configfile, "worker_daemonize") + worker_configfile, + "worker_daemonize", + ) worker_cache_factor = worker_config.get("synctl_cache_factor") worker_cache_factors = worker_config.get("synctl_cache_factors", {}) - workers.append(Worker( - worker_app, worker_configfile, worker_pidfile, worker_cache_factor, - worker_cache_factors, - )) + workers.append( + Worker( + worker_app, + worker_configfile, + worker_pidfile, + worker_cache_factor, + worker_cache_factors, + ) + ) action = options.action diff --git a/tests/handlers/test_roomlist.py b/tests/handlers/test_roomlist.py new file mode 100644 index 0000000000..61eebb6985 --- /dev/null +++ b/tests/handlers/test_roomlist.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.handlers.room_list import RoomListNextBatch + +import tests.unittest +import tests.utils + + +class RoomListTestCase(tests.unittest.TestCase): + """ Tests RoomList's RoomListNextBatch. """ + + def setUp(self): + pass + + def test_check_read_batch_tokens(self): + batch_token = RoomListNextBatch( + stream_ordering="abcdef", + public_room_stream_id="123", + current_limit=20, + direction_is_forward=True, + ).to_token() + next_batch = RoomListNextBatch.from_token(batch_token) + self.assertEquals(next_batch.stream_ordering, "abcdef") + self.assertEquals(next_batch.public_room_stream_id, "123") + self.assertEquals(next_batch.current_limit, 20) + self.assertEquals(next_batch.direction_is_forward, True) diff --git a/tox.ini b/tox.ini index 87b5e4782d..04d2f721bf 100644 --- a/tox.ini +++ b/tox.ini @@ -108,10 +108,10 @@ commands = [testenv:pep8] skip_install = True -basepython = python2.7 +basepython = python3.6 deps = flake8 -commands = /bin/sh -c "flake8 synapse tests {env:PEP8SUFFIX:}" +commands = /bin/sh -c "flake8 synapse tests scripts scripts-dev scripts/register_new_matrix_user scripts/synapse_port_db synctl {env:PEP8SUFFIX:}" [testenv:check_isort] skip_install = True |