summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.github/ISSUE_TEMPLATE.md47
-rw-r--r--MANIFEST.in1
-rw-r--r--UPGRADE.rst75
-rw-r--r--contrib/prometheus/README20
-rw-r--r--contrib/prometheus/consoles/synapse.html395
-rw-r--r--contrib/prometheus/synapse.rules21
-rw-r--r--docs/postgres.rst2
-rwxr-xr-xjenkins-dendron-haproxy-postgres.sh1
-rwxr-xr-xjenkins-dendron-postgres.sh1
-rwxr-xr-xjenkins-postgres.sh1
-rwxr-xr-xjenkins-sqlite.sh1
-rwxr-xr-x[-rw-r--r--]scripts-dev/federation_client.py87
-rw-r--r--synapse/api/auth.py8
-rw-r--r--synapse/app/_base.py99
-rw-r--r--synapse/app/appservice.py50
-rw-r--r--synapse/app/client_reader.py53
-rw-r--r--synapse/app/federation_reader.py53
-rw-r--r--synapse/app/federation_sender.py57
-rw-r--r--synapse/app/frontend_proxy.py64
-rwxr-xr-xsynapse/app/homeserver.py114
-rw-r--r--synapse/app/media_repository.py53
-rw-r--r--synapse/app/pusher.py57
-rw-r--r--synapse/app/synchrotron.py69
-rw-r--r--synapse/app/user_dir.py53
-rw-r--r--synapse/config/server.py33
-rw-r--r--synapse/config/workers.py1
-rw-r--r--synapse/crypto/keyclient.py17
-rw-r--r--synapse/crypto/keyring.py285
-rw-r--r--synapse/events/spamcheck.py38
-rw-r--r--synapse/federation/federation_base.py138
-rw-r--r--synapse/federation/federation_client.py8
-rw-r--r--synapse/handlers/device.py68
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/message.py8
-rw-r--r--synapse/handlers/room_member.py22
-rw-r--r--synapse/handlers/sync.py133
-rw-r--r--synapse/http/endpoint.py116
-rw-r--r--synapse/push/httppusher.py20
-rw-r--r--synapse/python_dependencies.py3
-rw-r--r--synapse/rest/client/v1/admin.py16
-rw-r--r--synapse/rest/client/v2_alpha/keys.py6
-rw-r--r--synapse/rest/client/v2_alpha/sync.py5
-rw-r--r--synapse/storage/keys.py41
-rw-r--r--synapse/visibility.py14
-rw-r--r--tests/crypto/test_keyring.py229
-rw-r--r--tests/test_dns.py26
-rw-r--r--tests/utils.py1
-rw-r--r--tox.ini34
48 files changed, 1855 insertions, 805 deletions
diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md
new file mode 100644
index 0000000000..d2050a3e44
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE.md
@@ -0,0 +1,47 @@
+<!-- 
+
+**IF YOU HAVE SUPPORT QUESTIONS ABOUT RUNNING OR CONFIGURING YOUR OWN HOME SERVER**: 
+You will likely get better support more quickly if you ask in ** #matrix:matrix.org ** ;)
+
+
+This is a bug report template. By following the instructions below and
+filling out the sections with your information, you will help the us to get all
+the necessary data to fix your issue.
+
+You can also preview your report before submitting it. You may remove sections
+that aren't relevant to your particular case.
+
+Text between <!-- and --​> marks will be invisible in the report.
+
+-->
+
+### Description
+
+Describe here the problem that you are experiencing, or the feature you are requesting.
+
+### Steps to reproduce
+
+- For bugs, list the steps
+- that reproduce the bug
+- using hyphens as bullet points
+
+Describe how what happens differs from what you expected.
+
+If you can identify any relevant log snippets from _homeserver.log_, please include
+those here (please be careful to remove any personal or private data):
+
+### Version information
+
+<!-- IMPORTANT: please answer the following questions, to help us narrow down the problem -->
+
+- **Homeserver**: Was this issue identified on matrix.org or another homeserver?
+
+If not matrix.org:
+- **Version**:        What version of Synapse is running? <!-- 
+You can find the Synapse version by inspecting the server headers (replace matrix.org with
+your own homeserver domain):
+$ curl -v https://matrix.org/_matrix/client/versions 2>&1 | grep "Server:"
+-->
+- **Install method**: package manager/git clone/pip      
+- **Platform**:       Tell us about the environment in which your homeserver is operating
+                      - distro, hardware, if it's running in a vm/container, etc.
diff --git a/MANIFEST.in b/MANIFEST.in
index 981698143f..afb60e12ee 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -27,4 +27,5 @@ exclude jenkins*.sh
 exclude jenkins*
 recursive-exclude jenkins *.sh
 
+prune .github
 prune demo/etc
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 62b22e9108..2efe7ea60f 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -5,39 +5,48 @@ Before upgrading check if any special steps are required to upgrade from the
 what you currently have installed to current version of synapse. The extra
 instructions that may be required are listed later in this document.
 
-If synapse was installed in a virtualenv then active that virtualenv before
-upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then run:
+1. If synapse was installed in a virtualenv then active that virtualenv before
+   upgrading. If synapse is installed in a virtualenv in ``~/.synapse/`` then
+   run:
 
-.. code:: bash
+   .. code:: bash
 
-    source ~/.synapse/bin/activate
+       source ~/.synapse/bin/activate
 
-If synapse was installed using pip then upgrade to the latest version by
-running:
+2. If synapse was installed using pip then upgrade to the latest version by
+   running:
 
-.. code:: bash
+   .. code:: bash
 
-    pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
+       pip install --upgrade --process-dependency-links https://github.com/matrix-org/synapse/tarball/master
 
-If synapse was installed using git then upgrade to the latest version by
-running:
+       # restart synapse
+       synctl restart
 
-.. code:: bash
 
-    # Pull the latest version of the master branch.
-    git pull
-    # Update the versions of synapse's python dependencies.
-    python synapse/python_dependencies.py | xargs -n1 pip install --upgrade
-	
-To check whether your update was sucessfull, run:
+   If synapse was installed using git then upgrade to the latest version by
+   running:
 
-.. code:: bash
+   .. code:: bash
+
+       # Pull the latest version of the master branch.
+       git pull
+       # Update the versions of synapse's python dependencies.
+       python synapse/python_dependencies.py | xargs pip install --upgrade
 
-	 # replace your.server.domain with ther domain of your synapse homeserver
-	 curl https://<your.server.domain>/_matrix/federation/v1/version 
+       # restart synapse
+       ./synctl restart
 
-So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
 
+To check whether your update was sucessful, you can check the Server header
+returned by the Client-Server API:
+
+.. code:: bash
+
+    # replace <host.name> with the hostname of your synapse homeserver.
+    # You may need to specify a port (eg, :8448) if your server is not
+    # configured on port 443.
+    curl -kv https://<host.name>/_matrix/client/versions 2>&1 | grep "Server:"
 
 Upgrading to v0.15.0
 ====================
@@ -77,7 +86,7 @@ It has been replaced by specifying a list of application service registrations i
 ``homeserver.yaml``::
 
   app_service_config_files: ["registration-01.yaml", "registration-02.yaml"]
-  
+
 Where ``registration-01.yaml`` looks like::
 
   url: <String>  # e.g. "https://my.application.service.com"
@@ -166,7 +175,7 @@ This release completely changes the database schema and so requires upgrading
 it before starting the new version of the homeserver.
 
 The script "database-prepare-for-0.5.0.sh" should be used to upgrade the
-database. This will save all user information, such as logins and profiles, 
+database. This will save all user information, such as logins and profiles,
 but will otherwise purge the database. This includes messages, which
 rooms the home server was a member of and room alias mappings.
 
@@ -175,18 +184,18 @@ file and ask for help in #matrix:matrix.org. The upgrade process is,
 unfortunately, non trivial and requires human intervention to resolve any
 resulting conflicts during the upgrade process.
 
-Before running the command the homeserver should be first completely 
+Before running the command the homeserver should be first completely
 shutdown. To run it, simply specify the location of the database, e.g.:
 
   ./scripts/database-prepare-for-0.5.0.sh "homeserver.db"
 
-Once this has successfully completed it will be safe to restart the 
-homeserver. You may notice that the homeserver takes a few seconds longer to 
+Once this has successfully completed it will be safe to restart the
+homeserver. You may notice that the homeserver takes a few seconds longer to
 restart than usual as it reinitializes the database.
 
 On startup of the new version, users can either rejoin remote rooms using room
 aliases or by being reinvited. Alternatively, if any other homeserver sends a
-message to a room that the homeserver was previously in the local HS will 
+message to a room that the homeserver was previously in the local HS will
 automatically rejoin the room.
 
 Upgrading to v0.4.0
@@ -245,7 +254,7 @@ automatically generate default config use::
         --config-path homeserver.config \
         --generate-config
 
-This config can be edited if desired, for example to specify a different SSL 
+This config can be edited if desired, for example to specify a different SSL
 certificate to use. Once done you can run the home server using::
 
     $ python synapse/app/homeserver.py --config-path homeserver.config
@@ -266,20 +275,20 @@ This release completely changes the database schema and so requires upgrading
 it before starting the new version of the homeserver.
 
 The script "database-prepare-for-0.0.1.sh" should be used to upgrade the
-database. This will save all user information, such as logins and profiles, 
+database. This will save all user information, such as logins and profiles,
 but will otherwise purge the database. This includes messages, which
 rooms the home server was a member of and room alias mappings.
 
-Before running the command the homeserver should be first completely 
+Before running the command the homeserver should be first completely
 shutdown. To run it, simply specify the location of the database, e.g.:
 
   ./scripts/database-prepare-for-0.0.1.sh "homeserver.db"
 
-Once this has successfully completed it will be safe to restart the 
-homeserver. You may notice that the homeserver takes a few seconds longer to 
+Once this has successfully completed it will be safe to restart the
+homeserver. You may notice that the homeserver takes a few seconds longer to
 restart than usual as it reinitializes the database.
 
 On startup of the new version, users can either rejoin remote rooms using room
 aliases or by being reinvited. Alternatively, if any other homeserver sends a
-message to a room that the homeserver was previously in the local HS will 
+message to a room that the homeserver was previously in the local HS will
 automatically rejoin the room.
