diff options
Diffstat (limited to 'synapse')
81 files changed, 638 insertions, 475 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index 1d9d85a727..4d39996a2e 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ try: except ImportError: pass -__version__ = "1.15.0" +__version__ = "1.15.1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py index d528450c78..55cce2db22 100644 --- a/synapse/_scripts/register_new_matrix_user.py +++ b/synapse/_scripts/register_new_matrix_user.py @@ -23,8 +23,6 @@ import hmac import logging import sys -from six.moves import input - import requests as _requests import yaml diff --git a/synapse/api/errors.py b/synapse/api/errors.py index a07a54580d..5305038c21 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -17,10 +17,9 @@ """Contains exceptions and error codes.""" import logging +from http import HTTPStatus from typing import Dict, List -from six.moves import http_client - from canonicaljson import json from twisted.web import http @@ -173,7 +172,7 @@ class ConsentNotGivenError(SynapseError): consent_url (str): The URL where the user can give their consent """ super(ConsentNotGivenError, self).__init__( - code=http_client.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN + code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN ) self._consent_uri = consent_uri @@ -193,7 +192,7 @@ class UserDeactivatedError(SynapseError): msg (str): The human-readable error message """ super(UserDeactivatedError, self).__init__( - code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED + code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED ) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 8b64d0a285..f988f62a1e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -17,8 +17,6 @@ # limitations under the License. from typing import List -from six import text_type - import jsonschema from canonicaljson import json from jsonschema import FormatChecker @@ -313,7 +311,7 @@ class Filter(object): content = event.get("content", {}) # check if there is a string url field in the content for filtering purposes - contains_url = isinstance(content.get("url"), text_type) + contains_url = isinstance(content.get("url"), str) labels = content.get(EventContentFields.LABELS, []) return self.check_fields(room_id, sender, ev_type, labels, contains_url) diff --git a/synapse/api/urls.py b/synapse/api/urls.py index f34434bd67..bd03ebca5a 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -17,8 +17,7 @@ """Contains the URL paths to prefix various aspects of the server with. """ import hmac from hashlib import sha256 - -from six.moves.urllib.parse import urlencode +from urllib.parse import urlencode from synapse.config import ConfigError diff --git a/synapse/app/_base.py b/synapse/app/_base.py index dedff81af3..373a80a4a7 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -20,6 +20,7 @@ import signal import socket import sys import traceback +from typing import Iterable from daemonize import Daemonize from typing_extensions import NoReturn @@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory import synapse from synapse.app import check_bind_error +from synapse.config.server import ListenerConfig from synapse.crypto import context_factory from synapse.logging.context import PreserveLoggingContext from synapse.util.async_helpers import Linearizer @@ -234,7 +236,7 @@ def refresh_certificate(hs): logger.info("Context factories updated.") -def start(hs, listeners=None): +def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): """ Start a Synapse server or worker. @@ -245,8 +247,8 @@ def start(hs, listeners=None): notify systemd. Args: - hs (synapse.server.HomeServer) - listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml) + hs: homeserver instance + listeners: Listener configuration ('listeners' in homeserver.yaml) """ try: # Set up the SIGHUP machinery. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 53c488d211..27a3fc9ed6 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -37,6 +37,7 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.config.server import ListenerConfig from synapse.federation import send_queue from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.presence import ( @@ -514,13 +515,18 @@ class GenericWorkerSlavedStore( class GenericWorkerServer(HomeServer): DATASTORE_CLASS = GenericWorkerSlavedStore - def _listen_http(self, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - site_tag = listener_config.get("tag", port) + def _listen_http(self, listener_config: ListenerConfig): + port = listener_config.port + bind_addresses = listener_config.bind_addresses + + assert listener_config.http_options is not None + + site_tag = listener_config.http_options.tag + if site_tag is None: + site_tag = port resources = {} - for res in listener_config["resources"]: - for name in res["names"]: + for res in listener_config.http_options.resources: + for name in res.names: if name == "metrics": resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": @@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer): " repository is disabled. Ignoring." ) - if name == "openid" and "federation" not in res["names"]: + if name == "openid" and "federation" not in res.names: # Only load the openid resource separately if federation resource # is not specified since federation resource includes openid # resource. @@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer): logger.info("Synapse worker now listening on port %d", port) - def start_listening(self, listeners): + def start_listening(self, listeners: Iterable[ListenerConfig]): for listener in listeners: - if listener["type"] == "http": + if listener.type == "http": self._listen_http(listener) - elif listener["type"] == "manhole": + elif listener.type == "manhole": _base.listen_tcp( - listener["bind_addresses"], - listener["port"], + listener.bind_addresses, + listener.port, manhole( username="matrix", password="rabbithole", globals={"hs": self} ), ) - elif listener["type"] == "metrics": + elif listener.type == "metrics": if not self.get_config().enable_metrics: logger.warning( ( @@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer): ) ) else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) + _base.listen_metrics(listener.bind_addresses, listener.port) else: - logger.warning("Unrecognized listener type: %s", listener["type"]) + logger.warning("Unsupported listener type: %s", listener.type) self.get_tcp_replication().start_replication(self) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 93bc45208e..299134d00f 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -23,6 +23,7 @@ import math import os import resource import sys +from typing import Iterable from prometheus_client import Gauge @@ -48,6 +49,7 @@ from synapse.app import _base from synapse.app._base import listen_ssl, listen_tcp, quit_with_error from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig +from synapse.config.server import ListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import ( @@ -87,24 +89,24 @@ def gz_wrap(r): class SynapseHomeServer(HomeServer): DATASTORE_CLASS = DataStore - def _listener_http(self, config, listener_config): - port = listener_config["port"] - bind_addresses = listener_config["bind_addresses"] - tls = listener_config.get("tls", False) - site_tag = listener_config.get("tag", port) + def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig): + port = listener_config.port + bind_addresses = listener_config.bind_addresses + tls = listener_config.tls + site_tag = listener_config.http_options.tag + if site_tag is None: + site_tag = port resources = {} - for res in listener_config["resources"]: - for name in res["names"]: - if name == "openid" and "federation" in res["names"]: + for res in listener_config.http_options.resources: + for name in res.names: + if name == "openid" and "federation" in res.names: # Skip loading openid resource if federation is defined # since federation resource will include openid continue - resources.update( - self._configure_named_resource(name, res.get("compress", False)) - ) + resources.update(self._configure_named_resource(name, res.compress)) - additional_resources = listener_config.get("additional_resources", {}) + additional_resources = listener_config.http_options.additional_resources logger.debug("Configuring additional resources: %r", additional_resources) module_api = ModuleApi(self, self.get_auth_handler()) for path, resmodule in additional_resources.items(): @@ -276,7 +278,7 @@ class SynapseHomeServer(HomeServer): return resources - def start_listening(self, listeners): + def start_listening(self, listeners: Iterable[ListenerConfig]): config = self.get_config() if config.redis_enabled: @@ -286,25 +288,25 @@ class SynapseHomeServer(HomeServer): self.get_tcp_replication().start_replication(self) for listener in listeners: - if listener["type"] == "http": + if listener.type == "http": self._listening_services.extend(self._listener_http(config, listener)) - elif listener["type"] == "manhole": + elif listener.type == "manhole": listen_tcp( - listener["bind_addresses"], - listener["port"], + listener.bind_addresses, + listener.port, manhole( username="matrix", password="rabbithole", globals={"hs": self} ), ) - elif listener["type"] == "replication": + elif listener.type == "replication": services = listen_tcp( - listener["bind_addresses"], - listener["port"], + listener.bind_addresses, + listener.port, ReplicationStreamProtocolFactory(self), ) for s in services: reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) - elif listener["type"] == "metrics": + elif listener.type == "metrics": if not self.get_config().enable_metrics: logger.warning( ( @@ -313,9 +315,11 @@ class SynapseHomeServer(HomeServer): ) ) else: - _base.listen_metrics(listener["bind_addresses"], listener["port"]) + _base.listen_metrics(listener.bind_addresses, listener.port) else: - logger.warning("Unrecognized listener type: %s", listener["type"]) + # this shouldn't happen, as the listener type should have been checked + # during parsing + logger.warning("Unrecognized listener type: %s", listener.type) # Gauges to expose monthly active user control metrics diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 1b13e84425..0323256472 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -15,8 +15,6 @@ import logging import re -from six import string_types - from twisted.internet import defer from synapse.api.constants import EventTypes @@ -156,7 +154,7 @@ class ApplicationService(object): ) regex = regex_obj.get("regex") - if isinstance(regex, string_types): + if isinstance(regex, str): regex_obj["regex"] = re.compile(regex) # Pre-compile regex else: raise ValueError("Expected string for 'regex' in ns '%s'" % ns) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 57174da021..da9a5e86d4 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging - -from six.moves import urllib +import urllib from prometheus_client import Counter diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 30d1050a91..1391e5fc43 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -22,8 +22,6 @@ from collections import OrderedDict from textwrap import dedent from typing import Any, MutableMapping, Optional -from six import integer_types - import yaml @@ -117,7 +115,7 @@ class Config(object): @staticmethod def parse_size(value): - if isinstance(value, integer_types): + if isinstance(value, int): return value sizes = {"K": 1024, "M": 1024 * 1024} size = 1 @@ -129,7 +127,7 @@ class Config(object): @staticmethod def parse_duration(value): - if isinstance(value, integer_types): + if isinstance(value, int): return value second = 1000 minute = 60 * second diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index ca43e96bd1..8ed3e24258 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -14,9 +14,7 @@ import logging from typing import Dict - -from six import string_types -from six.moves.urllib import parse as urlparse +from urllib import parse as urlparse import yaml from netaddr import IPSet @@ -98,17 +96,14 @@ def load_appservices(hostname, config_files): def _load_appservice(hostname, as_info, config_filename): required_string_fields = ["id", "as_token", "hs_token", "sender_localpart"] for field in required_string_fields: - if not isinstance(as_info.get(field), string_types): + if not isinstance(as_info.get(field), str): raise KeyError( "Required string field: '%s' (%s)" % (field, config_filename) ) # 'url' must either be a string or explicitly null, not missing # to avoid accidentally turning off push for ASes. - if ( - not isinstance(as_info.get("url"), string_types) - and as_info.get("url", "") is not None - ): + if not isinstance(as_info.get("url"), str) and as_info.get("url", "") is not None: raise KeyError( "Required string field or explicit null: 'url' (%s)" % (config_filename,) ) @@ -138,7 +133,7 @@ def _load_appservice(hostname, as_info, config_filename): ns, regex_obj, ) - if not isinstance(regex_obj.get("regex"), string_types): + if not isinstance(regex_obj.get("regex"), str): raise ValueError("Missing/bad type 'regex' key in %s", regex_obj) if not isinstance(regex_obj.get("exclusive"), bool): raise ValueError( diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py index e24dd637bc..e0939bce84 100644 --- a/synapse/config/oidc_config.py +++ b/synapse/config/oidc_config.py @@ -89,7 +89,7 @@ class OIDCConfig(Config): # use an OpenID Connect Provider for authentication, instead of its internal # password database. # - # See https://github.com/matrix-org/synapse/blob/master/openid.md. + # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md. # oidc_config: # Uncomment the following to enable authorization against an OpenID Connect diff --git a/synapse/config/server.py b/synapse/config/server.py index 73226e63d5..8204664883 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -19,7 +19,7 @@ import logging import os.path import re from textwrap import indent -from typing import Dict, List, Optional +from typing import Any, Dict, Iterable, List, Optional import attr import yaml @@ -57,6 +57,64 @@ on how to configure the new listener. --------------------------------------------------------------------------------""" +KNOWN_LISTENER_TYPES = { + "http", + "metrics", + "manhole", + "replication", +} + +KNOWN_RESOURCES = { + "client", + "consent", + "federation", + "keys", + "media", + "metrics", + "openid", + "replication", + "static", + "webclient", +} + + +@attr.s(frozen=True) +class HttpResourceConfig: + names = attr.ib( + type=List[str], + factory=list, + validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)), # type: ignore + ) + compress = attr.ib( + type=bool, + default=False, + validator=attr.validators.optional(attr.validators.instance_of(bool)), # type: ignore[arg-type] + ) + + +@attr.s(frozen=True) +class HttpListenerConfig: + """Object describing the http-specific parts of the config of a listener""" + + x_forwarded = attr.ib(type=bool, default=False) + resources = attr.ib(type=List[HttpResourceConfig], factory=list) + additional_resources = attr.ib(type=Dict[str, dict], factory=dict) + tag = attr.ib(type=str, default=None) + + +@attr.s(frozen=True) +class ListenerConfig: + """Object describing the configuration of a single listener.""" + + port = attr.ib(type=int, validator=attr.validators.instance_of(int)) + bind_addresses = attr.ib(type=List[str]) + type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES)) + tls = attr.ib(type=bool, default=False) + + # http_options is only populated if type=http + http_options = attr.ib(type=Optional[HttpListenerConfig], default=None) + + class ServerConfig(Config): section = "server" @@ -379,38 +437,21 @@ class ServerConfig(Config): } ] - self.listeners = [] # type: List[dict] - for listener in config.get("listeners", []): - if not isinstance(listener.get("port", None), int): - raise ConfigError( - "Listener configuration is lacking a valid 'port' option" - ) + self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])] - if listener.setdefault("tls", False): - # no_tls is not really supported any more, but let's grandfather it in - # here. - if config.get("no_tls", False): + # no_tls is not really supported any more, but let's grandfather it in + # here. + if config.get("no_tls", False): + l2 = [] + for listener in self.listeners: + if listener.tls: logger.info( - "Ignoring TLS-enabled listener on port %i due to no_tls" + "Ignoring TLS-enabled listener on port %i due to no_tls", + listener.port, ) - continue - - bind_address = listener.pop("bind_address", None) - bind_addresses = listener.setdefault("bind_addresses", []) - - # if bind_address was specified, add it to the list of addresses - if bind_address: - bind_addresses.append(bind_address) - - # if we still have an empty list of addresses, use the default list - if not bind_addresses: - if listener["type"] == "metrics": - # the metrics listener doesn't support IPv6 - bind_addresses.append("0.0.0.0") else: - bind_addresses.extend(DEFAULT_BIND_ADDRESSES) - - self.listeners.append(listener) + l2.append(listener) + self.listeners = l2 if not self.web_client_location: _warn_if_webclient_configured(self.listeners) @@ -446,43 +487,41 @@ class ServerConfig(Config): bind_host = config.get("bind_host", "") gzip_responses = config.get("gzip_responses", True) + http_options = HttpListenerConfig( + resources=[ + HttpResourceConfig(names=["client"], compress=gzip_responses), + HttpResourceConfig(names=["federation"]), + ], + ) + self.listeners.append( - { - "port": bind_port, - "bind_addresses": [bind_host], - "tls": True, - "type": "http", - "resources": [ - {"names": ["client"], "compress": gzip_responses}, - {"names": ["federation"], "compress": False}, - ], - } + ListenerConfig( + port=bind_port, + bind_addresses=[bind_host], + tls=True, + type="http", + http_options=http_options, + ) ) unsecure_port = config.get("unsecure_port", bind_port - 400) if unsecure_port: self.listeners.append( - { - "port": unsecure_port, - "bind_addresses": [bind_host], - "tls": False, - "type": "http", - "resources": [ - {"names": ["client"], "compress": gzip_responses}, - {"names": ["federation"], "compress": False}, - ], - } + ListenerConfig( + port=unsecure_port, + bind_addresses=[bind_host], + tls=False, + type="http", + http_options=http_options, + ) ) manhole = config.get("manhole") if manhole: self.listeners.append( - { - "port": manhole, - "bind_addresses": ["127.0.0.1"], - "type": "manhole", - "tls": False, - } + ListenerConfig( + port=manhole, bind_addresses=["127.0.0.1"], type="manhole", + ) ) metrics_port = config.get("metrics_port") @@ -490,13 +529,14 @@ class ServerConfig(Config): logger.warning(METRICS_PORT_WARNING) self.listeners.append( - { - "port": metrics_port, - "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")], - "tls": False, - "type": "http", - "resources": [{"names": ["metrics"], "compress": False}], - } + ListenerConfig( + port=metrics_port, + bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")], + type="http", + http_options=HttpListenerConfig( + resources=[HttpResourceConfig(names=["metrics"])] + ), + ) ) _check_resource_config(self.listeners) @@ -522,7 +562,7 @@ class ServerConfig(Config): ) def has_tls_listener(self) -> bool: - return any(listener["tls"] for listener in self.listeners) + return any(listener.tls for listener in self.listeners) def generate_config_section( self, server_name, data_dir_path, open_private_ports, listeners, **kwargs @@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds): ) +def parse_listener_def(listener: Any) -> ListenerConfig: + """parse a listener config from the config file""" + listener_type = listener["type"] + + port = listener.get("port") + if not isinstance(port, int): + raise ConfigError("Listener configuration is lacking a valid 'port' option") + + tls = listener.get("tls", False) + + bind_addresses = listener.get("bind_addresses", []) + bind_address = listener.get("bind_address") + # if bind_address was specified, add it to the list of addresses + if bind_address: + bind_addresses.append(bind_address) + + # if we still have an empty list of addresses, use the default list + if not bind_addresses: + if listener_type == "metrics": + # the metrics listener doesn't support IPv6 + bind_addresses.append("0.0.0.0") + else: + bind_addresses.extend(DEFAULT_BIND_ADDRESSES) + + http_config = None + if listener_type == "http": + http_config = HttpListenerConfig( + x_forwarded=listener.get("x_forwarded", False), + resources=[ + HttpResourceConfig(**res) for res in listener.get("resources", []) + ], + additional_resources=listener.get("additional_resources", {}), + tag=listener.get("tag"), + ) + + return ListenerConfig(port, bind_addresses, listener_type, tls, http_config) + + NO_MORE_WEB_CLIENT_WARNING = """ Synapse no longer includes a web client. To enable a web client, configure web_client_location. To remove this warning, remove 'webclient' from the 'listeners' @@ -1088,40 +1166,27 @@ configuration. """ -def _warn_if_webclient_configured(listeners): +def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None: for listener in listeners: - for res in listener.get("resources", []): - for name in res.get("names", []): + if not listener.http_options: + continue + for res in listener.http_options.resources: + for name in res.names: if name == "webclient": logger.warning(NO_MORE_WEB_CLIENT_WARNING) return -KNOWN_RESOURCES = ( - "client", - "consent", - "federation", - "keys", - "media", - "metrics", - "openid", - "replication", - "static", - "webclient", -) - - -def _check_resource_config(listeners): +def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None: resource_names = { res_name for listener in listeners - for res in listener.get("resources", []) - for res_name in res.get("names", []) + if listener.http_options + for res in listener.http_options.resources + for res_name in res.names } for resource in resource_names: - if resource not in KNOWN_RESOURCES: - raise ConfigError("Unknown listener resource '%s'" % (resource,)) if resource == "consent": try: check_requirements("resources.consent") diff --git a/synapse/config/tls.py b/synapse/config/tls.py index a65538562b..e368ea564d 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -20,8 +20,6 @@ from datetime import datetime from hashlib import sha256 from typing import List -import six - from unpaddedbase64 import encode_base64 from OpenSSL import SSL, crypto @@ -59,7 +57,7 @@ class TlsConfig(Config): logger.warning(ACME_SUPPORT_ENABLED_WARN) # hyperlink complains on py2 if this is not a Unicode - self.acme_url = six.text_type( + self.acme_url = str( acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory") ) self.acme_port = acme_config.get("port", 80) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ed06b91a54..dbc661630c 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -16,6 +16,7 @@ import attr from ._base import Config, ConfigError +from .server import ListenerConfig, parse_listener_def @attr.s @@ -52,7 +53,9 @@ class WorkerConfig(Config): if self.worker_app == "synapse.app.homeserver": self.worker_app = None - self.worker_listeners = config.get("worker_listeners", []) + self.worker_listeners = [ + parse_listener_def(x) for x in config.get("worker_listeners", []) + ] self.worker_daemonize = config.get("worker_daemonize") self.worker_pid_file = config.get("worker_pid_file") self.worker_log_config = config.get("worker_log_config") @@ -75,24 +78,11 @@ class WorkerConfig(Config): manhole = config.get("worker_manhole") if manhole: self.worker_listeners.append( - { - "port": manhole, - "bind_addresses": ["127.0.0.1"], - "type": "manhole", - "tls": False, - } + ListenerConfig( + port=manhole, bind_addresses=["127.0.0.1"], type="manhole", + ) ) - if self.worker_listeners: - for listener in self.worker_listeners: - bind_address = listener.pop("bind_address", None) - bind_addresses = listener.setdefault("bind_addresses", []) - - if bind_address: - bind_addresses.append(bind_address) - elif not bind_addresses: - bind_addresses.append("") - # A map from instance name to host/port of their HTTP replication endpoint. instance_map = config.get("instance_map") or {} self.instance_map = { diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index a9f4025bfe..dbfc3e8972 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -15,11 +15,9 @@ # limitations under the License. import logging +import urllib from collections import defaultdict -import six -from six.moves import urllib - import attr from signedjson.key import ( decode_verify_key_bytes, @@ -661,7 +659,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher): for response in query_response["server_keys"]: # do this first, so that we can give useful errors thereafter server_name = response.get("server_name") - if not isinstance(server_name, six.string_types): + if not isinstance(server_name, str): raise KeyLookupError( "Malformed response from key notary server %s: invalid server_name" % (perspective_name,) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index dd340be9a7..f6b507977f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -16,8 +16,6 @@ import collections import re from typing import Any, Mapping, Union -from six import string_types - from frozendict import frozendict from twisted.internet import defer @@ -318,7 +316,7 @@ def serialize_event( if only_event_fields: if not isinstance(only_event_fields, list) or not all( - isinstance(f, string_types) for f in only_event_fields + isinstance(f, str) for f in only_event_fields ): raise TypeError("only_event_fields must be a list of strings") d = only_fields(d, only_event_fields) diff --git a/synapse/events/validator.py b/synapse/events/validator.py index b001c64bb4..588d222f36 100644 --- a/synapse/events/validator.py +++ b/synapse/events/validator.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six import integer_types, string_types - from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import EventFormatVersions @@ -53,7 +51,7 @@ class EventValidator(object): event_strings = ["origin"] for s in event_strings: - if not isinstance(getattr(event, s), string_types): + if not isinstance(getattr(event, s), str): raise SynapseError(400, "'%s' not a string type" % (s,)) # Depending on the room version, ensure the data is spec compliant JSON. @@ -90,7 +88,7 @@ class EventValidator(object): max_lifetime = event.content.get("max_lifetime") if min_lifetime is not None: - if not isinstance(min_lifetime, integer_types): + if not isinstance(min_lifetime, int): raise SynapseError( code=400, msg="'min_lifetime' must be an integer", @@ -124,7 +122,7 @@ class EventValidator(object): ) if max_lifetime is not None: - if not isinstance(max_lifetime, integer_types): + if not isinstance(max_lifetime, int): raise SynapseError( code=400, msg="'max_lifetime' must be an integer", @@ -183,7 +181,7 @@ class EventValidator(object): strings.append("state_key") for s in strings: - if not isinstance(getattr(event, s), string_types): + if not isinstance(getattr(event, s), str): raise SynapseError(400, "Not '%s' a string type" % (s,)) RoomID.from_string(event.room_id) @@ -223,7 +221,7 @@ class EventValidator(object): for s in keys: if s not in d: raise SynapseError(400, "'%s' not in content" % (s,)) - if not isinstance(d[s], string_types): + if not isinstance(d[s], str): raise SynapseError(400, "'%s' not a string type" % (s,)) def _ensure_state_event(self, event): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index b2ab5bd6a4..420df2385f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -17,8 +17,6 @@ import logging from collections import namedtuple from typing import Iterable, List -import six - from twisted.internet import defer from twisted.internet.defer import Deferred, DeferredList from twisted.python.failure import Failure @@ -294,7 +292,7 @@ def event_from_pdu_json( assert_params_in_dict(pdu_json, ("type", "depth")) depth = pdu_json["depth"] - if not isinstance(depth, six.integer_types): + if not isinstance(depth, int): raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON) if depth < 0: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 6920c23723..afe0a8238b 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -17,8 +17,6 @@ import logging from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union -import six - from canonicaljson import json from prometheus_client import Counter @@ -751,7 +749,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: def _acl_entry_matches(server_name: str, acl_entry: str) -> Match: - if not isinstance(acl_entry, six.string_types): + if not isinstance(acl_entry, str): logger.warning( "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry) ) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 060bf07197..9f99311419 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -15,10 +15,9 @@ # limitations under the License. import logging +import urllib from typing import Any, Dict, Optional -from six.moves import urllib - from twisted.internet import defer from synapse.api.constants import Membership diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 8a9de913b3..8db8ab1b7b 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -17,8 +17,6 @@ import logging -from six import string_types - from synapse.api.errors import Codes, SynapseError from synapse.types import GroupID, RoomID, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute @@ -513,7 +511,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler): for keyname in ("name", "avatar_url", "short_description", "long_description"): if keyname in content: value = content[keyname] - if not isinstance(value, string_types): + if not isinstance(value, str): raise SynapseError(400, "%r value is not a string" % (keyname,)) profile[keyname] = value diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py index 64aaa1335c..76f213723a 100644 --- a/synapse/handlers/cas_handler.py +++ b/synapse/handlers/cas_handler.py @@ -14,11 +14,10 @@ # limitations under the License. import logging +import urllib import xml.etree.ElementTree as ET from typing import Dict, Optional, Tuple -from six.moves import urllib - from twisted.web.client import PartialDownloadError from synapse.api.errors import Codes, LoginError diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 83f8fa1180..31346b56c3 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -691,6 +691,7 @@ class DeviceListUpdater(object): return False + @trace @defer.inlineCallbacks def _maybe_retry_device_resync(self): """Retry to resync device lists that are out of sync, except if another retry is diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 05c4b3eec0..610b08d00b 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -18,8 +18,6 @@ from typing import Any, Dict from canonicaljson import json -from twisted.internet import defer - from synapse.api.errors import SynapseError from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( @@ -51,8 +49,7 @@ class DeviceMessageHandler(object): self._device_list_updater = hs.get_device_handler().device_list_updater - @defer.inlineCallbacks - def on_direct_to_device_edu(self, origin, content): + async def on_direct_to_device_edu(self, origin, content): local_messages = {} sender_user_id = content["sender"] if origin != get_domain_from_id(sender_user_id): @@ -82,11 +79,11 @@ class DeviceMessageHandler(object): } local_messages[user_id] = messages_by_device - yield self._check_for_unknown_devices( + await self._check_for_unknown_devices( message_type, sender_user_id, by_device ) - stream_id = yield self.store.add_messages_from_remote_to_device_inbox( + stream_id = await self.store.add_messages_from_remote_to_device_inbox( origin, message_id, local_messages ) @@ -94,14 +91,13 @@ class DeviceMessageHandler(object): "to_device_key", stream_id, users=local_messages.keys() ) - @defer.inlineCallbacks - def _check_for_unknown_devices( + async def _check_for_unknown_devices( self, message_type: str, sender_user_id: str, by_device: Dict[str, Dict[str, Any]], ): - """Checks inbound device messages for unkown remote devices, and if + """Checks inbound device messages for unknown remote devices, and if found marks the remote cache for the user as stale. """ @@ -115,7 +111,7 @@ class DeviceMessageHandler(object): requesting_device_ids.add(device_id) # Check if we are tracking the devices of the remote user. - room_ids = yield self.store.get_rooms_for_user(sender_user_id) + room_ids = await self.store.get_rooms_for_user(sender_user_id) if not room_ids: logger.info( "Received device message from remote device we don't" @@ -127,7 +123,7 @@ class DeviceMessageHandler(object): # If we are tracking check that we know about the sending # devices. - cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id) + cached_devices = await self.store.get_cached_devices_for_user(sender_user_id) unknown_devices = requesting_device_ids - set(cached_devices) if unknown_devices: @@ -136,15 +132,14 @@ class DeviceMessageHandler(object): sender_user_id, unknown_devices, ) - yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id) + await self.store.mark_remote_user_device_cache_as_stale(sender_user_id) # Immediately attempt a resync in the background run_in_background( self._device_list_updater.user_device_resync, sender_user_id ) - @defer.inlineCallbacks - def send_device_message(self, sender_user_id, message_type, messages): + async def send_device_message(self, sender_user_id, message_type, messages): set_tag("number_of_messages", len(messages)) set_tag("sender", sender_user_id) local_messages = {} @@ -183,7 +178,7 @@ class DeviceMessageHandler(object): } log_kv({"local_messages": local_messages}) - stream_id = yield self.store.add_messages_to_device_inbox( + stream_id = await self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 2efea801bc..f55470a707 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -349,6 +349,7 @@ class E2eRoomKeysHandler(object): raise res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"]) + res["etag"] = str(res["etag"]) return res @trace diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index d6038d9995..873f6bc39f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -19,10 +19,9 @@ import itertools import logging +from http import HTTPStatus from typing import Dict, Iterable, List, Optional, Sequence, Tuple -from six.moves import http_client, zip - import attr from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json @@ -1194,7 +1193,7 @@ class FederationHandler(BaseHandler): ev.event_id, len(ev.prev_event_ids()), ) - raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events") + raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events") if len(ev.auth_event_ids()) > 10: logger.warning( @@ -1202,7 +1201,7 @@ class FederationHandler(BaseHandler): ev.event_id, len(ev.auth_event_ids()), ) - raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events") + raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") async def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. @@ -1545,7 +1544,7 @@ class FederationHandler(BaseHandler): # block any attempts to invite the server notices mxid if event.state_key == self._server_notices_mxid: - raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user") + raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user") # keep a record of the room version, if we don't yet know it. # (this may get overwritten if we later get a different room version in a diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 354da9a3b5..200127d291 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -17,8 +17,6 @@ import logging from typing import Optional, Tuple -from six import string_types - from canonicaljson import encode_canonical_json, json from twisted.internet import defer @@ -715,7 +713,7 @@ class EventCreationHandler(object): spam_error = self.spam_checker.check_event_for_spam(event) if spam_error: - if not isinstance(spam_error, string_types): + if not isinstance(spam_error, str): spam_error = "Spam is not permitted here" raise SynapseError(403, spam_error, Codes.FORBIDDEN) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7fbc229502..da06582d4b 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -15,7 +15,6 @@ # limitations under the License. import logging -from twisted.internet import defer from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership @@ -97,8 +96,7 @@ class PaginationHandler(object): job["longest_max_lifetime"], ) - @defer.inlineCallbacks - def purge_history_for_rooms_in_range(self, min_ms, max_ms): + async def purge_history_for_rooms_in_range(self, min_ms, max_ms): """Purge outdated events from rooms within the given retention range. If a default retention policy is defined in the server's configuration and its @@ -137,7 +135,7 @@ class PaginationHandler(object): include_null, ) - rooms = yield self.store.get_rooms_for_retention_period_in_range( + rooms = await self.store.get_rooms_for_retention_period_in_range( min_ms, max_ms, include_null ) @@ -165,9 +163,9 @@ class PaginationHandler(object): # Figure out what token we should start purging at. ts = self.clock.time_msec() - max_lifetime - stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts) + stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts) - r = yield self.store.get_room_event_before_stream_ordering( + r = await self.store.get_room_event_before_stream_ordering( room_id, stream_ordering, ) if not r: @@ -227,8 +225,7 @@ class PaginationHandler(object): ) return purge_id - @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, token, delete_local_events): + async def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: @@ -237,14 +234,11 @@ class PaginationHandler(object): token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones - - Returns: - Deferred """ self._purges_in_progress_by_room.add(room_id) try: - with (yield self.pagination_lock.write(room_id)): - yield self.storage.purge_events.purge_history( + with await self.pagination_lock.write(room_id): + await self.storage.purge_events.purge_history( room_id, token, delete_local_events ) logger.info("[purge] complete") @@ -282,9 +276,7 @@ class PaginationHandler(object): await self.store.get_room_version_id(room_id) # first check that we have no users in this room - joined = await defer.maybeDeferred( - self.store.is_host_joined, room_id, self._server_name - ) + joined = await self.store.is_host_joined(room_id, self._server_name) if joined: raise SynapseError(400, "Users are still joined to this room") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2e8914be14..d2f25ae12a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -25,7 +25,7 @@ The methods that define policy are: import abc import logging from contextlib import contextmanager -from typing import Dict, Iterable, List, Set +from typing import Dict, Iterable, List, Set, Tuple from prometheus_client import Counter from typing_extensions import ContextManager @@ -773,7 +773,9 @@ class PresenceHandler(BasePresenceHandler): return False - async def get_all_presence_updates(self, last_id, current_id, limit): + async def get_all_presence_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: """ Gets a list of presence update rows from between the given stream ids. Each row has: @@ -785,10 +787,31 @@ class PresenceHandler(BasePresenceHandler): - last_user_sync_ts(int) - status_msg(int) - currently_active(int) + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ + # TODO(markjh): replicate the unpersisted changes. # This could use the in-memory stores for recent changes. - rows = await self.store.get_all_presence_updates(last_id, current_id, limit) + rows = await self.store.get_all_presence_updates( + instance_name, last_id, current_id, limit + ) return rows def notify_new_event(self): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 302efc1b9a..4b1e3073a8 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -15,8 +15,6 @@ import logging -from six import raise_from - from twisted.internet import defer from synapse.api.errors import ( @@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler): ) return result except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to fetch profile"), e) + raise SynapseError(502, "Failed to fetch profile") from e except HttpResponseException as e: raise e.to_synapse_error() @@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler): ignore_backoff=True, ) except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to fetch profile"), e) + raise SynapseError(502, "Failed to fetch profile") from e except HttpResponseException as e: raise e.to_synapse_error() @@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler): ignore_backoff=True, ) except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to fetch profile"), e) + raise SynapseError(502, "Failed to fetch profile") from e except HttpResponseException as e: raise e.to_synapse_error() diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f7401373ca..950a84acd0 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -24,8 +24,6 @@ import string from collections import OrderedDict from typing import Tuple -from six import string_types - from synapse.api.constants import ( EventTypes, JoinRules, @@ -595,7 +593,7 @@ class RoomCreationHandler(BaseHandler): "room_version", self.config.default_room_version.identifier ) - if not isinstance(room_version_id, string_types): + if not isinstance(room_version_id, str): raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON) room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0f7af982f0..27c479da9e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -17,10 +17,9 @@ import abc import logging +from http import HTTPStatus from typing import Dict, Iterable, List, Optional, Tuple -from six.moves import http_client - from synapse import types from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError @@ -361,7 +360,7 @@ class RoomMemberHandler(object): if effective_membership_state == Membership.INVITE: # block any attempts to invite the server notices mxid if target.to_string() == self._server_notices_mxid: - raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user") + raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user") block_invite = False @@ -444,7 +443,7 @@ class RoomMemberHandler(object): is_blocked = await self._is_server_notice_room(room_id) if is_blocked: raise SynapseError( - http_client.FORBIDDEN, + HTTPStatus.FORBIDDEN, "You cannot reject this invite", errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM, ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c7bc14c623..4330abb9f7 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -15,7 +15,7 @@ import logging from collections import namedtuple -from typing import List +from typing import List, Tuple from twisted.internet import defer @@ -259,14 +259,31 @@ class TypingHandler(object): ) async def get_all_typing_updates( - self, last_id: int, current_id: int, limit: int - ) -> List[dict]: - """Get up to `limit` typing updates between the given tokens, earliest - updates first. + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for typing replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ if last_id == current_id: - return [] + return [], current_id, False changed_rooms = self._typing_stream_change_cache.get_all_entities_changed( last_id @@ -280,9 +297,16 @@ class TypingHandler(object): serial = self._room_serials[room_id] if last_id < serial <= current_id: typing = self._room_typing[room_id] - rows.append((serial, room_id, list(typing))) + rows.append((serial, [room_id, list(typing)])) rows.sort() - return rows[:limit] + + limited = False + if len(rows) > limit: + rows = rows[:limit] + current_id = rows[-1][0] + limited = True + + return rows, current_id, limited def get_current_token(self): return self._latest_room_serial diff --git a/synapse/http/client.py b/synapse/http/client.py index 3cef747a4d..8743e9839d 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -15,11 +15,9 @@ # limitations under the License. import logging +import urllib from io import BytesIO -from six import raise_from, text_type -from six.moves import urllib - import treq from canonicaljson import encode_canonical_json, json from netaddr import IPAddress @@ -577,7 +575,7 @@ class SimpleHttpClient(object): # This can happen e.g. because the body is too large. raise except Exception as e: - raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e) + raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e return ( length, @@ -638,7 +636,7 @@ def encode_urlencode_args(args): def encode_urlencode_arg(arg): - if isinstance(arg, text_type): + if isinstance(arg, str): return arg.encode("utf-8") elif isinstance(arg, list): return [encode_urlencode_arg(i) for i in arg] diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index f5f917f5ae..c5fc746f2f 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -48,6 +48,9 @@ class MatrixFederationAgent(object): tls_client_options_factory (FederationPolicyForHTTPS|None): factory to use for fetching client tls options, or none to disable TLS. + user_agent (bytes): + The user agent header to use for federation requests. + _srv_resolver (SrvResolver|None): SRVResolver impl to use for looking up SRV records. None to use a default implementation. @@ -61,6 +64,7 @@ class MatrixFederationAgent(object): self, reactor, tls_client_options_factory, + user_agent, _srv_resolver=None, _well_known_resolver=None, ): @@ -78,6 +82,7 @@ class MatrixFederationAgent(object): ), pool=self._pool, ) + self.user_agent = user_agent if _well_known_resolver is None: _well_known_resolver = WellKnownResolver( @@ -87,6 +92,7 @@ class MatrixFederationAgent(object): pool=self._pool, contextFactory=tls_client_options_factory, ), + user_agent=self.user_agent, ) self._well_known_resolver = _well_known_resolver @@ -149,7 +155,7 @@ class MatrixFederationAgent(object): parsed_uri = urllib.parse.urlparse(uri) # We need to make sure the host header is set to the netloc of the - # server. + # server and that a user-agent is provided. if headers is None: headers = Headers() else: @@ -157,6 +163,8 @@ class MatrixFederationAgent(object): if not headers.hasHeader(b"host"): headers.addRawHeader(b"host", parsed_uri.netloc) + if not headers.hasHeader(b"user-agent"): + headers.addRawHeader(b"user-agent", self.user_agent) res = yield make_deferred_yieldable( self._agent.request(method, uri, headers, bodyProducer) diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 7ddfad286d..89a3b041ce 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -23,6 +23,7 @@ import attr from twisted.internet import defer from twisted.web.client import RedirectAgent, readBody from twisted.web.http import stringToDatetime +from twisted.web.http_headers import Headers from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock @@ -78,7 +79,12 @@ class WellKnownResolver(object): """ def __init__( - self, reactor, agent, well_known_cache=None, had_well_known_cache=None + self, + reactor, + agent, + user_agent, + well_known_cache=None, + had_well_known_cache=None, ): self._reactor = reactor self._clock = Clock(reactor) @@ -92,6 +98,7 @@ class WellKnownResolver(object): self._well_known_cache = well_known_cache self._had_valid_well_known_cache = had_well_known_cache self._well_known_agent = RedirectAgent(agent) + self.user_agent = user_agent @defer.inlineCallbacks def get_well_known(self, server_name): @@ -227,6 +234,10 @@ class WellKnownResolver(object): uri = b"https://%s/.well-known/matrix/server" % (server_name,) uri_str = uri.decode("ascii") + headers = { + b"User-Agent": [self.user_agent], + } + i = 0 while True: i += 1 @@ -234,7 +245,9 @@ class WellKnownResolver(object): logger.info("Fetching %s", uri_str) try: response = yield make_deferred_yieldable( - self._well_known_agent.request(b"GET", uri) + self._well_known_agent.request( + b"GET", uri, headers=Headers(headers) + ) ) body = yield make_deferred_yieldable(readBody(response)) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2d47b9ea00..18f6a8fd29 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,11 +17,9 @@ import cgi import logging import random import sys +import urllib from io import BytesIO -from six import raise_from, string_types -from six.moves import urllib - import attr import treq from canonicaljson import encode_canonical_json @@ -199,7 +197,14 @@ class MatrixFederationHttpClient(object): self.reactor = Reactor() - self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory) + user_agent = hs.version_string + if hs.config.user_agent_suffix: + user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix) + user_agent = user_agent.encode("ascii") + + self.agent = MatrixFederationAgent( + self.reactor, tls_client_options_factory, user_agent + ) # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names @@ -432,10 +437,10 @@ class MatrixFederationHttpClient(object): except TimeoutError as e: raise RequestSendFailed(e, can_retry=True) from e except DNSLookupError as e: - raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) + raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: logger.info("Failed to send request: %s", e) - raise_from(RequestSendFailed(e, can_retry=True), e) + raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( request.method, response.code @@ -487,7 +492,7 @@ class MatrixFederationHttpClient(object): # Retry if the error is a 429 (Too Many Requests), # otherwise just raise a standard HttpResponseException if response.code == 429: - raise_from(RequestSendFailed(e, can_retry=True), e) + raise RequestSendFailed(e, can_retry=True) from e else: raise e @@ -998,7 +1003,7 @@ def encode_query_args(args): encoded_args = {} for k, vs in args.items(): - if isinstance(vs, string_types): + if isinstance(vs, str): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs] diff --git a/synapse/http/server.py b/synapse/http/server.py index 2487a72171..6aa1dc1f92 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -16,10 +16,10 @@ import collections import html -import http.client import logging import types import urllib +from http import HTTPStatus from io import BytesIO from typing import Awaitable, Callable, TypeVar, Union @@ -188,7 +188,7 @@ def return_html_error( exc_info=(f.type, f.value, f.getTracebackObject()), ) else: - code = http.HTTPStatus.INTERNAL_SERVER_ERROR + code = HTTPStatus.INTERNAL_SERVER_ERROR msg = "Internal server error" logger.error( diff --git a/synapse/http/site.py b/synapse/http/site.py index 167293c46d..cbc37eac6e 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -19,6 +19,7 @@ from typing import Optional from twisted.python.failure import Failure from twisted.web.server import Request, Site +from synapse.config.server import ListenerConfig from synapse.http import redact_uri from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -350,7 +351,7 @@ class SynapseSite(Site): self, logger_name, site_tag, - config, + config: ListenerConfig, resource, server_version_string, *args, @@ -360,7 +361,8 @@ class SynapseSite(Site): self.site_tag = site_tag - proxied = config.get("x_forwarded", False) + assert config.http_options is not None + proxied = config.http_options.x_forwarded self.requestFactory = XForwardedForRequest if proxied else SynapseRequest self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py index fbf570c756..d736ad5b9b 100644 --- a/synapse/logging/formatter.py +++ b/synapse/logging/formatter.py @@ -16,8 +16,7 @@ import logging import traceback - -from six import StringIO +from io import StringIO class LogFormatter(logging.Formatter): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index ecdf1ad69f..a7849cefa5 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -126,7 +126,7 @@ class ModuleApi(object): 'errcode' property for more information on the reason for failure Returns: - Deferred[str]: user_id + defer.Deferred[str]: user_id """ return defer.ensureDeferred( self._hs.get_registration_handler().register_user( @@ -149,10 +149,12 @@ class ModuleApi(object): Returns: defer.Deferred[tuple[str, str]]: Tuple of device ID and access token """ - return self._hs.get_registration_handler().register_device( - user_id=user_id, - device_id=device_id, - initial_display_name=initial_display_name, + return defer.ensureDeferred( + self._hs.get_registration_handler().register_device( + user_id=user_id, + device_id=device_id, + initial_display_name=initial_display_name, + ) ) def record_user_external_id( diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index d57a66a697..dda560b2c2 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -17,12 +17,11 @@ import email.mime.multipart import email.utils import logging import time +import urllib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Iterable, List, TypeVar -from six.moves import urllib - import bleach import jinja2 diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 11032491af..8e0d3a416d 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -18,8 +18,6 @@ import logging import re from typing import Pattern -from six import string_types - from synapse.events import EventBase from synapse.types import UserID from synapse.util.caches import register_cache @@ -131,7 +129,7 @@ class PushRuleEvaluatorForEvent(object): # XXX: optimisation: cache our pattern regexps if condition["key"] == "content.body": body = self._event.content.get("body", None) - if not body: + if not body or not isinstance(body, str): return False return _glob_matches(pattern, body, word_boundary=True) @@ -147,7 +145,7 @@ class PushRuleEvaluatorForEvent(object): return False body = self._event.content.get("body", None) - if not body: + if not body or not isinstance(body, str): return False # Similar to _glob_matches, but do not treat display_name as a glob. @@ -244,7 +242,7 @@ def _flatten_dict(d, prefix=[], result=None): if result is None: result = {} for key, value in d.items(): - if isinstance(value, string_types): + if isinstance(value, str): result[".".join(prefix + [key])] = value.lower() elif hasattr(value, "items"): _flatten_dict(value, prefix=(prefix + [key]), result=result) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 88d203aa44..f6a5458681 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -215,11 +215,9 @@ class PusherPool: try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive - updated_receipts = yield self.store.get_all_updated_receipts( + users_affected = yield self.store.get_users_sent_receipts_between( min_stream_id - 1, max_stream_id ) - # This returns a tuple, user_id is at index 3 - users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8b4312e5a3..d655aba35c 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,11 +66,9 @@ REQUIREMENTS = [ "pymacaroons>=0.13.0", "msgpack>=0.5.2", "phonenumbers>=8.2.0", - "six>=1.10", "prometheus_client>=0.0.18,<0.8.0", - # we use attr.s(slots), which arrived in 16.0.0 - # Twisted 18.7.0 requires attrs>=17.4.0 - "attrs>=17.4.0", + # we use attr.validators.deep_iterable, which arrived in 19.1.0 + "attrs>=19.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 793cef6c26..9caf1e80c1 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -16,12 +16,10 @@ import abc import logging import re +import urllib from inspect import signature from typing import Dict, List, Tuple -from six import raise_from -from six.moves import urllib - from twisted.internet import defer from synapse.api.errors import ( @@ -220,7 +218,7 @@ class ReplicationEndpoint(object): # importantly, not stack traces everywhere) raise e.to_synapse_error() except RequestSendFailed as e: - raise_from(SynapseError(502, "Failed to talk to master"), e) + raise SynapseError(502, "Failed to talk to master") from e return result diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 4acefc8a96..f196eff072 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -264,7 +264,7 @@ class BackfillStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_current_backfill_token), - db_query_to_update_function(store.get_all_new_backfill_event_rows), + store.get_all_new_backfill_event_rows, ) @@ -291,9 +291,7 @@ class PresenceStream(Stream): if hs.config.worker_app is None: # on the master, query the presence handler presence_handler = hs.get_presence_handler() - update_function = db_query_to_update_function( - presence_handler.get_all_presence_updates - ) + update_function = presence_handler.get_all_presence_updates else: # Query master process update_function = make_http_update_function(hs, self.NAME) @@ -318,9 +316,7 @@ class TypingStream(Stream): if hs.config.worker_app is None: # on the master, query the typing handler - update_function = db_query_to_update_function( - typing_handler.get_all_typing_updates - ) + update_function = typing_handler.get_all_typing_updates else: # Query master process update_function = make_http_update_function(hs, self.NAME) @@ -352,7 +348,7 @@ class ReceiptsStream(Stream): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_max_receipt_stream_id), - db_query_to_update_function(store.get_all_updated_receipts), + store.get_all_updated_receipts, ) @@ -367,26 +363,17 @@ class PushRulesStream(Stream): def __init__(self, hs): self.store = hs.get_datastore() + super(PushRulesStream, self).__init__( - hs.get_instance_name(), self._current_token, self._update_function + hs.get_instance_name(), + self._current_token, + self.store.get_all_push_rule_updates, ) def _current_token(self, instance_name: str) -> int: push_rules_token, _ = self.store.get_push_rules_stream_token() return push_rules_token - async def _update_function( - self, instance_name: str, from_token: Token, to_token: Token, limit: int - ): - rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit) - - limited = False - if len(rows) == limit: - to_token = rows[-1][0] - limited = True - - return [(row[0], (row[2],)) for row in rows], to_token, limited - class PushersStream(Stream): """A user has added/changed/removed a pusher diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index fefc8f71fa..e4330c39d6 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -16,9 +16,7 @@ import hashlib import hmac import logging import re - -from six import text_type -from six.moves import http_client +from http import HTTPStatus from synapse.api.constants import UserTypes from synapse.api.errors import Codes, NotFoundError, SynapseError @@ -215,10 +213,7 @@ class UserRestServletV2(RestServlet): await self.store.set_server_admin(target_user, set_admin_to) if "password" in body: - if ( - not isinstance(body["password"], text_type) - or len(body["password"]) > 512 - ): + if not isinstance(body["password"], str) or len(body["password"]) > 512: raise SynapseError(400, "Invalid password") else: new_password = body["password"] @@ -252,7 +247,7 @@ class UserRestServletV2(RestServlet): password = body.get("password") password_hash = None if password is not None: - if not isinstance(password, text_type) or len(password) > 512: + if not isinstance(password, str) or len(password) > 512: raise SynapseError(400, "Invalid password") password_hash = await self.auth_handler.hash(password) @@ -370,10 +365,7 @@ class UserRegisterServlet(RestServlet): 400, "username must be specified", errcode=Codes.BAD_JSON ) else: - if ( - not isinstance(body["username"], text_type) - or len(body["username"]) > 512 - ): + if not isinstance(body["username"], str) or len(body["username"]) > 512: raise SynapseError(400, "Invalid username") username = body["username"].encode("utf-8") @@ -386,7 +378,7 @@ class UserRegisterServlet(RestServlet): ) else: password = body["password"] - if not isinstance(password, text_type) or len(password) > 512: + if not isinstance(password, str) or len(password) > 512: raise SynapseError(400, "Invalid password") password_bytes = password.encode("utf-8") @@ -477,7 +469,7 @@ class DeactivateAccountRestServlet(RestServlet): erase = body.get("erase", False) if not isinstance(erase, bool): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'erase' must be a boolean, if given", Codes.BAD_JSON, ) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index dceb2792fa..c2c9a9c3aa 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -60,10 +60,18 @@ def login_id_thirdparty_from_phone(identifier): Returns: Login identifier dict of type 'm.id.threepid' """ - if "country" not in identifier or "number" not in identifier: + if "country" not in identifier or ( + # The specification requires a "phone" field, while Synapse used to require a "number" + # field. Accept both for backwards compatibility. + "phone" not in identifier + and "number" not in identifier + ): raise SynapseError(400, "Invalid phone-type identifier") - msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"]) + # Accept both "phone" and "number" as valid keys in m.id.phone + phone_number = identifier.get("phone", identifier["number"]) + + msisdn = phone_number_to_msisdn(identifier["country"], phone_number) return {"type": "m.id.thirdparty", "medium": "msisdn", "address": msisdn} diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 7cf007d35e..970fdd5834 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -17,8 +17,6 @@ """ import logging -from six import string_types - from synapse.api.errors import AuthError, SynapseError from synapse.handlers.presence import format_user_presence_state from synapse.http.servlet import RestServlet, parse_json_object_from_request @@ -73,7 +71,7 @@ class PresenceStatusRestServlet(RestServlet): if "status_msg" in content: state["status_msg"] = content.pop("status_msg") - if not isinstance(state["status_msg"], string_types): + if not isinstance(state["status_msg"], str): raise SynapseError(400, "status_msg must be a string.") if content: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 105e0cf4d2..46811abbfa 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -18,8 +18,7 @@ import logging import re from typing import List, Optional - -from six.moves.urllib import parse as urlparse +from urllib import parse as urlparse from canonicaljson import json diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index d4f721b6b9..923bcb9f85 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -15,8 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging - -from six.moves import http_client +from http import HTTPStatus from synapse.api.constants import LoginType from synapse.api.errors import Codes, SynapseError, ThreepidValidationError @@ -321,7 +320,7 @@ class DeactivateAccountRestServlet(RestServlet): erase = body.get("erase", False) if not isinstance(erase, bool): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'erase' must be a boolean, if given", Codes.BAD_JSON, ) @@ -682,7 +681,7 @@ class ThreepidRestServlet(RestServlet): class ThreepidAddRestServlet(RestServlet): - PATTERNS = client_patterns("/account/3pid/add$", releases=(), unstable=True) + PATTERNS = client_patterns("/account/3pid/add$") def __init__(self, hs): super(ThreepidAddRestServlet, self).__init__() @@ -733,7 +732,7 @@ class ThreepidAddRestServlet(RestServlet): class ThreepidBindRestServlet(RestServlet): - PATTERNS = client_patterns("/account/3pid/bind$", releases=(), unstable=True) + PATTERNS = client_patterns("/account/3pid/bind$") def __init__(self, hs): super(ThreepidBindRestServlet, self).__init__() @@ -762,7 +761,7 @@ class ThreepidBindRestServlet(RestServlet): class ThreepidUnbindRestServlet(RestServlet): - PATTERNS = client_patterns("/account/3pid/unbind$", releases=(), unstable=True) + PATTERNS = client_patterns("/account/3pid/unbind$") def __init__(self, hs): super(ThreepidUnbindRestServlet, self).__init__() diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index b9ffe86b2a..141a3f5fac 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -18,8 +18,6 @@ import hmac import logging from typing import List, Union -from six import string_types - import synapse import synapse.api.auth import synapse.types @@ -413,7 +411,7 @@ class RegisterRestServlet(RestServlet): # in sessions. Pull out the username/password provided to us. if "password" in body: password = body.pop("password") - if not isinstance(password, string_types) or len(password) > 512: + if not isinstance(password, str) or len(password) > 512: raise SynapseError(400, "Invalid password") self.password_policy_handler.validate_password(password) @@ -425,10 +423,7 @@ class RegisterRestServlet(RestServlet): desired_username = None if "username" in body: - if ( - not isinstance(body["username"], string_types) - or len(body["username"]) > 512 - ): + if not isinstance(body["username"], str) or len(body["username"]) > 512: raise SynapseError(400, "Invalid username") desired_username = body["username"] @@ -453,7 +448,7 @@ class RegisterRestServlet(RestServlet): access_token = self.auth.get_access_token_from_request(request) - if isinstance(desired_username, string_types): + if isinstance(desired_username, str): result = await self._do_appservice_registration( desired_username, access_token, body ) diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py index f067b5edac..e15927c4ea 100644 --- a/synapse/rest/client/v2_alpha/report_event.py +++ b/synapse/rest/client/v2_alpha/report_event.py @@ -14,9 +14,7 @@ # limitations under the License. import logging - -from six import string_types -from six.moves import http_client +from http import HTTPStatus from synapse.api.errors import Codes, SynapseError from synapse.http.servlet import ( @@ -47,15 +45,15 @@ class ReportEventRestServlet(RestServlet): body = parse_json_object_from_request(request) assert_params_in_dict(body, ("reason", "score")) - if not isinstance(body["reason"], string_types): + if not isinstance(body["reason"], str): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'reason' must be a string", Codes.BAD_JSON, ) if not isinstance(body["score"], int): raise SynapseError( - http_client.BAD_REQUEST, + HTTPStatus.BAD_REQUEST, "Param 'score' must be an integer", Codes.BAD_JSON, ) diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py index 1ddf9997ff..049c16b236 100644 --- a/synapse/rest/consent/consent_resource.py +++ b/synapse/rest/consent/consent_resource.py @@ -16,10 +16,9 @@ import hmac import logging from hashlib import sha256 +from http import HTTPStatus from os import path -from six.moves import http_client - import jinja2 from jinja2 import TemplateNotFound @@ -223,4 +222,4 @@ class ConsentResource(DirectServeResource): ) if not compare_digest(want_mac, userhmac): - raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect") + raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect") diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 3689777266..595849f9d5 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -16,8 +16,7 @@ import logging import os - -from six.moves import urllib +import urllib from twisted.internet import defer from twisted.protocols.basic import FileSender diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 683a79c966..79cb0dddbe 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -17,9 +17,6 @@ import contextlib import logging import os import shutil -import sys - -import six from twisted.internet import defer from twisted.protocols.basic import FileSender @@ -117,12 +114,11 @@ class MediaStorage(object): with open(fname, "wb") as f: yield f, fname, finish except Exception: - t, v, tb = sys.exc_info() try: os.remove(fname) except Exception: pass - six.reraise(t, v, tb) + raise if not finished_called: raise Exception("Finished callback not called") diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index f206605727..f67e0fb3ec 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -24,10 +24,7 @@ import shutil import sys import traceback from typing import Dict, Optional - -import six -from six import string_types -from six.moves import urllib_parse as urlparse +from urllib import parse as urlparse from canonicaljson import json @@ -188,7 +185,7 @@ class PreviewUrlResource(DirectServeResource): # It may be stored as text in the database, not as bytes (such as # PostgreSQL). If so, encode it back before handing it on. og = cache_result["og"] - if isinstance(og, six.text_type): + if isinstance(og, str): og = og.encode("utf8") return og @@ -631,7 +628,7 @@ def _iterate_over_text(tree, *tags_to_ignore): if el is None: return - if isinstance(el, string_types): + if isinstance(el, str): yield el elif el.tag not in tags_to_ignore: # el.text is the text before the first child, so we can immediately diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py index e7e8b8e688..3bfc8d7278 100644 --- a/synapse/server_notices/consent_server_notices.py +++ b/synapse/server_notices/consent_server_notices.py @@ -14,8 +14,6 @@ # limitations under the License. import logging -from six import string_types - from synapse.api.errors import SynapseError from synapse.api.urls import ConsentURIBuilder from synapse.config import ConfigError @@ -118,7 +116,7 @@ def copy_with_str_subst(x, substitutions): Returns: copy of x """ - if isinstance(x, string_types): + if isinstance(x, str): return x % substitutions if isinstance(x, dict): return {k: copy_with_str_subst(v, substitutions) for (k, v) in x.items()} diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 24ce8c4330..a6bb3221ff 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -14,10 +14,9 @@ # limitations under the License. import itertools import logging +from queue import Empty, PriorityQueue from typing import Dict, List, Optional, Set, Tuple -from six.moves.queue import Empty, PriorityQueue - from twisted.internet import defer from synapse.api.errors import StoreError diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 8a13101f1d..cfd24d2f06 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple from functools import wraps from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple -from six import integer_types, text_type -from six.moves import range - import attr from canonicaljson import json from prometheus_client import Counter @@ -893,8 +890,7 @@ class PersistEventsStore: "received_ts": self._clock.time_msec(), "sender": event.sender, "contains_url": ( - "url" in event.content - and isinstance(event.content["url"], text_type) + "url" in event.content and isinstance(event.content["url"], str) ), } for event, _ in events_and_contexts @@ -1345,10 +1341,10 @@ class PersistEventsStore: ): if ( "min_lifetime" in event.content - and not isinstance(event.content.get("min_lifetime"), integer_types) + and not isinstance(event.content.get("min_lifetime"), int) ) or ( "max_lifetime" in event.content - and not isinstance(event.content.get("max_lifetime"), integer_types) + and not isinstance(event.content.get("max_lifetime"), int) ): # Ignore the event if one of the value isn't an integer. return diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index f54c8b1ee0..62d28f44dc 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -15,8 +15,6 @@ import logging -from six import text_type - from canonicaljson import json from twisted.internet import defer @@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): contains_url = "url" in content if contains_url: - contains_url &= isinstance(content["url"], text_type) + contains_url &= isinstance(content["url"], str) except (KeyError, AttributeError): # If the event is missing a necessary field then # skip over it. diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 213d69100a..a48c7a96ca 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore): "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn ) - def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + async def get_all_new_backfill_event_rows( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for backfill replication stream, including all new + backfilled events and events that have gone from being outliers to not. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_new_backfill_event_rows(txn): sql = ( @@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (-last_id, -current_id, limit)) - new_event_updates = txn.fetchall() + new_event_updates = [(row[0], row[1:]) for row in txn] + limited = False if len(new_event_updates) == limit: upper_bound = new_event_updates[-1][0] + limited = True else: upper_bound = current_id @@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore): " ORDER BY event_stream_ordering DESC" ) txn.execute(sql, (-last_id, -upper_bound)) - new_event_updates.extend(txn.fetchall()) + new_event_updates.extend((row[0], row[1:]) for row in txn) - return new_event_updates + if len(new_event_updates) >= limit: + upper_bound = new_event_updates[-1][0] + limited = True - return self.db.runInteraction( + return new_event_updates, upper_bound, limited + + return await self.db.runInteraction( "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows ) diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py index dab31e0c2d..7574612619 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Tuple + from twisted.internet import defer from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause @@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore): ) txn.execute(sql + clause, [stream_id] + list(args)) - def get_all_presence_updates(self, last_id, current_id, limit): + async def get_all_presence_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for presence replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_presence_updates_txn(txn): sql = """ @@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore): LIMIT ? """ txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + updates = [(row[0], row[1:]) for row in txn] + + upper_bound = current_id + limited = False + if len(updates) >= limit: + upper_bound = updates[-1][0] + limited = True + + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_presence_updates", get_all_presence_updates_txn ) diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index ef8f40959f..f6e78ca590 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -16,7 +16,7 @@ import abc import logging -from typing import Union +from typing import List, Tuple, Union from canonicaljson import json @@ -348,23 +348,53 @@ class PushRulesWorkerStore( results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled return results - def get_all_push_rule_updates(self, last_id, current_id, limit): - """Get all the push rules changes that have happend on the server""" + async def get_all_push_rule_updates( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for push_rules replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_push_rule_updates_txn(txn): - sql = ( - "SELECT stream_id, event_stream_ordering, user_id, rule_id," - " op, priority_class, priority, conditions, actions" - " FROM push_rules_stream" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) + sql = """ + SELECT stream_id, user_id + FROM push_rules_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() + updates = [(stream_id, (user_id,)) for stream_id, user_id in txn] + + limited = False + upper_bound = current_id + if len(updates) == limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_push_rule_updates", get_all_push_rule_updates_txn ) diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py index cebdcd409f..8f5505bd67 100644 --- a/synapse/storage/data_stores/main/receipts.py +++ b/synapse/storage/data_stores/main/receipts.py @@ -16,6 +16,7 @@ import abc import logging +from typing import List, Tuple from canonicaljson import json @@ -24,6 +25,7 @@ from twisted.internet import defer from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore): } return results - def get_all_updated_receipts(self, last_id, current_id, limit=None): + def get_users_sent_receipts_between(self, last_id: int, current_id: int): + """Get all users who sent receipts between `last_id` exclusive and + `current_id` inclusive. + + Returns: + Deferred[List[str]] + """ + if last_id == current_id: return defer.succeed([]) - def get_all_updated_receipts_txn(txn): - sql = ( - "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" - " FROM receipts_linearized" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC" - ) - args = [last_id, current_id] - if limit is not None: - sql += " LIMIT ?" - args.append(limit) - txn.execute(sql, args) + def _get_users_sent_receipts_between_txn(txn): + sql = """ + SELECT DISTINCT user_id FROM receipts_linearized + WHERE ? < stream_id AND stream_id <= ? + """ + txn.execute(sql, (last_id, current_id)) - return [r[0:5] + (json.loads(r[5]),) for r in txn] + return [r[0] for r in txn] return self.db.runInteraction( + "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn + ) + + async def get_all_updated_receipts( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, list]], int, bool]: + """Get updates for receipts replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + if last_id == current_id: + return [], current_id, False + + def get_all_updated_receipts_txn(txn): + sql = """ + SELECT stream_id, room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + + updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn] + + limited = False + upper_bound = current_id + + if len(updates) == limit: + limited = True + upper_bound = updates[-1][0] + + return updates, upper_bound, limited + + return await self.db.runInteraction( "get_all_updated_receipts", get_all_updated_receipts_txn ) @@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore): room_id, None, update_metrics=False ) - # first handle the Deferred case - if isinstance(res, defer.Deferred): - if res.called: - res = res.result + # first handle the ObservableDeferred case + if isinstance(res, ObservableDeferred): + if res.has_called(): + res = res.get_result() else: res = None diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py index 9b95411fb6..b42c02710a 100644 --- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py +++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py @@ -13,8 +13,6 @@ # limitations under the License. import logging -from six.moves import range - from synapse.config.appservice import load_appservices logger = logging.getLogger(__name__) diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py index 13f49d8060..a8381dc577 100644 --- a/synapse/storage/data_stores/main/search.py +++ b/synapse/storage/data_stores/main/search.py @@ -17,8 +17,6 @@ import logging import re from collections import namedtuple -from six import string_types - from canonicaljson import json from twisted.internet import defer @@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # skip over it. continue - if not isinstance(value, string_types): + if not isinstance(value, str): # If the event body, name or topic isn't a string # then skip over it continue diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index e89f0bffb5..379d758b5d 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -40,8 +40,6 @@ import abc import logging from collections import namedtuple -from six.moves import range - from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 4219018302..f8c776be3f 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -16,8 +16,6 @@ import logging -from six.moves import range - from canonicaljson import json from twisted.internet import defer diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py index b720212e55..5db9f20135 100644 --- a/synapse/storage/data_stores/state/store.py +++ b/synapse/storage/data_stores/state/store.py @@ -17,8 +17,6 @@ import logging from collections import namedtuple from typing import Dict, Iterable, List, Set, Tuple -from six.moves import range - from twisted.internet import defer from synapse.api.constants import EventTypes diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 645a70934c..3be20c866a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -16,6 +16,7 @@ # limitations under the License. import logging import time +from sys import intern from time import monotonic as monotonic_time from typing import ( Any, @@ -29,8 +30,6 @@ from typing import ( TypeVar, ) -from six.moves import intern, range - from prometheus_client import Histogram from twisted.enterprise import adbapi diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 92dfd709bc..ec894a91cb 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -20,8 +20,6 @@ import logging from collections import deque, namedtuple from typing import Iterable, List, Optional, Set, Tuple -from six.moves import range - from prometheus_client import Counter, Histogram from twisted.internet import defer diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index f7af2bca7f..65abf0846e 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -19,8 +19,6 @@ import logging from contextlib import contextmanager from typing import Dict, Sequence, Set, Union -from six.moves import range - import attr from twisted.internet import defer @@ -95,7 +93,7 @@ class ObservableDeferred(object): This returns a brand new deferred that is resolved when the underlying deferred is resolved. Interacting with the returned deferred does not - effect the underdlying deferred. + effect the underlying deferred. """ if not self._result: d = defer.Deferred() diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 2a161bf244..c541bf4579 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -17,8 +17,6 @@ import logging import math from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union -from six import integer_types - from sortedcontainers import SortedDict from synapse.types import Collection @@ -88,7 +86,7 @@ class StreamChangeCache: def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) in integer_types + assert isinstance(stream_pos, int) if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 8b17d1c8b8..6a3f6177b1 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six.moves import queue +import queue from twisted.internet import threads diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 9815bb8667..eab78dd256 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six import binary_type, text_type - from canonicaljson import json from frozendict import frozendict @@ -26,7 +24,7 @@ def freeze(o): if isinstance(o, frozendict): return o - if isinstance(o, (binary_type, text_type)): + if isinstance(o, (bytes, str)): return o try: @@ -41,7 +39,7 @@ def unfreeze(o): if isinstance(o, (dict, frozendict)): return dict({k: unfreeze(v) for k, v in o.items()}) - if isinstance(o, (binary_type, text_type)): + if isinstance(o, (bytes, str)): return o try: diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py index 9bf6a44f75..023beb5ede 100644 --- a/synapse/util/wheel_timer.py +++ b/synapse/util/wheel_timer.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from six.moves import range - class _Entry(object): __slots__ = ["end_key", "queue"] diff --git a/synapse/visibility.py b/synapse/visibility.py index 780927cda1..3dfd4af26c 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -16,8 +16,6 @@ import logging import operator -from six.moves import map - from twisted.internet import defer from synapse.api.constants import EventTypes, Membership |