summary refs log tree commit diff
path: root/synapse/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/server.py')
-rw-r--r--synapse/server.py172
1 files changed, 109 insertions, 63 deletions
diff --git a/synapse/server.py b/synapse/server.py
index 1fcc7375d3..fe94836a2c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -23,17 +23,18 @@
 # Imports required for the default HomeServer() implementation
 import abc
 import logging
+import os
 
-from twisted.enterprise import adbapi
 from twisted.mail.smtp import sendmail
-from twisted.web.client import BrowserLikePolicyForHTTPS
 
 from synapse.api.auth import Auth
 from synapse.api.filtering import Filtering
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.appservice.api import ApplicationServiceApi
 from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
+from synapse.crypto.context_factory import RegularPolicyForHTTPS
 from synapse.crypto.keyring import Keyring
 from synapse.events.builder import EventBuilderFactory
 from synapse.events.spamcheck import SpamChecker
@@ -49,22 +50,24 @@ from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.sender import FederationSender
 from synapse.federation.transport.client import TransportLayerClient
 from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
-from synapse.groups.groups_server import GroupsServerHandler
+from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
 from synapse.handlers import Handlers
 from synapse.handlers.account_validity import AccountValidityHandler
 from synapse.handlers.acme import AcmeHandler
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler, MacaroonGenerator
+from synapse.handlers.cas_handler import CasHandler
 from synapse.handlers.deactivate_account import DeactivateAccountHandler
 from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler
 from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
-from synapse.handlers.groups_local import GroupsLocalHandler
+from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
 from synapse.handlers.initial_sync import InitialSyncHandler
 from synapse.handlers.message import EventCreationHandler, MessageHandler
 from synapse.handlers.pagination import PaginationHandler
+from synapse.handlers.password_policy import PasswordPolicyHandler
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
 from synapse.handlers.read_marker import ReadMarkerHandler
@@ -84,6 +87,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
 from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
+from synapse.replication.tcp.client import ReplicationDataHandler
+from synapse.replication.tcp.handler import ReplicationCommandHandler
+from synapse.replication.tcp.resource import ReplicationStreamer
+from synapse.replication.tcp.streams import STREAMS_MAP
 from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
@@ -95,9 +102,11 @@ from synapse.server_notices.worker_server_notices_sender import (
     WorkerServerNoticesSender,
 )
 from synapse.state import StateHandler, StateResolutionHandler
+from synapse.storage import DataStores, Storage
 from synapse.streams.events import EventSources
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
+from synapse.util.stringutils import random_string
 
 logger = logging.getLogger(__name__)
 
@@ -130,7 +139,6 @@ class HomeServer(object):
 
     DEPENDENCIES = [
         "http_client",
-        "db_pool",
         "federation_client",
         "federation_server",
         "handlers",
@@ -167,6 +175,7 @@ class HomeServer(object):
         "filtering",
         "http_client_context_factory",
         "simple_http_client",
+        "proxied_http_client",
         "media_repository",
         "media_repository_resource",
         "federation_transport_client",
@@ -194,8 +203,15 @@ class HomeServer(object):
         "sendmail",
         "registration_handler",
         "account_validity_handler",
+        "cas_handler",
         "saml_handler",
+        "oidc_handler",
         "event_client_serializer",
+        "password_policy_handler",
+        "storage",
+        "replication_streamer",
+        "replication_data_handler",
+        "replication_streams",
     ]
 
     REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@@ -205,36 +221,60 @@ class HomeServer(object):
     # instantiated during setup() for future return by get_datastore()
     DATASTORE_CLASS = abc.abstractproperty()
 
-    def __init__(self, hostname, reactor=None, **kwargs):
+    def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwargs):
         """
         Args:
             hostname : The hostname for the server.
+            config: The full config for the homeserver.
         """
         if not reactor:
             from twisted.internet import reactor
 
         self._reactor = reactor
         self.hostname = hostname
+        self.config = config
         self._building = {}
         self._listening_services = []
+        self.start_time = None
+
+        self._instance_id = random_string(5)
+        self._instance_name = config.worker_name or "master"
 
         self.clock = Clock(reactor)
         self.distributor = Distributor()
-        self.ratelimiter = Ratelimiter()
-        self.admin_redaction_ratelimiter = Ratelimiter()
-        self.registration_ratelimiter = Ratelimiter()
 
-        self.datastore = None
+        self.registration_ratelimiter = Ratelimiter(
+            clock=self.clock,
+            rate_hz=config.rc_registration.per_second,
+            burst_count=config.rc_registration.burst_count,
+        )
+
+        self.datastores = None
 
         # Other kwargs are explicit dependencies
         for depname in kwargs:
             setattr(self, depname, kwargs[depname])
 
+    def get_instance_id(self):
+        """A unique ID for this synapse process instance.
+
+        This is used to distinguish running instances in worker-based
+        deployments.
+        """
+        return self._instance_id
+
+    def get_instance_name(self) -> str:
+        """A unique name for this synapse process.
+
+        Used to identify the process over replication and in config. Does not
+        change over restarts.
+        """
+        return self._instance_name
+
     def setup(self):
         logger.info("Setting up.")
-        with self.get_db_conn() as conn:
-            self.datastore = self.DATASTORE_CLASS(conn, self)
-            conn.commit()
+        self.start_time = int(self.get_clock().time())
+        self.datastores = DataStores(self.DATASTORE_CLASS, self)
         logger.info("Finished setting up.")
 
     def setup_master(self):
