summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <github@rvanderhoff.org.uk>2017-09-25 11:50:11 +0100
committerGitHub <noreply@github.com>2017-09-25 11:50:11 +0100
commit94133d7ce8ab8dca8b8f47c0f2666d59486884a5 (patch)
treed472d3d5530e45eee0c156a0446bf42da0a81288 /synapse
parentFix iteration of requests_missing_keys; list doesn't have .values() (diff)
parentFix logcontxt leak in keyclient (#2465) (diff)
downloadsynapse-94133d7ce8ab8dca8b8f47c0f2666d59486884a5.tar.xz
Merge branch 'develop' into develop
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py8
-rw-r--r--synapse/app/_base.py99
-rw-r--r--synapse/app/appservice.py50
-rw-r--r--synapse/app/client_reader.py53
-rw-r--r--synapse/app/federation_reader.py53
-rw-r--r--synapse/app/federation_sender.py57
-rw-r--r--synapse/app/frontend_proxy.py64
-rwxr-xr-xsynapse/app/homeserver.py114
-rw-r--r--synapse/app/media_repository.py53
-rw-r--r--synapse/app/pusher.py57
-rw-r--r--synapse/app/synchrotron.py69
-rw-r--r--synapse/app/user_dir.py53
-rw-r--r--synapse/config/server.py33
-rw-r--r--synapse/config/workers.py1
-rw-r--r--synapse/crypto/keyclient.py17
-rw-r--r--synapse/crypto/keyring.py285
-rw-r--r--synapse/events/spamcheck.py38
-rw-r--r--synapse/federation/federation_base.py138
-rw-r--r--synapse/federation/federation_client.py8
-rw-r--r--synapse/handlers/device.py68
-rw-r--r--synapse/handlers/federation.py16
-rw-r--r--synapse/handlers/message.py8
-rw-r--r--synapse/handlers/room_member.py22
-rw-r--r--synapse/handlers/sync.py133
-rw-r--r--synapse/http/endpoint.py116
-rw-r--r--synapse/push/httppusher.py20
-rw-r--r--synapse/python_dependencies.py3
-rw-r--r--synapse/rest/client/v1/admin.py16
-rw-r--r--synapse/rest/client/v2_alpha/keys.py6
-rw-r--r--synapse/rest/client/v2_alpha/sync.py5
-rw-r--r--synapse/storage/keys.py41
-rw-r--r--synapse/visibility.py14
32 files changed, 963 insertions, 755 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e3da45b416..72858cca1f 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -519,6 +519,14 @@ class Auth(object):
             )
 
     def is_server_admin(self, user):