diff --git a/contrib/prometheus/README b/contrib/prometheus/README
new file mode 100644
index 0000000000..eb91db2de2
--- /dev/null
+++ b/contrib/prometheus/README
@@ -0,0 +1,20 @@
+This directory contains some sample monitoring config for using the
+'Prometheus' monitoring server against synapse.
+
+To use it, first install prometheus by following the instructions at
+
+  http://prometheus.io/
+
+Then add a new job to the main prometheus.conf file:
+
+  job: {
+    name: "synapse"
+
+    target_group: {
+      target: "http://SERVER.LOCATION.HERE:PORT/_synapse/metrics"
+    }
+  }
+
+Metrics are disabled by default when running synapse; they must be enabled
+with the 'enable-metrics' option, either in the synapse config file or as a
+command-line option.
diff --git a/contrib/prometheus/consoles/synapse.html b/contrib/prometheus/consoles/synapse.html
new file mode 100644
index 0000000000..e23d8a1fce
--- /dev/null
+++ b/contrib/prometheus/consoles/synapse.html
@@ -0,0 +1,395 @@
+{{ template "head" . }}
+
+{{ template "prom_content_head" . }}
+<h1>System Resources</h1>
+
+<h3>CPU</h3>
+<div id="process_resource_utime"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#process_resource_utime"),
+  expr: "rate(process_cpu_seconds_total[2m]) * 100",
+  name: "[[job]]",  
+  min: 0,
+  max: 100,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "%",
+  yTitle: "CPU Usage"
+})
+</script>
+
+<h3>Memory</h3>
+<div id="process_resource_maxrss"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#process_resource_maxrss"),
+  expr: "process_psutil_rss:max",
+  name: "Maxrss",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "bytes",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>File descriptors</h3>
+<div id="process_fds"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#process_fds"),
+  expr: "process_open_fds{job='synapse'}",
+  name: "FDs",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "",
+  yTitle: "Descriptors"
+})
+</script>
+
+<h1>Reactor</h1>
+
+<h3>Total reactor time</h3>
+<div id="reactor_total_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#reactor_total_time"),
+  expr: "rate(python_twisted_reactor_tick_time:total[2m]) / 1000",
+  name: "time",
+  max: 1,
+  min: 0,
+  renderer: "area",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>Average reactor tick time</h3>
+<div id="reactor_average_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#reactor_average_time"),
+  expr: "rate(python_twisted_reactor_tick_time:total[2m]) / rate(python_twisted_reactor_tick_time:count[2m]) / 1000",
+  name: "time",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s",
+  yTitle: "Time"
+})
+</script>
+
+<h3>Pending calls per tick</h3>
+<div id="reactor_pending_calls"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#reactor_pending_calls"),
+  expr: "rate(python_twisted_reactor_pending_calls:total[30s])/rate(python_twisted_reactor_pending_calls:count[30s])",
+  name: "calls",
+  min: 0,
+  renderer: "line",
+  height: 150,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yTitle: "Pending Cals"
+})
+</script>
+
+<h1>Storage</h1>
+
+<h3>Queries</h3>
+<div id="synapse_storage_query_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_query_time"),
+  expr: "rate(synapse_storage_query_time:count[2m])",
+  name: "[[verb]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "queries/s",
+  yTitle: "Queries"
+})
+</script>
+
+<h3>Transactions</h3>
+<div id="synapse_storage_transaction_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_transaction_time"),
+  expr: "rate(synapse_storage_transaction_time:count[2m])",
+  name: "[[desc]]",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "txn/s",
+  yTitle: "Transactions"
+})
+</script>
+
+<h3>Transaction execution time</h3>
+<div id="synapse_storage_transactions_time_msec"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_transactions_time_msec"),
+  expr: "rate(synapse_storage_transaction_time:total[2m]) / 1000",
+  name: "[[desc]]",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>Database scheduling latency</h3>
+<div id="synapse_storage_schedule_time"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_storage_schedule_time"),
+  expr: "rate(synapse_storage_schedule_time:total[2m]) / 1000",
+  name: "Total latency",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "Usage"
+})
+</script>
+
+<h3>Cache hit ratio</h3>
+<div id="synapse_cache_ratio"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_cache_ratio"),
+  expr: "rate(synapse_util_caches_cache:total[2m]) * 100",
+  name: "[[name]]",
+  min: 0,
+  max: 100,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "%",
+  yTitle: "Percentage"
+})
+</script>
+
+<h3>Cache size</h3>
+<div id="synapse_cache_size"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_cache_size"),
+  expr: "synapse_util_caches_cache:size",
+  name: "[[name]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "",
+  yTitle: "Items"
+})
+</script>
+
+<h1>Requests</h1>
+
+<h3>Requests by Servlet</h3>
+<div id="synapse_http_server_requests_servlet"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_requests_servlet"),
+  expr: "rate(synapse_http_server_requests:servlet[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+<h4>&nbsp;(without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
+<div id="synapse_http_server_requests_servlet_minus_events"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
+  expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Average response times</h3>
+<div id="synapse_http_server_response_time_avg"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_response_time_avg"),
+  expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/req",
+  yTitle: "Response time"
+})
+</script>
+
+<h3>All responses by code</h3>
+<div id="synapse_http_server_responses"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_responses"),
+  expr: "rate(synapse_http_server_responses[2m])",
+  name: "[[method]] / [[code]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Error responses by code</h3>
+<div id="synapse_http_server_responses_err"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_responses_err"),
+  expr: "rate(synapse_http_server_responses{code=~\"[45]..\"}[2m])",
+  name: "[[method]] / [[code]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+
+<h3>CPU Usage</h3>
+<div id="synapse_http_server_response_ru_utime"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_response_ru_utime"),
+  expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "CPU Usage"
+})
+</script>
+
+
+<h3>DB Usage</h3>
+<div id="synapse_http_server_response_db_txn_duration"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
+  expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/s",
+  yTitle: "DB Usage"
+})
+</script>
+
+
+<h3>Average event send times</h3>
+<div id="synapse_http_server_send_time_avg"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_http_server_send_time_avg"),
+  expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
+  name: "[[servlet]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "s/req",
+  yTitle: "Response time"
+})
+</script>
+
+<h1>Federation</h1>
+
+<h3>Sent Messages</h3>
+<div id="synapse_federation_client_sent"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_federation_client_sent"),
+  expr: "rate(synapse_federation_client_sent[2m])",
+  name: "[[type]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Received Messages</h3>
+<div id="synapse_federation_server_received"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_federation_server_received"),
+  expr: "rate(synapse_federation_server_received[2m])",
+  name: "[[type]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "req/s",
+  yTitle: "Requests"
+})
+</script>
+
+<h3>Pending</h3>
+<div id="synapse_federation_transaction_queue_pending"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_federation_transaction_queue_pending"),
+  expr: "synapse_federation_transaction_queue_pending",
+  name: "[[type]]",
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "",
+  yTitle: "Units"
+})
+</script>
+
+<h1>Clients</h1>
+
+<h3>Notifiers</h3>
+<div id="synapse_notifier_listeners"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_notifier_listeners"),
+  expr: "synapse_notifier_listeners",
+  name: "listeners",
+  min: 0,
+  yAxisFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yHoverFormatter: PromConsole.NumberFormatter.humanizeNoSmallPrefix,
+  yUnits: "",
+  yTitle: "Listeners"
+})
+</script>
+
+<h3>Notified Events</h3>
+<div id="synapse_notifier_notified_events"></div>
+<script>
+new PromConsole.Graph({
+  node: document.querySelector("#synapse_notifier_notified_events"),
+  expr: "rate(synapse_notifier_notified_events[2m])",
+  name: "events",
+  yAxisFormatter: PromConsole.NumberFormatter.humanize,
+  yHoverFormatter: PromConsole.NumberFormatter.humanize,
+  yUnits: "events/s",
+  yTitle: "Event rate"
+})
+</script>
+
+{{ template "prom_content_tail" . }}
+
+{{ template "tail" }}
diff --git a/contrib/prometheus/synapse.rules b/contrib/prometheus/synapse.rules
new file mode 100644
index 0000000000..b6f84174b0
--- /dev/null
+++ b/contrib/prometheus/synapse.rules
@@ -0,0 +1,21 @@
+synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
+synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
+
+synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
+synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
+
+synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
+
+synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
+synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
+
+synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0
+synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0
+synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job)
+
+synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0
+synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0
+synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job)
+
+synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0
+synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0
diff --git a/docs/postgres.rst b/docs/postgres.rst
index b592801e93..904942ec74 100644
--- a/docs/postgres.rst
+++ b/docs/postgres.rst
@@ -1,6 +1,8 @@
 Using Postgres
 --------------
 
+Postgres version 9.4 or later is known to work.
+
 Set up database
 ===============
 
diff --git a/jenkins-dendron-haproxy-postgres.sh b/jenkins-dendron-haproxy-postgres.sh
index d64b2d2c9d..07979bf8b8 100755
--- a/jenkins-dendron-haproxy-postgres.sh
+++ b/jenkins-dendron-haproxy-postgres.sh
@@ -17,6 +17,7 @@ export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy
 ./sytest/jenkins/prep_sytest_for_postgres.sh
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \
     --dendron $WORKSPACE/dendron/bin/dendron \
     --haproxy \
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index 37ae746f4b..3b932fe340 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -15,5 +15,6 @@ export SYNAPSE_CACHE_FACTOR=1
 ./sytest/jenkins/prep_sytest_for_postgres.sh
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \
     --dendron $WORKSPACE/dendron/bin/dendron \
diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh
index f2ca8ccdff..1afb736394 100755
--- a/jenkins-postgres.sh
+++ b/jenkins-postgres.sh
@@ -14,4 +14,5 @@ export SYNAPSE_CACHE_FACTOR=1
 ./sytest/jenkins/prep_sytest_for_postgres.sh
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \
diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh
index 84613d979c..baf4713a01 100755
--- a/jenkins-sqlite.sh
+++ b/jenkins-sqlite.sh
@@ -12,4 +12,5 @@ export SYNAPSE_CACHE_FACTOR=1
 ./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
 
 ./sytest/jenkins/install_and_run.sh \
+    --python $WORKSPACE/.tox/py27/bin/python \
     --synapse-directory $WORKSPACE \
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index d1ab42d3af..82a90ef6fa 100644..100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -1,10 +1,30 @@
+#!/usr/bin/env python
+#
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import argparse
 import nacl.signing
 import json
 import base64
 import requests
 import sys
 import srvlookup
-
+import yaml
 
 def encode_base64(input_bytes):
     """Encode bytes as a base64 string without any padding."""
@@ -120,11 +140,13 @@ def get_json(origin_name, origin_key, destination, path):
             origin_name, key, sig,
         )
         authorization_headers.append(bytes(header))
-        sys.stderr.write(header)
-        sys.stderr.write("\n")
+        print ("Authorization: %s" % header, file=sys.stderr)
+
+    dest = lookup(destination, path)
+    print ("Requesting %s" % dest, file=sys.stderr)
 
     result = requests.get(
-        lookup(destination, path),
+        dest,
         headers={"Authorization": authorization_headers[0]},
         verify=False,
     )
@@ -133,17 +155,66 @@ def get_json(origin_name, origin_key, destination, path):
 
 
 def main():
-    origin_name, keyfile, destination, path = sys.argv[1:]
+    parser = argparse.ArgumentParser(
+        description=
+            "Signs and sends a federation request to a matrix homeserver",
+    )
+
+    parser.add_argument(
+        "-N", "--server-name",
+        help="Name to give as the local homeserver. If unspecified, will be "
+             "read from the config file.",
+    )
+
+    parser.add_argument(
+        "-k", "--signing-key-path",
+        help="Path to the file containing the private ed25519 key to sign the "
+             "request with.",
+    )
+
+    parser.add_argument(
+        "-c", "--config",
+        default="homeserver.yaml",
+        help="Path to server config file. Ignored if --server-name and "
+             "--signing-key-path are both given.",
+    )
 
