summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorAmber H. Brown <hawkowl@atleastfornow.net>2020-01-17 01:03:26 +1100
committerAmber H. Brown <hawkowl@atleastfornow.net>2020-01-17 01:03:26 +1100
commitc76a412cc354e632078e3e19579bbafcb7619a51 (patch)
treef24dc26a76d92c751c620fd46aa2c5c25ca9fafe /synapse/app
parentRe-sync every so often, in case caches appear (diff)
parentAdd StateMap type alias (#6715) (diff)
downloadsynapse-c76a412cc354e632078e3e19579bbafcb7619a51.tar.xz
Merge remote-tracking branch 'origin/develop' into hawkowl/cache-config-without-synctl
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py12
-rw-r--r--synapse/app/admin_cmd.py14
-rw-r--r--synapse/app/appservice.py10
-rw-r--r--synapse/app/client_reader.py5
-rw-r--r--synapse/app/event_creator.py5
-rw-r--r--synapse/app/federation_reader.py5
-rw-r--r--synapse/app/federation_sender.py17
-rw-r--r--synapse/app/frontend_proxy.py5
-rw-r--r--synapse/app/homeserver.py87
-rw-r--r--synapse/app/media_repository.py13
-rw-r--r--synapse/app/pusher.py17
-rw-r--r--synapse/app/synchrotron.py30
-rw-r--r--synapse/app/user_dir.py17
13 files changed, 85 insertions, 152 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 2ac7d5c064..0e8b467a3e 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -237,6 +237,12 @@ def start(hs, listeners=None):
     """
     Start a Synapse server or worker.
 
+    Should be called once the reactor is running and (if we're using ACME) the
+    TLS certificates are in place.
+
+    Will start the main HTTP listeners and do some other startup tasks, and then
+    notify systemd.
+
     Args:
         hs (synapse.server.HomeServer)
         listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
@@ -269,7 +275,7 @@ def start(hs, listeners=None):
 
         # It is now safe to start your Synapse.
         hs.start_listening(listeners)
-        hs.get_datastore().start_profiling()
+        hs.get_datastore().db.start_profiling()
 
         setup_sentry(hs)
         setup_sdnotify(hs)
@@ -311,9 +317,7 @@ def setup_sdnotify(hs):
 
     # Tell systemd our state, if we're using it. This will silently fail if
     # we're not using systemd.
-    hs.get_reactor().addSystemEventTrigger(
-        "after", "startup", sdnotify, b"READY=1\nMAINPID=%i" % (os.getpid(),)
-    )
+    sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))
 
     hs.get_reactor().addSystemEventTrigger(
         "before", "shutdown", sdnotify, b"STOPPING=1"
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 04751a6a5e..1c7c6ec0c8 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -45,7 +45,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.logcontext import LoggingContext
 from synapse.util.versionstring import get_version_string
 
@@ -85,8 +84,7 @@ class AdminCmdServer(HomeServer):
 
 
 class AdminCmdReplicationHandler(ReplicationClientHandler):
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
+    async def on_rdata(self, stream_name, token, rows):
         pass
 
     def get_streams_to_replicate(self):
@@ -105,8 +103,10 @@ def export_data_command(hs, args):
     user_id = args.user_id
     directory = args.output_directory
 
-    res = yield hs.get_handlers().admin_handler.export_user_data(
-        user_id, FileExfiltrationWriter(user_id, directory=directory)
+    res = yield defer.ensureDeferred(
+        hs.get_handlers().admin_handler.export_user_data(
+            user_id, FileExfiltrationWriter(user_id, directory=directory)
+        )
     )
     print(res)
 
@@ -229,14 +229,10 @@ def start(config_options):
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = AdminCmdServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 02b900f382..2217d4a4fb 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -34,7 +34,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -116,9 +115,8 @@ class ASReplicationHandler(ReplicationClientHandler):
         super(ASReplicationHandler, self).__init__(hs.get_datastore())
         self.appservice_handler = hs.get_application_service_handler()
 
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
-        yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+    async def on_rdata(self, stream_name, token, rows):
+        await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
@@ -143,8 +141,6 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     if config.notify_appservices:
         sys.stderr.write(
             "\nThe appservices must be disabled in the main synapse process"
@@ -159,10 +155,8 @@ def start(config_options):
 
     ps = AppserviceServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ps, config, use_worker_options=True)
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index dadb487d5f..3edfe19567 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -62,7 +62,6 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
 from synapse.rest.client.versions import VersionsRestServlet
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -181,14 +180,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = ClientReaderServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index d110599a35..d0ddbe38fc 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -57,7 +57,6 @@ from synapse.rest.client.v1.room import (
 )
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -180,14 +179,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = EventCreatorServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 418c086254..311523e0ed 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -46,7 +46,6 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -162,14 +161,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = FederationReaderServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 139221ad34..a57cf991ac 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -40,7 +40,7 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.replication.tcp.streams._base import ReceiptsStream
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
+from synapse.storage.database import Database
 from synapse.types import ReadReceipt
 from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
@@ -59,8 +59,8 @@ class FederationSenderSlaveStore(
     SlavedDeviceStore,
     SlavedPresenceStore,
 ):
-    def __init__(self, db_conn, hs):
-        super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs)
 
         # We pull out the current federation stream position now so that we
         # always have a known value for the federation position in memory so
@@ -69,7 +69,7 @@ class FederationSenderSlaveStore(
         self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
 
     def _get_federation_out_pos(self, db_conn):
-        sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
+        sql = "SELECT stream_id FROM federation_stream_position WHERE type = ?"
         sql = self.database_engine.convert_param_style(sql)
 
         txn = db_conn.cursor()
@@ -145,9 +145,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
         self.send_handler = FederationSenderHandler(hs, self)
 
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
-        yield super(FederationSenderReplicationHandler, self).on_rdata(
+    async def on_rdata(self, stream_name, token, rows):
+        await super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -173,8 +172,6 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     if config.send_federation:
         sys.stderr.write(
             "\nThe send_federation must be disabled in the main synapse process"
@@ -189,10 +186,8 @@ def start(config_options):
 
     ss = FederationSenderServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index e647459d0e..30e435eead 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -39,7 +39,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v2_alpha._base import client_patterns
 from synapse.server import HomeServer
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -234,14 +233,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = FrontendProxyServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 98e528187a..d86d924bea 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -31,7 +31,7 @@ from prometheus_client import Gauge
 from twisted.application import service
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
-from twisted.web.resource import EncodingResourceWrapper, NoResource
+from twisted.web.resource import EncodingResourceWrapper, IResource, NoResource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File
 
@@ -39,7 +39,6 @@ import synapse
 import synapse.config.logger
 from synapse import events
 from synapse.api.urls import (
-    CONTENT_REPO_PREFIX,
     FEDERATION_PREFIX,
     LEGACY_MEDIA_PREFIX,
     MEDIA_PREFIX,
@@ -65,12 +64,18 @@ from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.admin import AdminRestResource
 from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.rest.well_known import WellKnownResource
 from synapse.server import HomeServer
+<<<<<<< HEAD
 from synapse.storage import DataStore, are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
 from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+=======
+from synapse.storage import DataStore
+from synapse.storage.engines import IncorrectDatabaseSetup
+from synapse.storage.prepare_database import UpgradeDatabaseException
+from synapse.util.caches import CACHE_SIZE_FACTOR
+>>>>>>> origin/develop
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.module_loader import load_module
@@ -110,7 +115,16 @@ class SynapseHomeServer(HomeServer):
         for path, resmodule in additional_resources.items():
             handler_cls, config = load_module(resmodule)
             handler = handler_cls(config, module_api)
-            resources[path] = AdditionalResource(self, handler.handle_request)
+            if IResource.providedBy(handler):
+                resource = handler
+            elif hasattr(handler, "handle_request"):
+                resource = AdditionalResource(self, handler.handle_request)
+            else:
+                raise ConfigError(
+                    "additional_resource %s does not implement a known interface"
+                    % (resmodule["module"],)
+                )
+            resources[path] = resource
 
         # try to find something useful to redirect '/' to
         if WEB_CLIENT_PREFIX in resources:
@@ -222,13 +236,7 @@ class SynapseHomeServer(HomeServer):
             if self.get_config().enable_media_repo:
                 media_repo = self.get_media_repository_resource()
                 resources.update(
-                    {
-                        MEDIA_PREFIX: media_repo,
-                        LEGACY_MEDIA_PREFIX: media_repo,
-                        CONTENT_REPO_PREFIX: ContentRepoResource(
-                            self, self.config.uploads_path
-                        ),
-                    }
+                    {MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo}
                 )
             elif name == "media":
                 raise ConfigError(
@@ -293,22 +301,6 @@ class SynapseHomeServer(HomeServer):
             else:
                 logger.warning("Unrecognized listener type: %s", listener["type"])
 
-    def run_startup_checks(self, db_conn, database_engine):
-        all_users_native = are_all_users_on_domain(
-            db_conn.cursor(), database_engine, self.hostname
-        )
-        if not all_users_native:
-            quit_with_error(
-                "Found users in database not native to %s!\n"
-                "You cannot changed a synapse server_name after it's been configured"
-                % (self.hostname,)
-            )
-
-        try:
-            database_engine.check_database(db_conn.cursor())
-        except IncorrectDatabaseSetup as e:
-            quit_with_error(str(e))
-
 
 # Gauges to expose monthly active user control metrics
 current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
@@ -333,7 +325,7 @@ def setup(config_options):
             "Synapse Homeserver", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + str(e) + "\n")
+        sys.stderr.write("\nERROR: %s\n" % (e,))
         sys.exit(1)
 
     if not config:
@@ -343,40 +335,23 @@ def setup(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-    config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
-
     hs = SynapseHomeServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
 
-    logger.info("Preparing database: %s...", config.database_config["name"])
+    logger.info("Setting up server")
 
     try:
-        with hs.get_db_conn(run_new_connection=False) as db_conn:
-            prepare_database(db_conn, database_engine, config=config)
-            database_engine.on_new_connection(db_conn)
-
-            hs.run_startup_checks(db_conn, database_engine)
-
-            db_conn.commit()
-    except UpgradeDatabaseException:
-        sys.stderr.write(
-            "\nFailed to upgrade database.\n"
-            "Have you checked for version specific instructions in"
-            " UPGRADES.rst?\n"
-        )
-        sys.exit(1)
-
-    logger.info("Database prepared in %s.", config.database_config["name"])
+        hs.setup()
+    except IncorrectDatabaseSetup as e:
+        quit_with_error(str(e))
+    except UpgradeDatabaseException as e:
+        quit_with_error("Failed to upgrade database: %s" % (e,))
 
-    hs.setup()
     hs.setup_master()
 
     @defer.inlineCallbacks
@@ -435,7 +410,7 @@ def setup(config_options):
             _base.start(hs, config.listeners)
 
             hs.get_pusherpool().start()
-            hs.get_datastore().start_doing_background_updates()
+            hs.get_datastore().db.updates.start_doing_background_updates()
         except Exception:
             # Print the exception and bail out.
             print("Error during startup:", file=sys.stderr)
@@ -541,8 +516,10 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
     # Database version
     #
 
-    stats["database_engine"] = hs.get_datastore().database_engine_name
-    stats["database_server_version"] = hs.get_datastore().get_server_version()
+    # This only reports info about the *main* database.
+    stats["database_engine"] = hs.get_datastore().db.engine.module.__name__
+    stats["database_server_version"] = hs.get_datastore().db.engine.server_version
+
     logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
     try:
         yield hs.get_proxied_http_client().put_json(
@@ -584,7 +561,7 @@ def run(hs):
     def performance_stats_init():
         _stats_process.clear()
         _stats_process.append(
-            (int(hs.get_clock().time(), resource.getrusage(resource.RUSAGE_SELF)))
+            (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
         )
 
     def start_phone_stats_home():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 2c6dd3ef02..5b5832214a 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -21,7 +21,7 @@ from twisted.web.resource import NoResource
 
 import synapse
 from synapse import events
-from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+from synapse.api.urls import LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
 from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
@@ -34,13 +34,12 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -49,6 +48,7 @@ logger = logging.getLogger("synapse.app.media_repository")
 
 
 class MediaRepositorySlavedStore(
+    RoomStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,
@@ -83,9 +83,6 @@ class MediaRepositoryServer(HomeServer):
                         {
                             MEDIA_PREFIX: media_repo,
                             LEGACY_MEDIA_PREFIX: media_repo,
-                            CONTENT_REPO_PREFIX: ContentRepoResource(
-                                self, self.config.uploads_path
-                            ),
                             "/_synapse/admin": admin_resource,
                         }
                     )
@@ -157,14 +154,10 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = MediaRepositoryServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 01a5ffc363..e46b6ac598 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -33,10 +33,10 @@ from synapse.replication.slave.storage.account_data import SlavedAccountDataStor
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage import DataStore
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.versionstring import get_version_string
@@ -45,7 +45,11 @@ logger = logging.getLogger("synapse.app.pusher")
 
 
 class PusherSlaveStore(
-    SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, SlavedAccountDataStore
+    SlavedEventStore,
+    SlavedPusherStore,
+    SlavedReceiptsStore,
+    SlavedAccountDataStore,
+    RoomStore,
 ):
     update_pusher_last_stream_ordering_and_success = __func__(
         DataStore.update_pusher_last_stream_ordering_and_success
@@ -137,9 +141,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
         self.pusher_pool = hs.get_pusherpool()
 
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
-        yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+    async def on_rdata(self, stream_name, token, rows):
+        await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
@@ -198,14 +201,10 @@ def start(config_options):
     # Force the pushers to start since they will be disabled in the main config
     config.start_pushers = True
 
-    database_engine = create_engine(config.database_config)
-
     ps = PusherServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ps, config, use_worker_options=True)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index b14da09f47..3218da07bd 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,14 +48,13 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams.events import EventsStreamEventRow
+from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
 from synapse.rest.client.v2_alpha import sync
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.presence import UserPresenceState
-from synapse.storage.engines import create_engine
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.stringutils import random_string
@@ -151,7 +150,7 @@ class SynchrotronPresence(object):
 
     def set_state(self, user, state, ignore_status_msg=False):
         # TODO Hows this supposed to work?
-        pass
+        return defer.succeed(None)
 
     get_states = __func__(PresenceHandler.get_states)
     get_state = __func__(PresenceHandler.get_state)
@@ -359,9 +358,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
-        yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+    async def on_rdata(self, stream_name, token, rows):
+        await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
@@ -372,8 +370,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
     def get_currently_syncing_users(self):
         return self.presence_handler.get_currently_syncing_users()
 
-    @defer.inlineCallbacks
-    def process_and_notify(self, stream_name, token, rows):
+    async def process_and_notify(self, stream_name, token, rows):
         try:
             if stream_name == "events":
                 # We shouldn't get multiple rows per token for events stream, so
@@ -381,7 +378,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
                 for row in rows:
                     if row.type != EventsStreamEventRow.TypeId:
                         continue
-                    event = yield self.store.get_event(row.data.event_id)
+                    assert isinstance(row, EventsStreamRow)
+
+                    event = await self.store.get_event(
+                        row.data.event_id, allow_rejected=True
+                    )
+                    if event.rejected_reason:
+                        continue
+
                     extra_users = ()
                     if event.type == EventTypes.Member:
                         extra_users = (event.state_key,)
@@ -413,11 +417,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
-                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    room_ids = await self.store.get_rooms_for_user(row.user_id)
                     all_room_ids.update(room_ids)
                 self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
-                yield self.presence_handler.process_replication_rows(token, rows)
+                await self.presence_handler.process_replication_rows(token, rows)
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
                     "groups_key", token, users=[row.user_id for row in rows]
@@ -437,14 +441,10 @@ def start(config_options):
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     ss = SynchrotronServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
         application_service_handler=SynchrotronApplicationService(),
     )
 
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 6cb100319f..ba536d6f04 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -43,7 +43,7 @@ from synapse.replication.tcp.streams.events import (
 from synapse.rest.client.v2_alpha import user_directory
 from synapse.server import HomeServer
 from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
-from synapse.storage.engines import create_engine
+from synapse.storage.database import Database
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
@@ -60,11 +60,11 @@ class UserDirectorySlaveStore(
     UserDirectoryStore,
     BaseSlavedStore,
 ):
-    def __init__(self, db_conn, hs):
-        super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
+    def __init__(self, database: Database, db_conn, hs):
+        super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs)
 
         events_max = self._stream_id_gen.get_current_token()
-        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
+        curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
             db_conn,
             "current_state_delta_stream",
             entity_column="room_id",
@@ -172,9 +172,8 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
         self.user_directory = hs.get_user_directory_handler()
 
-    @defer.inlineCallbacks
-    def on_rdata(self, stream_name, token, rows):
-        yield super(UserDirectoryReplicationHandler, self).on_rdata(
+    async def on_rdata(self, stream_name, token, rows):
+        await super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         if stream_name == EventsStream.NAME:
@@ -199,8 +198,6 @@ def start(config_options):
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
-    database_engine = create_engine(config.database_config)
-
     if config.update_user_directory:
         sys.stderr.write(
             "\nThe update_user_directory must be disabled in the main synapse process"
@@ -215,10 +212,8 @@ def start(config_options):
 
     ss = UserDirectoryServer(
         config.server_name,
-        db_config=config.database_config,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
-        database_engine=database_engine,
     )
 
     setup_logging(ss, config, use_worker_options=True)