+        """ Check if the given user is a local server admin.
+
+        Args:
+            user (str): mxid of user to check
+
+        Returns:
+            bool: True if the user is an admin
+        """
         return self.store.is_server_admin(user)
 
     @defer.inlineCallbacks
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..cd0e815919
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+
+import affinity
+from daemonize import Daemonize
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+from twisted.internet import reactor
+
+
+def start_worker_reactor(appname, config):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        config (synapse.config.Config): config object
+    """
+
+    logger = logging.getLogger(config.worker_app)
+
+    start_reactor(
+        appname,
+        config.soft_file_limit,
+        config.gc_thresholds,
+        config.worker_pid_file,
+        config.worker_daemonize,
+        config.worker_cpu_affinity,
+        logger,
+    )
+
+
+def start_reactor(
+        appname,
+        soft_file_limit,
+        gc_thresholds,
+        pid_file,
+        daemonize,
+        cpu_affinity,
+        logger,
+):
+    """ Run the reactor in the main process
+
+    Daemonizes if necessary, and then configures some resources, before starting
+    the reactor
+
+    Args:
+        appname (str): application name which will be sent to syslog
+        soft_file_limit (int):
+        gc_thresholds:
+        pid_file (str): name of pid file to write to if daemonize is True
+        daemonize (bool): true to run the reactor in a background process
+        cpu_affinity (int|None): cpu affinity mask
+        logger (logging.Logger): logger instance to pass to Daemonize
+    """
+
+    def run():
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
+            logger.info("Running")
+            if cpu_affinity is not None:
+                logger.info("Setting CPU affinity to %s" % cpu_affinity)
+                affinity.set_process_affinity_mask(0, cpu_affinity)
+            change_resource_limit(soft_file_limit)
+            if gc_thresholds:
+                gc.set_threshold(*gc_thresholds)
+            reactor.run()
+
+    if daemonize:
+        daemon = Daemonize(
+            app=appname,
+            pid=pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a476efa63..ba2657bbad 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -13,38 +13,31 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.appservice")
 
 
@@ -181,36 +174,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-appservice",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-appservice", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 09bc1935f1..129cfa901f 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -13,47 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import PublicRoomListRestServlet
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.client_reader")
 
 
@@ -183,36 +175,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-client-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-client-reader", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..40cebe6f4a 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,44 +13,36 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_reader")
 
 
@@ -172,36 +164,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-reader",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-reader", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 03327dc47a..389e3909d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,44 +13,37 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.federation_sender")
 
 
@@ -213,36 +206,12 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
-
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-federation-sender",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-federation-sender", config)
 
 
 class FederationSenderHandler(object):
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 132f18a979..bee4c47498 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -13,48 +13,39 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request,
+)
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-from synapse.api.errors import SynapseError
-from synapse.http.servlet import (
-    RestServlet, parse_json_object_from_request,
-)
-from synapse.rest.client.v2_alpha._base import client_v2_patterns
-
-from synapse import events
-
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
-
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
@@ -234,36 +225,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-frontend-proxy",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-frontend-proxy", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 081e7cce59..84ad8f04a0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,61 +13,48 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import synapse
-
 import gc
 import logging
 import os
 import sys
 
+import synapse
 import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+    LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+    STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
 from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
-    check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
-
-from twisted.internet import reactor, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
-    FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
-    SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
-    SERVER_KEY_V2_PREFIX,
-)
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
 from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+    check_requirements
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -446,37 +433,18 @@ def run(hs):
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
 
-    def in_thread():
-        # Uncomment to enable tracing of log context changes.
-        # sys.settrace(logcontext_tracer)
-
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            change_resource_limit(hs.config.soft_file_limit)
-            if hs.config.gc_thresholds:
-                gc.set_threshold(*hs.config.gc_thresholds)
-            reactor.run()
-
-    if hs.config.daemonize:
-
-        if hs.config.print_pidfile:
-            print (hs.config.pid_file)
-
-        daemon = Daemonize(
-            app="synapse-homeserver",
-            pid=hs.config.pid_file,
-            action=lambda: in_thread(),
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-
-        daemon.start()
-    else:
-        in_thread()
+    if hs.config.daemonize and hs.config.print_pidfile:
+        print (hs.config.pid_file)
+
+    _base.start_reactor(
+        "synapse-homeserver",
+        hs.config.soft_file_limit,
+        hs.config.gc_thresholds,
+        hs.config.pid_file,
+        hs.config.daemonize,
+        hs.config.cpu_affinity,
+        logger,
+    )
 
 
 def main():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f57ec784fe..36c18bdbcb 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -13,14 +13,21 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
+from synapse import events
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -33,27 +40,12 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
-    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.media_repository")
 
 
@@ -180,36 +172,13 @@ def start(config_options):
     ss.get_handlers()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-media-repository",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-media-repository", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f9114acfcb..db9a4d16f4 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -13,41 +13,33 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+import sys
 
 import synapse
-
-from synapse.server import HomeServer
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
 from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
+from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
-    PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-
-from synapse import events
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.pusher")
 
 
@@ -244,18 +236,6 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
@@ -263,18 +243,7 @@ def start(config_options):
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-pusher",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-pusher", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4bdd99a966..80e4ba5336 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -13,56 +13,50 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import contextlib
+import logging
+import sys
 
 import synapse
-
 from synapse.api.constants import EventTypes
+from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
 from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
-
-from twisted.internet import reactor, defer
+from twisted.internet import defer, reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
 logger = logging.getLogger("synapse.app.synchrotron")
 
 
@@ -440,36 +434,13 @@ def start(config_options):
     ss.setup()
     ss.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ss.get_datastore().start_profiling()
         ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-synchrotron",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-synchrotron", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 8c6300db9d..be661a70c7 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -14,16 +14,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse
+import logging
+import sys
 
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
 from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
 from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -31,26 +34,17 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
 from twisted.internet import reactor
 from twisted.web.resource import Resource
 
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
 logger = logging.getLogger("synapse.app.user_dir")
 
 
@@ -233,36 +227,13 @@ def start(config_options):
     ps.setup()
     ps.start_listening(config.worker_listeners)
 
-    def run():
-        # make sure that we run the reactor with the sentinel log context,
-        # otherwise other PreserveLoggingContext instances will get confused
-        # and complain when they see the logcontext arbitrarily swapping
-        # between the sentinel and `run` logcontexts.
-        with PreserveLoggingContext():
-            logger.info("Running")
-            change_resource_limit(config.soft_file_limit)
-            if config.gc_thresholds:
-                gc.set_threshold(*config.gc_thresholds)
-            reactor.run()
-
     def start():
         ps.get_datastore().start_profiling()
         ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
-    if config.worker_daemonize:
-        daemon = Daemonize(
-            app="synapse-user-dir",
-            pid=config.worker_pid_file,
-            action=run,
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
-        daemon.start()
-    else:
-        run()
+    _base.start_worker_reactor("synapse-user-dir", config)
 
 
 if __name__ == '__main__':
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 28b4e5f50c..c9a1715f1f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ class ServerConfig(Config):
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
+        self.cpu_affinity = config.get("cpu_affinity")
 
         # Whether to send federation traffic out in this process. This only
         # applies to some federation traffic, and so shouldn't be used to
@@ -41,6 +43,12 @@ class ServerConfig(Config):
 
         self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
 
+        # Whether we should block invites sent to users on this server
+        # (other than those sent by local server admins)
+        self.block_non_admin_invites = config.get(
+            "block_non_admin_invites", False,
+        )
+
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
                 self.public_baseurl += '/'
@@ -147,6 +155,27 @@ class ServerConfig(Config):
         # When running as a daemon, the file to store the pid in
         pid_file: %(pid_file)s
 
+        # CPU affinity mask. Setting this restricts the CPUs on which the
+        # process will be scheduled. It is represented as a bitmask, with the
+        # lowest order bit corresponding to the first logical CPU and the
+        # highest order bit corresponding to the last logical CPU. Not all CPUs
+        # may exist on a given system but a mask may specify more CPUs than are
+        # present.
+        #
+        # For example:
+        #    0x00000001  is processor #0,
+        #    0x00000003  is processors #0 and #1,
+        #    0xFFFFFFFF  is all processors (#0 through #31).
+        #
+        # Pinning a Python process to a single CPU is desirable, because Python
+        # is inherently single-threaded due to the GIL, and can suffer a
+        # 30-40%% slowdown due to cache blow-out and thread context switching
+        # if the scheduler happens to schedule the underlying threads across
+        # different cores. See
+        # https://www.mirantis.com/blog/improve-performance-python-programs-restricting-single-cpu/.
+        #
+        # cpu_affinity: 0xFFFFFFFF
+
         # Whether to serve a web client from the HTTP/HTTPS root resource.
         web_client: True
 
@@ -171,6 +200,10 @@ class ServerConfig(Config):
         # and sync operations. The default value is -1, means no upper limit.
         # filter_timeline_limit: 5000
 
+        # Whether room invites to users on this server should be blocked
+        # (except those sent by local server admins). The default is False.
+        # block_non_admin_invites: True
+
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 99d5d8aaeb..c5a5a8919c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -33,6 +33,7 @@ class WorkerConfig(Config):
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+        self.worker_cpu_affinity = config.get("worker_cpu_affinity")
 
         if self.worker_listeners:
             for listener in self.worker_listeners:
diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py
index c2bd64d6c2..f1fd488b90 100644
--- a/synapse/crypto/keyclient.py
+++ b/synapse/crypto/keyclient.py
@@ -13,14 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+from synapse.util import logcontext
 from twisted.web.http import HTTPClient
 from twisted.internet.protocol import Factory
 from twisted.internet import defer, reactor
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.logcontext import (
-    preserve_context_over_fn, preserve_context_over_deferred
-)
 import simplejson as json
 import logging
 
@@ -43,14 +40,10 @@ def fetch_server_key(server_name, ssl_context_factory, path=KEY_API_V1):
 
     for i in range(5):
         try:
-            protocol = yield preserve_context_over_fn(
-                endpoint.connect, factory
-            )
-            server_response, server_certificate = yield preserve_context_over_deferred(
-                protocol.remote_key
-            )
-            defer.returnValue((server_response, server_certificate))
-            return
+            with logcontext.PreserveLoggingContext():
+                protocol = yield endpoint.connect(factory)
+                server_response, server_certificate = yield protocol.remote_key
+                defer.returnValue((server_response, server_certificate))
         except SynapseKeyClientError as e:
             logger.exception("Error getting key for %r" % (server_name,))
             if e.status.startswith("4"):
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index c900f4d6df..054bac456d 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,10 +16,9 @@
 
 from synapse.crypto.keyclient import fetch_server_key
 from synapse.api.errors import SynapseError, Codes
-from synapse.util import unwrapFirstError
-from synapse.util.async import ObservableDeferred
+from synapse.util import unwrapFirstError, logcontext
 from synapse.util.logcontext import (
-    preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
+    PreserveLoggingContext,
     preserve_fn
 )
 from synapse.util.metrics import Measure
@@ -57,7 +57,8 @@ Attributes:
     json_object(dict): The JSON object to verify.
     deferred(twisted.internet.defer.Deferred):
         A deferred (server_name, key_id, verify_key) tuple that resolves when
-        a verify key has been fetched
+        a verify key has been fetched. The deferreds' callbacks are run with no
+        logcontext.
 """
 
 
@@ -74,23 +75,32 @@ class Keyring(object):
         self.perspective_servers = self.config.perspectives
         self.hs = hs
 
+        # map from server name to Deferred. Has an entry for each server with
+        # an ongoing key download; the Deferred completes once the download
+        # completes.
+        #
+        # These are regular, logcontext-agnostic Deferreds.
         self.key_downloads = {}
 
     def verify_json_for_server(self, server_name, json_object):
-        return self.verify_json_objects_for_server(
-            [(server_name, json_object)]
-        )[0]
+        return logcontext.make_deferred_yieldable(
+            self.verify_json_objects_for_server(
+                [(server_name, json_object)]
+            )[0]
+        )
 
     def verify_json_objects_for_server(self, server_and_json):
-        """Bulk verfies signatures of json objects, bulk fetching keys as
+        """Bulk verifies signatures of json objects, bulk fetching keys as
         necessary.
 
         Args:
             server_and_json (list): List of pairs of (server_name, json_object)
 
         Returns:
-            list of deferreds indicating success or failure to verify each
-            json object's signature for the given server_name.
+            List<Deferred>: for each input pair, a deferred indicating success
+                or failure to verify each json object's signature for the given
+                server_name. The deferreds run their callbacks in the sentinel
+                logcontext.
         """
         verify_requests = []
 
@@ -117,93 +127,71 @@ class Keyring(object):
 
             verify_requests.append(verify_request)
 
-        @defer.inlineCallbacks
-        def handle_key_deferred(verify_request):
-            server_name = verify_request.server_name
-            try:
-                _, key_id, verify_key = yield verify_request.deferred
-            except IOError as e:
-                logger.warn(
-                    "Got IOError when downloading keys for %s: %s %s",
-                    server_name, type(e).__name__, str(e.message),
-                )
-                raise SynapseError(
-                    502,
-                    "Error downloading keys for %s" % (server_name,),
-                    Codes.UNAUTHORIZED,
-                )
-            except Exception as e:
-                logger.exception(
-                    "Got Exception when downloading keys for %s: %s %s",
-                    server_name, type(e).__name__, str(e.message),
-                )
-                raise SynapseError(
-                    401,
-                    "No key for %s with id %s" % (server_name, key_ids),
-                    Codes.UNAUTHORIZED,
-                )
+        preserve_fn(self._start_key_lookups)(verify_requests)
+
+        # Pass those keys to handle_key_deferred so that the json object
+        # signatures can be verified
+        handle = preserve_fn(_handle_key_deferred)
+        return [
+            handle(rq) for rq in verify_requests
+        ]
 
-            json_object = verify_request.json_object
+    @defer.inlineCallbacks
+    def _start_key_lookups(self, verify_requests):
+        """Sets off the key fetches for each verify request
 