-    with open(keyfile) as f:
+    parser.add_argument(
+        "-d", "--destination",
+        default="matrix.org",
+        help="name of the remote homeserver. We will do SRV lookups and "
+             "connect appropriately.",
+    )
+
+    parser.add_argument(
+        "path",
+        help="request path. We will add '/_matrix/federation/v1/' to this."
+    )
+
+    args = parser.parse_args()
+
+    if not args.server_name or not args.signing_key_path:
+        read_args_from_config(args)
+
+    with open(args.signing_key_path) as f:
         key = read_signing_keys(f)[0]
 
     result = get_json(
-        origin_name, key, destination, "/_matrix/federation/v1/" + path
+        args.server_name, key, args.destination, "/_matrix/federation/v1/" + args.path
     )
 
     json.dump(result, sys.stdout)
-    print ""
+    print ("")
+
+
+def read_args_from_config(args):
+    with open(args.config, 'r') as fh:
+        config = yaml.safe_load(fh)
+        if not args.server_name:
+            args.server_name = config['server_name']
+        if not args.signing_key_path:
+            args.signing_key_path = config['signing_key_path']
+
 
 if __name__ == "__main__":
     main()
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e3da45b416..72858cca1f 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -519,6 +519,14 @@ class Auth(object):
             )
 
     def is_server_admin(self, user):
+        """ Check if the given user is a local server admin.
+
+        Args:
+            user (str): mxid of user to check
+
+        Returns:
+            bool: True if the user is an admin
+        """
         return self.store.is_server_admin(user)
 
     @defer.inlineCallbacks
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..cd0e815919
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        config (synapse.config.Config): config object
+    """
+
+    logger = logging.getLogger(config.worker_app)
+
+    start_reactor(
+        appname,
+        config.soft_file_limit,
+        config.gc_thresholds,
+        config.worker_pid_file,
+        config.worker_daemonize,
+        config.worker_cpu_affinity,
+        logger,
+    )
+
+
+def start_reactor(
+        appname,
+        soft_file_limit,
+        gc_thresholds,
+        pid_file,
+        daemonize,
+        cpu_affinity,
+        logger,
+):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        soft_file_limit (int):
+        gc_thresholds:
+        pid_file (str): name of pid file to write to if daemonize is True
+        daemonize (bool): true to run the reactor in a background process
+        cpu_affinity (int|None): cpu affinity mask
+        logger (logging.Logger): logger instance to pass to Daemonize
+    """
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
+            logger.info("Running")
+            if cpu_affinity is not None:
+                logger.info("Setting CPU affinity to %s" % cpu_affinity)
+                affinity.set_process_affinity_mask(0, cpu_affinity)
+            change_resource_limit(soft_file_limit)
+            if gc_thresholds:
+                gc.set_threshold(*gc_thresholds)
+            reactor.run()
+
+    if daemonize:
+        daemon = Daemonize(
+            app=appname,
+            pid=pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a476efa63..ba2657bbad 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -13,38 +13,31 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.appservice")
 
 
@@ -181,36 +174,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-appservice",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-appservice", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 09bc1935f1..129cfa901f 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -13,47 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.client_reader")
 
 
@@ -183,36 +175,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-client-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-client-reader", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..40cebe6f4a 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,44 +13,36 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_reader")
 
 
@@ -172,36 +164,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-reader", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 03327dc47a..389e3909d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,44 +13,37 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_sender")
 
 
@@ -213,36 +206,12 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
-
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-sender",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-sender", config)
 
 
 class FederationSenderHandler(object):
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 132f18a979..bee4c47498 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -13,48 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request,
+)
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-from synapse.api.errors import SynapseError
-from synapse.http.servlet import (
-    RestServlet, parse_json_object_from_request,
-)
-from synapse.rest.client.v2_alpha._base import client_v2_patterns
-
-from synapse import events
-
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
-
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
@@ -234,36 +225,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-frontend-proxy",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-frontend-proxy", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 081e7cce59..84ad8f04a0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,61 +13,48 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import synapse
-
 import gc
 import logging
 import os
 import sys
 
+import synapse
 import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+    LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+    STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
-    check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
-
-from twisted.internet import reactor, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
-    FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
-    SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
-    SERVER_KEY_V2_PREFIX,
-)
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
 from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+    check_requirements
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -446,37 +433,18 @@ def run(hs):
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
 