@@ -266,7 +306,10 @@ class HomeServer(object):
         return self.clock
 
     def get_datastore(self):
-        return self.datastore
+        return self.datastores.main
+
+    def get_datastores(self):
+        return self.datastores
 
     def get_config(self):
         return self.config
@@ -274,15 +317,9 @@ class HomeServer(object):
     def get_distributor(self):
         return self.distributor
 
-    def get_ratelimiter(self):
-        return self.ratelimiter
-
-    def get_registration_ratelimiter(self):
+    def get_registration_ratelimiter(self) -> Ratelimiter:
         return self.registration_ratelimiter
 
-    def get_admin_redaction_ratelimiter(self):
-        return self.admin_redaction_ratelimiter
-
     def build_federation_client(self):
         return FederationClient(self)
 
@@ -302,12 +339,19 @@ class HomeServer(object):
         return (
             InsecureInterceptableContextFactory()
             if self.config.use_insecure_ssl_client_just_for_testing_do_not_use
-            else BrowserLikePolicyForHTTPS()
+            else RegularPolicyForHTTPS()
         )
 
     def build_simple_http_client(self):
         return SimpleHttpClient(self)
 
+    def build_proxied_http_client(self):
+        return SimpleHttpClient(
+            self,
+            http_proxy=os.getenvb(b"http_proxy"),
+            https_proxy=os.getenvb(b"HTTPS_PROXY"),
+        )
+
     def build_room_creation_handler(self):
         return RoomCreationHandler(self)
 
@@ -405,36 +449,11 @@ class HomeServer(object):
         return PusherPool(self)
 
     def build_http_client(self):
-        tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
+        tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
             self.config
         )
         return MatrixFederationHttpClient(self, tls_client_options_factory)
 
-    def build_db_pool(self):
-        name = self.db_config["name"]
-
-        return adbapi.ConnectionPool(
-            name, cp_reactor=self.get_reactor(), **self.db_config.get("args", {})
-        )
-
-    def get_db_conn(self, run_new_connection=True):
-        """Makes a new connection to the database, skipping the db pool
-
-        Returns:
-            Connection: a connection object implementing the PEP-249 spec
-        """
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v
-            for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
     def build_media_repository_resource(self):
         # build the media repo resource. This indirects through the HomeServer
         # to ensure that we only have a single instance of
@@ -461,7 +480,7 @@ class HomeServer(object):
         return ReadMarkerHandler(self)
 
     def build_tcp_replication(self):
-        raise NotImplementedError()
+        return ReplicationCommandHandler(self)
 
     def build_action_generator(self):
         return ActionGenerator(self)
@@ -470,10 +489,16 @@ class HomeServer(object):
         return UserDirectoryHandler(self)
 
     def build_groups_local_handler(self):
-        return GroupsLocalHandler(self)
+        if self.config.worker_app:
+            return GroupsLocalWorkerHandler(self)
+        else:
+            return GroupsLocalHandler(self)
 
     def build_groups_server_handler(self):
-        return GroupsServerHandler(self)
+        if self.config.worker_app:
+            return GroupsServerWorkerHandler(self)
+        else:
+            return GroupsServerHandler(self)
 
     def build_groups_attestation_signing(self):
         return GroupAttestationSigning(self)
@@ -529,14 +554,37 @@ class HomeServer(object):
     def build_account_validity_handler(self):
         return AccountValidityHandler(self)
 
+    def build_cas_handler(self):
+        return CasHandler(self)
+
     def build_saml_handler(self):
         from synapse.handlers.saml_handler import SamlHandler
 
         return SamlHandler(self)
 
+    def build_oidc_handler(self):
+        from synapse.handlers.oidc_handler import OidcHandler
+
+        return OidcHandler(self)
+
     def build_event_client_serializer(self):
         return EventClientSerializer(self)
 
+    def build_password_policy_handler(self):
+        return PasswordPolicyHandler(self)
+
+    def build_storage(self) -> Storage:
+        return Storage(self, self.datastores)
+
+    def build_replication_streamer(self) -> ReplicationStreamer:
+        return ReplicationStreamer(self)
+
+    def build_replication_data_handler(self):
+        return ReplicationDataHandler(self)
+
+    def build_replication_streams(self):
+        return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
+
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
 
@@ -558,24 +606,22 @@ def _make_dependency_method(depname):
         try:
             builder = getattr(hs, "build_%s" % (depname))
         except AttributeError:
-            builder = None
+            raise NotImplementedError(
+                "%s has no %s nor a builder for it" % (type(hs).__name__, depname)
+            )
 
-        if builder:
-            # Prevent cyclic dependencies from deadlocking
-            if depname in hs._building:
-                raise ValueError("Cyclic dependency while building %s" % (depname,))
-            hs._building[depname] = 1
+        # Prevent cyclic dependencies from deadlocking
+        if depname in hs._building:
+            raise ValueError("Cyclic dependency while building %s" % (depname,))
 
+        hs._building[depname] = 1
+        try:
             dep = builder()
             setattr(hs, depname, dep)
-
+        finally:
             del hs._building[depname]
 
-            return dep
-
-        raise NotImplementedError(
-            "%s has no %s nor a builder for it" % (type(hs).__name__, depname)
-        )
+        return dep
 
     setattr(HomeServer, "get_%s" % (depname), _get)