summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-05-29 00:25:22 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-05-29 00:25:22 +0100
commit7a6df013cc8a128278d2ce7e5eb569e0b424f9b0 (patch)
tree5de624a65953eb96ab67274462d850a88c0cce3c /synapse/app
parentmake lazy_load_members configurable in filters (diff)
parentMerge pull request #3256 from matrix-org/3218-official-prom (diff)
downloadsynapse-7a6df013cc8a128278d2ce7e5eb569e0b424f9b0.tar.xz
merge develop
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/appservice.py20
-rw-r--r--synapse/app/client_reader.py5
-rw-r--r--synapse/app/event_creator.py5
-rw-r--r--synapse/app/federation_reader.py5
-rw-r--r--synapse/app/federation_sender.py36
-rw-r--r--synapse/app/frontend_proxy.py7
-rwxr-xr-xsynapse/app/homeserver.py73
-rw-r--r--synapse/app/media_repository.py5
-rw-r--r--synapse/app/pusher.py40
-rw-r--r--synapse/app/synchrotron.py109
-rwxr-xr-xsynapse/app/synctl.py5
-rw-r--r--synapse/app/user_dir.py18
12 files changed, 210 insertions, 118 deletions
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index c6fe4516d1..b1efacc9f8 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -32,11 +32,11 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.internet import reactor, defer
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.appservice")
 
@@ -64,7 +64,7 @@ class AppserviceServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -74,6 +74,7 @@ class AppserviceServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
@@ -112,9 +113,14 @@ class ASReplicationHandler(ReplicationClientHandler):
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
-            preserve_fn(
-                self.appservice_handler.notify_interested_services
-            )(max_stream_id)
+            run_in_background(self._notify_app_services, max_stream_id)
+
+    @defer.inlineCallbacks
+    def _notify_app_services(self, room_stream_id):
+        try:
+            yield self.appservice_handler.notify_interested_services(room_stream_id)
+        except Exception:
+            logger.exception("Error notifying application services of event")
 
 
 def start(config_options):
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 0a8ce9bc66..38b98382c6 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.client_reader")
 
@@ -88,7 +88,7 @@ class ClientReaderServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -98,6 +98,7 @@ class ClientReaderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 172e989b54..bd7f3d5679 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -52,7 +52,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.event_creator")
 
@@ -104,7 +104,7 @@ class EventCreatorServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -114,6 +114,7 @@ class EventCreatorServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 20d157911b..6e10b27b9e 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -41,7 +41,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_reader")
 
@@ -77,7 +77,7 @@ class FederationReaderServer(HomeServer):
                         FEDERATION_PREFIX: TransportLayerServer(self),
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -87,6 +87,7 @@ class FederationReaderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index f760826d27..6f24e32d6d 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,11 +38,11 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.federation_sender")
 
@@ -91,7 +91,7 @@ class FederationSenderServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -101,6 +101,7 @@ class FederationSenderServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
@@ -229,7 +230,7 @@ class FederationSenderHandler(object):
         # presence, typing, etc.
         if stream_name == "federation":
             send_queue.process_rows_for_federation(self.federation_sender, rows)
-            preserve_fn(self.update_token)(token)
+            run_in_background(self.update_token, token)
 
         # We also need to poke the federation sender when new events happen
         elif stream_name == "events":
@@ -237,19 +238,22 @@ class FederationSenderHandler(object):
 
     @defer.inlineCallbacks
     def update_token(self, token):
-        self.federation_position = token
-
-        # We linearize here to ensure we don't have races updating the token
-        with (yield self._fed_position_linearizer.queue(None)):
-            if self._last_ack < self.federation_position:
-                yield self.store.update_federation_out_pos(
-                    "federation", self.federation_position
-                )
+        try:
+            self.federation_position = token
+
+            # We linearize here to ensure we don't have races updating the token
+            with (yield self._fed_position_linearizer.queue(None)):
+                if self._last_ack < self.federation_position:
+                    yield self.store.update_federation_out_pos(
+                        "federation", self.federation_position
+                    )
 
-                # We ACK this token over replication so that the master can drop
-                # its in memory queues
-                self.replication_client.send_federation_ack(self.federation_position)
-                self._last_ack = self.federation_position
+                    # We ACK this token over replication so that the master can drop
+                    # its in memory queues
+                    self.replication_client.send_federation_ack(self.federation_position)
+                    self._last_ack = self.federation_position
+        except Exception:
+            logger.exception("Error updating federation stream position")
 
 
 if __name__ == '__main__':
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 816c080d18..0f700ee786 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -44,7 +44,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
@@ -90,7 +90,7 @@ class KeyUploadServlet(RestServlet):
             # They're actually trying to upload something, proxy to main synapse.
             # Pass through the auth headers, if any, in case the access token
             # is there.