-    def in_thread():
-        # Uncomment to enable tracing of log context changes.
-        # sys.settrace(logcontext_tracer)
-
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            change_resource_limit(hs.config.soft_file_limit)
-            if hs.config.gc_thresholds:
-                gc.set_threshold(*hs.config.gc_thresholds)
-            reactor.run()
-
-    if hs.config.daemonize:
-
-        if hs.config.print_pidfile:
-            print (hs.config.pid_file)
-
-        daemon = Daemonize(
-            app="synapse-homeserver",
-            pid=hs.config.pid_file,
-            action=lambda: in_thread(),
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-
-        daemon.start()
-    else:
-        in_thread()
+    if hs.config.daemonize and hs.config.print_pidfile:
+        print (hs.config.pid_file)
+
+    _base.start_reactor(
+        "synapse-homeserver",
+        hs.config.soft_file_limit,
+        hs.config.gc_thresholds,
+        hs.config.pid_file,
+        hs.config.daemonize,
+        hs.config.cpu_affinity,
+        logger,
+    )
 
 
 def main():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f57ec784fe..36c18bdbcb 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -13,14 +13,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -33,27 +40,12 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
-    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.media_repository")
 
 
@@ -180,36 +172,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-media-repository",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-media-repository", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f9114acfcb..db9a4d16f4 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -13,41 +13,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
 from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
-    PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.pusher")
 
 
@@ -244,18 +236,6 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
@@ -263,18 +243,7 @@ def start(config_options):
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-pusher",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-pusher", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4bdd99a966..80e4ba5336 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -13,56 +13,50 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import contextlib
+import logging
+import sys
 
 import synapse
-
 from synapse.api.constants import EventTypes
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -440,36 +434,13 @@ def start(config_options):
     ss.setup()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_datastore().start_profiling()
         ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-synchrotron",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-synchrotron", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 8c6300db9d..be661a70c7 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -14,16 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse
+import logging
+import sys
 
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.user_dir")
 
 
@@ -233,36 +227,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-user-dir",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-user-dir", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 28b4e5f50c..c9a1715f1f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ class ServerConfig(Config):
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
+        self.cpu_affinity = config.get("cpu_affinity")
 
         # Whether to send federation traffic out in this process. This only
         # applies to some federation traffic, and so shouldn't be used to
@@ -41,6 +43,12 @@ class ServerConfig(Config):
 
         self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
 
+        # Whether we should block invites sent to users on this server
+        # (other than those sent by local server admins)
+        self.block_non_admin_invites = config.get(
+            "block_non_admin_invites", False,
+        )
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
@@ -147,6 +155,27 @@ class ServerConfig(Config):
         # When running as a daemon, the file to store the pid in
         pid_file: %(pid_file)s
 
+        # CPU affinity mask. Setting this restricts the CPUs on which the
+        # process will be scheduled. It is represented as a bitmask, with the
+        # lowest order bit corresponding to the first logical CPU and the
+        # highest order bit corresponding to the last logical CPU. Not all CPUs
+        # may exist on a given system but a mask may specify more CPUs than are
+        # present.
+        #
+        # For example:
+        #    0x00000001  is processor #0,
+        #    0x00000003  is processors #0 and #1,
+        #    0xFFFFFFFF  is all processors (#0 through #31).
+        #
+        # Pinning a Python process to a single CPU is desirable, because Python
+        # is inherently single-threaded due to the GIL, and can suffer a
+        # 30-40%% slowdown due to cache blow-out and thread context switching
+        # if the scheduler happens to schedule the underlying threads across
+        # different cores. See
+        # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
+        #
+        # cpu_affinity: 0xFFFFFFFF
+
         # Whether to serve a web client from the HTTP/HTTPS root resource.
         web_client: True
 
@@ -171,6 +200,10 @@ class ServerConfig(Config):
         # and sync operations. The default value is -1, means no upper limit.
         # filter_timeline_limit: 5000
 
+        # Whether room invites to users on this server should be blocked
+        # (except those sent by local server admins). The default is False.
+        # block_non_admin_invites: True
+
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 99d5d8aaeb..c5a5a8919c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -33,6 +33,7 @@ class WorkerConfig(Config):
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        self.worker_cpu_affinity = config.get("worker_cpu_affinity")
 
         if self.worker_listeners:
             for listener in self.worker_listeners:
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index c2bd64d6c2..f1fd488b90 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -13,14 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+from synapse.util import logcontext
 from twisted.web.http import HTTPClient
 from twisted.internet.protocol import Factory
 from twisted.internet import defer, reactor
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.logcontext import (
-    preserve_context_over_fn, preserve_context_over_deferred
-)
 import simplejson as json
 import logging
 
@@ -43,14 +40,10 @@ def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
 
     for i in range(5):
         try:
-            protocol = yield preserve_context_over_fn(
-                endpoint.connect, factory
-            )
-            server_response, server_certificate = yield preserve_context_over_deferred(
-                protocol.remote_key
-            )
-            defer.returnValue((server_response, server_certificate))
-            return
+            with logcontext.PreserveLoggingContext():
+                protocol = yield endpoint.connect(factory)
+                server_response, server_certificate = yield protocol.remote_key
+                defer.returnValue((server_response, server_certificate))
         except SynapseKeyClientError as e:
             logger.exception("Error getting key for %r" % (server_name,))
             if e.status.startswith("4"):
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index c900f4d6df..054bac456d 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,10 +16,9 @@
 
 from synapse.crypto.keyclient import fetch_server_key
 from synapse.api.errors import SynapseError, Codes
-from synapse.util import unwrapFirstError
-from synapse.util.async import ObservableDeferred
+from synapse.util import unwrapFirstError, logcontext
 from synapse.util.logcontext import (
-    preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
+    PreserveLoggingContext,
     preserve_fn
 )
 from synapse.util.metrics import Measure
@@ -57,7 +57,8 @@ Attributes:
     json_object(dict): The JSON object to verify.
     deferred(twisted.internet.defer.Deferred):
         A deferred (server_name, key_id, verify_key) tuple that resolves when
-        a verify key has been fetched
+        a verify key has been fetched. The deferreds' callbacks are run with no
+        logcontext.
 """
 
 
@@ -74,23 +75,32 @@ class Keyring(object):
         self.perspective_servers = self.config.perspectives
         self.hs = hs
 
+        # map from server name to Deferred. Has an entry for each server with
+        # an ongoing key download; the Deferred completes once the download
+        # completes.
+        #
+        # These are regular, logcontext-agnostic Deferreds.
         self.key_downloads = {}
 
     def verify_json_for_server(self, server_name, json_object):
-        return self.verify_json_objects_for_server(
-            [(server_name, json_object)]
-        )[0]
+        return logcontext.make_deferred_yieldable(
+            self.verify_json_objects_for_server(
+                [(server_name, json_object)]
+            )[0]
+        )
 
     def verify_json_objects_for_server(self, server_and_json):
-        """Bulk verfies signatures of json objects, bulk fetching keys as
+        """Bulk verifies signatures of json objects, bulk fetching keys as
         necessary.
 
         Args:
             server_and_json (list): List of pairs of (server_name, json_object)
 
         Returns:
-            list of deferreds indicating success or failure to verify each
-            json object's signature for the given server_name.
+            List<Deferred>: for each input pair, a deferred indicating success
+                or failure to verify each json object's signature for the given
+                server_name. The deferreds run their callbacks in the sentinel
+                logcontext.
         """
         verify_requests = []
 
@@ -117,93 +127,71 @@ class Keyring(object):
 
             verify_requests.append(verify_request)
 
-        @defer.inlineCallbacks
-        def handle_key_deferred(verify_request):
-            server_name = verify_request.server_name
-            try:
-                _, key_id, verify_key = yield verify_request.deferred
-            except IOError as e:
-                logger.warn(
-                    "Got IOError when downloading keys for %s: %s %s",
-                    server_name, type(e).__name__, str(e.message),
-                )
-                raise SynapseError(
-                    502,
-                    "Error downloading keys for %s" % (server_name,),
-                    Codes.UNAUTHORIZED,
-                )
-            except Exception as e:
-                logger.exception(
-                    "Got Exception when downloading keys for %s: %s %s",
-                    server_name, type(e).__name__, str(e.message),
-                )
-                raise SynapseError(
-                    401,
-                    "No key for %s with id %s" % (server_name, key_ids),
-                    Codes.UNAUTHORIZED,
-                )
+        preserve_fn(self._start_key_lookups)(verify_requests)
+
+        # Pass those keys to handle_key_deferred so that the json object
+        # signatures can be verified
+        handle = preserve_fn(_handle_key_deferred)
+        return [
+            handle(rq) for rq in verify_requests
+        ]
 
-            json_object = verify_request.json_object
+    @defer.inlineCallbacks
+    def _start_key_lookups(self, verify_requests):
+        """Sets off the key fetches for each verify request
 
-            logger.debug("Got key %s %s:%s for server %s, verifying" % (
-                key_id, verify_key.alg, verify_key.version, server_name,
-            ))
-            try:
-                verify_signed_json(json_object, server_name, verify_key)
-            except:
-                raise SynapseError(
-                    401,
-                    "Invalid signature for server %s with key %s:%s" % (
-                        server_name, verify_key.alg, verify_key.version
-                    ),
-                    Codes.UNAUTHORIZED,
-                )
+        Once each fetch completes, verify_request.deferred will be resolved.
+
+        Args:
+            verify_requests (List[VerifyKeyRequest]):
+        """
 
+        # create a deferred for each server we're going to look up the keys
+        # for; we'll resolve them once we have completed our lookups.
+        # These will be passed into wait_for_previous_lookups to block
+        # any other lookups until we have finished.
+        # The deferreds are called with no logcontext.
         server_to_deferred = {
-            server_name: defer.Deferred()
-            for server_name, _ in server_and_json
+            rq.server_name: defer.Deferred()
+            for rq in verify_requests
         }
 
-        with PreserveLoggingContext():
+        # We want to wait for any previous lookups to complete before
+        # proceeding.
+        yield self.wait_for_previous_lookups(
+            [rq.server_name for rq in verify_requests],
+            server_to_deferred,
+        )
 
-            # We want to wait for any previous lookups to complete before
-            # proceeding.
-            wait_on_deferred = self.wait_for_previous_lookups(
-                [server_name for server_name, _ in server_and_json],
-                server_to_deferred,
-            )
+        # Actually start fetching keys.
+        self._get_server_verify_keys(verify_requests)
 
-            # Actually start fetching keys.
-            wait_on_deferred.addBoth(
-                lambda _: self.get_server_verify_keys(verify_requests)
-            )
+        # When we've finished fetching all the keys for a given server_name,
+        # resolve the deferred passed to `wait_for_previous_lookups` so that
+        # any lookups waiting will proceed.
+        #
+        # map from server name to a set of request ids
+        server_to_request_ids = {}
 
-            # When we've finished fetching all the keys for a given server_name,
-            # resolve the deferred passed to `wait_for_previous_lookups` so that
-            # any lookups waiting will proceed.
-            server_to_request_ids = {}
-
-            def remove_deferreds(res, server_name, verify_request):
-                request_id = id(verify_request)
-                server_to_request_ids[server_name].discard(request_id)
-                if not server_to_request_ids[server_name]:
-                    d = server_to_deferred.pop(server_name, None)
-                    if d:
-                        d.callback(None)
-                return res
-
-            for verify_request in verify_requests:
-                server_name = verify_request.server_name
-                request_id = id(verify_request)
-                server_to_request_ids.setdefault(server_name, set()).add(request_id)
-                deferred.addBoth(remove_deferreds, server_name, verify_request)
+        for verify_request in verify_requests:
+            server_name = verify_request.server_name
+            request_id = id(verify_request)
+            server_to_request_ids.setdefault(server_name, set()).add(request_id)
 
-        # Pass those keys to handle_key_deferred so that the json object
-        # signatures can be verified
-        return [
-            preserve_context_over_fn(handle_key_deferred, verify_request)
-            for verify_request in verify_requests
-        ]
+        def remove_deferreds(res, verify_request):
+            server_name = verify_request.server_name
+            request_id = id(verify_request)
+            server_to_request_ids[server_name].discard(request_id)
+            if not server_to_request_ids[server_name]:
+                d = server_to_deferred.pop(server_name, None)
+                if d:
+                    d.callback(None)
+            return res
+
+        for verify_request in verify_requests:
+            verify_request.deferred.addBoth(
+                remove_deferreds, verify_request,
+            )
 
     @defer.inlineCallbacks
     def wait_for_previous_lookups(self, server_names, server_to_deferred):
@@ -212,7 +200,13 @@ class Keyring(object):
         Args:
             server_names (list): list of server_names we want to lookup
             server_to_deferred (dict): server_name to deferred which gets
-                resolved once we've finished looking up keys for that server
+                resolved once we've finished looking up keys for that server.
+                The Deferreds should be regular twisted ones which call their
+                callbacks with no logcontext.
+
+        Returns: a Deferred which resolves once all key lookups for the given
+            servers have completed. Follows the synapse rules of logcontext
+            preservation.
         """
         while True:
             wait_on = [
@@ -226,17 +220,15 @@ class Keyring(object):
             else:
                 break
 
-        for server_name, deferred in server_to_deferred.items():
-            d = ObservableDeferred(preserve_context_over_deferred(deferred))
-            self.key_downloads[server_name] = d
-
-            def rm(r, server_name):
-                self.key_downloads.pop(server_name, None)
-                return r
+        def rm(r, server_name_):
+            self.key_downloads.pop(server_name_, None)
+            return r
 
-            d.addBoth(rm, server_name)
+        for server_name, deferred in server_to_deferred.items():
+            self.key_downloads[server_name] = deferred
+            deferred.addBoth(rm, server_name)
 
-    def get_server_verify_keys(self, verify_requests):
+    def _get_server_verify_keys(self, verify_requests):
         """Tries to find at least one key for each verify request
 
         For each verify_request, verify_request.deferred is called back with
@@ -305,21 +297,23 @@ class Keyring(object):
                     if not missing_keys:
                         break
 
-                for verify_request in requests_missing_keys:
-                    verify_request.deferred.errback(SynapseError(
-                        401,
-                        "No key for %s with id %s" % (
-                            verify_request.server_name, verify_request.key_ids,
-                        ),
-                        Codes.UNAUTHORIZED,
-                    ))
+                with PreserveLoggingContext():
+                    for verify_request in requests_missing_keys:
+                        verify_request.deferred.errback(SynapseError(
+                            401,
+                            "No key for %s with id %s" % (
+                                verify_request.server_name, verify_request.key_ids,
+                            ),
+                            Codes.UNAUTHORIZED,
+                        ))
 
         def on_err(err):
-            for verify_request in verify_requests:
-                if not verify_request.deferred.called:
-                    verify_request.deferred.errback(err)
+            with PreserveLoggingContext():
+                for verify_request in verify_requests:
+                    if not verify_request.deferred.called:
+                        verify_request.deferred.errback(err)
 
-        do_iterations().addErrback(on_err)
+        preserve_fn(do_iterations)().addErrback(on_err)
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
@@ -333,7 +327,7 @@ class Keyring(object):
             Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
                 server_name -> key_id -> VerifyKey
         """
-        res = yield preserve_context_over_deferred(defer.gatherResults(
+        res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.get_server_verify_keys)(
                     server_name, key_ids
@@ -341,7 +335,7 @@ class Keyring(object):
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(dict(res))
 
@@ -362,13 +356,13 @@ class Keyring(object):
                 )
                 defer.returnValue({})
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(get_key)(p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         union_of_keys = {}
         for result in results:
@@ -402,13 +396,13 @@ class Keyring(object):
 
             defer.returnValue(keys)
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(get_key)(server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         merged = {}
         for result in results:
@@ -485,7 +479,7 @@ class Keyring(object):
             for server_name, response_keys in processed_response.items():
                 keys.setdefault(server_name, {}).update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=server_name,
@@ -495,7 +489,7 @@ class Keyring(object):
                 for server_name, response_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -543,7 +537,7 @@ class Keyring(object):
 
             keys.update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=key_server_name,
@@ -553,7 +547,7 @@ class Keyring(object):
                 for key_server_name, verify_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -619,7 +613,7 @@ class Keyring(object):
         response_keys.update(verify_keys)
         response_keys.update(old_verify_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_keys_json)(
                     server_name=server_name,
@@ -632,7 +626,7 @@ class Keyring(object):
                 for key_id in updated_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         results[server_name] = response_keys
 
@@ -710,7 +704,6 @@ class Keyring(object):
 
         defer.returnValue(verify_keys)
 
-    @defer.inlineCallbacks
     def store_keys(self, server_name, from_server, verify_keys):
         """Store a collection of verify keys for a given server
         Args:
@@ -721,7 +714,7 @@ class Keyring(object):
             A deferred that completes when the keys are stored.
         """
         # TODO(markjh): Store whether the keys have expired.
-        yield preserve_context_over_deferred(defer.gatherResults(
+        return logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_verify_key)(
                     server_name, server_name, key.time_added, key
@@ -729,4 +722,48 @@ class Keyring(object):
                 for key_id, key in verify_keys.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
+
+
+@defer.inlineCallbacks
+def _handle_key_deferred(verify_request):
+    server_name = verify_request.server_name
+    try:
+        with PreserveLoggingContext():
+            _, key_id, verify_key = yield verify_request.deferred
+    except IOError as e:
+        logger.warn(
+            "Got IOError when downloading keys for %s: %s %s",
+            server_name, type(e).__name__, str(e.message),
+        )
+        raise SynapseError(
+            502,
+            "Error downloading keys for %s" % (server_name,),
+            Codes.UNAUTHORIZED,
+        )
+    except Exception as e:
+        logger.exception(
+            "Got Exception when downloading keys for %s: %s %s",
+            server_name, type(e).__name__, str(e.message),
+        )
+        raise SynapseError(
+            401,
+            "No key for %s with id %s" % (server_name, verify_request.key_ids),
+            Codes.UNAUTHORIZED,
+        )
+
+    json_object = verify_request.json_object
+
+    logger.debug("Got key %s %s:%s for server %s, verifying" % (
+        key_id, verify_key.alg, verify_key.version, server_name,
+    ))
+    try:
+        verify_signed_json(json_object, server_name, verify_key)
+    except:
+        raise SynapseError(
+            401,
+            "Invalid signature for server %s with key %s:%s" % (
+                server_name, verify_key.alg, verify_key.version
+            ),
+            Codes.UNAUTHORIZED,
+        )
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
new file mode 100644
index 0000000000..56fa9e556e
--- /dev/null
+++ b/synapse/events/spamcheck.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def check_event_for_spam(event):
+    """Checks if a given event is considered "spammy" by this server.
+
+    If the server considers an event spammy, then it will be rejected if
+    sent by a local user. If it is sent by a user on another server, then
+    users receive a blank event.
+
+    Args:
+        event (synapse.events.EventBase): the event to be checked
+
+    Returns:
+        bool: True if the event is spammy.
+    """
+    if not hasattr(event, "content") or "body" not in event.content:
+        return False
+
+    # for example:
+    #
+    # if "the third flower is green" in event.content["body"]:
+    #    return True
+
+    return False
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 2339cc9034..babd9ea078 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -12,21 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-from twisted.internet import defer
-
-from synapse.events.utils import prune_event
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
-
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-
 import logging
 
+from synapse.api.errors import SynapseError
+from synapse.crypto.event_signing import check_event_content_hash
+from synapse.events import spamcheck
+from synapse.events.utils import prune_event
+from synapse.util import unwrapFirstError, logcontext
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -57,56 +50,52 @@ class FederationBase(object):
         """
         deferreds = self._check_sigs_and_hashes(pdus)
 
-        def callback(pdu):
-            return pdu
+        @defer.inlineCallbacks
+        def handle_check_result(pdu, deferred):
+            try:
+                res = yield logcontext.make_deferred_yieldable(deferred)
+            except SynapseError:
+                res = None
 
-        def errback(failure, pdu):
-            failure.trap(SynapseError)
-            return None
-
-        def try_local_db(res, pdu):
             if not res:
                 # Check local db.
-                return self.store.get_event(
+                res = yield self.store.get_event(
                     pdu.event_id,
                     allow_rejected=True,
                     allow_none=True,
                 )
-            return res
 
-        def try_remote(res, pdu):
             if not res and pdu.origin != origin:
-                return self.get_pdu(
-                    destinations=[pdu.origin],
-                    event_id=pdu.event_id,
-                    outlier=outlier,
-                    timeout=10000,
-                ).addErrback(lambda e: None)
-            return res
-
-        def warn(res, pdu):
+                try:
+                    res = yield self.get_pdu(
+                        destinations=[pdu.origin],
+                        event_id=pdu.event_id,
+                        outlier=outlier,
+                        timeout=10000,
+                    )
+                except SynapseError:
+                    pass
+
             if not res:
                 logger.warn(
                     "Failed to find copy of %s with valid signature",
                     pdu.event_id,
                 )
-            return res
 
-        for pdu, deferred in zip(pdus, deferreds):
-            deferred.addCallbacks(
-                callback, errback, errbackArgs=[pdu]
-            ).addCallback(
-                try_local_db, pdu
-            ).addCallback(
-                try_remote, pdu
-            ).addCallback(
-                warn, pdu
-            )
+            defer.returnValue(res)
 
-        valid_pdus = yield preserve_context_over_deferred(defer.gatherResults(
-            deferreds,
-            consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        handle = logcontext.preserve_fn(handle_check_result)
+        deferreds2 = [
+            handle(pdu, deferred)
+            for pdu, deferred in zip(pdus, deferreds)
+        ]
+
+        valid_pdus = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults(
+                deferreds2,
+                consumeErrors=True,
+            )
+        ).addErrback(unwrapFirstError)
 
         if include_none:
             defer.returnValue(valid_pdus)
@@ -114,15 +103,24 @@ class FederationBase(object):
             defer.returnValue([p for p in valid_pdus if p])
 
     def _check_sigs_and_hash(self, pdu):
-        return self._check_sigs_and_hashes([pdu])[0]
+        return logcontext.make_deferred_yieldable(
+            self._check_sigs_and_hashes([pdu])[0],
+        )
 
     def _check_sigs_and_hashes(self, pdus):
-        """Throws a SynapseError if a PDU does not have the correct
-        signatures.
+        """Checks that each of the received events is correctly signed by the
+        sending server.
+
+        Args:
+            pdus (list[FrozenEvent]): the events to be checked
 
         Returns:
-            FrozenEvent: Either the given event or it redacted if it failed the
-            content hash check.
+            list[Deferred]: for each input event, a deferred which:
+              * returns the original event if the checks pass
+              * returns a redacted version of the event (if the signature
+                matched but the hash did not)
+              * throws a SynapseError if the signature check failed.
+            The deferreds run their callbacks in the sentinel logcontext.
         """
 
         redacted_pdus = [
@@ -130,26 +128,38 @@ class FederationBase(object):
             for pdu in pdus
         ]
 
-        deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([
+        deferreds = self.keyring.verify_json_objects_for_server([
             (p.origin, p.get_pdu_json())
             for p in redacted_pdus
         ])
 
+        ctx = logcontext.LoggingContext.current_context()
+
         def callback(_, pdu, redacted):
-            if not check_event_content_hash(pdu):
-                logger.warn(
-                    "Event content has been tampered, redacting %s: %s",
-                    pdu.event_id, pdu.get_pdu_json()
-                )
-                return redacted
-            return pdu
+            with logcontext.PreserveLoggingContext(ctx):
+                if not check_event_content_hash(pdu):
+                    logger.warn(
+                        "Event content has been tampered, redacting %s: %s",
+                        pdu.event_id, pdu.get_pdu_json()
+                    )
+                    return redacted
+
+                if spamcheck.check_event_for_spam(pdu):
+                    logger.warn(
+                        "Event contains spam, redacting %s: %s",
+                        pdu.event_id, pdu.get_pdu_json()
+                    )
+                    return redacted
+
+                return pdu
 
         def errback(failure, pdu):
             failure.trap(SynapseError)
-            logger.warn(
-                "Signature check failed for %s",
-                pdu.event_id,
-            )
+            with logcontext.PreserveLoggingContext(ctx):
+                logger.warn(
+                    "Signature check failed for %s",
+                    pdu.event_id,
+                )
             return failure
 
         for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 861441708b..7c5e5d957f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,7 +22,7 @@ from synapse.api.constants import Membership
 from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
-from synapse.util import unwrapFirstError
+from synapse.util import unwrapFirstError, logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@@ -189,10 +189,10 @@ class FederationClient(FederationBase):
         ]
 
         # FIXME: We should handle signature failures more gracefully.
-        pdus[:] = yield preserve_context_over_deferred(defer.gatherResults(
+        pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             self._check_sigs_and_hashes(pdus),
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(pdus)
 
@@ -252,7 +252,7 @@ class FederationClient(FederationBase):
                     pdu = pdu_list[0]
 
                     # Check signatures are correct.
-                    signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
+                    signed_pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ed60d494ff..dac4b3f4e0 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
+        now_token = yield self.hs.get_event_sources().get_current_token()
+
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # First we check if any devices have changed
@@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler):
         # Then work out if any users have since joined
         rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
 
+        member_events = yield self.store.get_membership_changes_for_user(
+            user_id, from_token.room_key, now_token.room_key
+        )
+        rooms_changed.update(event.room_id for event in member_events)
+
         stream_ordering = RoomStreamToken.parse_stream_token(
-            from_token.room_key).stream
+            from_token.room_key
+        ).stream
 
         possibly_changed = set(changed)
+        possibly_left = set()
         for room_id in rooms_changed:
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+            # The user may have left the room
+            # TODO: Check if they actually did or if we were just invited.
+            if room_id not in room_ids:
+                for key, event_id in current_state_ids.iteritems():
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_left.add(state_key)
+                continue
+
             # Fetch the current state at the time.
             try:
                 event_ids = yield self.store.get_forward_extremeties_for_room(
@@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler):
                 # ordering: treat it the same as a new room
                 event_ids = []
 
-            current_state_ids = yield self.store.get_current_state_ids(room_id)
-
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
@@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler):
                     possibly_changed.add(state_key)
                 continue
 
+            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+            if not current_member_id:
+                continue
+
             # mapping from event_id -> state_dict
             prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
 
+            # Check if we've joined the room? If so we just blindly add all the users to
+            # the "possibly changed" users.
+            for state_dict in prev_state_ids.itervalues():
+                member_event = state_dict.get((EventTypes.Member, user_id), None)
+                if not member_event or member_event != current_member_id:
+                    for key, event_id in current_state_ids.iteritems():
+                        etype, state_key = key
+                        if etype != EventTypes.Member:
+                            continue
+                        possibly_changed.add(state_key)
+                    break
+
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
@@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler):
 
                 # check if this member has changed since any of the extremities
                 # at the stream_ordering, and add them to the list if so.
-                for state_dict in prev_state_ids.values():
+                for state_dict in prev_state_ids.itervalues():
                     prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
-                        possibly_changed.add(state_key)
+                        if state_key != user_id:
+                            possibly_changed.add(state_key)
                         break
 
-        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-            user_id
-        )
+        if possibly_changed or possibly_left:
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
 
-        # Take the intersection of the users whose devices may have changed
-        # and those that actually still share a room with the user
-        defer.returnValue(users_who_share_room & possibly_changed)
+            # Take the intersection of the users whose devices may have changed
+            # and those that actually still share a room with the user
+            possibly_joined = possibly_changed & users_who_share_room
+            possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+        else:
+            possibly_joined = []
+            possibly_left = []
+
+        defer.returnValue({
+            "changed": list(possibly_joined),
+            "left": list(possibly_left),
+        })
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b790a7c2ef..18f87cad67 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1074,6 +1074,9 @@ class FederationHandler(BaseHandler):
         if is_blocked:
             raise SynapseError(403, "This room has been blocked on this server")
 
+        if self.hs.config.block_non_admin_invites:
+            raise SynapseError(403, "This server does not accept room invites")
+
         membership = event.content.get("membership")
         if event.type != EventTypes.Member or membership != Membership.INVITE:
             raise SynapseError(400, "The event was not an m.room.member invite event")
@@ -1606,7 +1609,7 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
-        if event.type == EventTypes.GuestAccess:
+        if event.type == EventTypes.GuestAccess and not context.rejected:
             yield self.maybe_kick_guest_users(event)
 
         defer.returnValue(context)
@@ -2090,6 +2093,14 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+        """Handle an exchange_third_party_invite request from a remote server
+
+        The remote server will call this when it wants to turn a 3pid invite
+        into a normal m.room.member invite.
+
+        Returns:
+            Deferred: resolves (to None)
+        """
         builder = self.event_builder_factory.new(event_dict)
 
         message_handler = self.hs.get_handlers().message_handler
@@ -2108,9 +2119,12 @@ class FederationHandler(BaseHandler):
             raise e
         yield self._check_signature(event, context)
 
+        # XXX we send the invite here, but send_membership_event also sends it,
+        # so we end up making two requests. I think this is redundant.
         returned_invite = yield self.send_invite(origin, event)
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
+
         member_handler = self.hs.get_handlers().room_member_handler
         yield member_handler.send_membership_event(None, event, context)
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index be4f123c54..da18bf23db 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from synapse.events import spamcheck
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -321,6 +321,12 @@ class MessageHandler(BaseHandler):
             token_id=requester.access_token_id,
             txn_id=txn_id
         )
+
+        if spamcheck.check_event_for_spam(event):
+            raise SynapseError(
+                403, "Spam is not permitted here", Codes.FORBIDDEN
+            )
+
         yield self.send_nonmember_event(
             requester,
             event,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b3f979b246..9a498c2d3e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -191,6 +191,8 @@ class RoomMemberHandler(BaseHandler):
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
 
+        # if this is a join with a 3pid signature, we may need to turn a 3pid
+        # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
             replication = self.hs.get_replication_layer()
             yield replication.exchange_third_party_invite(
@@ -208,6 +210,16 @@ class RoomMemberHandler(BaseHandler):
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
 
+        if (effective_membership_state == "invite" and
+                self.hs.config.block_non_admin_invites):
+            is_requester_admin = yield self.auth.is_server_admin(
+                requester.user,
+            )
+            if not is_requester_admin:
+                raise SynapseError(
+                    403, "Invites have been disabled on this server",
+                )
+
         latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
@@ -471,6 +483,16 @@ class RoomMemberHandler(BaseHandler):
             requester,
             txn_id
     ):
+        if self.hs.config.block_non_admin_invites:
+            is_requester_admin = yield self.auth.is_server_admin(
+                requester.user,
+            )
+            if not is_requester_admin:
+                raise SynapseError(
+                    403, "Invites have been disabled on this server",
+                    Codes.FORBIDDEN,
+                )
+
         invitee = yield self._lookup_3pid(
             id_server, medium, address
         )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e6df1819b9..af1b527840 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -108,6 +108,16 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
         return True
 
 
+class DeviceLists(collections.namedtuple("DeviceLists", [
+    "changed",   # list of user_ids whose devices may have changed
+    "left",      # list of user_ids whose devices we no longer track
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(self.changed or self.left)
+
+
 class SyncResult(collections.namedtuple("SyncResult", [
     "next_batch",  # Token for the next sync
     "presence",  # List of presence events for the user.
@@ -283,6 +293,11 @@ class SyncHandler(object):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
 
+            # Pull out the current state, as we always want to include those events
+            # in the timeline if they're there.
+            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = frozenset(current_state_ids.itervalues())
+
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
             else:
@@ -294,6 +309,7 @@ class SyncHandler(object):
                     self.store,
                     sync_config.user.to_string(),
                     recents,
+                    always_include_ids=current_state_ids,
                 )
             else:
                 recents = []
@@ -329,6 +345,7 @@ class SyncHandler(object):
                     self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
+                    always_include_ids=current_state_ids,
                 )
                 loaded_recents.extend(recents)
                 recents = loaded_recents
@@ -535,7 +552,8 @@ class SyncHandler(object):
         res = yield self._generate_sync_entry_for_rooms(
             sync_result_builder, account_data_by_room
         )
-        newly_joined_rooms, newly_joined_users = res
+        newly_joined_rooms, newly_joined_users, _, _ = res
+        _, _, newly_left_rooms, newly_left_users = res
 
         block_all_presence_data = (
             since_token is None and
@@ -549,7 +567,11 @@ class SyncHandler(object):
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
 
         device_lists = yield self._generate_sync_entry_for_device_list(
-            sync_result_builder
+            sync_result_builder,
+            newly_joined_rooms=newly_joined_rooms,
+            newly_joined_users=newly_joined_users,
+            newly_left_rooms=newly_left_rooms,
+            newly_left_users=newly_left_users,
         )
 
         device_id = sync_config.device_id
@@ -574,7 +596,9 @@ class SyncHandler(object):
 
     @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
-    def _generate_sync_entry_for_device_list(self, sync_result_builder):
+    def _generate_sync_entry_for_device_list(self, sync_result_builder,
+                                             newly_joined_rooms, newly_joined_users,
+                                             newly_left_rooms, newly_left_users):
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
@@ -582,16 +606,40 @@ class SyncHandler(object):
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
-            if not changed:
-                defer.returnValue([])
+
+            # TODO: Be more clever than this, i.e. remove users who we already
+            # share a room with?
+            for room_id in newly_joined_rooms:
+                joined_users = yield self.state.get_current_user_in_room(room_id)
+                newly_joined_users.update(joined_users)
+
+            for room_id in newly_left_rooms:
+                left_users = yield self.state.get_current_user_in_room(room_id)
+                newly_left_users.update(left_users)
+
+            # TODO: Check that these users are actually new, i.e. either they
+            # weren't in the previous sync *or* they left and rejoined.
+            changed.update(newly_joined_users)
+
+            if not changed and not newly_left_users:
+                defer.returnValue(DeviceLists(
+                    changed=[],
+                    left=newly_left_users,
+                ))
 
             users_who_share_room = yield self.store.get_users_who_share_room_with_user(
                 user_id
             )
 
-            defer.returnValue(users_who_share_room & changed)
+            defer.returnValue(DeviceLists(
+                changed=users_who_share_room & changed,
+                left=set(newly_left_users) - users_who_share_room,
+            ))
         else:
-            defer.returnValue([])
+            defer.returnValue(DeviceLists(
+                changed=[],
+                left=[],
+            ))
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_to_device(self, sync_result_builder):
@@ -755,8 +803,8 @@ class SyncHandler(object):
             account_data_by_room(dict): Dictionary of per room account data
 
         Returns:
-            Deferred(tuple): Returns a 2-tuple of
-            `(newly_joined_rooms, newly_joined_users)`
+            Deferred(tuple): Returns a 4-tuple of
+            `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
@@ -787,7 +835,7 @@ class SyncHandler(object):
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        defer.returnValue(([], []))
+                        defer.returnValue(([], [], [], []))
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
@@ -800,7 +848,7 @@ class SyncHandler(object):
 
         if since_token:
             res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms = res
+            room_entries, invited, newly_joined_rooms, newly_left_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
                 user_id, since_token.account_data_key,
@@ -808,6 +856,7 @@ class SyncHandler(object):
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
             room_entries, invited, newly_joined_rooms = res
+            newly_left_rooms = []
 
             tags_by_room = yield self.store.get_tags_for_user(user_id)
 
@@ -828,17 +877,30 @@ class SyncHandler(object):
 
         # Now we want to get any newly joined users
         newly_joined_users = set()
+        newly_left_users = set()
         if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
-                    joined_sync.timeline.events, joined_sync.state.values()
+                    joined_sync.timeline.events, joined_sync.state.itervalues()
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
                         if event.membership == Membership.JOIN:
                             newly_joined_users.add(event.state_key)
-
-        defer.returnValue((newly_joined_rooms, newly_joined_users))
+                        else:
+                            prev_content = event.unsigned.get("prev_content", {})
+                            prev_membership = prev_content.get("membership", None)
+                            if prev_membership == Membership.JOIN:
+                                newly_left_users.add(event.state_key)
+
+        newly_left_users -= newly_joined_users
+
+        defer.returnValue((
+            newly_joined_rooms,
+            newly_joined_users,
+            newly_left_rooms,
+            newly_left_users,
+        ))
 
     @defer.inlineCallbacks
     def _have_rooms_changed(self, sync_result_builder):
@@ -908,15 +970,28 @@ class SyncHandler(object):
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
         newly_joined_rooms = []
+        newly_left_rooms = []
         room_entries = []
         invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
+        for room_id, events in mem_change_events_by_room_id.iteritems():
             non_joins = [e for e in events if e.membership != Membership.JOIN]
             has_join = len(non_joins) != len(events)
 
             # We want to figure out if we joined the room at some point since
             # the last sync (even if we have since left). This is to make sure
             # we do send down the room, and with full state, where necessary
+
+            old_state_ids = None
+            if room_id in joined_room_ids and non_joins:
+                # Always include if the user (re)joined the room, especially
+                # important so that device list changes are calculated correctly.
+                # If there are non join member events, but we are still in the room,
+                # then the user must have left and joined
+                newly_joined_rooms.append(room_id)
+
+                # User is in the room so we don't need to do the invite/leave checks
+                continue
+
             if room_id in joined_room_ids or has_join:
                 old_state_ids = yield self.get_state_at(room_id, since_token)
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
@@ -928,12 +1003,33 @@ class SyncHandler(object):
                 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
                     newly_joined_rooms.append(room_id)
 
-                if room_id in joined_room_ids:
-                    continue
+            # If user is in the room then we don't need to do the invite/leave checks
+            if room_id in joined_room_ids:
+                continue
 
             if not non_joins:
                 continue
 
+            # Check if we have left the room. This can either be because we were
+            # joined before *or* that we since joined and then left.
+            if events[-1].membership != Membership.JOIN:
+                if has_join:
+                    newly_left_rooms.append(room_id)
+                else:
+                    if not old_state_ids:
+                        old_state_ids = yield self.get_state_at(room_id, since_token)
+                        old_mem_ev_id = old_state_ids.get(
+                            (EventTypes.Member, user_id),
+                            None,
+                        )
+                        old_mem_ev = None
+                        if old_mem_ev_id:
+                            old_mem_ev = yield self.store.get_event(
+                                old_mem_ev_id, allow_none=True
+                            )
+                    if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
+                        newly_left_rooms.append(room_id)
+
             # Only bother if we're still currently invited
             should_invite = non_joins[-1].membership == Membership.INVITE
             if should_invite:
@@ -1011,7 +1107,7 @@ class SyncHandler(object):
                     upto_token=since_token,
                 ))
 
-        defer.returnValue((room_entries, invited, newly_joined_rooms))
+        defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
 
     @defer.inlineCallbacks
     def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1259,6 +1355,7 @@ class SyncResultBuilder(object):
         self.invited = []
         self.archived = []
         self.device = []
+        self.to_device = []
 
 
 class RoomSyncResultBuilder(object):
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index d8923c9abb..241b17f2cb 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import socket
 
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.internet import defer, reactor
@@ -30,7 +31,10 @@ logger = logging.getLogger(__name__)
 
 SERVER_CACHE = {}
 
-
+# our record of an individual server which can be tried to reach a destination.
+#
+# "host" is actually a dotted-quad or ipv6 address string. Except when there's
+# no SRV record, in which case it is the original hostname.
 _Server = collections.namedtuple(
     "_Server", "priority weight host port expires"
 )
@@ -219,9 +223,10 @@ class SRVClientEndpoint(object):
                 return self.default_server
             else:
                 raise ConnectError(
-                    "Not server available for %s" % self.service_name
+                    "No server available for %s" % self.service_name
                 )
 
+        # look for all servers with the same priority
         min_priority = self.servers[0].priority
         weight_indexes = list(
             (index, server.weight + 1)
@@ -231,11 +236,22 @@ class SRVClientEndpoint(object):
 
         total_weight = sum(weight for index, weight in weight_indexes)
         target_weight = random.randint(0, total_weight)
-
         for index, weight in weight_indexes:
             target_weight -= weight
             if target_weight <= 0:
                 server = self.servers[index]
+                # XXX: this looks totally dubious:
+                #
+                # (a) we never reuse a server until we have been through
+                #     all of the servers at the same priority, so if the
+                #     weights are A: 100, B:1, we always do ABABAB instead of
+                #     AAAA...AAAB (approximately).
+                #
+                # (b) After using all the servers at the lowest priority,
+                #     we move onto the next priority. We should only use the
+                #     second priority if servers at the top priority are
+                #     unreachable.
+                #
                 del self.servers[index]
                 self.used_servers.append(server)
                 return server
@@ -280,26 +296,21 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
                 continue
 
             payload = answer.payload
-            host = str(payload.target)
-            srv_ttl = answer.ttl
 
-            try:
-                answers, _, _ = yield dns_client.lookupAddress(host)
-            except DNSNameError:
-                continue
+            hosts = yield _get_hosts_for_srv_record(
+                dns_client, str(payload.target)
+            )
 
-            for answer in answers:
-                if answer.type == dns.A and answer.payload:
-                    ip = answer.payload.dottedQuad()
-                    host_ttl = min(srv_ttl, answer.ttl)
+            for (ip, ttl) in hosts:
+                host_ttl = min(answer.ttl, ttl)
 
-                    servers.append(_Server(
-                        host=ip,
-                        port=int(payload.port),
-                        priority=int(payload.priority),
-                        weight=int(payload.weight),
-                        expires=int(clock.time()) + host_ttl,
-                    ))
+                servers.append(_Server(
+                    host=ip,
+                    port=int(payload.port),
+                    priority=int(payload.priority),
+                    weight=int(payload.weight),
+                    expires=int(clock.time()) + host_ttl,
+                ))
 
         servers.sort()
         cache[service_name] = list(servers)
@@ -317,3 +328,68 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
             raise e
 
     defer.returnValue(servers)
+
+
+@defer.inlineCallbacks
+def _get_hosts_for_srv_record(dns_client, host):
+    """Look up each of the hosts in a SRV record
+
+    Args:
+        dns_client (twisted.names.dns.IResolver):
+        host (basestring): host to look up
+
+    Returns:
+        Deferred[list[(str, int)]]: a list of (host, ttl) pairs
+
+    """
+    ip4_servers = []
+    ip6_servers = []
+
+    def cb(res):
+        # lookupAddress and lookupIP6Address return a three-tuple
+        # giving the answer, authority, and additional sections of the
+        # response.
+        #
+        # we only care about the answers.
+
+        return res[0]
+
+    def eb(res):
+        res.trap(DNSNameError)
+        return []
+
+    # no logcontexts here, so we can safely fire these off and gatherResults
+    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
+    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+    results = yield defer.gatherResults([d1, d2], consumeErrors=True)
+
+    for result in results:
+        for answer in result:
+            if not answer.payload:
+                continue
+
+            try:
+                if answer.type == dns.A:
+                    ip = answer.payload.dottedQuad()
+                    ip4_servers.append((ip, answer.ttl))
+                elif answer.type == dns.AAAA:
+                    ip = socket.inet_ntop(
+                        socket.AF_INET6, answer.payload.address,
+                    )
+                    ip6_servers.append((ip, answer.ttl))
+                else:
+                    # the most likely candidate here is a CNAME record.
+                    # rfc2782 says srvs may not point to aliases.
+                    logger.warn(
+                        "Ignoring unexpected DNS record type %s for %s",
+                        answer.type, host,
+                    )
+                    continue
+            except Exception as e:
+                logger.warn("Ignoring invalid DNS response for %s: %s",
+                            host, e)
+                continue
+
+    # keep the ipv4 results before the ipv6 results, mostly to match historical
+    # behaviour.
+    defer.returnValue(ip4_servers + ip6_servers)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 8a5d473108..62c41cd9db 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -244,6 +244,26 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _build_notification_dict(self, event, tweaks, badge):
+        if self.data.get('format') == 'event_id_only':
+            d = {
+                'notification': {
+                    'event_id': event.event_id,
+                    'room_id': event.room_id,
+                    'counts': {
+                        'unread': badge,
+                    },
+                    'devices': [
+                        {
+                            'app_id': self.app_id,
+                            'pushkey': self.pushkey,
+                            'pushkey_ts': long(self.pushkey_ts / 1000),
+                            'data': self.data_minus_url,
+                        }
+                    ]
+                }
+            }
+            defer.returnValue(d)
+
         ctx = yield push_tools.get_context_for_event(
             self.store, self.state_handler, event, self.user_id
         )
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index ed7f1c89ad..630e92c90e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -31,7 +31,7 @@ REQUIREMENTS = {
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
     "daemonize": ["daemonize"],
-    "py-bcrypt": ["bcrypt"],
+    "bcrypt": ["bcrypt"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
     "ujson": ["ujson"],
@@ -40,6 +40,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "affinity": ["affinity"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 7d786e8de3..465b25033d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -168,7 +168,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
     DEFAULT_MESSAGE = (
         "Sharing illegal content on this server is not permitted and rooms in"
-        " violatation will be blocked."
+        " violation will be blocked."
     )
 
     def __init__(self, hs):
@@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet):
 
 class ResetPasswordRestServlet(ClientV1RestServlet):
     """Post request to allow an administrator reset password for a user.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
             @user:to_reset_password?access_token=admin_access_token
@@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
         """Post request to allow an administrator reset password for a user.
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         """
         UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
@@ -343,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
 
 class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     """Get request to get specific number of users from Synapse.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
             @admin:user?access_token=admin_access_token&start=0&limit=10
@@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_GET(self, request, target_user_id):
         """Get request to get specific number of users from Synapse.
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         """
         target_user = UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
@@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
         """Post request to get specific number of users from Synapse..
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
             @admin:user?access_token=admin_access_token
@@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
 class SearchUsersRestServlet(ClientV1RestServlet):
     """Get request to search user table for specific users according to
     search term.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/search_users/
             @admin:user?access_token=admin_access_token&term=alice
@@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
     def on_GET(self, request, target_user_id):
         """Get request to search user table for specific users according to
         search term.
-        This need a user have a administrator access in Synapse.
+        This needs user to have a administrator access in Synapse.
         """
         target_user = UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6a3cfe84f8..943e87e7fd 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet):
 
         user_id = requester.user.to_string()
 
-        changed = yield self.device_handler.get_user_ids_changed(
+        results = yield self.device_handler.get_user_ids_changed(
             user_id, from_token,
         )
 
-        defer.returnValue((200, {
-            "changed": list(changed),
-        }))
+        defer.returnValue((200, results))
 
 
 class OneTimeKeyServlet(RestServlet):
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 6dcc407451..978af9c280 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet):
         filter_id = parse_string(request, "filter", default=None)
         full_state = parse_boolean(request, "full_state", default=False)
 
-        logger.info(
+        logger.debug(
             "/sync: user=%r, timeout=%r, since=%r,"
             " set_presence=%r, filter_id=%r, device_id=%r" % (
                 user, timeout, since, set_presence, filter_id, device_id
@@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet):
             "account_data": {"events": sync_result.account_data},
             "to_device": {"events": sync_result.to_device},
             "device_lists": {
-                "changed": list(sync_result.device_lists),
+                "changed": list(sync_result.device_lists.changed),
+                "left": list(sync_result.device_lists.left),
             },
             "presence": SyncRestServlet.encode_presence(
                 sync_result.presence, time_now
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 3b5e0a4fb9..87aeaf71d6 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -113,30 +113,37 @@ class KeyStore(SQLBaseStore):
                 keys[key_id] = key
         defer.returnValue(keys)
 
-    @defer.inlineCallbacks
     def store_server_verify_key(self, server_name, from_server, time_now_ms,
                                 verify_key):
         """Stores a NACL verification key for the given server.
         Args:
             server_name (str): The name of the server.
-            key_id (str): The version of the key for the server.
             from_server (str): Where the verification key was looked up
-            ts_now_ms (int): The time now in milliseconds
-            verification_key (VerifyKey): The NACL verify key.
+            time_now_ms (int): The time now in milliseconds
+            verify_key (nacl.signing.VerifyKey): The NACL verify key.
         """
-        yield self._simple_upsert(
-            table="server_signature_keys",
-            keyvalues={
-                "server_name": server_name,
-                "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
-            },
-            values={
-                "from_server": from_server,
-                "ts_added_ms": time_now_ms,
-                "verify_key": buffer(verify_key.encode()),
-            },
-            desc="store_server_verify_key",
-        )
+        key_id = "%s:%s" % (verify_key.alg, verify_key.version)
+
+        def _txn(txn):
+            self._simple_upsert_txn(
+                txn,
+                table="server_signature_keys",
+                keyvalues={
+                    "server_name": server_name,
+                    "key_id": key_id,
+                },
+                values={
+                    "from_server": from_server,
+                    "ts_added_ms": time_now_ms,
+                    "verify_key": buffer(verify_key.encode()),
+                },
+            )
+            txn.call_after(
+                self._get_server_verify_key.invalidate,
+                (server_name, key_id)
+            )
+
+        return self.runInteraction("store_server_verify_key", _txn)
 
     def store_server_keys_json(self, server_name, key_id, from_server,
                                ts_now_ms, ts_expires_ms, key_json_bytes):
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 5590b866ed..d7dbdc77ff 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = (
 
 
 @defer.inlineCallbacks
-def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
+                              always_include_ids=frozenset()):
     """ Returns dict of user_id -> list of events that user is allowed to
     see.
 
@@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             * the user has not been a member of the room since the
             given events
         events ([synapse.events.EventBase]): list of events to filter
+        always_include_ids (set(event_id)): set of event ids to specifically
+            include (unless sender is ignored)
     """
     forgotten = yield preserve_context_over_deferred(defer.gatherResults([
         defer.maybeDeferred(
@@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
         if not event.is_state() and event.sender in ignore_list:
             return False
 
+        if event.event_id in always_include_ids:
+            return True
+
         state = event_id_to_state[event.event_id]
 
         # get the room_visibility at the time of the event.
@@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
 
 
 @defer.inlineCallbacks
-def filter_events_for_client(store, user_id, events, is_peeking=False):
+def filter_events_for_client(store, user_id, events, is_peeking=False,
+                             always_include_ids=frozenset()):
     """
     Check which events a user is allowed to see
 
@@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False):
         types=types
     )
     res = yield filter_events_for_clients(
-        store, [(user_id, is_peeking)], events, event_id_to_state
+        store, [(user_id, is_peeking)], events, event_id_to_state,
+        always_include_ids=always_include_ids,
     )
     defer.returnValue(res.get(user_id, []))
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
new file mode 100644
index 0000000000..570312da84
--- /dev/null
+++ b/tests/crypto/test_keyring.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+import signedjson.key
+import signedjson.sign
+from mock import Mock
+from synapse.api.errors import SynapseError
+from synapse.crypto import keyring
+from synapse.util import async, logcontext
+from synapse.util.logcontext import LoggingContext
+from tests import unittest, utils
+from twisted.internet import defer
+
+
+class MockPerspectiveServer(object):
+    def __init__(self):
+        self.server_name = "mock_server"
+        self.key = signedjson.key.generate_signing_key(0)
+
+    def get_verify_keys(self):
+        vk = signedjson.key.get_verify_key(self.key)
+        return {
+            "%s:%s" % (vk.alg, vk.version): vk,
+        }
+
+    def get_signed_key(self, server_name, verify_key):
+        key_id = "%s:%s" % (verify_key.alg, verify_key.version)
+        res = {
+            "server_name": server_name,
+            "old_verify_keys": {},
+            "valid_until_ts": time.time() * 1000 + 3600,
+            "verify_keys": {
+                key_id: {
+                    "key": signedjson.key.encode_verify_key_base64(verify_key)
+                }
+            }
+        }
+        signedjson.sign.sign_json(res, self.server_name, self.key)
+        return res
+
+
+class KeyringTestCase(unittest.TestCase):
+    @defer.inlineCallbacks
+    def setUp(self):
+        self.mock_perspective_server = MockPerspectiveServer()
+        self.http_client = Mock()
+        self.hs = yield utils.setup_test_homeserver(
+            handlers=None,
+            http_client=self.http_client,
+        )
+        self.hs.config.perspectives = {
+            self.mock_perspective_server.server_name:
+                self.mock_perspective_server.get_verify_keys()
+        }
+
+    def check_context(self, _, expected):
+        self.assertEquals(
+            getattr(LoggingContext.current_context(), "test_key", None),
+            expected
+        )
+
+    @defer.inlineCallbacks
+    def test_wait_for_previous_lookups(self):
+        sentinel_context = LoggingContext.current_context()
+
+        kr = keyring.Keyring(self.hs)
+
+        lookup_1_deferred = defer.Deferred()
+        lookup_2_deferred = defer.Deferred()
+
+        with LoggingContext("one") as context_one:
+            context_one.test_key = "one"
+
+            wait_1_deferred = kr.wait_for_previous_lookups(
+                ["server1"],
+                {"server1": lookup_1_deferred},
+            )
+
+            # there were no previous lookups, so the deferred should be ready
+            self.assertTrue(wait_1_deferred.called)
+            # ... so we should have preserved the LoggingContext.
+            self.assertIs(LoggingContext.current_context(), context_one)
+            wait_1_deferred.addBoth(self.check_context, "one")
+
+        with LoggingContext("two") as context_two:
+            context_two.test_key = "two"
+
+            # set off another wait. It should block because the first lookup
+            # hasn't yet completed.
+            wait_2_deferred = kr.wait_for_previous_lookups(
+                ["server1"],
+                {"server1": lookup_2_deferred},
+            )
+            self.assertFalse(wait_2_deferred.called)
+            # ... so we should have reset the LoggingContext.
+            self.assertIs(LoggingContext.current_context(), sentinel_context)
+            wait_2_deferred.addBoth(self.check_context, "two")
+
+            # let the first lookup complete (in the sentinel context)
+            lookup_1_deferred.callback(None)
+
+            # now the second wait should complete and restore our
+            # loggingcontext.
+            yield wait_2_deferred
+
+    @defer.inlineCallbacks
+    def test_verify_json_objects_for_server_awaits_previous_requests(self):
+        key1 = signedjson.key.generate_signing_key(1)
+
+        kr = keyring.Keyring(self.hs)
+        json1 = {}
+        signedjson.sign.sign_json(json1, "server10", key1)
+
+        persp_resp = {
+            "server_keys": [
+                self.mock_perspective_server.get_signed_key(
+                    "server10",
+                    signedjson.key.get_verify_key(key1)
+                ),
+            ]
+        }
+        persp_deferred = defer.Deferred()
+
+        @defer.inlineCallbacks
+        def get_perspectives(**kwargs):
+            self.assertEquals(
+                LoggingContext.current_context().test_key, "11",
+            )
+            with logcontext.PreserveLoggingContext():
+                yield persp_deferred
+            defer.returnValue(persp_resp)
+        self.http_client.post_json.side_effect = get_perspectives
+
+        with LoggingContext("11") as context_11:
+            context_11.test_key = "11"
+
+            # start off a first set of lookups
+            res_deferreds = kr.verify_json_objects_for_server(
+                [("server10", json1),
+                 ("server11", {})
+                 ]
+            )
+
+            # the unsigned json should be rejected pretty quickly
+            self.assertTrue(res_deferreds[1].called)
+            try:
+                yield res_deferreds[1]
+                self.assertFalse("unsigned json didn't cause a failure")
+            except SynapseError:
+                pass
+
+            self.assertFalse(res_deferreds[0].called)
+            res_deferreds[0].addBoth(self.check_context, None)
+
+            # wait a tick for it to send the request to the perspectives server
+            # (it first tries the datastore)
+            yield async.sleep(0.005)
+            self.http_client.post_json.assert_called_once()
+
+            self.assertIs(LoggingContext.current_context(), context_11)
+
+            context_12 = LoggingContext("12")
+            context_12.test_key = "12"
+            with logcontext.PreserveLoggingContext(context_12):
+                # a second request for a server with outstanding requests
+                # should block rather than start a second call
+                self.http_client.post_json.reset_mock()
+                self.http_client.post_json.return_value = defer.Deferred()
+
+                res_deferreds_2 = kr.verify_json_objects_for_server(
+                    [("server10", json1)],
+                )
+                yield async.sleep(0.005)
+                self.http_client.post_json.assert_not_called()
+                res_deferreds_2[0].addBoth(self.check_context, None)
+
+            # complete the first request
+            with logcontext.PreserveLoggingContext():
+                persp_deferred.callback(persp_resp)
+            self.assertIs(LoggingContext.current_context(), context_11)
+
+            with logcontext.PreserveLoggingContext():
+                yield res_deferreds[0]
+                yield res_deferreds_2[0]
+
+    @defer.inlineCallbacks
+    def test_verify_json_for_server(self):
+        kr = keyring.Keyring(self.hs)
+
+        key1 = signedjson.key.generate_signing_key(1)
+        yield self.hs.datastore.store_server_verify_key(
+            "server9", "", time.time() * 1000,
+            signedjson.key.get_verify_key(key1),
+        )
+        json1 = {}
+        signedjson.sign.sign_json(json1, "server9", key1)
+
+        sentinel_context = LoggingContext.current_context()
+
+        with LoggingContext("one") as context_one:
+            context_one.test_key = "one"
+
+            defer = kr.verify_json_for_server("server9", {})
+            try:
+                yield defer
+                self.fail("should fail on unsigned json")
+            except SynapseError:
+                pass
+            self.assertIs(LoggingContext.current_context(), context_one)
+
+            defer = kr.verify_json_for_server("server9", json1)
+            self.assertFalse(defer.called)
+            self.assertIs(LoggingContext.current_context(), sentinel_context)
+            yield defer
+
+            self.assertIs(LoggingContext.current_context(), context_one)
diff --git a/tests/test_dns.py b/tests/test_dns.py
index c394c57ee7..d08b0f4333 100644
--- a/tests/test_dns.py
+++ b/tests/test_dns.py
@@ -24,15 +24,17 @@ from synapse.http.endpoint import resolve_service
 from tests.utils import MockClock
 
 
+@unittest.DEBUG
 class DnsTestCase(unittest.TestCase):
 
     @defer.inlineCallbacks
     def test_resolve(self):
         dns_client_mock = Mock()
 
-        service_name = "test_service.examle.com"
+        service_name = "test_service.example.com"
         host_name = "example.com"
         ip_address = "127.0.0.1"
+        ip6_address = "::1"
 
         answer_srv = dns.RRHeader(
             type=dns.SRV,
@@ -48,8 +50,22 @@ class DnsTestCase(unittest.TestCase):
             )
         )
 
