diff options
Diffstat (limited to '')
39 files changed, 1313 insertions, 592 deletions
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000000..d2050a3e44 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,47 @@ +<!-- + +**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**: +You will likely get better support more quickly if you ask in ** #matrix:matrix.org ** ;) + + +This is a bug report template. By following the instructions below and +filling out the sections with your information, you will help the us to get all +the necessary data to fix your issue. + +You can also preview your report before submitting it. You may remove sections +that aren't relevant to your particular case. + +Text between <!-- and --​> marks will be invisible in the report. + +--> + +### Description + +Describe here the problem that you are experiencing, or the feature you are requesting. + +### Steps to reproduce + +- For bugs, list the steps +- that reproduce the bug +- using hyphens as bullet points + +Describe how what happens differs from what you expected. + +If you can identify any relevant log snippets from _homeserver.log_, please include +those here (please be careful to remove any personal or private data): + +### Version information + +<!-- IMPORTANT: please answer the following questions, to help us narrow down the problem --> + +- **Homeserver**: Was this issue identified on matrix.org or another homeserver? + +If not matrix.org: +- **Version**: What version of Synapse is running? <!-- +You can find the Synapse version by inspecting the server headers (replace matrix.org with +your own homeserver domain): +$ curl -v https://matrix.org/_matrix/client/versions 2>&1 | grep "Server:" +--> +- **Install method**: package manager/git clone/pip +- **Platform**: Tell us about the environment in which your homeserver is operating + - distro, hardware, if it's running in a vm/container, etc. diff --git a/MANIFEST.in b/MANIFEST.in index 981698143f..afb60e12ee 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -27,4 +27,5 @@ exclude jenkins*.sh exclude jenkins* recursive-exclude jenkins *.sh +prune .github prune demo/etc diff --git a/UPGRADE.rst b/UPGRADE.rst index 62b22e9108..2efe7ea60f 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -5,39 +5,48 @@ Before upgrading check if any special steps are required to upgrade from the what you currently have installed to current version of synapse. The extra instructions that may be required are listed later in this document. -If synapse was installed in a virtualenv then active that virtualenv before -upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then run: +1. If synapse was installed in a virtualenv then active that virtualenv before + upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then + run: -.. code:: bash + .. code:: bash - source ~/.synapse/bin/activate + source ~/.synapse/bin/activate -If synapse was installed using pip then upgrade to the latest version by -running: +2. If synapse was installed using pip then upgrade to the latest version by + running: -.. code:: bash + .. code:: bash - pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master + pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master -If synapse was installed using git then upgrade to the latest version by -running: + # restart synapse + synctl restart -.. code:: bash - # Pull the latest version of the master branch. - git pull - # Update the versions of synapse's python dependencies. - python synapse/python_dependencies.py | xargs -n1 pip install --upgrade - -To check whether your update was sucessfull, run: + If synapse was installed using git then upgrade to the latest version by + running: -.. code:: bash + .. code:: bash + + # Pull the latest version of the master branch. + git pull + # Update the versions of synapse's python dependencies. + python synapse/python_dependencies.py | xargs pip install --upgrade - # replace your.server.domain with ther domain of your synapse homeserver - curl https://<your.server.domain>/_matrix/federation/v1/version + # restart synapse + ./synctl restart -So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version. +To check whether your update was sucessful, you can check the Server header +returned by the Client-Server API: + +.. code:: bash + + # replace <host.name> with the hostname of your synapse homeserver. + # You may need to specify a port (eg, :8448) if your server is not + # configured on port 443. + curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:" Upgrading to v0.15.0 ==================== @@ -77,7 +86,7 @@ It has been replaced by specifying a list of application service registrations i ``homeserver.yaml``:: app_service_config_files: ["registration-01.yaml", "registration-02.yaml"] - + Where ``registration-01.yaml`` looks like:: url: <String> # e.g. "https://my.application.service.com" @@ -166,7 +175,7 @@ This release completely changes the database schema and so requires upgrading it before starting the new version of the homeserver. The script "database-prepare-for-0.5.0.sh" should be used to upgrade the -database. This will save all user information, such as logins and profiles, +database. This will save all user information, such as logins and profiles, but will otherwise purge the database. This includes messages, which rooms the home server was a member of and room alias mappings. @@ -175,18 +184,18 @@ file and ask for help in #matrix:matrix.org. The upgrade process is, unfortunately, non trivial and requires human intervention to resolve any resulting conflicts during the upgrade process. -Before running the command the homeserver should be first completely +Before running the command the homeserver should be first completely shutdown. To run it, simply specify the location of the database, e.g.: ./scripts/database-prepare-for-0.5.0.sh "homeserver.db" -Once this has successfully completed it will be safe to restart the -homeserver. You may notice that the homeserver takes a few seconds longer to +Once this has successfully completed it will be safe to restart the +homeserver. You may notice that the homeserver takes a few seconds longer to restart than usual as it reinitializes the database. On startup of the new version, users can either rejoin remote rooms using room aliases or by being reinvited. Alternatively, if any other homeserver sends a -message to a room that the homeserver was previously in the local HS will +message to a room that the homeserver was previously in the local HS will automatically rejoin the room. Upgrading to v0.4.0 @@ -245,7 +254,7 @@ automatically generate default config use:: --config-path homeserver.config \ --generate-config -This config can be edited if desired, for example to specify a different SSL +This config can be edited if desired, for example to specify a different SSL certificate to use. Once done you can run the home server using:: $ python synapse/app/homeserver.py --config-path homeserver.config @@ -266,20 +275,20 @@ This release completely changes the database schema and so requires upgrading it before starting the new version of the homeserver. The script "database-prepare-for-0.0.1.sh" should be used to upgrade the -database. This will save all user information, such as logins and profiles, +database. This will save all user information, such as logins and profiles, but will otherwise purge the database. This includes messages, which rooms the home server was a member of and room alias mappings. -Before running the command the homeserver should be first completely +Before running the command the homeserver should be first completely shutdown. To run it, simply specify the location of the database, e.g.: ./scripts/database-prepare-for-0.0.1.sh "homeserver.db" -Once this has successfully completed it will be safe to restart the -homeserver. You may notice that the homeserver takes a few seconds longer to +Once this has successfully completed it will be safe to restart the +homeserver. You may notice that the homeserver takes a few seconds longer to restart than usual as it reinitializes the database. On startup of the new version, users can either rejoin remote rooms using room aliases or by being reinvited. Alternatively, if any other homeserver sends a -message to a room that the homeserver was previously in the local HS will +message to a room that the homeserver was previously in the local HS will automatically rejoin the room. diff --git a/contrib/prometheus/README b/contrib/prometheus/README new file mode 100644 index 0000000000..eb91db2de2 --- /dev/null +++ b/contrib/prometheus/README @@ -0,0 +1,20 @@ +This directory contains some sample monitoring config for using the +'Prometheus' monitoring server against synapse. + +To use it, first install prometheus by following the instructions at + + http://prometheus.io/ + +Then add a new job to the main prometheus.conf file: + + job: { + name: "synapse" + + target_group: { + target: "http://SERVER.LOCATION.HERE:PORT/_synapse/metrics" + } + } + +Metrics are disabled by default when running synapse; they must be enabled +with the 'enable-metrics' option, either in the synapse config file or as a +command-line option. diff --git a/contrib/prometheus/consoles/synapse.html b/contrib/prometheus/consoles/synapse.html new file mode 100644 index 0000000000..e23d8a1fce --- /dev/null +++ b/contrib/prometheus/consoles/synapse.html @@ -0,0 +1,395 @@ +{{ template "head" . }} + +{{ template "prom_content_head" . }} +<h1>System Resources</h1> + +<h3>CPU</h3> +<div id="process_resource_utime"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#process_resource_utime"), + expr: "rate(process_cpu_seconds_total[2m]) * 100", + name: "[[job]]", + min: 0, + max: 100, + renderer: "line", + height: 150, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "%", + yTitle: "CPU Usage" +}) +</script> + +<h3>Memory</h3> +<div id="process_resource_maxrss"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#process_resource_maxrss"), + expr: "process_psutil_rss:max", + name: "Maxrss", + min: 0, + renderer: "line", + height: 150, + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "bytes", + yTitle: "Usage" +}) +</script> + +<h3>File descriptors</h3> +<div id="process_fds"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#process_fds"), + expr: "process_open_fds{job='synapse'}", + name: "FDs", + min: 0, + renderer: "line", + height: 150, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "", + yTitle: "Descriptors" +}) +</script> + +<h1>Reactor</h1> + +<h3>Total reactor time</h3> +<div id="reactor_total_time"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#reactor_total_time"), + expr: "rate(python_twisted_reactor_tick_time:total[2m]) / 1000", + name: "time", + max: 1, + min: 0, + renderer: "area", + height: 150, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/s", + yTitle: "Usage" +}) +</script> + +<h3>Average reactor tick time</h3> +<div id="reactor_average_time"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#reactor_average_time"), + expr: "rate(python_twisted_reactor_tick_time:total[2m]) / rate(python_twisted_reactor_tick_time:count[2m]) / 1000", + name: "time", + min: 0, + renderer: "line", + height: 150, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s", + yTitle: "Time" +}) +</script> + +<h3>Pending calls per tick</h3> +<div id="reactor_pending_calls"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#reactor_pending_calls"), + expr: "rate(python_twisted_reactor_pending_calls:total[30s])/rate(python_twisted_reactor_pending_calls:count[30s])", + name: "calls", + min: 0, + renderer: "line", + height: 150, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yTitle: "Pending Cals" +}) +</script> + +<h1>Storage</h1> + +<h3>Queries</h3> +<div id="synapse_storage_query_time"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_storage_query_time"), + expr: "rate(synapse_storage_query_time:count[2m])", + name: "[[verb]]", + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "queries/s", + yTitle: "Queries" +}) +</script> + +<h3>Transactions</h3> +<div id="synapse_storage_transaction_time"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_storage_transaction_time"), + expr: "rate(synapse_storage_transaction_time:count[2m])", + name: "[[desc]]", + min: 0, + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "txn/s", + yTitle: "Transactions" +}) +</script> + +<h3>Transaction execution time</h3> +<div id="synapse_storage_transactions_time_msec"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_storage_transactions_time_msec"), + expr: "rate(synapse_storage_transaction_time:total[2m]) / 1000", + name: "[[desc]]", + min: 0, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/s", + yTitle: "Usage" +}) +</script> + +<h3>Database scheduling latency</h3> +<div id="synapse_storage_schedule_time"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_storage_schedule_time"), + expr: "rate(synapse_storage_schedule_time:total[2m]) / 1000", + name: "Total latency", + min: 0, + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/s", + yTitle: "Usage" +}) +</script> + +<h3>Cache hit ratio</h3> +<div id="synapse_cache_ratio"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_cache_ratio"), + expr: "rate(synapse_util_caches_cache:total[2m]) * 100", + name: "[[name]]", + min: 0, + max: 100, + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "%", + yTitle: "Percentage" +}) +</script> + +<h3>Cache size</h3> +<div id="synapse_cache_size"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_cache_size"), + expr: "synapse_util_caches_cache:size", + name: "[[name]]", + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "", + yTitle: "Items" +}) +</script> + +<h1>Requests</h1> + +<h3>Requests by Servlet</h3> +<div id="synapse_http_server_requests_servlet"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_requests_servlet"), + expr: "rate(synapse_http_server_requests:servlet[2m])", + name: "[[servlet]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "req/s", + yTitle: "Requests" +}) +</script> +<h4> (without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4> +<div id="synapse_http_server_requests_servlet_minus_events"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"), + expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])", + name: "[[servlet]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "req/s", + yTitle: "Requests" +}) +</script> + +<h3>Average response times</h3> +<div id="synapse_http_server_response_time_avg"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_response_time_avg"), + expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000", + name: "[[servlet]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/req", + yTitle: "Response time" +}) +</script> + +<h3>All responses by code</h3> +<div id="synapse_http_server_responses"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_responses"), + expr: "rate(synapse_http_server_responses[2m])", + name: "[[method]] / [[code]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "req/s", + yTitle: "Requests" +}) +</script> + +<h3>Error responses by code</h3> +<div id="synapse_http_server_responses_err"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_responses_err"), + expr: "rate(synapse_http_server_responses{code=~\"[45]..\"}[2m])", + name: "[[method]] / [[code]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "req/s", + yTitle: "Requests" +}) +</script> + + +<h3>CPU Usage</h3> +<div id="synapse_http_server_response_ru_utime"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_response_ru_utime"), + expr: "rate(synapse_http_server_response_ru_utime:total[2m])", + name: "[[servlet]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/s", + yTitle: "CPU Usage" +}) +</script> + + +<h3>DB Usage</h3> +<div id="synapse_http_server_response_db_txn_duration"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_response_db_txn_duration"), + expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])", + name: "[[servlet]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/s", + yTitle: "DB Usage" +}) +</script> + + +<h3>Average event send times</h3> +<div id="synapse_http_server_send_time_avg"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_http_server_send_time_avg"), + expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000", + name: "[[servlet]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "s/req", + yTitle: "Response time" +}) +</script> + +<h1>Federation</h1> + +<h3>Sent Messages</h3> +<div id="synapse_federation_client_sent"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_federation_client_sent"), + expr: "rate(synapse_federation_client_sent[2m])", + name: "[[type]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "req/s", + yTitle: "Requests" +}) +</script> + +<h3>Received Messages</h3> +<div id="synapse_federation_server_received"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_federation_server_received"), + expr: "rate(synapse_federation_server_received[2m])", + name: "[[type]]", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "req/s", + yTitle: "Requests" +}) +</script> + +<h3>Pending</h3> +<div id="synapse_federation_transaction_queue_pending"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_federation_transaction_queue_pending"), + expr: "synapse_federation_transaction_queue_pending", + name: "[[type]]", + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "", + yTitle: "Units" +}) +</script> + +<h1>Clients</h1> + +<h3>Notifiers</h3> +<div id="synapse_notifier_listeners"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_notifier_listeners"), + expr: "synapse_notifier_listeners", + name: "listeners", + min: 0, + yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix, + yUnits: "", + yTitle: "Listeners" +}) +</script> + +<h3>Notified Events</h3> +<div id="synapse_notifier_notified_events"></div> +<script> +new PromConsole.Graph({ + node: document.querySelector("#synapse_notifier_notified_events"), + expr: "rate(synapse_notifier_notified_events[2m])", + name: "events", + yAxisFormatter: PromConsole.NumberFormatter.humanize, + yHoverFormatter: PromConsole.NumberFormatter.humanize, + yUnits: "events/s", + yTitle: "Event rate" +}) +</script> + +{{ template "prom_content_tail" . }} + +{{ template "tail" }} diff --git a/contrib/prometheus/synapse.rules b/contrib/prometheus/synapse.rules new file mode 100644 index 0000000000..b6f84174b0 --- /dev/null +++ b/contrib/prometheus/synapse.rules @@ -0,0 +1,21 @@ +synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0) +synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0) + +synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method) +synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet) + +synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet) + +synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m]) +synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s]) + +synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0 +synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0 +synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job) + +synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0 +synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0 +synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job) + +synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0 +synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0 diff --git a/docs/postgres.rst b/docs/postgres.rst index b592801e93..904942ec74 100644 --- a/docs/postgres.rst +++ b/docs/postgres.rst @@ -1,6 +1,8 @@ Using Postgres -------------- +Postgres version 9.4 or later is known to work. + Set up database =============== diff --git a/jenkins-dendron-haproxy-postgres.sh b/jenkins-dendron-haproxy-postgres.sh index d64b2d2c9d..07979bf8b8 100755 --- a/jenkins-dendron-haproxy-postgres.sh +++ b/jenkins-dendron-haproxy-postgres.sh @@ -17,6 +17,7 @@ export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy ./sytest/jenkins/prep_sytest_for_postgres.sh ./sytest/jenkins/install_and_run.sh \ + --python $WORKSPACE/.tox/py27/bin/python \ --synapse-directory $WORKSPACE \ --dendron $WORKSPACE/dendron/bin/dendron \ --haproxy \ diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 37ae746f4b..3b932fe340 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -15,5 +15,6 @@ export SYNAPSE_CACHE_FACTOR=1 ./sytest/jenkins/prep_sytest_for_postgres.sh ./sytest/jenkins/install_and_run.sh \ + --python $WORKSPACE/.tox/py27/bin/python \ --synapse-directory $WORKSPACE \ --dendron $WORKSPACE/dendron/bin/dendron \ diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index f2ca8ccdff..1afb736394 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -14,4 +14,5 @@ export SYNAPSE_CACHE_FACTOR=1 ./sytest/jenkins/prep_sytest_for_postgres.sh ./sytest/jenkins/install_and_run.sh \ + --python $WORKSPACE/.tox/py27/bin/python \ --synapse-directory $WORKSPACE \ diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index 84613d979c..baf4713a01 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -12,4 +12,5 @@ export SYNAPSE_CACHE_FACTOR=1 ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git ./sytest/jenkins/install_and_run.sh \ + --python $WORKSPACE/.tox/py27/bin/python \ --synapse-directory $WORKSPACE \ diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index d1ab42d3af..82a90ef6fa 100644..100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -1,10 +1,30 @@ +#!/usr/bin/env python +# +# Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import argparse import nacl.signing import json import base64 import requests import sys import srvlookup - +import yaml def encode_base64(input_bytes): """Encode bytes as a base64 string without any padding.""" @@ -120,11 +140,13 @@ def get_json(origin_name, origin_key, destination, path): origin_name, key, sig, ) authorization_headers.append(bytes(header)) - sys.stderr.write(header) - sys.stderr.write("\n") + print ("Authorization: %s" % header, file=sys.stderr) + + dest = lookup(destination, path) + print ("Requesting %s" % dest, file=sys.stderr) result = requests.get( - lookup(destination, path), + dest, headers={"Authorization": authorization_headers[0]}, verify=False, ) @@ -133,17 +155,66 @@ def get_json(origin_name, origin_key, destination, path): def main(): - origin_name, keyfile, destination, path = sys.argv[1:] + parser = argparse.ArgumentParser( + description= + "Signs and sends a federation request to a matrix homeserver", + ) + + parser.add_argument( + "-N", "--server-name", + help="Name to give as the local homeserver. If unspecified, will be " + "read from the config file.", + ) + + parser.add_argument( + "-k", "--signing-key-path", + help="Path to the file containing the private ed25519 key to sign the " + "request with.", + ) + + parser.add_argument( + "-c", "--config", + default="homeserver.yaml", + help="Path to server config file. Ignored if --server-name and " + "--signing-key-path are both given.", + ) - with open(keyfile) as f: + parser.add_argument( + "-d", "--destination", + default="matrix.org", + help="name of the remote homeserver. We will do SRV lookups and " + "connect appropriately.", + ) + + parser.add_argument( + "path", + help="request path. We will add '/_matrix/federation/v1/' to this." + ) + + args = parser.parse_args() + + if not args.server_name or not args.signing_key_path: + read_args_from_config(args) + + with open(args.signing_key_path) as f: key = read_signing_keys(f)[0] result = get_json( - origin_name, key, destination, "/_matrix/federation/v1/" + path + args.server_name, key, args.destination, "/_matrix/federation/v1/" + args.path ) json.dump(result, sys.stdout) - print "" + print ("") + + +def read_args_from_config(args): + with open(args.config, 'r') as fh: + config = yaml.safe_load(fh) + if not args.server_name: + args.server_name = config['server_name'] + if not args.signing_key_path: + args.signing_key_path = config['signing_key_path'] + if __name__ == "__main__": main() diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 7d158a46a4..bc167b59af 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -252,6 +252,25 @@ class Porter(object): ) return + if table in ( + "user_directory", "user_directory_search", "users_who_share_rooms", + "users_in_pubic_room", + ): + # We don't port these tables, as they're a faff and we can regenreate + # them anyway. + self.progress.update(table, table_size) # Mark table as done + return + + if table == "user_directory_stream_pos": + # We need to make sure there is a single row, `(X, null), as that is + # what synapse expects to be there. + yield self.postgres_store._simple_insert( + table=table, + values={"stream_id": None}, + ) + self.progress.update(table, table_size) # Mark table as done + return + forward_select = ( "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) diff --git a/synapse/app/_base.py b/synapse/app/_base.py new file mode 100644 index 0000000000..cd0e815919 --- /dev/null +++ b/synapse/app/_base.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import gc +import logging + +import affinity +from daemonize import Daemonize +from synapse.util import PreserveLoggingContext +from synapse.util.rlimit import change_resource_limit +from twisted.internet import reactor + + +def start_worker_reactor(appname, config): + """ Run the reactor in the main process + + Daemonizes if necessary, and then configures some resources, before starting + the reactor. Pulls configuration from the 'worker' settings in 'config'. + + Args: + appname (str): application name which will be sent to syslog + config (synapse.config.Config): config object + """ + + logger = logging.getLogger(config.worker_app) + + start_reactor( + appname, + config.soft_file_limit, + config.gc_thresholds, + config.worker_pid_file, + config.worker_daemonize, + config.worker_cpu_affinity, + logger, + ) + + +def start_reactor( + appname, + soft_file_limit, + gc_thresholds, + pid_file, + daemonize, + cpu_affinity, + logger, +): + """ Run the reactor in the main process + + Daemonizes if necessary, and then configures some resources, before starting + the reactor + + Args: + appname (str): application name which will be sent to syslog + soft_file_limit (int): + gc_thresholds: + pid_file (str): name of pid file to write to if daemonize is True + daemonize (bool): true to run the reactor in a background process + cpu_affinity (int|None): cpu affinity mask + logger (logging.Logger): logger instance to pass to Daemonize + """ + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + if cpu_affinity is not None: + logger.info("Setting CPU affinity to %s" % cpu_affinity) + affinity.set_process_affinity_mask(0, cpu_affinity) + change_resource_limit(soft_file_limit) + if gc_thresholds: + gc.set_threshold(*gc_thresholds) + reactor.run() + + if daemonize: + daemon = Daemonize( + app=appname, + pid=pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a476efa63..ba2657bbad 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -13,38 +13,31 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.appservice") @@ -181,36 +174,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-appservice", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-appservice", config) if __name__ == '__main__': diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 09bc1935f1..129cfa901f 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -13,47 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite +from synapse.crypto import context_factory from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import PublicRoomListRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.client_reader") @@ -183,36 +175,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-client-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-client-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index eb392e1c9d..40cebe6f4a 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -13,44 +13,36 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.urls import FEDERATION_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import FEDERATION_PREFIX -from synapse.federation.transport.server import TransportLayerServer -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_reader") @@ -172,36 +164,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 03327dc47a..389e3909d1 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -13,44 +13,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_sender") @@ -213,36 +206,12 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-sender", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-sender", config) class FederationSenderHandler(object): diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 132f18a979..bee4c47498 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -13,48 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.errors import SynapseError +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite +from synapse.crypto import context_factory from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, +) +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.crypto import context_factory -from synapse.api.errors import SynapseError -from synapse.http.servlet import ( - RestServlet, parse_json_object_from_request, -) -from synapse.rest.client.v2_alpha._base import client_v2_patterns - -from synapse import events - - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - - logger = logging.getLogger("synapse.app.frontend_proxy") @@ -234,36 +225,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-frontend-proxy", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-frontend-proxy", config) if __name__ == '__main__': diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 081e7cce59..84ad8f04a0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -13,61 +13,48 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import synapse - import gc import logging import os import sys +import synapse import synapse.config.logger +from synapse import events +from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \ + LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \ + STATIC_PREFIX, WEB_CLIENT_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError - -from synapse.python_dependencies import ( - check_requirements, CONDITIONAL_REQUIREMENTS -) - -from synapse.rest import ClientRestResource -from synapse.storage.engines import create_engine, IncorrectDatabaseSetup -from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database - -from synapse.server import HomeServer - -from twisted.internet import reactor, defer -from twisted.application import service -from twisted.web.resource import Resource, EncodingResourceWrapper -from twisted.web.static import File -from twisted.web.server import GzipEncoderFactory -from synapse.http.server import RootRedirect -from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource -from synapse.rest.key.v1.server_key_resource import LocalKey -from synapse.rest.key.v2 import KeyApiV2Resource -from synapse.api.urls import ( - FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, - SERVER_KEY_V2_PREFIX, -) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.federation.transport.server import TransportLayerServer +from synapse.http.server import RootRedirect +from synapse.http.site import SynapseSite from synapse.metrics import register_memory_metrics -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \ + check_requirements from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory -from synapse.federation.transport.server import TransportLayerServer - -from synapse.util.rlimit import change_resource_limit -from synapse.util.versionstring import get_version_string +from synapse.rest import ClientRestResource +from synapse.rest.key.v1.server_key_resource import LocalKey +from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.rest.media.v0.content_repository import ContentRepoResource +from synapse.rest.media.v1.media_repository import MediaRepositoryResource +from synapse.server import HomeServer +from synapse.storage import are_all_users_on_domain +from synapse.storage.engines import IncorrectDatabaseSetup, create_engine +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole - -from synapse.http.site import SynapseSite - -from synapse import events - -from daemonize import Daemonize +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from twisted.application import service +from twisted.internet import defer, reactor +from twisted.web.resource import EncodingResourceWrapper, Resource +from twisted.web.server import GzipEncoderFactory +from twisted.web.static import File logger = logging.getLogger("synapse.app.homeserver") @@ -446,37 +433,18 @@ def run(hs): # be quite busy the first few minutes clock.call_later(5 * 60, phone_stats_home) - def in_thread(): - # Uncomment to enable tracing of log context changes. - # sys.settrace(logcontext_tracer) - - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - change_resource_limit(hs.config.soft_file_limit) - if hs.config.gc_thresholds: - gc.set_threshold(*hs.config.gc_thresholds) - reactor.run() - - if hs.config.daemonize: - - if hs.config.print_pidfile: - print (hs.config.pid_file) - - daemon = Daemonize( - app="synapse-homeserver", - pid=hs.config.pid_file, - action=lambda: in_thread(), - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - in_thread() + if hs.config.daemonize and hs.config.print_pidfile: + print (hs.config.pid_file) + + _base.start_reactor( + "synapse-homeserver", + hs.config.soft_file_limit, + hs.config.gc_thresholds, + hs.config.pid_file, + hs.config.daemonize, + hs.config.cpu_affinity, + logger, + ) def main(): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index f57ec784fe..36c18bdbcb 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -13,14 +13,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - +from synapse import events +from synapse.api.urls import ( + CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX +) +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -33,27 +40,12 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import ( - CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX -) -from synapse.crypto import context_factory - -from synapse import events - - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.media_repository") @@ -180,36 +172,13 @@ def start(config_options): ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-media-repository", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-media-repository", config) if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f9114acfcb..db9a4d16f4 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -13,41 +13,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys import synapse - -from synapse.server import HomeServer +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.roommember import RoomMemberStore +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.storage.engines import create_engine +from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.engines import create_engine +from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn, \ - PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string - -from synapse import events - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.pusher") @@ -244,18 +236,6 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_pusherpool().start() ps.get_datastore().start_profiling() @@ -263,18 +243,7 @@ def start(config_options): reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-pusher", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-pusher", config) if __name__ == '__main__': diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d06a05acd9..576ac6fb7e 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -13,57 +13,51 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import logging +import sys import synapse - from synapse.api.constants import EventTypes +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.rest.client.v2_alpha import sync -from synapse.rest.client.v1 import events -from synapse.rest.client.v1.room import RoomInitialSyncRestServlet -from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.filtering import SlavedFilteringStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1 import events +from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.rest.client.v1.room import RoomInitialSyncRestServlet +from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string - -from twisted.internet import reactor, defer +from twisted.internet import defer, reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import contextlib -import gc - logger = logging.getLogger("synapse.app.synchrotron") @@ -446,36 +440,13 @@ def start(config_options): ss.setup() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_datastore().start_profiling() ss.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-synchrotron", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-synchrotron", config) if __name__ == '__main__': diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 8c6300db9d..be661a70c7 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -14,16 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse +import logging +import sys -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v2_alpha import user_directory +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from synapse import events - from twisted.internet import reactor from twisted.web.resource import Resource -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.user_dir") @@ -233,36 +227,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-user-dir", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-user-dir", config) if __name__ == '__main__': diff --git a/synapse/config/server.py b/synapse/config/server.py index 28b4e5f50c..89d61a0503 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +30,7 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") + self.cpu_affinity = config.get("cpu_affinity") # Whether to send federation traffic out in this process. This only # applies to some federation traffic, and so shouldn't be used to @@ -147,6 +149,27 @@ class ServerConfig(Config): # When running as a daemon, the file to store the pid in pid_file: %(pid_file)s + # CPU affinity mask. Setting this restricts the CPUs on which the + # process will be scheduled. It is represented as a bitmask, with the + # lowest order bit corresponding to the first logical CPU and the + # highest order bit corresponding to the last logical CPU. Not all CPUs + # may exist on a given system but a mask may specify more CPUs than are + # present. + # + # For example: + # 0x00000001 is processor #0, + # 0x00000003 is processors #0 and #1, + # 0xFFFFFFFF is all processors (#0 through #31). + # + # Pinning a Python process to a single CPU is desirable, because Python + # is inherently single-threaded due to the GIL, and can suffer a + # 30-40%% slowdown due to cache blow-out and thread context switching + # if the scheduler happens to schedule the underlying threads across + # different cores. See + # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/. + # + # cpu_affinity: 0xFFFFFFFF + # Whether to serve a web client from the HTTP/HTTPS root resource. web_client: True diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 99d5d8aaeb..c5a5a8919c 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -33,6 +33,7 @@ class WorkerConfig(Config): self.worker_name = config.get("worker_name", self.worker_app) self.worker_main_http_uri = config.get("worker_main_http_uri", None) + self.worker_cpu_affinity = config.get("worker_cpu_affinity") if self.worker_listeners: for listener in self.worker_listeners: diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 1bb27edc0f..51851d04e5 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,10 +16,9 @@ from synapse.crypto.keyclient import fetch_server_key from synapse.api.errors import SynapseError, Codes -from synapse.util import unwrapFirstError -from synapse.util.async import ObservableDeferred +from synapse.util import unwrapFirstError, logcontext from synapse.util.logcontext import ( - preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext, + preserve_context_over_fn, PreserveLoggingContext, preserve_fn ) from synapse.util.metrics import Measure @@ -74,6 +74,11 @@ class Keyring(object): self.perspective_servers = self.config.perspectives self.hs = hs + # map from server name to Deferred. Has an entry for each server with + # an ongoing key download; the Deferred completes once the download + # completes. + # + # These are regular, logcontext-agnostic Deferreds. self.key_downloads = {} def verify_json_for_server(self, server_name, json_object): @@ -82,7 +87,7 @@ class Keyring(object): )[0] def verify_json_objects_for_server(self, server_and_json): - """Bulk verfies signatures of json objects, bulk fetching keys as + """Bulk verifies signatures of json objects, bulk fetching keys as necessary. Args: @@ -212,7 +217,13 @@ class Keyring(object): Args: server_names (list): list of server_names we want to lookup server_to_deferred (dict): server_name to deferred which gets - resolved once we've finished looking up keys for that server + resolved once we've finished looking up keys for that server. + The Deferreds should be regular twisted ones which call their + callbacks with no logcontext. + + Returns: a Deferred which resolves once all key lookups for the given + servers have completed. Follows the synapse rules of logcontext + preservation. """ while True: wait_on = [ @@ -226,15 +237,13 @@ class Keyring(object): else: break - for server_name, deferred in server_to_deferred.items(): - d = ObservableDeferred(preserve_context_over_deferred(deferred)) - self.key_downloads[server_name] = d - - def rm(r, server_name): - self.key_downloads.pop(server_name, None) - return r + def rm(r, server_name_): + self.key_downloads.pop(server_name_, None) + return r - d.addBoth(rm, server_name) + for server_name, deferred in server_to_deferred.items(): + self.key_downloads[server_name] = deferred + deferred.addBoth(rm, server_name) def get_server_verify_keys(self, verify_requests): """Tries to find at least one key for each verify request @@ -333,7 +342,7 @@ class Keyring(object): Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from server_name -> key_id -> VerifyKey """ - res = yield preserve_context_over_deferred(defer.gatherResults( + res = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store.get_server_verify_keys)( server_name, key_ids @@ -341,7 +350,7 @@ class Keyring(object): for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(dict(res)) @@ -362,13 +371,13 @@ class Keyring(object): ) defer.returnValue({}) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(get_key)(p_name, p_keys) for p_name, p_keys in self.perspective_servers.items() ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) union_of_keys = {} for result in results: @@ -402,13 +411,13 @@ class Keyring(object): defer.returnValue(keys) - results = yield preserve_context_over_deferred(defer.gatherResults( + results = yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(get_key)(server_name, key_ids) for server_name, key_ids in server_name_and_key_ids ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) merged = {} for result in results: @@ -485,7 +494,7 @@ class Keyring(object): for server_name, response_keys in processed_response.items(): keys.setdefault(server_name, {}).update(response_keys) - yield preserve_context_over_deferred(defer.gatherResults( + yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=server_name, @@ -495,7 +504,7 @@ class Keyring(object): for server_name, response_keys in keys.items() ], consumeErrors=True - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(keys) @@ -543,7 +552,7 @@ class Keyring(object): keys.update(response_keys) - yield preserve_context_over_deferred(defer.gatherResults( + yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store_keys)( server_name=key_server_name, @@ -553,7 +562,7 @@ class Keyring(object): for key_server_name, verify_keys in keys.items() ], consumeErrors=True - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) defer.returnValue(keys) @@ -619,7 +628,7 @@ class Keyring(object): response_keys.update(verify_keys) response_keys.update(old_verify_keys) - yield preserve_context_over_deferred(defer.gatherResults( + yield logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store.store_server_keys_json)( server_name=server_name, @@ -632,7 +641,7 @@ class Keyring(object): for key_id in updated_key_ids ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) results[server_name] = response_keys @@ -710,7 +719,6 @@ class Keyring(object): defer.returnValue(verify_keys) - @defer.inlineCallbacks def store_keys(self, server_name, from_server, verify_keys): """Store a collection of verify keys for a given server Args: @@ -721,7 +729,7 @@ class Keyring(object): A deferred that completes when the keys are stored. """ # TODO(markjh): Store whether the keys have expired. - yield preserve_context_over_deferred(defer.gatherResults( + return logcontext.make_deferred_yieldable(defer.gatherResults( [ preserve_fn(self.store.store_server_verify_key)( server_name, server_name, key.time_added, key @@ -729,4 +737,4 @@ class Keyring(object): for key_id, key in verify_keys.items() ], consumeErrors=True, - )).addErrback(unwrapFirstError) + ).addErrback(unwrapFirstError)) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index ed60d494ff..dac4b3f4e0 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler): user_id (str) from_token (StreamToken) """ + now_token = yield self.hs.get_event_sources().get_current_token() + room_ids = yield self.store.get_rooms_for_user(user_id) # First we check if any devices have changed @@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler): # Then work out if any users have since joined rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) + member_events = yield self.store.get_membership_changes_for_user( + user_id, from_token.room_key, now_token.room_key + ) + rooms_changed.update(event.room_id for event in member_events) + stream_ordering = RoomStreamToken.parse_stream_token( - from_token.room_key).stream + from_token.room_key + ).stream possibly_changed = set(changed) + possibly_left = set() for room_id in rooms_changed: + current_state_ids = yield self.store.get_current_state_ids(room_id) + + # The user may have left the room + # TODO: Check if they actually did or if we were just invited. + if room_id not in room_ids: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_left.add(state_key) + continue + # Fetch the current state at the time. try: event_ids = yield self.store.get_forward_extremeties_for_room( @@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler): # ordering: treat it the same as a new room event_ids = [] - current_state_ids = yield self.store.get_current_state_ids(room_id) - # special-case for an empty prev state: include all members # in the changed list if not event_ids: @@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler): possibly_changed.add(state_key) continue + current_member_id = current_state_ids.get((EventTypes.Member, user_id)) + if not current_member_id: + continue + # mapping from event_id -> state_dict prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + # Check if we've joined the room? If so we just blindly add all the users to + # the "possibly changed" users. + for state_dict in prev_state_ids.itervalues(): + member_event = state_dict.get((EventTypes.Member, user_id), None) + if not member_event or member_event != current_member_id: + for key, event_id in current_state_ids.iteritems(): + etype, state_key = key + if etype != EventTypes.Member: + continue + possibly_changed.add(state_key) + break + # If there has been any change in membership, include them in the # possibly changed list. We'll check if they are joined below, # and we're not toooo worried about spuriously adding users. @@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler): # check if this member has changed since any of the extremities # at the stream_ordering, and add them to the list if so. - for state_dict in prev_state_ids.values(): + for state_dict in prev_state_ids.itervalues(): prev_event_id = state_dict.get(key, None) if not prev_event_id or prev_event_id != event_id: - possibly_changed.add(state_key) + if state_key != user_id: + possibly_changed.add(state_key) break - users_who_share_room = yield self.store.get_users_who_share_room_with_user( - user_id - ) + if possibly_changed or possibly_left: + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) - # Take the intersection of the users whose devices may have changed - # and those that actually still share a room with the user - defer.returnValue(users_who_share_room & possibly_changed) + # Take the intersection of the users whose devices may have changed + # and those that actually still share a room with the user + possibly_joined = possibly_changed & users_who_share_room + possibly_left = (possibly_changed | possibly_left) - users_who_share_room + else: + possibly_joined = [] + possibly_left = [] + + defer.returnValue({ + "changed": list(possibly_joined), + "left": list(possibly_left), + }) @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b790a7c2ef..4669199b2d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1606,7 +1606,7 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR - if event.type == EventTypes.GuestAccess: + if event.type == EventTypes.GuestAccess and not context.rejected: yield self.maybe_kick_guest_users(event) defer.returnValue(context) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d7b90a35ea..f2e4ffcec6 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -119,6 +119,16 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [ return bool(self.join or self.invite or self.leave) +class DeviceLists(collections.namedtuple("DeviceLists", [ + "changed", # list of user_ids whose devices may have changed + "left", # list of user_ids whose devices we no longer track +])): + __slots__ = [] + + def __nonzero__(self): + return bool(self.changed or self.left) + + class SyncResult(collections.namedtuple("SyncResult", [ "next_batch", # Token for the next sync "presence", # List of presence events for the user. @@ -296,6 +306,11 @@ class SyncHandler(object): timeline_limit = sync_config.filter_collection.timeline_limit() block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline() + # Pull out the current state, as we always want to include those events + # in the timeline if they're there. + current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = frozenset(current_state_ids.itervalues()) + if recents is None or newly_joined_room or timeline_limit < len(recents): limited = True else: @@ -307,6 +322,7 @@ class SyncHandler(object): self.store, sync_config.user.to_string(), recents, + always_include_ids=current_state_ids, ) else: recents = [] @@ -342,6 +358,7 @@ class SyncHandler(object): self.store, sync_config.user.to_string(), loaded_recents, + always_include_ids=current_state_ids, ) loaded_recents.extend(recents) recents = loaded_recents @@ -548,7 +565,8 @@ class SyncHandler(object): res = yield self._generate_sync_entry_for_rooms( sync_result_builder, account_data_by_room ) - newly_joined_rooms, newly_joined_users = res + newly_joined_rooms, newly_joined_users, _, _ = res + _, _, newly_left_rooms, newly_left_users = res block_all_presence_data = ( since_token is None and @@ -562,7 +580,11 @@ class SyncHandler(object): yield self._generate_sync_entry_for_to_device(sync_result_builder) device_lists = yield self._generate_sync_entry_for_device_list( - sync_result_builder + sync_result_builder, + newly_joined_rooms=newly_joined_rooms, + newly_joined_users=newly_joined_users, + newly_left_rooms=newly_left_rooms, + newly_left_users=newly_left_users, ) device_id = sync_config.device_id @@ -635,25 +657,50 @@ class SyncHandler(object): @measure_func("_generate_sync_entry_for_device_list") @defer.inlineCallbacks - def _generate_sync_entry_for_device_list(self, sync_result_builder): + def _generate_sync_entry_for_device_list(self, sync_result_builder, + newly_joined_rooms, newly_joined_users, + newly_left_rooms, newly_left_users): user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token if since_token and since_token.device_list_key: - room_ids = yield self.store.get_rooms_for_user(user_id) - - user_ids_changed = set() changed = yield self.store.get_user_whose_devices_changed( since_token.device_list_key ) - for other_user_id in changed: - other_room_ids = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(other_room_ids): - user_ids_changed.add(other_user_id) - defer.returnValue(user_ids_changed) + # TODO: Be more clever than this, i.e. remove users who we already + # share a room with? + for room_id in newly_joined_rooms: + joined_users = yield self.state.get_current_user_in_room(room_id) + newly_joined_users.update(joined_users) + + for room_id in newly_left_rooms: + left_users = yield self.state.get_current_user_in_room(room_id) + newly_left_users.update(left_users) + + # TODO: Check that these users are actually new, i.e. either they + # weren't in the previous sync *or* they left and rejoined. + changed.update(newly_joined_users) + + if not changed and not newly_left_users: + defer.returnValue(DeviceLists( + changed=[], + left=newly_left_users, + )) + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + + defer.returnValue(DeviceLists( + changed=users_who_share_room & changed, + left=set(newly_left_users) - users_who_share_room, + )) else: - defer.returnValue([]) + defer.returnValue(DeviceLists( + changed=[], + left=[], + )) @defer.inlineCallbacks def _generate_sync_entry_for_to_device(self, sync_result_builder): @@ -817,8 +864,8 @@ class SyncHandler(object): account_data_by_room(dict): Dictionary of per room account data Returns: - Deferred(tuple): Returns a 2-tuple of - `(newly_joined_rooms, newly_joined_users)` + Deferred(tuple): Returns a 4-tuple of + `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)` """ user_id = sync_result_builder.sync_config.user.to_string() block_all_room_ephemeral = ( @@ -849,7 +896,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - defer.returnValue(([], [])) + defer.returnValue(([], [], [], [])) ignored_account_data = yield self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id, @@ -862,7 +909,7 @@ class SyncHandler(object): if since_token: res = yield self._get_rooms_changed(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms = res + room_entries, invited, newly_joined_rooms, newly_left_rooms = res tags_by_room = yield self.store.get_updated_tags( user_id, since_token.account_data_key, @@ -870,6 +917,7 @@ class SyncHandler(object): else: res = yield self._get_all_rooms(sync_result_builder, ignored_users) room_entries, invited, newly_joined_rooms = res + newly_left_rooms = [] tags_by_room = yield self.store.get_tags_for_user(user_id) @@ -890,17 +938,30 @@ class SyncHandler(object): # Now we want to get any newly joined users newly_joined_users = set() + newly_left_users = set() if since_token: for joined_sync in sync_result_builder.joined: it = itertools.chain( - joined_sync.timeline.events, joined_sync.state.values() + joined_sync.timeline.events, joined_sync.state.itervalues() ) for event in it: if event.type == EventTypes.Member: if event.membership == Membership.JOIN: newly_joined_users.add(event.state_key) - - defer.returnValue((newly_joined_rooms, newly_joined_users)) + else: + prev_content = event.unsigned.get("prev_content", {}) + prev_membership = prev_content.get("membership", None) + if prev_membership == Membership.JOIN: + newly_left_users.add(event.state_key) + + newly_left_users -= newly_joined_users + + defer.returnValue(( + newly_joined_rooms, + newly_joined_users, + newly_left_rooms, + newly_left_users, + )) @defer.inlineCallbacks def _have_rooms_changed(self, sync_result_builder): @@ -970,15 +1031,17 @@ class SyncHandler(object): mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) newly_joined_rooms = [] + newly_left_rooms = [] room_entries = [] invited = [] - for room_id, events in mem_change_events_by_room_id.items(): + for room_id, events in mem_change_events_by_room_id.iteritems(): non_joins = [e for e in events if e.membership != Membership.JOIN] has_join = len(non_joins) != len(events) # We want to figure out if we joined the room at some point since # the last sync (even if we have since left). This is to make sure # we do send down the room, and with full state, where necessary + old_state_ids = None if room_id in joined_room_ids or has_join: old_state_ids = yield self.get_state_at(room_id, since_token) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) @@ -996,6 +1059,26 @@ class SyncHandler(object): if not non_joins: continue + # Check if we have left the room. This can either be because we were + # joined before *or* that we since joined and then left. + if events[-1].membership != Membership.JOIN: + if has_join: + newly_left_rooms.append(room_id) + else: + if not old_state_ids: + old_state_ids = yield self.get_state_at(room_id, since_token) + old_mem_ev_id = old_state_ids.get( + (EventTypes.Member, user_id), + None, + ) + old_mem_ev = None + if old_mem_ev_id: + old_mem_ev = yield self.store.get_event( + old_mem_ev_id, allow_none=True + ) + if old_mem_ev and old_mem_ev.membership == Membership.JOIN: + newly_left_rooms.append(room_id) + # Only bother if we're still currently invited should_invite = non_joins[-1].membership == Membership.INVITE if should_invite: @@ -1073,7 +1156,7 @@ class SyncHandler(object): upto_token=since_token, )) - defer.returnValue((room_entries, invited, newly_joined_rooms)) + defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms)) @defer.inlineCallbacks def _get_all_rooms(self, sync_result_builder, ignored_users): @@ -1322,6 +1405,7 @@ class SyncResultBuilder(object): self.archived = [] self.device = [] self.groups = None + self.to_device = [] class RoomSyncResultBuilder(object): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 803ac3e75b..b0d64aa6c4 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -20,6 +20,8 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes, Membership +from synapse.metrics import get_metrics_for +from synapse.util.caches import metrics as cache_metrics from synapse.util.caches.descriptors import cached from synapse.util.async import Linearizer @@ -31,6 +33,23 @@ logger = logging.getLogger(__name__) rules_by_room = {} +push_metrics = get_metrics_for(__name__) + +push_rules_invalidation_counter = push_metrics.register_counter( + "push_rules_invalidation_counter" +) +push_rules_state_size_counter = push_metrics.register_counter( + "push_rules_state_size_counter" +) + +# Measures whether we use the fast path of using state deltas, or if we have to +# recalculate from scratch +push_rules_delta_state_cache_metric = cache_metrics.register_cache( + "cache", + size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values + cache_name="push_rules_delta_state_cache_metric", +) + class BulkPushRuleEvaluator(object): """Calculates the outcome of push rules for an event for all users in the @@ -41,6 +60,12 @@ class BulkPushRuleEvaluator(object): self.hs = hs self.store = hs.get_datastore() + self.room_push_rule_cache_metrics = cache_metrics.register_cache( + "cache", + size_callback=lambda: 0, # There's not good value for this + cache_name="room_push_rule_cache", + ) + @defer.inlineCallbacks def _get_rules_for_event(self, event, context): """This gets the rules for all users in the room at the time of the event, @@ -78,7 +103,10 @@ class BulkPushRuleEvaluator(object): # It's important that RulesForRoom gets added to self._get_rules_for_room.cache # before any lookup methods get called on it as otherwise there may be # a race if invalidate_all gets called (which assumes its in the cache) - return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache) + return RulesForRoom( + self.hs, room_id, self._get_rules_for_room.cache, + self.room_push_rule_cache_metrics, + ) @defer.inlineCallbacks def action_for_event_by_user(self, event, context): @@ -161,17 +189,19 @@ class RulesForRoom(object): the entire cache for the room. """ - def __init__(self, hs, room_id, rules_for_room_cache): + def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics): """ Args: hs (HomeServer) room_id (str) rules_for_room_cache(Cache): The cache object that caches these RoomsForUser objects. + room_push_rule_cache_metrics (CacheMetric) """ self.room_id = room_id self.is_mine_id = hs.is_mine_id self.store = hs.get_datastore() + self.room_push_rule_cache_metrics = room_push_rule_cache_metrics self.linearizer = Linearizer(name="rules_for_room") @@ -213,11 +243,19 @@ class RulesForRoom(object): """ state_group = context.state_group + if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() + defer.returnValue(self.rules_by_user) + with (yield self.linearizer.queue(())): if state_group and self.state_group == state_group: logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() defer.returnValue(self.rules_by_user) + self.room_push_rule_cache_metrics.inc_misses() + ret_rules_by_user = {} missing_member_event_ids = {} if state_group and self.state_group == context.prev_group: @@ -225,8 +263,13 @@ class RulesForRoom(object): # results. ret_rules_by_user = self.rules_by_user current_state_ids = context.delta_ids + + push_rules_delta_state_cache_metric.inc_hits() else: current_state_ids = context.current_state_ids + push_rules_delta_state_cache_metric.inc_misses() + + push_rules_state_size_counter.inc_by(len(current_state_ids)) logger.debug( "Looking for member changes in %r %r", state_group, current_state_ids @@ -273,6 +316,14 @@ class RulesForRoom(object): yield self._update_rules_with_member_event_ids( ret_rules_by_user, missing_member_event_ids, state_group, event ) + else: + # The push rules didn't change but lets update the cache anyway + self.update_cache( + self.sequence, + members={}, # There were no membership changes + rules_by_user=ret_rules_by_user, + state_group=state_group + ) if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -371,6 +422,7 @@ class RulesForRoom(object): self.state_group = object() self.member_map = {} self.rules_by_user = {} + push_rules_invalidation_counter.inc() def update_cache(self, sequence, members, rules_by_user, state_group): if sequence == self.sequence: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 8a5d473108..62c41cd9db 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -244,6 +244,26 @@ class HttpPusher(object): @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): + if self.data.get('format') == 'event_id_only': + d = { + 'notification': { + 'event_id': event.event_id, + 'room_id': event.room_id, + 'counts': { + 'unread': badge, + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + defer.returnValue(d) + ctx = yield push_tools.get_context_for_event( self.store, self.state_handler, event, self.user_id ) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index ed7f1c89ad..630e92c90e 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -31,7 +31,7 @@ REQUIREMENTS = { "pyyaml": ["yaml"], "pyasn1": ["pyasn1"], "daemonize": ["daemonize"], - "py-bcrypt": ["bcrypt"], + "bcrypt": ["bcrypt"], "pillow": ["PIL"], "pydenticon": ["pydenticon"], "ujson": ["ujson"], @@ -40,6 +40,7 @@ REQUIREMENTS = { "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], "phonenumbers>=8.2.0": ["phonenumbers"], + "affinity": ["affinity"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index 7d786e8de3..465b25033d 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -168,7 +168,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet): DEFAULT_MESSAGE = ( "Sharing illegal content on this server is not permitted and rooms in" - " violatation will be blocked." + " violation will be blocked." ) def __init__(self, hs): @@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet): class ResetPasswordRestServlet(ClientV1RestServlet): """Post request to allow an administrator reset password for a user. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/reset_password/ @user:to_reset_password?access_token=admin_access_token @@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, target_user_id): """Post request to allow an administrator reset password for a user. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. """ UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) @@ -343,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet): class GetUsersPaginatedRestServlet(ClientV1RestServlet): """Get request to get specific number of users from Synapse. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/ @admin:user?access_token=admin_access_token&start=0&limit=10 @@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_GET(self, request, target_user_id): """Get request to get specific number of users from Synapse. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. """ target_user = UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) @@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, target_user_id): """Post request to get specific number of users from Synapse.. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/ @admin:user?access_token=admin_access_token @@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet): class SearchUsersRestServlet(ClientV1RestServlet): """Get request to search user table for specific users according to search term. - This need a user have a administrator access in Synapse. + This needs user to have administrator access in Synapse. Example: http://localhost:8008/_matrix/client/api/v1/admin/search_users/ @admin:user?access_token=admin_access_token&term=alice @@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet): def on_GET(self, request, target_user_id): """Get request to search user table for specific users according to search term. - This need a user have a administrator access in Synapse. + This needs user to have a administrator access in Synapse. """ target_user = UserID.from_string(target_user_id) requester = yield self.auth.get_user_by_req(request) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 6a3cfe84f8..943e87e7fd 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet): user_id = requester.user.to_string() - changed = yield self.device_handler.get_user_ids_changed( + results = yield self.device_handler.get_user_ids_changed( user_id, from_token, ) - defer.returnValue((200, { - "changed": list(changed), - })) + defer.returnValue((200, results)) class OneTimeKeyServlet(RestServlet): diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 5f208a4c1c..a1e0e53b33 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet): filter_id = parse_string(request, "filter", default=None) full_state = parse_boolean(request, "full_state", default=False) - logger.info( + logger.debug( "/sync: user=%r, timeout=%r, since=%r," " set_presence=%r, filter_id=%r, device_id=%r" % ( user, timeout, since, set_presence, filter_id, device_id @@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet): "account_data": {"events": sync_result.account_data}, "to_device": {"events": sync_result.to_device}, "device_lists": { - "changed": list(sync_result.device_lists), + "changed": list(sync_result.device_lists.changed), + "left": list(sync_result.device_lists.left), }, "presence": SyncRestServlet.encode_presence( sync_result.presence, time_now diff --git a/synapse/visibility.py b/synapse/visibility.py index 5590b866ed..d7dbdc77ff 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = ( @defer.inlineCallbacks -def filter_events_for_clients(store, user_tuples, events, event_id_to_state): +def filter_events_for_clients(store, user_tuples, events, event_id_to_state, + always_include_ids=frozenset()): """ Returns dict of user_id -> list of events that user is allowed to see. @@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): * the user has not been a member of the room since the given events events ([synapse.events.EventBase]): list of events to filter + always_include_ids (set(event_id)): set of event ids to specifically + include (unless sender is ignored) """ forgotten = yield preserve_context_over_deferred(defer.gatherResults([ defer.maybeDeferred( @@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): if not event.is_state() and event.sender in ignore_list: return False + if event.event_id in always_include_ids: + return True + state = event_id_to_state[event.event_id] # get the room_visibility at the time of the event. @@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_client(store, user_id, events, is_peeking=False): +def filter_events_for_client(store, user_id, events, is_peeking=False, + always_include_ids=frozenset()): """ Check which events a user is allowed to see @@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False): types=types ) res = yield filter_events_for_clients( - store, [(user_id, is_peeking)], events, event_id_to_state + store, [(user_id, is_peeking)], events, event_id_to_state, + always_include_ids=always_include_ids, ) defer.returnValue(res.get(user_id, [])) diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py new file mode 100644 index 0000000000..da2c9e44e7 --- /dev/null +++ b/tests/crypto/test_keyring.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.crypto import keyring +from synapse.util.logcontext import LoggingContext +from tests import utils, unittest +from twisted.internet import defer + + +class KeyringTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.hs = yield utils.setup_test_homeserver(handlers=None) + + @defer.inlineCallbacks + def test_wait_for_previous_lookups(self): + sentinel_context = LoggingContext.current_context() + + kr = keyring.Keyring(self.hs) + + def check_context(_, expected): + self.assertEquals( + LoggingContext.current_context().test_key, expected + ) + + lookup_1_deferred = defer.Deferred() + lookup_2_deferred = defer.Deferred() + + with LoggingContext("one") as context_one: + context_one.test_key = "one" + + wait_1_deferred = kr.wait_for_previous_lookups( + ["server1"], + {"server1": lookup_1_deferred}, + ) + + # there were no previous lookups, so the deferred should be ready + self.assertTrue(wait_1_deferred.called) + # ... so we should have preserved the LoggingContext. + self.assertIs(LoggingContext.current_context(), context_one) + wait_1_deferred.addBoth(check_context, "one") + + with LoggingContext("two") as context_two: + context_two.test_key = "two" + + # set off another wait. It should block because the first lookup + # hasn't yet completed. + wait_2_deferred = kr.wait_for_previous_lookups( + ["server1"], + {"server1": lookup_2_deferred}, + ) + self.assertFalse(wait_2_deferred.called) + # ... so we should have reset the LoggingContext. + self.assertIs(LoggingContext.current_context(), sentinel_context) + wait_2_deferred.addBoth(check_context, "two") + + # let the first lookup complete (in the sentinel context) + lookup_1_deferred.callback(None) + + # now the second wait should complete and restore our + # loggingcontext. + yield wait_2_deferred diff --git a/tox.ini b/tox.ini index 39ad305360..f408defc8f 100644 --- a/tox.ini +++ b/tox.ini @@ -14,14 +14,38 @@ deps = setenv = PYTHONDONTWRITEBYTECODE = no_byte_code - # As of twisted 16.4, trial tries to import the tests as a package, which - # means it needs to be on the pythonpath. - PYTHONPATH = {toxinidir} + commands = - /bin/sh -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \ - {envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}" + /usr/bin/find "{toxinidir}" -name '*.pyc' -delete + coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \ + "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:} {env:DUMP_COVERAGE_COMMAND:coverage report -m} +[testenv:py27] + +# As of twisted 16.4, trial tries to import the tests as a package (previously +# it loaded the files explicitly), which means they need to be on the +# pythonpath. Our sdist doesn't include the 'tests' package, so normally it +# doesn't work within the tox virtualenv. +# +# As a workaround, we tell tox to do install with 'pip -e', which just +# creates a symlink to the project directory instead of unpacking the sdist. +# +# (An alternative to this would be to set PYTHONPATH to include the project +# directory. Note two problems with this: +# +# - if you set it via `setenv`, then it is also set during the 'install' +# phase, which inhibits unpacking the sdist, so the virtualenv isn't +# useful for anything else without setting PYTHONPATH similarly. +# +# - `synapse` is also loaded from PYTHONPATH so even if you only set +# PYTHONPATH for the test phase, we're still running the tests against +# the working copy rather than the contents of the sdist. So frankly +# you might as well use -e in the first place. +# +# ) +usedevelop=true + [testenv:packaging] deps = check-manifest |