diff options
Diffstat (limited to 'synapse')
30 files changed, 877 insertions, 619 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d9e943c39c..7ce6540bdd 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -606,21 +606,6 @@ class Auth(object): defer.returnValue(auth_ids) - def check_redaction(self, room_version, event, auth_events): - """Check whether the event sender is allowed to redact the target event. - - Returns: - True if the the sender is allowed to redact the target event if the - target event was created by them. - False if the sender is allowed to redact the target event with no - further checks. - - Raises: - AuthError if the event sender is definitely not allowed to redact - the target event. - """ - return event_auth.check_redaction(room_version, event, auth_events) - @defer.inlineCallbacks def check_can_change_room_list(self, room_id, user): """Check if the user is allowed to edit the room's entry in the diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 807f320b46..540dbd9236 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -149,8 +149,7 @@ def listen_metrics(bind_addresses, port): """ Start Prometheus metrics server. """ - from synapse.metrics import RegistryProxy - from prometheus_client import start_http_server + from synapse.metrics import RegistryProxy, start_http_server for host in bind_addresses: logger.info("Starting metrics listener on %s:%d", host, port) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index be44249ed6..e01f3e5f3b 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -27,8 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index ff11beca82..29bddc4823 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -28,8 +28,7 @@ from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index cacad25eac..042cfd04af 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -28,8 +28,7 @@ from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 11e80dbae0..76a97f8f32 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -29,8 +29,7 @@ from synapse.config.logger import setup_logging from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 97da7bdcbf..fec49d5092 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -28,9 +28,8 @@ from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import RegistryProxy +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 417a10bbd2..1f1f1df78e 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -30,8 +30,7 @@ from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 639b1429c0..0c075cb3f1 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -55,9 +55,8 @@ from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext -from synapse.metrics import RegistryProxy +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.module_api import ModuleApi from synapse.python_dependencies import check_requirements from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index f23b9b6eda..d70780e9d5 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -28,8 +28,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 4f929edf86..070de7d0b0 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -27,8 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import __func__ from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index de4797fddc..315c030694 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -32,8 +32,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 1177ddd72e..03ef21bd01 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -29,8 +29,7 @@ from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.site import SynapseSite from synapse.logging.context import LoggingContext, run_in_background -from synapse.metrics import RegistryProxy -from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 40502a5798..d321d00b80 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import logging import logging.config import os @@ -75,10 +76,8 @@ root: class LoggingConfig(Config): def read_config(self, config, **kwargs): - self.verbosity = config.get("verbose", 0) - self.no_redirect_stdio = config.get("no_redirect_stdio", False) self.log_config = self.abspath(config.get("log_config")) - self.log_file = self.abspath(config.get("log_file")) + self.no_redirect_stdio = config.get("no_redirect_stdio", False) def generate_config_section(self, config_dir_path, server_name, **kwargs): log_config = os.path.join(config_dir_path, server_name + ".log.config") @@ -94,39 +93,13 @@ class LoggingConfig(Config): ) def read_arguments(self, args): - if args.verbose is not None: - self.verbosity = args.verbose if args.no_redirect_stdio is not None: self.no_redirect_stdio = args.no_redirect_stdio - if args.log_config is not None: - self.log_config = args.log_config - if args.log_file is not None: - self.log_file = args.log_file @staticmethod def add_arguments(parser): logging_group = parser.add_argument_group("logging") logging_group.add_argument( - "-v", - "--verbose", - dest="verbose", - action="count", - help="The verbosity level. Specify multiple times to increase " - "verbosity. (Ignored if --log-config is specified.)", - ) - logging_group.add_argument( - "-f", - "--log-file", - dest="log_file", - help="File to log to. (Ignored if --log-config is specified.)", - ) - logging_group.add_argument( - "--log-config", - dest="log_config", - default=None, - help="Python logging config file", - ) - logging_group.add_argument( "-n", "--no-redirect-stdio", action="store_true", @@ -153,58 +126,29 @@ def setup_logging(config, use_worker_options=False): config (LoggingConfig | synapse.config.workers.WorkerConfig): configuration data - use_worker_options (bool): True to use 'worker_log_config' and - 'worker_log_file' options instead of 'log_config' and 'log_file'. + use_worker_options (bool): True to use the 'worker_log_config' option + instead of 'log_config'. register_sighup (func | None): Function to call to register a sighup handler. """ log_config = config.worker_log_config if use_worker_options else config.log_config - log_file = config.worker_log_file if use_worker_options else config.log_file - - log_format = ( - "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" - " - %(message)s" - ) if log_config is None: - # We don't have a logfile, so fall back to the 'verbosity' param from - # the config or cmdline. (Note that we generate a log config for new - # installs, so this will be an unusual case) - level = logging.INFO - level_for_storage = logging.INFO - if config.verbosity: - level = logging.DEBUG - if config.verbosity > 1: - level_for_storage = logging.DEBUG + log_format = ( + "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" + " - %(message)s" + ) logger = logging.getLogger("") - logger.setLevel(level) - - logging.getLogger("synapse.storage.SQL").setLevel(level_for_storage) + logger.setLevel(logging.INFO) + logging.getLogger("synapse.storage.SQL").setLevel(logging.INFO) formatter = logging.Formatter(log_format) - if log_file: - # TODO: Customisable file size / backup count - handler = logging.handlers.RotatingFileHandler( - log_file, maxBytes=(1000 * 1000 * 100), backupCount=3, encoding="utf8" - ) - - def sighup(signum, stack): - logger.info("Closing log file due to SIGHUP") - handler.doRollover() - logger.info("Opened new log file due to SIGHUP") - - else: - handler = logging.StreamHandler() - - def sighup(*args): - pass + handler = logging.StreamHandler() handler.setFormatter(formatter) - handler.addFilter(LoggingContextFilter(request="")) - logger.addHandler(handler) else: @@ -218,8 +162,7 @@ def setup_logging(config, use_worker_options=False): logging.info("Reloaded log config from %s due to SIGHUP", log_config) load_log_config() - - appbase.register_sighup(sighup) + appbase.register_sighup(sighup) # make sure that the first thing we log is a thing we can grep backwards # for diff --git a/synapse/config/server.py b/synapse/config/server.py index 080d0630bd..00170f1393 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -136,7 +136,7 @@ class ServerConfig(Config): # Whether to enable experimental MSC1849 (aka relations) support self.experimental_msc1849_support_enabled = config.get( - "experimental_msc1849_support_enabled", False + "experimental_msc1849_support_enabled", True ) # Options to control access by tracking MAU diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py index 63a637984a..a2ce9ab3f6 100644 --- a/synapse/config/tracer.py +++ b/synapse/config/tracer.py @@ -18,33 +18,52 @@ from ._base import Config, ConfigError class TracerConfig(Config): def read_config(self, config, **kwargs): - self.tracer_config = config.get("opentracing") + opentracing_config = config.get("opentracing") + if opentracing_config is None: + opentracing_config = {} - self.tracer_config = config.get("opentracing", {"tracer_enabled": False}) + self.opentracer_enabled = opentracing_config.get("enabled", False) + if not self.opentracer_enabled: + return - if self.tracer_config.get("tracer_enabled", False): - # The tracer is enabled so sanitize the config - # If no whitelists are given - self.tracer_config.setdefault("homeserver_whitelist", []) + # The tracer is enabled so sanitize the config - if not isinstance(self.tracer_config.get("homeserver_whitelist"), list): - raise ConfigError("Tracer homesererver_whitelist config is malformed") + self.opentracer_whitelist = opentracing_config.get("homeserver_whitelist", []) + if not isinstance(self.opentracer_whitelist, list): + raise ConfigError("Tracer homeserver_whitelist config is malformed") def generate_config_section(cls, **kwargs): return """\ ## Opentracing ## - # These settings enable opentracing which implements distributed tracing - # This allows you to observe the causal chain of events across servers - # including requests, key lookups etc. across any server running - # synapse or any other other services which supports opentracing. - # (specifically those implemented with jaeger) - - #opentracing: - # # Enable / disable tracer - # tracer_enabled: false - # # The list of homeservers we wish to expose our current traces to. - # # The list is a list of regexes which are matched against the - # # servername of the homeserver - # homeserver_whitelist: - # - ".*" + + # These settings enable opentracing, which implements distributed tracing. + # This allows you to observe the causal chains of events across servers + # including requests, key lookups etc., across any server running + # synapse or any other other services which supports opentracing + # (specifically those implemented with Jaeger). + # + opentracing: + # tracing is disabled by default. Uncomment the following line to enable it. + # + #enabled: true + + # The list of homeservers we wish to send and receive span contexts and span baggage. + # + # Though it's mostly safe to send and receive span contexts to and from + # untrusted users since span contexts are usually opaque ids it can lead to + # two problems, namely: + # - If the span context is marked as sampled by the sending homeserver the receiver will + # sample it. Therefore two homeservers with wildly disparaging sampling policies + # could incur higher sampling counts than intended. + # - Span baggage can be arbitrary data. For safety this has been disabled in synapse + # but that doesn't prevent another server sending you baggage which will be logged + # to opentracing logs. + # + # This a list of regexes which are matched against the server_name of the + # homeserver. + # + # By defult, it is empty, so no servers are matched. + # + #homeserver_whitelist: + # - ".*" """ diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 3b75471d85..246d72cd61 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -31,8 +31,6 @@ class WorkerConfig(Config): self.worker_listeners = config.get("worker_listeners", []) self.worker_daemonize = config.get("worker_daemonize") self.worker_pid_file = config.get("worker_pid_file") - self.worker_log_file = config.get("worker_log_file") - self.worker_log_config = config.get("worker_log_config") # The host used to connect to the main synapse self.worker_replication_host = config.get("worker_replication_host", None) @@ -78,9 +76,5 @@ class WorkerConfig(Config): if args.daemonize is not None: self.worker_daemonize = args.daemonize - if args.log_config is not None: - self.worker_log_config = args.log_config - if args.log_file is not None: - self.worker_log_file = args.log_file if args.manhole is not None: self.worker_manhole = args.worker_manhole diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index d3de70e671..88ed6d764f 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -104,6 +104,17 @@ class _EventInternalMetadata(object): """ return getattr(self, "proactively_send", True) + def is_redacted(self): + """Whether the event has been redacted. + + This is used for efficiently checking whether an event has been + marked as redacted without needing to make another database call. + + Returns: + bool + """ + return getattr(self, "redacted", False) + def _event_dict_property(key): # We want to be able to use hasattr with the event dict properties. diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 987de5cab7..9487a886f5 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -52,10 +52,15 @@ def prune_event(event): from . import event_type_from_format_version - return event_type_from_format_version(event.format_version)( + pruned_event = event_type_from_format_version(event.format_version)( pruned_event_dict, event.internal_metadata.get_dict() ) + # Mark the event as redacted + pruned_event.internal_metadata.redacted = True + + return pruned_event + def prune_event_dict(event_dict): """Redacts the event_dict in the same way as `prune_event`, except it @@ -360,9 +365,12 @@ class EventClientSerializer(object): event_id = event.event_id serialized_event = serialize_event(event, time_now, **kwargs) - # If MSC1849 is enabled then we need to look if thre are any relations - # we need to bundle in with the event - if self.experimental_msc1849_support_enabled and bundle_aggregations: + # If MSC1849 is enabled then we need to look if there are any relations + # we need to bundle in with the event. + # Do not bundle relations if the event has been redacted + if not event.internal_metadata.is_redacted() and ( + self.experimental_msc1849_support_enabled and bundle_aggregations + ): annotations = yield self.store.get_aggregation_groups_for_event(event_id) references = yield self.store.get_relations_for_event( event_id, RelationTypes.REFERENCE, direction="f" diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c45d458d94..663264dec4 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,8 +19,6 @@ import functools import logging import re -from twisted.internet import defer - import synapse import synapse.logging.opentracing as opentracing from synapse.api.errors import Codes, FederationDeniedError, SynapseError @@ -103,8 +102,7 @@ class Authenticator(object): self.federation_domain_whitelist = hs.config.federation_domain_whitelist # A method just so we can pass 'self' as the authenticator to the Servlets - @defer.inlineCallbacks - def authenticate_request(self, request, content): + async def authenticate_request(self, request, content): now = self._clock.time_msec() json_request = { "method": request.method.decode("ascii"), @@ -142,7 +140,7 @@ class Authenticator(object): 401, "Missing Authorization headers", Codes.UNAUTHORIZED ) - yield self.keyring.verify_json_for_server( + await self.keyring.verify_json_for_server( origin, json_request, now, "Incoming request" ) @@ -151,17 +149,16 @@ class Authenticator(object): # If we get a valid signed request from the other side, its probably # alive - retry_timings = yield self.store.get_destination_retry_timings(origin) + retry_timings = await self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings["retry_last_ts"]: run_in_background(self._reset_retry_timings, origin) - defer.returnValue(origin) + return origin - @defer.inlineCallbacks - def _reset_retry_timings(self, origin): + async def _reset_retry_timings(self, origin): try: logger.info("Marking origin %r as up", origin) - yield self.store.set_destination_retry_timings(origin, 0, 0) + await self.store.set_destination_retry_timings(origin, 0, 0) except Exception: logger.exception("Error resetting retry timings on %s", origin) @@ -215,7 +212,8 @@ class BaseFederationServlet(object): match against the request path (excluding the /federation/v1 prefix). The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match - the appropriate HTTP method. These methods have the signature: + the appropriate HTTP method. These methods must be *asynchronous* and have the + signature: on_<METHOD>(self, origin, content, query, **kwargs) @@ -235,7 +233,7 @@ class BaseFederationServlet(object): components as specified in the path match regexp. Returns: - Deferred[(int, object)|None]: either (response code, response object) to + Optional[Tuple[int, object]]: either (response code, response object) to return a JSON response, or None if the request has already been handled. Raises: @@ -258,10 +256,9 @@ class BaseFederationServlet(object): authenticator = self.authenticator ratelimiter = self.ratelimiter - @defer.inlineCallbacks @functools.wraps(func) - def new_func(request, *args, **kwargs): - """ A callback which can be passed to HttpServer.RegisterPaths + async def new_func(request, *args, **kwargs): + """A callback which can be passed to HttpServer.RegisterPaths Args: request (twisted.web.http.Request): @@ -270,8 +267,8 @@ class BaseFederationServlet(object): components as specified in the path match regexp. Returns: - Deferred[(int, object)|None]: (response code, response object) as returned - by the callback method. None if the request has already been handled. + Tuple[int, object]|None: (response code, response object) as returned by + the callback method. None if the request has already been handled. """ content = None if request.method in [b"PUT", b"POST"]: @@ -279,7 +276,7 @@ class BaseFederationServlet(object): content = parse_json_object_from_request(request) try: - origin = yield authenticator.authenticate_request(request, content) + origin = await authenticator.authenticate_request(request, content) except NoAuthenticationError: origin = None if self.REQUIRE_AUTH: @@ -304,16 +301,16 @@ class BaseFederationServlet(object): ): if origin: with ratelimiter.ratelimit(origin) as d: - yield d - response = yield func( + await d + response = await func( origin, content, request.args, *args, **kwargs ) else: - response = yield func( + response = await func( origin, content, request.args, *args, **kwargs ) - defer.returnValue(response) + return response # Extra logic that functools.wraps() doesn't finish new_func.__self__ = func.__self__ @@ -341,8 +338,7 @@ class FederationSendServlet(BaseFederationServlet): self.server_name = server_name # This is when someone is trying to send us a bunch of data. - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, transaction_id): + async def on_PUT(self, origin, content, query, transaction_id): """ Called on PUT /send/<transaction_id>/ Args: @@ -351,7 +347,7 @@ class FederationSendServlet(BaseFederationServlet): request. This is *not* None. Returns: - Deferred: Results in a tuple of `(code, response)`, where + Tuple of `(code, response)`, where `response` is a python dict to be converted into JSON that is used as the response body. """ @@ -380,34 +376,33 @@ class FederationSendServlet(BaseFederationServlet): except Exception as e: logger.exception(e) - defer.returnValue((400, {"error": "Invalid transaction"})) - return + return 400, {"error": "Invalid transaction"} try: - code, response = yield self.handler.on_incoming_transaction( + code, response = await self.handler.on_incoming_transaction( origin, transaction_data ) except Exception: logger.exception("on_incoming_transaction failed") raise - defer.returnValue((code, response)) + return code, response class FederationEventServlet(BaseFederationServlet): PATH = "/event/(?P<event_id>[^/]*)/?" # This is when someone asks for a data item for a given server data_id pair. - def on_GET(self, origin, content, query, event_id): - return self.handler.on_pdu_request(origin, event_id) + async def on_GET(self, origin, content, query, event_id): + return await self.handler.on_pdu_request(origin, event_id) class FederationStateServlet(BaseFederationServlet): PATH = "/state/(?P<context>[^/]*)/?" # This is when someone asks for all data for a given context. - def on_GET(self, origin, content, query, context): - return self.handler.on_context_state_request( + async def on_GET(self, origin, content, query, context): + return await self.handler.on_context_state_request( origin, context, parse_string_from_args(query, "event_id", None, required=True), @@ -417,8 +412,8 @@ class FederationStateServlet(BaseFederationServlet): class FederationStateIdsServlet(BaseFederationServlet): PATH = "/state_ids/(?P<room_id>[^/]*)/?" - def on_GET(self, origin, content, query, room_id): - return self.handler.on_state_ids_request( + async def on_GET(self, origin, content, query, room_id): + return await self.handler.on_state_ids_request( origin, room_id, parse_string_from_args(query, "event_id", None, required=True), @@ -428,22 +423,22 @@ class FederationStateIdsServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet): PATH = "/backfill/(?P<context>[^/]*)/?" - def on_GET(self, origin, content, query, context): + async def on_GET(self, origin, content, query, context): versions = [x.decode("ascii") for x in query[b"v"]] limit = parse_integer_from_args(query, "limit", None) if not limit: - return defer.succeed((400, {"error": "Did not include limit param"})) + return 400, {"error": "Did not include limit param"} - return self.handler.on_backfill_request(origin, context, versions, limit) + return await self.handler.on_backfill_request(origin, context, versions, limit) class FederationQueryServlet(BaseFederationServlet): PATH = "/query/(?P<query_type>[^/]*)" # This is when we receive a server-server Query - def on_GET(self, origin, content, query, query_type): - return self.handler.on_query_request( + async def on_GET(self, origin, content, query, query_type): + return await self.handler.on_query_request( query_type, {k.decode("utf8"): v[0].decode("utf-8") for k, v in query.items()}, ) @@ -452,8 +447,7 @@ class FederationQueryServlet(BaseFederationServlet): class FederationMakeJoinServlet(BaseFederationServlet): PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)" - @defer.inlineCallbacks - def on_GET(self, origin, _content, query, context, user_id): + async def on_GET(self, origin, _content, query, context, user_id): """ Args: origin (unicode): The authenticated server_name of the calling server @@ -466,8 +460,7 @@ class FederationMakeJoinServlet(BaseFederationServlet): components as specified in the path match regexp. Returns: - Deferred[(int, object)|None]: either (response code, response object) to - return a JSON response, or None if the request has already been handled. + Tuple[int, object]: (response code, response object) """ versions = query.get(b"ver") if versions is not None: @@ -475,64 +468,60 @@ class FederationMakeJoinServlet(BaseFederationServlet): else: supported_versions = ["1"] - content = yield self.handler.on_make_join_request( + content = await self.handler.on_make_join_request( origin, context, user_id, supported_versions=supported_versions ) - defer.returnValue((200, content)) + return 200, content class FederationMakeLeaveServlet(BaseFederationServlet): PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, context, user_id): - content = yield self.handler.on_make_leave_request(origin, context, user_id) - defer.returnValue((200, content)) + async def on_GET(self, origin, content, query, context, user_id): + content = await self.handler.on_make_leave_request(origin, context, user_id) + return 200, content class FederationSendLeaveServlet(BaseFederationServlet): PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)" - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, room_id, event_id): - content = yield self.handler.on_send_leave_request(origin, content, room_id) - defer.returnValue((200, content)) + async def on_PUT(self, origin, content, query, room_id, event_id): + content = await self.handler.on_send_leave_request(origin, content, room_id) + return 200, content class FederationEventAuthServlet(BaseFederationServlet): PATH = "/event_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)" - def on_GET(self, origin, content, query, context, event_id): - return self.handler.on_event_auth(origin, context, event_id) + async def on_GET(self, origin, content, query, context, event_id): + return await self.handler.on_event_auth(origin, context, event_id) class FederationSendJoinServlet(BaseFederationServlet): PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)" - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, context, event_id): + async def on_PUT(self, origin, content, query, context, event_id): # TODO(paul): assert that context/event_id parsed from path actually # match those given in content - content = yield self.handler.on_send_join_request(origin, content, context) - defer.returnValue((200, content)) + content = await self.handler.on_send_join_request(origin, content, context) + return 200, content class FederationV1InviteServlet(BaseFederationServlet): PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)" - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, context, event_id): + async def on_PUT(self, origin, content, query, context, event_id): # We don't get a room version, so we have to assume its EITHER v1 or # v2. This is "fine" as the only difference between V1 and V2 is the # state resolution algorithm, and we don't use that for processing # invites - content = yield self.handler.on_invite_request( + content = await self.handler.on_invite_request( origin, content, room_version=RoomVersions.V1.identifier ) # V1 federation API is defined to return a content of `[200, {...}]` # due to a historical bug. - defer.returnValue((200, (200, content))) + return 200, (200, content) class FederationV2InviteServlet(BaseFederationServlet): @@ -540,8 +529,7 @@ class FederationV2InviteServlet(BaseFederationServlet): PREFIX = FEDERATION_V2_PREFIX - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, context, event_id): + async def on_PUT(self, origin, content, query, context, event_id): # TODO(paul): assert that context/event_id parsed from path actually # match those given in content @@ -554,69 +542,65 @@ class FederationV2InviteServlet(BaseFederationServlet): event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state - content = yield self.handler.on_invite_request( + content = await self.handler.on_invite_request( origin, event, room_version=room_version ) - defer.returnValue((200, content)) + return 200, content class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet): PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)" - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, room_id): - content = yield self.handler.on_exchange_third_party_invite_request( + async def on_PUT(self, origin, content, query, room_id): + content = await self.handler.on_exchange_third_party_invite_request( origin, room_id, content ) - defer.returnValue((200, content)) + return 200, content class FederationClientKeysQueryServlet(BaseFederationServlet): PATH = "/user/keys/query" - def on_POST(self, origin, content, query): - return self.handler.on_query_client_keys(origin, content) + async def on_POST(self, origin, content, query): + return await self.handler.on_query_client_keys(origin, content) class FederationUserDevicesQueryServlet(BaseFederationServlet): PATH = "/user/devices/(?P<user_id>[^/]*)" - def on_GET(self, origin, content, query, user_id): - return self.handler.on_query_user_devices(origin, user_id) + async def on_GET(self, origin, content, query, user_id): + return await self.handler.on_query_user_devices(origin, user_id) class FederationClientKeysClaimServlet(BaseFederationServlet): PATH = "/user/keys/claim" - @defer.inlineCallbacks - def on_POST(self, origin, content, query): - response = yield self.handler.on_claim_client_keys(origin, content) - defer.returnValue((200, response)) + async def on_POST(self, origin, content, query): + response = await self.handler.on_claim_client_keys(origin, content) + return 200, response class FederationQueryAuthServlet(BaseFederationServlet): PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, context, event_id): - new_content = yield self.handler.on_query_auth_request( + async def on_POST(self, origin, content, query, context, event_id): + new_content = await self.handler.on_query_auth_request( origin, content, context, event_id ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGetMissingEventsServlet(BaseFederationServlet): # TODO(paul): Why does this path alone end with "/?" optional? PATH = "/get_missing_events/(?P<room_id>[^/]*)/?" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, room_id): + async def on_POST(self, origin, content, query, room_id): limit = int(content.get("limit", 10)) earliest_events = content.get("earliest_events", []) latest_events = content.get("latest_events", []) - content = yield self.handler.on_get_missing_events( + content = await self.handler.on_get_missing_events( origin, room_id=room_id, earliest_events=earliest_events, @@ -624,7 +608,7 @@ class FederationGetMissingEventsServlet(BaseFederationServlet): limit=limit, ) - defer.returnValue((200, content)) + return 200, content class On3pidBindServlet(BaseFederationServlet): @@ -632,8 +616,7 @@ class On3pidBindServlet(BaseFederationServlet): REQUIRE_AUTH = False - @defer.inlineCallbacks - def on_POST(self, origin, content, query): + async def on_POST(self, origin, content, query): if "invites" in content: last_exception = None for invite in content["invites"]: @@ -645,7 +628,7 @@ class On3pidBindServlet(BaseFederationServlet): ) logger.info(message) raise SynapseError(400, message) - yield self.handler.exchange_third_party_invite( + await self.handler.exchange_third_party_invite( invite["sender"], invite["mxid"], invite["room_id"], @@ -655,7 +638,7 @@ class On3pidBindServlet(BaseFederationServlet): last_exception = e if last_exception: raise last_exception - defer.returnValue((200, {})) + return 200, {} class OpenIdUserInfo(BaseFederationServlet): @@ -679,29 +662,26 @@ class OpenIdUserInfo(BaseFederationServlet): REQUIRE_AUTH = False - @defer.inlineCallbacks - def on_GET(self, origin, content, query): + async def on_GET(self, origin, content, query): token = query.get(b"access_token", [None])[0] if token is None: - defer.returnValue( - (401, {"errcode": "M_MISSING_TOKEN", "error": "Access Token required"}) + return ( + 401, + {"errcode": "M_MISSING_TOKEN", "error": "Access Token required"}, ) - return - user_id = yield self.handler.on_openid_userinfo(token.decode("ascii")) + user_id = await self.handler.on_openid_userinfo(token.decode("ascii")) if user_id is None: - defer.returnValue( - ( - 401, - { - "errcode": "M_UNKNOWN_TOKEN", - "error": "Access Token unknown or expired", - }, - ) + return ( + 401, + { + "errcode": "M_UNKNOWN_TOKEN", + "error": "Access Token unknown or expired", + }, ) - defer.returnValue((200, {"sub": user_id})) + return 200, {"sub": user_id} class PublicRoomList(BaseFederationServlet): @@ -743,8 +723,7 @@ class PublicRoomList(BaseFederationServlet): ) self.allow_access = allow_access - @defer.inlineCallbacks - def on_GET(self, origin, content, query): + async def on_GET(self, origin, content, query): if not self.allow_access: raise FederationDeniedError(origin) @@ -764,10 +743,10 @@ class PublicRoomList(BaseFederationServlet): else: network_tuple = ThirdPartyInstanceID(None, None) - data = yield self.handler.get_local_public_room_list( + data = await self.handler.get_local_public_room_list( limit, since_token, network_tuple=network_tuple, from_federation=True ) - defer.returnValue((200, data)) + return 200, data class FederationVersionServlet(BaseFederationServlet): @@ -775,12 +754,10 @@ class FederationVersionServlet(BaseFederationServlet): REQUIRE_AUTH = False - def on_GET(self, origin, content, query): - return defer.succeed( - ( - 200, - {"server": {"name": "Synapse", "version": get_version_string(synapse)}}, - ) + async def on_GET(self, origin, content, query): + return ( + 200, + {"server": {"name": "Synapse", "version": get_version_string(synapse)}}, ) @@ -790,41 +767,38 @@ class FederationGroupsProfileServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/profile" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.get_group_profile(group_id, requester_user_id) + new_content = await self.handler.get_group_profile(group_id, requester_user_id) - defer.returnValue((200, new_content)) + return 200, new_content - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id): + async def on_POST(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.update_group_profile( + new_content = await self.handler.update_group_profile( group_id, requester_user_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsSummaryServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/summary" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.get_group_summary(group_id, requester_user_id) + new_content = await self.handler.get_group_summary(group_id, requester_user_id) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsRoomsServlet(BaseFederationServlet): @@ -833,15 +807,14 @@ class FederationGroupsRoomsServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/rooms" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.get_rooms_in_group(group_id, requester_user_id) + new_content = await self.handler.get_rooms_in_group(group_id, requester_user_id) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsAddRoomsServlet(BaseFederationServlet): @@ -850,29 +823,27 @@ class FederationGroupsAddRoomsServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/room/(?P<room_id>[^/]*)" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, room_id): + async def on_POST(self, origin, content, query, group_id, room_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.add_room_to_group( + new_content = await self.handler.add_room_to_group( group_id, requester_user_id, room_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content - @defer.inlineCallbacks - def on_DELETE(self, origin, content, query, group_id, room_id): + async def on_DELETE(self, origin, content, query, group_id, room_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.remove_room_from_group( + new_content = await self.handler.remove_room_from_group( group_id, requester_user_id, room_id ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): @@ -884,17 +855,16 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): "/config/(?P<config_key>[^/]*)" ) - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, room_id, config_key): + async def on_POST(self, origin, content, query, group_id, room_id, config_key): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - result = yield self.groups_handler.update_room_in_group( + result = await self.groups_handler.update_room_in_group( group_id, requester_user_id, room_id, config_key, content ) - defer.returnValue((200, result)) + return 200, result class FederationGroupsUsersServlet(BaseFederationServlet): @@ -903,15 +873,14 @@ class FederationGroupsUsersServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/users" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.get_users_in_group(group_id, requester_user_id) + new_content = await self.handler.get_users_in_group(group_id, requester_user_id) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsInvitedUsersServlet(BaseFederationServlet): @@ -920,17 +889,16 @@ class FederationGroupsInvitedUsersServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/invited_users" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.get_invited_users_in_group( + new_content = await self.handler.get_invited_users_in_group( group_id, requester_user_id ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsInviteServlet(BaseFederationServlet): @@ -939,17 +907,16 @@ class FederationGroupsInviteServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.invite_to_group( + new_content = await self.handler.invite_to_group( group_id, user_id, requester_user_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsAcceptInviteServlet(BaseFederationServlet): @@ -958,14 +925,13 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/accept_invite" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): if get_domain_from_id(user_id) != origin: raise SynapseError(403, "user_id doesn't match origin") - new_content = yield self.handler.accept_invite(group_id, user_id, content) + new_content = await self.handler.accept_invite(group_id, user_id, content) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsJoinServlet(BaseFederationServlet): @@ -974,14 +940,13 @@ class FederationGroupsJoinServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): if get_domain_from_id(user_id) != origin: raise SynapseError(403, "user_id doesn't match origin") - new_content = yield self.handler.join_group(group_id, user_id, content) + new_content = await self.handler.join_group(group_id, user_id, content) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsRemoveUserServlet(BaseFederationServlet): @@ -990,17 +955,16 @@ class FederationGroupsRemoveUserServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.remove_user_from_group( + new_content = await self.handler.remove_user_from_group( group_id, user_id, requester_user_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsLocalInviteServlet(BaseFederationServlet): @@ -1009,14 +973,13 @@ class FederationGroupsLocalInviteServlet(BaseFederationServlet): PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/invite" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): if get_domain_from_id(group_id) != origin: raise SynapseError(403, "group_id doesn't match origin") - new_content = yield self.handler.on_invite(group_id, user_id, content) + new_content = await self.handler.on_invite(group_id, user_id, content) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): @@ -1025,16 +988,15 @@ class FederationGroupsRemoveLocalUserServlet(BaseFederationServlet): PATH = "/groups/local/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/remove" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): if get_domain_from_id(group_id) != origin: raise SynapseError(403, "user_id doesn't match origin") - new_content = yield self.handler.user_removed_from_group( + new_content = await self.handler.user_removed_from_group( group_id, user_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): @@ -1043,15 +1005,14 @@ class FederationGroupsRenewAttestaionServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/renew_attestation/(?P<user_id>[^/]*)" - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, user_id): + async def on_POST(self, origin, content, query, group_id, user_id): # We don't need to check auth here as we check the attestation signatures - new_content = yield self.handler.on_renew_attestation( + new_content = await self.handler.on_renew_attestation( group_id, user_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): @@ -1068,8 +1029,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): "/rooms/(?P<room_id>[^/]*)" ) - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, category_id, room_id): + async def on_POST(self, origin, content, query, group_id, category_id, room_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1077,7 +1037,7 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): if category_id == "": raise SynapseError(400, "category_id cannot be empty string") - resp = yield self.handler.update_group_summary_room( + resp = await self.handler.update_group_summary_room( group_id, requester_user_id, room_id=room_id, @@ -1085,10 +1045,9 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): content=content, ) - defer.returnValue((200, resp)) + return 200, resp - @defer.inlineCallbacks - def on_DELETE(self, origin, content, query, group_id, category_id, room_id): + async def on_DELETE(self, origin, content, query, group_id, category_id, room_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1096,11 +1055,11 @@ class FederationGroupsSummaryRoomsServlet(BaseFederationServlet): if category_id == "": raise SynapseError(400, "category_id cannot be empty string") - resp = yield self.handler.delete_group_summary_room( + resp = await self.handler.delete_group_summary_room( group_id, requester_user_id, room_id=room_id, category_id=category_id ) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsCategoriesServlet(BaseFederationServlet): @@ -1109,15 +1068,14 @@ class FederationGroupsCategoriesServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/categories/?" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - resp = yield self.handler.get_group_categories(group_id, requester_user_id) + resp = await self.handler.get_group_categories(group_id, requester_user_id) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsCategoryServlet(BaseFederationServlet): @@ -1126,20 +1084,18 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/categories/(?P<category_id>[^/]+)" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id, category_id): + async def on_GET(self, origin, content, query, group_id, category_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - resp = yield self.handler.get_group_category( + resp = await self.handler.get_group_category( group_id, requester_user_id, category_id ) - defer.returnValue((200, resp)) + return 200, resp - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, category_id): + async def on_POST(self, origin, content, query, group_id, category_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1147,14 +1103,13 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): if category_id == "": raise SynapseError(400, "category_id cannot be empty string") - resp = yield self.handler.upsert_group_category( + resp = await self.handler.upsert_group_category( group_id, requester_user_id, category_id, content ) - defer.returnValue((200, resp)) + return 200, resp - @defer.inlineCallbacks - def on_DELETE(self, origin, content, query, group_id, category_id): + async def on_DELETE(self, origin, content, query, group_id, category_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1162,11 +1117,11 @@ class FederationGroupsCategoryServlet(BaseFederationServlet): if category_id == "": raise SynapseError(400, "category_id cannot be empty string") - resp = yield self.handler.delete_group_category( + resp = await self.handler.delete_group_category( group_id, requester_user_id, category_id ) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsRolesServlet(BaseFederationServlet): @@ -1175,15 +1130,14 @@ class FederationGroupsRolesServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/roles/?" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id): + async def on_GET(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - resp = yield self.handler.get_group_roles(group_id, requester_user_id) + resp = await self.handler.get_group_roles(group_id, requester_user_id) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsRoleServlet(BaseFederationServlet): @@ -1192,18 +1146,16 @@ class FederationGroupsRoleServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/roles/(?P<role_id>[^/]+)" - @defer.inlineCallbacks - def on_GET(self, origin, content, query, group_id, role_id): + async def on_GET(self, origin, content, query, group_id, role_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - resp = yield self.handler.get_group_role(group_id, requester_user_id, role_id) + resp = await self.handler.get_group_role(group_id, requester_user_id, role_id) - defer.returnValue((200, resp)) + return 200, resp - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, role_id): + async def on_POST(self, origin, content, query, group_id, role_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1211,14 +1163,13 @@ class FederationGroupsRoleServlet(BaseFederationServlet): if role_id == "": raise SynapseError(400, "role_id cannot be empty string") - resp = yield self.handler.update_group_role( + resp = await self.handler.update_group_role( group_id, requester_user_id, role_id, content ) - defer.returnValue((200, resp)) + return 200, resp - @defer.inlineCallbacks - def on_DELETE(self, origin, content, query, group_id, role_id): + async def on_DELETE(self, origin, content, query, group_id, role_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1226,11 +1177,11 @@ class FederationGroupsRoleServlet(BaseFederationServlet): if role_id == "": raise SynapseError(400, "role_id cannot be empty string") - resp = yield self.handler.delete_group_role( + resp = await self.handler.delete_group_role( group_id, requester_user_id, role_id ) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsSummaryUsersServlet(BaseFederationServlet): @@ -1247,8 +1198,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): "/users/(?P<user_id>[^/]*)" ) - @defer.inlineCallbacks - def on_POST(self, origin, content, query, group_id, role_id, user_id): + async def on_POST(self, origin, content, query, group_id, role_id, user_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1256,7 +1206,7 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): if role_id == "": raise SynapseError(400, "role_id cannot be empty string") - resp = yield self.handler.update_group_summary_user( + resp = await self.handler.update_group_summary_user( group_id, requester_user_id, user_id=user_id, @@ -1264,10 +1214,9 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): content=content, ) - defer.returnValue((200, resp)) + return 200, resp - @defer.inlineCallbacks - def on_DELETE(self, origin, content, query, group_id, role_id, user_id): + async def on_DELETE(self, origin, content, query, group_id, role_id, user_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") @@ -1275,11 +1224,11 @@ class FederationGroupsSummaryUsersServlet(BaseFederationServlet): if role_id == "": raise SynapseError(400, "role_id cannot be empty string") - resp = yield self.handler.delete_group_summary_user( + resp = await self.handler.delete_group_summary_user( group_id, requester_user_id, user_id=user_id, role_id=role_id ) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): @@ -1288,13 +1237,12 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet): PATH = "/get_groups_publicised" - @defer.inlineCallbacks - def on_POST(self, origin, content, query): - resp = yield self.handler.bulk_get_publicised_groups( + async def on_POST(self, origin, content, query): + resp = await self.handler.bulk_get_publicised_groups( content["user_ids"], proxy=False ) - defer.returnValue((200, resp)) + return 200, resp class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): @@ -1303,17 +1251,16 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet): PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy" - @defer.inlineCallbacks - def on_PUT(self, origin, content, query, group_id): + async def on_PUT(self, origin, content, query, group_id): requester_user_id = parse_string_from_args(query, "requester_user_id") if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - new_content = yield self.handler.set_group_join_policy( + new_content = await self.handler.set_group_join_policy( group_id, requester_user_id, content ) - defer.returnValue((200, new_content)) + return 200, new_content class RoomComplexityServlet(BaseFederationServlet): @@ -1325,18 +1272,17 @@ class RoomComplexityServlet(BaseFederationServlet): PATH = "/rooms/(?P<room_id>[^/]*)/complexity" PREFIX = FEDERATION_UNSTABLE_PREFIX - @defer.inlineCallbacks - def on_GET(self, origin, content, query, room_id): + async def on_GET(self, origin, content, query, room_id): store = self.handler.hs.get_datastore() - is_public = yield store.is_room_world_readable_or_publicly_joinable(room_id) + is_public = await store.is_room_world_readable_or_publicly_joinable(room_id) if not is_public: raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM) - complexity = yield store.get_room_complexity(room_id) - defer.returnValue((200, complexity)) + complexity = await store.get_room_complexity(room_id) + return 200, complexity FEDERATION_SERVLET_CLASSES = ( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index eaeda7a5cb..6d7a987f13 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from twisted.internet.defer import succeed +from synapse import event_auth from synapse.api.constants import EventTypes, Membership, RelationTypes from synapse.api.errors import ( AuthError, @@ -784,6 +785,20 @@ class EventCreationHandler(object): event.signatures.update(returned_invite.signatures) if event.type == EventTypes.Redaction: + original_event = yield self.store.get_event( + event.redacts, + check_redacted=False, + get_prev_content=False, + allow_rejected=False, + allow_none=True, + check_room_id=event.room_id, + ) + + # we can make some additional checks now if we have the original event. + if original_event: + if original_event.type == EventTypes.Create: + raise AuthError(403, "Redacting create events is not permitted") + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( event, prev_state_ids, for_verification=True @@ -791,18 +806,18 @@ class EventCreationHandler(object): auth_events = yield self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events.values()} room_version = yield self.store.get_room_version(event.room_id) - if self.auth.check_redaction(room_version, event, auth_events=auth_events): - original_event = yield self.store.get_event( - event.redacts, - check_redacted=False, - get_prev_content=False, - allow_rejected=False, - allow_none=False, - ) + + if event_auth.check_redaction(room_version, event, auth_events=auth_events): + # this user doesn't have 'redact' rights, so we need to do some more + # checks on the original event. Let's start by checking the original + # event exists. + if not original_event: + raise NotFoundError("Could not find event %s" % (event.redacts,)) + if event.user_id != original_event.user_id: raise AuthError(403, "You don't have permission to redact events") - # We've already checked. + # all the checks are done. event.internal_metadata.recheck_redaction = False if event.type == EventTypes.Create: diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index f0ceea2a64..56d900080b 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C.d +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,6 +24,15 @@ # this move the methods have work very similarly to opentracing's and it should only # be a matter of few regexes to move over to opentracing's access patterns proper. +import contextlib +import logging +import re +from functools import wraps + +from twisted.internet import defer + +from synapse.config import ConfigError + try: import opentracing except ImportError: @@ -35,12 +44,6 @@ except ImportError: JaegerConfig = None LogContextScopeManager = None -import contextlib -import logging -import re -from functools import wraps - -from twisted.internet import defer logger = logging.getLogger(__name__) @@ -91,7 +94,8 @@ def only_if_tracing(func): return _only_if_tracing_inner -# Block everything by default +# A regex which matches the server_names to expose traces for. +# None means 'block everything'. _homeserver_whitelist = None tags = _DumTagNames @@ -101,31 +105,24 @@ def init_tracer(config): """Set the whitelists and initialise the JaegerClient tracer Args: - config (Config) - The config used by the homeserver. Here it's used to set the service - name to the homeserver's. + config (HomeserverConfig): The config used by the homeserver """ global opentracing - if not config.tracer_config.get("tracer_enabled", False): + if not config.opentracer_enabled: # We don't have a tracer opentracing = None return - if not opentracing: - logger.error( - "The server has been configure to use opentracing but opentracing is not installed." - ) - raise ModuleNotFoundError("opentracing") - - if not JaegerConfig: - logger.error( - "The server has been configure to use opentracing but opentracing is not installed." + if not opentracing or not JaegerConfig: + raise ConfigError( + "The server has been configured to use opentracing but opentracing is not " + "installed." ) # Include the worker name name = config.worker_name if config.worker_name else "master" - set_homeserver_whitelist(config.tracer_config["homeserver_whitelist"]) + set_homeserver_whitelist(config.opentracer_whitelist) jaeger_config = JaegerConfig( config={"sampler": {"type": "const", "param": 1}, "logging": True}, service_name="{} {}".format(config.server_name, name), @@ -232,7 +229,6 @@ def whitelisted_homeserver(destination): """Checks if a destination matches the whitelist Args: destination (String)""" - global _homeserver_whitelist if _homeserver_whitelist: return _homeserver_whitelist.match(destination) return False @@ -344,8 +340,7 @@ def trace_servlet(servlet_name, func): @wraps(func) @defer.inlineCallbacks def _trace_servlet_inner(request, *args, **kwargs): - with start_active_span_from_context( - request.requestHeaders, + with start_active_span( "incoming-client-request", tags={ "request_id": request.get_request_id(), diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py index 91e14462f3..8c661302c9 100644 --- a/synapse/logging/scopecontextmanager.py +++ b/synapse/logging/scopecontextmanager.py @@ -34,9 +34,7 @@ class LogContextScopeManager(ScopeManager): """ def __init__(self, config): - # Set the whitelists - logger.info(config.tracer_config) - self._homeserver_whitelist = config.tracer_config["homeserver_whitelist"] + pass @property def active(self): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index eaf0aaa86e..488280b4a6 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -29,8 +29,16 @@ from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricF from twisted.internet import reactor +from synapse.metrics._exposition import ( + MetricsResource, + generate_latest, + start_http_server, +) + logger = logging.getLogger(__name__) +METRICS_PREFIX = "/_synapse/metrics" + running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] @@ -470,3 +478,12 @@ try: gc.disable() except AttributeError: pass + +__all__ = [ + "MetricsResource", + "generate_latest", + "start_http_server", + "LaterGauge", + "InFlightGauge", + "BucketCollector", +] diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py new file mode 100644 index 0000000000..1933ecd3e3 --- /dev/null +++ b/synapse/metrics/_exposition.py @@ -0,0 +1,258 @@ +# -*- coding: utf-8 -*- +# Copyright 2015-2019 Prometheus Python Client Developers +# Copyright 2019 Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This code is based off `prometheus_client/exposition.py` from version 0.7.1. + +Due to the renaming of metrics in prometheus_client 0.4.0, this customised +vendoring of the code will emit both the old versions that Synapse dashboards +expect, and the newer "best practice" version of the up-to-date official client. +""" + +import math +import threading +from collections import namedtuple +from http.server import BaseHTTPRequestHandler, HTTPServer +from socketserver import ThreadingMixIn +from urllib.parse import parse_qs, urlparse + +from prometheus_client import REGISTRY + +from twisted.web.resource import Resource + +try: + from prometheus_client.samples import Sample +except ImportError: + Sample = namedtuple("Sample", ["name", "labels", "value", "timestamp", "exemplar"]) + + +CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8") + + +INF = float("inf") +MINUS_INF = float("-inf") + + +def floatToGoString(d): + d = float(d) + if d == INF: + return "+Inf" + elif d == MINUS_INF: + return "-Inf" + elif math.isnan(d): + return "NaN" + else: + s = repr(d) + dot = s.find(".") + # Go switches to exponents sooner than Python. + # We only need to care about positive values for le/quantile. + if d > 0 and dot > 6: + mantissa = "{0}.{1}{2}".format(s[0], s[1:dot], s[dot + 1 :]).rstrip("0.") + return "{0}e+0{1}".format(mantissa, dot - 1) + return s + + +def sample_line(line, name): + if line.labels: + labelstr = "{{{0}}}".format( + ",".join( + [ + '{0}="{1}"'.format( + k, + v.replace("\\", r"\\").replace("\n", r"\n").replace('"', r"\""), + ) + for k, v in sorted(line.labels.items()) + ] + ) + ) + else: + labelstr = "" + timestamp = "" + if line.timestamp is not None: + # Convert to milliseconds. + timestamp = " {0:d}".format(int(float(line.timestamp) * 1000)) + return "{0}{1} {2}{3}\n".format( + name, labelstr, floatToGoString(line.value), timestamp + ) + + +def nameify_sample(sample): + """ + If we get a prometheus_client<0.4.0 sample as a tuple, transform it into a + namedtuple which has the names we expect. + """ + if not isinstance(sample, Sample): + sample = Sample(*sample, None, None) + + return sample + + +def generate_latest(registry, emit_help=False): + output = [] + + for metric in registry.collect(): + + if metric.name.startswith("__unused"): + continue + + if not metric.samples: + # No samples, don't bother. + continue + + mname = metric.name + mnewname = metric.name + mtype = metric.type + + # OpenMetrics -> Prometheus + if mtype == "counter": + mnewname = mnewname + "_total" + elif mtype == "info": + mtype = "gauge" + mnewname = mnewname + "_info" + elif mtype == "stateset": + mtype = "gauge" + elif mtype == "gaugehistogram": + mtype = "histogram" + elif mtype == "unknown": + mtype = "untyped" + + # Output in the old format for compatibility. + if emit_help: + output.append( + "# HELP {0} {1}\n".format( + mname, + metric.documentation.replace("\\", r"\\").replace("\n", r"\n"), + ) + ) + output.append("# TYPE {0} {1}\n".format(mname, mtype)) + for sample in map(nameify_sample, metric.samples): + # Get rid of the OpenMetrics specific samples + for suffix in ["_created", "_gsum", "_gcount"]: + if sample.name.endswith(suffix): + break + else: + newname = sample.name.replace(mnewname, mname) + if ":" in newname and newname.endswith("_total"): + newname = newname[: -len("_total")] + output.append(sample_line(sample, newname)) + + # Get rid of the weird colon things while we're at it + if mtype == "counter": + mnewname = mnewname.replace(":total", "") + mnewname = mnewname.replace(":", "_") + + if mname == mnewname: + continue + + # Also output in the new format, if it's different. + if emit_help: + output.append( + "# HELP {0} {1}\n".format( + mnewname, + metric.documentation.replace("\\", r"\\").replace("\n", r"\n"), + ) + ) + output.append("# TYPE {0} {1}\n".format(mnewname, mtype)) + for sample in map(nameify_sample, metric.samples): + # Get rid of the OpenMetrics specific samples + for suffix in ["_created", "_gsum", "_gcount"]: + if sample.name.endswith(suffix): + break + else: + output.append( + sample_line( + sample, sample.name.replace(":total", "").replace(":", "_") + ) + ) + + return "".join(output).encode("utf-8") + + +class MetricsHandler(BaseHTTPRequestHandler): + """HTTP handler that gives metrics from ``REGISTRY``.""" + + registry = REGISTRY + + def do_GET(self): + registry = self.registry + params = parse_qs(urlparse(self.path).query) + + if "help" in params: + emit_help = True + else: + emit_help = False + + try: + output = generate_latest(registry, emit_help=emit_help) + except Exception: + self.send_error(500, "error generating metric output") + raise + self.send_response(200) + self.send_header("Content-Type", CONTENT_TYPE_LATEST) + self.end_headers() + self.wfile.write(output) + + def log_message(self, format, *args): + """Log nothing.""" + + @classmethod + def factory(cls, registry): + """Returns a dynamic MetricsHandler class tied + to the passed registry. + """ + # This implementation relies on MetricsHandler.registry + # (defined above and defaulted to REGISTRY). + + # As we have unicode_literals, we need to create a str() + # object for type(). + cls_name = str(cls.__name__) + MyMetricsHandler = type(cls_name, (cls, object), {"registry": registry}) + return MyMetricsHandler + + +class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer): + """Thread per request HTTP server.""" + + # Make worker threads "fire and forget". Beginning with Python 3.7 this + # prevents a memory leak because ``ThreadingMixIn`` starts to gather all + # non-daemon threads in a list in order to join on them at server close. + # Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the + # same as Python 3.7's ``ThreadingHTTPServer``. + daemon_threads = True + + +def start_http_server(port, addr="", registry=REGISTRY): + """Starts an HTTP server for prometheus metrics as a daemon thread""" + CustomMetricsHandler = MetricsHandler.factory(registry) + httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler) + t = threading.Thread(target=httpd.serve_forever) + t.daemon = True + t.start() + + +class MetricsResource(Resource): + """ + Twisted ``Resource`` that serves prometheus metrics. + """ + + isLeaf = True + + def __init__(self, registry=REGISTRY): + self.registry = registry + + def render_GET(self, request): + request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) + return generate_latest(self.registry) diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py deleted file mode 100644 index 9789359077..0000000000 --- a/synapse/metrics/resource.py +++ /dev/null @@ -1,20 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from prometheus_client.twisted import MetricsResource - -METRICS_PREFIX = "/_synapse/metrics" - -__all__ = ["MetricsResource", "METRICS_PREFIX"] diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e7618057be..c6465c0386 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -65,9 +65,7 @@ REQUIREMENTS = [ "msgpack>=0.5.2", "phonenumbers>=8.2.0", "six>=1.10", - # prometheus_client 0.4.0 changed the format of counter metrics - # (cf https://github.com/matrix-org/synapse/issues/4001) - "prometheus_client>=0.0.18,<0.4.0", + "prometheus_client>=0.0.18,<0.8.0", # we use attr.s(slots), which arrived in 16.0.0 # Twisted 18.7.0 requires attrs>=17.4.0 "attrs>=17.4.0", diff --git a/synapse/rest/client/v2_alpha/relations.py b/synapse/rest/client/v2_alpha/relations.py index 7ce485b471..6e52f6d284 100644 --- a/synapse/rest/client/v2_alpha/relations.py +++ b/synapse/rest/client/v2_alpha/relations.py @@ -34,6 +34,7 @@ from synapse.http.servlet import ( from synapse.rest.client.transactions import HttpTransactionCache from synapse.storage.relations import ( AggregationPaginationToken, + PaginationChunk, RelationPaginationToken, ) @@ -153,23 +154,28 @@ class RelationPaginationServlet(RestServlet): from_token = parse_string(request, "from") to_token = parse_string(request, "to") - if from_token: - from_token = RelationPaginationToken.from_string(from_token) - - if to_token: - to_token = RelationPaginationToken.from_string(to_token) - - result = yield self.store.get_relations_for_event( - event_id=parent_id, - relation_type=relation_type, - event_type=event_type, - limit=limit, - from_token=from_token, - to_token=to_token, - ) + if event.internal_metadata.is_redacted(): + # If the event is redacted, return an empty list of relations + pagination_chunk = PaginationChunk(chunk=[]) + else: + # Return the relations + if from_token: + from_token = RelationPaginationToken.from_string(from_token) + + if to_token: + to_token = RelationPaginationToken.from_string(to_token) + + pagination_chunk = yield self.store.get_relations_for_event( + event_id=parent_id, + relation_type=relation_type, + event_type=event_type, + limit=limit, + from_token=from_token, + to_token=to_token, + ) events = yield self.store.get_events_as_list( - [c["event_id"] for c in result.chunk] + [c["event_id"] for c in pagination_chunk.chunk] ) now = self.clock.time_msec() @@ -186,7 +192,7 @@ class RelationPaginationServlet(RestServlet): events, now, bundle_aggregations=False ) - return_value = result.to_dict() + return_value = pagination_chunk.to_dict() return_value["chunk"] = events return_value["original_event"] = original_event @@ -234,7 +240,7 @@ class RelationAggregationPaginationServlet(RestServlet): # This checks that a) the event exists and b) the user is allowed to # view it. - yield self.event_handler.get_event(requester.user, room_id, parent_id) + event = yield self.event_handler.get_event(requester.user, room_id, parent_id) if relation_type not in (RelationTypes.ANNOTATION, None): raise SynapseError(400, "Relation type must be 'annotation'") @@ -243,21 +249,26 @@ class RelationAggregationPaginationServlet(RestServlet): from_token = parse_string(request, "from") to_token = parse_string(request, "to") - if from_token: - from_token = AggregationPaginationToken.from_string(from_token) - - if to_token: - to_token = AggregationPaginationToken.from_string(to_token) - - res = yield self.store.get_aggregation_groups_for_event( - event_id=parent_id, - event_type=event_type, - limit=limit, - from_token=from_token, - to_token=to_token, - ) - - defer.returnValue((200, res.to_dict())) + if event.internal_metadata.is_redacted(): + # If the event is redacted, return an empty list of relations + pagination_chunk = PaginationChunk(chunk=[]) + else: + # Return the relations + if from_token: + from_token = AggregationPaginationToken.from_string(from_token) + + if to_token: + to_token = AggregationPaginationToken.from_string(to_token) + + pagination_chunk = yield self.store.get_aggregation_groups_for_event( + event_id=parent_id, + event_type=event_type, + limit=limit, + from_token=from_token, + to_token=to_token, + ) + + defer.returnValue((200, pagination_chunk.to_dict())) class RelationAggregationGroupPaginationServlet(RestServlet): diff --git a/synapse/static/index.html b/synapse/static/index.html index d3f1c7dce0..bf46df9097 100644 --- a/synapse/static/index.html +++ b/synapse/static/index.html @@ -48,13 +48,13 @@ </div> <h1>It works! Synapse is running</h1> <p>Your Synapse server is listening on this port and is ready for messages.</p> - <p>To use this server you'll need <a href="https://matrix.org/docs/projects/try-matrix-now.html#clients" target="_blank">a Matrix client</a>. + <p>To use this server you'll need <a href="https://matrix.org/docs/projects/try-matrix-now.html#clients" target="_blank" rel="noopener noreferrer">a Matrix client</a>. </p> <p>Welcome to the Matrix universe :)</p> <hr> <p> <small> - <a href="https://matrix.org" target="_blank"> + <a href="https://matrix.org" target="_blank" rel="noopener noreferrer"> matrix.org </a> </small> diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 874d0a56bc..858fc755a1 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -37,6 +37,7 @@ from synapse.logging.context import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id +from synapse.util import batch_iter from synapse.util.metrics import Measure from ._base import SQLBaseStore @@ -218,9 +219,108 @@ class EventsWorkerStore(SQLBaseStore): if not event_ids: defer.returnValue([]) - event_id_list = event_ids - event_ids = set(event_ids) + # there may be duplicates so we cast the list to a set + event_entry_map = yield self._get_events_from_cache_or_db( + set(event_ids), allow_rejected=allow_rejected + ) + + events = [] + for event_id in event_ids: + entry = event_entry_map.get(event_id, None) + if not entry: + continue + + if not allow_rejected: + assert not entry.event.rejected_reason, ( + "rejected event returned from _get_events_from_cache_or_db despite " + "allow_rejected=False" + ) + + # We may not have had the original event when we received a redaction, so + # we have to recheck auth now. + + if not allow_rejected and entry.event.type == EventTypes.Redaction: + redacted_event_id = entry.event.redacts + event_map = yield self._get_events_from_cache_or_db([redacted_event_id]) + original_event_entry = event_map.get(redacted_event_id) + if not original_event_entry: + # we don't have the redacted event (or it was rejected). + # + # We assume that the redaction isn't authorized for now; if the + # redacted event later turns up, the redaction will be re-checked, + # and if it is found valid, the original will get redacted before it + # is served to the client. + logger.debug( + "Withholding redaction event %s since we don't (yet) have the " + "original %s", + event_id, + redacted_event_id, + ) + continue + + original_event = original_event_entry.event + if original_event.type == EventTypes.Create: + # we never serve redactions of Creates to clients. + logger.info( + "Withholding redaction %s of create event %s", + event_id, + redacted_event_id, + ) + continue + + if entry.event.internal_metadata.need_to_check_redaction(): + original_domain = get_domain_from_id(original_event.sender) + redaction_domain = get_domain_from_id(entry.event.sender) + if original_domain != redaction_domain: + # the senders don't match, so this is forbidden + logger.info( + "Withholding redaction %s whose sender domain %s doesn't " + "match that of redacted event %s %s", + event_id, + redaction_domain, + redacted_event_id, + original_domain, + ) + continue + + # Update the cache to save doing the checks again. + entry.event.internal_metadata.recheck_redaction = False + + if check_redacted and entry.redacted_event: + event = entry.redacted_event + else: + event = entry.event + + events.append(event) + + if get_prev_content: + if "replaces_state" in event.unsigned: + prev = yield self.get_event( + event.unsigned["replaces_state"], + get_prev_content=False, + allow_none=True, + ) + if prev: + event.unsigned = dict(event.unsigned) + event.unsigned["prev_content"] = prev.content + event.unsigned["prev_sender"] = prev.sender + + defer.returnValue(events) + + @defer.inlineCallbacks + def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False): + """Fetch a bunch of events from the cache or the database. + + If events are pulled from the database, they will be cached for future lookups. + Args: + event_ids (Iterable[str]): The event_ids of the events to fetch + allow_rejected (bool): Whether to include rejected events + + Returns: + Deferred[Dict[str, _EventCacheEntry]]: + map from event id to result + """ event_entry_map = self._get_events_from_cache( event_ids, allow_rejected=allow_rejected ) @@ -243,81 +343,7 @@ class EventsWorkerStore(SQLBaseStore): event_entry_map.update(missing_events) - events = [] - for event_id in event_id_list: - entry = event_entry_map.get(event_id, None) - if not entry: - continue - - # Starting in room version v3, some redactions need to be rechecked if we - # didn't have the redacted event at the time, so we recheck on read - # instead. - if not allow_rejected and entry.event.type == EventTypes.Redaction: - if entry.event.internal_metadata.need_to_check_redaction(): - # XXX: we need to avoid calling get_event here. - # - # The problem is that we end up at this point when an event - # which has been redacted is pulled out of the database by - # _enqueue_events, because _enqueue_events needs to check - # the redaction before it can cache the redacted event. So - # obviously, calling get_event to get the redacted event out - # of the database gives us an infinite loop. - # - # For now (quick hack to fix during 0.99 release cycle), we - # just go and fetch the relevant row from the db, but it - # would be nice to think about how we can cache this rather - # than hit the db every time we access a redaction event. - # - # One thought on how to do this: - # 1. split get_events_as_list up so that it is divided into - # (a) get the rawish event from the db/cache, (b) do the - # redaction/rejection filtering - # 2. have _get_event_from_row just call the first half of - # that - - orig_sender = yield self._simple_select_one_onecol( - table="events", - keyvalues={"event_id": entry.event.redacts}, - retcol="sender", - allow_none=True, - ) - - expected_domain = get_domain_from_id(entry.event.sender) - if ( - orig_sender - and get_domain_from_id(orig_sender) == expected_domain - ): - # This redaction event is allowed. Mark as not needing a - # recheck. - entry.event.internal_metadata.recheck_redaction = False - else: - # We don't have the event that is being redacted, so we - # assume that the event isn't authorized for now. (If we - # later receive the event, then we will always redact - # it anyway, since we have this redaction) - continue - - if allow_rejected or not entry.event.rejected_reason: - if check_redacted and entry.redacted_event: - event = entry.redacted_event - else: - event = entry.event - - events.append(event) - - if get_prev_content: - if "replaces_state" in event.unsigned: - prev = yield self.get_event( - event.unsigned["replaces_state"], - get_prev_content=False, - allow_none=True, - ) - if prev: - event.unsigned = dict(event.unsigned) - event.unsigned["prev_content"] = prev.content - event.unsigned["prev_sender"] = prev.sender - - defer.returnValue(events) + return event_entry_map def _invalidate_get_event_cache(self, event_id): self._get_event_cache.invalidate((event_id,)) @@ -326,7 +352,7 @@ class EventsWorkerStore(SQLBaseStore): """Fetch events from the caches Args: - events (list(str)): list of event_ids to fetch + events (Iterable[str]): list of event_ids to fetch allow_rejected (bool): Whether to return events that were rejected update_metrics (bool): Whether to update the cache hit ratio metrics @@ -384,19 +410,16 @@ class EventsWorkerStore(SQLBaseStore): The fetch requests. Each entry consists of a list of event ids to be fetched, and a deferred to be completed once the events have been fetched. - """ with Measure(self._clock, "_fetch_event_list"): try: event_id_lists = list(zip(*event_list))[0] event_ids = [item for sublist in event_id_lists for item in sublist] - rows = self._new_transaction( + row_dict = self._new_transaction( conn, "do_fetch", [], [], self._fetch_event_rows, event_ids ) - row_dict = {r["event_id"]: r for r in rows} - # We only want to resolve deferreds from the main thread def fire(lst, res): for ids, d in lst: @@ -454,7 +477,7 @@ class EventsWorkerStore(SQLBaseStore): logger.debug("Loaded %d events (%d rows)", len(events), len(rows)) if not allow_rejected: - rows[:] = [r for r in rows if not r["rejects"]] + rows[:] = [r for r in rows if r["rejected_reason"] is None] res = yield make_deferred_yieldable( defer.gatherResults( @@ -463,8 +486,8 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_from_row, row["internal_metadata"], row["json"], - row["redacts"], - rejected_reason=row["rejects"], + row["redactions"], + rejected_reason=row["rejected_reason"], format_version=row["format_version"], ) for row in rows @@ -475,49 +498,98 @@ class EventsWorkerStore(SQLBaseStore): defer.returnValue({e.event.event_id: e for e in res if e}) - def _fetch_event_rows(self, txn, events): - rows = [] - N = 200 - for i in range(1 + len(events) // N): - evs = events[i * N : (i + 1) * N] - if not evs: - break + def _fetch_event_rows(self, txn, event_ids): + """Fetch event rows from the database + + Events which are not found are omitted from the result. + + The returned per-event dicts contain the following keys: + + * event_id (str) + + * json (str): json-encoded event structure + + * internal_metadata (str): json-encoded internal metadata dict + + * format_version (int|None): The format of the event. Hopefully one + of EventFormatVersions. 'None' means the event predates + EventFormatVersions (so the event is format V1). + + * rejected_reason (str|None): if the event was rejected, the reason + why. + * redactions (List[str]): a list of event-ids which (claim to) redact + this event. + + Args: + txn (twisted.enterprise.adbapi.Connection): + event_ids (Iterable[str]): event IDs to fetch + + Returns: + Dict[str, Dict]: a map from event id to event info. + """ + event_dict = {} + for evs in batch_iter(event_ids, 200): sql = ( "SELECT " - " e.event_id as event_id, " + " e.event_id, " " e.internal_metadata," " e.json," " e.format_version, " - " r.redacts as redacts," - " rej.event_id as rejects " + " rej.reason " " FROM event_json as e" " LEFT JOIN rejections as rej USING (event_id)" - " LEFT JOIN redactions as r ON e.event_id = r.redacts" " WHERE e.event_id IN (%s)" ) % (",".join(["?"] * len(evs)),) txn.execute(sql, evs) - rows.extend(self.cursor_to_dict(txn)) - return rows + for row in txn: + event_id = row[0] + event_dict[event_id] = { + "event_id": event_id, + "internal_metadata": row[1], + "json": row[2], + "format_version": row[3], + "rejected_reason": row[4], + "redactions": [], + } + + # check for redactions + redactions_sql = ( + "SELECT event_id, redacts FROM redactions WHERE redacts IN (%s)" + ) % (",".join(["?"] * len(evs)),) + + txn.execute(redactions_sql, evs) + + for (redacter, redacted) in txn: + d = event_dict.get(redacted) + if d: + d["redactions"].append(redacter) + + return event_dict @defer.inlineCallbacks def _get_event_from_row( - self, internal_metadata, js, redacted, format_version, rejected_reason=None + self, internal_metadata, js, redactions, format_version, rejected_reason=None ): + """Parse an event row which has been read from the database + + Args: + internal_metadata (str): json-encoded internal_metadata column + js (str): json-encoded event body from event_json + redactions (list[str]): a list of the events which claim to have redacted + this event, from the redactions table + format_version: (str): the 'format_version' column + rejected_reason (str|None): the reason this event was rejected, if any + + Returns: + _EventCacheEntry + """ with Measure(self._clock, "_get_event_from_row"): d = json.loads(js) internal_metadata = json.loads(internal_metadata) - if rejected_reason: - rejected_reason = yield self._simple_select_one_onecol( - table="rejections", - keyvalues={"event_id": rejected_reason}, - retcol="reason", - desc="_get_event_from_row_rejected_reason", - ) - if format_version is None: # This means that we stored the event before we had the concept # of a event format version, so it must be a V1 event. @@ -529,41 +601,7 @@ class EventsWorkerStore(SQLBaseStore): rejected_reason=rejected_reason, ) - redacted_event = None - if redacted: - redacted_event = prune_event(original_ev) - - redaction_id = yield self._simple_select_one_onecol( - table="redactions", - keyvalues={"redacts": redacted_event.event_id}, - retcol="event_id", - desc="_get_event_from_row_redactions", - ) - - redacted_event.unsigned["redacted_by"] = redaction_id - # Get the redaction event. - - because = yield self.get_event( - redaction_id, check_redacted=False, allow_none=True - ) - - if because: - # It's fine to do add the event directly, since get_pdu_json - # will serialise this field correctly - redacted_event.unsigned["redacted_because"] = because - - # Starting in room version v3, some redactions need to be - # rechecked if we didn't have the redacted event at the - # time, so we recheck on read instead. - if because.internal_metadata.need_to_check_redaction(): - expected_domain = get_domain_from_id(original_ev.sender) - if get_domain_from_id(because.sender) == expected_domain: - # This redaction event is allowed. Mark as not needing a - # recheck. - because.internal_metadata.recheck_redaction = False - else: - # Senders don't match, so the event isn't actually redacted - redacted_event = None + redacted_event = yield self._maybe_redact_event_row(original_ev, redactions) cache_entry = _EventCacheEntry( event=original_ev, redacted_event=redacted_event @@ -574,6 +612,60 @@ class EventsWorkerStore(SQLBaseStore): defer.returnValue(cache_entry) @defer.inlineCallbacks + def _maybe_redact_event_row(self, original_ev, redactions): + """Given an event object and a list of possible redacting event ids, + determine whether to honour any of those redactions and if so return a redacted + event. + + Args: + original_ev (EventBase): + redactions (iterable[str]): list of event ids of potential redaction events + + Returns: + Deferred[EventBase|None]: if the event should be redacted, a pruned + event object. Otherwise, None. + """ + if original_ev.type == "m.room.create": + # we choose to ignore redactions of m.room.create events. + return None + + redaction_map = yield self._get_events_from_cache_or_db(redactions) + + for redaction_id in redactions: + redaction_entry = redaction_map.get(redaction_id) + if not redaction_entry: + # we don't have the redaction event, or the redaction event was not + # authorized. + continue + + redaction_event = redaction_entry.event + + # Starting in room version v3, some redactions need to be + # rechecked if we didn't have the redacted event at the + # time, so we recheck on read instead. + if redaction_event.internal_metadata.need_to_check_redaction(): + expected_domain = get_domain_from_id(original_ev.sender) + if get_domain_from_id(redaction_event.sender) == expected_domain: + # This redaction event is allowed. Mark as not needing a recheck. + redaction_event.internal_metadata.recheck_redaction = False + else: + # Senders don't match, so the event isn't actually redacted + continue + + # we found a good redaction event. Redact! + redacted_event = prune_event(original_ev) + redacted_event.unsigned["redacted_by"] = redaction_id + + # It's fine to add the event directly, since get_pdu_json + # will serialise this field correctly + redacted_event.unsigned["redacted_because"] = redaction_event + + return redacted_event + + # no valid redaction found for this event + return None + + @defer.inlineCallbacks def have_events_in_timeline(self, event_ids): """Given a list of event ids, check if we have already processed and stored them as non outliers. |