-        dns_client_mock.lookupService.return_value = ([answer_srv], None, None)
-        dns_client_mock.lookupAddress.return_value = ([answer_a], None, None)
+        answer_aaaa = dns.RRHeader(
+            type=dns.AAAA,
+            payload=dns.Record_AAAA(
+                address=ip6_address,
+            )
+        )
+
+        dns_client_mock.lookupService.return_value = defer.succeed(
+            ([answer_srv], None, None),
+        )
+        dns_client_mock.lookupAddress.return_value = defer.succeed(
+            ([answer_a], None, None),
+        )
+        dns_client_mock.lookupIPV6Address.return_value = defer.succeed(
+            ([answer_aaaa], None, None),
+        )
 
         cache = {}
 
@@ -59,10 +75,12 @@ class DnsTestCase(unittest.TestCase):
 
         dns_client_mock.lookupService.assert_called_once_with(service_name)
         dns_client_mock.lookupAddress.assert_called_once_with(host_name)
+        dns_client_mock.lookupIPV6Address.assert_called_once_with(host_name)
 
-        self.assertEquals(len(servers), 1)
+        self.assertEquals(len(servers), 2)
         self.assertEquals(servers, cache[service_name])
         self.assertEquals(servers[0].host, ip_address)
+        self.assertEquals(servers[1].host, ip6_address)
 
     @defer.inlineCallbacks
     def test_from_cache_expired_and_dns_fail(self):
