diff --git a/synapse/server.py b/synapse/server.py
index 1fcc7375d3..fe94836a2c 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -23,17 +23,18 @@
# Imports required for the default HomeServer() implementation
import abc
import logging
+import os
-from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
-from twisted.web.client import BrowserLikePolicyForHTTPS
from synapse.api.auth import Auth
from synapse.api.filtering import Filtering
from synapse.api.ratelimiting import Ratelimiter
from synapse.appservice.api import ApplicationServiceApi
from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
+from synapse.crypto.context_factory import RegularPolicyForHTTPS
from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker
@@ -49,22 +50,24 @@ from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
-from synapse.groups.groups_server import GroupsServerHandler
+from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
from synapse.handlers import Handlers
from synapse.handlers.account_validity import AccountValidityHandler
from synapse.handlers.acme import AcmeHandler
from synapse.handlers.appservice import ApplicationServicesHandler
from synapse.handlers.auth import AuthHandler, MacaroonGenerator
+from synapse.handlers.cas_handler import CasHandler
from synapse.handlers.deactivate_account import DeactivateAccountHandler
from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler
from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
-from synapse.handlers.groups_local import GroupsLocalHandler
+from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler
+from synapse.handlers.password_policy import PasswordPolicyHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import BaseProfileHandler, MasterProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
@@ -84,6 +87,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.notifier import Notifier
from synapse.push.action_generator import ActionGenerator
from synapse.push.pusherpool import PusherPool
+from synapse.replication.tcp.client import ReplicationDataHandler
+from synapse.replication.tcp.handler import ReplicationCommandHandler
+from synapse.replication.tcp.resource import ReplicationStreamer
+from synapse.replication.tcp.streams import STREAMS_MAP
from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
@@ -95,9 +102,11 @@ from synapse.server_notices.worker_server_notices_sender import (
WorkerServerNoticesSender,
)
from synapse.state import StateHandler, StateResolutionHandler
+from synapse.storage import DataStores, Storage
from synapse.streams.events import EventSources
from synapse.util import Clock
from synapse.util.distributor import Distributor
+from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
@@ -130,7 +139,6 @@ class HomeServer(object):
DEPENDENCIES = [
"http_client",
- "db_pool",
"federation_client",
"federation_server",
"handlers",
@@ -167,6 +175,7 @@ class HomeServer(object):
"filtering",
"http_client_context_factory",
"simple_http_client",
+ "proxied_http_client",
"media_repository",
"media_repository_resource",
"federation_transport_client",
@@ -194,8 +203,15 @@ class HomeServer(object):
"sendmail",
"registration_handler",
"account_validity_handler",
+ "cas_handler",
"saml_handler",
+ "oidc_handler",
"event_client_serializer",
+ "password_policy_handler",
+ "storage",
+ "replication_streamer",
+ "replication_data_handler",
+ "replication_streams",
]
REQUIRED_ON_MASTER_STARTUP = ["user_directory_handler", "stats_handler"]
@@ -205,36 +221,60 @@ class HomeServer(object):
# instantiated during setup() for future return by get_datastore()
DATASTORE_CLASS = abc.abstractproperty()
- def __init__(self, hostname, reactor=None, **kwargs):
+ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwargs):
"""
Args:
hostname : The hostname for the server.
+ config: The full config for the homeserver.
"""
if not reactor:
from twisted.internet import reactor
self._reactor = reactor
self.hostname = hostname
+ self.config = config
self._building = {}
self._listening_services = []
+ self.start_time = None
+
+ self._instance_id = random_string(5)
+ self._instance_name = config.worker_name or "master"
self.clock = Clock(reactor)
self.distributor = Distributor()
- self.ratelimiter = Ratelimiter()
- self.admin_redaction_ratelimiter = Ratelimiter()
- self.registration_ratelimiter = Ratelimiter()
- self.datastore = None
+ self.registration_ratelimiter = Ratelimiter(
+ clock=self.clock,
+ rate_hz=config.rc_registration.per_second,
+ burst_count=config.rc_registration.burst_count,
+ )
+
+ self.datastores = None
# Other kwargs are explicit dependencies
for depname in kwargs:
setattr(self, depname, kwargs[depname])
+ def get_instance_id(self):
+ """A unique ID for this synapse process instance.
+
+ This is used to distinguish running instances in worker-based
+ deployments.
+ """
+ return self._instance_id
+
+ def get_instance_name(self) -> str:
+ """A unique name for this synapse process.
+
+ Used to identify the process over replication and in config. Does not
+ change over restarts.
+ """
+ return self._instance_name
+
def setup(self):
logger.info("Setting up.")
- with self.get_db_conn() as conn:
- self.datastore = self.DATASTORE_CLASS(conn, self)
- conn.commit()
+ self.start_time = int(self.get_clock().time())
+ self.datastores = DataStores(self.DATASTORE_CLASS, self)
logger.info("Finished setting up.")
def setup_master(self):
@@ -266,7 +306,10 @@ class HomeServer(object):
return self.clock
def get_datastore(self):
- return self.datastore
+ return self.datastores.main
+
+ def get_datastores(self):
+ return self.datastores
def get_config(self):
return self.config
@@ -274,15 +317,9 @@ class HomeServer(object):
def get_distributor(self):
return self.distributor
- def get_ratelimiter(self):
- return self.ratelimiter
-
- def get_registration_ratelimiter(self):
+ def get_registration_ratelimiter(self) -> Ratelimiter:
return self.registration_ratelimiter
- def get_admin_redaction_ratelimiter(self):
- return self.admin_redaction_ratelimiter
-
def build_federation_client(self):
return FederationClient(self)
@@ -302,12 +339,19 @@ class HomeServer(object):
return (
InsecureInterceptableContextFactory()
if self.config.use_insecure_ssl_client_just_for_testing_do_not_use
- else BrowserLikePolicyForHTTPS()
+ else RegularPolicyForHTTPS()
)
def build_simple_http_client(self):
return SimpleHttpClient(self)
+ def build_proxied_http_client(self):
+ return SimpleHttpClient(
+ self,
+ http_proxy=os.getenvb(b"http_proxy"),
+ https_proxy=os.getenvb(b"HTTPS_PROXY"),
+ )
+
def build_room_creation_handler(self):
return RoomCreationHandler(self)
@@ -405,36 +449,11 @@ class HomeServer(object):
return PusherPool(self)
def build_http_client(self):
- tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
+ tls_client_options_factory = context_factory.FederationPolicyForHTTPS(
self.config
)
return MatrixFederationHttpClient(self, tls_client_options_factory)
- def build_db_pool(self):
- name = self.db_config["name"]
-
- return adbapi.ConnectionPool(
- name, cp_reactor=self.get_reactor(), **self.db_config.get("args", {})
- )
-
- def get_db_conn(self, run_new_connection=True):
- """Makes a new connection to the database, skipping the db pool
-
- Returns:
- Connection: a connection object implementing the PEP-249 spec
- """
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v
- for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def build_media_repository_resource(self):
# build the media repo resource. This indirects through the HomeServer
# to ensure that we only have a single instance of
@@ -461,7 +480,7 @@ class HomeServer(object):
return ReadMarkerHandler(self)
def build_tcp_replication(self):
- raise NotImplementedError()
+ return ReplicationCommandHandler(self)
def build_action_generator(self):
return ActionGenerator(self)
@@ -470,10 +489,16 @@ class HomeServer(object):
return UserDirectoryHandler(self)
def build_groups_local_handler(self):
- return GroupsLocalHandler(self)
+ if self.config.worker_app:
+ return GroupsLocalWorkerHandler(self)
+ else:
+ return GroupsLocalHandler(self)
def build_groups_server_handler(self):
- return GroupsServerHandler(self)
+ if self.config.worker_app:
+ return GroupsServerWorkerHandler(self)
+ else:
+ return GroupsServerHandler(self)
def build_groups_attestation_signing(self):
return GroupAttestationSigning(self)
@@ -529,14 +554,37 @@ class HomeServer(object):
def build_account_validity_handler(self):
return AccountValidityHandler(self)
+ def build_cas_handler(self):
+ return CasHandler(self)
+
def build_saml_handler(self):
from synapse.handlers.saml_handler import SamlHandler
return SamlHandler(self)
+ def build_oidc_handler(self):
+ from synapse.handlers.oidc_handler import OidcHandler
+
+ return OidcHandler(self)
+
def build_event_client_serializer(self):
return EventClientSerializer(self)
+ def build_password_policy_handler(self):
+ return PasswordPolicyHandler(self)
+
+ def build_storage(self) -> Storage:
+ return Storage(self, self.datastores)
+
+ def build_replication_streamer(self) -> ReplicationStreamer:
+ return ReplicationStreamer(self)
+
+ def build_replication_data_handler(self):
+ return ReplicationDataHandler(self)
+
+ def build_replication_streams(self):
+ return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
@@ -558,24 +606,22 @@ def _make_dependency_method(depname):
try:
builder = getattr(hs, "build_%s" % (depname))
except AttributeError:
- builder = None
+ raise NotImplementedError(
+ "%s has no %s nor a builder for it" % (type(hs).__name__, depname)
+ )
- if builder:
- # Prevent cyclic dependencies from deadlocking
- if depname in hs._building:
- raise ValueError("Cyclic dependency while building %s" % (depname,))
- hs._building[depname] = 1
+ # Prevent cyclic dependencies from deadlocking
+ if depname in hs._building:
+ raise ValueError("Cyclic dependency while building %s" % (depname,))
+ hs._building[depname] = 1
+ try:
dep = builder()
setattr(hs, depname, dep)
-
+ finally:
del hs._building[depname]
- return dep
-
- raise NotImplementedError(
- "%s has no %s nor a builder for it" % (type(hs).__name__, depname)
- )
+ return dep
setattr(HomeServer, "get_%s" % (depname), _get)
|