diff --git a/synapse/__init__.py b/synapse/__init__.py
index d111335a1a..96e37308d6 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.9.2-r2"
+__version__ = "0.9.3"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index d5bf0be85c..4da62e5d8d 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -370,6 +370,8 @@ class Auth(object):
user_agent=user_agent
)
+ request.authenticated_entity = user.to_string()
+
defer.returnValue((user, ClientInfo(device_id, token_id)))
except KeyError:
raise AuthError(
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 65a5dfa84e..49e27c9e11 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -34,8 +34,7 @@ from twisted.application import service
from twisted.enterprise import adbapi
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File
-from twisted.web.server import Site, GzipEncoderFactory
-from twisted.web.http import proxiedLogFormatter, combinedLogFormatter
+from twisted.web.server import Site, GzipEncoderFactory, Request
from synapse.http.server import JsonResource, RootRedirect
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@@ -61,11 +60,13 @@ import twisted.manhole.telnet
import synapse
+import contextlib
import logging
import os
import re
import resource
import subprocess
+import time
logger = logging.getLogger("synapse.app.homeserver")
@@ -87,16 +88,10 @@ class SynapseHomeServer(HomeServer):
return MatrixFederationHttpClient(self)
def build_resource_for_client(self):
- res = ClientV1RestResource(self)
- if self.config.gzip_responses:
- res = gz_wrap(res)
- return res
+ return ClientV1RestResource(self)
def build_resource_for_client_v2_alpha(self):
- res = ClientV2AlphaRestResource(self)
- if self.config.gzip_responses:
- res = gz_wrap(res)
- return res
+ return ClientV2AlphaRestResource(self)
def build_resource_for_federation(self):
return JsonResource(self)
@@ -119,7 +114,7 @@ class SynapseHomeServer(HomeServer):
def build_resource_for_content_repo(self):
return ContentRepoResource(
- self, self.upload_dir, self.auth, self.content_addr
+ self, self.config.uploads_path, self.auth, self.content_addr
)
def build_resource_for_media_repository(self):
@@ -145,152 +140,105 @@ class SynapseHomeServer(HomeServer):
**self.db_config.get("args", {})
)
- def create_resource_tree(self, redirect_root_to_web_client):
- """Create the resource tree for this Home Server.
+ def _listener_http(self, config, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ tls = listener_config.get("tls", False)
+ site_tag = listener_config.get("tag", port)
- This in unduly complicated because Twisted does not support putting
- child resources more than 1 level deep at a time.
-
- Args:
- web_client (bool): True to enable the web client.
- redirect_root_to_web_client (bool): True to redirect '/' to the
- location of the web client. This does nothing if web_client is not
- True.
- """
- config = self.get_config()
- web_client = config.web_client
-
- # list containing (path_str, Resource) e.g:
- # [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
- desired_tree = [
- (CLIENT_PREFIX, self.get_resource_for_client()),
- (CLIENT_V2_ALPHA_PREFIX, self.get_resource_for_client_v2_alpha()),
- (FEDERATION_PREFIX, self.get_resource_for_federation()),
- (CONTENT_REPO_PREFIX, self.get_resource_for_content_repo()),
- (SERVER_KEY_PREFIX, self.get_resource_for_server_key()),
- (SERVER_KEY_V2_PREFIX, self.get_resource_for_server_key_v2()),
- (MEDIA_PREFIX, self.get_resource_for_media_repository()),
- (STATIC_PREFIX, self.get_resource_for_static_content()),
- ]
-
- if web_client:
- logger.info("Adding the web client.")
- desired_tree.append((WEB_CLIENT_PREFIX,
- self.get_resource_for_web_client()))
-
- if web_client and redirect_root_to_web_client:
- self.root_resource = RootRedirect(WEB_CLIENT_PREFIX)
- else:
- self.root_resource = Resource()
+ if tls and config.no_tls:
+ return
metrics_resource = self.get_resource_for_metrics()
- if config.metrics_port is None and metrics_resource is not None:
- desired_tree.append((METRICS_PREFIX, metrics_resource))
-
- # ideally we'd just use getChild and putChild but getChild doesn't work
- # unless you give it a Request object IN ADDITION to the name :/ So
- # instead, we'll store a copy of this mapping so we can actually add
- # extra resources to existing nodes. See self._resource_id for the key.
- resource_mappings = {}
- for full_path, res in desired_tree:
- logger.info("Attaching %s to path %s", res, full_path)
- last_resource = self.root_resource
- for path_seg in full_path.split('/')[1:-1]:
- if path_seg not in last_resource.listNames():
- # resource doesn't exist, so make a "dummy resource"
- child_resource = Resource()
- last_resource.putChild(path_seg, child_resource)
- res_id = self._resource_id(last_resource, path_seg)
- resource_mappings[res_id] = child_resource
- last_resource = child_resource
- else:
- # we have an existing Resource, use that instead.
- res_id = self._resource_id(last_resource, path_seg)
- last_resource = resource_mappings[res_id]
-
- # ===========================
- # now attach the actual desired resource
- last_path_seg = full_path.split('/')[-1]
-
- # if there is already a resource here, thieve its children and
- # replace it
- res_id = self._resource_id(last_resource, last_path_seg)
- if res_id in resource_mappings:
- # there is a dummy resource at this path already, which needs
- # to be replaced with the desired resource.
- existing_dummy_resource = resource_mappings[res_id]
- for child_name in existing_dummy_resource.listNames():
- child_res_id = self._resource_id(existing_dummy_resource,
- child_name)
- child_resource = resource_mappings[child_res_id]
- # steal the children
- res.putChild(child_name, child_resource)
-
- # finally, insert the desired resource in the right place
- last_resource.putChild(last_path_seg, res)
- res_id = self._resource_id(last_resource, last_path_seg)
- resource_mappings[res_id] = res
-
- return self.root_resource
-
- def _resource_id(self, resource, path_seg):
- """Construct an arbitrary resource ID so you can retrieve the mapping
- later.
-
- If you want to represent resource A putChild resource B with path C,
- the mapping should looks like _resource_id(A,C) = B.
-
- Args:
- resource (Resource): The *parent* Resource
- path_seg (str): The name of the child Resource to be attached.
- Returns:
- str: A unique string which can be a key to the child Resource.
- """
- return "%s-%s" % (resource, path_seg)
- def start_listening(self):
- config = self.get_config()
-
- if not config.no_tls and config.bind_port is not None:
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "client":
+ if res["compress"]:
+ client_v1 = gz_wrap(self.get_resource_for_client())
+ client_v2 = gz_wrap(self.get_resource_for_client_v2_alpha())
+ else:
+ client_v1 = self.get_resource_for_client()
+ client_v2 = self.get_resource_for_client_v2_alpha()
+
+ resources.update({
+ CLIENT_PREFIX: client_v1,
+ CLIENT_V2_ALPHA_PREFIX: client_v2,
+ })
+
+ if name == "federation":
+ resources.update({
+ FEDERATION_PREFIX: self.get_resource_for_federation(),
+ })
+
+ if name in ["static", "client"]:
+ resources.update({
+ STATIC_PREFIX: self.get_resource_for_static_content(),
+ })
+
+ if name in ["media", "federation", "client"]:
+ resources.update({
+ MEDIA_PREFIX: self.get_resource_for_media_repository(),
+ CONTENT_REPO_PREFIX: self.get_resource_for_content_repo(),
+ })
+
+ if name in ["keys", "federation"]:
+ resources.update({
+ SERVER_KEY_PREFIX: self.get_resource_for_server_key(),
+ SERVER_KEY_V2_PREFIX: self.get_resource_for_server_key_v2(),
+ })
+
+ if name == "webclient":
+ resources[WEB_CLIENT_PREFIX] = self.get_resource_for_web_client()
+
+ if name == "metrics" and metrics_resource:
+ resources[METRICS_PREFIX] = metrics_resource
+
+ root_resource = create_resource_tree(resources)
+ if tls:
reactor.listenSSL(
- config.bind_port,
+ port,
SynapseSite(
- "synapse.access.https",
- config,
- self.root_resource,
+ "synapse.access.https.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
),
self.tls_context_factory,
- interface=config.bind_host
+ interface=bind_address
)
- logger.info("Synapse now listening on port %d", config.bind_port)
-
- if config.unsecure_port is not None:
+ else:
reactor.listenTCP(
- config.unsecure_port,
+ port,
SynapseSite(
- "synapse.access.http",
- config,
- self.root_resource,
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
),
- interface=config.bind_host
+ interface=bind_address
)
- logger.info("Synapse now listening on port %d", config.unsecure_port)
+ logger.info("Synapse now listening on port %d", port)
- metrics_resource = self.get_resource_for_metrics()
- if metrics_resource and config.metrics_port is not None:
- reactor.listenTCP(
- config.metrics_port,
- SynapseSite(
- "synapse.access.metrics",
- config,
- metrics_resource,
- ),
- interface=config.metrics_bind_host,
- )
- logger.info(
- "Metrics now running on %s port %d",
- config.metrics_bind_host, config.metrics_port,
- )
+ def start_listening(self):
+ config = self.get_config()
+
+ for listener in config.listeners:
+ if listener["type"] == "http":
+ self._listener_http(config, listener)
+ elif listener["type"] == "manhole":
+ f = twisted.manhole.telnet.ShellFactory()
+ f.username = "matrix"
+ f.password = "rabbithole"
+ f.namespace['hs'] = self
+ reactor.listenTCP(
+ listener["port"],
+ f,
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
def run_startup_checks(self, db_conn, database_engine):
all_users_native = are_all_users_on_domain(
@@ -425,11 +373,6 @@ def setup(config_options):
events.USE_FROZEN_DICTS = config.use_frozen_dicts
- if re.search(":[0-9]+$", config.server_name):
- domain_with_port = config.server_name
- else:
- domain_with_port = "%s:%s" % (config.server_name, config.bind_port)
-
tls_context_factory = context_factory.ServerContextFactory(config)
database_engine = create_engine(config.database_config["name"])
@@ -437,8 +380,6 @@ def setup(config_options):
hs = SynapseHomeServer(
config.server_name,
- domain_with_port=domain_with_port,
- upload_dir=os.path.abspath("uploads"),
db_config=config.database_config,
tls_context_factory=tls_context_factory,
config=config,
@@ -447,10 +388,6 @@ def setup(config_options):
database_engine=database_engine,
)
- hs.create_resource_tree(
- redirect_root_to_web_client=True,
- )
-
logger.info("Preparing database: %r...", config.database_config)
try:
@@ -475,13 +412,6 @@ def setup(config_options):
logger.info("Database prepared in %r.", config.database_config)
- if config.manhole:
- f = twisted.manhole.telnet.ShellFactory()
- f.username = "matrix"
- f.password = "rabbithole"
- f.namespace['hs'] = hs
- reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
-
hs.start_listening()
hs.get_pusherpool().start()
@@ -507,22 +437,194 @@ class SynapseService(service.Service):
return self._port.stopListening()
+class SynapseRequest(Request):
+ def __init__(self, site, *args, **kw):
+ Request.__init__(self, *args, **kw)
+ self.site = site
+ self.authenticated_entity = None
+ self.start_time = 0
+
+ def __repr__(self):
+ # We overwrite this so that we don't log ``access_token``
+ return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
+ self.__class__.__name__,
+ id(self),
+ self.method,
+ self.get_redacted_uri(),
+ self.clientproto,
+ self.site.site_tag,
+ )
+
+ def get_redacted_uri(self):
+ return re.sub(
+ r'(\?.*access_token=)[^&]*(.*)$',
+ r'\1<redacted>\2',
+ self.uri
+ )
+
+ def get_user_agent(self):
+ return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
+
+ def started_processing(self):
+ self.site.access_logger.info(
+ "%s - %s - Received request: %s %s",
+ self.getClientIP(),
+ self.site.site_tag,
+ self.method,
+ self.get_redacted_uri()
+ )
+ self.start_time = int(time.time() * 1000)
+
+ def finished_processing(self):
+ self.site.access_logger.info(
+ "%s - %s - {%s}"
+ " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"",
+ self.getClientIP(),
+ self.site.site_tag,
+ self.authenticated_entity,
+ int(time.time() * 1000) - self.start_time,
+ self.sentLength,
+ self.code,
+ self.method,
+ self.get_redacted_uri(),
+ self.clientproto,
+ self.get_user_agent(),
+ )
+
+ @contextlib.contextmanager
+ def processing(self):
+ self.started_processing()
+ yield
+ self.finished_processing()
+
+
+class XForwardedForRequest(SynapseRequest):
+ def __init__(self, *args, **kw):
+ SynapseRequest.__init__(self, *args, **kw)
+
+ """
+ Add a layer on top of another request that only uses the value of an
+ X-Forwarded-For header as the result of C{getClientIP}.
+ """
+ def getClientIP(self):
+ """
+ @return: The client address (the first address) in the value of the
+ I{X-Forwarded-For header}. If the header is not present, return
+ C{b"-"}.
+ """
+ return self.requestHeaders.getRawHeaders(
+ b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+
+
+class SynapseRequestFactory(object):
+ def __init__(self, site, x_forwarded_for):
+ self.site = site
+ self.x_forwarded_for = x_forwarded_for
+
+ def __call__(self, *args, **kwargs):
+ if self.x_forwarded_for:
+ return XForwardedForRequest(self.site, *args, **kwargs)
+ else:
+ return SynapseRequest(self.site, *args, **kwargs)
+
+
class SynapseSite(Site):
"""
Subclass of a twisted http Site that does access logging with python's
standard logging
"""
- def __init__(self, logger_name, config, resource, *args, **kwargs):
+ def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
Site.__init__(self, resource, *args, **kwargs)
- if config.captcha_ip_origin_is_x_forwarded:
- self._log_formatter = proxiedLogFormatter
- else:
- self._log_formatter = combinedLogFormatter
+
+ self.site_tag = site_tag
+
+ proxied = config.get("x_forwarded", False)
+ self.requestFactory = SynapseRequestFactory(self, proxied)
self.access_logger = logging.getLogger(logger_name)
def log(self, request):
- line = self._log_formatter(self._logDateTime, request)
- self.access_logger.info(line)
+ pass
+
+
+def create_resource_tree(desired_tree, redirect_root_to_web_client=True):
+ """Create the resource tree for this Home Server.
+
+ This in unduly complicated because Twisted does not support putting
+ child resources more than 1 level deep at a time.
+
+ Args:
+ web_client (bool): True to enable the web client.
+ redirect_root_to_web_client (bool): True to redirect '/' to the
+ location of the web client. This does nothing if web_client is not
+ True.
+ """
+ if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree:
+ root_resource = RootRedirect(WEB_CLIENT_PREFIX)
+ else:
+ root_resource = Resource()
+
+ # ideally we'd just use getChild and putChild but getChild doesn't work
+ # unless you give it a Request object IN ADDITION to the name :/ So
+ # instead, we'll store a copy of this mapping so we can actually add
+ # extra resources to existing nodes. See self._resource_id for the key.
+ resource_mappings = {}
+ for full_path, res in desired_tree.items():
+ logger.info("Attaching %s to path %s", res, full_path)
+ last_resource = root_resource
+ for path_seg in full_path.split('/')[1:-1]:
+ if path_seg not in last_resource.listNames():
+ # resource doesn't exist, so make a "dummy resource"
+ child_resource = Resource()
+ last_resource.putChild(path_seg, child_resource)
+ res_id = _resource_id(last_resource, path_seg)
+ resource_mappings[res_id] = child_resource
+ last_resource = child_resource
+ else:
+ # we have an existing Resource, use that instead.
+ res_id = _resource_id(last_resource, path_seg)
+ last_resource = resource_mappings[res_id]
+
+ # ===========================
+ # now attach the actual desired resource
+ last_path_seg = full_path.split('/')[-1]
+
+ # if there is already a resource here, thieve its children and
+ # replace it
+ res_id = _resource_id(last_resource, last_path_seg)
+ if res_id in resource_mappings:
+ # there is a dummy resource at this path already, which needs
+ # to be replaced with the desired resource.
+ existing_dummy_resource = resource_mappings[res_id]
+ for child_name in existing_dummy_resource.listNames():
+ child_res_id = _resource_id(
+ existing_dummy_resource, child_name
+ )
+ child_resource = resource_mappings[child_res_id]
+ # steal the children
+ res.putChild(child_name, child_resource)
+
+ # finally, insert the desired resource in the right place
+ last_resource.putChild(last_path_seg, res)
+ res_id = _resource_id(last_resource, last_path_seg)
+ resource_mappings[res_id] = res
+
+ return root_resource
+
+
+def _resource_id(resource, path_seg):
+ """Construct an arbitrary resource ID so you can retrieve the mapping
+ later.
+
+ If you want to represent resource A putChild resource B with path C,
+ the mapping should looks like _resource_id(A,C) = B.
+
+ Args:
+ resource (Resource): The *parent* Resource
+ path_seg (str): The name of the child Resource to be attached.
+ Returns:
+ str: A unique string which can be a key to the child Resource.
+ """
+ return "%s-%s" % (resource, path_seg)
def run(hs):
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index d4163d6272..d483c67c6a 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -148,7 +148,7 @@ class Config(object):
if not config_args.config_path:
config_parser.error(
"Must supply a config file.\nA config file can be automatically"
- " generated using \"--generate-config -h SERVER_NAME"
+ " generated using \"--generate-config -H SERVER_NAME"
" -c CONFIG-FILE\""
)
@@ -209,7 +209,7 @@ class Config(object):
if not config_args.config_path:
config_parser.error(
"Must supply a config file.\nA config file can be automatically"
- " generated using \"--generate-config -h SERVER_NAME"
+ " generated using \"--generate-config -H SERVER_NAME"
" -c CONFIG-FILE\""
)
diff --git a/synapse/config/captcha.py b/synapse/config/captcha.py
index ba221121cb..cf72dc4340 100644
--- a/synapse/config/captcha.py
+++ b/synapse/config/captcha.py
@@ -21,10 +21,6 @@ class CaptchaConfig(Config):
self.recaptcha_private_key = config["recaptcha_private_key"]
self.recaptcha_public_key = config["recaptcha_public_key"]
self.enable_registration_captcha = config["enable_registration_captcha"]
- # XXX: This is used for more than just captcha
- self.captcha_ip_origin_is_x_forwarded = (
- config["captcha_ip_origin_is_x_forwarded"]
- )
self.captcha_bypass_secret = config.get("captcha_bypass_secret")
self.recaptcha_siteverify_api = config["recaptcha_siteverify_api"]
@@ -43,10 +39,6 @@ class CaptchaConfig(Config):
# public/private key.
enable_registration_captcha: False
- # When checking captchas, use the X-Forwarded-For (XFF) header
- # as the client IP and not the actual client IP.
- captcha_ip_origin_is_x_forwarded: False
-
# A secret key used to bypass the captcha test entirely.
#captcha_bypass_secret: "YOUR_SECRET_HERE"
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index 0cfb30ce7f..ae5a691527 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -28,10 +28,4 @@ class MetricsConfig(Config):
# Enable collection and rendering of performance metrics
enable_metrics: False
-
- # Separate port to accept metrics requests on
- # metrics_port: 8081
-
- # Which host to bind the metric listener to
- # metrics_bind_host: 127.0.0.1
"""
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index adaf4e4bb2..6891abd71d 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -21,13 +21,18 @@ class ContentRepositoryConfig(Config):
self.max_upload_size = self.parse_size(config["max_upload_size"])
self.max_image_pixels = self.parse_size(config["max_image_pixels"])
self.media_store_path = self.ensure_directory(config["media_store_path"])
+ self.uploads_path = self.ensure_directory(config["uploads_path"])
def default_config(self, config_dir_path, server_name):
media_store = self.default_path("media_store")
+ uploads_path = self.default_path("uploads")
return """
# Directory where uploaded images and attachments are stored.
media_store_path: "%(media_store)s"
+ # Directory where in-progress uploads are stored.
+ uploads_path: "%(uploads_path)s"
+
# The largest allowed upload size in bytes
max_upload_size: "10M"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index d0c8fb8f3c..f4d4a87103 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -20,26 +20,97 @@ class ServerConfig(Config):
def read_config(self, config):
self.server_name = config["server_name"]
- self.bind_port = config["bind_port"]
- self.bind_host = config["bind_host"]
- self.unsecure_port = config["unsecure_port"]
- self.manhole = config.get("manhole")
self.pid_file = self.abspath(config.get("pid_file"))
self.web_client = config["web_client"]
self.soft_file_limit = config["soft_file_limit"]
self.daemonize = config.get("daemonize")
self.use_frozen_dicts = config.get("use_frozen_dicts", True)
- self.gzip_responses = config["gzip_responses"]
+
+ self.listeners = config.get("listeners", [])
+
+ bind_port = config.get("bind_port")
+ if bind_port:
+ self.listeners = []
+ bind_host = config.get("bind_host", "")
+ gzip_responses = config.get("gzip_responses", True)
+
+ names = ["client", "webclient"] if self.web_client else ["client"]
+
+ self.listeners.append({
+ "port": bind_port,
+ "bind_address": bind_host,
+ "tls": True,
+ "type": "http",
+ "resources": [
+ {
+ "names": names,
+ "compress": gzip_responses,
+ },
+ {
+ "names": ["federation"],
+ "compress": False,
+ }
+ ]
+ })
+
+ unsecure_port = config.get("unsecure_port", bind_port - 400)
+ if unsecure_port:
+ self.listeners.append({
+ "port": unsecure_port,
+ "bind_address": bind_host,
+ "tls": False,
+ "type": "http",
+ "resources": [
+ {
+ "names": names,
+ "compress": gzip_responses,
+ },
+ {
+ "names": ["federation"],
+ "compress": False,
+ }
+ ]
+ })
+
+ manhole = config.get("manhole")
+ if manhole:
+ self.listeners.append({
+ "port": manhole,
+ "bind_address": "127.0.0.1",
+ "type": "manhole",
+ })
+
+ metrics_port = config.get("metrics_port")
+ if metrics_port:
+ self.listeners.append({
+ "port": metrics_port,
+ "bind_address": config.get("metrics_bind_host", "127.0.0.1"),
+ "tls": False,
+ "type": "http",
+ "resources": [
+ {
+ "names": ["metrics"],
+ "compress": False,
+ },
+ ]
+ })
# Attempt to guess the content_addr for the v0 content repostitory
content_addr = config.get("content_addr")
if not content_addr:
+ for listener in self.listeners:
+ if listener["type"] == "http" and not listener.get("tls", False):
+ unsecure_port = listener["port"]
+ break
+ else:
+ raise RuntimeError("Could not determine 'content_addr'")
+
host = self.server_name
if ':' not in host:
- host = "%s:%d" % (host, self.unsecure_port)
+ host = "%s:%d" % (host, unsecure_port)
else:
host = host.split(':')[0]
- host = "%s:%d" % (host, self.unsecure_port)
+ host = "%s:%d" % (host, unsecure_port)
content_addr = "http://%s" % (host,)
self.content_addr = content_addr
@@ -61,18 +132,6 @@ class ServerConfig(Config):
# e.g. matrix.org, localhost:8080, etc.
server_name: "%(server_name)s"
- # The port to listen for HTTPS requests on.
- # For when matrix traffic is sent directly to synapse.
- bind_port: %(bind_port)s
-
- # The port to listen for HTTP requests on.
- # For when matrix traffic passes through loadbalancer that unwraps TLS.
- unsecure_port: %(unsecure_port)s
-
- # Local interface to listen on.
- # The empty string will cause synapse to listen on all interfaces.
- bind_host: ""
-
# When running as a daemon, the file to store the pid in
pid_file: %(pid_file)s
@@ -84,14 +143,64 @@ class ServerConfig(Config):
# hard limit.
soft_file_limit: 0
- # Turn on the twisted telnet manhole service on localhost on the given
- # port.
- #manhole: 9000
-
- # Should synapse compress HTTP responses to clients that support it?
- # This should be disabled if running synapse behind a load balancer
- # that can do automatic compression.
- gzip_responses: True
+ # List of ports that Synapse should listen on, their purpose and their
+ # configuration.
+ listeners:
+ # Main HTTPS listener
+ # For when matrix traffic is sent directly to synapse.
+ -
+ # The port to listen for HTTPS requests on.
+ port: %(bind_port)s
+
+ # Local interface to listen on.
+ # The empty string will cause synapse to listen on all interfaces.
+ bind_address: ''
+
+ # This is a 'http' listener, allows us to specify 'resources'.
+ type: http
+
+ tls: true
+
+ # Use the X-Forwarded-For (XFF) header as the client IP and not the
+ # actual client IP.
+ x_forwarded: false
+
+ # List of HTTP resources to serve on this listener.
+ resources:
+ -
+ # List of resources to host on this listener.
+ names:
+ - client # The client-server APIs, both v1 and v2
+ - webclient # The bundled webclient.
+
+ # Should synapse compress HTTP responses to clients that support it?
+ # This should be disabled if running synapse behind a load balancer
+ # that can do automatic compression.
+ compress: true
+
+ - names: [federation] # Federation APIs
+ compress: false
+
+ # Unsecure HTTP listener,
+ # For when matrix traffic passes through loadbalancer that unwraps TLS.
+ - port: %(unsecure_port)s
+ tls: false
+ bind_address: ''
+ type: http
+
+ x_forwarded: false
+
+ resources:
+ - names: [client, webclient]
+ compress: true
+ - names: [federation]
+ compress: false
+
+ # Turn on the twisted telnet manhole service on localhost on the given
+ # port.
+ # - port: 9000
+ # bind_address: 127.0.0.1
+ # type: manhole
""" % locals()
def read_arguments(self, args):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 31190e700a..bad93c6b2f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -94,6 +94,7 @@ class TransportLayerServer(object):
yield self.keyring.verify_json_for_server(origin, json_request)
logger.info("Request from %s", origin)
+ request.authenticated_entity = origin
defer.returnValue((origin, content))
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 8269482e47..1240e51649 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -177,7 +177,7 @@ class ApplicationServicesHandler(object):
return
user_info = yield self.store.get_user_by_id(user_id)
- if not user_info:
+ if user_info:
defer.returnValue(False)
return
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 867fdbefb0..e324662f18 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -380,15 +380,6 @@ class MessageHandler(BaseHandler):
if limit is None:
limit = 10
- messages, token = yield self.store.get_recent_events_for_room(
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
-
- start_token = now_token.copy_and_replace("room_key", token[0])
- end_token = now_token.copy_and_replace("room_key", token[1])
-
room_members = [
m for m in current_state.values()
if m.type == EventTypes.Member
@@ -396,19 +387,38 @@ class MessageHandler(BaseHandler):
]
presence_handler = self.hs.get_handlers().presence_handler
- presence = []
- for m in room_members:
- try:
- member_presence = yield presence_handler.get_state(
- target_user=UserID.from_string(m.user_id),
- auth_user=auth_user,
- as_event=True,
- )
- presence.append(member_presence)
- except SynapseError:
- logger.exception(
- "Failed to get member presence of %r", m.user_id
+
+ @defer.inlineCallbacks
+ def get_presence():
+ presence_defs = yield defer.DeferredList(
+ [
+ presence_handler.get_state(
+ target_user=UserID.from_string(m.user_id),
+ auth_user=auth_user,
+ as_event=True,
+ check_auth=False,
+ )
+ for m in room_members
+ ],
+ consumeErrors=True,
+ )
+
+ defer.returnValue([p for success, p in presence_defs if success])
+
+ presence, (messages, token) = yield defer.gatherResults(
+ [
+ get_presence(),
+ self.store.get_recent_events_for_room(
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
)
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ start_token = now_token.copy_and_replace("room_key", token[0])
+ end_token = now_token.copy_and_replace("room_key", token[1])
time_now = self.clock.time_msec()
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 023ad33ab0..7c03198313 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -191,24 +191,24 @@ class PresenceHandler(BaseHandler):
defer.returnValue(False)
@defer.inlineCallbacks
- def get_state(self, target_user, auth_user, as_event=False):
+ def get_state(self, target_user, auth_user, as_event=False, check_auth=True):
if self.hs.is_mine(target_user):
- visible = yield self.is_presence_visible(
- observer_user=auth_user,
- observed_user=target_user
- )
+ if check_auth:
+ visible = yield self.is_presence_visible(
+ observer_user=auth_user,
+ observed_user=target_user
+ )
- if not visible:
- raise SynapseError(404, "Presence information not visible")
- state = yield self.store.get_presence_state(target_user.localpart)
- if "mtime" in state:
- del state["mtime"]
- state["presence"] = state.pop("state")
+ if not visible:
+ raise SynapseError(404, "Presence information not visible")
if target_user in self._user_cachemap:
- cached_state = self._user_cachemap[target_user].get_state()
- if "last_active" in cached_state:
- state["last_active"] = cached_state["last_active"]
+ state = self._user_cachemap[target_user].get_state()
+ else:
+ state = yield self.store.get_presence_state(target_user.localpart)
+ if "mtime" in state:
+ del state["mtime"]
+ state["presence"] = state.pop("state")
else:
# TODO(paul): Have remote server send us permissions set
state = self._get_or_offline_usercache(target_user).get_state()
diff --git a/synapse/http/client.py b/synapse/http/client.py
index e746f2416e..49737d55da 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -61,21 +61,31 @@ class SimpleHttpClient(object):
self.agent = Agent(reactor, pool=pool)
self.version_string = hs.version_string
- def request(self, method, *args, **kwargs):
+ def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
d = preserve_context_over_fn(
self.agent.request,
- method, *args, **kwargs
+ method, uri, *args, **kwargs
)
+ logger.info("Sending request %s %s", method, uri)
+
def _cb(response):
incoming_responses_counter.inc(method, response.code)
+ logger.info(
+ "Received response to %s %s: %s",
+ method, uri, response.code
+ )
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
+ logger.info(
+ "Error sending request to %s %s: %s %s",
+ method, uri, failure.type, failure.getErrorMessage()
+ )
return failure
d.addCallbacks(_cb, _eb)
@@ -84,7 +94,9 @@ class SimpleHttpClient(object):
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}):
+ # TODO: Do we ever want to log message contents?
logger.debug("post_urlencoded_get_json args: %s", args)
+
query_bytes = urllib.urlencode(args, True)
response = yield self.request(
@@ -97,7 +109,7 @@ class SimpleHttpClient(object):
bodyProducer=FileBodyProducer(StringIO(query_bytes))
)
- body = yield readBody(response)
+ body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -105,7 +117,7 @@ class SimpleHttpClient(object):
def post_json_get_json(self, uri, post_json):
json_str = encode_canonical_json(post_json)
- logger.info("HTTP POST %s -> %s", json_str, uri)
+ logger.debug("HTTP POST %s -> %s", json_str, uri)
response = yield self.request(
"POST",
@@ -116,7 +128,7 @@ class SimpleHttpClient(object):
bodyProducer=FileBodyProducer(StringIO(json_str))
)
- body = yield readBody(response)
+ body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -149,7 +161,7 @@ class SimpleHttpClient(object):
})
)
- body = yield readBody(response)
+ body = yield preserve_context_over_fn(readBody, response)
if 200 <= response.code < 300:
defer.returnValue(json.loads(body))
@@ -192,7 +204,7 @@ class SimpleHttpClient(object):
bodyProducer=FileBodyProducer(StringIO(json_str))
)
- body = yield readBody(response)
+ body = yield preserve_context_over_fn(readBody, response)
if 200 <= response.code < 300:
defer.returnValue(json.loads(body))
@@ -226,7 +238,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
)
try:
- body = yield readBody(response)
+ body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(body)
except PartialDownloadError as e:
# twisted dislikes google's response, no content length.
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 7f3d8fc884..ed47e701e7 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json
import simplejson as json
import logging
+import sys
import urllib
import urlparse
logger = logging.getLogger(__name__)
+outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__)
@@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self.version_string = hs.version_string
+ self._next_id = 1
+
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
@@ -123,88 +127,98 @@ class MatrixFederationHttpClient(object):
("", "", path_bytes, param_bytes, query_bytes, "",)
)
- logger.info("Sending request to %s: %s %s",
- destination, method, url_bytes)
+ txn_id = "%s-O-%s" % (method, self._next_id)
+ self._next_id = (self._next_id + 1) % (sys.maxint - 1)
- logger.debug(
- "Types: %s",
- [
- type(destination), type(method), type(path_bytes),
- type(param_bytes),
- type(query_bytes)
- ]
+ outbound_logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ txn_id, destination, method, url_bytes
)
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
retries_left = 5
- endpoint = self._getEndpoint(reactor, destination)
-
- while True:
- producer = None
- if body_callback:
- producer = body_callback(method, url_bytes, headers_dict)
-
- try:
- request_deferred = preserve_context_over_fn(
- self.agent.request,
- destination,
- endpoint,
- method,
- path_bytes,
- param_bytes,
- query_bytes,
- Headers(headers_dict),
- producer
- )
+ endpoint = preserve_context_over_fn(
+ self._getEndpoint, reactor, destination
+ )
- response = yield self.clock.time_bound_deferred(
- request_deferred,
- time_out=timeout/1000. if timeout else 60,
- )
+ log_result = None
+ try:
+ while True:
+ producer = None
+ if body_callback:
+ producer = body_callback(method, url_bytes, headers_dict)
+
+ try:
+ def send_request():
+ request_deferred = self.agent.request(
+ destination,
+ endpoint,
+ method,
+ path_bytes,
+ param_bytes,
+ query_bytes,
+ Headers(headers_dict),
+ producer
+ )
+
+ return self.clock.time_bound_deferred(
+ request_deferred,
+ time_out=timeout/1000. if timeout else 60,
+ )
+
+ response = yield preserve_context_over_fn(
+ send_request,
+ )
+
+ log_result = "%d %s" % (response.code, response.phrase,)
+ break
+ except Exception as e:
+ if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+ logger.warn(
+ "DNS Lookup failed to %s with %s",
+ destination,
+ e
+ )
+ log_result = "DNS Lookup failed to %s with %s" % (
+ destination, e
+ )
+ raise
- logger.debug("Got response to %s", method)
- break
- except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
- "DNS Lookup failed to %s with %s",
+ "{%s} Sending request failed to %s: %s %s: %s - %s",
+ txn_id,
destination,
- e
+ method,
+ url_bytes,
+ type(e).__name__,
+ _flatten_response_never_received(e),
)
- raise
-
- logger.warn(
- "Sending request failed to %s: %s %s: %s - %s",
- destination,
- method,
- url_bytes,
- type(e).__name__,
- _flatten_response_never_received(e),
- )
- if retries_left and not timeout:
- yield sleep(2 ** (5 - retries_left))
- retries_left -= 1
- else:
- raise
-
- logger.info(
- "Received response %d %s for %s: %s %s",
- response.code,
- response.phrase,
- destination,
- method,
- url_bytes
- )
+ log_result = "%s - %s" % (
+ type(e).__name__, _flatten_response_never_received(e),
+ )
+
+ if retries_left and not timeout:
+ yield sleep(2 ** (5 - retries_left))
+ retries_left -= 1
+ else:
+ raise
+ finally:
+ outbound_logger.info(
+ "{%s} [%s] Result: %s",
+ txn_id,
+ destination,
+ log_result,
+ )
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
- body = yield readBody(response)
+ body = yield preserve_context_over_fn(readBody, response)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -284,10 +298,7 @@ class MatrixFederationHttpClient(object):
"Content-Type not application/json"
)
- logger.debug("Getting resp body")
- body = yield readBody(response)
- logger.debug("Got resp body")
-
+ body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
@@ -330,9 +341,7 @@ class MatrixFederationHttpClient(object):
"Content-Type not application/json"
)
- logger.debug("Getting resp body")
- body = yield readBody(response)
- logger.debug("Got resp body")
+ body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -390,9 +399,7 @@ class MatrixFederationHttpClient(object):
"Content-Type not application/json"
)
- logger.debug("Getting resp body")
- body = yield readBody(response)
- logger.debug("Got resp body")
+ body = yield preserve_context_over_fn(readBody, response)
defer.returnValue(json.loads(body))
@@ -435,7 +442,10 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
- length = yield _readBodyToFile(response, output_stream, max_size)
+ length = yield preserve_context_over_fn(
+ _readBodyToFile,
+ response, output_stream, max_size
+ )
except:
logger.exception("Failed to download body")
raise
diff --git a/synapse/http/server.py b/synapse/http/server.py
index ae8f3b3972..807ff95c65 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -79,53 +79,39 @@ def request_handler(request_handler):
_next_request_id += 1
with LoggingContext(request_id) as request_context:
request_context.request = request_id
- code = None
- start = self.clock.time_msec()
- try:
- logger.info(
- "Received request: %s %s",
- request.method, request.path
- )
- d = request_handler(self, request)
- with PreserveLoggingContext():
- yield d
- code = request.code
- except CodeMessageException as e:
- code = e.code
- if isinstance(e, SynapseError):
- logger.info(
- "%s SynapseError: %s - %s", request, code, e.msg
+ with request.processing():
+ try:
+ d = request_handler(self, request)
+ with PreserveLoggingContext():
+ yield d
+ except CodeMessageException as e:
+ code = e.code
+ if isinstance(e, SynapseError):
+ logger.info(
+ "%s SynapseError: %s - %s", request, code, e.msg
+ )
+ else:
+ logger.exception(e)
+ outgoing_responses_counter.inc(request.method, str(code))
+ respond_with_json(
+ request, code, cs_exception(e), send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ version_string=self.version_string,
+ )
+ except:
+ logger.exception(
+ "Failed handle request %s.%s on %r: %r",
+ request_handler.__module__,
+ request_handler.__name__,
+ self,
+ request
+ )
+ respond_with_json(
+ request,
+ 500,
+ {"error": "Internal server error"},
+ send_cors=True
)
- else:
- logger.exception(e)
- outgoing_responses_counter.inc(request.method, str(code))
- respond_with_json(
- request, code, cs_exception(e), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- )
- except:
- code = 500
- logger.exception(
- "Failed handle request %s.%s on %r: %r",
- request_handler.__module__,
- request_handler.__name__,
- self,
- request
- )
- respond_with_json(
- request,
- 500,
- {"error": "Internal server error"},
- send_cors=True
- )
- finally:
- code = str(code) if code else "-"
- end = self.clock.time_msec()
- logger.info(
- "Processed request: %dms %s %s %s",
- end-start, code, request.method, request.path
- )
return wrapped_request_handler
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 078abfc56d..bdd03dcbe8 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, ObservableDeferred
from synapse.types import StreamToken
import synapse.metrics
@@ -45,21 +45,11 @@ class _NotificationListener(object):
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
+ __slots__ = ["deferred"]
def __init__(self, deferred):
self.deferred = deferred
- def notified(self):
- return self.deferred.called
-
- def notify(self, token):
- """ Inform whoever is listening about the new events.
- """
- try:
- self.deferred.callback(token)
- except defer.AlreadyCalledError:
- pass
-
class _NotifierUserStream(object):
"""This represents a user connected to the event stream.
@@ -75,11 +65,12 @@ class _NotifierUserStream(object):
appservice=None):
self.user = str(user)
self.appservice = appservice
- self.listeners = set()
self.rooms = set(rooms)
self.current_token = current_token
self.last_notified_ms = time_now_ms
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
+
def notify(self, stream_key, stream_id, time_now_ms):
"""Notify any listeners for this user of a new event from an
event source.
@@ -91,12 +82,10 @@ class _NotifierUserStream(object):
self.current_token = self.current_token.copy_and_advance(
stream_key, stream_id
)
- if self.listeners:
- self.last_notified_ms = time_now_ms
- listeners = self.listeners
- self.listeners = set()
- for listener in listeners:
- listener.notify(self.current_token)
+ self.last_notified_ms = time_now_ms
+ noify_deferred = self.notify_deferred
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
+ noify_deferred.callback(self.current_token)
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
@@ -114,6 +103,18 @@ class _NotifierUserStream(object):
self.appservice, set()
).discard(self)
+ def count_listeners(self):
+ return len(self.notify_deferred.observers())
+
+ def new_listener(self, token):
+ """Returns a deferred that is resolved when there is a new token
+ greater than the given token.
+ """
+ if self.current_token.is_after(token):
+ return _NotificationListener(defer.succeed(self.current_token))
+ else:
+ return _NotificationListener(self.notify_deferred.observe())
+
class Notifier(object):
""" This class is responsible for notifying any listeners when there are
@@ -158,7 +159,7 @@ class Notifier(object):
for x in self.appservice_to_user_streams.values():
all_user_streams |= x
- return sum(len(stream.listeners) for stream in all_user_streams)
+ return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
metrics.register_callback(
@@ -286,10 +287,6 @@ class Notifier(object):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
-
- deferred = defer.Deferred()
- time_now_ms = self.clock.time_msec()
-
user = str(user)
user_stream = self.user_to_user_stream.get(user)
if user_stream is None:
@@ -302,55 +299,44 @@ class Notifier(object):
rooms=rooms,
appservice=appservice,
current_token=current_token,
- time_now_ms=time_now_ms,
+ time_now_ms=self.clock.time_msec(),
)
self._register_with_keys(user_stream)
+
+ result = None
+ if timeout:
+ # Will be set to a _NotificationListener that we'll be waiting on.
+ # Allows us to cancel it.
+ listener = None
+
+ def timed_out():
+ if listener:
+ listener.deferred.cancel()
+ timer = self.clock.call_later(timeout/1000., timed_out)
+
+ prev_token = from_token
+ while not result:
+ try:
+ current_token = user_stream.current_token
+
+ result = yield callback(prev_token, current_token)
+ if result:
+ break
+
+ # Now we wait for the _NotifierUserStream to be told there
+ # is a new token.
+ # We need to supply the token we supplied to callback so
+ # that we don't miss any current_token updates.
+ prev_token = current_token
+ listener = user_stream.new_listener(prev_token)
+ yield listener.deferred
+ except defer.CancelledError:
+ break
+
+ self.clock.cancel_call_later(timer, ignore_errs=True)
else:
current_token = user_stream.current_token
-
- listener = [_NotificationListener(deferred)]
-
- if timeout and not current_token.is_after(from_token):
- user_stream.listeners.add(listener[0])
-
- if current_token.is_after(from_token):
result = yield callback(from_token, current_token)
- else:
- result = None
-
- timer = [None]
-
- if result:
- user_stream.listeners.discard(listener[0])
- defer.returnValue(result)
- return
-
- if timeout:
- timed_out = [False]
-
- def _timeout_listener():
- timed_out[0] = True
- timer[0] = None
- user_stream.listeners.discard(listener[0])
- listener[0].notify(current_token)
-
- # We create multiple notification listeners so we have to manage
- # canceling the timeout ourselves.
- timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
-
- while not result and not timed_out[0]:
- new_token = yield deferred
- deferred = defer.Deferred()
- listener[0] = _NotificationListener(deferred)
- user_stream.listeners.add(listener[0])
- result = yield callback(current_token, new_token)
- current_token = new_token
-
- if timer[0] is not None:
- try:
- self.clock.cancel_call_later(timer[0])
- except:
- logger.exception("Failed to cancel notifer timer")
defer.returnValue(result)
@@ -368,6 +354,9 @@ class Notifier(object):
@defer.inlineCallbacks
def check_for_updates(before_token, after_token):
+ if not after_token.is_after(before_token):
+ defer.returnValue(None)
+
events = []
end_token = from_token
for name, source in self.event_sources.sources.items():
@@ -402,7 +391,7 @@ class Notifier(object):
expired_streams = []
expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
for stream in self.user_to_user_stream.values():
- if stream.listeners:
+ if stream.count_listeners():
continue
if stream.last_notified_ms < expire_before_ts:
expired_streams.append(stream)
diff --git a/synapse/rest/client/v1/transactions.py b/synapse/rest/client/v1/transactions.py
index d933fea18a..b861069b89 100644
--- a/synapse/rest/client/v1/transactions.py
+++ b/synapse/rest/client/v1/transactions.py
@@ -39,10 +39,10 @@ class HttpTransactionStore(object):
A tuple of (HTTP response code, response content) or None.
"""
try:
- logger.debug("get_response Key: %s TxnId: %s", key, txn_id)
+ logger.debug("get_response TxnId: %s", txn_id)
(last_txn_id, response) = self.transactions[key]
if txn_id == last_txn_id:
- logger.info("get_response: Returning a response for %s", key)
+ logger.info("get_response: Returning a response for %s", txn_id)
return response
except KeyError:
pass
@@ -58,7 +58,7 @@ class HttpTransactionStore(object):
txn_id (str): The transaction ID for this request.
response (tuple): A tuple of (HTTP response code, response content)
"""
- logger.debug("store_response Key: %s TxnId: %s", key, txn_id)
+ logger.debug("store_response TxnId: %s", txn_id)
self.transactions[key] = (txn_id, response)
def store_client_transaction(self, request, txn_id, response):
diff --git a/synapse/server.py b/synapse/server.py
index 8b3dc675cc..4d1fb1cbf6 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -132,16 +132,8 @@ class BaseHomeServer(object):
setattr(BaseHomeServer, "get_%s" % (depname), _get)
def get_ip_from_request(self, request):
- # May be an X-Forwarding-For header depending on config
- ip_addr = request.getClientIP()
- if self.config.captcha_ip_origin_is_x_forwarded:
- # use the header
- if request.requestHeaders.hasHeader("X-Forwarded-For"):
- ip_addr = request.requestHeaders.getRawHeaders(
- "X-Forwarded-For"
- )[0]
-
- return ip_addr
+ # X-Forwarded-For is handled by our custom request type.
+ return request.getClientIP()
def is_mine(self, domain_specific_string):
return domain_specific_string.domain == self.hostname
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 260714ccc2..07ff25cef3 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -91,8 +91,12 @@ class Clock(object):
with PreserveLoggingContext():
return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
- def cancel_call_later(self, timer):
- timer.cancel()
+ def cancel_call_later(self, timer, ignore_errs=False):
+ try:
+ timer.cancel()
+ except:
+ if not ignore_errs:
+ raise
def time_bound_deferred(self, given_deferred, time_out):
if given_deferred.called:
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1c2044e5b4..5a1d545c96 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -38,6 +38,9 @@ class ObservableDeferred(object):
deferred.
If consumeErrors is true errors will be captured from the origin deferred.
+
+ Cancelling or otherwise resolving an observer will not affect the original
+ ObservableDeferred.
"""
__slots__ = ["_deferred", "_observers", "_result"]
@@ -45,7 +48,7 @@ class ObservableDeferred(object):
def __init__(self, deferred, consumeErrors=False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
- object.__setattr__(self, "_observers", [])
+ object.__setattr__(self, "_observers", set())
def callback(r):
self._result = (True, r)
@@ -74,12 +77,21 @@ class ObservableDeferred(object):
def observe(self):
if not self._result:
d = defer.Deferred()
- self._observers.append(d)
+
+ def remove(r):
+ self._observers.discard(d)
+ return r
+ d.addBoth(remove)
+
+ self._observers.add(d)
return d
else:
success, res = self._result
return defer.succeed(res) if success else defer.fail(res)
+ def observers(self):
+ return self._observers
+
def __getattr__(self, name):
return getattr(self._deferred, name)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a92d518b43..7e6062c1b8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -140,6 +140,37 @@ class PreserveLoggingContext(object):
)
+class _PreservingContextDeferred(defer.Deferred):
+ """A deferred that ensures that all callbacks and errbacks are called with
+ the given logging context.
+ """
+ def __init__(self, context):
+ self._log_context = context
+ defer.Deferred.__init__(self)
+
+ def addCallbacks(self, callback, errback=None,
+ callbackArgs=None, callbackKeywords=None,
+ errbackArgs=None, errbackKeywords=None):
+ callback = self._wrap_callback(callback)
+ errback = self._wrap_callback(errback)
+ return defer.Deferred.addCallbacks(
+ self, callback,
+ errback=errback,
+ callbackArgs=callbackArgs,
+ callbackKeywords=callbackKeywords,
+ errbackArgs=errbackArgs,
+ errbackKeywords=errbackKeywords,
+ )
+
+ def _wrap_callback(self, f):
+ def g(res, *args, **kwargs):
+ with PreserveLoggingContext():
+ LoggingContext.thread_local.current_context = self._log_context
+ res = f(res, *args, **kwargs)
+ return res
+ return g
+
+
def preserve_context_over_fn(fn, *args, **kwargs):
"""Takes a function and invokes it with the given arguments, but removes
and restores the current logging context while doing so.
@@ -160,24 +191,7 @@ def preserve_context_over_deferred(deferred):
"""Given a deferred wrap it such that any callbacks added later to it will
be invoked with the current context.
"""
- d = defer.Deferred()
-
current_context = LoggingContext.current_context()
-
- def cb(res):
- with PreserveLoggingContext():
- LoggingContext.thread_local.current_context = current_context
- res = d.callback(res)
- return res
-
- def eb(failure):
- with PreserveLoggingContext():
- LoggingContext.thread_local.current_context = current_context
- res = d.errback(failure)
- return res
-
- if deferred.called:
- return deferred
-
- deferred.addCallbacks(cb, eb)
+ d = _PreservingContextDeferred(current_context)
+ deferred.chainDeferred(d)
return d
|