diff --git a/tests/utils.py b/tests/utils.py
index 4f7e32b3ab..3c81a3e16d 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -56,6 +56,7 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
         config.worker_replication_url = ""
         config.worker_app = None
         config.email_enable_notifs = False
+        config.block_non_admin_invites = False
 
     config.use_frozen_dicts = True
     config.database_config = {"name": "sqlite3"}
diff --git a/tox.ini b/tox.ini
index 39ad305360..f408defc8f 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,14 +14,38 @@ deps =
 
 setenv =
     PYTHONDONTWRITEBYTECODE = no_byte_code
-    # As of twisted 16.4, trial tries to import the tests as a package, which
-    # means it needs to be on the pythonpath.
-    PYTHONPATH = {toxinidir}
+
 commands =
-    /bin/sh -c "find {toxinidir} -name '*.pyc' -delete ; coverage run {env:COVERAGE_OPTS:} --source={toxinidir}/synapse \
-        {envbindir}/trial {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}"
+    /usr/bin/find "{toxinidir}" -name '*.pyc' -delete
+    coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
+        "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests} {env:TOXSUFFIX:}
     {env:DUMP_COVERAGE_COMMAND:coverage report -m}
 
+[testenv:py27]
+
+# As of twisted 16.4, trial tries to import the tests as a package (previously
+# it loaded the files explicitly), which means they need to be on the
+# pythonpath. Our sdist doesn't include the 'tests' package, so normally it
+# doesn't work within the tox virtualenv.
+#
+# As a workaround, we tell tox to do install with 'pip -e', which just
+# creates a symlink to the project directory instead of unpacking the sdist.
+#
+# (An alternative to this would be to set PYTHONPATH to include the project
+# directory. Note two problems with this:
+#
+#   - if you set it via `setenv`, then it is also set during the 'install'
+#     phase, which inhibits unpacking the sdist, so the virtualenv isn't
+#     useful for anything else without setting PYTHONPATH similarly.
+#
+#   - `synapse` is also loaded from PYTHONPATH so even if you only set
+#     PYTHONPATH for the test phase, we're still running the tests against
+#     the working copy rather than the contents of the sdist. So frankly
+#     you might as well use -e in the first place.
+#
+# )
+usedevelop=true
+
 [testenv:packaging]
 deps =
     check-manifest