summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/__init__.py2
-rw-r--r--synapse/app/_base.py30
-rw-r--r--synapse/app/appservice.py8
-rw-r--r--synapse/app/client_reader.py8
-rw-r--r--synapse/app/event_creator.py23
-rw-r--r--synapse/app/federation_reader.py8
-rw-r--r--synapse/app/federation_sender.py8
-rw-r--r--synapse/app/frontend_proxy.py12
-rwxr-xr-xsynapse/app/homeserver.py75
-rw-r--r--synapse/app/media_repository.py8
-rw-r--r--synapse/app/pusher.py29
-rw-r--r--synapse/app/synchrotron.py20
-rwxr-xr-xsynapse/app/synctl.py284
-rw-r--r--synapse/app/user_dir.py8
14 files changed, 115 insertions, 408 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 3b6b9368b8..c3afcc573b 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -24,7 +24,7 @@ try:
     python_dependencies.check_requirements()
 except python_dependencies.MissingRequirementError as e:
     message = "\n".join([
-        "Missing Requirement: %s" % (e.message,),
+        "Missing Requirement: %s" % (str(e),),
         "To install run:",
         "    pip install --upgrade --force \"%s\"" % (e.dependency,),
         "",
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 7c866e246a..18584226e9 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,6 +17,7 @@ import gc
 import logging
 import sys
 
+import psutil
 from daemonize import Daemonize
 
 from twisted.internet import error, reactor
@@ -24,12 +25,6 @@ from twisted.internet import error, reactor
 from synapse.util import PreserveLoggingContext
 from synapse.util.rlimit import change_resource_limit
 
-try:
-    import affinity
-except Exception:
-    affinity = None
-
-
 logger = logging.getLogger(__name__)
 
 
@@ -89,15 +84,20 @@ def start_reactor(
         with PreserveLoggingContext():
             logger.info("Running")
             if cpu_affinity is not None:
-                if not affinity:
-                    quit_with_error(
-                        "Missing package 'affinity' required for cpu_affinity\n"
-                        "option\n\n"
-                        "Install by running:\n\n"
-                        "   pip install affinity\n\n"
-                    )
-                logger.info("Setting CPU affinity to %s" % cpu_affinity)
-                affinity.set_process_affinity_mask(0, cpu_affinity)
+                # Turn the bitmask into bits, reverse it so we go from 0 up
+                mask_to_bits = bin(cpu_affinity)[2:][::-1]
+
+                cpus = []
+                cpu_num = 0
+
+                for i in mask_to_bits:
+                    if i == "1":
+                        cpus.append(cpu_num)
+                    cpu_num += 1
+
+                p = psutil.Process()
+                p.cpu_affinity(cpus)
+
             change_resource_limit(soft_file_limit)
             if gc_thresholds:
                 gc.set_threshold(*gc_thresholds)
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 3348a8ec6d..8559e141af 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -51,10 +51,7 @@ class AppserviceSlaveStore(
 
 
 class AppserviceServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = AppserviceSlaveStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -139,7 +136,7 @@ def start(config_options):
             "Synapse appservice", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.appservice"
@@ -175,7 +172,6 @@ def start(config_options):
 
     def start():
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ab79a45646..76aed8c60a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -74,10 +74,7 @@ class ClientReaderSlavedStore(
 
 
 class ClientReaderServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = ClientReaderSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -156,7 +153,7 @@ def start(config_options):
             "Synapse client reader", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.client_reader"
@@ -184,7 +181,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 03d39968a8..e4a68715aa 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -45,6 +45,11 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.profile import (
+    ProfileAvatarURLRestServlet,
+    ProfileDisplaynameRestServlet,
+    ProfileRestServlet,
+)
 from synapse.rest.client.v1.room import (
     JoinRoomAliasServlet,
     RoomMembershipRestServlet,
@@ -53,6 +58,7 @@ from synapse.rest.client.v1.room import (
 )
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
+from synapse.storage.user_directory import UserDirectoryStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
 from synapse.util.manhole import manhole
@@ -62,6 +68,9 @@ logger = logging.getLogger("synapse.app.event_creator")
 
 
 class EventCreatorSlavedStore(
+    # FIXME(#3714): We need to add UserDirectoryStore as we write directly
+    # rather than going via the correct worker.
+    UserDirectoryStore,
     DirectoryStore,
     SlavedTransactionStore,
     SlavedProfileStore,
@@ -81,10 +90,7 @@ class EventCreatorSlavedStore(
 
 
 class EventCreatorServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = EventCreatorSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -101,6 +107,9 @@ class EventCreatorServer(HomeServer):
                     RoomMembershipRestServlet(self).register(resource)
                     RoomStateEventRestServlet(self).register(resource)
                     JoinRoomAliasServlet(self).register(resource)
+                    ProfileAvatarURLRestServlet(self).register(resource)
+                    ProfileDisplaynameRestServlet(self).register(resource)
+                    ProfileRestServlet(self).register(resource)
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
@@ -160,7 +169,7 @@ def start(config_options):
             "Synapse event creator", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.event_creator"
@@ -169,6 +178,9 @@ def start(config_options):
 
     setup_logging(config, use_worker_options=True)
 
+    # This should only be done on the user directory worker or the master
+    config.update_user_directory = False
+
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     database_engine = create_engine(config.database_config)
@@ -190,7 +202,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7d8105778d..228a297fb8 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -72,10 +72,7 @@ class FederationReaderSlavedStore(
 
 
 class FederationReaderServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = FederationReaderSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -143,7 +140,7 @@ def start(config_options):
             "Synapse federation reader", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.federation_reader"
@@ -171,7 +168,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index d59007099b..e9a99d76e1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -78,10 +78,7 @@ class FederationSenderSlaveStore(
 
 
 class FederationSenderServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = FederationSenderSlaveStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -163,7 +160,7 @@ def start(config_options):
             "Synapse federation sender", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.federation_sender"
@@ -204,7 +201,6 @@ def start(config_options):
 
     def start():
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
     _base.start_worker_reactor("synapse-federation-sender", config)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 8d484c1cd4..f5c61dec5b 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet):
             "Authorization": auth_headers,
         }
         result = yield self.http_client.get_json(
-            self.main_uri + request.uri,
+            self.main_uri + request.uri.decode('ascii'),
             headers=headers,
         )
         defer.returnValue((200, result))
@@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet):
                 "Authorization": auth_headers,
             }
             result = yield self.http_client.post_json_get_json(
-                self.main_uri + request.uri,
+                self.main_uri + request.uri.decode('ascii'),
                 body,
                 headers=headers,
             )
@@ -148,10 +148,7 @@ class FrontendProxySlavedStore(
 
 
 class FrontendProxyServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = FrontendProxySlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -231,7 +228,7 @@ def start(config_options):
             "Synapse frontend proxy", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.frontend_proxy"
@@ -261,7 +258,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 005921dcf7..593e1e75db 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -20,6 +20,7 @@ import sys
 
 from six import iteritems
 
+import psutil
 from prometheus_client import Gauge
 
 from twisted.application import service
@@ -62,7 +63,7 @@ from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
-from synapse.storage import are_all_users_on_domain
+from synapse.storage import DataStore, are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
 from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -111,6 +112,8 @@ def build_resource_for_web_client(hs):
 
 
 class SynapseHomeServer(HomeServer):
+    DATASTORE_CLASS = DataStore
+
     def _listener_http(self, config, listener_config):
         port = listener_config["port"]
         bind_addresses = listener_config["bind_addresses"]
@@ -299,12 +302,16 @@ class SynapseHomeServer(HomeServer):
         try:
             database_engine.check_database(db_conn.cursor())
         except IncorrectDatabaseSetup as e:
-            quit_with_error(e.message)
+            quit_with_error(str(e))
 
 
 # Gauges to expose monthly active user control metrics
 current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
 max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+registered_reserved_users_mau_gauge = Gauge(
+    "synapse_admin_mau:registered_reserved_users",
+    "Registered users with reserved threepids"
+)
 
 
 def setup(config_options):
@@ -322,7 +329,7 @@ def setup(config_options):
             config_options,
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     if not config:
@@ -356,13 +363,13 @@ def setup(config_options):
     logger.info("Preparing database: %s...", config.database_config['name'])
 
     try:
-        db_conn = hs.get_db_conn(run_new_connection=False)
-        prepare_database(db_conn, database_engine, config=config)
-        database_engine.on_new_connection(db_conn)
+        with hs.get_db_conn(run_new_connection=False) as db_conn:
+            prepare_database(db_conn, database_engine, config=config)
+            database_engine.on_new_connection(db_conn)
 
-        hs.run_startup_checks(db_conn, database_engine)
+            hs.run_startup_checks(db_conn, database_engine)
 
-        db_conn.commit()
+            db_conn.commit()
     except UpgradeDatabaseException:
         sys.stderr.write(
             "\nFailed to upgrade database.\n"
@@ -378,10 +385,8 @@ def setup(config_options):
 
     def start():
         hs.get_pusherpool().start()
-        hs.get_state_handler().start_caching()
         hs.get_datastore().start_profiling()
         hs.get_datastore().start_doing_background_updates()
-        hs.get_federation_client().start_get_pdu_cache()
 
     reactor.callWhenRunning(start)
 
@@ -451,6 +456,10 @@ def run(hs):
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
+        version = sys.version_info
+        stats["python_version"] = "{}.{}.{}".format(
+            version.major, version.minor, version.micro
+        )
         stats["total_users"] = yield hs.get_datastore().count_all_users()
 
         total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
@@ -494,7 +503,6 @@ def run(hs):
 
     def performance_stats_init():
         try:
-            import psutil
             process = psutil.Process()
             # Ensure we can fetch both, and make the initial request for cpu_percent
             # so the next request will use this as the initial point.
@@ -502,12 +510,9 @@ def run(hs):
             process.cpu_percent(interval=None)
             logger.info("report_stats can use psutil")
             stats_process.append(process)
-        except (ImportError, AttributeError):
-            logger.warn(
-                "report_stats enabled but psutil is not installed or incorrect version."
-                " Disabling reporting of memory/cpu stats."
-                " Ensuring psutil is available will help matrix.org track performance"
-                " changes across releases."
+        except (AttributeError):
+            logger.warning(
+                "Unable to read memory/cpu stats. Disabling reporting."
             )
 
     def generate_user_daily_visit_stats():
@@ -522,25 +527,35 @@ def run(hs):
     clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
 
     # monthly active user limiting functionality
-    clock.looping_call(
-        hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
-    )
-    hs.get_datastore().reap_monthly_active_users()
+    def reap_monthly_active_users():
+        return run_as_background_process(
+            "reap_monthly_active_users",
+            hs.get_datastore().reap_monthly_active_users,
+        )
+    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
+    reap_monthly_active_users()
 
     @defer.inlineCallbacks
     def generate_monthly_active_users():
-        count = 0
+        current_mau_count = 0
+        reserved_count = 0
+        store = hs.get_datastore()
         if hs.config.limit_usage_by_mau:
-            count = yield hs.get_datastore().get_monthly_active_count()
-        current_mau_gauge.set(float(count))
+            current_mau_count = yield store.get_monthly_active_count()
+            reserved_count = yield store.get_registered_reserved_users_count()
+        current_mau_gauge.set(float(current_mau_count))
+        registered_reserved_users_mau_gauge.set(float(reserved_count))
         max_mau_gauge.set(float(hs.config.max_mau_value))
 
-    hs.get_datastore().initialise_reserved_users(
-        hs.config.mau_limits_reserved_threepids
-    )
-    generate_monthly_active_users()
+    def start_generate_monthly_active_users():
+        return run_as_background_process(
+            "generate_monthly_active_users",
+            generate_monthly_active_users,
+        )
+
+    start_generate_monthly_active_users()
     if hs.config.limit_usage_by_mau:
-        clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+        clock.looping_call(start_generate_monthly_active_users, 5 * 60 * 1000)
     # End of monthly active user settings
 
     if hs.config.report_stats:
@@ -556,7 +571,7 @@ def run(hs):
         clock.call_later(5 * 60, start_phone_stats_home)
 
     if hs.config.daemonize and hs.config.print_pidfile:
-        print (hs.config.pid_file)
+        print(hs.config.pid_file)
 
     _base.start_reactor(
         "synapse-homeserver",
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index fd1f6cbf7e..acc0487adc 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -60,10 +60,7 @@ class MediaRepositorySlavedStore(
 
 
 class MediaRepositoryServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = MediaRepositorySlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -136,7 +133,7 @@ def start(config_options):
             "Synapse media repository", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.media_repository"
@@ -171,7 +168,6 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def start():
-        ss.get_state_handler().start_caching()
         ss.get_datastore().start_profiling()
 
     reactor.callWhenRunning(start)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index a4fc7e91fa..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,39 +50,36 @@ class PusherSlaveStore(
     SlavedAccountDataStore
 ):
     update_pusher_last_stream_ordering_and_success = (
-        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
     )
 
     update_pusher_failing_since = (
-        DataStore.update_pusher_failing_since.__func__
+        __func__(DataStore.update_pusher_failing_since)
     )
 
     update_pusher_last_stream_ordering = (
-        DataStore.update_pusher_last_stream_ordering.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering)
     )
 
     get_throttle_params_by_room = (
-        DataStore.get_throttle_params_by_room.__func__
+        __func__(DataStore.get_throttle_params_by_room)
     )
 
     set_throttle_params = (
-        DataStore.set_throttle_params.__func__
+        __func__(DataStore.set_throttle_params)
     )
 
     get_time_of_last_push_action_before = (
-        DataStore.get_time_of_last_push_action_before.__func__
+        __func__(DataStore.get_time_of_last_push_action_before)
     )
 
     get_profile_displayname = (
-        DataStore.get_profile_displayname.__func__
+        __func__(DataStore.get_profile_displayname)
     )
 
 
 class PusherServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = PusherSlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = PusherSlaveStore
 
     def remove_pusher(self, app_id, push_key, user_id):
         self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -163,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                self.pusher_pool.on_new_notifications(
+                yield self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                self.pusher_pool.on_new_receipts(
+                yield self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
@@ -185,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
     def start_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
         logger.info("Starting pusher %r / %r", user_id, key)
-        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+        return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
 
 
 def start(config_options):
@@ -194,7 +192,7 @@ def start(config_options):
             "Synapse pusher", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.pusher"
@@ -231,7 +229,6 @@ def start(config_options):
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 27e1998660..3926c7f263 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -147,7 +147,7 @@ class SynchrotronPresence(object):
         and haven't come back yet. If there are poke the master about them.
         """
         now = self.clock.time_msec()
-        for user_id, last_sync_ms in self.users_going_offline.items():
+        for user_id, last_sync_ms in list(self.users_going_offline.items()):
             if now - last_sync_ms > 10 * 1000:
                 self.users_going_offline.pop(user_id, None)
                 self.send_user_sync(user_id, False, last_sync_ms)
@@ -156,9 +156,9 @@ class SynchrotronPresence(object):
         # TODO Hows this supposed to work?
         pass
 
-    get_states = PresenceHandler.get_states.__func__
-    get_state = PresenceHandler.get_state.__func__
-    current_state_for_users = PresenceHandler.current_state_for_users.__func__
+    get_states = __func__(PresenceHandler.get_states)
+    get_state = __func__(PresenceHandler.get_state)
+    current_state_for_users = __func__(PresenceHandler.current_state_for_users)
 
     def user_syncing(self, user_id, affect_presence):
         if affect_presence:
@@ -208,7 +208,7 @@ class SynchrotronPresence(object):
         ) for row in rows]
 
         for state in states:
-            self.user_to_current_state[row.user_id] = state
+            self.user_to_current_state[state.user_id] = state
 
         stream_id = token
         yield self.notify_from_replication(states, stream_id)
@@ -249,10 +249,7 @@ class SynchrotronApplicationService(object):
 
 
 class SynchrotronServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = SynchrotronSlavedStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -413,7 +410,7 @@ def start(config_options):
             "Synapse synchrotron", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.synchrotron"
@@ -438,7 +435,6 @@ def start(config_options):
 
     def start():
         ss.get_datastore().start_profiling()
-        ss.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)
 
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
deleted file mode 100755
index d658f967ba..0000000000
--- a/synapse/app/synctl.py
+++ /dev/null
@@ -1,284 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import argparse
-import collections
-import errno
-import glob
-import os
-import os.path
-import signal
-import subprocess
-import sys
-import time
-
-from six import iteritems
-
-import yaml
-
-SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
-
-GREEN = "\x1b[1;32m"
-YELLOW = "\x1b[1;33m"
-RED = "\x1b[1;31m"
-NORMAL = "\x1b[m"
-
-
-def pid_running(pid):
-    try:
-        os.kill(pid, 0)
-        return True
-    except OSError as err:
-        if err.errno == errno.EPERM:
-            return True
-        return False
-
-
-def write(message, colour=NORMAL, stream=sys.stdout):
-    if colour == NORMAL:
-        stream.write(message + "\n")
-    else:
-        stream.write(colour + message + NORMAL + "\n")
-
-
-def abort(message, colour=RED, stream=sys.stderr):
-    write(message, colour, stream)
-    sys.exit(1)
-
-
-def start(configfile):
-    write("Starting ...")
-    args = SYNAPSE
-    args.extend(["--daemonize", "-c", configfile])
-
-    try:
-        subprocess.check_call(args)
-        write("started synapse.app.homeserver(%r)" %
-              (configfile,), colour=GREEN)
-    except subprocess.CalledProcessError as e:
-        write(
-            "error starting (exit code: %d); see above for logs" % e.returncode,
-            colour=RED,
-        )
-
-
-def start_worker(app, configfile, worker_configfile):
-    args = [
-        "python", "-B",
-        "-m", app,
-        "-c", configfile,
-        "-c", worker_configfile
-    ]
-
-    try:
-        subprocess.check_call(args)
-        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
-    except subprocess.CalledProcessError as e:
-        write(
-            "error starting %s(%r) (exit code: %d); see above for logs" % (
-                app, worker_configfile, e.returncode,
-            ),
-            colour=RED,
-        )
-
-
-def stop(pidfile, app):
-    if os.path.exists(pidfile):
-        pid = int(open(pidfile).read())
-        try:
-            os.kill(pid, signal.SIGTERM)
-            write("stopped %s" % (app,), colour=GREEN)
-        except OSError as err:
-            if err.errno == errno.ESRCH:
-                write("%s not running" % (app,), colour=YELLOW)
-            elif err.errno == errno.EPERM:
-                abort("Cannot stop %s: Operation not permitted" % (app,))
-            else:
-                abort("Cannot stop %s: Unknown error" % (app,))
-
-
-Worker = collections.namedtuple("Worker", [
-    "app", "configfile", "pidfile", "cache_factor"
-])
-
-
-def main():
-
-    parser = argparse.ArgumentParser()
-
-    parser.add_argument(
-        "action",
-        choices=["start", "stop", "restart"],
-        help="whether to start, stop or restart the synapse",
-    )
-    parser.add_argument(
-        "configfile",
-        nargs="?",
-        default="homeserver.yaml",
-        help="the homeserver config file, defaults to homeserver.yaml",
-    )
-    parser.add_argument(
-        "-w", "--worker",
-        metavar="WORKERCONFIG",
-        help="start or stop a single worker",
-    )
-    parser.add_argument(
-        "-a", "--all-processes",
-        metavar="WORKERCONFIGDIR",
-        help="start or stop all the workers in the given directory"
-             " and the main synapse process",
-    )
-
-    options = parser.parse_args()
-
-    if options.worker and options.all_processes:
-        write(
-            'Cannot use "--worker" with "--all-processes"',
-            stream=sys.stderr
-        )
-        sys.exit(1)
-
-    configfile = options.configfile
-
-    if not os.path.exists(configfile):
-        write(
-            "No config file found\n"
-            "To generate a config file, run '%s -c %s --generate-config"
-            " --server-name=<server name>'\n" % (
-                " ".join(SYNAPSE), options.configfile
-            ),
-            stream=sys.stderr,
-        )
-        sys.exit(1)
-
-    with open(configfile) as stream:
-        config = yaml.load(stream)
-
-    pidfile = config["pid_file"]
-    cache_factor = config.get("synctl_cache_factor")
-    start_stop_synapse = True
-
-    if cache_factor:
-        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-
-    cache_factors = config.get("synctl_cache_factors", {})
-    for cache_name, factor in iteritems(cache_factors):
-        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
-
-    worker_configfiles = []
-    if options.worker:
-        start_stop_synapse = False
-        worker_configfile = options.worker
-        if not os.path.exists(worker_configfile):
-            write(
-                "No worker config found at %r" % (worker_configfile,),
-                stream=sys.stderr,
-            )
-            sys.exit(1)
-        worker_configfiles.append(worker_configfile)
-
-    if options.all_processes:
-        # To start the main synapse with -a you need to add a worker file
-        # with worker_app == "synapse.app.homeserver"
-        start_stop_synapse = False
-        worker_configdir = options.all_processes
-        if not os.path.isdir(worker_configdir):
-            write(
-                "No worker config directory found at %r" % (worker_configdir,),
-                stream=sys.stderr,
-            )
-            sys.exit(1)
-        worker_configfiles.extend(sorted(glob.glob(
-            os.path.join(worker_configdir, "*.yaml")
-        )))
-
-    workers = []
-    for worker_configfile in worker_configfiles:
-        with open(worker_configfile) as stream:
-            worker_config = yaml.load(stream)
-        worker_app = worker_config["worker_app"]
-        if worker_app == "synapse.app.homeserver":
-            # We need to special case all of this to pick up options that may
-            # be set in the main config file or in this worker config file.
-            worker_pidfile = (
-                worker_config.get("pid_file")
-                or pidfile
-            )
-            worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
-            daemonize = worker_config.get("daemonize") or config.get("daemonize")
-            assert daemonize, "Main process must have daemonize set to true"
-
-            # The master process doesn't support using worker_* config.
-            for key in worker_config:
-                if key == "worker_app":  # But we allow worker_app
-                    continue
-                assert not key.startswith("worker_"), \
-                    "Main process cannot use worker_* config"
-        else:
-            worker_pidfile = worker_config["worker_pid_file"]
-            worker_daemonize = worker_config["worker_daemonize"]
-            assert worker_daemonize, "In config %r: expected '%s' to be True" % (
-                worker_configfile, "worker_daemonize")
-            worker_cache_factor = worker_config.get("synctl_cache_factor")
-        workers.append(Worker(
-            worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
-        ))
-
-    action = options.action
-
-    if action == "stop" or action == "restart":
-        for worker in workers:
-            stop(worker.pidfile, worker.app)
-
-        if start_stop_synapse:
-            stop(pidfile, "synapse.app.homeserver")
-
-    # Wait for synapse to actually shutdown before starting it again
-    if action == "restart":
-        running_pids = []
-        if start_stop_synapse and os.path.exists(pidfile):
-            running_pids.append(int(open(pidfile).read()))
-        for worker in workers:
-            if os.path.exists(worker.pidfile):
-                running_pids.append(int(open(worker.pidfile).read()))
-        if len(running_pids) > 0:
-            write("Waiting for process to exit before restarting...")
-            for running_pid in running_pids:
-                while pid_running(running_pid):
-                    time.sleep(0.2)
-            write("All processes exited; now restarting...")
-
-    if action == "start" or action == "restart":
-        if start_stop_synapse:
-            # Check if synapse is already running
-            if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())):
-                abort("synapse.app.homeserver already running")
-            start(configfile)
-
-        for worker in workers:
-            if worker.cache_factor:
-                os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
-
-            start_worker(worker.app, configfile, worker.configfile)
-
-            if cache_factor:
-                os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
-            else:
-                os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 1388a42b59..0a5f62b509 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -94,10 +94,7 @@ class UserDirectorySlaveStore(
 
 
 class UserDirectoryServer(HomeServer):
-    def setup(self):
-        logger.info("Setting up.")
-        self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
-        logger.info("Finished setting up.")
+    DATASTORE_CLASS = UserDirectorySlaveStore
 
     def _listen_http(self, listener_config):
         port = listener_config["port"]
@@ -191,7 +188,7 @@ def start(config_options):
             "Synapse user directory", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.user_dir"
@@ -232,7 +229,6 @@ def start(config_options):
 
     def start():
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)