-            auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+            auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
             headers = {
                 "Authorization": auth_headers,
             }
@@ -142,7 +142,7 @@ class FrontendProxyServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -152,6 +152,7 @@ class FrontendProxyServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e477c7ced6..449bfacdb9 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -34,8 +34,8 @@ from synapse.module_api import ModuleApi
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import RootRedirect
 from synapse.http.site import SynapseSite
-from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
     check_requirements
 from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
@@ -48,6 +48,7 @@ from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
@@ -56,10 +57,12 @@ from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 from twisted.application import service
 from twisted.internet import defer, reactor
-from twisted.web.resource import EncodingResourceWrapper, Resource
+from twisted.web.resource import EncodingResourceWrapper, NoResource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File
 
+from prometheus_client.twisted import MetricsResource
+
 logger = logging.getLogger("synapse.app.homeserver")
 
 
@@ -126,7 +129,7 @@ class SynapseHomeServer(HomeServer):
         if WEB_CLIENT_PREFIX in resources:
             root_resource = RootRedirect(WEB_CLIENT_PREFIX)
         else:
-            root_resource = Resource()
+            root_resource = NoResource()
 
         root_resource = create_resource_tree(resources, root_resource)
 
@@ -139,6 +142,7 @@ class SynapseHomeServer(HomeServer):
                     site_tag,
                     listener_config,
                     root_resource,
+                    self.version_string,
                 ),
                 self.tls_server_context_factory,
             )
@@ -152,6 +156,7 @@ class SynapseHomeServer(HomeServer):
                     site_tag,
                     listener_config,
                     root_resource,
+                    self.version_string,
                 )
             )
         logger.info("Synapse now listening on port %d", port)
@@ -181,6 +186,15 @@ class SynapseHomeServer(HomeServer):
                 "/_matrix/client/versions": client_resource,
             })
 
