diff --git a/synapse/__init__.py b/synapse/__init__.py
index 119359be68..cf22fabd61 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -35,4 +35,4 @@ try:
except ImportError:
pass
-__version__ = "1.0.0"
+__version__ = "1.1.0"
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index d50a9840d4..1ebb7ae539 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -27,7 +27,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
from synapse.app import check_bind_error
from synapse.crypto import context_factory
-from synapse.util import PreserveLoggingContext
+from synapse.logging.context import PreserveLoggingContext
from synapse.util.async_helpers import Linearizer
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -93,33 +93,36 @@ def start_reactor(
install_dns_limiter(reactor)
def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
-
- change_resource_limit(soft_file_limit)
- if gc_thresholds:
- gc.set_threshold(*gc_thresholds)
- reactor.run()
-
- if daemonize:
- if print_pidfile:
- print(pid_file)
-
- daemon = Daemonize(
- app=appname,
- pid=pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ logger.info("Running")
+ change_resource_limit(soft_file_limit)
+ if gc_thresholds:
+ gc.set_threshold(*gc_thresholds)
+ reactor.run()
+
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ #
+ # We also need to drop the logcontext before forking if we're daemonizing,
+ # otherwise the cputime metrics get confused about the per-thread resource usage
+ # appearing to go backwards.
+ with PreserveLoggingContext():
+ if daemonize:
+ if print_pidfile:
+ print(pid_file)
+
+ daemon = Daemonize(
+ app=appname,
+ pid=pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
def quit_with_error(error_string):
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9120bdb143..be44249ed6 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -26,6 +26,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
@@ -36,7 +37,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 90bc79cdda..ff11beca82 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
@@ -64,7 +65,6 @@ from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index ff522e4499..cacad25eac 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
@@ -59,7 +60,6 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 9421420930..11e80dbae0 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -28,6 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
@@ -48,7 +49,6 @@ from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 969be58d0b..97da7bdcbf 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
@@ -44,7 +45,6 @@ from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 2fd7d57ebf..417a10bbd2 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -29,6 +29,7 @@ from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
@@ -41,7 +42,6 @@ from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 49da105cf6..639b1429c0 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -54,6 +54,7 @@ from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext
from synapse.metrics import RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
@@ -72,7 +73,6 @@ from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.module_loader import load_module
from synapse.util.rlimit import change_resource_limit
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index cf0e2036c3..f23b9b6eda 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -27,6 +27,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
@@ -40,7 +41,6 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index df29ea5ecb..4f929edf86 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -26,6 +26,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import __func__
@@ -38,7 +39,6 @@ from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 858949910d..de4797fddc 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -31,6 +31,7 @@ from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
@@ -57,7 +58,6 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 2d9d2e1bbc..1177ddd72e 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -28,6 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
+from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
@@ -46,7 +47,6 @@ from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index b54bf5411f..e5b36494f5 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -53,8 +53,8 @@ import logging
from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
+from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.logcontext import run_in_background
logger = logging.getLogger(__name__)
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index cf39936da7..8381b8eb29 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -112,13 +112,17 @@ class EmailConfig(Config):
missing = []
for k in required:
if k not in email_config:
- missing.append(k)
+ missing.append("email." + k)
+
+ if config.get("public_baseurl") is None:
+ missing.append("public_base_url")
if len(missing) > 0:
raise RuntimeError(
- "email.password_reset_behaviour is set to 'local' "
- "but required keys are missing: %s"
- % (", ".join(["email." + k for k in missing]),)
+ "Password resets emails are configured to be sent from "
+ "this homeserver due to a partial 'email' block. "
+ "However, the following required keys are missing: %s"
+ % (", ".join(missing),)
)
# Templates for password reset emails
@@ -156,13 +160,6 @@ class EmailConfig(Config):
filepath, "email.password_reset_template_success_html"
)
- if config.get("public_baseurl") is None:
- raise RuntimeError(
- "email.password_reset_behaviour is set to 'local' but no "
- "public_baseurl is set. This is necessary to generate password "
- "reset links"
- )
-
if self.email_enable_notifs:
required = [
"smtp_host",
@@ -233,11 +230,13 @@ class EmailConfig(Config):
# app_name: Matrix
#
# # Enable email notifications by default
+ # #
# notif_for_new_users: True
#
# # Defining a custom URL for Riot is only needed if email notifications
# # should contain links to a self-hosted installation of Riot; when set
# # the "app_name" setting is ignored
+ # #
# riot_base_url: "http://localhost/riot"
#
# # Enable sending password reset emails via the configured, trusted
@@ -250,16 +249,22 @@ class EmailConfig(Config):
# #
# # If this option is set to false and SMTP options have not been
# # configured, resetting user passwords via email will be disabled
+ # #
# #trust_identity_server_for_password_resets: false
#
# # Configure the time that a validation email or text message code
# # will expire after sending
# #
# # This is currently used for password resets
+ # #
# #validation_token_lifetime: 1h
#
# # Template directory. All template files should be stored within this
- # # directory
+ # # directory. If not set, default templates from within the Synapse
+ # # package will be used
+ # #
+ # # For the list of default templates, please see
+ # # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
# #
# #template_dir: res/templates
#
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 931aec41c0..0f5554211c 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -24,7 +24,7 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
import synapse
from synapse.app import _base as appbase
-from synapse.util.logcontext import LoggingContextFilter
+from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
from ._base import Config
@@ -40,7 +40,7 @@ formatters:
filters:
context:
- (): synapse.util.logcontext.LoggingContextFilter
+ (): synapse.logging.context.LoggingContextFilter
request: ""
handlers:
diff --git a/synapse/config/password.py b/synapse/config/password.py
index 598f84fc0c..d5b5953f2f 100644
--- a/synapse/config/password.py
+++ b/synapse/config/password.py
@@ -26,6 +26,7 @@ class PasswordConfig(Config):
password_config = {}
self.password_enabled = password_config.get("enabled", True)
+ self.password_localdb_enabled = password_config.get("localdb_enabled", True)
self.password_pepper = password_config.get("pepper", "")
def generate_config_section(self, config_dir_path, server_name, **kwargs):
@@ -35,6 +36,12 @@ class PasswordConfig(Config):
#
#enabled: false
+ # Uncomment to disable authentication against the local password
+ # database. This is ignored if `enabled` is false, and is only useful
+ # if you have other password_providers.
+ #
+ #localdb_enabled: false
+
# Uncomment and change to a secret random string for extra security.
# DO NOT CHANGE THIS AFTER INITIAL SETUP!
#
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 8c587f3fd2..33f31cf213 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -23,7 +23,7 @@ class RateLimitConfig(object):
class FederationRateLimitConfig(object):
_items_and_default = {
- "window_size": 10000,
+ "window_size": 1000,
"sleep_limit": 10,
"sleep_delay": 500,
"reject_limit": 50,
@@ -54,7 +54,7 @@ class RatelimitConfig(Config):
# Load the new-style federation config, if it exists. Otherwise, fall
# back to the old method.
- if "federation_rc" in config:
+ if "rc_federation" in config:
self.rc_federation = FederationRateLimitConfig(**config["rc_federation"])
else:
self.rc_federation = FederationRateLimitConfig(
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 4a59e6ec90..b895c4e9f4 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -71,9 +71,8 @@ class RegistrationConfig(Config):
self.default_identity_server = config.get("default_identity_server")
self.allow_guest_access = config.get("allow_guest_access", False)
- self.invite_3pid_guest = self.allow_guest_access and config.get(
- "invite_3pid_guest", False
- )
+ if config.get("invite_3pid_guest", False):
+ raise ConfigError("invite_3pid_guest is no longer supported")
self.auto_join_rooms = config.get("auto_join_rooms", [])
for room_alias in self.auto_join_rooms:
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index 872a1ba934..6a8161547a 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
@@ -25,6 +26,11 @@ class SAML2Config(Config):
if not saml2_config or not saml2_config.get("enabled", True):
return
+ try:
+ check_requirements("saml2")
+ except DependencyException as e:
+ raise ConfigError(e.message)
+
self.saml2_enabled = True
import saml2.config
@@ -37,6 +43,11 @@ class SAML2Config(Config):
if config_path is not None:
self.saml2_sp_config.load_file(config_path)
+ # session lifetime: in milliseconds
+ self.saml2_session_lifetime = self.parse_duration(
+ saml2_config.get("saml_session_lifetime", "5m")
+ )
+
def _default_saml_config_dict(self):
import saml2
@@ -72,6 +83,12 @@ class SAML2Config(Config):
# so it is not normally necessary to specify them unless you need to
# override them.
#
+ # Once SAML support is enabled, a metadata file will be exposed at
+ # https://<server>:<port>/_matrix/saml2/metadata.xml, which you may be able to
+ # use to configure your SAML IdP with. Alternatively, you can manually configure
+ # the IdP to use an ACS location of
+ # https://<server>:<port>/_matrix/saml2/authn_response.
+ #
#saml2_config:
# sp_config:
# # point this to the IdP's metadata. You can use either a local file or
@@ -81,7 +98,15 @@ class SAML2Config(Config):
# remote:
# - url: https://our_idp/metadata.xml
#
- # # The rest of sp_config is just used to generate our metadata xml, and you
+ # # By default, the user has to go to our login page first. If you'd like to
+ # # allow IdP-initiated login, set 'allow_unsolicited: True' in a
+ # # 'service.sp' section:
+ # #
+ # #service:
+ # # sp:
+ # # allow_unsolicited: True
+ #
+ # # The examples below are just used to generate our metadata xml, and you
# # may well not need it, depending on your setup. Alternatively you
# # may need a whole lot more detail - see the pysaml2 docs!
#
@@ -104,6 +129,12 @@ class SAML2Config(Config):
# # separate pysaml2 configuration file:
# #
# config_path: "%(config_dir_path)s/sp_conf.py"
+ #
+ # # the lifetime of a SAML session. This defines how long a user has to
+ # # complete the authentication process, if allow_unsolicited is unset.
+ # # The default is 5 minutes.
+ # #
+ # # saml_session_lifetime: 5m
""" % {
"config_dir_path": config_dir_path
}
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 8fcf801418..ca508a224f 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -23,7 +23,7 @@ import six
from unpaddedbase64 import encode_base64
-from OpenSSL import crypto
+from OpenSSL import SSL, crypto
from twisted.internet._sslverify import Certificate, trustRootFromCertificates
from synapse.config._base import Config, ConfigError
@@ -81,6 +81,27 @@ class TlsConfig(Config):
"federation_verify_certificates", True
)
+ # Minimum TLS version to use for outbound federation traffic
+ self.federation_client_minimum_tls_version = str(
+ config.get("federation_client_minimum_tls_version", 1)
+ )
+
+ if self.federation_client_minimum_tls_version not in ["1", "1.1", "1.2", "1.3"]:
+ raise ConfigError(
+ "federation_client_minimum_tls_version must be one of: 1, 1.1, 1.2, 1.3"
+ )
+
+ # Prevent people shooting themselves in the foot here by setting it to
+ # the biggest number blindly
+ if self.federation_client_minimum_tls_version == "1.3":
+ if getattr(SSL, "OP_NO_TLSv1_3", None) is None:
+ raise ConfigError(
+ (
+ "federation_client_minimum_tls_version cannot be 1.3, "
+ "your OpenSSL does not support it"
+ )
+ )
+
# Whitelist of domains to not verify certificates for
fed_whitelist_entries = config.get(
"federation_certificate_verification_whitelist", []
@@ -261,6 +282,15 @@ class TlsConfig(Config):
#
#federation_verify_certificates: false
+ # The minimum TLS version that will be used for outbound federation requests.
+ #
+ # Defaults to `1`. Configurable to `1`, `1.1`, `1.2`, or `1.3`. Note
+ # that setting this value higher than `1.2` will prevent federation to most
+ # of the public Matrix network: only configure it to `1.3` if you have an
+ # entirely private federation setup and you can ensure TLS 1.3 support.
+ #
+ #federation_client_minimum_tls_version: 1.2
+
# Skip federation certificate verification on the following whitelist
# of domains.
#
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 2bc5cc3807..4f48e8e88d 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -24,12 +24,25 @@ from OpenSSL import SSL, crypto
from twisted.internet._sslverify import _defaultCurveName
from twisted.internet.abstract import isIPAddress, isIPv6Address
from twisted.internet.interfaces import IOpenSSLClientConnectionCreator
-from twisted.internet.ssl import CertificateOptions, ContextFactory, platformTrust
+from twisted.internet.ssl import (
+ CertificateOptions,
+ ContextFactory,
+ TLSVersion,
+ platformTrust,
+)
from twisted.python.failure import Failure
logger = logging.getLogger(__name__)
+_TLS_VERSION_MAP = {
+ "1": TLSVersion.TLSv1_0,
+ "1.1": TLSVersion.TLSv1_1,
+ "1.2": TLSVersion.TLSv1_2,
+ "1.3": TLSVersion.TLSv1_3,
+}
+
+
class ServerContextFactory(ContextFactory):
"""Factory for PyOpenSSL SSL contexts that are used to handle incoming
connections."""
@@ -43,16 +56,18 @@ class ServerContextFactory(ContextFactory):
try:
_ecCurve = crypto.get_elliptic_curve(_defaultCurveName)
context.set_tmp_ecdh(_ecCurve)
-
except Exception:
logger.exception("Failed to enable elliptic curve for TLS")
- context.set_options(SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3)
+
+ context.set_options(
+ SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL.OP_NO_TLSv1 | SSL.OP_NO_TLSv1_1
+ )
context.use_certificate_chain_file(config.tls_certificate_file)
context.use_privatekey(config.tls_private_key)
# https://hynek.me/articles/hardening-your-web-servers-ssl-ciphers/
context.set_cipher_list(
- "ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1"
+ "ECDH+AESGCM:ECDH+CHACHA20:ECDH+AES256:ECDH+AES128:!aNULL:!SHA1:!AESCCM"
)
def getContext(self):
@@ -79,10 +94,22 @@ class ClientTLSOptionsFactory(object):
# Use CA root certs provided by OpenSSL
trust_root = platformTrust()
- self._verify_ssl_context = CertificateOptions(trustRoot=trust_root).getContext()
+ # "insecurelyLowerMinimumTo" is the argument that will go lower than
+ # Twisted's default, which is why it is marked as "insecure" (since
+ # Twisted's defaults are reasonably secure). But, since Twisted is
+ # moving to TLS 1.2 by default, we want to respect the config option if
+ # it is set to 1.0 (which the alternate option, raiseMinimumTo, will not
+ # let us do).
+ minTLS = _TLS_VERSION_MAP[config.federation_client_minimum_tls_version]
+
+ self._verify_ssl = CertificateOptions(
+ trustRoot=trust_root, insecurelyLowerMinimumTo=minTLS
+ )
+ self._verify_ssl_context = self._verify_ssl.getContext()
self._verify_ssl_context.set_info_callback(self._context_info_cb)
- self._no_verify_ssl_context = CertificateOptions().getContext()
+ self._no_verify_ssl = CertificateOptions(insecurelyLowerMinimumTo=minTLS)
+ self._no_verify_ssl_context = self._no_verify_ssl.getContext()
self._no_verify_ssl_context.set_info_callback(self._context_info_cb)
def get_options(self, host):
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 10c2eb7f0f..341c863152 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -44,15 +44,16 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
-from synapse.storage.keys import FetchKeyResult
-from synapse.util import logcontext, unwrapFirstError
-from synapse.util.async_helpers import yieldable_gather_results
-from synapse.util.logcontext import (
+from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
+ make_deferred_yieldable,
preserve_fn,
run_in_background,
)
+from synapse.storage.keys import FetchKeyResult
+from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.metrics import Measure
from synapse.util.retryutils import NotRetryingDestination
@@ -140,7 +141,7 @@ class Keyring(object):
"""
req = VerifyJsonRequest(server_name, json_object, validity_time, request_name)
requests = (req,)
- return logcontext.make_deferred_yieldable(self._verify_objects(requests)[0])
+ return make_deferred_yieldable(self._verify_objects(requests)[0])
def verify_json_objects_for_server(self, server_and_json):
"""Bulk verifies signatures of json objects, bulk fetching keys as
@@ -557,7 +558,7 @@ class BaseV2KeyFetcher(object):
signed_key_json_bytes = encode_canonical_json(signed_key_json)
- yield logcontext.make_deferred_yieldable(
+ yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
@@ -612,7 +613,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
defer.returnValue({})
- results = yield logcontext.make_deferred_yieldable(
+ results = yield make_deferred_yieldable(
defer.gatherResults(
[run_in_background(get_key, server) for server in self.key_servers],
consumeErrors=True,
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index a96cdada3d..a9545e6c1b 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -19,7 +19,7 @@ from frozendict import frozendict
from twisted.internet import defer
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.logging.context import make_deferred_yieldable, run_in_background
class EventContext(object):
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index f24f0c16f0..987de5cab7 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -392,7 +392,11 @@ class EventClientSerializer(object):
serialized_event["content"].pop("m.relates_to", None)
r = serialized_event["unsigned"].setdefault("m.relations", {})
- r[RelationTypes.REPLACE] = {"event_id": edit.event_id}
+ r[RelationTypes.REPLACE] = {
+ "event_id": edit.event_id,
+ "origin_server_ts": edit.origin_server_ts,
+ "sender": edit.sender,
+ }
defer.returnValue(serialized_event)
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 1e925b19e7..f7bb806ae7 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -27,8 +27,14 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.events import event_type_from_format_version
from synapse.events.utils import prune_event
from synapse.http.servlet import assert_params_in_dict
+from synapse.logging.context import (
+ LoggingContext,
+ PreserveLoggingContext,
+ make_deferred_yieldable,
+ preserve_fn,
+)
from synapse.types import get_domain_from_id
-from synapse.util import logcontext, unwrapFirstError
+from synapse.util import unwrapFirstError
logger = logging.getLogger(__name__)
@@ -73,7 +79,7 @@ class FederationBase(object):
@defer.inlineCallbacks
def handle_check_result(pdu, deferred):
try:
- res = yield logcontext.make_deferred_yieldable(deferred)
+ res = yield make_deferred_yieldable(deferred)
except SynapseError:
res = None
@@ -102,10 +108,10 @@ class FederationBase(object):
defer.returnValue(res)
- handle = logcontext.preserve_fn(handle_check_result)
+ handle = preserve_fn(handle_check_result)
deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
- valid_pdus = yield logcontext.make_deferred_yieldable(
+ valid_pdus = yield make_deferred_yieldable(
defer.gatherResults(deferreds2, consumeErrors=True)
).addErrback(unwrapFirstError)
@@ -115,7 +121,7 @@ class FederationBase(object):
defer.returnValue([p for p in valid_pdus if p])
def _check_sigs_and_hash(self, room_version, pdu):
- return logcontext.make_deferred_yieldable(
+ return make_deferred_yieldable(
self._check_sigs_and_hashes(room_version, [pdu])[0]
)
@@ -133,14 +139,14 @@ class FederationBase(object):
* returns a redacted version of the event (if the signature
matched but the hash did not)
* throws a SynapseError if the signature check failed.
- The deferreds run their callbacks in the sentinel logcontext.
+ The deferreds run their callbacks in the sentinel
"""
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
- ctx = logcontext.LoggingContext.current_context()
+ ctx = LoggingContext.current_context()
def callback(_, pdu):
- with logcontext.PreserveLoggingContext(ctx):
+ with PreserveLoggingContext(ctx):
if not check_event_content_hash(pdu):
# let's try to distinguish between failures because the event was
# redacted (which are somewhat expected) vs actual ball-tampering
@@ -178,7 +184,7 @@ class FederationBase(object):
def errback(failure, pdu):
failure.trap(SynapseError)
- with logcontext.PreserveLoggingContext(ctx):
+ with PreserveLoggingContext(ctx):
logger.warn(
"Signature check failed for %s: %s",
pdu.event_id,
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3883eb525e..3cb4b94420 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -39,10 +39,10 @@ from synapse.api.room_versions import (
)
from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
-from synapse.util import logcontext, unwrapFirstError
+from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.utils import log_function
+from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
-from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -207,7 +207,7 @@ class FederationClient(FederationBase):
]
# FIXME: We should handle signature failures more gracefully.
- pdus[:] = yield logcontext.make_deferred_yieldable(
+ pdus[:] = yield make_deferred_yieldable(
defer.gatherResults(
self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True
).addErrback(unwrapFirstError)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2e0cebb638..8c0a18b120 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -42,6 +42,8 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
from synapse.http.endpoint import parse_server_name
+from synapse.logging.context import nested_logging_context
+from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
@@ -50,8 +52,6 @@ from synapse.types import get_domain_from_id
from synapse.util import glob_to_regex
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import nested_logging_context
-from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
# parallel, up to this limit.
diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py
index 7535f79203..44edcabed4 100644
--- a/synapse/federation/persistence.py
+++ b/synapse/federation/persistence.py
@@ -21,9 +21,7 @@ These actions are mostly only used by the :py:mod:`.replication` module.
import logging
-from twisted.internet import defer
-
-from synapse.util.logutils import log_function
+from synapse.logging.utils import log_function
logger = logging.getLogger(__name__)
@@ -63,33 +61,3 @@ class TransactionActions(object):
return self.store.set_received_txn_response(
transaction.transaction_id, origin, code, response
)
-
- @defer.inlineCallbacks
- @log_function
- def prepare_to_send(self, transaction):
- """ Persists the `Transaction` we are about to send and works out the
- correct value for the `prev_ids` key.
-
- Returns:
- Deferred
- """
- transaction.prev_ids = yield self.store.prep_send_transaction(
- transaction.transaction_id,
- transaction.destination,
- transaction.origin_server_ts,
- )
-
- @log_function
- def delivered(self, transaction, response_code, response_dict):
- """ Marks the given `Transaction` as having been successfully
- delivered to the remote homeserver, and what the response was.
-
- Returns:
- Deferred
- """
- return self.store.delivered_txn(
- transaction.transaction_id,
- transaction.destination,
- response_code,
- response_dict,
- )
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 766c5a37cd..d46f4aaeb1 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -26,6 +26,11 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
+from synapse.logging.context import (
+ make_deferred_yieldable,
+ preserve_fn,
+ run_in_background,
+)
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
@@ -33,7 +38,6 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util import logcontext
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
@@ -210,10 +214,10 @@ class FederationSender(object):
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
- yield logcontext.make_deferred_yieldable(
+ yield make_deferred_yieldable(
defer.gatherResults(
[
- logcontext.run_in_background(handle_room_events, evs)
+ run_in_background(handle_room_events, evs)
for evs in itervalues(events_by_room)
],
consumeErrors=True,
@@ -360,7 +364,7 @@ class FederationSender(object):
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
- @logcontext.preserve_fn # the caller should not yield on this
+ @preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c987bb9a0d..0460a8c4ac 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -63,8 +63,6 @@ class TransactionManager(object):
len(edus),
)
- logger.debug("TX [%s] Persisting transaction...", destination)
-
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
@@ -76,9 +74,6 @@ class TransactionManager(object):
self._next_txn_id += 1
- yield self._transaction_actions.prepare_to_send(transaction)
-
- logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
destination,
@@ -118,10 +113,6 @@ class TransactionManager(object):
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
- yield self._transaction_actions.delivered(transaction, code, response)
-
- logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
-
if code == 200:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index aecd142309..1aae9ec9e7 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
-from synapse.util.logutils import log_function
+from synapse.logging.utils import log_function
logger = logging.getLogger(__name__)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 955f0f4308..2efdcff7ef 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -36,8 +36,8 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string_from_args,
)
+from synapse.logging.context import run_in_background
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
-from synapse.util.logcontext import run_in_background
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index e73757570c..f497711133 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -43,9 +43,9 @@ from signedjson.sign import sign_json
from twisted.internet import defer
from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
+from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
-from synapse.util.logcontext import run_in_background
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 0719da3ab7..1f1708ba7d 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -22,9 +22,10 @@ from email.mime.text import MIMEText
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.logging.context import make_deferred_yieldable
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID
from synapse.util import stringutils
-from synapse.util.logcontext import make_deferred_yieldable
try:
from synapse.push.mailer import load_jinja2_templates
@@ -67,7 +68,14 @@ class AccountValidityHandler(object):
)
# Check the renewal emails to send and send them every 30min.
- self.clock.looping_call(self.send_renewal_emails, 30 * 60 * 1000)
+ def send_emails():
+ # run as a background process to make sure that the database transactions
+ # have a logcontext to report to
+ return run_as_background_process(
+ "send_renewals", self.send_renewal_emails
+ )
+
+ self.clock.looping_call(send_emails, 30 * 60 * 1000)
@defer.inlineCallbacks
def send_renewal_emails(self):
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 5cc89d43f6..8f089f0e33 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,13 +23,13 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
event_processing_loop_counter,
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import log_failure
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 97b21c4093..ef5585aa99 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -36,9 +36,9 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
+from synapse.logging.context import defer_to_thread
from synapse.module_api import ModuleApi
from synapse.types import UserID
-from synapse.util import logcontext
from synapse.util.caches.expiringcache import ExpiringCache
from ._base import BaseHandler
@@ -743,7 +743,7 @@ class AuthHandler(BaseHandler):
result = (result, None)
defer.returnValue(result)
- if login_type == LoginType.PASSWORD:
+ if login_type == LoginType.PASSWORD and self.hs.config.password_localdb_enabled:
known_login_type = True
canonical_user_id = yield self._check_local_password(
@@ -987,7 +987,7 @@ class AuthHandler(BaseHandler):
bcrypt.gensalt(self.bcrypt_rounds),
).decode("ascii")
- return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash)
+ return defer_to_thread(self.hs.get_reactor(), _do_hash)
def validate_hash(self, password, stored_hash):
"""Validates that self.hash(password) == stored_hash.
@@ -1013,7 +1013,7 @@ class AuthHandler(BaseHandler):
if not isinstance(stored_hash, bytes):
stored_hash = stored_hash.encode("ascii")
- return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
+ return defer_to_thread(self.hs.get_reactor(), _do_validate_hash)
else:
return defer.succeed(False)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index f59d0479b5..99e8413092 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -101,9 +101,13 @@ class DeviceWorkerHandler(BaseHandler):
room_ids = yield self.store.get_rooms_for_user(user_id)
- # First we check if any devices have changed
- changed = yield self.store.get_user_whose_devices_changed(
- from_token.device_list_key
+ # First we check if any devices have changed for users that we share
+ # rooms with.
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+ changed = yield self.store.get_users_whose_devices_changed(
+ from_token.device_list_key, users_who_share_room
)
# Then work out if any users have since joined
@@ -188,10 +192,6 @@ class DeviceWorkerHandler(BaseHandler):
break
if possibly_changed or possibly_left:
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
-
# Take the intersection of the users whose devices may have changed
# and those that actually still share a room with the user
possibly_joined = possibly_changed & users_who_share_room
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 807900fe52..55b4ab3a1a 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -23,8 +23,8 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import UserID, get_domain_from_id
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 5836d3c639..6a38328af3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -21,8 +21,8 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
+from synapse.logging.utils import log_function
from synapse.types import UserID
-from synapse.util.logutils import log_function
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 02d397c498..57be968c67 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,13 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.crypto.event_signing import compute_event_signature
from synapse.event_auth import auth_types_for_event
from synapse.events.validator import EventValidator
+from synapse.logging.context import (
+ make_deferred_yieldable,
+ nested_logging_context,
+ preserve_fn,
+ run_in_background,
+)
+from synapse.logging.utils import log_function
from synapse.replication.http.federation import (
ReplicationCleanRoomRestServlet,
ReplicationFederationSendEventsRestServlet,
@@ -52,10 +59,9 @@ from synapse.replication.http.federation import (
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
-from synapse.util import logcontext, unwrapFirstError
+from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room
-from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -338,7 +344,7 @@ class FederationHandler(BaseHandler):
room_version = yield self.store.get_room_version(room_id)
- with logcontext.nested_logging_context(p):
+ with nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
@@ -532,7 +538,7 @@ class FederationHandler(BaseHandler):
event_id,
ev.event_id,
)
- with logcontext.nested_logging_context(ev.event_id):
+ with nested_logging_context(ev.event_id):
try:
yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
except FederationError as e:
@@ -725,10 +731,10 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch,
)
- results = yield logcontext.make_deferred_yieldable(
+ results = yield make_deferred_yieldable(
defer.gatherResults(
[
- logcontext.run_in_background(
+ run_in_background(
self.federation_client.get_pdu,
[dest],
event_id,
@@ -994,10 +1000,8 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
- resolve = logcontext.preserve_fn(
- self.state_handler.resolve_state_groups_for_events
- )
- states = yield logcontext.make_deferred_yieldable(
+ resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
+ states = yield make_deferred_yieldable(
defer.gatherResults(
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
)
@@ -1171,7 +1175,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
- logcontext.run_in_background(self._handle_queued_pdus, room_queue)
+ run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True)
@@ -1191,7 +1195,7 @@ class FederationHandler(BaseHandler):
p.event_id,
p.room_id,
)
- with logcontext.nested_logging_context(p.event_id):
+ with nested_logging_context(p.event_id):
yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
@@ -1610,7 +1614,7 @@ class FederationHandler(BaseHandler):
success = True
finally:
if not success:
- logcontext.run_in_background(
+ run_in_background(
self.store.remove_push_actions_from_staging, event.event_id
)
@@ -1629,7 +1633,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def prep(ev_info):
event = ev_info["event"]
- with logcontext.nested_logging_context(suffix=event.event_id):
+ with nested_logging_context(suffix=event.event_id):
res = yield self._prep_event(
origin,
event,
@@ -1639,12 +1643,9 @@ class FederationHandler(BaseHandler):
)
defer.returnValue(res)
- contexts = yield logcontext.make_deferred_yieldable(
+ contexts = yield make_deferred_yieldable(
defer.gatherResults(
- [
- logcontext.run_in_background(prep, ev_info)
- for ev_info in event_infos
- ],
+ [run_in_background(prep, ev_info) for ev_info in event_infos],
consumeErrors=True,
)
)
@@ -2106,10 +2107,10 @@ class FederationHandler(BaseHandler):
room_version = yield self.store.get_room_version(event.room_id)
- different_events = yield logcontext.make_deferred_yieldable(
+ different_events = yield make_deferred_yieldable(
defer.gatherResults(
[
- logcontext.run_in_background(
+ run_in_background(
self.store.get_event, d, allow_none=True, allow_rejected=False
)
for d in different_auth
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index a1fe9d116f..54c966c8a6 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -21,12 +21,12 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken, UserID
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 683da6bf32..eaeda7a5cb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -34,13 +34,13 @@ from synapse.api.errors import (
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.validator import EventValidator
+from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 76ee97ddd3..20bcfed334 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -20,10 +20,10 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.logging.context import run_in_background
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
-from synapse.util.logcontext import run_in_background
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 5204073a38..6f3537e435 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,14 +34,14 @@ from twisted.internet import defer
import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
+from synapse.logging.context import run_in_background
+from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cachedInlineCallbacks
-from synapse.util.logcontext import run_in_background
-from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -1017,11 +1017,28 @@ class PresenceEventSource(object):
if from_key is not None:
from_key = int(from_key)
+ max_token = self.store.get_current_presence_token()
+ if from_key == max_token:
+ # This is necessary as due to the way stream ID generators work
+ # we may get updates that have a stream ID greater than the max
+ # token (e.g. max_token is N but stream generator may return
+ # results for N+2, due to N+1 not having finished being
+ # persisted yet).
+ #
+ # This is usually fine, as it just means that we may send down
+ # some presence updates multiple times. However, we need to be
+ # careful that the sync stream either actually does make some
+ # progress or doesn't return, otherwise clients will end up
+ # tight looping calling /sync due to it immediately returning
+ # the same token repeatedly.
+ #
+ # Hence this guard where we just return nothing so that the sync
+ # doesn't return. C.f. #5503.
+ defer.returnValue(([], max_token))
+
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
- max_token = self.store.get_current_presence_token()
-
users_interested_in = yield self._get_interested_in(user, explicit_room_id)
user_ids_changed = set()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index e487b90c08..853020180b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -506,87 +506,6 @@ class RegistrationHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def get_or_create_user(self, requester, localpart, displayname, password_hash=None):
- """Creates a new user if the user does not exist,
- else revokes all previous access tokens and generates a new one.
-
- Args:
- localpart : The local part of the user ID to register. If None,
- one will be randomly generated.
- Returns:
- A tuple of (user_id, access_token).
- Raises:
- RegistrationError if there was a problem registering.
-
- NB this is only used in tests. TODO: move it to the test package!
- """
- if localpart is None:
- raise SynapseError(400, "Request must include user id")
- yield self.auth.check_auth_blocking()
- need_register = True
-
- try:
- yield self.check_username(localpart)
- except SynapseError as e:
- if e.errcode == Codes.USER_IN_USE:
- need_register = False
- else:
- raise
-
- user = UserID(localpart, self.hs.hostname)
- user_id = user.to_string()
- token = self.macaroon_gen.generate_access_token(user_id)
-
- if need_register:
- yield self.register_with_store(
- user_id=user_id,
- token=token,
- password_hash=password_hash,
- create_profile_with_displayname=user.localpart,
- )
- else:
- yield self._auth_handler.delete_access_tokens_for_user(user_id)
- yield self.store.add_access_token_to_user(user_id=user_id, token=token)
-
- if displayname is not None:
- logger.info("setting user display name: %s -> %s", user_id, displayname)
- yield self.profile_handler.set_displayname(
- user, requester, displayname, by_admin=True
- )
-
- defer.returnValue((user_id, token))
-
- @defer.inlineCallbacks
- def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
- """Get a guest access token for a 3PID, creating a guest account if
- one doesn't already exist.
-
- Args:
- medium (str)
- address (str)
- inviter_user_id (str): The user ID who is trying to invite the
- 3PID
-
- Returns:
- Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
- 3PID guest account.
- """
- access_token = yield self.store.get_3pid_guest_access_token(medium, address)
- if access_token:
- user_info = yield self.auth.get_user_by_access_token(access_token)
-
- defer.returnValue((user_info["user"].to_string(), access_token))
-
- user_id, access_token = yield self.register(
- generate_token=True, make_guest=True
- )
- access_token = yield self.store.save_or_get_3pid_guest_access_token(
- medium, address, access_token, inviter_user_id
- )
-
- defer.returnValue((user_id, access_token))
-
- @defer.inlineCallbacks
def _join_user_to_room(self, requester, room_identifier):
room_id = None
room_member_handler = self.hs.get_room_member_handler()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index c3420b4b22..e0196ef83e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -119,24 +119,6 @@ class RoomMemberHandler(object):
raise NotImplementedError()
@abc.abstractmethod
- def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
- """Get a guest access token for a 3PID, creating a guest account if
- one doesn't already exist.
-
- Args:
- requester (Requester)
- medium (str)
- address (str)
- inviter_user_id (str): The user ID who is trying to invite the
- 3PID
-
- Returns:
- Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
- 3PID guest account.
- """
- raise NotImplementedError()
-
- @abc.abstractmethod
def _user_joined_room(self, target, room_id):
"""Notifies distributor on master process that the user has joined the
room.
@@ -823,6 +805,7 @@ class RoomMemberHandler(object):
"sender": user.to_string(),
"state_key": token,
},
+ ratelimit=False,
txn_id=txn_id,
)
@@ -889,21 +872,6 @@ class RoomMemberHandler(object):
"sender_avatar_url": inviter_avatar_url,
}
- if self.config.invite_3pid_guest:
- guest_user_id, guest_access_token = yield self.get_or_register_3pid_guest(
- requester=requester,
- medium=medium,
- address=address,
- inviter_user_id=inviter_user_id,
- )
-
- invite_config.update(
- {
- "guest_access_token": guest_access_token,
- "guest_user_id": guest_user_id,
- }
- )
-
try:
data = yield self.simple_http_client.post_json_get_json(
is_url, invite_config
@@ -1023,12 +991,6 @@ class RoomMemberMasterHandler(RoomMemberHandler):
yield self.store.locally_reject_invite(target.to_string(), room_id)
defer.returnValue({})
- def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
- """Implements RoomMemberHandler.get_or_register_3pid_guest
- """
- rg = self.registration_handler
- return rg.get_or_register_3pid_guest(medium, address, inviter_user_id)
-
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index da501f38c0..fc873a3ba6 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,7 +20,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
- ReplicationRegister3PIDGuestRestServlet as Repl3PID,
ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
@@ -33,7 +32,6 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberWorkerHandler, self).__init__(hs)
- self._get_register_3pid_client = Repl3PID.make_client(hs)
self._remote_join_client = ReplRemoteJoin.make_client(hs)
self._remote_reject_client = ReplRejectInvite.make_client(hs)
self._notify_change_client = ReplJoinedLeft.make_client(hs)
@@ -80,13 +78,3 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="left"
)
-
- def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
- """Implements RoomMemberHandler.get_or_register_3pid_guest
- """
- return self._get_register_3pid_client(
- requester=requester,
- medium=medium,
- address=address,
- inviter_user_id=inviter_user_id,
- )
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
new file mode 100644
index 0000000000..a1ce6929cf
--- /dev/null
+++ b/synapse/handlers/saml_handler.py
@@ -0,0 +1,123 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+import attr
+import saml2
+from saml2.client import Saml2Client
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import parse_string
+from synapse.rest.client.v1.login import SSOAuthHandler
+
+logger = logging.getLogger(__name__)
+
+
+class SamlHandler:
+ def __init__(self, hs):
+ self._saml_client = Saml2Client(hs.config.saml2_sp_config)
+ self._sso_auth_handler = SSOAuthHandler(hs)
+
+ # a map from saml session id to Saml2SessionData object
+ self._outstanding_requests_dict = {}
+
+ self._clock = hs.get_clock()
+ self._saml2_session_lifetime = hs.config.saml2_session_lifetime
+
+ def handle_redirect_request(self, client_redirect_url):
+ """Handle an incoming request to /login/sso/redirect
+
+ Args:
+ client_redirect_url (bytes): the URL that we should redirect the
+ client to when everything is done
+
+ Returns:
+ bytes: URL to redirect to
+ """
+ reqid, info = self._saml_client.prepare_for_authenticate(
+ relay_state=client_redirect_url
+ )
+
+ now = self._clock.time_msec()
+ self._outstanding_requests_dict[reqid] = Saml2SessionData(creation_time=now)
+
+ for key, value in info["headers"]:
+ if key == "Location":
+ return value
+
+ # this shouldn't happen!
+ raise Exception("prepare_for_authenticate didn't return a Location header")
+
+ def handle_saml_response(self, request):
+ """Handle an incoming request to /_matrix/saml2/authn_response
+
+ Args:
+ request (SynapseRequest): the incoming request from the browser. We'll
+ respond to it with a redirect.
+
+ Returns:
+ Deferred[none]: Completes once we have handled the request.
+ """
+ resp_bytes = parse_string(request, "SAMLResponse", required=True)
+ relay_state = parse_string(request, "RelayState", required=True)
+
+ # expire outstanding sessions before parse_authn_request_response checks
+ # the dict.
+ self.expire_sessions()
+
+ try:
+ saml2_auth = self._saml_client.parse_authn_request_response(
+ resp_bytes,
+ saml2.BINDING_HTTP_POST,
+ outstanding=self._outstanding_requests_dict,
+ )
+ except Exception as e:
+ logger.warning("Exception parsing SAML2 response: %s", e)
+ raise SynapseError(400, "Unable to parse SAML2 response: %s" % (e,))
+
+ if saml2_auth.not_signed:
+ logger.warning("SAML2 response was not signed")
+ raise SynapseError(400, "SAML2 response was not signed")
+
+ if "uid" not in saml2_auth.ava:
+ logger.warning("SAML2 response lacks a 'uid' attestation")
+ raise SynapseError(400, "uid not in SAML2 response")
+
+ self._outstanding_requests_dict.pop(saml2_auth.in_response_to, None)
+
+ username = saml2_auth.ava["uid"][0]
+ displayName = saml2_auth.ava.get("displayName", [None])[0]
+
+ return self._sso_auth_handler.on_successful_auth(
+ username, request, relay_state, user_display_name=displayName
+ )
+
+ def expire_sessions(self):
+ expire_before = self._clock.time_msec() - self._saml2_session_lifetime
+ to_expire = set()
+ for reqid, data in self._outstanding_requests_dict.items():
+ if data.creation_time < expire_before:
+ to_expire.add(reqid)
+ for reqid in to_expire:
+ logger.debug("Expiring session id %s", reqid)
+ del self._outstanding_requests_dict[reqid]
+
+
+@attr.s
+class Saml2SessionData:
+ """Data we track about SAML2 sessions"""
+
+ # time the session was created, in milliseconds
+ creation_time = attr.ib()
diff --git a/synapse/handlers/set_password.py b/synapse/handlers/set_password.py
index 5a0995d4fe..d90c9e0108 100644
--- a/synapse/handlers/set_password.py
+++ b/synapse/handlers/set_password.py
@@ -33,6 +33,9 @@ class SetPasswordHandler(BaseHandler):
@defer.inlineCallbacks
def set_password(self, user_id, newpassword, requester=None):
+ if not self.hs.config.password_localdb_enabled:
+ raise SynapseError(403, "Password change disabled", errcode=Codes.FORBIDDEN)
+
password_hash = yield self._auth_handler.hash(newpassword)
except_device_id = requester.device_id if requester else None
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c5188a1f8e..cd1ac0a27a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -25,6 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.logging.context import LoggingContext
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
@@ -33,7 +34,6 @@ from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
from synapse.visibility import filter_events_for_client
@@ -1058,40 +1058,74 @@ class SyncHandler(object):
newly_left_rooms,
newly_left_users,
):
+ """Generate the DeviceLists section of sync
+
+ Args:
+ sync_result_builder (SyncResultBuilder)
+ newly_joined_rooms (set[str]): Set of rooms user has joined since
+ previous sync
+ newly_joined_or_invited_users (set[str]): Set of users that have
+ joined or been invited to a room since previous sync.
+ newly_left_rooms (set[str]): Set of rooms user has left since
+ previous sync
+ newly_left_users (set[str]): Set of users that have left a room
+ we're in since previous sync
+
+ Returns:
+ Deferred[DeviceLists]
+ """
+
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
+ # We're going to mutate these fields, so lets copy them rather than
+ # assume they won't get used later.
+ newly_joined_or_invited_users = set(newly_joined_or_invited_users)
+ newly_left_users = set(newly_left_users)
+
if since_token and since_token.device_list_key:
- changed = yield self.store.get_user_whose_devices_changed(
- since_token.device_list_key
+ # We want to figure out what user IDs the client should refetch
+ # device keys for, and which users we aren't going to track changes
+ # for anymore.
+ #
+ # For the first step we check:
+ # a. if any users we share a room with have updated their devices,
+ # and
+ # b. we also check if we've joined any new rooms, or if a user has
+ # joined a room we're in.
+ #
+ # For the second step we just find any users we no longer share a
+ # room with by looking at all users that have left a room plus users
+ # that were in a room we've left.
+
+ users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+ user_id
+ )
+
+ # Step 1a, check for changes in devices of users we share a room with
+ users_that_have_changed = yield self.store.get_users_whose_devices_changed(
+ since_token.device_list_key, users_who_share_room
)
- # TODO: Be more clever than this, i.e. remove users who we already
- # share a room with?
+ # Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = yield self.state.get_current_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)
- for room_id in newly_left_rooms:
- left_users = yield self.state.get_current_users_in_room(room_id)
- newly_left_users.update(left_users)
-
# TODO: Check that these users are actually new, i.e. either they
# weren't in the previous sync *or* they left and rejoined.
- changed.update(newly_joined_or_invited_users)
+ users_that_have_changed.update(newly_joined_or_invited_users)
- if not changed and not newly_left_users:
- defer.returnValue(DeviceLists(changed=[], left=newly_left_users))
+ # Now find users that we no longer track
+ for room_id in newly_left_rooms:
+ left_users = yield self.state.get_current_users_in_room(room_id)
+ newly_left_users.update(left_users)
- users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id
- )
+ # Remove any users that we still share a room with.
+ newly_left_users -= users_who_share_room
defer.returnValue(
- DeviceLists(
- changed=users_who_share_room & changed,
- left=set(newly_left_users) - users_who_share_room,
- )
+ DeviceLists(changed=users_that_have_changed, left=newly_left_users)
)
else:
defer.returnValue(DeviceLists(changed=[], left=[]))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index f8062c8671..c3e0c8fc7e 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,9 +19,9 @@ from collections import namedtuple
from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError
+from synapse.logging.context import run_in_background
from synapse.types import UserID, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 9bc7035c8d..45d5010952 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -45,9 +45,9 @@ from synapse.http import (
cancelled_to_request_timed_out_error,
redact_uri,
)
+from synapse.logging.context import make_deferred_yieldable
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
-from synapse.util.logcontext import make_deferred_yieldable
logger = logging.getLogger(__name__)
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 414cde0777..054c321a20 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -30,9 +30,9 @@ from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
from synapse.util.caches.ttlcache import TTLCache
-from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
# period to cache .well-known results for by default
diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py
index 1f22f78a75..ecc88f9b96 100644
--- a/synapse/http/federation/srv_resolver.py
+++ b/synapse/http/federation/srv_resolver.py
@@ -25,7 +25,7 @@ from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
-from synapse.util.logcontext import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable
logger = logging.getLogger(__name__)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 5ef8bb60a3..dee3710f68 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -48,8 +48,8 @@ from synapse.api.errors import (
from synapse.http import QuieterFileBodyProducer
from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.logging.context import make_deferred_yieldable
from synapse.util.async_helpers import timeout_deferred
-from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 62045a918b..46af27c8f6 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -19,8 +19,8 @@ import threading
from prometheus_client.core import Counter, Histogram
+from synapse.logging.context import LoggingContext
from synapse.metrics import LaterGauge
-from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 6fd13e87d1..72a3d67eb6 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -16,10 +16,11 @@
import cgi
import collections
+import http.client
import logging
-
-from six import PY3
-from six.moves import http_client, urllib
+import types
+import urllib
+from io import BytesIO
from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
@@ -38,13 +39,8 @@ from synapse.api.errors import (
SynapseError,
UnrecognizedRequestError,
)
+from synapse.logging.context import preserve_fn
from synapse.util.caches import intern_dict
-from synapse.util.logcontext import preserve_fn
-
-if PY3:
- from io import BytesIO
-else:
- from cStringIO import StringIO as BytesIO
logger = logging.getLogger(__name__)
@@ -69,16 +65,15 @@ def wrap_json_request_handler(h):
The handler method must have a signature of "handle_foo(self, request)",
where "request" must be a SynapseRequest.
- The handler must return a deferred. If the deferred succeeds we assume that
- a response has been sent. If the deferred fails with a SynapseError we use
+ The handler must return a deferred or a coroutine. If the deferred succeeds
+ we assume that a response has been sent. If the deferred fails with a SynapseError we use
it to send a JSON response with the appropriate HTTP reponse code. If the
deferred fails with any other type of error we send a 500 reponse.
"""
- @defer.inlineCallbacks
- def wrapped_request_handler(self, request):
+ async def wrapped_request_handler(self, request):
try:
- yield h(self, request)
+ await h(self, request)
except SynapseError as e:
code = e.code
logger.info("%s SynapseError: %s - %s", request, code, e.msg)
@@ -142,10 +137,12 @@ def wrap_html_request_handler(h):
where "request" must be a SynapseRequest.
"""
- def wrapped_request_handler(self, request):
- d = defer.maybeDeferred(h, self, request)
- d.addErrback(_return_html_error, request)
- return d
+ async def wrapped_request_handler(self, request):
+ try:
+ return await h(self, request)
+ except Exception:
+ f = failure.Failure()
+ return _return_html_error(f, request)
return wrap_async_request_handler(wrapped_request_handler)
@@ -171,7 +168,7 @@ def _return_html_error(f, request):
exc_info=(f.type, f.value, f.getTracebackObject()),
)
else:
- code = http_client.INTERNAL_SERVER_ERROR
+ code = http.client.INTERNAL_SERVER_ERROR
msg = "Internal server error"
logger.error(
@@ -201,10 +198,9 @@ def wrap_async_request_handler(h):
logged until the deferred completes.
"""
- @defer.inlineCallbacks
- def wrapped_async_request_handler(self, request):
+ async def wrapped_async_request_handler(self, request):
with request.processing():
- yield h(self, request)
+ await h(self, request)
# we need to preserve_fn here, because the synchronous render method won't yield for
# us (obviously)
@@ -270,12 +266,11 @@ class JsonResource(HttpServer, resource.Resource):
def render(self, request):
""" This gets called by twisted every time someone sends us a request.
"""
- self._async_render(request)
+ defer.ensureDeferred(self._async_render(request))
return NOT_DONE_YET
@wrap_json_request_handler
- @defer.inlineCallbacks
- def _async_render(self, request):
+ async def _async_render(self, request):
""" This gets called from render() every time someone sends us a request.
This checks if anyone has registered a callback for that method and
path.
@@ -292,26 +287,19 @@ class JsonResource(HttpServer, resource.Resource):
# Now trigger the callback. If it returns a response, we send it
# here. If it throws an exception, that is handled by the wrapper
# installed by @request_handler.
-
- def _unquote(s):
- if PY3:
- # On Python 3, unquote is unicode -> unicode
- return urllib.parse.unquote(s)
- else:
- # On Python 2, unquote is bytes -> bytes We need to encode the
- # URL again (as it was decoded by _get_handler_for request), as
- # ASCII because it's a URL, and then decode it to get the UTF-8
- # characters that were quoted.
- return urllib.parse.unquote(s.encode("ascii")).decode("utf8")
-
kwargs = intern_dict(
{
- name: _unquote(value) if value else value
+ name: urllib.parse.unquote(value) if value else value
for name, value in group_dict.items()
}
)
- callback_return = yield callback(request, **kwargs)
+ callback_return = callback(request, **kwargs)
+
+ # Is it synchronous? We'll allow this for now.
+ if isinstance(callback_return, (defer.Deferred, types.CoroutineType)):
+ callback_return = await callback_return
+
if callback_return is not None:
code, response = callback_return
self._send_response(request, code, response)
@@ -360,6 +348,29 @@ class JsonResource(HttpServer, resource.Resource):
)
+class DirectServeResource(resource.Resource):
+ def render(self, request):
+ """
+ Render the request, using an asynchronous render handler if it exists.
+ """
+ async_render_callback_name = "_async_render_" + request.method.decode("ascii")
+
+ # Try and get the async renderer
+ callback = getattr(self, async_render_callback_name, None)
+
+ # No async renderer for this request method.
+ if not callback:
+ return super().render(request)
+
+ resp = callback(request)
+
+ # If it's a coroutine, turn it into a Deferred
+ if isinstance(resp, types.CoroutineType):
+ defer.ensureDeferred(resp)
+
+ return NOT_DONE_YET
+
+
def _options_handler(request):
"""Request handler for OPTIONS requests
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 93f679ea48..df5274c177 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -19,7 +19,7 @@ from twisted.web.server import Request, Site
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics, requests_counter
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.logging.context import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/synapse/logging/__init__.py
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
new file mode 100644
index 0000000000..30dfa1d6b2
--- /dev/null
+++ b/synapse/logging/context.py
@@ -0,0 +1,693 @@
+# Copyright 2014-2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Thread-local-alike tracking of log contexts within synapse
+
+This module provides objects and utilities for tracking contexts through
+synapse code, so that log lines can include a request identifier, and so that
+CPU and database activity can be accounted for against the request that caused
+them.
+
+See doc/log_contexts.rst for details on how this works.
+"""
+
+import logging
+import threading
+import types
+
+from twisted.internet import defer, threads
+
+logger = logging.getLogger(__name__)
+
+try:
+ import resource
+
+ # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined
+ # to be 1 on linux so we hard code it.
+ RUSAGE_THREAD = 1
+
+ # If the system doesn't support RUSAGE_THREAD then this should throw an
+ # exception.
+ resource.getrusage(RUSAGE_THREAD)
+
+ def get_thread_resource_usage():
+ return resource.getrusage(RUSAGE_THREAD)
+
+
+except Exception:
+ # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
+ # won't track resource usage by returning None.
+ def get_thread_resource_usage():
+ return None
+
+
+# get an id for the current thread.
+#
+# threading.get_ident doesn't actually return an OS-level tid, and annoyingly,
+# on Linux it actually returns the same value either side of a fork() call. However
+# we only fork in one place, so it's not worth the hoop-jumping to get a real tid.
+#
+get_thread_id = threading.get_ident
+
+
+class ContextResourceUsage(object):
+ """Object for tracking the resources used by a log context
+
+ Attributes:
+ ru_utime (float): user CPU time (in seconds)
+ ru_stime (float): system CPU time (in seconds)
+ db_txn_count (int): number of database transactions done
+ db_sched_duration_sec (float): amount of time spent waiting for a
+ database connection
+ db_txn_duration_sec (float): amount of time spent doing database
+ transactions (excluding scheduling time)
+ evt_db_fetch_count (int): number of events requested from the database
+ """
+
+ __slots__ = [
+ "ru_stime",
+ "ru_utime",
+ "db_txn_count",
+ "db_txn_duration_sec",
+ "db_sched_duration_sec",
+ "evt_db_fetch_count",
+ ]
+
+ def __init__(self, copy_from=None):
+ """Create a new ContextResourceUsage
+
+ Args:
+ copy_from (ContextResourceUsage|None): if not None, an object to
+ copy stats from
+ """
+ if copy_from is None:
+ self.reset()
+ else:
+ self.ru_utime = copy_from.ru_utime
+ self.ru_stime = copy_from.ru_stime
+ self.db_txn_count = copy_from.db_txn_count
+
+ self.db_txn_duration_sec = copy_from.db_txn_duration_sec
+ self.db_sched_duration_sec = copy_from.db_sched_duration_sec
+ self.evt_db_fetch_count = copy_from.evt_db_fetch_count
+
+ def copy(self):
+ return ContextResourceUsage(copy_from=self)
+
+ def reset(self):
+ self.ru_stime = 0.0
+ self.ru_utime = 0.0
+ self.db_txn_count = 0
+
+ self.db_txn_duration_sec = 0
+ self.db_sched_duration_sec = 0
+ self.evt_db_fetch_count = 0
+
+ def __repr__(self):
+ return (
+ "<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+ "db_txn_count='%r', db_txn_duration_sec='%r', "
+ "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
+ ) % (
+ self.ru_stime,
+ self.ru_utime,
+ self.db_txn_count,
+ self.db_txn_duration_sec,
+ self.db_sched_duration_sec,
+ self.evt_db_fetch_count,
+ )
+
+ def __iadd__(self, other):
+ """Add another ContextResourceUsage's stats to this one's.
+
+ Args:
+ other (ContextResourceUsage): the other resource usage object
+ """
+ self.ru_utime += other.ru_utime
+ self.ru_stime += other.ru_stime
+ self.db_txn_count += other.db_txn_count
+ self.db_txn_duration_sec += other.db_txn_duration_sec
+ self.db_sched_duration_sec += other.db_sched_duration_sec
+ self.evt_db_fetch_count += other.evt_db_fetch_count
+ return self
+
+ def __isub__(self, other):
+ self.ru_utime -= other.ru_utime
+ self.ru_stime -= other.ru_stime
+ self.db_txn_count -= other.db_txn_count
+ self.db_txn_duration_sec -= other.db_txn_duration_sec
+ self.db_sched_duration_sec -= other.db_sched_duration_sec
+ self.evt_db_fetch_count -= other.evt_db_fetch_count
+ return self
+
+ def __add__(self, other):
+ res = ContextResourceUsage(copy_from=self)
+ res += other
+ return res
+
+ def __sub__(self, other):
+ res = ContextResourceUsage(copy_from=self)
+ res -= other
+ return res
+
+
+class LoggingContext(object):
+ """Additional context for log formatting. Contexts are scoped within a
+ "with" block.
+
+ If a parent is given when creating a new context, then:
+ - logging fields are copied from the parent to the new context on entry
+ - when the new context exits, the cpu usage stats are copied from the
+ child to the parent
+
+ Args:
+ name (str): Name for the context for debugging.
+ parent_context (LoggingContext|None): The parent of the new context
+ """
+
+ __slots__ = [
+ "previous_context",
+ "name",
+ "parent_context",
+ "_resource_usage",
+ "usage_start",
+ "main_thread",
+ "alive",
+ "request",
+ "tag",
+ ]
+
+ thread_local = threading.local()
+
+ class Sentinel(object):
+ """Sentinel to represent the root context"""
+
+ __slots__ = []
+
+ def __str__(self):
+ return "sentinel"
+
+ def copy_to(self, record):
+ pass
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def add_database_transaction(self, duration_sec):
+ pass
+
+ def add_database_scheduled(self, sched_sec):
+ pass
+
+ def record_event_fetch(self, event_count):
+ pass
+
+ def __nonzero__(self):
+ return False
+
+ __bool__ = __nonzero__ # python3
+
+ sentinel = Sentinel()
+
+ def __init__(self, name=None, parent_context=None, request=None):
+ self.previous_context = LoggingContext.current_context()
+ self.name = name
+
+ # track the resources used by this context so far
+ self._resource_usage = ContextResourceUsage()
+
+ # If alive has the thread resource usage when the logcontext last
+ # became active.
+ self.usage_start = None
+
+ self.main_thread = get_thread_id()
+ self.request = None
+ self.tag = ""
+ self.alive = True
+
+ self.parent_context = parent_context
+
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
+ if request is not None:
+ # the request param overrides the request from the parent context
+ self.request = request
+
+ def __str__(self):
+ if self.request:
+ return str(self.request)
+ return "%s@%x" % (self.name, id(self))
+
+ @classmethod
+ def current_context(cls):
+ """Get the current logging context from thread local storage
+
+ Returns:
+ LoggingContext: the current logging context
+ """
+ return getattr(cls.thread_local, "current_context", cls.sentinel)
+
+ @classmethod
+ def set_current_context(cls, context):
+ """Set the current logging context in thread local storage
+ Args:
+ context(LoggingContext): The context to activate.
+ Returns:
+ The context that was previously active
+ """
+ current = cls.current_context()
+
+ if current is not context:
+ current.stop()
+ cls.thread_local.current_context = context
+ context.start()
+ return current
+
+ def __enter__(self):
+ """Enters this logging context into thread local storage"""
+ old_context = self.set_current_context(self)
+ if self.previous_context != old_context:
+ logger.warn(
+ "Expected previous context %r, found %r",
+ self.previous_context,
+ old_context,
+ )
+ self.alive = True
+
+ return self
+
+ def __exit__(self, type, value, traceback):
+ """Restore the logging context in thread local storage to the state it
+ was before this context was entered.
+ Returns:
+ None to avoid suppressing any exceptions that were thrown.
+ """
+ current = self.set_current_context(self.previous_context)
+ if current is not self:
+ if current is self.sentinel:
+ logger.warning("Expected logging context %s was lost", self)
+ else:
+ logger.warning(
+ "Expected logging context %s but found %s", self, current
+ )
+ self.previous_context = None
+ self.alive = False
+
+ # if we have a parent, pass our CPU usage stats on
+ if self.parent_context is not None and hasattr(
+ self.parent_context, "_resource_usage"
+ ):
+ self.parent_context._resource_usage += self._resource_usage
+
+ # reset them in case we get entered again
+ self._resource_usage.reset()
+
+ def copy_to(self, record):
+ """Copy logging fields from this context to a log record or
+ another LoggingContext
+ """
+
+ # 'request' is the only field we currently use in the logger, so that's
+ # all we need to copy
+ record.request = self.request
+
+ def start(self):
+ if get_thread_id() != self.main_thread:
+ logger.warning("Started logcontext %s on different thread", self)
+ return
+
+ # If we haven't already started record the thread resource usage so
+ # far
+ if not self.usage_start:
+ self.usage_start = get_thread_resource_usage()
+
+ def stop(self):
+ if get_thread_id() != self.main_thread:
+ logger.warning("Stopped logcontext %s on different thread", self)
+ return
+
+ # When we stop, let's record the cpu used since we started
+ if not self.usage_start:
+ logger.warning("Called stop on logcontext %s without calling start", self)
+ return
+
+ utime_delta, stime_delta = self._get_cputime()
+ self._resource_usage.ru_utime += utime_delta
+ self._resource_usage.ru_stime += stime_delta
+
+ self.usage_start = None
+
+ def get_resource_usage(self):
+ """Get resources used by this logcontext so far.
+
+ Returns:
+ ContextResourceUsage: a *copy* of the object tracking resource
+ usage so far
+ """
+ # we always return a copy, for consistency
+ res = self._resource_usage.copy()
+
+ # If we are on the correct thread and we're currently running then we
+ # can include resource usage so far.
+ is_main_thread = get_thread_id() == self.main_thread
+ if self.alive and self.usage_start and is_main_thread:
+ utime_delta, stime_delta = self._get_cputime()
+ res.ru_utime += utime_delta
+ res.ru_stime += stime_delta
+
+ return res
+
+ def _get_cputime(self):
+ """Get the cpu usage time so far
+
+ Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
+ """
+ current = get_thread_resource_usage()
+
+ utime_delta = current.ru_utime - self.usage_start.ru_utime
+ stime_delta = current.ru_stime - self.usage_start.ru_stime
+
+ # sanity check
+ if utime_delta < 0:
+ logger.error(
+ "utime went backwards! %f < %f",
+ current.ru_utime,
+ self.usage_start.ru_utime,
+ )
+ utime_delta = 0
+
+ if stime_delta < 0:
+ logger.error(
+ "stime went backwards! %f < %f",
+ current.ru_stime,
+ self.usage_start.ru_stime,
+ )
+ stime_delta = 0
+
+ return utime_delta, stime_delta
+
+ def add_database_transaction(self, duration_sec):
+ if duration_sec < 0:
+ raise ValueError("DB txn time can only be non-negative")
+ self._resource_usage.db_txn_count += 1
+ self._resource_usage.db_txn_duration_sec += duration_sec
+
+ def add_database_scheduled(self, sched_sec):
+ """Record a use of the database pool
+
+ Args:
+ sched_sec (float): number of seconds it took us to get a
+ connection
+ """
+ if sched_sec < 0:
+ raise ValueError("DB scheduling time can only be non-negative")
+ self._resource_usage.db_sched_duration_sec += sched_sec
+
+ def record_event_fetch(self, event_count):
+ """Record a number of events being fetched from the db
+
+ Args:
+ event_count (int): number of events being fetched
+ """
+ self._resource_usage.evt_db_fetch_count += event_count
+
+
+class LoggingContextFilter(logging.Filter):
+ """Logging filter that adds values from the current logging context to each
+ record.
+ Args:
+ **defaults: Default values to avoid formatters complaining about
+ missing fields
+ """
+
+ def __init__(self, **defaults):
+ self.defaults = defaults
+
+ def filter(self, record):
+ """Add each fields from the logging contexts to the record.
+ Returns:
+ True to include the record in the log output.
+ """
+ context = LoggingContext.current_context()
+ for key, value in self.defaults.items():
+ setattr(record, key, value)
+
+ # context should never be None, but if it somehow ends up being, then
+ # we end up in a death spiral of infinite loops, so let's check, for
+ # robustness' sake.
+ if context is not None:
+ context.copy_to(record)
+
+ return True
+
+
+class PreserveLoggingContext(object):
+ """Captures the current logging context and restores it when the scope is
+ exited. Used to restore the context after a function using
+ @defer.inlineCallbacks is resumed by a callback from the reactor."""
+
+ __slots__ = ["current_context", "new_context", "has_parent"]
+
+ def __init__(self, new_context=None):
+ if new_context is None:
+ new_context = LoggingContext.sentinel
+ self.new_context = new_context
+
+ def __enter__(self):
+ """Captures the current logging context"""
+ self.current_context = LoggingContext.set_current_context(self.new_context)
+
+ if self.current_context:
+ self.has_parent = self.current_context.previous_context is not None
+ if not self.current_context.alive:
+ logger.debug("Entering dead context: %s", self.current_context)
+
+ def __exit__(self, type, value, traceback):
+ """Restores the current logging context"""
+ context = LoggingContext.set_current_context(self.current_context)
+
+ if context != self.new_context:
+ if context is LoggingContext.sentinel:
+ logger.warning("Expected logging context %s was lost", self.new_context)
+ else:
+ logger.warning(
+ "Expected logging context %s but found %s",
+ self.new_context,
+ context,
+ )
+
+ if self.current_context is not LoggingContext.sentinel:
+ if not self.current_context.alive:
+ logger.debug("Restoring dead context: %s", self.current_context)
+
+
+def nested_logging_context(suffix, parent_context=None):
+ """Creates a new logging context as a child of another.
+
+ The nested logging context will have a 'request' made up of the parent context's
+ request, plus the given suffix.
+
+ CPU/db usage stats will be added to the parent context's on exit.
+
+ Normal usage looks like:
+
+ with nested_logging_context(suffix):
+ # ... do stuff
+
+ Args:
+ suffix (str): suffix to add to the parent context's 'request'.
+ parent_context (LoggingContext|None): parent context. Will use the current context
+ if None.
+
+ Returns:
+ LoggingContext: new logging context.
+ """
+ if parent_context is None:
+ parent_context = LoggingContext.current_context()
+ return LoggingContext(
+ parent_context=parent_context, request=parent_context.request + "-" + suffix
+ )
+
+
+def preserve_fn(f):
+ """Function decorator which wraps the function with run_in_background"""
+
+ def g(*args, **kwargs):
+ return run_in_background(f, *args, **kwargs)
+
+ return g
+
+
+def run_in_background(f, *args, **kwargs):
+ """Calls a function, ensuring that the current context is restored after
+ return from the function, and that the sentinel context is set once the
+ deferred returned by the function completes.
+
+ Useful for wrapping functions that return a deferred or coroutine, which you don't
+ yield or await on (for instance because you want to pass it to
+ deferred.gatherResults()).
+
+ Note that if you completely discard the result, you should make sure that
+ `f` doesn't raise any deferred exceptions, otherwise a scary-looking
+ CRITICAL error about an unhandled error will be logged without much
+ indication about where it came from.
+ """
+ current = LoggingContext.current_context()
+ try:
+ res = f(*args, **kwargs)
+ except: # noqa: E722
+ # the assumption here is that the caller doesn't want to be disturbed
+ # by synchronous exceptions, so let's turn them into Failures.
+ return defer.fail()
+
+ if isinstance(res, types.CoroutineType):
+ res = defer.ensureDeferred(res)
+
+ if not isinstance(res, defer.Deferred):
+ return res
+
+ if res.called and not res.paused:
+ # The function should have maintained the logcontext, so we can
+ # optimise out the messing about
+ return res
+
+ # The function may have reset the context before returning, so
+ # we need to restore it now.
+ ctx = LoggingContext.set_current_context(current)
+
+ # The original context will be restored when the deferred
+ # completes, but there is nothing waiting for it, so it will
+ # get leaked into the reactor or some other function which
+ # wasn't expecting it. We therefore need to reset the context
+ # here.
+ #
+ # (If this feels asymmetric, consider it this way: we are
+ # effectively forking a new thread of execution. We are
+ # probably currently within a ``with LoggingContext()`` block,
+ # which is supposed to have a single entry and exit point. But
+ # by spawning off another deferred, we are effectively
+ # adding a new exit point.)
+ res.addBoth(_set_context_cb, ctx)
+ return res
+
+
+def make_deferred_yieldable(deferred):
+ """Given a deferred, make it follow the Synapse logcontext rules:
+
+ If the deferred has completed (or is not actually a Deferred), essentially
+ does nothing (just returns another completed deferred with the
+ result/failure).
+
+ If the deferred has not yet completed, resets the logcontext before
+ returning a deferred. Then, when the deferred completes, restores the
+ current logcontext before running callbacks/errbacks.
+
+ (This is more-or-less the opposite operation to run_in_background.)
+ """
+ if not isinstance(deferred, defer.Deferred):
+ return deferred
+
+ if deferred.called and not deferred.paused:
+ # it looks like this deferred is ready to run any callbacks we give it
+ # immediately. We may as well optimise out the logcontext faffery.
+ return deferred
+
+ # ok, we can't be sure that a yield won't block, so let's reset the
+ # logcontext, and add a callback to the deferred to restore it.
+ prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
+ deferred.addBoth(_set_context_cb, prev_context)
+ return deferred
+
+
+def _set_context_cb(result, context):
+ """A callback function which just sets the logging context"""
+ LoggingContext.set_current_context(context)
+ return result
+
+
+def defer_to_thread(reactor, f, *args, **kwargs):
+ """
+ Calls the function `f` using a thread from the reactor's default threadpool and
+ returns the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked, and whose threadpool we should use for the
+ function.
+
+ Normally this will be hs.get_reactor().
+
+ f (callable): The function to call.
+
+ args: positional arguments to pass to f.
+
+ kwargs: keyword arguments to pass to f.
+
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
+ """
+ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
+
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+ """
+ A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+ logcontexts correctly.
+
+ Calls the function `f` using a thread from the given threadpool and returns
+ the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+ threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+ running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+ f (callable): The function to call.
+
+ args: positional arguments to pass to f.
+
+ kwargs: keyword arguments to pass to f.
+
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
+ """
+ logcontext = LoggingContext.current_context()
+
+ def g():
+ with LoggingContext(parent_context=logcontext):
+ return f(*args, **kwargs)
+
+ return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py
new file mode 100644
index 0000000000..fbf570c756
--- /dev/null
+++ b/synapse/logging/formatter.py
@@ -0,0 +1,53 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import traceback
+
+from six import StringIO
+
+
+class LogFormatter(logging.Formatter):
+ """Log formatter which gives more detail for exceptions
+
+ This is the same as the standard log formatter, except that when logging
+ exceptions [typically via log.foo("msg", exc_info=1)], it prints the
+ sequence that led up to the point at which the exception was caught.
+ (Normally only stack frames between the point the exception was raised and
+ where it was caught are logged).
+ """
+
+ def __init__(self, *args, **kwargs):
+ super(LogFormatter, self).__init__(*args, **kwargs)
+
+ def formatException(self, ei):
+ sio = StringIO()
+ (typ, val, tb) = ei
+
+ # log the stack above the exception capture point if possible, but
+ # check that we actually have an f_back attribute to work around
+ # https://twistedmatrix.com/trac/ticket/9305
+
+ if tb and hasattr(tb.tb_frame, "f_back"):
+ sio.write("Capture point (most recent call last):\n")
+ traceback.print_stack(tb.tb_frame.f_back, None, sio)
+
+ traceback.print_exception(typ, val, tb, None, sio)
+ s = sio.getvalue()
+ sio.close()
+ if s[-1:] == "\n":
+ s = s[:-1]
+ return s
diff --git a/synapse/util/logutils.py b/synapse/logging/utils.py
index 7df0fa6087..7df0fa6087 100644
--- a/synapse/util/logutils.py
+++ b/synapse/logging/utils.py
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 1f30179b51..eaf0aaa86e 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -437,7 +437,10 @@ def runUntilCurrentTimer(func):
counts = gc.get_count()
for i in (2, 1, 0):
if threshold[i] < counts[i]:
- logger.info("Collecting gc %d", i)
+ if i == 0:
+ logger.debug("Collecting gc %d", i)
+ else:
+ logger.info("Collecting gc %d", i)
start = time.time()
unreachable = gc.collect(i)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 167e2c068a..edd6b42db3 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -22,7 +22,7 @@ from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
from twisted.internet import defer
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.logging.context import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index bf43ca09be..a0be2c5ca3 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -12,10 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
from twisted.internet import defer
from synapse.types import UserID
+logger = logging.getLogger(__name__)
+
class ModuleApi(object):
"""A proxy object that gets passed to password auth providers so they
@@ -76,8 +80,13 @@ class ModuleApi(object):
@defer.inlineCallbacks
def register(self, localpart, displayname=None, emails=[]):
- """Registers a new user with given localpart and optional
- displayname, emails.
+ """Registers a new user with given localpart and optional displayname, emails.
+
+ Also returns an access token for the new user.
+
+ Deprecated: avoid this, as it generates a new device with no way to
+ return that device to the user. Prefer separate calls to register_user and
+ register_device.
Args:
localpart (str): The localpart of the new user.
@@ -85,17 +94,56 @@ class ModuleApi(object):
emails (List[str]): Emails to bind to the new user.
Returns:
- Deferred: a 2-tuple of (user_id, access_token)
+ Deferred[tuple[str, str]]: a 2-tuple of (user_id, access_token)
"""
- # Register the user
- reg = self.hs.get_registration_handler()
- user_id, access_token = yield reg.register(
- localpart=localpart, default_display_name=displayname, bind_emails=emails
+ logger.warning(
+ "Using deprecated ModuleApi.register which creates a dummy user device."
)
-
+ user_id = yield self.register_user(localpart, displayname, emails)
+ _, access_token = yield self.register_device(user_id)
defer.returnValue((user_id, access_token))
@defer.inlineCallbacks
+ def register_user(self, localpart, displayname=None, emails=[]):
+ """Registers a new user with given localpart and optional displayname, emails.
+
+ Args:
+ localpart (str): The localpart of the new user.
+ displayname (str|None): The displayname of the new user.
+ emails (List[str]): Emails to bind to the new user.
+
+ Returns:
+ Deferred[str]: user_id
+ """
+ user_id, _ = yield self.hs.get_registration_handler().register(
+ localpart=localpart,
+ default_display_name=displayname,
+ bind_emails=emails,
+ generate_token=False,
+ )
+
+ defer.returnValue(user_id)
+
+ def register_device(self, user_id, device_id=None, initial_display_name=None):
+ """Register a device for a user and generate an access token.
+
+ Args:
+ user_id (str): full canonical @user:id
+ device_id (str|None): The device ID to check, or None to generate
+ a new one.
+ initial_display_name (str|None): An optional display name for the
+ device.
+
+ Returns:
+ defer.Deferred[tuple[str, str]]: Tuple of device ID and access token
+ """
+ return self.hs.get_registration_handler().register_device(
+ user_id=user_id,
+ device_id=device_id,
+ initial_display_name=initial_display_name,
+ )
+
+ @defer.inlineCallbacks
def invalidate_access_token(self, access_token):
"""Invalidate an access token for a user
diff --git a/synapse/notifier.py b/synapse/notifier.py
index d398078eed..918ef64897 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -23,12 +23,12 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
+from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
-from synapse.util.logcontext import PreserveLoggingContext
-from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 96d087de22..134bf805eb 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,5 +1,6 @@
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -248,6 +249,18 @@ BASE_APPEND_OVERRIDE_RULES = [
],
"actions": ["notify", {"set_tweak": "highlight", "value": True}],
},
+ {
+ "rule_id": "global/override/.m.rule.reaction",
+ "conditions": [
+ {
+ "kind": "event_match",
+ "key": "type",
+ "pattern": "m.reaction",
+ "_id": "_reaction",
+ }
+ ],
+ "actions": ["dont_notify"],
+ },
]
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 809199fe88..521c6e2cd7 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -29,6 +29,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
+from synapse.logging.context import make_deferred_yieldable
from synapse.push.presentable_names import (
calculate_room_name,
descriptor_from_member_events,
@@ -36,7 +37,6 @@ from synapse.push.presentable_names import (
)
from synapse.types import UserID
from synapse.util.async_helpers import concurrently_execute
-from synapse.util.logcontext import make_deferred_yieldable
from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 0a76a3762f..2d9cbbaefc 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -156,70 +156,6 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
defer.returnValue((200, ret))
-class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
- """Gets/creates a guest account for given 3PID.
-
- Request format:
-
- POST /_synapse/replication/get_or_register_3pid_guest/
-
- {
- "requester": ...,
- "medium": ...,
- "address": ...,
- "inviter_user_id": ...
- }
- """
-
- NAME = "get_or_register_3pid_guest"
- PATH_ARGS = ()
-
- def __init__(self, hs):
- super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
-
- self.registeration_handler = hs.get_registration_handler()
- self.store = hs.get_datastore()
- self.clock = hs.get_clock()
-
- @staticmethod
- def _serialize_payload(requester, medium, address, inviter_user_id):
- """
- Args:
- requester(Requester)
- medium (str)
- address (str)
- inviter_user_id (str): The user ID who is trying to invite the
- 3PID
- """
- return {
- "requester": requester.serialize(),
- "medium": medium,
- "address": address,
- "inviter_user_id": inviter_user_id,
- }
-
- @defer.inlineCallbacks
- def _handle_request(self, request):
- content = parse_json_object_from_request(request)
-
- medium = content["medium"]
- address = content["address"]
- inviter_user_id = content["inviter_user_id"]
-
- requester = Requester.deserialize(self.store, content["requester"])
-
- if requester.user:
- request.authenticated_entity = requester.user.to_string()
-
- logger.info("get_or_register_3pid_guest: %r", content)
-
- ret = yield self.registeration_handler.get_or_register_3pid_guest(
- medium, address, inviter_user_id
- )
-
- defer.returnValue((200, ret))
-
-
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room
@@ -272,5 +208,4 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
def register_servlets(hs, http_server):
ReplicationRemoteJoinRestServlet(hs).register(http_server)
ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
- ReplicationRegister3PIDGuestRestServlet(hs).register(http_server)
ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 97efb835ad..5ffdf2675d 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -62,9 +62,9 @@ from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string
from .commands import (
diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py
index 36404b797d..6da71dc46f 100644
--- a/synapse/rest/client/transactions.py
+++ b/synapse/rest/client/transactions.py
@@ -17,8 +17,8 @@
to ensure idempotency when performing PUTs using the REST API."""
import logging
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.util.async_helpers import ObservableDeferred
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index ede6bc8b1e..b13043cc64 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -86,6 +86,7 @@ class LoginRestServlet(RestServlet):
self.jwt_enabled = hs.config.jwt_enabled
self.jwt_secret = hs.config.jwt_secret
self.jwt_algorithm = hs.config.jwt_algorithm
+ self.saml2_enabled = hs.config.saml2_enabled
self.cas_enabled = hs.config.cas_enabled
self.auth_handler = self.hs.get_auth_handler()
self.registration_handler = hs.get_registration_handler()
@@ -97,6 +98,9 @@ class LoginRestServlet(RestServlet):
flows = []
if self.jwt_enabled:
flows.append({"type": LoginRestServlet.JWT_TYPE})
+ if self.saml2_enabled:
+ flows.append({"type": LoginRestServlet.SSO_TYPE})
+ flows.append({"type": LoginRestServlet.TOKEN_TYPE})
if self.cas_enabled:
flows.append({"type": LoginRestServlet.SSO_TYPE})
@@ -279,19 +283,7 @@ class LoginRestServlet(RestServlet):
yield auth_handler.validate_short_term_login_token_and_get_user_id(token)
)
- device_id = login_submission.get("device_id")
- initial_display_name = login_submission.get("initial_device_display_name")
- device_id, access_token = yield self.registration_handler.register_device(
- user_id, device_id, initial_display_name
- )
-
- result = {
- "user_id": user_id, # may have changed
- "access_token": access_token,
- "home_server": self.hs.hostname,
- "device_id": device_id,
- }
-
+ result = yield self._register_device_with_callback(user_id, login_submission)
defer.returnValue(result)
@defer.inlineCallbacks
@@ -320,61 +312,63 @@ class LoginRestServlet(RestServlet):
user_id = UserID(user, self.hs.hostname).to_string()
- auth_handler = self.auth_handler
- registered_user_id = yield auth_handler.check_user_exists(user_id)
- if registered_user_id:
- device_id = login_submission.get("device_id")
- initial_display_name = login_submission.get("initial_device_display_name")
- device_id, access_token = yield self.registration_handler.register_device(
- registered_user_id, device_id, initial_display_name
+ registered_user_id = yield self.auth_handler.check_user_exists(user_id)
+ if not registered_user_id:
+ registered_user_id, _ = (
+ yield self.registration_handler.register(
+ localpart=user, generate_token=False
+ )
)
- result = {
- "user_id": registered_user_id,
- "access_token": access_token,
- "home_server": self.hs.hostname,
- }
- else:
- user_id, access_token = (
- yield self.registration_handler.register(localpart=user)
- )
+ result = yield self._register_device_with_callback(
+ registered_user_id, login_submission
+ )
+ defer.returnValue(result)
- device_id = login_submission.get("device_id")
- initial_display_name = login_submission.get("initial_device_display_name")
- device_id, access_token = yield self.registration_handler.register_device(
- registered_user_id, device_id, initial_display_name
- )
- result = {
- "user_id": user_id, # may have changed
- "access_token": access_token,
- "home_server": self.hs.hostname,
- }
+class BaseSSORedirectServlet(RestServlet):
+ """Common base class for /login/sso/redirect impls"""
- defer.returnValue(result)
+ PATTERNS = client_patterns("/login/(cas|sso)/redirect", v1=True)
+ def on_GET(self, request):
+ args = request.args
+ if b"redirectUrl" not in args:
+ return 400, "Redirect URL not specified for SSO auth"
+ client_redirect_url = args[b"redirectUrl"][0]
+ sso_url = self.get_sso_url(client_redirect_url)
+ request.redirect(sso_url)
+ finish_request(request)
+
+ def get_sso_url(self, client_redirect_url):
+ """Get the URL to redirect to, to perform SSO auth
+
+ Args:
+ client_redirect_url (bytes): the URL that we should redirect the
+ client to when everything is done
+
+ Returns:
+ bytes: URL to redirect to
+ """
+ # to be implemented by subclasses
+ raise NotImplementedError()
-class CasRedirectServlet(RestServlet):
- PATTERNS = client_patterns("/login/(cas|sso)/redirect", v1=True)
+class CasRedirectServlet(BaseSSORedirectServlet):
def __init__(self, hs):
super(CasRedirectServlet, self).__init__()
self.cas_server_url = hs.config.cas_server_url.encode("ascii")
self.cas_service_url = hs.config.cas_service_url.encode("ascii")
- def on_GET(self, request):
- args = request.args
- if b"redirectUrl" not in args:
- return (400, "Redirect URL not specified for CAS auth")
+ def get_sso_url(self, client_redirect_url):
client_redirect_url_param = urllib.parse.urlencode(
- {b"redirectUrl": args[b"redirectUrl"][0]}
+ {b"redirectUrl": client_redirect_url}
).encode("ascii")
hs_redirect_url = self.cas_service_url + b"/_matrix/client/r0/login/cas/ticket"
service_param = urllib.parse.urlencode(
{b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)}
).encode("ascii")
- request.redirect(b"%s/login?%s" % (self.cas_server_url, service_param))
- finish_request(request)
+ return b"%s/login?%s" % (self.cas_server_url, service_param)
class CasTicketServlet(RestServlet):
@@ -457,6 +451,16 @@ class CasTicketServlet(RestServlet):
return user, attributes
+class SAMLRedirectServlet(BaseSSORedirectServlet):
+ PATTERNS = client_patterns("/login/sso/redirect", v1=True)
+
+ def __init__(self, hs):
+ self._saml_handler = hs.get_saml_handler()
+
+ def get_sso_url(self, client_redirect_url):
+ return self._saml_handler.handle_redirect_request(client_redirect_url)
+
+
class SSOAuthHandler(object):
"""
Utility class for Resources and Servlets which handle the response from a SSO
@@ -532,3 +536,5 @@ def register_servlets(hs, http_server):
if hs.config.cas_enabled:
CasRedirectServlet(hs).register(http_server)
CasTicketServlet(hs).register(http_server)
+ elif hs.config.saml2_enabled:
+ SAMLRedirectServlet(hs).register(http_server)
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 9a32892d8b..1ddf9997ff 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -24,12 +24,14 @@ import jinja2
from jinja2 import TemplateNotFound
from twisted.internet import defer
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import NotFoundError, StoreError, SynapseError
from synapse.config import ConfigError
-from synapse.http.server import finish_request, wrap_html_request_handler
+from synapse.http.server import (
+ DirectServeResource,
+ finish_request,
+ wrap_html_request_handler,
+)
from synapse.http.servlet import parse_string
from synapse.types import UserID
@@ -47,7 +49,7 @@ else:
return a == b
-class ConsentResource(Resource):
+class ConsentResource(DirectServeResource):
"""A twisted Resource to display a privacy policy and gather consent to it
When accessed via GET, returns the privacy policy via a template.
@@ -87,7 +89,7 @@ class ConsentResource(Resource):
Args:
hs (synapse.server.HomeServer): homeserver
"""
- Resource.__init__(self)
+ super().__init__()
self.hs = hs
self.store = hs.get_datastore()
@@ -118,18 +120,12 @@ class ConsentResource(Resource):
self._hmac_secret = hs.config.form_secret.encode("utf-8")
- def render_GET(self, request):
- self._async_render_GET(request)
- return NOT_DONE_YET
-
@wrap_html_request_handler
- @defer.inlineCallbacks
- def _async_render_GET(self, request):
+ async def _async_render_GET(self, request):
"""
Args:
request (twisted.web.http.Request):
"""
-
version = parse_string(request, "v", default=self._default_consent_version)
username = parse_string(request, "u", required=False, default="")
userhmac = None
@@ -145,7 +141,7 @@ class ConsentResource(Resource):
else:
qualified_user_id = UserID(username, self.hs.hostname).to_string()
- u = yield self.store.get_user_by_id(qualified_user_id)
+ u = await defer.maybeDeferred(self.store.get_user_by_id, qualified_user_id)
if u is None:
raise NotFoundError("Unknown user")
@@ -165,13 +161,8 @@ class ConsentResource(Resource):
except TemplateNotFound:
raise NotFoundError("Unknown policy version")
- def render_POST(self, request):
- self._async_render_POST(request)
- return NOT_DONE_YET
-
@wrap_html_request_handler
- @defer.inlineCallbacks
- def _async_render_POST(self, request):
+ async def _async_render_POST(self, request):
"""
Args:
request (twisted.web.http.Request):
@@ -188,12 +179,12 @@ class ConsentResource(Resource):
qualified_user_id = UserID(username, self.hs.hostname).to_string()
try:
- yield self.store.user_set_consent_version(qualified_user_id, version)
+ await self.store.user_set_consent_version(qualified_user_id, version)
except StoreError as e:
if e.code != 404:
raise
raise NotFoundError("Unknown user")
- yield self.registration_handler.post_consent_actions(qualified_user_id)
+ await self.registration_handler.post_consent_actions(qualified_user_id)
try:
self._render_template(request, "success.html")
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index ec8b9d7269..031a316693 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -16,18 +16,20 @@ import logging
from io import BytesIO
from twisted.internet import defer
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import Codes, SynapseError
from synapse.crypto.keyring import ServerKeyFetcher
-from synapse.http.server import respond_with_json_bytes, wrap_json_request_handler
+from synapse.http.server import (
+ DirectServeResource,
+ respond_with_json_bytes,
+ wrap_json_request_handler,
+)
from synapse.http.servlet import parse_integer, parse_json_object_from_request
logger = logging.getLogger(__name__)
-class RemoteKey(Resource):
+class RemoteKey(DirectServeResource):
"""HTTP resource for retreiving the TLS certificate and NACL signature
verification keys for a collection of servers. Checks that the reported
X.509 TLS certificate matches the one used in the HTTPS connection. Checks
@@ -94,13 +96,8 @@ class RemoteKey(Resource):
self.clock = hs.get_clock()
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
- def render_GET(self, request):
- self.async_render_GET(request)
- return NOT_DONE_YET
-
@wrap_json_request_handler
- @defer.inlineCallbacks
- def async_render_GET(self, request):
+ async def _async_render_GET(self, request):
if len(request.postpath) == 1:
server, = request.postpath
query = {server.decode("ascii"): {}}
@@ -114,20 +111,15 @@ class RemoteKey(Resource):
else:
raise SynapseError(404, "Not found %r" % request.postpath, Codes.NOT_FOUND)
- yield self.query_keys(request, query, query_remote_on_cache_miss=True)
-
- def render_POST(self, request):
- self.async_render_POST(request)
- return NOT_DONE_YET
+ await self.query_keys(request, query, query_remote_on_cache_miss=True)
@wrap_json_request_handler
- @defer.inlineCallbacks
- def async_render_POST(self, request):
+ async def _async_render_POST(self, request):
content = parse_json_object_from_request(request)
query = content["server_keys"]
- yield self.query_keys(request, query, query_remote_on_cache_miss=True)
+ await self.query_keys(request, query, query_remote_on_cache_miss=True)
@defer.inlineCallbacks
def query_keys(self, request, query, query_remote_on_cache_miss=False):
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 3318638d3e..5fefee4dde 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -25,7 +25,7 @@ from twisted.protocols.basic import FileSender
from synapse.api.errors import Codes, SynapseError, cs_error
from synapse.http.server import finish_request, respond_with_json
-from synapse.util import logcontext
+from synapse.logging.context import make_deferred_yieldable
from synapse.util.stringutils import is_ascii
logger = logging.getLogger(__name__)
@@ -75,9 +75,7 @@ def respond_with_file(request, media_type, file_path, file_size=None, upload_nam
add_file_headers(request, media_type, file_size, upload_name)
with open(file_path, "rb") as f:
- yield logcontext.make_deferred_yieldable(
- FileSender().beginFileTransfer(f, request)
- )
+ yield make_deferred_yieldable(FileSender().beginFileTransfer(f, request))
finish_request(request)
else:
diff --git a/synapse/rest/media/v1/config_resource.py b/synapse/rest/media/v1/config_resource.py
index fa3d6680fc..9f747de263 100644
--- a/synapse/rest/media/v1/config_resource.py
+++ b/synapse/rest/media/v1/config_resource.py
@@ -14,31 +14,28 @@
# limitations under the License.
#
-from twisted.internet import defer
-from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
-from synapse.http.server import respond_with_json, wrap_json_request_handler
+from synapse.http.server import (
+ DirectServeResource,
+ respond_with_json,
+ wrap_json_request_handler,
+)
-class MediaConfigResource(Resource):
+class MediaConfigResource(DirectServeResource):
isLeaf = True
def __init__(self, hs):
- Resource.__init__(self)
+ super().__init__()
config = hs.get_config()
self.clock = hs.get_clock()
self.auth = hs.get_auth()
self.limits_dict = {"m.upload.size": config.max_upload_size}
- def render_GET(self, request):
- self._async_render_GET(request)
- return NOT_DONE_YET
-
@wrap_json_request_handler
- @defer.inlineCallbacks
- def _async_render_GET(self, request):
- yield self.auth.get_user_by_req(request)
+ async def _async_render_GET(self, request):
+ await self.auth.get_user_by_req(request)
respond_with_json(request, 200, self.limits_dict, send_cors=True)
def render_OPTIONS(self, request):
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index a21a35f843..66a01559e1 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -14,37 +14,31 @@
# limitations under the License.
import logging
-from twisted.internet import defer
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
-
import synapse.http.servlet
-from synapse.http.server import set_cors_headers, wrap_json_request_handler
+from synapse.http.server import (
+ DirectServeResource,
+ set_cors_headers,
+ wrap_json_request_handler,
+)
from ._base import parse_media_id, respond_404
logger = logging.getLogger(__name__)
-class DownloadResource(Resource):
+class DownloadResource(DirectServeResource):
isLeaf = True
def __init__(self, hs, media_repo):
- Resource.__init__(self)
-
+ super().__init__()
self.media_repo = media_repo
self.server_name = hs.hostname
# this is expected by @wrap_json_request_handler
self.clock = hs.get_clock()
- def render_GET(self, request):
- self._async_render_GET(request)
- return NOT_DONE_YET
-
@wrap_json_request_handler
- @defer.inlineCallbacks
- def _async_render_GET(self, request):
+ async def _async_render_GET(self, request):
set_cors_headers(request)
request.setHeader(
b"Content-Security-Policy",
@@ -58,7 +52,7 @@ class DownloadResource(Resource):
)
server_name, media_id, name = parse_media_id(request)
if server_name == self.server_name:
- yield self.media_repo.get_local_media(request, media_id, name)
+ await self.media_repo.get_local_media(request, media_id, name)
else:
allow_remote = synapse.http.servlet.parse_boolean(
request, "allow_remote", default=True
@@ -72,4 +66,4 @@ class DownloadResource(Resource):
respond_404(request)
return
- yield self.media_repo.get_remote_media(request, server_name, media_id, name)
+ await self.media_repo.get_remote_media(request, server_name, media_id, name)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index df3d985a38..65afffbb42 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -33,8 +33,8 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
+from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util import logcontext
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import random_string
@@ -463,7 +463,7 @@ class MediaRepository(object):
)
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield logcontext.defer_to_thread(
+ t_byte_source = yield defer_to_thread(
self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer,
@@ -511,7 +511,7 @@ class MediaRepository(object):
)
thumbnailer = Thumbnailer(input_path)
- t_byte_source = yield logcontext.defer_to_thread(
+ t_byte_source = yield defer_to_thread(
self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer,
@@ -596,7 +596,7 @@ class MediaRepository(object):
return
if thumbnailer.transpose_method is not None:
- m_width, m_height = yield logcontext.defer_to_thread(
+ m_width, m_height = yield defer_to_thread(
self.hs.get_reactor(), thumbnailer.transpose
)
@@ -616,11 +616,11 @@ class MediaRepository(object):
for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
# Generate the thumbnail
if t_method == "crop":
- t_byte_source = yield logcontext.defer_to_thread(
+ t_byte_source = yield defer_to_thread(
self.hs.get_reactor(), thumbnailer.crop, t_width, t_height, t_type
)
elif t_method == "scale":
- t_byte_source = yield logcontext.defer_to_thread(
+ t_byte_source = yield defer_to_thread(
self.hs.get_reactor(), thumbnailer.scale, t_width, t_height, t_type
)
else:
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index eff86836fb..25e5ac2848 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -24,9 +24,8 @@ import six
from twisted.internet import defer
from twisted.protocols.basic import FileSender
-from synapse.util import logcontext
+from synapse.logging.context import defer_to_thread, make_deferred_yieldable
from synapse.util.file_consumer import BackgroundFileConsumer
-from synapse.util.logcontext import make_deferred_yieldable
from ._base import Responder
@@ -65,7 +64,7 @@ class MediaStorage(object):
with self.store_into_file(file_info) as (f, fname, finish_cb):
# Write to the main repository
- yield logcontext.defer_to_thread(
+ yield defer_to_thread(
self.hs.get_reactor(), _write_file_synchronously, source, f
)
yield finish_cb()
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index de6f292ffb..5871737bfd 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -32,22 +32,21 @@ from canonicaljson import json
from twisted.internet import defer
from twisted.internet.error import DNSLookupError
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import Codes, SynapseError
from synapse.http.client import SimpleHttpClient
from synapse.http.server import (
+ DirectServeResource,
respond_with_json,
respond_with_json_bytes,
wrap_json_request_handler,
)
from synapse.http.servlet import parse_integer, parse_string
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.media.v1._base import get_filename_from_headers
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string
from ._base import FileInfo
@@ -58,11 +57,11 @@ _charset_match = re.compile(br"<\s*meta[^>]*charset\s*=\s*([a-z0-9-]+)", flags=r
_content_type_match = re.compile(r'.*; *charset="?(.*?)"?(;|$)', flags=re.I)
-class PreviewUrlResource(Resource):
+class PreviewUrlResource(DirectServeResource):
isLeaf = True
def __init__(self, hs, media_repo, media_storage):
- Resource.__init__(self)
+ super().__init__()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
@@ -96,18 +95,14 @@ class PreviewUrlResource(Resource):
)
def render_OPTIONS(self, request):
+ request.setHeader(b"Allow", b"OPTIONS, GET")
return respond_with_json(request, 200, {}, send_cors=True)
- def render_GET(self, request):
- self._async_render_GET(request)
- return NOT_DONE_YET
-
@wrap_json_request_handler
- @defer.inlineCallbacks
- def _async_render_GET(self, request):
+ async def _async_render_GET(self, request):
# XXX: if get_user_by_req fails, what should we do in an async render?
- requester = yield self.auth.get_user_by_req(request)
+ requester = await self.auth.get_user_by_req(request)
url = parse_string(request, "url")
if b"ts" in request.args:
ts = parse_integer(request, "ts")
@@ -159,7 +154,7 @@ class PreviewUrlResource(Resource):
else:
logger.info("Returning cached response")
- og = yield make_deferred_yieldable(observable.observe())
+ og = await make_deferred_yieldable(defer.maybeDeferred(observable.observe))
respond_with_json_bytes(request, 200, og, send_cors=True)
@defer.inlineCallbacks
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 359b45ebfc..e8f559acc1 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -20,8 +20,7 @@ import shutil
from twisted.internet import defer
from synapse.config._base import Config
-from synapse.util import logcontext
-from synapse.util.logcontext import run_in_background
+from synapse.logging.context import defer_to_thread, run_in_background
from .media_storage import FileResponder
@@ -125,7 +124,7 @@ class FileStorageProviderBackend(StorageProvider):
if not os.path.exists(dirname):
os.makedirs(dirname)
- return logcontext.defer_to_thread(
+ return defer_to_thread(
self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname
)
diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py
index ca84c9f139..08329884ac 100644
--- a/synapse/rest/media/v1/thumbnail_resource.py
+++ b/synapse/rest/media/v1/thumbnail_resource.py
@@ -17,10 +17,12 @@
import logging
from twisted.internet import defer
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
-from synapse.http.server import set_cors_headers, wrap_json_request_handler
+from synapse.http.server import (
+ DirectServeResource,
+ set_cors_headers,
+ wrap_json_request_handler,
+)
from synapse.http.servlet import parse_integer, parse_string
from ._base import (
@@ -34,11 +36,11 @@ from ._base import (
logger = logging.getLogger(__name__)
-class ThumbnailResource(Resource):
+class ThumbnailResource(DirectServeResource):
isLeaf = True
def __init__(self, hs, media_repo, media_storage):
- Resource.__init__(self)
+ super().__init__()
self.store = hs.get_datastore()
self.media_repo = media_repo
@@ -47,13 +49,8 @@ class ThumbnailResource(Resource):
self.server_name = hs.hostname
self.clock = hs.get_clock()
- def render_GET(self, request):
- self._async_render_GET(request)
- return NOT_DONE_YET
-
@wrap_json_request_handler
- @defer.inlineCallbacks
- def _async_render_GET(self, request):
+ async def _async_render_GET(self, request):
set_cors_headers(request)
server_name, media_id, _ = parse_media_id(request)
width = parse_integer(request, "width", required=True)
@@ -63,21 +60,21 @@ class ThumbnailResource(Resource):
if server_name == self.server_name:
if self.dynamic_thumbnails:
- yield self._select_or_generate_local_thumbnail(
+ await self._select_or_generate_local_thumbnail(
request, media_id, width, height, method, m_type
)
else:
- yield self._respond_local_thumbnail(
+ await self._respond_local_thumbnail(
request, media_id, width, height, method, m_type
)
self.media_repo.mark_recently_accessed(None, media_id)
else:
if self.dynamic_thumbnails:
- yield self._select_or_generate_remote_thumbnail(
+ await self._select_or_generate_remote_thumbnail(
request, server_name, media_id, width, height, method, m_type
)
else:
- yield self._respond_remote_thumbnail(
+ await self._respond_remote_thumbnail(
request, server_name, media_id, width, height, method, m_type
)
self.media_repo.mark_recently_accessed(server_name, media_id)
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index d1d7e959f0..5d76bbdf68 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -15,22 +15,24 @@
import logging
-from twisted.internet import defer
-from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import SynapseError
-from synapse.http.server import respond_with_json, wrap_json_request_handler
+from synapse.http.server import (
+ DirectServeResource,
+ respond_with_json,
+ wrap_json_request_handler,
+)
from synapse.http.servlet import parse_string
logger = logging.getLogger(__name__)
-class UploadResource(Resource):
+class UploadResource(DirectServeResource):
isLeaf = True
def __init__(self, hs, media_repo):
- Resource.__init__(self)
+ super().__init__()
self.media_repo = media_repo
self.filepaths = media_repo.filepaths
@@ -41,18 +43,13 @@ class UploadResource(Resource):
self.max_upload_size = hs.config.max_upload_size
self.clock = hs.get_clock()
- def render_POST(self, request):
- self._async_render_POST(request)
- return NOT_DONE_YET
-
def render_OPTIONS(self, request):
respond_with_json(request, 200, {}, send_cors=True)
return NOT_DONE_YET
@wrap_json_request_handler
- @defer.inlineCallbacks
- def _async_render_POST(self, request):
- requester = yield self.auth.get_user_by_req(request)
+ async def _async_render_POST(self, request):
+ requester = await self.auth.get_user_by_req(request)
# TODO: The checks here are a bit late. The content will have
# already been uploaded to a tmp file at this point
content_length = request.getHeader(b"Content-Length").decode("ascii")
@@ -81,7 +78,7 @@ class UploadResource(Resource):
# disposition = headers.getRawHeaders(b"Content-Disposition")[0]
# TODO(markjh): parse content-dispostion
- content_uri = yield self.media_repo.create_content(
+ content_uri = await self.media_repo.create_content(
media_type, upload_name, request.content, content_length, requester.user
)
diff --git a/synapse/rest/saml2/response_resource.py b/synapse/rest/saml2/response_resource.py
index ab14b70675..69ecc5e4b4 100644
--- a/synapse/rest/saml2/response_resource.py
+++ b/synapse/rest/saml2/response_resource.py
@@ -13,59 +13,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import logging
-import saml2
-from saml2.client import Saml2Client
+from synapse.http.server import DirectServeResource, wrap_html_request_handler
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
-from synapse.api.errors import CodeMessageException
-from synapse.http.server import wrap_html_request_handler
-from synapse.http.servlet import parse_string
-from synapse.rest.client.v1.login import SSOAuthHandler
-
-logger = logging.getLogger(__name__)
-
-
-class SAML2ResponseResource(Resource):
+class SAML2ResponseResource(DirectServeResource):
"""A Twisted web resource which handles the SAML response"""
isLeaf = 1
def __init__(self, hs):
- Resource.__init__(self)
-
- self._saml_client = Saml2Client(hs.config.saml2_sp_config)
- self._sso_auth_handler = SSOAuthHandler(hs)
-
- def render_POST(self, request):
- self._async_render_POST(request)
- return NOT_DONE_YET
+ super().__init__()
+ self._saml_handler = hs.get_saml_handler()
@wrap_html_request_handler
- def _async_render_POST(self, request):
- resp_bytes = parse_string(request, "SAMLResponse", required=True)
- relay_state = parse_string(request, "RelayState", required=True)
-
- try:
- saml2_auth = self._saml_client.parse_authn_request_response(
- resp_bytes, saml2.BINDING_HTTP_POST
- )
- except Exception as e:
- logger.warning("Exception parsing SAML2 response", exc_info=1)
- raise CodeMessageException(400, "Unable to parse SAML2 response: %s" % (e,))
-
- if saml2_auth.not_signed:
- raise CodeMessageException(400, "SAML2 response was not signed")
-
- if "uid" not in saml2_auth.ava:
- raise CodeMessageException(400, "uid not in SAML2 response")
-
- username = saml2_auth.ava["uid"][0]
-
- displayName = saml2_auth.ava.get("displayName", [None])[0]
- return self._sso_auth_handler.on_successful_auth(
- username, request, relay_state, user_display_name=displayName
- )
+ async def _async_render_POST(self, request):
+ return await self._saml_handler.handle_saml_response(request)
diff --git a/synapse/server.py b/synapse/server.py
index a9592c396c..9e28dba2b1 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -194,6 +194,7 @@ class HomeServer(object):
"sendmail",
"registration_handler",
"account_validity_handler",
+ "saml_handler",
"event_client_serializer",
]
@@ -524,6 +525,11 @@ class HomeServer(object):
def build_account_validity_handler(self):
return AccountValidityHandler(self)
+ def build_saml_handler(self):
+ from synapse.handlers.saml_handler import SamlHandler
+
+ return SamlHandler(self)
+
def build_event_client_serializer(self):
return EventClientSerializer(self)
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 1b454a56a1..9f708fa205 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -28,11 +28,11 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events.snapshot import EventContext
+from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.expiringcache import ExpiringCache
-from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 29589853c6..2f940dbae6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -30,12 +30,12 @@ from prometheus_client import Histogram
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id
from synapse.util import batch_iter
from synapse.util.caches.descriptors import Cache
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.stringutils import exception_to_unicode
# import a function which will return a monotonic time, in seconds
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 3413a46675..d2b113a4e7 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -24,6 +24,7 @@ from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.util import batch_iter
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
logger = logging.getLogger(__name__)
@@ -391,22 +392,47 @@ class DeviceWorkerStore(SQLBaseStore):
return now_stream_id, []
- @defer.inlineCallbacks
- def get_user_whose_devices_changed(self, from_key):
- """Get set of users whose devices have changed since `from_key`.
+ def get_users_whose_devices_changed(self, from_key, user_ids):
+ """Get set of users whose devices have changed since `from_key` that
+ are in the given list of user_ids.
+
+ Args:
+ from_key (str): The device lists stream token
+ user_ids (Iterable[str])
+
+ Returns:
+ Deferred[set[str]]: The set of user_ids whose devices have changed
+ since `from_key`
"""
from_key = int(from_key)
- changed = self._device_list_stream_cache.get_all_entities_changed(from_key)
- if changed is not None:
- defer.returnValue(set(changed))
- sql = """
- SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
- """
- rows = yield self._execute(
- "get_user_whose_devices_changed", None, sql, from_key
+ # Get set of users who *may* have changed. Users not in the returned
+ # list have definitely not changed.
+ to_check = list(
+ self._device_list_stream_cache.get_entities_changed(user_ids, from_key)
+ )
+
+ if not to_check:
+ return defer.succeed(set())
+
+ def _get_users_whose_devices_changed_txn(txn):
+ changes = set()
+
+ sql = """
+ SELECT DISTINCT user_id FROM device_lists_stream
+ WHERE stream_id > ?
+ AND user_id IN (%s)
+ """
+
+ for chunk in batch_iter(to_check, 100):
+ txn.execute(sql % (",".join("?" for _ in chunk),), (from_key,) + chunk)
+ changes.update(user_id for user_id, in txn)
+
+ return changes
+
+ return self.runInteraction(
+ "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn
)
- defer.returnValue(set(row[0] for row in rows))
def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index fefba39ea1..b486ca50eb 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,6 +33,8 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.utils import log_function
from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
@@ -45,8 +47,6 @@ from synapse.util import batch_iter
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.util.frozenutils import frozendict_json_encoder
-from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
-from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -253,7 +253,14 @@ class EventsStore(
)
# Read the extrems every 60 minutes
- hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+ def read_forward_extremities():
+ # run as a background process to make sure that the database transactions
+ # have a logcontext to report to
+ return run_as_background_process(
+ "read_forward_extremities", self._read_forward_extremities
+ )
+
+ hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000)
@defer.inlineCallbacks
def _read_forward_extremities(self):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 6d680d405a..09db872511 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -29,14 +29,14 @@ from synapse.api.room_versions import EventFormatVersions
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import get_domain_from_id
-from synapse.util.logcontext import (
+from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
)
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import get_domain_from_id
from synapse.util.metrics import Measure
from ._base import SQLBaseStore
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 983ce13291..aea5b3276b 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -25,6 +25,7 @@ from twisted.internet import defer
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, ThreepidValidationError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.types import UserID
@@ -432,19 +433,6 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_3pid_guest_access_token(self, medium, address):
- ret = yield self._simple_select_one(
- "threepid_guest_access_tokens",
- {"medium": medium, "address": address},
- ["guest_access_token"],
- True,
- "get_3pid_guest_access_token",
- )
- if ret:
- defer.returnValue(ret["guest_access_token"])
- defer.returnValue(None)
-
- @defer.inlineCallbacks
def get_user_id_by_threepid(self, medium, address, require_verified=False):
"""Returns user id from threepid
@@ -619,9 +607,15 @@ class RegistrationStore(
)
# Create a background job for culling expired 3PID validity tokens
- hs.get_clock().looping_call(
- self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
- )
+ def start_cull():
+ # run as a background process to make sure that the database transactions
+ # have a logcontext to report to
+ return run_as_background_process(
+ "cull_expired_threepid_validation_tokens",
+ self.cull_expired_threepid_validation_tokens,
+ )
+
+ hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
@defer.inlineCallbacks
def _backgroud_update_set_deactivated_flag(self, progress, batch_size):
@@ -972,40 +966,6 @@ class RegistrationStore(
defer.returnValue(res if res else False)
- @defer.inlineCallbacks
- def save_or_get_3pid_guest_access_token(
- self, medium, address, access_token, inviter_user_id
- ):
- """
- Gets the 3pid's guest access token if exists, else saves access_token.
-
- Args:
- medium (str): Medium of the 3pid. Must be "email".
- address (str): 3pid address.
- access_token (str): The access token to persist if none is
- already persisted.
- inviter_user_id (str): User ID of the inviter.
-
- Returns:
- deferred str: Whichever access token is persisted at the end
- of this function call.
- """
-
- def insert(txn):
- txn.execute(
- "INSERT INTO threepid_guest_access_tokens "
- "(medium, address, guest_access_token, first_inviter) "
- "VALUES (?, ?, ?, ?)",
- (medium, address, access_token, inviter_user_id),
- )
-
- try:
- yield self.runInteraction("save_3pid_guest_access_token", insert)
- defer.returnValue(access_token)
- except self.database_engine.module.IntegrityError:
- ret = yield self.get_3pid_guest_access_token(medium, address)
- defer.returnValue(ret)
-
def add_user_pending_deactivation(self, user_id):
"""
Adds a user to the table of users who need to be parted from all the rooms they're
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index d9482a3843..386a9dbe14 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,12 +41,12 @@ from six.moves import range
from twisted.internet import defer
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index b1188f6bcb..fd18619178 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -133,34 +133,6 @@ class TransactionStore(SQLBaseStore):
desc="set_received_txn_response",
)
- def prep_send_transaction(self, transaction_id, destination, origin_server_ts):
- """Persists an outgoing transaction and calculates the values for the
- previous transaction id list.
-
- This should be called before sending the transaction so that it has the
- correct value for the `prev_ids` key.
-
- Args:
- transaction_id (str)
- destination (str)
- origin_server_ts (int)
-
- Returns:
- list: A list of previous transaction ids.
- """
- return defer.succeed([])
-
- def delivered_txn(self, transaction_id, destination, code, response_dict):
- """Persists the response for an outgoing transaction.
-
- Args:
- transaction_id (str)
- destination (str)
- code (int)
- response_json (str)
- """
- pass
-
@defer.inlineCallbacks
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index dcc747cac1..f506b2a695 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -21,7 +21,7 @@ import attr
from twisted.internet import defer, task
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.logging import context
logger = logging.getLogger(__name__)
@@ -46,7 +46,7 @@ class Clock(object):
@defer.inlineCallbacks
def sleep(self, seconds):
d = defer.Deferred()
- with PreserveLoggingContext():
+ with context.PreserveLoggingContext():
self._reactor.callLater(seconds, d.callback, seconds)
res = yield d
defer.returnValue(res)
@@ -62,7 +62,10 @@ class Clock(object):
def looping_call(self, f, msec):
"""Call a function repeatedly.
- Waits `msec` initially before calling `f` for the first time.
+ Waits `msec` initially before calling `f` for the first time.
+
+ Note that the function will be called with no logcontext, so if it is anything
+ other than trivial, you probably want to wrap it in run_as_background_process.
Args:
f(function): The function to call repeatedly.
@@ -77,6 +80,9 @@ class Clock(object):
def call_later(self, delay, callback, *args, **kwargs):
"""Call something later
+ Note that the function will be called with no logcontext, so if it is anything
+ other than trivial, you probably want to wrap it in run_as_background_process.
+
Args:
delay(float): How long to wait in seconds.
callback(function): Function to call
@@ -85,10 +91,10 @@ class Clock(object):
"""
def wrapped_callback(*args, **kwargs):
- with PreserveLoggingContext():
+ with context.PreserveLoggingContext():
callback(*args, **kwargs)
- with PreserveLoggingContext():
+ with context.PreserveLoggingContext():
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer, ignore_errs=False):
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 7757b8708a..58a6b8764f 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -23,13 +23,12 @@ from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.python import failure
-from synapse.util import Clock, logcontext, unwrapFirstError
-
-from .logcontext import (
+from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
)
+from synapse.util import Clock, unwrapFirstError
logger = logging.getLogger(__name__)
@@ -153,7 +152,7 @@ def concurrently_execute(func, args, limit):
except StopIteration:
pass
- return logcontext.make_deferred_yieldable(
+ return make_deferred_yieldable(
defer.gatherResults(
[run_in_background(_concurrently_execute_inner) for _ in range(limit)],
consumeErrors=True,
@@ -174,7 +173,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
Deferred[list]: Resolved when all functions have been invoked, or errors if
one of the function calls fails.
"""
- return logcontext.make_deferred_yieldable(
+ return make_deferred_yieldable(
defer.gatherResults(
[run_in_background(func, item, *args, **kwargs) for item in iter],
consumeErrors=True,
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index d2f25063aa..675db2f448 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -24,7 +24,8 @@ from six import itervalues, string_types
from twisted.internet import defer
-from synapse.util import logcontext, unwrapFirstError
+from synapse.logging.context import make_deferred_yieldable, preserve_fn
+from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
@@ -388,7 +389,7 @@ class CacheDescriptor(_CacheDescriptorBase):
except KeyError:
ret = defer.maybeDeferred(
- logcontext.preserve_fn(self.function_to_call), obj, *args, **kwargs
+ preserve_fn(self.function_to_call), obj, *args, **kwargs
)
def onErr(f):
@@ -408,7 +409,7 @@ class CacheDescriptor(_CacheDescriptorBase):
observer = result_d.observe()
if isinstance(observer, defer.Deferred):
- return logcontext.make_deferred_yieldable(observer)
+ return make_deferred_yieldable(observer)
else:
return observer
@@ -563,7 +564,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
cached_defers.append(
defer.maybeDeferred(
- logcontext.preserve_fn(self.function_to_call), **args_to_call
+ preserve_fn(self.function_to_call), **args_to_call
).addCallbacks(complete_all, errback)
)
@@ -571,7 +572,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
lambda _: results, unwrapFirstError
)
- return logcontext.make_deferred_yieldable(d)
+ return make_deferred_yieldable(d)
else:
return results
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index cbe54d45dd..d6908e169d 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -16,9 +16,9 @@ import logging
from twisted.internet import defer
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import register_cache
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -78,7 +78,7 @@ class ResponseCache(object):
*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
- logcontext.run_in_background).
+ synapse.logging.context.run_in_background).
Can return either a new Deferred (which also doesn't follow the synapse
logcontext rules), or, if *deferred* was already complete, the actual
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 5a79db821c..45af8d3eeb 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,8 +17,8 @@ import logging
from twisted.internet import defer
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 629ed44149..8b17d1c8b8 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -17,7 +17,7 @@ from six.moves import queue
from twisted.internet import threads
-from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.logging.context import make_deferred_yieldable, run_in_background
class BackgroundFileConsumer(object):
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 6b0d2deea0..40e5c10a49 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -1,4 +1,4 @@
-# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,668 +12,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-""" Thread-local-alike tracking of log contexts within synapse
-
-This module provides objects and utilities for tracking contexts through
-synapse code, so that log lines can include a request identifier, and so that
-CPU and database activity can be accounted for against the request that caused
-them.
-
-See doc/log_contexts.rst for details on how this works.
+"""
+Backwards compatibility re-exports of ``synapse.logging.context`` functionality.
"""
-import logging
-import threading
-
-from twisted.internet import defer, threads
-
-logger = logging.getLogger(__name__)
-
-try:
- import resource
-
- # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined
- # to be 1 on linux so we hard code it.
- RUSAGE_THREAD = 1
-
- # If the system doesn't support RUSAGE_THREAD then this should throw an
- # exception.
- resource.getrusage(RUSAGE_THREAD)
-
- def get_thread_resource_usage():
- return resource.getrusage(RUSAGE_THREAD)
-
-
-except Exception:
- # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
- # won't track resource usage by returning None.
- def get_thread_resource_usage():
- return None
-
-
-class ContextResourceUsage(object):
- """Object for tracking the resources used by a log context
-
- Attributes:
- ru_utime (float): user CPU time (in seconds)
- ru_stime (float): system CPU time (in seconds)
- db_txn_count (int): number of database transactions done
- db_sched_duration_sec (float): amount of time spent waiting for a
- database connection
- db_txn_duration_sec (float): amount of time spent doing database
- transactions (excluding scheduling time)
- evt_db_fetch_count (int): number of events requested from the database
- """
-
- __slots__ = [
- "ru_stime",
- "ru_utime",
- "db_txn_count",
- "db_txn_duration_sec",
- "db_sched_duration_sec",
- "evt_db_fetch_count",
- ]
-
- def __init__(self, copy_from=None):
- """Create a new ContextResourceUsage
-
- Args:
- copy_from (ContextResourceUsage|None): if not None, an object to
- copy stats from
- """
- if copy_from is None:
- self.reset()
- else:
- self.ru_utime = copy_from.ru_utime
- self.ru_stime = copy_from.ru_stime
- self.db_txn_count = copy_from.db_txn_count
-
- self.db_txn_duration_sec = copy_from.db_txn_duration_sec
- self.db_sched_duration_sec = copy_from.db_sched_duration_sec
- self.evt_db_fetch_count = copy_from.evt_db_fetch_count
-
- def copy(self):
- return ContextResourceUsage(copy_from=self)
-
- def reset(self):
- self.ru_stime = 0.0
- self.ru_utime = 0.0
- self.db_txn_count = 0
-
- self.db_txn_duration_sec = 0
- self.db_sched_duration_sec = 0
- self.evt_db_fetch_count = 0
-
- def __repr__(self):
- return (
- "<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
- "db_txn_count='%r', db_txn_duration_sec='%r', "
- "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
- ) % (
- self.ru_stime,
- self.ru_utime,
- self.db_txn_count,
- self.db_txn_duration_sec,
- self.db_sched_duration_sec,
- self.evt_db_fetch_count,
- )
-
- def __iadd__(self, other):
- """Add another ContextResourceUsage's stats to this one's.
-
- Args:
- other (ContextResourceUsage): the other resource usage object
- """
- self.ru_utime += other.ru_utime
- self.ru_stime += other.ru_stime
- self.db_txn_count += other.db_txn_count
- self.db_txn_duration_sec += other.db_txn_duration_sec
- self.db_sched_duration_sec += other.db_sched_duration_sec
- self.evt_db_fetch_count += other.evt_db_fetch_count
- return self
-
- def __isub__(self, other):
- self.ru_utime -= other.ru_utime
- self.ru_stime -= other.ru_stime
- self.db_txn_count -= other.db_txn_count
- self.db_txn_duration_sec -= other.db_txn_duration_sec
- self.db_sched_duration_sec -= other.db_sched_duration_sec
- self.evt_db_fetch_count -= other.evt_db_fetch_count
- return self
-
- def __add__(self, other):
- res = ContextResourceUsage(copy_from=self)
- res += other
- return res
-
- def __sub__(self, other):
- res = ContextResourceUsage(copy_from=self)
- res -= other
- return res
-
-
-class LoggingContext(object):
- """Additional context for log formatting. Contexts are scoped within a
- "with" block.
-
- If a parent is given when creating a new context, then:
- - logging fields are copied from the parent to the new context on entry
- - when the new context exits, the cpu usage stats are copied from the
- child to the parent
-
- Args:
- name (str): Name for the context for debugging.
- parent_context (LoggingContext|None): The parent of the new context
- """
-
- __slots__ = [
- "previous_context",
- "name",
- "parent_context",
- "_resource_usage",
- "usage_start",
- "main_thread",
- "alive",
- "request",
- "tag",
- ]
-
- thread_local = threading.local()
-
- class Sentinel(object):
- """Sentinel to represent the root context"""
-
- __slots__ = []
-
- def __str__(self):
- return "sentinel"
-
- def copy_to(self, record):
- pass
-
- def start(self):
- pass
-
- def stop(self):
- pass
-
- def add_database_transaction(self, duration_sec):
- pass
-
- def add_database_scheduled(self, sched_sec):
- pass
-
- def record_event_fetch(self, event_count):
- pass
-
- def __nonzero__(self):
- return False
-
- __bool__ = __nonzero__ # python3
-
- sentinel = Sentinel()
-
- def __init__(self, name=None, parent_context=None, request=None):
- self.previous_context = LoggingContext.current_context()
- self.name = name
-
- # track the resources used by this context so far
- self._resource_usage = ContextResourceUsage()
-
- # If alive has the thread resource usage when the logcontext last
- # became active.
- self.usage_start = None
-
- self.main_thread = threading.current_thread()
- self.request = None
- self.tag = ""
- self.alive = True
-
- self.parent_context = parent_context
-
- if self.parent_context is not None:
- self.parent_context.copy_to(self)
-
- if request is not None:
- # the request param overrides the request from the parent context
- self.request = request
-
- def __str__(self):
- if self.request:
- return str(self.request)
- return "%s@%x" % (self.name, id(self))
-
- @classmethod
- def current_context(cls):
- """Get the current logging context from thread local storage
-
- Returns:
- LoggingContext: the current logging context
- """
- return getattr(cls.thread_local, "current_context", cls.sentinel)
-
- @classmethod
- def set_current_context(cls, context):
- """Set the current logging context in thread local storage
- Args:
- context(LoggingContext): The context to activate.
- Returns:
- The context that was previously active
- """
- current = cls.current_context()
-
- if current is not context:
- current.stop()
- cls.thread_local.current_context = context
- context.start()
- return current
-
- def __enter__(self):
- """Enters this logging context into thread local storage"""
- old_context = self.set_current_context(self)
- if self.previous_context != old_context:
- logger.warn(
- "Expected previous context %r, found %r",
- self.previous_context,
- old_context,
- )
- self.alive = True
-
- return self
-
- def __exit__(self, type, value, traceback):
- """Restore the logging context in thread local storage to the state it
- was before this context was entered.
- Returns:
- None to avoid suppressing any exceptions that were thrown.
- """
- current = self.set_current_context(self.previous_context)
- if current is not self:
- if current is self.sentinel:
- logger.warning("Expected logging context %s was lost", self)
- else:
- logger.warning(
- "Expected logging context %s but found %s", self, current
- )
- self.previous_context = None
- self.alive = False
-
- # if we have a parent, pass our CPU usage stats on
- if self.parent_context is not None and hasattr(
- self.parent_context, "_resource_usage"
- ):
- self.parent_context._resource_usage += self._resource_usage
-
- # reset them in case we get entered again
- self._resource_usage.reset()
-
- def copy_to(self, record):
- """Copy logging fields from this context to a log record or
- another LoggingContext
- """
-
- # 'request' is the only field we currently use in the logger, so that's
- # all we need to copy
- record.request = self.request
-
- def start(self):
- if threading.current_thread() is not self.main_thread:
- logger.warning("Started logcontext %s on different thread", self)
- return
-
- # If we haven't already started record the thread resource usage so
- # far
- if not self.usage_start:
- self.usage_start = get_thread_resource_usage()
-
- def stop(self):
- if threading.current_thread() is not self.main_thread:
- logger.warning("Stopped logcontext %s on different thread", self)
- return
-
- # When we stop, let's record the cpu used since we started
- if not self.usage_start:
- logger.warning("Called stop on logcontext %s without calling start", self)
- return
-
- utime_delta, stime_delta = self._get_cputime()
- self._resource_usage.ru_utime += utime_delta
- self._resource_usage.ru_stime += stime_delta
-
- self.usage_start = None
-
- def get_resource_usage(self):
- """Get resources used by this logcontext so far.
-
- Returns:
- ContextResourceUsage: a *copy* of the object tracking resource
- usage so far
- """
- # we always return a copy, for consistency
- res = self._resource_usage.copy()
-
- # If we are on the correct thread and we're currently running then we
- # can include resource usage so far.
- is_main_thread = threading.current_thread() is self.main_thread
- if self.alive and self.usage_start and is_main_thread:
- utime_delta, stime_delta = self._get_cputime()
- res.ru_utime += utime_delta
- res.ru_stime += stime_delta
-
- return res
-
- def _get_cputime(self):
- """Get the cpu usage time so far
-
- Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
- """
- current = get_thread_resource_usage()
-
- utime_delta = current.ru_utime - self.usage_start.ru_utime
- stime_delta = current.ru_stime - self.usage_start.ru_stime
-
- # sanity check
- if utime_delta < 0:
- logger.error(
- "utime went backwards! %f < %f",
- current.ru_utime,
- self.usage_start.ru_utime,
- )
- utime_delta = 0
-
- if stime_delta < 0:
- logger.error(
- "stime went backwards! %f < %f",
- current.ru_stime,
- self.usage_start.ru_stime,
- )
- stime_delta = 0
-
- return utime_delta, stime_delta
-
- def add_database_transaction(self, duration_sec):
- if duration_sec < 0:
- raise ValueError("DB txn time can only be non-negative")
- self._resource_usage.db_txn_count += 1
- self._resource_usage.db_txn_duration_sec += duration_sec
-
- def add_database_scheduled(self, sched_sec):
- """Record a use of the database pool
-
- Args:
- sched_sec (float): number of seconds it took us to get a
- connection
- """
- if sched_sec < 0:
- raise ValueError("DB scheduling time can only be non-negative")
- self._resource_usage.db_sched_duration_sec += sched_sec
-
- def record_event_fetch(self, event_count):
- """Record a number of events being fetched from the db
-
- Args:
- event_count (int): number of events being fetched
- """
- self._resource_usage.evt_db_fetch_count += event_count
-
-
-class LoggingContextFilter(logging.Filter):
- """Logging filter that adds values from the current logging context to each
- record.
- Args:
- **defaults: Default values to avoid formatters complaining about
- missing fields
- """
-
- def __init__(self, **defaults):
- self.defaults = defaults
-
- def filter(self, record):
- """Add each fields from the logging contexts to the record.
- Returns:
- True to include the record in the log output.
- """
- context = LoggingContext.current_context()
- for key, value in self.defaults.items():
- setattr(record, key, value)
-
- # context should never be None, but if it somehow ends up being, then
- # we end up in a death spiral of infinite loops, so let's check, for
- # robustness' sake.
- if context is not None:
- context.copy_to(record)
-
- return True
-
-
-class PreserveLoggingContext(object):
- """Captures the current logging context and restores it when the scope is
- exited. Used to restore the context after a function using
- @defer.inlineCallbacks is resumed by a callback from the reactor."""
-
- __slots__ = ["current_context", "new_context", "has_parent"]
-
- def __init__(self, new_context=None):
- if new_context is None:
- new_context = LoggingContext.sentinel
- self.new_context = new_context
-
- def __enter__(self):
- """Captures the current logging context"""
- self.current_context = LoggingContext.set_current_context(self.new_context)
-
- if self.current_context:
- self.has_parent = self.current_context.previous_context is not None
- if not self.current_context.alive:
- logger.debug("Entering dead context: %s", self.current_context)
-
- def __exit__(self, type, value, traceback):
- """Restores the current logging context"""
- context = LoggingContext.set_current_context(self.current_context)
-
- if context != self.new_context:
- if context is LoggingContext.sentinel:
- logger.warning("Expected logging context %s was lost", self.new_context)
- else:
- logger.warning(
- "Expected logging context %s but found %s",
- self.new_context,
- context,
- )
-
- if self.current_context is not LoggingContext.sentinel:
- if not self.current_context.alive:
- logger.debug("Restoring dead context: %s", self.current_context)
-
-
-def nested_logging_context(suffix, parent_context=None):
- """Creates a new logging context as a child of another.
-
- The nested logging context will have a 'request' made up of the parent context's
- request, plus the given suffix.
-
- CPU/db usage stats will be added to the parent context's on exit.
-
- Normal usage looks like:
-
- with nested_logging_context(suffix):
- # ... do stuff
-
- Args:
- suffix (str): suffix to add to the parent context's 'request'.
- parent_context (LoggingContext|None): parent context. Will use the current context
- if None.
-
- Returns:
- LoggingContext: new logging context.
- """
- if parent_context is None:
- parent_context = LoggingContext.current_context()
- return LoggingContext(
- parent_context=parent_context, request=parent_context.request + "-" + suffix
- )
-
-
-def preserve_fn(f):
- """Function decorator which wraps the function with run_in_background"""
-
- def g(*args, **kwargs):
- return run_in_background(f, *args, **kwargs)
-
- return g
-
-
-def run_in_background(f, *args, **kwargs):
- """Calls a function, ensuring that the current context is restored after
- return from the function, and that the sentinel context is set once the
- deferred returned by the function completes.
-
- Useful for wrapping functions that return a deferred which you don't yield
- on (for instance because you want to pass it to deferred.gatherResults()).
-
- Note that if you completely discard the result, you should make sure that
- `f` doesn't raise any deferred exceptions, otherwise a scary-looking
- CRITICAL error about an unhandled error will be logged without much
- indication about where it came from.
- """
- current = LoggingContext.current_context()
- try:
- res = f(*args, **kwargs)
- except: # noqa: E722
- # the assumption here is that the caller doesn't want to be disturbed
- # by synchronous exceptions, so let's turn them into Failures.
- return defer.fail()
-
- if not isinstance(res, defer.Deferred):
- return res
-
- if res.called and not res.paused:
- # The function should have maintained the logcontext, so we can
- # optimise out the messing about
- return res
-
- # The function may have reset the context before returning, so
- # we need to restore it now.
- ctx = LoggingContext.set_current_context(current)
-
- # The original context will be restored when the deferred
- # completes, but there is nothing waiting for it, so it will
- # get leaked into the reactor or some other function which
- # wasn't expecting it. We therefore need to reset the context
- # here.
- #
- # (If this feels asymmetric, consider it this way: we are
- # effectively forking a new thread of execution. We are
- # probably currently within a ``with LoggingContext()`` block,
- # which is supposed to have a single entry and exit point. But
- # by spawning off another deferred, we are effectively
- # adding a new exit point.)
- res.addBoth(_set_context_cb, ctx)
- return res
-
-
-def make_deferred_yieldable(deferred):
- """Given a deferred, make it follow the Synapse logcontext rules:
-
- If the deferred has completed (or is not actually a Deferred), essentially
- does nothing (just returns another completed deferred with the
- result/failure).
-
- If the deferred has not yet completed, resets the logcontext before
- returning a deferred. Then, when the deferred completes, restores the
- current logcontext before running callbacks/errbacks.
-
- (This is more-or-less the opposite operation to run_in_background.)
- """
- if not isinstance(deferred, defer.Deferred):
- return deferred
-
- if deferred.called and not deferred.paused:
- # it looks like this deferred is ready to run any callbacks we give it
- # immediately. We may as well optimise out the logcontext faffery.
- return deferred
-
- # ok, we can't be sure that a yield won't block, so let's reset the
- # logcontext, and add a callback to the deferred to restore it.
- prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
- deferred.addBoth(_set_context_cb, prev_context)
- return deferred
-
-
-def _set_context_cb(result, context):
- """A callback function which just sets the logging context"""
- LoggingContext.set_current_context(context)
- return result
-
-
-def defer_to_thread(reactor, f, *args, **kwargs):
- """
- Calls the function `f` using a thread from the reactor's default threadpool and
- returns the result as a Deferred.
-
- Creates a new logcontext for `f`, which is created as a child of the current
- logcontext (so its CPU usage metrics will get attributed to the current
- logcontext). `f` should preserve the logcontext it is given.
-
- The result deferred follows the Synapse logcontext rules: you should `yield`
- on it.
-
- Args:
- reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
- the Deferred will be invoked, and whose threadpool we should use for the
- function.
-
- Normally this will be hs.get_reactor().
-
- f (callable): The function to call.
-
- args: positional arguments to pass to f.
-
- kwargs: keyword arguments to pass to f.
-
- Returns:
- Deferred: A Deferred which fires a callback with the result of `f`, or an
- errback if `f` throws an exception.
- """
- return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
-
-
-def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
- """
- A wrapper for twisted.internet.threads.deferToThreadpool, which handles
- logcontexts correctly.
-
- Calls the function `f` using a thread from the given threadpool and returns
- the result as a Deferred.
-
- Creates a new logcontext for `f`, which is created as a child of the current
- logcontext (so its CPU usage metrics will get attributed to the current
- logcontext). `f` should preserve the logcontext it is given.
-
- The result deferred follows the Synapse logcontext rules: you should `yield`
- on it.
-
- Args:
- reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
- the Deferred will be invoked. Normally this will be hs.get_reactor().
-
- threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
- running `f`. Normally this will be hs.get_reactor().getThreadPool().
-
- f (callable): The function to call.
-
- args: positional arguments to pass to f.
-
- kwargs: keyword arguments to pass to f.
-
- Returns:
- Deferred: A Deferred which fires a callback with the result of `f`, or an
- errback if `f` throws an exception.
- """
- logcontext = LoggingContext.current_context()
-
- def g():
- with LoggingContext(parent_context=logcontext):
- return f(*args, **kwargs)
-
- return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))
+from synapse.logging.context import (
+ LoggingContext,
+ LoggingContextFilter,
+ PreserveLoggingContext,
+ defer_to_thread,
+ make_deferred_yieldable,
+ nested_logging_context,
+ preserve_fn,
+ run_in_background,
+)
+
+__all__ = [
+ "defer_to_thread",
+ "LoggingContext",
+ "LoggingContextFilter",
+ "make_deferred_yieldable",
+ "nested_logging_context",
+ "preserve_fn",
+ "PreserveLoggingContext",
+ "run_in_background",
+]
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
index fbf570c756..320e8f8174 100644
--- a/synapse/util/logformatter.py
+++ b/synapse/util/logformatter.py
@@ -1,5 +1,4 @@
-# -*- coding: utf-8 -*-
-# Copyright 2017 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,41 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+"""
+Backwards compatibility re-exports of ``synapse.logging.formatter`` functionality.
+"""
-import logging
-import traceback
+from synapse.logging.formatter import LogFormatter
-from six import StringIO
-
-
-class LogFormatter(logging.Formatter):
- """Log formatter which gives more detail for exceptions
-
- This is the same as the standard log formatter, except that when logging
- exceptions [typically via log.foo("msg", exc_info=1)], it prints the
- sequence that led up to the point at which the exception was caught.
- (Normally only stack frames between the point the exception was raised and
- where it was caught are logged).
- """
-
- def __init__(self, *args, **kwargs):
- super(LogFormatter, self).__init__(*args, **kwargs)
-
- def formatException(self, ei):
- sio = StringIO()
- (typ, val, tb) = ei
-
- # log the stack above the exception capture point if possible, but
- # check that we actually have an f_back attribute to work around
- # https://twistedmatrix.com/trac/ticket/9305
-
- if tb and hasattr(tb.tb_frame, "f_back"):
- sio.write("Capture point (most recent call last):\n")
- traceback.print_stack(tb.tb_frame.f_back, None, sio)
-
- traceback.print_exception(typ, val, tb, None, sio)
- s = sio.getvalue()
- sio.close()
- if s[-1:] == "\n":
- s = s[:-1]
- return s
+__all__ = ["LogFormatter"]
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 01284d3cf8..c30b6de19c 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -20,8 +20,8 @@ from prometheus_client import Counter
from twisted.internet import defer
+from synapse.logging.context import LoggingContext
from synapse.metrics import InFlightGauge
-from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 06defa8199..5ca4521ce3 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -20,7 +20,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import LimitExceededError
-from synapse.util.logcontext import (
+from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
@@ -36,9 +36,11 @@ class FederationRateLimiter(object):
clock (Clock)
config (FederationRateLimitConfig)
"""
- self.clock = clock
- self._config = config
- self.ratelimiters = {}
+
+ def new_limiter():
+ return _PerHostRatelimiter(clock=clock, config=config)
+
+ self.ratelimiters = collections.defaultdict(new_limiter)
def ratelimit(self, host):
"""Used to ratelimit an incoming request from given host
@@ -53,11 +55,9 @@ class FederationRateLimiter(object):
host (str): Origin of incoming request.
Returns:
- _PerHostRatelimiter
+ context manager which returns a deferred.
"""
- return self.ratelimiters.setdefault(
- host, _PerHostRatelimiter(clock=self.clock, config=self._config)
- ).ratelimit()
+ return self.ratelimiters[host].ratelimit()
class _PerHostRatelimiter(object):
@@ -122,7 +122,7 @@ class _PerHostRatelimiter(object):
self.request_times.append(time_now)
def queue_request():
- if len(self.current_processing) > self.concurrent_requests:
+ if len(self.current_processing) >= self.concurrent_requests:
queue_defer = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer
logger.info(
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 1a77456498..d8d0ceae51 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -17,7 +17,7 @@ import random
from twisted.internet import defer
-import synapse.util.logcontext
+import synapse.logging.context
from synapse.api.errors import CodeMessageException
logger = logging.getLogger(__name__)
@@ -225,4 +225,4 @@ class RetryDestinationLimiter(object):
logger.exception("Failed to store destination_retry_timings")
# we deliberately do this in the background.
- synapse.util.logcontext.run_in_background(store_retry_timings)
+ synapse.logging.context.run_in_background(store_retry_timings)
|