-            logger.debug("Got key %s %s:%s for server %s, verifying" % (
-                key_id, verify_key.alg, verify_key.version, server_name,
-            ))
-            try:
-                verify_signed_json(json_object, server_name, verify_key)
-            except:
-                raise SynapseError(
-                    401,
-                    "Invalid signature for server %s with key %s:%s" % (
-                        server_name, verify_key.alg, verify_key.version
-                    ),
-                    Codes.UNAUTHORIZED,
-                )
+        Once each fetch completes, verify_request.deferred will be resolved.
+
+        Args:
+            verify_requests (List[VerifyKeyRequest]):
+        """
 
+        # create a deferred for each server we're going to look up the keys
+        # for; we'll resolve them once we have completed our lookups.
+        # These will be passed into wait_for_previous_lookups to block
+        # any other lookups until we have finished.
+        # The deferreds are called with no logcontext.
         server_to_deferred = {
-            server_name: defer.Deferred()
-            for server_name, _ in server_and_json
+            rq.server_name: defer.Deferred()
+            for rq in verify_requests
         }
 
-        with PreserveLoggingContext():
+        # We want to wait for any previous lookups to complete before
+        # proceeding.
+        yield self.wait_for_previous_lookups(
+            [rq.server_name for rq in verify_requests],
+            server_to_deferred,
+        )
 
-            # We want to wait for any previous lookups to complete before
-            # proceeding.
-            wait_on_deferred = self.wait_for_previous_lookups(
-                [server_name for server_name, _ in server_and_json],
-                server_to_deferred,
-            )
+        # Actually start fetching keys.
+        self._get_server_verify_keys(verify_requests)
 
-            # Actually start fetching keys.
-            wait_on_deferred.addBoth(
-                lambda _: self.get_server_verify_keys(verify_requests)
-            )
+        # When we've finished fetching all the keys for a given server_name,
+        # resolve the deferred passed to `wait_for_previous_lookups` so that
+        # any lookups waiting will proceed.
+        #
+        # map from server name to a set of request ids
+        server_to_request_ids = {}
 
-            # When we've finished fetching all the keys for a given server_name,
-            # resolve the deferred passed to `wait_for_previous_lookups` so that
-            # any lookups waiting will proceed.
-            server_to_request_ids = {}
-
-            def remove_deferreds(res, server_name, verify_request):
-                request_id = id(verify_request)
-                server_to_request_ids[server_name].discard(request_id)
-                if not server_to_request_ids[server_name]:
-                    d = server_to_deferred.pop(server_name, None)
-                    if d:
-                        d.callback(None)
-                return res
-
-            for verify_request in verify_requests:
-                server_name = verify_request.server_name
-                request_id = id(verify_request)
-                server_to_request_ids.setdefault(server_name, set()).add(request_id)
-                deferred.addBoth(remove_deferreds, server_name, verify_request)
+        for verify_request in verify_requests:
+            server_name = verify_request.server_name
+            request_id = id(verify_request)
+            server_to_request_ids.setdefault(server_name, set()).add(request_id)
 
-        # Pass those keys to handle_key_deferred so that the json object
-        # signatures can be verified
-        return [
-            preserve_context_over_fn(handle_key_deferred, verify_request)
-            for verify_request in verify_requests
-        ]
+        def remove_deferreds(res, verify_request):
+            server_name = verify_request.server_name
+            request_id = id(verify_request)
+            server_to_request_ids[server_name].discard(request_id)
+            if not server_to_request_ids[server_name]:
+                d = server_to_deferred.pop(server_name, None)
+                if d:
+                    d.callback(None)
+            return res
+
+        for verify_request in verify_requests:
+            verify_request.deferred.addBoth(
+                remove_deferreds, verify_request,
+            )
 
     @defer.inlineCallbacks
     def wait_for_previous_lookups(self, server_names, server_to_deferred):
@@ -212,7 +200,13 @@ class Keyring(object):
         Args:
             server_names (list): list of server_names we want to lookup
             server_to_deferred (dict): server_name to deferred which gets
-                resolved once we've finished looking up keys for that server
+                resolved once we've finished looking up keys for that server.
+                The Deferreds should be regular twisted ones which call their
+                callbacks with no logcontext.
+
+        Returns: a Deferred which resolves once all key lookups for the given
+            servers have completed. Follows the synapse rules of logcontext
+            preservation.
         """
         while True:
             wait_on = [
@@ -226,17 +220,15 @@ class Keyring(object):
             else:
                 break
 
-        for server_name, deferred in server_to_deferred.items():
-            d = ObservableDeferred(preserve_context_over_deferred(deferred))
-            self.key_downloads[server_name] = d
-
-            def rm(r, server_name):
-                self.key_downloads.pop(server_name, None)
-                return r
+        def rm(r, server_name_):
+            self.key_downloads.pop(server_name_, None)
+            return r
 
-            d.addBoth(rm, server_name)
+        for server_name, deferred in server_to_deferred.items():
+            self.key_downloads[server_name] = deferred
+            deferred.addBoth(rm, server_name)
 
-    def get_server_verify_keys(self, verify_requests):
+    def _get_server_verify_keys(self, verify_requests):
         """Tries to find at least one key for each verify request
 
         For each verify_request, verify_request.deferred is called back with
@@ -305,21 +297,23 @@ class Keyring(object):
                     if not missing_keys:
                         break
 
-                for verify_request in requests_missing_keys:
-                    verify_request.deferred.errback(SynapseError(
-                        401,
-                        "No key for %s with id %s" % (
-                            verify_request.server_name, verify_request.key_ids,
-                        ),
-                        Codes.UNAUTHORIZED,
-                    ))
+                with PreserveLoggingContext():
+                    for verify_request in requests_missing_keys:
+                        verify_request.deferred.errback(SynapseError(
+                            401,
+                            "No key for %s with id %s" % (
+                                verify_request.server_name, verify_request.key_ids,
+                            ),
+                            Codes.UNAUTHORIZED,
+                        ))
 
         def on_err(err):
-            for verify_request in verify_requests:
-                if not verify_request.deferred.called:
-                    verify_request.deferred.errback(err)
+            with PreserveLoggingContext():
+                for verify_request in verify_requests:
+                    if not verify_request.deferred.called:
+                        verify_request.deferred.errback(err)
 
-        do_iterations().addErrback(on_err)
+        preserve_fn(do_iterations)().addErrback(on_err)
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
@@ -333,7 +327,7 @@ class Keyring(object):
             Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
                 server_name -> key_id -> VerifyKey
         """
-        res = yield preserve_context_over_deferred(defer.gatherResults(
+        res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.get_server_verify_keys)(
                     server_name, key_ids
@@ -341,7 +335,7 @@ class Keyring(object):
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(dict(res))
 
@@ -362,13 +356,13 @@ class Keyring(object):
                 )
                 defer.returnValue({})
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(get_key)(p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         union_of_keys = {}
         for result in results:
@@ -402,13 +396,13 @@ class Keyring(object):
 
             defer.returnValue(keys)
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(get_key)(server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         merged = {}
         for result in results:
@@ -485,7 +479,7 @@ class Keyring(object):
             for server_name, response_keys in processed_response.items():
                 keys.setdefault(server_name, {}).update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=server_name,
@@ -495,7 +489,7 @@ class Keyring(object):
                 for server_name, response_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -543,7 +537,7 @@ class Keyring(object):
 
             keys.update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store_keys)(
                     server_name=key_server_name,
@@ -553,7 +547,7 @@ class Keyring(object):
                 for key_server_name, verify_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -619,7 +613,7 @@ class Keyring(object):
         response_keys.update(verify_keys)
         response_keys.update(old_verify_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_keys_json)(
                     server_name=server_name,
@@ -632,7 +626,7 @@ class Keyring(object):
                 for key_id in updated_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         results[server_name] = response_keys
 
@@ -710,7 +704,6 @@ class Keyring(object):
 
         defer.returnValue(verify_keys)
 
-    @defer.inlineCallbacks
     def store_keys(self, server_name, from_server, verify_keys):
         """Store a collection of verify keys for a given server
         Args:
@@ -721,7 +714,7 @@ class Keyring(object):
             A deferred that completes when the keys are stored.
         """
         # TODO(markjh): Store whether the keys have expired.
-        yield preserve_context_over_deferred(defer.gatherResults(
+        return logcontext.make_deferred_yieldable(defer.gatherResults(
             [
                 preserve_fn(self.store.store_server_verify_key)(
                     server_name, server_name, key.time_added, key
@@ -729,4 +722,48 @@ class Keyring(object):
                 for key_id, key in verify_keys.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
+
+
+@defer.inlineCallbacks
+def _handle_key_deferred(verify_request):
+    server_name = verify_request.server_name
+    try:
+        with PreserveLoggingContext():
+            _, key_id, verify_key = yield verify_request.deferred
+    except IOError as e:
+        logger.warn(
+            "Got IOError when downloading keys for %s: %s %s",
+            server_name, type(e).__name__, str(e.message),
+        )
+        raise SynapseError(
+            502,
+            "Error downloading keys for %s" % (server_name,),
+            Codes.UNAUTHORIZED,
+        )
+    except Exception as e:
+        logger.exception(
+            "Got Exception when downloading keys for %s: %s %s",
+            server_name, type(e).__name__, str(e.message),
+        )
+        raise SynapseError(
+            401,
+            "No key for %s with id %s" % (server_name, verify_request.key_ids),
+            Codes.UNAUTHORIZED,
+        )
+
+    json_object = verify_request.json_object
+
+    logger.debug("Got key %s %s:%s for server %s, verifying" % (
+        key_id, verify_key.alg, verify_key.version, server_name,
+    ))
+    try:
+        verify_signed_json(json_object, server_name, verify_key)
+    except:
+        raise SynapseError(
+            401,
+            "Invalid signature for server %s with key %s:%s" % (
+                server_name, verify_key.alg, verify_key.version
+            ),
+            Codes.UNAUTHORIZED,
+        )
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
new file mode 100644
index 0000000000..56fa9e556e
--- /dev/null
+++ b/synapse/events/spamcheck.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def check_event_for_spam(event):
+    """Checks if a given event is considered "spammy" by this server.
+
+    If the server considers an event spammy, then it will be rejected if
+    sent by a local user. If it is sent by a user on another server, then
+    users receive a blank event.
+
+    Args:
+        event (synapse.events.EventBase): the event to be checked
+
+    Returns:
+        bool: True if the event is spammy.
+    """
+    if not hasattr(event, "content") or "body" not in event.content:
+        return False
+
+    # for example:
+    #
+    # if "the third flower is green" in event.content["body"]:
+    #    return True
+
+    return False
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 2339cc9034..babd9ea078 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -12,21 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
-from twisted.internet import defer
-
-from synapse.events.utils import prune_event
-
-from synapse.crypto.event_signing import check_event_content_hash
-
-from synapse.api.errors import SynapseError
-
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-
 import logging
 
+from synapse.api.errors import SynapseError
+from synapse.crypto.event_signing import check_event_content_hash
+from synapse.events import spamcheck
+from synapse.events.utils import prune_event
+from synapse.util import unwrapFirstError, logcontext
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -57,56 +50,52 @@ class FederationBase(object):
         """
         deferreds = self._check_sigs_and_hashes(pdus)
 
-        def callback(pdu):
-            return pdu
+        @defer.inlineCallbacks
+        def handle_check_result(pdu, deferred):
+            try:
+                res = yield logcontext.make_deferred_yieldable(deferred)
+            except SynapseError:
+                res = None
 
-        def errback(failure, pdu):
-            failure.trap(SynapseError)
-            return None
-
-        def try_local_db(res, pdu):
             if not res:
                 # Check local db.
-                return self.store.get_event(
+                res = yield self.store.get_event(
                     pdu.event_id,
                     allow_rejected=True,
                     allow_none=True,
                 )
-            return res
 
-        def try_remote(res, pdu):
             if not res and pdu.origin != origin:
-                return self.get_pdu(
-                    destinations=[pdu.origin],
-                    event_id=pdu.event_id,
-                    outlier=outlier,
-                    timeout=10000,
-                ).addErrback(lambda e: None)
-            return res
-
-        def warn(res, pdu):
+                try:
+                    res = yield self.get_pdu(
+                        destinations=[pdu.origin],
+                        event_id=pdu.event_id,
+                        outlier=outlier,
+                        timeout=10000,
+                    )
+                except SynapseError:
+                    pass
+
             if not res:
                 logger.warn(
                     "Failed to find copy of %s with valid signature",
                     pdu.event_id,
                 )
-            return res
 
-        for pdu, deferred in zip(pdus, deferreds):
-            deferred.addCallbacks(
-                callback, errback, errbackArgs=[pdu]
-            ).addCallback(
-                try_local_db, pdu
-            ).addCallback(
-                try_remote, pdu
-            ).addCallback(
-                warn, pdu
-            )
+            defer.returnValue(res)
 
-        valid_pdus = yield preserve_context_over_deferred(defer.gatherResults(
-            deferreds,
-            consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        handle = logcontext.preserve_fn(handle_check_result)
+        deferreds2 = [
+            handle(pdu, deferred)
+            for pdu, deferred in zip(pdus, deferreds)
+        ]
+
+        valid_pdus = yield logcontext.make_deferred_yieldable(
+            defer.gatherResults(
+                deferreds2,
+                consumeErrors=True,
+            )
+        ).addErrback(unwrapFirstError)
 
         if include_none:
             defer.returnValue(valid_pdus)
@@ -114,15 +103,24 @@ class FederationBase(object):
             defer.returnValue([p for p in valid_pdus if p])
 
     def _check_sigs_and_hash(self, pdu):
-        return self._check_sigs_and_hashes([pdu])[0]
+        return logcontext.make_deferred_yieldable(
+            self._check_sigs_and_hashes([pdu])[0],
+        )
 
     def _check_sigs_and_hashes(self, pdus):
-        """Throws a SynapseError if a PDU does not have the correct
-        signatures.
+        """Checks that each of the received events is correctly signed by the
+        sending server.
+
+        Args:
+            pdus (list[FrozenEvent]): the events to be checked
 
         Returns:
-            FrozenEvent: Either the given event or it redacted if it failed the
-            content hash check.
+            list[Deferred]: for each input event, a deferred which:
+              * returns the original event if the checks pass
+              * returns a redacted version of the event (if the signature
+                matched but the hash did not)
+              * throws a SynapseError if the signature check failed.
+            The deferreds run their callbacks in the sentinel logcontext.
         """
 
         redacted_pdus = [
@@ -130,26 +128,38 @@ class FederationBase(object):
             for pdu in pdus
         ]
 
-        deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([
+        deferreds = self.keyring.verify_json_objects_for_server([
             (p.origin, p.get_pdu_json())
             for p in redacted_pdus
         ])
 
+        ctx = logcontext.LoggingContext.current_context()
+
         def callback(_, pdu, redacted):
-            if not check_event_content_hash(pdu):
-                logger.warn(
-                    "Event content has been tampered, redacting %s: %s",
-                    pdu.event_id, pdu.get_pdu_json()
-                )
-                return redacted
-            return pdu
+            with logcontext.PreserveLoggingContext(ctx):
+                if not check_event_content_hash(pdu):
+                    logger.warn(
+                        "Event content has been tampered, redacting %s: %s",
+                        pdu.event_id, pdu.get_pdu_json()
+                    )
+                    return redacted
+
+                if spamcheck.check_event_for_spam(pdu):
+                    logger.warn(
+                        "Event contains spam, redacting %s: %s",
+                        pdu.event_id, pdu.get_pdu_json()
+                    )
+                    return redacted
+
+                return pdu
 
         def errback(failure, pdu):
             failure.trap(SynapseError)
-            logger.warn(
-                "Signature check failed for %s",
-                pdu.event_id,
-            )
+            with logcontext.PreserveLoggingContext(ctx):
+                logger.warn(
+                    "Signature check failed for %s",
+                    pdu.event_id,
+                )
             return failure
 
         for deferred, pdu, redacted in zip(deferreds, pdus, redacted_pdus):
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 861441708b..7c5e5d957f 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,7 +22,7 @@ from synapse.api.constants import Membership
 from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
-from synapse.util import unwrapFirstError
+from synapse.util import unwrapFirstError, logcontext
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@@ -189,10 +189,10 @@ class FederationClient(FederationBase):
         ]
 
         # FIXME: We should handle signature failures more gracefully.
-        pdus[:] = yield preserve_context_over_deferred(defer.gatherResults(
+        pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             self._check_sigs_and_hashes(pdus),
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(pdus)
 
@@ -252,7 +252,7 @@ class FederationClient(FederationBase):
                     pdu = pdu_list[0]
 
                     # Check signatures are correct.
-                    signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
+                    signed_pdu = yield self._check_sigs_and_hash(pdu)
 
                     break
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ed60d494ff..dac4b3f4e0 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -270,6 +270,8 @@ class DeviceHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
+        now_token = yield self.hs.get_event_sources().get_current_token()
+
         room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # First we check if any devices have changed
@@ -280,11 +282,30 @@ class DeviceHandler(BaseHandler):
         # Then work out if any users have since joined
         rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
 
+        member_events = yield self.store.get_membership_changes_for_user(
+            user_id, from_token.room_key, now_token.room_key
+        )
+        rooms_changed.update(event.room_id for event in member_events)
+
         stream_ordering = RoomStreamToken.parse_stream_token(
-            from_token.room_key).stream
+            from_token.room_key
+        ).stream
 
         possibly_changed = set(changed)
+        possibly_left = set()
         for room_id in rooms_changed:
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
+
+            # The user may have left the room
+            # TODO: Check if they actually did or if we were just invited.
+            if room_id not in room_ids:
+                for key, event_id in current_state_ids.iteritems():
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_left.add(state_key)
+                continue
+
             # Fetch the current state at the time.
             try:
                 event_ids = yield self.store.get_forward_extremeties_for_room(
@@ -295,8 +316,6 @@ class DeviceHandler(BaseHandler):
                 # ordering: treat it the same as a new room
                 event_ids = []
 
-            current_state_ids = yield self.store.get_current_state_ids(room_id)
-
             # special-case for an empty prev state: include all members
             # in the changed list
             if not event_ids:
@@ -307,9 +326,25 @@ class DeviceHandler(BaseHandler):
                     possibly_changed.add(state_key)
                 continue
 
+            current_member_id = current_state_ids.get((EventTypes.Member, user_id))
+            if not current_member_id:
+                continue
+
             # mapping from event_id -> state_dict
             prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
 
+            # Check if we've joined the room? If so we just blindly add all the users to
+            # the "possibly changed" users.
+            for state_dict in prev_state_ids.itervalues():
+                member_event = state_dict.get((EventTypes.Member, user_id), None)
+                if not member_event or member_event != current_member_id:
+                    for key, event_id in current_state_ids.iteritems():
+                        etype, state_key = key
+                        if etype != EventTypes.Member:
+                            continue
+                        possibly_changed.add(state_key)
+                    break
+
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
@@ -320,19 +355,30 @@ class DeviceHandler(BaseHandler):
 
                 # check if this member has changed since any of the extremities
                 # at the stream_ordering, and add them to the list if so.
-                for state_dict in prev_state_ids.values():
+                for state_dict in prev_state_ids.itervalues():
                     prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
-                        possibly_changed.add(state_key)
+                        if state_key != user_id:
+                            possibly_changed.add(state_key)
                         break
 
-        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-            user_id
-        )
+        if possibly_changed or possibly_left:
+            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+                user_id
+            )
 
-        # Take the intersection of the users whose devices may have changed
-        # and those that actually still share a room with the user
-        defer.returnValue(users_who_share_room & possibly_changed)
+            # Take the intersection of the users whose devices may have changed
+            # and those that actually still share a room with the user
+            possibly_joined = possibly_changed & users_who_share_room
+            possibly_left = (possibly_changed | possibly_left) - users_who_share_room
+        else:
+            possibly_joined = []
+            possibly_left = []
+
+        defer.returnValue({
+            "changed": list(possibly_joined),
+            "left": list(possibly_left),
+        })
 
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b790a7c2ef..18f87cad67 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1074,6 +1074,9 @@ class FederationHandler(BaseHandler):
         if is_blocked:
             raise SynapseError(403, "This room has been blocked on this server")
 
+        if self.hs.config.block_non_admin_invites:
+            raise SynapseError(403, "This server does not accept room invites")
+
         membership = event.content.get("membership")
         if event.type != EventTypes.Member or membership != Membership.INVITE:
             raise SynapseError(400, "The event was not an m.room.member invite event")
@@ -1606,7 +1609,7 @@ class FederationHandler(BaseHandler):
 
             context.rejected = RejectedReason.AUTH_ERROR
 
-        if event.type == EventTypes.GuestAccess:
+        if event.type == EventTypes.GuestAccess and not context.rejected:
             yield self.maybe_kick_guest_users(event)
 
         defer.returnValue(context)
@@ -2090,6 +2093,14 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
+        """Handle an exchange_third_party_invite request from a remote server
+
+        The remote server will call this when it wants to turn a 3pid invite
+        into a normal m.room.member invite.
+
+        Returns:
+            Deferred: resolves (to None)
+        """
         builder = self.event_builder_factory.new(event_dict)
 
         message_handler = self.hs.get_handlers().message_handler
@@ -2108,9 +2119,12 @@ class FederationHandler(BaseHandler):
             raise e
         yield self._check_signature(event, context)
 
+        # XXX we send the invite here, but send_membership_event also sends it,
+        # so we end up making two requests. I think this is redundant.
         returned_invite = yield self.send_invite(origin, event)
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
+
         member_handler = self.hs.get_handlers().room_member_handler
         yield member_handler.send_membership_event(None, event, context)
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index be4f123c54..da18bf23db 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+from synapse.events import spamcheck
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -321,6 +321,12 @@ class MessageHandler(BaseHandler):
             token_id=requester.access_token_id,
             txn_id=txn_id
         )
+
+        if spamcheck.check_event_for_spam(event):
+            raise SynapseError(
+                403, "Spam is not permitted here", Codes.FORBIDDEN
+            )
+
         yield self.send_nonmember_event(
             requester,
             event,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index b3f979b246..9a498c2d3e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -191,6 +191,8 @@ class RoomMemberHandler(BaseHandler):
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
 
+        # if this is a join with a 3pid signature, we may need to turn a 3pid
+        # invite into a normal invite before we can handle the join.
         if third_party_signed is not None:
             replication = self.hs.get_replication_layer()
             yield replication.exchange_third_party_invite(
@@ -208,6 +210,16 @@ class RoomMemberHandler(BaseHandler):
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
 
+        if (effective_membership_state == "invite" and
+                self.hs.config.block_non_admin_invites):
+            is_requester_admin = yield self.auth.is_server_admin(
+                requester.user,
+            )
+            if not is_requester_admin:
+                raise SynapseError(
+                    403, "Invites have been disabled on this server",
+                )
+
         latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         current_state_ids = yield self.state_handler.get_current_state_ids(
             room_id, latest_event_ids=latest_event_ids,
@@ -471,6 +483,16 @@ class RoomMemberHandler(BaseHandler):
             requester,
             txn_id
     ):
+        if self.hs.config.block_non_admin_invites:
+            is_requester_admin = yield self.auth.is_server_admin(
+                requester.user,
+            )
+            if not is_requester_admin:
+                raise SynapseError(
+                    403, "Invites have been disabled on this server",
+                    Codes.FORBIDDEN,
+                )
+
         invitee = yield self._lookup_3pid(
             id_server, medium, address
         )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e6df1819b9..af1b527840 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -108,6 +108,16 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
         return True
 
 
+class DeviceLists(collections.namedtuple("DeviceLists", [
+    "changed",   # list of user_ids whose devices may have changed
+    "left",      # list of user_ids whose devices we no longer track
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        return bool(self.changed or self.left)
+
+
 class SyncResult(collections.namedtuple("SyncResult", [
     "next_batch",  # Token for the next sync
     "presence",  # List of presence events for the user.
@@ -283,6 +293,11 @@ class SyncHandler(object):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = sync_config.filter_collection.blocks_all_room_timeline()
 
+            # Pull out the current state, as we always want to include those events
+            # in the timeline if they're there.
+            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = frozenset(current_state_ids.itervalues())
+
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
             else:
@@ -294,6 +309,7 @@ class SyncHandler(object):
                     self.store,
                     sync_config.user.to_string(),
                     recents,
+                    always_include_ids=current_state_ids,
                 )
             else:
                 recents = []
@@ -329,6 +345,7 @@ class SyncHandler(object):
                     self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
+                    always_include_ids=current_state_ids,
                 )
                 loaded_recents.extend(recents)
                 recents = loaded_recents
@@ -535,7 +552,8 @@ class SyncHandler(object):
         res = yield self._generate_sync_entry_for_rooms(
             sync_result_builder, account_data_by_room
         )
-        newly_joined_rooms, newly_joined_users = res
+        newly_joined_rooms, newly_joined_users, _, _ = res
+        _, _, newly_left_rooms, newly_left_users = res
 
         block_all_presence_data = (
             since_token is None and
@@ -549,7 +567,11 @@ class SyncHandler(object):
         yield self._generate_sync_entry_for_to_device(sync_result_builder)
 
         device_lists = yield self._generate_sync_entry_for_device_list(
-            sync_result_builder
+            sync_result_builder,
+            newly_joined_rooms=newly_joined_rooms,
+            newly_joined_users=newly_joined_users,
+            newly_left_rooms=newly_left_rooms,
+            newly_left_users=newly_left_users,
         )
 
         device_id = sync_config.device_id
@@ -574,7 +596,9 @@ class SyncHandler(object):
 
     @measure_func("_generate_sync_entry_for_device_list")
     @defer.inlineCallbacks
-    def _generate_sync_entry_for_device_list(self, sync_result_builder):
+    def _generate_sync_entry_for_device_list(self, sync_result_builder,
+                                             newly_joined_rooms, newly_joined_users,
+                                             newly_left_rooms, newly_left_users):
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
@@ -582,16 +606,40 @@ class SyncHandler(object):
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
-            if not changed:
-                defer.returnValue([])
+
+            # TODO: Be more clever than this, i.e. remove users who we already
+            # share a room with?
+            for room_id in newly_joined_rooms:
+                joined_users = yield self.state.get_current_user_in_room(room_id)
+                newly_joined_users.update(joined_users)
+
+            for room_id in newly_left_rooms:
+                left_users = yield self.state.get_current_user_in_room(room_id)
+                newly_left_users.update(left_users)
+
+            # TODO: Check that these users are actually new, i.e. either they
+            # weren't in the previous sync *or* they left and rejoined.
+            changed.update(newly_joined_users)
+
+            if not changed and not newly_left_users:
+                defer.returnValue(DeviceLists(
+                    changed=[],
+                    left=newly_left_users,
+                ))
 
             users_who_share_room = yield self.store.get_users_who_share_room_with_user(
                 user_id
             )
 
-            defer.returnValue(users_who_share_room & changed)
+            defer.returnValue(DeviceLists(
+                changed=users_who_share_room & changed,
+                left=set(newly_left_users) - users_who_share_room,
+            ))
         else:
-            defer.returnValue([])
+            defer.returnValue(DeviceLists(
+                changed=[],
+                left=[],
+            ))
 
     @defer.inlineCallbacks
     def _generate_sync_entry_for_to_device(self, sync_result_builder):
@@ -755,8 +803,8 @@ class SyncHandler(object):
             account_data_by_room(dict): Dictionary of per room account data
 
         Returns:
-            Deferred(tuple): Returns a 2-tuple of
-            `(newly_joined_rooms, newly_joined_users)`
+            Deferred(tuple): Returns a 4-tuple of
+            `(newly_joined_rooms, newly_joined_users, newly_left_rooms, newly_left_users)`
         """
         user_id = sync_result_builder.sync_config.user.to_string()
         block_all_room_ephemeral = (
@@ -787,7 +835,7 @@ class SyncHandler(object):
                     )
                     if not tags_by_room:
                         logger.debug("no-oping sync")
-                        defer.returnValue(([], []))
+                        defer.returnValue(([], [], [], []))
 
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
@@ -800,7 +848,7 @@ class SyncHandler(object):
 
         if since_token:
             res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
-            room_entries, invited, newly_joined_rooms = res
+            room_entries, invited, newly_joined_rooms, newly_left_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
                 user_id, since_token.account_data_key,
@@ -808,6 +856,7 @@ class SyncHandler(object):
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
             room_entries, invited, newly_joined_rooms = res
+            newly_left_rooms = []
 
             tags_by_room = yield self.store.get_tags_for_user(user_id)
 
@@ -828,17 +877,30 @@ class SyncHandler(object):
 
         # Now we want to get any newly joined users
         newly_joined_users = set()
+        newly_left_users = set()
         if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
-                    joined_sync.timeline.events, joined_sync.state.values()
+                    joined_sync.timeline.events, joined_sync.state.itervalues()
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
                         if event.membership == Membership.JOIN:
                             newly_joined_users.add(event.state_key)
-
-        defer.returnValue((newly_joined_rooms, newly_joined_users))
+                        else:
+                            prev_content = event.unsigned.get("prev_content", {})
+                            prev_membership = prev_content.get("membership", None)
+                            if prev_membership == Membership.JOIN:
+                                newly_left_users.add(event.state_key)
+
+        newly_left_users -= newly_joined_users
+
+        defer.returnValue((
+            newly_joined_rooms,
+            newly_joined_users,
+            newly_left_rooms,
+            newly_left_users,
+        ))
 
     @defer.inlineCallbacks
     def _have_rooms_changed(self, sync_result_builder):
@@ -908,15 +970,28 @@ class SyncHandler(object):
             mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
 
         newly_joined_rooms = []
+        newly_left_rooms = []
         room_entries = []
         invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
+        for room_id, events in mem_change_events_by_room_id.iteritems():
             non_joins = [e for e in events if e.membership != Membership.JOIN]
             has_join = len(non_joins) != len(events)
 
             # We want to figure out if we joined the room at some point since
             # the last sync (even if we have since left). This is to make sure
             # we do send down the room, and with full state, where necessary
+
+            old_state_ids = None
+            if room_id in joined_room_ids and non_joins:
+                # Always include if the user (re)joined the room, especially
+                # important so that device list changes are calculated correctly.
+                # If there are non join member events, but we are still in the room,
+                # then the user must have left and joined
+                newly_joined_rooms.append(room_id)
+
+                # User is in the room so we don't need to do the invite/leave checks
+                continue
+
             if room_id in joined_room_ids or has_join:
                 old_state_ids = yield self.get_state_at(room_id, since_token)
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
@@ -928,12 +1003,33 @@ class SyncHandler(object):
                 if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
                     newly_joined_rooms.append(room_id)
 
-                if room_id in joined_room_ids:
-                    continue
+            # If user is in the room then we don't need to do the invite/leave checks
+            if room_id in joined_room_ids:
+                continue
 
             if not non_joins:
                 continue
 
+            # Check if we have left the room. This can either be because we were
+            # joined before *or* that we since joined and then left.
+            if events[-1].membership != Membership.JOIN:
+                if has_join:
+                    newly_left_rooms.append(room_id)
+                else:
+                    if not old_state_ids:
+                        old_state_ids = yield self.get_state_at(room_id, since_token)
+                        old_mem_ev_id = old_state_ids.get(
+                            (EventTypes.Member, user_id),
+                            None,
+                        )
+                        old_mem_ev = None
+                        if old_mem_ev_id:
+                            old_mem_ev = yield self.store.get_event(
+                                old_mem_ev_id, allow_none=True
+                            )
+                    if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
+                        newly_left_rooms.append(room_id)
+
             # Only bother if we're still currently invited
             should_invite = non_joins[-1].membership == Membership.INVITE
             if should_invite:
@@ -1011,7 +1107,7 @@ class SyncHandler(object):
                     upto_token=since_token,
                 ))
 
-        defer.returnValue((room_entries, invited, newly_joined_rooms))
+        defer.returnValue((room_entries, invited, newly_joined_rooms, newly_left_rooms))
 
     @defer.inlineCallbacks
     def _get_all_rooms(self, sync_result_builder, ignored_users):
@@ -1259,6 +1355,7 @@ class SyncResultBuilder(object):
         self.invited = []
         self.archived = []
         self.device = []
+        self.to_device = []
 
 
 class RoomSyncResultBuilder(object):
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index d8923c9abb..241b17f2cb 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import socket
 
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.internet import defer, reactor
@@ -30,7 +31,10 @@ logger = logging.getLogger(__name__)
 
 SERVER_CACHE = {}
 
-
+# our record of an individual server which can be tried to reach a destination.
+#
+# "host" is actually a dotted-quad or ipv6 address string. Except when there's
+# no SRV record, in which case it is the original hostname.
 _Server = collections.namedtuple(
     "_Server", "priority weight host port expires"
 )
@@ -219,9 +223,10 @@ class SRVClientEndpoint(object):
                 return self.default_server
             else:
                 raise ConnectError(
-                    "Not server available for %s" % self.service_name
+                    "No server available for %s" % self.service_name
                 )
 
+        # look for all servers with the same priority
         min_priority = self.servers[0].priority
         weight_indexes = list(
             (index, server.weight + 1)
@@ -231,11 +236,22 @@ class SRVClientEndpoint(object):
 
         total_weight = sum(weight for index, weight in weight_indexes)
         target_weight = random.randint(0, total_weight)
-
         for index, weight in weight_indexes:
             target_weight -= weight
             if target_weight <= 0:
                 server = self.servers[index]
+                # XXX: this looks totally dubious:
+                #
+                # (a) we never reuse a server until we have been through
+                #     all of the servers at the same priority, so if the
+                #     weights are A: 100, B:1, we always do ABABAB instead of
+                #     AAAA...AAAB (approximately).
+                #
+                # (b) After using all the servers at the lowest priority,
+                #     we move onto the next priority. We should only use the
+                #     second priority if servers at the top priority are
+                #     unreachable.
+                #
                 del self.servers[index]
                 self.used_servers.append(server)
                 return server
@@ -280,26 +296,21 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
                 continue
 
             payload = answer.payload
-            host = str(payload.target)
-            srv_ttl = answer.ttl
 
-            try:
-                answers, _, _ = yield dns_client.lookupAddress(host)
-            except DNSNameError:
-                continue
+            hosts = yield _get_hosts_for_srv_record(
+                dns_client, str(payload.target)
+            )
 
-            for answer in answers:
-                if answer.type == dns.A and answer.payload:
-                    ip = answer.payload.dottedQuad()
-                    host_ttl = min(srv_ttl, answer.ttl)
+            for (ip, ttl) in hosts:
+                host_ttl = min(answer.ttl, ttl)
 
-                    servers.append(_Server(
-                        host=ip,
-                        port=int(payload.port),
-                        priority=int(payload.priority),
-                        weight=int(payload.weight),
-                        expires=int(clock.time()) + host_ttl,
-                    ))
+                servers.append(_Server(
+                    host=ip,
+                    port=int(payload.port),
+                    priority=int(payload.priority),
+                    weight=int(payload.weight),
+                    expires=int(clock.time()) + host_ttl,
+                ))
 
         servers.sort()
         cache[service_name] = list(servers)
@@ -317,3 +328,68 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
             raise e
 
     defer.returnValue(servers)
+
+
+@defer.inlineCallbacks
+def _get_hosts_for_srv_record(dns_client, host):
+    """Look up each of the hosts in a SRV record
+
+    Args:
+        dns_client (twisted.names.dns.IResolver):
+        host (basestring): host to look up
+
+    Returns:
+        Deferred[list[(str, int)]]: a list of (host, ttl) pairs
+
+    """
+    ip4_servers = []
+    ip6_servers = []
+
+    def cb(res):
+        # lookupAddress and lookupIP6Address return a three-tuple
+        # giving the answer, authority, and additional sections of the
+        # response.
+        #
+        # we only care about the answers.
+
+        return res[0]
+
+    def eb(res):
+        res.trap(DNSNameError)
+        return []
+
+    # no logcontexts here, so we can safely fire these off and gatherResults
+    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
+    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+    results = yield defer.gatherResults([d1, d2], consumeErrors=True)
+
+    for result in results:
+        for answer in result:
+            if not answer.payload:
+                continue
+
+            try:
+                if answer.type == dns.A:
+                    ip = answer.payload.dottedQuad()
+                    ip4_servers.append((ip, answer.ttl))
+                elif answer.type == dns.AAAA:
+                    ip = socket.inet_ntop(
+                        socket.AF_INET6, answer.payload.address,
+                    )
+                    ip6_servers.append((ip, answer.ttl))
+                else:
+                    # the most likely candidate here is a CNAME record.
+                    # rfc2782 says srvs may not point to aliases.
+                    logger.warn(
+                        "Ignoring unexpected DNS record type %s for %s",
+                        answer.type, host,
+                    )
+                    continue
+            except Exception as e:
+                logger.warn("Ignoring invalid DNS response for %s: %s",
+                            host, e)
+                continue
+
+    # keep the ipv4 results before the ipv6 results, mostly to match historical
+    # behaviour.
+    defer.returnValue(ip4_servers + ip6_servers)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 8a5d473108..62c41cd9db 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -244,6 +244,26 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _build_notification_dict(self, event, tweaks, badge):
+        if self.data.get('format') == 'event_id_only':
+            d = {
+                'notification': {
+                    'event_id': event.event_id,
+                    'room_id': event.room_id,
+                    'counts': {
+                        'unread': badge,
+                    },
+                    'devices': [
+                        {
+                            'app_id': self.app_id,
+                            'pushkey': self.pushkey,
+                            'pushkey_ts': long(self.pushkey_ts / 1000),
+                            'data': self.data_minus_url,
+                        }
+                    ]
+                }
+            }
+            defer.returnValue(d)
+
         ctx = yield push_tools.get_context_for_event(
             self.store, self.state_handler, event, self.user_id
         )
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index ed7f1c89ad..630e92c90e 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -31,7 +31,7 @@ REQUIREMENTS = {
     "pyyaml": ["yaml"],
     "pyasn1": ["pyasn1"],
     "daemonize": ["daemonize"],
-    "py-bcrypt": ["bcrypt"],
+    "bcrypt": ["bcrypt"],
     "pillow": ["PIL"],
     "pydenticon": ["pydenticon"],
     "ujson": ["ujson"],
@@ -40,6 +40,7 @@ REQUIREMENTS = {
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
     "phonenumbers>=8.2.0": ["phonenumbers"],
+    "affinity": ["affinity"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 7d786e8de3..465b25033d 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -168,7 +168,7 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
 
     DEFAULT_MESSAGE = (
         "Sharing illegal content on this server is not permitted and rooms in"
-        " violatation will be blocked."
+        " violation will be blocked."
     )
 
     def __init__(self, hs):
@@ -296,7 +296,7 @@ class QuarantineMediaInRoom(ClientV1RestServlet):
 
 class ResetPasswordRestServlet(ClientV1RestServlet):
     """Post request to allow an administrator reset password for a user.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/reset_password/
             @user:to_reset_password?access_token=admin_access_token
@@ -319,7 +319,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
         """Post request to allow an administrator reset password for a user.
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         """
         UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
@@ -343,7 +343,7 @@ class ResetPasswordRestServlet(ClientV1RestServlet):
 
 class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     """Get request to get specific number of users from Synapse.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
             @admin:user?access_token=admin_access_token&start=0&limit=10
@@ -362,7 +362,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_GET(self, request, target_user_id):
         """Get request to get specific number of users from Synapse.
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         """
         target_user = UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
@@ -395,7 +395,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def on_POST(self, request, target_user_id):
         """Post request to get specific number of users from Synapse..
-        This need a user have a administrator access in Synapse.
+        This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/users_paginate/
             @admin:user?access_token=admin_access_token
@@ -433,7 +433,7 @@ class GetUsersPaginatedRestServlet(ClientV1RestServlet):
 class SearchUsersRestServlet(ClientV1RestServlet):
     """Get request to search user table for specific users according to
     search term.
-    This need a user have a administrator access in Synapse.
+    This needs user to have administrator access in Synapse.
         Example:
             http://localhost:8008/_matrix/client/api/v1/admin/search_users/
             @admin:user?access_token=admin_access_token&term=alice
@@ -453,7 +453,7 @@ class SearchUsersRestServlet(ClientV1RestServlet):
     def on_GET(self, request, target_user_id):
         """Get request to search user table for specific users according to
         search term.
-        This need a user have a administrator access in Synapse.
+        This needs user to have a administrator access in Synapse.
         """
         target_user = UserID.from_string(target_user_id)
         requester = yield self.auth.get_user_by_req(request)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 6a3cfe84f8..943e87e7fd 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -188,13 +188,11 @@ class KeyChangesServlet(RestServlet):
 
         user_id = requester.user.to_string()
 
-        changed = yield self.device_handler.get_user_ids_changed(
+        results = yield self.device_handler.get_user_ids_changed(
             user_id, from_token,
         )
 
-        defer.returnValue((200, {
-            "changed": list(changed),
-        }))
+        defer.returnValue((200, results))
 
 
 class OneTimeKeyServlet(RestServlet):
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 6dcc407451..978af9c280 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -110,7 +110,7 @@ class SyncRestServlet(RestServlet):
         filter_id = parse_string(request, "filter", default=None)
         full_state = parse_boolean(request, "full_state", default=False)
 
-        logger.info(
+        logger.debug(
             "/sync: user=%r, timeout=%r, since=%r,"
             " set_presence=%r, filter_id=%r, device_id=%r" % (
                 user, timeout, since, set_presence, filter_id, device_id
@@ -189,7 +189,8 @@ class SyncRestServlet(RestServlet):
             "account_data": {"events": sync_result.account_data},
             "to_device": {"events": sync_result.to_device},
             "device_lists": {
-                "changed": list(sync_result.device_lists),
+                "changed": list(sync_result.device_lists.changed),
+                "left": list(sync_result.device_lists.left),
             },
             "presence": SyncRestServlet.encode_presence(
                 sync_result.presence, time_now
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 3b5e0a4fb9..87aeaf71d6 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -113,30 +113,37 @@ class KeyStore(SQLBaseStore):
                 keys[key_id] = key
         defer.returnValue(keys)
 
-    @defer.inlineCallbacks
     def store_server_verify_key(self, server_name, from_server, time_now_ms,
                                 verify_key):
         """Stores a NACL verification key for the given server.
         Args:
             server_name (str): The name of the server.
-            key_id (str): The version of the key for the server.
             from_server (str): Where the verification key was looked up
-            ts_now_ms (int): The time now in milliseconds
-            verification_key (VerifyKey): The NACL verify key.
+            time_now_ms (int): The time now in milliseconds
+            verify_key (nacl.signing.VerifyKey): The NACL verify key.
         """
-        yield self._simple_upsert(
-            table="server_signature_keys",
-            keyvalues={
-                "server_name": server_name,
-                "key_id": "%s:%s" % (verify_key.alg, verify_key.version),
-            },
-            values={
-                "from_server": from_server,
-                "ts_added_ms": time_now_ms,
-                "verify_key": buffer(verify_key.encode()),
-            },
-            desc="store_server_verify_key",
-        )
+        key_id = "%s:%s" % (verify_key.alg, verify_key.version)
+
+        def _txn(txn):
+            self._simple_upsert_txn(
+                txn,
+                table="server_signature_keys",
+                keyvalues={
+                    "server_name": server_name,
+                    "key_id": key_id,
+                },
+                values={
+                    "from_server": from_server,
+                    "ts_added_ms": time_now_ms,
+                    "verify_key": buffer(verify_key.encode()),
+                },
+            )
+            txn.call_after(
+                self._get_server_verify_key.invalidate,
+                (server_name, key_id)
+            )
+
+        return self.runInteraction("store_server_verify_key", _txn)
 
     def store_server_keys_json(self, server_name, key_id, from_server,
                                ts_now_ms, ts_expires_ms, key_json_bytes):
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 5590b866ed..d7dbdc77ff 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -43,7 +43,8 @@ MEMBERSHIP_PRIORITY = (
 
 
 @defer.inlineCallbacks
-def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
+def filter_events_for_clients(store, user_tuples, events, event_id_to_state,
+                              always_include_ids=frozenset()):
     """ Returns dict of user_id -> list of events that user is allowed to
     see.
 
@@ -54,6 +55,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
             * the user has not been a member of the room since the
             given events
         events ([synapse.events.EventBase]): list of events to filter
+        always_include_ids (set(event_id)): set of event ids to specifically
+            include (unless sender is ignored)
     """
     forgotten = yield preserve_context_over_deferred(defer.gatherResults([
         defer.maybeDeferred(
@@ -91,6 +94,9 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
         if not event.is_state() and event.sender in ignore_list:
             return False
 
+        if event.event_id in always_include_ids:
+            return True
+
         state = event_id_to_state[event.event_id]
 
         # get the room_visibility at the time of the event.
@@ -189,7 +195,8 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
 
 
 @defer.inlineCallbacks
-def filter_events_for_client(store, user_id, events, is_peeking=False):
+def filter_events_for_client(store, user_id, events, is_peeking=False,
+                             always_include_ids=frozenset()):
     """
     Check which events a user is allowed to see
 
@@ -213,6 +220,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False):
         types=types
     )
     res = yield filter_events_for_clients(
-        store, [(user_id, is_peeking)], events, event_id_to_state
+        store, [(user_id, is_peeking)], events, event_id_to_state,
+        always_include_ids=always_include_ids,
     )
     defer.returnValue(res.get(user_id, []))