+        if name == "consent":
+            from synapse.rest.consent.consent_resource import ConsentResource
+            consent_resource = ConsentResource(self)
+            if compress:
+                consent_resource = gz_wrap(consent_resource)
+            resources.update({
+                "/_matrix/consent": consent_resource,
+            })
+
         if name == "federation":
             resources.update({
                 FEDERATION_PREFIX: TransportLayerServer(self),
@@ -218,7 +232,7 @@ class SynapseHomeServer(HomeServer):
             resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
 
         if name == "metrics" and self.get_config().enable_metrics:
-            resources[METRICS_PREFIX] = MetricsResource(self)
+            resources[METRICS_PREFIX] = MetricsResource(RegistryProxy())
 
         if name == "replication":
             resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
@@ -350,8 +364,6 @@ def setup(config_options):
         hs.get_datastore().start_doing_background_updates()
         hs.get_federation_client().start_get_pdu_cache()
 
-        register_memory_metrics(hs)
-
     reactor.callWhenRunning(start)
 
     return hs
@@ -402,6 +414,10 @@ def run(hs):
 
     stats = {}
 
+    # Contains the list of processes we will be monitoring
+    # currently either 0 or 1
+    stats_process = []
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -425,8 +441,21 @@ def run(hs):
         stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
         stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
+        r30_results = yield hs.get_datastore().count_r30_users()
+        for name, count in r30_results.iteritems():
+            stats["r30_users_" + name] = count
+
         daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
         stats["daily_sent_messages"] = daily_sent_messages
+        stats["cache_factor"] = CACHE_SIZE_FACTOR
+        stats["event_cache_size"] = hs.config.event_cache_size
+
+        if len(stats_process) > 0:
+            stats["memory_rss"] = 0
+            stats["cpu_average"] = 0
+            for process in stats_process:
+                stats["memory_rss"] += process.memory_info().rss
+                stats["cpu_average"] += int(process.cpu_percent(interval=None))
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
@@ -437,10 +466,40 @@ def run(hs):
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
 
+    def performance_stats_init():
+        try:
+            import psutil
+            process = psutil.Process()
+            # Ensure we can fetch both, and make the initial request for cpu_percent
+            # so the next request will use this as the initial point.
+            process.memory_info().rss
+            process.cpu_percent(interval=None)
+            logger.info("report_stats can use psutil")
+            stats_process.append(process)
+        except (ImportError, AttributeError):
+            logger.warn(
+                "report_stats enabled but psutil is not installed or incorrect version."
+                " Disabling reporting of memory/cpu stats."
+                " Ensuring psutil is available will help matrix.org track performance"
+                " changes across releases."
+            )
+
+    def generate_user_daily_visit_stats():
+        hs.get_datastore().generate_user_daily_visits()
+
+    # Rather than update on per session basis, batch up the requests.
+    # If you increase the loop period, the accuracy of user_daily_visits
+    # table will decrease
+    clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
+
     if hs.config.report_stats:
         logger.info("Scheduling stats reporting for 3 hour intervals")
         clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
 
+        # We need to defer this init for the cases that we daemonize
+        # otherwise the process ID we get is that of the non-daemon process
+        clock.call_later(0, performance_stats_init)
+
         # We wait 5 minutes to send the first set of stats as the server can
         # be quite busy the first few minutes
         clock.call_later(5 * 60, phone_stats_home)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 84c5791b3b..9c93195f0a 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -43,7 +43,7 @@ from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.media_repository")
 
@@ -84,7 +84,7 @@ class MediaRepositoryServer(HomeServer):
                         ),
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -94,6 +94,7 @@ class MediaRepositoryServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 98a4a7c62c..3912eae48c 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -33,11 +33,11 @@ from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.pusher")
 
@@ -94,7 +94,7 @@ class PusherServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -104,6 +104,7 @@ class PusherServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
@@ -140,24 +141,27 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        preserve_fn(self.poke_pushers)(stream_name, token, rows)
+        run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
-        if stream_name == "pushers":
-            for row in rows:
-                if row.deleted:
-                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
-                else:
-                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
-        elif stream_name == "events":
-            yield self.pusher_pool.on_new_notifications(
-                token, token,
-            )
-        elif stream_name == "receipts":
-            yield self.pusher_pool.on_new_receipts(
-                token, token, set(row.room_id for row in rows)
-            )
+        try:
+            if stream_name == "pushers":
+                for row in rows:
+                    if row.deleted:
+                        yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    else:
+                        yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+            elif stream_name == "events":
+                yield self.pusher_pool.on_new_notifications(
+                    token, token,
+                )
+            elif stream_name == "receipts":
+                yield self.pusher_pool.on_new_receipts(
+                    token, token, set(row.room_id for row in rows)
+                )
+        except Exception:
+            logger.exception("Error poking pushers")
 
     def stop_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index abe91dcfbd..c6294a7a0c 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -51,12 +51,14 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
 from synapse.util.versionstring import get_version_string
 from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
+
+from six import iteritems
 
 logger = logging.getLogger("synapse.app.synchrotron")
 
@@ -211,7 +213,7 @@ class SynchrotronPresence(object):
 
     def get_currently_syncing_users(self):
         return [
-            user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
             if count > 0
         ]
 
@@ -269,7 +271,7 @@ class SynchrotronServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -279,6 +281,7 @@ class SynchrotronServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
@@ -325,8 +328,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
-
-        preserve_fn(self.process_and_notify)(stream_name, token, rows)
+        run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
         args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -338,55 +340,58 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
     @defer.inlineCallbacks
     def process_and_notify(self, stream_name, token, rows):
-        if stream_name == "events":
-            # We shouldn't get multiple rows per token for events stream, so
-            # we don't need to optimise this for multiple rows.
-            for row in rows:
-                event = yield self.store.get_event(row.event_id)
-                extra_users = ()
-                if event.type == EventTypes.Member:
-                    extra_users = (event.state_key,)
-                max_token = self.store.get_room_max_stream_ordering()
-                self.notifier.on_new_room_event(
-                    event, token, max_token, extra_users
+        try:
+            if stream_name == "events":
+                # We shouldn't get multiple rows per token for events stream, so
+                # we don't need to optimise this for multiple rows.
+                for row in rows:
+                    event = yield self.store.get_event(row.event_id)
+                    extra_users = ()
+                    if event.type == EventTypes.Member:
+                        extra_users = (event.state_key,)
+                    max_token = self.store.get_room_max_stream_ordering()
+                    self.notifier.on_new_room_event(
+                        event, token, max_token, extra_users
+                    )
+            elif stream_name == "push_rules":
+                self.notifier.on_new_event(
+                    "push_rules_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "push_rules":
-            self.notifier.on_new_event(
-                "push_rules_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name in ("account_data", "tag_account_data",):
-            self.notifier.on_new_event(
-                "account_data_key", token, users=[row.user_id for row in rows],
-            )
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "receipt_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "typing":
-            self.typing_handler.process_replication_rows(token, rows)
-            self.notifier.on_new_event(
-                "typing_key", token, rooms=[row.room_id for row in rows],
-            )
-        elif stream_name == "to_device":
-            entities = [row.entity for row in rows if row.entity.startswith("@")]
-            if entities:
+            elif stream_name in ("account_data", "tag_account_data",):
                 self.notifier.on_new_event(
-                    "to_device_key", token, users=entities,
+                    "account_data_key", token, users=[row.user_id for row in rows],
                 )
-        elif stream_name == "device_lists":
-            all_room_ids = set()
-            for row in rows:
-                room_ids = yield self.store.get_rooms_for_user(row.user_id)
-                all_room_ids.update(room_ids)
-            self.notifier.on_new_event(
-                "device_list_key", token, rooms=all_room_ids,
-            )
-        elif stream_name == "presence":
-            yield self.presence_handler.process_replication_rows(token, rows)
-        elif stream_name == "receipts":
-            self.notifier.on_new_event(
-                "groups_key", token, users=[row.user_id for row in rows],
-            )
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "typing":
+                self.typing_handler.process_replication_rows(token, rows)
+                self.notifier.on_new_event(
+                    "typing_key", token, rooms=[row.room_id for row in rows],
+                )
+            elif stream_name == "to_device":
+                entities = [row.entity for row in rows if row.entity.startswith("@")]
+                if entities:
+                    self.notifier.on_new_event(
+                        "to_device_key", token, users=entities,
+                    )
+            elif stream_name == "device_lists":
+                all_room_ids = set()
+                for row in rows:
+                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    all_room_ids.update(room_ids)
+                self.notifier.on_new_event(
+                    "device_list_key", token, rooms=all_room_ids,
+                )
+            elif stream_name == "presence":
+                yield self.presence_handler.process_replication_rows(token, rows)
+            elif stream_name == "receipts":
+                self.notifier.on_new_event(
+                    "groups_key", token, users=[row.user_id for row in rows],
+                )
+        except Exception:
+            logger.exception("Error processing replication")
 
 
 def start(config_options):
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 0f0ddfa78a..712dfa870e 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -38,7 +38,7 @@ def pid_running(pid):
     try:
         os.kill(pid, 0)
         return True
-    except OSError, err:
+    except OSError as err:
         if err.errno == errno.EPERM:
             return True
         return False
@@ -98,7 +98,7 @@ def stop(pidfile, app):
         try:
             os.kill(pid, signal.SIGTERM)
             write("stopped %s" % (app,), colour=GREEN)
-        except OSError, err:
+        except OSError as err:
             if err.errno == errno.ESRCH:
                 write("%s not running" % (app,), colour=YELLOW)
             elif err.errno == errno.EPERM:
@@ -252,6 +252,7 @@ def main():
             for running_pid in running_pids:
                 while pid_running(running_pid):
                     time.sleep(0.2)
+            write("All processes exited; now restarting...")
 
     if action == "start" or action == "restart":
         if start_stop_synapse:
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 494ccb702c..53eb3474da 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -39,11 +39,11 @@ from synapse.storage.engines import create_engine
 from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import Resource
+from twisted.internet import reactor, defer
+from twisted.web.resource import NoResource
 
 logger = logging.getLogger("synapse.app.user_dir")
 
@@ -116,7 +116,7 @@ class UserDirectoryServer(HomeServer):
                         "/_matrix/client/api/v1": resource,
                     })
 
-        root_resource = create_resource_tree(resources, Resource())
+        root_resource = create_resource_tree(resources, NoResource())
 
         _base.listen_tcp(
             bind_addresses,
@@ -126,6 +126,7 @@ class UserDirectoryServer(HomeServer):
                 site_tag,
                 listener_config,
                 root_resource,
+                self.version_string,
             )
         )
 
@@ -164,7 +165,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
-            preserve_fn(self.user_directory.notify_new_event)()
+            run_in_background(self._notify_directory)
+
+    @defer.inlineCallbacks
+    def _notify_directory(self):
+        try:
+            yield self.user_directory.notify_new_event()
+        except Exception:
+            logger.exception("Error notifiying user directory of state update")
 
 
 def start(config_options):