diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index fcdc8e6e10..54f35900f8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,14 +16,10 @@
import synapse
-import contextlib
+import gc
import logging
import os
-import re
-import resource
-import subprocess
import sys
-import time
from synapse.config._base import ConfigError
from synapse.python_dependencies import (
@@ -33,22 +29,15 @@ from synapse.python_dependencies import (
from synapse.rest import ClientRestResource
from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.server import HomeServer
-
-from twisted.conch.manhole import ColoredManhole
-from twisted.conch.insults import insults
-from twisted.conch import manhole_ssh
-from twisted.cred import checkers, portal
-
-
from twisted.internet import reactor, task, defer
from twisted.application import service
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File
-from twisted.web.server import Site, GzipEncoderFactory, Request
+from twisted.web.server import GzipEncoderFactory
from synapse.http.server import RootRedirect
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@@ -62,10 +51,18 @@ from synapse.api.urls import (
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext
+from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
from synapse.federation.transport.server import TransportLayerServer
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.manhole import manhole
+
+from synapse.http.site import SynapseSite
+
from synapse import events
from daemonize import Daemonize
@@ -73,9 +70,6 @@ from daemonize import Daemonize
logger = logging.getLogger("synapse.app.homeserver")
-ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
-
def gz_wrap(r):
return EncodingResourceWrapper(r, [GzipEncoderFactory()])
@@ -154,7 +148,7 @@ class SynapseHomeServer(HomeServer):
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
- self, self.config.uploads_path, self.auth, self.content_addr
+ self, self.config.uploads_path
),
})
@@ -173,7 +167,12 @@ class SynapseHomeServer(HomeServer):
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationResource(self)
- root_resource = create_resource_tree(resources)
+ if WEB_CLIENT_PREFIX in resources:
+ root_resource = RootRedirect(WEB_CLIENT_PREFIX)
+ else:
+ root_resource = Resource()
+
+ root_resource = create_resource_tree(resources, root_resource)
if tls:
reactor.listenSSL(
port,
@@ -206,24 +205,13 @@ class SynapseHomeServer(HomeServer):
if listener["type"] == "http":
self._listener_http(config, listener)
elif listener["type"] == "manhole":
- checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
- matrix="rabbithole"
- )
-
- rlm = manhole_ssh.TerminalRealm()
- rlm.chainedProtocolFactory = lambda: insults.ServerProtocol(
- ColoredManhole,
- {
- "__name__": "__console__",
- "hs": self,
- }
- )
-
- f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
-
reactor.listenTCP(
listener["port"],
- f,
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
interface=listener.get("bind_address", '127.0.0.1')
)
else:
@@ -245,7 +233,7 @@ class SynapseHomeServer(HomeServer):
except IncorrectDatabaseSetup as e:
quit_with_error(e.message)
- def get_db_conn(self):
+ def get_db_conn(self, run_new_connection=True):
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
db_params = {
@@ -254,7 +242,8 @@ class SynapseHomeServer(HomeServer):
}
db_conn = self.database_engine.module.connect(**db_params)
- self.database_engine.on_new_connection(db_conn)
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
return db_conn
@@ -268,86 +257,6 @@ def quit_with_error(error_string):
sys.exit(1)
-def get_version_string():
- try:
- null = open(os.devnull, 'w')
- cwd = os.path.dirname(os.path.abspath(__file__))
- try:
- git_branch = subprocess.check_output(
- ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
- stderr=null,
- cwd=cwd,
- ).strip()
- git_branch = "b=" + git_branch
- except subprocess.CalledProcessError:
- git_branch = ""
-
- try:
- git_tag = subprocess.check_output(
- ['git', 'describe', '--exact-match'],
- stderr=null,
- cwd=cwd,
- ).strip()
- git_tag = "t=" + git_tag
- except subprocess.CalledProcessError:
- git_tag = ""
-
- try:
- git_commit = subprocess.check_output(
- ['git', 'rev-parse', '--short', 'HEAD'],
- stderr=null,
- cwd=cwd,
- ).strip()
- except subprocess.CalledProcessError:
- git_commit = ""
-
- try:
- dirty_string = "-this_is_a_dirty_checkout"
- is_dirty = subprocess.check_output(
- ['git', 'describe', '--dirty=' + dirty_string],
- stderr=null,
- cwd=cwd,
- ).strip().endswith(dirty_string)
-
- git_dirty = "dirty" if is_dirty else ""
- except subprocess.CalledProcessError:
- git_dirty = ""
-
- if git_branch or git_tag or git_commit or git_dirty:
- git_version = ",".join(
- s for s in
- (git_branch, git_tag, git_commit, git_dirty,)
- if s
- )
-
- return (
- "Synapse/%s (%s)" % (
- synapse.__version__, git_version,
- )
- ).encode("ascii")
- except Exception as e:
- logger.info("Failed to check for git repository: %s", e)
-
- return ("Synapse/%s" % (synapse.__version__,)).encode("ascii")
-
-
-def change_resource_limit(soft_file_no):
- try:
- soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-
- if not soft_file_no:
- soft_file_no = hard
-
- resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard))
- logger.info("Set file limit to: %d", soft_file_no)
-
- resource.setrlimit(
- resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY)
- )
- except (ValueError, resource.error) as e:
- logger.warn("Failed to set file or core limit: %s", e)
-
-
def setup(config_options):
"""
Args:
@@ -358,10 +267,9 @@ def setup(config_options):
HomeServer
"""
try:
- config = HomeServerConfig.load_config(
+ config = HomeServerConfig.load_or_generate_config(
"Synapse Homeserver",
config_options,
- generate_section="Homeserver"
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
@@ -377,7 +285,7 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
- version_string = get_version_string()
+ version_string = "Synapse/" + get_version_string(synapse)
logger.info("Server hostname: %s", config.server_name)
logger.info("Server version: %s", version_string)
@@ -386,7 +294,7 @@ def setup(config_options):
tls_server_context_factory = context_factory.ServerContextFactory(config)
- database_engine = create_engine(config)
+ database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
hs = SynapseHomeServer(
@@ -394,7 +302,6 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
- content_addr=config.content_addr,
version_string=version_string,
database_engine=database_engine,
)
@@ -402,8 +309,10 @@ def setup(config_options):
logger.info("Preparing database: %s...", config.database_config['name'])
try:
- db_conn = hs.get_db_conn()
- database_engine.prepare_database(db_conn)
+ db_conn = hs.get_db_conn(run_new_connection=False)
+ prepare_database(db_conn, database_engine, config=config)
+ database_engine.on_new_connection(db_conn)
+
hs.run_startup_checks(db_conn, database_engine)
db_conn.commit()
@@ -427,6 +336,8 @@ def setup(config_options):
hs.get_datastore().start_doing_background_updates()
hs.get_replication_layer().start_get_pdu_cache()
+ register_memory_metrics(hs)
+
reactor.callWhenRunning(start)
return hs
@@ -442,215 +353,13 @@ class SynapseService(service.Service):
def startService(self):
hs = setup(self.config)
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
def stopService(self):
return self._port.stopListening()
-class SynapseRequest(Request):
- def __init__(self, site, *args, **kw):
- Request.__init__(self, *args, **kw)
- self.site = site
- self.authenticated_entity = None
- self.start_time = 0
-
- def __repr__(self):
- # We overwrite this so that we don't log ``access_token``
- return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
- self.__class__.__name__,
- id(self),
- self.method,
- self.get_redacted_uri(),
- self.clientproto,
- self.site.site_tag,
- )
-
- def get_redacted_uri(self):
- return ACCESS_TOKEN_RE.sub(
- r'\1<redacted>\3',
- self.uri
- )
-
- def get_user_agent(self):
- return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
-
- def started_processing(self):
- self.site.access_logger.info(
- "%s - %s - Received request: %s %s",
- self.getClientIP(),
- self.site.site_tag,
- self.method,
- self.get_redacted_uri()
- )
- self.start_time = int(time.time() * 1000)
-
- def finished_processing(self):
-
- try:
- context = LoggingContext.current_context()
- ru_utime, ru_stime = context.get_resource_usage()
- db_txn_count = context.db_txn_count
- db_txn_duration = context.db_txn_duration
- except:
- ru_utime, ru_stime = (0, 0)
- db_txn_count, db_txn_duration = (0, 0)
-
- self.site.access_logger.info(
- "%s - %s - {%s}"
- " Processed request: %dms (%dms, %dms) (%dms/%d)"
- " %sB %s \"%s %s %s\" \"%s\"",
- self.getClientIP(),
- self.site.site_tag,
- self.authenticated_entity,
- int(time.time() * 1000) - self.start_time,
- int(ru_utime * 1000),
- int(ru_stime * 1000),
- int(db_txn_duration * 1000),
- int(db_txn_count),
- self.sentLength,
- self.code,
- self.method,
- self.get_redacted_uri(),
- self.clientproto,
- self.get_user_agent(),
- )
-
- @contextlib.contextmanager
- def processing(self):
- self.started_processing()
- yield
- self.finished_processing()
-
-
-class XForwardedForRequest(SynapseRequest):
- def __init__(self, *args, **kw):
- SynapseRequest.__init__(self, *args, **kw)
-
- """
- Add a layer on top of another request that only uses the value of an
- X-Forwarded-For header as the result of C{getClientIP}.
- """
- def getClientIP(self):
- """
- @return: The client address (the first address) in the value of the
- I{X-Forwarded-For header}. If the header is not present, return
- C{b"-"}.
- """
- return self.requestHeaders.getRawHeaders(
- b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
-
-
-class SynapseRequestFactory(object):
- def __init__(self, site, x_forwarded_for):
- self.site = site
- self.x_forwarded_for = x_forwarded_for
-
- def __call__(self, *args, **kwargs):
- if self.x_forwarded_for:
- return XForwardedForRequest(self.site, *args, **kwargs)
- else:
- return SynapseRequest(self.site, *args, **kwargs)
-
-
-class SynapseSite(Site):
- """
- Subclass of a twisted http Site that does access logging with python's
- standard logging
- """
- def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
- Site.__init__(self, resource, *args, **kwargs)
-
- self.site_tag = site_tag
-
- proxied = config.get("x_forwarded", False)
- self.requestFactory = SynapseRequestFactory(self, proxied)
- self.access_logger = logging.getLogger(logger_name)
-
- def log(self, request):
- pass
-
-
-def create_resource_tree(desired_tree, redirect_root_to_web_client=True):
- """Create the resource tree for this Home Server.
-
- This in unduly complicated because Twisted does not support putting
- child resources more than 1 level deep at a time.
-
- Args:
- web_client (bool): True to enable the web client.
- redirect_root_to_web_client (bool): True to redirect '/' to the
- location of the web client. This does nothing if web_client is not
- True.
- """
- if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree:
- root_resource = RootRedirect(WEB_CLIENT_PREFIX)
- else:
- root_resource = Resource()
-
- # ideally we'd just use getChild and putChild but getChild doesn't work
- # unless you give it a Request object IN ADDITION to the name :/ So
- # instead, we'll store a copy of this mapping so we can actually add
- # extra resources to existing nodes. See self._resource_id for the key.
- resource_mappings = {}
- for full_path, res in desired_tree.items():
- logger.info("Attaching %s to path %s", res, full_path)
- last_resource = root_resource
- for path_seg in full_path.split('/')[1:-1]:
- if path_seg not in last_resource.listNames():
- # resource doesn't exist, so make a "dummy resource"
- child_resource = Resource()
- last_resource.putChild(path_seg, child_resource)
- res_id = _resource_id(last_resource, path_seg)
- resource_mappings[res_id] = child_resource
- last_resource = child_resource
- else:
- # we have an existing Resource, use that instead.
- res_id = _resource_id(last_resource, path_seg)
- last_resource = resource_mappings[res_id]
-
- # ===========================
- # now attach the actual desired resource
- last_path_seg = full_path.split('/')[-1]
-
- # if there is already a resource here, thieve its children and
- # replace it
- res_id = _resource_id(last_resource, last_path_seg)
- if res_id in resource_mappings:
- # there is a dummy resource at this path already, which needs
- # to be replaced with the desired resource.
- existing_dummy_resource = resource_mappings[res_id]
- for child_name in existing_dummy_resource.listNames():
- child_res_id = _resource_id(
- existing_dummy_resource, child_name
- )
- child_resource = resource_mappings[child_res_id]
- # steal the children
- res.putChild(child_name, child_resource)
-
- # finally, insert the desired resource in the right place
- last_resource.putChild(last_path_seg, res)
- res_id = _resource_id(last_resource, last_path_seg)
- resource_mappings[res_id] = res
-
- return root_resource
-
-
-def _resource_id(resource, path_seg):
- """Construct an arbitrary resource ID so you can retrieve the mapping
- later.
-
- If you want to represent resource A putChild resource B with path C,
- the mapping should looks like _resource_id(A,C) = B.
-
- Args:
- resource (Resource): The *parent* Resourceb
- path_seg (str): The name of the child Resource to be attached.
- Returns:
- str: A unique string which can be a key to the child Resource.
- """
- return "%s-%s" % (resource, path_seg)
-
-
def run(hs):
PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE:
@@ -676,6 +385,8 @@ def run(hs):
start_time = hs.get_clock().time()
+ stats = {}
+
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
@@ -684,7 +395,10 @@ def run(hs):
if uptime < 0:
uptime = 0
- stats = {}
+    # If the stats dictionary is empty then this is the first time we've
+ # reported stats.
+ first_time = not stats
+
stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
@@ -697,6 +411,25 @@ def run(hs):
daily_messages = yield hs.get_datastore().count_daily_messages()
if daily_messages is not None:
stats["daily_messages"] = daily_messages
+ else:
+ stats.pop("daily_messages", None)
+
+ if first_time:
+ # Add callbacks to report the synapse stats as metrics whenever
+ # prometheus requests them, typically every 30s.
+ # As some of the stats are expensive to calculate we only update
+ # them when synapse phones home to matrix.org every 24 hours.
+ metrics = get_metrics_for("synapse.usage")
+ metrics.add_callback("timestamp", lambda: stats["timestamp"])
+ metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
+ metrics.add_callback("total_users", lambda: stats["total_users"])
+ metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
+ metrics.add_callback(
+ "daily_active_users", lambda: stats["daily_active_users"]
+ )
+ metrics.add_callback(
+ "daily_messages", lambda: stats.get("daily_messages", 0)
+ )
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
@@ -717,6 +450,8 @@ def run(hs):
# sys.settrace(logcontext_tracer)
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
reactor.run()
if hs.config.daemonize:
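
Note on the hoisted stats dict in the last hunks: because `stats = {}` now lives outside phone_stats_home and the metrics callbacks registered on the first report close over it, each prometheus scrape reads whatever the most recent 24-hourly report wrote into the dict, with evaluation happening at scrape time rather than at registration time. Below is a minimal standalone sketch of that pattern, assuming nothing beyond stock Python; the names phone_home_once and scrape and the example values are illustrative stand-ins, not part of the patch or of Synapse's metrics API (the patch itself uses get_metrics_for("synapse.usage") and add_callback).

# Standalone sketch of the lazy-callback pattern used by the patch: the
# callbacks close over a shared dict and are only evaluated when the
# metrics are read, so they always see the latest reported values.
# (Names and values here are illustrative, not Synapse's metrics API.)
stats = {}

callbacks = {
    "uptime_seconds": lambda: stats.get("uptime_seconds", 0),
    "daily_messages": lambda: stats.get("daily_messages", 0),
}


def phone_home_once():
    # Stands in for the periodic phone_stats_home job: it refreshes the
    # shared dict in place rather than rebinding a new one, so the
    # callbacks above keep seeing the same object.
    stats["uptime_seconds"] = 120
    stats["daily_messages"] = 42


def scrape():
    # Stands in for a metrics scrape: the lambdas are evaluated now,
    # not when they were registered.
    return {name: fn() for name, fn in callbacks.items()}


print(scrape())    # {'uptime_seconds': 0, 'daily_messages': 0}
phone_home_once()
print(scrape())    # {'uptime_seconds': 120, 'daily_messages': 42}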